This commit is contained in:
Logan Williams
2023-03-02 15:28:24 +00:00
12 changed files with 412 additions and 1509 deletions

View File

@@ -19,7 +19,6 @@ pytesseract = "*"
instaloader = "*"
gspread = "*"
cryptg = "*"
psycopg2 = "*"
tqdm = "*"
ratelimit = "*"
pytz = "*"
@@ -28,6 +27,7 @@ spacy = "==3.2.4"
ocrd-pyexiftool = "*"
filelock = "*"
telethon = "*"
psycopg2 = "*"
[dev-packages]
pytest = "*"

1840
Pipfile.lock generated

File diff suppressed because it is too large Load Diff

2
app.py
View File

@@ -112,6 +112,8 @@ def transform_info(args):
controller = get_transformer_controller()
controller.transform_all_untransformed_info()
# sync_channels(args, get_db_session())
def transform_media(args):
logger.info(f"Transforming untransformed channel media")

View File

@@ -343,7 +343,7 @@ class ScraperController:
session = self.session()
# TODO there should be a better/more generic way of selecting scrapeable channels
channels = session.query(Channel).filter((Channel.source=='researcher')|(Channel.source=='snowball_it')|(Channel.source=='snowball_complete')).all()
channels = session.query(Channel).filter((Channel.source=='researcher')|(Channel.source=='snowball_it')|(Channel.source=='snowball_complete')|(Channel.source=='linked_channel')).all()
session.close()
@@ -360,7 +360,7 @@ class ScraperController:
# This will sort the channels by the least recently scraped.
most_recently_archived = session.query(func.max(RawChannelInfo.date_archived).label("date"), RawChannelInfo.channel.label("channel")).group_by(RawChannelInfo.channel).subquery()
channels = session.query(Channel).\
filter((Channel.source=='researcher')|(Channel.source=='snowball_it')|(Channel.source=='snowball_complete')).\
filter((Channel.source=='researcher')|(Channel.source=='snowball_it')|(Channel.source=='snowball_complete')|(Channel.source=='linked_channel')).\
outerjoin(most_recently_archived, Channel.id == most_recently_archived.c.channel).\
order_by(nullsfirst(most_recently_archived.c.date.asc())).all()

View File

@@ -115,7 +115,10 @@ class ETLController:
# This is using some adhoc unique constraints that might be worth formalizing at some point
if type(obj) == Channel:
instance = session.query(Channel).filter_by(url=obj.url, platform_id=str(obj.platform_id or '') or obj.platform_id, platform=obj.platform).first()
instance = session.query(Channel).filter(Channel.platform==obj.platform).filter(
(Channel.url==obj.url)|
(Channel.platform_id==(str(obj.platform_id or '') or obj.platform_id))
).first()
elif type(obj) == Post:
instance = None
@@ -233,7 +236,7 @@ class ETLController:
logger.info(f"Fetching untransformed posts batch of {BATCH_SIZE}, offset {max(batch, key=lambda v: v.date).date}")
batch = (session.query(ScraperResult)
batch = (query(ScraperResult)
.join(Post, isouter=True)
.where(ScraperResult.id > 35000000) # TODO this can be a CLI argument or something
.where(Post.raw_id == None)
@@ -252,7 +255,9 @@ class ETLController:
session = self.session()
for result in results:
for data in results:
result = data.RawChannelInfo
if result.scraper is not None and result.platform is not None:
handled = False
@@ -261,7 +266,7 @@ class ETLController:
logger.trace(f"{transformer} is handling raw info result {result.id} ({result.date_archived})")
handled = True
transformer.transform_info(result, lambda obj: self.insert_or_select(obj, session, False), session)
transformer.transform_info(result, lambda obj: self.insert_or_select(obj, session, False), session, channel=data.Channel)
session.commit()
break
@@ -282,9 +287,11 @@ class ETLController:
offset = 0
batch = []
query = (session.query(RawChannelInfo)
query = (session.query(RawChannelInfo, Channel)
.select_from(RawChannelInfo)
.join(ChannelInfo, isouter=True)
.where(ChannelInfo.raw_channel_info_id == None)
.join(Channel, RawChannelInfo.channel==Channel.id)
.where(ChannelInfo.id == None)
.order_by(RawChannelInfo.date_archived.asc())
)

View File

@@ -22,7 +22,7 @@ class GettrTransformer(Transformer):
return False
def transform_info(self, data: RawChannelInfo, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]:
def transform_info(self, data: RawChannelInfo, insert: Callable, session, channel=None) -> Generator[Union[Post, Channel, Media], None, None]:
raw = json.loads(data.raw_data)
transformed = ChannelInfo(

View File

@@ -20,7 +20,7 @@ class RumbleTransformer(Transformer):
return False
def transform_info(self, data: RawChannelInfo, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]:
def transform_info(self, data: RawChannelInfo, insert: Callable, session, channel=None) -> Generator[Union[Post, Channel, Media], None, None]:
raw = json.loads(data.raw_data)
if 'id' not in raw:

View File

@@ -94,7 +94,7 @@ class TelegramTelethonTransformer(Transformer):
return name
def transform_info(self, data: RawChannelInfo, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]:
def transform_info(self, data: RawChannelInfo, insert: Callable, session, channel=None) -> Generator[Union[Post, Channel, Media], None, None]:
raw = json.loads(data.raw_data)
chat_raw = raw['chats'][0]
@@ -121,6 +121,33 @@ class TelegramTelethonTransformer(Transformer):
transformed = insert(transformed)
if channel.platform_id is None:
logger.info(f"Missing platform ID on {channel}, setting to {raw['full_chat']['id']}")
new_channel = session.query(Channel).where(Channel.id == channel.id).one()
new_channel.platform_id = raw['full_chat']['id']
session.flush()
session.commit()
if len(raw['chats']) > 1:
for chat in raw['chats'][1:]:
new_chat = Channel(
name=chat["title"],
platform_id=chat["id"],
category=channel.category, # this should be the same as the "parent"
platform=channel.platform, # this should be the same as the "parent"
url="",
screenname=chat["username"] if "username" in chat else "",
country=channel.country, # this should be the same as the "parent"
influencer=channel.influencer, # this should be the same as the "parent"
public=None,
chat=None,
notes=channel.id, # this should be the channel ID of the parent
source="linked_channel"
)
insert(new_chat)
def transform(self, data: ScraperResult, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]:
raw = json.loads(data.raw_data)

View File

@@ -46,7 +46,7 @@ class TwitterTransformer(Transformer):
yield m
def transform_info(self, data: RawChannelInfo, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]:
def transform_info(self, data: RawChannelInfo, insert: Callable, session, channel=None) -> Generator[Union[Post, Channel, Media], None, None]:
raw = json.loads(data.raw_data)
transformed = ChannelInfo(

View File

@@ -20,7 +20,7 @@ class VkontakteTransformer(Transformer):
return False
def transform_info(self, data: RawChannelInfo, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]:
def transform_info(self, data: RawChannelInfo, insert: Callable, session, channel=None) -> Generator[Union[Post, Channel, Media], None, None]:
raw = json.loads(data.raw_data)
transformed = ChannelInfo(

View File

@@ -1,9 +1,9 @@
#!/bin/bash
pipenv run python -m spacy download xx_ent_wiki_sm
pipenv run python -m spacy download fr_core_news_sm
pipenv run python -m spacy download de_core_news_sm
pipenv run python -m spacy download nl_core_news_sm
pipenv run python -m spacy download it_core_news_sm
pipenv run python -m spacy download ru_core_news_sm
pipenv run python -m spacy download en_core_web_sm
python -m spacy download xx_ent_wiki_sm
python -m spacy download fr_core_news_sm
python -m spacy download de_core_news_sm
python -m spacy download nl_core_news_sm
python -m spacy download it_core_news_sm
python -m spacy download ru_core_news_sm
python -m spacy download en_core_web_sm

View File

@@ -1,6 +1,7 @@
import gspread
import time
from loguru import logger
import os
from cisticola.base import Channel, ChannelInfo
@@ -32,7 +33,7 @@ def sync_channels(args, session):
gc = gspread.service_account(filename="service_account.json")
# Open a sheet from a spreadsheet in one go
wks = gc.open_by_url(args.gsheet).worksheet("channels")
wks = gc.open_by_url(os.environ['GSHEET']).worksheet("channels")
channels = wks.get_all_records(expected_headers = expected_headers)
row = 2