Support related Telegram chats (associated discussion groups)

This commit is contained in:
Logan Williams
2023-03-02 16:21:43 +01:00
parent 351e471ff4
commit 531059ca02
12 changed files with 412 additions and 1509 deletions

View File

@@ -19,7 +19,6 @@ pytesseract = "*"
instaloader = "*" instaloader = "*"
gspread = "*" gspread = "*"
cryptg = "*" cryptg = "*"
psycopg2 = "*"
tqdm = "*" tqdm = "*"
ratelimit = "*" ratelimit = "*"
pytz = "*" pytz = "*"
@@ -28,6 +27,7 @@ spacy = "==3.2.4"
ocrd-pyexiftool = "*" ocrd-pyexiftool = "*"
filelock = "*" filelock = "*"
telethon = "*" telethon = "*"
psycopg2 = "*"
[dev-packages] [dev-packages]
pytest = "*" pytest = "*"

1840
Pipfile.lock generated

File diff suppressed because it is too large Load Diff

2
app.py
View File

@@ -112,6 +112,8 @@ def transform_info(args):
controller = get_transformer_controller() controller = get_transformer_controller()
controller.transform_all_untransformed_info() controller.transform_all_untransformed_info()
# sync_channels(args, get_db_session())
def transform_media(args): def transform_media(args):
logger.info(f"Transforming untransformed channel media") logger.info(f"Transforming untransformed channel media")

View File

@@ -343,7 +343,7 @@ class ScraperController:
session = self.session() session = self.session()
# TODO there should be a better/more generic way of selecting scrapeable channels # TODO there should be a better/more generic way of selecting scrapeable channels
channels = session.query(Channel).filter((Channel.source=='researcher')|(Channel.source=='snowball_it')|(Channel.source=='snowball_complete')).all() channels = session.query(Channel).filter((Channel.source=='researcher')|(Channel.source=='snowball_it')|(Channel.source=='snowball_complete')|(Channel.source=='linked_channel')).all()
session.close() session.close()
@@ -360,7 +360,7 @@ class ScraperController:
# This will sort the channels by the least recently scraped. # This will sort the channels by the least recently scraped.
most_recently_archived = session.query(func.max(RawChannelInfo.date_archived).label("date"), RawChannelInfo.channel.label("channel")).group_by(RawChannelInfo.channel).subquery() most_recently_archived = session.query(func.max(RawChannelInfo.date_archived).label("date"), RawChannelInfo.channel.label("channel")).group_by(RawChannelInfo.channel).subquery()
channels = session.query(Channel).\ channels = session.query(Channel).\
filter((Channel.source=='researcher')|(Channel.source=='snowball_it')|(Channel.source=='snowball_complete')).\ filter((Channel.source=='researcher')|(Channel.source=='snowball_it')|(Channel.source=='snowball_complete')|(Channel.source=='linked_channel')).\
outerjoin(most_recently_archived, Channel.id == most_recently_archived.c.channel).\ outerjoin(most_recently_archived, Channel.id == most_recently_archived.c.channel).\
order_by(nullsfirst(most_recently_archived.c.date.asc())).all() order_by(nullsfirst(most_recently_archived.c.date.asc())).all()

View File

@@ -115,7 +115,10 @@ class ETLController:
# This is using some adhoc unique constraints that might be worth formalizing at some point # This is using some adhoc unique constraints that might be worth formalizing at some point
if type(obj) == Channel: if type(obj) == Channel:
instance = session.query(Channel).filter_by(url=obj.url, platform_id=str(obj.platform_id or '') or obj.platform_id, platform=obj.platform).first() instance = session.query(Channel).filter(Channel.platform==obj.platform).filter(
(Channel.url==obj.url)|
(Channel.platform_id==(str(obj.platform_id or '') or obj.platform_id))
).first()
elif type(obj) == Post: elif type(obj) == Post:
instance = None instance = None
@@ -233,7 +236,7 @@ class ETLController:
logger.info(f"Fetching untransformed posts batch of {BATCH_SIZE}, offset {max(batch, key=lambda v: v.date).date}") logger.info(f"Fetching untransformed posts batch of {BATCH_SIZE}, offset {max(batch, key=lambda v: v.date).date}")
batch = (session.query(ScraperResult) batch = (query(ScraperResult)
.join(Post, isouter=True) .join(Post, isouter=True)
.where(ScraperResult.id > 35000000) # TODO this can be a CLI argument or something .where(ScraperResult.id > 35000000) # TODO this can be a CLI argument or something
.where(Post.raw_id == None) .where(Post.raw_id == None)
@@ -252,7 +255,9 @@ class ETLController:
session = self.session() session = self.session()
for result in results: for data in results:
result = data.RawChannelInfo
if result.scraper is not None and result.platform is not None: if result.scraper is not None and result.platform is not None:
handled = False handled = False
@@ -261,7 +266,7 @@ class ETLController:
logger.trace(f"{transformer} is handling raw info result {result.id} ({result.date_archived})") logger.trace(f"{transformer} is handling raw info result {result.id} ({result.date_archived})")
handled = True handled = True
transformer.transform_info(result, lambda obj: self.insert_or_select(obj, session, False), session) transformer.transform_info(result, lambda obj: self.insert_or_select(obj, session, False), session, channel=data.Channel)
session.commit() session.commit()
break break
@@ -282,9 +287,11 @@ class ETLController:
offset = 0 offset = 0
batch = [] batch = []
query = (session.query(RawChannelInfo) query = (session.query(RawChannelInfo, Channel)
.select_from(RawChannelInfo)
.join(ChannelInfo, isouter=True) .join(ChannelInfo, isouter=True)
.where(ChannelInfo.raw_channel_info_id == None) .join(Channel, RawChannelInfo.channel==Channel.id)
.where(ChannelInfo.id == None)
.order_by(RawChannelInfo.date_archived.asc()) .order_by(RawChannelInfo.date_archived.asc())
) )

View File

@@ -22,7 +22,7 @@ class GettrTransformer(Transformer):
return False return False
def transform_info(self, data: RawChannelInfo, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]: def transform_info(self, data: RawChannelInfo, insert: Callable, session, channel=None) -> Generator[Union[Post, Channel, Media], None, None]:
raw = json.loads(data.raw_data) raw = json.loads(data.raw_data)
transformed = ChannelInfo( transformed = ChannelInfo(

View File

@@ -20,7 +20,7 @@ class RumbleTransformer(Transformer):
return False return False
def transform_info(self, data: RawChannelInfo, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]: def transform_info(self, data: RawChannelInfo, insert: Callable, session, channel=None) -> Generator[Union[Post, Channel, Media], None, None]:
raw = json.loads(data.raw_data) raw = json.loads(data.raw_data)
if 'id' not in raw: if 'id' not in raw:

View File

@@ -94,7 +94,7 @@ class TelegramTelethonTransformer(Transformer):
return name return name
def transform_info(self, data: RawChannelInfo, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]: def transform_info(self, data: RawChannelInfo, insert: Callable, session, channel=None) -> Generator[Union[Post, Channel, Media], None, None]:
raw = json.loads(data.raw_data) raw = json.loads(data.raw_data)
chat_raw = raw['chats'][0] chat_raw = raw['chats'][0]
@@ -121,6 +121,33 @@ class TelegramTelethonTransformer(Transformer):
transformed = insert(transformed) transformed = insert(transformed)
if channel.platform_id is None:
logger.info(f"Missing platform ID on {channel}, setting to {raw['full_chat']['id']}")
new_channel = session.query(Channel).where(Channel.id == channel.id).one()
new_channel.platform_id = raw['full_chat']['id']
session.flush()
session.commit()
if len(raw['chats']) > 1:
for chat in raw['chats'][1:]:
new_chat = Channel(
name=chat["title"],
platform_id=chat["id"],
category=channel.category, # this should be the same as the "parent"
platform=channel.platform, # this should be the same as the "parent"
url="",
screenname=chat["username"] if "username" in chat else "",
country=channel.country, # this should be the same as the "parent"
influencer=channel.influencer, # this should be the same as the "parent"
public=None,
chat=None,
notes=channel.id, # this should be the channel ID of the parent
source="linked_channel"
)
insert(new_chat)
def transform(self, data: ScraperResult, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]: def transform(self, data: ScraperResult, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]:
raw = json.loads(data.raw_data) raw = json.loads(data.raw_data)

View File

@@ -46,7 +46,7 @@ class TwitterTransformer(Transformer):
yield m yield m
def transform_info(self, data: RawChannelInfo, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]: def transform_info(self, data: RawChannelInfo, insert: Callable, session, channel=None) -> Generator[Union[Post, Channel, Media], None, None]:
raw = json.loads(data.raw_data) raw = json.loads(data.raw_data)
transformed = ChannelInfo( transformed = ChannelInfo(

View File

@@ -20,7 +20,7 @@ class VkontakteTransformer(Transformer):
return False return False
def transform_info(self, data: RawChannelInfo, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]: def transform_info(self, data: RawChannelInfo, insert: Callable, session, channel=None) -> Generator[Union[Post, Channel, Media], None, None]:
raw = json.loads(data.raw_data) raw = json.loads(data.raw_data)
transformed = ChannelInfo( transformed = ChannelInfo(

View File

@@ -1,9 +1,9 @@
#!/bin/bash #!/bin/bash
pipenv run python -m spacy download xx_ent_wiki_sm python -m spacy download xx_ent_wiki_sm
pipenv run python -m spacy download fr_core_news_sm python -m spacy download fr_core_news_sm
pipenv run python -m spacy download de_core_news_sm python -m spacy download de_core_news_sm
pipenv run python -m spacy download nl_core_news_sm python -m spacy download nl_core_news_sm
pipenv run python -m spacy download it_core_news_sm python -m spacy download it_core_news_sm
pipenv run python -m spacy download ru_core_news_sm python -m spacy download ru_core_news_sm
pipenv run python -m spacy download en_core_web_sm python -m spacy download en_core_web_sm

View File

@@ -1,6 +1,7 @@
import gspread import gspread
import time import time
from loguru import logger from loguru import logger
import os
from cisticola.base import Channel, ChannelInfo from cisticola.base import Channel, ChannelInfo
@@ -32,7 +33,7 @@ def sync_channels(args, session):
gc = gspread.service_account(filename="service_account.json") gc = gspread.service_account(filename="service_account.json")
# Open a sheet from a spreadsheet in one go # Open a sheet from a spreadsheet in one go
wks = gc.open_by_url(args.gsheet).worksheet("channels") wks = gc.open_by_url(os.environ['GSHEET']).worksheet("channels")
channels = wks.get_all_records(expected_headers = expected_headers) channels = wks.get_all_records(expected_headers = expected_headers)
row = 2 row = 2