mirror of
https://github.com/bellingcat/cisticola.git
synced 2026-06-08 03:18:34 +03:00
Add ORM for Channel class; update foreign key relations; add platform_id to TransformedResult
This commit is contained in:
@@ -2,7 +2,7 @@ from typing import List
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
from sqlalchemy.orm import registry
|
||||
from sqlalchemy import Table, Column, Integer, String, JSON, DateTime, ForeignKey
|
||||
from sqlalchemy import Table, Column, Integer, String, JSON, DateTime, ForeignKey, Boolean
|
||||
import pytesseract
|
||||
import PIL
|
||||
import io
|
||||
@@ -20,7 +20,7 @@ class ScraperResult:
|
||||
|
||||
scraper: str
|
||||
platform: str
|
||||
channel: int #TODO there is probably a way of making this a Channel object foreign key
|
||||
channel: int
|
||||
platform_id: str
|
||||
date: datetime
|
||||
raw_data: str
|
||||
@@ -33,7 +33,7 @@ raw_data_table = Table('raw_data', mapper_registry.metadata,
|
||||
autoincrement=True),
|
||||
Column('scraper', String),
|
||||
Column('platform', String),
|
||||
Column('channel', Integer),
|
||||
Column('channel', Integer, ForeignKey('channels.id')),
|
||||
Column('platform_id', String),
|
||||
Column('date', DateTime),
|
||||
Column('raw_data', String),
|
||||
@@ -45,25 +45,45 @@ mapper_registry.map_imperatively(ScraperResult, raw_data_table)
|
||||
|
||||
@dataclass
|
||||
class Channel:
|
||||
id: int
|
||||
name: str
|
||||
platform_id: str
|
||||
category: str
|
||||
followers: int
|
||||
platform: str
|
||||
url: str
|
||||
screenname: str
|
||||
country: str
|
||||
influencer: str
|
||||
public: bool
|
||||
chat: bool
|
||||
notes: str
|
||||
country: str = None
|
||||
influencer: str = None
|
||||
public: bool = None
|
||||
chat: bool = None
|
||||
notes: str = ""
|
||||
source: str = None
|
||||
|
||||
def hydrate(self):
|
||||
pass
|
||||
|
||||
channel_table = Table('channels', mapper_registry.metadata,
|
||||
Column('id', Integer, primary_key=True, autoincrement=True),
|
||||
Column('name', String),
|
||||
Column('platform_id', Integer),
|
||||
Column('category', String),
|
||||
Column('platform', String),
|
||||
Column('url', String),
|
||||
Column('screenname', String),
|
||||
Column('country', String),
|
||||
Column('influencer', String),
|
||||
Column('public', Boolean),
|
||||
Column('chat', Boolean),
|
||||
Column('notes', String),
|
||||
Column('source', String)
|
||||
)
|
||||
|
||||
mapper_registry.map_imperatively(Channel, channel_table)
|
||||
|
||||
@dataclass
|
||||
class TransformedResult:
|
||||
"""An object with fields for columns in the analysis table"""
|
||||
raw_id: int
|
||||
platform_id: str
|
||||
scraper: str
|
||||
transformer: str
|
||||
platform: str
|
||||
@@ -74,6 +94,11 @@ class TransformedResult:
|
||||
author_id: str
|
||||
author_username: str
|
||||
content: str
|
||||
forwarded_from: int = None
|
||||
reply_to: int = None
|
||||
|
||||
def hydrate(self):
|
||||
pass
|
||||
|
||||
|
||||
|
||||
@@ -81,16 +106,19 @@ analysis_table = Table('analysis', mapper_registry.metadata,
|
||||
Column('id', Integer, primary_key=True,
|
||||
autoincrement=True),
|
||||
Column('raw_id', Integer, ForeignKey('raw_data.id')),
|
||||
Column('platform_id', Integer),
|
||||
Column('scraper', String),
|
||||
Column('transformer', String),
|
||||
Column('platform', String),
|
||||
Column('channel', Integer),
|
||||
Column('channel', Integer, ForeignKey('channels.id')),
|
||||
Column('date', DateTime),
|
||||
Column('date_archived', DateTime),
|
||||
Column('url', String),
|
||||
Column('author_id', String),
|
||||
Column('author_username', String),
|
||||
Column('content', String)
|
||||
Column('content', String),
|
||||
Column('forwarded_from', Integer, ForeignKey('channels.id')),
|
||||
Column('reply_to', Integer, ForeignKey('analysis.id'))
|
||||
)
|
||||
|
||||
mapper_registry.map_imperatively(TransformedResult, analysis_table)
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
from typing import List, Generator
|
||||
from typing import List, Generator, Union, Callable
|
||||
from loguru import logger
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
from sqlalchemy.orm import sessionmaker, make_transient
|
||||
from sqlalchemy.engine.base import Engine
|
||||
from collections import defaultdict
|
||||
|
||||
from cisticola.base import ScraperResult, TransformedResult, Media, Channel, mapper_registry
|
||||
|
||||
from cisticola.base import ScraperResult, TransformedResult, Media, mapper_registry
|
||||
|
||||
class Transformer:
|
||||
"""Interface class for transformers."""
|
||||
@@ -16,12 +18,12 @@ class Transformer:
|
||||
def can_handle(data: ScraperResult) -> bool:
|
||||
"""Specifies whether or not a Transformer is capable of handling a particular
|
||||
piece of scraped data.
|
||||
|
||||
|
||||
Parameters
|
||||
----------
|
||||
data : ScraperResult
|
||||
The ScraperResult object to check for ability to handle.
|
||||
|
||||
|
||||
Returns
|
||||
-------
|
||||
bool
|
||||
@@ -30,39 +32,18 @@ class Transformer:
|
||||
|
||||
pass
|
||||
|
||||
def transform_media(self, data: ScraperResult, transformed: TransformedResult) -> Generator[Media, None, None]:
|
||||
"""Yields Media objects from each piece of media present in a raw ScraperResult.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
data : ScraperResult
|
||||
The ScraperResult object to process
|
||||
transformed : TransformedResult
|
||||
The TransformedResult version of `data`. (E.g. as generated by `Transformer.transform()`)
|
||||
def transform(data: ScraperResult, insert: Callable) -> Generator[Union[TransformedResult, Channel, Media], None, None]:
|
||||
"""Transform a ScraperResult into objects with additional parameters for analysis. This function can
|
||||
yield multiple objects, as it will find references to quoted/replied posts, media objects, and Channel
|
||||
objects and provide all of these to be inserted into the database.
|
||||
|
||||
Yields
|
||||
------
|
||||
Media
|
||||
A media object generated from the ScraperResult. One ScraperResult can have multiple pieces
|
||||
of media contained within it, so this can generate an arbitrary number of Media objects
|
||||
(or their subclasses.) These Media objects are not fully hydrated.
|
||||
"""
|
||||
|
||||
pass
|
||||
|
||||
def transform(data: ScraperResult) -> TransformedResult:
|
||||
"""Transform a ScraperResult into a TransformedResult object. This extracts additional attributes
|
||||
that can be used directly for analysis.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
data : ScraperResult
|
||||
The ScraperResult object to process.
|
||||
|
||||
Returns
|
||||
-------
|
||||
TransformedResult
|
||||
A TransformedResult representation of the `data` object.
|
||||
insert : Callable
|
||||
A function that either inserts the object into a database or finds an object with the
|
||||
relevant unique constraints if applicable.
|
||||
"""
|
||||
|
||||
pass
|
||||
@@ -78,7 +59,7 @@ class ETLController:
|
||||
|
||||
def register_transformer(self, transformer: Transformer):
|
||||
"""Adds a Transformer to the list of available Transformers.
|
||||
|
||||
|
||||
Parameters
|
||||
----------
|
||||
transformer : Transformer
|
||||
@@ -89,7 +70,7 @@ class ETLController:
|
||||
|
||||
def connect_to_db(self, engine: Engine):
|
||||
"""Connects the ETLController to a SQLAlchemy engine.
|
||||
|
||||
|
||||
Parameters
|
||||
----------
|
||||
engine : Engine
|
||||
@@ -101,11 +82,59 @@ class ETLController:
|
||||
self.session = sessionmaker()
|
||||
self.session.configure(bind=engine)
|
||||
|
||||
@logger.catch(reraise = True)
|
||||
def insert_or_select(self, obj, session, hydrate: bool = True):
|
||||
"""Inserts an object into the database or returns an existing object from the database.
|
||||
Regardless, the resulting object has an `id` attribute that can be referenced later."""
|
||||
|
||||
instance = None
|
||||
|
||||
# This is using some adhoc unique constraints that might be worth formalizing at some point
|
||||
if type(obj) == Channel:
|
||||
instance = session.query(Channel).filter_by(url=obj.url, platform_id=obj.platform_id, platform=obj.platform).first()
|
||||
|
||||
elif type(obj) == TransformedResult:
|
||||
instance = session.query(TransformedResult).filter_by(platform=obj.platform, platform_id=obj.platform_id).first()
|
||||
|
||||
elif issubclass(type(obj), Media):
|
||||
instance = session.query(type(obj)).filter_by(original_url=obj.original_url, post=obj.post).first()
|
||||
if instance:
|
||||
logger.info(f"Found matching DB entry for {obj}: {instance}")
|
||||
return instance
|
||||
|
||||
instance = session.query(type(obj)).filter_by(original_url=obj.original_url).first()
|
||||
|
||||
# For Media objects we want to duplicate the entry to preserve the relationship with the post.
|
||||
# However, we also want to avoid rehydration, hence the code below:
|
||||
if instance:
|
||||
logger.info(f"Found matching media record, duplicating and inserting for new post")
|
||||
|
||||
session.expunge(instance)
|
||||
make_transient(instance)
|
||||
instance.id = None
|
||||
instance.post = obj.post
|
||||
instance.raw_id = obj.raw_id
|
||||
|
||||
session.add(instance)
|
||||
session.flush()
|
||||
return instance
|
||||
|
||||
if instance:
|
||||
logger.info(f"Found matching DB entry for {obj}: {instance}")
|
||||
return instance
|
||||
|
||||
if hydrate:
|
||||
obj.hydrate()
|
||||
|
||||
logger.info(f"Inserting new object {obj}")
|
||||
session.add(obj)
|
||||
session.flush()
|
||||
return obj
|
||||
|
||||
@logger.catch(reraise=True)
|
||||
def transform_results(self, results: List[ScraperResult], hydrate: bool = True):
|
||||
"""Transforms raw ScraperResults objects into TransformedResult objects and
|
||||
Media objects. Then, adds them to the database.
|
||||
|
||||
|
||||
Parameters
|
||||
----------
|
||||
results : List[ScraperResult]
|
||||
@@ -126,34 +155,18 @@ class ETLController:
|
||||
handled = True
|
||||
session = self.session()
|
||||
|
||||
transformed = transformer.transform(result)
|
||||
|
||||
session.add(transformed)
|
||||
session.flush()
|
||||
|
||||
media = transformer.transform_media(result, transformed)
|
||||
|
||||
count = 0
|
||||
for obj in media:
|
||||
if hydrate:
|
||||
logger.info(f"Hydrating {obj}")
|
||||
obj.hydrate()
|
||||
|
||||
session.add(obj)
|
||||
count += 1
|
||||
|
||||
transformer.transform(result, lambda obj: self.insert_or_select(obj, session, hydrate))
|
||||
session.commit()
|
||||
logger.info(f"{transformer} generated {count} media objects")
|
||||
break
|
||||
|
||||
if handled == False:
|
||||
logger.warning(f"No Transformer could handle {result}")
|
||||
|
||||
@logger.catch(reraise = True)
|
||||
@logger.catch(reraise=True)
|
||||
def transform_all_untransformed(self, hydrate: bool = True):
|
||||
"""Transform all ScraperResult objects in the database that do not have an
|
||||
equivalent TransformedResult object stored.
|
||||
|
||||
|
||||
Parameters
|
||||
----------
|
||||
hydrate : bool
|
||||
@@ -165,7 +178,12 @@ class ETLController:
|
||||
return
|
||||
|
||||
session = self.session()
|
||||
untransformed = session.query(ScraperResult).join(TransformedResult, isouter=True).where(TransformedResult.raw_id == None).all()
|
||||
untransformed = (
|
||||
session.query(ScraperResult)
|
||||
.join(TransformedResult, isouter=True)
|
||||
.where(TransformedResult.raw_id == None)
|
||||
.all()
|
||||
)
|
||||
logger.info(f"Found {len(untransformed)} items to ETL")
|
||||
|
||||
self.transform_results(untransformed, hydrate=hydrate)
|
||||
self.transform_results(untransformed, hydrate=hydrate)
|
||||
|
||||
@@ -1,9 +1,10 @@
|
||||
import json
|
||||
from loguru import logger
|
||||
from typing import Generator
|
||||
from typing import Generator, Union, Callable
|
||||
import dateutil.parser
|
||||
|
||||
from cisticola.transformer.base import Transformer
|
||||
from cisticola.base import ScraperResult, TransformedResult, Image, Video, Media
|
||||
from cisticola.base import ScraperResult, TransformedResult, Image, Video, Media, Channel
|
||||
|
||||
class TwitterTransformer(Transformer):
|
||||
"""A Twitter specific ScraperResult, with a method ETL/transforming"""
|
||||
@@ -17,11 +18,9 @@ class TwitterTransformer(Transformer):
|
||||
|
||||
return False
|
||||
|
||||
def transform_media(self, data: ScraperResult, transformed: TransformedResult) -> Generator[Media, None, None]:
|
||||
raw = json.loads(data.raw_data)
|
||||
|
||||
if raw['media']:
|
||||
for media in raw['media']:
|
||||
def process_media(self, tweet, post_id, data):
|
||||
if tweet['media']:
|
||||
for media in tweet['media']:
|
||||
orig = None
|
||||
|
||||
if media["_type"] == "snscrape.modules.twitter.Photo":
|
||||
@@ -40,26 +39,77 @@ class TwitterTransformer(Transformer):
|
||||
new = data.archived_urls[orig]
|
||||
|
||||
if media["_type"] == "snscrape.modules.twitter.Photo":
|
||||
m = Image(url=new, post=transformed.id, raw_id=data.id, original_url=orig)
|
||||
m = Image(url=new, post=post_id, raw_id=data.id, original_url=orig)
|
||||
else:
|
||||
m = Video(url=new, post=transformed.id, raw_id=data.id, original_url=orig)
|
||||
m = Video(url=new, post=post_id, raw_id=data.id, original_url=orig)
|
||||
|
||||
yield m
|
||||
|
||||
def transform(self, data: ScraperResult) -> TransformedResult:
|
||||
|
||||
def transform(self, data: ScraperResult, insert: Callable) -> Generator[Union[TransformedResult, Channel, Media], None, None]:
|
||||
raw = json.loads(data.raw_data)
|
||||
|
||||
transformed = TransformedResult(
|
||||
raw_id=data.id,
|
||||
platform_id=raw['id'],
|
||||
scraper=data.scraper,
|
||||
transformer=self.__version__,
|
||||
platform=data.platform,
|
||||
channel=data.channel,
|
||||
date=data.date,
|
||||
date=dateutil.parser.parse(raw['date']),
|
||||
date_archived=data.date_archived,
|
||||
url=raw['url'],
|
||||
content=raw['content'],
|
||||
author_id=raw['user']['id'],
|
||||
author_username=raw['user']['username'])
|
||||
|
||||
return transformed
|
||||
def subtweet(tweet):
|
||||
channel = Channel(
|
||||
name=tweet['user']['displayname'],
|
||||
platform_id=tweet['user']['id'],
|
||||
platform=data.platform,
|
||||
url=tweet['user']['url'],
|
||||
screenname=tweet['user']['username'],
|
||||
category='forwarded',
|
||||
source=self.__version__
|
||||
)
|
||||
|
||||
channel = insert(channel)
|
||||
|
||||
original = TransformedResult(
|
||||
raw_id=data.id,
|
||||
platform_id=tweet['id'],
|
||||
scraper=data.scraper,
|
||||
transformer=self.__version__,
|
||||
platform=data.platform,
|
||||
channel=channel.id,
|
||||
date=dateutil.parser.parse(tweet['date']),
|
||||
date_archived=data.date_archived,
|
||||
url=tweet['url'],
|
||||
content=tweet['content'],
|
||||
author_id=tweet['user']['id'],
|
||||
author_username=tweet['user']['username']
|
||||
)
|
||||
|
||||
original = insert(original)
|
||||
transformed.forwarded_from = channel.id
|
||||
transformed.reply_to = original.id
|
||||
|
||||
media = self.process_media(tweet, original.id, data)
|
||||
for m in media:
|
||||
insert(m)
|
||||
|
||||
if raw['retweetedTweet'] is not None:
|
||||
subtweet(raw['retweetedTweet'])
|
||||
|
||||
if raw['quotedTweet'] is not None:
|
||||
subtweet(raw['quotedTweet'])
|
||||
|
||||
insert(transformed)
|
||||
|
||||
media = self.process_media(raw, transformed.id, data)
|
||||
for m in media:
|
||||
insert(m)
|
||||
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user