Add new function for insert post (faster/bulk)

This commit is contained in:
Logan Williams
2023-05-04 14:04:55 +02:00
parent 2320ea1efd
commit ebbc6b69dd
11 changed files with 209 additions and 150 deletions

View File

@@ -75,6 +75,8 @@ class ETLController:
for analysis by using Transformer objects that have been registered with the controller.
"""
posts_to_insert = []
def __init__(self):
self.transformers = []
@@ -107,6 +109,29 @@ class ETLController:
self.session = sessionmaker()
self.session.configure(bind=engine)
# MAY4 can try adding some new functions for batching post inserts
def flush_posts(self, session):
session.bulk_save_objects(self.posts_to_insert)
logger.info(f"Bulk saved {len(self.posts_to_insert)} posts")
self.posts_to_insert = []
def insert_post(self, obj, session, hydrate: bool = True, flush: bool = False):
if hydrate and type(obj) != Video:
obj.hydrate()
if flush:
self.flush_posts()
session.add(obj)
session.flush()
logger.trace(f"Inserted new object {obj}")
return obj
else:
self.posts_to_insert.append(obj)
return None
def insert_or_select(self, obj, session, hydrate: bool = True):
"""Inserts an object into the database or returns an existing object from the database.
Regardless, the resulting object has an `id` attribute that can be referenced later."""
@@ -122,7 +147,7 @@ class ETLController:
(Channel.platform==obj.platform)).first()
elif type(obj) == Post:
instance = None
return self.insert_post(obj, session, hydrate)
# instance = session.query(Post).filter_by(platform=obj.platform, platform_id=obj.platform_id).first()
elif issubclass(type(obj), Media):
@@ -216,7 +241,7 @@ class ETLController:
if handled == False:
logger.warning(f"No Transformer could handle ID {result.id} with platform {result.platform} ({result.date})")
@logger.catch(reraise=True)
def transform_all_untransformed(self, hydrate: bool = True, min_id=0):
"""Transform all ScraperResult objects in the database that do not have an

View File

@@ -107,8 +107,8 @@ class BitchuteTransformer(Transformer):
video_title = raw['subject'],
video_duration = _parse_duration_str(raw['length']))
# insert_post
transformed = insert(transformed)
session.flush()
def parse_created(created: str, date_archived: datetime) -> datetime:
"""Convert a created string (e.g. ``"1 year, 10 months ago"``) to a datetime

View File

@@ -79,12 +79,9 @@ class RumbleTransformer(Transformer):
video_title = raw['title'],
video_duration=_parse_duration_str(raw['duration']))
# insert_post
insert(transformed)
# media = self.process_media(raw, transformed.id, data)
# for m in media:
# insert(m)
def _process_number(s):
if s is None:

View File

@@ -9,7 +9,7 @@ import time
from telethon.sync import TelegramClient
from telethon.errors.rpcerrorlist import ChannelPrivateError, ChannelInvalidError
from telethon.tl import types
from telethon.helpers import add_surrogate
from telethon.helpers import add_surrogate, del_surrogate
import os
from datetime import datetime, timezone
@@ -270,12 +270,23 @@ class TelegramTelethonTransformer(Transformer):
views = raw.get('views')
)
transformed = insert(transformed)
# insert_post
insert(transformed)
def stripped(s):
"""https://stackoverflow.com/a/29933716"""
lstripped = ''.join(takewhile(str.isspace, s))
rstripped = ''.join(reversed(tuple(takewhile(str.isspace, reversed(s)))))
return lstripped + rstripped
def add_markdown_links(raw_post):
"""This function is necessary because Telethon's markdown.unparse doesn't
correctly handle trailing whitespace or multi-line links"""
global_offset = 0
transformed_content = raw_post['message']
transformed_content = add_surrogate(raw_post['message'])
links = [entity for entity in raw_post['entities'] if entity['_'] == 'MessageEntityTextUrl']
for link in links:
@@ -284,12 +295,18 @@ def add_markdown_links(raw_post):
url = link['url']
before_link = transformed_content[:offset]
link_text = f"[{transformed_content[offset:offset+length].strip()}]"
trailing_whitespace = ''.join([c for c in transformed_content[offset:offset+length] if c.isspace()])
link_href = f"({url})"
after_link = transformed_content[offset+length:]
transformed_content = before_link + link_text + link_href + trailing_whitespace + after_link
global_offset += (4 + len(url))
inner_text = transformed_content[offset:offset+length]
return transformed_content
# skip creation of link if inner link text is only whitespace
if inner_text.replace('\u200b', '').strip():
processed_inner_text = inner_text.strip().replace('\n', '\\\n')
link_text = f"[{processed_inner_text}]"
trailing_whitespace = stripped(transformed_content[offset:offset+length])
link_href = f"({url})"
after_link = transformed_content[offset+length:]
transformed_content = before_link + link_text + link_href + trailing_whitespace + after_link
global_offset += (4 + len(url) + inner_text.strip().count('\n'))
return del_surrogate(transformed_content)

View File

@@ -133,8 +133,5 @@ class TwitterTransformer(Transformer):
if raw['quotedTweet'] is not None:
subtweet(raw['quotedTweet'])
insert(transformed)
media = self.process_media(raw, transformed.id, data)
for m in media:
insert(m)
#insert_post
insert(transformed)

View File

@@ -66,6 +66,7 @@ class VkontakteTransformer(Transformer):
outlinks =list(filter(None, raw["outlinks"])) if raw['outlinks'] else [],
)
# insert_post
insert(transformed)
# media = self.process_media(raw, transformed.id, data)