Add an option to scape posts older than the database record as well as newer (Telegram only)

This commit is contained in:
Logan Williams
2022-09-05 13:48:01 +00:00
parent 86656f8ba3
commit c15022402d
5 changed files with 408 additions and 400 deletions

13
Pipfile
View File

@@ -26,10 +26,8 @@ pytz = "*"
langdetect = "*"
spacy = "==3.2.4"
ocrd-pyexiftool = "*"
gabber = {git = "https://github.com/stanfordio/gabber.git"}
snscrape = {git = "https://github.com/bellingcat/snscrape"}
polyphemus = {git = "https://github.com/bellingcat/polyphemus"}
filelock = "*"
telethon = "*"
[dev-packages]
pytest = "*"
@@ -45,3 +43,12 @@ python_version = "3.9"
[pipenv]
allow_prereleases = true
[packages.gabber]
git = "https://github.com/stanfordio/gabber.git"
[packages.snscrape]
git = "https://github.com/bellingcat/snscrape"
[packages.polyphemus]
git = "https://github.com/bellingcat/polyphemus"

714
Pipfile.lock generated

File diff suppressed because it is too large Load Diff

8
app.py
View File

@@ -74,6 +74,11 @@ def scrape_channels(args):
controller = get_scraper_controller()
controller.scrape_all_channels(archive_media=args.media)
def scrape_channels_old(args):
logger.info(f"Scraping old posts from channels, media: {args.media}")
controller = get_scraper_controller()
controller.scrape_all_channels(archive_media=args.media, fetch_old=True)
def scrape_channel_info(args):
logger.info(f"Scraping channel info")
@@ -147,6 +152,9 @@ if __name__ == "__main__":
elif args.command == "scrape-channels":
logger.add("logs/scrape-channels.log", level="TRACE", rotation="100 MB")
scrape_channels(args)
elif args.command == "scrape-channels-old":
logger.add("logs/scrape-channels-old.log", level="TRACE", rotation="100 MB")
scrape_channels_old(args)
elif args.command == "archive-media":
logger.add("logs/archive-media.log", level="TRACE", rotation="100 MB")
archive_media(args)

View File

@@ -335,18 +335,19 @@ class ScraperController:
def remove_all_scrapers(self):
self.scrapers = []
def scrape_all_channels(self, archive_media: bool = True):
def scrape_all_channels(self, archive_media: bool = True, fetch_old: bool = False):
if self.session is None:
logger.error("No DB session")
return
session = self.session()
# TODO there should be a better/more generic way of selecting scrapeable channels
channels = session.query(Channel).filter((Channel.source=='researcher')|(Channel.source=='snowball_it')).all()
session.close()
return self.scrape_channels(channels, archive_media=archive_media)
return self.scrape_channels(channels, archive_media=archive_media, fetch_old=fetch_old)
def scrape_all_channel_info(self):
if self.session is None:
@@ -366,7 +367,7 @@ class ScraperController:
session.close()
return self.scrape_channel_info(channels)
def scrape_channels(self, channels: List[Channel], archive_media: bool = True):
def scrape_channels(self, channels: List[Channel], archive_media: bool = True, fetch_old: bool = False):
"""Scrape all posts for all specified channels.
Parameters
@@ -407,20 +408,36 @@ class ScraperController:
handled = True
added = 0
# get most recent post
# Note: a "bug" in Postgres can cause this query to hang for a really long time
# when searching for a single row, hence the limit(10).all() when we really just need
# the first row.
rows = session.query(ScraperResult).where(
ScraperResult.channel == channel.id).order_by(
ScraperResult.date.desc()).limit(10).all()
if fetch_old and channel.platform == 'Telegram':
# get oldest post (currently only for Telegram)
# TODO fix this so that it doesn't have an explicit check on channel.platform (should be generic)
# TODO implement until on all scrapers
rows = session.query(ScraperResult).where(
ScraperResult.channel == channel.id).order_by(
ScraperResult.date.asc(), ScraperResult.id.desc()).limit(10).all()
if len(rows) > 0:
until = rows[0]
else:
until = None
posts = scraper.get_posts(channel, until=until, archive_media=archive_media)
if len(rows) > 0:
since = rows[0]
else:
since = None
# get most recent post
# Note: a "bug" in Postgres can cause this query to hang for a really long time
# when searching for a single row, hence the limit(10).all() when we really just need
# the first row.
rows = session.query(ScraperResult).where(
ScraperResult.channel == channel.id).order_by(
ScraperResult.date.desc(), ScraperResult.id.asc()).limit(10).all()
posts = scraper.get_posts(channel, since=since, archive_media=archive_media)
if len(rows) > 0:
since = rows[0]
else:
since = None
posts = scraper.get_posts(channel, since=since, archive_media=archive_media)
for post in posts:
session.add(post)

View File

@@ -18,7 +18,7 @@ MEDIA_TYPES = ['photo', 'video', 'document', 'webpage']
class TelegramTelethonScraper(Scraper):
"""An implementation of a Scraper for Telegram, using Telethon library"""
__version__ = "TelegramTelethonScraper 0.0.3"
__version__ = "TelegramTelethonScraper 0.0.4"
client = None
def __init__(self, telethon_session_name = None):
@@ -132,10 +132,16 @@ class TelegramTelethonScraper(Scraper):
return True
@logger.catch
def get_posts(self, channel: Channel, since: ScraperResult = None, archive_media: bool = True) -> Generator[ScraperResult, None, None]:
def get_posts(self, channel: Channel, since: ScraperResult = None, until: ScraperResult = None, archive_media: bool = True) -> Generator[ScraperResult, None, None]:
username = TelegramTelethonScraper.get_channel_identifier(channel)
if until is not None:
logger.info(f"Only getting old posts, up to ID {until.platform_id.split('/')[-1]}")
iterator = self.client.iter_messages(username, max_id=int(until.platform_id.split('/')[-1]), wait_time=0, limit=None)
else:
iterator = self.client.iter_messages(username)
for post in self.client.iter_messages(username):
post = None
for post in iterator:
post_url = f'{channel.url}/{post.id}'
logger.trace(f"Archiving post {post_url} from {post.date}")
@@ -169,6 +175,22 @@ class TelegramTelethonScraper(Scraper):
archived_urls=archived_urls,
media_archived=media_archived)
if (post is not None and post.id > 1 and since is None) or (post is not None and since is not None and post.date.replace(tzinfo=timezone.utc) > since.date.replace(tzinfo=timezone.utc)):
logger.info(f"Last post ID is {post.id} / {post.date}, since is {since.date if since is not None else None}, until is {until.platform_id if until is not None else None}, starting again")
new_until = ScraperResult(
scraper=self.__version__,
platform="Telegram",
channel=channel.id,
platform_id=post_url,
date=post.date.replace(tzinfo=timezone.utc),
date_archived=datetime.now(timezone.utc),
raw_data=json.dumps(post.to_dict(), default=str),
archived_urls=archived_urls,
media_archived=media_archived)
for p in self.get_posts(channel, since=since, until=new_until, archive_media=archive_media):
yield p
@logger.catch
def get_profile(self, channel: Channel) -> RawChannelInfo:
username = TelegramTelethonScraper.get_channel_identifier(channel)