Media archiving ETL working for Telegram

This commit is contained in:
Logan Williams
2022-06-08 16:41:46 +02:00
parent ed4723ed1e
commit 9948af2c4a
4 changed files with 111 additions and 14 deletions

9
app.py
View File

@@ -100,6 +100,12 @@ def transform_info(args):
controller = get_transformer_controller()
controller.transform_all_untransformed_info()
def transform_media(args):
    """Run the media transformation step from the command line.

    Logs the start of the run, obtains a controller from
    ``get_transformer_controller()`` and calls its
    ``transform_all_untransformed_media()`` method.

    Parameters
    ----------
    args : object
        Parsed command-line arguments. Accepted so the signature matches the
        other command handlers; it is not read here.
    """
    # The message has no placeholders, so a plain literal is used (no f-prefix).
    logger.info("Transforming untransformed channel media")

    controller = get_transformer_controller()
    controller.transform_all_untransformed_media()
def init_db():
engine = create_engine(os.environ["DB"])
mapper_registry.metadata.create_all(bind=engine)
@@ -144,5 +150,8 @@ if __name__ == "__main__":
elif args.command == "transform-info":
logger.add("logs/transform-info.log", level="TRACE", rotation="100 MB")
transform_info(args)
elif args.command == "transform-media":
logger.add("logs/transform-media.log", level="TRACE", rotation="100 MB")
transform_media(args)
else:
logger.error(f"Unrecognized command {args.command}")

View File

@@ -384,7 +384,14 @@ class Image(Media):
@dataclass
class Video(Media):
    """Class for organizing information about a video file.
    """

    pass
@dataclass
class Audio(Media):
    """Media subclass used to represent an audio file.

    Declares no fields or methods of its own; everything is inherited from
    ``Media``.
    """
@@ -499,4 +506,5 @@ mapper_registry.map_imperatively(RawChannelInfo, raw_channel_info_table)
mapper_registry.map_imperatively(ChannelInfo, channel_info_table)

# Media and its subclasses all map onto ``media_table``. The ``type`` column
# is the discriminator and every class registers its own identity value.
# Each class is mapped exactly once: registering the same class a second time
# is rejected by SQLAlchemy because it already has a primary mapper.
mapper_registry.map_imperatively(Media, media_table, polymorphic_on='type', polymorphic_identity='media')
mapper_registry.map_imperatively(Image, media_table, inherits=Media, polymorphic_on='type', polymorphic_identity='image')
mapper_registry.map_imperatively(Video, media_table, inherits=Media, polymorphic_on='type', polymorphic_identity='video')
mapper_registry.map_imperatively(Audio, media_table, inherits=Media, polymorphic_on='type', polymorphic_identity='audio')

View File

@@ -264,3 +264,76 @@ class ETLController:
logger.info(f"Found {len(batch)} info items to ETL ({offset} already processed)")
self.transform_info(batch)
@logger.catch(reraise=True)
def transform_media(self, results: List, hydrate: bool = True):
    """Create media records for a batch of (ScraperResult, Post) rows.

    For every row, the first transformer in ``self.transformers`` whose
    ``can_handle`` accepts the raw result has its ``transform_media`` called
    with the raw result, the matching post, an insert callback and the
    session. The session is committed after each handled row. Rows whose raw
    result has no ``scraper`` or no ``platform`` are skipped.

    Parameters
    ----------
    results : List
        Rows exposing ``ScraperResult`` and ``Post`` attributes, such as the
        batches built by ``transform_all_untransformed_media``.
    hydrate : bool
        Whether or not to fully hydrate transformed media; forwarded to
        ``insert_or_select``. Default True.
    """
    if self.session is None:
        logger.error("No DB session")
        return

    session = self.session()

    for total_result in results:
        result = total_result.ScraperResult

        if result.scraper is None or result.platform is None:
            continue

        # for/else: the ``else`` branch runs only when no transformer hit
        # ``break``. This also covers an empty transformer list, where a
        # flag assigned inside the loop would never have been defined.
        for transformer in self.transformers:
            if transformer.can_handle(result):
                logger.trace(f"{transformer} is handling result {result.id} ({result.date})")
                transformer.transform_media(
                    result,
                    total_result.Post,
                    lambda obj: self.insert_or_select(obj, session, hydrate),
                    session,
                )
                session.commit()
                break
        else:
            logger.warning(f"No Transformer could handle ID {result.id} with platform {result.platform} ({result.date})")
@logger.catch(reraise=True)
def transform_all_untransformed_media(self, hydrate=True):
    """Run media transformation over every stored result with archived media.

    Selects (ScraperResult, Post) pairs whose ``media_archived`` is not NULL
    and whose ``archived_urls`` differs from ``'{}'``, ordered by date
    ascending, and passes them to ``transform_media`` in batches of
    ``BATCH_SIZE`` rows until a fetch comes back empty.

    Parameters
    ----------
    hydrate : bool
        Whether or not to fully hydrate transformed media. Default True.
    """
    if self.session is None:
        logger.error("No DB session")
        return

    session = self.session()

    BATCH_SIZE = 50000
    offset = 0

    query = (
        session.query(ScraperResult, Post)
        .join(Post)
        # ``!= None`` is deliberate: on a column it is rendered as IS NOT NULL.
        .filter((ScraperResult.media_archived != None) & (ScraperResult.archived_urls != '{}'))  # noqa: E711
        # Dates are not unique, so ``id`` is added as a tie-break. Without a
        # total order, consecutive OFFSET slices may repeat or skip rows.
        .order_by(ScraperResult.date.asc(), ScraperResult.id.asc())
    )

    while True:
        logger.info(f"Fetching untransformed post media batch of {BATCH_SIZE}, offset {offset}")
        batch = query.slice(offset, offset + BATCH_SIZE).all()
        offset += BATCH_SIZE
        logger.info(f"Found {len(batch)} items to ETL ({offset} already processed)")

        # An empty slice means the query is exhausted; nothing to transform.
        if not batch:
            break

        self.transform_media(batch, hydrate=hydrate)

View File

@@ -16,7 +16,7 @@ from datetime import datetime, timezone
from sqlalchemy import func
from cisticola.transformer.base import Transformer
from cisticola.base import RawChannelInfo, ChannelInfo, ScraperResult, Post, Image, Video, Media, Channel
from cisticola.base import RawChannelInfo, ChannelInfo, ScraperResult, Post, Image, Video, Audio, Media, Channel
class TelegramTelethonTransformer(Transformer):
@@ -121,6 +121,23 @@ class TelegramTelethonTransformer(Transformer):
transformed = insert(transformed)
def transform_media(self, data: ScraperResult, transformed: Post, insert: Callable, session):
    """Insert one media record for each archived file of a scraped post.

    ``data.archived_urls`` is read as a mapping from original URL to archived
    URL. Entries with a falsy archived URL are skipped. The record class is
    chosen from the lower-cased file extension of the archived URL: ``Video``,
    ``Audio`` or ``Image`` for the known extensions, otherwise a warning is
    logged and a plain ``Media`` record is inserted.

    Parameters
    ----------
    data : ScraperResult
        Raw result supplying ``archived_urls`` and the ``raw_id`` value.
    transformed : Post
        Post whose ``id`` is stored on each media record.
    insert : Callable
        Called once with every media object that is created.
    session
        Not used by this implementation.
    """
    # Imported here so the method carries its own dependency.
    from urllib.parse import urlsplit

    extension_classes = (
        (Video, {'mp4', 'mov', 'avi', 'mkv'}),
        (Audio, {'oga', 'mp3', 'wav', 'aif', 'aiff', 'aac'}),
        (Image, {'jpg', 'jpeg', 'png', 'gif', 'bmp', 'heic', 'tiff'}),
    )

    for original_url, archived_url in data.archived_urls.items():
        if not archived_url:
            continue

        # Only the path component is inspected, so a query string or fragment
        # on the archived URL cannot leak into the extension.
        filename = urlsplit(archived_url).path.split('/')[-1]
        ext = filename.split('.')[-1].lower() if '.' in filename else None

        media_class = next(
            (cls for cls, extensions in extension_classes if ext in extensions),
            None,
        )
        if media_class is None:
            logger.warning(f"Unknown file extension {ext}")
            media_class = Media

        insert(media_class(url=archived_url, post=transformed.id, raw_id=data.id, original_url=original_url))
def transform(self, data: ScraperResult, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]:
raw = json.loads(data.raw_data)
@@ -227,16 +244,6 @@ class TelegramTelethonTransformer(Transformer):
transformed = insert(transformed)
# for k in data.archived_urls:
# if data.archived_urls[k]:
# archived_url = data.archived_urls[k]
# ext = archived_url.split('.')[-1]
# if ext == 'mp4' or ext == 'mov' or ext == 'avi' or ext =='mkv':
# insert(Video(url=archived_url, post=transformed.id, raw_id=data.id, original_url=k))
# else:
# insert(Image(url=archived_url, post=transformed.id, raw_id=data.id, original_url=k))
def add_markdown_links(raw_post):
global_offset = 0
@@ -257,4 +264,4 @@ def add_markdown_links(raw_post):
transformed_content = before_link + link_text + link_href + trailing_whitespace + after_link
global_offset += (4 + len(url))
return transformed_content
return transformed_content