From 9948af2c4a4f67d5434765e00de75e1137a4dd39 Mon Sep 17 00:00:00 2001 From: Logan Williams Date: Wed, 8 Jun 2022 16:41:46 +0200 Subject: [PATCH] Media archiving ETL working for Telegram --- app.py | 9 +++ cisticola/base.py | 12 +++- cisticola/transformer/base.py | 73 ++++++++++++++++++++++ cisticola/transformer/telegram_telethon.py | 31 +++++---- 4 files changed, 111 insertions(+), 14 deletions(-) diff --git a/app.py b/app.py index 9fc0cb4..b51f007 100644 --- a/app.py +++ b/app.py @@ -100,6 +100,12 @@ def transform_info(args): controller = get_transformer_controller() controller.transform_all_untransformed_info() +def transform_media(args): + logger.info(f"Transforming untransformed channel media") + + controller = get_transformer_controller() + controller.transform_all_untransformed_media() + def init_db(): engine = create_engine(os.environ["DB"]) mapper_registry.metadata.create_all(bind=engine) @@ -144,5 +150,8 @@ if __name__ == "__main__": elif args.command == "transform-info": logger.add("logs/transform-info.log", level="TRACE", rotation="100 MB") transform_info(args) + elif args.command == "transform-media": + logger.add("logs/transform-media.log", level="TRACE", rotation="100 MB") + transform_media(args) else: logger.error(f"Unrecognized command {args.command}") diff --git a/cisticola/base.py b/cisticola/base.py index 28ff9f8..826c948 100644 --- a/cisticola/base.py +++ b/cisticola/base.py @@ -384,7 +384,14 @@ class Image(Media): @dataclass class Video(Media): - """Class for organizing information about an image file. + """Class for organizing information about an video file. + """ + + pass + +@dataclass +class Audio(Media): + """Class for organizing information about an audio file. """ pass @@ -499,4 +506,5 @@ mapper_registry.map_imperatively(RawChannelInfo, raw_channel_info_table) mapper_registry.map_imperatively(ChannelInfo, channel_info_table) mapper_registry.map_imperatively(Media, media_table, polymorphic_on='type', polymorphic_identity='media') mapper_registry.map_imperatively(Image, media_table, inherits=Media, polymorphic_on='type', polymorphic_identity='image') -mapper_registry.map_imperatively(Video, media_table, inherits=Media, polymorphic_on='type', polymorphic_identity='video') \ No newline at end of file +mapper_registry.map_imperatively(Video, media_table, inherits=Media, polymorphic_on='type', polymorphic_identity='video') +mapper_registry.map_imperatively(Audio, media_table, inherits=Media, polymorphic_on='type', polymorphic_identity='audio') diff --git a/cisticola/transformer/base.py b/cisticola/transformer/base.py index 32ed37c..2dafe6c 100644 --- a/cisticola/transformer/base.py +++ b/cisticola/transformer/base.py @@ -264,3 +264,76 @@ class ETLController: logger.info(f"Found {len(batch)} info items to ETL ({offset} already processed)") self.transform_info(batch) + + @logger.catch(reraise=True) + def transform_media(self, results: List, hydrate: bool = True): + """Transforms raw ScraperResults objects into Post objects and + Media objects. Then, adds them to the database. + + Parameters + ---------- + results : List[ScraperResult] + A list of ScraperResult objects to be transformed + hydrate : bool + Whether or not to fully hydrate transformed media. Default True. + """ + if self.session is None: + logger.error("No DB session") + return + + session = self.session() + + for total_result in results: + result = total_result.ScraperResult + if result.scraper is not None and result.platform is not None: + for transformer in self.transformers: + handled = False + + if transformer.can_handle(result): + logger.trace(f"{transformer} is handling result {result.id} ({result.date})") + handled = True + + transformer.transform_media(result, total_result.Post, lambda obj: self.insert_or_select(obj, session, hydrate), session) + + session.commit() + break + + if handled == False: + logger.warning(f"No Transformer could handle ID {result.id} with platform {result.platform} ({result.date})") + + @logger.catch(reraise=True) + def transform_all_untransformed_media(self, hydrate=True): + """Transform all ScraperResult objects in the database that do not have an + equivalent Post object stored. + + Parameters + ---------- + hydrate : bool + Whether or not to fully hydrate transformed media. Default True. + """ + + if self.session is None: + logger.error("No DB session") + return + + session = self.session() + + BATCH_SIZE = 50000 + offset = 0 + batch = [] + + query = (session.query(ScraperResult, Post) + .join(Post) + .filter((ScraperResult.media_archived != None) & (ScraperResult.archived_urls != '{}')) + .order_by(ScraperResult.date.asc()) + ) + + while len(batch) > 0 or offset == 0: + logger.info(f"Fetching untransformed post media batch of {BATCH_SIZE}, offset {offset}") + + batch = query.slice(offset, offset + BATCH_SIZE).all() + offset += BATCH_SIZE + + logger.info(f"Found {len(batch)} items to ETL ({offset} already processed)") + + self.transform_media(batch, hydrate=hydrate) diff --git a/cisticola/transformer/telegram_telethon.py b/cisticola/transformer/telegram_telethon.py index b5326e2..e49acc6 100644 --- a/cisticola/transformer/telegram_telethon.py +++ b/cisticola/transformer/telegram_telethon.py @@ -16,7 +16,7 @@ from datetime import datetime, timezone from sqlalchemy import func from cisticola.transformer.base import Transformer -from cisticola.base import RawChannelInfo, ChannelInfo, ScraperResult, Post, Image, Video, Media, Channel +from cisticola.base import RawChannelInfo, ChannelInfo, ScraperResult, Post, Image, Video, Audio, Media, Channel class TelegramTelethonTransformer(Transformer): @@ -121,6 +121,23 @@ class TelegramTelethonTransformer(Transformer): transformed = insert(transformed) + def transform_media(self, data: ScraperResult, transformed: Post, insert: Callable, session): + for k in data.archived_urls: + if data.archived_urls[k]: + archived_url = data.archived_urls[k] + filename = archived_url.split('/')[-1] + ext = None if '.' not in filename else filename.split('.')[-1].lower() + + if ext == 'mp4' or ext == 'mov' or ext == 'avi' or ext =='mkv': + insert(Video(url=archived_url, post=transformed.id, raw_id=data.id, original_url=k)) + elif ext == 'oga' or ext == 'mp3' or ext == "wav" or ext == 'aif' or ext == 'aiff' or ext == 'aac': + insert(Audio(url=archived_url, post=transformed.id, raw_id=data.id, original_url=k)) + elif ext == 'jpg' or ext == 'jpeg' or ext == 'png' or ext == 'gif' or ext == 'bmp' or ext == 'heic' or ext == 'tiff': + insert(Image(url=archived_url, post=transformed.id, raw_id=data.id, original_url=k)) + else: + logger.warning(f"Unknown file extension {ext}") + insert(Media(url=archived_url, post=transformed.id, raw_id=data.id, original_url=k)) + def transform(self, data: ScraperResult, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]: raw = json.loads(data.raw_data) @@ -227,16 +244,6 @@ class TelegramTelethonTransformer(Transformer): transformed = insert(transformed) - # for k in data.archived_urls: - # if data.archived_urls[k]: - # archived_url = data.archived_urls[k] - # ext = archived_url.split('.')[-1] - - # if ext == 'mp4' or ext == 'mov' or ext == 'avi' or ext =='mkv': - # insert(Video(url=archived_url, post=transformed.id, raw_id=data.id, original_url=k)) - # else: - # insert(Image(url=archived_url, post=transformed.id, raw_id=data.id, original_url=k)) - def add_markdown_links(raw_post): global_offset = 0 @@ -257,4 +264,4 @@ def add_markdown_links(raw_post): transformed_content = before_link + link_text + link_href + trailing_whitespace + after_link global_offset += (4 + len(url)) - return transformed_content \ No newline at end of file + return transformed_content