From c24babb0812e81d0af9ef2d1a6d055f082bb0452 Mon Sep 17 00:00:00 2001 From: Logan Williams Date: Mon, 4 Jul 2022 14:30:40 +0000 Subject: [PATCH] Fix bugs in Gettr/Rumble transformers, avoid offset in batch requests --- cisticola/transformer/base.py | 36 +++++++++++++++++++++------------ cisticola/transformer/gettr.py | 2 +- cisticola/transformer/rumble.py | 2 +- 3 files changed, 25 insertions(+), 15 deletions(-) diff --git a/cisticola/transformer/base.py b/cisticola/transformer/base.py index 32ed37c..20ca1e1 100644 --- a/cisticola/transformer/base.py +++ b/cisticola/transformer/base.py @@ -191,26 +191,36 @@ class ETLController: session = self.session() - BATCH_SIZE = 50000 + BATCH_SIZE = 5000 offset = 0 batch = [] - query = (session.query(ScraperResult) + logger.info(f"Fetching first untransformed post batch of {BATCH_SIZE}") + + batch = (session.query(ScraperResult) .join(Post, isouter=True) .where(Post.raw_id == None) .order_by(ScraperResult.date.asc()) - ) + .limit(BATCH_SIZE) + ).all() - while len(batch) > 0 or offset == 0: - logger.info(f"Fetching untransformed posts batch of {BATCH_SIZE}, offset {offset}") - - batch = query.slice(offset, offset + BATCH_SIZE).all() - offset += BATCH_SIZE - - logger.info(f"Found {len(batch)} items to ETL ({offset} already processed)") + while len(batch) > 0: + logger.info(f"Found {len(batch)} items to ETL") self.transform_results(batch, hydrate=hydrate) + logger.info(f"Fetching untransformed posts batch of {BATCH_SIZE}, offset {max(batch, key=lambda v: v.date).date}") + + batch = (session.query(ScraperResult) + .join(Post, isouter=True) + .where(Post.raw_id == None) + .where(ScraperResult.date >= max(batch, key=lambda v: v.date).date) + .order_by(ScraperResult.date.asc()) + .limit(BATCH_SIZE) + ).all() + + + @logger.catch(reraise=True) def transform_info(self, results: List[ChannelInfo]): if self.session is None: @@ -221,9 +231,9 @@ class ETLController: for result in results: if result.scraper is not None and result.platform is not None: - for transformer in self.transformers: - handled = False + handled = False + for transformer in self.transformers: if transformer.can_handle(result): logger.trace(f"{transformer} is handling raw info result {result.id} ({result.date_archived})") handled = True @@ -245,7 +255,7 @@ class ETLController: session = self.session() - BATCH_SIZE = 50000 + BATCH_SIZE = 10000 offset = 0 batch = [] diff --git a/cisticola/transformer/gettr.py b/cisticola/transformer/gettr.py index 0e0a4d0..b6cf0e7 100644 --- a/cisticola/transformer/gettr.py +++ b/cisticola/transformer/gettr.py @@ -49,7 +49,7 @@ class GettrTransformer(Transformer): def _get_channel_id(self, username: str, category: str, insert: Callable, session): - channel = session.query(Channel).filter(func.lower(Channel.screenname)==func.lower(username), platform = 'Gettr').first() + channel = session.query(Channel).where((func.lower(Channel.screenname)==func.lower(username)) & (Channel.platform == 'Gettr')).first() if channel is None: try: diff --git a/cisticola/transformer/rumble.py b/cisticola/transformer/rumble.py index 79289f5..9c71702 100644 --- a/cisticola/transformer/rumble.py +++ b/cisticola/transformer/rumble.py @@ -90,7 +90,7 @@ def _process_number(s): if s is None: return None else: - s = s.replace(' ', '') + s = s.replace(' ', '').replace(',','') if s.endswith('M'): return int(float(s[:-1]) * 1e6) elif s.endswith('K'):