merged main

This commit is contained in:
Tristan Lee
2022-10-26 19:50:48 -05:00
7 changed files with 419 additions and 405 deletions

13
Pipfile
View File

@@ -26,10 +26,8 @@ pytz = "*"
langdetect = "*"
spacy = "==3.2.4"
ocrd-pyexiftool = "*"
gabber = {git = "https://github.com/stanfordio/gabber.git"}
snscrape = {git = "https://github.com/bellingcat/snscrape"}
polyphemus = {git = "https://github.com/bellingcat/polyphemus"}
filelock = "*"
telethon = "*"
[dev-packages]
pytest = "*"
@@ -45,3 +43,12 @@ python_version = "3.9"
[pipenv]
allow_prereleases = true
[packages.gabber]
git = "https://github.com/stanfordio/gabber.git"
[packages.snscrape]
git = "https://github.com/bellingcat/snscrape"
[packages.polyphemus]
git = "https://github.com/bellingcat/polyphemus"

714
Pipfile.lock generated

File diff suppressed because it is too large Load Diff

8
app.py
View File

@@ -74,6 +74,11 @@ def scrape_channels(args):
controller = get_scraper_controller()
controller.scrape_all_channels(archive_media=args.media)
def scrape_channels_old(args):
    """Run the channel scrape in "fetch old posts" mode.

    Same flow as ``scrape_channels``, except that the controller is called
    with ``fetch_old=True``.

    Parameters
    ----------
    args
        Parsed command-line arguments; only ``args.media`` is read here and
        it is forwarded as the controller's ``archive_media`` flag.
    """
    logger.info(f"Scraping old posts from channels, media: {args.media}")

    controller = get_scraper_controller()
    controller.scrape_all_channels(archive_media=args.media, fetch_old=True)
def scrape_channel_info(args):
logger.info(f"Scraping channel info")
@@ -153,6 +158,9 @@ if __name__ == "__main__":
elif args.command == "scrape-channels":
logger.add("logs/scrape-channels.log", level="TRACE", rotation="100 MB")
scrape_channels(args)
elif args.command == "scrape-channels-old":
logger.add("logs/scrape-channels-old.log", level="TRACE", rotation="100 MB")
scrape_channels_old(args)
elif args.command == "archive-media":
logger.add("logs/archive-media.log", level="TRACE", rotation="100 MB")
archive_media(args)

View File

@@ -7,6 +7,7 @@ import io
from sqlalchemy.orm import registry
from sqlalchemy import Table, Column, Integer, String, JSON, DateTime, ForeignKey, Boolean, Index
from sqlalchemy.dialects.postgresql import JSONB
import pytesseract
import PIL
import exiftool
@@ -475,7 +476,7 @@ channel_table = Table('channels', mapper_registry.metadata,
Column('platform', String),
Column('url', String),
Column('screenname', String),
Column('country', JSON),
Column('country', JSONB, index = True),
Column('influencer', String),
Column('public', Boolean),
Column('chat', Boolean),

View File

@@ -335,18 +335,19 @@ class ScraperController:
def remove_all_scrapers(self):
self.scrapers = []
def scrape_all_channels(self, archive_media: bool = True):
def scrape_all_channels(self, archive_media: bool = True, fetch_old: bool = False):
if self.session is None:
logger.error("No DB session")
return
session = self.session()
channels = session.query(Channel).filter((Channel.source=='researcher')|(Channel.source=='snowball_it')).all()
# TODO there should be a better/more generic way of selecting scrapeable channels
channels = session.query(Channel).filter((Channel.source=='researcher')|(Channel.source=='snowball_it')|(Channel.source=='snowball_complete')).all()
session.close()
return self.scrape_channels(channels, archive_media=archive_media)
return self.scrape_channels(channels, archive_media=archive_media, fetch_old=fetch_old)
def scrape_all_channel_info(self):
if self.session is None:
@@ -359,14 +360,14 @@ class ScraperController:
# This will sort the channels by the least recently scraped.
most_recently_archived = session.query(func.max(RawChannelInfo.date_archived).label("date"), RawChannelInfo.channel.label("channel")).group_by(RawChannelInfo.channel).subquery()
channels = session.query(Channel).\
filter((Channel.source=='researcher')|(Channel.source=='snowball_it')).\
filter((Channel.source=='researcher')|(Channel.source=='snowball_it')|(Channel.source=='snowball_complete')).\
outerjoin(most_recently_archived, Channel.id == most_recently_archived.c.channel).\
order_by(nullsfirst(most_recently_archived.c.date.asc())).all()
session.close()
return self.scrape_channel_info(channels)
def scrape_channels(self, channels: List[Channel], archive_media: bool = True):
def scrape_channels(self, channels: List[Channel], archive_media: bool = True, fetch_old: bool = False):
"""Scrape all posts for all specified channels.
Parameters
@@ -407,20 +408,36 @@ class ScraperController:
handled = True
added = 0
# get most recent post
# Note: a "bug" in Postgres can cause this query to hang for a really long time
# when searching for a single row, hence the limit(10).all() when we really just need
# the first row.
rows = session.query(ScraperResult).where(
ScraperResult.channel == channel.id).order_by(
ScraperResult.date.desc()).limit(10).all()
if fetch_old and channel.platform == 'Telegram':
# get oldest post (currently only for Telegram)
# TODO fix this so that it doesn't have an explicit check on channel.platform (should be generic)
# TODO implement until on all scrapers
rows = session.query(ScraperResult).where(
ScraperResult.channel == channel.id).order_by(
ScraperResult.date.asc(), ScraperResult.id.desc()).limit(10).all()
if len(rows) > 0:
until = rows[0]
else:
until = None
posts = scraper.get_posts(channel, until=until, archive_media=archive_media)
if len(rows) > 0:
since = rows[0]
else:
since = None
# get most recent post
# Note: a "bug" in Postgres can cause this query to hang for a really long time
# when searching for a single row, hence the limit(10).all() when we really just need
# the first row.
rows = session.query(ScraperResult).where(
ScraperResult.channel == channel.id).order_by(
ScraperResult.date.desc(), ScraperResult.id.asc()).limit(10).all()
posts = scraper.get_posts(channel, since=since, archive_media=archive_media)
if len(rows) > 0:
since = rows[0]
else:
since = None
posts = scraper.get_posts(channel, since=since, archive_media=archive_media)
for post in posts:
session.add(post)

View File

@@ -18,7 +18,7 @@ MEDIA_TYPES = ['photo', 'video', 'document', 'webpage']
class TelegramTelethonScraper(Scraper):
"""An implementation of a Scraper for Telegram, using Telethon library"""
__version__ = "TelegramTelethonScraper 0.0.3"
__version__ = "TelegramTelethonScraper 0.0.4"
client = None
def __init__(self, telethon_session_name = None):
@@ -132,10 +132,16 @@ class TelegramTelethonScraper(Scraper):
return True
@logger.catch
def get_posts(self, channel: Channel, since: ScraperResult = None, archive_media: bool = True) -> Generator[ScraperResult, None, None]:
def get_posts(self, channel: Channel, since: ScraperResult = None, until: ScraperResult = None, archive_media: bool = True) -> Generator[ScraperResult, None, None]:
username = TelegramTelethonScraper.get_channel_identifier(channel)
if until is not None:
logger.info(f"Only getting old posts, up to ID {until.platform_id.split('/')[-1]}")
iterator = self.client.iter_messages(username, max_id=int(until.platform_id.split('/')[-1]), wait_time=0, limit=None)
else:
iterator = self.client.iter_messages(username)
for post in self.client.iter_messages(username):
post = None
for post in iterator:
post_url = f'{channel.url}/{post.id}'
logger.trace(f"Archiving post {post_url} from {post.date}")
@@ -169,6 +175,22 @@ class TelegramTelethonScraper(Scraper):
archived_urls=archived_urls,
media_archived=media_archived)
if (post is not None and post.id > 1 and since is None) or (post is not None and since is not None and post.date.replace(tzinfo=timezone.utc) > since.date.replace(tzinfo=timezone.utc)):
logger.info(f"Last post ID is {post.id} / {post.date}, since is {since.date if since is not None else None}, until is {until.platform_id if until is not None else None}, starting again")
new_until = ScraperResult(
scraper=self.__version__,
platform="Telegram",
channel=channel.id,
platform_id=post_url,
date=post.date.replace(tzinfo=timezone.utc),
date_archived=datetime.now(timezone.utc),
raw_data=json.dumps(post.to_dict(), default=str),
archived_urls=archived_urls,
media_archived=media_archived)
for p in self.get_posts(channel, since=since, until=new_until, archive_media=archive_media):
yield p
@logger.catch
def get_profile(self, channel: Channel) -> RawChannelInfo:
username = TelegramTelethonScraper.get_channel_identifier(channel)

View File

@@ -4,6 +4,11 @@ from loguru import logger
from cisticola.base import Channel, ChannelInfo
def standardize_country(s: str) -> str:
    """Return the bare country name contained in a raw country label.

    Everything from the first ``(`` onwards is dropped, then everything from
    the first ``?`` onwards, and finally surrounding whitespace is stripped.

    Parameters
    ----------
    s
        A single raw country label, e.g. ``"US (maybe)"`` or ``"UK?"``.

    Returns
    -------
    str
        The cleaned label; an empty string if nothing precedes the first
        ``(`` or ``?``.
    """
    # partition(sep)[0] yields the text before the first separator, or the
    # whole string when the separator is absent.
    return s.partition('(')[0].partition('?')[0].strip()
def sync_channels(args, session):
logger.info("Synchronizing channels")
@@ -73,7 +78,7 @@ def sync_channels(args, session):
channel.platform = c["platform"]
channel.url = c["url"]
channel.screenname = c["screenname"]
channel.country = c["country"]
channel.country = list(map(standardize_country, c["country"].split('/')))
channel.influencer = c["influencer"]
channel.public = c["public"]
channel.chat = c["chat"]
@@ -114,7 +119,7 @@ def sync_channels(args, session):
channel.platform = c["platform"]
channel.url = c["url"]
channel.screenname = c["screenname"]
channel.country = c["country"]
channel.country = list(map(standardize_country, c["country"].split('/')))
channel.influencer = c["influencer"]
channel.public = c["public"]
channel.chat = c["chat"]