diff --git a/src/auto_archiver/modules/generic_extractor/twitter.py b/src/auto_archiver/modules/generic_extractor/twitter.py index 0141e1b..a93f363 100644 --- a/src/auto_archiver/modules/generic_extractor/twitter.py +++ b/src/auto_archiver/modules/generic_extractor/twitter.py @@ -9,6 +9,8 @@ from auto_archiver.utils import url as UrlUtil, get_datetime_from_str from auto_archiver.core.extractor import Extractor from auto_archiver.utils.deletion_detection import detect_deletion, flag_as_deleted from auto_archiver.modules.generic_extractor.dropin import GenericDropin, InfoExtractor +import requests +from retrying import retry class Twitter(GenericDropin): @@ -29,7 +31,85 @@ class Twitter(GenericDropin): def extract_post(self, url: str, ie_instance: InfoExtractor): twid = ie_instance._match_valid_url(url).group("id") - return ie_instance._extract_status(twid=twid) + try: + post_data = ie_instance._extract_status(twid=twid) + if not post_data or not post_data.get("user") or not post_data.get("created_at"): + raise ValueError("Error retrieving post with twitter dropin") + return post_data + except Exception as e: + logger.debug(f"yt-dlp twitter extraction failed: {e}") + # try fxtwitter API as fallback + return self._fetch_fxtwitter(twid) + + def _fetch_fxtwitter(self, twid: str) -> dict: + """Fetch tweet data from fxtwitter API and convert to expected format.""" + fxtwitter_url = f"https://api.fxtwitter.com/status/{twid}" + logger.info(f"Falling back to fxtwitter API for tweet extraction: {fxtwitter_url}") + + @retry(wait_random_min=500, wait_random_max=2000, stop_max_attempt_number=3) + def fetch_fxtwitter_data(url): + headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/112.0"} + resp = requests.get(url, headers=headers, timeout=15) + if resp.status_code != 200: + raise ValueError(f"Failed to retrieve tweet from fxtwitter API: {resp.status_code}") + data = resp.json() + if "tweet" not in data: + raise ValueError(f"No tweet data in fxtwitter response: {data.get('message', 'Unknown error')}") + return data["tweet"] + + tweet = fetch_fxtwitter_data(fxtwitter_url) + + # Convert fxtwitter format to expected format + author = tweet.get("author", {}).get("name", "") + created_at = tweet.get("created_at", "") # Format: "Sun Feb 08 18:45:00 +0000 2026" + full_text = tweet.get("text", "") or tweet.get("raw_text", "") + + # Convert media format + media = [] + fx_media = tweet.get("media", {}) + + # Handle photos + for photo in fx_media.get("photos", []): + media.append({"type": "photo", "media_url_https": photo.get("url", "")}) + + # Handle videos + for video in fx_media.get("videos", []): + variants = video.get("variants", []) + # Convert to expected variant format + converted_variants = [] + for var in variants: + converted_variants.append( + { + "url": var.get("url", ""), + "content_type": var.get("content_type", "video/mp4"), + "bitrate": var.get("bitrate", 0), + } + ) + if converted_variants: + media.append({"type": "video", "video_info": {"variants": converted_variants}}) + + # Handle animated gifs (fxtwitter may include these in videos) + for item in fx_media.get("all", []): + if item.get("type") == "gif": + variants = item.get("variants", []) + converted_variants = [] + for var in variants: + converted_variants.append( + { + "url": var.get("url", ""), + "content_type": var.get("content_type", "video/mp4"), + "bitrate": var.get("bitrate", 0), + } + ) + if converted_variants: + media.append({"type": "animated_gif", "video_info": {"variants": converted_variants}}) + + return { + "user": {"name": author}, + "created_at": created_at, + "full_text": full_text, + "entities": {"media": media}, + } def keys_to_clean(self, video_data, info_extractor): return ["user", "created_at", "entities", "favorited", "translator_type"] diff --git a/tests/extractors/test_twitter_dropin.py b/tests/extractors/test_twitter_dropin.py new file mode 100644 index 0000000..f3082b9 --- /dev/null +++ b/tests/extractors/test_twitter_dropin.py @@ -0,0 +1,238 @@ +""" +Tests for the Twitter dropin extractor with fxtwitter fallback +""" + +import pytest +from unittest.mock import Mock, patch + +from auto_archiver.modules.generic_extractor.twitter import Twitter + + +@pytest.fixture +def twitter_dropin(): + return Twitter() + + +class TestTwitterFxTwitterFallback: + """Test the fxtwitter API fallback functionality.""" + + @pytest.fixture + def mock_fxtwitter_video_response(self): + return { + "code": 200, + "message": "OK", + "tweet": { + "url": "https://x.com/user/status/123456789", + "id": "123456789", + "text": "Test tweet with video", + "author": { + "id": "111", + "name": "Test User", + "screen_name": "testuser", + }, + "created_at": "Sun Feb 08 18:45:00 +0000 2026", + "media": { + "all": [ + { + "type": "video", + "url": "https://video.twimg.com/test.mp4", + "variants": [ + {"url": "https://video.twimg.com/test.m3u8", "content_type": "application/x-mpegURL"}, + { + "url": "https://video.twimg.com/test_480.mp4", + "content_type": "video/mp4", + "bitrate": 632000, + }, + { + "url": "https://video.twimg.com/test_720.mp4", + "content_type": "video/mp4", + "bitrate": 2176000, + }, + ], + } + ], + "videos": [ + { + "url": "https://video.twimg.com/test.mp4", + "variants": [ + {"url": "https://video.twimg.com/test.m3u8", "content_type": "application/x-mpegURL"}, + { + "url": "https://video.twimg.com/test_480.mp4", + "content_type": "video/mp4", + "bitrate": 632000, + }, + { + "url": "https://video.twimg.com/test_720.mp4", + "content_type": "video/mp4", + "bitrate": 2176000, + }, + ], + } + ], + }, + }, + } + + @pytest.fixture + def mock_fxtwitter_photo_response(self): + return { + "code": 200, + "message": "OK", + "tweet": { + "url": "https://x.com/user/status/123456790", + "id": "123456790", + "text": "Test tweet with photo", + "author": { + "id": "111", + "name": "Test User", + "screen_name": "testuser", + }, + "created_at": "Mon Feb 09 10:30:00 +0000 2026", + "media": { + "all": [ + { + "type": "photo", + "url": "https://pbs.twimg.com/media/test.jpg?name=orig", + } + ], + "photos": [ + { + "type": "photo", + "url": "https://pbs.twimg.com/media/test.jpg?name=orig", + } + ], + }, + }, + } + + def test_fetch_fxtwitter_video(self, twitter_dropin, mock_fxtwitter_video_response): + """Test fetching a tweet with video via fxtwitter API.""" + with patch("requests.get") as mock_get: + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = mock_fxtwitter_video_response + mock_get.return_value = mock_response + + result = twitter_dropin._fetch_fxtwitter("123456789") + + assert result["user"]["name"] == "Test User" + assert result["created_at"] == "Sun Feb 08 18:45:00 +0000 2026" + assert result["full_text"] == "Test tweet with video" + assert len(result["entities"]["media"]) == 1 + assert result["entities"]["media"][0]["type"] == "video" + assert "video_info" in result["entities"]["media"][0] + assert len(result["entities"]["media"][0]["video_info"]["variants"]) == 3 + + def test_fetch_fxtwitter_photo(self, twitter_dropin, mock_fxtwitter_photo_response): + """Test fetching a tweet with photo via fxtwitter API.""" + with patch("requests.get") as mock_get: + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = mock_fxtwitter_photo_response + mock_get.return_value = mock_response + + result = twitter_dropin._fetch_fxtwitter("123456790") + + assert result["user"]["name"] == "Test User" + assert result["created_at"] == "Mon Feb 09 10:30:00 +0000 2026" + assert result["full_text"] == "Test tweet with photo" + assert len(result["entities"]["media"]) == 1 + assert result["entities"]["media"][0]["type"] == "photo" + assert result["entities"]["media"][0]["media_url_https"] == "https://pbs.twimg.com/media/test.jpg?name=orig" + + def test_fetch_fxtwitter_no_media(self, twitter_dropin): + """Test fetching a text-only tweet via fxtwitter API.""" + mock_response_data = { + "code": 200, + "message": "OK", + "tweet": { + "id": "123456791", + "text": "Just text, no media", + "author": {"name": "Text Only User"}, + "created_at": "Tue Feb 10 12:00:00 +0000 2026", + "media": {}, + }, + } + with patch("requests.get") as mock_get: + mock_response = Mock() + mock_response.status_code = 200 + mock_response.json.return_value = mock_response_data + mock_get.return_value = mock_response + + result = twitter_dropin._fetch_fxtwitter("123456791") + + assert result["user"]["name"] == "Text Only User" + assert result["full_text"] == "Just text, no media" + assert result["entities"]["media"] == [] + + def test_fetch_fxtwitter_api_error(self, twitter_dropin): + """Test handling of fxtwitter API errors.""" + with patch("requests.get") as mock_get: + mock_response = Mock() + mock_response.status_code = 404 + mock_get.return_value = mock_response + + with pytest.raises(Exception): + twitter_dropin._fetch_fxtwitter("nonexistent") + + +class TestTwitterChooseVariant: + """Test the video variant selection logic.""" + + def test_choose_highest_quality_video(self, twitter_dropin): + """Test that the highest quality video variant is selected.""" + variants = [ + {"url": "https://video.twimg.com/vid/320x240/test.mp4", "content_type": "video/mp4"}, + {"url": "https://video.twimg.com/vid/1280x720/test.mp4", "content_type": "video/mp4"}, + {"url": "https://video.twimg.com/vid/640x480/test.mp4", "content_type": "video/mp4"}, + ] + + result = twitter_dropin.choose_variant(variants) + + assert result["url"] == "https://video.twimg.com/vid/1280x720/test.mp4" + + def test_choose_variant_fallback_for_non_mp4(self, twitter_dropin): + """Test fallback when no mp4 variant is available.""" + variants = [ + {"url": "https://video.twimg.com/test.m3u8", "content_type": "application/x-mpegURL"}, + ] + + result = twitter_dropin.choose_variant(variants) + + assert result["url"] == "https://video.twimg.com/test.m3u8" + + def test_choose_variant_prefers_mp4(self, twitter_dropin): + """Test that mp4 is preferred over other formats when quality is equal.""" + variants = [ + {"url": "https://video.twimg.com/test.m3u8", "content_type": "application/x-mpegURL"}, + {"url": "https://video.twimg.com/vid/1280x720/test.mp4", "content_type": "video/mp4"}, + ] + + result = twitter_dropin.choose_variant(variants) + + assert result["content_type"] == "video/mp4" + + +@pytest.mark.download +class TestTwitterFxTwitterLive: + """Live integration tests for fxtwitter API - requires network access.""" + + @pytest.mark.parametrize( + "tweet_id,expected_media_type", + [ + ("2020569571682312581", "video"), # Video tweet + ("2020410438198890618", "video"), # Video tweet + ("2020341585502957801", "photo"), # Photo tweet + ], + ) + def test_fetch_real_tweets(self, twitter_dropin, tweet_id, expected_media_type): + """Test fetching real tweets from fxtwitter API.""" + result = twitter_dropin._fetch_fxtwitter(tweet_id) + + assert result["user"]["name"] # Author should be non-empty + assert result["created_at"] # Should have timestamp + assert result["full_text"] # Should have text content + + media = result["entities"]["media"] + assert len(media) >= 1 + assert media[0]["type"] == expected_media_type