Merge pull request #175 from bellingcat/youtubedlp-rewrite

Create generic archiver for all valid youtube-dl URLs, add truthsocial extractor, unit tests for twitter_api extractor, utility methods for cleaning HTML and traversing objects
This commit is contained in:
Patrick Robertson
2025-01-21 17:33:39 +01:00
committed by GitHub
27 changed files with 1051 additions and 778 deletions

View File

@@ -35,4 +35,6 @@ jobs:
run: poetry install --no-interaction --with dev
- name: Run Core Tests
run: poetry run pytest -ra -v -m "not download"
run: |
poetry run auto-archiver --version || true
poetry run pytest -ra -v -m "not download"

View File

@@ -35,4 +35,6 @@ jobs:
run: poetry install --no-interaction --with dev
- name: Run Download Tests
run: poetry run pytest -ra -v -m "download"
run: poetry run pytest -ra -v -x -m "download"
env:
TWITTER_BEARER_TOKEN: ${{ secrets.TWITTER_BEARER_TOKEN }}

73
poetry.lock generated
View File

@@ -197,7 +197,7 @@ description = "Python bindings for the Brotli compression library"
optional = false
python-versions = "*"
groups = ["main"]
markers = "implementation_name == \"cpython\" or platform_python_implementation >= \"CPython\""
markers = "platform_python_implementation >= \"CPython\""
files = [
{file = "Brotli-1.1.0-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:e1140c64812cb9b06c922e77f1c26a75ec5e3f0fb2bf92cc8c58720dec276752"},
{file = "Brotli-1.1.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:c8fd5270e906eef71d4a8d19b7c6a43760c6abcfcc10c9101d14eb2357418de9"},
@@ -326,47 +326,6 @@ files = [
{file = "Brotli-1.1.0.tar.gz", hash = "sha256:81de08ac11bcb85841e440c13611c00b67d3bf82698314928d0b676362546724"},
]
[[package]]
name = "brotlicffi"
version = "1.1.0.0"
description = "Python CFFI bindings to the Brotli library"
optional = false
python-versions = ">=3.7"
groups = ["main"]
markers = "implementation_name != \"cpython\""
files = [
{file = "brotlicffi-1.1.0.0-cp37-abi3-macosx_10_9_x86_64.whl", hash = "sha256:9b7ae6bd1a3f0df532b6d67ff674099a96d22bc0948955cb338488c31bfb8851"},
{file = "brotlicffi-1.1.0.0-cp37-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:19ffc919fa4fc6ace69286e0a23b3789b4219058313cf9b45625016bf7ff996b"},
{file = "brotlicffi-1.1.0.0-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9feb210d932ffe7798ee62e6145d3a757eb6233aa9a4e7db78dd3690d7755814"},
{file = "brotlicffi-1.1.0.0-cp37-abi3-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:84763dbdef5dd5c24b75597a77e1b30c66604725707565188ba54bab4f114820"},
{file = "brotlicffi-1.1.0.0-cp37-abi3-win32.whl", hash = "sha256:1b12b50e07c3911e1efa3a8971543e7648100713d4e0971b13631cce22c587eb"},
{file = "brotlicffi-1.1.0.0-cp37-abi3-win_amd64.whl", hash = "sha256:994a4f0681bb6c6c3b0925530a1926b7a189d878e6e5e38fae8efa47c5d9c613"},
{file = "brotlicffi-1.1.0.0-pp310-pypy310_pp73-macosx_10_9_x86_64.whl", hash = "sha256:2e4aeb0bd2540cb91b069dbdd54d458da8c4334ceaf2d25df2f4af576d6766ca"},
{file = "brotlicffi-1.1.0.0-pp310-pypy310_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:4b7b0033b0d37bb33009fb2fef73310e432e76f688af76c156b3594389d81391"},
{file = "brotlicffi-1.1.0.0-pp310-pypy310_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:54a07bb2374a1eba8ebb52b6fafffa2afd3c4df85ddd38fcc0511f2bb387c2a8"},
{file = "brotlicffi-1.1.0.0-pp310-pypy310_pp73-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:7901a7dc4b88f1c1475de59ae9be59799db1007b7d059817948d8e4f12e24e35"},
{file = "brotlicffi-1.1.0.0-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:ce01c7316aebc7fce59da734286148b1d1b9455f89cf2c8a4dfce7d41db55c2d"},
{file = "brotlicffi-1.1.0.0-pp37-pypy37_pp73-macosx_10_9_x86_64.whl", hash = "sha256:246f1d1a90279bb6069de3de8d75a8856e073b8ff0b09dcca18ccc14cec85979"},
{file = "brotlicffi-1.1.0.0-pp37-pypy37_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:cc4bc5d82bc56ebd8b514fb8350cfac4627d6b0743382e46d033976a5f80fab6"},
{file = "brotlicffi-1.1.0.0-pp37-pypy37_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:37c26ecb14386a44b118ce36e546ce307f4810bc9598a6e6cb4f7fca725ae7e6"},
{file = "brotlicffi-1.1.0.0-pp37-pypy37_pp73-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:ca72968ae4eaf6470498d5c2887073f7efe3b1e7d7ec8be11a06a79cc810e990"},
{file = "brotlicffi-1.1.0.0-pp37-pypy37_pp73-win_amd64.whl", hash = "sha256:add0de5b9ad9e9aa293c3aa4e9deb2b61e99ad6c1634e01d01d98c03e6a354cc"},
{file = "brotlicffi-1.1.0.0-pp38-pypy38_pp73-macosx_10_9_x86_64.whl", hash = "sha256:9b6068e0f3769992d6b622a1cd2e7835eae3cf8d9da123d7f51ca9c1e9c333e5"},
{file = "brotlicffi-1.1.0.0-pp38-pypy38_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:8557a8559509b61e65083f8782329188a250102372576093c88930c875a69838"},
{file = "brotlicffi-1.1.0.0-pp38-pypy38_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2a7ae37e5d79c5bdfb5b4b99f2715a6035e6c5bf538c3746abc8e26694f92f33"},
{file = "brotlicffi-1.1.0.0-pp38-pypy38_pp73-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:391151ec86bb1c683835980f4816272a87eaddc46bb91cbf44f62228b84d8cca"},
{file = "brotlicffi-1.1.0.0-pp38-pypy38_pp73-win_amd64.whl", hash = "sha256:2f3711be9290f0453de8eed5275d93d286abe26b08ab4a35d7452caa1fef532f"},
{file = "brotlicffi-1.1.0.0-pp39-pypy39_pp73-macosx_10_9_x86_64.whl", hash = "sha256:1a807d760763e398bbf2c6394ae9da5815901aa93ee0a37bca5efe78d4ee3171"},
{file = "brotlicffi-1.1.0.0-pp39-pypy39_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:fa8ca0623b26c94fccc3a1fdd895be1743b838f3917300506d04aa3346fd2a14"},
{file = "brotlicffi-1.1.0.0-pp39-pypy39_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3de0cf28a53a3238b252aca9fed1593e9d36c1d116748013339f0949bfc84112"},
{file = "brotlicffi-1.1.0.0-pp39-pypy39_pp73-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:6be5ec0e88a4925c91f3dea2bb0013b3a2accda6f77238f76a34a1ea532a1cb0"},
{file = "brotlicffi-1.1.0.0-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:d9eb71bb1085d996244439154387266fd23d6ad37161f6f52f1cd41dd95a3808"},
{file = "brotlicffi-1.1.0.0.tar.gz", hash = "sha256:b77827a689905143f87915310b93b273ab17888fd43ef350d4832c4a71083c13"},
]
[package.dependencies]
cffi = ">=1.0.0"
[[package]]
name = "bs4"
version = "0.0.2"
@@ -2984,37 +2943,27 @@ h11 = ">=0.9.0,<1"
[[package]]
name = "yt-dlp"
version = "2024.9.27"
version = "2025.1.12"
description = "A feature-rich command-line audio/video downloader"
optional = false
python-versions = ">=3.8"
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "yt_dlp-2024.9.27-py3-none-any.whl", hash = "sha256:2717468dd697fcfcf9a89f493ba30a3830cdfb276c09750e5b561b08b9ef5f69"},
{file = "yt_dlp-2024.9.27.tar.gz", hash = "sha256:86605542e17e2e23ad23145b637ec308133762a15a5dedac4ae50b7973237026"},
{file = "yt_dlp-2025.1.12-py3-none-any.whl", hash = "sha256:f7ea19afb64f8e457a1b9598ddb67f8deaa313bf1d57abd5612db9272ab10795"},
{file = "yt_dlp-2025.1.12.tar.gz", hash = "sha256:8e7e246e2a5a2cff0a9c13db46844a37a547680702012058c94ec18fce0ca25a"},
]
[package.dependencies]
brotli = {version = "*", markers = "implementation_name == \"cpython\""}
brotlicffi = {version = "*", markers = "implementation_name != \"cpython\""}
certifi = "*"
mutagen = "*"
pycryptodomex = "*"
requests = ">=2.32.2,<3"
urllib3 = ">=1.26.17,<3"
websockets = ">=13.0"
[package.extras]
build = ["build", "hatchling", "pip", "setuptools (>=71.0.2)", "wheel"]
curl-cffi = ["curl-cffi (==0.5.10)", "curl-cffi (>=0.5.10,!=0.6.*,<0.7.2)"]
dev = ["autopep8 (>=2.0,<3.0)", "pre-commit", "pytest (>=8.1,<9.0)", "ruff (>=0.6.0,<0.7.0)"]
py2exe = ["py2exe (>=0.12)"]
pyinstaller = ["pyinstaller (>=6.10.0)"]
default = ["brotli", "brotlicffi", "certifi", "mutagen", "pycryptodomex", "requests (>=2.32.2,<3)", "urllib3 (>=1.26.17,<3)", "websockets (>=13.0)"]
dev = ["autopep8 (>=2.0,<3.0)", "pre-commit", "pytest (>=8.1,<9.0)", "pytest-rerunfailures (>=14.0,<15.0)", "ruff (>=0.9.0,<0.10.0)"]
pyinstaller = ["pyinstaller (>=6.11.1)"]
secretstorage = ["cffi", "secretstorage"]
static-analysis = ["autopep8 (>=2.0,<3.0)", "ruff (>=0.6.0,<0.7.0)"]
test = ["pytest (>=8.1,<9.0)"]
static-analysis = ["autopep8 (>=2.0,<3.0)", "ruff (>=0.9.0,<0.10.0)"]
test = ["pytest (>=8.1,<9.0)", "pytest-rerunfailures (>=14.0,<15.0)"]
[metadata]
lock-version = "2.1"
python-versions = ">=3.10,<3.13"
content-hash = "26a6b3bd13262d1a23c8e9f8d99a961ff503b21b0ce1ec0fd76591dcca45868c"
content-hash = "462c7c5f9d1fbae895d6299ba0b690b6e24d0655a4c9fc79f75ddef4eec222f8"

View File

@@ -46,7 +46,7 @@ dependencies = [
"cryptography (>=41.0.0,<42.0.0)",
"boto3 (>=1.28.0,<2.0.0)",
"dataclasses-json (>=0.0.0)",
"yt-dlp (==2024.09.27)",
"yt-dlp (==2025.1.12)",
"numpy (==2.1.3)",
"vk-url-scraper (>=0.0.0)",
"requests[socks] (>=0.0.0)",
@@ -83,4 +83,5 @@ documentation = "https://github.com/bellingcat/auto-archiver"
[tool.pytest.ini_options]
markers = [
"download: marks tests that download content from the network",
"incremental: marks a class to run tests incrementally. If a test fails in the class, the remaining tests will be skipped",
]

View File

@@ -7,13 +7,10 @@ collect and preserve a variety of content types, such as posts, images, videos a
"""
from .archiver import Archiver
from .telethon_archiver import TelethonArchiver
from .twitter_archiver import TwitterArchiver
from .twitter_api_archiver import TwitterApiArchiver
from .instagram_archiver import InstagramArchiver
from .instagram_tbot_archiver import InstagramTbotArchiver
from .tiktok_archiver import TiktokArchiver
from .telegram_archiver import TelegramArchiver
from .vk_archiver import VkArchiver
from .youtubedl_archiver import YoutubeDLArchiver
from .generic_archiver.generic_archiver import GenericArchiver as YoutubeDLArchiver
from .instagram_api_archiver import InstagramAPIArchiver
from .bluesky_archiver import BlueskyArchiver

View File

@@ -6,8 +6,10 @@
"""
from __future__ import annotations
from pathlib import Path
from abc import abstractmethod
from dataclasses import dataclass
import mimetypes
import os
import mimetypes, requests
from loguru import logger
@@ -44,6 +46,14 @@ class Archiver(Step):
def sanitize_url(self, url: str) -> str:
# used to clean unnecessary URL parameters OR unfurl redirect links
return url
def suitable(self, url: str) -> bool:
"""
Returns True if this archiver can handle the given URL
Should be overridden by subclasses
"""
return True
def _guess_file_type(self, path: str) -> str:
"""
@@ -58,10 +68,8 @@ class Archiver(Step):
@retry(wait_random_min=500, wait_random_max=3500, stop_max_attempt_number=5)
def download_from_url(self, url: str, to_filename: str = None, verbose=True) -> str:
"""
downloads a URL to provided filename, or inferred from URL, returns local filename
downloads a URL to provided filename, or inferred from URL, returns local filename
"""
# TODO: should we refactor to use requests.get(url, stream=True) and write to file in chunks? compare approaches
# TODO: should we guess the extension?
if not to_filename:
to_filename = url.split('/')[-1].split('?')[0]
if len(to_filename) > 64:
@@ -71,11 +79,24 @@ class Archiver(Step):
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36'
}
d = requests.get(url, headers=headers)
assert d.status_code == 200, f"got response code {d.status_code} for {url=}"
with open(to_filename, 'wb') as f:
f.write(d.content)
return to_filename
try:
d = requests.get(url, stream=True, headers=headers, timeout=30)
d.raise_for_status()
# get mimetype from the response headers
if not Path(to_filename).suffix:
content_type = d.headers.get('Content-Type')
extension = mimetypes.guess_extension(content_type)
if extension:
to_filename += extension
with open(to_filename, 'wb') as f:
for chunk in d.iter_content(chunk_size=8192):
f.write(chunk)
return to_filename
except requests.RequestException as e:
logger.warning(f"Failed to fetch the Media URL: {e}")
@abstractmethod
def download(self, item: Metadata) -> Metadata:

View File

@@ -1,119 +0,0 @@
import os
import re, requests, mimetypes
from loguru import logger
from . import Archiver
from ..core import Metadata, Media, ArchivingContext
class BlueskyArchiver(Archiver):
"""
Uses an unauthenticated Bluesky API to archive posts including metadata, images and videos. Relies on `public.api.bsky.app/xrpc` and `bsky.social/xrpc`. Avoids ATProto to avoid auth.
Some inspiration from https://github.com/yt-dlp/yt-dlp/blob/master/yt_dlp/extractor/bluesky.py
"""
name = "bluesky_archiver"
BSKY_POST = re.compile(r"/profile/([^/]+)/post/([a-zA-Z0-9]+)")
def __init__(self, config: dict) -> None:
super().__init__(config)
@staticmethod
def configs() -> dict:
return {}
def download(self, item: Metadata) -> Metadata:
url = item.get_url()
if not re.search(self.BSKY_POST, url):
return False
logger.debug(f"Identified a Bluesky post: {url}, archiving...")
result = Metadata()
# fetch post info and update result
post = self._get_post_from_uri(url)
logger.debug(f"Extracted post info: {post['record']['text']}")
result.set_title(post["record"]["text"])
result.set_timestamp(post["record"]["createdAt"])
for k, v in self._get_post_data(post).items():
if v: result.set(k, v)
# download if embeds present (1 video XOR >=1 images)
for media in self._download_bsky_embeds(post):
result.add_media(media)
logger.debug(f"Downloaded {len(result.media)} media files")
return result.success("bluesky")
def _get_post_from_uri(self, post_uri: str) -> dict:
"""
Calls a public (no auth needed) Bluesky API to get a post from its uri, uses .getPostThread as it brings author info as well (unlike .getPost).
"""
post_match = re.search(self.BSKY_POST, post_uri)
username = post_match.group(1)
post_id = post_match.group(2)
at_uri = f'at://{username}/app.bsky.feed.post/{post_id}'
r = requests.get(f"https://public.api.bsky.app/xrpc/app.bsky.feed.getPostThread?uri={at_uri}&depth=0&parent_height=0")
r.raise_for_status()
thread = r.json()
assert thread["thread"]["$type"] == "app.bsky.feed.defs#threadViewPost"
return thread["thread"]["post"]
def _download_bsky_embeds(self, post: dict) -> list[Media]:
"""
Iterates over image(s) or video in a Bluesky post and downloads them
"""
media = []
embed = post.get("record", {}).get("embed", {})
image_medias = embed.get("images", []) + embed.get("media", {}).get("images", [])
video_medias = [e for e in [embed.get("video"), embed.get("media", {}).get("video")] if e]
for image_media in image_medias:
image_media = self._download_bsky_file_as_media(image_media["image"]["ref"]["$link"], post["author"]["did"])
media.append(image_media)
for video_media in video_medias:
video_media = self._download_bsky_file_as_media(video_media["ref"]["$link"], post["author"]["did"])
media.append(video_media)
return media
def _download_bsky_file_as_media(self, cid: str, did: str) -> Media:
"""
Uses the Bluesky API to download a file by its `cid` and `did`.
"""
# TODO: replace with self.download_from_url once that function has been cleaned-up
file_url = f"https://bsky.social/xrpc/com.atproto.sync.getBlob?cid={cid}&did={did}"
response = requests.get(file_url, stream=True)
response.raise_for_status()
ext = mimetypes.guess_extension(response.headers["Content-Type"])
filename = os.path.join(ArchivingContext.get_tmp_dir(), f"{cid}{ext}")
with open(filename, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
media = Media(filename=filename)
media.set("src", file_url)
return media
def _get_post_data(self, post: dict) -> dict:
"""
Extracts relevant information returned by the .getPostThread api call (excluding text/created_at): author, mentions, tags, links.
"""
author = post["author"]
if "labels" in author and not author["labels"]: del author["labels"]
if "associated" in author: del author["associated"]
mentions, tags, links = [], [], []
facets = post.get("record", {}).get("facets", [])
for f in facets:
for feature in f["features"]:
if feature["$type"] == "app.bsky.richtext.facet#mention":
mentions.append(feature["did"])
elif feature["$type"] == "app.bsky.richtext.facet#tag":
tags.append(feature["tag"])
elif feature["$type"] == "app.bsky.richtext.facet#link":
links.append(feature["uri"])
res = {"author": author}
if mentions: res["mentions"] = mentions
if tags: res["tags"] = tags
if links: res["links"] = links
return res

View File

@@ -0,0 +1 @@
from .generic_archiver import GenericArchiver

View File

@@ -0,0 +1,93 @@
import os
import mimetypes
import requests
from loguru import logger
from auto_archiver.core.context import ArchivingContext
from auto_archiver.archivers.archiver import Archiver
from auto_archiver.core.metadata import Metadata, Media
from .dropin import GenericDropin, InfoExtractor
class Bluesky(GenericDropin):
def create_metadata(self, post: dict, ie_instance: InfoExtractor, archiver: Archiver, url: str) -> Metadata:
result = Metadata()
result.set_url(url)
result.set_title(post["record"]["text"])
result.set_timestamp(post["record"]["createdAt"])
for k, v in self._get_post_data(post).items():
if v: result.set(k, v)
# download if embeds present (1 video XOR >=1 images)
for media in self._download_bsky_embeds(post, archiver):
result.add_media(media)
logger.debug(f"Downloaded {len(result.media)} media files")
return result
def extract_post(self, url: str, ie_instance: InfoExtractor) -> dict:
# TODO: If/when this PR (https://github.com/yt-dlp/yt-dlp/pull/12098) is merged on ytdlp, remove the comments and delete the code below
# handle, video_id = ie_instance._match_valid_url(url).group('handle', 'id')
# return ie_instance._extract_post(handle=handle, post_id=video_id)
handle, video_id = ie_instance._match_valid_url(url).group('handle', 'id')
return ie_instance._download_json(
'https://public.api.bsky.app/xrpc/app.bsky.feed.getPostThread',
video_id, query={
'uri': f'at://{handle}/app.bsky.feed.post/{video_id}',
'depth': 0,
'parentHeight': 0,
})['thread']['post']
def _download_bsky_embeds(self, post: dict, archiver: Archiver) -> list[Media]:
"""
Iterates over image(s) or video in a Bluesky post and downloads them
"""
media = []
embed = post.get("record", {}).get("embed", {})
image_medias = embed.get("images", []) + embed.get("media", {}).get("images", [])
video_medias = [e for e in [embed.get("video"), embed.get("media", {}).get("video")] if e]
media_url = "https://bsky.social/xrpc/com.atproto.sync.getBlob?cid={}&did={}"
for image_media in image_medias:
url = media_url.format(image_media['image']['ref']['$link'], post['author']['did'])
image_media = archiver.download_from_url(url)
media.append(image_media)
for video_media in video_medias:
url = media_url.format(video_media['ref']['$link'], post['author']['did'])
video_media = archiver.download_from_url(url)
media.append(video_media)
return media
def _get_post_data(self, post: dict) -> dict:
"""
Extracts relevant information returned by the .getPostThread api call (excluding text/created_at): author, mentions, tags, links.
"""
author = post["author"]
if "labels" in author and not author["labels"]:
del author["labels"]
if "associated" in author:
del author["associated"]
mentions, tags, links = [], [], []
facets = post.get("record", {}).get("facets", [])
for f in facets:
for feature in f["features"]:
if feature["$type"] == "app.bsky.richtext.facet#mention":
mentions.append(feature["did"])
elif feature["$type"] == "app.bsky.richtext.facet#tag":
tags.append(feature["tag"])
elif feature["$type"] == "app.bsky.richtext.facet#link":
links.append(feature["uri"])
res = {"author": author}
if mentions:
res["mentions"] = mentions
if tags:
res["tags"] = tags
if links:
res["links"] = links
return res

View File

@@ -0,0 +1,58 @@
from yt_dlp.extractor.common import InfoExtractor
from auto_archiver.core.metadata import Metadata
from auto_archiver.archivers.archiver import Archiver
class GenericDropin:
"""Base class for dropins for the generic extractor.
In many instances, an extractor will exist in ytdlp, but it will only process videos.
Dropins can be created and used to make use of the already-written private code of a
specific extractor from ytdlp.
The dropin should be able to handle the following methods:
- `get_post_data`: This method should be able to extract the post data from the url and return it as a dict.
- `create_metadata`: This method should be able to create a Metadata object from a post dict.
Optional methods include:
- `skip_ytdlp_download`: If you want to skip the ytdlp 'download' method all together, and do your own, then return True for this method.
This is useful in cases where ytdlp might not work properly for all of your posts
- `keys_to_clean`: for the generic 'video_data' created by ytdlp (for video URLs), any additional fields you would like to clean out of the data before storing in metadata
"""
def extract_post(self, url: str, ie_instance: InfoExtractor):
"""
This method should return the post data from the url.
"""
raise NotImplementedError("This method should be implemented in the subclass")
def create_metadata(self, post: dict, ie_instance: InfoExtractor, archiver: Archiver, url: str) -> Metadata:
"""
This method should create a Metadata object from the post data.
"""
raise NotImplementedError("This method should be implemented in the subclass")
def skip_ytdlp_download(self, url: str, ie_instance: InfoExtractor):
"""
This method should return True if you want to skip the ytdlp download method.
"""
return False
def keys_to_clean(self, video_data: dict, info_extractor: InfoExtractor):
"""
This method should return a list of strings (keys) to clean from the video_data dict.
E.g. ["uploader", "uploader_id", "tiktok_specific_field"]
"""
return []
def download_additional_media(self, video_data: dict, info_extractor: InfoExtractor, metadata: Metadata):
"""
This method should download any additional media from the post.
"""
return metadata

View File

@@ -0,0 +1,321 @@
"""
This is the generic archiver used by auto-archiver, which uses `yt-dlp` under the hood.
This module is responsible for downloading and processing media content from platforms
supported by `yt-dlp`, such as YouTube, Facebook, and others. It provides functionality
for retrieving videos, subtitles, comments, and other metadata, and it integrates with
the broader archiving framework.
### Features
- Supports downloading videos and playlists.
- Retrieves metadata like titles, descriptions, upload dates, and durations.
- Downloads subtitles and comments when enabled.
- Configurable options for handling live streams, proxies, and more.
### Dropins
- For websites supported by `yt-dlp` that also contain posts in addition to videos
(e.g. Facebook, Twitter, Bluesky), dropins can be created to extract post data and create
metadata objects. Some dropins are included in this generic_archiver by default, but
custom dropins can be created to handle additional websites and passed to the archiver
via the command line using the `--dropins` option (TODO!).
"""
import datetime, os, yt_dlp, pysubs2
import importlib
from typing import Type
from yt_dlp.extractor.common import InfoExtractor
from loguru import logger
from auto_archiver.archivers.archiver import Archiver
from ...core import Metadata, Media, ArchivingContext
class GenericArchiver(Archiver):
name = "youtubedl_archiver" #left as is for backwards compat
_dropins = {}
def __init__(self, config: dict) -> None:
super().__init__(config)
self.subtitles = bool(self.subtitles)
self.comments = bool(self.comments)
self.livestreams = bool(self.livestreams)
self.live_from_start = bool(self.live_from_start)
self.end_means_success = bool(self.end_means_success)
self.allow_playlist = bool(self.allow_playlist)
self.max_downloads = self.max_downloads
def suitable_extractors(self, url: str) -> list[str]:
"""
Returns a list of valid extractors for the given URL"""
for info_extractor in yt_dlp.YoutubeDL()._ies.values():
if info_extractor.suitable(url) and info_extractor.working():
yield info_extractor
def suitable(self, url: str) -> bool:
"""
Checks for valid URLs out of all ytdlp extractors.
Returns False for the GenericIE, which as labelled by yt-dlp: 'Generic downloader that works on some sites'
"""
return any(self.suitable_extractors(url))
def download_additional_media(self, video_data: dict, info_extractor: InfoExtractor, metadata: Metadata) -> Metadata:
"""
Downloads additional media like images, comments, subtitles, etc.
Creates a 'media' object and attaches it to the metadata object.
"""
# Just get the main thumbnail. More thumbnails are available in
# video_data['thumbnails'] should they be required
thumbnail_url = video_data.get('thumbnail')
if thumbnail_url:
try:
cover_image_path = self.download_from_url(thumbnail_url)
media = Media(cover_image_path)
metadata.add_media(media, id="cover")
except Exception as e:
logger.error(f"Error downloading cover image {thumbnail_url}: {e}")
dropin = self.dropin_for_name(info_extractor.ie_key())
if dropin:
try:
metadata = dropin.download_additional_media(video_data, info_extractor, metadata)
except AttributeError:
pass
return metadata
def keys_to_clean(self, info_extractor: InfoExtractor, video_data: dict) -> dict:
"""
Clean up the ytdlp generic video data to make it more readable and remove unnecessary keys that ytdlp adds
"""
base_keys = ['formats', 'thumbnail', 'display_id', 'epoch', 'requested_downloads',
'duration_string', 'thumbnails', 'http_headers', 'webpage_url_basename', 'webpage_url_domain',
'extractor', 'extractor_key', 'playlist', 'playlist_index', 'duration_string', 'protocol', 'requested_subtitles',
'format_id', 'acodec', 'vcodec', 'ext', 'epoch', '_has_drm', 'filesize', 'audio_ext', 'video_ext', 'vbr', 'abr',
'resolution', 'dynamic_range', 'aspect_ratio', 'cookies', 'format', 'quality', 'preference', 'artists',
'channel_id', 'subtitles', 'tbr', 'url', 'original_url', 'automatic_captions', 'playable_in_embed', 'live_status',
'_format_sort_fields', 'chapters', 'requested_formats', 'format_note',
'audio_channels', 'asr', 'fps', 'was_live', 'is_live', 'heatmap', 'age_limit', 'stretched_ratio']
dropin = self.dropin_for_name(info_extractor.ie_key())
if dropin:
try:
base_keys += dropin.keys_to_clean(video_data, info_extractor)
except AttributeError:
pass
return base_keys
def add_metadata(self, video_data: dict, info_extractor: InfoExtractor, url:str, result: Metadata) -> Metadata:
"""
Creates a Metadata object from the given video_data
"""
# first add the media
result = self.download_additional_media(video_data, info_extractor, result)
# keep both 'title' and 'fulltitle', but prefer 'title', falling back to 'fulltitle' if it doesn't exist
result.set_title(video_data.pop('title', video_data.pop('fulltitle', "")))
result.set_url(url)
# extract comments if enabled
if self.comments:
result.set("comments", [{
"text": c["text"],
"author": c["author"],
"timestamp": datetime.datetime.fromtimestamp(c.get("timestamp"), tz = datetime.timezone.utc)
} for c in video_data.get("comments", [])])
# then add the common metadata
if timestamp := video_data.pop("timestamp", None):
timestamp = datetime.datetime.fromtimestamp(timestamp, tz = datetime.timezone.utc).isoformat()
result.set_timestamp(timestamp)
if upload_date := video_data.pop("upload_date", None):
upload_date = datetime.datetime.strptime(upload_date, '%Y%m%d').replace(tzinfo=datetime.timezone.utc)
result.set("upload_date", upload_date)
# then clean away any keys we don't want
for clean_key in self.keys_to_clean(info_extractor, video_data):
video_data.pop(clean_key, None)
# then add the rest of the video data
for k, v in video_data.items():
if v:
result.set(k, v)
return result
def get_metadata_for_post(self, info_extractor: Type[InfoExtractor], url: str, ydl: yt_dlp.YoutubeDL) -> Metadata:
"""
Calls into the ytdlp InfoExtract subclass to use the prive _extract_post method to get the post metadata.
"""
ie_instance = info_extractor(downloader=ydl)
dropin = self.dropin_for_name(info_extractor.ie_key())
if not dropin:
# TODO: add a proper link to 'how to create your own dropin'
logger.debug(f"""Could not find valid dropin for {info_extractor.IE_NAME}.
Why not try creating your own, and make sure it has a valid function called 'create_metadata'. Learn more: https://auto-archiver.readthedocs.io/en/latest/user_guidelines.html#""")
return False
post_data = dropin.extract_post(url, ie_instance)
return dropin.create_metadata(post_data, ie_instance, self, url)
def get_metadata_for_video(self, data: dict, info_extractor: Type[InfoExtractor], url: str, ydl: yt_dlp.YoutubeDL) -> Metadata:
# this time download
ydl.params['getcomments'] = self.comments
#TODO: for playlist or long lists of videos, how to download one at a time so they can be stored before the next one is downloaded?
data = ydl.extract_info(url, ie_key=info_extractor.ie_key(), download=True)
if "entries" in data:
entries = data.get("entries", [])
if not len(entries):
logger.warning('YoutubeDLArchiver could not find any video')
return False
else: entries = [data]
result = Metadata()
for entry in entries:
try:
filename = ydl.prepare_filename(entry)
if not os.path.exists(filename):
filename = filename.split('.')[0] + '.mkv'
new_media = Media(filename)
for x in ["duration", "original_url", "fulltitle", "description", "upload_date"]:
if x in entry: new_media.set(x, entry[x])
# read text from subtitles if enabled
if self.subtitles:
for lang, val in (data.get('requested_subtitles') or {}).items():
try:
subs = pysubs2.load(val.get('filepath'), encoding="utf-8")
text = " ".join([line.text for line in subs])
new_media.set(f"subtitles_{lang}", text)
except Exception as e:
logger.error(f"Error loading subtitle file {val.get('filepath')}: {e}")
result.add_media(new_media)
except Exception as e:
logger.error(f"Error processing entry {entry}: {e}")
return self.add_metadata(data, info_extractor, url, result)
def dropin_for_name(self, dropin_name: str, additional_paths = [], package=__package__) -> Type[InfoExtractor]:
if dropin_name == "generic":
# no need for a dropin for the generic extractor (?)
return None
dropin_class_name = dropin_name.title()
def _load_dropin(dropin):
dropin_class = getattr(dropin, dropin_class_name)()
return self._dropins.setdefault(dropin_name, dropin_class)
try:
return self._dropins[dropin_name]
except KeyError:
pass
# TODO: user should be able to pass --dropins="/some/folder,/other/folder" as a cmd line option
# which would allow the user to override the default dropins/add their own
paths = [] + additional_paths
for path in paths:
dropin_path = os.path.join(path, f"{dropin_name}.py")
dropin_spec = importlib.util.spec_from_file_location(dropin_name, dropin_path)
if not dropin_spec:
continue
try:
dropin = importlib.util.module_from_spec(dropin_spec)
dropin_spec.loader.exec_module(dropin)
return _load_dropin(dropin)
except (FileNotFoundError, ModuleNotFoundError):
pass
# fallback to loading the dropins within auto-archiver
try:
return _load_dropin(importlib.import_module(f".{dropin_name}", package=package))
except ModuleNotFoundError:
pass
return None
def download_for_extractor(self, info_extractor: InfoExtractor, url: str, ydl: yt_dlp.YoutubeDL) -> Metadata:
"""
Tries to download the given url using the specified extractor
It first tries to use ytdlp directly to download the video. If the post is not a video, it will then try to
use the extractor's _extract_post method to get the post metadata if possible.
"""
# when getting info without download, we also don't need the comments
ydl.params['getcomments'] = False
result = False
dropin_submodule = self.dropin_for_name(info_extractor.ie_key())
try:
if dropin_submodule and dropin_submodule.skip_ytdlp_download(info_extractor, url):
raise Exception(f"Skipping using ytdlp to download files for {info_extractor.ie_key()}")
# don't download since it can be a live stream
data = ydl.extract_info(url, ie_key=info_extractor.ie_key(), download=False)
if data.get('is_live', False) and not self.livestreams:
logger.warning("Livestream detected, skipping due to 'livestreams' configuration setting")
return False
# it's a valid video, that the youtubdedl can download out of the box
result = self.get_metadata_for_video(data, info_extractor, url, ydl)
except Exception as e:
logger.debug(f'Issue using "{info_extractor.IE_NAME}" extractor to download video (error: {repr(e)}), attempting to use extractor to get post data instead')
try:
result = self.get_metadata_for_post(info_extractor, url, ydl)
except (yt_dlp.utils.DownloadError, yt_dlp.utils.ExtractorError) as post_e:
logger.error(f'Error downloading metadata for post: {post_e}')
return False
except Exception as generic_e:
logger.debug(f'Attempt to extract using ytdlp extractor "{info_extractor.IE_NAME}" failed: \n {repr(generic_e)}', exc_info=True)
return False
if result:
extractor_name = "yt-dlp"
if info_extractor:
extractor_name += f"_{info_extractor.ie_key()}"
if self.end_means_success:
result.success(extractor_name)
else:
result.status = extractor_name
return result
def download(self, item: Metadata) -> Metadata:
url = item.get_url()
if item.netloc in ['facebook.com', 'www.facebook.com'] and self.facebook_cookie:
logger.debug('Using Facebook cookie')
yt_dlp.utils.std_headers['cookie'] = self.facebook_cookie
ydl_options = {'outtmpl': os.path.join(ArchivingContext.get_tmp_dir(), f'%(id)s.%(ext)s'), 'quiet': False, 'noplaylist': not self.allow_playlist , 'writesubtitles': self.subtitles, 'writeautomaticsub': self.subtitles, "live_from_start": self.live_from_start, "proxy": self.proxy, "max_downloads": self.max_downloads, "playlistend": self.max_downloads}
if item.netloc in ['youtube.com', 'www.youtube.com']:
if self.cookies_from_browser:
logger.debug(f'Extracting cookies from browser {self.cookies_from_browser} for Youtube')
ydl_options['cookiesfrombrowser'] = (self.cookies_from_browser,)
elif self.cookie_file:
logger.debug(f'Using cookies from file {self.cookie_file}')
ydl_options['cookiefile'] = self.cookie_file
ydl = yt_dlp.YoutubeDL(ydl_options) # allsubtitles and subtitleslangs not working as expected, so default lang is always "en"
for info_extractor in self.suitable_extractors(url):
result = self.download_for_extractor(info_extractor, url, ydl)
if result:
return result
return False

View File

@@ -0,0 +1,52 @@
from typing import Type
from auto_archiver.utils import traverse_obj
from auto_archiver.core.metadata import Metadata, Media
from auto_archiver.archivers.archiver import Archiver
from yt_dlp.extractor.common import InfoExtractor
from dateutil.parser import parse as parse_dt
from .dropin import GenericDropin
class Truth(GenericDropin):
def extract_post(self, url, ie_instance: InfoExtractor) -> dict:
video_id = ie_instance._match_id(url)
truthsocial_url = f'https://truthsocial.com/api/v1/statuses/{video_id}'
return ie_instance._download_json(truthsocial_url, video_id)
def skip_ytdlp_download(self, url, ie_instance: Type[InfoExtractor]) -> bool:
return True
def create_metadata(self, post: dict, ie_instance: InfoExtractor, archiver: Archiver, url: str) -> Metadata:
"""
Creates metadata from a truth social post
Only used for posts that contain no media. ytdlp.TruthIE extractor can handle posts with media
Format is:
{'id': '109598702184774628', 'created_at': '2022-12-29T19:51:18.161Z', 'in_reply_to_id': None, 'quote_id': None, 'in_reply_to_account_id': None, 'sensitive': False, 'spoiler_text': '', 'visibility': 'public', 'language': 'en', 'uri': 'https://truthsocial.com/@bbcnewa/109598702184774628', 'url': 'https://truthsocial.com/@bbcnewa/109598702184774628', 'content': '<p>Pele, regarded by many as football\'s greatest ever player, has died in Brazil at the age of 82. <a href="https://www.bbc.com/sport/football/42751517" rel="nofollow noopener noreferrer" target="_blank"><span class="invisible">https://www.</span><span class="ellipsis">bbc.com/sport/football/4275151</span><span class="invisible">7</span></a></p>', 'account': {'id': '107905163010312793', 'username': 'bbcnewa', 'acct': 'bbcnewa', 'display_name': 'BBC News', 'locked': False, 'bot': False, 'discoverable': True, 'group': False, 'created_at': '2022-03-05T17:42:01.159Z', 'note': '<p>News, features and analysis by the BBC</p>', 'url': 'https://truthsocial.com/@bbcnewa', 'avatar': 'https://static-assets-1.truthsocial.com/tmtg:prime-ts-assets/accounts/avatars/107/905/163/010/312/793/original/e7c07550dc22c23a.jpeg', 'avatar_static': 'https://static-assets-1.truthsocial.com/tmtg:prime-ts-assets/accounts/avatars/107/905/163/010/312/793/original/e7c07550dc22c23a.jpeg', 'header': 'https://static-assets-1.truthsocial.com/tmtg:prime-ts-assets/accounts/headers/107/905/163/010/312/793/original/a00eeec2b57206c7.jpeg', 'header_static': 'https://static-assets-1.truthsocial.com/tmtg:prime-ts-assets/accounts/headers/107/905/163/010/312/793/original/a00eeec2b57206c7.jpeg', 'followers_count': 1131, 'following_count': 3, 'statuses_count': 9, 'last_status_at': '2024-11-12', 'verified': False, 'location': '', 'website': 'https://www.bbc.com/news', 'unauth_visibility': True, 'chats_onboarded': True, 'feeds_onboarded': True, 'accepting_messages': False, 'show_nonmember_group_statuses': None, 'emojis': [], 'fields': [], 'tv_onboarded': True, 'tv_account': False}, 'media_attachments': [], 'mentions': [], 'tags': [], 'card': None, 'group': None, 'quote': None, 'in_reply_to': None, 'reblog': None, 'sponsored': False, 'replies_count': 1, 'reblogs_count': 0, 'favourites_count': 2, 'favourited': False, 'reblogged': False, 'muted': False, 'pinned': False, 'bookmarked': False, 'poll': None, 'emojis': []}
"""
result = Metadata()
result.set_url(url)
timestamp = post['created_at'] # format is 2022-12-29T19:51:18.161Z
result.set_timestamp(parse_dt(timestamp))
result.set('description', post['content'])
result.set('author', post['account']['username'])
for key in ['replies_count', 'reblogs_count', 'favourites_count', ('account', 'followers_count'), ('account', 'following_count'), ('account', 'statuses_count'), ('account', 'display_name'), 'language', 'in_reply_to_account', 'replies_count']:
if isinstance(key, tuple):
store_key = " ".join(key)
else:
store_key = key
result.set(store_key, traverse_obj(post, key))
# add the media
for media in post.get('media_attachments', []):
filename = archiver.download_from_url(media['url'])
result.add_media(Media(filename), id=media.get('id'))
return result

View File

@@ -0,0 +1,70 @@
import re, mimetypes, json
from datetime import datetime
from loguru import logger
from slugify import slugify
from auto_archiver.core.metadata import Metadata, Media
from auto_archiver.utils import UrlUtil
from auto_archiver.archivers.archiver import Archiver
from .dropin import GenericDropin, InfoExtractor
class Twitter(GenericDropin):
def choose_variant(self, variants):
# choosing the highest quality possible
variant, width, height = None, 0, 0
for var in variants:
if var.get("content_type", "") == "video/mp4":
width_height = re.search(r"\/(\d+)x(\d+)\/", var["url"])
if width_height:
w, h = int(width_height[1]), int(width_height[2])
if w > width or h > height:
width, height = w, h
variant = var
else:
variant = var if not variant else variant
return variant
def extract_post(self, url: str, ie_instance: InfoExtractor):
twid = ie_instance._match_valid_url(url).group('id')
return ie_instance._extract_status(twid=twid)
def create_metadata(self, tweet: dict, ie_instance: InfoExtractor, archiver: Archiver, url: str) -> Metadata:
result = Metadata()
try:
if not tweet.get("user") or not tweet.get("created_at"):
raise ValueError(f"Error retreiving post. Are you sure it exists?")
timestamp = datetime.strptime(tweet["created_at"], "%a %b %d %H:%M:%S %z %Y")
except (ValueError, KeyError) as ex:
logger.warning(f"Unable to parse tweet: {str(ex)}\nRetreived tweet data: {tweet}")
return False
result\
.set_title(tweet.get('full_text', ''))\
.set_content(json.dumps(tweet, ensure_ascii=False))\
.set_timestamp(timestamp)
if not tweet.get("entities", {}).get("media"):
logger.debug('No media found, archiving tweet text only')
result.status = "twitter-ytdl"
return result
for i, tw_media in enumerate(tweet["entities"]["media"]):
media = Media(filename="")
mimetype = ""
if tw_media["type"] == "photo":
media.set("src", UrlUtil.twitter_best_quality_url(tw_media['media_url_https']))
mimetype = "image/jpeg"
elif tw_media["type"] == "video":
variant = self.choose_variant(tw_media['video_info']['variants'])
media.set("src", variant['url'])
mimetype = variant['content_type']
elif tw_media["type"] == "animated_gif":
variant = tw_media['video_info']['variants'][0]
media.set("src", variant['url'])
mimetype = variant['content_type']
ext = mimetypes.guess_extension(mimetype)
media.filename = archiver.download_from_url(media.get("src"), f'{slugify(url)}_{i}{ext}')
result.add_media(media)
return result

View File

@@ -1,55 +0,0 @@
import json, os, traceback
from loguru import logger
from . import Archiver
from ..core import Metadata, Media, ArchivingContext
from ..utils.misc import random_str
class TiktokArchiver(Archiver):
name = "tiktok_archiver"
def __init__(self, config: dict) -> None:
super().__init__(config)
@staticmethod
def configs() -> dict:
return {}
def download(self, item: Metadata) -> Metadata:
url = item.get_url()
if 'tiktok.com' not in url:
return False
result = Metadata()
try:
info = tiktok_downloader.info_post(url)
result.set_title(info.desc)
result.set_timestamp(info.create_time)
result.set_content(json.dumps({
"cover": info.cover,
"author": info.author,
"music_title": info.author,
"caption": getattr(info, "caption", info.desc),
}, ensure_ascii=False, indent=4))
except:
error = traceback.format_exc()
logger.warning(f'Other Tiktok error {error}')
try:
filename = os.path.join(ArchivingContext.get_tmp_dir(), f'{random_str(8)}.mp4')
tiktok_media = tiktok_downloader.snaptik(url).get_media()
if len(tiktok_media) <= 0:
logger.debug(f"TikTok: could not get media from {url=}")
return False
logger.info(f'downloading video {filename=}')
tiktok_media[0].download(filename)
result.add_media(Media(filename))
return result.success("tiktok")
except:
error = traceback.format_exc()
logger.warning(f'Other Tiktok error {error}')

View File

@@ -1,17 +1,19 @@
import json, mimetypes
import json
import re
import mimetypes
import requests
from datetime import datetime
from loguru import logger
from pytwitter import Api
from slugify import slugify
from . import Archiver
from .twitter_archiver import TwitterArchiver
from ..core import Metadata,Media
class TwitterApiArchiver(TwitterArchiver, Archiver):
class TwitterApiArchiver(Archiver):
name = "twitter_api_archiver"
link_pattern = re.compile(r"(?:twitter|x).com\/(?:\#!\/)?(\w+)\/status(?:es)?\/(\d+)")
def __init__(self, config: dict) -> None:
super().__init__(config)
@@ -47,6 +49,17 @@ class TwitterApiArchiver(TwitterArchiver, Archiver):
def api_client(self) -> str:
return self.apis[self.api_index]
def sanitize_url(self, url: str) -> str:
# expand URL if t.co and clean tracker GET params
if 'https://t.co/' in url:
try:
r = requests.get(url, timeout=30)
logger.debug(f'Expanded url {url} to {r.url}')
url = r.url
except:
logger.error(f'Failed to expand url {url}')
return url
def download(self, item: Metadata) -> Metadata:
# call download retry until success or no more apis
@@ -56,6 +69,16 @@ class TwitterApiArchiver(TwitterArchiver, Archiver):
self.api_index = 0
return False
def get_username_tweet_id(self, url):
# detect URLs that we definitely cannot handle
matches = self.link_pattern.findall(url)
if not len(matches): return False, False
username, tweet_id = matches[0] # only one URL supported
logger.debug(f"Found {username=} and {tweet_id=} in {url=}")
return username, tweet_id
def download_retry(self, item: Metadata) -> Metadata:
url = item.get_url()
# detect URLs that we definitely cannot handle
@@ -102,10 +125,13 @@ class TwitterApiArchiver(TwitterArchiver, Archiver):
"lang": tweet.data.lang,
"media": urls
}, ensure_ascii=False, indent=4))
return result.success("twitter")
return result.success("twitter-api")
def choose_variant(self, variants):
# choosing the highest quality possible
"""
Chooses the highest quality variable possible out of a list of variants
"""
variant, bit_rate = None, -1
for var in variants:
if var.content_type == "video/mp4":

View File

@@ -1,209 +0,0 @@
import re, requests, mimetypes, json, math
from typing import Union
from datetime import datetime
from loguru import logger
from yt_dlp import YoutubeDL
from yt_dlp.extractor.twitter import TwitterIE
from slugify import slugify
from . import Archiver
from ..core import Metadata, Media
from ..utils import UrlUtil
class TwitterArchiver(Archiver):
"""
This Twitter Archiver uses unofficial scraping methods.
"""
name = "twitter_archiver"
link_pattern = re.compile(r"(?:twitter|x).com\/(?:\#!\/)?(\w+)\/status(?:es)?\/(\d+)")
link_clean_pattern = re.compile(r"(.+(?:twitter|x)\.com\/.+\/\d+)(\?)*.*")
def __init__(self, config: dict) -> None:
super().__init__(config)
@staticmethod
def configs() -> dict:
return {}
def sanitize_url(self, url: str) -> str:
# expand URL if t.co and clean tracker GET params
if 'https://t.co/' in url:
try:
r = requests.get(url, timeout=30)
logger.debug(f'Expanded url {url} to {r.url}')
url = r.url
except:
logger.error(f'Failed to expand url {url}')
# https://twitter.com/MeCookieMonster/status/1617921633456640001?s=20&t=3d0g4ZQis7dCbSDg-mE7-w
return self.link_clean_pattern.sub("\\1", url)
def download(self, item: Metadata) -> Metadata:
"""
if this url is archivable will download post info and look for other posts from the same group with media.
can handle private/public channels
"""
url = item.get_url()
username, tweet_id = self.get_username_tweet_id(url)
if not username: return False
strategies = [self.download_yt_dlp, self.download_syndication]
for strategy in strategies:
logger.debug(f"Trying {strategy.__name__} for {url=}")
try:
result = strategy(item, url, tweet_id)
if result: return result
except Exception as ex:
logger.error(f"Failed to download {url} with {strategy.__name__}: {type(ex).__name__} occurred. args: {ex.args}")
logger.warning(f"No free strategy worked for {url}")
return False
def generate_token(self, tweet_id: str) -> str:
"""Generates the syndication token for a tweet ID.
Taken from https://github.com/JustAnotherArchivist/snscrape/issues/996#issuecomment-2211358215
And Vercel's code: https://github.com/vercel/react-tweet/blob/main/packages/react-tweet/src/api/fetch-tweet.ts#L27
"""
# Perform the division and multiplication by π
result = (int(tweet_id) / 1e15) * math.pi
fractional_part = result % 1
# Convert to base 36
base_36 = ''
while result >= 1:
base_36 = "0123456789abcdefghijklmnopqrstuvwxyz"[int(result % 36)] + base_36
result = math.floor(result / 36)
# Append fractional part in base 36
while fractional_part > 0 and len(base_36) < 11: # Limit to avoid infinite loop
fractional_part *= 36
digit = int(fractional_part)
base_36 += "0123456789abcdefghijklmnopqrstuvwxyz"[digit]
fractional_part -= digit
# Remove leading zeros and dots
return base_36.replace('0', '').replace('.', '')
def download_syndication(self, item: Metadata, url: str, tweet_id: str) -> Union[Metadata|bool]:
"""
Downloads tweets using Twitter's own embed API (Hack).
Background on method can be found at:
https://github.com/JustAnotherArchivist/snscrape/issues/996#issuecomment-1615937362
https://github.com/JustAnotherArchivist/snscrape/issues/996#issuecomment-2211358215
next to test: https://cdn.embedly.com/widgets/media.html?&schema=twitter&url=https://twitter.com/bellingcat/status/1674700676612386816
"""
hack_url = "https://cdn.syndication.twimg.com/tweet-result"
params = {
'id': tweet_id,
'token': self.generate_token(tweet_id)
}
r = requests.get(hack_url, params=params, timeout=10)
if r.status_code != 200 or r.json()=={}:
logger.warning(f"SyndicationHack: Failed to get tweet information from {hack_url}.")
return False
result = Metadata()
tweet = r.json()
if tweet.get('__typename') == 'TweetTombstone':
logger.error(f"Failed to get tweet {tweet_id}: {tweet['tombstone']['text']['text']}")
return False
urls = []
for p in tweet.get("photos", []):
urls.append(p["url"])
# 1 tweet has 1 video max
if "video" in tweet:
v = tweet["video"]
urls.append(self.choose_variant(v.get("variants", []))['url'])
logger.debug(f"Twitter hack got media {urls=}")
for i, u in enumerate(urls):
media = Media(filename="")
u = UrlUtil.twitter_best_quality_url(u)
media.set("src", u)
ext = ""
if (mtype := mimetypes.guess_type(UrlUtil.remove_get_parameters(u))[0]):
ext = mimetypes.guess_extension(mtype)
media.filename = self.download_from_url(u, f'{slugify(url)}_{i}{ext}')
result.add_media(media)
result.set_title(tweet.get("text")).set_content(json.dumps(tweet, ensure_ascii=False)).set_timestamp(datetime.strptime(tweet["created_at"], "%Y-%m-%dT%H:%M:%S.%fZ"))
return result.success("twitter-syndication")
def download_yt_dlp(self, item: Metadata, url: str, tweet_id: str) -> Union[Metadata|bool]:
downloader = YoutubeDL()
tie = TwitterIE(downloader)
tweet = tie._extract_status(tweet_id)
result = Metadata()
try:
if not tweet.get("user") or not tweet.get("created_at"):
raise ValueError(f"Error retreiving post with id {tweet_id}. Are you sure it exists?")
timestamp = datetime.strptime(tweet["created_at"], "%a %b %d %H:%M:%S %z %Y")
except (ValueError, KeyError) as ex:
logger.warning(f"Unable to parse tweet: {str(ex)}\nRetreived tweet data: {tweet}")
return False
result\
.set_title(tweet.get('full_text', ''))\
.set_content(json.dumps(tweet, ensure_ascii=False))\
.set_timestamp(timestamp)
if not tweet.get("entities", {}).get("media"):
logger.debug('No media found, archiving tweet text only')
result.status = "twitter-ytdl"
return result
for i, tw_media in enumerate(tweet["entities"]["media"]):
media = Media(filename="")
mimetype = ""
if tw_media["type"] == "photo":
media.set("src", UrlUtil.twitter_best_quality_url(tw_media['media_url_https']))
mimetype = "image/jpeg"
elif tw_media["type"] == "video":
variant = self.choose_variant(tw_media['video_info']['variants'])
media.set("src", variant['url'])
mimetype = variant['content_type']
elif tw_media["type"] == "animated_gif":
variant = tw_media['video_info']['variants'][0]
media.set("src", variant['url'])
mimetype = variant['content_type']
ext = mimetypes.guess_extension(mimetype)
media.filename = self.download_from_url(media.get("src"), f'{slugify(url)}_{i}{ext}', item)
result.add_media(media)
return result.success("twitter-ytdl")
def get_username_tweet_id(self, url):
# detect URLs that we definitely cannot handle
matches = self.link_pattern.findall(url)
if not len(matches): return False, False
username, tweet_id = matches[0] # only one URL supported
logger.debug(f"Found {username=} and {tweet_id=} in {url=}")
return username, tweet_id
def choose_variant(self, variants):
# choosing the highest quality possible
variant, width, height = None, 0, 0
for var in variants:
if var.get("content_type", "") == "video/mp4":
width_height = re.search(r"\/(\d+)x(\d+)\/", var["url"])
if width_height:
w, h = int(width_height[1]), int(width_height[2])
if w > width or h > height:
width, height = w, h
variant = var
else:
variant = var if not variant else variant
return variant

View File

@@ -1,150 +1,2 @@
"""
This defines an archiver implementation using `yt-dlp`.
This module is responsible for downloading and processing media content from platforms
supported by `yt-dlp`, such as YouTube, Facebook, and others. It provides functionality
for retrieving videos, subtitles, comments, and other metadata, and it integrates with
the broader archiving framework.
### Features
- Supports downloading videos and playlists.
- Retrieves metadata like titles, descriptions, upload dates, and durations.
- Downloads subtitles and comments when enabled.
- Configurable options for handling live streams, proxies, and more.
"""
import datetime
import os
import pysubs2
import yt_dlp
from loguru import logger
from . import Archiver
from ..core import Metadata, Media, ArchivingContext
class YoutubeDLArchiver(Archiver):
name = "youtubedl_archiver"
def __init__(self, config: dict) -> None:
super().__init__(config)
self.subtitles = bool(self.subtitles)
self.comments = bool(self.comments)
self.livestreams = bool(self.livestreams)
self.live_from_start = bool(self.live_from_start)
self.end_means_success = bool(self.end_means_success)
self.allow_playlist = bool(self.allow_playlist)
self.max_downloads = self.max_downloads
@staticmethod
def configs() -> dict:
return {
"facebook_cookie": {"default": None, "help": "optional facebook cookie to have more access to content, from browser, looks like 'cookie: datr= xxxx'"},
"subtitles": {"default": True, "help": "download subtitles if available"},
"comments": {"default": False, "help": "download all comments if available, may lead to large metadata"},
"livestreams": {"default": False, "help": "if set, will download live streams, otherwise will skip them; see --max-filesize for more control"},
"live_from_start": {"default": False, "help": "if set, will download live streams from their earliest available moment, otherwise starts now."},
"proxy": {"default": "", "help": "http/socks (https seems to not work atm) proxy to use for the webdriver, eg https://proxy-user:password@proxy-ip:port"},
"end_means_success": {"default": True, "help": "if True, any archived content will mean a 'success', if False this archiver will not return a 'success' stage; this is useful for cases when the yt-dlp will archive a video but ignore other types of content like images or text only pages that the subsequent archivers can retrieve."},
'allow_playlist': {"default": False, "help": "If True will also download playlists, set to False if the expectation is to download a single video."},
"max_downloads": {"default": "inf", "help": "Use to limit the number of videos to download when a channel or long page is being extracted. 'inf' means no limit."},
"cookies_from_browser": {"default": None, "help": "optional browser for ytdl to extract cookies from, can be one of: brave, chrome, chromium, edge, firefox, opera, safari, vivaldi, whale"},
"cookie_file": {"default": None, "help": "optional cookie file to use for Youtube, see instructions here on how to export from your browser: https://github.com/yt-dlp/yt-dlp/wiki/FAQ#how-do-i-pass-cookies-to-yt-dlp"},
}
def download(self, item: Metadata) -> Metadata:
url = item.get_url()
# Handle Facebook cookies if enabled
if item.netloc in ['facebook.com', 'www.facebook.com'] and self.facebook_cookie:
logger.debug('Using Facebook cookie')
yt_dlp.utils.std_headers['cookie'] = self.facebook_cookie
ydl_options = {'outtmpl': os.path.join(ArchivingContext.get_tmp_dir(), f'%(id)s.%(ext)s'), 'quiet': False, 'noplaylist': not self.allow_playlist , 'writesubtitles': self.subtitles, 'writeautomaticsub': self.subtitles, "live_from_start": self.live_from_start, "proxy": self.proxy, "max_downloads": self.max_downloads, "playlistend": self.max_downloads}
if item.netloc in ['youtube.com', 'www.youtube.com']:
if self.cookies_from_browser:
logger.debug(f'Extracting cookies from browser {self.cookies_from_browser} for Youtube')
ydl_options['cookiesfrombrowser'] = (self.cookies_from_browser,)
elif self.cookie_file:
logger.debug(f'Using cookies from file {self.cookie_file}')
ydl_options['cookiefile'] = self.cookie_file
ydl = yt_dlp.YoutubeDL(ydl_options) # allsubtitles and subtitleslangs not working as expected, so default lang is always "en"
try:
# don't download since it can be a live stream
info = ydl.extract_info(url, download=False)
if info.get('is_live', False) and not self.livestreams:
logger.warning("Livestream detected, skipping due to 'livestreams' configuration setting")
return False
except yt_dlp.utils.DownloadError as e:
logger.debug(f'No video - Youtube normal control flow: {e}')
return False
except Exception as e:
logger.debug(f'ytdlp exception which is normal for example a facebook page with images only will cause a IndexError: list index out of range. Exception is: \n {e}')
return False
# This time download the content
ydl = yt_dlp.YoutubeDL({**ydl_options, "getcomments": self.comments})
#TODO: for playlist or long lists of videos, how to download one at a time so they can be stored before the next one is downloaded?
info = ydl.extract_info(url, download=True)
# Process entries (e.g., for playlists)
if "entries" in info:
entries = info.get("entries", [])
if not len(entries):
logger.warning('YoutubeDLArchiver could not find any video')
return False
else: entries = [info]
# Prepare enriched metadata
result = Metadata()
result.set_title(info.get("title"))
if "description" in info: result.set_content(info["description"])
# Process individual entries
for entry in entries:
try:
filename = ydl.prepare_filename(entry)
if not os.path.exists(filename):
filename = filename.split('.')[0] + '.mkv'
new_media = Media(filename)
for x in ["duration", "original_url", "fulltitle", "description", "upload_date"]:
if x in entry: new_media.set(x, entry[x])
# read text from subtitles if enabled
if self.subtitles:
for lang, val in (info.get('requested_subtitles') or {}).items():
try:
subs = pysubs2.load(val.get('filepath'), encoding="utf-8")
text = " ".join([line.text for line in subs])
new_media.set(f"subtitles_{lang}", text)
except Exception as e:
logger.error(f"Error loading subtitle file {val.get('filepath')}: {e}")
result.add_media(new_media)
except Exception as e:
logger.error(f"Error processing entry {entry}: {e}")
# extract comments if enabled
if self.comments:
result.set("comments", [{
"text": c["text"],
"author": c["author"],
"timestamp": datetime.datetime.fromtimestamp(c.get("timestamp"), tz = datetime.timezone.utc)
} for c in info.get("comments", [])])
# Set additional metadata
if (timestamp := info.get("timestamp")):
#TODO: fix deprecated timestamp,
timestamp = datetime.datetime.fromtimestamp(timestamp, tz = datetime.timezone.utc).isoformat()
result.set_timestamp(timestamp)
if (upload_date := info.get("upload_date")):
upload_date = datetime.datetime.strptime(upload_date, '%Y%m%d').replace(tzinfo=datetime.timezone.utc)
result.set("upload_date", upload_date)
# Update status for success
if self.end_means_success: result.success("yt-dlp")
else: result.status = "yt-dlp"
return result
# temporary hack, as we implement module
from .generic_archiver.generic_archiver import GenericArchiver as YoutubeDLArchiver

View File

@@ -5,7 +5,9 @@ flexible setup in various environments.
"""
import argparse, yaml
import importlib
import argparse
import yaml
from dataclasses import dataclass, field
from typing import List
from collections import defaultdict
@@ -59,6 +61,7 @@ class Config:
)
parser.add_argument('--config', action='store', dest='config', help='the filename of the YAML configuration file (defaults to \'config.yaml\')', default='orchestration.yaml')
parser.add_argument('--version', action='version', version=importlib.metadata.version('auto_archiver'))
# Iterate over all step subclasses to gather default configs and CLI arguments
for configurable in self.configurable_parents:

View File

@@ -0,0 +1,41 @@
from loguru import logger
import csv
from . import Feeder
from ..core import Metadata, ArchivingContext
from ..utils import url_or_none
class CSVFeeder(Feeder):
@staticmethod
def configs() -> dict:
return {
"files": {
"default": None,
"help": "Path to the input file(s) to read the URLs from, comma separated. \
Input files should be formatted with one URL per line",
"cli_set": lambda cli_val, cur_val: list(set(cli_val.split(",")))
},
"column": {
"default": None,
"help": "Column number or name to read the URLs from, 0-indexed",
}
}
def __iter__(self) -> Metadata:
url_column = self.column or 0
for file in self.files:
with open(file, "r") as f:
reader = csv.reader(f)
first_row = next(reader)
if not(url_or_none(first_row[url_column])):
# it's a header row, skip it
logger.debug(f"Skipping header row: {first_row}")
for row in reader:
url = row[0]
logger.debug(f"Processing {url}")
yield Metadata().set_url(url)
ArchivingContext.set("folder", "cli")
logger.success(f"Processed {len(self.urls)} URL(s)")

View File

@@ -5,4 +5,7 @@ from .misc import *
from .webdriver import Webdriver
from .gsheet import Gsheets
from .url import UrlUtil
from .atlos import get_atlos_config_options
from .atlos import get_atlos_config_options
# handy utils from ytdlp
from yt_dlp.utils import (clean_html, traverse_obj, strip_or_none, url_or_none)

View File

@@ -3,17 +3,17 @@ import pytest
from auto_archiver.core import Metadata
from auto_archiver.core import Step
from auto_archiver.core.metadata import Metadata
from auto_archiver.archivers.archiver import Archiver
class TestArchiverBase(object):
archiver_class = None
config = None
archiver_class: str = None
config: dict = None
@pytest.fixture(autouse=True)
def setup_archiver(self):
assert self.archiver_class is not None, "self.archiver_class must be set on the subclass"
assert self.config is not None, "self.config must be a dict set on the subclass"
self.archiver = self.archiver_class(self.config)
self.archiver: Archiver = self.archiver_class({self.archiver_class.name: self.config})
def assertValidResponseMetadata(self, test_response: Metadata, title: str, timestamp: str, status: str = ""):
assert test_response is not False

View File

@@ -1,73 +0,0 @@
import pytest
from auto_archiver.archivers.bluesky_archiver import BlueskyArchiver
from .test_archiver_base import TestArchiverBase
class TestBlueskyArchiver(TestArchiverBase):
"""Tests Bluesky Archiver
Note that these tests will download API responses from the bluesky API, so they may be slow.
This is an intended feature, as we want to test to ensure the bluesky API format hasn't changed,
and also test the archiver's ability to download media.
"""
archiver_class = BlueskyArchiver
config = {}
@pytest.mark.download
def test_download_media_with_images(self):
# url https://bsky.app/profile/colborne.bsky.social/post/3lec2bqjc5s2y
post = self.archiver._get_post_from_uri("https://bsky.app/profile/colborne.bsky.social/post/3lec2bqjc5s2y")
# just make sure bsky haven't changed their format, images should be under "record/embed/media/images"
# there should be 2 images
assert "record" in post
assert "embed" in post["record"]
assert "media" in post["record"]["embed"]
assert "images" in post["record"]["embed"]["media"]
assert len(post["record"]["embed"]["media"]["images"]) == 2
# try downloading the media files
media = self.archiver._download_bsky_embeds(post)
assert len(media) == 2
# check the IDs
assert "bafkreiflrkfihcvwlhka5tb2opw2qog6gfvywsdzdlibveys2acozh75tq" in media[0].get('src')
assert "bafkreibsprmwchf7r6xcstqkdvvuj3ijw7efciw7l3y4crxr4cmynseo7u" in media[1].get('src')
@pytest.mark.download
def test_download_post_with_single_image(self):
# url https://bsky.app/profile/bellingcat.com/post/3lcxcpgt6j42l
post = self.archiver._get_post_from_uri("https://bsky.app/profile/bellingcat.com/post/3lcxcpgt6j42l")
# just make sure bsky haven't changed their format, images should be under "record/embed/images"
# there should be 1 image
assert "record" in post
assert "embed" in post["record"]
assert "images" in post["record"]["embed"]
assert len(post["record"]["embed"]["images"]) == 1
media = self.archiver._download_bsky_embeds(post)
assert len(media) == 1
# check the ID
assert "bafkreihljdtomy4yulx4nfxuqdatlgvdg45vxdmjzzhclsd4ludk7zfma4" in media[0].get('src')
@pytest.mark.download
def test_download_post_with_video(self):
# url https://bsky.app/profile/bellingcat.com/post/3le2l4gsxlk2i
post = self.archiver._get_post_from_uri("https://bsky.app/profile/bellingcat.com/post/3le2l4gsxlk2i")
# just make sure bsky haven't changed their format, video should be under "record/embed/video"
assert "record" in post
assert "embed" in post["record"]
assert "video" in post["record"]["embed"]
media = self.archiver._download_bsky_embeds(post)
assert len(media) == 1
# check the ID
assert "bafkreiaiskn2nt5cxjnxbgcqqcrnurvkr2ni3unekn6zvhvgr5nrqg6u2q" in media[0].get('src')

View File

@@ -0,0 +1,203 @@
from pathlib import Path
import datetime
import os
from os.path import dirname
import pytest
from auto_archiver.archivers.generic_archiver import GenericArchiver
from .test_archiver_base import TestArchiverBase
class TestGenericArchiver(TestArchiverBase):
"""Tests Base Archiver
"""
archiver_class = GenericArchiver
config = {
'subtitles': False,
'comments': False,
'livestreams': False,
'live_from_start': False,
'end_means_success': True,
'allow_playlist': False,
'max_downloads': "inf",
'proxy': None,
'cookies_from_browser': False,
'cookie_file': None,
}
def test_load_dropin(self):
# test loading dropins that are in the generic_archiver package
package = "auto_archiver.archivers.generic_archiver"
assert self.archiver.dropin_for_name("bluesky", package=package)
# test loading dropings via filepath
path = os.path.join(dirname(dirname(__file__)), "data/")
assert self.archiver.dropin_for_name("dropin", additional_paths=[path])
@pytest.mark.parametrize("url, is_suitable", [
("https://www.youtube.com/watch?v=5qap5aO4i9A", True),
("https://www.tiktok.com/@funnycats0ftiktok/video/7345101300750748970?lang=en", True),
("https://www.instagram.com/p/CU1J9JYJ9Zz/", True),
("https://www.facebook.com/nytimes/videos/10160796550110716", True),
("https://www.twitch.tv/videos/1167226570", True),
("https://bellingcat.com/news/2021/10/08/ukrainian-soldiers-are-being-killed-by-landmines-in-the-donbas/", True),
("https://google.com", True)])
def test_suitable_urls(self, make_item, url, is_suitable):
"""
Note: expected behaviour is to return True for all URLs, as YoutubeDLArchiver should be able to handle all URLs
This behaviour may be changed in the future (e.g. if we want the youtubedl archiver to just handle URLs it has extractors for,
and then if and only if all archivers fails, does it fall back to the generic archiver)
"""
assert self.archiver.suitable(url) == is_suitable
@pytest.mark.download
def test_download_tiktok(self, make_item):
item = make_item("https://www.tiktok.com/@funnycats0ftiktok/video/7345101300750748970")
result = self.archiver.download(item)
assert result.get_url() == "https://www.tiktok.com/@funnycats0ftiktok/video/7345101300750748970"
@pytest.mark.download
@pytest.mark.parametrize("url", [
"https://bsky.app/profile/colborne.bsky.social/post/3lcxcpgt6j42l",
"twitter.com/bellingcat/status/123",
"https://www.youtube.com/watch?v=1"
])
def test_download_nonexistend_media(self, make_item, url):
"""
Test to make sure that the extractor doesn't break on non-existend posts/media
It should return 'False'
"""
item = make_item(url)
result = self.archiver.download(item)
assert not result
@pytest.mark.download
def test_youtube_download(self, make_item):
# url https://www.youtube.com/watch?v=5qap5aO4i9A
item = make_item("https://www.youtube.com/watch?v=J---aiyznGQ")
result = self.archiver.download(item)
assert result.get_url() == "https://www.youtube.com/watch?v=J---aiyznGQ"
assert result.get_title() == "Keyboard Cat! - THE ORIGINAL!"
assert result.get('description') == "Buy NEW Keyboard Cat Merch! https://keyboardcat.creator-spring.com\n\nxo Keyboard Cat memes make your day better!\nhttp://www.keyboardcatstore.com/\nhttps://www.facebook.com/thekeyboardcat\nhttp://www.charlieschmidt.com/"
assert len(result.media) == 2
assert Path(result.media[0].filename).name == "J---aiyznGQ.webm"
assert Path(result.media[1].filename).name == "hqdefault.jpg"
@pytest.mark.download
def test_bluesky_download_multiple_images(self, make_item):
item = make_item("https://bsky.app/profile/bellingcat.com/post/3lffjoxcu7k2w")
result = self.archiver.download(item)
assert result is not False
@pytest.mark.download
def test_bluesky_download_single_image(self, make_item):
item = make_item("https://bsky.app/profile/bellingcat.com/post/3lfn3hbcxgc2q")
result = self.archiver.download(item)
assert result is not False
@pytest.mark.download
def test_bluesky_download_no_media(self, make_item):
item = make_item("https://bsky.app/profile/bellingcat.com/post/3lfphwmcs4c2z")
result = self.archiver.download(item)
assert result is not False
@pytest.mark.download
def test_bluesky_download_video(self, make_item):
item = make_item("https://bsky.app/profile/bellingcat.com/post/3le2l4gsxlk2i")
result = self.archiver.download(item)
assert result is not False
@pytest.mark.download
def test_truthsocial_download_video(self, make_item):
item = make_item("https://truthsocial.com/@DaynaTrueman/posts/110602446619561579")
result = self.archiver.download(item)
assert len(result.media) == 1
assert result is not False
@pytest.mark.download
def test_truthsocial_download_no_media(self, make_item):
item = make_item("https://truthsocial.com/@bbcnewa/posts/109598702184774628")
result = self.archiver.download(item)
assert result is not False
@pytest.mark.download
def test_truthsocial_download_poll(self, make_item):
item = make_item("https://truthsocial.com/@CNN_US/posts/113724326568555098")
result = self.archiver.download(item)
assert result is not False
@pytest.mark.download
def test_truthsocial_download_single_image(self, make_item):
item = make_item("https://truthsocial.com/@mariabartiromo/posts/113861116433335006")
result = self.archiver.download(item)
assert len(result.media) == 1
assert result is not False
@pytest.mark.download
def test_truthsocial_download_multiple_images(self, make_item):
item = make_item("https://truthsocial.com/@trrth/posts/113861302149349135")
result = self.archiver.download(item)
assert len(result.media) == 3
@pytest.mark.download
def test_twitter_download_nonexistend_tweet(self, make_item):
# this tweet does not exist
url = "https://x.com/Bellingcat/status/17197025860711058"
response = self.archiver.download(make_item(url))
assert not response
@pytest.mark.download
def test_twitter_download_malformed_tweetid(self, make_item):
# this tweet does not exist
url = "https://x.com/Bellingcat/status/1719702a586071100058"
response = self.archiver.download(make_item(url))
assert not response
@pytest.mark.download
def test_twitter_download_tweet_no_media(self, make_item):
item = make_item("https://twitter.com/MeCookieMonster/status/1617921633456640001?s=20&t=3d0g4ZQis7dCbSDg-mE7-w")
post = self.archiver.download(item)
self.assertValidResponseMetadata(
post,
"Onion rings are just vegetable donuts.",
datetime.datetime(2023, 1, 24, 16, 25, 51, tzinfo=datetime.timezone.utc),
"yt-dlp_Twitter: success"
)
@pytest.mark.download
def test_twitter_download_video(self, make_item):
url = "https://x.com/bellingcat/status/1871552600346415571"
post = self.archiver.download(make_item(url))
self.assertValidResponseMetadata(
post,
"Bellingcat - This month's Bellingchat Premium is with @KolinaKoltai. She reveals how she investigated a platform allowing users to create AI-generated child sexual abuse material and explains why it's crucial to investigate the people behind these services",
datetime.datetime(2024, 12, 24, 13, 44, 46, tzinfo=datetime.timezone.utc)
)
@pytest.mark.xfail(reason="Currently failing, sensitive content requires logged in users/cookies - not yet implemented")
@pytest.mark.download
@pytest.mark.parametrize("url, title, timestamp, image_hash", [
("https://x.com/SozinhoRamalho/status/1876710769913450647", "ignore tweet, testing sensitivity warning nudity", datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc), "image_hash"),
("https://x.com/SozinhoRamalho/status/1876710875475681357", "ignore tweet, testing sensitivity warning violence", datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc), "image_hash"),
("https://x.com/SozinhoRamalho/status/1876711053813227618", "ignore tweet, testing sensitivity warning sensitive", datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc), "image_hash"),
("https://x.com/SozinhoRamalho/status/1876711141314801937", "ignore tweet, testing sensitivity warning nudity, violence, sensitivity", datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc), "image_hash"),
])
def test_twitter_download_sensitive_media(self, url, title, timestamp, image_hash, make_item):
"""Download tweets with sensitive media"""
post = self.archiver.download(make_item(url))
self.assertValidResponseMetadata(
post,
title,
timestamp
)
assert len(post.media) == 1
assert post.media[0].hash == image_hash

View File

@@ -1,17 +0,0 @@
import pytest
from .test_archiver_base import TestArchiverBase
from auto_archiver.archivers.tiktok_archiver import TiktokArchiver
class TestBlueskyArchiver(TestArchiverBase):
archiver_class = TiktokArchiver
config = {}
@pytest.mark.xfail(reason="Tiktok API is not working")
@pytest.mark.download
def test_download_video(self, make_item):
# cat video
url = "https://www.tiktok.com/@funnycats0ftiktok/video/7345101300750748970?lang=en"
item = self.archiver.download(make_item(url))
assert item.success

View File

@@ -1,19 +1,31 @@
import os
import datetime
import pytest
from auto_archiver.archivers.twitter_archiver import TwitterArchiver
from pytwitter.models.media import MediaVariant
from .test_archiver_base import TestArchiverBase
from auto_archiver.archivers import TwitterApiArchiver
class TestTwitterArchiver(TestArchiverBase):
archiver_class = TwitterArchiver
config = {}
@pytest.mark.incremental
class TestTwitterApiArchiver(TestArchiverBase):
archiver_class = TwitterApiArchiver
config = {
"bearer_tokens": [],
"bearer_token": os.environ.get("TWITTER_BEARER_TOKEN", "TEST_KEY"),
"consumer_key": os.environ.get("TWITTER_CONSUMER_KEY"),
"consumer_secret": os.environ.get("TWITTER_CONSUMER_SECRET"),
"access_token": os.environ.get("TWITTER_ACCESS_TOKEN"),
"access_secret": os.environ.get("TWITTER_ACCESS_SECRET"),
}
@pytest.mark.parametrize("url, expected", [
("https://t.co/yl3oOJatFp", "https://www.bellingcat.com/category/resources/"), # t.co URL
("https://x.com/bellingcat/status/1874097816571961839", "https://x.com/bellingcat/status/1874097816571961839"), # x.com urls unchanged
("https://twitter.com/bellingcat/status/1874097816571961839", "https://twitter.com/bellingcat/status/1874097816571961839"), # twitter urls unchanged
("https://twitter.com/bellingcat/status/1874097816571961839?s=20&t=3d0g4ZQis7dCbSDg-mE7-w", "https://twitter.com/bellingcat/status/1874097816571961839"), # strip tracking params
("https://twitter.com/bellingcat/status/1874097816571961839?s=20&t=3d0g4ZQis7dCbSDg-mE7-w", "https://twitter.com/bellingcat/status/1874097816571961839?s=20&t=3d0g4ZQis7dCbSDg-mE7-w"), # don't strip params from twitter urls (changed Jan 2025)
("https://www.bellingcat.com/category/resources/", "https://www.bellingcat.com/category/resources/"), # non-twitter/x urls unchanged
("https://www.bellingcat.com/category/resources/?s=20&t=3d0g4ZQis7dCbSDg-mE7-w", "https://www.bellingcat.com/category/resources/?s=20&t=3d0g4ZQis7dCbSDg-mE7-w"), # shouldn't strip params from non-twitter/x URLs
])
@@ -25,69 +37,31 @@ class TestTwitterArchiver(TestArchiverBase):
("https://x.com/bellingcat/status/1874097816571961839", "bellingcat", "1874097816571961839"),
("https://www.bellingcat.com/category/resources/", False, False)
])
def test_get_username_tweet_id_from_url(self, url, exptected_username, exptected_tweetid):
username, tweet_id = self.archiver.get_username_tweet_id(url)
assert exptected_username == username
assert exptected_tweetid == tweet_id
def test_choose_variants(self):
# taken from the response for url https://x.com/bellingcat/status/1871552600346415571
variant_list = [{'content_type': 'application/x-mpegURL', 'url': 'https://video.twimg.com/ext_tw_video/1871551993677852672/pu/pl/ovWo7ux-bKROwYIC.m3u8?tag=12&v=e1b'},
{'bitrate': 256000, 'content_type': 'video/mp4', 'url': 'https://video.twimg.com/ext_tw_video/1871551993677852672/pu/vid/avc1/480x270/OqZIrKV0LFswMvxS.mp4?tag=12'},
{'bitrate': 832000, 'content_type': 'video/mp4', 'url': 'https://video.twimg.com/ext_tw_video/1871551993677852672/pu/vid/avc1/640x360/uiDZDSmZ8MZn9hsi.mp4?tag=12'},
{'bitrate': 2176000, 'content_type': 'video/mp4', 'url': 'https://video.twimg.com/ext_tw_video/1871551993677852672/pu/vid/avc1/1280x720/6Y340Esh568WZnRZ.mp4?tag=12'}
variant_list = [MediaVariant(content_type='application/x-mpegURL', url='https://video.twimg.com/ext_tw_video/1871551993677852672/pu/pl/ovWo7ux-bKROwYIC.m3u8?tag=12&v=e1b'),
MediaVariant(bit_rate=256000, content_type='video/mp4', url='https://video.twimg.com/ext_tw_video/1871551993677852672/pu/vid/avc1/480x270/OqZIrKV0LFswMvxS.mp4?tag=12'),
MediaVariant(bit_rate=832000, content_type='video/mp4', url='https://video.twimg.com/ext_tw_video/1871551993677852672/pu/vid/avc1/640x360/uiDZDSmZ8MZn9hsi.mp4?tag=12'),
MediaVariant(bit_rate=2176000, content_type='video/mp4', url='https://video.twimg.com/ext_tw_video/1871551993677852672/pu/vid/avc1/1280x720/6Y340Esh568WZnRZ.mp4?tag=12')
]
chosen_variant = self.archiver.choose_variant(variant_list)
assert chosen_variant == variant_list[3]
@pytest.mark.parametrize("tweet_id, expected_token", [
("1874097816571961839", "4jjngwkifa"),
("1674700676612386816", "42586mwa3uv"),
("1877747914073620506", "4jv4aahw36n"),
("1876710769913450647", "4jruzjz5lux"),
("1346554693649113090", "39ibqxei7mo")
])
def test_reverse_engineer_token(self, tweet_id, expected_token):
# see Vercel's implementation here: https://github.com/vercel/react-tweet/blob/main/packages/react-tweet/src/api/fetch-tweet.ts#L27C1-L31C2
# and the discussion here: https://github.com/JustAnotherArchivist/snscrape/issues/996#issuecomment-2211358215
generated_token = self.archiver.generate_token(tweet_id)
assert expected_token == generated_token
@pytest.mark.skipif(not os.environ.get("TWITTER_BEARER_TOKEN"), reason="No Twitter bearer token provided")
@pytest.mark.download
def test_youtube_dlp_archiver(self, make_item):
url = "https://x.com/bellingcat/status/1874097816571961839"
post = self.archiver.download_yt_dlp(make_item(url), url, "1874097816571961839")
assert post
self.assertValidResponseMetadata(
post,
"As 2024 comes to a close, heres some examples of what Bellingcat investigated per month in our 10th year! 🧵",
datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc),
"twitter-ytdl"
)
@pytest.mark.download
def test_syndication_archiver(self, make_item):
url = "https://x.com/bellingcat/status/1874097816571961839"
post = self.archiver.download_syndication(make_item(url), url, "1874097816571961839")
assert post
self.assertValidResponseMetadata(
post,
"As 2024 comes to a close, heres some examples of what Bellingcat investigated per month in our 10th year! 🧵",
datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc)
)
@pytest.mark.download
def test_download_nonexistend_tweet(self, make_item):
def test_download_nonexistent_tweet(self, make_item):
# this tweet does not exist
url = "https://x.com/Bellingcat/status/17197025860711058"
response = self.archiver.download(make_item(url))
assert not response
@pytest.mark.skipif(not os.environ.get("TWITTER_BEARER_TOKEN"), reason="No Twitter bearer token provided")
@pytest.mark.download
def test_download_malformed_tweetid(self, make_item):
# this tweet does not exist
@@ -95,6 +69,7 @@ class TestTwitterArchiver(TestArchiverBase):
response = self.archiver.download(make_item(url))
assert not response
@pytest.mark.skipif(not os.environ.get("TWITTER_BEARER_TOKEN"), reason="No Twitter bearer token provided")
@pytest.mark.download
def test_download_tweet_no_media(self, make_item):
@@ -105,9 +80,10 @@ class TestTwitterArchiver(TestArchiverBase):
post,
"Onion rings are just vegetable donuts.",
datetime.datetime(2023, 1, 24, 16, 25, 51, tzinfo=datetime.timezone.utc),
"twitter-ytdl"
"twitter-api: success"
)
@pytest.mark.skipif(not os.environ.get("TWITTER_BEARER_TOKEN"), reason="No Twitter bearer token provided")
@pytest.mark.download
def test_download_video(self, make_item):
url = "https://x.com/bellingcat/status/1871552600346415571"
@@ -118,15 +94,15 @@ class TestTwitterArchiver(TestArchiverBase):
datetime.datetime(2024, 12, 24, 13, 44, 46, tzinfo=datetime.timezone.utc)
)
@pytest.mark.xfail(reason="Currently failing, sensitive content requires logged in users/cookies - not yet implemented")
@pytest.mark.download
@pytest.mark.parametrize("url, title, timestamp, image_hash", [
("https://x.com/SozinhoRamalho/status/1876710769913450647", "ignore tweet, testing sensitivity warning nudity", datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc), "image_hash"),
("https://x.com/SozinhoRamalho/status/1876710875475681357", "ignore tweet, testing sensitivity warning violence", datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc), "image_hash"),
("https://x.com/SozinhoRamalho/status/1876711053813227618", "ignore tweet, testing sensitivity warning sensitive", datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc), "image_hash"),
("https://x.com/SozinhoRamalho/status/1876711141314801937", "ignore tweet, testing sensitivity warning nudity, violence, sensitivity", datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc), "image_hash"),
@pytest.mark.skipif(not os.environ.get("TWITTER_BEARER_TOKEN"), reason="No Twitter bearer token provided")
@pytest.mark.parametrize("url, title, timestamp, image_src", [
("https://x.com/SozinhoRamalho/status/1876710769913450647", "ignore tweet, testing sensitivity warning nudity https://t.co/t3u0hQsSB1", datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc), "https://pbs.twimg.com/media/GgtqkomWkAAHUUl.jpg"),
("https://x.com/SozinhoRamalho/status/1876710875475681357", "ignore tweet, testing sensitivity warning violence https://t.co/syYDSkpjZD", datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc), "https://pbs.twimg.com/media/GgtqkomWkAAHUUl.jpg"),
("https://x.com/SozinhoRamalho/status/1876711053813227618", "ignore tweet, testing sensitivity warning sensitive https://t.co/XE7cRdjzYq", datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc), "https://pbs.twimg.com/media/GgtqkomWkAAHUUl.jpg"),
("https://x.com/SozinhoRamalho/status/1876711141314801937", "ignore tweet, testing sensitivity warning nudity, violence, sensitivity https://t.co/YxCFbbhYE3", datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc), "https://pbs.twimg.com/media/GgtqkomWkAAHUUl.jpg"),
])
def test_download_sensitive_media(self, url, title, timestamp, image_hash, make_item):
@pytest.mark.download
def test_download_sensitive_media(self, url, title, timestamp, image_src, make_item):
"""Download tweets with sensitive media"""
@@ -137,4 +113,4 @@ class TestTwitterArchiver(TestArchiverBase):
timestamp
)
assert len(post.media) == 1
assert post.media[0].hash == image_hash
assert post.media[0].get('src') == image_src

View File

@@ -1,6 +1,19 @@
"""
pytest conftest file, for shared fixtures and configuration
"""
from typing import Dict, Tuple
import pytest
from auto_archiver.core.metadata import Metadata
# Test names inserted into this list will be run last. This is useful for expensive/costly tests
# that you only want to run if everything else succeeds (e.g. API calls). The order here is important
# what comes first will be run first (at the end of all other tests not mentioned)
# format is the name of the module (python file) without the .py extension
TESTS_TO_RUN_LAST = ['test_twitter_api_archiver']
@pytest.fixture
def make_item():
def _make_item(url: str, **kwargs) -> Metadata:
@@ -9,4 +22,61 @@ def make_item():
item.set(key, value)
return item
return _make_item
return _make_item
def pytest_collection_modifyitems(items):
module_mapping = {item: item.module.__name__.split(".")[-1] for item in items}
sorted_items = items.copy()
# Iteratively move tests of each module to the end of the test queue
for module in TESTS_TO_RUN_LAST:
if module in module_mapping.values():
for item in sorted_items:
if module_mapping[item] == module:
sorted_items.remove(item)
sorted_items.append(item)
items[:] = sorted_items
# Incremental testing - fail tests in a class if any previous test fails
# taken from https://docs.pytest.org/en/latest/example/simple.html#incremental-testing-test-steps
# store history of failures per test class name and per index in parametrize (if parametrize used)
_test_failed_incremental: Dict[str, Dict[Tuple[int, ...], str]] = {}
def pytest_runtest_makereport(item, call):
if "incremental" in item.keywords:
# incremental marker is used
if call.excinfo is not None:
# the test has failed
# retrieve the class name of the test
cls_name = str(item.cls)
# retrieve the index of the test (if parametrize is used in combination with incremental)
parametrize_index = (
tuple(item.callspec.indices.values())
if hasattr(item, "callspec")
else ()
)
# retrieve the name of the test function
test_name = item.originalname or item.name
# store in _test_failed_incremental the original name of the failed test
_test_failed_incremental.setdefault(cls_name, {}).setdefault(
parametrize_index, test_name
)
def pytest_runtest_setup(item):
if "incremental" in item.keywords:
# retrieve the class name of the test
cls_name = str(item.cls)
# check if a previous test has failed for this class
if cls_name in _test_failed_incremental:
# retrieve the name of the first test function to fail for this class name and index
test_name = _test_failed_incremental[cls_name].get((), None)
# if name found, test has failed for the combination of class name & test name
if test_name is not None:
pytest.xfail(f"previous test failed ({test_name})")

5
tests/data/dropin.py Normal file
View File

@@ -0,0 +1,5 @@
# this is a dummy class used to test importing a dropin in the
# generic extractor by filename/path
class Dropin:
pass