Compare commits

...

27 Commits

Author SHA1 Message Date
dependabot[bot]
7e44787d7a Bump yaml in /scripts/settings
Bumps [yaml](https://github.com/eemeli/yaml) to 1.10.3 and updates ancestor dependency . These dependencies need to be updated together.


Updates `yaml` from 1.10.2 to 1.10.3
- [Release notes](https://github.com/eemeli/yaml/releases)
- [Commits](https://github.com/eemeli/yaml/compare/v1.10.2...v1.10.3)

Updates `yaml` from 2.8.2 to 2.9.0
- [Release notes](https://github.com/eemeli/yaml/releases)
- [Commits](https://github.com/eemeli/yaml/compare/v1.10.2...v1.10.3)

---
updated-dependencies:
- dependency-name: yaml
  dependency-version: 1.10.3
  dependency-type: indirect
- dependency-name: yaml
  dependency-version: 2.9.0
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-06-05 02:07:03 +00:00
Miguel Sozinho Ramalho
afbe4fac50 Merge pull request #430 from bellingcat/dev
bug fixes and maintenance
2026-04-27 15:52:39 +01:00
msramalho
e633be1721 version bump 2026-04-27 12:35:54 +01:00
msramalho
bc06de8e5c fixes incomplete yt-dlp parts download 2026-04-27 12:34:47 +01:00
Miguel Sozinho Ramalho
20fddce3a3 Merge pull request #427 from PeterUpfold/deno-container
Fix missing JS runtime config for bguils_po_token_method
2026-04-24 11:08:28 +01:00
msramalho
6efa439cdb dependencies bump 2026-04-23 17:20:54 +01:00
Miguel Sozinho Ramalho
ef77d1fc86 Merge branch 'main' into dev 2026-04-23 14:21:01 +01:00
msramalho
a57a5ee005 adds an extra check when calling pypi as it's led to uncaught ssl errors 2026-04-23 14:20:07 +01:00
msramalho
2582f567ac removes curl/unzip from dockerfile 2026-04-23 14:04:46 +01:00
msramalho
4e5c1a6218 suggested alternative change to deno install 2026-04-23 14:02:51 +01:00
Peter Upfold
12d9c469b2 Add Deno to Dockerfile 2026-04-13 18:19:23 +01:00
Miguel Sozinho Ramalho
792838f1a1 Merge pull request #419 from bellingcat/dev
Dependencies bump, new ghostarchive enricher
2026-04-07 14:44:35 +01:00
Miguel Sozinho Ramalho
17c4ae15eb Merge branch 'main' into dev 2026-04-07 10:51:10 +01:00
msramalho
a08af07348 version bump 2026-04-06 18:34:20 +01:00
Miguel Sozinho Ramalho
e54077f4e8 Merge pull request #418 from bellingcat/feat/ghostarchive
Feat/ghostarchive
2026-04-06 18:33:15 +01:00
msramalho
319c0528da dependencies bump 2026-04-06 18:27:47 +01:00
msramalho
ae0e53e434 adds tests for new ghostarchive enricher feature 2026-04-06 17:15:32 +01:00
msramalho
82fc786d56 implements new enricher to submit URLs to ghostarchive 2026-04-06 17:13:48 +01:00
Miguel Sozinho Ramalho
aa65299844 Merge pull request #408 from bellingcat/dev
telethon compatibility with celery workers, dependency bumps
2026-03-16 11:28:21 +00:00
msramalho
1b69ec1f00 dependencies bump 2026-03-16 11:11:57 +00:00
Miguel Sozinho Ramalho
304e5d40b1 Merge branch 'main' into dev 2026-03-16 11:10:26 +00:00
msramalho
3194fee95d fix telethon bug when running in celery workers that close the event loop 2026-03-12 10:20:11 +00:00
msramalho
0040810e2e dependencies bump 2026-03-10 14:33:25 +00:00
Miguel Sozinho Ramalho
63cfe34e23 Merge pull request #407 from bellingcat/dev
minor bug fix: handles failed get downloads
2026-03-02 17:10:46 +00:00
msramalho
23a88e3cf4 ci issues 2026-03-02 17:07:09 +00:00
msramalho
3cac160cc1 version bump 2026-03-02 17:01:33 +00:00
msramalho
e9a92272c5 bug fix: missing filename on url download 2026-03-02 17:01:16 +00:00
29 changed files with 1770 additions and 711 deletions

View File

@@ -1,18 +1,17 @@
FROM webrecorder/browsertrix-crawler:1.11.4 AS base
FROM webrecorder/browsertrix-crawler:1.12.4 AS base
ENV RUNNING_IN_DOCKER=1 \
LANG=C.UTF-8 \
LC_ALL=C.UTF-8 \
PYTHONDONTWRITEBYTECODE=1 \
PYTHONFAULTHANDLER=1 \
PATH="/root/.local/bin:$PATH"
PYTHONFAULTHANDLER=1
ARG TARGETARCH
# Installing system dependencies
RUN apt-get update && \
apt-get install -y --no-install-recommends gcc ffmpeg fonts-noto exiftool python3-tk
apt-get install -y --no-install-recommends gcc ffmpeg fonts-noto exiftool python3-tk
# Poetry and runtime
FROM base AS runtime

1428
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
[project]
name = "auto-archiver"
version = "1.2.3"
version = "1.2.7"
description = "Automatically archive links to videos, images, and social media content from Google Sheets (and more)."
requires-python = ">=3.10,<3.13"

View File

@@ -10,21 +10,21 @@
"dependencies": {
"@dnd-kit/core": "^6.3.1",
"@dnd-kit/sortable": "^10.0.0",
"@emotion/react": "latest",
"@emotion/styled": "latest",
"@emotion/react": "*",
"@emotion/styled": "*",
"@mui/icons-material": "^7.1.1",
"@mui/material": "^7.1.1",
"react": "19.1.0",
"react-dom": "19.1.0",
"react-markdown": "^10.0.0",
"yaml": "^2.7.0"
"yaml": "^2.9.0"
},
"devDependencies": {
"@types/react": "latest",
"@types/react-dom": "latest",
"@vitejs/plugin-react": "latest",
"typescript": "latest",
"vite": "latest",
"@types/react": "*",
"@types/react-dom": "*",
"@vitejs/plugin-react": "*",
"typescript": "*",
"vite": "*",
"vite-plugin-singlefile": "^2.1.0"
}
},
@@ -1976,9 +1976,9 @@
}
},
"node_modules/cosmiconfig/node_modules/yaml": {
"version": "1.10.2",
"resolved": "https://registry.npmjs.org/yaml/-/yaml-1.10.2.tgz",
"integrity": "sha512-r3vXyErRCYJ7wg28yvBY5VSoAF8ZvlcW9/BwUzEtUsjvX/DKs24dIkuwjtuprwJJHsbyUbLApepYTR1BN4uHrg==",
"version": "1.10.3",
"resolved": "https://registry.npmjs.org/yaml/-/yaml-1.10.3.tgz",
"integrity": "sha512-vIYeF1u3CjlhAFekPPAk2h/Kv4T3mAkMox5OymRiJQB0spDP10LHvt+K7G9Ny6NuuMAb25/6n1qyUjAcGNf/AA==",
"license": "ISC",
"engines": {
"node": ">= 6"
@@ -3886,9 +3886,9 @@
"license": "ISC"
},
"node_modules/yaml": {
"version": "2.8.2",
"resolved": "https://registry.npmjs.org/yaml/-/yaml-2.8.2.tgz",
"integrity": "sha512-mplynKqc1C2hTVYxd0PU2xQAc22TI1vShAYGksCCfxbn/dFwnHTNi1bvYsBTkhdUNtGIf5xNOg938rrSSYvS9A==",
"version": "2.9.0",
"resolved": "https://registry.npmjs.org/yaml/-/yaml-2.9.0.tgz",
"integrity": "sha512-2AvhNX3mb8zd6Zy7INTtSpl1F15HW6Wnqj0srWlkKLcpYl/gMIMJiyuGq2KeI2YFxUPjdlB+3Lc10seMLtL4cA==",
"license": "ISC",
"bin": {
"yaml": "bin.mjs"

View File

@@ -18,7 +18,7 @@
"react": "19.1.0",
"react-dom": "19.1.0",
"react-markdown": "^10.0.0",
"yaml": "^2.7.0"
"yaml": "^2.9.0"
},
"devDependencies": {
"@types/react": "latest",

View File

@@ -11,6 +11,7 @@ Key Functionalities:
from __future__ import annotations
import hashlib
import os
from typing import Any, List, Union, Dict
from dataclasses import dataclass, field
from dataclasses_json import dataclass_json
@@ -181,8 +182,14 @@ class Metadata:
media_hashes = set()
new_media = []
for m in self.media:
if not m.filename:
new_media.append(m)
continue
h = m.get("hash")
if not h:
if not os.path.exists(m.filename):
logger.warning(f"Skipping missing media file: {m.filename}")
continue
h = calculate_hash_in_chunks(hashlib.sha256(), int(1.6e7), m.filename)
if len(h) and h in media_hashes:
continue

View File

@@ -467,7 +467,11 @@ Here's how that would look: \n\nsteps:\n extractors:\n - [your_extractor_name_
return self.setup_complete_parser(basic_config, yaml_config, unused_args)
def check_for_updates(self):
response = requests.get("https://pypi.org/pypi/auto-archiver/json").json()
try:
response = requests.get("https://pypi.org/pypi/auto-archiver/json", timeout=10).json()
except Exception as e:
logger.debug(f"Unable to check for updates: {e}")
return
latest_version = version.parse(response["info"]["version"])
current_version = version.parse(__version__)
# check version compared to current version

View File

@@ -73,6 +73,7 @@ class AntibotExtractorEnricher(Extractor, Enricher):
if self.enrich(result):
result.status = "antibot"
return result
return False
def _prepare_user_data_dir(self):
if self.user_data_dir:

View File

@@ -39,12 +39,18 @@ class Bluesky(GenericDropin):
media_url = "https://bsky.social/xrpc/com.atproto.sync.getBlob?cid={}&did={}"
for image_media in image_medias:
url = media_url.format(image_media["image"]["ref"]["$link"], post["author"]["did"])
image_media = archiver.download_from_url(url)
media.append(Media(image_media))
filename = archiver.download_from_url(url)
if filename:
media.append(Media(filename))
else:
logger.warning(f"Failed to download Bluesky image from {url}")
for video_media in video_medias:
url = media_url.format(video_media["ref"]["$link"], post["author"]["did"])
video_media = archiver.download_from_url(url)
media.append(Media(video_media))
filename = archiver.download_from_url(url)
if filename:
media.append(Media(filename))
else:
logger.warning(f"Failed to download Bluesky video from {url}")
return media
def _get_post_data(self, post: dict) -> dict:

View File

@@ -204,8 +204,11 @@ class GenericExtractor(Extractor):
if thumbnail_url:
try:
cover_image_path = self.download_from_url(thumbnail_url)
media = Media(cover_image_path)
metadata.add_media(media, id="cover")
if cover_image_path:
media = Media(cover_image_path)
metadata.add_media(media, id="cover")
else:
logger.warning(f"Failed to download cover image from {thumbnail_url}")
except Exception as e:
logger.error(f"Could not download cover image {thumbnail_url}: {e}")
@@ -572,6 +575,8 @@ class GenericExtractor(Extractor):
"--live-from-start" if self.live_from_start else "--no-live-from-start",
"--postprocessor-args",
"ffmpeg:-bitexact", # ensure bitexact output to avoid mismatching hashes for same video
"--js-runtimes",
"node", # yt-dlp defaults to deno-only; node is available in the base image
]
# proxy handling

View File

@@ -1,6 +1,7 @@
from typing import Type
from auto_archiver.utils import traverse_obj
from auto_archiver.utils.custom_logger import logger
from auto_archiver.core.metadata import Metadata, Media
from auto_archiver.core.extractor import Extractor
from yt_dlp.extractor.common import InfoExtractor
@@ -58,6 +59,9 @@ class Truth(GenericDropin):
# add the media
for media in post.get("media_attachments", []):
filename = archiver.download_from_url(media["url"])
if not filename:
logger.warning(f"Failed to download media from {media['url']}")
continue
result.add_media(Media(filename), id=media.get("id"))
return result

View File

@@ -157,5 +157,8 @@ class Twitter(GenericDropin):
mimetype = variant["content_type"]
ext = mimetypes.guess_extension(mimetype)
media.filename = archiver.download_from_url(media.get("src"), f"{slugify(url)}_{i}{ext}")
if not media.filename:
logger.warning(f"Failed to download media from {media.get('src')}")
continue
result.add_media(media)
return result

View File

@@ -0,0 +1 @@
from .ghostarchive_enricher import GhostarchiveEnricher

View File

@@ -0,0 +1,58 @@
{
"name": "Ghost Archive Enricher",
"type": ["enricher"],
"entry_point": "ghostarchive_enricher::GhostarchiveEnricher",
"requires_setup": False,
"dependencies": {
"python": ["loguru", "requests", "bs4", "seleniumbase"],
},
"configs": {
"timeout": {
"default": 120,
"type": "int",
"help": "seconds to wait for successful archive confirmation from Ghost Archive.",
},
"check_existing": {
"default": True,
"type": "bool",
"help": "whether to search for an existing archive before submitting a new one.",
},
"proxy_http": {
"default": None,
"help": "http proxy to use for requests, eg http://proxy-user:password@proxy-ip:port",
},
"proxy_https": {
"default": None,
"help": "https proxy to use for requests, eg https://proxy-user:password@proxy-ip:port",
},
},
"description": """
Submits the current URL to [Ghost Archive](https://ghostarchive.org/) for archiving and returns the archived page URL.
Used as an **enricher** to add a Ghost Archive URL to items already extracted by other modules.
### Features
- Archives any public URL using the Ghost Archive service.
- Optionally checks for existing archives before submitting a new one.
- Supports HTTP and HTTPS proxies for requests.
- Parses HTML responses to extract archive URLs (Ghost Archive has no JSON API).
### Important
- This module confirms that Ghost Archive accepted the URL submission and returned an archive link.
It does **not** verify the contents or completeness of the archived page.
### Notes
- Ghost Archive is a free service with no authentication required.
- Archived pages must be smaller than 50 MB (including CSS, fonts, images, etc.).
- Videos are archived up to 360p and must be under 100 MB and shorter than 30 minutes.
- Archival may take up to 5 minutes depending on the queue and page complexity.
- Archived content is stored indefinitely.
- Ghost Archive does not archive pages that require authentication or form submission.
### Limitations
- No official API — this module interacts with the Ghost Archive web interface.
- The submission endpoint is protected by Cloudflare, so a headless browser (SeleniumBase) is used for new submissions.
- Searching for existing archives uses plain HTTP requests and does not require a browser.
- Rate limiting may apply; consider using a delay between requests if archiving many URLs.
""",
}

View File

@@ -0,0 +1,153 @@
import time
import re
import requests
from bs4 import BeautifulSoup
from seleniumbase import SB
from auto_archiver.utils.custom_logger import logger
from auto_archiver.utils import url as UrlUtil
from auto_archiver.core import Enricher, Metadata
class GhostarchiveEnricher(Enricher):
"""
Submits the current URL to Ghost Archive (ghostarchive.org) for archiving
and stores the archived page URL as enrichment metadata.
Ghost Archive has no official API — this module interacts with the web form
and parses HTML responses. The submission endpoint is protected by Cloudflare,
so a headless browser (SeleniumBase) is used for archival submissions, while
plain HTTP requests are used for searching existing archives.
Note: this module only confirms that Ghost Archive accepted the submission
and returned an archive URL. It does not verify that the archived page
content is complete or correctly rendered.
"""
GHOSTARCHIVE_BASE = "https://ghostarchive.org"
ARCHIVE_ENDPOINT = f"{GHOSTARCHIVE_BASE}/archive2"
SEARCH_ENDPOINT = f"{GHOSTARCHIVE_BASE}/search"
ARCHIVE_URL_PATTERN = re.compile(r"/archive/([A-Za-z0-9]+)")
def _get_proxies(self) -> dict:
proxies = {}
if self.proxy_http:
proxies["http"] = self.proxy_http
if self.proxy_https:
proxies["https"] = self.proxy_https
return proxies
def _get_headers(self) -> dict:
return {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
}
def _normalize_archive_href(self, href: str) -> str | None:
"""Normalize an archive link href to a full HTTPS URL, filtering out replay links."""
if "/archive/" not in href or "/replay/" in href:
return None
if href.startswith("/"):
return f"{self.GHOSTARCHIVE_BASE}{href}"
if href.startswith("http://ghostarchive.org"):
return href.replace("http://", "https://")
if href.startswith("https://ghostarchive.org"):
return href
return None
def _search_existing(self, url: str) -> str | None:
"""
Search Ghost Archive for an existing archive of the given URL.
Returns the archive URL if found, otherwise None.
"""
try:
r = requests.get(
self.SEARCH_ENDPOINT,
params={"term": url},
headers=self._get_headers(),
proxies=self._get_proxies(),
timeout=30,
)
if r.status_code != 200:
logger.warning(f"Ghost Archive search returned status {r.status_code}")
return None
soup = BeautifulSoup(r.text, "html.parser")
for link in soup.find_all("a", href=True):
archive_url = self._normalize_archive_href(link["href"])
if archive_url:
logger.info(f"Found existing Ghost Archive: {archive_url}")
return archive_url
except requests.exceptions.RequestException as e:
logger.warning(f"Ghost Archive search failed: {e}")
return None
def _submit_url(self, url: str) -> str | None:
"""
Submit a URL to Ghost Archive for archiving using a headless browser.
The /archive2 endpoint is Cloudflare-protected, requiring JS execution.
Returns the archive URL if successful, otherwise None.
"""
try:
with SB(uc=True, headless=True) as sb:
logger.debug("Opening Ghost Archive homepage in headless browser")
sb.open(self.GHOSTARCHIVE_BASE)
# fill in the archive form and submit
sb.type('input[name="archive"]', url)
sb.click('input[type="submit"][value="Submit for archival"]')
# wait for navigation to /archive/{id} or timeout
start_time = time.time()
while time.time() - start_time < self.timeout:
current_url = sb.get_current_url()
if self.ARCHIVE_URL_PATTERN.search(current_url):
archive_url = current_url.split("?")[0]
logger.info(f"Ghost Archive saved: {archive_url}")
return archive_url
time.sleep(2)
# if we didn't redirect, try parsing the page source
page_source = sb.get_page_source()
return self._parse_archive_url(page_source)
except Exception as e:
logger.warning(f"Ghost Archive submission failed: {e}")
return None
def _parse_archive_url(self, html: str) -> str | None:
"""Parse HTML response to find an archive URL."""
soup = BeautifulSoup(html, "html.parser")
for link in soup.find_all("a", href=True):
archive_url = self._normalize_archive_href(link["href"])
if archive_url:
return archive_url
return None
def enrich(self, to_enrich: Metadata) -> bool:
url = to_enrich.get_url()
if UrlUtil.is_auth_wall(url):
logger.debug("[SKIP] Ghost Archive since url is behind AUTH WALL")
return False
if to_enrich.get("ghostarchive"):
logger.info(f"Ghost Archive enricher had already been executed: {to_enrich.get('ghostarchive')}")
return True
# optionally check for existing archive first
archive_url = None
if self.check_existing:
logger.debug(f"Searching Ghost Archive for existing archive of {url}")
archive_url = self._search_existing(url)
if not archive_url:
logger.debug(f"Submitting {url} to Ghost Archive")
archive_url = self._submit_url(url)
if archive_url:
to_enrich.set("ghostarchive", archive_url)
return True
logger.warning(f"Ghost Archive failed to archive {url}")
return False

View File

@@ -25,6 +25,9 @@ class HashEnricher(Enricher):
logger.debug(f"Calculating media hashes with algo={self.algorithm}")
for i, m in enumerate(to_enrich.media):
if not m.filename:
logger.warning(f"Skipping hash for media without filename: {m}")
continue
if len(hd := self.calculate_hash(m.filename)):
to_enrich.media[i].set("hash", f"{self.algorithm}:{hd}")

View File

@@ -99,7 +99,10 @@ class InstagramAPIExtractor(Extractor):
result.set_title(user.get("full_name", username)).set("data", user)
if pic_url := user.get("profile_pic_url_hd", user.get("profile_pic_url")):
filename = self.download_from_url(pic_url)
result.add_media(Media(filename=filename), id="profile_picture")
if filename:
result.add_media(Media(filename=filename), id="profile_picture")
else:
logger.warning(f"Failed to download profile picture from {pic_url}")
count_posts = 0
if self.full_profile:
@@ -202,7 +205,10 @@ class InstagramAPIExtractor(Extractor):
if cover_media := h_info.get("cover_media", {}).get("cropped_image_version", {}).get("url"):
filename = self.download_from_url(cover_media)
result.add_media(Media(filename=filename), id=f"cover_media highlight {id}")
if filename:
result.add_media(Media(filename=filename), id=f"cover_media highlight {id}")
else:
logger.warning(f"Failed to download cover media from {cover_media}")
items = h_info.get("items", [])[::-1] # newest to oldest
items = items[: min(max_to_download, len(items))]
@@ -345,7 +351,10 @@ class InstagramAPIExtractor(Extractor):
image_media = None
if image_url := item.get("thumbnail_url"):
filename = self.download_from_url(image_url, verbose=False)
image_media = Media(filename=filename)
if filename:
image_media = Media(filename=filename)
else:
logger.warning(f"Failed to download thumbnail from {image_url}")
# retrieve video info
best_id = item.get("id", item.get("pk"))
@@ -357,16 +366,19 @@ class InstagramAPIExtractor(Extractor):
if video_url := item.get("video_url"):
filename = self.download_from_url(video_url, verbose=False)
video_media = Media(filename=filename)
if taken_at:
video_media.set("date", taken_at)
if code:
video_media.set("url", f"https://www.instagram.com/p/{code}")
if caption_text:
video_media.set("text", caption_text)
video_media.set("preview", [image_media])
video_media.set("data", [item])
return item, video_media, f"{context or 'video'} {best_id}"
if filename:
video_media = Media(filename=filename)
if taken_at:
video_media.set("date", taken_at)
if code:
video_media.set("url", f"https://www.instagram.com/p/{code}")
if caption_text:
video_media.set("text", caption_text)
video_media.set("preview", [image_media])
video_media.set("data", [item])
return item, video_media, f"{context or 'video'} {best_id}"
else:
logger.warning(f"Failed to download video from {video_url}")
elif image_media:
if taken_at:
image_media.set("date", taken_at)

View File

@@ -25,6 +25,9 @@ class MetaEnricher(Enricher):
logger.debug(f"Calculating archive file sizes for {len(to_enrich.media)} media files")
total_size = 0
for media in to_enrich.get_all_media():
if not media.filename:
logger.warning(f"Skipping file size for media without filename: {media}")
continue
file_stats = os.stat(media.filename)
media.set("bytes", file_stats.st_size)
media.set("size", self.human_readable_bytes(file_stats.st_size))

View File

@@ -49,10 +49,18 @@ class TelegramExtractor(Extractor):
if not len(image_urls):
return False
for img_url in image_urls:
result.add_media(Media(self.download_from_url(img_url)))
filename = self.download_from_url(img_url)
if not filename:
logger.warning(f"Failed to download image from {img_url}")
continue
result.add_media(Media(filename))
else:
video_url = video.get("src")
m_video = Media(self.download_from_url(video_url))
video_filename = self.download_from_url(video_url)
if not video_filename:
logger.warning(f"Failed to download video from {video_url}")
return False
m_video = Media(video_filename)
# extract duration from HTML
try:
duration = s.find_all("time")[0].contents[0]

View File

@@ -1,3 +1,4 @@
import asyncio
import os
import shutil
import re
@@ -53,6 +54,16 @@ class TelethonExtractor(Extractor):
logger.debug(f"Making a copy of the session file {base_session_filepath} to {self.session_file}.session")
shutil.copy(base_session_filepath, f"{self.session_file}.session")
# ensure a running event loop exists (Needed when used by Celery workers which may close the default one)
try:
loop = asyncio.get_event_loop()
if loop.is_closed():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# initiate the client
self.client = TelegramClient(self.session_file, self.api_id, self.api_hash)
@@ -190,6 +201,9 @@ class TelethonExtractor(Extractor):
)
for i, om_url in enumerate(other_media_urls):
filename = self.download_from_url(om_url, f"{chat}_{group_id}_{i}")
if not filename:
logger.warning(f"Failed to download media from {om_url}")
continue
result.add_media(Media(filename=filename), id=f"{group_id}_{i}")
filename_dest = os.path.join(self.tmp_dir, f"{chat}_{group_id}", str(mp.id))

View File

@@ -114,6 +114,9 @@ class TwitterApiExtractor(Extractor):
logger.info(f"Found media {media}")
ext = mimetypes.guess_extension(mimetype)
media.filename = self.download_from_url(media.get("src"), f"{slugify(url)}_{i}{ext}")
if not media.filename:
logger.warning(f"Failed to download media from {media.get('src')}")
continue
result.add_media(media)
result.set_content(

View File

@@ -120,6 +120,9 @@ def ydl_entry_to_filename(ydl, entry: dict) -> str:
directory = os.path.dirname(base_filename) # '/get/path/to'
basename = os.path.basename(base_filename) # 'file'
for f in os.listdir(directory):
# skip incomplete downloads left behind by yt-dlp
if f.endswith(".part"):
continue
if (
f.startswith(basename)
or (entry_url and os.path.splitext(f)[0] in entry_url)

View File

@@ -0,0 +1,277 @@
import pytest
import requests
import os
from unittest.mock import MagicMock
from auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher import GhostarchiveEnricher
CI = os.getenv("GITHUB_ACTIONS", "") == "true"
# sample HTML responses for mocking
SEARCH_HTML_FOUND = """
<html><body>
<h1>Archives for https://example.com</h1>
<table>
<tr><td><a href="http://ghostarchive.org/archive/Abc12">https://example.com</a></td></tr>
</table>
</body></html>
"""
SEARCH_HTML_NOT_FOUND = """
<html><body>
<h1>Archives for https://example.com</h1>
<p>Page 0 out of 0</p>
<p>No archives for that site.</p>
</body></html>
"""
SAVE_RESPONSE_HTML_WITH_LINK = """
<html><body>
<h1>Archive saved</h1>
<a href="/archive/Xyz99">View archive</a>
</body></html>
"""
ENRICHER_CONFIG = {
"timeout": 120,
"check_existing": True,
"proxy_http": None,
"proxy_https": None,
}
class TestGhostarchiveEnricher:
"""Tests for Ghost Archive Enricher"""
@pytest.fixture(autouse=True)
def setup_enricher(self, setup_module):
self.enricher: GhostarchiveEnricher = setup_module("ghostarchive_enricher", ENRICHER_CONFIG)
def test_search_existing_found(self, mocker):
"""When an existing archive is found, it should be returned."""
mock_response = mocker.Mock()
mock_response.status_code = 200
mock_response.text = SEARCH_HTML_FOUND
mocker.patch(
"auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.requests.get", return_value=mock_response
)
result = self.enricher._search_existing("https://example.com")
assert result == "https://ghostarchive.org/archive/Abc12"
def test_search_existing_not_found(self, mocker):
"""When no existing archive is found, None should be returned."""
mock_response = mocker.Mock()
mock_response.status_code = 200
mock_response.text = SEARCH_HTML_NOT_FOUND
mocker.patch(
"auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.requests.get", return_value=mock_response
)
result = self.enricher._search_existing("https://example.com")
assert result is None
def test_search_existing_request_error(self, mocker):
"""When search request fails, None should be returned."""
mocker.patch(
"auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.requests.get",
side_effect=requests.exceptions.ConnectionError("connection failed"),
)
result = self.enricher._search_existing("https://example.com")
assert result is None
def test_search_existing_non_200(self, mocker):
"""When search returns non-200, None should be returned."""
mock_response = mocker.Mock()
mock_response.status_code = 503
mocker.patch(
"auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.requests.get", return_value=mock_response
)
result = self.enricher._search_existing("https://example.com")
assert result is None
def test_submit_url_success_redirect(self, mocker):
"""Successful submission via headless browser should return archive URL."""
mock_sb = MagicMock()
mock_sb.get_current_url.return_value = "https://ghostarchive.org/archive/NewId1"
mock_sb.__enter__ = MagicMock(return_value=mock_sb)
mock_sb.__exit__ = MagicMock(return_value=False)
mocker.patch("auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.SB", return_value=mock_sb)
result = self.enricher._submit_url("https://example.com")
assert result == "https://ghostarchive.org/archive/NewId1"
mock_sb.type.assert_called_once()
mock_sb.click.assert_called_once()
def test_submit_url_success_redirect_strips_query(self, mocker):
"""Redirect URL query params should be stripped."""
mock_sb = MagicMock()
mock_sb.get_current_url.return_value = "https://ghostarchive.org/archive/NewId1?wr=false"
mock_sb.__enter__ = MagicMock(return_value=mock_sb)
mock_sb.__exit__ = MagicMock(return_value=False)
mocker.patch("auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.SB", return_value=mock_sb)
result = self.enricher._submit_url("https://example.com")
assert result == "https://ghostarchive.org/archive/NewId1"
def test_submit_url_success_html_fallback(self, mocker):
"""When browser doesn't redirect, should parse page source for archive link."""
mock_sb = MagicMock()
mock_sb.get_current_url.return_value = "https://ghostarchive.org/archive2"
mock_sb.get_page_source.return_value = SAVE_RESPONSE_HTML_WITH_LINK
mock_sb.__enter__ = MagicMock(return_value=mock_sb)
mock_sb.__exit__ = MagicMock(return_value=False)
# make timeout=0 so the polling loop exits immediately and falls through to HTML parsing
self.enricher.timeout = 0
mocker.patch("auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.SB", return_value=mock_sb)
result = self.enricher._submit_url("https://example.com")
assert result == "https://ghostarchive.org/archive/Xyz99"
def test_submit_url_browser_error(self, mocker):
"""Browser error during submission should return None."""
mocker.patch(
"auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.SB",
side_effect=Exception("browser failed to start"),
)
result = self.enricher._submit_url("https://example.com")
assert result is None
def test_proxy_configuration(self, mocker):
"""Proxies should be passed to search requests when configured."""
self.enricher.proxy_http = "http://proxy:8080"
self.enricher.proxy_https = "https://proxy:8443"
mock_get = mocker.patch(
"auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.requests.get",
)
mock_response = mocker.Mock()
mock_response.status_code = 200
mock_response.text = SEARCH_HTML_FOUND
mock_get.return_value = mock_response
result = self.enricher._search_existing("https://example.com")
call_kwargs = mock_get.call_args
assert call_kwargs.kwargs.get("proxies") == {"http": "http://proxy:8080", "https": "https://proxy:8443"}
assert result is not None
def test_parse_archive_url_with_replay_links(self):
"""Parser should ignore /replay/ links and only return /archive/ links."""
html = """
<html><body>
<a href="/archive/replay/w/id-abc/mp_/https://example.com">replay</a>
<a href="/archive/Valid1">valid</a>
</body></html>
"""
result = self.enricher._parse_archive_url(html)
assert result == "https://ghostarchive.org/archive/Valid1"
def test_parse_archive_url_no_links(self):
"""Parser should return None when no archive links found."""
html = "<html><body><p>No archive here</p></body></html>"
result = self.enricher._parse_archive_url(html)
assert result is None
def test_enrich_sets_ghostarchive_on_metadata(self, mocker, make_item):
"""enrich() should set 'ghostarchive' key on the metadata object."""
mocker.patch.object(self.enricher, "_search_existing", return_value="https://ghostarchive.org/archive/Enr1")
item = make_item("https://example.com")
result = self.enricher.enrich(item)
assert result is True
assert item.get("ghostarchive") == "https://ghostarchive.org/archive/Enr1"
def test_enrich_skips_if_already_enriched(self, mocker, make_item):
"""enrich() should skip if ghostarchive key is already set."""
mock_search = mocker.patch.object(self.enricher, "_search_existing")
item = make_item("https://example.com", ghostarchive="https://ghostarchive.org/archive/Old1")
result = self.enricher.enrich(item)
assert result is True
mock_search.assert_not_called()
def test_enrich_returns_false_on_failure(self, mocker, make_item):
"""enrich() should return False when both search and submit fail."""
mocker.patch.object(self.enricher, "_search_existing", return_value=None)
mocker.patch.object(self.enricher, "_submit_url", return_value=None)
item = make_item("https://example.com")
result = self.enricher.enrich(item)
assert result is False
def test_enrich_skips_auth_wall(self, mocker, make_item):
"""enrich() should skip URLs behind auth walls."""
mocker.patch(
"auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.UrlUtil.is_auth_wall", return_value=True
)
item = make_item("https://example.com/login")
result = self.enricher.enrich(item)
assert result is False
def test_enrich_with_existing_archive(self, mocker, make_item):
"""enrich() should use existing archive when check_existing is True."""
mocker.patch.object(self.enricher, "_search_existing", return_value="https://ghostarchive.org/archive/Exist1")
mock_submit = mocker.patch.object(self.enricher, "_submit_url")
item = make_item("https://example.com")
result = self.enricher.enrich(item)
assert result is True
assert item.get("ghostarchive") == "https://ghostarchive.org/archive/Exist1"
mock_submit.assert_not_called()
def test_enrich_submits_when_no_existing(self, mocker, make_item):
"""enrich() should submit URL when no existing archive found."""
mocker.patch.object(self.enricher, "_search_existing", return_value=None)
mocker.patch.object(self.enricher, "_submit_url", return_value="https://ghostarchive.org/archive/New42")
item = make_item("https://example.com")
result = self.enricher.enrich(item)
assert result is True
assert item.get("ghostarchive") == "https://ghostarchive.org/archive/New42"
def test_enrich_skips_check_existing_when_disabled(self, mocker, make_item):
"""enrich() should skip search when check_existing is False."""
self.enricher.check_existing = False
mock_search = mocker.patch.object(self.enricher, "_search_existing")
mocker.patch.object(self.enricher, "_submit_url", return_value="https://ghostarchive.org/archive/Direct1")
item = make_item("https://example.com")
result = self.enricher.enrich(item)
assert result is True
mock_search.assert_not_called()
@pytest.mark.download
def test_real_search_existing(self, setup_module):
"""Integration test: search for an existing archive on Ghost Archive."""
enricher = setup_module("ghostarchive_enricher", ENRICHER_CONFIG)
# example.com is commonly archived
result = enricher._search_existing("https://example.com")
# we just check it doesn't crash; result may or may not be found
assert result is None or result.startswith("https://ghostarchive.org/archive/")
@pytest.mark.download
@pytest.mark.skipif(CI, reason="Avoid submitting a real task on every CI run")
def test_real_submit_example_com(self, setup_module, make_item):
"""Integration test: submit example.com to Ghost Archive and verify enrichment."""
enricher = setup_module("ghostarchive_enricher", ENRICHER_CONFIG)
item = make_item("https://example.com")
result = enricher.enrich(item)
assert result is True
archive_url = item.get("ghostarchive")
assert archive_url is not None
assert archive_url.startswith("https://ghostarchive.org/archive/")

View File

@@ -53,6 +53,7 @@ class TestAntibotExtractorEnricher(TestExtractorBase):
}
@pytest.mark.download
@pytest.mark.flaky(reruns=2, reruns_delay=5)
@pytest.mark.parametrize(
"url,in_title,in_text,image_count,video_count,skip_ci",
[
@@ -128,6 +129,7 @@ class TestAntibotExtractorEnricher(TestExtractorBase):
item = make_item(url)
result = self.extractor.download(item)
assert result, f"download() returned {result!r} — Selenium may have failed (e.g., window close timeout)"
assert result.status == "antibot", "Expected status to be 'antibot'"
# Check title contains all required words (case-insensitive)

View File

@@ -1,3 +1,4 @@
import asyncio
import os
from datetime import date
@@ -60,3 +61,53 @@ def test_valid_url_regex(url, expected, get_lazy_module):
def test_invite_pattern_regex(invite, expected, get_lazy_module):
match = TelethonExtractor.invite_pattern.search(invite)
assert bool(match) == expected
def test_setup_with_closed_event_loop(get_lazy_module, tmp_path, mocker):
"""
Simulate the Celery worker scenario where the asyncio event loop is closed
before setup() runs. The fix should create a new event loop so that
TelegramClient.start() does not raise 'Event loop is closed'.
"""
# create a session file so setup doesn't fail on missing file
session_file = tmp_path / "test.session"
session_file.touch()
# close the current event loop to simulate a Celery worker environment
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.close()
lazy_module = get_lazy_module("telethon_extractor")
module = lazy_module.load(
{"telethon_extractor": {"session_file": str(session_file), "api_id": 123, "api_hash": "ABC"}}
)
# setup should have succeeded and a new open event loop should exist
new_loop = asyncio.get_event_loop()
assert not new_loop.is_closed()
assert module.client is not None
def test_setup_with_no_event_loop(get_lazy_module, tmp_path, mocker):
"""
Simulate the scenario where there is no current event loop at all
(e.g. running in a non-main thread). The fix should create one.
"""
session_file = tmp_path / "test.session"
session_file.touch()
# Remove the current event loop entirely
# In Python 3.12+, get_event_loop() in a non-main thread raises RuntimeError
mocker.patch("asyncio.get_event_loop", side_effect=RuntimeError("no current event loop"))
new_loop_mock = mocker.MagicMock()
new_loop_mock.is_closed.return_value = False
mocker.patch("asyncio.new_event_loop", return_value=new_loop_mock)
set_loop = mocker.patch("asyncio.set_event_loop")
lazy_module = get_lazy_module("telethon_extractor")
lazy_module.load({"telethon_extractor": {"session_file": str(session_file), "api_id": 123, "api_hash": "ABC"}})
# a new event loop should have been created and set
asyncio.new_event_loop.assert_called_once()
set_loop.assert_called_once_with(new_loop_mock)

View File

@@ -86,6 +86,22 @@ def test_media_management(basic_metadata, media_file):
assert basic_metadata.get_media_by_id("m1") == media1
def test_remove_duplicate_skips_missing_files(basic_metadata, media_file, tmp_path):
"""Missing files should be dropped instead of crashing with FileNotFoundError."""
real_file = tmp_path / "exists.txt"
real_file.write_text("content")
valid = media_file(filename=str(real_file), hash_value="abc")
missing = media_file(filename="/nonexistent/path/gone.mp4")
basic_metadata.add_media(valid, "valid")
basic_metadata.add_media(missing, "missing")
assert len(basic_metadata.media) == 2
basic_metadata.remove_duplicate_media_by_hash()
assert len(basic_metadata.media) == 1
assert basic_metadata.get_media_by_id("valid") == valid
def test_success():
m = Metadata()
assert not m.is_success()

View File

@@ -0,0 +1,259 @@
"""
Tests for handling Media objects with None filename.
When download_from_url fails, it returns None. Various enrichers and
the metadata deduplication logic must gracefully handle Media objects
where filename is None, rather than crashing with TypeError.
"""
from datetime import datetime, timezone
from unittest.mock import MagicMock
import pytest
from auto_archiver.core.metadata import Metadata, Media
from auto_archiver.modules.hash_enricher import HashEnricher
from auto_archiver.modules.meta_enricher import MetaEnricher
# ── HashEnricher ──────────────────────────────────────────────────────
class TestHashEnricherNoneFilename:
"""hash_enricher should skip media with None filename without crashing."""
@pytest.fixture(autouse=True)
def setup(self, setup_module):
self.enricher = setup_module(HashEnricher, {"algorithm": "SHA-256", "chunksize": 100})
def test_skips_none_filename(self):
m = Metadata().set_url("https://example.com")
media = Media(filename=None)
media.set("src", "https://example.com/video.mp4")
m.add_media(media)
# Should not raise
self.enricher.enrich(m)
# No hash should be set
assert m.media[0].get("hash") is None
def test_hashes_valid_skips_none(self, tmp_path):
"""Mix of valid and None-filename media: only valid ones get hashed."""
valid_file = tmp_path / "test.txt"
valid_file.write_text("hello world")
m = Metadata().set_url("https://example.com")
m.add_media(Media(filename=str(valid_file)))
m.add_media(Media(filename=None))
self.enricher.enrich(m)
assert m.media[0].get("hash") is not None
assert m.media[1].get("hash") is None
def test_all_none_filenames(self):
"""All media have None filename enricher should not crash."""
m = Metadata().set_url("https://example.com")
m.add_media(Media(filename=None))
m.add_media(Media(filename=None))
self.enricher.enrich(m)
assert len(m.media) == 2
for media in m.media:
assert media.get("hash") is None
# ── MetaEnricher ──────────────────────────────────────────────────────
class TestMetaEnricherNoneFilename:
"""meta_enricher should skip media with None filename without crashing."""
@pytest.fixture(autouse=True)
def setup(self, setup_module):
self.enricher = setup_module(MetaEnricher, {})
def test_skips_none_filename(self):
m = Metadata().set_url("https://example.com")
m.set("_processed_at", datetime.now(timezone.utc))
media = Media(filename=None)
media.set("src", "https://example.com/video.mp4")
m.add_media(media)
# Should not raise
self.enricher.enrich(m)
assert m.get("total_bytes") == 0
def test_sizes_valid_skips_none(self, tmp_path):
"""Mix of valid and None-filename media: only valid ones get sized."""
valid_file = tmp_path / "test.txt"
valid_file.write_text("A" * 500)
m = Metadata().set_url("https://example.com")
m.set("_processed_at", datetime.now(timezone.utc))
m.add_media(Media(filename=str(valid_file)))
m.add_media(Media(filename=None))
self.enricher.enrich(m)
assert m.media[0].get("bytes") == 500
assert m.media[1].get("bytes") is None
assert m.get("total_bytes") == 500
# ── Metadata.remove_duplicate_media_by_hash ───────────────────────────
class TestRemoveDuplicateMediaNoneFilename:
"""remove_duplicate_media_by_hash should keep media with None filename."""
def test_none_filename_kept(self):
m = Metadata().set_url("https://example.com")
none_media = Media(filename=None)
none_media.set("src", "https://example.com/video.mp4")
m.add_media(none_media)
m.remove_duplicate_media_by_hash()
assert len(m.media) == 1
assert m.media[0].filename is None
def test_none_and_valid_mixed(self, tmp_path):
"""None-filename media is kept alongside valid-filename media."""
valid_file = tmp_path / "test.txt"
valid_file.write_text("content")
m = Metadata().set_url("https://example.com")
m.add_media(Media(filename=str(valid_file)))
none_media = Media(filename=None)
none_media.set("src", "https://example.com/video.mp4")
m.add_media(none_media)
m.remove_duplicate_media_by_hash()
assert len(m.media) == 2
def test_multiple_none_filename_all_kept(self):
"""Multiple None-filename media are all kept (can't deduplicate without file)."""
m = Metadata().set_url("https://example.com")
m.add_media(Media(filename=None))
m.add_media(Media(filename=None))
m.remove_duplicate_media_by_hash()
assert len(m.media) == 2
# ── Twitter dropin create_metadata ────────────────────────────────────
class TestTwitterDropinNoneFilename:
"""Twitter dropin should skip media when download_from_url returns None."""
@pytest.fixture
def twitter_dropin(self):
from auto_archiver.modules.generic_extractor.twitter import Twitter
return Twitter()
def test_create_metadata_skips_failed_photo_download(self, twitter_dropin):
"""When download_from_url returns None for a photo, it's not added to media."""
tweet = {
"user": {"name": "Test User"},
"created_at": "Sun Feb 08 18:45:00 +0000 2026",
"full_text": "Test tweet with photo",
"entities": {
"media": [
{"type": "photo", "media_url_https": "https://pbs.twimg.com/media/test.jpg"},
]
},
}
mock_archiver = MagicMock()
mock_archiver.download_from_url.return_value = None # simulate failed download
result = twitter_dropin.create_metadata(tweet, None, mock_archiver, "https://x.com/test/status/123")
# The result should have no media since the download failed
assert len(result.media) == 0
def test_create_metadata_skips_failed_video_download(self, twitter_dropin):
"""When download_from_url returns None for a video, it's not added to media."""
tweet = {
"user": {"name": "Test User"},
"created_at": "Sun Feb 08 18:45:00 +0000 2026",
"full_text": "Test tweet with video",
"entities": {
"media": [
{
"type": "video",
"video_info": {
"variants": [
{
"url": "https://video.twimg.com/vid/1280x720/test.mp4",
"content_type": "video/mp4",
},
]
},
},
]
},
}
mock_archiver = MagicMock()
mock_archiver.download_from_url.return_value = None
result = twitter_dropin.create_metadata(tweet, None, mock_archiver, "https://x.com/test/status/123")
assert len(result.media) == 0
def test_create_metadata_keeps_successful_download(self, twitter_dropin, tmp_path):
"""When download_from_url succeeds, media is added."""
tweet = {
"user": {"name": "Test User"},
"created_at": "Sun Feb 08 18:45:00 +0000 2026",
"full_text": "Test tweet with photo",
"entities": {
"media": [
{"type": "photo", "media_url_https": "https://pbs.twimg.com/media/test.jpg"},
]
},
}
test_file = tmp_path / "test.jpg"
test_file.write_text("fake image data")
mock_archiver = MagicMock()
mock_archiver.download_from_url.return_value = str(test_file)
result = twitter_dropin.create_metadata(tweet, None, mock_archiver, "https://x.com/test/status/123")
assert len(result.media) == 1
assert result.media[0].filename == str(test_file)
def test_create_metadata_mixed_downloads(self, twitter_dropin, tmp_path):
"""One download succeeds, one fails only successful one is kept."""
tweet = {
"user": {"name": "Test User"},
"created_at": "Sun Feb 08 18:45:00 +0000 2026",
"full_text": "Test tweet with two photos",
"entities": {
"media": [
{"type": "photo", "media_url_https": "https://pbs.twimg.com/media/test1.jpg"},
{"type": "photo", "media_url_https": "https://pbs.twimg.com/media/test2.jpg"},
]
},
}
test_file = tmp_path / "test1.jpg"
test_file.write_text("fake image data")
mock_archiver = MagicMock()
# First call succeeds, second fails
mock_archiver.download_from_url.side_effect = [str(test_file), None]
result = twitter_dropin.create_metadata(tweet, None, mock_archiver, "https://x.com/test/status/123")
assert len(result.media) == 1
assert result.media[0].filename == str(test_file)

View File

@@ -1,5 +1,6 @@
import pytest
from argparse import ArgumentParser, ArgumentTypeError
from requests.exceptions import SSLError
from auto_archiver.core.orchestrator import ArchivingOrchestrator
from auto_archiver.version import __version__
from auto_archiver.core.config import read_yaml, store_yaml
@@ -256,3 +257,34 @@ def test_load_failed_extractor_cleanup(test_args, mocker, caplog):
assert "Error during setup of modules: Test exception" in caplog.text
# make sure the 'cleanup' is called
assert "cleanup" in caplog.text
def test_check_for_updates_ssl_error(orchestrator, mocker):
"""check_for_updates should not raise when the HTTP request fails."""
mocker.patch(
"auto_archiver.core.orchestrator.requests.get",
side_effect=SSLError("SSL handshake failed"),
)
# should not raise
orchestrator.check_for_updates()
def test_check_for_updates_timeout(orchestrator, mocker):
"""check_for_updates should not raise on connection timeout."""
from requests.exceptions import ConnectionError
mocker.patch(
"auto_archiver.core.orchestrator.requests.get",
side_effect=ConnectionError("Connection refused"),
)
orchestrator.check_for_updates()
def test_check_for_updates_new_version_available(orchestrator, mocker):
"""check_for_updates should not raise when a newer version exists."""
mocker.patch(
"auto_archiver.core.orchestrator.requests.get",
return_value=mocker.Mock(json=lambda: {"info": {"version": "99.0.0"}}),
)
# should complete without error
orchestrator.check_for_updates()

View File

@@ -14,6 +14,7 @@ from auto_archiver.utils.misc import (
calculate_file_hash,
random_str,
get_timestamp,
ydl_entry_to_filename,
)
@@ -139,3 +140,47 @@ class TestMiscUtils:
def test_invalid_timestamp_returns_none(self):
assert get_timestamp("invalid-date") is None
class TestYdlEntryToFilename:
"""Tests for ydl_entry_to_filename, especially .part file filtering."""
def _make_mock_ydl(self, prepared_filename):
class MockYDL:
def prepare_filename(self, entry):
return prepared_filename
return MockYDL()
def test_returns_exact_file_if_exists(self, tmp_path):
video = tmp_path / "video.mp4"
video.write_bytes(b"data")
ydl = self._make_mock_ydl(str(video))
assert ydl_entry_to_filename(ydl, {}) == str(video)
def test_skips_part_file_returns_complete(self, tmp_path):
"""Simulates yt-dlp leaving a .part file from a failed format
while a complete .webm exists."""
(tmp_path / "f5U3IKfoSYs.f399.mp4.part").write_bytes(b"incomplete")
webm = tmp_path / "f5U3IKfoSYs.webm"
webm.write_bytes(b"complete video")
# ydl.prepare_filename returns the expected .mp4 which doesn't exist
ydl = self._make_mock_ydl(str(tmp_path / "f5U3IKfoSYs.mp4"))
result = ydl_entry_to_filename(ydl, {})
assert result == str(webm)
assert not result.endswith(".part")
def test_skips_part_file_returns_false_if_no_other_match(self, tmp_path):
"""Only a .part file exists — should return False."""
(tmp_path / "video.f399.mp4.part").write_bytes(b"incomplete")
ydl = self._make_mock_ydl(str(tmp_path / "video.mp4"))
assert ydl_entry_to_filename(ydl, {}) is False
def test_returns_false_when_no_files_match(self, tmp_path):
(tmp_path / "unrelated.txt").write_bytes(b"data")
ydl = self._make_mock_ydl(str(tmp_path / "video.mp4"))
assert ydl_entry_to_filename(ydl, {}) is False