Transformer for Telegram, base transformer NLP hydration; no media

This commit is contained in:
Logan Williams
2022-04-14 11:45:09 +02:00
parent 59bab0d812
commit 4c221d1133
9 changed files with 263 additions and 79 deletions

View File

@@ -22,6 +22,8 @@ psycopg2 = "*"
tqdm = "*"
ratelimit = "*"
pytz = "*"
langdetect = "*"
spacy = "==3.2.4"
[dev-packages]
pytest = "*"

2
Pipfile.lock generated
View File

@@ -1,7 +1,7 @@
{
"_meta": {
"hash": {
"sha256": "bd884a30c799fc7b881926bd6a894fb36cc6710f1221c84c0e6be34b8836fa7d"
"sha256": "2fbcb9a9c7df5e0c994f4be5e88b9ac88aed1ddd4383760b2d6cf964738cf993"
},
"pipfile-spec": 6,
"requires": {

21
app.py
View File

@@ -16,6 +16,7 @@ from cisticola.scraper import (
BitchuteScraper,
RumbleScraper,
)
from cisticola.transformer import (ETLController, TelegramTelethonTransformer)
def sync_channels(args):
@@ -122,6 +123,18 @@ def get_scraper_controller():
return controller
def get_transformer_controller():
engine = create_engine(os.environ["DB"])
controller = ETLController()
controller.connect_to_db(engine)
transformers = [TelegramTelethonTransformer()]
controller.register_transformers(transformers)
return controller
def scrape_channels(args):
logger.info(f"Scraping channels, media: {args.media}")
@@ -143,6 +156,12 @@ def archive_media(args):
controller = get_scraper_controller()
controller.archive_unarchived_media()
def transform(args):
logger.info(f"Transforming untransformed media")
controller = get_transformer_controller()
controller.transform_all_untransformed()
def init_db():
engine = create_engine(os.environ["DB"])
@@ -179,5 +198,7 @@ if __name__ == "__main__":
archive_media(args)
elif args.command == "channel-info":
scrape_channel_info(args)
elif args.command == "transform":
transform(args)
else:
logger.error(f"Unrecognized command {args.command}")

View File

@@ -1,5 +1,5 @@
from typing import List
from dataclasses import dataclass
from dataclasses import dataclass, field
from datetime import datetime
import tempfile
import json
@@ -10,6 +10,11 @@ from sqlalchemy import Table, Column, Integer, String, JSON, DateTime, ForeignKe
import pytesseract
import PIL
import exiftool
import re
from langdetect import detect, DetectorFactory
from langdetect.lang_detect_exception import LangDetectException
from loguru import logger
import spacy
from .utils import make_request
@@ -109,6 +114,14 @@ class RawChannelInfo:
#: Datetime (relative to UTC) that the scraped post was archived at.
date_archived: datetime
nlp_en = spacy.load('en_core_web_sm', disable=['parser', 'tok2vec', 'attribute_ruler'])
nlp_de = spacy.load('de_core_news_sm', disable=['parser', 'tok2vec', 'attribute_ruler'])
nlp_it = spacy.load('it_core_news_sm', disable=['parser', 'tok2vec', 'attribute_ruler'])
nlp_fr = spacy.load('fr_core_news_sm', disable=['parser', 'tok2vec', 'attribute_ruler'])
nlp_ru = spacy.load('ru_core_news_sm', disable=['parser', 'tok2vec', 'attribute_ruler'])
nlp_nl = spacy.load('nl_core_news_sm', disable=['parser', 'tok2vec', 'attribute_ruler'])
nlp_xx = spacy.load('xx_ent_wiki_sm')
@dataclass
class Post:
"""An object with fields for columns in the analysis table"""
@@ -149,6 +162,24 @@ class Post:
#: Text of the original post
content: str
#: Named entities detected in post
named_entities: list = field(default_factory=list)
#: Any cryptocurrency addresses in post
cryptocurrency_addresses: list = field(default_factory=list)
#: Hashtags in post
hashtags: list = field(default_factory=list)
#: Links to any other websites
outlinks: list = field(default_factory=list)
#: Detected language of post
detected_language: str = ""
#: Normalized post content
normalized_content: str = ""
#: The ID of the Channel that the post was forwarded or quoted from
forwarded_from: int = None
@@ -156,7 +187,59 @@ class Post:
reply_to: int = None
def hydrate(self):
pass
URL_REGEX = r"""(?i)\b((?:https?:(?:/{1,3}|[a-z0-9%])|[a-z0-9.\-]+[.](?:com|net|org|edu|gov|mil|aero|asia|biz|cat|coop|info|int|jobs|mobi|museum|name|post|pro|tel|travel|xxx|ac|ad|ae|af|ag|ai|al|am|an|ao|aq|ar|as|at|au|aw|ax|az|ba|bb|bd|be|bf|bg|bh|bi|bj|bm|bn|bo|br|bs|bt|bv|bw|by|bz|ca|cc|cd|cf|cg|ch|ci|ck|cl|cm|cn|co|cr|cs|cu|cv|cx|cy|cz|dd|de|dj|dk|dm|do|dz|ec|ee|eg|eh|er|es|et|eu|fi|fj|fk|fm|fo|fr|ga|gb|gd|ge|gf|gg|gh|gi|gl|gm|gn|gp|gq|gr|gs|gt|gu|gw|gy|hk|hm|hn|hr|ht|hu|id|ie|il|im|in|io|iq|ir|is|it|je|jm|jo|jp|ke|kg|kh|ki|km|kn|kp|kr|kw|ky|kz|la|lb|lc|li|lk|lr|ls|lt|lu|lv|ly|ma|mc|md|me|mg|mh|mk|ml|mm|mn|mo|mp|mq|mr|ms|mt|mu|mv|mw|mx|my|mz|na|nc|ne|nf|ng|ni|nl|no|np|nr|nu|nz|om|pa|pe|pf|pg|ph|pk|pl|pm|pn|pr|ps|pt|pw|py|qa|re|ro|rs|ru|rw|sa|sb|sc|sd|se|sg|sh|si|sj|Ja|sk|sl|sm|sn|so|sr|ss|st|su|sv|sx|sy|sz|tc|td|tf|tg|th|tj|tk|tl|tm|tn|to|tp|tr|tt|tv|tw|tz|ua|ug|uk|us|uy|uz|va|vc|ve|vg|vi|vn|vu|wf|ws|ye|yt|yu|za|zm|zw)/)(?:[^\s()<>{}\[\]]+|\([^\s()]*?\([^\s()]+\)[^\s()]*?\)|\([^\s]+?\))+(?:\([^\s()]*?\([^\s()]+\)[^\s()]*?\)|\([^\s]+?\)|[^\s`!()\[\]{};:\'\".,<>?«»“”‘’])|(?:(?<!@)[a-z0-9]+(?:[.\-][a-z0-9]+)*[.](?:com|net|org|edu|gov|mil|aero|asia|biz|cat|coop|info|int|jobs|mobi|museum|name|post|pro|tel|travel|xxx|ac|ad|ae|af|ag|ai|al|am|an|ao|aq|ar|as|at|au|aw|ax|az|ba|bb|bd|be|bf|bg|bh|bi|bj|bm|bn|bo|br|bs|bt|bv|bw|by|bz|ca|cc|cd|cf|cg|ch|ci|ck|cl|cm|cn|co|cr|cs|cu|cv|cx|cy|cz|dd|de|dj|dk|dm|do|dz|ec|ee|eg|eh|er|es|et|eu|fi|fj|fk|fm|fo|fr|ga|gb|gd|ge|gf|gg|gh|gi|gl|gm|gn|gp|gq|gr|gs|gt|gu|gw|gy|hk|hm|hn|hr|ht|hu|id|ie|il|im|in|io|iq|ir|is|it|je|jm|jo|jp|ke|kg|kh|ki|km|kn|kp|kr|kw|ky|kz|la|lb|lc|li|lk|lr|ls|lt|lu|lv|ly|ma|mc|md|me|mg|mh|mk|ml|mm|mn|mo|mp|mq|mr|ms|mt|mu|mv|mw|mx|my|mz|na|nc|ne|nf|ng|ni|nl|no|np|nr|nu|nz|om|pa|pe|pf|pg|ph|pk|pl|pm|pn|pr|ps|pt|pw|py|qa|re|ro|rs|ru|rw|sa|sb|sc|sd|se|sg|sh|si|sj|Ja|sk|sl|sm|sn|so|sr|ss|st|su|sv|sx|sy|sz|tc|td|tf|tg|th|tj|tk|tl|tm|tn|to|tp|tr|tt|tv|tw|tz|ua|ug|uk|us|uy|uz|va|vc|ve|vg|vi|vn|vu|wf|ws|ye|yt|yu|za|zm|zw)\b/?(?!@)))"""
urls = re.findall(URL_REGEX, self.content)
self.outlinks = urls
HASHTAG_REGEX = r"(?:^|\s)[#]{1}(\w+)"
hashtags = re.findall(HASHTAG_REGEX, self.content)
self.hashtags = hashtags
# regex patterns for finding crypto addresses
BTC_REGEX = r'\b(bc(0([ac-hj-np-z02-9]{39}|[ac-hj-np-z02-9]{59})|1[ac-hj-np-z02-9]{8,87})|[13][a-km-zA-HJ-NP-Z1-9]{25,35})\b'
ETHER_REGEX = r'(0x[a-fA-F0-9]{40})'
self.cryptocurrency_addresses = [m[0] for m in re.findall(BTC_REGEX, self.content)] + re.findall(ETHER_REGEX, self.content)
try:
self.detected_language = detect(self.content)
except LangDetectException:
self.detected_language = ""
self.hydrate_spacy()
def hydrate_spacy(self):
ner_only = False
if self.detected_language == 'en':
nlp = nlp_en
elif self.detected_language == 'de':
nlp = nlp_de
elif self.detected_language == 'it':
nlp = nlp_it
elif self.detected_language == 'fr':
nlp = nlp_fr
elif self.detected_language == 'ru':
nlp = nlp_ru
elif self.detected_language == 'nl':
nlp = nlp_nl
else:
logger.info(f"No language model for {self.detected_language}")
nlp = nlp_xx
ner_only = True
doc = nlp(self.content)
if not ner_only:
punctuation = ['?',':','!',',','.',';','|','(',')','--','#','=','+']
tokens = [t.lemma_ for t in doc if not t.is_stop and t.lemma_ not in punctuation]
self.normalized_content = ' '.join(tokens)
else:
self.normalized_content = ''
self.named_entities = [{'text': ent.text, 'type': ent.label_} for ent in doc.ents]
@dataclass
@@ -294,7 +377,13 @@ post_table = Table('posts', mapper_registry.metadata,
Column('author_username', String),
Column('content', String),
Column('forwarded_from', Integer, ForeignKey('channels.id'), index=True),
Column('reply_to', Integer, ForeignKey('posts.id'), index=True)
Column('reply_to', Integer, ForeignKey('posts.id'), index=True),
Column('named_entities', JSON),
Column('cryptocurrency_addresses', JSON),
Column('hashtags', JSON),
Column('outlinks', JSON),
Column('detected_language', String),
Column('normalized_content', String)
)
media_table = Table('media', mapper_registry.metadata,

View File

@@ -6,7 +6,6 @@ from .gettr import GettrScraper
from .instagram import InstagramScraper
from .odysee import OdyseeScraper
from .rumble import RumbleScraper
from .telegram_snscrape import TelegramSnscrapeScraper
from .telegram_telethon import TelegramTelethonScraper
from .twitter import TwitterScraper
from .vkontakte import VkontakteScraper

View File

@@ -1,70 +0,0 @@
from typing import Generator
from datetime import datetime, timezone
import json
import snscrape.modules
from loguru import logger
from cisticola.base import Channel, ScraperResult, RawChannelInfo
from cisticola.scraper.base import Scraper
class TelegramSnscrapeScraper(Scraper):
"""An implementation of a Scraper for Telegram, using snscrape library"""
__version__ = "TelegramSnscrapeScraper 0.0.0"
def can_handle(self, channel):
if channel.platform == "Telegram" and channel.public and not channel.chat:
return True
@logger.catch
def get_posts(self, channel: Channel, since: ScraperResult = None, archive_media: bool = True) -> Generator[ScraperResult, None, None]:
scr = snscrape.modules.telegram.TelegramChannelScraper(
channel.screenname)
g = scr.get_items()
for post in g:
if since is not None and post.date.replace(tzinfo=timezone.utc) <= since.date.replace(tzinfo=timezone.utc):
logger.info(f'Timestamp of post {post} is earlier than the previous archived timestamp {post.date.replace(tzinfo=timezone.utc)}')
break
logger.info(f'Processing post {post}')
archived_urls = {}
for image_url in post.images:
archived_urls[image_url] = None
for video_url in post.videos:
archived_urls[video_url] = None
if archive_media:
for url in archived_urls:
media_blob, content_type, key = self.url_to_blob(url)
archived_url = self.archive_blob(media_blob, content_type, key)
archived_urls[url] = archived_url
yield ScraperResult(
scraper=self.__version__,
platform="Telegram",
channel=channel.id,
platform_id=post.url,
date=post.date,
date_archived=datetime.now(timezone.utc),
raw_data=post.json(),
archived_urls=archived_urls,
media_archived=datetime.now(timezone.utc) if archive_media else None
)
@logger.catch
def get_profile(self, channel: Channel) -> RawChannelInfo:
scr = snscrape.modules.telegram.TelegramChannelScraper(
channel.screenname)
profile = scr._get_entity().__dict__
return RawChannelInfo(scraper=self.__version__,
platform=channel.platform,
channel=channel.id,
raw_data=json.dumps(profile),
date_archived=datetime.now(timezone.utc))

View File

@@ -1,3 +1,4 @@
from .base import ETLController
from .twitter import TwitterTransformer
from .bitchute import BitchuteTransformer
from .bitchute import BitchuteTransformer
from .telegram_telethon import TelegramTelethonTransformer

View File

@@ -68,6 +68,10 @@ class ETLController:
self.transformers.append(transformer)
def register_transformers(self, transformers):
for t in transformers:
self.register_transformer(t)
def connect_to_db(self, engine: Engine):
"""Connects the ETLController to a SQLAlchemy engine.
@@ -93,7 +97,8 @@ class ETLController:
instance = session.query(Channel).filter_by(url=obj.url, platform_id=obj.platform_id, platform=obj.platform).first()
elif type(obj) == Post:
instance = session.query(Post).filter_by(platform=obj.platform, platform_id=obj.platform_id).first()
instance = None
# instance = session.query(Post).filter_by(platform=obj.platform, platform_id=obj.platform_id).first()
elif issubclass(type(obj), Media):
instance = session.query(type(obj)).filter_by(original_url=obj.original_url, post=obj.post).first()
@@ -151,11 +156,11 @@ class ETLController:
handled = False
if transformer.can_handle(result):
logger.info(f"{transformer} is handling result {result}")
logger.trace(f"{transformer} is handling result {result}")
handled = True
session = self.session()
transformer.transform(result, lambda obj: self.insert_or_select(obj, session, hydrate))
transformer.transform(result, lambda obj: self.insert_or_select(obj, session, hydrate), session)
session.commit()
break
@@ -182,6 +187,7 @@ class ETLController:
session.query(ScraperResult)
.join(Post, isouter=True)
.where(Post.raw_id == None)
.order_by(ScraperResult.date.asc())
.all()
)
logger.info(f"Found {len(untransformed)} items to ETL")

View File

@@ -0,0 +1,136 @@
import json
from loguru import logger
from typing import Generator, Union, Callable
import dateutil.parser
from bs4 import BeautifulSoup
import requests
import time
from cisticola.transformer.base import Transformer
from cisticola.base import ScraperResult, Post, Image, Video, Media, Channel
class TelegramTelethonTransformer(Transformer):
__version__ = 'TelegramTelethonTransformer 0.0.1'
bad_channels = {}
def can_handle(self, data: ScraperResult) -> bool:
scraper = data.scraper.split(' ')
if scraper[0] == "TelegramTelethonScraper":
return True
return False
def get_screenname_from_id(self, orig_screenname, id):
if orig_screenname in self.bad_channels:
logger.debug(f"Skipping screenname because it is not accessible for channel {orig_screenname}")
return ("", "")
url = "https://t.me/s/" + orig_screenname + "/" + str(id)
logger.info(f"Finding channel from URL {url}")
r = requests.get(url)
if r.url != url:
self.bad_channels[orig_screenname] = True
return ("", "")
soup = BeautifulSoup(r.content)
post = soup.findAll("div", {"data-post" : orig_screenname + "/" + str(id)})
if len(post) == 0:
logger.warning(f"Could not find post from {url}")
screenname = ""
name = ""
else:
fwd_tag = post[0].findAll("a", {"class", "tgme_widget_message_forwarded_from_name"})
if len(fwd_tag) > 0:
fwd_tag = fwd_tag[0]
name = fwd_tag.text
screenname = fwd_tag['href'].split('/')[-2]
else:
fwd_tag = post[0].findAll("span", {"class", "tgme_widget_message_forwarded_from_name"})
name = fwd_tag[0].text
screenname = ""
return (screenname, name)
def transform(self, data: ScraperResult, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]:
raw = json.loads(data.raw_data)
if raw['_'] != 'Message':
logger.warning(f"Cannot convert type {raw['_']} to post")
return
fwd_from = None
if raw['fwd_from'] and raw['fwd_from']['from_id'] and 'channel_id' in raw['fwd_from']['from_id']:
channel = session.query(Channel).filter_by(platform_id=raw['fwd_from']['from_id']['channel_id']).first()
if channel is None:
orig_channel = session.query(Channel).filter_by(id=data.channel).first()
(screenname, name) = self.get_screenname_from_id(orig_channel.screenname, raw['id'])
channel = Channel(
name=name,
platform_id=raw['fwd_from']['from_id']['channel_id'],
platform=data.platform,
url="https://t.me/s/" + screenname,
screenname=screenname,
category='forwarded',
source=self.__version__
)
channel = insert(channel)
elif channel.screenname == "":
# if the screenname is empty, we can fill it in
orig_channel = session.query(Channel).filter_by(id=data.channel).first()
(screenname, name) = self.get_screenname_from_id(orig_channel.screenname, raw['id'])
channel.screenname = screenname
channel.name = name
channel.url = "https://t.me/s/" + screenname
session.flush()
fwd_from = channel.id
reply_to = None
if raw['reply_to']:
reply_to_id = raw['reply_to']['reply_to_msg_id']
post = session.query(Post).filter_by(channel=data.channel, platform_id=reply_to_id).first()
if post is None:
reply_to = -1
else:
reply_to = post.id
transformed = Post(
raw_id = data.id,
platform_id = raw['id'],
scraper = data.scraper,
transformer=self.__version__,
platform=data.platform,
channel=data.channel,
date=dateutil.parser.parse(raw['date']),
date_archived=data.date_archived,
url="",
content=raw['message'],
author_id=raw['post_author'],
author_username="",
forwarded_from=fwd_from,
reply_to=reply_to
)
transformed = insert(transformed)
for k in data.archived_urls:
if data.archived_urls[k]:
archived_url = data.archived_urls[k]
ext = archived_url.split('.')[-1]
if ext == 'mp4' or ext == 'mov' or ext == 'avi' or ext =='mkv':
insert(Video(url=archived_url, post=transformed.id, raw_id=data.id, original_url=k))
else:
insert(Image(url=archived_url, post=transformed.id, raw_id=data.id, original_url=k))