formatted with black, added pre-commit hook, pegged typing_extensions package version to fix spaCy issue

This commit is contained in:
Tristan Lee
2023-08-04 14:51:00 -05:00
parent 070ee3391d
commit fab65a5d67
25 changed files with 3043 additions and 2176 deletions

10
.github/workflows/black.yml vendored Normal file
View File

@@ -0,0 +1,10 @@
name: Lint
on: [push]
jobs:
lint:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: psf/black@stable

6
.pre-commit-config.yaml Normal file
View File

@@ -0,0 +1,6 @@
repos:
- repo: https://github.com/psf/black
rev: 22.3.0
hooks:
- id: black
language_version: python3.9

View File

@@ -24,6 +24,8 @@ ratelimit = "*"
pytz = "*"
langdetect = "*"
spacy = "==3.2.4"
# Temporary fix for https://github.com/explosion/spaCy/issues/12659
typing_extensions = "==4.4.0"
ocrd-pyexiftool = "*"
filelock = "*"
telethon = "*"
@@ -38,6 +40,7 @@ pytest-metadata = "*"
black = "*"
Sphinx = "*"
sphinx-rtd-theme = "*"
pre-commit = "*"
[requires]
python_version = "3.9"

2318
Pipfile.lock generated

File diff suppressed because it is too large Load Diff

80
app.py
View File

@@ -10,7 +10,6 @@ import sys
from cisticola.base import mapper_registry
from cisticola.scraper import (
ScraperController,
# VkontakteScraper,
TelegramTelethonScraper,
GettrScraper,
BitchuteScraper,
@@ -22,11 +21,11 @@ from cisticola.transformer import (
GettrTransformer,
RumbleTransformer,
BitchuteTransformer,
# VkontakteTransformer,
)
from sync_with_gsheet import sync_channels
def get_db_session():
engine = create_engine(os.environ["DB"])
@@ -52,12 +51,14 @@ def get_scraper_controller(args):
TelegramTelethonScraper(telethon_session_name=telethon_session_name),
GettrScraper(),
BitchuteScraper(),
RumbleScraper()]
RumbleScraper(),
]
controller.register_scrapers(scrapers)
return controller
def get_transformer_controller(args):
engine = create_engine(os.environ["DB"])
@@ -73,7 +74,8 @@ def get_transformer_controller(args):
TelegramTelethonTransformer(telethon_session_name=telethon_session_name),
GettrTransformer(),
BitchuteTransformer(),
RumbleTransformer()]
RumbleTransformer(),
]
controller.register_transformers(transformers)
@@ -86,12 +88,14 @@ def scrape_channels(args):
controller = get_scraper_controller(args)
controller.scrape_all_channels()
def scrape_channels_old(args):
logger.info(f"Scraping old posts from channels")
controller = get_scraper_controller(args)
controller.scrape_all_channels(fetch_old=True)
def scrape_channel_info(args):
logger.info(f"Scraping channel info")
@@ -109,6 +113,7 @@ def archive_media(args):
else:
controller.archive_unarchived_media()
def transform(args):
logger.info(f"Transforming untransformed posts")
@@ -121,6 +126,7 @@ def transform(args):
controller.transform_all_untransformed(min_date=min_date)
def transform_info(args):
logger.info(f"Transforming untransformed channel info")
@@ -129,12 +135,14 @@ def transform_info(args):
# sync_channels(args, get_db_session())
def transform_media(args):
logger.info(f"Transforming untransformed channel media")
controller = get_transformer_controller(args)
controller.transform_all_untransformed_media()
def init_db():
engine = create_engine(os.environ["DB"])
mapper_registry.metadata.create_all(bind=engine)
@@ -162,29 +170,77 @@ if __name__ == "__main__":
if args.command == "init-db":
init_db()
elif args.command == "sync-channels":
logger.add("logs/sync-channels.log", level="DEBUG", rotation="100 MB", retention="2 weeks", compression="zip")
logger.add(
"logs/sync-channels.log",
level="DEBUG",
rotation="100 MB",
retention="2 weeks",
compression="zip",
)
sync_channels(args, get_db_session())
elif args.command == "scrape-channels":
logger.add("logs/scrape-channels.log", level="DEBUG", rotation="100 MB", retention="2 weeks", compression="zip")
logger.add(
"logs/scrape-channels.log",
level="DEBUG",
rotation="100 MB",
retention="2 weeks",
compression="zip",
)
scrape_channels(args)
elif args.command == "scrape-channels-old":
logger.add("logs/scrape-channels-old.log", level="DEBUG", rotation="100 MB", retention="2 weeks", compression="zip")
logger.add(
"logs/scrape-channels-old.log",
level="DEBUG",
rotation="100 MB",
retention="2 weeks",
compression="zip",
)
scrape_channels_old(args)
elif args.command == "archive-media":
logger.add("logs/archive-media.log", level="DEBUG", rotation="100 MB", retention="2 weeks", compression="zip")
logger.add(
"logs/archive-media.log",
level="DEBUG",
rotation="100 MB",
retention="2 weeks",
compression="zip",
)
archive_media(args)
elif args.command == "channel-info":
logger.add("logs/channel-info.log", level="DEBUG", rotation="100 MB", retention="2 weeks", compression="zip")
logger.add(
"logs/channel-info.log",
level="DEBUG",
rotation="100 MB",
retention="2 weeks",
compression="zip",
)
scrape_channel_info(args)
elif args.command == "transform":
logger.add("logs/transform.log", level="DEBUG", rotation="100 MB", retention="2 weeks", compression="zip")
logger.add(
"logs/transform.log",
level="DEBUG",
rotation="100 MB",
retention="2 weeks",
compression="zip",
)
logger.add("logs/transform_trace.log", level="TRACE", retention="7 days")
transform(args)
elif args.command == "transform-info":
logger.add("logs/transform-info.log", level="DEBUG", rotation="100 MB", retention="2 weeks", compression="zip")
logger.add(
"logs/transform-info.log",
level="DEBUG",
rotation="100 MB",
retention="2 weeks",
compression="zip",
)
transform_info(args)
elif args.command == "transform-media":
logger.add("logs/transform-media.log", level="DEBUG", rotation="100 MB", retention="2 weeks", compression="zip")
logger.add(
"logs/transform-media.log",
level="DEBUG",
rotation="100 MB",
retention="2 weeks",
compression="zip",
)
transform_media(args)
else:
logger.error(f"Unrecognized command {args.command}")

View File

@@ -6,7 +6,17 @@ import json
import io
from sqlalchemy.orm import registry
from sqlalchemy import Table, Column, Integer, String, JSON, DateTime, ForeignKey, Boolean, Index
from sqlalchemy import (
Table,
Column,
Integer,
String,
JSON,
DateTime,
ForeignKey,
Boolean,
Index,
)
from sqlalchemy.dialects.postgresql import JSONB
import pytesseract
import PIL
@@ -22,10 +32,10 @@ from .utils import make_request
# Disable decompression bomb check
PIL.Image.MAX_IMAGE_PIXELS = 1024 * 1024 * 256
@dataclass
class ScraperResult:
"""Minimally processed set of information from a scraper about one post
"""
"""Minimally processed set of information from a scraper about one post"""
#: String specifying name and version of scraper used to generate result, e.g. ``"TwitterScraper 0.0.1"``.
scraper: str
@@ -54,10 +64,10 @@ class ScraperResult:
#: What date was the media archived? (None if not archived)
media_archived: datetime
@dataclass
class Channel:
"""Information about a specific channel to be scraped.
"""
"""Information about a specific channel to be scraped."""
#: Name of channel (different from username because it can be non-unique and contain emojis), e.g. ``T🕊Редакция Президент Гордон🕊"``.
name: str
@@ -98,10 +108,10 @@ class Channel:
def hydrate(self):
pass
@dataclass
class RawChannelInfo:
"""Minimally processed set of information from a scraper about one channel
"""
"""Minimally processed set of information from a scraper about one channel"""
#: String specifying name and version of scraper used to generate result, e.g. ``"TwitterScraper 0.0.1"``.
scraper: str
@@ -118,10 +128,10 @@ class RawChannelInfo:
#: Datetime (relative to UTC) that the scraped post was archived at.
date_archived: datetime
@dataclass
class ChannelInfo:
"""A processed set of information about a channel.
"""
"""A processed set of information about a channel."""
# Foreign key from the raw_channel_info table
raw_channel_info_id: int
@@ -161,13 +171,15 @@ class ChannelInfo:
def hydrate(self):
pass
nlp_en = spacy.load('en_core_web_sm', disable=['parser', 'tok2vec', 'attribute_ruler'])
nlp_de = spacy.load('de_core_news_sm', disable=['parser', 'tok2vec', 'attribute_ruler'])
nlp_it = spacy.load('it_core_news_sm', disable=['parser', 'tok2vec', 'attribute_ruler'])
nlp_fr = spacy.load('fr_core_news_sm', disable=['parser', 'tok2vec', 'attribute_ruler'])
nlp_ru = spacy.load('ru_core_news_sm', disable=['parser', 'tok2vec', 'attribute_ruler'])
nlp_nl = spacy.load('nl_core_news_sm', disable=['parser', 'tok2vec', 'attribute_ruler'])
nlp_xx = spacy.load('xx_ent_wiki_sm')
nlp_en = spacy.load("en_core_web_sm", disable=["parser", "tok2vec", "attribute_ruler"])
nlp_de = spacy.load("de_core_news_sm", disable=["parser", "tok2vec", "attribute_ruler"])
nlp_it = spacy.load("it_core_news_sm", disable=["parser", "tok2vec", "attribute_ruler"])
nlp_fr = spacy.load("fr_core_news_sm", disable=["parser", "tok2vec", "attribute_ruler"])
nlp_ru = spacy.load("ru_core_news_sm", disable=["parser", "tok2vec", "attribute_ruler"])
nlp_nl = spacy.load("nl_core_news_sm", disable=["parser", "tok2vec", "attribute_ruler"])
nlp_xx = spacy.load("xx_ent_wiki_sm")
@dataclass
class Post:
@@ -258,7 +270,9 @@ class Post:
URL_REGEX = r"""(?i)\b((?:https?:(?:/{1,3}|[a-z0-9%])|[a-z0-9.\-]+[.](?:com|net|org|edu|gov|mil|aero|asia|biz|cat|coop|info|int|jobs|mobi|museum|name|post|pro|tel|travel|xxx|ac|ad|ae|af|ag|ai|al|am|an|ao|aq|ar|as|at|au|aw|ax|az|ba|bb|bd|be|bf|bg|bh|bi|bj|bm|bn|bo|br|bs|bt|bv|bw|by|bz|ca|cc|cd|cf|cg|ch|ci|ck|cl|cm|cn|co|cr|cs|cu|cv|cx|cy|cz|dd|de|dj|dk|dm|do|dz|ec|ee|eg|eh|er|es|et|eu|fi|fj|fk|fm|fo|fr|ga|gb|gd|ge|gf|gg|gh|gi|gl|gm|gn|gp|gq|gr|gs|gt|gu|gw|gy|hk|hm|hn|hr|ht|hu|id|ie|il|im|in|io|iq|ir|is|it|je|jm|jo|jp|ke|kg|kh|ki|km|kn|kp|kr|kw|ky|kz|la|lb|lc|li|lk|lr|ls|lt|lu|lv|ly|ma|mc|md|me|mg|mh|mk|ml|mm|mn|mo|mp|mq|mr|ms|mt|mu|mv|mw|mx|my|mz|na|nc|ne|nf|ng|ni|nl|no|np|nr|nu|nz|om|pa|pe|pf|pg|ph|pk|pl|pm|pn|pr|ps|pt|pw|py|qa|re|ro|rs|ru|rw|sa|sb|sc|sd|se|sg|sh|si|sj|Ja|sk|sl|sm|sn|so|sr|ss|st|su|sv|sx|sy|sz|tc|td|tf|tg|th|tj|tk|tl|tm|tn|to|tp|tr|tt|tv|tw|tz|ua|ug|uk|us|uy|uz|va|vc|ve|vg|vi|vn|vu|wf|ws|ye|yt|yu|za|zm|zw)/)(?:[^\s()<>{}\[\]]+|\([^\s()]*?\([^\s()]+\)[^\s()]*?\)|\([^\s]+?\))+(?:\([^\s()]*?\([^\s()]+\)[^\s()]*?\)|\([^\s]+?\)|[^\s`!()\[\]{};:\'\".,<>?«»“”‘’])|(?:(?<!@)[a-z0-9]+(?:[.\-][a-z0-9]+)*[.](?:com|net|org|edu|gov|mil|aero|asia|biz|cat|coop|info|int|jobs|mobi|museum|name|post|pro|tel|travel|xxx|ac|ad|ae|af|ag|ai|al|am|an|ao|aq|ar|as|at|au|aw|ax|az|ba|bb|bd|be|bf|bg|bh|bi|bj|bm|bn|bo|br|bs|bt|bv|bw|by|bz|ca|cc|cd|cf|cg|ch|ci|ck|cl|cm|cn|co|cr|cs|cu|cv|cx|cy|cz|dd|de|dj|dk|dm|do|dz|ec|ee|eg|eh|er|es|et|eu|fi|fj|fk|fm|fo|fr|ga|gb|gd|ge|gf|gg|gh|gi|gl|gm|gn|gp|gq|gr|gs|gt|gu|gw|gy|hk|hm|hn|hr|ht|hu|id|ie|il|im|in|io|iq|ir|is|it|je|jm|jo|jp|ke|kg|kh|ki|km|kn|kp|kr|kw|ky|kz|la|lb|lc|li|lk|lr|ls|lt|lu|lv|ly|ma|mc|md|me|mg|mh|mk|ml|mm|mn|mo|mp|mq|mr|ms|mt|mu|mv|mw|mx|my|mz|na|nc|ne|nf|ng|ni|nl|no|np|nr|nu|nz|om|pa|pe|pf|pg|ph|pk|pl|pm|pn|pr|ps|pt|pw|py|qa|re|ro|rs|ru|rw|sa|sb|sc|sd|se|sg|sh|si|sj|Ja|sk|sl|sm|sn|so|sr|ss|st|su|sv|sx|sy|sz|tc|td|tf|tg|th|tj|tk|tl|tm|tn|to|tp|tr|tt|tv|tw|tz|ua|ug|uk|us|uy|uz|va|vc|ve|vg|vi|vn|vu|wf|ws|ye|yt|yu|za|zm|zw)\b/?(?!@)))"""
# replace is here in order to prevent catastrophic backtracking
urls = re.findall(URL_REGEX, self.content.replace("::::::::", "").replace("........", ""))
urls = re.findall(
URL_REGEX, self.content.replace("::::::::", "").replace("........", "")
)
self.outlinks += urls
self.outlinks = list(set(outlink for outlink in self.outlinks))
@@ -269,10 +283,12 @@ class Post:
self.hashtags = list(set(hashtag.lower() for hashtag in self.hashtags))
# regex patterns for finding crypto addresses
BTC_REGEX = r'\b(bc(0([ac-hj-np-z02-9]{39}|[ac-hj-np-z02-9]{59})|1[ac-hj-np-z02-9]{8,87})|[13][a-km-zA-HJ-NP-Z1-9]{25,35})\b'
ETHER_REGEX = r'(0x[a-fA-F0-9]{40})'
BTC_REGEX = r"\b(bc(0([ac-hj-np-z02-9]{39}|[ac-hj-np-z02-9]{59})|1[ac-hj-np-z02-9]{8,87})|[13][a-km-zA-HJ-NP-Z1-9]{25,35})\b"
ETHER_REGEX = r"(0x[a-fA-F0-9]{40})"
self.cryptocurrency_addresses = [m[0] for m in re.findall(BTC_REGEX, self.content)] + re.findall(ETHER_REGEX, self.content)
self.cryptocurrency_addresses = [
m[0] for m in re.findall(BTC_REGEX, self.content)
] + re.findall(ETHER_REGEX, self.content)
try:
self.detected_language = detect(self.content)
@@ -288,17 +304,17 @@ class Post:
def hydrate_spacy(self):
ner_only = False
if self.detected_language == 'en':
if self.detected_language == "en":
nlp = nlp_en
elif self.detected_language == 'de':
elif self.detected_language == "de":
nlp = nlp_de
elif self.detected_language == 'it':
elif self.detected_language == "it":
nlp = nlp_it
elif self.detected_language == 'fr':
elif self.detected_language == "fr":
nlp = nlp_fr
elif self.detected_language == 'ru':
elif self.detected_language == "ru":
nlp = nlp_ru
elif self.detected_language == 'nl':
elif self.detected_language == "nl":
nlp = nlp_nl
else:
nlp = nlp_xx
@@ -307,19 +323,36 @@ class Post:
doc = nlp(self.content)
if not ner_only:
punctuation = ['?',':','!',',','.',';','|','(',')','--','#','=','+']
tokens = [t.lemma_ for t in doc if not t.is_stop and t.lemma_ not in punctuation]
self.normalized_content = ' '.join(tokens)
punctuation = [
"?",
":",
"!",
",",
".",
";",
"|",
"(",
")",
"--",
"#",
"=",
"+",
]
tokens = [
t.lemma_ for t in doc if not t.is_stop and t.lemma_ not in punctuation
]
self.normalized_content = " ".join(tokens)
else:
self.normalized_content = ''
self.normalized_content = ""
self.named_entities = [{'text': ent.text, 'type': ent.label_} for ent in doc.ents]
self.named_entities = [
{"text": ent.text, "type": ent.label_} for ent in doc.ents
]
@dataclass
class Media:
"""Base class for organizing information about a media file.
"""
"""Base class for organizing information about a media file."""
#: ID number of the media's corresponding scraped post in the ``raw_posts`` table.
raw_id: int
@@ -355,16 +388,14 @@ class Media:
exif: str = None
def get_blob(self):
"""Download media file as bytes blob.
"""
"""Download media file as bytes blob."""
blob = make_request(self.url)
return blob.content
@logger.catch
def hydrate(self, blob=None):
"""Download media file as bytes blob and extract data from content.
"""
"""Download media file as bytes blob and extract data from content."""
if blob is None:
blob = self.get_blob()
@@ -372,8 +403,7 @@ class Media:
self.hydrate_exif(blob)
def hydrate_exif(self, blob):
"""Extract Exif metadata from bytes blob.
"""
"""Extract Exif metadata from bytes blob."""
with tempfile.NamedTemporaryFile() as temp_file:
temp_file.write(blob)
@@ -382,10 +412,10 @@ class Media:
exif = et.get_metadata(temp_file.name)
self.exif = json.dumps(exif)
@dataclass
class Image(Media):
"""Class for organizing information about an image file.
"""
"""Class for organizing information about an image file."""
#: Extracted OCR content from image
ocr: str = None
@@ -403,135 +433,152 @@ class Image(Media):
self.hydrate_ocr(blob)
def hydrate_ocr(self, blob):
"""Extract OCR (optical character recognition) data from image bytes blob.
"""
"""Extract OCR (optical character recognition) data from image bytes blob."""
image = PIL.Image.open(io.BytesIO(blob))
self.ocr = pytesseract.image_to_string(image)
@dataclass
class Video(Media):
"""Class for organizing information about an video file.
"""
"""Class for organizing information about an video file."""
pass
@dataclass
class Audio(Media):
"""Class for organizing information about an audio file.
"""
"""Class for organizing information about an audio file."""
pass
mapper_registry = registry()
raw_posts_table = Table('raw_posts', mapper_registry.metadata,
Column('id', Integer, primary_key=True,
autoincrement=True),
Column('scraper', String),
Column('platform', String),
Column('channel', Integer, ForeignKey('channels.id'), index=True),
Column('platform_id', String, index=True),
Column('date', DateTime, index=True),
Column('raw_data', String),
Column('date_archived', DateTime, index=True),
Column('archived_urls', JSON),
Column('media_archived', DateTime, index=True))
raw_channel_info_table = Table('raw_channel_info', mapper_registry.metadata,
Column('id', Integer, primary_key=True),
Column('scraper', String),
Column('platform', String),
Column('channel', Integer, ForeignKey('channels.id'), index=True),
Column('raw_data', String),
Column('date_archived', DateTime, index=True))
channel_info_table = Table('channel_info', mapper_registry.metadata,
Column('id', Integer, primary_key=True, autoincrement=True),
Column('raw_channel_info_id', Integer, ForeignKey('raw_channel_info.id'), index=True),
Column('channel', Integer, ForeignKey('channels.id'), index=True),
Column('platform_id', String),
Column('scraper', String),
Column('transformer', String),
Column('platform', String),
Column('screenname', String),
Column('name', String),
Column('description', String),
Column('description_url', String),
Column('description_location', String),
Column('followers', Integer),
Column('following', Integer),
Column('verified', Boolean),
Column('date_created', DateTime),
Column('date_archived', DateTime, index=True),
Column('date_transformed', DateTime, index=True),
raw_posts_table = Table(
"raw_posts",
mapper_registry.metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("scraper", String),
Column("platform", String),
Column("channel", Integer, ForeignKey("channels.id"), index=True),
Column("platform_id", String, index=True),
Column("date", DateTime, index=True),
Column("raw_data", String),
Column("date_archived", DateTime, index=True),
Column("archived_urls", JSON),
Column("media_archived", DateTime, index=True),
)
channel_table = Table('channels', mapper_registry.metadata,
Column('id', Integer, primary_key=True, autoincrement=True),
Column('name', String),
Column('platform_id', String),
Column('category', String),
Column('platform', String),
Column('url', String),
Column('screenname', String),
Column('country', JSONB, index = True),
Column('influencer', String),
Column('public', Boolean),
Column('chat', Boolean),
Column('notes', String),
Column('source', String)
raw_channel_info_table = Table(
"raw_channel_info",
mapper_registry.metadata,
Column("id", Integer, primary_key=True),
Column("scraper", String),
Column("platform", String),
Column("channel", Integer, ForeignKey("channels.id"), index=True),
Column("raw_data", String),
Column("date_archived", DateTime, index=True),
)
post_table = Table('posts', mapper_registry.metadata,
Column('id', Integer, primary_key=True,
autoincrement=True),
Column('raw_id', Integer, ForeignKey('raw_posts.id'), index=True),
Column('platform_id', String, index=True),
Column('scraper', String),
Column('transformer', String),
Column('platform', String),
Column('channel', Integer, ForeignKey('channels.id'), index=True),
Column('date', DateTime, index=True),
Column('date_archived', DateTime, index=True),
Column('date_transformed', DateTime, index=True),
Column('url', String),
Column('author_id', String),
Column('author_username', String),
Column('content', String),
Column('forwarded_from', Integer, ForeignKey('channels.id'), index=True),
Column('reply_to', Integer, ForeignKey('posts.id'), index=True),
Column('named_entities', JSON),
Column('cryptocurrency_addresses', JSON),
Column('hashtags', JSON),
Column('outlinks', JSON),
Column('mentions', JSON),
Column('likes', Integer),
Column('forwards', Integer),
Column('views', Integer),
Column('video_title', String),
Column('video_duration', Integer),
Column('detected_language', String, index = True),
Column('normalized_content', String)
channel_info_table = Table(
"channel_info",
mapper_registry.metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column(
"raw_channel_info_id", Integer, ForeignKey("raw_channel_info.id"), index=True
),
Column("channel", Integer, ForeignKey("channels.id"), index=True),
Column("platform_id", String),
Column("scraper", String),
Column("transformer", String),
Column("platform", String),
Column("screenname", String),
Column("name", String),
Column("description", String),
Column("description_url", String),
Column("description_location", String),
Column("followers", Integer),
Column("following", Integer),
Column("verified", Boolean),
Column("date_created", DateTime),
Column("date_archived", DateTime, index=True),
Column("date_transformed", DateTime, index=True),
)
posts_forwarded_from_channel_index = Index('posts_channel_forwarded_from_idx', post_table.c.channel, post_table.c.forwarded_from)
channel_table = Table(
"channels",
mapper_registry.metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("name", String),
Column("platform_id", String),
Column("category", String),
Column("platform", String),
Column("url", String),
Column("screenname", String),
Column("country", JSONB, index=True),
Column("influencer", String),
Column("public", Boolean),
Column("chat", Boolean),
Column("notes", String),
Column("source", String),
)
media_table = Table('media', mapper_registry.metadata,
Column('id', Integer, primary_key=True,
autoincrement=True),
Column('type', String),
Column('raw_id', Integer, ForeignKey('raw_posts.id'), index=True),
Column('post', Integer, ForeignKey('posts.id'), index=True),
Column('url', String),
Column('original_url', String),
Column('exif', String),
Column('ocr', String),
Column('date', DateTime, index=True),
Column('date_archived', DateTime, index=True),
Column('date_transformed', DateTime, index=True),
Column('scraper', String),
Column('transformer', String)
post_table = Table(
"posts",
mapper_registry.metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("raw_id", Integer, ForeignKey("raw_posts.id"), index=True),
Column("platform_id", String, index=True),
Column("scraper", String),
Column("transformer", String),
Column("platform", String),
Column("channel", Integer, ForeignKey("channels.id"), index=True),
Column("date", DateTime, index=True),
Column("date_archived", DateTime, index=True),
Column("date_transformed", DateTime, index=True),
Column("url", String),
Column("author_id", String),
Column("author_username", String),
Column("content", String),
Column("forwarded_from", Integer, ForeignKey("channels.id"), index=True),
Column("reply_to", Integer, ForeignKey("posts.id"), index=True),
Column("named_entities", JSON),
Column("cryptocurrency_addresses", JSON),
Column("hashtags", JSON),
Column("outlinks", JSON),
Column("mentions", JSON),
Column("likes", Integer),
Column("forwards", Integer),
Column("views", Integer),
Column("video_title", String),
Column("video_duration", Integer),
Column("detected_language", String, index=True),
Column("normalized_content", String),
)
posts_forwarded_from_channel_index = Index(
"posts_channel_forwarded_from_idx",
post_table.c.channel,
post_table.c.forwarded_from,
)
media_table = Table(
"media",
mapper_registry.metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("type", String),
Column("raw_id", Integer, ForeignKey("raw_posts.id"), index=True),
Column("post", Integer, ForeignKey("posts.id"), index=True),
Column("url", String),
Column("original_url", String),
Column("exif", String),
Column("ocr", String),
Column("date", DateTime, index=True),
Column("date_archived", DateTime, index=True),
Column("date_transformed", DateTime, index=True),
Column("scraper", String),
Column("transformer", String),
)
mapper_registry.map_imperatively(Post, post_table)
@@ -539,7 +586,27 @@ mapper_registry.map_imperatively(Channel, channel_table)
mapper_registry.map_imperatively(ScraperResult, raw_posts_table)
mapper_registry.map_imperatively(RawChannelInfo, raw_channel_info_table)
mapper_registry.map_imperatively(ChannelInfo, channel_info_table)
mapper_registry.map_imperatively(Media, media_table, polymorphic_on='type', polymorphic_identity='media')
mapper_registry.map_imperatively(Image, media_table, inherits=Media, polymorphic_on='type', polymorphic_identity='image')
mapper_registry.map_imperatively(Video, media_table, inherits=Media, polymorphic_on='type', polymorphic_identity='video')
mapper_registry.map_imperatively(Audio, media_table, inherits=Media, polymorphic_on='type', polymorphic_identity='audio')
mapper_registry.map_imperatively(
Media, media_table, polymorphic_on="type", polymorphic_identity="media"
)
mapper_registry.map_imperatively(
Image,
media_table,
inherits=Media,
polymorphic_on="type",
polymorphic_identity="image",
)
mapper_registry.map_imperatively(
Video,
media_table,
inherits=Media,
polymorphic_on="type",
polymorphic_identity="video",
)
mapper_registry.map_imperatively(
Audio,
media_table,
inherits=Media,
polymorphic_on="type",
polymorphic_identity="audio",
)

View File

@@ -18,6 +18,7 @@ from sqlalchemy import nullsfirst
from cisticola.base import Channel, RawChannelInfo, ScraperResult, mapper_registry
from cisticola.utils import make_request
class Scraper:
"""Base class for defining platform-specific scrapers for scraping all posts
from a given channel on that specific platform.
@@ -25,23 +26,26 @@ class Scraper:
__version__ = "Scraper 0.0.0"
cookiestring = os.environ["YOUTUBE_COOKIESTRING"].replace(r'\n', '\n').replace(r'\t', '\t')
cookiefilename = 'cookiefile.txt'
cookiestring = (
os.environ["YOUTUBE_COOKIESTRING"].replace(r"\n", "\n").replace(r"\t", "\t")
)
cookiefilename = "cookiefile.txt"
def __init__(self):
# Initialize client to transfer files to the storage archive
self.s3_client = boto3.client(
service_name='s3',
region_name=os.environ['DO_SPACES_REGION'],
service_name="s3",
region_name=os.environ["DO_SPACES_REGION"],
endpoint_url=f'https://{os.environ["DO_SPACES_REGION"]}.digitaloceanspaces.com',
aws_access_key_id=os.environ['DO_SPACES_KEY'],
aws_secret_access_key=os.environ['DO_SPACES_SECRET'])
aws_access_key_id=os.environ["DO_SPACES_KEY"],
aws_secret_access_key=os.environ["DO_SPACES_SECRET"],
)
# Define request headers (necessary to bypass scraping protection
# for several platform scrapers)
self.headers = {
'User-Agent' : 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:84.0) Gecko/20100101 Firefox/84.0'}
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:84.0) Gecko/20100101 Firefox/84.0"
}
def __str__(self):
return self.__version__
@@ -83,7 +87,7 @@ class Scraper:
the original post URL and the media's Content-Type.
"""
key = urlparse(url).path.split('/')[-1]
key = urlparse(url).path.split("/")[-1]
return key
def url_to_blob(self, url: str, key: str = None) -> Tuple[bytes, str, str]:
@@ -111,7 +115,7 @@ class Scraper:
r = make_request(url, headers=self.headers)
blob = r.content
content_type = r.headers.get('Content-Type')
content_type = r.headers.get("Content-Type")
if key is None:
key = self.url_to_key(url, content_type)
@@ -141,17 +145,16 @@ class Scraper:
Unique identifier for the media file.
"""
content_type = 'video/mp4'
ext = '.' + content_type.split('/')[-1]
content_type = "video/mp4"
ext = "." + content_type.split("/")[-1]
with tempfile.NamedTemporaryFile(suffix=ext) as temp_file:
(
ffmpeg
.input(url)
.output(temp_file.name, vcodec='copy')
.global_args('-loglevel', 'error')
.run(overwrite_output=True))
ffmpeg.input(url)
.output(temp_file.name, vcodec="copy")
.global_args("-loglevel", "error")
.run(overwrite_output=True)
)
temp_file.seek(0)
blob = temp_file.read()
@@ -184,11 +187,11 @@ class Scraper:
Unique identifier for the media file.
"""
content_type = 'video/mp4'
content_type = "video/mp4"
with tempfile.TemporaryDirectory() as temp_dir:
cookiefile = Path(temp_dir) / self.cookiefilename
with open(cookiefile, 'w') as f:
with open(cookiefile, "w") as f:
f.write(self.cookiestring)
ydl_opts = {
@@ -199,14 +202,16 @@ class Scraper:
"quiet": True,
"verbose": False,
"retries": 5,
"cookiefile": cookiefile}
"cookiefile": cookiefile,
}
ydl = yt_dlp.YoutubeDL(ydl_opts)
try:
meta = ydl.extract_info(
url,
download=True,)
download=True,
)
except yt_dlp.utils.DownloadError as e:
raise e
else:
@@ -240,12 +245,16 @@ class Scraper:
URL specifying the file on the storage archive.
"""
filename = self.__version__.replace(' ', '_') + '/' + key
filename = self.__version__.replace(" ", "_") + "/" + key
self.s3_client.upload_fileobj(BytesIO(blob), Bucket=os.environ[
'DO_BUCKET'], Key=filename, ExtraArgs={'ACL': 'public-read', 'ContentType': content_type})
self.s3_client.upload_fileobj(
BytesIO(blob),
Bucket=os.environ["DO_BUCKET"],
Key=filename,
ExtraArgs={"ACL": "public-read", "ContentType": content_type},
)
archived_url = os.environ['DO_URL'] + '/' + filename
archived_url = os.environ["DO_URL"] + "/" + filename
return archived_url
@@ -292,7 +301,9 @@ class Scraper:
raise NotImplementedError
@logger.catch
def get_posts(self, channel: Channel, since: ScraperResult = None) -> Generator[ScraperResult, None, None]:
def get_posts(
self, channel: Channel, since: ScraperResult = None
) -> Generator[ScraperResult, None, None]:
"""Scrape all posts from the specified Channel.
Parameters
@@ -342,8 +353,7 @@ class ScraperController:
self.scrapers.extend(scrapers)
def remove_all_scrapers(self):
"""Reset the ScraperController so that it doesn't control any scrapers
"""
"""Reset the ScraperController so that it doesn't control any scrapers"""
self.scrapers = []
def scrape_all_channels(self, fetch_old: bool = False):
@@ -362,15 +372,23 @@ class ScraperController:
session = self.session()
# TODO there should be a better/more generic way of selecting scrapeable channels
channels = session.query(Channel).filter((Channel.source=='researcher')|(Channel.source=='snowball_it')|(Channel.source=='snowball_complete')|(Channel.source=='linked_channel')).all()
channels = (
session.query(Channel)
.filter(
(Channel.source == "researcher")
| (Channel.source == "snowball_it")
| (Channel.source == "snowball_complete")
| (Channel.source == "linked_channel")
)
.all()
)
session.close()
return self.scrape_channels(channels, fetch_old=fetch_old)
def scrape_all_channel_info(self):
"""Scrape profile information from all channels in the database.
"""
"""Scrape profile information from all channels in the database."""
if self.session is None:
logger.error("No DB session")
return
@@ -379,11 +397,28 @@ class ScraperController:
# Because of rate limiting, we may not be able to succesfully scrape info for all of these channels.
# This will sort the channels by the least recently scraped.
most_recently_archived = session.query(func.max(RawChannelInfo.date_archived).label("date"), RawChannelInfo.channel.label("channel")).group_by(RawChannelInfo.channel).subquery()
channels = session.query(Channel).\
filter((Channel.source=='researcher')|(Channel.source=='snowball_it')|(Channel.source=='snowball_complete')|(Channel.source=='linked_channel')).\
outerjoin(most_recently_archived, Channel.id == most_recently_archived.c.channel).\
order_by(nullsfirst(most_recently_archived.c.date.asc())).all()
most_recently_archived = (
session.query(
func.max(RawChannelInfo.date_archived).label("date"),
RawChannelInfo.channel.label("channel"),
)
.group_by(RawChannelInfo.channel)
.subquery()
)
channels = (
session.query(Channel)
.filter(
(Channel.source == "researcher")
| (Channel.source == "snowball_it")
| (Channel.source == "snowball_complete")
| (Channel.source == "linked_channel")
)
.outerjoin(
most_recently_archived, Channel.id == most_recently_archived.c.channel
)
.order_by(nullsfirst(most_recently_archived.c.date.asc()))
.all()
)
session.close()
return self.scrape_channel_info(channels)
@@ -408,12 +443,17 @@ class ScraperController:
# If any channels are not already in the database, add them
for channel in channels:
platform_id = None
if channel.platform_id not in (None, ''):
if channel.platform_id not in (None, ""):
platform_id = channel.platform_id
channel_in_db = session.query(Channel).filter_by(platform_id=platform_id, platform=channel.platform, url=channel.url).first()
channel_in_db = (
session.query(Channel)
.filter_by(
platform_id=platform_id, platform=channel.platform, url=channel.url
)
.first()
)
if not channel_in_db:
logger.debug(f"{channel} does not exist in database, adding")
@@ -429,13 +469,17 @@ class ScraperController:
handled = True
added = 0
if fetch_old and channel.platform == 'Telegram':
if fetch_old and channel.platform == "Telegram":
# get oldest post (currently only for Telegram)
# TODO fix this so that it doesn't have an explicit check on channel.platform (should be generic)
# TODO implement until on all scrapers
rows = session.query(ScraperResult).where(
ScraperResult.channel == channel.id).order_by(
ScraperResult.date.asc(), ScraperResult.id.desc()).limit(10).all()
rows = (
session.query(ScraperResult)
.where(ScraperResult.channel == channel.id)
.order_by(ScraperResult.date.asc(), ScraperResult.id.desc())
.limit(10)
.all()
)
if len(rows) > 0:
until = rows[0]
@@ -449,9 +493,13 @@ class ScraperController:
# Note: a "bug" in Postgres can cause this query to hang for a really long time
# when searching for a single row, hence the limit(10).all() when we really just need
# the first row.
rows = session.query(ScraperResult).where(
ScraperResult.channel == channel.id).order_by(
ScraperResult.date.desc(), ScraperResult.id.asc()).limit(10).all()
rows = (
session.query(ScraperResult)
.where(ScraperResult.channel == channel.id)
.order_by(ScraperResult.date.desc(), ScraperResult.id.asc())
.limit(10)
.all()
)
if len(rows) > 0:
since = rows[0]
@@ -466,8 +514,7 @@ class ScraperController:
added += 1
session.commit()
logger.info(
f"{scraper} found {added} new posts from {channel}")
logger.info(f"{scraper} found {added} new posts from {channel}")
break
if not handled:
@@ -489,11 +536,24 @@ class ScraperController:
if session is None:
session = self.session()
if chronological:
posts = session.query(ScraperResult).where(ScraperResult.media_archived == None).where(ScraperResult.id >= 0).order_by(ScraperResult.date.desc()).limit(5000).all()
posts = (
session.query(ScraperResult)
.where(ScraperResult.media_archived == None)
.where(ScraperResult.id >= 0)
.order_by(ScraperResult.date.desc())
.limit(5000)
.all()
)
else:
# this query is really slow (~2.5 minutes) because of the shuffle. shuffling is so that multiple media archivers could work
# simultaneously with low risk of collision (at least while the number of unarchived items is very large)
posts = session.query(ScraperResult).where(ScraperResult.media_archived == None).order_by(func.random()).limit(5000).all()
posts = (
session.query(ScraperResult)
.where(ScraperResult.media_archived == None)
.order_by(func.random())
.limit(5000)
.all()
)
logger.info(f"Found {len(posts)} posts without media. Archiving now")
@@ -502,13 +562,23 @@ class ScraperController:
for scraper in self.scrapers:
# compare major versions
if post.scraper is not None and scraper.__version__.split('.')[0] == post.scraper.split('.')[0]:
if (
post.scraper is not None
and scraper.__version__.split(".")[0] == post.scraper.split(".")[0]
):
handled = True
logger.debug(f"{scraper} is archiving media for ID {post.id}")
post = scraper.archive_files(post)
if post:
session.query(ScraperResult).where(ScraperResult.id == post.id).update({'archived_urls': post.archived_urls, 'media_archived': post.media_archived})
session.query(ScraperResult).where(
ScraperResult.id == post.id
).update(
{
"archived_urls": post.archived_urls,
"media_archived": post.media_archived,
}
)
session.commit()
break
@@ -535,7 +605,9 @@ class ScraperController:
session = self.session()
while True:
self.archive_unarchived_media_batch(self, session=session, chronological=chronological)
self.archive_unarchived_media_batch(
self, session=session, chronological=chronological
)
@logger.catch(reraise=True)
def scrape_channel_info(self, channels: List[Channel]):
@@ -571,8 +643,7 @@ class ScraperController:
session.add(info)
session.commit()
logger.info(
f"{scraper} found {info}")
logger.info(f"{scraper} found {info}")
break
except ChannelDoesNotExistError:
logger.warning(f"ChannelDoesNotExist {channel}")
@@ -599,13 +670,13 @@ class ScraperController:
self.session.configure(bind=self.engine)
def reset_db(self):
"""Drop all data from the connected SQLAlchemy database.
"""
"""Drop all data from the connected SQLAlchemy database."""
close_all_sessions()
mapper_registry.metadata.drop_all(bind=self.engine)
self.connect_to_db(self.engine)
class ChannelDoesNotExistError(Exception):
"""The specified channel does not exist or has been deleted."""

View File

@@ -14,105 +14,129 @@ from loguru import logger
from cisticola.base import Channel, ScraperResult, RawChannelInfo
from cisticola.scraper.base import Scraper
class BitchuteScraper(Scraper):
"""An implementation of a Scraper for Bitchute, using classes from the 4cat
library"""
__version__ = "BitchuteScraper 0.0.1"
def get_username_from_url(self, url):
username = url.split('bitchute.com/channel/')[-1].strip('/')
username = url.split("bitchute.com/channel/")[-1].strip("/")
return username
@logger.catch
def get_posts(self, channel: Channel, since: ScraperResult = None) -> Generator[ScraperResult, None, None]:
def get_posts(
self, channel: Channel, since: ScraperResult = None
) -> Generator[ScraperResult, None, None]:
session = requests.Session()
session.headers.update(self.headers)
request = session.get("https://www.bitchute.com/search")
csrftoken = BeautifulSoup(request.text, 'html.parser').findAll(
"input", {"name": "csrfmiddlewaretoken"})[0].get("value")
csrftoken = (
BeautifulSoup(request.text, "html.parser")
.findAll("input", {"name": "csrfmiddlewaretoken"})[0]
.get("value")
)
time.sleep(0.25)
detail = 'comments'
detail = "comments"
username = self.get_username_from_url(channel.url)
scraper = get_videos_user(session, username, csrftoken, detail)
for post in scraper:
if since is not None and datetime.fromtimestamp(post['timestamp']) <= since.date:
if (
since is not None
and datetime.fromtimestamp(post["timestamp"]) <= since.date
):
break
archived_urls = {}
if 'video_url' in post:
url = post['video_url']
if "video_url" in post:
url = post["video_url"]
archived_urls[url] = None
yield ScraperResult(
scraper=self.__version__,
platform="Bitchute",
channel=channel.id,
platform_id=post['id'],
date=datetime.fromtimestamp(post['timestamp']),
platform_id=post["id"],
date=datetime.fromtimestamp(post["timestamp"]),
date_archived=datetime.now(timezone.utc),
raw_data=json.dumps(post),
archived_urls=archived_urls,
media_archived=None)
media_archived=None,
)
def can_handle(self, channel):
if channel.platform == "Bitchute" and self.get_username_from_url(channel.url) is not None:
if (
channel.platform == "Bitchute"
and self.get_username_from_url(channel.url) is not None
):
return True
@logger.catch
def get_profile(self, channel: Channel) -> RawChannelInfo:
base_url = channel.url
session = requests.session()
response = session.get(base_url)
soup = BeautifulSoup(response.content, 'html.parser')
soup = BeautifulSoup(response.content, "html.parser")
canonical_url = soup.find('link', {'id' : 'canonical'})['href']
csrftoken = session.cookies['csrftoken']
csrfmiddlewaretoken = soup.find('input', {'name' : 'csrfmiddlewaretoken'})['value']
canonical_url = soup.find("link", {"id": "canonical"})["href"]
csrftoken = session.cookies["csrftoken"]
csrfmiddlewaretoken = soup.find("input", {"name": "csrfmiddlewaretoken"})[
"value"
]
about_soup = soup.find('div', {'id' : 'channel-about'})
info_list = about_soup.find('div', {'class' : 'channel-about-details'}).find_all('p')
description_soup = about_soup.find('div', {'id' : 'channel-description'})
about_soup = soup.find("div", {"id": "channel-about"})
info_list = about_soup.find("div", {"class": "channel-about-details"}).find_all(
"p"
)
description_soup = about_soup.find("div", {"id": "channel-description"})
headers = {'Referer': base_url}
data = {
'csrftoken': csrftoken,
'csrfmiddlewaretoken': csrfmiddlewaretoken}
headers = {"Referer": base_url}
data = {"csrftoken": csrftoken, "csrfmiddlewaretoken": csrfmiddlewaretoken}
response = session.post(canonical_url + 'counts/', data = data, headers = headers)
response = session.post(canonical_url + "counts/", data=data, headers=headers)
counts = json.loads(response.text)
owner_soup = soup.find('p', {'class' : 'owner'})
if owner_soup.text == '[email\xa0protected]':
owner_name = decode_cfemail(owner_soup.find('span', {'class': "__cf_email__"})['data-cfemail'])
owner_soup = soup.find("p", {"class": "owner"})
if owner_soup.text == "[email\xa0protected]":
owner_name = decode_cfemail(
owner_soup.find("span", {"class": "__cf_email__"})["data-cfemail"]
)
else:
owner_name = owner_soup.text
profile = {
'description' : description_soup.text.strip(),
'description_links' : [a['href'] for a in description_soup.find_all('a', href = True)],
'created': re.sub(r'\s', ' ', info_list[0].text.split('Created')[1].strip('. ')),
'videos' : int(info_list[1].text.split('videos')[0].strip()),
'owner_url' : soup.find('p', {'class' : 'owner'}).find('a', href = True)['href'],
'owner_name' : owner_name,
'image' : about_soup.find('img', {'alt' : 'Channel Image'}).get('data-src'),
'subscribers': counts['subscriber_count'],
'views': int(counts['about_view_count'].split(' ')[0])}
"description": description_soup.text.strip(),
"description_links": [
a["href"] for a in description_soup.find_all("a", href=True)
],
"created": re.sub(
r"\s", " ", info_list[0].text.split("Created")[1].strip(". ")
),
"videos": int(info_list[1].text.split("videos")[0].strip()),
"owner_url": soup.find("p", {"class": "owner"}).find("a", href=True)[
"href"
],
"owner_name": owner_name,
"image": about_soup.find("img", {"alt": "Channel Image"}).get("data-src"),
"subscribers": counts["subscriber_count"],
"views": int(counts["about_view_count"].split(" ")[0]),
}
return RawChannelInfo(scraper=self.__version__,
return RawChannelInfo(
scraper=self.__version__,
platform=channel.platform,
channel=channel.id,
raw_data=json.dumps(profile, default=str),
date_archived=datetime.now(timezone.utc))
date_archived=datetime.now(timezone.utc),
)
def strip_tags(html, convert_newlines=True):
r"""
@@ -149,6 +173,7 @@ def strip_tags(html, convert_newlines=True):
stripper.feed(html)
return stripper.get_data()
def request_from_bitchute(session, method, url, headers=None, data=None):
"""
Request something via the BitChute API (or non-API)
@@ -176,7 +201,10 @@ def request_from_bitchute(session, method, url, headers=None, data=None):
raise NotImplemented()
if request.status_code >= 300:
raise ValueError("Response %i from BitChute for URL %s, need to retry" % (request.status_code, url))
raise ValueError(
"Response %i from BitChute for URL %s, need to retry"
% (request.status_code, url)
)
response = request.json()
return response
@@ -193,6 +221,7 @@ def request_from_bitchute(session, method, url, headers=None, data=None):
return response
def append_details(video, detail):
"""
Append extra metadata to video data
@@ -214,7 +243,7 @@ def append_details(video, detail):
"comments": "",
"hashtags": "",
"parent_id": "",
"video_url": ""
"video_url": "",
}
try:
@@ -223,15 +252,23 @@ def append_details(video, detail):
video_session = requests.session()
video_page = video_session.get(video["url"])
if "<h1 class=\"page-title\">Video Restricted</h1>" in video_page.text or \
"<h1 class=\"page-title\">Video Blocked</h1>" in video_page.text or \
"<h1 class=\"page-title\">Channel Blocked</h1>" in video_page.text or \
"<h1 class=\"page-title\">Channel Restricted</h1>" in video_page.text:
if "This video is unavailable as the contents have been deemed potentially illegal" in video_page.text:
if (
'<h1 class="page-title">Video Restricted</h1>' in video_page.text
or '<h1 class="page-title">Video Blocked</h1>' in video_page.text
or '<h1 class="page-title">Channel Blocked</h1>' in video_page.text
or '<h1 class="page-title">Channel Restricted</h1>' in video_page.text
):
if (
"This video is unavailable as the contents have been deemed potentially illegal"
in video_page.text
):
video["category"] = "moderated-illegal"
return (video, [])
elif "Viewing of this video is restricted, as it has been marked as Not Safe For Life" in video_page.text:
elif (
"Viewing of this video is restricted, as it has been marked as Not Safe For Life"
in video_page.text
):
video["category"] = "moderated-nsfl"
return (video, [])
@@ -255,39 +292,47 @@ def append_details(video, detail):
video["category"] = "moderated-other"
return (video, [])
elif "<iframe class=\"rumble\"" in video_page.text:
elif '<iframe class="rumble"' in video_page.text:
# some videos are actually embeds from rumble?
# these are iframes, so at the moment we cannot simply extract
# their info from the page, so we skip them. In the future we
# could add an extra request to get the relevant info, but so
# far the only examples I've seen are actually 'video not found'
video = {
**video,
"category": "error-embed-from-rumble"
}
video = {**video, "category": "error-embed-from-rumble"}
return (video, [])
elif video_page.status_code != 200:
video = {
**video,
"category": "error-%i" % video_page.status_code
}
video = {**video, "category": "error-%i" % video_page.status_code}
return (video, [])
soup = BeautifulSoup(video_page.text, 'html.parser')
video_csfrtoken = soup.findAll("input", {"name": "csrfmiddlewaretoken"})[0].get("value")
soup = BeautifulSoup(video_page.text, "html.parser")
video_csfrtoken = soup.findAll("input", {"name": "csrfmiddlewaretoken"})[0].get(
"value"
)
video["video_url"] = soup.select_one("video#player source").get("src")
video["thumbnail_image"] = soup.select_one("video#player").get("poster")
video["subject"] = soup.select_one("h1#video-title").text
video["author_id"] = soup.select_one("p.owner a").get("href").split("/")[2]
video["author"] = soup.select_one("div.channel-banner p.name a").get("href").split("/")[2]
video["body"] = soup.select_one("div#video-description").encode_contents().decode("utf-8").strip()
video["author"] = (
soup.select_one("div.channel-banner p.name a").get("href").split("/")[2]
)
video["body"] = (
soup.select_one("div#video-description")
.encode_contents()
.decode("utf-8")
.strip()
)
# we need *two more requests* to get the comment count and like/dislike counts
# this seems to be because bitchute uses a third-party comment widget
video_session.headers = {'Referer': video["url"], 'Origin': video["url"]}
counts = request_from_bitchute(video_session, "POST", "https://www.bitchute.com/video/%s/counts/" % video["id"], data={"csrfmiddlewaretoken": video_csfrtoken})
video_session.headers = {"Referer": video["url"], "Origin": video["url"]}
counts = request_from_bitchute(
video_session,
"POST",
"https://www.bitchute.com/video/%s/counts/" % video["id"],
data={"csrfmiddlewaretoken": video_csfrtoken},
)
if detail == "comments":
# if comments are also to be scraped, this is anothe request to make, which returns
@@ -308,7 +353,12 @@ def append_details(video, detail):
comment_count = 0
url = comment_script.split("'")[1]
comment_csrf = comment_script.split("'")[3]
comments_data = request_from_bitchute(video_session, "POST", url + "/api/get_comments/", data={"cf_auth": comment_csrf, "commentCount": 0})
comments_data = request_from_bitchute(
video_session,
"POST",
url + "/api/get_comments/",
data={"cf_auth": comment_csrf, "commentCount": 0},
)
for comment in comments_data:
comment_count += 1
@@ -318,14 +368,17 @@ def append_details(video, detail):
else:
thumbnail_image = ""
comments.append({
comments.append(
{
"id": comment["id"],
"thread_id": video["id"],
"subject": "",
"body": comment["content"],
"author": comment["fullname"],
"author_id": comment["creator"],
"timestamp": int(dateparser.parse(comment["created"]).timestamp()),
"timestamp": int(
dateparser.parse(comment["created"]).timestamp()
),
"url": "",
"views": "",
"length": "",
@@ -336,16 +389,24 @@ def append_details(video, detail):
"dislikes": "",
"channel_subscribers": "",
"comments": "",
"parent_id": comment.get("parent", "") if "parent" in comment else video["id"],
})
"parent_id": comment.get("parent", "")
if "parent" in comment
else video["id"],
}
)
else:
# if we don't need the full comments, we still need another request to get the *amount*
# of comments,
comment_count = request_from_bitchute(video_session, "POST",
comment_count = request_from_bitchute(
video_session,
"POST",
"https://commentfreely.bitchute.com/api/get_comment_count/",
data={"csrfmiddlewaretoken": video_csfrtoken,
"cf_thread": "bc_" + video["id"]})["commentCount"]
data={
"csrfmiddlewaretoken": video_csfrtoken,
"cf_thread": "bc_" + video["id"],
},
)["commentCount"]
except RuntimeError as e:
# we wrap this in one big try-catch because doing it for each request separarely is tedious
@@ -358,7 +419,10 @@ def append_details(video, detail):
# exact day it was uploaded
try:
published = dateparser.parse(
soup.find(class_="video-publish-date").text.split("published at")[1].strip()[:-1])
soup.find(class_="video-publish-date")
.text.split("published at")[1]
.strip()[:-1]
)
except AttributeError as e:
# publication date not on page?
published = None
@@ -373,7 +437,7 @@ def append_details(video, detail):
"comments": comment_count,
"parent_id": "",
"hashtags": ",".join([tag.text for tag in soup.select("#video-hashtags li a")]),
"views": counts["view_count"]
"views": counts["view_count"],
}
if published:
@@ -383,6 +447,7 @@ def append_details(video, detail):
time.sleep(0.25)
return (video, comments)
def get_videos_user(session, user, csrftoken, detail):
"""
Scrape videos for given BitChute user
@@ -402,23 +467,27 @@ def get_videos_user(session, user, csrftoken, detail):
url = base_url + "extend/"
container = session.get(base_url)
container_soup = BeautifulSoup(container.text, 'html.parser')
headers = {'Referer': base_url, 'Origin': "https://www.bitchute.com/"}
container_soup = BeautifulSoup(container.text, "html.parser")
headers = {"Referer": base_url, "Origin": "https://www.bitchute.com/"}
while True:
post_data = {
"csrfmiddlewaretoken": csrftoken,
"name": "",
"offset": str(offset),
}
post_data = {"csrfmiddlewaretoken": csrftoken, "name": "", "offset": str(offset)}
response = request_from_bitchute(
session, "POST", url, headers=headers, data=post_data
)
response = request_from_bitchute(session, "POST", url, headers=headers, data=post_data)
soup = BeautifulSoup(response["html"], 'html.parser')
soup = BeautifulSoup(response["html"], "html.parser")
videos = soup.select(".channel-videos-container")
comments = []
if len(videos) == 0 or num_items >= max_items:
break
for video_element in videos:
if num_items >= max_items:
break
@@ -432,16 +501,26 @@ def get_videos_user(session, user, csrftoken, detail):
"id": link["href"].split("/")[-2],
"thread_id": link["href"].split("/")[-2],
"subject": link.text,
"body": strip_tags(video_element.select_one(".channel-videos-text").text),
"body": strip_tags(
video_element.select_one(".channel-videos-text").text
),
"author": container_soup.select_one(".details .name a").text,
"author_id": container_soup.select_one(".details .name a")["href"].split("/")[2],
"author_id": container_soup.select_one(".details .name a")[
"href"
].split("/")[2],
"timestamp": int(
dateparser.parse(
video_element.select_one(".channel-videos-details.text-right.hidden-xs").text).timestamp()),
video_element.select_one(
".channel-videos-details.text-right.hidden-xs"
).text
).timestamp()
),
"url": "https://www.bitchute.com" + link["href"],
"views": video_element.select_one(".video-views").text.strip(),
"length": video_element.select_one(".video-duration").text.strip(),
"thumbnail_image": video_element.select_one(".channel-videos-image img")["src"],
"thumbnail_image": video_element.select_one(
".channel-videos-image img"
)["src"],
}
if detail != "basic":
@@ -456,10 +535,9 @@ def get_videos_user(session, user, csrftoken, detail):
# before the video, which is weird
yield comment
def decode_cfemail(cfemail):
"""https://stackoverflow.com/questions/36911296/scraping-of-protected-email
"""
def decode_cfemail(cfemail):
"""https://stackoverflow.com/questions/36911296/scraping-of-protected-email"""
email = ""
k = int(cfemail[:2], 16)

View File

@@ -9,8 +9,10 @@ from gogettr import PublicClient
from cisticola.base import Channel, ScraperResult, RawChannelInfo
from cisticola.scraper.base import Scraper
class GettrScraper(Scraper):
"""An implementation of a Scraper for Gettr, using gogettr library"""
__version__ = "GettrScraper 0.0.1"
def get_username_from_url(self, url):
@@ -21,48 +23,57 @@ class GettrScraper(Scraper):
return username
@logger.catch
def get_posts(self, channel: Channel, since: ScraperResult = None) -> Generator[ScraperResult, None, None]:
def get_posts(
self, channel: Channel, since: ScraperResult = None
) -> Generator[ScraperResult, None, None]:
client = PublicClient()
username = self.get_username_from_url(channel.url).lower()
scraper = client.user_activity(username=username, type="posts")
for post in scraper:
if since is not None and datetime.fromtimestamp(post['cdate']*0.001) <= since.date:
if (
since is not None
and datetime.fromtimestamp(post["cdate"] * 0.001) <= since.date
):
break
archived_urls = {}
if 'imgs' in post:
for img in post['imgs']:
if "imgs" in post:
for img in post["imgs"]:
url = "https://media.gettr.com/" + img
archived_urls[url] = None
if 'main' in post:
url = "https://media.gettr.com/" + post['main']
if "main" in post:
url = "https://media.gettr.com/" + post["main"]
archived_urls[url] = None
if 'ovid' in post:
url = "https://media.gettr.com/" + post['ovid']
if "ovid" in post:
url = "https://media.gettr.com/" + post["ovid"]
archived_urls[url] = None
yield ScraperResult(
scraper=self.__version__,
platform="Gettr",
channel=channel.id,
platform_id=post['_id'],
date=datetime.fromtimestamp(post['cdate']/1000.),
platform_id=post["_id"],
date=datetime.fromtimestamp(post["cdate"] / 1000.0),
date_archived=datetime.now(timezone.utc),
raw_data=json.dumps(post),
archived_urls=archived_urls,
media_archived=None)
media_archived=None,
)
def can_handle(self, channel):
if channel.platform == "Gettr" and self.get_username_from_url(channel.url) is not None:
if (
channel.platform == "Gettr"
and self.get_username_from_url(channel.url) is not None
):
return True
def url_to_key(self, url: str, content_type: str) -> str:
ext = '.' + content_type.split('/')[-1]
key = urlparse(url).path.split('/')[-2] + ext
ext = "." + content_type.split("/")[-1]
key = urlparse(url).path.split("/")[-2] + ext
return key
@logger.catch
@@ -71,8 +82,10 @@ class GettrScraper(Scraper):
username = self.get_username_from_url(channel.url)
profile = client.user_info(username)
return RawChannelInfo(scraper=self.__version__,
return RawChannelInfo(
scraper=self.__version__,
platform=channel.platform,
channel=channel.id,
raw_data=json.dumps(profile),
date_archived=datetime.now(timezone.utc))
date_archived=datetime.now(timezone.utc),
)

View File

@@ -10,25 +10,32 @@ import os
from cisticola.base import Channel, ScraperResult, RawChannelInfo
from cisticola.scraper import Scraper, make_request
BASE_URL = 'https://rumble.com'
BASE_URL = "https://rumble.com"
class RumbleScraper(Scraper):
"""An implementation of a Scraper for Rumble, using custom functions"""
__version__ = "RumbleScraper 0.0.2"
cookiestring = os.environ["YOUTUBE_COOKIESTRING"].replace(r'\n', '\n').replace(r'\t', '\t')
cookiefilename = 'cookiefile.txt'
cookiestring = (
os.environ["YOUTUBE_COOKIESTRING"].replace(r"\n", "\n").replace(r"\t", "\t")
)
cookiefilename = "cookiefile.txt"
@logger.catch
def get_posts(self, channel: Channel, since: ScraperResult = None) -> Generator[ScraperResult, None, None]:
def get_posts(
self, channel: Channel, since: ScraperResult = None
) -> Generator[ScraperResult, None, None]:
scraper = get_channel_videos(channel.url)
for post in scraper:
if since is not None and post['datetime'].replace(tzinfo=timezone.utc) <= since.date.replace(tzinfo=timezone.utc):
if since is not None and post["datetime"].replace(
tzinfo=timezone.utc
) <= since.date.replace(tzinfo=timezone.utc):
break
url = post['media_url']
url = post["media_url"]
archived_urls = {url: None}
@@ -36,16 +43,17 @@ class RumbleScraper(Scraper):
scraper=self.__version__,
platform="Rumble",
channel=channel.id,
platform_id=post['media_url'].split('/')[-2],
date=post['datetime'].replace(tzinfo=timezone.utc),
platform_id=post["media_url"].split("/")[-2],
date=post["datetime"].replace(tzinfo=timezone.utc),
date_archived=datetime.now(timezone.utc),
raw_data=json.dumps(post, default=str),
archived_urls=archived_urls,
media_archived=None)
media_archived=None,
)
def url_to_key(self, url: str, content_type: str) -> str:
ext = '.' + content_type.split('/')[-1]
key = urlparse(url).path.split('/')[-2] + ext
ext = "." + content_type.split("/")[-1]
key = urlparse(url).path.split("/")[-2] + ext
return key
@logger.catch
@@ -65,74 +73,77 @@ class RumbleScraper(Scraper):
@logger.catch
def get_profile(self, channel: Channel) -> RawChannelInfo:
profile = get_channel_profile(url=channel.url)
return RawChannelInfo(scraper=self.__version__,
return RawChannelInfo(
scraper=self.__version__,
platform=channel.platform,
channel=channel.id,
raw_data=json.dumps(profile),
date_archived=datetime.now(timezone.utc))
date_archived=datetime.now(timezone.utc),
)
def get_media_url(url):
r = make_request(url=url)
soup = BeautifulSoup(r.content, features = 'html.parser')
soup = BeautifulSoup(r.content, features="html.parser")
script = json.loads(''.join(soup.find('script', {'type':'application/ld+json'}).text))
media_url = script[0]['embedUrl']
script = json.loads(
"".join(soup.find("script", {"type": "application/ld+json"}).text)
)
media_url = script[0]["embedUrl"]
return media_url
def process_video(video):
rumble_soup = video.find('span', {'class' : 'video-item--rumbles'})
rumble_soup = video.find("span", {"class": "video-item--rumbles"})
if rumble_soup is None:
rumbles = '0'
rumbles = "0"
else:
rumbles = rumble_soup['data-value']
rumbles = rumble_soup["data-value"]
view_span = video.find('span', {'class' : 'video-item--views'})
view_span = video.find("span", {"class": "video-item--views"})
if view_span is None:
views = None
else:
views = view_span.get('data-value')
views = view_span.get("data-value")
author_a = video.find('a', {'rel': 'author'})
author_a = video.find("a", {"rel": "author"})
if author_a is None:
author_id = None
author_name = None
else:
author_id = author_a['href'].split('/')[-1]
author_id = author_a["href"].split("/")[-1]
author_name = author_a.text
video_link = BASE_URL + video.find('a', href = True)['href']
video_link = BASE_URL + video.find("a", href=True)["href"]
r = make_request(url=video_link)
soup = BeautifulSoup(r.content, features = 'html.parser')
soup = BeautifulSoup(r.content, features="html.parser")
content_div = soup.find('div', {'class': 'container content media-description'})
content_div = soup.find("div", {"class": "container content media-description"})
info = {
'title' : video.find('h3').text,
'thumbnail' : video.find('img')['src'],
'link' : video_link,
'views' : views,
'rumbles' : rumbles,
'content': '' if content_div is None else content_div.get_text('\n'),
'duration' : video.find('span', {'class' : 'video-item--duration'})['data-value'],
'datetime' : datetime.fromisoformat(video.find('time')['datetime']),
'author_id': author_id,
'author_name': author_name}
"title": video.find("h3").text,
"thumbnail": video.find("img")["src"],
"link": video_link,
"views": views,
"rumbles": rumbles,
"content": "" if content_div is None else content_div.get_text("\n"),
"duration": video.find("span", {"class": "video-item--duration"})["data-value"],
"datetime": datetime.fromisoformat(video.find("time")["datetime"]),
"author_id": author_id,
"author_name": author_name,
}
info['media_url'] = get_media_url(info['link'])
info["media_url"] = get_media_url(info["link"])
return info
def get_channel_videos(url):
page = 1
channel_url = f'{url}?page='
channel_url = f"{url}?page="
while True:
url = channel_url + str(page)
@@ -141,37 +152,38 @@ def get_channel_videos(url):
if r.status_code == 404:
break
soup = BeautifulSoup(r.content, features = 'html.parser')
soup = BeautifulSoup(r.content, features="html.parser")
video_list = soup.find_all('li', {'class' : 'video-listing-entry'})
video_list = soup.find_all("li", {"class": "video-listing-entry"})
for video in video_list:
yield process_video(video)
page += 1
def get_channel_profile(url):
channel_url = f'{url}'
channel_url = f"{url}"
r = make_request(url=channel_url)
soup = BeautifulSoup(r.content, features = 'lxml')
soup = BeautifulSoup(r.content, features="lxml")
verified_svg = soup.find('h1').find('svg', {'class' : 'listing-header--verified'})
thumbnail_soup = soup.find('img', {'class' : 'listing-header--thumb'})
cover_soup = soup.find('img', {'class' : 'listing-header--backsplash-img'})
verified_svg = soup.find("h1").find("svg", {"class": "listing-header--verified"})
thumbnail_soup = soup.find("img", {"class": "listing-header--thumb"})
cover_soup = soup.find("img", {"class": "listing-header--backsplash-img"})
author_a = soup.find('a', {'rel': 'author'})
author_a = soup.find("a", {"rel": "author"})
if author_a is None:
author_id = None
else:
author_id = author_a['href'].split('/')[-1]
author_id = author_a["href"].split("/")[-1]
profile = {
'name': soup.find('h1').text,
'id': author_id,
'verified': verified_svg is not None,
'thumbnail': thumbnail_soup.get('src') if thumbnail_soup else None,
'cover': cover_soup.get('src') if cover_soup else None,
'subscribers': soup.find('span', {'class' : 'subscribe-button-count'}).text}
"name": soup.find("h1").text,
"id": author_id,
"verified": verified_svg is not None,
"thumbnail": thumbnail_soup.get("src") if thumbnail_soup else None,
"cover": cover_soup.get("src") if cover_soup else None,
"subscribers": soup.find("span", {"class": "subscribe-button-count"}).text,
}
return profile

View File

@@ -14,19 +14,21 @@ from telethon.tl import types
from cisticola.base import Channel, ScraperResult, RawChannelInfo
from cisticola.scraper.base import Scraper
MEDIA_TYPES = ['photo', 'video', 'document', 'webpage']
MEDIA_TYPES = ["photo", "video", "document", "webpage"]
class TelegramTelethonScraper(Scraper):
"""An implementation of a Scraper for Telegram, using Telethon library"""
__version__ = "TelegramTelethonScraper 0.0.4"
client = None
def __init__(self, telethon_session_name=None):
super().__init__()
api_id = os.environ['TELEGRAM_API_ID']
api_hash = os.environ['TELEGRAM_API_HASH']
phone = os.environ['TELEGRAM_PHONE']
api_id = os.environ["TELEGRAM_API_ID"]
api_hash = os.environ["TELEGRAM_API_HASH"]
phone = os.environ["TELEGRAM_PHONE"]
if telethon_session_name is None:
telethon_session_name = phone
@@ -40,9 +42,9 @@ class TelegramTelethonScraper(Scraper):
self.client.disconnect()
def get_username_from_url(url):
username = url.split('https://t.me/')[1]
if username.startswith('s/'):
username = username.split('s/')[1]
username = url.split("https://t.me/")[1]
if username.startswith("s/"):
username = username.split("s/")[1]
return username
def get_channel_identifier(channel: Channel):
@@ -63,14 +65,18 @@ class TelegramTelethonScraper(Scraper):
return result
if len(list(result.archived_urls.keys())) != 1:
logger.warning(f"Expected 1 key in archived_urls, found {result.archived_keys}")
logger.warning(
f"Expected 1 key in archived_urls, found {result.archived_keys}"
)
else:
key = list(result.archived_urls.keys())[0]
if result.archived_urls[key] is None:
raw = json.loads(result.raw_data)
message = self.client.get_messages(raw['peer_id']['channel_id'], ids=[raw['id']])
message = self.client.get_messages(
raw["peer_id"]["channel_id"], ids=[raw["id"]]
)
blob = None
output_file_with_ext = None
@@ -81,12 +87,16 @@ class TelegramTelethonScraper(Scraper):
if blob is not None:
# TODO specify Content-Type
archived_url = self.archive_blob(blob = blob, content_type = '', key = output_file_with_ext)
archived_url = self.archive_blob(
blob=blob, content_type="", key=output_file_with_ext
)
result.archived_urls[key] = archived_url
result.media_archived = datetime.now(timezone.utc)
else:
if output_file_with_ext == 'largefile':
logger.info("Because this was a large file, not clearing media data")
if output_file_with_ext == "largefile":
logger.info(
"Because this was a large file, not clearing media data"
)
return result
logger.warning("Downloaded blob was None")
@@ -102,14 +112,18 @@ class TelegramTelethonScraper(Scraper):
if type(post.media) == types.MessageMediaDocument:
if post.media.document.size / (1024 * 1024) > 50:
logger.info(f"Skipping archive of large {type(post.media)} with size {post.media.document.size/(1024*1024)} MB")
logger.info(
f"Skipping archive of large {type(post.media)} with size {post.media.document.size/(1024*1024)} MB"
)
return (None, "largefile")
logger.debug(f"Archiving {type(post.media)} with size {post.media.document.size/(1024*1024)} MB")
logger.debug(
f"Archiving {type(post.media)} with size {post.media.document.size/(1024*1024)} MB"
)
else:
logger.debug(f"Archiving {type(post.media)}")
key = f'{post.peer_id.channel_id}_{post.id}'
key = f"{post.peer_id.channel_id}_{post.id}"
with tempfile.TemporaryDirectory() as temp_dir:
output_file = Path(temp_dir, key)
@@ -123,7 +137,7 @@ class TelegramTelethonScraper(Scraper):
output_file_with_ext = os.listdir(temp_dir)[0]
filename = Path(temp_dir, output_file_with_ext)
with open(filename, 'rb') as f:
with open(filename, "rb") as f:
blob = f.read()
return (blob, output_file_with_ext)
@@ -132,22 +146,35 @@ class TelegramTelethonScraper(Scraper):
return True
# @logger.catch
def get_posts(self, channel: Channel, since: ScraperResult = None, until: ScraperResult = None) -> Generator[ScraperResult, None, None]:
def get_posts(
self, channel: Channel, since: ScraperResult = None, until: ScraperResult = None
) -> Generator[ScraperResult, None, None]:
username = TelegramTelethonScraper.get_channel_identifier(channel)
if until is not None:
logger.info(f"Only getting old posts, up to ID {until.platform_id.split('/')[-1]}")
iterator = self.client.iter_messages(username, max_id=int(until.platform_id.split('/')[-1]), wait_time=0, limit=None)
logger.info(
f"Only getting old posts, up to ID {until.platform_id.split('/')[-1]}"
)
iterator = self.client.iter_messages(
username,
max_id=int(until.platform_id.split("/")[-1]),
wait_time=0,
limit=None,
)
else:
iterator = self.client.iter_messages(username)
post = None
for post in iterator:
post_url = f'{channel.url}/{post.id}'
post_url = f"{channel.url}/{post.id}"
logger.trace(f"Archiving post {post_url} from {post.date}")
if since is not None and post.date.replace(tzinfo=timezone.utc) <= since.date.replace(tzinfo=timezone.utc):
logger.info(f'Timestamp of post {post} is earlier than the previous archived timestamp {post.date.replace(tzinfo=timezone.utc)}')
if since is not None and post.date.replace(
tzinfo=timezone.utc
) <= since.date.replace(tzinfo=timezone.utc):
logger.info(
f"Timestamp of post {post} is earlier than the previous archived timestamp {post.date.replace(tzinfo=timezone.utc)}"
)
break
archived_urls = {}
@@ -166,10 +193,18 @@ class TelegramTelethonScraper(Scraper):
date_archived=datetime.now(timezone.utc),
raw_data=json.dumps(post.to_dict(), default=str),
archived_urls=archived_urls,
media_archived=media_archived)
media_archived=media_archived,
)
if (post is not None and post.id > 1 and since is None) or (post is not None and since is not None and post.date.replace(tzinfo=timezone.utc) > since.date.replace(tzinfo=timezone.utc)):
logger.info(f"Last post ID is {post.id} / {post.date}, since is {since.date if since is not None else None}, until is {until.platform_id if until is not None else None}, starting again")
if (post is not None and post.id > 1 and since is None) or (
post is not None
and since is not None
and post.date.replace(tzinfo=timezone.utc)
> since.date.replace(tzinfo=timezone.utc)
):
logger.info(
f"Last post ID is {post.id} / {post.date}, since is {since.date if since is not None else None}, until is {until.platform_id if until is not None else None}, starting again"
)
new_until = ScraperResult(
scraper=self.__version__,
platform="Telegram",
@@ -179,19 +214,21 @@ class TelegramTelethonScraper(Scraper):
date_archived=datetime.now(timezone.utc),
raw_data=json.dumps(post.to_dict(), default=str),
archived_urls=archived_urls,
media_archived=media_archived)
media_archived=media_archived,
)
for p in self.get_posts(channel, since=since, until=new_until):
yield p
@logger.catch
def get_profile(self, channel: Channel) -> RawChannelInfo:
username = TelegramTelethonScraper.get_channel_identifier(channel)
full_channel = self.client(GetFullChannelRequest(channel=username))
profile = full_channel.to_dict()
return RawChannelInfo(scraper=self.__version__,
return RawChannelInfo(
scraper=self.__version__,
platform=channel.platform,
channel=channel.id,
raw_data=json.dumps(profile, default=str),
date_archived=datetime.now(timezone.utc))
date_archived=datetime.now(timezone.utc),
)

View File

@@ -7,7 +7,18 @@ from sqlalchemy.sql.expression import func
from collections import defaultdict
from datetime import datetime, timezone
from cisticola.base import RawChannelInfo, ChannelInfo, ScraperResult, Post, Media, Channel, mapper_registry, Image, Video, Audio
from cisticola.base import (
RawChannelInfo,
ChannelInfo,
ScraperResult,
Post,
Media,
Channel,
mapper_registry,
Image,
Video,
Audio,
)
class Transformer:
@@ -35,7 +46,9 @@ class Transformer:
pass
def transform(data: ScraperResult, insert: Callable) -> Generator[Union[Post, Channel, Media], None, None]:
def transform(
data: ScraperResult, insert: Callable
) -> Generator[Union[Post, Channel, Media], None, None]:
"""Transform a ScraperResult into objects with additional parameters for analysis. This function can
yield multiple objects, as it will find references to quoted/replied posts, media objects, and Channel
objects and provide all of these to be inserted into the database.
@@ -67,16 +80,27 @@ class Transformer:
for k in data.archived_urls:
if data.archived_urls[k]:
archived_url = data.archived_urls[k]
filename = archived_url.split('/')[-1]
ext = None if '.' not in filename else filename.split('.')[-1].lower()
filename = archived_url.split("/")[-1]
ext = None if "." not in filename else filename.split(".")[-1].lower()
media_kwargs = dict(url=archived_url, post=transformed.id, raw_id=data.id, original_url=k, date=data.date, date_archived=data.date_archived, date_transformed=datetime.now(timezone.utc), transformer=self.__version__, scraper=data.scraper, platform=data.platform)
media_kwargs = dict(
url=archived_url,
post=transformed.id,
raw_id=data.id,
original_url=k,
date=data.date,
date_archived=data.date_archived,
date_transformed=datetime.now(timezone.utc),
transformer=self.__version__,
scraper=data.scraper,
platform=data.platform,
)
if ext in ('mp4', 'mov', 'avi', 'mkv'):
if ext in ("mp4", "mov", "avi", "mkv"):
media_class = Video
elif ext in ('oga', 'mp3', "wav", 'aif', 'aiff', 'aac'):
elif ext in ("oga", "mp3", "wav", "aif", "aiff", "aac"):
media_class = Audio
elif ext in ('jpg', 'jpeg', 'png', 'gif', 'bmp', 'heic', 'tiff'):
elif ext in ("jpg", "jpeg", "png", "gif", "bmp", "heic", "tiff"):
media_class = Image
else:
logger.warning(f"Unknown file extension {ext}")
@@ -202,11 +226,31 @@ class ETLController:
# This is using some adhoc unique constraints that might be worth formalizing at some point
if type(obj) == Channel:
instance = session.query(Channel).filter(
(((Channel.url==obj.url)&(Channel.url!='')&(Channel.url is not None)&(Channel.url!='https://t.me/s/'))|
((Channel.platform_id==str(obj.platform_id))&(Channel.platform_id!='')&(Channel.platform_id is not None))|
((Channel.screenname==obj.screenname)&(Channel.screenname!='')&(Channel.screenname is not None)))&
(Channel.platform==obj.platform)).first()
instance = (
session.query(Channel)
.filter(
(
(
(Channel.url == obj.url)
& (Channel.url != "")
& (Channel.url is not None)
& (Channel.url != "https://t.me/s/")
)
| (
(Channel.platform_id == str(obj.platform_id))
& (Channel.platform_id != "")
& (Channel.platform_id is not None)
)
| (
(Channel.screenname == obj.screenname)
& (Channel.screenname != "")
& (Channel.screenname is not None)
)
)
& (Channel.platform == obj.platform)
)
.first()
)
elif type(obj) == Post:
return self.insert_post(obj, session, hydrate)
@@ -240,7 +284,12 @@ class ETLController:
logger.info(f"Found matching DB entry for {obj}: {instance}")
if type(obj) == Channel:
if obj.source != instance.source and obj.source == 'linked_channel' and instance.source != 'researcher' and (instance.source is None or instance.source[:4] != 'snow'):
if (
obj.source != instance.source
and obj.source == "linked_channel"
and instance.source != "researcher"
and (instance.source is None or instance.source[:4] != "snow")
):
logger.info(f"Updating source to linked channel")
instance.source = obj.source
instance.notes = obj.notes
@@ -251,7 +300,7 @@ class ETLController:
session.flush()
session.commit()
if (instance.platform_id is None or instance.platform_id == ''):
if instance.platform_id is None or instance.platform_id == "":
instance.platform_id = obj.platform_id
session.flush()
session.commit()
@@ -293,22 +342,35 @@ class ETLController:
handled = False
if transformer.can_handle(result):
logger.trace(f"{transformer} is handling result {result.id} ({result.date})")
logger.trace(
f"{transformer} is handling result {result.id} ({result.date})"
)
handled = True
transformer.transform(result, lambda obj: self.insert_or_select(obj, session, hydrate), session, lambda obj: self.insert_post(obj, session, hydrate, flush=False), lambda: self.flush_posts(session))
transformer.transform(
result,
lambda obj: self.insert_or_select(obj, session, hydrate),
session,
lambda obj: self.insert_post(
obj, session, hydrate, flush=False
),
lambda: self.flush_posts(session),
)
break
if handled == False:
logger.warning(f"No Transformer could handle ID {result.id} with platform {result.platform} ({result.date})")
logger.warning(
f"No Transformer could handle ID {result.id} with platform {result.platform} ({result.date})"
)
self.flush_posts(session)
session.commit()
@logger.catch(reraise=True)
def transform_all_untransformed(self, hydrate: bool = True, min_date=datetime(2010, 1, 1)):
def transform_all_untransformed(
self, hydrate: bool = True, min_date=datetime(2010, 1, 1)
):
"""Transform all ScraperResult objects in the database that do not have an
equivalent Post object stored.
@@ -331,7 +393,8 @@ class ETLController:
logger.info(f"Fetching first untransformed post batch of {BATCH_SIZE}")
batch = (session.query(ScraperResult)
batch = (
session.query(ScraperResult)
.join(Post, isouter=True)
.where(ScraperResult.date > min_date)
.where(Post.raw_id == None)
@@ -344,9 +407,12 @@ class ETLController:
self.transform_results(batch, hydrate=hydrate)
logger.info(f"Fetching untransformed posts batch of {BATCH_SIZE}, offset {max(batch, key=lambda v: v.date).date}")
logger.info(
f"Fetching untransformed posts batch of {BATCH_SIZE}, offset {max(batch, key=lambda v: v.date).date}"
)
batch = (session.query(ScraperResult)
batch = (
session.query(ScraperResult)
.join(Post, isouter=True)
.where(ScraperResult.date > min_date)
.where(Post.raw_id == None)
@@ -356,7 +422,6 @@ class ETLController:
.limit(BATCH_SIZE)
).all()
@logger.catch(reraise=True)
def transform_info(self, results: List[ChannelInfo]):
"""Transform raw RawChannelInfo objects into ChannelInfo objects.
@@ -380,17 +445,25 @@ class ETLController:
for transformer in self.transformers:
if transformer.can_handle(result):
logger.trace(f"{transformer} is handling raw info result {result.id} ({result.date_archived})")
logger.trace(
f"{transformer} is handling raw info result {result.id} ({result.date_archived})"
)
handled = True
transformer.transform_info(result, lambda obj: self.insert_or_select(obj, session, False), session, channel=data.Channel)
transformer.transform_info(
result,
lambda obj: self.insert_or_select(obj, session, False),
session,
channel=data.Channel,
)
session.commit()
break
if handled == False:
logger.warning(f"No Transformer could handle raw channel info ID {result.id} with platform {result.platform} ({result.date_archived})")
logger.warning(
f"No Transformer could handle raw channel info ID {result.id} with platform {result.platform} ({result.date_archived})"
)
@logger.catch(reraise=True)
def transform_all_untransformed_info(self):
@@ -407,7 +480,8 @@ class ETLController:
offset = 0
batch = []
query = (session.query(RawChannelInfo, Channel)
query = (
session.query(RawChannelInfo, Channel)
.select_from(RawChannelInfo)
.join(ChannelInfo, isouter=True)
.join(Channel, RawChannelInfo.channel == Channel.id)
@@ -416,12 +490,16 @@ class ETLController:
)
while len(batch) > 0 or offset == 0:
logger.info(f"Fetching untransformed info batch of {BATCH_SIZE}, offset {offset}")
logger.info(
f"Fetching untransformed info batch of {BATCH_SIZE}, offset {offset}"
)
batch = query.slice(offset, offset + BATCH_SIZE).all()
offset += BATCH_SIZE
logger.info(f"Found {len(batch)} info items to ETL ({offset} already processed)")
logger.info(
f"Found {len(batch)} info items to ETL ({offset} already processed)"
)
self.transform_info(batch)
@@ -450,16 +528,24 @@ class ETLController:
handled = False
if transformer.can_handle(result):
logger.trace(f"{transformer} is handling result {result.id} ({result.date})")
logger.trace(
f"{transformer} is handling result {result.id} ({result.date})"
)
handled = True
transformer.transform_media(result, total_result.Post, lambda obj: self.insert_or_select(obj, session, hydrate))
transformer.transform_media(
result,
total_result.Post,
lambda obj: self.insert_or_select(obj, session, hydrate),
)
session.commit()
break
if handled == False:
logger.warning(f"No Transformer could handle ID {result.id} with platform {result.platform} ({result.date})")
logger.warning(
f"No Transformer could handle ID {result.id} with platform {result.platform} ({result.date})"
)
@logger.catch(reraise=True)
def transform_all_untransformed_media(self, hydrate=True):
@@ -482,10 +568,15 @@ class ETLController:
logger.info(f"Fetching first untransformed post media batch of {BATCH_SIZE}")
batch = (session.query(ScraperResult, Post)
batch = (
session.query(ScraperResult, Post)
.join(Post)
.join(Media, isouter=True)
.filter((ScraperResult.media_archived != None) & (cast(ScraperResult.archived_urls, String) != '{}') & (Media.id == None))
.filter(
(ScraperResult.media_archived != None)
& (cast(ScraperResult.archived_urls, String) != "{}")
& (Media.id == None)
)
.order_by(ScraperResult.date.desc())
.limit(BATCH_SIZE)
).all()
@@ -495,13 +586,23 @@ class ETLController:
self.transform_media(batch, hydrate=hydrate)
logger.info(f"Fetching untransformed post media batch of {BATCH_SIZE}, offset {min(batch, key=lambda v: v.ScraperResult.date).ScraperResult.date}")
logger.info(
f"Fetching untransformed post media batch of {BATCH_SIZE}, offset {min(batch, key=lambda v: v.ScraperResult.date).ScraperResult.date}"
)
batch = (session.query(ScraperResult, Post)
batch = (
session.query(ScraperResult, Post)
.join(Post)
.join(Media, isouter=True)
.where(ScraperResult.date <= min(batch, key=lambda v: v.ScraperResult.date).ScraperResult.date)
.filter((ScraperResult.media_archived != None) & (cast(ScraperResult.archived_urls, String) != '{}') & (Media.id == None))
.where(
ScraperResult.date
<= min(batch, key=lambda v: v.ScraperResult.date).ScraperResult.date
)
.filter(
(ScraperResult.media_archived != None)
& (cast(ScraperResult.archived_urls, String) != "{}")
& (Media.id == None)
)
.order_by(ScraperResult.date.desc())
.limit(BATCH_SIZE)
).all()

View File

@@ -7,7 +7,17 @@ from dateutil.relativedelta import relativedelta
from bs4 import BeautifulSoup
from cisticola.transformer.base import Transformer
from cisticola.base import RawChannelInfo, ScraperResult, Post, Image, Video, Media, Channel, ChannelInfo
from cisticola.base import (
RawChannelInfo,
ScraperResult,
Post,
Image,
Video,
Media,
Channel,
ChannelInfo,
)
class BitchuteTransformer(Transformer):
"""A Bitchute specific ScraperResult, with a method ETL/transforming"""
@@ -15,61 +25,86 @@ class BitchuteTransformer(Transformer):
__version__ = "BitchuteTransformer 0.0.2"
def can_handle(self, data: ScraperResult) -> bool:
scraper = data.scraper.split(' ')
scraper = data.scraper.split(" ")
if scraper[0] == "BitchuteScraper":
return True
return False
def transform_media(self, data: ScraperResult, transformed: Post, insert: Callable) -> Generator[Media, None, None]:
def transform_media(
self, data: ScraperResult, transformed: Post, insert: Callable
) -> Generator[Media, None, None]:
raw = json.loads(data.raw_data)
orig = raw['video_url']
orig = raw["video_url"]
new = data.archived_urls[orig]
m = Video(url=new, post=transformed.id, raw_id=data.id, original_url=orig, date=data.date, date_archived=data.date_archived, date_transformed=datetime.now(timezone.utc), transformer=self.__version__, scraper=data.scraper, platform=data.platform)
m = Video(
url=new,
post=transformed.id,
raw_id=data.id,
original_url=orig,
date=data.date,
date_archived=data.date_archived,
date_transformed=datetime.now(timezone.utc),
transformer=self.__version__,
scraper=data.scraper,
platform=data.platform,
)
insert(m)
def transform_info(self, data: RawChannelInfo, insert: Callable, session, channel=None) -> Generator[Union[Post, Channel, Media], None, None]:
def transform_info(
self, data: RawChannelInfo, insert: Callable, session, channel=None
) -> Generator[Union[Post, Channel, Media], None, None]:
raw = json.loads(data.raw_data)
transformed = ChannelInfo(
raw_channel_info_id=data.id,
channel=data.channel,
platform_id=raw['owner_url'].strip('/').split('/')[-1],
platform_id=raw["owner_url"].strip("/").split("/")[-1],
platform=data.platform,
scraper=data.scraper,
transformer=self.__version__,
screenname=raw['owner_name'],
name=raw['owner_name'],
description=raw['description'],
description_url='', # does not exist for Bitchute
description_location='', # does not exist for Bitchute
followers=raw['subscribers'],
screenname=raw["owner_name"],
name=raw["owner_name"],
description=raw["description"],
description_url="", # does not exist for Bitchute
description_location="", # does not exist for Bitchute
followers=raw["subscribers"],
following=-1, # does not exist for Bitchute
verified=False, # does not exist for Bitchute
date_created=parse_created(raw['created'], data.date_archived),
date_created=parse_created(raw["created"], data.date_archived),
date_archived=data.date_archived,
date_transformed=datetime.now(timezone.utc)
date_transformed=datetime.now(timezone.utc),
)
transformed = insert(transformed)
def transform(self, data: ScraperResult, insert: Callable, session, insert_post, flush_posts) -> Generator[Union[Post, Channel, Media], None, None]:
def transform(
self, data: ScraperResult, insert: Callable, session, insert_post, flush_posts
) -> Generator[Union[Post, Channel, Media], None, None]:
raw = json.loads(data.raw_data)
if raw['category'] == 'comment':
if raw['parent_id'] is None:
reply_to_id = raw['thread_id']
if raw["category"] == "comment":
if raw["parent_id"] is None:
reply_to_id = raw["thread_id"]
else:
reply_to_id = raw['parent_id']
reply_to_id = raw["parent_id"]
flush_posts()
post = session.query(Post).filter_by(channel=data.channel, platform_id=reply_to_id).first()
post = (
session.query(Post)
.filter_by(channel=data.channel, platform_id=reply_to_id)
.first()
)
if post is None:
if raw['parent_id'] is not None:
if raw["parent_id"] is not None:
# this block is for comments whose parent_ids correspond to deleted comments
post = session.query(Post).filter_by(channel=data.channel, platform_id=raw['thread_id']).first()
post = (
session.query(Post)
.filter_by(channel=data.channel, platform_id=raw["thread_id"])
.first()
)
if post is None:
reply_to = -1
else:
@@ -78,18 +113,18 @@ class BitchuteTransformer(Transformer):
reply_to = -1
else:
reply_to = post.id
content = raw['body'].strip()
content = raw["body"].strip()
else:
reply_to = -1
soup = BeautifulSoup(raw['body'], features = 'html.parser')
soup.find('div', {'class': 'teaser'}).decompose()
soup.find('span', {'class': 'more'}).decompose()
soup.find('span', {'class': 'less hidden'}).decompose()
soup = BeautifulSoup(raw["body"], features="html.parser")
soup.find("div", {"class": "teaser"}).decompose()
soup.find("span", {"class": "more"}).decompose()
soup.find("span", {"class": "less hidden"}).decompose()
content = soup.text.strip()
transformed = Post(
raw_id=data.id,
platform_id=raw['id'],
platform_id=raw["id"],
scraper=data.scraper,
transformer=self.__version__,
platform=data.platform,
@@ -97,20 +132,24 @@ class BitchuteTransformer(Transformer):
date=data.date,
date_archived=data.date_archived,
date_transformed=datetime.now(timezone.utc),
url=raw['url'] if raw['url'] else None,
url=raw["url"] if raw["url"] else None,
content=content,
author_id=raw['author_id'],
author_username=raw['author'],
author_id=raw["author_id"],
author_username=raw["author"],
reply_to=reply_to,
hashtags = list(filter(None, [h.strip('#') for h in raw['hashtags'].split(',')])),
likes = raw['likes'],
views = int(raw['views']) if raw.get('views') else None,
video_title = raw['subject'],
video_duration = _parse_duration_str(raw['length']))
hashtags=list(
filter(None, [h.strip("#") for h in raw["hashtags"].split(",")])
),
likes=raw["likes"],
views=int(raw["views"]) if raw.get("views") else None,
video_title=raw["subject"],
video_duration=_parse_duration_str(raw["length"]),
)
# insert_post
transformed = insert_post(transformed)
def parse_created(created: str, date_archived: datetime) -> datetime:
"""Convert a created string (e.g. ``"1 year, 10 months ago"``) to a datetime
object relative to the specified ``date_archived``.
@@ -119,19 +158,26 @@ def parse_created(created: str, date_archived: datetime) -> datetime:
# handle case where `created` string has already been parsed into a datetime
return datetime.fromisoformat(created)
except ValueError:
period_list = ['year', 'month', 'week', 'day']
period_list = ["year", "month", "week", "day"]
periods = [period.strip() for period in created.split('ago')[0].strip().split(',')]
_kwargs = {period : int(number) for period, number in dict(reversed(p.split(' ')) for p in periods).items()}
kwargs = {(k + 's' if k in period_list else k) : v for k, v in _kwargs.items()}
periods = [
period.strip() for period in created.split("ago")[0].strip().split(",")
]
_kwargs = {
period: int(number)
for period, number in dict(reversed(p.split(" ")) for p in periods).items()
}
kwargs = {(k + "s" if k in period_list else k): v for k, v in _kwargs.items()}
return date_archived - relativedelta(**kwargs)
def _parse_duration_str(duration_str: str) -> int:
"""Convert duration string (e.g. '2:27:04') to the number of seconds (e.g. 8824).
"""
"""Convert duration string (e.g. '2:27:04') to the number of seconds (e.g. 8824)."""
if not duration_str:
return None
else:
duration_list = duration_str.split(':')
return sum([int(s) * int(g) for s, g in zip([1, 60, 3600], reversed(duration_list))])
duration_list = duration_str.split(":")
return sum(
[int(s) * int(g) for s, g in zip([1, 60, 3600], reversed(duration_list))]
)

View File

@@ -8,7 +8,17 @@ from gogettr import PublicClient
from gogettr.api import GettrApiError
from cisticola.transformer.base import Transformer
from cisticola.base import RawChannelInfo, ChannelInfo, ScraperResult, Post, Image, Video, Media, Channel
from cisticola.base import (
RawChannelInfo,
ChannelInfo,
ScraperResult,
Post,
Image,
Video,
Media,
Channel,
)
class GettrTransformer(Transformer):
"""A Gettr specific ScraperResult, with a method ETL/transforming"""
@@ -16,50 +26,58 @@ class GettrTransformer(Transformer):
__version__ = "GettrTransformer 0.0.1"
def can_handle(self, data: ScraperResult) -> bool:
scraper = data.scraper.split(' ')
scraper = data.scraper.split(" ")
if scraper[0] == "GettrScraper":
return True
return False
def transform_info(self, data: RawChannelInfo, insert: Callable, session, channel=None) -> Generator[Union[Post, Channel, Media], None, None]:
def transform_info(
self, data: RawChannelInfo, insert: Callable, session, channel=None
) -> Generator[Union[Post, Channel, Media], None, None]:
raw = json.loads(data.raw_data)
transformed = ChannelInfo(
raw_channel_info_id=data.id,
channel=data.channel,
platform_id=raw['_id'],
platform_id=raw["_id"],
platform=data.platform,
scraper=data.scraper,
transformer=self.__version__,
screenname=raw['username'],
name=raw['nickname'],
description=raw.get('dsc'),
description_url=raw.get('website'),
description_location=raw.get('location'),
followers=int(raw['flg']),
following=int(raw['flw']),
verified=True if raw.get('infl') else False,
date_created=datetime.fromtimestamp(int(raw['cdate'])*0.001),
screenname=raw["username"],
name=raw["nickname"],
description=raw.get("dsc"),
description_url=raw.get("website"),
description_location=raw.get("location"),
followers=int(raw["flg"]),
following=int(raw["flw"]),
verified=True if raw.get("infl") else False,
date_created=datetime.fromtimestamp(int(raw["cdate"]) * 0.001),
date_archived=data.date_archived,
date_transformed=datetime.now(timezone.utc)
date_transformed=datetime.now(timezone.utc),
)
transformed = insert(transformed)
def _get_channel_id(self, username: str, category: str, insert: Callable, session):
channel = session.query(Channel).where((func.lower(Channel.screenname)==func.lower(username)) & (Channel.platform == 'Gettr')).first()
channel = (
session.query(Channel)
.where(
(func.lower(Channel.screenname) == func.lower(username))
& (Channel.platform == "Gettr")
)
.first()
)
if channel is None:
try:
client = PublicClient()
profile = client.user_info(username.lower())
screenname = profile.get('_id')
screenname = profile.get("_id")
channel = Channel(
name=profile.get('nickname'),
name=profile.get("nickname"),
platform_id=screenname,
platform='Gettr',
platform="Gettr",
url="https://gettr.com/user/" + screenname,
screenname=screenname,
category=category,
@@ -69,31 +87,41 @@ class GettrTransformer(Transformer):
channel = Channel(
name=None,
platform_id=None,
platform = 'Gettr',
platform="Gettr",
url=None,
screenname=username,
category=category,
source=self.__version__,
notes='GettrApiError'
notes="GettrApiError",
)
channel = insert(channel)
return channel.id
def transform(self, data: ScraperResult, insert: Callable, session, insert_post, flush_posts) -> Generator[Union[Post, Channel, Media], None, None]:
def transform(
self, data: ScraperResult, insert: Callable, session, insert_post, flush_posts
) -> Generator[Union[Post, Channel, Media], None, None]:
raw = json.loads(data.raw_data)
if raw["activity"]["action"] == "shares_pst":
forwarded_from = self._get_channel_id(
username = str(raw["activity"]["uid"]), category = 'forwarded', insert = insert, session = session)
username=str(raw["activity"]["uid"]),
category="forwarded",
insert=insert,
session=session,
)
else:
forwarded_from = None
mentions = []
for mentioned_user in raw.get("utgs", []):
mentioned_id = self._get_channel_id(
username = mentioned_user, category = 'mentioned', insert = insert, session = session)
username=mentioned_user,
category="mentioned",
insert=insert,
session=session,
)
mentions.append(mentioned_id)
transformed = Post(
@@ -114,9 +142,9 @@ class GettrTransformer(Transformer):
outlinks=list(filter(None, [raw.get("prevsrc")])),
forwarded_from=forwarded_from,
mentions=mentions,
likes = raw.get('lkbpst'),
likes=raw.get("lkbpst"),
forwards=raw.get("shbpst"),
views = raw.get('vfpst')
views=raw.get("vfpst"),
)
# insert_post

View File

@@ -6,7 +6,17 @@ from datetime import datetime, timezone
from sqlalchemy import func, JSON, String, cast, text
from cisticola.transformer.base import Transformer
from cisticola.base import RawChannelInfo, ChannelInfo, ScraperResult, Post, Image, Video, Media, Channel
from cisticola.base import (
RawChannelInfo,
ChannelInfo,
ScraperResult,
Post,
Image,
Video,
Media,
Channel,
)
class RumbleTransformer(Transformer):
"""A Rumble specific ScraperResult, with a method ETL/transforming"""
@@ -14,25 +24,36 @@ class RumbleTransformer(Transformer):
__version__ = "RumbleTransformer 0.0.1"
def can_handle(self, data: ScraperResult) -> bool:
scraper = data.scraper.split(' ')
scraper = data.scraper.split(" ")
if scraper[0] == "RumbleScraper":
return True
return False
def transform_info(self, data: RawChannelInfo, insert: Callable, session, channel=None) -> Generator[Union[Post, Channel, Media], None, None]:
def transform_info(
self, data: RawChannelInfo, insert: Callable, session, channel=None
) -> Generator[Union[Post, Channel, Media], None, None]:
raw = json.loads(data.raw_data)
if 'id' not in raw:
if "id" not in raw:
# The first version of the Rumble ChannelInfo scraper didn't return
# the platform_id, so this is a workaround.
channel = session.query(RawChannelInfo).filter(text("raw_channel_info.raw_data::jsonb ->> 'name'=:name"), RawChannelInfo.platform == 'Rumble').params(name=raw['name']).order_by(RawChannelInfo.date_archived.desc()).first()
channel = (
session.query(RawChannelInfo)
.filter(
text("raw_channel_info.raw_data::jsonb ->> 'name'=:name"),
RawChannelInfo.platform == "Rumble",
)
.params(name=raw["name"])
.order_by(RawChannelInfo.date_archived.desc())
.first()
)
if channel is None:
platform_id = None
else:
platform_id = json.loads(channel.raw_data)['id']
platform_id = json.loads(channel.raw_data)["id"]
else:
platform_id = raw['id']
platform_id = raw["id"]
transformed = ChannelInfo(
raw_channel_info_id=data.id,
@@ -42,63 +63,67 @@ class RumbleTransformer(Transformer):
scraper=data.scraper,
transformer=self.__version__,
screenname=platform_id,
name=raw['name'],
description='', # does not exist for Rumble
description_url='', # does not exist for Rumble
description_location='', # does not exist for Rumble
followers=_process_number(raw['subscribers']),
name=raw["name"],
description="", # does not exist for Rumble
description_url="", # does not exist for Rumble
description_location="", # does not exist for Rumble
followers=_process_number(raw["subscribers"]),
following=-1, # does not exist for Rumble
verified=raw['verified'],
verified=raw["verified"],
date_created=None, # does not exist for Rumble
date_archived=data.date_archived,
date_transformed=datetime.now(timezone.utc)
date_transformed=datetime.now(timezone.utc),
)
transformed = insert(transformed)
def transform(self, data: ScraperResult, insert: Callable, session, insert_post, flush_posts) -> Generator[Union[Post, Channel, Media], None, None]:
def transform(
self, data: ScraperResult, insert: Callable, session, insert_post, flush_posts
) -> Generator[Union[Post, Channel, Media], None, None]:
raw = json.loads(data.raw_data)
transformed = Post(
raw_id=data.id,
platform_id=raw['media_url'].strip('/').split('/')[-1],
platform_id=raw["media_url"].strip("/").split("/")[-1],
scraper=data.scraper,
transformer=self.__version__,
platform=data.platform,
channel=data.channel,
date=dateutil.parser.parse(raw['datetime']),
date=dateutil.parser.parse(raw["datetime"]),
date_archived=data.date_archived,
date_transformed=datetime.now(timezone.utc),
url=raw['link'],
content=raw['content'],
author_id=raw['author_id'],
author_username=raw['author_name'],
views = _process_number(raw.get('views')),
likes = _process_number(raw.get('rumbles')),
video_title = raw['title'],
video_duration=_parse_duration_str(raw['duration']))
url=raw["link"],
content=raw["content"],
author_id=raw["author_id"],
author_username=raw["author_name"],
views=_process_number(raw.get("views")),
likes=_process_number(raw.get("rumbles")),
video_title=raw["title"],
video_duration=_parse_duration_str(raw["duration"]),
)
# insert_post
insert_post(transformed)
def _process_number(s):
def _process_number(s):
if s is None:
return None
else:
s = s.replace(' ', '').replace(',','')
if s.endswith('M'):
s = s.replace(" ", "").replace(",", "")
if s.endswith("M"):
return int(float(s[:-1]) * 1e6)
elif s.endswith('K'):
elif s.endswith("K"):
return int(float(s[:-1]) * 1000)
return int(s)
def _parse_duration_str(duration_str: str) -> int:
"""Convert duration string (e.g. '2:27:04') to the number of seconds (e.g. 8824).
"""
"""Convert duration string (e.g. '2:27:04') to the number of seconds (e.g. 8824)."""
if not duration_str:
return None
else:
duration_list = duration_str.split(':')
return sum([int(s) * int(g) for s, g in zip([1, 60, 3600], reversed(duration_list))])
duration_list = duration_str.split(":")
return sum(
[int(s) * int(g) for s, g in zip([1, 60, 3600], reversed(duration_list))]
)

View File

@@ -17,11 +17,21 @@ from datetime import datetime, timezone
from sqlalchemy import func
from cisticola.transformer.base import Transformer
from cisticola.base import RawChannelInfo, ChannelInfo, ScraperResult, Post, Image, Video, Audio, Media, Channel
from cisticola.base import (
RawChannelInfo,
ChannelInfo,
ScraperResult,
Post,
Image,
Video,
Audio,
Media,
Channel,
)
class TelegramTelethonTransformer(Transformer):
__version__ = 'TelegramTelethonTransformer 0.0.4'
__version__ = "TelegramTelethonTransformer 0.0.4"
# TODO cache
# cache channels for which we cannot get the name from the web interface
@@ -38,7 +48,7 @@ class TelegramTelethonTransformer(Transformer):
get_screenname_cache = {}
def can_handle(self, data: ScraperResult) -> bool:
scraper = data.scraper.split(' ')
scraper = data.scraper.split(" ")
if scraper[0] == "TelegramTelethonScraper":
return True
@@ -47,9 +57,9 @@ class TelegramTelethonTransformer(Transformer):
def __init__(self, telethon_session_name=None):
super().__init__()
api_id = os.environ['TELEGRAM_API_ID']
api_hash = os.environ['TELEGRAM_API_HASH']
phone = os.environ['TELEGRAM_PHONE']
api_id = os.environ["TELEGRAM_API_ID"]
api_hash = os.environ["TELEGRAM_API_HASH"]
phone = os.environ["TELEGRAM_PHONE"]
if telethon_session_name is None:
telethon_session_name = phone
@@ -67,7 +77,11 @@ class TelegramTelethonTransformer(Transformer):
try:
data = self.client.get_entity(channel_id)
if isinstance(data, types.User):
output = (data.username, str(data.first_name or "") + " " + str(data.last_name or ""), "")
output = (
data.username,
str(data.first_name or "") + " " + str(data.last_name or ""),
"",
)
else:
output = (data.username, data.title, "")
except ChannelPrivateError:
@@ -85,7 +99,9 @@ class TelegramTelethonTransformer(Transformer):
# this doesn't work for chat channels
if orig_screenname in self.bad_channels:
logger.debug(f"Skipping screenname because it is not accessible for channel {orig_screenname}")
logger.debug(
f"Skipping screenname because it is not accessible for channel {orig_screenname}"
)
return ""
logger.info(f"Finding channel from URL {url}")
@@ -95,7 +111,7 @@ class TelegramTelethonTransformer(Transformer):
self.bad_channels[orig_screenname] = True
return ""
soup = BeautifulSoup(r.content, features = 'lxml')
soup = BeautifulSoup(r.content, features="lxml")
post = soup.findAll("div", {"data-post": orig_screenname + "/" + str(id)})
name = ""
@@ -106,127 +122,173 @@ class TelegramTelethonTransformer(Transformer):
if decrement > 8:
break
logger.info(f"Could not find post from {url}, looking for id {id - decrement}")
post = soup.findAll("div", {"data-post" : orig_screenname + "/" + str(id - decrement)})
logger.info(
f"Could not find post from {url}, looking for id {id - decrement}"
)
post = soup.findAll(
"div", {"data-post": orig_screenname + "/" + str(id - decrement)}
)
if len(post) == 0:
logger.warning(f"Could not find post from {url}")
else:
fwd_tag = post[0].findAll("a", {"class", "tgme_widget_message_forwarded_from_name"})
fwd_tag = post[0].findAll(
"a", {"class", "tgme_widget_message_forwarded_from_name"}
)
if len(fwd_tag) == 0:
fwd_tag = post[0].findAll("span", {"class", "tgme_widget_message_forwarded_from_name"})
fwd_tag = post[0].findAll(
"span", {"class", "tgme_widget_message_forwarded_from_name"}
)
if len(fwd_tag) >= 1:
name = fwd_tag[0].text
return name
def transform_info(self, data: RawChannelInfo, insert: Callable, session, channel=None) -> Generator[Union[Post, Channel, Media], None, None]:
def transform_info(
self, data: RawChannelInfo, insert: Callable, session, channel=None
) -> Generator[Union[Post, Channel, Media], None, None]:
raw = json.loads(data.raw_data)
chat_raw = raw['chats'][0]
chat_raw = raw["chats"][0]
transformed = ChannelInfo(
raw_channel_info_id=data.id,
channel=data.channel,
platform_id=raw['full_chat']['id'],
platform_id=raw["full_chat"]["id"],
platform=data.platform,
scraper=data.scraper,
transformer=self.__version__,
screenname=chat_raw['username'],
name=chat_raw['title'],
description=raw['full_chat']['about'],
description_url='', # does not exist for Telegram
description_location='', # does not exist for Telegram
followers=raw['full_chat']['participants_count'],
screenname=chat_raw["username"],
name=chat_raw["title"],
description=raw["full_chat"]["about"],
description_url="", # does not exist for Telegram
description_location="", # does not exist for Telegram
followers=raw["full_chat"]["participants_count"],
following=-1, # does not exist for Telegram
verified=False, # does not exist for Telegram
date_created=dateutil.parser.parse(chat_raw['date']),
date_created=dateutil.parser.parse(chat_raw["date"]),
date_archived=data.date_archived,
date_transformed=datetime.now(timezone.utc)
date_transformed=datetime.now(timezone.utc),
)
transformed = insert(transformed)
if channel.platform_id is None:
logger.info(f"Missing platform ID on {channel}, setting to {raw['full_chat']['id']}")
logger.info(
f"Missing platform ID on {channel}, setting to {raw['full_chat']['id']}"
)
new_channel = session.query(Channel).where(Channel.id == channel.id).one()
new_channel.platform_id = raw['full_chat']['id']
new_channel.platform_id = raw["full_chat"]["id"]
session.flush()
session.commit()
if len(raw['chats']) > 1:
for chat in raw['chats'][1:]:
if len(raw["chats"]) > 1:
for chat in raw["chats"][1:]:
new_chat = Channel(
name=chat["title"],
platform_id=chat["id"],
category=channel.category, # this should be the same as the "parent"
platform=channel.platform, # this should be the same as the "parent"
url=("https://t.me/s/" + chat["username"]) if "username" in chat else "",
url=("https://t.me/s/" + chat["username"])
if "username" in chat
else "",
screenname=chat["username"] if "username" in chat else "",
country=channel.country, # this should be the same as the "parent"
influencer=channel.influencer, # this should be the same as the "parent"
public=None,
chat=not chat["broadcast"],
notes=channel.id, # this should be the channel ID of the parent
source="linked_channel"
source="linked_channel",
)
insert(new_chat)
# TODO this method API is chaotic and could be cleaned up
def transform(self, data: ScraperResult, insert: Callable, session, insert_post, flush_posts) -> Generator[Union[Post, Channel, Media], None, None]:
def transform(
self, data: ScraperResult, insert: Callable, session, insert_post, flush_posts
) -> Generator[Union[Post, Channel, Media], None, None]:
raw = json.loads(data.raw_data)
if raw['_'] != 'Message':
if raw["_"] != "Message":
logger.warning(f"Cannot convert type {raw['_']} to post")
return
fwd_from = None
if raw['fwd_from'] and raw['fwd_from']['from_id'] and 'channel_id' in raw['fwd_from']['from_id']:
if (
raw["fwd_from"]
and raw["fwd_from"]["from_id"]
and "channel_id" in raw["fwd_from"]["from_id"]
):
# use cache to look up channel instead of a DB request if possible
if str(raw['fwd_from']['from_id']['channel_id']) not in self.channels_cache_by_platformid:
channel = session.query(Channel).filter_by(platform_id=str(raw['fwd_from']['from_id']['channel_id']), platform = 'Telegram').first()
if (
str(raw["fwd_from"]["from_id"]["channel_id"])
not in self.channels_cache_by_platformid
):
channel = (
session.query(Channel)
.filter_by(
platform_id=str(raw["fwd_from"]["from_id"]["channel_id"]),
platform="Telegram",
)
.first()
)
if channel is None:
(screenname, name, notes) = self.get_screenname_from_id(raw['fwd_from']['from_id']['channel_id'])
(screenname, name, notes) = self.get_screenname_from_id(
raw["fwd_from"]["from_id"]["channel_id"]
)
if name == "":
logger.info("Trying fallback web interface")
orig_channel = session.query(Channel).filter_by(id=data.channel).first()
orig_channel = (
session.query(Channel).filter_by(id=data.channel).first()
)
if orig_channel.screenname is not None:
name = self.get_name_from_web_interface(orig_channel.screenname, raw['id'])
name = self.get_name_from_web_interface(
orig_channel.screenname, raw["id"]
)
channel = Channel(
name=name,
platform_id=raw['fwd_from']['from_id']['channel_id'],
platform_id=raw["fwd_from"]["from_id"]["channel_id"],
platform=data.platform,
url="https://t.me/s/" + screenname if screenname is not None else "",
url="https://t.me/s/" + screenname
if screenname is not None
else "",
screenname=screenname,
category='forwarded',
category="forwarded",
source=self.__version__,
notes=notes
notes=notes,
)
channel = insert(channel)
logger.info(f"Added {channel}")
self.channels_cache_by_platformid[str(raw['fwd_from']['from_id']['channel_id'])] = channel
self.channels_cache_by_platformid[
str(raw["fwd_from"]["from_id"]["channel_id"])
] = channel
fwd_from = self.channels_cache_by_platformid[str(raw['fwd_from']['from_id']['channel_id'])].id
fwd_from = self.channels_cache_by_platformid[
str(raw["fwd_from"]["from_id"]["channel_id"])
].id
reply_to = None
if raw['reply_to']:
reply_to_id = str(raw['reply_to']['reply_to_msg_id'])
if raw["reply_to"]:
reply_to_id = str(raw["reply_to"]["reply_to_msg_id"])
# use cache to find post ID instead of a DB request, if possible
if (data.channel, reply_to_id) not in self.posts_cache:
session.commit()
flush_posts() # TODO this is necessary because the post we are looking for might have been added in the same session
post = session.query(Post).filter_by(channel=data.channel, platform_id=reply_to_id).first()
post = (
session.query(Post)
.filter_by(channel=data.channel, platform_id=reply_to_id)
.first()
)
if post is None:
reply_to = -1
else:
@@ -238,25 +300,36 @@ class TelegramTelethonTransformer(Transformer):
mentions = []
for mention_entity in [entity for entity in raw['entities'] if entity['_'] == 'MessageEntityMention']:
offset = mention_entity['offset']
length = mention_entity['length']
for mention_entity in [
entity
for entity in raw["entities"]
if entity["_"] == "MessageEntityMention"
]:
offset = mention_entity["offset"]
length = mention_entity["length"]
screenname = add_surrogate(raw['message'])[offset:offset+length].strip('@').strip()
screenname = (
add_surrogate(raw["message"])[offset : offset + length]
.strip("@")
.strip()
)
# use cache rather than a DB request if possible
if screenname.lower() not in self.channels_cache_by_screenname:
channel = session.query(Channel).filter(func.lower(Channel.screenname)==func.lower(screenname)).first()
channel = (
session.query(Channel)
.filter(func.lower(Channel.screenname) == func.lower(screenname))
.first()
)
if channel is None:
channel = Channel(
name=None,
platform_id=None,
platform = 'Telegram',
platform="Telegram",
url="https://t.me/s/" + screenname,
screenname=screenname,
category='mentioned',
category="mentioned",
source=self.__version__,
)
@@ -277,15 +350,15 @@ class TelegramTelethonTransformer(Transformer):
channel = self.channels_cache_by_id[int(data.channel)]
if channel is not None and channel.url:
url = channel.url.strip('/') + f"/{raw['id']}"
url = channel.url.strip("/") + f"/{raw['id']}"
author_username = channel.screenname
else:
url = ""
author_username = ""
author_id = raw.get('peer_id', {}).get('channel_id')
if raw['from_id'] and 'user_id' in raw['from_id']:
author_id = raw['from_id']['user_id']
author_id = raw.get("peer_id", {}).get("channel_id")
if raw["from_id"] and "user_id" in raw["from_id"]:
author_id = raw["from_id"]["user_id"]
author_username = ""
(screenname, name, notes) = self.get_screenname_from_id(author_id)
if screenname:
@@ -293,12 +366,12 @@ class TelegramTelethonTransformer(Transformer):
transformed = Post(
raw_id=data.id,
platform_id = raw['id'],
platform_id=raw["id"],
scraper=data.scraper,
transformer=self.__version__,
platform=data.platform,
channel=data.channel,
date=dateutil.parser.parse(raw['date']),
date=dateutil.parser.parse(raw["date"]),
date_archived=data.date_archived,
date_transformed=datetime.now(timezone.utc),
url=url,
@@ -308,47 +381,56 @@ class TelegramTelethonTransformer(Transformer):
forwarded_from=fwd_from,
reply_to=reply_to,
mentions=mentions,
forwards = raw.get('forwards'),
views = raw.get('views')
forwards=raw.get("forwards"),
views=raw.get("views"),
)
# insert_post
insert_post(transformed)
def stripped(s):
"""https://stackoverflow.com/a/29933716"""
lstripped = ''.join(takewhile(str.isspace, s))
rstripped = ''.join(reversed(tuple(takewhile(str.isspace, reversed(s)))))
lstripped = "".join(takewhile(str.isspace, s))
rstripped = "".join(reversed(tuple(takewhile(str.isspace, reversed(s)))))
return lstripped + rstripped
def add_markdown_links(raw_post):
"""This function is necessary because Telethon's markdown.unparse doesn't
correctly handle trailing whitespace or multi-line links"""
global_offset = 0
transformed_content = add_surrogate(raw_post['message'])
links = [entity for entity in raw_post['entities'] if entity['_'] == 'MessageEntityTextUrl']
transformed_content = add_surrogate(raw_post["message"])
links = [
entity
for entity in raw_post["entities"]
if entity["_"] == "MessageEntityTextUrl"
]
for link in links:
offset = global_offset + link['offset']
length = link['length']
url = link['url']
offset = global_offset + link["offset"]
length = link["length"]
url = link["url"]
before_link = transformed_content[:offset]
inner_text = transformed_content[offset : offset + length]
# skip creation of link if inner link text is only whitespace
if inner_text.replace('\u200b', '').strip():
processed_inner_text = inner_text.strip().replace('\n', '\\\n')
if inner_text.replace("\u200b", "").strip():
processed_inner_text = inner_text.strip().replace("\n", "\\\n")
link_text = f"[{processed_inner_text}]"
trailing_whitespace = stripped(transformed_content[offset:offset+length])
trailing_whitespace = stripped(
transformed_content[offset : offset + length]
)
link_href = f"({url})"
after_link = transformed_content[offset + length :]
transformed_content = before_link + link_text + link_href + trailing_whitespace + after_link
global_offset += (4 + len(url) + inner_text.strip().count('\n'))
transformed_content = (
before_link + link_text + link_href + trailing_whitespace + after_link
)
global_offset += 4 + len(url) + inner_text.strip().count("\n")
return del_surrogate(transformed_content)

View File

@@ -2,8 +2,8 @@ import requests
from loguru import logger
import time
def make_request(url, headers = None, max_retries = 5, break_codes = None):
def make_request(url, headers=None, max_retries=5, break_codes=None):
"""Retry request `max_retries` times, while catching arbitrary exceptions.
Parameters
@@ -33,20 +33,17 @@ def make_request(url, headers = None, max_retries = 5, break_codes = None):
try:
r = request_until_200(
url = url,
headers = headers,
max_retries = max_retries,
break_codes = break_codes)
url=url, headers=headers, max_retries=max_retries, break_codes=break_codes
)
logger.debug(f"Request for url: {url} succeeded")
except Exception as e:
logger.warning(f"Request for url: {url} raised exception: [{e}]")
return r
def request_until_200(url, headers = None, max_retries = 5, break_codes = None):
"""Retry request `max_retries` times, or until the request is successful.
"""
def request_until_200(url, headers=None, max_retries=5, break_codes=None):
"""Retry request `max_retries` times, or until the request is successful."""
if break_codes is None:
break_codes = [200]
@@ -57,7 +54,9 @@ def request_until_200(url, headers = None, max_retries = 5, break_codes = None):
r = requests.get(url, headers=headers)
while r.status_code not in break_codes and n_retries < 5:
logger.warning(f"Request for url: {url} returned status: {r.status_code} on attempt: {n_retries}/{max_retries}")
logger.warning(
f"Request for url: {url} returned status: {r.status_code} on attempt: {n_retries}/{max_retries}"
)
n_retries += 1
# back off subsequent requests
@@ -65,6 +64,8 @@ def request_until_200(url, headers = None, max_retries = 5, break_codes = None):
r = requests.get(url, headers=headers)
if r.status_code not in break_codes:
raise ValueError(f"Request for url: {url} failed with status: {r.status_code} after {max_retries} attempts")
raise ValueError(
f"Request for url: {url} failed with status: {r.status_code} after {max_retries} attempts"
)
return r

View File

@@ -12,14 +12,15 @@
#
import os
import sys
sys.path.insert(0, os.path.abspath('../../'))
sys.path.insert(0, os.path.abspath("../../"))
# -- Project information -----------------------------------------------------
project = 'Cisticola'
copyright = '2022, Bellingcat'
author = 'Bellingcat'
project = "Cisticola"
copyright = "2022, Bellingcat"
author = "Bellingcat"
# -- General configuration ---------------------------------------------------
@@ -27,10 +28,10 @@ author = 'Bellingcat'
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
extensions = ['sphinx.ext.autodoc', 'sphinx.ext.coverage', 'sphinx.ext.napoleon']
extensions = ["sphinx.ext.autodoc", "sphinx.ext.coverage", "sphinx.ext.napoleon"]
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
templates_path = ["_templates"]
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
@@ -43,7 +44,7 @@ exclude_patterns = []
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
#
html_theme = 'sphinx_rtd_theme'
html_theme = "sphinx_rtd_theme"
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
@@ -52,9 +53,9 @@ html_static_path = []
# -- Default flags for autodoc------------------------------------------------
autodoc_default_options = {'exclude-members': '_sa_class_manager'}
autodoc_default_options = {"exclude-members": "_sa_class_manager"}
html_favicon = '../images/favicon.ico'
html_logo = '../images/cisticola_logo.svg'
html_favicon = "../images/favicon.ico"
html_logo = "../images/cisticola_logo.svg"
html_theme_options = {'style_nav_header_background': '#292a2b'}
html_theme_options = {"style_nav_header_background": "#292a2b"}

View File

@@ -20,10 +20,12 @@ expected_headers = [
"chat",
"notes",
"normalized_url",
"to_remove"]
"to_remove",
]
def standardize_country(s):
_s = s.split('(')[0].split('?')[0]
_s = s.split("(")[0].split("?")[0]
return _s.strip()
@@ -33,7 +35,7 @@ def sync_channels(args, session):
gc = gspread.service_account(filename="service_account.json")
# Open a sheet from a spreadsheet in one go
wks = gc.open_by_url(os.environ['GSHEET']).worksheet("channels")
wks = gc.open_by_url(os.environ["GSHEET"]).worksheet("channels")
channels = wks.get_all_records(expected_headers=expected_headers)
row = 2
@@ -65,22 +67,30 @@ def sync_channels(args, session):
if c["platform_id"] != "":
platform_id = c["platform_id"]
channel = (
session.query(Channel)
.filter_by(platform_id=str(platform_id), platform=str(c["platform"]))
.first()
)
if not channel:
channel = (
session.query(Channel)
.filter_by(platform=str(c["platform"]), url=str(c["url"]))
.first()
)
if not channel and c["screenname"] != "" and c["screenname"] is not None:
channel = (
session.query(Channel)
.filter_by(
platform_id=str(platform_id), platform=str(c["platform"])
platform=str(c["platform"]), screenname=str(c["screenname"])
)
.first()
)
if not channel:
channel = session.query(Channel).filter_by(platform=str(c["platform"]), url=str(c["url"])).first()
if not channel and c["screenname"] != '' and c["screenname"] is not None:
channel = session.query(Channel).filter_by(platform=str(c["platform"]), screenname=str(c["screenname"])).first()
if not channel:
if all([k in [None, True, False, ''] for k in c.values()]):
if all([k in [None, True, False, ""] for k in c.values()]):
# end sync if completely empty row is encountered
break
@@ -109,7 +119,11 @@ def sync_channels(args, session):
if c["screenname"]:
channel.screenname = c["screenname"]
if c["country"]:
channel.country = None if c["country"] is None else list(map(standardize_country, c["country"].split('/')))
channel.country = (
None
if c["country"] is None
else list(map(standardize_country, c["country"].split("/")))
)
if c["influencer"]:
channel.influencer = c["influencer"]
if c["public"]:
@@ -129,23 +143,27 @@ def sync_channels(args, session):
# this likely means that the channel was duplicated in the Google Sheet, so add a red highlight
if was_researcher:
logger.warning(f"This channel (ID {channel.id}) is possibly a duplicate.")
logger.warning(
f"This channel (ID {channel.id}) is possibly a duplicate."
)
wks.format(f"A{str(row)}:A{str(row)}", {
"backgroundColor": {
"red": 1.0,
"green": 0.0,
"blue": 0.0
}})
wks.format(
f"A{str(row)}:A{str(row)}",
{"backgroundColor": {"red": 1.0, "green": 0.0, "blue": 0.0}},
)
time.sleep(1)
# channel has ID
else:
cid = int(c["id"])
channel = session.query(Channel).filter_by(id=cid).first()
channel_info = session.query(ChannelInfo).filter_by(channel=cid).order_by(ChannelInfo.date_archived.desc()).first()
channel_info = (
session.query(ChannelInfo)
.filter_by(channel=cid)
.order_by(ChannelInfo.date_archived.desc())
.first()
)
logger.info(f"Updating channel {channel}")
logger.info(f"Found info {channel_info}")
@@ -155,7 +173,11 @@ def sync_channels(args, session):
channel.platform = c["platform"]
channel.url = c["url"]
channel.screenname = c["screenname"]
channel.country = None if c["country"] is None else list(map(standardize_country, c["country"].split('/')))
channel.country = (
None
if c["country"] is None
else list(map(standardize_country, c["country"].split("/")))
)
channel.influencer = c["influencer"]
channel.public = c["public"]
channel.chat = c["chat"]
@@ -167,7 +189,9 @@ def sync_channels(args, session):
wks.update_cell(row, 7, channel_info.screenname)
time.sleep(1)
if channel_info and str(channel.platform_id) != str(channel_info.platform_id):
if channel_info and str(channel.platform_id) != str(
channel_info.platform_id
):
channel.platform_id = channel_info.platform_id
wks.update_cell(row, 3, channel_info.platform_id)
time.sleep(1)

View File

@@ -8,9 +8,9 @@ if __name__ == "__main__":
args = parser.parse_args()
api_id = os.environ['TELEGRAM_API_ID']
api_hash = os.environ['TELEGRAM_API_HASH']
phone = os.environ['TELEGRAM_PHONE']
api_id = os.environ["TELEGRAM_API_ID"]
api_hash = os.environ["TELEGRAM_API_HASH"]
phone = os.environ["TELEGRAM_PHONE"]
telethon_session_name = args.telethon_session
if telethon_session_name is None:

View File

@@ -1,49 +1,51 @@
import pytest
from sqlalchemy.sql import text
from cisticola.base import Post, Channel, ChannelInfo, Media, ScraperResult, RawChannelInfo
from cisticola.base import (
Post,
Channel,
ChannelInfo,
Media,
ScraperResult,
RawChannelInfo,
)
from cisticola.scraper import (
TelegramTelethonScraper,
BitchuteScraper,
GettrScraper,
RumbleScraper)
RumbleScraper,
)
from cisticola.transformer import (
TelegramTelethonTransformer,
BitchuteTransformer,
GettrTransformer,
RumbleTransformer)
RumbleTransformer,
)
CONTROLLERS = {
'telegram' : {
'scraper': TelegramTelethonScraper,
'transformer': TelegramTelethonTransformer
"telegram": {
"scraper": TelegramTelethonScraper,
"transformer": TelegramTelethonTransformer,
},
'bitchute': {
'scraper': BitchuteScraper,
'transformer': BitchuteTransformer
},
'gettr': {
'scraper': GettrScraper,
'transformer': GettrTransformer
},
'rumble': {
'scraper': RumbleScraper,
'transformer': RumbleTransformer
}
"bitchute": {"scraper": BitchuteScraper, "transformer": BitchuteTransformer},
"gettr": {"scraper": GettrScraper, "transformer": GettrTransformer},
"rumble": {"scraper": RumbleScraper, "transformer": RumbleTransformer},
}
@pytest.mark.parametrize('platform', ['telegram','bitchute', 'gettr', 'rumble'])
def test_scraper_and_transformer(platform, session, controller, etl_controller, channel_kwargs):
@pytest.mark.parametrize("platform", ["telegram", "bitchute", "gettr", "rumble"])
def test_scraper_and_transformer(
platform, session, controller, etl_controller, channel_kwargs
):
controller.reset_db()
controller.remove_all_scrapers()
# necessary for comments/replies to be processed correctly
session.execute(text('INSERT INTO posts(id) VALUES (-1)'))
session.execute(text("INSERT INTO posts(id) VALUES (-1)"))
session.commit()
channels = [Channel(**channel_kwargs[platform])]
scraper = CONTROLLERS[platform]['scraper']
scraper = CONTROLLERS[platform]["scraper"]
controller.register_scraper(scraper=scraper())
controller.scrape_channels(channels=channels)
@@ -52,7 +54,11 @@ def test_scraper_and_transformer(platform, session, controller, etl_controller,
raw_posts = session.query(ScraperResult).all()
raw_channel_info = session.query(RawChannelInfo).all()
archived_urls = session.query(ScraperResult.archived_urls).order_by(ScraperResult.date_archived.desc()).first()
archived_urls = (
session.query(ScraperResult.archived_urls)
.order_by(ScraperResult.date_archived.desc())
.first()
)
assert len(raw_posts) > 0
assert len(raw_channel_info) > 0
@@ -60,7 +66,7 @@ def test_scraper_and_transformer(platform, session, controller, etl_controller,
controller.remove_all_scrapers()
transformer = CONTROLLERS[platform]['transformer']
transformer = CONTROLLERS[platform]["transformer"]
etl_controller.register_transformer(transformer())
etl_controller.transform_all_untransformed()

View File

@@ -8,161 +8,172 @@ from cisticola.scraper import ScraperController
from cisticola.transformer import ETLController
BITCHUTE_CHANNEL_KWARGS = {
'name': 'bestonlinejewelrystoresusa@gmail.com (test)',
'platform_id': 'bestonlinejewelrystoresusagmailcom',
'category': 'test',
'platform': 'Bitchute',
'url': 'https://www.bitchute.com/channel/bestonlinejewelrystoresusagmailcom/',
'screenname': None,
'country': 'US',
'influencer': None,
'public': True,
'chat': False,
'notes': '',
'source': 'researcher'}
"name": "bestonlinejewelrystoresusa@gmail.com (test)",
"platform_id": "bestonlinejewelrystoresusagmailcom",
"category": "test",
"platform": "Bitchute",
"url": "https://www.bitchute.com/channel/bestonlinejewelrystoresusagmailcom/",
"screenname": None,
"country": "US",
"influencer": None,
"public": True,
"chat": False,
"notes": "",
"source": "researcher",
}
GAB_CHANNEL_KWARGS = {
'name': 'Capt. Marc Simon (test)',
'platform_id': 'marc_capt',
'category': 'test',
'platform': 'Gab',
'url': 'https://gab.com/marc_capt',
'screenname': 'marc_capt',
'country': 'CA',
'influencer': None,
'public': True,
'chat': False,
'notes': '',
'source': 'researcher'}
"name": "Capt. Marc Simon (test)",
"platform_id": "marc_capt",
"category": "test",
"platform": "Gab",
"url": "https://gab.com/marc_capt",
"screenname": "marc_capt",
"country": "CA",
"influencer": None,
"public": True,
"chat": False,
"notes": "",
"source": "researcher",
}
GAB_GROUP_KWARGS = {
'name': 'iran group (test)',
'platform_id': "10001",
'category': 'test',
'platform': 'Gab',
'url': 'https://gab.com/groups/10001',
'screenname': 'iran group',
'country': 'IR',
'influencer': None,
'public': True,
'chat': True,
'notes': '',
'source': 'researcher'}
"name": "iran group (test)",
"platform_id": "10001",
"category": "test",
"platform": "Gab",
"url": "https://gab.com/groups/10001",
"screenname": "iran group",
"country": "IR",
"influencer": None,
"public": True,
"chat": True,
"notes": "",
"source": "researcher",
}
GETTR_CHANNEL_KWARGS = {
'name': 'LizardRepublic (test)',
'platform_id': 'lizardrepublic',
'category': 'test',
'platform': 'Gettr',
'url': 'https://www.gettr.com/user/lizardrepublic',
'screenname': 'lizardrepublic',
'country': 'US',
'influencer': None,
'public': True,
'chat': False,
'notes': '',
'source': 'researcher'}
"name": "LizardRepublic (test)",
"platform_id": "lizardrepublic",
"category": "test",
"platform": "Gettr",
"url": "https://www.gettr.com/user/lizardrepublic",
"screenname": "lizardrepublic",
"country": "US",
"influencer": None,
"public": True,
"chat": False,
"notes": "",
"source": "researcher",
}
INSTAGRAM_CHANNEL_KWARGS = {
'name': 'borland.88 (test)',
'platform_id': 'borland.88',
'category': 'test',
'platform': 'Instagram',
'url': 'https://www.instagram.com/borland.88/',
'screenname': 'borland.88',
'country': 'UA',
'influencer': None,
'public': True,
'chat': False,
'notes': '',
'source': 'researcher'}
"name": "borland.88 (test)",
"platform_id": "borland.88",
"category": "test",
"platform": "Instagram",
"url": "https://www.instagram.com/borland.88/",
"screenname": "borland.88",
"country": "UA",
"influencer": None,
"public": True,
"chat": False,
"notes": "",
"source": "researcher",
}
ODYSEE_CHANNEL_KWARGS = {
'name': "Mak1n' Bacon (test)",
'platform_id': 'Mak1nBacon',
'category': 'test',
'platform': 'Odysee',
'url': 'https://odysee.com/@Mak1nBacon',
'screenname': 'Mak1nBacon',
'country': 'US',
'influencer': None,
'public': True,
'chat': False,
'notes': '',
'source': 'researcher'}
"name": "Mak1n' Bacon (test)",
"platform_id": "Mak1nBacon",
"category": "test",
"platform": "Odysee",
"url": "https://odysee.com/@Mak1nBacon",
"screenname": "Mak1nBacon",
"country": "US",
"influencer": None,
"public": True,
"chat": False,
"notes": "",
"source": "researcher",
}
RUMBLE_CHANNEL_KWARGS = {
'name': 'we are uploading videos wow products (test)',
'platform_id': 'c-916305',
'category': 'test',
'platform': 'Rumble',
'url': 'https://rumble.com/c/c-916305',
'screenname': 'we are uploading',
'country': 'CA',
'influencer': None,
'public': True,
'chat': False,
'notes': '',
'source': 'researcher'}
"name": "we are uploading videos wow products (test)",
"platform_id": "c-916305",
"category": "test",
"platform": "Rumble",
"url": "https://rumble.com/c/c-916305",
"screenname": "we are uploading",
"country": "CA",
"influencer": None,
"public": True,
"chat": False,
"notes": "",
"source": "researcher",
}
TELEGRAM_CHANNEL_KWARGS = {
'name': 'Бутылка (test)',
'platform_id': "-1001760492118",
'category': 'test',
'platform': 'Telegram',
'url': 'https://t.me/butylka1488',
'screenname': 'butylka1488',
'country': 'RU',
'influencer': None,
'public': True,
'chat': False,
'notes': '',
'source': 'researcher'}
"name": "Бутылка (test)",
"platform_id": "-1001760492118",
"category": "test",
"platform": "Telegram",
"url": "https://t.me/butylka1488",
"screenname": "butylka1488",
"country": "RU",
"influencer": None,
"public": True,
"chat": False,
"notes": "",
"source": "researcher",
}
TWITTER_CHANNEL_KWARGS = {
'name': 'L Weber (test)',
'platform_id': "1424979017749442595",
'category': 'test',
'platform': 'Twitter',
'url': 'https://twitter.com/LWeber33662141',
'screenname': 'LWeber33662141',
'country': 'US',
'influencer': None,
'public': True,
'chat': False,
'notes': '',
'source': 'researcher'}
"name": "L Weber (test)",
"platform_id": "1424979017749442595",
"category": "test",
"platform": "Twitter",
"url": "https://twitter.com/LWeber33662141",
"screenname": "LWeber33662141",
"country": "US",
"influencer": None,
"public": True,
"chat": False,
"notes": "",
"source": "researcher",
}
VKONTAKTE_CHANNEL_KWARGS = {
'name': 'Wwg1wgA (test)',
'platform_id': 'club201278078',
'category': 'test',
'platform': 'Vkontakte',
'url': 'https://vk.com/club201278078',
'screenname': 'Wwg1wgA',
'country': 'FR',
'influencer': None,
'public': True,
'chat': False,
'notes': '',
'source': 'researcher'}
"name": "Wwg1wgA (test)",
"platform_id": "club201278078",
"category": "test",
"platform": "Vkontakte",
"url": "https://vk.com/club201278078",
"screenname": "Wwg1wgA",
"country": "FR",
"influencer": None,
"public": True,
"chat": False,
"notes": "",
"source": "researcher",
}
YOUTUBE_CHANNEL_KWARGS = {
'name': 'AnEs87 (test)',
'platform_id': 'UCP6exBqGoxGLv_pM9Dxk2pA',
'category': 'test',
'platform': 'Youtube',
'url': 'https://www.youtube.com/channel/UCP6exBqGoxGLv_pM9Dxk2pA',
'screenname': 'AnEs87',
'country': 'SV',
'influencer': None,
'public': True,
'chat': False,
'notes': '',
'source': 'researcher'}
"name": "AnEs87 (test)",
"platform_id": "UCP6exBqGoxGLv_pM9Dxk2pA",
"category": "test",
"platform": "Youtube",
"url": "https://www.youtube.com/channel/UCP6exBqGoxGLv_pM9Dxk2pA",
"screenname": "AnEs87",
"country": "SV",
"influencer": None,
"public": True,
"chat": False,
"notes": "",
"source": "researcher",
}
@pytest.fixture(scope='package')
@pytest.fixture(scope="package")
def engine(tmpdir_factory):
"""Initialize a SQLite database and SQLAlchemy engine to be used for all
tests in the package"""
@@ -171,7 +182,8 @@ def engine(tmpdir_factory):
return engine
@pytest.fixture(scope='package')
@pytest.fixture(scope="package")
def session(engine):
"""Initialize a SQLAlchemy session to be used for all tests in the package"""
@@ -179,7 +191,8 @@ def session(engine):
sessionfactory.configure(bind=engine)
return sessionfactory()
@pytest.fixture(scope='package')
@pytest.fixture(scope="package")
def controller(engine):
"""Initialize ScraperController to be used for all tests in the package."""
@@ -188,7 +201,8 @@ def controller(engine):
return scraper_controller
@pytest.fixture(scope='package')
@pytest.fixture(scope="package")
def etl_controller(engine):
"""Initialize ETLController to be used for all tests in the package."""
@@ -197,21 +211,23 @@ def etl_controller(engine):
return etl_controller
@pytest.fixture(scope='package')
@pytest.fixture(scope="package")
def channel_kwargs():
"""Define keyword arguments to use for defining test channels for each
platform to be scraped.
"""
return {
'bitchute' : BITCHUTE_CHANNEL_KWARGS,
'gab' : GAB_CHANNEL_KWARGS,
'gab_group' : GAB_GROUP_KWARGS,
'gettr' : GETTR_CHANNEL_KWARGS,
'instagram' : INSTAGRAM_CHANNEL_KWARGS,
'odysee' : ODYSEE_CHANNEL_KWARGS,
'rumble' : RUMBLE_CHANNEL_KWARGS,
'telegram' : TELEGRAM_CHANNEL_KWARGS,
'twitter' : TWITTER_CHANNEL_KWARGS,
'vkontakte' : VKONTAKTE_CHANNEL_KWARGS,
'youtube' : YOUTUBE_CHANNEL_KWARGS}
"bitchute": BITCHUTE_CHANNEL_KWARGS,
"gab": GAB_CHANNEL_KWARGS,
"gab_group": GAB_GROUP_KWARGS,
"gettr": GETTR_CHANNEL_KWARGS,
"instagram": INSTAGRAM_CHANNEL_KWARGS,
"odysee": ODYSEE_CHANNEL_KWARGS,
"rumble": RUMBLE_CHANNEL_KWARGS,
"telegram": TELEGRAM_CHANNEL_KWARGS,
"twitter": TWITTER_CHANNEL_KWARGS,
"vkontakte": VKONTAKTE_CHANNEL_KWARGS,
"youtube": YOUTUBE_CHANNEL_KWARGS,
}