mirror of
https://github.com/bellingcat/cisticola.git
synced 2026-06-08 03:18:34 +03:00
Merge pull request #54 from bellingcat/transformers
Functional Telegram transformer
This commit is contained in:
2
Pipfile
2
Pipfile
@@ -22,6 +22,8 @@ psycopg2 = "*"
|
||||
tqdm = "*"
|
||||
ratelimit = "*"
|
||||
pytz = "*"
|
||||
langdetect = "*"
|
||||
spacy = "==3.2.4"
|
||||
|
||||
[dev-packages]
|
||||
pytest = "*"
|
||||
|
||||
1195
Pipfile.lock
generated
1195
Pipfile.lock
generated
File diff suppressed because it is too large
Load Diff
51
app.py
51
app.py
@@ -16,6 +16,7 @@ from cisticola.scraper import (
|
||||
BitchuteScraper,
|
||||
RumbleScraper,
|
||||
)
|
||||
from cisticola.transformer import (ETLController, TelegramTelethonTransformer)
|
||||
|
||||
|
||||
def sync_channels(args):
|
||||
@@ -59,11 +60,17 @@ def sync_channels(args):
|
||||
channel = (
|
||||
session.query(Channel)
|
||||
.filter_by(
|
||||
platform_id=str(platform_id), platform=c["platform"], url=c["url"]
|
||||
platform_id=str(platform_id), platform=str(c["platform"])
|
||||
)
|
||||
.first()
|
||||
)
|
||||
|
||||
if not channel:
|
||||
channel = session.query(Channel).filter_by(platform=str(c["platform"]), url=str(c["url"])).first()
|
||||
|
||||
if not channel and c["screenname"] != '' and c["screenname"] is not None:
|
||||
channel = session.query(Channel).filter_by(platform=str(c["platform"]), screenname=str(c["screenname"])).first()
|
||||
|
||||
if not channel:
|
||||
channel = Channel(**c, source="researcher")
|
||||
logger.debug(f"{channel} does not exist, adding")
|
||||
@@ -73,6 +80,26 @@ def sync_channels(args):
|
||||
|
||||
wks.update_cell(row, 1, channel.id)
|
||||
time.sleep(1)
|
||||
else:
|
||||
logger.info(f"Channel found, updating channel {channel}")
|
||||
channel.name = c["name"]
|
||||
channel.category = c["category"]
|
||||
channel.platform = c["platform"]
|
||||
channel.url = c["url"]
|
||||
channel.screenname = c["screenname"]
|
||||
channel.country = c["country"]
|
||||
channel.influencer = c["influencer"]
|
||||
channel.public = c["public"]
|
||||
channel.chat = c["chat"]
|
||||
channel.notes = c["notes"]
|
||||
channel.source = "researcher"
|
||||
|
||||
session.flush()
|
||||
session.commit()
|
||||
|
||||
wks.update_cell(row, 1, channel.id)
|
||||
time.sleep(1)
|
||||
|
||||
else:
|
||||
channel = session.query(Channel).filter_by(id=int(c["id"])).first()
|
||||
|
||||
@@ -87,6 +114,7 @@ def sync_channels(args):
|
||||
channel.public = c["public"]
|
||||
channel.chat = c["chat"]
|
||||
channel.notes = c["notes"]
|
||||
channel.source = "researcher"
|
||||
|
||||
session.flush()
|
||||
session.commit()
|
||||
@@ -122,6 +150,18 @@ def get_scraper_controller():
|
||||
|
||||
return controller
|
||||
|
||||
def get_transformer_controller():
|
||||
engine = create_engine(os.environ["DB"])
|
||||
|
||||
controller = ETLController()
|
||||
controller.connect_to_db(engine)
|
||||
|
||||
transformers = [TelegramTelethonTransformer()]
|
||||
|
||||
controller.register_transformers(transformers)
|
||||
|
||||
return controller
|
||||
|
||||
|
||||
def scrape_channels(args):
|
||||
logger.info(f"Scraping channels, media: {args.media}")
|
||||
@@ -143,6 +183,12 @@ def archive_media(args):
|
||||
controller = get_scraper_controller()
|
||||
controller.archive_unarchived_media()
|
||||
|
||||
def transform(args):
|
||||
logger.info(f"Transforming untransformed media")
|
||||
|
||||
controller = get_transformer_controller()
|
||||
controller.transform_all_untransformed()
|
||||
|
||||
|
||||
def init_db():
|
||||
engine = create_engine(os.environ["DB"])
|
||||
@@ -182,5 +228,8 @@ if __name__ == "__main__":
|
||||
elif args.command == "channel-info":
|
||||
logger.add("logs/channel-info.log", level="TRACE", rotation="100 MB")
|
||||
scrape_channel_info(args)
|
||||
elif args.command == "transform":
|
||||
logger.add("logs/transform.log", level="TRACE", rotation="100 MB")
|
||||
transform(args)
|
||||
else:
|
||||
logger.error(f"Unrecognized command {args.command}")
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
from typing import List
|
||||
from dataclasses import dataclass
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime
|
||||
import tempfile
|
||||
import json
|
||||
@@ -10,6 +10,11 @@ from sqlalchemy import Table, Column, Integer, String, JSON, DateTime, ForeignKe
|
||||
import pytesseract
|
||||
import PIL
|
||||
import exiftool
|
||||
import re
|
||||
from langdetect import detect, DetectorFactory
|
||||
from langdetect.lang_detect_exception import LangDetectException
|
||||
from loguru import logger
|
||||
import spacy
|
||||
|
||||
from .utils import make_request
|
||||
|
||||
@@ -109,6 +114,14 @@ class RawChannelInfo:
|
||||
#: Datetime (relative to UTC) that the scraped post was archived at.
|
||||
date_archived: datetime
|
||||
|
||||
nlp_en = spacy.load('en_core_web_sm', disable=['parser', 'tok2vec', 'attribute_ruler'])
|
||||
nlp_de = spacy.load('de_core_news_sm', disable=['parser', 'tok2vec', 'attribute_ruler'])
|
||||
nlp_it = spacy.load('it_core_news_sm', disable=['parser', 'tok2vec', 'attribute_ruler'])
|
||||
nlp_fr = spacy.load('fr_core_news_sm', disable=['parser', 'tok2vec', 'attribute_ruler'])
|
||||
nlp_ru = spacy.load('ru_core_news_sm', disable=['parser', 'tok2vec', 'attribute_ruler'])
|
||||
nlp_nl = spacy.load('nl_core_news_sm', disable=['parser', 'tok2vec', 'attribute_ruler'])
|
||||
nlp_xx = spacy.load('xx_ent_wiki_sm')
|
||||
|
||||
@dataclass
|
||||
class Post:
|
||||
"""An object with fields for columns in the analysis table"""
|
||||
@@ -136,6 +149,9 @@ class Post:
|
||||
|
||||
#: Datetime (relative to UTC) that the scraped post was archived at.
|
||||
date_archived: datetime
|
||||
|
||||
#: Datetime (UTC) that the scraped post was transformed at.
|
||||
date_transformed: datetime
|
||||
|
||||
#: URL of the original post
|
||||
url: str
|
||||
@@ -149,6 +165,24 @@ class Post:
|
||||
#: Text of the original post
|
||||
content: str
|
||||
|
||||
#: Named entities detected in post
|
||||
named_entities: list = field(default_factory=list)
|
||||
|
||||
#: Any cryptocurrency addresses in post
|
||||
cryptocurrency_addresses: list = field(default_factory=list)
|
||||
|
||||
#: Hashtags in post
|
||||
hashtags: list = field(default_factory=list)
|
||||
|
||||
#: Links to any other websites
|
||||
outlinks: list = field(default_factory=list)
|
||||
|
||||
#: Detected language of post
|
||||
detected_language: str = ""
|
||||
|
||||
#: Normalized post content
|
||||
normalized_content: str = ""
|
||||
|
||||
#: The ID of the Channel that the post was forwarded or quoted from
|
||||
forwarded_from: int = None
|
||||
|
||||
@@ -156,7 +190,60 @@ class Post:
|
||||
reply_to: int = None
|
||||
|
||||
def hydrate(self):
|
||||
pass
|
||||
URL_REGEX = r"""(?i)\b((?:https?:(?:/{1,3}|[a-z0-9%])|[a-z0-9.\-]+[.](?:com|net|org|edu|gov|mil|aero|asia|biz|cat|coop|info|int|jobs|mobi|museum|name|post|pro|tel|travel|xxx|ac|ad|ae|af|ag|ai|al|am|an|ao|aq|ar|as|at|au|aw|ax|az|ba|bb|bd|be|bf|bg|bh|bi|bj|bm|bn|bo|br|bs|bt|bv|bw|by|bz|ca|cc|cd|cf|cg|ch|ci|ck|cl|cm|cn|co|cr|cs|cu|cv|cx|cy|cz|dd|de|dj|dk|dm|do|dz|ec|ee|eg|eh|er|es|et|eu|fi|fj|fk|fm|fo|fr|ga|gb|gd|ge|gf|gg|gh|gi|gl|gm|gn|gp|gq|gr|gs|gt|gu|gw|gy|hk|hm|hn|hr|ht|hu|id|ie|il|im|in|io|iq|ir|is|it|je|jm|jo|jp|ke|kg|kh|ki|km|kn|kp|kr|kw|ky|kz|la|lb|lc|li|lk|lr|ls|lt|lu|lv|ly|ma|mc|md|me|mg|mh|mk|ml|mm|mn|mo|mp|mq|mr|ms|mt|mu|mv|mw|mx|my|mz|na|nc|ne|nf|ng|ni|nl|no|np|nr|nu|nz|om|pa|pe|pf|pg|ph|pk|pl|pm|pn|pr|ps|pt|pw|py|qa|re|ro|rs|ru|rw|sa|sb|sc|sd|se|sg|sh|si|sj|Ja|sk|sl|sm|sn|so|sr|ss|st|su|sv|sx|sy|sz|tc|td|tf|tg|th|tj|tk|tl|tm|tn|to|tp|tr|tt|tv|tw|tz|ua|ug|uk|us|uy|uz|va|vc|ve|vg|vi|vn|vu|wf|ws|ye|yt|yu|za|zm|zw)/)(?:[^\s()<>{}\[\]]+|\([^\s()]*?\([^\s()]+\)[^\s()]*?\)|\([^\s]+?\))+(?:\([^\s()]*?\([^\s()]+\)[^\s()]*?\)|\([^\s]+?\)|[^\s`!()\[\]{};:\'\".,<>?«»“”‘’])|(?:(?<!@)[a-z0-9]+(?:[.\-][a-z0-9]+)*[.](?:com|net|org|edu|gov|mil|aero|asia|biz|cat|coop|info|int|jobs|mobi|museum|name|post|pro|tel|travel|xxx|ac|ad|ae|af|ag|ai|al|am|an|ao|aq|ar|as|at|au|aw|ax|az|ba|bb|bd|be|bf|bg|bh|bi|bj|bm|bn|bo|br|bs|bt|bv|bw|by|bz|ca|cc|cd|cf|cg|ch|ci|ck|cl|cm|cn|co|cr|cs|cu|cv|cx|cy|cz|dd|de|dj|dk|dm|do|dz|ec|ee|eg|eh|er|es|et|eu|fi|fj|fk|fm|fo|fr|ga|gb|gd|ge|gf|gg|gh|gi|gl|gm|gn|gp|gq|gr|gs|gt|gu|gw|gy|hk|hm|hn|hr|ht|hu|id|ie|il|im|in|io|iq|ir|is|it|je|jm|jo|jp|ke|kg|kh|ki|km|kn|kp|kr|kw|ky|kz|la|lb|lc|li|lk|lr|ls|lt|lu|lv|ly|ma|mc|md|me|mg|mh|mk|ml|mm|mn|mo|mp|mq|mr|ms|mt|mu|mv|mw|mx|my|mz|na|nc|ne|nf|ng|ni|nl|no|np|nr|nu|nz|om|pa|pe|pf|pg|ph|pk|pl|pm|pn|pr|ps|pt|pw|py|qa|re|ro|rs|ru|rw|sa|sb|sc|sd|se|sg|sh|si|sj|Ja|sk|sl|sm|sn|so|sr|ss|st|su|sv|sx|sy|sz|tc|td|tf|tg|th|tj|tk|tl|tm|tn|to|tp|tr|tt|tv|tw|tz|ua|ug|uk|us|uy|uz|va|vc|ve|vg|vi|vn|vu|wf|ws|ye|yt|yu|za|zm|zw)\b/?(?!@)))"""
|
||||
|
||||
# replace is here in order to prevent catastrophic backtracking
|
||||
urls = re.findall(URL_REGEX, self.content.replace("::::::::", ""))
|
||||
self.outlinks = urls
|
||||
|
||||
HASHTAG_REGEX = r"(?:^|\s)[##]{1}(\w+)"
|
||||
|
||||
hashtags = re.findall(HASHTAG_REGEX, self.content)
|
||||
self.hashtags = hashtags
|
||||
|
||||
# regex patterns for finding crypto addresses
|
||||
BTC_REGEX = r'\b(bc(0([ac-hj-np-z02-9]{39}|[ac-hj-np-z02-9]{59})|1[ac-hj-np-z02-9]{8,87})|[13][a-km-zA-HJ-NP-Z1-9]{25,35})\b'
|
||||
ETHER_REGEX = r'(0x[a-fA-F0-9]{40})'
|
||||
|
||||
self.cryptocurrency_addresses = [m[0] for m in re.findall(BTC_REGEX, self.content)] + re.findall(ETHER_REGEX, self.content)
|
||||
|
||||
try:
|
||||
self.detected_language = detect(self.content)
|
||||
except LangDetectException:
|
||||
self.detected_language = ""
|
||||
|
||||
self.hydrate_spacy()
|
||||
|
||||
def hydrate_spacy(self):
|
||||
ner_only = False
|
||||
|
||||
if self.detected_language == 'en':
|
||||
nlp = nlp_en
|
||||
elif self.detected_language == 'de':
|
||||
nlp = nlp_de
|
||||
elif self.detected_language == 'it':
|
||||
nlp = nlp_it
|
||||
elif self.detected_language == 'fr':
|
||||
nlp = nlp_fr
|
||||
elif self.detected_language == 'ru':
|
||||
nlp = nlp_ru
|
||||
elif self.detected_language == 'nl':
|
||||
nlp = nlp_nl
|
||||
else:
|
||||
logger.info(f"No language model for {self.detected_language}")
|
||||
nlp = nlp_xx
|
||||
ner_only = True
|
||||
|
||||
doc = nlp(self.content)
|
||||
|
||||
if not ner_only:
|
||||
punctuation = ['?',':','!',',','.',';','|','(',')','--','#','=','+']
|
||||
tokens = [t.lemma_ for t in doc if not t.is_stop and t.lemma_ not in punctuation]
|
||||
self.normalized_content = ' '.join(tokens)
|
||||
else:
|
||||
self.normalized_content = ''
|
||||
|
||||
self.named_entities = [{'text': ent.text, 'type': ent.label_} for ent in doc.ents]
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -247,7 +334,7 @@ raw_posts_table = Table('raw_posts', mapper_registry.metadata,
|
||||
Column('scraper', String),
|
||||
Column('platform', String),
|
||||
Column('channel', Integer, ForeignKey('channels.id'), index=True),
|
||||
Column('platform_id', String),
|
||||
Column('platform_id', String, index=True),
|
||||
Column('date', DateTime, index=True),
|
||||
Column('raw_data', String),
|
||||
Column('date_archived', DateTime, index=True),
|
||||
@@ -282,19 +369,26 @@ post_table = Table('posts', mapper_registry.metadata,
|
||||
Column('id', Integer, primary_key=True,
|
||||
autoincrement=True),
|
||||
Column('raw_id', Integer, ForeignKey('raw_posts.id'), index=True),
|
||||
Column('platform_id', Integer),
|
||||
Column('platform_id', Integer, index=True),
|
||||
Column('scraper', String),
|
||||
Column('transformer', String),
|
||||
Column('platform', String),
|
||||
Column('channel', Integer, ForeignKey('channels.id'), index=True),
|
||||
Column('date', DateTime, index=True),
|
||||
Column('date_archived', DateTime, index=True),
|
||||
Column('date_transformed', DateTime, index=True),
|
||||
Column('url', String),
|
||||
Column('author_id', String),
|
||||
Column('author_username', String),
|
||||
Column('content', String),
|
||||
Column('forwarded_from', Integer, ForeignKey('channels.id'), index=True),
|
||||
Column('reply_to', Integer, ForeignKey('posts.id'), index=True)
|
||||
Column('reply_to', Integer, ForeignKey('posts.id', ondelete="CASCADE"), index=True),
|
||||
Column('named_entities', JSON),
|
||||
Column('cryptocurrency_addresses', JSON),
|
||||
Column('hashtags', JSON),
|
||||
Column('outlinks', JSON),
|
||||
Column('detected_language', String),
|
||||
Column('normalized_content', String)
|
||||
)
|
||||
|
||||
media_table = Table('media', mapper_registry.metadata,
|
||||
|
||||
@@ -6,7 +6,6 @@ from .gettr import GettrScraper
|
||||
from .instagram import InstagramScraper
|
||||
from .odysee import OdyseeScraper
|
||||
from .rumble import RumbleScraper
|
||||
from .telegram_snscrape import TelegramSnscrapeScraper
|
||||
from .telegram_telethon import TelegramTelethonScraper
|
||||
from .twitter import TwitterScraper
|
||||
from .vkontakte import VkontakteScraper
|
||||
|
||||
@@ -1,70 +0,0 @@
|
||||
from typing import Generator
|
||||
from datetime import datetime, timezone
|
||||
import json
|
||||
import snscrape.modules
|
||||
from loguru import logger
|
||||
|
||||
from cisticola.base import Channel, ScraperResult, RawChannelInfo
|
||||
from cisticola.scraper.base import Scraper
|
||||
|
||||
class TelegramSnscrapeScraper(Scraper):
|
||||
"""An implementation of a Scraper for Telegram, using snscrape library"""
|
||||
__version__ = "TelegramSnscrapeScraper 0.0.0"
|
||||
|
||||
def can_handle(self, channel):
|
||||
if channel.platform == "Telegram" and channel.public and not channel.chat:
|
||||
return True
|
||||
|
||||
@logger.catch
|
||||
def get_posts(self, channel: Channel, since: ScraperResult = None, archive_media: bool = True) -> Generator[ScraperResult, None, None]:
|
||||
scr = snscrape.modules.telegram.TelegramChannelScraper(
|
||||
channel.screenname)
|
||||
|
||||
g = scr.get_items()
|
||||
|
||||
for post in g:
|
||||
if since is not None and post.date.replace(tzinfo=timezone.utc) <= since.date.replace(tzinfo=timezone.utc):
|
||||
logger.info(f'Timestamp of post {post} is earlier than the previous archived timestamp {post.date.replace(tzinfo=timezone.utc)}')
|
||||
break
|
||||
|
||||
logger.info(f'Processing post {post}')
|
||||
|
||||
archived_urls = {}
|
||||
|
||||
for image_url in post.images:
|
||||
archived_urls[image_url] = None
|
||||
|
||||
for video_url in post.videos:
|
||||
archived_urls[video_url] = None
|
||||
|
||||
if archive_media:
|
||||
for url in archived_urls:
|
||||
media_blob, content_type, key = self.url_to_blob(url)
|
||||
archived_url = self.archive_blob(media_blob, content_type, key)
|
||||
archived_urls[url] = archived_url
|
||||
|
||||
yield ScraperResult(
|
||||
scraper=self.__version__,
|
||||
platform="Telegram",
|
||||
channel=channel.id,
|
||||
platform_id=post.url,
|
||||
date=post.date,
|
||||
date_archived=datetime.now(timezone.utc),
|
||||
raw_data=post.json(),
|
||||
archived_urls=archived_urls,
|
||||
media_archived=datetime.now(timezone.utc) if archive_media else None
|
||||
)
|
||||
|
||||
@logger.catch
|
||||
def get_profile(self, channel: Channel) -> RawChannelInfo:
|
||||
|
||||
scr = snscrape.modules.telegram.TelegramChannelScraper(
|
||||
channel.screenname)
|
||||
|
||||
profile = scr._get_entity().__dict__
|
||||
|
||||
return RawChannelInfo(scraper=self.__version__,
|
||||
platform=channel.platform,
|
||||
channel=channel.id,
|
||||
raw_data=json.dumps(profile),
|
||||
date_archived=datetime.now(timezone.utc))
|
||||
@@ -1,3 +1,4 @@
|
||||
from .base import ETLController
|
||||
from .twitter import TwitterTransformer
|
||||
from .bitchute import BitchuteTransformer
|
||||
from .bitchute import BitchuteTransformer
|
||||
from .telegram_telethon import TelegramTelethonTransformer
|
||||
@@ -2,7 +2,9 @@ from typing import List, Generator, Union, Callable
|
||||
from loguru import logger
|
||||
from sqlalchemy.orm import sessionmaker, make_transient
|
||||
from sqlalchemy.engine.base import Engine
|
||||
from sqlalchemy.sql.expression import func
|
||||
from collections import defaultdict
|
||||
from datetime import datetime
|
||||
|
||||
from cisticola.base import ScraperResult, Post, Media, Channel, mapper_registry
|
||||
|
||||
@@ -68,6 +70,10 @@ class ETLController:
|
||||
|
||||
self.transformers.append(transformer)
|
||||
|
||||
def register_transformers(self, transformers):
|
||||
for t in transformers:
|
||||
self.register_transformer(t)
|
||||
|
||||
def connect_to_db(self, engine: Engine):
|
||||
"""Connects the ETLController to a SQLAlchemy engine.
|
||||
|
||||
@@ -90,10 +96,11 @@ class ETLController:
|
||||
|
||||
# This is using some adhoc unique constraints that might be worth formalizing at some point
|
||||
if type(obj) == Channel:
|
||||
instance = session.query(Channel).filter_by(url=obj.url, platform_id=obj.platform_id, platform=obj.platform).first()
|
||||
instance = session.query(Channel).filter_by(url=obj.url, platform_id=str(obj.platform_id), platform=obj.platform).first()
|
||||
|
||||
elif type(obj) == Post:
|
||||
instance = session.query(Post).filter_by(platform=obj.platform, platform_id=obj.platform_id).first()
|
||||
instance = None
|
||||
# instance = session.query(Post).filter_by(platform=obj.platform, platform_id=obj.platform_id).first()
|
||||
|
||||
elif issubclass(type(obj), Media):
|
||||
instance = session.query(type(obj)).filter_by(original_url=obj.original_url, post=obj.post).first()
|
||||
@@ -125,9 +132,9 @@ class ETLController:
|
||||
if hydrate:
|
||||
obj.hydrate()
|
||||
|
||||
logger.info(f"Inserting new object {obj}")
|
||||
session.add(obj)
|
||||
session.flush()
|
||||
logger.trace(f"Inserted new object {obj}")
|
||||
|
||||
return obj
|
||||
|
||||
@logger.catch(reraise=True)
|
||||
@@ -146,21 +153,24 @@ class ETLController:
|
||||
logger.error("No DB session")
|
||||
return
|
||||
|
||||
session = self.session()
|
||||
|
||||
for result in results:
|
||||
for transformer in self.transformers:
|
||||
handled = False
|
||||
if result.scraper is not None and result.platform is not None:
|
||||
for transformer in self.transformers:
|
||||
handled = False
|
||||
|
||||
if transformer.can_handle(result):
|
||||
logger.info(f"{transformer} is handling result {result}")
|
||||
handled = True
|
||||
session = self.session()
|
||||
if transformer.can_handle(result):
|
||||
logger.trace(f"{transformer} is handling result {result.id} ({result.date})")
|
||||
handled = True
|
||||
|
||||
transformer.transform(result, lambda obj: self.insert_or_select(obj, session, hydrate))
|
||||
session.commit()
|
||||
break
|
||||
transformer.transform(result, lambda obj: self.insert_or_select(obj, session, hydrate), session)
|
||||
|
||||
if handled == False:
|
||||
logger.warning(f"No Transformer could handle {result}")
|
||||
session.commit()
|
||||
break
|
||||
|
||||
if handled == False:
|
||||
logger.warning(f"No Transformer could handle ID {result.id} with platform {result.platform} ({result.date})")
|
||||
|
||||
@logger.catch(reraise=True)
|
||||
def transform_all_untransformed(self, hydrate: bool = True):
|
||||
@@ -178,12 +188,33 @@ class ETLController:
|
||||
return
|
||||
|
||||
session = self.session()
|
||||
untransformed = (
|
||||
session.query(ScraperResult)
|
||||
|
||||
BATCH_SIZE = 50000
|
||||
offset = 0
|
||||
batch = []
|
||||
|
||||
query = (session.query(ScraperResult)
|
||||
# .filter_by(platform="Telegram")
|
||||
# .filter(ScraperResult.raw_data.notlike("%MessageService%"))
|
||||
.join(Post, isouter=True)
|
||||
.where(Post.raw_id == None)
|
||||
.all()
|
||||
# .order_by(func.random())
|
||||
.order_by(ScraperResult.date.asc())
|
||||
)
|
||||
logger.info(f"Found {len(untransformed)} items to ETL")
|
||||
|
||||
self.transform_results(untransformed, hydrate=hydrate)
|
||||
while len(batch) > 0 or offset == 0:
|
||||
logger.info(f"Fetching untransformed batch of {BATCH_SIZE}, offset {offset}")
|
||||
|
||||
batch = query.slice(offset, offset + BATCH_SIZE).all()
|
||||
# untransformed = (
|
||||
|
||||
# .limit(BATCH_SIZE)
|
||||
# .offset(offset)
|
||||
# .all()
|
||||
# )
|
||||
|
||||
offset += BATCH_SIZE
|
||||
|
||||
logger.info(f"Found {len(batch)} items to ETL ({offset} already processed)")
|
||||
|
||||
self.transform_results(batch, hydrate=hydrate)
|
||||
|
||||
167
cisticola/transformer/telegram_telethon.py
Normal file
167
cisticola/transformer/telegram_telethon.py
Normal file
@@ -0,0 +1,167 @@
|
||||
import json
|
||||
from loguru import logger
|
||||
from typing import Generator, Union, Callable
|
||||
import dateutil.parser
|
||||
from bs4 import BeautifulSoup
|
||||
from psycopg2 import DatabaseError
|
||||
import requests
|
||||
import time
|
||||
from telethon.sync import TelegramClient
|
||||
from telethon.errors.rpcerrorlist import ChannelPrivateError, ChannelInvalidError
|
||||
import os
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from cisticola.transformer.base import Transformer
|
||||
from cisticola.base import ScraperResult, Post, Image, Video, Media, Channel
|
||||
|
||||
|
||||
class TelegramTelethonTransformer(Transformer):
|
||||
__version__ = 'TelegramTelethonTransformer 0.0.1'
|
||||
|
||||
bad_channels = {}
|
||||
|
||||
def can_handle(self, data: ScraperResult) -> bool:
|
||||
scraper = data.scraper.split(' ')
|
||||
if scraper[0] == "TelegramTelethonScraper":
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def get_screenname_from_id(self, channel_id):
|
||||
api_id = os.environ['TELEGRAM_API_ID']
|
||||
api_hash = os.environ['TELEGRAM_API_HASH']
|
||||
|
||||
try:
|
||||
with TelegramClient("transform.session", api_id, api_hash) as client:
|
||||
data = client.get_entity(channel_id)
|
||||
|
||||
return (data.username, data.title, "")
|
||||
except ChannelPrivateError:
|
||||
logger.info("ChannelPrivateError")
|
||||
return ("", "", "ChannelPrivateError")
|
||||
except ChannelInvalidError:
|
||||
logger.info("ChannelInvalidError")
|
||||
return ("", "", "ChannelInvalidError")
|
||||
except ValueError:
|
||||
logger.info("ValueError")
|
||||
return ("", "", "ValueError")
|
||||
|
||||
def get_name_from_web_interface(self, orig_screenname, id):
|
||||
url = "https://t.me/s/" + orig_screenname + "/" + str(id)
|
||||
|
||||
# this doesn't work for chat channels
|
||||
if orig_screenname in self.bad_channels:
|
||||
logger.debug(f"Skipping screenname because it is not accessible for channel {orig_screenname}")
|
||||
return ""
|
||||
|
||||
logger.info(f"Finding channel from URL {url}")
|
||||
r = requests.get(url)
|
||||
|
||||
if r.url != url:
|
||||
self.bad_channels[orig_screenname] = True
|
||||
return ""
|
||||
|
||||
soup = BeautifulSoup(r.content)
|
||||
post = soup.findAll("div", {"data-post" : orig_screenname + "/" + str(id)})
|
||||
name = ""
|
||||
|
||||
# multiple posts can be combined into one result in the web interface
|
||||
decrement = 0
|
||||
while len(post) == 0:
|
||||
decrement += 1
|
||||
if decrement > 8:
|
||||
break
|
||||
|
||||
logger.info(f"Could not find post from {url}, looking for id {id - decrement}")
|
||||
post = soup.findAll("div", {"data-post" : orig_screenname + "/" + str(id - decrement)})
|
||||
|
||||
if len(post) == 0:
|
||||
logger.warning(f"Could not find post from {url}")
|
||||
else:
|
||||
fwd_tag = post[0].findAll("a", {"class", "tgme_widget_message_forwarded_from_name"})
|
||||
|
||||
if len(fwd_tag) == 0:
|
||||
fwd_tag = post[0].findAll("span", {"class", "tgme_widget_message_forwarded_from_name"})
|
||||
|
||||
if len(fwd_tag) >= 1:
|
||||
name = fwd_tag[0].text
|
||||
|
||||
return name
|
||||
|
||||
def transform(self, data: ScraperResult, insert: Callable, session) -> Generator[Union[Post, Channel, Media], None, None]:
|
||||
raw = json.loads(data.raw_data)
|
||||
|
||||
if raw['_'] != 'Message':
|
||||
logger.warning(f"Cannot convert type {raw['_']} to post")
|
||||
return
|
||||
|
||||
fwd_from = None
|
||||
|
||||
if raw['fwd_from'] and raw['fwd_from']['from_id'] and 'channel_id' in raw['fwd_from']['from_id']:
|
||||
channel = session.query(Channel).filter_by(platform_id=str(raw['fwd_from']['from_id']['channel_id'])).first()
|
||||
|
||||
if channel is None:
|
||||
(screenname, name, notes) = self.get_screenname_from_id(raw['fwd_from']['from_id']['channel_id'])
|
||||
|
||||
if name == "":
|
||||
logger.info("Trying fallback web interface")
|
||||
orig_channel = session.query(Channel).filter_by(id=data.channel).first()
|
||||
if orig_channel.screenname is not None:
|
||||
name = self.get_name_from_web_interface(orig_channel.screenname, raw['id'])
|
||||
|
||||
channel = Channel(
|
||||
name=name,
|
||||
platform_id=raw['fwd_from']['from_id']['channel_id'],
|
||||
platform=data.platform,
|
||||
url="https://t.me/s/" + screenname if screenname is not None else "",
|
||||
screenname=screenname,
|
||||
category='forwarded',
|
||||
source=self.__version__,
|
||||
notes=notes
|
||||
)
|
||||
|
||||
channel = insert(channel)
|
||||
logger.info(f"Added {channel}")
|
||||
|
||||
fwd_from = channel.id
|
||||
|
||||
reply_to = None
|
||||
if raw['reply_to']:
|
||||
reply_to_id = raw['reply_to']['reply_to_msg_id']
|
||||
post = session.query(Post).filter_by(channel=data.channel, platform_id=reply_to_id).first()
|
||||
if post is None:
|
||||
reply_to = -1
|
||||
else:
|
||||
reply_to = post.id
|
||||
|
||||
transformed = Post(
|
||||
raw_id = data.id,
|
||||
platform_id = raw['id'],
|
||||
scraper = data.scraper,
|
||||
transformer=self.__version__,
|
||||
platform=data.platform,
|
||||
channel=data.channel,
|
||||
date=dateutil.parser.parse(raw['date']),
|
||||
date_archived=data.date_archived,
|
||||
date_transformed=datetime.now(timezone.utc),
|
||||
url="",
|
||||
content=raw['message'],
|
||||
author_id=raw['post_author'],
|
||||
author_username="",
|
||||
forwarded_from=fwd_from,
|
||||
reply_to=reply_to
|
||||
)
|
||||
|
||||
transformed = insert(transformed)
|
||||
|
||||
# for k in data.archived_urls:
|
||||
# if data.archived_urls[k]:
|
||||
# archived_url = data.archived_urls[k]
|
||||
# ext = archived_url.split('.')[-1]
|
||||
|
||||
# if ext == 'mp4' or ext == 'mov' or ext == 'avi' or ext =='mkv':
|
||||
# insert(Video(url=archived_url, post=transformed.id, raw_id=data.id, original_url=k))
|
||||
# else:
|
||||
# insert(Image(url=archived_url, post=transformed.id, raw_id=data.id, original_url=k))
|
||||
|
||||
|
||||
@@ -1,138 +0,0 @@
|
||||
import sys
|
||||
|
||||
from sqlalchemy import create_engine
|
||||
from loguru import logger
|
||||
|
||||
from cisticola.base import Channel
|
||||
from cisticola.scraper import (
|
||||
ScraperController,
|
||||
TelegramSnscrapeScraper)
|
||||
|
||||
logger.remove()
|
||||
logger.add(sys.stderr, level="INFO")
|
||||
logger.add("../russian_telegram_ingest.log")
|
||||
|
||||
test_channels = [
|
||||
Channel(
|
||||
id=0,
|
||||
name="QAnon Россия",
|
||||
platform_id=-1001319637748,
|
||||
category="Qanon",
|
||||
followers=94048,
|
||||
platform="Telegram",
|
||||
url="https://t.me/qanonrus",
|
||||
screenname="qanonrus",
|
||||
country="RU",
|
||||
influencer=None,
|
||||
public=True,
|
||||
chat=False,
|
||||
notes=""),
|
||||
Channel(
|
||||
id=1,
|
||||
name="The Great Awakening | Q",
|
||||
platform_id=-1001325597521,
|
||||
category="Qanon",
|
||||
followers=5715,
|
||||
platform="Telegram",
|
||||
url="https://t.me/greatawakin",
|
||||
screenname="greatawakin",
|
||||
country="RU",
|
||||
influencer=None,
|
||||
public=True,
|
||||
chat=False,
|
||||
notes=""),
|
||||
Channel(
|
||||
id=2,
|
||||
name="Великое Пробуждение",
|
||||
platform_id=-1001285898079,
|
||||
category="Qanon",
|
||||
followers=5861,
|
||||
platform="Telegram",
|
||||
url="https://t.me/greatawakeningrus",
|
||||
screenname="greatawakeningrus",
|
||||
country="RU",
|
||||
influencer=None,
|
||||
public=True,
|
||||
chat=False,
|
||||
notes=""),
|
||||
Channel(
|
||||
id=3,
|
||||
name="T🕊Редакция Президент Гордон🕊",
|
||||
platform_id=-1001101170442,
|
||||
category="Qanon",
|
||||
followers=5743,
|
||||
platform="Telegram",
|
||||
url="https://t.me/prezidentgordonteam",
|
||||
screenname="prezidentgordonteam",
|
||||
country="RU",
|
||||
influencer=None,
|
||||
public=True,
|
||||
chat=False,
|
||||
notes=""),
|
||||
Channel(
|
||||
id=4,
|
||||
name="ПРОЕКТ АВРОРА",
|
||||
platform_id=-1001279171101,
|
||||
category="Qanon",
|
||||
followers=5930,
|
||||
platform="Telegram",
|
||||
url="https://t.me/project_aurora",
|
||||
screenname="project_aurora",
|
||||
country="RU",
|
||||
influencer=None,
|
||||
public=True,
|
||||
chat=False,
|
||||
notes=""),
|
||||
Channel(
|
||||
id=5,
|
||||
name="Сон Разума",
|
||||
platform_id=-1001202338312,
|
||||
category="Qanon",
|
||||
followers=27099,
|
||||
platform="Telegram",
|
||||
url="https://t.me/error_288",
|
||||
screenname="error_288",
|
||||
country="RU",
|
||||
influencer=None,
|
||||
public=True,
|
||||
chat=False,
|
||||
notes=""),
|
||||
Channel(
|
||||
id=6,
|
||||
name="Пробуждающий Мир - официальный канал",
|
||||
platform_id=-1001492521207,
|
||||
category="Qanon",
|
||||
followers=19097,
|
||||
platform="Telegram",
|
||||
url="https://t.me/promirru",
|
||||
screenname="promirru",
|
||||
country="RU",
|
||||
influencer=None,
|
||||
public=True,
|
||||
chat=False,
|
||||
notes=""),
|
||||
Channel(
|
||||
id=7,
|
||||
name="ЦЕЛЬНОЗОР",
|
||||
platform_id=-1001642737506,
|
||||
category="Qanon",
|
||||
followers=13654,
|
||||
platform="Telegram",
|
||||
url="https://t.me/tselnozor",
|
||||
screenname="tselnozor",
|
||||
country="RU",
|
||||
influencer=None,
|
||||
public=True,
|
||||
chat=False,
|
||||
notes=""),]
|
||||
|
||||
controller = ScraperController()
|
||||
|
||||
telegram = TelegramSnscrapeScraper()
|
||||
controller.register_scraper(telegram)
|
||||
|
||||
engine = create_engine('sqlite:///russian_telegram.db')
|
||||
controller.connect_to_db(engine)
|
||||
|
||||
controller.scrape_channels(test_channels, archive_media = False)
|
||||
|
||||
Reference in New Issue
Block a user