Compare commits

..

26 Commits

Author SHA1 Message Date
msramalho
a9a0bae083 dependencies update 2025-10-22 18:11:36 +01:00
Miguel Sozinho Ramalho
97d133ce79 Merge pull request #357 from bellingcat/dev
small improvements on tiktok and verison bumps
2025-10-22 16:02:26 +01:00
msramalho
432ee3dcfd version bump 2025-10-22 15:50:50 +01:00
msramalho
794b4f6052 Merge branch 'dev' of https://github.com/bellingcat/auto-archiver into dev 2025-09-11 15:06:27 +01:00
msramalho
965d7d41dd dependency updates 2025-09-11 15:06:25 +01:00
Miguel Sozinho Ramalho
e73faa70cc Merge pull request #352 from mjgaughan/developer-documentation-updates
updating the style-checking code in the documentation
2025-08-11 10:42:53 +01:00
mgaughan
80beab9f23 ruff-fix -> ruff-clean; there is no ruff-fix in the Makefile. Maybe the command /should/ be ruff-fix to align with the underlying ruff command; for later discussion. This at least reconciles the documentation to the Makefile 2025-08-05 21:36:32 -04:00
Miguel Sozinho Ramalho
200cea4e12 Merge pull request #345 from mjgaughan/main
Correction of small documentation typos
2025-07-29 09:36:10 +01:00
mgaughan
1256fde159 updating location of .env.test.example in documentation 2025-07-23 13:04:48 -04:00
mgaughan
65e222e177 fixing typo in documentation pytest -> poetry 2025-07-22 17:20:59 -04:00
mgaughan
f2eb9ef784 correcting to double-dash in the poetry install documentation 2025-07-21 17:55:48 -04:00
msramalho
2081c16555 embed retry into timestamping 2025-07-10 14:49:53 +01:00
msramalho
d3efd7121c avoid empty metadata comments 2025-07-06 14:05:17 +01:00
msramalho
9d3cd5774b an improved approach for #295 2025-07-06 14:04:01 +01:00
Miguel Sozinho Ramalho
80d61e8b85 Merge pull request #341 from bellingcat/dev
Address several small bugs, includes tiktok photos extraction, and data-saving for proxy usage in generic_extractor.
2025-07-05 20:28:00 +01:00
msramalho
d36cdbfa87 fixing pypaperclip see issue #339 2025-07-05 19:07:23 +01:00
msramalho
c1506ee1cf some wayback errors are expected and should be warnings 2025-07-05 18:31:39 +01:00
msramalho
3a34a49822 adds antibot tiktok logic for photos closes #295 2025-07-05 18:31:12 +01:00
msramalho
37c6d97275 new auth wall check logic and escaped CSS selector in selenium 2025-07-05 18:30:31 +01:00
msramalho
7234eda85f expands Sheets API retries for really large spreadsheets 2025-07-05 18:29:33 +01:00
msramalho
a8c1ef3912 generic_extractor config to use proxy only when needed to avoid overzealousness 2025-07-05 16:54:58 +01:00
msramalho
52ed8196a5 updates dependencies 2025-07-05 16:03:47 +01:00
msramalho
2051e8e491 adds further exponential backoff for Sheets API worksheet enumeration 2025-07-05 16:02:07 +01:00
msramalho
21255db86a stops using service that is not up for timestamping 2025-07-05 16:00:46 +01:00
msramalho
eae0da08b3 fix issue with two runs of anitbot extractor 2025-07-05 16:00:03 +01:00
msramalho
0d1447117c updates docs to reflect new general approach extractor 2025-07-05 15:56:13 +01:00
18 changed files with 1427 additions and 1090 deletions

View File

@@ -21,7 +21,7 @@ This allows you to run the auto-archiver without the `poetry run` prefix.
### Optional Development Packages
Install development packages (used for unit tests etc.) using:
`poetry install -with dev`
`poetry install --with dev`
```{toctree}
@@ -33,4 +33,4 @@ docs
release
settings_page
style_guide
```
```

View File

@@ -50,7 +50,7 @@ Note not all warnings can be fixed automatically.
Most fixes are safe, but some non-standard practices such as dynamic loading are not picked up by linters. Ensure you check any modifications by this before committing them.
```shell
make ruff-fix
make ruff-clean
```
**Changing Configurations ⚙️**
@@ -67,4 +67,4 @@ One example is to extend the selected rules for linting the `pyproject.toml` fil
extend-select = ["B"]
```
Then re-run the `make ruff-check` command to see the new rules in action.
Then re-run the `make ruff-check` command to see the new rules in action.

View File

@@ -8,7 +8,7 @@
## Running Tests
1. Make sure you've installed the dev dependencies with `pytest install --with dev`
1. Make sure you've installed the dev dependencies with `poetry install --with dev`
2. Tests can be run as follows:
```{code} bash
#### Command prefix of 'poetry run' removed here for simplicity
@@ -26,7 +26,7 @@ pytest -ra -v tests/test_file.py
pytest -ra -v tests/test_file.py::test_function_name
```
3. Some tests require environment variables to be set. You can use the example `.env.test.example` file as a template. Copy it to `.env.test` and fill in the required values. This file will be loaded automatically by `pytest`.
3. Some tests require environment variables to be set. You can use the example `tests/.env.test.example` file as a template. Copy it to `tests/.env.test` and fill in the required values. This file will be loaded automatically by `pytest`.
```{code} bash
cp .env.test.example .env.test
```
cp tests/.env.test.example tests/.env.test
```

View File

@@ -4,8 +4,9 @@ Extractor modules are used to extract the content of a given URL. Typically, one
Extractors that are able to extract content from a wide range of websites include:
1. Generic Extractor: parses videos and images on sites using the powerful yt-dlp library.
2. Wayback Machine Extractor: sends pages to the Wayback machine for archiving, and stores the link.
3. WACZ Extractor: runs a web browser to 'browse' the URL and save a copy of the page in WACZ format.
2. Antibot Extractor: uses a headless browser to bypass bot detection and extract content.
3. WACZ Extractor: runs a web browser to 'browse' the URL and save a copy of the page in WACZ format.
4. Wayback Machine Extractor: sends pages to the Wayback machine for archiving, and stores the archived link.
```{include} autogen/extractor.md
```

2136
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
[project]
name = "auto-archiver"
version = "1.1.1"
version = "1.1.5"
description = "Automatically archive links to videos, images, and social media content from Google Sheets (and more)."
requires-python = ">=3.10,<3.13"
@@ -50,14 +50,15 @@ dependencies = [
"retrying (>=0.0.0)",
"rich-argparse (>=1.6.0,<2.0.0)",
"ruamel-yaml (>=0.18.10,<0.19.0)",
"rfc3161-client (==1.0.3)",
"cryptography (>44.0.1,<45.0.0)",
"rfc3161-client (>=1.0.5)",
"cryptography (>=46.0.3)",
"opentimestamps (>=0.4.5,<0.5.0)",
"bgutil-ytdlp-pot-provider (>=1.0.0)",
"yt-dlp[curl-cffi,default] (>=2025.5.22,<2026.0.0)",
"secretstorage (>=3.3.3,<4.0.0)",
"seleniumbase (>=4.36.4,<5.0.0)",
"pyautogui (>=0.9.54,<0.10.0)",
"pyperclip (==1.8.2)",
]
[tool.poetry.group.dev.dependencies]

View File

@@ -81,6 +81,9 @@ class AntibotExtractorEnricher(Extractor, Enricher):
os.makedirs(self.user_data_dir, exist_ok=True)
def enrich(self, to_enrich: Metadata, custom_data_dir: bool = True) -> bool:
if to_enrich.get_media_by_id("html_source_code"):
logger.info("Antibot has already been executed, skipping.")
return True
using_user_data_dir = self.user_data_dir if custom_data_dir else None
url = to_enrich.get_url()
@@ -94,9 +97,12 @@ class AntibotExtractorEnricher(Extractor, Enricher):
sb.uc_gui_click_rc() # NB: using handle instead of click breaks some sites like reddit, for now we separate here but can have dropins deciding this in the future
dropin = self._get_suitable_dropin(url, sb)
dropin.open_page(url)
if not dropin.open_page(url):
# TODO: could we detect deleted videos?
logger.warning("Failed to open drop-in page")
return False
if self.detect_auth_wall and self._hit_auth_wall(sb):
if self.detect_auth_wall and (dropin.hit_auth_wall() and self._hit_auth_wall(sb)):
logger.warning("Skipping since auth wall or CAPTCHA was detected")
return False
@@ -274,8 +280,14 @@ class AntibotExtractorEnricher(Extractor, Enricher):
return
url = to_enrich.get_url()
all_urls = set()
logger.debug(f"Extracting media for {js_css_selector=}")
try:
sources = sb.execute_script(js_css_selector)
except selenium.common.exceptions.JavascriptException as e:
logger.error(f"Error executing JavaScript selector {js_css_selector}: {e}")
return
sources = sb.execute_script(js_css_selector)
# js_for_css_selectors
for src in sources:
if len(all_urls) >= max_media:

View File

@@ -1,3 +1,4 @@
import json
import os
import traceback
from typing import Mapping
@@ -74,8 +75,11 @@ class Dropin:
You can overwrite this instead of `images_selector` for more control over scraped images.
"""
if not self.images_selectors():
return "return [];"
safe_selector = json.dumps(self.images_selectors())
return f"""
return Array.from(document.querySelectorAll("{self.images_selectors()}")).map(el => el.src || el.href).filter(Boolean);
return Array.from(document.querySelectorAll({safe_selector})).map(el => el.src || el.href).filter(Boolean);
"""
def js_for_video_css_selectors(self) -> str:
@@ -84,8 +88,11 @@ class Dropin:
You can overwrite this instead of `video_selector` for more control over scraped videos.
"""
if not self.video_selectors():
return "return [];"
safe_selector = json.dumps(self.video_selectors())
return f"""
return Array.from(document.querySelectorAll("{self.video_selectors()}")).map(el => el.src || el.href).filter(Boolean);
return Array.from(document.querySelectorAll({safe_selector})).map(el => el.src || el.href).filter(Boolean);
"""
def open_page(self, url) -> bool:
@@ -103,6 +110,12 @@ class Dropin:
"""
return 0, 0
def hit_auth_wall(self) -> bool:
"""
Custom check to see if the current page is behind an authentication wall, if True is returned the default global auth wall detector is used instead. If false, no auth wall is detected and the page is considered open.
"""
return True
def _get_username_password(self, site) -> tuple[str, str]:
"""
Get the username and password for the site from the extractor's auth data.

View File

@@ -0,0 +1,56 @@
from contextlib import suppress
from typing import Mapping
from auto_archiver.utils.custom_logger import logger
from auto_archiver.modules.antibot_extractor_enricher.dropin import Dropin
class TikTokDropin(Dropin):
"""
A class to handle TikTok drop-in functionality for the antibot extractor enricher module.
"""
def documentation() -> Mapping[str, str]:
return {
"name": "TikTok Dropin",
"description": "Handles TikTok posts and works without authentication.\nNOTE: This dropin is highly susceptible to TikTok's bot detection mechanisms and may not work reliably if you reuse the same IP. The GenericExtractor is recommended for TikTok posts, as it handles video/image download more reliable. In the future we plan to implement better anti captcha measures for this dropin.",
"site": "tiktok.com",
}
@staticmethod
def suitable(url: str) -> bool:
return "tiktok.com" in url
@staticmethod
def images_selectors() -> str:
return '[data-e2e="detail-photo"] img'
@staticmethod
def video_selectors() -> str:
return None # TikTok videos should be handled by the generic extractor
def open_page(self, url) -> bool:
self.sb.wait_for_ready_state_complete()
self._close_cookies_banner()
# TODO: implement login logic
if url != self.sb.get_current_url():
return False
if self.sb.is_text_visible("Video currently unavailable"):
logger.debug("Video may have been removed or is private.")
return False
return True
def hit_auth_wall(self) -> bool:
return False # TikTok does not require authentication for public posts
def _close_cookies_banner(self):
with suppress(Exception): # selenium.common.exceptions.JavascriptException
self.sb.execute_script("""
document
.querySelector("tiktok-cookie-banner")
.shadowRoot.querySelector("faceplate-dialog")
.querySelector("button")
.click()
""")
self.sb.click_if_visible("Skip")

View File

@@ -60,6 +60,10 @@ If you are having issues with the extractor, you can review the version of `yt-d
"default": "",
"help": "http/https/socks proxy to use for the webdriver, eg https://proxy-user:password@proxy-ip:port",
},
"proxy_on_failure_only": {
"default": True,
"help": "Applies only if a proxy is set. In that case if this setting is True, the extractor will only use the proxy if the initial request fails; if it is False, the extractor will always use the proxy.",
},
"end_means_success": {
"default": True,
"help": "if True, any archived content will mean a 'success', if False this extractor will not return a 'success' stage; this is useful for cases when the yt-dlp will archive a video but ignore other types of content like images or text only pages that the subsequent extractors can retrieve.",

View File

@@ -307,7 +307,7 @@ class GenericExtractor(Extractor):
if "description" in video_data and not result.get("content"):
result.set_content(video_data.get("description"))
# extract comments if enabled
if self.comments and video_data.get("comments", []) is not None:
if self.comments and video_data.get("comments", None) is not None:
result.set(
"comments",
[
@@ -502,6 +502,9 @@ class GenericExtractor(Extractor):
try:
result = self.get_metadata_for_post(info_extractor, url, ydl)
except (yt_dlp.utils.DownloadError, yt_dlp.utils.ExtractorError) as post_e:
if "NSFW tweet requires authentication." in str(post_e):
logger.warning(str(post_e))
return False
logger.error("Error downloading metadata for post: {error}", error=str(post_e))
return False
except Exception as generic_e:
@@ -513,7 +516,7 @@ class GenericExtractor(Extractor):
)
return False
if result:
if result and not result.is_success():
extractor_name = "yt-dlp"
if info_extractor:
extractor_name += f"_{info_extractor.ie_key()}"
@@ -525,7 +528,7 @@ class GenericExtractor(Extractor):
return result
def download(self, item: Metadata) -> Metadata:
def download(self, item: Metadata, skip_proxy: bool = False) -> Metadata:
url = item.get_url()
# TODO: this is a temporary hack until this issue is closed: https://github.com/yt-dlp/yt-dlp/issues/11025
@@ -533,6 +536,16 @@ class GenericExtractor(Extractor):
url = url.replace("https://ya.ru", "https://yandex.ru")
item.set("replaced_url", url)
# proxy_on_failure_only logic
if self.proxy and self.proxy_on_failure_only and not skip_proxy:
# when proxy_on_failure_only is True, we first try to download without a proxy and only continue with execution if that fails
try:
if without_proxy := self.download(item, skip_proxy=True):
logger.info("Downloaded successfully without proxy.")
return without_proxy
except Exception:
logger.debug("Download without proxy failed, trying with proxy...")
ydl_options = [
"-o",
os.path.join(self.tmp_dir, "%(id)s.%(ext)s"),
@@ -546,7 +559,7 @@ class GenericExtractor(Extractor):
]
# proxy handling
if self.proxy:
if self.proxy and not skip_proxy:
ydl_options.extend(["--proxy", self.proxy])
# max_downloads handling

View File

@@ -1,3 +1,4 @@
import re
import requests
from auto_archiver.utils.custom_logger import logger
@@ -14,12 +15,16 @@ class Tiktok(GenericDropin):
It's useful for capturing content that requires a login, like sensitive content.
"""
# Regex pattern to match TikTok photo post URLs
PHOTO_URL_REGEX = r"https?://(?:www\.)?tiktok\.com/@[\w\.-]+/photo/\d+"
TIKWM_ENDPOINT = "https://www.tikwm.com/api/?url={url}"
def suitable(self, url, info_extractor) -> bool:
"""This dropin (which uses Tikvm) is suitable for *all* Tiktok type URLs - videos, lives, VMs, and users.
Return the 'suitable' method from the TikTokIE class."""
return any(extractor().suitable(url) for extractor in (TikTokIE, TikTokLiveIE, TikTokVMIE, TikTokUserIE))
return any(extractor().suitable(url) for extractor in (TikTokIE, TikTokLiveIE, TikTokVMIE, TikTokUserIE)) or (
re.match(self.PHOTO_URL_REGEX, url) is not None
)
def extract_post(self, url: str, ie_instance):
logger.debug("Using Tikwm API to attempt to download tiktok video")
@@ -28,56 +33,91 @@ class Tiktok(GenericDropin):
r = requests.get(endpoint)
if r.status_code != 200:
raise ValueError(f"unexpected status code '{r.status_code}' from tikwm.com for {url=}:")
raise ValueError(f"Unexpected status code '{r.status_code}' from tikwm.com")
try:
json_response = r.json()
except ValueError:
raise ValueError(f"failed to parse JSON response from tikwm.com for {url=}")
raise ValueError("Failed to parse JSON response from tikwm.com")
if not json_response.get("msg") == "success" or not (api_data := json_response.get("data", {})):
raise ValueError(f"failed to get a valid response from tikwm.com for {url=}: {repr(json_response)}")
raise ValueError(f"Unable to download with tikwm.com: {repr(json_response)}")
# tries to get the non-watermarked version first
video_url = api_data.pop("play", api_data.pop("wmplay", None))
if not video_url:
raise ValueError(f"no valid video URL found in response from tikwm.com for {url=}")
api_data["video_url"] = video_url
play_url = api_data.pop("play", api_data.pop("wmplay", None))
if play_url and "mime_type=audio" in play_url:
play_url = None
if play_url:
api_data["video_url"] = play_url
return api_data
def keys_to_clean(self, video_data: dict, info_extractor):
return ["video_url", "title", "create_time", "author", "cover", "origin_cover", "ai_dynamic_cover", "duration"]
return [
"video_url",
"title",
"create_time",
"author",
"cover",
"origin_cover",
"ai_dynamic_cover",
"duration",
"size",
"wm_size",
"music",
"music_info",
"play_count",
"digg_count",
"comment_count",
"share_count",
"download_count",
"collect_count",
"anchors",
"anchors_extras",
"is_ad",
"commerce_info",
"commercial_video_info",
"item_comment_settings",
"mentioned_users",
] # all of these will be added via api_data in a single metadata field vs individual ones in the generic extractor
def create_metadata(self, post: dict, ie_instance, archiver, url):
# prepare result, start by downloading video
result = Metadata()
video_url = post.pop("video_url")
is_success = False
# get the cover if possible
cover_url = post.pop("origin_cover", post.pop("cover", post.pop("ai_dynamic_cover", None)))
if cover_url and (cover_downloaded := archiver.download_from_url(cover_url)):
result.add_media(Media(cover_downloaded))
# get the video or fail
video_downloaded = archiver.download_from_url(video_url, f"vid_{post.get('id', '')}")
if not video_downloaded:
logger.error("Failed to download video")
return False
video_media = Media(video_downloaded)
if duration := post.get("duration", None):
video_media.set("duration", duration)
result.add_media(video_media)
for image_url in post.pop("images", []):
if image_downloaded := archiver.download_from_url(image_url):
result.add_media(Media(image_downloaded))
is_success = True # this is an images post and we got it/them
# get the video if present, could be an image post
if video_url := post.pop("video_url", None):
video_downloaded = archiver.download_from_url(video_url, f"vid_{post.get('id', '')}")
if not video_downloaded:
logger.error("Failed to download video")
return False
video_media = Media(video_downloaded)
if duration := post.pop("duration", None):
video_media.set("duration", duration)
result.add_media(video_media)
is_success = True # this is a video post and we got it
# add remaining metadata
result.set_title(post.get("title", ""))
result.set_title(post.pop("title", ""))
if created_at := post.get("create_time", None):
if created_at := post.pop("create_time", None):
result.set_timestamp(datetime.fromtimestamp(created_at, tz=timezone.utc))
if author := post.get("author", None):
if author := post.pop("author", None):
result.set("author", author)
result.set("api_data", post)
result.set("api_data", {k: v for k, v in post.items() if v})
if is_success:
result.success("yt-dlp_TikTok")
else:
raise ValueError("Unable to download any media from TikTok post, possibly deleted or private.")
return result

View File

@@ -32,26 +32,37 @@ class GsheetsFeederDB(Feeder, Database):
if not self.sheet and not self.sheet_id:
raise ValueError("You need to define either a 'sheet' name or a 'sheet_id' in your manifest.")
def open_sheet(self):
@retry(
wait_exponential_multiplier=1,
stop_max_attempt_number=6,
)
def open_sheet(self) -> gspread.Spreadsheet:
if self.sheet:
return self.gsheets_client.open(self.sheet)
else:
return self.gsheets_client.open_by_key(self.sheet_id)
@retry(
wait_exponential_multiplier=1,
stop_max_attempt_number=6,
)
def enumerate_sheets(self, sheet) -> Iterator[gspread.Worksheet]:
for worksheet in sheet.worksheets():
yield worksheet
def __iter__(self) -> Iterator[Metadata]:
sh = self.open_sheet()
for ii, worksheet in enumerate(sh.worksheets()):
if not self.should_process_sheet(worksheet.title):
logger.debug(f"Skipped worksheet '{worksheet.title}' due to allow/block rules")
continue
logger.info(f"Opening worksheet {ii=}: {worksheet.title=} header={self.header}")
gw = GWorksheet(worksheet, header_row=self.header, columns=self.columns)
if len(missing_cols := self.missing_required_columns(gw)):
logger.debug(
f"Skipped worksheet '{worksheet.title}' due to missing required column(s) for {missing_cols}"
)
continue
with logger.contextualize(worksheet=f"{sh.title}:{worksheet.title}"):
spreadsheet = self.open_sheet()
for worksheet in self.enumerate_sheets(spreadsheet):
with logger.contextualize(worksheet=f"{spreadsheet.title}:{worksheet.title}"):
if not self.should_process_sheet(worksheet.title):
logger.debug("Skipped worksheet due to allow/block rules")
continue
logger.info(f"Opening worksheet header={self.header}")
gw = GWorksheet(worksheet, header_row=self.header, columns=self.columns)
if len(missing_cols := self.missing_required_columns(gw)):
logger.debug(f"Skipped worksheet due to missing required column(s) for {missing_cols}")
continue
# process and yield metadata here:
yield from self._process_rows(gw)
logger.info(f"Finished worksheet {worksheet.title}")

View File

@@ -20,7 +20,7 @@
# "http://tsa.sinpe.fi.cr/tsaHttp/", # self-signed
# "http://tsa.cra.ge/signserver/tsa?workerName=qtsa", # self-signed
"http://tss.cnbs.gob.hn/TSS/HttpTspServer",
"http://dss.nowina.lu/pki-factory/tsa/good-tsa",
# "http://dss.nowina.lu/pki-factory/tsa/good-tsa",
# "https://freetsa.org/tsr", # self-signed
],
"help": "List of RFC3161 Time Stamp Authorities to use, separate with commas if passed via the command line.",

View File

@@ -4,12 +4,12 @@ from importlib.metadata import version
import hashlib
from slugify import slugify
from retrying import retry
import requests
from auto_archiver.utils.custom_logger import logger
from rfc3161_client import (decode_timestamp_response,TimestampRequestBuilder,TimeStampResponse, VerifierBuilder)
from rfc3161_client import (decode_timestamp_response, TimestampRequestBuilder, TimeStampResponse, VerifierBuilder)
from rfc3161_client import VerificationError as Rfc3161VerificationError
from rfc3161_client.base import HashAlgorithm
from rfc3161_client.tsp import SignedData
from cryptography import x509
from cryptography.hazmat.primitives import serialization
@@ -60,7 +60,6 @@ class TimestampingEnricher(Enricher):
logger.debug(f"No hashes found")
return
hashes_fn = os.path.join(self.tmp_dir, "hashes.txt")
data_to_sign = "\n".join(hashes)
@@ -75,7 +74,7 @@ class TimestampingEnricher(Enricher):
logger.debug(f"Timestamping with {tsa_url=}")
signed: TimeStampResponse = self.sign_data(tsa_url, message)
# fail if there's any issue with the certificates, uses certifi list of trusted CAs or the user-defined `cert_authorities`
root_cert = self.verify_signed(signed, message)
@@ -113,7 +112,7 @@ class TimestampingEnricher(Enricher):
f.write(timestamp_token)
return tst_path
def verify_signed(self, timestamp_response: TimeStampResponse, message: bytes) -> x509.Certificate:
def verify_signed(self, timestamp_response: TimeStampResponse, message: bytes) -> x509.Certificate:
"""
Verify a Signed Timestamp Response is trusted by a known Certificate Authority.
@@ -136,7 +135,7 @@ class TimestampingEnricher(Enricher):
if not cert_authorities:
raise ValueError(f"No trusted roots found in {trusted_root_path}.")
timestamp_certs = self.tst_certs(timestamp_response)
intermediate_certs = timestamp_certs[1:-1]
@@ -148,7 +147,7 @@ class TimestampingEnricher(Enricher):
message_hash = hashlib.sha256(message).digest()
else:
raise ValueError(f"Unsupported hash algorithm: {hash_algorithm}")
for certificate in cert_authorities:
builder = VerifierBuilder()
builder.add_root_certificate(certificate)
@@ -158,7 +157,6 @@ class TimestampingEnricher(Enricher):
verifier = builder.build()
try:
verifier.verify(timestamp_response, message_hash)
return certificate
@@ -171,23 +169,38 @@ class TimestampingEnricher(Enricher):
# see https://github.com/sigstore/sigstore-python/blob/99948d5b80525a5a104e904ffea58169dc6e0629/sigstore/_internal/timestamp.py#L84-L121
timestamp_request = (
TimestampRequestBuilder().data(bytes_data).nonce(nonce=True).build()
)
try:
TimestampRequestBuilder().data(bytes_data).nonce(nonce=True).build()
)
@retry(
wait_exponential_multiplier=1,
stop_max_attempt_number=2,
)
def sign_with_retry():
response = self.session.post(tsa_url, data=timestamp_request.as_bytes(), timeout=10)
response.raise_for_status()
return response
try:
response = sign_with_retry()
except requests.RequestException as e:
logger.error(f"Error while sending request to {tsa_url=}: {e}")
raise
@retry(
wait_exponential_multiplier=1,
stop_max_attempt_number=2,
)
def decode_with_retry(response):
return decode_timestamp_response(response.content)
# Check that we can parse the response but do not *verify* it
try:
timestamp_response = decode_timestamp_response(response.content)
timestamp_response = decode_with_retry(response)
except ValueError as e:
logger.error(f"Invalid timestamp response from server {tsa_url}: {e}")
raise
return timestamp_response
def tst_certs(self, tsp_response: TimeStampResponse):
signed_data: SignedData = tsp_response.signed_data
certs = [x509.load_der_x509_certificate(c) for c in signed_data.certificates]
@@ -196,7 +209,7 @@ class TimestampingEnricher(Enricher):
if len(certs) == 1:
return certs
while(len(ordered_certs) < len(certs)):
while (len(ordered_certs) < len(certs)):
if len(ordered_certs) == 0:
for cert in certs:
if not [c for c in certs if cert.subject == c.issuer]:
@@ -220,7 +233,7 @@ class TimestampingEnricher(Enricher):
cert_chain = []
for i, cert in enumerate(certificates):
cert_fn = os.path.join(self.tmp_dir, f"{i+1} {str(cert.serial_number)[:20]}.crt")
cert_fn = os.path.join(self.tmp_dir, f"{i + 1} {str(cert.serial_number)[:20]}.crt")
with open(cert_fn, "wb") as f:
f.write(cert.public_bytes(encoding=serialization.Encoding.PEM))
cert_chain.append(Media(filename=cert_fn).set("subject", cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)[0].value))

View File

@@ -2,7 +2,7 @@ import json
from auto_archiver.utils.custom_logger import logger
import time
import requests
from urllib3.exceptions import MaxRetryError
from auto_archiver.core import Extractor, Enricher
from auto_archiver.utils import url as UrlUtil
from auto_archiver.core import Metadata
@@ -45,7 +45,14 @@ class WaybackExtractorEnricher(Enricher, Extractor):
if self.if_not_archived_within:
post_data["if_not_archived_within"] = self.if_not_archived_within
# see https://docs.google.com/document/d/1Nsv52MvSjbLb2PCpHlat0gkzw0EvtSgpKHu4mk0MnrA for more options
r = requests.post("https://web.archive.org/save/", headers=ia_headers, data=post_data, proxies=proxies)
try:
r = requests.post("https://web.archive.org/save/", headers=ia_headers, data=post_data, proxies=proxies)
except MaxRetryError as e:
logger.warning(
f"MaxRetryError during Wayback POST call to /save, this may be do to a high number of calls leading to rate limiting: {e}"
)
to_enrich.set("wayback", "failed: possible rate limit")
return False
if r.status_code != 200:
logger.error(em := f"Internet archive failed with status of {r.status_code}: {r.json()}")
@@ -76,6 +83,9 @@ class WaybackExtractorEnricher(Enricher, Extractor):
if r_status.status_code == 200 and r_json["status"] == "success":
wayback_url = f"https://web.archive.org/web/{r_json['timestamp']}/{r_json['original_url']}"
elif r_status.status_code != 200 or r_json["status"] != "pending":
if r_json.get("status_ext") in ["error:blocked-url", "error:unauthorized"]:
logger.warning("Wayback cannot currently archive the URL, skipping.")
to_enrich.set("wayback", r_json.get("status_ext"))
logger.error(f"Wayback failed with {r_json}")
return False
except requests.exceptions.RequestException as e:

View File

@@ -88,6 +88,13 @@ class TestAntibotExtractorEnricher(TestExtractorBase):
5,
0,
),
(
"https://www.tiktok.com/@tracy_2424/photo/7418200173953830162",
"TikTok",
"Dito ko lang",
1,
0,
),
],
)
def test_download_pages_with_media(self, setup_module, make_item, url, in_title, in_text, image_count, video_count):

View File

@@ -55,6 +55,7 @@ class TestTiktokTikwmExtractor(TestExtractorBase):
("https://www.tiktok.com/@ggs68taiwan.official/video/7441821351142362375", True),
("https://www.tiktok.com/t/ZP8YQ8e5j/", True),
("https://vt.tiktok.com/ZSMTJeqRP/", True),
("https://tiktok.com/@user/photo/123?lang=en", True),
],
)
def test_is_suitable(self, url, is_suitable, tiktok_dropin):
@@ -68,10 +69,7 @@ class TestTiktokTikwmExtractor(TestExtractorBase):
mock_get.assert_called_once()
mock_get.return_value.json.assert_called_once()
# first message is just the 'Skipping using ytdlp to download files for TikTok' message
assert (
"failed to parse JSON response from tikwm.com for url='https://www.tiktok.com/@example/video/1234'"
in caplog.text
)
assert "Failed to parse JSON response from tikwm.com" in caplog.text
mock_get.return_value.json.side_effect = Exception
with caplog.at_level("ERROR"):
@@ -79,10 +77,7 @@ class TestTiktokTikwmExtractor(TestExtractorBase):
mock_get.assert_called()
assert mock_get.call_count == 2
assert mock_get.return_value.json.call_count == 2
assert (
"failed to parse JSON response from tikwm.com for url='https://www.tiktok.com/@example/video/1234'"
in caplog.text
)
assert "Failed to parse JSON response from tikwm.com" in caplog.text
@pytest.mark.parametrize(
"response",
@@ -98,27 +93,30 @@ class TestTiktokTikwmExtractor(TestExtractorBase):
assert self.extractor.download(make_item(self.VALID_EXAMPLE_URL)) is False
mock_get.assert_called_once()
mock_get.return_value.json.assert_called_once()
assert "failed to get a valid response from tikwm.com" in caplog.text
assert "Unable to download with tikwm.com: " in caplog.text
@pytest.mark.parametrize(
"response,has_vid",
"response,is_success",
[
({"data": {"id": 123}}, False),
({"data": {"wmplay": "url"}}, True),
({"data": {"play": "url"}}, True),
({"data": {"id": 123, "images": []}}, False),
({"data": {"wmplay": "url", "images": ["img1.jpg"]}}, True),
({"data": {"play": "url", "images": ["img1.jpg"]}}, True),
({"data": {"images": ["img1.jpg"]}}, True),
],
)
def test_correct_extraction(self, mock_get, make_item, response, has_vid, mocker):
def test_correct_extraction(self, mock_get, make_item, response, is_success, mocker):
data = {k: v for k, v in response.get("data", {}).items()}
mock_get.return_value.status_code = 200
mock_get.return_value.json.return_value = {"msg": "success", **response}
result = self.extractor.download(make_item(self.VALID_EXAMPLE_URL))
if not has_vid:
assert result is False
else:
total_media = len(data.get("images", [])) + (1 if data.get("wmplay", data.get("play")) else 0)
if is_success:
assert result.is_success()
assert len(result.media) == 1
assert len(result.media) == total_media
else:
assert result is False
mock_get.assert_called()
assert mock_get.call_count == 1 + int(has_vid)
assert mock_get.call_count == 1 + total_media
mock_get.return_value.json.assert_called_once()
def test_correct_data_extracted(self, mock_get, make_item):
@@ -142,7 +140,9 @@ class TestTiktokTikwmExtractor(TestExtractorBase):
assert len(result.media) == 2
assert result.get_title() == "Title"
assert result.get("author") == "Author"
assert result.get("api_data") == {"other": "data", "id": 123}
assert result.get("other") == "data"
assert result.get("comments") is None
assert result.get("api_data") == {"id": 123, "other": "data"}
assert result.media[1].get("duration") == 60
assert result.get("timestamp") == datetime.fromtimestamp(1736301699, tz=timezone.utc)