Ad hoc changes to transformers

This commit is contained in:
Logan Williams
2022-04-16 13:46:26 +00:00
parent 4c221d1133
commit 8535a87def
3 changed files with 1230 additions and 70 deletions

1193
Pipfile.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -94,7 +94,7 @@ class ETLController:
# This is using some adhoc unique constraints that might be worth formalizing at some point
if type(obj) == Channel:
instance = session.query(Channel).filter_by(url=obj.url, platform_id=obj.platform_id, platform=obj.platform).first()
instance = session.query(Channel).filter_by(url=obj.url, platform_id=str(obj.platform_id), platform=obj.platform).first()
elif type(obj) == Post:
instance = None
@@ -130,9 +130,8 @@ class ETLController:
if hydrate:
obj.hydrate()
logger.info(f"Inserting new object {obj}")
# logger.info(f"Inserting new object {obj}")
session.add(obj)
session.flush()
return obj
@logger.catch(reraise=True)
@@ -151,6 +150,8 @@ class ETLController:
logger.error("No DB session")
return
session = self.session()
for result in results:
for transformer in self.transformers:
handled = False
@@ -158,9 +159,9 @@ class ETLController:
if transformer.can_handle(result):
logger.trace(f"{transformer} is handling result {result}")
handled = True
session = self.session()
transformer.transform(result, lambda obj: self.insert_or_select(obj, session, hydrate), session)
session.commit()
break
@@ -185,9 +186,12 @@ class ETLController:
session = self.session()
untransformed = (
session.query(ScraperResult)
.filter_by(platform="Telegram")
.filter(ScraperResult.raw_data.notlike("%MessageService%"))
.join(Post, isouter=True)
.where(Post.raw_id == None)
.order_by(ScraperResult.date.asc())
.limit(50000)
.all()
)
logger.info(f"Found {len(untransformed)} items to ETL")

View File

@@ -3,8 +3,12 @@ from loguru import logger
from typing import Generator, Union, Callable
import dateutil.parser
from bs4 import BeautifulSoup
from psycopg2 import DatabaseError
import requests
import time
from telethon.sync import TelegramClient
from telethon.errors.rpcerrorlist import ChannelPrivateError, ChannelInvalidError
import os
from cisticola.transformer.base import Transformer
from cisticola.base import ScraperResult, Post, Image, Video, Media, Channel
@@ -22,39 +26,63 @@ class TelegramTelethonTransformer(Transformer):
return False
def get_screenname_from_id(self, orig_screenname, id):
def get_screenname_from_id(self, channel_id):
api_id = os.environ['TELEGRAM_API_ID']
api_hash = os.environ['TELEGRAM_API_HASH']
try:
with TelegramClient("transform.session", api_id, api_hash) as client:
data = client.get_entity(channel_id)
return (data.username, data.title, "")
except ChannelPrivateError:
logger.info("ChannelPrivateError")
return ("", "", "ChannelPrivateError")
except ChannelInvalidError:
logger.info("ChannelInvalidError")
return ("", "", "ChannelInvalidError")
def get_name_from_web_interface(self, orig_screenname, id):
url = "https://t.me/s/" + orig_screenname + "/" + str(id)
# this doesn't work for chat channels
if orig_screenname in self.bad_channels:
logger.debug(f"Skipping screenname because it is not accessible for channel {orig_screenname}")
return ("", "")
url = "https://t.me/s/" + orig_screenname + "/" + str(id)
return ""
logger.info(f"Finding channel from URL {url}")
r = requests.get(url)
if r.url != url:
self.bad_channels[orig_screenname] = True
return ("", "")
return ""
soup = BeautifulSoup(r.content)
post = soup.findAll("div", {"data-post" : orig_screenname + "/" + str(id)})
# multiple posts can be combined into one result in the web interface
decrement = 0
while len(post) == 0:
decrement += 1
if decrement > 8:
break
logger.info(f"Could not find post from {url}, looking for id {id - decrement}")
post = soup.findAll("div", {"data-post" : orig_screenname + "/" + str(id - decrement)})
if len(post) == 0:
logger.warning(f"Could not find post from {url}")
screenname = ""
name = ""
else:
fwd_tag = post[0].findAll("a", {"class", "tgme_widget_message_forwarded_from_name"})
if len(fwd_tag) > 0:
fwd_tag = fwd_tag[0]
name = fwd_tag.text
screenname = fwd_tag['href'].split('/')[-2]
else:
if len(fwd_tag) == 0:
fwd_tag = post[0].findAll("span", {"class", "tgme_widget_message_forwarded_from_name"})
if len(fwd_tag) >= 1:
name = fwd_tag[0].text
screenname = ""
return (screenname, name)
return name
def transform(self, data: ScraperResult, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]:
raw = json.loads(data.raw_data)
@@ -66,32 +94,29 @@ class TelegramTelethonTransformer(Transformer):
fwd_from = None
if raw['fwd_from'] and raw['fwd_from']['from_id'] and 'channel_id' in raw['fwd_from']['from_id']:
channel = session.query(Channel).filter_by(platform_id=raw['fwd_from']['from_id']['channel_id']).first()
channel = session.query(Channel).filter_by(platform_id=str(raw['fwd_from']['from_id']['channel_id'])).first()
if channel is None:
orig_channel = session.query(Channel).filter_by(id=data.channel).first()
(screenname, name) = self.get_screenname_from_id(orig_channel.screenname, raw['id'])
(screenname, name, notes) = self.get_screenname_from_id(raw['fwd_from']['from_id']['channel_id'])
if name == "":
logger.info("Trying fallback web interface")
orig_channel = session.query(Channel).filter_by(id=data.channel).first()
name = self.get_name_from_web_interface(orig_channel.screenname, raw['id'])
channel = Channel(
name=name,
platform_id=raw['fwd_from']['from_id']['channel_id'],
platform=data.platform,
url="https://t.me/s/" + screenname,
url="https://t.me/s/" + screenname if screenname is not None else "",
screenname=screenname,
category='forwarded',
source=self.__version__
source=self.__version__,
notes=notes
)
channel = insert(channel)
elif channel.screenname == "":
# if the screenname is empty, we can fill it in
orig_channel = session.query(Channel).filter_by(id=data.channel).first()
(screenname, name) = self.get_screenname_from_id(orig_channel.screenname, raw['id'])
channel.screenname = screenname
channel.name = name
channel.url = "https://t.me/s/" + screenname
session.flush()
logger.info(f"Added {channel}")
fwd_from = channel.id
@@ -123,14 +148,14 @@ class TelegramTelethonTransformer(Transformer):
transformed = insert(transformed)
for k in data.archived_urls:
if data.archived_urls[k]:
archived_url = data.archived_urls[k]
ext = archived_url.split('.')[-1]
# for k in data.archived_urls:
# if data.archived_urls[k]:
# archived_url = data.archived_urls[k]
# ext = archived_url.split('.')[-1]
if ext == 'mp4' or ext == 'mov' or ext == 'avi' or ext =='mkv':
insert(Video(url=archived_url, post=transformed.id, raw_id=data.id, original_url=k))
else:
insert(Image(url=archived_url, post=transformed.id, raw_id=data.id, original_url=k))
# if ext == 'mp4' or ext == 'mov' or ext == 'avi' or ext =='mkv':
# insert(Video(url=archived_url, post=transformed.id, raw_id=data.id, original_url=k))
# else:
# insert(Image(url=archived_url, post=transformed.id, raw_id=data.id, original_url=k))