diff --git a/cisticola/base.py b/cisticola/base.py index 66500ea..670c8ab 100644 --- a/cisticola/base.py +++ b/cisticola/base.py @@ -149,6 +149,9 @@ class Post: #: Datetime (relative to UTC) that the scraped post was archived at. date_archived: datetime + + #: Datetime (UTC) that the scraped post was transformed at. + date_transformed: datetime #: URL of the original post url: str @@ -330,7 +333,7 @@ raw_posts_table = Table('raw_posts', mapper_registry.metadata, Column('scraper', String), Column('platform', String), Column('channel', Integer, ForeignKey('channels.id'), index=True), - Column('platform_id', String), + Column('platform_id', String, index=True), Column('date', DateTime, index=True), Column('raw_data', String), Column('date_archived', DateTime, index=True), @@ -365,19 +368,20 @@ post_table = Table('posts', mapper_registry.metadata, Column('id', Integer, primary_key=True, autoincrement=True), Column('raw_id', Integer, ForeignKey('raw_posts.id'), index=True), - Column('platform_id', Integer), + Column('platform_id', Integer, index=True), Column('scraper', String), Column('transformer', String), Column('platform', String), Column('channel', Integer, ForeignKey('channels.id'), index=True), Column('date', DateTime, index=True), Column('date_archived', DateTime, index=True), + Column('date_transformed', DateTime, index=True), Column('url', String), Column('author_id', String), Column('author_username', String), Column('content', String), Column('forwarded_from', Integer, ForeignKey('channels.id'), index=True), - Column('reply_to', Integer, ForeignKey('posts.id'), index=True), + Column('reply_to', Integer, ForeignKey('posts.id', ondelete="CASCADE"), index=True), Column('named_entities', JSON), Column('cryptocurrency_addresses', JSON), Column('hashtags', JSON), diff --git a/cisticola/transformer/base.py b/cisticola/transformer/base.py index f1af379..fa61b49 100644 --- a/cisticola/transformer/base.py +++ b/cisticola/transformer/base.py @@ -2,7 +2,9 @@ from typing import List, Generator, Union, Callable from loguru import logger from sqlalchemy.orm import sessionmaker, make_transient from sqlalchemy.engine.base import Engine +from sqlalchemy.sql.expression import func from collections import defaultdict +from datetime import datetime from cisticola.base import ScraperResult, Post, Media, Channel, mapper_registry @@ -130,8 +132,9 @@ class ETLController: if hydrate: obj.hydrate() - # logger.info(f"Inserting new object {obj}") session.add(obj) + logger.trace(f"Inserted new object {obj}") + return obj @logger.catch(reraise=True) @@ -161,7 +164,7 @@ class ETLController: handled = True transformer.transform(result, lambda obj: self.insert_or_select(obj, session, hydrate), session) - + session.commit() break @@ -190,8 +193,9 @@ class ETLController: .filter(ScraperResult.raw_data.notlike("%MessageService%")) .join(Post, isouter=True) .where(Post.raw_id == None) + # .order_by(func.random()) .order_by(ScraperResult.date.asc()) - .limit(50000) + .limit(100000) .all() ) logger.info(f"Found {len(untransformed)} items to ETL") diff --git a/cisticola/transformer/telegram_telethon.py b/cisticola/transformer/telegram_telethon.py index 9190c75..b1fa3ff 100644 --- a/cisticola/transformer/telegram_telethon.py +++ b/cisticola/transformer/telegram_telethon.py @@ -9,6 +9,7 @@ import time from telethon.sync import TelegramClient from telethon.errors.rpcerrorlist import ChannelPrivateError, ChannelInvalidError import os +from datetime import datetime, timezone from cisticola.transformer.base import Transformer from cisticola.base import ScraperResult, Post, Image, Video, Media, Channel @@ -41,6 +42,9 @@ class TelegramTelethonTransformer(Transformer): except ChannelInvalidError: logger.info("ChannelInvalidError") return ("", "", "ChannelInvalidError") + except ValueError: + logger.info("ValueError") + return ("", "", "ValueError") def get_name_from_web_interface(self, orig_screenname, id): url = "https://t.me/s/" + orig_screenname + "/" + str(id) @@ -102,7 +106,8 @@ class TelegramTelethonTransformer(Transformer): if name == "": logger.info("Trying fallback web interface") orig_channel = session.query(Channel).filter_by(id=data.channel).first() - name = self.get_name_from_web_interface(orig_channel.screenname, raw['id']) + if orig_channel.screenname is not None: + name = self.get_name_from_web_interface(orig_channel.screenname, raw['id']) channel = Channel( name=name, @@ -138,6 +143,7 @@ class TelegramTelethonTransformer(Transformer): channel=data.channel, date=dateutil.parser.parse(raw['date']), date_archived=data.date_archived, + date_transformed=datetime.now(timezone.utc), url="", content=raw['message'], author_id=raw['post_author'],