From f29da4d5f344009869ea9d3713cf44e845738122 Mon Sep 17 00:00:00 2001 From: Tristan Lee Date: Wed, 26 Oct 2022 07:20:19 -0500 Subject: [PATCH] added capability to retransform/update posts in database --- app.py | 9 ++ cisticola/base.py | 4 +- cisticola/transformer/base.py | 102 ++++++++++++++++++- cisticola/transformer/bitchute.py | 2 +- cisticola/transformer/gettr.py | 2 +- cisticola/transformer/rumble.py | 2 +- cisticola/transformer/telegram_telethon.py | 6 +- cisticola/transformer/twitter.py | 2 +- cisticola/transformer/vkontakte.py | 2 +- retransform.py | 113 +++++++++++++++++++++ 10 files changed, 228 insertions(+), 16 deletions(-) create mode 100644 retransform.py diff --git a/app.py b/app.py index 27b9093..b0c31e6 100644 --- a/app.py +++ b/app.py @@ -113,6 +113,12 @@ def transform_media(args): controller = get_transformer_controller() controller.transform_all_untransformed_media() +def retransform(args): + logger.info(f"Transforming untransformed posts") + + controller = get_transformer_controller() + controller.retransform_all(query_kwargs = {'platform': 'Telegram'}) + def init_db(): engine = create_engine(os.environ["DB"]) mapper_registry.metadata.create_all(bind=engine) @@ -162,5 +168,8 @@ if __name__ == "__main__": elif args.command == "transform-media": logger.add("logs/transform-media.log", level="TRACE", rotation="100 MB") transform_media(args) + elif args.command == "retransform": + logger.add("logs/retransform.log", level="TRACE", rotation="100 MB") + retransform(args) else: logger.error(f"Unrecognized command {args.command}") diff --git a/cisticola/base.py b/cisticola/base.py index 01df6f4..7a1d45a 100644 --- a/cisticola/base.py +++ b/cisticola/base.py @@ -475,7 +475,7 @@ channel_table = Table('channels', mapper_registry.metadata, Column('platform', String), Column('url', String), Column('screenname', String), - Column('country', String), + Column('country', JSON), Column('influencer', String), Column('public', Boolean), Column('chat', Boolean), @@ -511,7 +511,7 @@ post_table = Table('posts', mapper_registry.metadata, Column('views', Integer), Column('video_title', String), Column('video_duration', Integer), - Column('detected_language', String), + Column('detected_language', String, index = True), Column('normalized_content', String) ) diff --git a/cisticola/transformer/base.py b/cisticola/transformer/base.py index c9b777a..2649032 100644 --- a/cisticola/transformer/base.py +++ b/cisticola/transformer/base.py @@ -1,4 +1,4 @@ -from typing import List, Generator, Union, Callable +from typing import List, Generator, Union, Callable, Tuple from loguru import logger from sqlalchemy import cast, String from sqlalchemy.orm import sessionmaker, make_transient @@ -6,6 +6,7 @@ from sqlalchemy.engine.base import Engine from sqlalchemy.sql.expression import func from collections import defaultdict from datetime import datetime, timezone +import dataclasses from cisticola.base import RawChannelInfo, ChannelInfo, ScraperResult, Post, Media, Channel, mapper_registry, Image, Video, Audio @@ -35,10 +36,8 @@ class Transformer: pass - def transform(data: ScraperResult, insert: Callable) -> Generator[Union[Post, Channel, Media], None, None]: - """Transform a ScraperResult into objects with additional parameters for analysis. This function can - yield multiple objects, as it will find references to quoted/replied posts, media objects, and Channel - objects and provide all of these to be inserted into the database. + def get_transformed_post(data: ScraperResult, insert: Callable) -> Post: + """Return the transformed Post from a ScraperResult. Parameters ---------- @@ -51,6 +50,21 @@ class Transformer: pass + def transform(self, data: ScraperResult, insert: Callable, session) -> None: + """Transform a ScraperResult into objects with additional parameters for analysis. + + Parameters + ---------- + data : ScraperResult + The ScraperResult object to process. + insert : Callable + A function that either inserts the object into a database or finds an object with the + relevant unique constraints if applicable. + """ + + transformed = self.get_transformed_post(data = data, insert = insert, session = session) + transformed = insert(transformed) + def transform_media(self, data: ScraperResult, transformed: Post, insert: Callable): '''Transform media''' for k in data.archived_urls: @@ -195,6 +209,42 @@ class ETLController: if handled == False: logger.warning(f"No Transformer could handle ID {result.id} with platform {result.platform} ({result.date})") + @logger.catch(reraise=True) + def retransform_results(self, results: List[Tuple[ScraperResult, Post]], columns: List[str] = None, hydrate: bool = True): + + if self.session is None: + logger.error("No DB session") + return + + session = self.session() + + # default to updating all fields + if 'date_transformed' not in columns: + columns.append('date_transformed') + + for result, old_post in results: + if result.scraper is not None and result.platform is not None: + for transformer in self.transformers: + handled = False + + if transformer.can_handle(result): + logger.trace(f"{transformer} is handling result {result.id} ({result.date})") + handled = True + + new_post = transformer.get_transformed_post(result, lambda obj: self.insert_or_select(obj, session, hydrate), session) + + if hydrate: + new_post.hydrate() + + for column in columns: + setattr(old_post, column, getattr(new_post, column)) + + session.commit() + break + + if handled == False: + logger.warning(f"No Transformer could handle ID {result.id} with platform {result.platform} ({result.date})") + @logger.catch(reraise=True) def transform_all_untransformed(self, hydrate: bool = True): """Transform all ScraperResult objects in the database that do not have an @@ -240,7 +290,49 @@ class ETLController: .limit(BATCH_SIZE) ).all() + def retransform_all(self, columns = None, query_kwargs: dict = None, hydrate: bool = True): + if self.session is None: + logger.error("No DB session") + return + if columns is None: + columns = [field.name for field in dataclasses.fields(Post)] + + elif isinstance(columns, list): + if len(columns) == 0: + raise ValueError('`columns` argument must be non-empty list of strings specifying columns to update') + + if query_kwargs is None: + query_kwargs = {} + + session = self.session() + + BATCH_SIZE = 5000 + batch = [] + + logger.info(f"Fetching first post batch of {BATCH_SIZE} to re-transform") + + batch = (session.query(ScraperResult, Post) + .filter(Post.raw_id == ScraperResult.id) + .filter_by(**query_kwargs) + .order_by(ScraperResult.date.asc()) + .limit(BATCH_SIZE) + ).all() + + while len(batch) > 0: + logger.info(f"Found {len(batch)} items to ETL") + + self.retransform_results(batch, hydrate=hydrate, columns=columns) + + logger.info(f"Fetching posts batch of {BATCH_SIZE} to re-transform, offset {max([raw.date for raw, _ in batch])}") + + batch = (session.query(ScraperResult, Post) + .filter(Post.raw_id == ScraperResult.id) + .filter_by(**query_kwargs) + .where(ScraperResult.date >= max([raw.date for raw, _ in batch])) + .order_by(ScraperResult.date.asc()) + .limit(BATCH_SIZE) + ).all() @logger.catch(reraise=True) def transform_info(self, results: List[ChannelInfo]): diff --git a/cisticola/transformer/bitchute.py b/cisticola/transformer/bitchute.py index 3619f75..aecfb62 100644 --- a/cisticola/transformer/bitchute.py +++ b/cisticola/transformer/bitchute.py @@ -56,7 +56,7 @@ class BitchuteTransformer(Transformer): transformed = insert(transformed) - def transform(self, data: ScraperResult, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]: + def get_transformed_post(self, data: ScraperResult, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]: raw = json.loads(data.raw_data) if raw['category'] == 'comment': diff --git a/cisticola/transformer/gettr.py b/cisticola/transformer/gettr.py index b6cf0e7..42098ae 100644 --- a/cisticola/transformer/gettr.py +++ b/cisticola/transformer/gettr.py @@ -81,7 +81,7 @@ class GettrTransformer(Transformer): return channel.id - def transform(self, data: ScraperResult, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]: + def get_transformed_post(self, data: ScraperResult, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]: raw = json.loads(data.raw_data) if raw["activity"]["action"] == "shares_pst": diff --git a/cisticola/transformer/rumble.py b/cisticola/transformer/rumble.py index 9c71702..af49938 100644 --- a/cisticola/transformer/rumble.py +++ b/cisticola/transformer/rumble.py @@ -57,7 +57,7 @@ class RumbleTransformer(Transformer): transformed = insert(transformed) - def transform(self, data: ScraperResult, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]: + def get_transformed_post(self, data: ScraperResult, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]: raw = json.loads(data.raw_data) transformed = Post( diff --git a/cisticola/transformer/telegram_telethon.py b/cisticola/transformer/telegram_telethon.py index 444460d..cd9d575 100644 --- a/cisticola/transformer/telegram_telethon.py +++ b/cisticola/transformer/telegram_telethon.py @@ -123,7 +123,7 @@ class TelegramTelethonTransformer(Transformer): transformed = insert(transformed) - def transform(self, data: ScraperResult, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]: + def get_transformed_post(self, data: ScraperResult, insert: Callable, session) -> Post: raw = json.loads(data.raw_data) if raw['_'] != 'Message': @@ -206,7 +206,7 @@ class TelegramTelethonTransformer(Transformer): url = "" author_username = "" - transformed = Post( + return Post( raw_id = data.id, platform_id = raw['id'], scraper = data.scraper, @@ -227,8 +227,6 @@ class TelegramTelethonTransformer(Transformer): views = raw.get('views') ) - transformed = insert(transformed) - def stripped(s): """https://stackoverflow.com/a/29933716""" diff --git a/cisticola/transformer/twitter.py b/cisticola/transformer/twitter.py index f5e5a01..dee9822 100644 --- a/cisticola/transformer/twitter.py +++ b/cisticola/transformer/twitter.py @@ -72,7 +72,7 @@ class TwitterTransformer(Transformer): transformed = insert(transformed) - def transform(self, data: ScraperResult, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]: + def get_transformed_post(self, data: ScraperResult, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]: raw = json.loads(data.raw_data) transformed = Post( diff --git a/cisticola/transformer/vkontakte.py b/cisticola/transformer/vkontakte.py index 2351972..de680d8 100644 --- a/cisticola/transformer/vkontakte.py +++ b/cisticola/transformer/vkontakte.py @@ -46,7 +46,7 @@ class VkontakteTransformer(Transformer): transformed = insert(transformed) - def transform(self, data: ScraperResult, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]: + def get_transformed_post(self, data: ScraperResult, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]: raw = json.loads(data.raw_data) transformed = Post( diff --git a/retransform.py b/retransform.py new file mode 100644 index 0000000..35652f9 --- /dev/null +++ b/retransform.py @@ -0,0 +1,113 @@ +import argparse +from loguru import logger +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker +import os +import sys + +from cisticola.base import mapper_registry +from cisticola.scraper import ( + ScraperController, + VkontakteScraper, + TelegramTelethonScraper, + GettrScraper, + BitchuteScraper, + RumbleScraper, +) +from cisticola.transformer import ( + ETLController, + TelegramTelethonTransformer, + GettrTransformer, + RumbleTransformer, + BitchuteTransformer, + VkontakteTransformer, +) + +from sync_with_gsheet import sync_channels + +def get_db_session(): + engine = create_engine(os.environ["DB"]) + + session_generator = sessionmaker() + session_generator.configure(bind=engine) + session = session_generator() + + return session + + +def get_scraper_controller(telethon_session_name = None): + engine = create_engine(os.environ["DB"]) + + controller = ScraperController() + controller.connect_to_db(engine) + + scrapers = [VkontakteScraper(), + TelegramTelethonScraper(telethon_session_name = telethon_session_name), + GettrScraper(), + BitchuteScraper(), + RumbleScraper()] + + controller.register_scrapers(scrapers) + + return controller + +def get_transformer_controller(): + engine = create_engine(os.environ["DB"]) + + controller = ETLController() + controller.connect_to_db(engine) + + transformers = [VkontakteTransformer(), + TelegramTelethonTransformer(), + GettrTransformer(), + BitchuteTransformer(), + RumbleTransformer()] + + controller.register_transformers(transformers) + + return controller + + +def scrape_channels(args): + logger.info(f"Scraping channels, media: {args.media}") + + controller = get_scraper_controller() + controller.scrape_all_channels(archive_media=args.media) + + +def scrape_channel_info(args): + logger.info(f"Scraping channel info") + + controller = get_scraper_controller() + controller.scrape_all_channel_info() + + +def archive_media(args): + logger.info(f"Archiving unarchived media") + + if args.telethon_session: + controller = get_scraper_controller(telethon_session_name=args.telethon_session) + else: + controller = get_scraper_controller() + + if args.chronological: + controller.archive_unarchived_media(chronological=True) + else: + controller.archive_unarchived_media() + +def retransform(): + logger.info(f"Transforming untransformed posts") + + controller = get_transformer_controller() + controller.retransform_all(query_kwargs = {'channel': 6}) + +def init_db(): + engine = create_engine(os.environ["DB"]) + mapper_registry.metadata.create_all(bind=engine) + + +if __name__ == "__main__": + + retransform() + +