Fix bugs in Gettr/Rumble transformers; replace offset-based paging with date-based batching in ETL batch queries

This commit is contained in:
Logan Williams
2022-07-04 14:30:40 +00:00
parent ed4723ed1e
commit c24babb081
3 changed files with 25 additions and 15 deletions

View File

@@ -191,26 +191,36 @@ class ETLController:
session = self.session()
BATCH_SIZE = 50000
BATCH_SIZE = 5000
offset = 0
batch = []
query = (session.query(ScraperResult)
logger.info(f"Fetching first untransformed post batch of {BATCH_SIZE}")
batch = (session.query(ScraperResult)
.join(Post, isouter=True)
.where(Post.raw_id == None)
.order_by(ScraperResult.date.asc())
)
.limit(BATCH_SIZE)
).all()
while len(batch) > 0 or offset == 0:
logger.info(f"Fetching untransformed posts batch of {BATCH_SIZE}, offset {offset}")
batch = query.slice(offset, offset + BATCH_SIZE).all()
offset += BATCH_SIZE
logger.info(f"Found {len(batch)} items to ETL ({offset} already processed)")
while len(batch) > 0:
logger.info(f"Found {len(batch)} items to ETL")
self.transform_results(batch, hydrate=hydrate)
logger.info(f"Fetching untransformed posts batch of {BATCH_SIZE}, offset {max(batch, key=lambda v: v.date).date}")
batch = (session.query(ScraperResult)
.join(Post, isouter=True)
.where(Post.raw_id == None)
.where(ScraperResult.date >= max(batch, key=lambda v: v.date).date)
.order_by(ScraperResult.date.asc())
.limit(BATCH_SIZE)
).all()
@logger.catch(reraise=True)
def transform_info(self, results: List[ChannelInfo]):
if self.session is None:
@@ -221,9 +231,9 @@ class ETLController:
for result in results:
if result.scraper is not None and result.platform is not None:
for transformer in self.transformers:
handled = False
handled = False
for transformer in self.transformers:
if transformer.can_handle(result):
logger.trace(f"{transformer} is handling raw info result {result.id} ({result.date_archived})")
handled = True
@@ -245,7 +255,7 @@ class ETLController:
session = self.session()
BATCH_SIZE = 50000
BATCH_SIZE = 10000
offset = 0
batch = []

View File

@@ -49,7 +49,7 @@ class GettrTransformer(Transformer):
def _get_channel_id(self, username: str, category: str, insert: Callable, session):
channel = session.query(Channel).filter(func.lower(Channel.screenname)==func.lower(username), platform = 'Gettr').first()
channel = session.query(Channel).where((func.lower(Channel.screenname)==func.lower(username)) & (Channel.platform == 'Gettr')).first()
if channel is None:
try:

View File

@@ -90,7 +90,7 @@ def _process_number(s):
if s is None:
return None
else:
s = s.replace(' ', '')
s = s.replace(' ', '').replace(',','')
if s.endswith('M'):
return int(float(s[:-1]) * 1e6)
elif s.endswith('K'):