Merge branch 'dev' into specify-medatada-feature

This commit is contained in:
Miguel Sozinho Ramalho
2026-01-08 14:04:42 +00:00
committed by GitHub
12 changed files with 1480 additions and 1301 deletions

View File

@@ -50,7 +50,7 @@ Note not all warnings can be fixed automatically.
Most fixes are safe, but linters do not pick up some non-standard practices, such as dynamic loading. Review any modifications made by this command before committing them.
```shell
make ruff-fix
make ruff-clean
```
**Changing Configurations ⚙️**
@@ -67,4 +67,4 @@ One example is to extend the selected rules for linting the `pyproject.toml` fil
extend-select = ["B"]
```
Then re-run the `make ruff-check` command to see the new rules in action.
Then re-run the `make ruff-check` command to see the new rules in action.

2537
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
[project]
name = "auto-archiver"
version = "1.1.2"
version = "1.1.6"
description = "Automatically archive links to videos, images, and social media content from Google Sheets (and more)."
requires-python = ">=3.10,<3.13"
@@ -50,8 +50,8 @@ dependencies = [
"retrying (>=0.0.0)",
"rich-argparse (>=1.6.0,<2.0.0)",
"ruamel-yaml (>=0.18.10,<0.19.0)",
"rfc3161-client (==1.0.3)",
"cryptography (>44.0.1,<45.0.0)",
"rfc3161-client (>=1.0.5)",
"cryptography (>=46.0.3)",
"opentimestamps (>=0.4.5,<0.5.0)",
"bgutil-ytdlp-pot-provider (>=1.0.0)",
"yt-dlp[curl-cffi,default] (>=2025.5.22,<2026.0.0)",

View File

@@ -97,7 +97,10 @@ class AntibotExtractorEnricher(Extractor, Enricher):
sb.uc_gui_click_rc() # NB: using handle instead of click breaks some sites like reddit, for now we separate here but can have dropins deciding this in the future
dropin = self._get_suitable_dropin(url, sb)
dropin.open_page(url)
if not dropin.open_page(url):
# TODO: could we detect deleted videos?
logger.warning("Failed to open drop-in page")
return False
if self.detect_auth_wall and (dropin.hit_auth_wall() and self._hit_auth_wall(sb)):
logger.warning("Skipping since auth wall or CAPTCHA was detected")

View File

@@ -0,0 +1 @@
*.py

View File

@@ -1,17 +1,20 @@
from contextlib import suppress
from typing import Mapping
from auto_archiver.utils.custom_logger import logger
from auto_archiver.modules.antibot_extractor_enricher.dropin import Dropin
class TikTokDropin(Dropin):
"""
A class to handle TikTok drop-in functionality for the antibot extractor enricher module.
"""
def documentation() -> Mapping[str, str]:
return {
"name": "TikTok Dropin",
"description": "Handles TikTok posts and works without authentication.",
"description": "Handles TikTok posts and works without authentication.\nNOTE: This dropin is highly susceptible to TikTok's bot detection mechanisms and may not work reliably if you reuse the same IP. The GenericExtractor is recommended for TikTok posts, as it handles video/image download more reliable. In the future we plan to implement better anti captcha measures for this dropin.",
"site": "tiktok.com",
}
@@ -33,6 +36,9 @@ class TikTokDropin(Dropin):
# TODO: implement login logic
if url != self.sb.get_current_url():
return False
if self.sb.is_text_visible("Video currently unavailable"):
logger.debug("Video may have been removed or is private.")
return False
return True
def hit_auth_wall(self) -> bool:

View File

@@ -4,6 +4,7 @@ import datetime
import os
import importlib
import subprocess
import traceback
import zipfile
from typing import Generator, Type
@@ -305,9 +306,9 @@ class GenericExtractor(Extractor):
result.set_url(url)
if "description" in video_data and not result.get("content"):
result.set_content(video_data.get("description"))
result.set_content(video_data.pop("description"))
# extract comments if enabled
if self.comments and video_data.get("comments", []) is not None:
if self.comments and video_data.get("comments", None) is not None:
result.set(
"comments",
[
@@ -406,9 +407,9 @@ class GenericExtractor(Extractor):
logger.error(f"Error loading subtitle file {val.get('filepath')}: {e}")
result.add_media(new_media)
except Exception as e:
logger.error(f"Error processing entry {entry}: {e}")
logger.error(f"Error processing entry {str(entry)[:256]}: {e} {traceback.format_exc()}")
if not len(result.media):
logger.info(f"No media found for entry {entry}, skipping.")
logger.info(f"No media found for entry {str(entry)[:256]}, skipping.")
return False
return self.add_metadata(data, info_extractor, url, result)
@@ -516,7 +517,7 @@ class GenericExtractor(Extractor):
)
return False
if result:
if result and not result.is_success():
extractor_name = "yt-dlp"
if info_extractor:
extractor_name += f"_{info_extractor.ie_key()}"
@@ -535,7 +536,6 @@ class GenericExtractor(Extractor):
if url.startswith("https://ya.ru"):
url = url.replace("https://ya.ru", "https://yandex.ru")
item.set("replaced_url", url)
logger.debug(f"{skip_proxy=}, {self.proxy_on_failure_only=}, {self.proxy=}")
# proxy_on_failure_only logic
if self.proxy and self.proxy_on_failure_only and not skip_proxy:
@@ -605,9 +605,9 @@ class GenericExtractor(Extractor):
validated_options
) # allsubtitles and subtitleslangs not working as expected, so default lang is always "en"
result: Metadata = None
for info_extractor in self.suitable_extractors(url):
result = self.download_for_extractor(info_extractor, url, ydl)
if result:
return result
return False
local_result: Metadata = self.download_for_extractor(info_extractor, url, ydl)
if local_result:
result = result.merge(local_result) if result else local_result
return result if result else False

View File

@@ -1,3 +1,4 @@
import re
import requests
from auto_archiver.utils.custom_logger import logger
@@ -14,12 +15,16 @@ class Tiktok(GenericDropin):
It's useful for capturing content that requires a login, like sensitive content.
"""
# Regex pattern to match TikTok photo post URLs
PHOTO_URL_REGEX = r"https?://(?:www\.)?tiktok\.com/@[\w\.-]+/photo/\d+"
TIKWM_ENDPOINT = "https://www.tikwm.com/api/?url={url}"
def suitable(self, url, info_extractor) -> bool:
"""This dropin (which uses Tikwm) is suitable for *all* Tiktok type URLs - videos, lives, VMs, and users.
Return the 'suitable' method from the TikTokIE class."""
return any(extractor().suitable(url) for extractor in (TikTokIE, TikTokLiveIE, TikTokVMIE, TikTokUserIE))
return any(extractor().suitable(url) for extractor in (TikTokIE, TikTokLiveIE, TikTokVMIE, TikTokUserIE)) or (
re.match(self.PHOTO_URL_REGEX, url) is not None
)
def extract_post(self, url: str, ie_instance):
logger.debug("Using Tikwm API to attempt to download tiktok video")
@@ -28,56 +33,91 @@ class Tiktok(GenericDropin):
r = requests.get(endpoint)
if r.status_code != 200:
raise ValueError(f"unexpected status code '{r.status_code}' from tikwm.com for {url=}:")
raise ValueError(f"Unexpected status code '{r.status_code}' from tikwm.com")
try:
json_response = r.json()
except ValueError:
raise ValueError(f"failed to parse JSON response from tikwm.com for {url=}")
raise ValueError("Failed to parse JSON response from tikwm.com")
if not json_response.get("msg") == "success" or not (api_data := json_response.get("data", {})):
raise ValueError(f"failed to get a valid response from tikwm.com for {url=}: {repr(json_response)}")
raise ValueError(f"Unable to download with tikwm.com: {repr(json_response)}")
# tries to get the non-watermarked version first
video_url = api_data.pop("play", api_data.pop("wmplay", None))
if not video_url:
raise ValueError(f"no valid video URL found in response from tikwm.com for {url=}")
api_data["video_url"] = video_url
play_url = api_data.pop("play", api_data.pop("wmplay", None))
if play_url and "mime_type=audio" in play_url:
play_url = None
if play_url:
api_data["video_url"] = play_url
return api_data
def keys_to_clean(self, video_data: dict, info_extractor):
return ["video_url", "title", "create_time", "author", "cover", "origin_cover", "ai_dynamic_cover", "duration"]
return [
"video_url",
"title",
"create_time",
"author",
"cover",
"origin_cover",
"ai_dynamic_cover",
"duration",
"size",
"wm_size",
"music",
"music_info",
"play_count",
"digg_count",
"comment_count",
"share_count",
"download_count",
"collect_count",
"anchors",
"anchors_extras",
"is_ad",
"commerce_info",
"commercial_video_info",
"item_comment_settings",
"mentioned_users",
] # all of these will be added via api_data in a single metadata field vs individual ones in the generic extractor
def create_metadata(self, post: dict, ie_instance, archiver, url):
# prepare result, start by downloading video
result = Metadata()
video_url = post.pop("video_url")
is_success = False
# get the cover if possible
cover_url = post.pop("origin_cover", post.pop("cover", post.pop("ai_dynamic_cover", None)))
if cover_url and (cover_downloaded := archiver.download_from_url(cover_url)):
result.add_media(Media(cover_downloaded))
# get the video or fail
video_downloaded = archiver.download_from_url(video_url, f"vid_{post.get('id', '')}")
if not video_downloaded:
logger.error("Failed to download video")
return False
video_media = Media(video_downloaded)
if duration := post.get("duration", None):
video_media.set("duration", duration)
result.add_media(video_media)
for image_url in post.pop("images", []):
if image_downloaded := archiver.download_from_url(image_url):
result.add_media(Media(image_downloaded))
is_success = True # this is an images post and we got it/them
# get the video if present, could be an image post
if video_url := post.pop("video_url", None):
video_downloaded = archiver.download_from_url(video_url, f"vid_{post.get('id', '')}")
if not video_downloaded:
logger.error("Failed to download video")
return False
video_media = Media(video_downloaded)
if duration := post.pop("duration", None):
video_media.set("duration", duration)
result.add_media(video_media)
is_success = True # this is a video post and we got it
# add remaining metadata
result.set_title(post.get("title", ""))
result.set_title(post.pop("title", ""))
if created_at := post.get("create_time", None):
if created_at := post.pop("create_time", None):
result.set_timestamp(datetime.fromtimestamp(created_at, tz=timezone.utc))
if author := post.get("author", None):
if author := post.pop("author", None):
result.set("author", author)
result.set("api_data", post)
result.set("api_data", {k: v for k, v in post.items() if v})
if is_success:
result.success("yt-dlp_TikTok")
else:
raise ValueError("Unable to download any media from TikTok post, possibly deleted or private.")
return result

View File

@@ -4,12 +4,12 @@ from importlib.metadata import version
import hashlib
from slugify import slugify
from retrying import retry
import requests
from auto_archiver.utils.custom_logger import logger
from rfc3161_client import (decode_timestamp_response,TimestampRequestBuilder,TimeStampResponse, VerifierBuilder)
from rfc3161_client import (decode_timestamp_response, TimestampRequestBuilder, TimeStampResponse, VerifierBuilder)
from rfc3161_client import VerificationError as Rfc3161VerificationError
from rfc3161_client.base import HashAlgorithm
from rfc3161_client.tsp import SignedData
from cryptography import x509
from cryptography.hazmat.primitives import serialization
@@ -60,7 +60,6 @@ class TimestampingEnricher(Enricher):
logger.debug(f"No hashes found")
return
hashes_fn = os.path.join(self.tmp_dir, "hashes.txt")
data_to_sign = "\n".join(hashes)
@@ -75,7 +74,7 @@ class TimestampingEnricher(Enricher):
logger.debug(f"Timestamping with {tsa_url=}")
signed: TimeStampResponse = self.sign_data(tsa_url, message)
# fail if there's any issue with the certificates, uses certifi list of trusted CAs or the user-defined `cert_authorities`
root_cert = self.verify_signed(signed, message)
@@ -113,7 +112,7 @@ class TimestampingEnricher(Enricher):
f.write(timestamp_token)
return tst_path
def verify_signed(self, timestamp_response: TimeStampResponse, message: bytes) -> x509.Certificate:
def verify_signed(self, timestamp_response: TimeStampResponse, message: bytes) -> x509.Certificate:
"""
Verify a Signed Timestamp Response is trusted by a known Certificate Authority.
@@ -136,7 +135,7 @@ class TimestampingEnricher(Enricher):
if not cert_authorities:
raise ValueError(f"No trusted roots found in {trusted_root_path}.")
timestamp_certs = self.tst_certs(timestamp_response)
intermediate_certs = timestamp_certs[1:-1]
@@ -148,7 +147,7 @@ class TimestampingEnricher(Enricher):
message_hash = hashlib.sha256(message).digest()
else:
raise ValueError(f"Unsupported hash algorithm: {hash_algorithm}")
for certificate in cert_authorities:
builder = VerifierBuilder()
builder.add_root_certificate(certificate)
@@ -158,7 +157,6 @@ class TimestampingEnricher(Enricher):
verifier = builder.build()
try:
verifier.verify(timestamp_response, message_hash)
return certificate
@@ -171,23 +169,38 @@ class TimestampingEnricher(Enricher):
# see https://github.com/sigstore/sigstore-python/blob/99948d5b80525a5a104e904ffea58169dc6e0629/sigstore/_internal/timestamp.py#L84-L121
timestamp_request = (
TimestampRequestBuilder().data(bytes_data).nonce(nonce=True).build()
)
try:
TimestampRequestBuilder().data(bytes_data).nonce(nonce=True).build()
)
@retry(
wait_exponential_multiplier=1,
stop_max_attempt_number=2,
)
def sign_with_retry():
response = self.session.post(tsa_url, data=timestamp_request.as_bytes(), timeout=10)
response.raise_for_status()
return response
try:
response = sign_with_retry()
except requests.RequestException as e:
logger.error(f"Error while sending request to {tsa_url=}: {e}")
raise
@retry(
wait_exponential_multiplier=1,
stop_max_attempt_number=2,
)
def decode_with_retry(response):
return decode_timestamp_response(response.content)
# Check that we can parse the response but do not *verify* it
try:
timestamp_response = decode_timestamp_response(response.content)
timestamp_response = decode_with_retry(response)
except ValueError as e:
logger.error(f"Invalid timestamp response from server {tsa_url}: {e}")
raise
return timestamp_response
def tst_certs(self, tsp_response: TimeStampResponse):
signed_data: SignedData = tsp_response.signed_data
certs = [x509.load_der_x509_certificate(c) for c in signed_data.certificates]
@@ -196,7 +209,7 @@ class TimestampingEnricher(Enricher):
if len(certs) == 1:
return certs
while(len(ordered_certs) < len(certs)):
while (len(ordered_certs) < len(certs)):
if len(ordered_certs) == 0:
for cert in certs:
if not [c for c in certs if cert.subject == c.issuer]:
@@ -220,7 +233,7 @@ class TimestampingEnricher(Enricher):
cert_chain = []
for i, cert in enumerate(certificates):
cert_fn = os.path.join(self.tmp_dir, f"{i+1} {str(cert.serial_number)[:20]}.crt")
cert_fn = os.path.join(self.tmp_dir, f"{i + 1} {str(cert.serial_number)[:20]}.crt")
with open(cert_fn, "wb") as f:
f.write(cert.public_bytes(encoding=serialization.Encoding.PEM))
cert_chain.append(Media(filename=cert_fn).set("subject", cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)[0].value))

View File

@@ -5,6 +5,9 @@ from auto_archiver.modules.antibot_extractor_enricher.antibot_extractor_enricher
from .test_extractor_base import TestExtractorBase
CI = os.getenv("GITHUB_ACTIONS", "") == "true"
class DummySB:
def __init__(self, url="", title="", visible_texts=None, visible_elements=None):
self._url = url
@@ -51,14 +54,15 @@ class TestAntibotExtractorEnricher(TestExtractorBase):
@pytest.mark.download
@pytest.mark.parametrize(
"url,in_title,in_text,image_count,video_count",
"url,in_title,in_text,image_count,video_count,skip_ci",
[
(
"https://en.wikipedia.org/wiki/Western_barn_owl",
"western barn owl",
"Tyto alba",
5,
4,
0,
False,
),
(
"https://www.bellingcat.com/news/2025/04/29/open-sources-show-myanmar-junta-airstrike-damages-despite-post-earthquake-ceasefire/",
@@ -66,6 +70,7 @@ class TestAntibotExtractorEnricher(TestExtractorBase):
"Bellingcat has geolocated",
5,
0,
False,
),
(
"https://www.bellingcat.com/news/2025/03/27/gaza-israel-palestine-shot-killed-injured-destroyed-dangerous-drone-journalists-in-gaza/",
@@ -73,6 +78,7 @@ class TestAntibotExtractorEnricher(TestExtractorBase):
"continued the work of Gazan journalists",
5,
1,
False,
),
(
"https://www.bellingcat.com/about/general-information",
@@ -80,6 +86,7 @@ class TestAntibotExtractorEnricher(TestExtractorBase):
"Stichting Bellingcat",
0, # SVGs are ignored
0,
False,
),
(
"https://vk.com/wikipedia?from=search&w=wall-36156673_20451",
@@ -87,6 +94,7 @@ class TestAntibotExtractorEnricher(TestExtractorBase):
"16 сентября 1985 года лейблом EMI Records.",
5,
0,
False,
),
(
"https://www.tiktok.com/@tracy_2424/photo/7418200173953830162",
@@ -94,13 +102,19 @@ class TestAntibotExtractorEnricher(TestExtractorBase):
"Dito ko lang",
1,
0,
True,
),
],
)
def test_download_pages_with_media(self, setup_module, make_item, url, in_title, in_text, image_count, video_count):
def test_download_pages_with_media(
self, setup_module, make_item, url, in_title, in_text, image_count, video_count, skip_ci
):
"""
Test downloading pages with media.
"""
if CI and skip_ci:
pytest.skip("Skipping test in CI environment")
self.extractor = setup_module(
self.extractor_module,
self.config

View File

@@ -48,8 +48,6 @@ class TestGenericExtractor(TestExtractorBase):
("https://www.youtube.com/watch?v=5qap5aO4i9A", ["youtube"]),
("https://www.tiktok.com/@funnycats0ftiktok/video/7345101300750748970?lang=en", ["tiktok"]),
("https://www.instagram.com/p/CU1J9JYJ9Zz/", ["instagram"]),
("https://www.facebook.com/nytimes/videos/10160796550110716", ["facebook"]),
("https://www.facebook.com/BylineFest/photos/t.100057299682816/927879487315946/", ["facebook"]),
],
)
def test_suitable_extractors(self, url, suitable_extractors):
@@ -148,6 +146,7 @@ class TestGenericExtractor(TestExtractorBase):
def test_bluesky_download_video(self, make_item):
item = make_item("https://bsky.app/profile/bellingcat.com/post/3le2l4gsxlk2i")
result = self.extractor.download(item)
assert result.get_url() == "https://bsky.app/profile/bellingcat.com/post/3le2l4gsxlk2i"
assert result is not False
@pytest.mark.skipif(not TEST_TRUTH_SOCIAL, reason="Truth social download tests disabled in environment variables.")

View File

@@ -55,6 +55,7 @@ class TestTiktokTikwmExtractor(TestExtractorBase):
("https://www.tiktok.com/@ggs68taiwan.official/video/7441821351142362375", True),
("https://www.tiktok.com/t/ZP8YQ8e5j/", True),
("https://vt.tiktok.com/ZSMTJeqRP/", True),
("https://tiktok.com/@user/photo/123?lang=en", True),
],
)
def test_is_suitable(self, url, is_suitable, tiktok_dropin):
@@ -68,10 +69,7 @@ class TestTiktokTikwmExtractor(TestExtractorBase):
mock_get.assert_called_once()
mock_get.return_value.json.assert_called_once()
# first message is just the 'Skipping using ytdlp to download files for TikTok' message
assert (
"failed to parse JSON response from tikwm.com for url='https://www.tiktok.com/@example/video/1234'"
in caplog.text
)
assert "Failed to parse JSON response from tikwm.com" in caplog.text
mock_get.return_value.json.side_effect = Exception
with caplog.at_level("ERROR"):
@@ -79,10 +77,7 @@ class TestTiktokTikwmExtractor(TestExtractorBase):
mock_get.assert_called()
assert mock_get.call_count == 2
assert mock_get.return_value.json.call_count == 2
assert (
"failed to parse JSON response from tikwm.com for url='https://www.tiktok.com/@example/video/1234'"
in caplog.text
)
assert "Failed to parse JSON response from tikwm.com" in caplog.text
@pytest.mark.parametrize(
"response",
@@ -98,27 +93,30 @@ class TestTiktokTikwmExtractor(TestExtractorBase):
assert self.extractor.download(make_item(self.VALID_EXAMPLE_URL)) is False
mock_get.assert_called_once()
mock_get.return_value.json.assert_called_once()
assert "failed to get a valid response from tikwm.com" in caplog.text
assert "Unable to download with tikwm.com: " in caplog.text
@pytest.mark.parametrize(
"response,has_vid",
"response,is_success",
[
({"data": {"id": 123}}, False),
({"data": {"wmplay": "url"}}, True),
({"data": {"play": "url"}}, True),
({"data": {"id": 123, "images": []}}, False),
({"data": {"wmplay": "url", "images": ["img1.jpg"]}}, True),
({"data": {"play": "url", "images": ["img1.jpg"]}}, True),
({"data": {"images": ["img1.jpg"]}}, True),
],
)
def test_correct_extraction(self, mock_get, make_item, response, has_vid, mocker):
def test_correct_extraction(self, mock_get, make_item, response, is_success, mocker):
data = {k: v for k, v in response.get("data", {}).items()}
mock_get.return_value.status_code = 200
mock_get.return_value.json.return_value = {"msg": "success", **response}
result = self.extractor.download(make_item(self.VALID_EXAMPLE_URL))
if not has_vid:
assert result is False
else:
total_media = len(data.get("images", [])) + (1 if data.get("wmplay", data.get("play")) else 0)
if is_success:
assert result.is_success()
assert len(result.media) == 1
assert len(result.media) == total_media
else:
assert result is False
mock_get.assert_called()
assert mock_get.call_count == 1 + int(has_vid)
assert mock_get.call_count == 1 + total_media
mock_get.return_value.json.assert_called_once()
def test_correct_data_extracted(self, mock_get, make_item):
@@ -142,7 +140,9 @@ class TestTiktokTikwmExtractor(TestExtractorBase):
assert len(result.media) == 2
assert result.get_title() == "Title"
assert result.get("author") == "Author"
assert result.get("api_data") == {"other": "data", "id": 123}
assert result.get("other") == "data"
assert result.get("comments") is None
assert result.get("api_data") == {"id": 123, "other": "data"}
assert result.media[1].get("duration") == 60
assert result.get("timestamp") == datetime.fromtimestamp(1736301699, tz=timezone.utc)