mirror of
https://github.com/bellingcat/auto-archiver.git
synced 2026-06-12 13:18:28 +03:00
Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a455728673 | ||
|
|
8d4357a22c | ||
|
|
cf8691bad7 |
@@ -54,8 +54,9 @@ class InstagramTbotArchiver(Archiver):
|
|||||||
|
|
||||||
def cleanup(self) -> None:
|
def cleanup(self) -> None:
|
||||||
logger.info(f"CLEANUP {self.name}.")
|
logger.info(f"CLEANUP {self.name}.")
|
||||||
if os.path.exists(self.session_file):
|
session_file_name = self.session_file + ".session"
|
||||||
os.remove(self.session_file)
|
if os.path.exists(session_file_name):
|
||||||
|
os.remove(session_file_name)
|
||||||
|
|
||||||
def download(self, item: Metadata) -> Metadata:
|
def download(self, item: Metadata) -> Metadata:
|
||||||
url = item.get_url()
|
url = item.get_url()
|
||||||
|
|||||||
@@ -101,8 +101,9 @@ class TelethonArchiver(Archiver):
|
|||||||
|
|
||||||
def cleanup(self) -> None:
|
def cleanup(self) -> None:
|
||||||
logger.info(f"CLEANUP {self.name}.")
|
logger.info(f"CLEANUP {self.name}.")
|
||||||
if os.path.exists(self.session_file):
|
session_file_name = self.session_file + ".session"
|
||||||
os.remove(self.session_file)
|
if os.path.exists(session_file_name):
|
||||||
|
os.remove(session_file_name)
|
||||||
|
|
||||||
def download(self, item: Metadata) -> Metadata:
|
def download(self, item: Metadata) -> Metadata:
|
||||||
"""
|
"""
|
||||||
|
|||||||
@@ -2,6 +2,8 @@ import re, requests, mimetypes, json
|
|||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
from snscrape.modules.twitter import TwitterTweetScraper, Video, Gif, Photo
|
from snscrape.modules.twitter import TwitterTweetScraper, Video, Gif, Photo
|
||||||
|
from yt_dlp import YoutubeDL
|
||||||
|
from yt_dlp.extractor.twitter import TwitterIE
|
||||||
from slugify import slugify
|
from slugify import slugify
|
||||||
|
|
||||||
from . import Archiver
|
from . import Archiver
|
||||||
@@ -98,7 +100,9 @@ class TwitterArchiver(Archiver):
|
|||||||
|
|
||||||
hack_url = f"https://cdn.syndication.twimg.com/tweet-result?id={tweet_id}"
|
hack_url = f"https://cdn.syndication.twimg.com/tweet-result?id={tweet_id}"
|
||||||
r = requests.get(hack_url)
|
r = requests.get(hack_url)
|
||||||
if r.status_code != 200: return False
|
if r.status_code != 200 or r.json()=={}:
|
||||||
|
logger.warning(f"Failed to get tweet information from {hack_url}, trying ytdl")
|
||||||
|
return self.download_ytdl(item, url, tweet_id)
|
||||||
tweet = r.json()
|
tweet = r.json()
|
||||||
|
|
||||||
urls = []
|
urls = []
|
||||||
@@ -108,7 +112,7 @@ class TwitterArchiver(Archiver):
|
|||||||
# 1 tweet has 1 video max
|
# 1 tweet has 1 video max
|
||||||
if "video" in tweet:
|
if "video" in tweet:
|
||||||
v = tweet["video"]
|
v = tweet["video"]
|
||||||
urls.append(self.choose_variant(v.get("variants", [])))
|
urls.append(self.choose_variant(v.get("variants", []))['url'])
|
||||||
|
|
||||||
logger.debug(f"Twitter hack got {urls=}")
|
logger.debug(f"Twitter hack got {urls=}")
|
||||||
|
|
||||||
@@ -125,6 +129,38 @@ class TwitterArchiver(Archiver):
|
|||||||
|
|
||||||
result.set_title(tweet.get("text")).set_content(json.dumps(tweet, ensure_ascii=False)).set_timestamp(datetime.strptime(tweet["created_at"], "%Y-%m-%dT%H:%M:%S.%fZ"))
|
result.set_title(tweet.get("text")).set_content(json.dumps(tweet, ensure_ascii=False)).set_timestamp(datetime.strptime(tweet["created_at"], "%Y-%m-%dT%H:%M:%S.%fZ"))
|
||||||
return result.success("twitter-hack")
|
return result.success("twitter-hack")
|
||||||
|
|
||||||
|
def download_ytdl(self, item: Metadata, url: str, tweet_id: str) -> Metadata:
    """Archive a tweet through yt-dlp's Twitter extractor.

    Called as a fallback when the syndication endpoint does not return
    usable tweet data. The tweet's text, raw JSON and timestamp are stored
    on a fresh ``Metadata``; every photo, video and animated GIF listed in
    ``tweet["entities"]["media"]`` is downloaded and attached to it.

    Args:
        item: the item being archived; forwarded to ``download_from_url``.
        url: the tweet URL; slugified to build the media filenames.
        tweet_id: the tweet identifier handed to the yt-dlp extractor.

    Returns:
        The populated ``Metadata``. When media entries exist it is marked
        successful with ``"twitter-ytdl"``; when the tweet has no media it
        is returned without that marker.

    Raises:
        KeyError: if the tweet has no ``created_at`` field.
        ValueError: if ``created_at`` does not match the expected format.
    """
    downloader = YoutubeDL()
    tie = TwitterIE(downloader)
    # NOTE(review): `_extract_status` is underscore-prefixed, so presumably a
    # private yt-dlp helper that may change between releases - confirm when
    # bumping the dependency.
    tweet = tie._extract_status(tweet_id)

    result = Metadata()
    result\
        .set_title(tweet.get('full_text', ''))\
        .set_content(json.dumps(tweet, ensure_ascii=False))\
        .set_timestamp(datetime.strptime(tweet["created_at"], "%a %b %d %H:%M:%S %z %Y"))

    if not tweet.get("entities", {}).get("media"):
        logger.debug('No media found, archiving tweet text only')
        # NOTE(review): unlike the media path below, this return does not
        # call `.success(...)`; confirm that callers treat it as archived.
        return result

    # `i` is the position in the tweet's media list, so filenames stay stable
    # even when an entry is skipped.
    for i, tw_media in enumerate(tweet["entities"]["media"]):
        media_type = tw_media.get("type")
        media = Media(filename="")

        if media_type == "photo":
            media.set("src", UrlUtil.twitter_best_quality_url(tw_media['media_url_https']))
            mimetype = "image/jpeg"
        elif media_type in ("video", "animated_gif"):
            variants = tw_media.get('video_info', {}).get('variants', [])
            if media_type == "video":
                variant = self.choose_variant(variants)
            else:
                variant = variants[0] if variants else None
            if not variant:
                # Nothing downloadable: skip instead of raising on None/[].
                logger.warning(f"No usable variant for {media_type} media {i} of tweet {tweet_id}, skipping")
                continue
            media.set("src", variant['url'])
            mimetype = variant.get('content_type', "")
        else:
            # Unknown media kinds have no source URL to download.
            logger.warning(f"Unsupported media type {media_type!r} in tweet {tweet_id}, skipping")
            continue

        # guess_extension() returns None for unknown types; fall back to no
        # extension rather than a literal "None" suffix in the filename.
        ext = mimetypes.guess_extension(mimetype) or ""
        media.filename = self.download_from_url(media.get("src"), f'{slugify(url)}_{i}{ext}', item)
        result.add_media(media)

    return result.success("twitter-ytdl")
|
||||||
|
|
||||||
|
|
||||||
def get_username_tweet_id(self, url):
|
def get_username_tweet_id(self, url):
|
||||||
# detect URLs that we definitely cannot handle
|
# detect URLs that we definitely cannot handle
|
||||||
@@ -140,13 +176,13 @@ class TwitterArchiver(Archiver):
|
|||||||
# choosing the highest quality possible
|
# choosing the highest quality possible
|
||||||
variant, width, height = None, 0, 0
|
variant, width, height = None, 0, 0
|
||||||
for var in variants:
|
for var in variants:
|
||||||
if var.get("type", "") == "video/mp4":
|
if var.get("content_type", "") == "video/mp4":
|
||||||
width_height = re.search(r"\/(\d+)x(\d+)\/", var["src"])
|
width_height = re.search(r"\/(\d+)x(\d+)\/", var["url"])
|
||||||
if width_height:
|
if width_height:
|
||||||
w, h = int(width_height[1]), int(width_height[2])
|
w, h = int(width_height[1]), int(width_height[2])
|
||||||
if w > width or h > height:
|
if w > width or h > height:
|
||||||
width, height = w, h
|
width, height = w, h
|
||||||
variant = var.get("src", variant)
|
variant = var
|
||||||
else:
|
else:
|
||||||
variant = var.get("src") if not variant else variant
|
variant = var if not variant else variant
|
||||||
return variant
|
return variant
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ _MAJOR = "0"
|
|||||||
_MINOR = "11"
|
_MINOR = "11"
|
||||||
# On main and in a nightly release the patch should be one ahead of the last
|
# On main and in a nightly release the patch should be one ahead of the last
|
||||||
# released build.
|
# released build.
|
||||||
_PATCH = "1"
|
_PATCH = "3"
|
||||||
# This is mainly for nightly builds which have the suffix ".dev$DATE". See
|
# This is mainly for nightly builds which have the suffix ".dev$DATE". See
|
||||||
# https://semver.org/#is-v123-a-semantic-version for the semantics.
|
# https://semver.org/#is-v123-a-semantic-version for the semantics.
|
||||||
_SUFFIX = ""
|
_SUFFIX = ""
|
||||||
|
|||||||
Reference in New Issue
Block a user