diff --git a/Pipfile b/Pipfile index 5e4e56b..8e5a39d 100644 --- a/Pipfile +++ b/Pipfile @@ -22,6 +22,8 @@ psycopg2 = "*" tqdm = "*" ratelimit = "*" pytz = "*" +langdetect = "*" +spacy = "==3.2.4" [dev-packages] pytest = "*" diff --git a/Pipfile.lock b/Pipfile.lock index e373452..51faba4 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "bd884a30c799fc7b881926bd6a894fb36cc6710f1221c84c0e6be34b8836fa7d" + "sha256": "2fbcb9a9c7df5e0c994f4be5e88b9ac88aed1ddd4383760b2d6cf964738cf993" }, "pipfile-spec": 6, "requires": { diff --git a/app.py b/app.py index 3a7bc29..5175a25 100644 --- a/app.py +++ b/app.py @@ -16,6 +16,7 @@ from cisticola.scraper import ( BitchuteScraper, RumbleScraper, ) +from cisticola.transformer import (ETLController, TelegramTelethonTransformer) def sync_channels(args): @@ -122,6 +123,18 @@ def get_scraper_controller(): return controller +def get_transformer_controller(): + engine = create_engine(os.environ["DB"]) + + controller = ETLController() + controller.connect_to_db(engine) + + transformers = [TelegramTelethonTransformer()] + + controller.register_transformers(transformers) + + return controller + def scrape_channels(args): logger.info(f"Scraping channels, media: {args.media}") @@ -143,6 +156,12 @@ def archive_media(args): controller = get_scraper_controller() controller.archive_unarchived_media() +def transform(args): + logger.info(f"Transforming untransformed media") + + controller = get_transformer_controller() + controller.transform_all_untransformed() + def init_db(): engine = create_engine(os.environ["DB"]) @@ -179,5 +198,7 @@ if __name__ == "__main__": archive_media(args) elif args.command == "channel-info": scrape_channel_info(args) + elif args.command == "transform": + transform(args) else: logger.error(f"Unrecognized command {args.command}") diff --git a/cisticola/base.py b/cisticola/base.py index bd70c12..66500ea 100644 --- a/cisticola/base.py +++ b/cisticola/base.py @@ -1,5 +1,5 @@ from typing import List -from dataclasses import dataclass +from dataclasses import dataclass, field from datetime import datetime import tempfile import json @@ -10,6 +10,11 @@ from sqlalchemy import Table, Column, Integer, String, JSON, DateTime, ForeignKe import pytesseract import PIL import exiftool +import re +from langdetect import detect, DetectorFactory +from langdetect.lang_detect_exception import LangDetectException +from loguru import logger +import spacy from .utils import make_request @@ -109,6 +114,14 @@ class RawChannelInfo: #: Datetime (relative to UTC) that the scraped post was archived at. date_archived: datetime +nlp_en = spacy.load('en_core_web_sm', disable=['parser', 'tok2vec', 'attribute_ruler']) +nlp_de = spacy.load('de_core_news_sm', disable=['parser', 'tok2vec', 'attribute_ruler']) +nlp_it = spacy.load('it_core_news_sm', disable=['parser', 'tok2vec', 'attribute_ruler']) +nlp_fr = spacy.load('fr_core_news_sm', disable=['parser', 'tok2vec', 'attribute_ruler']) +nlp_ru = spacy.load('ru_core_news_sm', disable=['parser', 'tok2vec', 'attribute_ruler']) +nlp_nl = spacy.load('nl_core_news_sm', disable=['parser', 'tok2vec', 'attribute_ruler']) +nlp_xx = spacy.load('xx_ent_wiki_sm') + @dataclass class Post: """An object with fields for columns in the analysis table""" @@ -149,6 +162,24 @@ class Post: #: Text of the original post content: str + #: Named entities detected in post + named_entities: list = field(default_factory=list) + + #: Any cryptocurrency addresses in post + cryptocurrency_addresses: list = field(default_factory=list) + + #: Hashtags in post + hashtags: list = field(default_factory=list) + + #: Links to any other websites + outlinks: list = field(default_factory=list) + + #: Detected language of post + detected_language: str = "" + + #: Normalized post content + normalized_content: str = "" + #: The ID of the Channel that the post was forwarded or quoted from forwarded_from: int = None @@ -156,7 +187,59 @@ class Post: reply_to: int = None def hydrate(self): - pass + URL_REGEX = r"""(?i)\b((?:https?:(?:/{1,3}|[a-z0-9%])|[a-z0-9.\-]+[.](?:com|net|org|edu|gov|mil|aero|asia|biz|cat|coop|info|int|jobs|mobi|museum|name|post|pro|tel|travel|xxx|ac|ad|ae|af|ag|ai|al|am|an|ao|aq|ar|as|at|au|aw|ax|az|ba|bb|bd|be|bf|bg|bh|bi|bj|bm|bn|bo|br|bs|bt|bv|bw|by|bz|ca|cc|cd|cf|cg|ch|ci|ck|cl|cm|cn|co|cr|cs|cu|cv|cx|cy|cz|dd|de|dj|dk|dm|do|dz|ec|ee|eg|eh|er|es|et|eu|fi|fj|fk|fm|fo|fr|ga|gb|gd|ge|gf|gg|gh|gi|gl|gm|gn|gp|gq|gr|gs|gt|gu|gw|gy|hk|hm|hn|hr|ht|hu|id|ie|il|im|in|io|iq|ir|is|it|je|jm|jo|jp|ke|kg|kh|ki|km|kn|kp|kr|kw|ky|kz|la|lb|lc|li|lk|lr|ls|lt|lu|lv|ly|ma|mc|md|me|mg|mh|mk|ml|mm|mn|mo|mp|mq|mr|ms|mt|mu|mv|mw|mx|my|mz|na|nc|ne|nf|ng|ni|nl|no|np|nr|nu|nz|om|pa|pe|pf|pg|ph|pk|pl|pm|pn|pr|ps|pt|pw|py|qa|re|ro|rs|ru|rw|sa|sb|sc|sd|se|sg|sh|si|sj|Ja|sk|sl|sm|sn|so|sr|ss|st|su|sv|sx|sy|sz|tc|td|tf|tg|th|tj|tk|tl|tm|tn|to|tp|tr|tt|tv|tw|tz|ua|ug|uk|us|uy|uz|va|vc|ve|vg|vi|vn|vu|wf|ws|ye|yt|yu|za|zm|zw)/)(?:[^\s()<>{}\[\]]+|\([^\s()]*?\([^\s()]+\)[^\s()]*?\)|\([^\s]+?\))+(?:\([^\s()]*?\([^\s()]+\)[^\s()]*?\)|\([^\s]+?\)|[^\s`!()\[\]{};:\'\".,<>?«»“”‘’])|(?:(? Generator[ScraperResult, None, None]: - scr = snscrape.modules.telegram.TelegramChannelScraper( - channel.screenname) - - g = scr.get_items() - - for post in g: - if since is not None and post.date.replace(tzinfo=timezone.utc) <= since.date.replace(tzinfo=timezone.utc): - logger.info(f'Timestamp of post {post} is earlier than the previous archived timestamp {post.date.replace(tzinfo=timezone.utc)}') - break - - logger.info(f'Processing post {post}') - - archived_urls = {} - - for image_url in post.images: - archived_urls[image_url] = None - - for video_url in post.videos: - archived_urls[video_url] = None - - if archive_media: - for url in archived_urls: - media_blob, content_type, key = self.url_to_blob(url) - archived_url = self.archive_blob(media_blob, content_type, key) - archived_urls[url] = archived_url - - yield ScraperResult( - scraper=self.__version__, - platform="Telegram", - channel=channel.id, - platform_id=post.url, - date=post.date, - date_archived=datetime.now(timezone.utc), - raw_data=post.json(), - archived_urls=archived_urls, - media_archived=datetime.now(timezone.utc) if archive_media else None - ) - - @logger.catch - def get_profile(self, channel: Channel) -> RawChannelInfo: - - scr = snscrape.modules.telegram.TelegramChannelScraper( - channel.screenname) - - profile = scr._get_entity().__dict__ - - return RawChannelInfo(scraper=self.__version__, - platform=channel.platform, - channel=channel.id, - raw_data=json.dumps(profile), - date_archived=datetime.now(timezone.utc)) diff --git a/cisticola/transformer/__init__.py b/cisticola/transformer/__init__.py index 7812b52..b4e0968 100644 --- a/cisticola/transformer/__init__.py +++ b/cisticola/transformer/__init__.py @@ -1,3 +1,4 @@ from .base import ETLController from .twitter import TwitterTransformer -from .bitchute import BitchuteTransformer \ No newline at end of file +from .bitchute import BitchuteTransformer +from .telegram_telethon import TelegramTelethonTransformer \ No newline at end of file diff --git a/cisticola/transformer/base.py b/cisticola/transformer/base.py index 38da38a..3ccd787 100644 --- a/cisticola/transformer/base.py +++ b/cisticola/transformer/base.py @@ -68,6 +68,10 @@ class ETLController: self.transformers.append(transformer) + def register_transformers(self, transformers): + for t in transformers: + self.register_transformer(t) + def connect_to_db(self, engine: Engine): """Connects the ETLController to a SQLAlchemy engine. @@ -93,7 +97,8 @@ class ETLController: instance = session.query(Channel).filter_by(url=obj.url, platform_id=obj.platform_id, platform=obj.platform).first() elif type(obj) == Post: - instance = session.query(Post).filter_by(platform=obj.platform, platform_id=obj.platform_id).first() + instance = None + # instance = session.query(Post).filter_by(platform=obj.platform, platform_id=obj.platform_id).first() elif issubclass(type(obj), Media): instance = session.query(type(obj)).filter_by(original_url=obj.original_url, post=obj.post).first() @@ -151,11 +156,11 @@ class ETLController: handled = False if transformer.can_handle(result): - logger.info(f"{transformer} is handling result {result}") + logger.trace(f"{transformer} is handling result {result}") handled = True session = self.session() - transformer.transform(result, lambda obj: self.insert_or_select(obj, session, hydrate)) + transformer.transform(result, lambda obj: self.insert_or_select(obj, session, hydrate), session) session.commit() break @@ -182,6 +187,7 @@ class ETLController: session.query(ScraperResult) .join(Post, isouter=True) .where(Post.raw_id == None) + .order_by(ScraperResult.date.asc()) .all() ) logger.info(f"Found {len(untransformed)} items to ETL") diff --git a/cisticola/transformer/telegram_telethon.py b/cisticola/transformer/telegram_telethon.py new file mode 100644 index 0000000..4daa9b5 --- /dev/null +++ b/cisticola/transformer/telegram_telethon.py @@ -0,0 +1,136 @@ +import json +from loguru import logger +from typing import Generator, Union, Callable +import dateutil.parser +from bs4 import BeautifulSoup +import requests +import time + +from cisticola.transformer.base import Transformer +from cisticola.base import ScraperResult, Post, Image, Video, Media, Channel + + +class TelegramTelethonTransformer(Transformer): + __version__ = 'TelegramTelethonTransformer 0.0.1' + + bad_channels = {} + + def can_handle(self, data: ScraperResult) -> bool: + scraper = data.scraper.split(' ') + if scraper[0] == "TelegramTelethonScraper": + return True + + return False + + def get_screenname_from_id(self, orig_screenname, id): + if orig_screenname in self.bad_channels: + logger.debug(f"Skipping screenname because it is not accessible for channel {orig_screenname}") + return ("", "") + + url = "https://t.me/s/" + orig_screenname + "/" + str(id) + + logger.info(f"Finding channel from URL {url}") + r = requests.get(url) + + if r.url != url: + self.bad_channels[orig_screenname] = True + return ("", "") + + soup = BeautifulSoup(r.content) + post = soup.findAll("div", {"data-post" : orig_screenname + "/" + str(id)}) + if len(post) == 0: + logger.warning(f"Could not find post from {url}") + screenname = "" + name = "" + else: + fwd_tag = post[0].findAll("a", {"class", "tgme_widget_message_forwarded_from_name"}) + + if len(fwd_tag) > 0: + fwd_tag = fwd_tag[0] + name = fwd_tag.text + screenname = fwd_tag['href'].split('/')[-2] + else: + fwd_tag = post[0].findAll("span", {"class", "tgme_widget_message_forwarded_from_name"}) + name = fwd_tag[0].text + screenname = "" + + return (screenname, name) + + def transform(self, data: ScraperResult, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]: + raw = json.loads(data.raw_data) + + if raw['_'] != 'Message': + logger.warning(f"Cannot convert type {raw['_']} to post") + return + + fwd_from = None + + if raw['fwd_from'] and raw['fwd_from']['from_id'] and 'channel_id' in raw['fwd_from']['from_id']: + channel = session.query(Channel).filter_by(platform_id=raw['fwd_from']['from_id']['channel_id']).first() + + if channel is None: + orig_channel = session.query(Channel).filter_by(id=data.channel).first() + (screenname, name) = self.get_screenname_from_id(orig_channel.screenname, raw['id']) + + channel = Channel( + name=name, + platform_id=raw['fwd_from']['from_id']['channel_id'], + platform=data.platform, + url="https://t.me/s/" + screenname, + screenname=screenname, + category='forwarded', + source=self.__version__ + ) + + channel = insert(channel) + elif channel.screenname == "": + # if the screenname is empty, we can fill it in + orig_channel = session.query(Channel).filter_by(id=data.channel).first() + (screenname, name) = self.get_screenname_from_id(orig_channel.screenname, raw['id']) + + channel.screenname = screenname + channel.name = name + channel.url = "https://t.me/s/" + screenname + session.flush() + + fwd_from = channel.id + + reply_to = None + if raw['reply_to']: + reply_to_id = raw['reply_to']['reply_to_msg_id'] + post = session.query(Post).filter_by(channel=data.channel, platform_id=reply_to_id).first() + if post is None: + reply_to = -1 + else: + reply_to = post.id + + transformed = Post( + raw_id = data.id, + platform_id = raw['id'], + scraper = data.scraper, + transformer=self.__version__, + platform=data.platform, + channel=data.channel, + date=dateutil.parser.parse(raw['date']), + date_archived=data.date_archived, + url="", + content=raw['message'], + author_id=raw['post_author'], + author_username="", + forwarded_from=fwd_from, + reply_to=reply_to + ) + + transformed = insert(transformed) + + for k in data.archived_urls: + if data.archived_urls[k]: + archived_url = data.archived_urls[k] + ext = archived_url.split('.')[-1] + + if ext == 'mp4' or ext == 'mov' or ext == 'avi' or ext =='mkv': + insert(Video(url=archived_url, post=transformed.id, raw_id=data.id, original_url=k)) + else: + insert(Image(url=archived_url, post=transformed.id, raw_id=data.id, original_url=k)) + + \ No newline at end of file