diff --git a/cisticola/base.py b/cisticola/base.py index 262089d..ffcbfa4 100644 --- a/cisticola/base.py +++ b/cisticola/base.py @@ -2,7 +2,7 @@ from typing import List from dataclasses import dataclass from datetime import datetime from sqlalchemy.orm import registry -from sqlalchemy import Table, Column, Integer, String, JSON, DateTime, ForeignKey +from sqlalchemy import Table, Column, Integer, String, JSON, DateTime, ForeignKey, Boolean import pytesseract import PIL import io @@ -20,7 +20,7 @@ class ScraperResult: scraper: str platform: str - channel: int #TODO there is probably a way of making this a Channel object foreign key + channel: int platform_id: str date: datetime raw_data: str @@ -33,7 +33,7 @@ raw_data_table = Table('raw_data', mapper_registry.metadata, autoincrement=True), Column('scraper', String), Column('platform', String), - Column('channel', Integer), + Column('channel', Integer, ForeignKey('channels.id')), Column('platform_id', String), Column('date', DateTime), Column('raw_data', String), @@ -45,25 +45,45 @@ mapper_registry.map_imperatively(ScraperResult, raw_data_table) @dataclass class Channel: - id: int name: str platform_id: str category: str - followers: int platform: str url: str screenname: str - country: str - influencer: str - public: bool - chat: bool - notes: str + country: str = None + influencer: str = None + public: bool = None + chat: bool = None + notes: str = "" + source: str = None + def hydrate(self): + pass + +channel_table = Table('channels', mapper_registry.metadata, + Column('id', Integer, primary_key=True, autoincrement=True), + Column('name', String), + Column('platform_id', Integer), + Column('category', String), + Column('platform', String), + Column('url', String), + Column('screenname', String), + Column('country', String), + Column('influencer', String), + Column('public', Boolean), + Column('chat', Boolean), + Column('notes', String), + Column('source', String) + ) + +mapper_registry.map_imperatively(Channel, channel_table) @dataclass class TransformedResult: """An object with fields for columns in the analysis table""" raw_id: int + platform_id: str scraper: str transformer: str platform: str @@ -74,6 +94,11 @@ class TransformedResult: author_id: str author_username: str content: str + forwarded_from: int = None + reply_to: int = None + + def hydrate(self): + pass @@ -81,16 +106,19 @@ analysis_table = Table('analysis', mapper_registry.metadata, Column('id', Integer, primary_key=True, autoincrement=True), Column('raw_id', Integer, ForeignKey('raw_data.id')), + Column('platform_id', Integer), Column('scraper', String), Column('transformer', String), Column('platform', String), - Column('channel', Integer), + Column('channel', Integer, ForeignKey('channels.id')), Column('date', DateTime), Column('date_archived', DateTime), Column('url', String), Column('author_id', String), Column('author_username', String), - Column('content', String) + Column('content', String), + Column('forwarded_from', Integer, ForeignKey('channels.id')), + Column('reply_to', Integer, ForeignKey('analysis.id')) ) mapper_registry.map_imperatively(TransformedResult, analysis_table) diff --git a/cisticola/transformer/base.py b/cisticola/transformer/base.py index ce22f03..2916f01 100644 --- a/cisticola/transformer/base.py +++ b/cisticola/transformer/base.py @@ -1,9 +1,11 @@ -from typing import List, Generator +from typing import List, Generator, Union, Callable from loguru import logger -from sqlalchemy.orm import sessionmaker +from sqlalchemy.orm import sessionmaker, make_transient from sqlalchemy.engine.base import Engine +from collections import defaultdict + +from cisticola.base import ScraperResult, TransformedResult, Media, Channel, mapper_registry -from cisticola.base import ScraperResult, TransformedResult, Media, mapper_registry class Transformer: """Interface class for transformers.""" @@ -16,12 +18,12 @@ class Transformer: def can_handle(data: ScraperResult) -> bool: """Specifies whether or not a Transformer is capable of handling a particular piece of scraped data. - + Parameters ---------- data : ScraperResult The ScraperResult object to check for ability to handle. - + Returns ------- bool @@ -30,39 +32,18 @@ class Transformer: pass - def transform_media(self, data: ScraperResult, transformed: TransformedResult) -> Generator[Media, None, None]: - """Yields Media objects from each piece of media present in a raw ScraperResult. - - Parameters - ---------- - data : ScraperResult - The ScraperResult object to process - transformed : TransformedResult - The TransformedResult version of `data`. (E.g. as generated by `Transformer.transform()`) + def transform(data: ScraperResult, insert: Callable) -> Generator[Union[TransformedResult, Channel, Media], None, None]: + """Transform a ScraperResult into objects with additional parameters for analysis. This function can + yield multiple objects, as it will find references to quoted/replied posts, media objects, and Channel + objects and provide all of these to be inserted into the database. - Yields - ------ - Media - A media object generated from the ScraperResult. One ScraperResult can have multiple pieces - of media contained within it, so this can generate an arbitrary number of Media objects - (or their subclasses.) These Media objects are not fully hydrated. - """ - - pass - - def transform(data: ScraperResult) -> TransformedResult: - """Transform a ScraperResult into a TransformedResult object. This extracts additional attributes - that can be used directly for analysis. - Parameters ---------- data : ScraperResult The ScraperResult object to process. - - Returns - ------- - TransformedResult - A TransformedResult representation of the `data` object. + insert : Callable + A function that either inserts the object into a database or finds an object with the + relevant unique constraints if applicable. """ pass @@ -78,7 +59,7 @@ class ETLController: def register_transformer(self, transformer: Transformer): """Adds a Transformer to the list of available Transformers. - + Parameters ---------- transformer : Transformer @@ -89,7 +70,7 @@ class ETLController: def connect_to_db(self, engine: Engine): """Connects the ETLController to a SQLAlchemy engine. - + Parameters ---------- engine : Engine @@ -101,11 +82,59 @@ class ETLController: self.session = sessionmaker() self.session.configure(bind=engine) - @logger.catch(reraise = True) + def insert_or_select(self, obj, session, hydrate: bool = True): + """Inserts an object into the database or returns an existing object from the database. + Regardless, the resulting object has an `id` attribute that can be referenced later.""" + + instance = None + + # This is using some adhoc unique constraints that might be worth formalizing at some point + if type(obj) == Channel: + instance = session.query(Channel).filter_by(url=obj.url, platform_id=obj.platform_id, platform=obj.platform).first() + + elif type(obj) == TransformedResult: + instance = session.query(TransformedResult).filter_by(platform=obj.platform, platform_id=obj.platform_id).first() + + elif issubclass(type(obj), Media): + instance = session.query(type(obj)).filter_by(original_url=obj.original_url, post=obj.post).first() + if instance: + logger.info(f"Found matching DB entry for {obj}: {instance}") + return instance + + instance = session.query(type(obj)).filter_by(original_url=obj.original_url).first() + + # For Media objects we want to duplicate the entry to preserve the relationship with the post. + # However, we also want to avoid rehydration, hence the code below: + if instance: + logger.info(f"Found matching media record, duplicating and inserting for new post") + + session.expunge(instance) + make_transient(instance) + instance.id = None + instance.post = obj.post + instance.raw_id = obj.raw_id + + session.add(instance) + session.flush() + return instance + + if instance: + logger.info(f"Found matching DB entry for {obj}: {instance}") + return instance + + if hydrate: + obj.hydrate() + + logger.info(f"Inserting new object {obj}") + session.add(obj) + session.flush() + return obj + + @logger.catch(reraise=True) def transform_results(self, results: List[ScraperResult], hydrate: bool = True): """Transforms raw ScraperResults objects into TransformedResult objects and Media objects. Then, adds them to the database. - + Parameters ---------- results : List[ScraperResult] @@ -126,34 +155,18 @@ class ETLController: handled = True session = self.session() - transformed = transformer.transform(result) - - session.add(transformed) - session.flush() - - media = transformer.transform_media(result, transformed) - - count = 0 - for obj in media: - if hydrate: - logger.info(f"Hydrating {obj}") - obj.hydrate() - - session.add(obj) - count += 1 - + transformer.transform(result, lambda obj: self.insert_or_select(obj, session, hydrate)) session.commit() - logger.info(f"{transformer} generated {count} media objects") break if handled == False: logger.warning(f"No Transformer could handle {result}") - @logger.catch(reraise = True) + @logger.catch(reraise=True) def transform_all_untransformed(self, hydrate: bool = True): """Transform all ScraperResult objects in the database that do not have an equivalent TransformedResult object stored. - + Parameters ---------- hydrate : bool @@ -165,7 +178,12 @@ class ETLController: return session = self.session() - untransformed = session.query(ScraperResult).join(TransformedResult, isouter=True).where(TransformedResult.raw_id == None).all() + untransformed = ( + session.query(ScraperResult) + .join(TransformedResult, isouter=True) + .where(TransformedResult.raw_id == None) + .all() + ) logger.info(f"Found {len(untransformed)} items to ETL") - self.transform_results(untransformed, hydrate=hydrate) \ No newline at end of file + self.transform_results(untransformed, hydrate=hydrate) diff --git a/cisticola/transformer/twitter.py b/cisticola/transformer/twitter.py index 6c0838c..b51afb2 100644 --- a/cisticola/transformer/twitter.py +++ b/cisticola/transformer/twitter.py @@ -1,9 +1,10 @@ import json from loguru import logger -from typing import Generator +from typing import Generator, Union, Callable +import dateutil.parser from cisticola.transformer.base import Transformer -from cisticola.base import ScraperResult, TransformedResult, Image, Video, Media +from cisticola.base import ScraperResult, TransformedResult, Image, Video, Media, Channel class TwitterTransformer(Transformer): """A Twitter specific ScraperResult, with a method ETL/transforming""" @@ -17,11 +18,9 @@ class TwitterTransformer(Transformer): return False - def transform_media(self, data: ScraperResult, transformed: TransformedResult) -> Generator[Media, None, None]: - raw = json.loads(data.raw_data) - - if raw['media']: - for media in raw['media']: + def process_media(self, tweet, post_id, data): + if tweet['media']: + for media in tweet['media']: orig = None if media["_type"] == "snscrape.modules.twitter.Photo": @@ -40,26 +39,77 @@ class TwitterTransformer(Transformer): new = data.archived_urls[orig] if media["_type"] == "snscrape.modules.twitter.Photo": - m = Image(url=new, post=transformed.id, raw_id=data.id, original_url=orig) + m = Image(url=new, post=post_id, raw_id=data.id, original_url=orig) else: - m = Video(url=new, post=transformed.id, raw_id=data.id, original_url=orig) + m = Video(url=new, post=post_id, raw_id=data.id, original_url=orig) yield m - def transform(self, data: ScraperResult) -> TransformedResult: + + def transform(self, data: ScraperResult, insert: Callable) -> Generator[Union[TransformedResult, Channel, Media], None, None]: raw = json.loads(data.raw_data) transformed = TransformedResult( raw_id=data.id, + platform_id=raw['id'], scraper=data.scraper, transformer=self.__version__, platform=data.platform, channel=data.channel, - date=data.date, + date=dateutil.parser.parse(raw['date']), date_archived=data.date_archived, url=raw['url'], content=raw['content'], author_id=raw['user']['id'], author_username=raw['user']['username']) - return transformed + def subtweet(tweet): + channel = Channel( + name=tweet['user']['displayname'], + platform_id=tweet['user']['id'], + platform=data.platform, + url=tweet['user']['url'], + screenname=tweet['user']['username'], + category='forwarded', + source=self.__version__ + ) + + channel = insert(channel) + + original = TransformedResult( + raw_id=data.id, + platform_id=tweet['id'], + scraper=data.scraper, + transformer=self.__version__, + platform=data.platform, + channel=channel.id, + date=dateutil.parser.parse(tweet['date']), + date_archived=data.date_archived, + url=tweet['url'], + content=tweet['content'], + author_id=tweet['user']['id'], + author_username=tweet['user']['username'] + ) + + original = insert(original) + transformed.forwarded_from = channel.id + transformed.reply_to = original.id + + media = self.process_media(tweet, original.id, data) + for m in media: + insert(m) + + if raw['retweetedTweet'] is not None: + subtweet(raw['retweetedTweet']) + + if raw['quotedTweet'] is not None: + subtweet(raw['quotedTweet']) + + insert(transformed) + + media = self.process_media(raw, transformed.id, data) + for m in media: + insert(m) + + +