From add83c96500171ad61f5808d82ee4bdab6b7969a Mon Sep 17 00:00:00 2001 From: Patrick Robertson Date: Tue, 7 Jan 2025 19:40:19 +0100 Subject: [PATCH 1/2] Remove snscrape from twitter_archiver 1. snscrape twitter downloader no longer works (ref: https://github.com/JustAnotherArchivist/snscrape/issues/1045) 2. snscrape limits python to versions <3.12 --- .../archivers/twitter_archiver.py | 42 +------------------ 1 file changed, 1 insertion(+), 41 deletions(-) diff --git a/src/auto_archiver/archivers/twitter_archiver.py b/src/auto_archiver/archivers/twitter_archiver.py index 6735488..1a356ca 100644 --- a/src/auto_archiver/archivers/twitter_archiver.py +++ b/src/auto_archiver/archivers/twitter_archiver.py @@ -2,7 +2,6 @@ import re, requests, mimetypes, json from typing import Union from datetime import datetime from loguru import logger -from snscrape.modules.twitter import TwitterTweetScraper, Video, Gif, Photo from yt_dlp import YoutubeDL from yt_dlp.extractor.twitter import TwitterIE from slugify import slugify @@ -49,7 +48,7 @@ class TwitterArchiver(Archiver): username, tweet_id = self.get_username_tweet_id(url) if not username: return False - strategies = [self.download_yt_dlp, self.download_snscrape, self.download_syndication] + strategies = [self.download_yt_dlp, self.download_syndication] for strategy in strategies: logger.debug(f"Trying {strategy.__name__} for {url=}") try: @@ -61,45 +60,6 @@ class TwitterArchiver(Archiver): logger.warning(f"No free strategy worked for {url}") return False - - def download_snscrape(self, item: Metadata, url: str, tweet_id: str) -> Union[Metadata|bool]: - scr = TwitterTweetScraper(tweet_id) - try: - tweet = next(scr.get_items()) - except Exception as ex: - logger.warning(f"SNSCRAPE FAILED, can't get tweet: {type(ex).__name__} occurred. args: {ex.args}") - return False - - result = Metadata() - result.set_title(tweet.content).set_content(tweet.json()).set_timestamp(tweet.date) - if tweet.media is None: - logger.debug(f'No media found, archiving tweet text only') - return result - - for i, tweet_media in enumerate(tweet.media): - media = Media(filename="") - mimetype = "" - if type(tweet_media) == Video: - variant = max( - [v for v in tweet_media.variants if v.bitrate], key=lambda v: v.bitrate) - media.set("src", variant.url).set("duration", tweet_media.duration) - mimetype = variant.contentType - elif type(tweet_media) == Gif: - variant = tweet_media.variants[0] - media.set("src", variant.url) - mimetype = variant.contentType - elif type(tweet_media) == Photo: - media.set("src", UrlUtil.twitter_best_quality_url(tweet_media.fullUrl)) - mimetype = "image/jpeg" - else: - logger.warning(f"Could not get media URL of {tweet_media}") - continue - ext = mimetypes.guess_extension(mimetype) - media.filename = self.download_from_url(media.get("src"), f'{slugify(url)}_{i}{ext}') - result.add_media(media) - - return result.success("twitter-snscrape") - def download_syndication(self, item: Metadata, url: str, tweet_id: str) -> Union[Metadata|bool]: """ Hack alternative working again. From 8c044c15f09459c5c4e5feba7f0e81bf9eabe073 Mon Sep 17 00:00:00 2001 From: Patrick Robertson Date: Tue, 7 Jan 2025 19:43:20 +0100 Subject: [PATCH 2/2] Add base test class for archivers with boilerplate code Plus: create test class for twitter archiver. Currently WIP --- tests/archivers/test_archiver_base.py | 22 ++++++ tests/archivers/test_bluesky_archiver.py | 31 +++----- tests/archivers/test_twitter_archiver.py | 95 ++++++++++++++++++++++++ 3 files changed, 127 insertions(+), 21 deletions(-) create mode 100644 tests/archivers/test_archiver_base.py create mode 100644 tests/archivers/test_twitter_archiver.py diff --git a/tests/archivers/test_archiver_base.py b/tests/archivers/test_archiver_base.py new file mode 100644 index 0000000..c64999f --- /dev/null +++ b/tests/archivers/test_archiver_base.py @@ -0,0 +1,22 @@ +from auto_archiver.core import Metadata + +class TestArchiverBase(object): + + archiver_class = None + config = None + + def setUp(self): + assert self.archiver_class is not None, "self.archiver_class must be set on the subclass" + assert self.config is not None, "self.config must be a dict set on the subclass" + self.archiver = self.archiver_class(self.config) + + def create_item(self, url, **kwargs): + item = Metadata().set_url(url) + for key, value in kwargs.items(): + item.set(key, value) + return item + + def assertValidResponseMetadata(self, test_response, title, timestamp): + self.assertTrue(test_response.is_success()) + self.assertEqual(title, test_response.get_title()) + self.assertTrue(timestamp, test_response.get("timestamp")) diff --git a/tests/archivers/test_bluesky_archiver.py b/tests/archivers/test_bluesky_archiver.py index cb6d878..06189ad 100644 --- a/tests/archivers/test_bluesky_archiver.py +++ b/tests/archivers/test_bluesky_archiver.py @@ -1,7 +1,8 @@ from auto_archiver.archivers.bluesky_archiver import BlueskyArchiver +from .test_archiver_base import TestArchiverBase import unittest -class TestBlueskyArchiver(unittest.TestCase): +class TestBlueskyArchiver(TestArchiverBase, unittest.TestCase): """Tests Bluesky Archiver Note that these tests will download API responses from the bluesky API, so they may be slow. @@ -9,24 +10,12 @@ class TestBlueskyArchiver(unittest.TestCase): and also test the archiver's ability to download media. """ - # def _download_bsky_embeds(self, post): - # # method to override actual method, and monkey patch requests.get so as to not actually download - # # the media files - # old_requests_get = requests.get - # def mock_requests_get(*args, **kwargs): - # return {"status_code": 200, "json": lambda: {"data": "fake data"}} - # requests.get = mock_requests_get - # media = self.bsky._download_bsky_embeds(post) - # requests.get = old_requests_get - # return media + archiver_class = BlueskyArchiver + config = {} - def setUp(self): - self.bsky = BlueskyArchiver({}) - return super().setUp() - def test_download_media_with_images(self): # url https://bsky.app/profile/colborne.bsky.social/post/3lec2bqjc5s2y - post = self.bsky._get_post_from_uri("https://bsky.app/profile/colborne.bsky.social/post/3lec2bqjc5s2y") + post = self.archiver._get_post_from_uri("https://bsky.app/profile/colborne.bsky.social/post/3lec2bqjc5s2y") # just make sure bsky haven't changed their format, images should be under "record/embed/media/images" # there should be 2 images @@ -37,7 +26,7 @@ class TestBlueskyArchiver(unittest.TestCase): self.assertEqual(len(post["record"]["embed"]["media"]["images"]), 2) # try downloading the media files - media = self.bsky._download_bsky_embeds(post) + media = self.archiver._download_bsky_embeds(post) self.assertEqual(len(media), 2) # check the IDs @@ -46,7 +35,7 @@ class TestBlueskyArchiver(unittest.TestCase): def test_download_post_with_single_image(self): # url https://bsky.app/profile/bellingcat.com/post/3lcxcpgt6j42l - post = self.bsky._get_post_from_uri("https://bsky.app/profile/bellingcat.com/post/3lcxcpgt6j42l") + post = self.archiver._get_post_from_uri("https://bsky.app/profile/bellingcat.com/post/3lcxcpgt6j42l") # just make sure bsky haven't changed their format, images should be under "record/embed/images" # there should be 1 image @@ -55,7 +44,7 @@ class TestBlueskyArchiver(unittest.TestCase): self.assertTrue("images" in post["record"]["embed"]) self.assertEqual(len(post["record"]["embed"]["images"]), 1) - media = self.bsky._download_bsky_embeds(post) + media = self.archiver._download_bsky_embeds(post) self.assertEqual(len(media), 1) # check the ID @@ -64,14 +53,14 @@ class TestBlueskyArchiver(unittest.TestCase): def test_download_post_with_video(self): # url https://bsky.app/profile/bellingcat.com/post/3le2l4gsxlk2i - post = self.bsky._get_post_from_uri("https://bsky.app/profile/bellingcat.com/post/3le2l4gsxlk2i") + post = self.archiver._get_post_from_uri("https://bsky.app/profile/bellingcat.com/post/3le2l4gsxlk2i") # just make sure bsky haven't changed their format, video should be under "record/embed/video" self.assertTrue("record" in post) self.assertTrue("embed" in post["record"]) self.assertTrue("video" in post["record"]["embed"]) - media = self.bsky._download_bsky_embeds(post) + media = self.archiver._download_bsky_embeds(post) self.assertEqual(len(media), 1) # check the ID diff --git a/tests/archivers/test_twitter_archiver.py b/tests/archivers/test_twitter_archiver.py new file mode 100644 index 0000000..e7e015c --- /dev/null +++ b/tests/archivers/test_twitter_archiver.py @@ -0,0 +1,95 @@ +import unittest +import datetime + +from auto_archiver.archivers.twitter_archiver import TwitterArchiver + +from .test_archiver_base import TestArchiverBase + + +class TestTwitterArchiver(TestArchiverBase, unittest.TestCase): + + archiver_class = TwitterArchiver + config = {} + + def test_sanitize_url(self): + + # should expand t.co URLs + t_co_url = "https://t.co/yl3oOJatFp" + t_co_resolved_url = "https://www.bellingcat.com/category/resources/" + self.assertEqual(t_co_resolved_url, self.archiver.sanitize_url(t_co_url)) + + # shouldn't alter valid x URLs + x_url = "https://x.com/bellingcat/status/1874097816571961839" + self.assertEqual(x_url, self.archiver.sanitize_url(x_url)) + + # shouldn't alter valid twitter.com URLs + twitter_url = "https://twitter.com/bellingcat/status/1874097816571961839" + self.assertEqual(twitter_url, self.archiver.sanitize_url(twitter_url)) + + # should strip tracking params + tracking_url = "https://twitter.com/bellingcat/status/1874097816571961839?s=20&t=3d0g4ZQis7dCbSDg-mE7-w" + self.assertEqual("https://twitter.com/bellingcat/status/1874097816571961839", self.archiver.sanitize_url(tracking_url)) + + # shouldn't alter non-twitter/x URLs + test_url = "https://www.bellingcat.com/category/resources/" + self.assertEqual(test_url, self.archiver.sanitize_url(test_url)) + + # shouldn't strip params from non-twitter/x URLs + test_url = "https://www.bellingcat.com/category/resources/?s=20&t=3d0g4ZQis7dCbSDg-mE7-w" + self.assertEqual(test_url, self.archiver.sanitize_url(test_url)) + + + def test_get_username_tweet_id_from_url(self): + + # test valid twitter URL + url = "https://twitter.com/bellingcat/status/1874097816571961839" + username, tweet_id = self.archiver.get_username_tweet_id(url) + self.assertEqual("bellingcat", username) + self.assertEqual("1874097816571961839", tweet_id) + + # test valid x URL + url = "https://x.com/bellingcat/status/1874097816571961839" + username, tweet_id = self.archiver.get_username_tweet_id(url) + self.assertEqual("bellingcat", username) + self.assertEqual("1874097816571961839", tweet_id) + + # test invalid URL + # TODO: should this return None, False or raise an exception? Right now it returns False + url = "https://www.bellingcat.com/category/resources/" + username, tweet_id = self.archiver.get_username_tweet_id(url) + self.assertFalse(username) + self.assertFalse(tweet_id) + + def test_youtube_dlp_archiver(self): + + url = "https://x.com/bellingcat/status/1874097816571961839" + post = self.archiver.download_yt_dlp(self.create_item(url), url, "1874097816571961839") + self.assertTrue(post) + self.assertValidResponseMetadata( + post, + "As 2024 comes to a close, here’s some examples of what Bellingcat investigated per month in our 10th year! 🧵", + datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc) + ) + breakpoint() + + + def test_download_media_with_images(self): + # url https://twitter.com/MeCookieMonster/status/1617921633456640001?s=20&t=3d0g4ZQis7dCbSDg-mE7-w + + post = self.archiver.download() + + # just make sure twitter haven't changed their format, images should be under "record/embed/media/images" + # there should be 2 images + self.assertTrue("record" in post) + self.assertTrue("embed" in post["record"]) + self.assertTrue("media" in post["record"]["embed"]) + self.assertTrue("images" in post["record"]["embed"]["media"]) + self.assertEqual(len(post["record"]["embed"]["media"]["images"]), 2) + + # try downloading the media files + media = self.archiver.download(post) + self.assertEqual(len(media), 2) + + # check the IDs + self.assertTrue("bafkreiflrkfihcvwlhka5tb2opw2qog6gfvywsdzdlibveys2acozh75tq" in media[0].get('src')) + self.assertTrue("bafkreibsprmwchf7r6xcstqkdvvuj3ijw7efciw7l3y4crxr4cmynseo7u" in media[1].get('src')) \ No newline at end of file