def transform_info(args):
    """Transform every raw channel-info record that has no transformed row yet.

    Obtains the transformer controller and delegates to its
    ``transform_all_untransformed_info`` method.

    :param args: Parsed command-line arguments. Not read by this handler.
    """
    logger.info("Transforming untransformed channel info")

    controller = get_transformer_controller()
    controller.transform_all_untransformed_info()


@dataclass
class ChannelInfo:
    """A processed set of information about a channel.

    Instances are mapped imperatively onto ``channel_info_table``, so only
    attributes that have a matching column in that table are persisted.
    """

    #: Foreign key from the ``raw_channel_info`` table.
    raw_channel_info_id: int

    #: Foreign key from the ``channels`` table.
    channel: int

    #: Platform-specific ID of the channel.
    platform_id: str

    #: Name of platform from which result was scraped, e.g. ``"Twitter"``.
    platform: str

    #: String specifying name and version of scraper used to generate result, e.g. ``"TwitterScraper 0.0.1"``.
    scraper: str

    #: String specifying name and version of transformer used to transform result, e.g. ``"TwitterTransformer 0.0.1"``.
    transformer: str

    #: Attributes extracted from the raw channel info object.
    screenname: str
    name: str
    description: str
    description_url: str
    description_location: str
    followers: int
    following: int
    verified: bool
    date_created: datetime

    #: Datetime (relative to UTC) that the scraped channel info was archived at.
    date_archived: datetime

    #: Datetime (UTC) that the scraped channel info was transformed at.
    date_transformed: datetime

    def hydrate(self):
        """Placeholder hook; currently does nothing."""
        pass


# Every field declared on ``ChannelInfo`` needs a column here: with imperative
# mapping, attributes without a column are silently left out of INSERTs and are
# missing on instances loaded from the database.
channel_info_table = Table('channel_info', mapper_registry.metadata,
                           Column('id', Integer, primary_key=True, autoincrement=True),
                           Column('raw_channel_info_id', Integer, ForeignKey('raw_channel_info.id'), index=True),
                           Column('channel', Integer, ForeignKey('channels.id'), index=True),
                           Column('platform_id', String),
                           Column('platform', String),
                           Column('scraper', String),
                           Column('transformer', String),
                           Column('screenname', String),
                           Column('name', String),
                           Column('description', String),
                           Column('description_url', String),
                           Column('description_location', String),
                           Column('followers', Integer),
                           Column('following', Integer),
                           Column('verified', Boolean),
                           Column('date_created', DateTime),
                           Column('date_archived', DateTime, index=True),
                           Column('date_transformed', DateTime, index=True)
                           )
@logger.catch(reraise=True)
def transform_info(self, results: List[RawChannelInfo]):
    """Run each raw channel-info record through the first transformer that accepts it.

    Records whose ``scraper`` or ``platform`` is ``None`` are skipped. For every
    other record the transformers in ``self.transformers`` are tried in order;
    the first one whose ``can_handle`` returns true has its ``transform_info``
    called, the session is committed, and no further transformer is tried.
    A warning is logged when no transformer accepts a record.

    :param results: Raw channel-info rows to transform.
    :returns: ``None``. Also returns early (after logging an error) when no
        session factory is configured.
    """
    if self.session is None:
        logger.error("No DB session")
        return

    session = self.session()

    def insert(obj):
        # Same callback for every transformer: insert-or-select on this session.
        return self.insert_or_select(obj, session, False)

    for result in results:
        if result.scraper is None or result.platform is None:
            continue

        for transformer in self.transformers:
            if transformer.can_handle(result):
                logger.trace(f"{transformer} is handling raw info result {result.id} ({result.date_archived})")

                transformer.transform_info(result, insert, session)

                session.commit()
                break
        else:
            # for/else: reached only when no transformer handled the record.
            # This also covers an empty ``self.transformers`` list, where a
            # flag set inside the loop body would never have been assigned.
            logger.warning(f"No Transformer could handle raw channel info ID {result.id} with platform {result.platform} ({result.date_archived})")


@logger.catch(reraise=True)
def transform_all_untransformed_info(self):
    """Transform every ``RawChannelInfo`` row that has no ``ChannelInfo`` row yet.

    Rows are fetched in batches of ``BATCH_SIZE`` and handed to
    ``transform_info``. Batches are paged by primary key (keyset pagination)
    rather than by OFFSET: rows leave the "untransformed" result set as soon as
    they are transformed and committed, so a growing OFFSET would skip rows
    that were never processed. Paging by ``id`` also guarantees termination
    when some rows cannot be handled by any transformer.

    :returns: ``None``. Also returns early (after logging an error) when no
        session factory is configured.
    """
    if self.session is None:
        logger.error("No DB session")
        return

    session = self.session()

    BATCH_SIZE = 50000
    processed = 0
    last_id = None

    query = (session.query(RawChannelInfo)
                .join(ChannelInfo, isouter=True)
                .where(ChannelInfo.raw_channel_info_id == None)
                .order_by(RawChannelInfo.id.asc())
            )

    while True:
        logger.info(f"Fetching untransformed info batch of {BATCH_SIZE} ({processed} already processed)")

        page = query if last_id is None else query.filter(RawChannelInfo.id > last_id)
        batch = page.limit(BATCH_SIZE).all()

        logger.info(f"Found {len(batch)} info items to ETL ({processed} already processed)")

        if not batch:
            break

        # Read the cursor before transforming: a commit may expire the loaded rows.
        last_id = batch[-1].id
        processed += len(batch)

        self.transform_info(batch)
def transform_info(self, data: RawChannelInfo, insert: Callable, session) -> None:
    """Build a ``ChannelInfo`` from a raw Telegram channel-info record and insert it.

    ``data.raw_data`` is parsed as JSON and is expected to contain a
    ``full_chat`` object and a non-empty ``chats`` list.

    :param data: Raw channel-info row; ``id``, ``channel``, ``platform``,
        ``scraper``, ``date_archived`` and ``raw_data`` are read from it.
    :param insert: Callback that receives the new ``ChannelInfo``.
    :param session: Database session; not used by this method.
    :returns: ``None``. The result is delivered through ``insert``.
    """
    raw = json.loads(data.raw_data)
    full_chat = raw['full_chat']
    chats = raw['chats']

    # Prefer the chat entry whose id matches the full-chat record, falling back
    # to the first entry.
    # NOTE(review): presumably ``chats`` can carry more than one entry (e.g. a
    # linked discussion group) - confirm against the scraper output.
    chat_raw = next((chat for chat in chats if chat.get('id') == full_chat['id']), chats[0])

    transformed = ChannelInfo(
        raw_channel_info_id=data.id,
        channel=data.channel,
        platform_id=str(full_chat['id']),  # field is declared as str
        platform=data.platform,
        scraper=data.scraper,
        transformer=self.__version__,
        screenname=chat_raw['username'],
        name=chat_raw['title'],
        description=full_chat['about'],
        description_url='',  # does not exist for Telegram
        description_location='',  # does not exist for Telegram
        followers=full_chat['participants_count'],
        following=-1,  # does not exist for Telegram
        # NOTE(review): Telethon channel records appear to carry a ``verified``
        # flag; defaults to False when the key is absent - confirm.
        verified=bool(chat_raw.get('verified', False)),
        date_created=dateutil.parser.parse(chat_raw['date']),
        date_archived=data.date_archived,
        date_transformed=datetime.now(timezone.utc)
    )

    insert(transformed)