From 65edde6d20c7510954cb6c74ccaf17833490e415 Mon Sep 17 00:00:00 2001 From: Logan Williams Date: Tue, 22 Mar 2022 11:56:28 +0100 Subject: [PATCH 1/3] Fix bug after merge --- cisticola/base.py | 80 +++++++++++++++-------------------------------- 1 file changed, 25 insertions(+), 55 deletions(-) diff --git a/cisticola/base.py b/cisticola/base.py index d5b10e8..decaab8 100644 --- a/cisticola/base.py +++ b/cisticola/base.py @@ -42,18 +42,6 @@ class ScraperResult: #: Dict in which the keys are the original media URLs from the post, and the corresponding values are the URLs of the archived media files. archived_urls: dict -raw_data_table = Table('raw_data', mapper_registry.metadata, - Column('id', Integer, primary_key=True, - autoincrement=True), - Column('scraper', String), - Column('platform', String), - Column('channel', Integer, ForeignKey('channels.id')), - Column('platform_id', String), - Column('date', DateTime), - Column('raw_data', String), - Column('date_archived', DateTime), - Column('archived_urls', JSON)) - @dataclass class Channel: """Information about a specific channel to be scraped. @@ -98,24 +86,6 @@ class Channel: def hydrate(self): pass -channel_table = Table('channels', mapper_registry.metadata, - Column('id', Integer, primary_key=True, autoincrement=True), - Column('name', String), - Column('platform_id', Integer), - Column('category', String), - Column('platform', String), - Column('url', String), - Column('screenname', String), - Column('country', String), - Column('influencer', String), - Column('public', Boolean), - Column('chat', Boolean), - Column('notes', String), - Column('source', String) - ) - -mapper_registry.map_imperatively(Channel, channel_table) - @dataclass class Post: """An object with fields for columns in the analysis table""" @@ -165,26 +135,6 @@ class Post: def hydrate(self): pass -post_table = Table('posts', mapper_registry.metadata, - Column('id', Integer, primary_key=True, - autoincrement=True), - Column('raw_id', Integer, ForeignKey('raw_data.id')), - Column('platform_id', Integer), - Column('scraper', String), - Column('transformer', String), - Column('platform', String), - Column('channel', Integer, ForeignKey('channels.id')), - Column('date', DateTime), - Column('date_archived', DateTime), - Column('url', String), - Column('author_id', String), - Column('author_username', String), - Column('content', String), - Column('forwarded_from', Integer, ForeignKey('channels.id')), - Column('reply_to', Integer, ForeignKey('posts.id')) - ) - -mapper_registry.map_imperatively(Post, post_table) @dataclass class Media: @@ -273,28 +223,47 @@ raw_data_table = Table('raw_data', mapper_registry.metadata, autoincrement=True), Column('scraper', String), Column('platform', String), - Column('channel', Integer), + Column('channel', Integer, ForeignKey('channels.id')), Column('platform_id', String), Column('date', DateTime), Column('raw_data', String), Column('date_archived', DateTime), Column('archived_urls', JSON)) +channel_table = Table('channels', mapper_registry.metadata, + Column('id', Integer, primary_key=True, autoincrement=True), + Column('name', String), + Column('platform_id', Integer), + Column('category', String), + Column('platform', String), + Column('url', String), + Column('screenname', String), + Column('country', String), + Column('influencer', String), + Column('public', Boolean), + Column('chat', Boolean), + Column('notes', String), + Column('source', String) + ) -analysis_table = Table('analysis', mapper_registry.metadata, +post_table = Table('posts', mapper_registry.metadata, Column('id', Integer, primary_key=True, autoincrement=True), Column('raw_id', Integer, ForeignKey('raw_data.id')), + Column('platform_id', Integer), Column('scraper', String), Column('transformer', String), Column('platform', String), - Column('channel', Integer), + Column('channel', Integer, ForeignKey('channels.id')), Column('date', DateTime), Column('date_archived', DateTime), Column('url', String), Column('author_id', String), Column('author_username', String), - Column('content', String)) + Column('content', String), + Column('forwarded_from', Integer, ForeignKey('channels.id')), + Column('reply_to', Integer, ForeignKey('posts.id')) + ) media_table = Table('media', mapper_registry.metadata, Column('id', Integer, primary_key=True, @@ -307,7 +276,8 @@ media_table = Table('media', mapper_registry.metadata, Column('exif', String), Column('ocr', String)) -mapper_registry.map_imperatively(TransformedResult, analysis_table) +mapper_registry.map_imperatively(Post, post_table) +mapper_registry.map_imperatively(Channel, channel_table) mapper_registry.map_imperatively(ScraperResult, raw_data_table) mapper_registry.map_imperatively(Media, media_table, polymorphic_on='type', polymorphic_identity='media') mapper_registry.map_imperatively(Image, media_table, inherits=Media, polymorphic_on='type', polymorphic_identity='image') From 63fdae9f1b374e7871a3e63c3bd5f1a6a861a8ef Mon Sep 17 00:00:00 2001 From: Logan Williams Date: Thu, 24 Mar 2022 16:52:11 +0100 Subject: [PATCH 2/3] Implement media archiving after the initial scrape for Twitter and Telethon --- Pipfile | 1 + cisticola/scraper/base.py | 40 ++++++++++ cisticola/scraper/telegram_telethon.py | 102 ++++++++++++++++++++----- cisticola/scraper/twitter.py | 44 ++++++----- test.py | 21 ++--- 5 files changed, 157 insertions(+), 51 deletions(-) diff --git a/Pipfile b/Pipfile index 47ccb65..3be4b27 100644 --- a/Pipfile +++ b/Pipfile @@ -21,6 +21,7 @@ pytesseract = "*" pyexiftool = {git = "https://github.com/smarnach/pyexiftool.git"} instaloader = "*" gspread = "*" +cryptg = "*" [dev-packages] pytest = "*" diff --git a/cisticola/scraper/base.py b/cisticola/scraper/base.py index ddc5510..9fb2029 100644 --- a/cisticola/scraper/base.py +++ b/cisticola/scraper/base.py @@ -234,6 +234,16 @@ class Scraper: return archived_url + def archive_files(self, result: ScraperResult) -> ScraperResult: + for url in result.archived_urls: + if result.archived_urls[url] is None: + media_blob, content_type, key = self.url_to_blob(url) + archived_url = self.archive_blob(media_blob, content_type, key) + result.archived_urls[url] = archived_url + + return result + + def can_handle(self, channel: Channel) -> bool: """Whether or not the scraper can scrape the specified channel. @@ -353,6 +363,36 @@ class ScraperController: if not handled: logger.warning(f"No handler found for Channel {channel}") + @logger.catch(reraise = True) + def archive_unarchived_media(self): + if self.session is None: + logger.error("No DB session") + return + + session = self.session() + + posts = session.query(ScraperResult).filter(ScraperResult.archived_urls.like("%null%")).all() + + logger.info(f"Found {len(posts)} posts without media. Archiving now") + + for post in posts: + handled = False + + for scraper in self.scrapers: + if scraper.__version__ == post.scraper: + handled = True + logger.info(f"{scraper} is archiving media for {post}") + post = scraper.archive_files(post) + + session.query(ScraperResult).where(ScraperResult.id == post.id).update({'archived_urls': post.archived_urls}) + session.commit() + break + + if not handled: + logger.warning(f"No handler found for post scraped with {post.scraper}") + + session.commit() + def connect_to_db(self, engine): """Connect the specified SQLAlchemy engine to the controller. """ diff --git a/cisticola/scraper/telegram_telethon.py b/cisticola/scraper/telegram_telethon.py index b8231bc..34532dc 100644 --- a/cisticola/scraper/telegram_telethon.py +++ b/cisticola/scraper/telegram_telethon.py @@ -4,9 +4,11 @@ import os import json import tempfile from pathlib import Path +import time from loguru import logger from telethon.sync import TelegramClient +from telethon.tl import types from cisticola.base import Channel, ScraperResult from cisticola.scraper.base import Scraper @@ -23,12 +25,79 @@ class TelegramTelethonScraper(Scraper): username = username.split('s/')[1] return username + def archive_files(self, result: ScraperResult, client : TelegramClient = None) -> ScraperResult: + if len(result.archived_urls.keys()) == 0: + return result + + if client is None: + api_id = os.environ['TELEGRAM_API_ID'] + api_hash = os.environ['TELEGRAM_API_HASH'] + phone = os.environ['TELEGRAM_PHONE'] + + with TelegramClient(phone, api_id, api_hash) as client: + return self.archive_files(result, client) + + if len(list(result.archived_urls.keys())) != 1: + logger.warning(f"Expected 1 key in archived_urls, found {result.archived_keys}") + else: + key = list(result.archived_urls.keys())[0] + + if result.archived_urls[key] is None: + raw = json.loads(result.raw_data) + + message = client.get_messages(raw['peer_id']['channel_id'], ids=[raw['id']]) + + blob = None + if len(message) > 0: + blob, output_file_with_ext = self.archive_post_media(message[0], client) + else: + logger.warning("No message retrieved") + + if blob is not None: + # TODO specify Content-Type + archived_url = self.archive_blob(blob = blob, content_type = '', key = output_file_with_ext) + result.archived_urls[key] = archived_url + return result + else: + logger.warning("Downloaded blob was None") + + return result + + def archive_post_media(self, post : types.Message, client : TelegramClient = None): + logger.debug(f"Archiving post {post}") + + if post.media is None: + return None, None + + logger.debug(f"Archiving media {post.media}") + + if client is None: + api_id = os.environ['TELEGRAM_API_ID'] + api_hash = os.environ['TELEGRAM_API_HASH'] + phone = os.environ['TELEGRAM_PHONE'] + + with TelegramClient(phone, api_id, api_hash) as client: + return self.archive_post_media(post, client=client) + + key = f'{post.peer_id.channel_id}_{post.id}' + + with tempfile.TemporaryDirectory() as temp_dir: + output_file = Path(temp_dir, key) + + client.download_media(post.media, output_file) + + output_file_with_ext = os.listdir(temp_dir)[0] + filename = Path(temp_dir, output_file_with_ext) + + with open(filename, 'rb') as f: + blob = f.read() + return (blob, output_file_with_ext) + def can_handle(self, channel): if channel.platform == "Telegram" and channel.public and not channel.chat: return True def get_posts(self, channel: Channel, since: ScraperResult = None, archive_media: bool = True) -> Generator[ScraperResult, None, None]: - username = self.get_username_from_url(channel.url) api_id = os.environ['TELEGRAM_API_ID'] @@ -36,34 +105,27 @@ class TelegramTelethonScraper(Scraper): phone = os.environ['TELEGRAM_PHONE'] with TelegramClient(phone, api_id, api_hash) as client: - for post in client.iter_messages(username): + post_url = f'{channel.url}/{post.id}' + + logger.info(f"Archiving post {post_url} from {post.date}") if since is not None and post.date.replace(tzinfo=timezone.utc) <= since.date.replace(tzinfo=timezone.utc): logger.info(f'Timestamp of post {post} is earlier than the previous archived timestamp {post.date.replace(tzinfo=timezone.utc)}') break - post_url = f'{channel.url}/{post.id}' - key = f'{username}_{post.id}' - archived_urls = {} + logger.info(f"Archiving post {post_url}") - if archive_media: + if post.media is not None: + archived_urls[post_url] = None - if post.media is not None: - with tempfile.TemporaryDirectory() as temp_dir: - output_file = Path(temp_dir, key) - client.download_media(post.media, output_file) - - output_file_with_ext = os.listdir(temp_dir)[0] - filename = Path(temp_dir, output_file_with_ext) - - with open(filename, 'rb') as f: - blob = f.read() - - # TODO specify Content-Type - archived_url = self.archive_blob(blob = blob, content_type = '', key = output_file_with_ext) - archived_urls[post_url] = archived_url + if archive_media: + blob, output_file_with_ext = self.archive_post_media(post, client) + if blob is not None: + # TODO specify Content-Type + archived_url = self.archive_blob(blob = blob, content_type = '', key = output_file_with_ext) + archived_urls[post_url] = archived_url yield ScraperResult( scraper=self.__version__, diff --git a/cisticola/scraper/twitter.py b/cisticola/scraper/twitter.py index 8209282..17dece2 100644 --- a/cisticola/scraper/twitter.py +++ b/cisticola/scraper/twitter.py @@ -28,31 +28,33 @@ class TwitterScraper(Scraper): archived_urls = {} - if archive_media: - media_list = [] - if tweet.media: - media_list += tweet.media + media_list = [] + if tweet.media: + media_list += tweet.media - if tweet.retweetedTweet and tweet.retweetedTweet.media: - media_list += tweet.retweetedTweet.media + if tweet.retweetedTweet and tweet.retweetedTweet.media: + media_list += tweet.retweetedTweet.media - if tweet.quotedTweet and tweet.quotedTweet.media: - media_list += tweet.quotedTweet.media + if tweet.quotedTweet and tweet.quotedTweet.media: + media_list += tweet.quotedTweet.media - for media in media_list: - if type(media) == Video: - variant = max( - [v for v in media.variants if v.bitrate], key=lambda v: v.bitrate) - url = variant.url - elif type(media) == Gif: - url = media.variants[0].url - elif type(media) == Photo: - url = media.fullUrl - else: - logger.warning(f"Could not get media URL of {media}") - url = None + for media in media_list: + if type(media) == Video: + variant = max( + [v for v in media.variants if v.bitrate], key=lambda v: v.bitrate) + url = variant.url + elif type(media) == Gif: + url = media.variants[0].url + elif type(media) == Photo: + url = media.fullUrl + else: + logger.warning(f"Could not get media URL of {media}") + url = None - if url is not None and url not in archived_urls: + if url is not None and url not in archived_urls: + archived_urls[url] = None + + if archive_media: media_blob, content_type, key = self.url_to_blob(url) archived_url = self.archive_blob(media_blob, content_type, key) archived_urls[url] = archived_url diff --git a/test.py b/test.py index 131fd71..8726820 100644 --- a/test.py +++ b/test.py @@ -28,7 +28,6 @@ scrapers = [ GettrScraper(), OdyseeScraper(), RumbleScraper(), - TelegramSnscrapeScraper(), TelegramTelethonScraper(), TwitterScraper()] @@ -43,15 +42,15 @@ session = session_generator() gc = gspread.service_account(filename='service_account.json') # Open a sheet from a spreadsheet in one go -wks = gc.open_by_url("https://docs.google.com/spreadsheets/d/1yxd6-2Mp0jZ8r9XJklb39WE-iIMrKRyA2kymJcIfGis/edit#gid=0") +wks = gc.open_by_url("https://docs.google.com/spreadsheets/d/1k5VgqREoA3v1r7bkVq7TOTRDtdYqTMWkQnsZpRbntpw/edit#gid=0") channels = wks.worksheet("channels").get_all_records() for c in channels: del c['followers'] for k in c.keys(): - if c[k] == 'TRUE': c[k] = True - if c[k] == 'FALSE': c[k] = False + if c[k] == 'TRUE' or c[k] == 'yes': c[k] = True + if c[k] == 'FALSE' or c[k] == 'no': c[k] = False # check to see if this already exists, channel = session.query(Channel).filter_by(platform_id=c['platform_id'], platform=c['platform']).first() @@ -63,11 +62,13 @@ for c in channels: session.commit() controller.connect_to_db(engine) -controller.scrape_all_channels(archive_media = True) +controller.scrape_all_channels(archive_media = False) -transformer = TwitterTransformer() +controller.archive_unarchived_media() -etl_controller = ETLController() -etl_controller.register_transformer(transformer) -etl_controller.connect_to_db(engine) -etl_controller.transform_all_untransformed() +# transformer = TwitterTransformer() + +# etl_controller = ETLController() +# etl_controller.register_transformer(transformer) +# etl_controller.connect_to_db(engine) +# etl_controller.transform_all_untransformed() From a80dbddbbc2e8ced29041795c6002b70bdd0f683 Mon Sep 17 00:00:00 2001 From: Logan Williams Date: Mon, 28 Mar 2022 11:42:15 +0200 Subject: [PATCH 3/3] Add snscrape delayed media archiving support; add explicit bool --- cisticola/base.py | 6 +++++- cisticola/scraper/base.py | 9 ++++++--- cisticola/scraper/telegram_snscrape.py | 23 +++++++++++------------ cisticola/scraper/telegram_telethon.py | 4 +++- 4 files changed, 25 insertions(+), 17 deletions(-) diff --git a/cisticola/base.py b/cisticola/base.py index decaab8..ff7f136 100644 --- a/cisticola/base.py +++ b/cisticola/base.py @@ -41,6 +41,9 @@ class ScraperResult: #: Dict in which the keys are the original media URLs from the post, and the corresponding values are the URLs of the archived media files. archived_urls: dict + + #: Has the media in this post been archived? + media_archived: bool @dataclass class Channel: @@ -228,7 +231,8 @@ raw_data_table = Table('raw_data', mapper_registry.metadata, Column('date', DateTime), Column('raw_data', String), Column('date_archived', DateTime), - Column('archived_urls', JSON)) + Column('archived_urls', JSON), + Column('media_archived', Boolean)) channel_table = Table('channels', mapper_registry.metadata, Column('id', Integer, primary_key=True, autoincrement=True), diff --git a/cisticola/scraper/base.py b/cisticola/scraper/base.py index 9fb2029..c887ee1 100644 --- a/cisticola/scraper/base.py +++ b/cisticola/scraper/base.py @@ -241,6 +241,7 @@ class Scraper: archived_url = self.archive_blob(media_blob, content_type, key) result.archived_urls[url] = archived_url + result.media_archived = True return result @@ -371,7 +372,7 @@ class ScraperController: session = self.session() - posts = session.query(ScraperResult).filter(ScraperResult.archived_urls.like("%null%")).all() + posts = session.query(ScraperResult).where(ScraperResult.media_archived == False).all() logger.info(f"Found {len(posts)} posts without media. Archiving now") @@ -384,8 +385,10 @@ class ScraperController: logger.info(f"{scraper} is archiving media for {post}") post = scraper.archive_files(post) - session.query(ScraperResult).where(ScraperResult.id == post.id).update({'archived_urls': post.archived_urls}) - session.commit() + if post: + session.query(ScraperResult).where(ScraperResult.id == post.id).update({'archived_urls': post.archived_urls, 'media_archived': True}) + session.commit() + break if not handled: diff --git a/cisticola/scraper/telegram_snscrape.py b/cisticola/scraper/telegram_snscrape.py index ec5b292..a0b758b 100644 --- a/cisticola/scraper/telegram_snscrape.py +++ b/cisticola/scraper/telegram_snscrape.py @@ -30,19 +30,17 @@ class TelegramSnscrapeScraper(Scraper): archived_urls = {} + for image_url in post.images: + archived_urls[image_url] = None + + if post.video: + archived_urls[post.video] = None + if archive_media: - - for image_url in post.images: - logger.debug(f'Archiving image: {image_url}') - media_blob, content_type, key = self.url_to_blob(image_url) + for url in archived_urls: + media_blob, content_type, key = self.url_to_blob(url) archived_url = self.archive_blob(media_blob, content_type, key) - archived_urls[image_url] = archived_url - - if post.video: - logger.debug(f'Archiving video: {post.video}') - media_blob, content_type, key = self.url_to_blob(post.video) - archived_url = self.archive_blob(media_blob, content_type, key) - archived_urls[post.video] = archived_url + archived_urls[url] = archived_url yield ScraperResult( scraper=self.__version__, @@ -52,5 +50,6 @@ class TelegramSnscrapeScraper(Scraper): date=post.date, date_archived=datetime.now(timezone.utc), raw_data=post.json(), - archived_urls=archived_urls + archived_urls=archived_urls, + media_archived=archive_media ) diff --git a/cisticola/scraper/telegram_telethon.py b/cisticola/scraper/telegram_telethon.py index 34532dc..e9740c9 100644 --- a/cisticola/scraper/telegram_telethon.py +++ b/cisticola/scraper/telegram_telethon.py @@ -61,6 +61,7 @@ class TelegramTelethonScraper(Scraper): else: logger.warning("Downloaded blob was None") + result.media_archived = True return result def archive_post_media(self, post : types.Message, client : TelegramClient = None): @@ -135,4 +136,5 @@ class TelegramTelethonScraper(Scraper): date=post.date.replace(tzinfo=timezone.utc), date_archived=datetime.now(timezone.utc), raw_data=json.dumps(post.to_dict(), default=str), - archived_urls=archived_urls) + archived_urls=archived_urls, + media_archived=archive_media)