fxtwitter working instead of nitter

This commit is contained in:
msramalho
2026-03-02 12:31:28 +00:00
parent 139d647197
commit bc66dd4f2a
2 changed files with 305 additions and 44 deletions

View File

@@ -10,7 +10,6 @@ from auto_archiver.core.extractor import Extractor
from auto_archiver.utils.deletion_detection import detect_deletion, flag_as_deleted
from auto_archiver.modules.generic_extractor.dropin import GenericDropin, InfoExtractor
import requests
from bs4 import BeautifulSoup
from retrying import retry
@@ -37,56 +36,80 @@ class Twitter(GenericDropin):
if not post_data or not post_data.get("user") or not post_data.get("created_at"):
raise ValueError("Error retrieving post with twitter dropin")
return post_data
except Exception:
# try nitter
nitter_url = f"https://nitter.net/i/status/{twid}"
# nitter_url = f"https://nitter.space/i/status/{twid}"
logger.info(f"Falling back to nitter.net for tweet extraction at {nitter_url}")
except Exception as e:
logger.debug(f"yt-dlp twitter extraction failed: {e}")
# try fxtwitter API as fallback
return self._fetch_fxtwitter(twid)
# Fetch a nitter status page and return the element matched by <div class="main-tweet">.
# Wrapped in `retry` with stop_max_attempt_number=3 and random wait bounds of 500-2000
# (presumably milliseconds, per the `retrying` package - confirm).
# Raises ValueError when the HTTP status is not 200 or when no "main-tweet" div is found.
# NOTE(review): this looks like the nitter-based helper that the fxtwitter fallback
# replaces in this change - confirm before relying on it.
@retry(wait_random_min=500, wait_random_max=2000, stop_max_attempt_number=3)
def fetch_nitter_soup(url):
# Send a desktop Firefox User-Agent with the request.
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/112.0"
}
resp = requests.get(url, headers=headers, timeout=10)
if resp.status_code != 200:
raise ValueError("Failed to retrieve tweet from nitter.net")
# NOTE(review): logs the entire response body at ERROR level. Indentation is lost in
# this view, so it is unclear whether this runs on the success path or sits unreachably
# after the raise above; either way it looks like leftover debugging - confirm.
logger.error(resp.text)
soup = BeautifulSoup(resp.text, "html.parser")
# The first <div class="main-tweet"> is what gets returned to the caller.
tweet_container = soup.find("div", {"class": "main-tweet"})
if not tweet_container:
raise ValueError("Could not find tweet container on nitter.net page")
return tweet_container
def _fetch_fxtwitter(self, twid: str) -> dict:
"""Fetch tweet data from fxtwitter API and convert to expected format."""
fxtwitter_url = f"https://api.fxtwitter.com/status/{twid}"
logger.info(f"Falling back to fxtwitter API for tweet extraction: {fxtwitter_url}")
tweet_container = fetch_nitter_soup(nitter_url)
user = tweet_container.find("a", {"class": "username"})
author = user.text.strip() if user else ""
created_at = tweet_container.find("span", {"class": "tweet-date"})
timestamp = created_at.find("a")["title"] if created_at and created_at.find("a") else ""
@retry(wait_random_min=500, wait_random_max=2000, stop_max_attempt_number=3)
def fetch_fxtwitter_data(url):
    """Download a status payload from the fxtwitter API and return its ``tweet`` object.

    The surrounding ``retry`` decorator re-invokes this function when it raises
    (``stop_max_attempt_number=3``, with a randomized wait between attempts).

    Args:
        url: Full fxtwitter API status URL to request.

    Returns:
        The non-empty value stored under the ``"tweet"`` key of the JSON response.

    Raises:
        ValueError: If the HTTP status is not 200, the body is not a JSON object,
            or the response carries no usable ``tweet`` entry.
    """
    headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/112.0"}
    resp = requests.get(url, headers=headers, timeout=15)
    if resp.status_code != 200:
        raise ValueError(f"Failed to retrieve tweet from fxtwitter API: {resp.status_code}")

    data = resp.json()
    # A JSON array/scalar has no .get(); fail with a clear error instead of an
    # AttributeError/TypeError further down.
    if not isinstance(data, dict):
        raise ValueError(f"No tweet data in fxtwitter response: unexpected payload type {type(data).__name__}")

    # Treat a null/empty "tweet" the same as a missing one: the caller immediately
    # dereferences the result with .get(...), which would crash on None.
    tweet = data.get("tweet")
    if not tweet:
        raise ValueError(f"No tweet data in fxtwitter response: {data.get('message', 'Unknown error')}")
    return tweet
full_text = tweet_container.find("div", {"class": "tweet-content"})
text = full_text.text.strip() if full_text else ""
tweet = fetch_fxtwitter_data(fxtwitter_url)
media = []
media_tags = tweet_container.find_all("a", {"class": "still-image"})
for m in media_tags:
img_url = m["href"]
if img_url.startswith("/"):
img_url = "https://nitter.net" + img_url
media.append({"type": "photo", "media_url_https": img_url})
# Convert fxtwitter format to expected format
author = tweet.get("author", {}).get("name", "")
created_at = tweet.get("created_at", "") # Format: "Sun Feb 08 18:45:00 +0000 2026"
full_text = tweet.get("text", "") or tweet.get("raw_text", "")
video_tags = tweet_container.find_all("video")
for v in video_tags:
src = v.find("source")
if src and src.get("src"):
video_url = src["src"]
if video_url.startswith("/"):
video_url = "https://nitter.net" + video_url
media.append(
{"type": "video", "video_info": {"variants": [{"url": video_url, "content_type": "video/mp4"}]}}
# Convert media format
media = []
fx_media = tweet.get("media", {})
# Handle photos
for photo in fx_media.get("photos", []):
media.append({"type": "photo", "media_url_https": photo.get("url", "")})
# Handle videos
for video in fx_media.get("videos", []):
variants = video.get("variants", [])
# Convert to expected variant format
converted_variants = []
for var in variants:
converted_variants.append(
{
"url": var.get("url", ""),
"content_type": var.get("content_type", "video/mp4"),
"bitrate": var.get("bitrate", 0),
}
)
if converted_variants:
media.append({"type": "video", "video_info": {"variants": converted_variants}})
# Handle animated gifs (fxtwitter may include these in videos)
for item in fx_media.get("all", []):
if item.get("type") == "gif":
variants = item.get("variants", [])
converted_variants = []
for var in variants:
converted_variants.append(
{
"url": var.get("url", ""),
"content_type": var.get("content_type", "video/mp4"),
"bitrate": var.get("bitrate", 0),
}
)
if converted_variants:
media.append({"type": "animated_gif", "video_info": {"variants": converted_variants}})
return {"user": {"name": author}, "created_at": timestamp, "full_text": text, "entities": {"media": media}}
return {
"user": {"name": author},
"created_at": created_at,
"full_text": full_text,
"entities": {"media": media},
}
def keys_to_clean(self, video_data, info_extractor):
    """Return the fixed list of tweet-data key names associated with cleaning.

    Neither ``video_data`` nor ``info_extractor`` is consulted; every call
    yields a new list with the same five names in the same order.
    """
    cleanable = ("user", "created_at", "entities", "favorited", "translator_type")
    return list(cleanable)