From d05584a09f50cffefe082445d62236f4c4498787 Mon Sep 17 00:00:00 2001 From: Logan Williams Date: Thu, 28 Jul 2022 08:42:59 +0000 Subject: [PATCH] Minor bug fixes; helper tool for Telethon sessions --- cisticola/base.py | 11 +++++++-- cisticola/scraper/telegram_telethon.py | 6 +++-- cisticola/transformer/base.py | 33 ++++++++++++++++---------- telethon_session_init.py | 22 +++++++++++++++++ 4 files changed, 55 insertions(+), 17 deletions(-) create mode 100644 telethon_session_init.py diff --git a/cisticola/base.py b/cisticola/base.py index 35645d6..d1f80b8 100644 --- a/cisticola/base.py +++ b/cisticola/base.py @@ -18,6 +18,9 @@ import spacy from .utils import make_request +# Raise the decompression bomb limit to 256 megapixels (does not disable the check) +PIL.Image.MAX_IMAGE_PIXELS = 1024 * 1024 * 256 + @dataclass class ScraperResult: """A minimally processed result from a scraper @@ -275,6 +278,10 @@ class Post: except LangDetectException: self.detected_language = "" + # Dutch (nl) is often misdetected as Afrikaans (af) + if self.detected_language == "af": + self.detected_language = "nl" + self.hydrate_spacy() def hydrate_spacy(self): @@ -493,7 +500,7 @@ post_table = Table('posts', mapper_registry.metadata, Column('author_username', String), Column('content', String), Column('forwarded_from', Integer, ForeignKey('channels.id'), index=True), - Column('reply_to', Integer, ForeignKey('posts.id', ondelete="CASCADE"), index=True), + Column('reply_to', Integer, ForeignKey('posts.id'), index=True), Column('named_entities', JSON), Column('cryptocurrency_addresses', JSON), Column('hashtags', JSON), @@ -513,7 +520,7 @@ media_table = Table('media', mapper_registry.metadata, autoincrement=True), Column('type', String), Column('raw_id', Integer, ForeignKey('raw_posts.id'), index=True), - Column('post', Integer, ForeignKey('posts.id')), + Column('post', Integer, ForeignKey('posts.id'), index=True), Column('url', String), Column('original_url', String), Column('exif', String), diff --git 
a/cisticola/scraper/telegram_telethon.py b/cisticola/scraper/telegram_telethon.py index f42d1c5..9845a6d 100644 --- a/cisticola/scraper/telegram_telethon.py +++ b/cisticola/scraper/telegram_telethon.py @@ -32,7 +32,7 @@ class TelegramTelethonScraper(Scraper): telethon_session_name = phone # set up a persistent client for Telethon - self.client = TelegramClient(telethon_session_name, api_id, api_hash) + self.client = TelegramClient(telethon_session_name, api_id, api_hash) self.client.connect() def __del__(self): @@ -145,9 +145,11 @@ class TelegramTelethonScraper(Scraper): break archived_urls = {} + media_archived = datetime(1970, 1, 1, 0, 0, 0, 0, tzinfo=timezone.utc) if post.media is not None: archived_urls[post_url] = None + media_archived = None # if archive_media: # blob, output_file_with_ext = self.archive_post_media(post, client) @@ -165,7 +167,7 @@ class TelegramTelethonScraper(Scraper): date_archived=datetime.now(timezone.utc), raw_data=json.dumps(post.to_dict(), default=str), archived_urls=archived_urls, - media_archived=datetime.now(timezone.utc) if archive_media else None) + media_archived=media_archived) @logger.catch def get_profile(self, channel: Channel) -> RawChannelInfo: diff --git a/cisticola/transformer/base.py b/cisticola/transformer/base.py index d42f24b..c9b777a 100644 --- a/cisticola/transformer/base.py +++ b/cisticola/transformer/base.py @@ -349,23 +349,30 @@ class ETLController: session = self.session() - BATCH_SIZE = 5000 - offset = 0 - batch = [] + BATCH_SIZE = 50000 - query = (session.query(ScraperResult, Post) + logger.info(f"Fetching first untransformed post media batch of {BATCH_SIZE}") + + batch = (session.query(ScraperResult, Post) .join(Post) .join(Media, isouter=True) .filter((ScraperResult.media_archived != None) & (cast(ScraperResult.archived_urls, String) != '{}') & (Media.id == None)) - .order_by(ScraperResult.date.asc()) - ) + .order_by(ScraperResult.date.desc()) + .limit(BATCH_SIZE) + ).all() - while len(batch) > 0 or offset 
== 0: - logger.info(f"Fetching untransformed post media batch of {BATCH_SIZE}, offset {offset}") - - batch = query.slice(offset, offset + BATCH_SIZE).all() - offset += BATCH_SIZE - - logger.info(f"Found {len(batch)} items to ETL ({offset} already processed)") + while len(batch) > 0: + logger.info(f"Found {len(batch)} items to ETL") self.transform_media(batch, hydrate=hydrate) + + logger.info(f"Fetching untransformed post media batch of {BATCH_SIZE}, offset {min(batch, key=lambda v: v.ScraperResult.date).ScraperResult.date}") + + batch = (session.query(ScraperResult, Post) + .join(Post) + .join(Media, isouter=True) + .where(ScraperResult.date <= min(batch, key=lambda v: v.ScraperResult.date).ScraperResult.date) + .filter((ScraperResult.media_archived != None) & (cast(ScraperResult.archived_urls, String) != '{}') & (Media.id == None)) + .order_by(ScraperResult.date.desc()) + .limit(BATCH_SIZE) + ).all() diff --git a/telethon_session_init.py b/telethon_session_init.py new file mode 100644 index 0000000..c9ba713 --- /dev/null +++ b/telethon_session_init.py @@ -0,0 +1,22 @@ +import argparse +from telethon.sync import TelegramClient +import os + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Cisticola command line tools") + parser.add_argument("--telethon_session", type=str) + + args = parser.parse_args() + + api_id = os.environ['TELEGRAM_API_ID'] + api_hash = os.environ['TELEGRAM_API_HASH'] + phone = os.environ['TELEGRAM_PHONE'] + telethon_session_name = args.telethon_session + + if telethon_session_name is None: + telethon_session_name = phone + + client = TelegramClient(telethon_session_name, api_id, api_hash) + client.start() + + client.disconnect() \ No newline at end of file