From ee24367caa98041cbf882f932a46e04af650f773 Mon Sep 17 00:00:00 2001 From: Logan Williams Date: Wed, 20 Jul 2022 09:26:47 +0000 Subject: [PATCH] Add features for running archive-media simultaneously --- app.py | 17 ++++++-- cisticola/base.py | 4 +- cisticola/scraper/base.py | 57 ++++++++++++++------------ cisticola/scraper/telegram_telethon.py | 8 +++- 4 files changed, 53 insertions(+), 33 deletions(-) diff --git a/app.py b/app.py index b51f007..27b9093 100644 --- a/app.py +++ b/app.py @@ -35,14 +35,14 @@ def get_db_session(): return session -def get_scraper_controller(): +def get_scraper_controller(telethon_session_name = None): engine = create_engine(os.environ["DB"]) controller = ScraperController() controller.connect_to_db(engine) scrapers = [VkontakteScraper(), - TelegramTelethonScraper(), + TelegramTelethonScraper(telethon_session_name = telethon_session_name), GettrScraper(), BitchuteScraper(), RumbleScraper()] @@ -85,8 +85,15 @@ def scrape_channel_info(args): def archive_media(args): logger.info(f"Archiving unarchived media") - controller = get_scraper_controller() - controller.archive_unarchived_media() + if args.telethon_session: + controller = get_scraper_controller(telethon_session_name=args.telethon_session) + else: + controller = get_scraper_controller() + + if args.chronological: + controller.archive_unarchived_media(chronological=True) + else: + controller.archive_unarchived_media() def transform(args): logger.info(f"Transforming untransformed posts") @@ -127,6 +134,8 @@ if __name__ == "__main__": parser.add_argument( "--media", action="store_true", help="[scrape-channels] Add this flag to media" ) + parser.add_argument("--chronological", action="store_true") + parser.add_argument("--telethon_session", type=str) args = parser.parse_args() diff --git a/cisticola/base.py b/cisticola/base.py index 2a0cb9e..35645d6 100644 --- a/cisticola/base.py +++ b/cisticola/base.py @@ -354,6 +354,7 @@ class Media: blob = make_request(self.url) return blob.content + @logger.catch def hydrate(self, blob = None): """Download media file as bytes blob and extract data from content. """ @@ -382,6 +383,7 @@ class Image(Media): #: Extracted OCR content from image ocr: str = None + @logger.catch def hydrate(self, blob=None): """Download image file as bytes blob and extract Exif and OCR content from the image. @@ -510,7 +512,7 @@ media_table = Table('media', mapper_registry.metadata, Column('id', Integer, primary_key=True, autoincrement=True), Column('type', String), - Column('raw_id', Integer, ForeignKey('raw_posts.id')), + Column('raw_id', Integer, ForeignKey('raw_posts.id'), index=True), Column('post', Integer, ForeignKey('posts.id')), Column('url', String), Column('original_url', String), diff --git a/cisticola/scraper/base.py b/cisticola/scraper/base.py index 47a6635..892da45 100644 --- a/cisticola/scraper/base.py +++ b/cisticola/scraper/base.py @@ -408,11 +408,14 @@ class ScraperController: added = 0 # get most recent post + # Note: a "bug" in Postgres can cause this query to hang for a really long time + # when searching for a single row, hence the limit(10).all() when we really just need + # the first row. rows = session.query(ScraperResult).where( ScraperResult.channel == channel.id).order_by( - ScraperResult.date.desc()).limit(1).all() + ScraperResult.date.desc()).limit(10).all() - if len(rows) == 1: + if len(rows) > 0: since = rows[0] else: since = None @@ -424,9 +427,6 @@ class ScraperController: session.commit() added += 1 - if added > 100: - break - session.commit() logger.info( f"{scraper} found {added} new posts from {channel}") @@ -438,39 +438,44 @@ class ScraperController: session.close() @logger.catch(reraise = True) - def archive_unarchived_media(self): + def archive_unarchived_media(self, chronological=False): if self.session is None: logger.error("No DB session") return session = self.session() - # this query is really slow (~2.5 minutes) because of the shuffle. shuffling is so that multiple media archivers could work - # simultaneously with low risk of collision (at least while the number of unarchived items is very large) - posts = session.query(ScraperResult).where(ScraperResult.media_archived == None).order_by(func.random()).limit(4000).all() + while True: + if chronological: + posts = session.query(ScraperResult).where(ScraperResult.media_archived == None).order_by(ScraperResult.date.desc()).limit(5000).all() + else: + # this query is really slow (~2.5 minutes) because of the shuffle. shuffling is so that multiple media archivers could work + # simultaneously with low risk of collision (at least while the number of unarchived items is very large) + posts = session.query(ScraperResult).where(ScraperResult.media_archived == None).order_by(func.random()).limit(5000).all() - logger.info(f"Found {len(posts)} posts without media. Archiving now") + logger.info(f"Found {len(posts)} posts without media. Archiving now") - for post in posts: - handled = False + for post in posts: + handled = False - for scraper in self.scrapers: - # compare major versions - if scraper.__version__.split('.')[0] == post.scraper.split('.')[0]: - handled = True - logger.debug(f"{scraper} is archiving media for ID {post.id}") - post = scraper.archive_files(post) + for scraper in self.scrapers: + # compare major versions + if scraper.__version__.split('.')[0] == post.scraper.split('.')[0]: + handled = True + logger.debug(f"{scraper} is archiving media for ID {post.id}") + post = scraper.archive_files(post) - if post: - session.query(ScraperResult).where(ScraperResult.id == post.id).update({'archived_urls': post.archived_urls, 'media_archived': post.media_archived}) - session.commit() + if post: + session.query(ScraperResult).where(ScraperResult.id == post.id).update({'archived_urls': post.archived_urls, 'media_archived': post.media_archived}) + session.commit() - break + break + + if not handled: + logger.warning(f"No handler found for post scraped with {post.scraper}") + + session.commit() - if not handled: - logger.warning(f"No handler found for post scraped with {post.scraper}") - - session.commit() session.close() @logger.catch(reraise = True) diff --git a/cisticola/scraper/telegram_telethon.py b/cisticola/scraper/telegram_telethon.py index d82c1ab..f42d1c5 100644 --- a/cisticola/scraper/telegram_telethon.py +++ b/cisticola/scraper/telegram_telethon.py @@ -21,15 +21,18 @@ class TelegramTelethonScraper(Scraper): __version__ = "TelegramTelethonScraper 0.0.3" client = None - def __init__(self): + def __init__(self, telethon_session_name = None): super().__init__() api_id = os.environ['TELEGRAM_API_ID'] api_hash = os.environ['TELEGRAM_API_HASH'] phone = os.environ['TELEGRAM_PHONE'] + if telethon_session_name is None: + telethon_session_name = phone + # set up a persistent client for Telethon - self.client = TelegramClient(phone, api_id, api_hash) + self.client = TelegramClient(telethon_session_name, api_id, api_hash) self.client.connect() def __del__(self): @@ -70,6 +73,7 @@ class TelegramTelethonScraper(Scraper): message = self.client.get_messages(raw['peer_id']['channel_id'], ids=[raw['id']]) blob = None + output_file_with_ext = None if len(message) > 0 and message[0] is not None: blob, output_file_with_ext = self.archive_post_media(message[0]) else: