refactors free twitter archiver strategies (#142)

This commit is contained in:
Miguel Sozinho Ramalho
2024-05-14 16:23:33 +01:00
committed by GitHub
parent 012cc36609
commit f8824691dd
2 changed files with 100 additions and 16 deletions

View File

@@ -1,4 +1,5 @@
import re, requests, mimetypes, json
from typing import Union
from datetime import datetime
from loguru import logger
from snscrape.modules.twitter import TwitterTweetScraper, Video, Gif, Photo
@@ -31,7 +32,7 @@ class TwitterArchiver(Archiver):
# expand URL if t.co and clean tracker GET params
if 'https://t.co/' in url:
try:
r = requests.get(url)
r = requests.get(url, timeout=30)
logger.debug(f'Expanded url {url} to {r.url}')
url = r.url
except:
@@ -45,19 +46,31 @@ class TwitterArchiver(Archiver):
can handle private/public channels
"""
url = item.get_url()
# detect URLs that we definitely cannot handle
username, tweet_id = self.get_username_tweet_id(url)
if not username: return False
result = Metadata()
strategies = [self.download_yt_dlp, self.download_snscrape, self.download_syndication]
for strategy in strategies:
logger.debug(f"Trying {strategy.__name__} for {url=}")
try:
result = strategy(item, url, tweet_id)
if result: return result
except Exception as ex:
logger.error(f"Failed to download {url} with {strategy.__name__}: {type(ex).__name__} occurred. args: {ex.args}")
logger.warning(f"No free strategy worked for {url}")
return False
def download_snscrape(self, item: Metadata, url: str, tweet_id: str) -> Union[Metadata|bool]:
scr = TwitterTweetScraper(tweet_id)
try:
tweet = next(scr.get_items())
except Exception as ex:
logger.warning(f"can't get tweet: {type(ex).__name__} occurred. args: {ex.args}")
return self.download_alternative(item, url, tweet_id)
logger.warning(f"SNSCRAPE FAILED, can't get tweet: {type(ex).__name__} occurred. args: {ex.args}")
return False
result = Metadata()
result.set_title(tweet.content).set_content(tweet.json()).set_timestamp(tweet.date)
if tweet.media is None:
logger.debug(f'No media found, archiving tweet text only')
@@ -87,7 +100,7 @@ class TwitterArchiver(Archiver):
return result.success("twitter-snscrape")
def download_alternative(self, item: Metadata, url: str, tweet_id: str) -> Metadata:
def download_syndication(self, item: Metadata, url: str, tweet_id: str) -> Union[Metadata|bool]:
"""
Hack alternative working again.
https://stackoverflow.com/a/71867055/6196010 (OUTDATED URL)
@@ -95,14 +108,13 @@ class TwitterArchiver(Archiver):
next to test: https://cdn.embedly.com/widgets/media.html?&schema=twitter&url=https://twitter.com/bellingcat/status/1674700676612386816
"""
logger.debug(f"Trying twitter hack for {url=}")
result = Metadata()
hack_url = f"https://cdn.syndication.twimg.com/tweet-result?id={tweet_id}"
r = requests.get(hack_url)
if r.status_code != 200 or r.json()=={}:
logger.warning(f"Failed to get tweet information from {hack_url}, trying ytdl")
return self.download_ytdl(item, url, tweet_id)
logger.warning(f"SyndicationHack: Failed to get tweet information from {hack_url}.")
return False
result = Metadata()
tweet = r.json()
urls = []
@@ -128,9 +140,9 @@ class TwitterArchiver(Archiver):
result.add_media(media)
result.set_title(tweet.get("text")).set_content(json.dumps(tweet, ensure_ascii=False)).set_timestamp(datetime.strptime(tweet["created_at"], "%Y-%m-%dT%H:%M:%S.%fZ"))
return result.success("twitter-hack")
def download_ytdl(self, item: Metadata, url:str, tweet_id:str) -> Metadata:
return result.success("twitter-syndication")
def download_yt_dlp(self, item: Metadata, url: str, tweet_id: str) -> Union[Metadata|bool]:
downloader = YoutubeDL()
tie = TwitterIE(downloader)
tweet = tie._extract_status(tweet_id)
@@ -141,6 +153,7 @@ class TwitterArchiver(Archiver):
.set_timestamp(datetime.strptime(tweet["created_at"], "%a %b %d %H:%M:%S %z %Y"))
if not tweet.get("entities", {}).get("media"):
logger.debug('No media found, archiving tweet text only')
result.status = "twitter-ytdl"
return result
for i, tw_media in enumerate(tweet["entities"]["media"]):
media = Media(filename="")
@@ -160,7 +173,6 @@ class TwitterArchiver(Archiver):
media.filename = self.download_from_url(media.get("src"), f'{slugify(url)}_{i}{ext}', item)
result.add_media(media)
return result.success("twitter-ytdl")
def get_username_tweet_id(self, url):
# detect URLs that we definitely cannot handle