added capability to retransform/update posts in database

This commit is contained in:
Tristan Lee
2022-10-26 07:20:19 -05:00
parent 0c2360c1dd
commit f29da4d5f3
10 changed files with 228 additions and 16 deletions

9
app.py
View File

@@ -113,6 +113,12 @@ def transform_media(args):
controller = get_transformer_controller()
controller.transform_all_untransformed_media()
def retransform(args):
logger.info(f"Transforming untransformed posts")
controller = get_transformer_controller()
controller.retransform_all(query_kwargs = {'platform': 'Telegram'})
def init_db():
engine = create_engine(os.environ["DB"])
mapper_registry.metadata.create_all(bind=engine)
@@ -162,5 +168,8 @@ if __name__ == "__main__":
elif args.command == "transform-media":
logger.add("logs/transform-media.log", level="TRACE", rotation="100 MB")
transform_media(args)
elif args.command == "retransform":
logger.add("logs/retransform.log", level="TRACE", rotation="100 MB")
retransform(args)
else:
logger.error(f"Unrecognized command {args.command}")

View File

@@ -475,7 +475,7 @@ channel_table = Table('channels', mapper_registry.metadata,
Column('platform', String),
Column('url', String),
Column('screenname', String),
Column('country', String),
Column('country', JSON),
Column('influencer', String),
Column('public', Boolean),
Column('chat', Boolean),
@@ -511,7 +511,7 @@ post_table = Table('posts', mapper_registry.metadata,
Column('views', Integer),
Column('video_title', String),
Column('video_duration', Integer),
Column('detected_language', String),
Column('detected_language', String, index = True),
Column('normalized_content', String)
)

View File

@@ -1,4 +1,4 @@
from typing import List, Generator, Union, Callable
from typing import List, Generator, Union, Callable, Tuple
from loguru import logger
from sqlalchemy import cast, String
from sqlalchemy.orm import sessionmaker, make_transient
@@ -6,6 +6,7 @@ from sqlalchemy.engine.base import Engine
from sqlalchemy.sql.expression import func
from collections import defaultdict
from datetime import datetime, timezone
import dataclasses
from cisticola.base import RawChannelInfo, ChannelInfo, ScraperResult, Post, Media, Channel, mapper_registry, Image, Video, Audio
@@ -35,10 +36,8 @@ class Transformer:
pass
def transform(data: ScraperResult, insert: Callable) -> Generator[Union[Post, Channel, Media], None, None]:
"""Transform a ScraperResult into objects with additional parameters for analysis. This function can
yield multiple objects, as it will find references to quoted/replied posts, media objects, and Channel
objects and provide all of these to be inserted into the database.
def get_transformed_post(data: ScraperResult, insert: Callable) -> Post:
"""Return the transformed Post from a ScraperResult.
Parameters
----------
@@ -51,6 +50,21 @@ class Transformer:
pass
def transform(self, data: ScraperResult, insert: Callable, session) -> None:
"""Transform a ScraperResult into objects with additional parameters for analysis.
Parameters
----------
data : ScraperResult
The ScraperResult object to process.
insert : Callable
A function that either inserts the object into a database or finds an object with the
relevant unique constraints if applicable.
"""
transformed = self.get_transformed_post(data = data, insert = insert, session = session)
transformed = insert(transformed)
def transform_media(self, data: ScraperResult, transformed: Post, insert: Callable):
'''Transform media'''
for k in data.archived_urls:
@@ -195,6 +209,42 @@ class ETLController:
if handled == False:
logger.warning(f"No Transformer could handle ID {result.id} with platform {result.platform} ({result.date})")
@logger.catch(reraise=True)
def retransform_results(self, results: List[Tuple[ScraperResult, Post]], columns: List[str] = None, hydrate: bool = True):
if self.session is None:
logger.error("No DB session")
return
session = self.session()
# default to updating all fields
if 'date_transformed' not in columns:
columns.append('date_transformed')
for result, old_post in results:
if result.scraper is not None and result.platform is not None:
for transformer in self.transformers:
handled = False
if transformer.can_handle(result):
logger.trace(f"{transformer} is handling result {result.id} ({result.date})")
handled = True
new_post = transformer.get_transformed_post(result, lambda obj: self.insert_or_select(obj, session, hydrate), session)
if hydrate:
new_post.hydrate()
for column in columns:
setattr(old_post, column, getattr(new_post, column))
session.commit()
break
if handled == False:
logger.warning(f"No Transformer could handle ID {result.id} with platform {result.platform} ({result.date})")
@logger.catch(reraise=True)
def transform_all_untransformed(self, hydrate: bool = True):
"""Transform all ScraperResult objects in the database that do not have an
@@ -240,7 +290,49 @@ class ETLController:
.limit(BATCH_SIZE)
).all()
def retransform_all(self, columns = None, query_kwargs: dict = None, hydrate: bool = True):
if self.session is None:
logger.error("No DB session")
return
if columns is None:
columns = [field.name for field in dataclasses.fields(Post)]
elif isinstance(columns, list):
if len(columns) == 0:
raise ValueError('`columns` argument must be non-empty list of strings specifying columns to update')
if query_kwargs is None:
query_kwargs = {}
session = self.session()
BATCH_SIZE = 5000
batch = []
logger.info(f"Fetching first post batch of {BATCH_SIZE} to re-transform")
batch = (session.query(ScraperResult, Post)
.filter(Post.raw_id == ScraperResult.id)
.filter_by(**query_kwargs)
.order_by(ScraperResult.date.asc())
.limit(BATCH_SIZE)
).all()
while len(batch) > 0:
logger.info(f"Found {len(batch)} items to ETL")
self.retransform_results(batch, hydrate=hydrate, columns=columns)
logger.info(f"Fetching posts batch of {BATCH_SIZE} to re-transform, offset {max([raw.date for raw, _ in batch])}")
batch = (session.query(ScraperResult, Post)
.filter(Post.raw_id == ScraperResult.id)
.filter_by(**query_kwargs)
.where(ScraperResult.date >= max([raw.date for raw, _ in batch]))
.order_by(ScraperResult.date.asc())
.limit(BATCH_SIZE)
).all()
@logger.catch(reraise=True)
def transform_info(self, results: List[ChannelInfo]):

View File

@@ -56,7 +56,7 @@ class BitchuteTransformer(Transformer):
transformed = insert(transformed)
def transform(self, data: ScraperResult, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]:
def get_transformed_post(self, data: ScraperResult, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]:
raw = json.loads(data.raw_data)
if raw['category'] == 'comment':

View File

@@ -81,7 +81,7 @@ class GettrTransformer(Transformer):
return channel.id
def transform(self, data: ScraperResult, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]:
def get_transformed_post(self, data: ScraperResult, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]:
raw = json.loads(data.raw_data)
if raw["activity"]["action"] == "shares_pst":

View File

@@ -57,7 +57,7 @@ class RumbleTransformer(Transformer):
transformed = insert(transformed)
def transform(self, data: ScraperResult, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]:
def get_transformed_post(self, data: ScraperResult, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]:
raw = json.loads(data.raw_data)
transformed = Post(

View File

@@ -123,7 +123,7 @@ class TelegramTelethonTransformer(Transformer):
transformed = insert(transformed)
def transform(self, data: ScraperResult, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]:
def get_transformed_post(self, data: ScraperResult, insert: Callable, session) -> Post:
raw = json.loads(data.raw_data)
if raw['_'] != 'Message':
@@ -206,7 +206,7 @@ class TelegramTelethonTransformer(Transformer):
url = ""
author_username = ""
transformed = Post(
return Post(
raw_id = data.id,
platform_id = raw['id'],
scraper = data.scraper,
@@ -227,8 +227,6 @@ class TelegramTelethonTransformer(Transformer):
views = raw.get('views')
)
transformed = insert(transformed)
def stripped(s):
"""https://stackoverflow.com/a/29933716"""

View File

@@ -72,7 +72,7 @@ class TwitterTransformer(Transformer):
transformed = insert(transformed)
def transform(self, data: ScraperResult, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]:
def get_transformed_post(self, data: ScraperResult, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]:
raw = json.loads(data.raw_data)
transformed = Post(

View File

@@ -46,7 +46,7 @@ class VkontakteTransformer(Transformer):
transformed = insert(transformed)
def transform(self, data: ScraperResult, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]:
def get_transformed_post(self, data: ScraperResult, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]:
raw = json.loads(data.raw_data)
transformed = Post(

113
retransform.py Normal file
View File

@@ -0,0 +1,113 @@
import argparse
from loguru import logger
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
import os
import sys
from cisticola.base import mapper_registry
from cisticola.scraper import (
ScraperController,
VkontakteScraper,
TelegramTelethonScraper,
GettrScraper,
BitchuteScraper,
RumbleScraper,
)
from cisticola.transformer import (
ETLController,
TelegramTelethonTransformer,
GettrTransformer,
RumbleTransformer,
BitchuteTransformer,
VkontakteTransformer,
)
from sync_with_gsheet import sync_channels
def get_db_session():
engine = create_engine(os.environ["DB"])
session_generator = sessionmaker()
session_generator.configure(bind=engine)
session = session_generator()
return session
def get_scraper_controller(telethon_session_name = None):
engine = create_engine(os.environ["DB"])
controller = ScraperController()
controller.connect_to_db(engine)
scrapers = [VkontakteScraper(),
TelegramTelethonScraper(telethon_session_name = telethon_session_name),
GettrScraper(),
BitchuteScraper(),
RumbleScraper()]
controller.register_scrapers(scrapers)
return controller
def get_transformer_controller():
engine = create_engine(os.environ["DB"])
controller = ETLController()
controller.connect_to_db(engine)
transformers = [VkontakteTransformer(),
TelegramTelethonTransformer(),
GettrTransformer(),
BitchuteTransformer(),
RumbleTransformer()]
controller.register_transformers(transformers)
return controller
def scrape_channels(args):
logger.info(f"Scraping channels, media: {args.media}")
controller = get_scraper_controller()
controller.scrape_all_channels(archive_media=args.media)
def scrape_channel_info(args):
logger.info(f"Scraping channel info")
controller = get_scraper_controller()
controller.scrape_all_channel_info()
def archive_media(args):
logger.info(f"Archiving unarchived media")
if args.telethon_session:
controller = get_scraper_controller(telethon_session_name=args.telethon_session)
else:
controller = get_scraper_controller()
if args.chronological:
controller.archive_unarchived_media(chronological=True)
else:
controller.archive_unarchived_media()
def retransform():
logger.info(f"Transforming untransformed posts")
controller = get_transformer_controller()
controller.retransform_all(query_kwargs = {'channel': 6})
def init_db():
engine = create_engine(os.environ["DB"])
mapper_registry.metadata.create_all(bind=engine)
if __name__ == "__main__":
retransform()