mirror of
https://github.com/bellingcat/cisticola.git
synced 2026-06-08 03:18:34 +03:00
Streamline logging; fix markdown formating in Telegram
This commit is contained in:
1
Pipfile
1
Pipfile
@@ -28,6 +28,7 @@ ocrd-pyexiftool = "*"
|
||||
filelock = "*"
|
||||
telethon = "*"
|
||||
psycopg2 = "*"
|
||||
joblib = "*"
|
||||
|
||||
[dev-packages]
|
||||
pytest = "*"
|
||||
|
||||
1738
Pipfile.lock
generated
1738
Pipfile.lock
generated
File diff suppressed because it is too large
Load Diff
13
app.py
13
app.py
@@ -1,7 +1,9 @@
|
||||
import argparse
|
||||
from asyncio import streams
|
||||
from loguru import logger
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
import datetime
|
||||
import os
|
||||
import sys
|
||||
|
||||
@@ -112,12 +114,12 @@ def transform(args):
|
||||
|
||||
controller = get_transformer_controller(args)
|
||||
|
||||
if args.min_id:
|
||||
min_id = int(args.min_id)
|
||||
if args.min_date:
|
||||
min_date = datetime.datetime.fromisoformat(args.min_date)
|
||||
else:
|
||||
min_id = 0
|
||||
min_date = 0
|
||||
|
||||
controller.transform_all_untransformed(min_id=min_id)
|
||||
controller.transform_all_untransformed(min_date=min_date)
|
||||
|
||||
def transform_info(args):
|
||||
logger.info(f"Transforming untransformed channel info")
|
||||
@@ -156,7 +158,7 @@ if __name__ == "__main__":
|
||||
)
|
||||
parser.add_argument("--chronological", action="store_true")
|
||||
parser.add_argument("--telethon_session", type=str)
|
||||
parser.add_argument("--min_id", type=int)
|
||||
parser.add_argument("--min_date", type=str)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
@@ -179,6 +181,7 @@ if __name__ == "__main__":
|
||||
scrape_channel_info(args)
|
||||
elif args.command == "transform":
|
||||
logger.add("logs/transform.log", level="DEBUG", rotation="100 MB", retention="2 weeks", compression="zip")
|
||||
logger.add("logs/transform_trace.log", level="TRACE", retention="7 days")
|
||||
transform(args)
|
||||
elif args.command == "transform-info":
|
||||
logger.add("logs/transform-info.log", level="DEBUG", rotation="100 MB", retention="2 weeks", compression="zip")
|
||||
|
||||
@@ -258,7 +258,7 @@ class Post:
|
||||
URL_REGEX = r"""(?i)\b((?:https?:(?:/{1,3}|[a-z0-9%])|[a-z0-9.\-]+[.](?:com|net|org|edu|gov|mil|aero|asia|biz|cat|coop|info|int|jobs|mobi|museum|name|post|pro|tel|travel|xxx|ac|ad|ae|af|ag|ai|al|am|an|ao|aq|ar|as|at|au|aw|ax|az|ba|bb|bd|be|bf|bg|bh|bi|bj|bm|bn|bo|br|bs|bt|bv|bw|by|bz|ca|cc|cd|cf|cg|ch|ci|ck|cl|cm|cn|co|cr|cs|cu|cv|cx|cy|cz|dd|de|dj|dk|dm|do|dz|ec|ee|eg|eh|er|es|et|eu|fi|fj|fk|fm|fo|fr|ga|gb|gd|ge|gf|gg|gh|gi|gl|gm|gn|gp|gq|gr|gs|gt|gu|gw|gy|hk|hm|hn|hr|ht|hu|id|ie|il|im|in|io|iq|ir|is|it|je|jm|jo|jp|ke|kg|kh|ki|km|kn|kp|kr|kw|ky|kz|la|lb|lc|li|lk|lr|ls|lt|lu|lv|ly|ma|mc|md|me|mg|mh|mk|ml|mm|mn|mo|mp|mq|mr|ms|mt|mu|mv|mw|mx|my|mz|na|nc|ne|nf|ng|ni|nl|no|np|nr|nu|nz|om|pa|pe|pf|pg|ph|pk|pl|pm|pn|pr|ps|pt|pw|py|qa|re|ro|rs|ru|rw|sa|sb|sc|sd|se|sg|sh|si|sj|Ja|sk|sl|sm|sn|so|sr|ss|st|su|sv|sx|sy|sz|tc|td|tf|tg|th|tj|tk|tl|tm|tn|to|tp|tr|tt|tv|tw|tz|ua|ug|uk|us|uy|uz|va|vc|ve|vg|vi|vn|vu|wf|ws|ye|yt|yu|za|zm|zw)/)(?:[^\s()<>{}\[\]]+|\([^\s()]*?\([^\s()]+\)[^\s()]*?\)|\([^\s]+?\))+(?:\([^\s()]*?\([^\s()]+\)[^\s()]*?\)|\([^\s]+?\)|[^\s`!()\[\]{};:\'\".,<>?«»“”‘’])|(?:(?<!@)[a-z0-9]+(?:[.\-][a-z0-9]+)*[.](?:com|net|org|edu|gov|mil|aero|asia|biz|cat|coop|info|int|jobs|mobi|museum|name|post|pro|tel|travel|xxx|ac|ad|ae|af|ag|ai|al|am|an|ao|aq|ar|as|at|au|aw|ax|az|ba|bb|bd|be|bf|bg|bh|bi|bj|bm|bn|bo|br|bs|bt|bv|bw|by|bz|ca|cc|cd|cf|cg|ch|ci|ck|cl|cm|cn|co|cr|cs|cu|cv|cx|cy|cz|dd|de|dj|dk|dm|do|dz|ec|ee|eg|eh|er|es|et|eu|fi|fj|fk|fm|fo|fr|ga|gb|gd|ge|gf|gg|gh|gi|gl|gm|gn|gp|gq|gr|gs|gt|gu|gw|gy|hk|hm|hn|hr|ht|hu|id|ie|il|im|in|io|iq|ir|is|it|je|jm|jo|jp|ke|kg|kh|ki|km|kn|kp|kr|kw|ky|kz|la|lb|lc|li|lk|lr|ls|lt|lu|lv|ly|ma|mc|md|me|mg|mh|mk|ml|mm|mn|mo|mp|mq|mr|ms|mt|mu|mv|mw|mx|my|mz|na|nc|ne|nf|ng|ni|nl|no|np|nr|nu|nz|om|pa|pe|pf|pg|ph|pk|pl|pm|pn|pr|ps|pt|pw|py|qa|re|ro|rs|ru|rw|sa|sb|sc|sd|se|sg|sh|si|sj|Ja|sk|sl|sm|sn|so|sr|ss|st|su|sv|sx|sy|sz|tc|td|tf|tg|th|tj|tk|tl|tm|tn|to|tp|tr|tt|tv|tw|tz|ua|ug|uk|us|uy|uz|va|vc|ve|vg|vi|vn|vu|wf|ws|ye|yt|yu|za|zm|zw)\b/?(?!@)))"""
|
||||
|
||||
# replace is here in order to prevent catastrophic backtracking
|
||||
urls = re.findall(URL_REGEX, self.content.replace("::::::::", ""))
|
||||
urls = re.findall(URL_REGEX, self.content.replace("::::::::", "").replace("........", ""))
|
||||
self.outlinks += urls
|
||||
self.outlinks = list(set(outlink for outlink in self.outlinks))
|
||||
|
||||
@@ -301,7 +301,6 @@ class Post:
|
||||
elif self.detected_language == 'nl':
|
||||
nlp = nlp_nl
|
||||
else:
|
||||
logger.info(f"No language model for {self.detected_language}")
|
||||
nlp = nlp_xx
|
||||
ner_only = True
|
||||
|
||||
|
||||
@@ -104,7 +104,7 @@ class ETLController:
|
||||
# create tables
|
||||
mapper_registry.metadata.create_all(bind=engine)
|
||||
|
||||
self.session = sessionmaker()
|
||||
self.session = sessionmaker(expire_on_commit=False)
|
||||
self.session.configure(bind=engine)
|
||||
|
||||
def insert_or_select(self, obj, session, hydrate: bool = True):
|
||||
@@ -211,14 +211,16 @@ class ETLController:
|
||||
|
||||
transformer.transform(result, lambda obj: self.insert_or_select(obj, session, hydrate), session)
|
||||
|
||||
session.commit()
|
||||
break
|
||||
|
||||
if handled == False:
|
||||
logger.warning(f"No Transformer could handle ID {result.id} with platform {result.platform} ({result.date})")
|
||||
|
||||
session.commit()
|
||||
|
||||
|
||||
@logger.catch(reraise=True)
|
||||
def transform_all_untransformed(self, hydrate: bool = True, min_id=0):
|
||||
def transform_all_untransformed(self, hydrate: bool = True, min_date=datetime(2010, 1, 1)):
|
||||
"""Transform all ScraperResult objects in the database that do not have an
|
||||
equivalent Post object stored.
|
||||
|
||||
@@ -242,7 +244,7 @@ class ETLController:
|
||||
|
||||
batch = (session.query(ScraperResult)
|
||||
.join(Post, isouter=True)
|
||||
.where(ScraperResult.id > min_id)
|
||||
.where(ScraperResult.date > min_date)
|
||||
.where(Post.raw_id == None)
|
||||
.order_by(ScraperResult.date.asc())
|
||||
.limit(BATCH_SIZE)
|
||||
@@ -257,7 +259,7 @@ class ETLController:
|
||||
|
||||
batch = (session.query(ScraperResult)
|
||||
.join(Post, isouter=True)
|
||||
.where(ScraperResult.id > min_id)
|
||||
.where(ScraperResult.date > min_date)
|
||||
.where(Post.raw_id == None)
|
||||
.where(ScraperResult.id != batch[-1].id)
|
||||
.where(ScraperResult.date >= batch[-1].date)
|
||||
|
||||
@@ -9,7 +9,8 @@ import time
|
||||
from telethon.sync import TelegramClient
|
||||
from telethon.errors.rpcerrorlist import ChannelPrivateError, ChannelInvalidError
|
||||
from telethon.tl import types
|
||||
from telethon.helpers import add_surrogate
|
||||
from telethon.helpers import add_surrogate, del_surrogate
|
||||
from itertools import takewhile
|
||||
|
||||
import os
|
||||
from datetime import datetime, timezone
|
||||
@@ -23,6 +24,9 @@ class TelegramTelethonTransformer(Transformer):
|
||||
__version__ = 'TelegramTelethonTransformer 0.0.4'
|
||||
|
||||
bad_channels = {}
|
||||
channels_cache_by_platformid = {}
|
||||
channels_cache_by_id = {}
|
||||
channels_cache_by_screenname = {}
|
||||
|
||||
def can_handle(self, data: ScraperResult) -> bool:
|
||||
scraper = data.scraper.split(' ')
|
||||
@@ -53,13 +57,10 @@ class TelegramTelethonTransformer(Transformer):
|
||||
else:
|
||||
return (data.username, data.title, "")
|
||||
except ChannelPrivateError:
|
||||
logger.info("ChannelPrivateError")
|
||||
return ("", "", "ChannelPrivateError")
|
||||
except ChannelInvalidError:
|
||||
logger.info("ChannelInvalidError")
|
||||
return ("", "", "ChannelInvalidError")
|
||||
except ValueError:
|
||||
logger.info("ValueError")
|
||||
return ("", "", "ValueError")
|
||||
|
||||
def get_name_from_web_interface(self, orig_screenname, id):
|
||||
@@ -168,36 +169,41 @@ class TelegramTelethonTransformer(Transformer):
|
||||
fwd_from = None
|
||||
|
||||
if raw['fwd_from'] and raw['fwd_from']['from_id'] and 'channel_id' in raw['fwd_from']['from_id']:
|
||||
channel = session.query(Channel).filter_by(platform_id=str(raw['fwd_from']['from_id']['channel_id']), platform = 'Telegram').first()
|
||||
# use cache rather than a DB request if possible
|
||||
if str(raw['fwd_from']['from_id']['channel_id']) not in self.channels_cache_by_platformid:
|
||||
channel = session.query(Channel).filter_by(platform_id=str(raw['fwd_from']['from_id']['channel_id']), platform = 'Telegram').first()
|
||||
|
||||
if channel is None:
|
||||
(screenname, name, notes) = self.get_screenname_from_id(raw['fwd_from']['from_id']['channel_id'])
|
||||
if channel is None:
|
||||
(screenname, name, notes) = self.get_screenname_from_id(raw['fwd_from']['from_id']['channel_id'])
|
||||
|
||||
if name == "":
|
||||
logger.info("Trying fallback web interface")
|
||||
orig_channel = session.query(Channel).filter_by(id=data.channel).first()
|
||||
if orig_channel.screenname is not None:
|
||||
name = self.get_name_from_web_interface(orig_channel.screenname, raw['id'])
|
||||
if name == "":
|
||||
logger.info("Trying fallback web interface")
|
||||
orig_channel = session.query(Channel).filter_by(id=data.channel).first()
|
||||
if orig_channel.screenname is not None:
|
||||
name = self.get_name_from_web_interface(orig_channel.screenname, raw['id'])
|
||||
|
||||
channel = Channel(
|
||||
name=name,
|
||||
platform_id=raw['fwd_from']['from_id']['channel_id'],
|
||||
platform=data.platform,
|
||||
url="https://t.me/s/" + screenname if screenname is not None else "",
|
||||
screenname=screenname,
|
||||
category='forwarded',
|
||||
source=self.__version__,
|
||||
notes=notes
|
||||
)
|
||||
channel = Channel(
|
||||
name=name,
|
||||
platform_id=raw['fwd_from']['from_id']['channel_id'],
|
||||
platform=data.platform,
|
||||
url="https://t.me/s/" + screenname if screenname is not None else "",
|
||||
screenname=screenname,
|
||||
category='forwarded',
|
||||
source=self.__version__,
|
||||
notes=notes
|
||||
)
|
||||
|
||||
channel = insert(channel)
|
||||
logger.info(f"Added {channel}")
|
||||
channel = insert(channel)
|
||||
logger.info(f"Added {channel}")
|
||||
|
||||
fwd_from = channel.id
|
||||
self.channels_cache_by_platformid[str(raw['fwd_from']['from_id']['channel_id'])] = channel
|
||||
|
||||
fwd_from = self.channels_cache_by_platformid[str(raw['fwd_from']['from_id']['channel_id'])].id
|
||||
|
||||
reply_to = None
|
||||
if raw['reply_to']:
|
||||
reply_to_id = str(raw['reply_to']['reply_to_msg_id'])
|
||||
session.commit()
|
||||
post = session.query(Post).filter_by(channel=data.channel, platform_id=reply_to_id).first()
|
||||
if post is None:
|
||||
reply_to = -1
|
||||
@@ -207,32 +213,41 @@ class TelegramTelethonTransformer(Transformer):
|
||||
mentions = []
|
||||
|
||||
for mention_entity in [entity for entity in raw['entities'] if entity['_'] == 'MessageEntityMention']:
|
||||
|
||||
offset = mention_entity['offset']
|
||||
length = mention_entity['length']
|
||||
|
||||
screenname = add_surrogate(raw['message'])[offset:offset+length].strip('@').strip()
|
||||
|
||||
channel = session.query(Channel).filter(func.lower(Channel.screenname)==func.lower(screenname)).first()
|
||||
# use cache rather than a DB request if possible
|
||||
if screenname.lower() not in self.channels_cache_by_screenname:
|
||||
channel = session.query(Channel).filter(func.lower(Channel.screenname)==func.lower(screenname)).first()
|
||||
|
||||
if channel is None:
|
||||
if channel is None:
|
||||
|
||||
channel = Channel(
|
||||
name = None,
|
||||
platform_id = None,
|
||||
platform = 'Telegram',
|
||||
url="https://t.me/s/" + screenname,
|
||||
screenname=screenname,
|
||||
category='mentioned',
|
||||
source=self.__version__,
|
||||
)
|
||||
channel = Channel(
|
||||
name = None,
|
||||
platform_id = None,
|
||||
platform = 'Telegram',
|
||||
url="https://t.me/s/" + screenname,
|
||||
screenname=screenname,
|
||||
category='mentioned',
|
||||
source=self.__version__,
|
||||
)
|
||||
|
||||
channel = insert(channel)
|
||||
logger.info(f"Added {channel}")
|
||||
channel = insert(channel)
|
||||
logger.info(f"Added {channel}")
|
||||
|
||||
self.channels_cache_by_screenname[screenname.lower()] = channel
|
||||
channel = self.channels_cache_by_screenname[screenname.lower()]
|
||||
|
||||
mentions.append(channel.id)
|
||||
|
||||
channel = session.query(Channel).filter_by(id=int(data.channel)).first()
|
||||
# use cache rather than a DB request if possible
|
||||
if int(data.channel) not in self.channels_cache_by_id:
|
||||
channel = session.query(Channel).filter_by(id=int(data.channel)).first()
|
||||
self.channels_cache_by_id[int(data.channel)] = channel
|
||||
|
||||
channel = self.channels_cache_by_id[int(data.channel)]
|
||||
|
||||
if channel is not None and channel.url:
|
||||
url = channel.url.strip('/') + f"/{raw['id']}"
|
||||
@@ -272,10 +287,20 @@ class TelegramTelethonTransformer(Transformer):
|
||||
|
||||
transformed = insert(transformed)
|
||||
|
||||
def stripped(s):
|
||||
"""https://stackoverflow.com/a/29933716"""
|
||||
|
||||
lstripped = ''.join(takewhile(str.isspace, s))
|
||||
rstripped = ''.join(reversed(tuple(takewhile(str.isspace, reversed(s)))))
|
||||
|
||||
return lstripped + rstripped
|
||||
|
||||
def add_markdown_links(raw_post):
|
||||
"""This function is necessary because Telethon's markdown.unparse doesn't
|
||||
correctly handle trailing whitespace or multi-line links"""
|
||||
|
||||
global_offset = 0
|
||||
transformed_content = raw_post['message']
|
||||
transformed_content = add_surrogate(raw_post['message'])
|
||||
links = [entity for entity in raw_post['entities'] if entity['_'] == 'MessageEntityTextUrl']
|
||||
|
||||
for link in links:
|
||||
@@ -284,12 +309,18 @@ def add_markdown_links(raw_post):
|
||||
url = link['url']
|
||||
|
||||
before_link = transformed_content[:offset]
|
||||
link_text = f"[{transformed_content[offset:offset+length].strip()}]"
|
||||
trailing_whitespace = ''.join([c for c in transformed_content[offset:offset+length] if c.isspace()])
|
||||
link_href = f"({url})"
|
||||
after_link = transformed_content[offset+length:]
|
||||
|
||||
transformed_content = before_link + link_text + link_href + trailing_whitespace + after_link
|
||||
global_offset += (4 + len(url))
|
||||
inner_text = transformed_content[offset:offset+length]
|
||||
|
||||
return transformed_content
|
||||
# skip creation of link if inner link text is only whitespace
|
||||
if inner_text.replace('\u200b', '').strip():
|
||||
|
||||
processed_inner_text = inner_text.strip().replace('\n', '\\\n')
|
||||
link_text = f"[{processed_inner_text}]"
|
||||
trailing_whitespace = stripped(transformed_content[offset:offset+length])
|
||||
link_href = f"({url})"
|
||||
after_link = transformed_content[offset+length:]
|
||||
|
||||
transformed_content = before_link + link_text + link_href + trailing_whitespace + after_link
|
||||
global_offset += (4 + len(url) + inner_text.strip().count('\n'))
|
||||
|
||||
return del_surrogate(transformed_content)
|
||||
Reference in New Issue
Block a user