Merge pull request #58 from bellingcat/media-etl

Media ETL
This commit is contained in:
Logan Williams
2022-07-05 11:51:08 +02:00
committed by GitHub
9 changed files with 667 additions and 399 deletions

View File

@@ -29,6 +29,7 @@ ocrd-pyexiftool = "*"
gabber = {git = "https://github.com/stanfordio/gabber.git"}
snscrape = {git = "https://github.com/bellingcat/snscrape"}
polyphemus = {git = "https://github.com/bellingcat/polyphemus"}
filelock = "*"
[dev-packages]
pytest = "*"

862
Pipfile.lock generated

File diff suppressed because it is too large Load Diff

9
app.py
View File

@@ -100,6 +100,12 @@ def transform_info(args):
controller = get_transformer_controller()
controller.transform_all_untransformed_info()
def transform_media(args):
logger.info(f"Transforming untransformed channel media")
controller = get_transformer_controller()
controller.transform_all_untransformed_media()
def init_db():
engine = create_engine(os.environ["DB"])
mapper_registry.metadata.create_all(bind=engine)
@@ -144,5 +150,8 @@ if __name__ == "__main__":
elif args.command == "transform-info":
logger.add("logs/transform-info.log", level="TRACE", rotation="100 MB")
transform_info(args)
elif args.command == "transform-media":
logger.add("logs/transform-media.log", level="TRACE", rotation="100 MB")
transform_media(args)
else:
logger.error(f"Unrecognized command {args.command}")

View File

@@ -326,6 +326,24 @@ class Media:
#: Original URL of the media from the the original post.
original_url: str
#: String specifying name and version of scraper used to generate result, e.g. ``"TwitterScraper 0.0.1"``.
scraper: str
#: String specifying name and version of transformer used to tranform result, e.g. ``"TwitterTransformer 0.0.1"``.
transformer: str
#: Name of platform from which result was scraped, e.g. ``"Twitter"``.
platform: str
#: Datetime (relative to UTC) that the scraped post was created at.
date: datetime
#: Datetime (relative to UTC) that the scraped post was archived at.
date_archived: datetime
#: Datetime (UTC) that the scraped post was transformed at.
date_transformed: datetime
#: JSON dump of the dict containing metadata information for the media file.
exif: str = None
@@ -384,7 +402,14 @@ class Image(Media):
@dataclass
class Video(Media):
"""Class for organizing information about an image file.
"""Class for organizing information about an video file.
"""
pass
@dataclass
class Audio(Media):
"""Class for organizing information about an audio file.
"""
pass
@@ -490,7 +515,13 @@ media_table = Table('media', mapper_registry.metadata,
Column('url', String),
Column('original_url', String),
Column('exif', String),
Column('ocr', String))
Column('ocr', String),
Column('date', DateTime, index=True),
Column('date_archived', DateTime, index=True),
Column('date_transformed', DateTime, index=True),
Column('scraper', String),
Column('transformer', String)
)
mapper_registry.map_imperatively(Post, post_table)
mapper_registry.map_imperatively(Channel, channel_table)
@@ -499,4 +530,5 @@ mapper_registry.map_imperatively(RawChannelInfo, raw_channel_info_table)
mapper_registry.map_imperatively(ChannelInfo, channel_info_table)
mapper_registry.map_imperatively(Media, media_table, polymorphic_on='type', polymorphic_identity='media')
mapper_registry.map_imperatively(Image, media_table, inherits=Media, polymorphic_on='type', polymorphic_identity='image')
mapper_registry.map_imperatively(Video, media_table, inherits=Media, polymorphic_on='type', polymorphic_identity='video')
mapper_registry.map_imperatively(Video, media_table, inherits=Media, polymorphic_on='type', polymorphic_identity='video')
mapper_registry.map_imperatively(Audio, media_table, inherits=Media, polymorphic_on='type', polymorphic_identity='audio')

View File

@@ -421,6 +421,9 @@ class ScraperController:
session.commit()
added += 1
if added > 100:
break
session.commit()
logger.info(
f"{scraper} found {added} new posts from {channel}")

View File

@@ -1,12 +1,13 @@
from typing import List, Generator, Union, Callable
from loguru import logger
from sqlalchemy import cast, String
from sqlalchemy.orm import sessionmaker, make_transient
from sqlalchemy.engine.base import Engine
from sqlalchemy.sql.expression import func
from collections import defaultdict
from datetime import datetime
from datetime import datetime, timezone
from cisticola.base import RawChannelInfo, ChannelInfo, ScraperResult, Post, Media, Channel, mapper_registry
from cisticola.base import RawChannelInfo, ChannelInfo, ScraperResult, Post, Media, Channel, mapper_registry, Image, Video, Audio
class Transformer:
@@ -50,6 +51,24 @@ class Transformer:
pass
def transform_media(self, data: ScraperResult, transformed: Post, insert: Callable):
'''Transform media'''
for k in data.archived_urls:
if data.archived_urls[k]:
archived_url = data.archived_urls[k]
filename = archived_url.split('/')[-1]
ext = None if '.' not in filename else filename.split('.')[-1].lower()
if ext == 'mp4' or ext == 'mov' or ext == 'avi' or ext =='mkv':
insert(Video(url=archived_url, post=transformed.id, raw_id=data.id, original_url=k, date=data.date, date_archived=data.date_archived, date_transformed=datetime.now(timezone.utc), transformer=self.__version__, scraper=data.scraper, platform=data.platform))
elif ext == 'oga' or ext == 'mp3' or ext == "wav" or ext == 'aif' or ext == 'aiff' or ext == 'aac':
insert(Audio(url=archived_url, post=transformed.id, raw_id=data.id, original_url=k, date=data.date, date_archived=data.date_archived, date_transformed=datetime.now(timezone.utc), transformer=self.__version__, scraper=data.scraper, platform=data.platform))
elif ext == 'jpg' or ext == 'jpeg' or ext == 'png' or ext == 'gif' or ext == 'bmp' or ext == 'heic' or ext == 'tiff':
insert(Image(url=archived_url, post=transformed.id, raw_id=data.id, original_url=k, date=data.date, date_archived=data.date_archived, date_transformed=datetime.now(timezone.utc), transformer=self.__version__, scraper=data.scraper, platform=data.platform))
else:
logger.warning(f"Unknown file extension {ext}")
insert(Media(url=archived_url, post=transformed.id, raw_id=data.id, original_url=k, date=data.date, date_archived=data.date_archived, date_transformed=datetime.now(timezone.utc), transformer=self.__version__, scraper=data.scraper, platform=data.platform))
class ETLController:
"""An ETLController will transform raw scraped data (ScrapedResult objects) into a more detailed format
@@ -103,33 +122,35 @@ class ETLController:
# instance = session.query(Post).filter_by(platform=obj.platform, platform_id=obj.platform_id).first()
elif issubclass(type(obj), Media):
instance = session.query(type(obj)).filter_by(original_url=obj.original_url, post=obj.post).first()
if instance:
logger.info(f"Found matching DB entry for {obj}: {instance}")
return instance
instance = None
# instance = session.query(type(obj)).filter_by(original_url=obj.original_url, post=obj.post).first()
# if instance:
# logger.info(f"Found matching DB entry for {obj}: {instance}")
# return instance
instance = session.query(type(obj)).filter_by(original_url=obj.original_url).first()
# instance = session.query(type(obj)).filter_by(original_url=obj.original_url).first()
# For Media objects we want to duplicate the entry to preserve the relationship with the post.
# However, we also want to avoid rehydration, hence the code below:
if instance:
logger.info(f"Found matching media record, duplicating and inserting for new post")
# # For Media objects we want to duplicate the entry to preserve the relationship with the post.
# # However, we also want to avoid rehydration, hence the code below:
# if instance:
# logger.info(f"Found matching media record, duplicating and inserting for new post")
session.expunge(instance)
make_transient(instance)
instance.id = None
instance.post = obj.post
instance.raw_id = obj.raw_id
# session.expunge(instance)
# make_transient(instance)
# instance.id = None
# instance.post = obj.post
# instance.raw_id = obj.raw_id
session.add(instance)
session.flush()
return instance
# session.add(instance)
# session.flush()
# return instance
if instance:
logger.info(f"Found matching DB entry for {obj}: {instance}")
return instance
if hydrate:
# Don't hydrate videos, because they can be quite large and this is time consuming
if hydrate and type(obj) != Video:
obj.hydrate()
session.add(obj)
@@ -274,3 +295,77 @@ class ETLController:
logger.info(f"Found {len(batch)} info items to ETL ({offset} already processed)")
self.transform_info(batch)
@logger.catch(reraise=True)
def transform_media(self, results: List, hydrate: bool = True):
"""Transforms raw ScraperResults objects into Post objects and
Media objects. Then, adds them to the database.
Parameters
----------
results : List[ScraperResult]
A list of ScraperResult objects to be transformed
hydrate : bool
Whether or not to fully hydrate transformed media. Default True.
"""
if self.session is None:
logger.error("No DB session")
return
session = self.session()
for total_result in results:
result = total_result.ScraperResult
if result.scraper is not None and result.platform is not None:
for transformer in self.transformers:
handled = False
if transformer.can_handle(result):
logger.trace(f"{transformer} is handling result {result.id} ({result.date})")
handled = True
transformer.transform_media(result, total_result.Post, lambda obj: self.insert_or_select(obj, session, hydrate))
session.commit()
break
if handled == False:
logger.warning(f"No Transformer could handle ID {result.id} with platform {result.platform} ({result.date})")
@logger.catch(reraise=True)
def transform_all_untransformed_media(self, hydrate=True):
"""Transform all ScraperResult objects in the database that do not have an
equivalent Post object stored.
Parameters
----------
hydrate : bool
Whether or not to fully hydrate transformed media. Default True.
"""
if self.session is None:
logger.error("No DB session")
return
session = self.session()
BATCH_SIZE = 5000
offset = 0
batch = []
query = (session.query(ScraperResult, Post)
.join(Post)
.join(Media, isouter=True)
.filter((ScraperResult.media_archived != None) & (cast(ScraperResult.archived_urls, String) != '{}') & (Media.id == None))
.order_by(ScraperResult.date.asc())
)
while len(batch) > 0 or offset == 0:
logger.info(f"Fetching untransformed post media batch of {BATCH_SIZE}, offset {offset}")
batch = query.slice(offset, offset + BATCH_SIZE).all()
offset += BATCH_SIZE
logger.info(f"Found {len(batch)} items to ETL ({offset} already processed)")
self.transform_media(batch, hydrate=hydrate)

View File

@@ -21,13 +21,13 @@ class BitchuteTransformer(Transformer):
return False
def transform_media(self, data: ScraperResult, insert: Callable, transformed: Post) -> Generator[Media, None, None]:
def transform_media(self, data: ScraperResult, transformed: Post, insert: Callable) -> Generator[Media, None, None]:
raw = json.loads(data.raw_data)
orig = raw['video_url']
new = data.archived_urls[orig]
m = Video(url=new, post=transformed.id, raw_id=data.id, original_url=orig)
m = Video(url=new, post=transformed.id, raw_id=data.id, original_url=orig, date=data.date, date_archived=data.date_archived, date_transformed=datetime.now(timezone.utc), transformer=self.__version__, scraper=data.scraper, platform=data.platform)
insert(m)

View File

@@ -16,7 +16,7 @@ from datetime import datetime, timezone
from sqlalchemy import func
from cisticola.transformer.base import Transformer
from cisticola.base import RawChannelInfo, ChannelInfo, ScraperResult, Post, Image, Video, Media, Channel
from cisticola.base import RawChannelInfo, ChannelInfo, ScraperResult, Post, Image, Video, Audio, Media, Channel
class TelegramTelethonTransformer(Transformer):
@@ -227,16 +227,6 @@ class TelegramTelethonTransformer(Transformer):
transformed = insert(transformed)
# for k in data.archived_urls:
# if data.archived_urls[k]:
# archived_url = data.archived_urls[k]
# ext = archived_url.split('.')[-1]
# if ext == 'mp4' or ext == 'mov' or ext == 'avi' or ext =='mkv':
# insert(Video(url=archived_url, post=transformed.id, raw_id=data.id, original_url=k))
# else:
# insert(Image(url=archived_url, post=transformed.id, raw_id=data.id, original_url=k))
def add_markdown_links(raw_post):
global_offset = 0
@@ -257,4 +247,4 @@ def add_markdown_links(raw_post):
transformed_content = before_link + link_text + link_href + trailing_whitespace + after_link
global_offset += (4 + len(url))
return transformed_content
return transformed_content

0
spacy_setup.sh Normal file → Executable file
View File