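Add rather hacky bulk insert functionality: transformers now queue posts via insert_post (flush=False), and flush_posts writes the queued posts before reply lookups and before the final commit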

This commit is contained in:
Logan Williams
2023-05-04 15:26:52 +02:00
parent f9bf2bc2ee
commit 91de6482e0
8 changed files with 18 additions and 14 deletions

2
app.py
View File

@@ -117,7 +117,7 @@ def transform(args):
if args.min_date:
min_date = datetime.datetime.fromisoformat(args.min_date)
else:
min_date = 0
min_date = datetime.datetime(1970, 1, 1)
controller.transform_all_untransformed(min_date=min_date)

View File

@@ -234,13 +234,14 @@ class ETLController:
logger.trace(f"{transformer} is handling result {result.id} ({result.date})")
handled = True
transformer.transform(result, lambda obj: self.insert_or_select(obj, session, hydrate), session)
transformer.transform(result, lambda obj: self.insert_or_select(obj, session, hydrate), session, lambda obj: self.insert_post(obj, session, hydrate, flush=False), lambda: self.flush_posts(session))
break
if handled == False:
logger.warning(f"No Transformer could handle ID {result.id} with platform {result.platform} ({result.date})")
self.flush_posts(session)
session.commit()

View File

@@ -56,7 +56,7 @@ class BitchuteTransformer(Transformer):
transformed = insert(transformed)
def transform(self, data: ScraperResult, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]:
def transform(self, data: ScraperResult, insert: Callable, session, insert_post, flush_posts) -> Generator[Union[Post, Channel, Media], None, None]:
raw = json.loads(data.raw_data)
if raw['category'] == 'comment':
@@ -64,6 +64,7 @@ class BitchuteTransformer(Transformer):
reply_to_id = raw['thread_id']
else:
reply_to_id = raw['parent_id']
flush_posts()
post = session.query(Post).filter_by(channel=data.channel, platform_id=reply_to_id).first()
if post is None:
if raw['parent_id'] is not None:
@@ -108,7 +109,7 @@ class BitchuteTransformer(Transformer):
video_duration = _parse_duration_str(raw['length']))
# insert_post
transformed = insert(transformed)
transformed = insert_post(transformed)
def parse_created(created: str, date_archived: datetime) -> datetime:
"""Convert a created string (e.g. ``"1 year, 10 months ago"``) to a datetime

View File

@@ -81,7 +81,7 @@ class GettrTransformer(Transformer):
return channel.id
def transform(self, data: ScraperResult, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]:
def transform(self, data: ScraperResult, insert: Callable, session, insert_post, flush_posts) -> Generator[Union[Post, Channel, Media], None, None]:
raw = json.loads(data.raw_data)
if raw["activity"]["action"] == "shares_pst":
@@ -119,7 +119,8 @@ class GettrTransformer(Transformer):
views = raw.get('vfpst')
)
insert(transformed)
# insert_post
insert_post(transformed)
# media = self.process_media(raw, transformed.id, data)
# for m in media:

View File

@@ -57,7 +57,7 @@ class RumbleTransformer(Transformer):
transformed = insert(transformed)
def transform(self, data: ScraperResult, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]:
def transform(self, data: ScraperResult, insert: Callable, session, insert_post, flush_posts) -> Generator[Union[Post, Channel, Media], None, None]:
raw = json.loads(data.raw_data)
transformed = Post(
@@ -80,7 +80,7 @@ class RumbleTransformer(Transformer):
video_duration=_parse_duration_str(raw['duration']))
# insert_post
insert(transformed)
insert_post(transformed)
def _process_number(s):

View File

@@ -159,7 +159,7 @@ class TelegramTelethonTransformer(Transformer):
insert(new_chat)
def transform(self, data: ScraperResult, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]:
def transform(self, data: ScraperResult, insert: Callable, session, insert_post, flush_posts) -> Generator[Union[Post, Channel, Media], None, None]:
raw = json.loads(data.raw_data)
if raw['_'] != 'Message':
@@ -204,6 +204,7 @@ class TelegramTelethonTransformer(Transformer):
if raw['reply_to']:
reply_to_id = str(raw['reply_to']['reply_to_msg_id'])
session.commit()
flush_posts()
post = session.query(Post).filter_by(channel=data.channel, platform_id=reply_to_id).first()
if post is None:
reply_to = -1
@@ -286,7 +287,7 @@ class TelegramTelethonTransformer(Transformer):
)
# insert_post
insert(transformed)
insert_post(transformed)
def stripped(s):
"""https://stackoverflow.com/a/29933716"""

View File

@@ -72,7 +72,7 @@ class TwitterTransformer(Transformer):
transformed = insert(transformed)
def transform(self, data: ScraperResult, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]:
def transform(self, data: ScraperResult, insert: Callable, session, insert_post, flush_posts) -> Generator[Union[Post, Channel, Media], None, None]:
raw = json.loads(data.raw_data)
transformed = Post(
@@ -134,4 +134,4 @@ class TwitterTransformer(Transformer):
subtweet(raw['quotedTweet'])
#insert_post
insert(transformed)
insert_post(transformed)

View File

@@ -46,7 +46,7 @@ class VkontakteTransformer(Transformer):
transformed = insert(transformed)
def transform(self, data: ScraperResult, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]:
def transform(self, data: ScraperResult, insert: Callable, session, insert_post, flush_posts) -> Generator[Union[Post, Channel, Media], None, None]:
raw = json.loads(data.raw_data)
transformed = Post(
@@ -67,7 +67,7 @@ class VkontakteTransformer(Transformer):
)
# insert_post
insert(transformed)
insert_post(transformed)
# media = self.process_media(raw, transformed.id, data)
# for m in media: