mirror of
https://github.com/bellingcat/cisticola.git
synced 2026-06-11 21:08:34 +03:00
Add date_transformed; refinements to telethon transformer
This commit is contained in:
@@ -149,6 +149,9 @@ class Post:
|
||||
|
||||
#: Datetime (relative to UTC) that the scraped post was archived at.
|
||||
date_archived: datetime
|
||||
|
||||
#: Datetime (UTC) that the scraped post was transformed at.
|
||||
date_transformed: datetime
|
||||
|
||||
#: URL of the original post
|
||||
url: str
|
||||
@@ -330,7 +333,7 @@ raw_posts_table = Table('raw_posts', mapper_registry.metadata,
|
||||
Column('scraper', String),
|
||||
Column('platform', String),
|
||||
Column('channel', Integer, ForeignKey('channels.id'), index=True),
|
||||
Column('platform_id', String),
|
||||
Column('platform_id', String, index=True),
|
||||
Column('date', DateTime, index=True),
|
||||
Column('raw_data', String),
|
||||
Column('date_archived', DateTime, index=True),
|
||||
@@ -365,19 +368,20 @@ post_table = Table('posts', mapper_registry.metadata,
|
||||
Column('id', Integer, primary_key=True,
|
||||
autoincrement=True),
|
||||
Column('raw_id', Integer, ForeignKey('raw_posts.id'), index=True),
|
||||
Column('platform_id', Integer),
|
||||
Column('platform_id', Integer, index=True),
|
||||
Column('scraper', String),
|
||||
Column('transformer', String),
|
||||
Column('platform', String),
|
||||
Column('channel', Integer, ForeignKey('channels.id'), index=True),
|
||||
Column('date', DateTime, index=True),
|
||||
Column('date_archived', DateTime, index=True),
|
||||
Column('date_transformed', DateTime, index=True),
|
||||
Column('url', String),
|
||||
Column('author_id', String),
|
||||
Column('author_username', String),
|
||||
Column('content', String),
|
||||
Column('forwarded_from', Integer, ForeignKey('channels.id'), index=True),
|
||||
Column('reply_to', Integer, ForeignKey('posts.id'), index=True),
|
||||
Column('reply_to', Integer, ForeignKey('posts.id', ondelete="CASCADE"), index=True),
|
||||
Column('named_entities', JSON),
|
||||
Column('cryptocurrency_addresses', JSON),
|
||||
Column('hashtags', JSON),
|
||||
|
||||
@@ -2,7 +2,9 @@ from typing import List, Generator, Union, Callable
|
||||
from loguru import logger
|
||||
from sqlalchemy.orm import sessionmaker, make_transient
|
||||
from sqlalchemy.engine.base import Engine
|
||||
from sqlalchemy.sql.expression import func
|
||||
from collections import defaultdict
|
||||
from datetime import datetime
|
||||
|
||||
from cisticola.base import ScraperResult, Post, Media, Channel, mapper_registry
|
||||
|
||||
@@ -130,8 +132,9 @@ class ETLController:
|
||||
if hydrate:
|
||||
obj.hydrate()
|
||||
|
||||
# logger.info(f"Inserting new object {obj}")
|
||||
session.add(obj)
|
||||
logger.trace(f"Inserted new object {obj}")
|
||||
|
||||
return obj
|
||||
|
||||
@logger.catch(reraise=True)
|
||||
@@ -161,7 +164,7 @@ class ETLController:
|
||||
handled = True
|
||||
|
||||
transformer.transform(result, lambda obj: self.insert_or_select(obj, session, hydrate), session)
|
||||
|
||||
|
||||
session.commit()
|
||||
break
|
||||
|
||||
@@ -190,8 +193,9 @@ class ETLController:
|
||||
.filter(ScraperResult.raw_data.notlike("%MessageService%"))
|
||||
.join(Post, isouter=True)
|
||||
.where(Post.raw_id == None)
|
||||
# .order_by(func.random())
|
||||
.order_by(ScraperResult.date.asc())
|
||||
.limit(50000)
|
||||
.limit(100000)
|
||||
.all()
|
||||
)
|
||||
logger.info(f"Found {len(untransformed)} items to ETL")
|
||||
|
||||
@@ -9,6 +9,7 @@ import time
|
||||
from telethon.sync import TelegramClient
|
||||
from telethon.errors.rpcerrorlist import ChannelPrivateError, ChannelInvalidError
|
||||
import os
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from cisticola.transformer.base import Transformer
|
||||
from cisticola.base import ScraperResult, Post, Image, Video, Media, Channel
|
||||
@@ -41,6 +42,9 @@ class TelegramTelethonTransformer(Transformer):
|
||||
except ChannelInvalidError:
|
||||
logger.info("ChannelInvalidError")
|
||||
return ("", "", "ChannelInvalidError")
|
||||
except ValueError:
|
||||
logger.info("ValueError")
|
||||
return ("", "", "ValueError")
|
||||
|
||||
def get_name_from_web_interface(self, orig_screenname, id):
|
||||
url = "https://t.me/s/" + orig_screenname + "/" + str(id)
|
||||
@@ -102,7 +106,8 @@ class TelegramTelethonTransformer(Transformer):
|
||||
if name == "":
|
||||
logger.info("Trying fallback web interface")
|
||||
orig_channel = session.query(Channel).filter_by(id=data.channel).first()
|
||||
name = self.get_name_from_web_interface(orig_channel.screenname, raw['id'])
|
||||
if orig_channel.screenname is not None:
|
||||
name = self.get_name_from_web_interface(orig_channel.screenname, raw['id'])
|
||||
|
||||
channel = Channel(
|
||||
name=name,
|
||||
@@ -138,6 +143,7 @@ class TelegramTelethonTransformer(Transformer):
|
||||
channel=data.channel,
|
||||
date=dateutil.parser.parse(raw['date']),
|
||||
date_archived=data.date_archived,
|
||||
date_transformed=datetime.now(timezone.utc),
|
||||
url="",
|
||||
content=raw['message'],
|
||||
author_id=raw['post_author'],
|
||||
|
||||
Reference in New Issue
Block a user