mirror of
https://github.com/bellingcat/auto-archiver.git
synced 2026-06-10 20:28:28 +03:00
Compare commits
27 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9e651bb849 | ||
|
|
6581bbe139 | ||
|
|
e633be1721 | ||
|
|
bc06de8e5c | ||
|
|
20fddce3a3 | ||
|
|
6efa439cdb | ||
|
|
ef77d1fc86 | ||
|
|
a57a5ee005 | ||
|
|
2582f567ac | ||
|
|
4e5c1a6218 | ||
|
|
12d9c469b2 | ||
|
|
792838f1a1 | ||
|
|
17c4ae15eb | ||
|
|
a08af07348 | ||
|
|
e54077f4e8 | ||
|
|
319c0528da | ||
|
|
ae0e53e434 | ||
|
|
82fc786d56 | ||
|
|
aa65299844 | ||
|
|
1b69ec1f00 | ||
|
|
304e5d40b1 | ||
|
|
3194fee95d | ||
|
|
0040810e2e | ||
|
|
63cfe34e23 | ||
|
|
23a88e3cf4 | ||
|
|
3cac160cc1 | ||
|
|
e9a92272c5 |
@@ -1,18 +1,17 @@
|
|||||||
FROM webrecorder/browsertrix-crawler:1.11.4 AS base
|
FROM webrecorder/browsertrix-crawler:1.12.4 AS base
|
||||||
|
|
||||||
ENV RUNNING_IN_DOCKER=1 \
|
ENV RUNNING_IN_DOCKER=1 \
|
||||||
LANG=C.UTF-8 \
|
LANG=C.UTF-8 \
|
||||||
LC_ALL=C.UTF-8 \
|
LC_ALL=C.UTF-8 \
|
||||||
PYTHONDONTWRITEBYTECODE=1 \
|
PYTHONDONTWRITEBYTECODE=1 \
|
||||||
PYTHONFAULTHANDLER=1 \
|
PYTHONFAULTHANDLER=1
|
||||||
PATH="/root/.local/bin:$PATH"
|
|
||||||
|
|
||||||
|
|
||||||
ARG TARGETARCH
|
ARG TARGETARCH
|
||||||
|
|
||||||
# Installing system dependencies
|
# Installing system dependencies
|
||||||
RUN apt-get update && \
|
RUN apt-get update && \
|
||||||
apt-get install -y --no-install-recommends gcc ffmpeg fonts-noto exiftool python3-tk
|
apt-get install -y --no-install-recommends gcc ffmpeg fonts-noto exiftool python3-tk
|
||||||
|
|
||||||
# Poetry and runtime
|
# Poetry and runtime
|
||||||
FROM base AS runtime
|
FROM base AS runtime
|
||||||
|
|||||||
1428
poetry.lock
generated
1428
poetry.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
|
|||||||
|
|
||||||
[project]
|
[project]
|
||||||
name = "auto-archiver"
|
name = "auto-archiver"
|
||||||
version = "1.2.3"
|
version = "1.2.7"
|
||||||
description = "Automatically archive links to videos, images, and social media content from Google Sheets (and more)."
|
description = "Automatically archive links to videos, images, and social media content from Google Sheets (and more)."
|
||||||
|
|
||||||
requires-python = ">=3.10,<3.13"
|
requires-python = ">=3.10,<3.13"
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ Key Functionalities:
|
|||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
import hashlib
|
import hashlib
|
||||||
|
import os
|
||||||
from typing import Any, List, Union, Dict
|
from typing import Any, List, Union, Dict
|
||||||
from dataclasses import dataclass, field
|
from dataclasses import dataclass, field
|
||||||
from dataclasses_json import dataclass_json
|
from dataclasses_json import dataclass_json
|
||||||
@@ -181,8 +182,14 @@ class Metadata:
|
|||||||
media_hashes = set()
|
media_hashes = set()
|
||||||
new_media = []
|
new_media = []
|
||||||
for m in self.media:
|
for m in self.media:
|
||||||
|
if not m.filename:
|
||||||
|
new_media.append(m)
|
||||||
|
continue
|
||||||
h = m.get("hash")
|
h = m.get("hash")
|
||||||
if not h:
|
if not h:
|
||||||
|
if not os.path.exists(m.filename):
|
||||||
|
logger.warning(f"Skipping missing media file: {m.filename}")
|
||||||
|
continue
|
||||||
h = calculate_hash_in_chunks(hashlib.sha256(), int(1.6e7), m.filename)
|
h = calculate_hash_in_chunks(hashlib.sha256(), int(1.6e7), m.filename)
|
||||||
if len(h) and h in media_hashes:
|
if len(h) and h in media_hashes:
|
||||||
continue
|
continue
|
||||||
|
|||||||
@@ -467,7 +467,11 @@ Here's how that would look: \n\nsteps:\n extractors:\n - [your_extractor_name_
|
|||||||
return self.setup_complete_parser(basic_config, yaml_config, unused_args)
|
return self.setup_complete_parser(basic_config, yaml_config, unused_args)
|
||||||
|
|
||||||
def check_for_updates(self):
|
def check_for_updates(self):
|
||||||
response = requests.get("https://pypi.org/pypi/auto-archiver/json").json()
|
try:
|
||||||
|
response = requests.get("https://pypi.org/pypi/auto-archiver/json", timeout=10).json()
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug(f"Unable to check for updates: {e}")
|
||||||
|
return
|
||||||
latest_version = version.parse(response["info"]["version"])
|
latest_version = version.parse(response["info"]["version"])
|
||||||
current_version = version.parse(__version__)
|
current_version = version.parse(__version__)
|
||||||
# check version compared to current version
|
# check version compared to current version
|
||||||
|
|||||||
@@ -73,6 +73,7 @@ class AntibotExtractorEnricher(Extractor, Enricher):
|
|||||||
if self.enrich(result):
|
if self.enrich(result):
|
||||||
result.status = "antibot"
|
result.status = "antibot"
|
||||||
return result
|
return result
|
||||||
|
return False
|
||||||
|
|
||||||
def _prepare_user_data_dir(self):
|
def _prepare_user_data_dir(self):
|
||||||
if self.user_data_dir:
|
if self.user_data_dir:
|
||||||
|
|||||||
@@ -39,12 +39,18 @@ class Bluesky(GenericDropin):
|
|||||||
media_url = "https://bsky.social/xrpc/com.atproto.sync.getBlob?cid={}&did={}"
|
media_url = "https://bsky.social/xrpc/com.atproto.sync.getBlob?cid={}&did={}"
|
||||||
for image_media in image_medias:
|
for image_media in image_medias:
|
||||||
url = media_url.format(image_media["image"]["ref"]["$link"], post["author"]["did"])
|
url = media_url.format(image_media["image"]["ref"]["$link"], post["author"]["did"])
|
||||||
image_media = archiver.download_from_url(url)
|
filename = archiver.download_from_url(url)
|
||||||
media.append(Media(image_media))
|
if filename:
|
||||||
|
media.append(Media(filename))
|
||||||
|
else:
|
||||||
|
logger.warning(f"Failed to download Bluesky image from {url}")
|
||||||
for video_media in video_medias:
|
for video_media in video_medias:
|
||||||
url = media_url.format(video_media["ref"]["$link"], post["author"]["did"])
|
url = media_url.format(video_media["ref"]["$link"], post["author"]["did"])
|
||||||
video_media = archiver.download_from_url(url)
|
filename = archiver.download_from_url(url)
|
||||||
media.append(Media(video_media))
|
if filename:
|
||||||
|
media.append(Media(filename))
|
||||||
|
else:
|
||||||
|
logger.warning(f"Failed to download Bluesky video from {url}")
|
||||||
return media
|
return media
|
||||||
|
|
||||||
def _get_post_data(self, post: dict) -> dict:
|
def _get_post_data(self, post: dict) -> dict:
|
||||||
|
|||||||
@@ -204,8 +204,11 @@ class GenericExtractor(Extractor):
|
|||||||
if thumbnail_url:
|
if thumbnail_url:
|
||||||
try:
|
try:
|
||||||
cover_image_path = self.download_from_url(thumbnail_url)
|
cover_image_path = self.download_from_url(thumbnail_url)
|
||||||
media = Media(cover_image_path)
|
if cover_image_path:
|
||||||
metadata.add_media(media, id="cover")
|
media = Media(cover_image_path)
|
||||||
|
metadata.add_media(media, id="cover")
|
||||||
|
else:
|
||||||
|
logger.warning(f"Failed to download cover image from {thumbnail_url}")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Could not download cover image {thumbnail_url}: {e}")
|
logger.error(f"Could not download cover image {thumbnail_url}: {e}")
|
||||||
|
|
||||||
@@ -572,6 +575,8 @@ class GenericExtractor(Extractor):
|
|||||||
"--live-from-start" if self.live_from_start else "--no-live-from-start",
|
"--live-from-start" if self.live_from_start else "--no-live-from-start",
|
||||||
"--postprocessor-args",
|
"--postprocessor-args",
|
||||||
"ffmpeg:-bitexact", # ensure bitexact output to avoid mismatching hashes for same video
|
"ffmpeg:-bitexact", # ensure bitexact output to avoid mismatching hashes for same video
|
||||||
|
"--js-runtimes",
|
||||||
|
"node", # yt-dlp defaults to deno-only; node is available in the base image
|
||||||
]
|
]
|
||||||
|
|
||||||
# proxy handling
|
# proxy handling
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
from typing import Type
|
from typing import Type
|
||||||
|
|
||||||
from auto_archiver.utils import traverse_obj
|
from auto_archiver.utils import traverse_obj
|
||||||
|
from auto_archiver.utils.custom_logger import logger
|
||||||
from auto_archiver.core.metadata import Metadata, Media
|
from auto_archiver.core.metadata import Metadata, Media
|
||||||
from auto_archiver.core.extractor import Extractor
|
from auto_archiver.core.extractor import Extractor
|
||||||
from yt_dlp.extractor.common import InfoExtractor
|
from yt_dlp.extractor.common import InfoExtractor
|
||||||
@@ -58,6 +59,9 @@ class Truth(GenericDropin):
|
|||||||
# add the media
|
# add the media
|
||||||
for media in post.get("media_attachments", []):
|
for media in post.get("media_attachments", []):
|
||||||
filename = archiver.download_from_url(media["url"])
|
filename = archiver.download_from_url(media["url"])
|
||||||
|
if not filename:
|
||||||
|
logger.warning(f"Failed to download media from {media['url']}")
|
||||||
|
continue
|
||||||
result.add_media(Media(filename), id=media.get("id"))
|
result.add_media(Media(filename), id=media.get("id"))
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
|||||||
@@ -157,5 +157,8 @@ class Twitter(GenericDropin):
|
|||||||
mimetype = variant["content_type"]
|
mimetype = variant["content_type"]
|
||||||
ext = mimetypes.guess_extension(mimetype)
|
ext = mimetypes.guess_extension(mimetype)
|
||||||
media.filename = archiver.download_from_url(media.get("src"), f"{slugify(url)}_{i}{ext}")
|
media.filename = archiver.download_from_url(media.get("src"), f"{slugify(url)}_{i}{ext}")
|
||||||
|
if not media.filename:
|
||||||
|
logger.warning(f"Failed to download media from {media.get('src')}")
|
||||||
|
continue
|
||||||
result.add_media(media)
|
result.add_media(media)
|
||||||
return result
|
return result
|
||||||
|
|||||||
@@ -0,0 +1 @@
|
|||||||
|
from .ghostarchive_enricher import GhostarchiveEnricher
|
||||||
@@ -0,0 +1,58 @@
|
|||||||
|
# Manifest for the Ghost Archive enricher module.
# Declares the module's display name, type, entry point, Python dependencies,
# user-facing config options and the Markdown description shown in the docs.
{
    "name": "Ghost Archive Enricher",
    "type": ["enricher"],
    # "<python file>::<class name>" of the implementation
    "entry_point": "ghostarchive_enricher::GhostarchiveEnricher",
    "requires_setup": False,
    "dependencies": {
        "python": ["loguru", "requests", "bs4", "seleniumbase"],
    },
    "configs": {
        # NOTE(review): the description below says archival may take up to
        # 5 minutes, while this default waits 120 seconds — confirm the default
        # is intentional.
        "timeout": {
            "default": 120,
            "type": "int",
            "help": "seconds to wait for successful archive confirmation from Ghost Archive.",
        },
        "check_existing": {
            "default": True,
            "type": "bool",
            "help": "whether to search for an existing archive before submitting a new one.",
        },
        # Proxy settings; the help text scopes them to "requests".
        "proxy_http": {
            "default": None,
            "help": "http proxy to use for requests, eg http://proxy-user:password@proxy-ip:port",
        },
        "proxy_https": {
            "default": None,
            "help": "https proxy to use for requests, eg https://proxy-user:password@proxy-ip:port",
        },
    },
    "description": """
Submits the current URL to [Ghost Archive](https://ghostarchive.org/) for archiving and returns the archived page URL.

Used as an **enricher** to add a Ghost Archive URL to items already extracted by other modules.

### Features
- Archives any public URL using the Ghost Archive service.
- Optionally checks for existing archives before submitting a new one.
- Supports HTTP and HTTPS proxies for requests.
- Parses HTML responses to extract archive URLs (Ghost Archive has no JSON API).

### Important
- This module confirms that Ghost Archive accepted the URL submission and returned an archive link.
It does **not** verify the contents or completeness of the archived page.

### Notes
- Ghost Archive is a free service with no authentication required.
- Archived pages must be smaller than 50 MB (including CSS, fonts, images, etc.).
- Videos are archived up to 360p and must be under 100 MB and shorter than 30 minutes.
- Archival may take up to 5 minutes depending on the queue and page complexity.
- Archived content is stored indefinitely.
- Ghost Archive does not archive pages that require authentication or form submission.

### Limitations
- No official API — this module interacts with the Ghost Archive web interface.
- The submission endpoint is protected by Cloudflare, so a headless browser (SeleniumBase) is used for new submissions.
- Searching for existing archives uses plain HTTP requests and does not require a browser.
- Rate limiting may apply; consider using a delay between requests if archiving many URLs.
""",
}
|
||||||
@@ -0,0 +1,153 @@
|
|||||||
|
import time
|
||||||
|
import re
|
||||||
|
|
||||||
|
import requests
|
||||||
|
from bs4 import BeautifulSoup
|
||||||
|
from seleniumbase import SB
|
||||||
|
from auto_archiver.utils.custom_logger import logger
|
||||||
|
from auto_archiver.utils import url as UrlUtil
|
||||||
|
from auto_archiver.core import Enricher, Metadata
|
||||||
|
|
||||||
|
|
||||||
|
class GhostarchiveEnricher(Enricher):
    """
    Submits the current URL to Ghost Archive (ghostarchive.org) for archiving
    and stores the archived page URL as enrichment metadata.

    Ghost Archive has no official API — this module interacts with the web form
    and parses HTML responses. The submission endpoint is protected by Cloudflare,
    so a headless browser (SeleniumBase) is used for archival submissions, while
    plain HTTP requests are used for searching existing archives.

    Note: this module only confirms that Ghost Archive accepted the submission
    and returned an archive URL. It does not verify that the archived page
    content is complete or correctly rendered.

    Reads the config attributes ``timeout``, ``check_existing``, ``proxy_http``
    and ``proxy_https`` from the instance.
    """

    GHOSTARCHIVE_BASE = "https://ghostarchive.org"
    ARCHIVE_ENDPOINT = f"{GHOSTARCHIVE_BASE}/archive2"
    SEARCH_ENDPOINT = f"{GHOSTARCHIVE_BASE}/search"
    # matches the "/archive/<id>" part of an archived-page URL
    ARCHIVE_URL_PATTERN = re.compile(r"/archive/([A-Za-z0-9]+)")

    def _get_proxies(self) -> dict:
        """Build the ``proxies`` mapping for ``requests`` from the configured proxies.

        Returns:
            A dict with an "http" and/or "https" entry for each proxy that is
            set; empty when neither is configured.
        """
        proxies = {}
        if self.proxy_http:
            proxies["http"] = self.proxy_http
        if self.proxy_https:
            proxies["https"] = self.proxy_https
        return proxies

    def _get_headers(self) -> dict:
        """Return the HTTP headers (a desktop Chrome User-Agent) sent with search requests."""
        return {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        }

    def _normalize_archive_href(self, href: str) -> str | None:
        """Normalize an archive link href to a full HTTPS URL, filtering out replay links.

        Args:
            href: the raw ``href`` attribute value of a link.

        Returns:
            The absolute ``https://ghostarchive.org/...`` URL, or None when the
            href is not an archive link (no "/archive/", contains "/replay/",
            or points at another host).
        """
        if "/archive/" not in href or "/replay/" in href:
            return None
        if href.startswith("/"):
            return f"{self.GHOSTARCHIVE_BASE}{href}"
        # The trailing "/" after the host is required so that look-alike hosts
        # such as "ghostarchive.org.example.com" are not accepted.
        if href.startswith("http://ghostarchive.org/"):
            # Upgrade only the scheme: count=1 leaves any later "http://"
            # (e.g. an embedded original URL) untouched.
            return href.replace("http://", "https://", 1)
        if href.startswith("https://ghostarchive.org/"):
            return href
        return None

    def _search_existing(self, url: str) -> str | None:
        """
        Search Ghost Archive for an existing archive of the given URL.

        Args:
            url: the URL to look up.

        Returns:
            The first archive URL found on the search results page, or None if
            the request fails, returns a non-200 status, or lists no archive link.
        """
        try:
            r = requests.get(
                self.SEARCH_ENDPOINT,
                params={"term": url},
                headers=self._get_headers(),
                proxies=self._get_proxies(),
                timeout=30,
            )
        except requests.exceptions.RequestException as e:
            logger.warning(f"Ghost Archive search failed: {e}")
            return None

        if r.status_code != 200:
            logger.warning(f"Ghost Archive search returned status {r.status_code}")
            return None

        # same link scan as for submission responses
        archive_url = self._parse_archive_url(r.text)
        if archive_url:
            logger.info(f"Found existing Ghost Archive: {archive_url}")
        return archive_url

    def _submit_url(self, url: str) -> str | None:
        """
        Submit a URL to Ghost Archive for archiving using a headless browser.
        The /archive2 endpoint is Cloudflare-protected, requiring JS execution.

        Args:
            url: the URL to archive.

        Returns:
            The archive URL if successful, otherwise None. Any exception raised
            by the browser session is logged and turned into None.
        """
        # NOTE(review): the configured proxies are only passed to `requests`
        # in _search_existing; the browser session here does not use them.
        try:
            with SB(uc=True, headless=True) as sb:
                logger.debug("Opening Ghost Archive homepage in headless browser")
                sb.open(self.GHOSTARCHIVE_BASE)

                # fill in the archive form and submit
                sb.type('input[name="archive"]', url)
                sb.click('input[type="submit"][value="Submit for archival"]')

                # wait for navigation to /archive/{id} or timeout
                start_time = time.time()
                while time.time() - start_time < self.timeout:
                    # Strip the query string BEFORE matching, so a submitted URL
                    # echoed in the query (and itself containing "/archive/<id>")
                    # is not mistaken for a finished archive page.
                    archive_url = sb.get_current_url().split("?")[0]
                    if self.ARCHIVE_URL_PATTERN.search(archive_url):
                        logger.info(f"Ghost Archive saved: {archive_url}")
                        return archive_url
                    time.sleep(2)

                # if we didn't redirect, try parsing the page source
                page_source = sb.get_page_source()
                return self._parse_archive_url(page_source)

        except Exception as e:
            logger.warning(f"Ghost Archive submission failed: {e}")
            return None

    def _parse_archive_url(self, html: str) -> str | None:
        """Parse HTML response to find an archive URL.

        Args:
            html: the page markup to scan.

        Returns:
            The first link that normalizes to a Ghost Archive archive URL, or
            None when the page contains none.
        """
        soup = BeautifulSoup(html, "html.parser")
        for link in soup.find_all("a", href=True):
            archive_url = self._normalize_archive_href(link["href"])
            if archive_url:
                return archive_url
        return None

    def enrich(self, to_enrich: Metadata) -> bool:
        """Add a Ghost Archive URL for the item's URL under the "ghostarchive" key.

        Skips URLs behind an auth wall and items that already carry a
        "ghostarchive" value. When ``check_existing`` is enabled an existing
        archive is searched for first; otherwise, or if none is found, the URL
        is submitted for archiving.

        Args:
            to_enrich: the metadata item to enrich.

        Returns:
            True if an archive URL is (or already was) set on the item,
            False if the URL was skipped or archiving failed.
        """
        url = to_enrich.get_url()
        if UrlUtil.is_auth_wall(url):
            logger.debug("[SKIP] Ghost Archive since url is behind AUTH WALL")
            return False

        if to_enrich.get("ghostarchive"):
            logger.info(f"Ghost Archive enricher had already been executed: {to_enrich.get('ghostarchive')}")
            return True

        # optionally check for existing archive first
        archive_url = None
        if self.check_existing:
            logger.debug(f"Searching Ghost Archive for existing archive of {url}")
            archive_url = self._search_existing(url)

        if not archive_url:
            logger.debug(f"Submitting {url} to Ghost Archive")
            archive_url = self._submit_url(url)

        if archive_url:
            to_enrich.set("ghostarchive", archive_url)
            return True

        logger.warning(f"Ghost Archive failed to archive {url}")
        return False
|
||||||
@@ -25,6 +25,9 @@ class HashEnricher(Enricher):
|
|||||||
logger.debug(f"Calculating media hashes with algo={self.algorithm}")
|
logger.debug(f"Calculating media hashes with algo={self.algorithm}")
|
||||||
|
|
||||||
for i, m in enumerate(to_enrich.media):
|
for i, m in enumerate(to_enrich.media):
|
||||||
|
if not m.filename:
|
||||||
|
logger.warning(f"Skipping hash for media without filename: {m}")
|
||||||
|
continue
|
||||||
if len(hd := self.calculate_hash(m.filename)):
|
if len(hd := self.calculate_hash(m.filename)):
|
||||||
to_enrich.media[i].set("hash", f"{self.algorithm}:{hd}")
|
to_enrich.media[i].set("hash", f"{self.algorithm}:{hd}")
|
||||||
|
|
||||||
|
|||||||
@@ -99,7 +99,10 @@ class InstagramAPIExtractor(Extractor):
|
|||||||
result.set_title(user.get("full_name", username)).set("data", user)
|
result.set_title(user.get("full_name", username)).set("data", user)
|
||||||
if pic_url := user.get("profile_pic_url_hd", user.get("profile_pic_url")):
|
if pic_url := user.get("profile_pic_url_hd", user.get("profile_pic_url")):
|
||||||
filename = self.download_from_url(pic_url)
|
filename = self.download_from_url(pic_url)
|
||||||
result.add_media(Media(filename=filename), id="profile_picture")
|
if filename:
|
||||||
|
result.add_media(Media(filename=filename), id="profile_picture")
|
||||||
|
else:
|
||||||
|
logger.warning(f"Failed to download profile picture from {pic_url}")
|
||||||
|
|
||||||
count_posts = 0
|
count_posts = 0
|
||||||
if self.full_profile:
|
if self.full_profile:
|
||||||
@@ -202,7 +205,10 @@ class InstagramAPIExtractor(Extractor):
|
|||||||
|
|
||||||
if cover_media := h_info.get("cover_media", {}).get("cropped_image_version", {}).get("url"):
|
if cover_media := h_info.get("cover_media", {}).get("cropped_image_version", {}).get("url"):
|
||||||
filename = self.download_from_url(cover_media)
|
filename = self.download_from_url(cover_media)
|
||||||
result.add_media(Media(filename=filename), id=f"cover_media highlight {id}")
|
if filename:
|
||||||
|
result.add_media(Media(filename=filename), id=f"cover_media highlight {id}")
|
||||||
|
else:
|
||||||
|
logger.warning(f"Failed to download cover media from {cover_media}")
|
||||||
|
|
||||||
items = h_info.get("items", [])[::-1] # newest to oldest
|
items = h_info.get("items", [])[::-1] # newest to oldest
|
||||||
items = items[: min(max_to_download, len(items))]
|
items = items[: min(max_to_download, len(items))]
|
||||||
@@ -345,7 +351,10 @@ class InstagramAPIExtractor(Extractor):
|
|||||||
image_media = None
|
image_media = None
|
||||||
if image_url := item.get("thumbnail_url"):
|
if image_url := item.get("thumbnail_url"):
|
||||||
filename = self.download_from_url(image_url, verbose=False)
|
filename = self.download_from_url(image_url, verbose=False)
|
||||||
image_media = Media(filename=filename)
|
if filename:
|
||||||
|
image_media = Media(filename=filename)
|
||||||
|
else:
|
||||||
|
logger.warning(f"Failed to download thumbnail from {image_url}")
|
||||||
|
|
||||||
# retrieve video info
|
# retrieve video info
|
||||||
best_id = item.get("id", item.get("pk"))
|
best_id = item.get("id", item.get("pk"))
|
||||||
@@ -357,16 +366,19 @@ class InstagramAPIExtractor(Extractor):
|
|||||||
|
|
||||||
if video_url := item.get("video_url"):
|
if video_url := item.get("video_url"):
|
||||||
filename = self.download_from_url(video_url, verbose=False)
|
filename = self.download_from_url(video_url, verbose=False)
|
||||||
video_media = Media(filename=filename)
|
if filename:
|
||||||
if taken_at:
|
video_media = Media(filename=filename)
|
||||||
video_media.set("date", taken_at)
|
if taken_at:
|
||||||
if code:
|
video_media.set("date", taken_at)
|
||||||
video_media.set("url", f"https://www.instagram.com/p/{code}")
|
if code:
|
||||||
if caption_text:
|
video_media.set("url", f"https://www.instagram.com/p/{code}")
|
||||||
video_media.set("text", caption_text)
|
if caption_text:
|
||||||
video_media.set("preview", [image_media])
|
video_media.set("text", caption_text)
|
||||||
video_media.set("data", [item])
|
video_media.set("preview", [image_media])
|
||||||
return item, video_media, f"{context or 'video'} {best_id}"
|
video_media.set("data", [item])
|
||||||
|
return item, video_media, f"{context or 'video'} {best_id}"
|
||||||
|
else:
|
||||||
|
logger.warning(f"Failed to download video from {video_url}")
|
||||||
elif image_media:
|
elif image_media:
|
||||||
if taken_at:
|
if taken_at:
|
||||||
image_media.set("date", taken_at)
|
image_media.set("date", taken_at)
|
||||||
|
|||||||
@@ -25,6 +25,9 @@ class MetaEnricher(Enricher):
|
|||||||
logger.debug(f"Calculating archive file sizes for {len(to_enrich.media)} media files")
|
logger.debug(f"Calculating archive file sizes for {len(to_enrich.media)} media files")
|
||||||
total_size = 0
|
total_size = 0
|
||||||
for media in to_enrich.get_all_media():
|
for media in to_enrich.get_all_media():
|
||||||
|
if not media.filename:
|
||||||
|
logger.warning(f"Skipping file size for media without filename: {media}")
|
||||||
|
continue
|
||||||
file_stats = os.stat(media.filename)
|
file_stats = os.stat(media.filename)
|
||||||
media.set("bytes", file_stats.st_size)
|
media.set("bytes", file_stats.st_size)
|
||||||
media.set("size", self.human_readable_bytes(file_stats.st_size))
|
media.set("size", self.human_readable_bytes(file_stats.st_size))
|
||||||
|
|||||||
@@ -49,10 +49,18 @@ class TelegramExtractor(Extractor):
|
|||||||
if not len(image_urls):
|
if not len(image_urls):
|
||||||
return False
|
return False
|
||||||
for img_url in image_urls:
|
for img_url in image_urls:
|
||||||
result.add_media(Media(self.download_from_url(img_url)))
|
filename = self.download_from_url(img_url)
|
||||||
|
if not filename:
|
||||||
|
logger.warning(f"Failed to download image from {img_url}")
|
||||||
|
continue
|
||||||
|
result.add_media(Media(filename))
|
||||||
else:
|
else:
|
||||||
video_url = video.get("src")
|
video_url = video.get("src")
|
||||||
m_video = Media(self.download_from_url(video_url))
|
video_filename = self.download_from_url(video_url)
|
||||||
|
if not video_filename:
|
||||||
|
logger.warning(f"Failed to download video from {video_url}")
|
||||||
|
return False
|
||||||
|
m_video = Media(video_filename)
|
||||||
# extract duration from HTML
|
# extract duration from HTML
|
||||||
try:
|
try:
|
||||||
duration = s.find_all("time")[0].contents[0]
|
duration = s.find_all("time")[0].contents[0]
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
import asyncio
|
||||||
import os
|
import os
|
||||||
import shutil
|
import shutil
|
||||||
import re
|
import re
|
||||||
@@ -53,6 +54,16 @@ class TelethonExtractor(Extractor):
|
|||||||
logger.debug(f"Making a copy of the session file {base_session_filepath} to {self.session_file}.session")
|
logger.debug(f"Making a copy of the session file {base_session_filepath} to {self.session_file}.session")
|
||||||
shutil.copy(base_session_filepath, f"{self.session_file}.session")
|
shutil.copy(base_session_filepath, f"{self.session_file}.session")
|
||||||
|
|
||||||
|
# ensure an open event loop is set as the current loop (needed when used by Celery workers, which may close the default one)
|
||||||
|
try:
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
if loop.is_closed():
|
||||||
|
loop = asyncio.new_event_loop()
|
||||||
|
asyncio.set_event_loop(loop)
|
||||||
|
except RuntimeError:
|
||||||
|
loop = asyncio.new_event_loop()
|
||||||
|
asyncio.set_event_loop(loop)
|
||||||
|
|
||||||
# initiate the client
|
# initiate the client
|
||||||
self.client = TelegramClient(self.session_file, self.api_id, self.api_hash)
|
self.client = TelegramClient(self.session_file, self.api_id, self.api_hash)
|
||||||
|
|
||||||
@@ -190,6 +201,9 @@ class TelethonExtractor(Extractor):
|
|||||||
)
|
)
|
||||||
for i, om_url in enumerate(other_media_urls):
|
for i, om_url in enumerate(other_media_urls):
|
||||||
filename = self.download_from_url(om_url, f"{chat}_{group_id}_{i}")
|
filename = self.download_from_url(om_url, f"{chat}_{group_id}_{i}")
|
||||||
|
if not filename:
|
||||||
|
logger.warning(f"Failed to download media from {om_url}")
|
||||||
|
continue
|
||||||
result.add_media(Media(filename=filename), id=f"{group_id}_{i}")
|
result.add_media(Media(filename=filename), id=f"{group_id}_{i}")
|
||||||
|
|
||||||
filename_dest = os.path.join(self.tmp_dir, f"{chat}_{group_id}", str(mp.id))
|
filename_dest = os.path.join(self.tmp_dir, f"{chat}_{group_id}", str(mp.id))
|
||||||
|
|||||||
@@ -114,6 +114,9 @@ class TwitterApiExtractor(Extractor):
|
|||||||
logger.info(f"Found media {media}")
|
logger.info(f"Found media {media}")
|
||||||
ext = mimetypes.guess_extension(mimetype)
|
ext = mimetypes.guess_extension(mimetype)
|
||||||
media.filename = self.download_from_url(media.get("src"), f"{slugify(url)}_{i}{ext}")
|
media.filename = self.download_from_url(media.get("src"), f"{slugify(url)}_{i}{ext}")
|
||||||
|
if not media.filename:
|
||||||
|
logger.warning(f"Failed to download media from {media.get('src')}")
|
||||||
|
continue
|
||||||
result.add_media(media)
|
result.add_media(media)
|
||||||
|
|
||||||
result.set_content(
|
result.set_content(
|
||||||
|
|||||||
@@ -64,7 +64,6 @@ class DeletionIndicators:
|
|||||||
# YouTube deletion indicators
|
# YouTube deletion indicators
|
||||||
YOUTUBE = [
|
YOUTUBE = [
|
||||||
"This video isn't available anymore",
|
"This video isn't available anymore",
|
||||||
"Video unavailable",
|
|
||||||
"This video has been removed",
|
"This video has been removed",
|
||||||
"This video is no longer available",
|
"This video is no longer available",
|
||||||
"This video is private",
|
"This video is private",
|
||||||
|
|||||||
@@ -120,6 +120,9 @@ def ydl_entry_to_filename(ydl, entry: dict) -> str:
|
|||||||
directory = os.path.dirname(base_filename) # '/get/path/to'
|
directory = os.path.dirname(base_filename) # '/get/path/to'
|
||||||
basename = os.path.basename(base_filename) # 'file'
|
basename = os.path.basename(base_filename) # 'file'
|
||||||
for f in os.listdir(directory):
|
for f in os.listdir(directory):
|
||||||
|
# skip incomplete downloads left behind by yt-dlp
|
||||||
|
if f.endswith(".part"):
|
||||||
|
continue
|
||||||
if (
|
if (
|
||||||
f.startswith(basename)
|
f.startswith(basename)
|
||||||
or (entry_url and os.path.splitext(f)[0] in entry_url)
|
or (entry_url and os.path.splitext(f)[0] in entry_url)
|
||||||
|
|||||||
277
tests/enrichers/test_ghostarchive_enricher.py
Normal file
277
tests/enrichers/test_ghostarchive_enricher.py
Normal file
@@ -0,0 +1,277 @@
|
|||||||
|
import pytest
|
||||||
|
import requests
|
||||||
|
import os
|
||||||
|
from unittest.mock import MagicMock
|
||||||
|
|
||||||
|
from auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher import GhostarchiveEnricher
|
||||||
|
|
||||||
|
CI = os.getenv("GITHUB_ACTIONS", "") == "true"
|
||||||
|
|
||||||
|
# sample HTML responses for mocking
|
||||||
|
SEARCH_HTML_FOUND = """
|
||||||
|
<html><body>
|
||||||
|
<h1>Archives for https://example.com</h1>
|
||||||
|
<table>
|
||||||
|
<tr><td><a href="http://ghostarchive.org/archive/Abc12">https://example.com</a></td></tr>
|
||||||
|
</table>
|
||||||
|
</body></html>
|
||||||
|
"""
|
||||||
|
|
||||||
|
SEARCH_HTML_NOT_FOUND = """
|
||||||
|
<html><body>
|
||||||
|
<h1>Archives for https://example.com</h1>
|
||||||
|
<p>Page 0 out of 0</p>
|
||||||
|
<p>No archives for that site.</p>
|
||||||
|
</body></html>
|
||||||
|
"""
|
||||||
|
|
||||||
|
SAVE_RESPONSE_HTML_WITH_LINK = """
|
||||||
|
<html><body>
|
||||||
|
<h1>Archive saved</h1>
|
||||||
|
<a href="/archive/Xyz99">View archive</a>
|
||||||
|
</body></html>
|
||||||
|
"""
|
||||||
|
|
||||||
|
ENRICHER_CONFIG = {
|
||||||
|
"timeout": 120,
|
||||||
|
"check_existing": True,
|
||||||
|
"proxy_http": None,
|
||||||
|
"proxy_https": None,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class TestGhostarchiveEnricher:
|
||||||
|
"""Tests for Ghost Archive Enricher"""
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def setup_enricher(self, setup_module):
|
||||||
|
self.enricher: GhostarchiveEnricher = setup_module("ghostarchive_enricher", ENRICHER_CONFIG)
|
||||||
|
|
||||||
|
def test_search_existing_found(self, mocker):
|
||||||
|
"""When an existing archive is found, it should be returned."""
|
||||||
|
mock_response = mocker.Mock()
|
||||||
|
mock_response.status_code = 200
|
||||||
|
mock_response.text = SEARCH_HTML_FOUND
|
||||||
|
mocker.patch(
|
||||||
|
"auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.requests.get", return_value=mock_response
|
||||||
|
)
|
||||||
|
|
||||||
|
result = self.enricher._search_existing("https://example.com")
|
||||||
|
assert result == "https://ghostarchive.org/archive/Abc12"
|
||||||
|
|
||||||
|
def test_search_existing_not_found(self, mocker):
|
||||||
|
"""When no existing archive is found, None should be returned."""
|
||||||
|
mock_response = mocker.Mock()
|
||||||
|
mock_response.status_code = 200
|
||||||
|
mock_response.text = SEARCH_HTML_NOT_FOUND
|
||||||
|
mocker.patch(
|
||||||
|
"auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.requests.get", return_value=mock_response
|
||||||
|
)
|
||||||
|
|
||||||
|
result = self.enricher._search_existing("https://example.com")
|
||||||
|
assert result is None
|
||||||
|
|
||||||
|
def test_search_existing_request_error(self, mocker):
|
||||||
|
"""When search request fails, None should be returned."""
|
||||||
|
mocker.patch(
|
||||||
|
"auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.requests.get",
|
||||||
|
side_effect=requests.exceptions.ConnectionError("connection failed"),
|
||||||
|
)
|
||||||
|
|
||||||
|
result = self.enricher._search_existing("https://example.com")
|
||||||
|
assert result is None
|
||||||
|
|
||||||
|
def test_search_existing_non_200(self, mocker):
|
||||||
|
"""When search returns non-200, None should be returned."""
|
||||||
|
mock_response = mocker.Mock()
|
||||||
|
mock_response.status_code = 503
|
||||||
|
mocker.patch(
|
||||||
|
"auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.requests.get", return_value=mock_response
|
||||||
|
)
|
||||||
|
|
||||||
|
result = self.enricher._search_existing("https://example.com")
|
||||||
|
assert result is None
|
||||||
|
|
||||||
|
def test_submit_url_success_redirect(self, mocker):
|
||||||
|
"""Successful submission via headless browser should return archive URL."""
|
||||||
|
mock_sb = MagicMock()
|
||||||
|
mock_sb.get_current_url.return_value = "https://ghostarchive.org/archive/NewId1"
|
||||||
|
mock_sb.__enter__ = MagicMock(return_value=mock_sb)
|
||||||
|
mock_sb.__exit__ = MagicMock(return_value=False)
|
||||||
|
|
||||||
|
mocker.patch("auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.SB", return_value=mock_sb)
|
||||||
|
|
||||||
|
result = self.enricher._submit_url("https://example.com")
|
||||||
|
assert result == "https://ghostarchive.org/archive/NewId1"
|
||||||
|
mock_sb.type.assert_called_once()
|
||||||
|
mock_sb.click.assert_called_once()
|
||||||
|
|
||||||
|
def test_submit_url_success_redirect_strips_query(self, mocker):
|
||||||
|
"""Redirect URL query params should be stripped."""
|
||||||
|
mock_sb = MagicMock()
|
||||||
|
mock_sb.get_current_url.return_value = "https://ghostarchive.org/archive/NewId1?wr=false"
|
||||||
|
mock_sb.__enter__ = MagicMock(return_value=mock_sb)
|
||||||
|
mock_sb.__exit__ = MagicMock(return_value=False)
|
||||||
|
|
||||||
|
mocker.patch("auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.SB", return_value=mock_sb)
|
||||||
|
|
||||||
|
result = self.enricher._submit_url("https://example.com")
|
||||||
|
assert result == "https://ghostarchive.org/archive/NewId1"
|
||||||
|
|
||||||
|
def test_submit_url_success_html_fallback(self, mocker):
|
||||||
|
"""When browser doesn't redirect, should parse page source for archive link."""
|
||||||
|
mock_sb = MagicMock()
|
||||||
|
mock_sb.get_current_url.return_value = "https://ghostarchive.org/archive2"
|
||||||
|
mock_sb.get_page_source.return_value = SAVE_RESPONSE_HTML_WITH_LINK
|
||||||
|
mock_sb.__enter__ = MagicMock(return_value=mock_sb)
|
||||||
|
mock_sb.__exit__ = MagicMock(return_value=False)
|
||||||
|
|
||||||
|
# make timeout=0 so the polling loop exits immediately and falls through to HTML parsing
|
||||||
|
self.enricher.timeout = 0
|
||||||
|
mocker.patch("auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.SB", return_value=mock_sb)
|
||||||
|
|
||||||
|
result = self.enricher._submit_url("https://example.com")
|
||||||
|
assert result == "https://ghostarchive.org/archive/Xyz99"
|
||||||
|
|
||||||
|
def test_submit_url_browser_error(self, mocker):
|
||||||
|
"""Browser error during submission should return None."""
|
||||||
|
mocker.patch(
|
||||||
|
"auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.SB",
|
||||||
|
side_effect=Exception("browser failed to start"),
|
||||||
|
)
|
||||||
|
|
||||||
|
result = self.enricher._submit_url("https://example.com")
|
||||||
|
assert result is None
|
||||||
|
|
||||||
|
def test_proxy_configuration(self, mocker):
|
||||||
|
"""Proxies should be passed to search requests when configured."""
|
||||||
|
self.enricher.proxy_http = "http://proxy:8080"
|
||||||
|
self.enricher.proxy_https = "https://proxy:8443"
|
||||||
|
|
||||||
|
mock_get = mocker.patch(
|
||||||
|
"auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.requests.get",
|
||||||
|
)
|
||||||
|
mock_response = mocker.Mock()
|
||||||
|
mock_response.status_code = 200
|
||||||
|
mock_response.text = SEARCH_HTML_FOUND
|
||||||
|
mock_get.return_value = mock_response
|
||||||
|
|
||||||
|
result = self.enricher._search_existing("https://example.com")
|
||||||
|
|
||||||
|
call_kwargs = mock_get.call_args
|
||||||
|
assert call_kwargs.kwargs.get("proxies") == {"http": "http://proxy:8080", "https": "https://proxy:8443"}
|
||||||
|
assert result is not None
|
||||||
|
|
||||||
|
def test_parse_archive_url_with_replay_links(self):
|
||||||
|
"""Parser should ignore /replay/ links and only return /archive/ links."""
|
||||||
|
html = """
|
||||||
|
<html><body>
|
||||||
|
<a href="/archive/replay/w/id-abc/mp_/https://example.com">replay</a>
|
||||||
|
<a href="/archive/Valid1">valid</a>
|
||||||
|
</body></html>
|
||||||
|
"""
|
||||||
|
result = self.enricher._parse_archive_url(html)
|
||||||
|
assert result == "https://ghostarchive.org/archive/Valid1"
|
||||||
|
|
||||||
|
def test_parse_archive_url_no_links(self):
|
||||||
|
"""Parser should return None when no archive links found."""
|
||||||
|
html = "<html><body><p>No archive here</p></body></html>"
|
||||||
|
result = self.enricher._parse_archive_url(html)
|
||||||
|
assert result is None
|
||||||
|
|
||||||
|
def test_enrich_sets_ghostarchive_on_metadata(self, mocker, make_item):
|
||||||
|
"""enrich() should set 'ghostarchive' key on the metadata object."""
|
||||||
|
mocker.patch.object(self.enricher, "_search_existing", return_value="https://ghostarchive.org/archive/Enr1")
|
||||||
|
|
||||||
|
item = make_item("https://example.com")
|
||||||
|
result = self.enricher.enrich(item)
|
||||||
|
|
||||||
|
assert result is True
|
||||||
|
assert item.get("ghostarchive") == "https://ghostarchive.org/archive/Enr1"
|
||||||
|
|
||||||
|
def test_enrich_skips_if_already_enriched(self, mocker, make_item):
|
||||||
|
"""enrich() should skip if ghostarchive key is already set."""
|
||||||
|
mock_search = mocker.patch.object(self.enricher, "_search_existing")
|
||||||
|
|
||||||
|
item = make_item("https://example.com", ghostarchive="https://ghostarchive.org/archive/Old1")
|
||||||
|
result = self.enricher.enrich(item)
|
||||||
|
|
||||||
|
assert result is True
|
||||||
|
mock_search.assert_not_called()
|
||||||
|
|
||||||
|
def test_enrich_returns_false_on_failure(self, mocker, make_item):
|
||||||
|
"""enrich() should return False when both search and submit fail."""
|
||||||
|
mocker.patch.object(self.enricher, "_search_existing", return_value=None)
|
||||||
|
mocker.patch.object(self.enricher, "_submit_url", return_value=None)
|
||||||
|
|
||||||
|
item = make_item("https://example.com")
|
||||||
|
result = self.enricher.enrich(item)
|
||||||
|
|
||||||
|
assert result is False
|
||||||
|
|
||||||
|
def test_enrich_skips_auth_wall(self, mocker, make_item):
|
||||||
|
"""enrich() should skip URLs behind auth walls."""
|
||||||
|
mocker.patch(
|
||||||
|
"auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.UrlUtil.is_auth_wall", return_value=True
|
||||||
|
)
|
||||||
|
|
||||||
|
item = make_item("https://example.com/login")
|
||||||
|
result = self.enricher.enrich(item)
|
||||||
|
assert result is False
|
||||||
|
|
||||||
|
def test_enrich_with_existing_archive(self, mocker, make_item):
|
||||||
|
"""enrich() should use existing archive when check_existing is True."""
|
||||||
|
mocker.patch.object(self.enricher, "_search_existing", return_value="https://ghostarchive.org/archive/Exist1")
|
||||||
|
mock_submit = mocker.patch.object(self.enricher, "_submit_url")
|
||||||
|
|
||||||
|
item = make_item("https://example.com")
|
||||||
|
result = self.enricher.enrich(item)
|
||||||
|
|
||||||
|
assert result is True
|
||||||
|
assert item.get("ghostarchive") == "https://ghostarchive.org/archive/Exist1"
|
||||||
|
mock_submit.assert_not_called()
|
||||||
|
|
||||||
|
def test_enrich_submits_when_no_existing(self, mocker, make_item):
|
||||||
|
"""enrich() should submit URL when no existing archive found."""
|
||||||
|
mocker.patch.object(self.enricher, "_search_existing", return_value=None)
|
||||||
|
mocker.patch.object(self.enricher, "_submit_url", return_value="https://ghostarchive.org/archive/New42")
|
||||||
|
|
||||||
|
item = make_item("https://example.com")
|
||||||
|
result = self.enricher.enrich(item)
|
||||||
|
|
||||||
|
assert result is True
|
||||||
|
assert item.get("ghostarchive") == "https://ghostarchive.org/archive/New42"
|
||||||
|
|
||||||
|
def test_enrich_skips_check_existing_when_disabled(self, mocker, make_item):
|
||||||
|
"""enrich() should skip search when check_existing is False."""
|
||||||
|
self.enricher.check_existing = False
|
||||||
|
mock_search = mocker.patch.object(self.enricher, "_search_existing")
|
||||||
|
mocker.patch.object(self.enricher, "_submit_url", return_value="https://ghostarchive.org/archive/Direct1")
|
||||||
|
|
||||||
|
item = make_item("https://example.com")
|
||||||
|
result = self.enricher.enrich(item)
|
||||||
|
|
||||||
|
assert result is True
|
||||||
|
mock_search.assert_not_called()
|
||||||
|
|
||||||
|
@pytest.mark.download
|
||||||
|
def test_real_search_existing(self, setup_module):
|
||||||
|
"""Integration test: search for an existing archive on Ghost Archive."""
|
||||||
|
enricher = setup_module("ghostarchive_enricher", ENRICHER_CONFIG)
|
||||||
|
# example.com is commonly archived
|
||||||
|
result = enricher._search_existing("https://example.com")
|
||||||
|
# we just check it doesn't crash; result may or may not be found
|
||||||
|
assert result is None or result.startswith("https://ghostarchive.org/archive/")
|
||||||
|
|
||||||
|
@pytest.mark.download
|
||||||
|
@pytest.mark.skipif(CI, reason="Avoid submitting a real task on every CI run")
|
||||||
|
def test_real_submit_example_com(self, setup_module, make_item):
|
||||||
|
"""Integration test: submit example.com to Ghost Archive and verify enrichment."""
|
||||||
|
enricher = setup_module("ghostarchive_enricher", ENRICHER_CONFIG)
|
||||||
|
item = make_item("https://example.com")
|
||||||
|
result = enricher.enrich(item)
|
||||||
|
|
||||||
|
assert result is True
|
||||||
|
archive_url = item.get("ghostarchive")
|
||||||
|
assert archive_url is not None
|
||||||
|
assert archive_url.startswith("https://ghostarchive.org/archive/")
|
||||||
@@ -53,6 +53,7 @@ class TestAntibotExtractorEnricher(TestExtractorBase):
|
|||||||
}
|
}
|
||||||
|
|
||||||
@pytest.mark.download
|
@pytest.mark.download
|
||||||
|
@pytest.mark.flaky(reruns=2, reruns_delay=5)
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
"url,in_title,in_text,image_count,video_count,skip_ci",
|
"url,in_title,in_text,image_count,video_count,skip_ci",
|
||||||
[
|
[
|
||||||
@@ -128,6 +129,7 @@ class TestAntibotExtractorEnricher(TestExtractorBase):
|
|||||||
item = make_item(url)
|
item = make_item(url)
|
||||||
result = self.extractor.download(item)
|
result = self.extractor.download(item)
|
||||||
|
|
||||||
|
assert result, f"download() returned {result!r} — Selenium may have failed (e.g., window close timeout)"
|
||||||
assert result.status == "antibot", "Expected status to be 'antibot'"
|
assert result.status == "antibot", "Expected status to be 'antibot'"
|
||||||
|
|
||||||
# Check title contains all required words (case-insensitive)
|
# Check title contains all required words (case-insensitive)
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
import asyncio
|
||||||
import os
|
import os
|
||||||
from datetime import date
|
from datetime import date
|
||||||
|
|
||||||
@@ -60,3 +61,53 @@ def test_valid_url_regex(url, expected, get_lazy_module):
|
|||||||
def test_invite_pattern_regex(invite, expected, get_lazy_module):
|
def test_invite_pattern_regex(invite, expected, get_lazy_module):
|
||||||
match = TelethonExtractor.invite_pattern.search(invite)
|
match = TelethonExtractor.invite_pattern.search(invite)
|
||||||
assert bool(match) == expected
|
assert bool(match) == expected
|
||||||
|
|
||||||
|
|
||||||
|
def test_setup_with_closed_event_loop(get_lazy_module, tmp_path, mocker):
|
||||||
|
"""
|
||||||
|
Simulate the Celery worker scenario where the asyncio event loop is closed
|
||||||
|
before setup() runs. The fix should create a new event loop so that
|
||||||
|
TelegramClient.start() does not raise 'Event loop is closed'.
|
||||||
|
"""
|
||||||
|
# create a session file so setup doesn't fail on missing file
|
||||||
|
session_file = tmp_path / "test.session"
|
||||||
|
session_file.touch()
|
||||||
|
|
||||||
|
# close the current event loop to simulate a Celery worker environment
|
||||||
|
loop = asyncio.new_event_loop()
|
||||||
|
asyncio.set_event_loop(loop)
|
||||||
|
loop.close()
|
||||||
|
|
||||||
|
lazy_module = get_lazy_module("telethon_extractor")
|
||||||
|
module = lazy_module.load(
|
||||||
|
{"telethon_extractor": {"session_file": str(session_file), "api_id": 123, "api_hash": "ABC"}}
|
||||||
|
)
|
||||||
|
|
||||||
|
# setup should have succeeded and a new open event loop should exist
|
||||||
|
new_loop = asyncio.get_event_loop()
|
||||||
|
assert not new_loop.is_closed()
|
||||||
|
assert module.client is not None
|
||||||
|
|
||||||
|
|
||||||
|
def test_setup_with_no_event_loop(get_lazy_module, tmp_path, mocker):
|
||||||
|
"""
|
||||||
|
Simulate the scenario where there is no current event loop at all
|
||||||
|
(e.g. running in a non-main thread). The fix should create one.
|
||||||
|
"""
|
||||||
|
session_file = tmp_path / "test.session"
|
||||||
|
session_file.touch()
|
||||||
|
|
||||||
|
# Remove the current event loop entirely
|
||||||
|
# In Python 3.12+, get_event_loop() in a non-main thread raises RuntimeError
|
||||||
|
mocker.patch("asyncio.get_event_loop", side_effect=RuntimeError("no current event loop"))
|
||||||
|
new_loop_mock = mocker.MagicMock()
|
||||||
|
new_loop_mock.is_closed.return_value = False
|
||||||
|
mocker.patch("asyncio.new_event_loop", return_value=new_loop_mock)
|
||||||
|
set_loop = mocker.patch("asyncio.set_event_loop")
|
||||||
|
|
||||||
|
lazy_module = get_lazy_module("telethon_extractor")
|
||||||
|
lazy_module.load({"telethon_extractor": {"session_file": str(session_file), "api_id": 123, "api_hash": "ABC"}})
|
||||||
|
|
||||||
|
# a new event loop should have been created and set
|
||||||
|
asyncio.new_event_loop.assert_called_once()
|
||||||
|
set_loop.assert_called_once_with(new_loop_mock)
|
||||||
|
|||||||
@@ -86,6 +86,22 @@ def test_media_management(basic_metadata, media_file):
|
|||||||
assert basic_metadata.get_media_by_id("m1") == media1
|
assert basic_metadata.get_media_by_id("m1") == media1
|
||||||
|
|
||||||
|
|
||||||
|
def test_remove_duplicate_skips_missing_files(basic_metadata, media_file, tmp_path):
|
||||||
|
"""Missing files should be dropped instead of crashing with FileNotFoundError."""
|
||||||
|
real_file = tmp_path / "exists.txt"
|
||||||
|
real_file.write_text("content")
|
||||||
|
valid = media_file(filename=str(real_file), hash_value="abc")
|
||||||
|
missing = media_file(filename="/nonexistent/path/gone.mp4")
|
||||||
|
|
||||||
|
basic_metadata.add_media(valid, "valid")
|
||||||
|
basic_metadata.add_media(missing, "missing")
|
||||||
|
|
||||||
|
assert len(basic_metadata.media) == 2
|
||||||
|
basic_metadata.remove_duplicate_media_by_hash()
|
||||||
|
assert len(basic_metadata.media) == 1
|
||||||
|
assert basic_metadata.get_media_by_id("valid") == valid
|
||||||
|
|
||||||
|
|
||||||
def test_success():
|
def test_success():
|
||||||
m = Metadata()
|
m = Metadata()
|
||||||
assert not m.is_success()
|
assert not m.is_success()
|
||||||
|
|||||||
259
tests/test_none_filename_handling.py
Normal file
259
tests/test_none_filename_handling.py
Normal file
@@ -0,0 +1,259 @@
|
|||||||
|
"""
|
||||||
|
Tests for handling Media objects with None filename.
|
||||||
|
|
||||||
|
When download_from_url fails, it returns None. Various enrichers and
|
||||||
|
the metadata deduplication logic must gracefully handle Media objects
|
||||||
|
where filename is None, rather than crashing with TypeError.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from unittest.mock import MagicMock
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from auto_archiver.core.metadata import Metadata, Media
|
||||||
|
from auto_archiver.modules.hash_enricher import HashEnricher
|
||||||
|
from auto_archiver.modules.meta_enricher import MetaEnricher
|
||||||
|
|
||||||
|
|
||||||
|
# ── HashEnricher ──────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestHashEnricherNoneFilename:
|
||||||
|
"""hash_enricher should skip media with None filename without crashing."""
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def setup(self, setup_module):
|
||||||
|
self.enricher = setup_module(HashEnricher, {"algorithm": "SHA-256", "chunksize": 100})
|
||||||
|
|
||||||
|
def test_skips_none_filename(self):
|
||||||
|
m = Metadata().set_url("https://example.com")
|
||||||
|
media = Media(filename=None)
|
||||||
|
media.set("src", "https://example.com/video.mp4")
|
||||||
|
m.add_media(media)
|
||||||
|
|
||||||
|
# Should not raise
|
||||||
|
self.enricher.enrich(m)
|
||||||
|
# No hash should be set
|
||||||
|
assert m.media[0].get("hash") is None
|
||||||
|
|
||||||
|
def test_hashes_valid_skips_none(self, tmp_path):
|
||||||
|
"""Mix of valid and None-filename media: only valid ones get hashed."""
|
||||||
|
valid_file = tmp_path / "test.txt"
|
||||||
|
valid_file.write_text("hello world")
|
||||||
|
|
||||||
|
m = Metadata().set_url("https://example.com")
|
||||||
|
m.add_media(Media(filename=str(valid_file)))
|
||||||
|
m.add_media(Media(filename=None))
|
||||||
|
|
||||||
|
self.enricher.enrich(m)
|
||||||
|
|
||||||
|
assert m.media[0].get("hash") is not None
|
||||||
|
assert m.media[1].get("hash") is None
|
||||||
|
|
||||||
|
def test_all_none_filenames(self):
|
||||||
|
"""All media have None filename – enricher should not crash."""
|
||||||
|
m = Metadata().set_url("https://example.com")
|
||||||
|
m.add_media(Media(filename=None))
|
||||||
|
m.add_media(Media(filename=None))
|
||||||
|
|
||||||
|
self.enricher.enrich(m)
|
||||||
|
|
||||||
|
assert len(m.media) == 2
|
||||||
|
for media in m.media:
|
||||||
|
assert media.get("hash") is None
|
||||||
|
|
||||||
|
|
||||||
|
# ── MetaEnricher ──────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestMetaEnricherNoneFilename:
|
||||||
|
"""meta_enricher should skip media with None filename without crashing."""
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def setup(self, setup_module):
|
||||||
|
self.enricher = setup_module(MetaEnricher, {})
|
||||||
|
|
||||||
|
def test_skips_none_filename(self):
|
||||||
|
m = Metadata().set_url("https://example.com")
|
||||||
|
m.set("_processed_at", datetime.now(timezone.utc))
|
||||||
|
media = Media(filename=None)
|
||||||
|
media.set("src", "https://example.com/video.mp4")
|
||||||
|
m.add_media(media)
|
||||||
|
|
||||||
|
# Should not raise
|
||||||
|
self.enricher.enrich(m)
|
||||||
|
assert m.get("total_bytes") == 0
|
||||||
|
|
||||||
|
def test_sizes_valid_skips_none(self, tmp_path):
|
||||||
|
"""Mix of valid and None-filename media: only valid ones get sized."""
|
||||||
|
valid_file = tmp_path / "test.txt"
|
||||||
|
valid_file.write_text("A" * 500)
|
||||||
|
|
||||||
|
m = Metadata().set_url("https://example.com")
|
||||||
|
m.set("_processed_at", datetime.now(timezone.utc))
|
||||||
|
m.add_media(Media(filename=str(valid_file)))
|
||||||
|
m.add_media(Media(filename=None))
|
||||||
|
|
||||||
|
self.enricher.enrich(m)
|
||||||
|
|
||||||
|
assert m.media[0].get("bytes") == 500
|
||||||
|
assert m.media[1].get("bytes") is None
|
||||||
|
assert m.get("total_bytes") == 500
|
||||||
|
|
||||||
|
|
||||||
|
# ── Metadata.remove_duplicate_media_by_hash ───────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestRemoveDuplicateMediaNoneFilename:
|
||||||
|
"""remove_duplicate_media_by_hash should keep media with None filename."""
|
||||||
|
|
||||||
|
def test_none_filename_kept(self):
|
||||||
|
m = Metadata().set_url("https://example.com")
|
||||||
|
none_media = Media(filename=None)
|
||||||
|
none_media.set("src", "https://example.com/video.mp4")
|
||||||
|
m.add_media(none_media)
|
||||||
|
|
||||||
|
m.remove_duplicate_media_by_hash()
|
||||||
|
|
||||||
|
assert len(m.media) == 1
|
||||||
|
assert m.media[0].filename is None
|
||||||
|
|
||||||
|
def test_none_and_valid_mixed(self, tmp_path):
|
||||||
|
"""None-filename media is kept alongside valid-filename media."""
|
||||||
|
valid_file = tmp_path / "test.txt"
|
||||||
|
valid_file.write_text("content")
|
||||||
|
|
||||||
|
m = Metadata().set_url("https://example.com")
|
||||||
|
m.add_media(Media(filename=str(valid_file)))
|
||||||
|
none_media = Media(filename=None)
|
||||||
|
none_media.set("src", "https://example.com/video.mp4")
|
||||||
|
m.add_media(none_media)
|
||||||
|
|
||||||
|
m.remove_duplicate_media_by_hash()
|
||||||
|
|
||||||
|
assert len(m.media) == 2
|
||||||
|
|
||||||
|
def test_multiple_none_filename_all_kept(self):
|
||||||
|
"""Multiple None-filename media are all kept (can't deduplicate without file)."""
|
||||||
|
m = Metadata().set_url("https://example.com")
|
||||||
|
m.add_media(Media(filename=None))
|
||||||
|
m.add_media(Media(filename=None))
|
||||||
|
|
||||||
|
m.remove_duplicate_media_by_hash()
|
||||||
|
|
||||||
|
assert len(m.media) == 2
|
||||||
|
|
||||||
|
|
||||||
|
# ── Twitter dropin create_metadata ────────────────────────────────────
|
||||||
|
|
||||||
|
|
||||||
|
class TestTwitterDropinNoneFilename:
|
||||||
|
"""Twitter dropin should skip media when download_from_url returns None."""
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def twitter_dropin(self):
|
||||||
|
from auto_archiver.modules.generic_extractor.twitter import Twitter
|
||||||
|
|
||||||
|
return Twitter()
|
||||||
|
|
||||||
|
def test_create_metadata_skips_failed_photo_download(self, twitter_dropin):
|
||||||
|
"""When download_from_url returns None for a photo, it's not added to media."""
|
||||||
|
tweet = {
|
||||||
|
"user": {"name": "Test User"},
|
||||||
|
"created_at": "Sun Feb 08 18:45:00 +0000 2026",
|
||||||
|
"full_text": "Test tweet with photo",
|
||||||
|
"entities": {
|
||||||
|
"media": [
|
||||||
|
{"type": "photo", "media_url_https": "https://pbs.twimg.com/media/test.jpg"},
|
||||||
|
]
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
mock_archiver = MagicMock()
|
||||||
|
mock_archiver.download_from_url.return_value = None # simulate failed download
|
||||||
|
|
||||||
|
result = twitter_dropin.create_metadata(tweet, None, mock_archiver, "https://x.com/test/status/123")
|
||||||
|
|
||||||
|
# The result should have no media since the download failed
|
||||||
|
assert len(result.media) == 0
|
||||||
|
|
||||||
|
def test_create_metadata_skips_failed_video_download(self, twitter_dropin):
|
||||||
|
"""When download_from_url returns None for a video, it's not added to media."""
|
||||||
|
tweet = {
|
||||||
|
"user": {"name": "Test User"},
|
||||||
|
"created_at": "Sun Feb 08 18:45:00 +0000 2026",
|
||||||
|
"full_text": "Test tweet with video",
|
||||||
|
"entities": {
|
||||||
|
"media": [
|
||||||
|
{
|
||||||
|
"type": "video",
|
||||||
|
"video_info": {
|
||||||
|
"variants": [
|
||||||
|
{
|
||||||
|
"url": "https://video.twimg.com/vid/1280x720/test.mp4",
|
||||||
|
"content_type": "video/mp4",
|
||||||
|
},
|
||||||
|
]
|
||||||
|
},
|
||||||
|
},
|
||||||
|
]
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
mock_archiver = MagicMock()
|
||||||
|
mock_archiver.download_from_url.return_value = None
|
||||||
|
|
||||||
|
result = twitter_dropin.create_metadata(tweet, None, mock_archiver, "https://x.com/test/status/123")
|
||||||
|
|
||||||
|
assert len(result.media) == 0
|
||||||
|
|
||||||
|
def test_create_metadata_keeps_successful_download(self, twitter_dropin, tmp_path):
|
||||||
|
"""When download_from_url succeeds, media is added."""
|
||||||
|
tweet = {
|
||||||
|
"user": {"name": "Test User"},
|
||||||
|
"created_at": "Sun Feb 08 18:45:00 +0000 2026",
|
||||||
|
"full_text": "Test tweet with photo",
|
||||||
|
"entities": {
|
||||||
|
"media": [
|
||||||
|
{"type": "photo", "media_url_https": "https://pbs.twimg.com/media/test.jpg"},
|
||||||
|
]
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
test_file = tmp_path / "test.jpg"
|
||||||
|
test_file.write_text("fake image data")
|
||||||
|
|
||||||
|
mock_archiver = MagicMock()
|
||||||
|
mock_archiver.download_from_url.return_value = str(test_file)
|
||||||
|
|
||||||
|
result = twitter_dropin.create_metadata(tweet, None, mock_archiver, "https://x.com/test/status/123")
|
||||||
|
|
||||||
|
assert len(result.media) == 1
|
||||||
|
assert result.media[0].filename == str(test_file)
|
||||||
|
|
||||||
|
def test_create_metadata_mixed_downloads(self, twitter_dropin, tmp_path):
|
||||||
|
"""One download succeeds, one fails – only successful one is kept."""
|
||||||
|
tweet = {
|
||||||
|
"user": {"name": "Test User"},
|
||||||
|
"created_at": "Sun Feb 08 18:45:00 +0000 2026",
|
||||||
|
"full_text": "Test tweet with two photos",
|
||||||
|
"entities": {
|
||||||
|
"media": [
|
||||||
|
{"type": "photo", "media_url_https": "https://pbs.twimg.com/media/test1.jpg"},
|
||||||
|
{"type": "photo", "media_url_https": "https://pbs.twimg.com/media/test2.jpg"},
|
||||||
|
]
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
test_file = tmp_path / "test1.jpg"
|
||||||
|
test_file.write_text("fake image data")
|
||||||
|
|
||||||
|
mock_archiver = MagicMock()
|
||||||
|
# First call succeeds, second fails
|
||||||
|
mock_archiver.download_from_url.side_effect = [str(test_file), None]
|
||||||
|
|
||||||
|
result = twitter_dropin.create_metadata(tweet, None, mock_archiver, "https://x.com/test/status/123")
|
||||||
|
|
||||||
|
assert len(result.media) == 1
|
||||||
|
assert result.media[0].filename == str(test_file)
|
||||||
@@ -1,5 +1,6 @@
|
|||||||
import pytest
|
import pytest
|
||||||
from argparse import ArgumentParser, ArgumentTypeError
|
from argparse import ArgumentParser, ArgumentTypeError
|
||||||
|
from requests.exceptions import SSLError
|
||||||
from auto_archiver.core.orchestrator import ArchivingOrchestrator
|
from auto_archiver.core.orchestrator import ArchivingOrchestrator
|
||||||
from auto_archiver.version import __version__
|
from auto_archiver.version import __version__
|
||||||
from auto_archiver.core.config import read_yaml, store_yaml
|
from auto_archiver.core.config import read_yaml, store_yaml
|
||||||
@@ -256,3 +257,34 @@ def test_load_failed_extractor_cleanup(test_args, mocker, caplog):
|
|||||||
assert "Error during setup of modules: Test exception" in caplog.text
|
assert "Error during setup of modules: Test exception" in caplog.text
|
||||||
# make sure the 'cleanup' is called
|
# make sure the 'cleanup' is called
|
||||||
assert "cleanup" in caplog.text
|
assert "cleanup" in caplog.text
|
||||||
|
|
||||||
|
|
||||||
|
def test_check_for_updates_ssl_error(orchestrator, mocker):
|
||||||
|
"""check_for_updates should not raise when the HTTP request fails."""
|
||||||
|
mocker.patch(
|
||||||
|
"auto_archiver.core.orchestrator.requests.get",
|
||||||
|
side_effect=SSLError("SSL handshake failed"),
|
||||||
|
)
|
||||||
|
# should not raise
|
||||||
|
orchestrator.check_for_updates()
|
||||||
|
|
||||||
|
|
||||||
|
def test_check_for_updates_timeout(orchestrator, mocker):
|
||||||
|
"""check_for_updates should not raise on connection timeout."""
|
||||||
|
from requests.exceptions import ConnectionError
|
||||||
|
|
||||||
|
mocker.patch(
|
||||||
|
"auto_archiver.core.orchestrator.requests.get",
|
||||||
|
side_effect=ConnectionError("Connection refused"),
|
||||||
|
)
|
||||||
|
orchestrator.check_for_updates()
|
||||||
|
|
||||||
|
|
||||||
|
def test_check_for_updates_new_version_available(orchestrator, mocker):
|
||||||
|
"""check_for_updates should not raise when a newer version exists."""
|
||||||
|
mocker.patch(
|
||||||
|
"auto_archiver.core.orchestrator.requests.get",
|
||||||
|
return_value=mocker.Mock(json=lambda: {"info": {"version": "99.0.0"}}),
|
||||||
|
)
|
||||||
|
# should complete without error
|
||||||
|
orchestrator.check_for_updates()
|
||||||
|
|||||||
@@ -14,6 +14,7 @@ from auto_archiver.utils.misc import (
|
|||||||
calculate_file_hash,
|
calculate_file_hash,
|
||||||
random_str,
|
random_str,
|
||||||
get_timestamp,
|
get_timestamp,
|
||||||
|
ydl_entry_to_filename,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@@ -139,3 +140,47 @@ class TestMiscUtils:
|
|||||||
|
|
||||||
def test_invalid_timestamp_returns_none(self):
|
def test_invalid_timestamp_returns_none(self):
|
||||||
assert get_timestamp("invalid-date") is None
|
assert get_timestamp("invalid-date") is None
|
||||||
|
|
||||||
|
|
||||||
|
class TestYdlEntryToFilename:
|
||||||
|
"""Tests for ydl_entry_to_filename, especially .part file filtering."""
|
||||||
|
|
||||||
|
def _make_mock_ydl(self, prepared_filename):
|
||||||
|
class MockYDL:
|
||||||
|
def prepare_filename(self, entry):
|
||||||
|
return prepared_filename
|
||||||
|
|
||||||
|
return MockYDL()
|
||||||
|
|
||||||
|
def test_returns_exact_file_if_exists(self, tmp_path):
|
||||||
|
video = tmp_path / "video.mp4"
|
||||||
|
video.write_bytes(b"data")
|
||||||
|
ydl = self._make_mock_ydl(str(video))
|
||||||
|
assert ydl_entry_to_filename(ydl, {}) == str(video)
|
||||||
|
|
||||||
|
def test_skips_part_file_returns_complete(self, tmp_path):
|
||||||
|
"""Simulates yt-dlp leaving a .part file from a failed format
|
||||||
|
while a complete .webm exists."""
|
||||||
|
(tmp_path / "f5U3IKfoSYs.f399.mp4.part").write_bytes(b"incomplete")
|
||||||
|
webm = tmp_path / "f5U3IKfoSYs.webm"
|
||||||
|
webm.write_bytes(b"complete video")
|
||||||
|
|
||||||
|
# ydl.prepare_filename returns the expected .mp4 which doesn't exist
|
||||||
|
ydl = self._make_mock_ydl(str(tmp_path / "f5U3IKfoSYs.mp4"))
|
||||||
|
result = ydl_entry_to_filename(ydl, {})
|
||||||
|
|
||||||
|
assert result == str(webm)
|
||||||
|
assert not result.endswith(".part")
|
||||||
|
|
||||||
|
def test_skips_part_file_returns_false_if_no_other_match(self, tmp_path):
|
||||||
|
"""Only a .part file exists — should return False."""
|
||||||
|
(tmp_path / "video.f399.mp4.part").write_bytes(b"incomplete")
|
||||||
|
|
||||||
|
ydl = self._make_mock_ydl(str(tmp_path / "video.mp4"))
|
||||||
|
assert ydl_entry_to_filename(ydl, {}) is False
|
||||||
|
|
||||||
|
def test_returns_false_when_no_files_match(self, tmp_path):
|
||||||
|
(tmp_path / "unrelated.txt").write_bytes(b"data")
|
||||||
|
|
||||||
|
ydl = self._make_mock_ydl(str(tmp_path / "video.mp4"))
|
||||||
|
assert ydl_entry_to_filename(ydl, {}) is False
|
||||||
|
|||||||
Reference in New Issue
Block a user