diff --git a/Pipfile b/Pipfile index 47ccb65..3be4b27 100644 --- a/Pipfile +++ b/Pipfile @@ -21,6 +21,7 @@ pytesseract = "*" pyexiftool = {git = "https://github.com/smarnach/pyexiftool.git"} instaloader = "*" gspread = "*" +cryptg = "*" [dev-packages] pytest = "*" diff --git a/cisticola/scraper/base.py b/cisticola/scraper/base.py index ddc5510..9fb2029 100644 --- a/cisticola/scraper/base.py +++ b/cisticola/scraper/base.py @@ -234,6 +234,16 @@ class Scraper: return archived_url + def archive_files(self, result: ScraperResult) -> ScraperResult: + for url in result.archived_urls: + if result.archived_urls[url] is None: + media_blob, content_type, key = self.url_to_blob(url) + archived_url = self.archive_blob(media_blob, content_type, key) + result.archived_urls[url] = archived_url + + return result + + def can_handle(self, channel: Channel) -> bool: """Whether or not the scraper can scrape the specified channel. @@ -353,6 +363,36 @@ class ScraperController: if not handled: logger.warning(f"No handler found for Channel {channel}") + @logger.catch(reraise = True) + def archive_unarchived_media(self): + if self.session is None: + logger.error("No DB session") + return + + session = self.session() + + posts = session.query(ScraperResult).filter(ScraperResult.archived_urls.like("%null%")).all() + + logger.info(f"Found {len(posts)} posts without media. Archiving now") + + for post in posts: + handled = False + + for scraper in self.scrapers: + if scraper.__version__ == post.scraper: + handled = True + logger.info(f"{scraper} is archiving media for {post}") + post = scraper.archive_files(post) + + session.query(ScraperResult).where(ScraperResult.id == post.id).update({'archived_urls': post.archived_urls}) + session.commit() + break + + if not handled: + logger.warning(f"No handler found for post scraped with {post.scraper}") + + session.commit() + def connect_to_db(self, engine): """Connect the specified SQLAlchemy engine to the controller. """ diff --git a/cisticola/scraper/telegram_telethon.py b/cisticola/scraper/telegram_telethon.py index b8231bc..34532dc 100644 --- a/cisticola/scraper/telegram_telethon.py +++ b/cisticola/scraper/telegram_telethon.py @@ -4,9 +4,11 @@ import os import json import tempfile from pathlib import Path +import time from loguru import logger from telethon.sync import TelegramClient +from telethon.tl import types from cisticola.base import Channel, ScraperResult from cisticola.scraper.base import Scraper @@ -23,12 +25,79 @@ class TelegramTelethonScraper(Scraper): username = username.split('s/')[1] return username + def archive_files(self, result: ScraperResult, client : TelegramClient = None) -> ScraperResult: + if len(result.archived_urls.keys()) == 0: + return result + + if client is None: + api_id = os.environ['TELEGRAM_API_ID'] + api_hash = os.environ['TELEGRAM_API_HASH'] + phone = os.environ['TELEGRAM_PHONE'] + + with TelegramClient(phone, api_id, api_hash) as client: + return self.archive_files(result, client) + + if len(list(result.archived_urls.keys())) != 1: + logger.warning(f"Expected 1 key in archived_urls, found {result.archived_keys}") + else: + key = list(result.archived_urls.keys())[0] + + if result.archived_urls[key] is None: + raw = json.loads(result.raw_data) + + message = client.get_messages(raw['peer_id']['channel_id'], ids=[raw['id']]) + + blob = None + if len(message) > 0: + blob, output_file_with_ext = self.archive_post_media(message[0], client) + else: + logger.warning("No message retrieved") + + if blob is not None: + # TODO specify Content-Type + archived_url = self.archive_blob(blob = blob, content_type = '', key = output_file_with_ext) + result.archived_urls[key] = archived_url + return result + else: + logger.warning("Downloaded blob was None") + + return result + + def archive_post_media(self, post : types.Message, client : TelegramClient = None): + logger.debug(f"Archiving post {post}") + + if post.media is None: + return None, None + + logger.debug(f"Archiving media {post.media}") + + if client is None: + api_id = os.environ['TELEGRAM_API_ID'] + api_hash = os.environ['TELEGRAM_API_HASH'] + phone = os.environ['TELEGRAM_PHONE'] + + with TelegramClient(phone, api_id, api_hash) as client: + return self.archive_post_media(post, client=client) + + key = f'{post.peer_id.channel_id}_{post.id}' + + with tempfile.TemporaryDirectory() as temp_dir: + output_file = Path(temp_dir, key) + + client.download_media(post.media, output_file) + + output_file_with_ext = os.listdir(temp_dir)[0] + filename = Path(temp_dir, output_file_with_ext) + + with open(filename, 'rb') as f: + blob = f.read() + return (blob, output_file_with_ext) + def can_handle(self, channel): if channel.platform == "Telegram" and channel.public and not channel.chat: return True def get_posts(self, channel: Channel, since: ScraperResult = None, archive_media: bool = True) -> Generator[ScraperResult, None, None]: - username = self.get_username_from_url(channel.url) api_id = os.environ['TELEGRAM_API_ID'] @@ -36,34 +105,27 @@ class TelegramTelethonScraper(Scraper): phone = os.environ['TELEGRAM_PHONE'] with TelegramClient(phone, api_id, api_hash) as client: - for post in client.iter_messages(username): + post_url = f'{channel.url}/{post.id}' + + logger.info(f"Archiving post {post_url} from {post.date}") if since is not None and post.date.replace(tzinfo=timezone.utc) <= since.date.replace(tzinfo=timezone.utc): logger.info(f'Timestamp of post {post} is earlier than the previous archived timestamp {post.date.replace(tzinfo=timezone.utc)}') break - post_url = f'{channel.url}/{post.id}' - key = f'{username}_{post.id}' - archived_urls = {} + logger.info(f"Archiving post {post_url}") - if archive_media: + if post.media is not None: + archived_urls[post_url] = None - if post.media is not None: - with tempfile.TemporaryDirectory() as temp_dir: - output_file = Path(temp_dir, key) - client.download_media(post.media, output_file) - - output_file_with_ext = os.listdir(temp_dir)[0] - filename = Path(temp_dir, output_file_with_ext) - - with open(filename, 'rb') as f: - blob = f.read() - - # TODO specify Content-Type - archived_url = self.archive_blob(blob = blob, content_type = '', key = output_file_with_ext) - archived_urls[post_url] = archived_url + if archive_media: + blob, output_file_with_ext = self.archive_post_media(post, client) + if blob is not None: + # TODO specify Content-Type + archived_url = self.archive_blob(blob = blob, content_type = '', key = output_file_with_ext) + archived_urls[post_url] = archived_url yield ScraperResult( scraper=self.__version__, diff --git a/cisticola/scraper/twitter.py b/cisticola/scraper/twitter.py index 8209282..17dece2 100644 --- a/cisticola/scraper/twitter.py +++ b/cisticola/scraper/twitter.py @@ -28,31 +28,33 @@ class TwitterScraper(Scraper): archived_urls = {} - if archive_media: - media_list = [] - if tweet.media: - media_list += tweet.media + media_list = [] + if tweet.media: + media_list += tweet.media - if tweet.retweetedTweet and tweet.retweetedTweet.media: - media_list += tweet.retweetedTweet.media + if tweet.retweetedTweet and tweet.retweetedTweet.media: + media_list += tweet.retweetedTweet.media - if tweet.quotedTweet and tweet.quotedTweet.media: - media_list += tweet.quotedTweet.media + if tweet.quotedTweet and tweet.quotedTweet.media: + media_list += tweet.quotedTweet.media - for media in media_list: - if type(media) == Video: - variant = max( - [v for v in media.variants if v.bitrate], key=lambda v: v.bitrate) - url = variant.url - elif type(media) == Gif: - url = media.variants[0].url - elif type(media) == Photo: - url = media.fullUrl - else: - logger.warning(f"Could not get media URL of {media}") - url = None + for media in media_list: + if type(media) == Video: + variant = max( + [v for v in media.variants if v.bitrate], key=lambda v: v.bitrate) + url = variant.url + elif type(media) == Gif: + url = media.variants[0].url + elif type(media) == Photo: + url = media.fullUrl + else: + logger.warning(f"Could not get media URL of {media}") + url = None - if url is not None and url not in archived_urls: + if url is not None and url not in archived_urls: + archived_urls[url] = None + + if archive_media: media_blob, content_type, key = self.url_to_blob(url) archived_url = self.archive_blob(media_blob, content_type, key) archived_urls[url] = archived_url diff --git a/test.py b/test.py index 131fd71..8726820 100644 --- a/test.py +++ b/test.py @@ -28,7 +28,6 @@ scrapers = [ GettrScraper(), OdyseeScraper(), RumbleScraper(), - TelegramSnscrapeScraper(), TelegramTelethonScraper(), TwitterScraper()] @@ -43,15 +42,15 @@ session = session_generator() gc = gspread.service_account(filename='service_account.json') # Open a sheet from a spreadsheet in one go -wks = gc.open_by_url("https://docs.google.com/spreadsheets/d/1yxd6-2Mp0jZ8r9XJklb39WE-iIMrKRyA2kymJcIfGis/edit#gid=0") +wks = gc.open_by_url("https://docs.google.com/spreadsheets/d/1k5VgqREoA3v1r7bkVq7TOTRDtdYqTMWkQnsZpRbntpw/edit#gid=0") channels = wks.worksheet("channels").get_all_records() for c in channels: del c['followers'] for k in c.keys(): - if c[k] == 'TRUE': c[k] = True - if c[k] == 'FALSE': c[k] = False + if c[k] == 'TRUE' or c[k] == 'yes': c[k] = True + if c[k] == 'FALSE' or c[k] == 'no': c[k] = False # check to see if this already exists, channel = session.query(Channel).filter_by(platform_id=c['platform_id'], platform=c['platform']).first() @@ -63,11 +62,13 @@ for c in channels: session.commit() controller.connect_to_db(engine) -controller.scrape_all_channels(archive_media = True) +controller.scrape_all_channels(archive_media = False) -transformer = TwitterTransformer() +controller.archive_unarchived_media() -etl_controller = ETLController() -etl_controller.register_transformer(transformer) -etl_controller.connect_to_db(engine) -etl_controller.transform_all_untransformed() +# transformer = TwitterTransformer() + +# etl_controller = ETLController() +# etl_controller.register_transformer(transformer) +# etl_controller.connect_to_db(engine) +# etl_controller.transform_all_untransformed()