Merge branch 'main' of github.com:bellingcat/cisticola

This commit is contained in:
Logan Williams
2023-05-04 14:06:59 +02:00
5 changed files with 1630 additions and 141 deletions

1640
Pipfile.lock generated

File diff suppressed because it is too large Load Diff

13
app.py
View File

@@ -1,7 +1,9 @@
import argparse import argparse
from asyncio import streams
from loguru import logger from loguru import logger
from sqlalchemy import create_engine from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import sessionmaker
import datetime
import os import os
import sys import sys
@@ -112,12 +114,12 @@ def transform(args):
controller = get_transformer_controller(args) controller = get_transformer_controller(args)
if args.min_id: if args.min_date:
min_id = int(args.min_id) min_date = datetime.datetime.fromisoformat(args.min_date)
else: else:
min_id = 0 min_date = 0
controller.transform_all_untransformed(min_id=min_id) controller.transform_all_untransformed(min_date=min_date)
def transform_info(args): def transform_info(args):
logger.info(f"Transforming untransformed channel info") logger.info(f"Transforming untransformed channel info")
@@ -156,7 +158,7 @@ if __name__ == "__main__":
) )
parser.add_argument("--chronological", action="store_true") parser.add_argument("--chronological", action="store_true")
parser.add_argument("--telethon_session", type=str) parser.add_argument("--telethon_session", type=str)
parser.add_argument("--min_id", type=int) parser.add_argument("--min_date", type=str)
args = parser.parse_args() args = parser.parse_args()
@@ -179,6 +181,7 @@ if __name__ == "__main__":
scrape_channel_info(args) scrape_channel_info(args)
elif args.command == "transform": elif args.command == "transform":
logger.add("logs/transform.log", level="DEBUG", rotation="100 MB", retention="2 weeks", compression="zip") logger.add("logs/transform.log", level="DEBUG", rotation="100 MB", retention="2 weeks", compression="zip")
logger.add("logs/transform_trace.log", level="TRACE", retention="7 days")
transform(args) transform(args)
elif args.command == "transform-info": elif args.command == "transform-info":
logger.add("logs/transform-info.log", level="DEBUG", rotation="100 MB", retention="2 weeks", compression="zip") logger.add("logs/transform-info.log", level="DEBUG", rotation="100 MB", retention="2 weeks", compression="zip")

View File

@@ -258,7 +258,7 @@ class Post:
URL_REGEX = r"""(?i)\b((?:https?:(?:/{1,3}|[a-z0-9%])|[a-z0-9.\-]+[.](?:com|net|org|edu|gov|mil|aero|asia|biz|cat|coop|info|int|jobs|mobi|museum|name|post|pro|tel|travel|xxx|ac|ad|ae|af|ag|ai|al|am|an|ao|aq|ar|as|at|au|aw|ax|az|ba|bb|bd|be|bf|bg|bh|bi|bj|bm|bn|bo|br|bs|bt|bv|bw|by|bz|ca|cc|cd|cf|cg|ch|ci|ck|cl|cm|cn|co|cr|cs|cu|cv|cx|cy|cz|dd|de|dj|dk|dm|do|dz|ec|ee|eg|eh|er|es|et|eu|fi|fj|fk|fm|fo|fr|ga|gb|gd|ge|gf|gg|gh|gi|gl|gm|gn|gp|gq|gr|gs|gt|gu|gw|gy|hk|hm|hn|hr|ht|hu|id|ie|il|im|in|io|iq|ir|is|it|je|jm|jo|jp|ke|kg|kh|ki|km|kn|kp|kr|kw|ky|kz|la|lb|lc|li|lk|lr|ls|lt|lu|lv|ly|ma|mc|md|me|mg|mh|mk|ml|mm|mn|mo|mp|mq|mr|ms|mt|mu|mv|mw|mx|my|mz|na|nc|ne|nf|ng|ni|nl|no|np|nr|nu|nz|om|pa|pe|pf|pg|ph|pk|pl|pm|pn|pr|ps|pt|pw|py|qa|re|ro|rs|ru|rw|sa|sb|sc|sd|se|sg|sh|si|sj|Ja|sk|sl|sm|sn|so|sr|ss|st|su|sv|sx|sy|sz|tc|td|tf|tg|th|tj|tk|tl|tm|tn|to|tp|tr|tt|tv|tw|tz|ua|ug|uk|us|uy|uz|va|vc|ve|vg|vi|vn|vu|wf|ws|ye|yt|yu|za|zm|zw)/)(?:[^\s()<>{}\[\]]+|\([^\s()]*?\([^\s()]+\)[^\s()]*?\)|\([^\s]+?\))+(?:\([^\s()]*?\([^\s()]+\)[^\s()]*?\)|\([^\s]+?\)|[^\s`!()\[\]{};:\'\".,<>?«»“”‘’])|(?:(?<!@)[a-z0-9]+(?:[.\-][a-z0-9]+)*[.](?:com|net|org|edu|gov|mil|aero|asia|biz|cat|coop|info|int|jobs|mobi|museum|name|post|pro|tel|travel|xxx|ac|ad|ae|af|ag|ai|al|am|an|ao|aq|ar|as|at|au|aw|ax|az|ba|bb|bd|be|bf|bg|bh|bi|bj|bm|bn|bo|br|bs|bt|bv|bw|by|bz|ca|cc|cd|cf|cg|ch|ci|ck|cl|cm|cn|co|cr|cs|cu|cv|cx|cy|cz|dd|de|dj|dk|dm|do|dz|ec|ee|eg|eh|er|es|et|eu|fi|fj|fk|fm|fo|fr|ga|gb|gd|ge|gf|gg|gh|gi|gl|gm|gn|gp|gq|gr|gs|gt|gu|gw|gy|hk|hm|hn|hr|ht|hu|id|ie|il|im|in|io|iq|ir|is|it|je|jm|jo|jp|ke|kg|kh|ki|km|kn|kp|kr|kw|ky|kz|la|lb|lc|li|lk|lr|ls|lt|lu|lv|ly|ma|mc|md|me|mg|mh|mk|ml|mm|mn|mo|mp|mq|mr|ms|mt|mu|mv|mw|mx|my|mz|na|nc|ne|nf|ng|ni|nl|no|np|nr|nu|nz|om|pa|pe|pf|pg|ph|pk|pl|pm|pn|pr|ps|pt|pw|py|qa|re|ro|rs|ru|rw|sa|sb|sc|sd|se|sg|sh|si|sj|Ja|sk|sl|sm|sn|so|sr|ss|st|su|sv|sx|sy|sz|tc|td|tf|tg|th|tj|tk|tl|tm|tn|to|tp|tr|tt|tv|tw|tz|ua|ug|uk|us|uy|uz|va|vc|ve|vg|vi|vn|vu|wf|ws|ye|yt|yu|za|zm|zw)\b/?(?!@)))""" URL_REGEX = r"""(?i)\b((?:https?:(?:/{1,3}|[a-z0-9%])|[a-z0-9.\-]+[.](?:com|net|org|edu|gov|mil|aero|asia|biz|cat|coop|info|int|jobs|mobi|museum|name|post|pro|tel|travel|xxx|ac|ad|ae|af|ag|ai|al|am|an|ao|aq|ar|as|at|au|aw|ax|az|ba|bb|bd|be|bf|bg|bh|bi|bj|bm|bn|bo|br|bs|bt|bv|bw|by|bz|ca|cc|cd|cf|cg|ch|ci|ck|cl|cm|cn|co|cr|cs|cu|cv|cx|cy|cz|dd|de|dj|dk|dm|do|dz|ec|ee|eg|eh|er|es|et|eu|fi|fj|fk|fm|fo|fr|ga|gb|gd|ge|gf|gg|gh|gi|gl|gm|gn|gp|gq|gr|gs|gt|gu|gw|gy|hk|hm|hn|hr|ht|hu|id|ie|il|im|in|io|iq|ir|is|it|je|jm|jo|jp|ke|kg|kh|ki|km|kn|kp|kr|kw|ky|kz|la|lb|lc|li|lk|lr|ls|lt|lu|lv|ly|ma|mc|md|me|mg|mh|mk|ml|mm|mn|mo|mp|mq|mr|ms|mt|mu|mv|mw|mx|my|mz|na|nc|ne|nf|ng|ni|nl|no|np|nr|nu|nz|om|pa|pe|pf|pg|ph|pk|pl|pm|pn|pr|ps|pt|pw|py|qa|re|ro|rs|ru|rw|sa|sb|sc|sd|se|sg|sh|si|sj|Ja|sk|sl|sm|sn|so|sr|ss|st|su|sv|sx|sy|sz|tc|td|tf|tg|th|tj|tk|tl|tm|tn|to|tp|tr|tt|tv|tw|tz|ua|ug|uk|us|uy|uz|va|vc|ve|vg|vi|vn|vu|wf|ws|ye|yt|yu|za|zm|zw)/)(?:[^\s()<>{}\[\]]+|\([^\s()]*?\([^\s()]+\)[^\s()]*?\)|\([^\s]+?\))+(?:\([^\s()]*?\([^\s()]+\)[^\s()]*?\)|\([^\s]+?\)|[^\s`!()\[\]{};:\'\".,<>?«»“”‘’])|(?:(?<!@)[a-z0-9]+(?:[.\-][a-z0-9]+)*[.](?:com|net|org|edu|gov|mil|aero|asia|biz|cat|coop|info|int|jobs|mobi|museum|name|post|pro|tel|travel|xxx|ac|ad|ae|af|ag|ai|al|am|an|ao|aq|ar|as|at|au|aw|ax|az|ba|bb|bd|be|bf|bg|bh|bi|bj|bm|bn|bo|br|bs|bt|bv|bw|by|bz|ca|cc|cd|cf|cg|ch|ci|ck|cl|cm|cn|co|cr|cs|cu|cv|cx|cy|cz|dd|de|dj|dk|dm|do|dz|ec|ee|eg|eh|er|es|et|eu|fi|fj|fk|fm|fo|fr|ga|gb|gd|ge|gf|gg|gh|gi|gl|gm|gn|gp|gq|gr|gs|gt|gu|gw|gy|hk|hm|hn|hr|ht|hu|id|ie|il|im|in|io|iq|ir|is|it|je|jm|jo|jp|ke|kg|kh|ki|km|kn|kp|kr|kw|ky|kz|la|lb|lc|li|lk|lr|ls|lt|lu|lv|ly|ma|mc|md|me|mg|mh|mk|ml|mm|mn|mo|mp|mq|mr|ms|mt|mu|mv|mw|mx|my|mz|na|nc|ne|nf|ng|ni|nl|no|np|nr|nu|nz|om|pa|pe|pf|pg|ph|pk|pl|pm|pn|pr|ps|pt|pw|py|qa|re|ro|rs|ru|rw|sa|sb|sc|sd|se|sg|sh|si|sj|Ja|sk|sl|sm|sn|so|sr|ss|st|su|sv|sx|sy|sz|tc|td|tf|tg|th|tj|tk|tl|tm|tn|to|tp|tr|tt|tv|tw|tz|ua|ug|uk|us|uy|uz|va|vc|ve|vg|vi|vn|vu|wf|ws|ye|yt|yu|za|zm|zw)\b/?(?!@)))"""
# replace is here in order to prevent catastrophic backtracking # replace is here in order to prevent catastrophic backtracking
urls = re.findall(URL_REGEX, self.content.replace("::::::::", "")) urls = re.findall(URL_REGEX, self.content.replace("::::::::", "").replace("........", ""))
self.outlinks += urls self.outlinks += urls
self.outlinks = list(set(outlink for outlink in self.outlinks)) self.outlinks = list(set(outlink for outlink in self.outlinks))
@@ -301,7 +301,6 @@ class Post:
elif self.detected_language == 'nl': elif self.detected_language == 'nl':
nlp = nlp_nl nlp = nlp_nl
else: else:
logger.info(f"No language model for {self.detected_language}")
nlp = nlp_xx nlp = nlp_xx
ner_only = True ner_only = True

View File

@@ -106,7 +106,7 @@ class ETLController:
# create tables # create tables
mapper_registry.metadata.create_all(bind=engine) mapper_registry.metadata.create_all(bind=engine)
self.session = sessionmaker() self.session = sessionmaker(expire_on_commit=False)
self.session.configure(bind=engine) self.session.configure(bind=engine)
# MAY4 can try adding some new functions for batching post inserts # MAY4 can try adding some new functions for batching post inserts
@@ -236,14 +236,16 @@ class ETLController:
transformer.transform(result, lambda obj: self.insert_or_select(obj, session, hydrate), session) transformer.transform(result, lambda obj: self.insert_or_select(obj, session, hydrate), session)
session.commit()
break break
if handled == False: if handled == False:
logger.warning(f"No Transformer could handle ID {result.id} with platform {result.platform} ({result.date})") logger.warning(f"No Transformer could handle ID {result.id} with platform {result.platform} ({result.date})")
session.commit()
@logger.catch(reraise=True) @logger.catch(reraise=True)
def transform_all_untransformed(self, hydrate: bool = True, min_id=0): def transform_all_untransformed(self, hydrate: bool = True, min_date=datetime(2010, 1, 1)):
"""Transform all ScraperResult objects in the database that do not have an """Transform all ScraperResult objects in the database that do not have an
equivalent Post object stored. equivalent Post object stored.
@@ -267,7 +269,7 @@ class ETLController:
batch = (session.query(ScraperResult) batch = (session.query(ScraperResult)
.join(Post, isouter=True) .join(Post, isouter=True)
.where(ScraperResult.id > min_id) .where(ScraperResult.date > min_date)
.where(Post.raw_id == None) .where(Post.raw_id == None)
.order_by(ScraperResult.date.asc()) .order_by(ScraperResult.date.asc())
.limit(BATCH_SIZE) .limit(BATCH_SIZE)
@@ -282,7 +284,7 @@ class ETLController:
batch = (session.query(ScraperResult) batch = (session.query(ScraperResult)
.join(Post, isouter=True) .join(Post, isouter=True)
.where(ScraperResult.id > min_id) .where(ScraperResult.date > min_date)
.where(Post.raw_id == None) .where(Post.raw_id == None)
.where(ScraperResult.id != batch[-1].id) .where(ScraperResult.id != batch[-1].id)
.where(ScraperResult.date >= batch[-1].date) .where(ScraperResult.date >= batch[-1].date)

View File

@@ -10,6 +10,7 @@ from telethon.sync import TelegramClient
from telethon.errors.rpcerrorlist import ChannelPrivateError, ChannelInvalidError from telethon.errors.rpcerrorlist import ChannelPrivateError, ChannelInvalidError
from telethon.tl import types from telethon.tl import types
from telethon.helpers import add_surrogate, del_surrogate from telethon.helpers import add_surrogate, del_surrogate
from itertools import takewhile
import os import os
from datetime import datetime, timezone from datetime import datetime, timezone
@@ -23,6 +24,9 @@ class TelegramTelethonTransformer(Transformer):
__version__ = 'TelegramTelethonTransformer 0.0.4' __version__ = 'TelegramTelethonTransformer 0.0.4'
bad_channels = {} bad_channels = {}
channels_cache_by_platformid = {}
channels_cache_by_id = {}
channels_cache_by_screenname = {}
def can_handle(self, data: ScraperResult) -> bool: def can_handle(self, data: ScraperResult) -> bool:
scraper = data.scraper.split(' ') scraper = data.scraper.split(' ')
@@ -53,13 +57,10 @@ class TelegramTelethonTransformer(Transformer):
else: else:
return (data.username, data.title, "") return (data.username, data.title, "")
except ChannelPrivateError: except ChannelPrivateError:
logger.info("ChannelPrivateError")
return ("", "", "ChannelPrivateError") return ("", "", "ChannelPrivateError")
except ChannelInvalidError: except ChannelInvalidError:
logger.info("ChannelInvalidError")
return ("", "", "ChannelInvalidError") return ("", "", "ChannelInvalidError")
except ValueError: except ValueError:
logger.info("ValueError")
return ("", "", "ValueError") return ("", "", "ValueError")
def get_name_from_web_interface(self, orig_screenname, id): def get_name_from_web_interface(self, orig_screenname, id):
@@ -168,36 +169,41 @@ class TelegramTelethonTransformer(Transformer):
fwd_from = None fwd_from = None
if raw['fwd_from'] and raw['fwd_from']['from_id'] and 'channel_id' in raw['fwd_from']['from_id']: if raw['fwd_from'] and raw['fwd_from']['from_id'] and 'channel_id' in raw['fwd_from']['from_id']:
channel = session.query(Channel).filter_by(platform_id=str(raw['fwd_from']['from_id']['channel_id']), platform = 'Telegram').first() # use cache rather than a DB request if possible
if str(raw['fwd_from']['from_id']['channel_id']) not in self.channels_cache_by_platformid:
channel = session.query(Channel).filter_by(platform_id=str(raw['fwd_from']['from_id']['channel_id']), platform = 'Telegram').first()
if channel is None: if channel is None:
(screenname, name, notes) = self.get_screenname_from_id(raw['fwd_from']['from_id']['channel_id']) (screenname, name, notes) = self.get_screenname_from_id(raw['fwd_from']['from_id']['channel_id'])
if name == "": if name == "":
logger.info("Trying fallback web interface") logger.info("Trying fallback web interface")
orig_channel = session.query(Channel).filter_by(id=data.channel).first() orig_channel = session.query(Channel).filter_by(id=data.channel).first()
if orig_channel.screenname is not None: if orig_channel.screenname is not None:
name = self.get_name_from_web_interface(orig_channel.screenname, raw['id']) name = self.get_name_from_web_interface(orig_channel.screenname, raw['id'])
channel = Channel( channel = Channel(
name=name, name=name,
platform_id=raw['fwd_from']['from_id']['channel_id'], platform_id=raw['fwd_from']['from_id']['channel_id'],
platform=data.platform, platform=data.platform,
url="https://t.me/s/" + screenname if screenname is not None else "", url="https://t.me/s/" + screenname if screenname is not None else "",
screenname=screenname, screenname=screenname,
category='forwarded', category='forwarded',
source=self.__version__, source=self.__version__,
notes=notes notes=notes
) )
channel = insert(channel) channel = insert(channel)
logger.info(f"Added {channel}") logger.info(f"Added {channel}")
fwd_from = channel.id self.channels_cache_by_platformid[str(raw['fwd_from']['from_id']['channel_id'])] = channel
fwd_from = self.channels_cache_by_platformid[str(raw['fwd_from']['from_id']['channel_id'])].id
reply_to = None reply_to = None
if raw['reply_to']: if raw['reply_to']:
reply_to_id = str(raw['reply_to']['reply_to_msg_id']) reply_to_id = str(raw['reply_to']['reply_to_msg_id'])
session.commit()
post = session.query(Post).filter_by(channel=data.channel, platform_id=reply_to_id).first() post = session.query(Post).filter_by(channel=data.channel, platform_id=reply_to_id).first()
if post is None: if post is None:
reply_to = -1 reply_to = -1
@@ -207,32 +213,41 @@ class TelegramTelethonTransformer(Transformer):
mentions = [] mentions = []
for mention_entity in [entity for entity in raw['entities'] if entity['_'] == 'MessageEntityMention']: for mention_entity in [entity for entity in raw['entities'] if entity['_'] == 'MessageEntityMention']:
offset = mention_entity['offset'] offset = mention_entity['offset']
length = mention_entity['length'] length = mention_entity['length']
screenname = add_surrogate(raw['message'])[offset:offset+length].strip('@').strip() screenname = add_surrogate(raw['message'])[offset:offset+length].strip('@').strip()
channel = session.query(Channel).filter(func.lower(Channel.screenname)==func.lower(screenname)).first() # use cache rather than a DB request if possible
if screenname.lower() not in self.channels_cache_by_screenname:
channel = session.query(Channel).filter(func.lower(Channel.screenname)==func.lower(screenname)).first()
if channel is None: if channel is None:
channel = Channel( channel = Channel(
name = None, name = None,
platform_id = None, platform_id = None,
platform = 'Telegram', platform = 'Telegram',
url="https://t.me/s/" + screenname, url="https://t.me/s/" + screenname,
screenname=screenname, screenname=screenname,
category='mentioned', category='mentioned',
source=self.__version__, source=self.__version__,
) )
channel = insert(channel) channel = insert(channel)
logger.info(f"Added {channel}") logger.info(f"Added {channel}")
self.channels_cache_by_screenname[screenname.lower()] = channel
channel = self.channels_cache_by_screenname[screenname.lower()]
mentions.append(channel.id) mentions.append(channel.id)
channel = session.query(Channel).filter_by(id=int(data.channel)).first() # use cache rather than a DB request if possible
if int(data.channel) not in self.channels_cache_by_id:
channel = session.query(Channel).filter_by(id=int(data.channel)).first()
self.channels_cache_by_id[int(data.channel)] = channel
channel = self.channels_cache_by_id[int(data.channel)]
if channel is not None and channel.url: if channel is not None and channel.url:
url = channel.url.strip('/') + f"/{raw['id']}" url = channel.url.strip('/') + f"/{raw['id']}"
@@ -281,6 +296,14 @@ def stripped(s):
return lstripped + rstripped return lstripped + rstripped
def stripped(s):
"""https://stackoverflow.com/a/29933716"""
lstripped = ''.join(takewhile(str.isspace, s))
rstripped = ''.join(reversed(tuple(takewhile(str.isspace, reversed(s)))))
return lstripped + rstripped
def add_markdown_links(raw_post): def add_markdown_links(raw_post):
"""This function is necessary because Telethon's markdown.unparse doesn't """This function is necessary because Telethon's markdown.unparse doesn't
correctly handle trailing whitespace or multi-line links""" correctly handle trailing whitespace or multi-line links"""