mirror of
https://github.com/bellingcat/auto-archiver.git
synced 2026-06-07 19:08:30 +03:00
Compare commits
38 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
afbe4fac50 | ||
|
|
e633be1721 | ||
|
|
bc06de8e5c | ||
|
|
20fddce3a3 | ||
|
|
6efa439cdb | ||
|
|
ef77d1fc86 | ||
|
|
a57a5ee005 | ||
|
|
2582f567ac | ||
|
|
4e5c1a6218 | ||
|
|
12d9c469b2 | ||
|
|
792838f1a1 | ||
|
|
17c4ae15eb | ||
|
|
a08af07348 | ||
|
|
e54077f4e8 | ||
|
|
319c0528da | ||
|
|
ae0e53e434 | ||
|
|
82fc786d56 | ||
|
|
aa65299844 | ||
|
|
1b69ec1f00 | ||
|
|
304e5d40b1 | ||
|
|
3194fee95d | ||
|
|
0040810e2e | ||
|
|
63cfe34e23 | ||
|
|
23a88e3cf4 | ||
|
|
3cac160cc1 | ||
|
|
e9a92272c5 | ||
|
|
5d6c5ac2b1 | ||
|
|
f1de07c9aa | ||
|
|
1e1e060a77 | ||
|
|
b43d229326 | ||
|
|
077b03fc61 | ||
|
|
cf77cfa64d | ||
|
|
bc66dd4f2a | ||
|
|
139d647197 | ||
|
|
f465b570cd | ||
|
|
52a7cabaf1 | ||
|
|
a739361e12 | ||
|
|
b9ab26ed5a |
17
Dockerfile
17
Dockerfile
@@ -1,18 +1,17 @@
|
||||
FROM webrecorder/browsertrix-crawler:1.11.4 AS base
|
||||
FROM webrecorder/browsertrix-crawler:1.12.4 AS base
|
||||
|
||||
ENV RUNNING_IN_DOCKER=1 \
|
||||
LANG=C.UTF-8 \
|
||||
LC_ALL=C.UTF-8 \
|
||||
PYTHONDONTWRITEBYTECODE=1 \
|
||||
PYTHONFAULTHANDLER=1 \
|
||||
PATH="/root/.local/bin:$PATH"
|
||||
PYTHONFAULTHANDLER=1
|
||||
|
||||
|
||||
ARG TARGETARCH
|
||||
|
||||
# Installing system dependencies
|
||||
RUN apt-get update && \
|
||||
apt-get install -y --no-install-recommends gcc ffmpeg fonts-noto exiftool python3-tk
|
||||
apt-get install -y --no-install-recommends gcc ffmpeg fonts-noto exiftool python3-tk
|
||||
|
||||
# Poetry and runtime
|
||||
FROM base AS runtime
|
||||
@@ -41,11 +40,21 @@ COPY ./src/ .
|
||||
RUN /poetry-venv/bin/poetry install --only main --no-cache
|
||||
|
||||
|
||||
# Run as non-root user to avoid permission issues with mounted volumes (see #342)
|
||||
# The base image already has an 'ubuntu' user at UID/GID 1000.
|
||||
# Ensure directories that need write access at runtime are writable.
|
||||
RUN chown 1000:1000 /app && \
|
||||
chown -R 1000:1000 /app/.venv/lib/python3.12/site-packages/seleniumbase/drivers/ && \
|
||||
mkdir -p /app/local_archive /app/secrets /tmp/archive && \
|
||||
chown -R 1000:1000 /app/local_archive /app/secrets /tmp/archive
|
||||
|
||||
# Update PATH to include virtual environment binaries
|
||||
# Allowing entry point to run the application directly with Python
|
||||
ENV VIRTUAL_ENV=/app/.venv \
|
||||
PATH="/app/.venv/bin:$PATH"
|
||||
|
||||
USER 1000
|
||||
|
||||
ENTRYPOINT ["python3", "-m", "auto_archiver"]
|
||||
|
||||
# should be executed with 2 volumes (3 if local_storage is used)
|
||||
|
||||
@@ -6,6 +6,9 @@ services:
|
||||
context: .
|
||||
dockerfile: Dockerfile
|
||||
container_name: auto-archiver
|
||||
# Override user to match host UID/GID and avoid permission issues on volumes.
|
||||
# Set USER_ID and GROUP_ID env vars, or defaults to 1000:1000.
|
||||
user: "${USER_ID:-1000}:${GROUP_ID:-1000}"
|
||||
volumes:
|
||||
- ./secrets:/app/secrets
|
||||
- ./local_archive:/app/local_archive
|
||||
|
||||
1428
poetry.lock
generated
1428
poetry.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
|
||||
|
||||
[project]
|
||||
name = "auto-archiver"
|
||||
version = "1.2.1"
|
||||
version = "1.2.7"
|
||||
description = "Automatically archive links to videos, images, and social media content from Google Sheets (and more)."
|
||||
|
||||
requires-python = ">=3.10,<3.13"
|
||||
|
||||
@@ -11,6 +11,7 @@ Key Functionalities:
|
||||
|
||||
from __future__ import annotations
|
||||
import hashlib
|
||||
import os
|
||||
from typing import Any, List, Union, Dict
|
||||
from dataclasses import dataclass, field
|
||||
from dataclasses_json import dataclass_json
|
||||
@@ -181,8 +182,14 @@ class Metadata:
|
||||
media_hashes = set()
|
||||
new_media = []
|
||||
for m in self.media:
|
||||
if not m.filename:
|
||||
new_media.append(m)
|
||||
continue
|
||||
h = m.get("hash")
|
||||
if not h:
|
||||
if not os.path.exists(m.filename):
|
||||
logger.warning(f"Skipping missing media file: {m.filename}")
|
||||
continue
|
||||
h = calculate_hash_in_chunks(hashlib.sha256(), int(1.6e7), m.filename)
|
||||
if len(h) and h in media_hashes:
|
||||
continue
|
||||
|
||||
@@ -467,7 +467,11 @@ Here's how that would look: \n\nsteps:\n extractors:\n - [your_extractor_name_
|
||||
return self.setup_complete_parser(basic_config, yaml_config, unused_args)
|
||||
|
||||
def check_for_updates(self):
|
||||
response = requests.get("https://pypi.org/pypi/auto-archiver/json").json()
|
||||
try:
|
||||
response = requests.get("https://pypi.org/pypi/auto-archiver/json", timeout=10).json()
|
||||
except Exception as e:
|
||||
logger.debug(f"Unable to check for updates: {e}")
|
||||
return
|
||||
latest_version = version.parse(response["info"]["version"])
|
||||
current_version = version.parse(__version__)
|
||||
# check version compared to current version
|
||||
|
||||
@@ -73,6 +73,7 @@ class AntibotExtractorEnricher(Extractor, Enricher):
|
||||
if self.enrich(result):
|
||||
result.status = "antibot"
|
||||
return result
|
||||
return False
|
||||
|
||||
def _prepare_user_data_dir(self):
|
||||
if self.user_data_dir:
|
||||
@@ -88,8 +89,18 @@ class AntibotExtractorEnricher(Extractor, Enricher):
|
||||
using_user_data_dir = self.user_data_dir if custom_data_dir else None
|
||||
url = to_enrich.get_url()
|
||||
|
||||
# Use xvfb in Docker environments where no display is available
|
||||
use_xvfb = bool(os.environ.get("RUNNING_IN_DOCKER"))
|
||||
|
||||
try:
|
||||
with SB(uc=True, agent=self.agent, headed=None, user_data_dir=using_user_data_dir, proxy=self.proxy) as sb:
|
||||
with SB(
|
||||
uc=True,
|
||||
agent=self.agent,
|
||||
headed=None,
|
||||
user_data_dir=using_user_data_dir,
|
||||
proxy=self.proxy,
|
||||
xvfb=use_xvfb,
|
||||
) as sb:
|
||||
logger.info(f"Selenium browser is up with agent {self.agent}, opening url...")
|
||||
sb.uc_open_with_reconnect(url, 4)
|
||||
|
||||
|
||||
@@ -39,12 +39,18 @@ class Bluesky(GenericDropin):
|
||||
media_url = "https://bsky.social/xrpc/com.atproto.sync.getBlob?cid={}&did={}"
|
||||
for image_media in image_medias:
|
||||
url = media_url.format(image_media["image"]["ref"]["$link"], post["author"]["did"])
|
||||
image_media = archiver.download_from_url(url)
|
||||
media.append(Media(image_media))
|
||||
filename = archiver.download_from_url(url)
|
||||
if filename:
|
||||
media.append(Media(filename))
|
||||
else:
|
||||
logger.warning(f"Failed to download Bluesky image from {url}")
|
||||
for video_media in video_medias:
|
||||
url = media_url.format(video_media["ref"]["$link"], post["author"]["did"])
|
||||
video_media = archiver.download_from_url(url)
|
||||
media.append(Media(video_media))
|
||||
filename = archiver.download_from_url(url)
|
||||
if filename:
|
||||
media.append(Media(filename))
|
||||
else:
|
||||
logger.warning(f"Failed to download Bluesky video from {url}")
|
||||
return media
|
||||
|
||||
def _get_post_data(self, post: dict) -> dict:
|
||||
|
||||
@@ -204,8 +204,11 @@ class GenericExtractor(Extractor):
|
||||
if thumbnail_url:
|
||||
try:
|
||||
cover_image_path = self.download_from_url(thumbnail_url)
|
||||
media = Media(cover_image_path)
|
||||
metadata.add_media(media, id="cover")
|
||||
if cover_image_path:
|
||||
media = Media(cover_image_path)
|
||||
metadata.add_media(media, id="cover")
|
||||
else:
|
||||
logger.warning(f"Failed to download cover image from {thumbnail_url}")
|
||||
except Exception as e:
|
||||
logger.error(f"Could not download cover image {thumbnail_url}: {e}")
|
||||
|
||||
@@ -572,6 +575,8 @@ class GenericExtractor(Extractor):
|
||||
"--live-from-start" if self.live_from_start else "--no-live-from-start",
|
||||
"--postprocessor-args",
|
||||
"ffmpeg:-bitexact", # ensure bitexact output to avoid mismatching hashes for same video
|
||||
"--js-runtimes",
|
||||
"node", # yt-dlp defaults to deno-only; node is available in the base image
|
||||
]
|
||||
|
||||
# proxy handling
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
from typing import Type
|
||||
|
||||
from auto_archiver.utils import traverse_obj
|
||||
from auto_archiver.utils.custom_logger import logger
|
||||
from auto_archiver.core.metadata import Metadata, Media
|
||||
from auto_archiver.core.extractor import Extractor
|
||||
from yt_dlp.extractor.common import InfoExtractor
|
||||
@@ -58,6 +59,9 @@ class Truth(GenericDropin):
|
||||
# add the media
|
||||
for media in post.get("media_attachments", []):
|
||||
filename = archiver.download_from_url(media["url"])
|
||||
if not filename:
|
||||
logger.warning(f"Failed to download media from {media['url']}")
|
||||
continue
|
||||
result.add_media(Media(filename), id=media.get("id"))
|
||||
|
||||
return result
|
||||
|
||||
@@ -9,6 +9,8 @@ from auto_archiver.utils import url as UrlUtil, get_datetime_from_str
|
||||
from auto_archiver.core.extractor import Extractor
|
||||
from auto_archiver.utils.deletion_detection import detect_deletion, flag_as_deleted
|
||||
from auto_archiver.modules.generic_extractor.dropin import GenericDropin, InfoExtractor
|
||||
import requests
|
||||
from retrying import retry
|
||||
|
||||
|
||||
class Twitter(GenericDropin):
|
||||
@@ -29,7 +31,85 @@ class Twitter(GenericDropin):
|
||||
|
||||
def extract_post(self, url: str, ie_instance: InfoExtractor):
|
||||
twid = ie_instance._match_valid_url(url).group("id")
|
||||
return ie_instance._extract_status(twid=twid)
|
||||
try:
|
||||
post_data = ie_instance._extract_status(twid=twid)
|
||||
if not post_data or not post_data.get("user") or not post_data.get("created_at"):
|
||||
raise ValueError("Error retrieving post with twitter dropin")
|
||||
return post_data
|
||||
except Exception as e:
|
||||
logger.debug(f"yt-dlp twitter extraction failed: {e}")
|
||||
# try fxtwitter API as fallback
|
||||
return self._fetch_fxtwitter(twid)
|
||||
|
||||
def _fetch_fxtwitter(self, twid: str) -> dict:
|
||||
"""Fetch tweet data from fxtwitter API and convert to expected format."""
|
||||
fxtwitter_url = f"https://api.fxtwitter.com/status/{twid}"
|
||||
logger.info(f"Falling back to fxtwitter API for tweet extraction: {fxtwitter_url}")
|
||||
|
||||
@retry(wait_random_min=500, wait_random_max=2000, stop_max_attempt_number=3)
|
||||
def fetch_fxtwitter_data(url):
|
||||
headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/112.0"}
|
||||
resp = requests.get(url, headers=headers, timeout=15)
|
||||
if resp.status_code != 200:
|
||||
raise ValueError(f"Failed to retrieve tweet from fxtwitter API: {resp.status_code}")
|
||||
data = resp.json()
|
||||
if "tweet" not in data:
|
||||
raise ValueError(f"No tweet data in fxtwitter response: {data.get('message', 'Unknown error')}")
|
||||
return data["tweet"]
|
||||
|
||||
tweet = fetch_fxtwitter_data(fxtwitter_url)
|
||||
|
||||
# Convert fxtwitter format to expected format
|
||||
author = tweet.get("author", {}).get("name", "")
|
||||
created_at = tweet.get("created_at", "") # Format: "Sun Feb 08 18:45:00 +0000 2026"
|
||||
full_text = tweet.get("text", "") or tweet.get("raw_text", "")
|
||||
|
||||
# Convert media format
|
||||
media = []
|
||||
fx_media = tweet.get("media", {})
|
||||
|
||||
# Handle photos
|
||||
for photo in fx_media.get("photos", []):
|
||||
media.append({"type": "photo", "media_url_https": photo.get("url", "")})
|
||||
|
||||
# Handle videos
|
||||
for video in fx_media.get("videos", []):
|
||||
variants = video.get("variants", [])
|
||||
# Convert to expected variant format
|
||||
converted_variants = []
|
||||
for var in variants:
|
||||
converted_variants.append(
|
||||
{
|
||||
"url": var.get("url", ""),
|
||||
"content_type": var.get("content_type", "video/mp4"),
|
||||
"bitrate": var.get("bitrate", 0),
|
||||
}
|
||||
)
|
||||
if converted_variants:
|
||||
media.append({"type": "video", "video_info": {"variants": converted_variants}})
|
||||
|
||||
# Handle animated gifs (fxtwitter may include these in videos)
|
||||
for item in fx_media.get("all", []):
|
||||
if item.get("type") == "gif":
|
||||
variants = item.get("variants", [])
|
||||
converted_variants = []
|
||||
for var in variants:
|
||||
converted_variants.append(
|
||||
{
|
||||
"url": var.get("url", ""),
|
||||
"content_type": var.get("content_type", "video/mp4"),
|
||||
"bitrate": var.get("bitrate", 0),
|
||||
}
|
||||
)
|
||||
if converted_variants:
|
||||
media.append({"type": "animated_gif", "video_info": {"variants": converted_variants}})
|
||||
|
||||
return {
|
||||
"user": {"name": author},
|
||||
"created_at": created_at,
|
||||
"full_text": full_text,
|
||||
"entities": {"media": media},
|
||||
}
|
||||
|
||||
def keys_to_clean(self, video_data, info_extractor):
|
||||
return ["user", "created_at", "entities", "favorited", "translator_type"]
|
||||
@@ -77,5 +157,8 @@ class Twitter(GenericDropin):
|
||||
mimetype = variant["content_type"]
|
||||
ext = mimetypes.guess_extension(mimetype)
|
||||
media.filename = archiver.download_from_url(media.get("src"), f"{slugify(url)}_{i}{ext}")
|
||||
if not media.filename:
|
||||
logger.warning(f"Failed to download media from {media.get('src')}")
|
||||
continue
|
||||
result.add_media(media)
|
||||
return result
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
from .ghostarchive_enricher import GhostarchiveEnricher
|
||||
@@ -0,0 +1,58 @@
|
||||
{
|
||||
"name": "Ghost Archive Enricher",
|
||||
"type": ["enricher"],
|
||||
"entry_point": "ghostarchive_enricher::GhostarchiveEnricher",
|
||||
"requires_setup": False,
|
||||
"dependencies": {
|
||||
"python": ["loguru", "requests", "bs4", "seleniumbase"],
|
||||
},
|
||||
"configs": {
|
||||
"timeout": {
|
||||
"default": 120,
|
||||
"type": "int",
|
||||
"help": "seconds to wait for successful archive confirmation from Ghost Archive.",
|
||||
},
|
||||
"check_existing": {
|
||||
"default": True,
|
||||
"type": "bool",
|
||||
"help": "whether to search for an existing archive before submitting a new one.",
|
||||
},
|
||||
"proxy_http": {
|
||||
"default": None,
|
||||
"help": "http proxy to use for requests, eg http://proxy-user:password@proxy-ip:port",
|
||||
},
|
||||
"proxy_https": {
|
||||
"default": None,
|
||||
"help": "https proxy to use for requests, eg https://proxy-user:password@proxy-ip:port",
|
||||
},
|
||||
},
|
||||
"description": """
|
||||
Submits the current URL to [Ghost Archive](https://ghostarchive.org/) for archiving and returns the archived page URL.
|
||||
|
||||
Used as an **enricher** to add a Ghost Archive URL to items already extracted by other modules.
|
||||
|
||||
### Features
|
||||
- Archives any public URL using the Ghost Archive service.
|
||||
- Optionally checks for existing archives before submitting a new one.
|
||||
- Supports HTTP and HTTPS proxies for requests.
|
||||
- Parses HTML responses to extract archive URLs (Ghost Archive has no JSON API).
|
||||
|
||||
### Important
|
||||
- This module confirms that Ghost Archive accepted the URL submission and returned an archive link.
|
||||
It does **not** verify the contents or completeness of the archived page.
|
||||
|
||||
### Notes
|
||||
- Ghost Archive is a free service with no authentication required.
|
||||
- Archived pages must be smaller than 50 MB (including CSS, fonts, images, etc.).
|
||||
- Videos are archived up to 360p and must be under 100 MB and shorter than 30 minutes.
|
||||
- Archival may take up to 5 minutes depending on the queue and page complexity.
|
||||
- Archived content is stored indefinitely.
|
||||
- Ghost Archive does not archive pages that require authentication or form submission.
|
||||
|
||||
### Limitations
|
||||
- No official API — this module interacts with the Ghost Archive web interface.
|
||||
- The submission endpoint is protected by Cloudflare, so a headless browser (SeleniumBase) is used for new submissions.
|
||||
- Searching for existing archives uses plain HTTP requests and does not require a browser.
|
||||
- Rate limiting may apply; consider using a delay between requests if archiving many URLs.
|
||||
""",
|
||||
}
|
||||
@@ -0,0 +1,153 @@
|
||||
import time
|
||||
import re
|
||||
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
from seleniumbase import SB
|
||||
from auto_archiver.utils.custom_logger import logger
|
||||
from auto_archiver.utils import url as UrlUtil
|
||||
from auto_archiver.core import Enricher, Metadata
|
||||
|
||||
|
||||
class GhostarchiveEnricher(Enricher):
|
||||
"""
|
||||
Submits the current URL to Ghost Archive (ghostarchive.org) for archiving
|
||||
and stores the archived page URL as enrichment metadata.
|
||||
|
||||
Ghost Archive has no official API — this module interacts with the web form
|
||||
and parses HTML responses. The submission endpoint is protected by Cloudflare,
|
||||
so a headless browser (SeleniumBase) is used for archival submissions, while
|
||||
plain HTTP requests are used for searching existing archives.
|
||||
|
||||
Note: this module only confirms that Ghost Archive accepted the submission
|
||||
and returned an archive URL. It does not verify that the archived page
|
||||
content is complete or correctly rendered.
|
||||
"""
|
||||
|
||||
GHOSTARCHIVE_BASE = "https://ghostarchive.org"
|
||||
ARCHIVE_ENDPOINT = f"{GHOSTARCHIVE_BASE}/archive2"
|
||||
SEARCH_ENDPOINT = f"{GHOSTARCHIVE_BASE}/search"
|
||||
ARCHIVE_URL_PATTERN = re.compile(r"/archive/([A-Za-z0-9]+)")
|
||||
|
||||
def _get_proxies(self) -> dict:
|
||||
proxies = {}
|
||||
if self.proxy_http:
|
||||
proxies["http"] = self.proxy_http
|
||||
if self.proxy_https:
|
||||
proxies["https"] = self.proxy_https
|
||||
return proxies
|
||||
|
||||
def _get_headers(self) -> dict:
|
||||
return {
|
||||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
|
||||
}
|
||||
|
||||
def _normalize_archive_href(self, href: str) -> str | None:
|
||||
"""Normalize an archive link href to a full HTTPS URL, filtering out replay links."""
|
||||
if "/archive/" not in href or "/replay/" in href:
|
||||
return None
|
||||
if href.startswith("/"):
|
||||
return f"{self.GHOSTARCHIVE_BASE}{href}"
|
||||
if href.startswith("http://ghostarchive.org"):
|
||||
return href.replace("http://", "https://")
|
||||
if href.startswith("https://ghostarchive.org"):
|
||||
return href
|
||||
return None
|
||||
|
||||
def _search_existing(self, url: str) -> str | None:
|
||||
"""
|
||||
Search Ghost Archive for an existing archive of the given URL.
|
||||
Returns the archive URL if found, otherwise None.
|
||||
"""
|
||||
try:
|
||||
r = requests.get(
|
||||
self.SEARCH_ENDPOINT,
|
||||
params={"term": url},
|
||||
headers=self._get_headers(),
|
||||
proxies=self._get_proxies(),
|
||||
timeout=30,
|
||||
)
|
||||
if r.status_code != 200:
|
||||
logger.warning(f"Ghost Archive search returned status {r.status_code}")
|
||||
return None
|
||||
|
||||
soup = BeautifulSoup(r.text, "html.parser")
|
||||
for link in soup.find_all("a", href=True):
|
||||
archive_url = self._normalize_archive_href(link["href"])
|
||||
if archive_url:
|
||||
logger.info(f"Found existing Ghost Archive: {archive_url}")
|
||||
return archive_url
|
||||
|
||||
except requests.exceptions.RequestException as e:
|
||||
logger.warning(f"Ghost Archive search failed: {e}")
|
||||
|
||||
return None
|
||||
|
||||
def _submit_url(self, url: str) -> str | None:
|
||||
"""
|
||||
Submit a URL to Ghost Archive for archiving using a headless browser.
|
||||
The /archive2 endpoint is Cloudflare-protected, requiring JS execution.
|
||||
Returns the archive URL if successful, otherwise None.
|
||||
"""
|
||||
try:
|
||||
with SB(uc=True, headless=True) as sb:
|
||||
logger.debug("Opening Ghost Archive homepage in headless browser")
|
||||
sb.open(self.GHOSTARCHIVE_BASE)
|
||||
|
||||
# fill in the archive form and submit
|
||||
sb.type('input[name="archive"]', url)
|
||||
sb.click('input[type="submit"][value="Submit for archival"]')
|
||||
|
||||
# wait for navigation to /archive/{id} or timeout
|
||||
start_time = time.time()
|
||||
while time.time() - start_time < self.timeout:
|
||||
current_url = sb.get_current_url()
|
||||
if self.ARCHIVE_URL_PATTERN.search(current_url):
|
||||
archive_url = current_url.split("?")[0]
|
||||
logger.info(f"Ghost Archive saved: {archive_url}")
|
||||
return archive_url
|
||||
time.sleep(2)
|
||||
|
||||
# if we didn't redirect, try parsing the page source
|
||||
page_source = sb.get_page_source()
|
||||
return self._parse_archive_url(page_source)
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Ghost Archive submission failed: {e}")
|
||||
return None
|
||||
|
||||
def _parse_archive_url(self, html: str) -> str | None:
|
||||
"""Parse HTML response to find an archive URL."""
|
||||
soup = BeautifulSoup(html, "html.parser")
|
||||
for link in soup.find_all("a", href=True):
|
||||
archive_url = self._normalize_archive_href(link["href"])
|
||||
if archive_url:
|
||||
return archive_url
|
||||
return None
|
||||
|
||||
def enrich(self, to_enrich: Metadata) -> bool:
|
||||
url = to_enrich.get_url()
|
||||
if UrlUtil.is_auth_wall(url):
|
||||
logger.debug("[SKIP] Ghost Archive since url is behind AUTH WALL")
|
||||
return False
|
||||
|
||||
if to_enrich.get("ghostarchive"):
|
||||
logger.info(f"Ghost Archive enricher had already been executed: {to_enrich.get('ghostarchive')}")
|
||||
return True
|
||||
|
||||
# optionally check for existing archive first
|
||||
archive_url = None
|
||||
if self.check_existing:
|
||||
logger.debug(f"Searching Ghost Archive for existing archive of {url}")
|
||||
archive_url = self._search_existing(url)
|
||||
|
||||
if not archive_url:
|
||||
logger.debug(f"Submitting {url} to Ghost Archive")
|
||||
archive_url = self._submit_url(url)
|
||||
|
||||
if archive_url:
|
||||
to_enrich.set("ghostarchive", archive_url)
|
||||
return True
|
||||
|
||||
logger.warning(f"Ghost Archive failed to archive {url}")
|
||||
return False
|
||||
@@ -25,6 +25,9 @@ class HashEnricher(Enricher):
|
||||
logger.debug(f"Calculating media hashes with algo={self.algorithm}")
|
||||
|
||||
for i, m in enumerate(to_enrich.media):
|
||||
if not m.filename:
|
||||
logger.warning(f"Skipping hash for media without filename: {m}")
|
||||
continue
|
||||
if len(hd := self.calculate_hash(m.filename)):
|
||||
to_enrich.media[i].set("hash", f"{self.algorithm}:{hd}")
|
||||
|
||||
|
||||
@@ -99,7 +99,10 @@ class InstagramAPIExtractor(Extractor):
|
||||
result.set_title(user.get("full_name", username)).set("data", user)
|
||||
if pic_url := user.get("profile_pic_url_hd", user.get("profile_pic_url")):
|
||||
filename = self.download_from_url(pic_url)
|
||||
result.add_media(Media(filename=filename), id="profile_picture")
|
||||
if filename:
|
||||
result.add_media(Media(filename=filename), id="profile_picture")
|
||||
else:
|
||||
logger.warning(f"Failed to download profile picture from {pic_url}")
|
||||
|
||||
count_posts = 0
|
||||
if self.full_profile:
|
||||
@@ -202,7 +205,10 @@ class InstagramAPIExtractor(Extractor):
|
||||
|
||||
if cover_media := h_info.get("cover_media", {}).get("cropped_image_version", {}).get("url"):
|
||||
filename = self.download_from_url(cover_media)
|
||||
result.add_media(Media(filename=filename), id=f"cover_media highlight {id}")
|
||||
if filename:
|
||||
result.add_media(Media(filename=filename), id=f"cover_media highlight {id}")
|
||||
else:
|
||||
logger.warning(f"Failed to download cover media from {cover_media}")
|
||||
|
||||
items = h_info.get("items", [])[::-1] # newest to oldest
|
||||
items = items[: min(max_to_download, len(items))]
|
||||
@@ -345,7 +351,10 @@ class InstagramAPIExtractor(Extractor):
|
||||
image_media = None
|
||||
if image_url := item.get("thumbnail_url"):
|
||||
filename = self.download_from_url(image_url, verbose=False)
|
||||
image_media = Media(filename=filename)
|
||||
if filename:
|
||||
image_media = Media(filename=filename)
|
||||
else:
|
||||
logger.warning(f"Failed to download thumbnail from {image_url}")
|
||||
|
||||
# retrieve video info
|
||||
best_id = item.get("id", item.get("pk"))
|
||||
@@ -357,16 +366,19 @@ class InstagramAPIExtractor(Extractor):
|
||||
|
||||
if video_url := item.get("video_url"):
|
||||
filename = self.download_from_url(video_url, verbose=False)
|
||||
video_media = Media(filename=filename)
|
||||
if taken_at:
|
||||
video_media.set("date", taken_at)
|
||||
if code:
|
||||
video_media.set("url", f"https://www.instagram.com/p/{code}")
|
||||
if caption_text:
|
||||
video_media.set("text", caption_text)
|
||||
video_media.set("preview", [image_media])
|
||||
video_media.set("data", [item])
|
||||
return item, video_media, f"{context or 'video'} {best_id}"
|
||||
if filename:
|
||||
video_media = Media(filename=filename)
|
||||
if taken_at:
|
||||
video_media.set("date", taken_at)
|
||||
if code:
|
||||
video_media.set("url", f"https://www.instagram.com/p/{code}")
|
||||
if caption_text:
|
||||
video_media.set("text", caption_text)
|
||||
video_media.set("preview", [image_media])
|
||||
video_media.set("data", [item])
|
||||
return item, video_media, f"{context or 'video'} {best_id}"
|
||||
else:
|
||||
logger.warning(f"Failed to download video from {video_url}")
|
||||
elif image_media:
|
||||
if taken_at:
|
||||
image_media.set("date", taken_at)
|
||||
|
||||
@@ -25,6 +25,9 @@ class MetaEnricher(Enricher):
|
||||
logger.debug(f"Calculating archive file sizes for {len(to_enrich.media)} media files")
|
||||
total_size = 0
|
||||
for media in to_enrich.get_all_media():
|
||||
if not media.filename:
|
||||
logger.warning(f"Skipping file size for media without filename: {media}")
|
||||
continue
|
||||
file_stats = os.stat(media.filename)
|
||||
media.set("bytes", file_stats.st_size)
|
||||
media.set("size", self.human_readable_bytes(file_stats.st_size))
|
||||
|
||||
@@ -49,10 +49,18 @@ class TelegramExtractor(Extractor):
|
||||
if not len(image_urls):
|
||||
return False
|
||||
for img_url in image_urls:
|
||||
result.add_media(Media(self.download_from_url(img_url)))
|
||||
filename = self.download_from_url(img_url)
|
||||
if not filename:
|
||||
logger.warning(f"Failed to download image from {img_url}")
|
||||
continue
|
||||
result.add_media(Media(filename))
|
||||
else:
|
||||
video_url = video.get("src")
|
||||
m_video = Media(self.download_from_url(video_url))
|
||||
video_filename = self.download_from_url(video_url)
|
||||
if not video_filename:
|
||||
logger.warning(f"Failed to download video from {video_url}")
|
||||
return False
|
||||
m_video = Media(video_filename)
|
||||
# extract duration from HTML
|
||||
try:
|
||||
duration = s.find_all("time")[0].contents[0]
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import asyncio
|
||||
import os
|
||||
import shutil
|
||||
import re
|
||||
@@ -53,6 +54,16 @@ class TelethonExtractor(Extractor):
|
||||
logger.debug(f"Making a copy of the session file {base_session_filepath} to {self.session_file}.session")
|
||||
shutil.copy(base_session_filepath, f"{self.session_file}.session")
|
||||
|
||||
# ensure a running event loop exists (Needed when used by Celery workers which may close the default one)
|
||||
try:
|
||||
loop = asyncio.get_event_loop()
|
||||
if loop.is_closed():
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
except RuntimeError:
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
|
||||
# initiate the client
|
||||
self.client = TelegramClient(self.session_file, self.api_id, self.api_hash)
|
||||
|
||||
@@ -190,6 +201,9 @@ class TelethonExtractor(Extractor):
|
||||
)
|
||||
for i, om_url in enumerate(other_media_urls):
|
||||
filename = self.download_from_url(om_url, f"{chat}_{group_id}_{i}")
|
||||
if not filename:
|
||||
logger.warning(f"Failed to download media from {om_url}")
|
||||
continue
|
||||
result.add_media(Media(filename=filename), id=f"{group_id}_{i}")
|
||||
|
||||
filename_dest = os.path.join(self.tmp_dir, f"{chat}_{group_id}", str(mp.id))
|
||||
|
||||
@@ -114,6 +114,9 @@ class TwitterApiExtractor(Extractor):
|
||||
logger.info(f"Found media {media}")
|
||||
ext = mimetypes.guess_extension(mimetype)
|
||||
media.filename = self.download_from_url(media.get("src"), f"{slugify(url)}_{i}{ext}")
|
||||
if not media.filename:
|
||||
logger.warning(f"Failed to download media from {media.get('src')}")
|
||||
continue
|
||||
result.add_media(media)
|
||||
|
||||
result.set_content(
|
||||
|
||||
@@ -24,8 +24,7 @@ class WaczExtractorEnricher(Enricher, Extractor):
|
||||
self.use_docker = os.environ.get("WACZ_ENABLE_DOCKER") or not os.environ.get("RUNNING_IN_DOCKER")
|
||||
self.docker_in_docker = os.environ.get("WACZ_ENABLE_DOCKER") and os.environ.get("RUNNING_IN_DOCKER")
|
||||
|
||||
self.crawl_id = random_str(8)
|
||||
self.cwd_dind = f"/crawls/crawls{self.crawl_id}"
|
||||
self.cwd_dind = f"/crawls/crawls{random_str(8)}"
|
||||
self.browsertrix_home_host = os.environ.get("BROWSERTRIX_HOME_HOST")
|
||||
self.browsertrix_home_container = os.environ.get("BROWSERTRIX_HOME_CONTAINER") or self.browsertrix_home_host
|
||||
# create crawls folder if not exists, so it can be safely removed in cleanup
|
||||
@@ -51,7 +50,8 @@ class WaczExtractorEnricher(Enricher, Extractor):
|
||||
|
||||
url = to_enrich.get_url()
|
||||
|
||||
collection = self.crawl_id
|
||||
crawl_id = random_str(8)
|
||||
collection = crawl_id
|
||||
browsertrix_home_host = self.browsertrix_home_host or os.path.abspath(self.tmp_dir)
|
||||
browsertrix_home_container = self.browsertrix_home_container or browsertrix_home_host
|
||||
|
||||
@@ -83,8 +83,10 @@ class WaczExtractorEnricher(Enricher, Extractor):
|
||||
# "--blockAds" # note: this has been known to cause issues on cloudflare protected sites
|
||||
]
|
||||
|
||||
crawl_cwd_dind = os.path.join(self.cwd_dind, crawl_id)
|
||||
if self.docker_in_docker:
|
||||
cmd.extend(["--cwd", self.cwd_dind])
|
||||
os.makedirs(crawl_cwd_dind, exist_ok=True)
|
||||
cmd.extend(["--cwd", crawl_cwd_dind])
|
||||
|
||||
if self.auth_for_site(url):
|
||||
# there's an auth for this site, but browsertrix only supports username/password auth
|
||||
@@ -109,7 +111,7 @@ class WaczExtractorEnricher(Enricher, Extractor):
|
||||
] + cmd
|
||||
|
||||
if self.profile:
|
||||
profile_file = f"profile-{self.crawl_id}.tar.gz"
|
||||
profile_file = f"profile-{crawl_id}.tar.gz"
|
||||
profile_fn = os.path.join(browsertrix_home_container, profile_file)
|
||||
logger.debug(f"Copying {self.profile} to {profile_fn}")
|
||||
shutil.copyfile(self.profile, profile_fn)
|
||||
@@ -137,7 +139,7 @@ class WaczExtractorEnricher(Enricher, Extractor):
|
||||
return False
|
||||
|
||||
if self.docker_in_docker:
|
||||
wacz_fn = os.path.join(self.cwd_dind, "collections", collection, f"{collection}.wacz")
|
||||
wacz_fn = os.path.join(crawl_cwd_dind, "collections", collection, f"{collection}.wacz")
|
||||
elif self.use_docker:
|
||||
wacz_fn = os.path.join(browsertrix_home_container, "collections", collection, f"{collection}.wacz")
|
||||
else:
|
||||
@@ -152,7 +154,7 @@ class WaczExtractorEnricher(Enricher, Extractor):
|
||||
self.extract_media_from_wacz(to_enrich, wacz_fn)
|
||||
|
||||
if self.docker_in_docker:
|
||||
jsonl_fn = os.path.join(self.cwd_dind, "collections", collection, "pages", "pages.jsonl")
|
||||
jsonl_fn = os.path.join(crawl_cwd_dind, "collections", collection, "pages", "pages.jsonl")
|
||||
elif self.use_docker:
|
||||
jsonl_fn = os.path.join(browsertrix_home_container, "collections", collection, "pages", "pages.jsonl")
|
||||
else:
|
||||
|
||||
@@ -120,6 +120,9 @@ def ydl_entry_to_filename(ydl, entry: dict) -> str:
|
||||
directory = os.path.dirname(base_filename) # '/get/path/to'
|
||||
basename = os.path.basename(base_filename) # 'file'
|
||||
for f in os.listdir(directory):
|
||||
# skip incomplete downloads left behind by yt-dlp
|
||||
if f.endswith(".part"):
|
||||
continue
|
||||
if (
|
||||
f.startswith(basename)
|
||||
or (entry_url and os.path.splitext(f)[0] in entry_url)
|
||||
|
||||
1
tests/core/__init__.py
Normal file
1
tests/core/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
# Core module tests
|
||||
198
tests/core/test_media.py
Normal file
198
tests/core/test_media.py
Normal file
@@ -0,0 +1,198 @@
|
||||
"""
|
||||
Tests for the Media class from auto_archiver.core.media
|
||||
"""
|
||||
|
||||
import pytest
|
||||
from unittest.mock import Mock, patch
|
||||
from auto_archiver.core.media import Media
|
||||
|
||||
|
||||
class TestMediaBasics:
|
||||
"""Test basic Media properties and methods."""
|
||||
|
||||
def test_media_creation_with_filename(self):
|
||||
media = Media(filename="test.mp4")
|
||||
assert media.filename == "test.mp4"
|
||||
assert media.urls == []
|
||||
assert media.properties == {}
|
||||
|
||||
def test_media_key_property(self):
|
||||
media = Media(filename="test.mp4", _key="my_key")
|
||||
assert media.key == "my_key"
|
||||
|
||||
def test_media_set_get_properties(self):
|
||||
media = Media(filename="test.mp4")
|
||||
result = media.set("author", "John Doe")
|
||||
assert result is media # returns self for chaining
|
||||
assert media.get("author") == "John Doe"
|
||||
assert media.get("nonexistent") is None
|
||||
assert media.get("nonexistent", "default") == "default"
|
||||
|
||||
def test_media_add_url(self):
|
||||
media = Media(filename="test.mp4")
|
||||
media.add_url("https://example.com/test.mp4")
|
||||
assert "https://example.com/test.mp4" in media.urls
|
||||
media.add_url("https://cdn.example.com/test.mp4")
|
||||
assert len(media.urls) == 2
|
||||
|
||||
|
||||
class TestMediaMimetype:
|
||||
"""Test mimetype detection and handling."""
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"filename,expected_mimetype",
|
||||
[
|
||||
("video.mp4", "video/mp4"),
|
||||
("image.jpg", "image/jpeg"),
|
||||
("image.png", "image/png"),
|
||||
("audio.mp3", "audio/mpeg"),
|
||||
("document.pdf", "application/pdf"),
|
||||
("text.txt", "text/plain"),
|
||||
],
|
||||
)
|
||||
def test_mimetype_detection(self, filename, expected_mimetype):
|
||||
media = Media(filename=filename)
|
||||
assert media.mimetype == expected_mimetype
|
||||
|
||||
def test_mimetype_setter(self):
|
||||
media = Media(filename="file.unknown")
|
||||
media.mimetype = "custom/type"
|
||||
assert media.mimetype == "custom/type"
|
||||
|
||||
def test_mimetype_empty_filename(self):
|
||||
media = Media(filename="")
|
||||
assert media.mimetype == ""
|
||||
|
||||
|
||||
class TestMediaTypeChecks:
|
||||
"""Test media type checking methods."""
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"filename,is_video,is_audio,is_image",
|
||||
[
|
||||
("video.mp4", True, False, False),
|
||||
("video.avi", True, False, False),
|
||||
("audio.mp3", False, True, False),
|
||||
("audio.wav", False, True, False),
|
||||
("image.jpg", False, False, True),
|
||||
("image.png", False, False, True),
|
||||
("document.pdf", False, False, False),
|
||||
],
|
||||
)
|
||||
def test_type_checks(self, filename, is_video, is_audio, is_image):
|
||||
media = Media(filename=filename)
|
||||
assert media.is_video() == is_video
|
||||
assert media.is_audio() == is_audio
|
||||
assert media.is_image() == is_image
|
||||
|
||||
|
||||
class TestMediaStore:
|
||||
"""Test media storage functionality."""
|
||||
|
||||
def test_store_with_no_storages(self, caplog):
|
||||
media = Media(filename="test.mp4")
|
||||
metadata = Mock()
|
||||
media.store(metadata, storages=[])
|
||||
assert "No storages found" in caplog.text
|
||||
|
||||
def test_store_with_storage(self):
|
||||
media = Media(filename="test.mp4")
|
||||
metadata = Mock()
|
||||
mock_storage = Mock()
|
||||
media.store(metadata, url="https://example.com", storages=[mock_storage])
|
||||
mock_storage.store.assert_called_once()
|
||||
|
||||
|
||||
class TestMediaInnerMedia:
|
||||
"""Test nested media retrieval."""
|
||||
|
||||
def test_all_inner_media_no_nested(self):
|
||||
media = Media(filename="test.mp4")
|
||||
inner = list(media.all_inner_media(include_self=False))
|
||||
assert len(inner) == 0
|
||||
|
||||
inner_with_self = list(media.all_inner_media(include_self=True))
|
||||
assert len(inner_with_self) == 1
|
||||
assert inner_with_self[0] is media
|
||||
|
||||
def test_all_inner_media_with_nested(self):
|
||||
parent = Media(filename="parent.mp4")
|
||||
child = Media(filename="child.jpg")
|
||||
grandchild = Media(filename="grandchild.png")
|
||||
|
||||
child.set("thumbnail", grandchild)
|
||||
parent.set("preview", child)
|
||||
|
||||
inner = list(parent.all_inner_media(include_self=False))
|
||||
assert len(inner) == 2
|
||||
assert child in inner
|
||||
assert grandchild in inner
|
||||
|
||||
def test_all_inner_media_with_list_property(self):
|
||||
parent = Media(filename="parent.mp4")
|
||||
child1 = Media(filename="frame1.jpg")
|
||||
child2 = Media(filename="frame2.jpg")
|
||||
|
||||
parent.set("frames", [child1, child2])
|
||||
|
||||
inner = list(parent.all_inner_media(include_self=False))
|
||||
assert len(inner) == 2
|
||||
assert child1 in inner
|
||||
assert child2 in inner
|
||||
|
||||
|
||||
class TestMediaIsStored:
|
||||
"""Test the is_stored method."""
|
||||
|
||||
def test_is_stored_no_urls(self):
|
||||
media = Media(filename="test.mp4")
|
||||
storage = Mock()
|
||||
storage.config = {"steps": {"storages": ["s3", "local"]}}
|
||||
assert media.is_stored(storage) is False
|
||||
|
||||
def test_is_stored_partial_urls(self):
|
||||
media = Media(filename="test.mp4")
|
||||
media.add_url("https://s3.example.com/test.mp4")
|
||||
storage = Mock()
|
||||
storage.config = {"steps": {"storages": ["s3", "local"]}}
|
||||
assert media.is_stored(storage) is False
|
||||
|
||||
def test_is_stored_full_urls(self):
|
||||
media = Media(filename="test.mp4")
|
||||
media.add_url("https://s3.example.com/test.mp4")
|
||||
media.add_url("file:///local/test.mp4")
|
||||
storage = Mock()
|
||||
storage.config = {"steps": {"storages": ["s3", "local"]}}
|
||||
assert media.is_stored(storage) is True
|
||||
|
||||
|
||||
class TestMediaValidVideo:
|
||||
"""Test video validation functionality."""
|
||||
|
||||
def test_is_valid_video_with_valid_probe(self):
|
||||
media = Media(filename="test.mp4")
|
||||
|
||||
mock_streams = {"streams": [{"duration_ts": 1000}]}
|
||||
|
||||
with patch("ffmpeg.probe", return_value=mock_streams):
|
||||
assert media.is_valid_video() is True
|
||||
|
||||
def test_is_valid_video_with_no_duration(self):
|
||||
media = Media(filename="test.mp4")
|
||||
|
||||
mock_streams = {"streams": [{"duration_ts": 0}]}
|
||||
|
||||
with patch("ffmpeg.probe", return_value=mock_streams):
|
||||
assert media.is_valid_video() is False
|
||||
|
||||
def test_is_valid_video_with_ffmpeg_error(self):
|
||||
media = Media(filename="test.mp4")
|
||||
|
||||
with patch("ffmpeg.probe", side_effect=Exception("ffmpeg error")):
|
||||
with patch("os.path.getsize", return_value=100):
|
||||
# Falls back to file size check, small file
|
||||
assert media.is_valid_video() is False
|
||||
|
||||
with patch("os.path.getsize", return_value=30000):
|
||||
# Falls back to file size check, larger file
|
||||
assert media.is_valid_video() is True
|
||||
98
tests/core/test_validators.py
Normal file
98
tests/core/test_validators.py
Normal file
@@ -0,0 +1,98 @@
|
||||
"""
|
||||
Tests for validators module from auto_archiver.core.validators
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import pytest
|
||||
|
||||
from auto_archiver.core.validators import positive_number, valid_file, json_loader
|
||||
|
||||
|
||||
class TestPositiveNumber:
|
||||
"""Test the positive_number validator."""
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"value,expected",
|
||||
[
|
||||
(0, 0),
|
||||
(1, 1),
|
||||
(100, 100),
|
||||
(0.5, 0.5),
|
||||
(999999, 999999),
|
||||
],
|
||||
)
|
||||
def test_positive_values(self, value, expected):
|
||||
assert positive_number(value) == expected
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"value",
|
||||
[
|
||||
-1,
|
||||
-100,
|
||||
-0.5,
|
||||
-999999,
|
||||
],
|
||||
)
|
||||
def test_negative_values_raise_error(self, value):
|
||||
with pytest.raises(argparse.ArgumentTypeError) as exc_info:
|
||||
positive_number(value)
|
||||
assert "not a positive number" in str(exc_info.value)
|
||||
|
||||
|
||||
class TestValidFile:
|
||||
"""Test the valid_file validator."""
|
||||
|
||||
def test_valid_file_exists(self, tmp_path):
|
||||
test_file = tmp_path / "test.txt"
|
||||
test_file.write_text("test content")
|
||||
result = valid_file(str(test_file))
|
||||
assert result == str(test_file)
|
||||
|
||||
def test_valid_file_not_exists(self):
|
||||
with pytest.raises(argparse.ArgumentTypeError) as exc_info:
|
||||
valid_file("/nonexistent/path/to/file.txt")
|
||||
assert "does not exist" in str(exc_info.value)
|
||||
|
||||
def test_valid_file_directory_not_file(self, tmp_path):
|
||||
# A directory is not a file
|
||||
with pytest.raises(argparse.ArgumentTypeError) as exc_info:
|
||||
valid_file(str(tmp_path))
|
||||
assert "does not exist" in str(exc_info.value)
|
||||
|
||||
|
||||
class TestJsonLoader:
|
||||
"""Test the json_loader validator."""
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"json_str,expected",
|
||||
[
|
||||
('{"key": "value"}', {"key": "value"}),
|
||||
('{"number": 123}', {"number": 123}),
|
||||
('{"list": [1, 2, 3]}', {"list": [1, 2, 3]}),
|
||||
('{"nested": {"inner": "value"}}', {"nested": {"inner": "value"}}),
|
||||
("[]", []),
|
||||
("[1, 2, 3]", [1, 2, 3]),
|
||||
('"string"', "string"),
|
||||
("123", 123),
|
||||
("true", True),
|
||||
("false", False),
|
||||
("null", None),
|
||||
],
|
||||
)
|
||||
def test_valid_json(self, json_str, expected):
|
||||
assert json_loader(json_str) == expected
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"invalid_json",
|
||||
[
|
||||
"{invalid}",
|
||||
"{'single': 'quotes'}",
|
||||
"{missing: quotes}",
|
||||
'{"unclosed": "brace"',
|
||||
"",
|
||||
],
|
||||
)
|
||||
def test_invalid_json_raises_error(self, invalid_json):
|
||||
with pytest.raises(json.JSONDecodeError):
|
||||
json_loader(invalid_json)
|
||||
62
tests/databases/test_console_db.py
Normal file
62
tests/databases/test_console_db.py
Normal file
@@ -0,0 +1,62 @@
|
||||
"""
|
||||
Tests for the ConsoleDb module
|
||||
"""
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def console_db(setup_module):
|
||||
return setup_module("console_db")
|
||||
|
||||
|
||||
class TestConsoleDb:
|
||||
"""Test the ConsoleDb functionality."""
|
||||
|
||||
def test_started_logs_info(self, console_db, make_item, caplog):
|
||||
"""Test that started() logs an info message."""
|
||||
item = make_item("https://example.com/test")
|
||||
|
||||
with caplog.at_level("INFO"):
|
||||
console_db.started(item)
|
||||
|
||||
assert "STARTED" in caplog.text
|
||||
assert "example.com" in caplog.text
|
||||
|
||||
def test_failed_logs_error(self, console_db, make_item, caplog):
|
||||
"""Test that failed() logs an error message with reason."""
|
||||
item = make_item("https://example.com/test")
|
||||
reason = "Connection timeout"
|
||||
|
||||
with caplog.at_level("ERROR"):
|
||||
console_db.failed(item, reason)
|
||||
|
||||
assert "FAILED" in caplog.text
|
||||
assert "Connection timeout" in caplog.text
|
||||
|
||||
def test_aborted_logs_warning(self, console_db, make_item, caplog):
|
||||
"""Test that aborted() logs a warning message."""
|
||||
item = make_item("https://example.com/test")
|
||||
|
||||
with caplog.at_level("WARNING"):
|
||||
console_db.aborted(item)
|
||||
|
||||
assert "ABORTED" in caplog.text
|
||||
|
||||
def test_done_logs_success(self, console_db, make_item, caplog):
|
||||
"""Test that done() logs a success message."""
|
||||
item = make_item("https://example.com/test")
|
||||
|
||||
with caplog.at_level("INFO"):
|
||||
console_db.done(item)
|
||||
|
||||
assert "DONE" in caplog.text
|
||||
|
||||
def test_done_cached(self, console_db, make_item, caplog):
|
||||
"""Test done() with cached=True (should behave the same)."""
|
||||
item = make_item("https://example.com/test")
|
||||
|
||||
with caplog.at_level("INFO"):
|
||||
console_db.done(item, cached=True)
|
||||
|
||||
assert "DONE" in caplog.text
|
||||
277
tests/enrichers/test_ghostarchive_enricher.py
Normal file
277
tests/enrichers/test_ghostarchive_enricher.py
Normal file
@@ -0,0 +1,277 @@
|
||||
import pytest
|
||||
import requests
|
||||
import os
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
from auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher import GhostarchiveEnricher
|
||||
|
||||
CI = os.getenv("GITHUB_ACTIONS", "") == "true"
|
||||
|
||||
# sample HTML responses for mocking
|
||||
SEARCH_HTML_FOUND = """
|
||||
<html><body>
|
||||
<h1>Archives for https://example.com</h1>
|
||||
<table>
|
||||
<tr><td><a href="http://ghostarchive.org/archive/Abc12">https://example.com</a></td></tr>
|
||||
</table>
|
||||
</body></html>
|
||||
"""
|
||||
|
||||
SEARCH_HTML_NOT_FOUND = """
|
||||
<html><body>
|
||||
<h1>Archives for https://example.com</h1>
|
||||
<p>Page 0 out of 0</p>
|
||||
<p>No archives for that site.</p>
|
||||
</body></html>
|
||||
"""
|
||||
|
||||
SAVE_RESPONSE_HTML_WITH_LINK = """
|
||||
<html><body>
|
||||
<h1>Archive saved</h1>
|
||||
<a href="/archive/Xyz99">View archive</a>
|
||||
</body></html>
|
||||
"""
|
||||
|
||||
ENRICHER_CONFIG = {
|
||||
"timeout": 120,
|
||||
"check_existing": True,
|
||||
"proxy_http": None,
|
||||
"proxy_https": None,
|
||||
}
|
||||
|
||||
|
||||
class TestGhostarchiveEnricher:
|
||||
"""Tests for Ghost Archive Enricher"""
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup_enricher(self, setup_module):
|
||||
self.enricher: GhostarchiveEnricher = setup_module("ghostarchive_enricher", ENRICHER_CONFIG)
|
||||
|
||||
def test_search_existing_found(self, mocker):
|
||||
"""When an existing archive is found, it should be returned."""
|
||||
mock_response = mocker.Mock()
|
||||
mock_response.status_code = 200
|
||||
mock_response.text = SEARCH_HTML_FOUND
|
||||
mocker.patch(
|
||||
"auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.requests.get", return_value=mock_response
|
||||
)
|
||||
|
||||
result = self.enricher._search_existing("https://example.com")
|
||||
assert result == "https://ghostarchive.org/archive/Abc12"
|
||||
|
||||
def test_search_existing_not_found(self, mocker):
|
||||
"""When no existing archive is found, None should be returned."""
|
||||
mock_response = mocker.Mock()
|
||||
mock_response.status_code = 200
|
||||
mock_response.text = SEARCH_HTML_NOT_FOUND
|
||||
mocker.patch(
|
||||
"auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.requests.get", return_value=mock_response
|
||||
)
|
||||
|
||||
result = self.enricher._search_existing("https://example.com")
|
||||
assert result is None
|
||||
|
||||
def test_search_existing_request_error(self, mocker):
|
||||
"""When search request fails, None should be returned."""
|
||||
mocker.patch(
|
||||
"auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.requests.get",
|
||||
side_effect=requests.exceptions.ConnectionError("connection failed"),
|
||||
)
|
||||
|
||||
result = self.enricher._search_existing("https://example.com")
|
||||
assert result is None
|
||||
|
||||
def test_search_existing_non_200(self, mocker):
|
||||
"""When search returns non-200, None should be returned."""
|
||||
mock_response = mocker.Mock()
|
||||
mock_response.status_code = 503
|
||||
mocker.patch(
|
||||
"auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.requests.get", return_value=mock_response
|
||||
)
|
||||
|
||||
result = self.enricher._search_existing("https://example.com")
|
||||
assert result is None
|
||||
|
||||
def test_submit_url_success_redirect(self, mocker):
|
||||
"""Successful submission via headless browser should return archive URL."""
|
||||
mock_sb = MagicMock()
|
||||
mock_sb.get_current_url.return_value = "https://ghostarchive.org/archive/NewId1"
|
||||
mock_sb.__enter__ = MagicMock(return_value=mock_sb)
|
||||
mock_sb.__exit__ = MagicMock(return_value=False)
|
||||
|
||||
mocker.patch("auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.SB", return_value=mock_sb)
|
||||
|
||||
result = self.enricher._submit_url("https://example.com")
|
||||
assert result == "https://ghostarchive.org/archive/NewId1"
|
||||
mock_sb.type.assert_called_once()
|
||||
mock_sb.click.assert_called_once()
|
||||
|
||||
def test_submit_url_success_redirect_strips_query(self, mocker):
|
||||
"""Redirect URL query params should be stripped."""
|
||||
mock_sb = MagicMock()
|
||||
mock_sb.get_current_url.return_value = "https://ghostarchive.org/archive/NewId1?wr=false"
|
||||
mock_sb.__enter__ = MagicMock(return_value=mock_sb)
|
||||
mock_sb.__exit__ = MagicMock(return_value=False)
|
||||
|
||||
mocker.patch("auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.SB", return_value=mock_sb)
|
||||
|
||||
result = self.enricher._submit_url("https://example.com")
|
||||
assert result == "https://ghostarchive.org/archive/NewId1"
|
||||
|
||||
def test_submit_url_success_html_fallback(self, mocker):
|
||||
"""When browser doesn't redirect, should parse page source for archive link."""
|
||||
mock_sb = MagicMock()
|
||||
mock_sb.get_current_url.return_value = "https://ghostarchive.org/archive2"
|
||||
mock_sb.get_page_source.return_value = SAVE_RESPONSE_HTML_WITH_LINK
|
||||
mock_sb.__enter__ = MagicMock(return_value=mock_sb)
|
||||
mock_sb.__exit__ = MagicMock(return_value=False)
|
||||
|
||||
# make timeout=0 so the polling loop exits immediately and falls through to HTML parsing
|
||||
self.enricher.timeout = 0
|
||||
mocker.patch("auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.SB", return_value=mock_sb)
|
||||
|
||||
result = self.enricher._submit_url("https://example.com")
|
||||
assert result == "https://ghostarchive.org/archive/Xyz99"
|
||||
|
||||
def test_submit_url_browser_error(self, mocker):
|
||||
"""Browser error during submission should return None."""
|
||||
mocker.patch(
|
||||
"auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.SB",
|
||||
side_effect=Exception("browser failed to start"),
|
||||
)
|
||||
|
||||
result = self.enricher._submit_url("https://example.com")
|
||||
assert result is None
|
||||
|
||||
def test_proxy_configuration(self, mocker):
|
||||
"""Proxies should be passed to search requests when configured."""
|
||||
self.enricher.proxy_http = "http://proxy:8080"
|
||||
self.enricher.proxy_https = "https://proxy:8443"
|
||||
|
||||
mock_get = mocker.patch(
|
||||
"auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.requests.get",
|
||||
)
|
||||
mock_response = mocker.Mock()
|
||||
mock_response.status_code = 200
|
||||
mock_response.text = SEARCH_HTML_FOUND
|
||||
mock_get.return_value = mock_response
|
||||
|
||||
result = self.enricher._search_existing("https://example.com")
|
||||
|
||||
call_kwargs = mock_get.call_args
|
||||
assert call_kwargs.kwargs.get("proxies") == {"http": "http://proxy:8080", "https": "https://proxy:8443"}
|
||||
assert result is not None
|
||||
|
||||
def test_parse_archive_url_with_replay_links(self):
|
||||
"""Parser should ignore /replay/ links and only return /archive/ links."""
|
||||
html = """
|
||||
<html><body>
|
||||
<a href="/archive/replay/w/id-abc/mp_/https://example.com">replay</a>
|
||||
<a href="/archive/Valid1">valid</a>
|
||||
</body></html>
|
||||
"""
|
||||
result = self.enricher._parse_archive_url(html)
|
||||
assert result == "https://ghostarchive.org/archive/Valid1"
|
||||
|
||||
def test_parse_archive_url_no_links(self):
|
||||
"""Parser should return None when no archive links found."""
|
||||
html = "<html><body><p>No archive here</p></body></html>"
|
||||
result = self.enricher._parse_archive_url(html)
|
||||
assert result is None
|
||||
|
||||
def test_enrich_sets_ghostarchive_on_metadata(self, mocker, make_item):
|
||||
"""enrich() should set 'ghostarchive' key on the metadata object."""
|
||||
mocker.patch.object(self.enricher, "_search_existing", return_value="https://ghostarchive.org/archive/Enr1")
|
||||
|
||||
item = make_item("https://example.com")
|
||||
result = self.enricher.enrich(item)
|
||||
|
||||
assert result is True
|
||||
assert item.get("ghostarchive") == "https://ghostarchive.org/archive/Enr1"
|
||||
|
||||
def test_enrich_skips_if_already_enriched(self, mocker, make_item):
|
||||
"""enrich() should skip if ghostarchive key is already set."""
|
||||
mock_search = mocker.patch.object(self.enricher, "_search_existing")
|
||||
|
||||
item = make_item("https://example.com", ghostarchive="https://ghostarchive.org/archive/Old1")
|
||||
result = self.enricher.enrich(item)
|
||||
|
||||
assert result is True
|
||||
mock_search.assert_not_called()
|
||||
|
||||
def test_enrich_returns_false_on_failure(self, mocker, make_item):
|
||||
"""enrich() should return False when both search and submit fail."""
|
||||
mocker.patch.object(self.enricher, "_search_existing", return_value=None)
|
||||
mocker.patch.object(self.enricher, "_submit_url", return_value=None)
|
||||
|
||||
item = make_item("https://example.com")
|
||||
result = self.enricher.enrich(item)
|
||||
|
||||
assert result is False
|
||||
|
||||
def test_enrich_skips_auth_wall(self, mocker, make_item):
|
||||
"""enrich() should skip URLs behind auth walls."""
|
||||
mocker.patch(
|
||||
"auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.UrlUtil.is_auth_wall", return_value=True
|
||||
)
|
||||
|
||||
item = make_item("https://example.com/login")
|
||||
result = self.enricher.enrich(item)
|
||||
assert result is False
|
||||
|
||||
def test_enrich_with_existing_archive(self, mocker, make_item):
|
||||
"""enrich() should use existing archive when check_existing is True."""
|
||||
mocker.patch.object(self.enricher, "_search_existing", return_value="https://ghostarchive.org/archive/Exist1")
|
||||
mock_submit = mocker.patch.object(self.enricher, "_submit_url")
|
||||
|
||||
item = make_item("https://example.com")
|
||||
result = self.enricher.enrich(item)
|
||||
|
||||
assert result is True
|
||||
assert item.get("ghostarchive") == "https://ghostarchive.org/archive/Exist1"
|
||||
mock_submit.assert_not_called()
|
||||
|
||||
def test_enrich_submits_when_no_existing(self, mocker, make_item):
|
||||
"""enrich() should submit URL when no existing archive found."""
|
||||
mocker.patch.object(self.enricher, "_search_existing", return_value=None)
|
||||
mocker.patch.object(self.enricher, "_submit_url", return_value="https://ghostarchive.org/archive/New42")
|
||||
|
||||
item = make_item("https://example.com")
|
||||
result = self.enricher.enrich(item)
|
||||
|
||||
assert result is True
|
||||
assert item.get("ghostarchive") == "https://ghostarchive.org/archive/New42"
|
||||
|
||||
def test_enrich_skips_check_existing_when_disabled(self, mocker, make_item):
|
||||
"""enrich() should skip search when check_existing is False."""
|
||||
self.enricher.check_existing = False
|
||||
mock_search = mocker.patch.object(self.enricher, "_search_existing")
|
||||
mocker.patch.object(self.enricher, "_submit_url", return_value="https://ghostarchive.org/archive/Direct1")
|
||||
|
||||
item = make_item("https://example.com")
|
||||
result = self.enricher.enrich(item)
|
||||
|
||||
assert result is True
|
||||
mock_search.assert_not_called()
|
||||
|
||||
@pytest.mark.download
|
||||
def test_real_search_existing(self, setup_module):
|
||||
"""Integration test: search for an existing archive on Ghost Archive."""
|
||||
enricher = setup_module("ghostarchive_enricher", ENRICHER_CONFIG)
|
||||
# example.com is commonly archived
|
||||
result = enricher._search_existing("https://example.com")
|
||||
# we just check it doesn't crash; result may or may not be found
|
||||
assert result is None or result.startswith("https://ghostarchive.org/archive/")
|
||||
|
||||
@pytest.mark.download
|
||||
@pytest.mark.skipif(CI, reason="Avoid submitting a real task on every CI run")
|
||||
def test_real_submit_example_com(self, setup_module, make_item):
|
||||
"""Integration test: submit example.com to Ghost Archive and verify enrichment."""
|
||||
enricher = setup_module("ghostarchive_enricher", ENRICHER_CONFIG)
|
||||
item = make_item("https://example.com")
|
||||
result = enricher.enrich(item)
|
||||
|
||||
assert result is True
|
||||
archive_url = item.get("ghostarchive")
|
||||
assert archive_url is not None
|
||||
assert archive_url.startswith("https://ghostarchive.org/archive/")
|
||||
72
tests/enrichers/test_json_enricher.py
Normal file
72
tests/enrichers/test_json_enricher.py
Normal file
@@ -0,0 +1,72 @@
|
||||
"""
|
||||
Tests for the JsonEnricher module
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def json_enricher(setup_module):
|
||||
return setup_module("json_enricher")
|
||||
|
||||
|
||||
class TestJsonEnricher:
|
||||
"""Test the JsonEnricher functionality."""
|
||||
|
||||
def test_enrich_creates_json_file(self, json_enricher, make_item):
|
||||
"""Test that enrich creates a metadata.json file."""
|
||||
item = make_item("https://example.com/test")
|
||||
item.set("title", "Test Title")
|
||||
item.set("description", "Test description")
|
||||
|
||||
json_enricher.enrich(item)
|
||||
|
||||
# Check that a media with id 'metadata_json' was added
|
||||
json_media = item.get_media_by_id("metadata_json")
|
||||
assert json_media is not None
|
||||
assert json_media.filename.endswith("metadata.json")
|
||||
assert os.path.exists(json_media.filename)
|
||||
|
||||
def test_enrich_json_content(self, json_enricher, make_item):
|
||||
"""Test that the JSON content is correct."""
|
||||
item = make_item("https://example.com/test")
|
||||
item.set("title", "Test Title")
|
||||
item.set("custom_field", "custom_value")
|
||||
|
||||
json_enricher.enrich(item)
|
||||
|
||||
json_media = item.get_media_by_id("metadata_json")
|
||||
with open(json_media.filename, "r", encoding="utf-8") as f:
|
||||
content = json.load(f)
|
||||
|
||||
# The to_dict() returns nested structure: {status, metadata: {...}, media: [...]}
|
||||
assert content["metadata"]["title"] == "Test Title"
|
||||
assert content["metadata"]["custom_field"] == "custom_value"
|
||||
assert content["metadata"]["url"] == "https://example.com/test"
|
||||
|
||||
def test_enrich_handles_special_characters(self, json_enricher, make_item):
|
||||
"""Test that special characters are handled correctly."""
|
||||
item = make_item("https://example.com/test")
|
||||
item.set("title", "Test with émojis 🎉 and üñíçödé")
|
||||
|
||||
json_enricher.enrich(item)
|
||||
|
||||
json_media = item.get_media_by_id("metadata_json")
|
||||
with open(json_media.filename, "r", encoding="utf-8") as f:
|
||||
content = json.load(f)
|
||||
|
||||
# Access the nested metadata structure
|
||||
assert "émojis 🎉" in content["metadata"]["title"]
|
||||
assert "üñíçödé" in content["metadata"]["title"]
|
||||
|
||||
def test_enrich_empty_metadata(self, json_enricher, make_item):
|
||||
"""Test enriching metadata with minimal content."""
|
||||
item = make_item("https://example.com/minimal")
|
||||
|
||||
json_enricher.enrich(item)
|
||||
|
||||
json_media = item.get_media_by_id("metadata_json")
|
||||
assert json_media is not None
|
||||
assert os.path.exists(json_media.filename)
|
||||
@@ -53,6 +53,7 @@ class TestAntibotExtractorEnricher(TestExtractorBase):
|
||||
}
|
||||
|
||||
@pytest.mark.download
|
||||
@pytest.mark.flaky(reruns=2, reruns_delay=5)
|
||||
@pytest.mark.parametrize(
|
||||
"url,in_title,in_text,image_count,video_count,skip_ci",
|
||||
[
|
||||
@@ -60,7 +61,7 @@ class TestAntibotExtractorEnricher(TestExtractorBase):
|
||||
"https://en.wikipedia.org/wiki/Western_barn_owl",
|
||||
"western barn owl",
|
||||
"Tyto alba",
|
||||
5,
|
||||
3, # Reduced due to Wikipedia rate limiting (429 errors)
|
||||
0,
|
||||
False,
|
||||
),
|
||||
@@ -128,6 +129,7 @@ class TestAntibotExtractorEnricher(TestExtractorBase):
|
||||
item = make_item(url)
|
||||
result = self.extractor.download(item)
|
||||
|
||||
assert result, f"download() returned {result!r} — Selenium may have failed (e.g., window close timeout)"
|
||||
assert result.status == "antibot", "Expected status to be 'antibot'"
|
||||
|
||||
# Check title contains all required words (case-insensitive)
|
||||
@@ -142,9 +144,9 @@ class TestAntibotExtractorEnricher(TestExtractorBase):
|
||||
)
|
||||
|
||||
image_media = [m for m in result.media if m.is_image() and not m.get("id") == "screenshot"]
|
||||
assert len(image_media) == image_count, f"Expected {image_count} image items, got {len(image_media)}"
|
||||
assert len(image_media) >= image_count, f"Expected at least {image_count} image items, got {len(image_media)}"
|
||||
video_media = [m for m in result.media if m.is_video()]
|
||||
assert len(video_media) == video_count, f"Expected {video_count} video items, got {len(video_media)}"
|
||||
assert len(video_media) >= video_count, f"Expected at least {video_count} video items, got {len(video_media)}"
|
||||
|
||||
for expected_id in ["screenshot", "pdf", "html_source_code"]:
|
||||
assert any(m.get("id") == expected_id for m in result.media), (
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import asyncio
|
||||
import os
|
||||
from datetime import date
|
||||
|
||||
@@ -60,3 +61,53 @@ def test_valid_url_regex(url, expected, get_lazy_module):
|
||||
def test_invite_pattern_regex(invite, expected, get_lazy_module):
|
||||
match = TelethonExtractor.invite_pattern.search(invite)
|
||||
assert bool(match) == expected
|
||||
|
||||
|
||||
def test_setup_with_closed_event_loop(get_lazy_module, tmp_path, mocker):
|
||||
"""
|
||||
Simulate the Celery worker scenario where the asyncio event loop is closed
|
||||
before setup() runs. The fix should create a new event loop so that
|
||||
TelegramClient.start() does not raise 'Event loop is closed'.
|
||||
"""
|
||||
# create a session file so setup doesn't fail on missing file
|
||||
session_file = tmp_path / "test.session"
|
||||
session_file.touch()
|
||||
|
||||
# close the current event loop to simulate a Celery worker environment
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
loop.close()
|
||||
|
||||
lazy_module = get_lazy_module("telethon_extractor")
|
||||
module = lazy_module.load(
|
||||
{"telethon_extractor": {"session_file": str(session_file), "api_id": 123, "api_hash": "ABC"}}
|
||||
)
|
||||
|
||||
# setup should have succeeded and a new open event loop should exist
|
||||
new_loop = asyncio.get_event_loop()
|
||||
assert not new_loop.is_closed()
|
||||
assert module.client is not None
|
||||
|
||||
|
||||
def test_setup_with_no_event_loop(get_lazy_module, tmp_path, mocker):
|
||||
"""
|
||||
Simulate the scenario where there is no current event loop at all
|
||||
(e.g. running in a non-main thread). The fix should create one.
|
||||
"""
|
||||
session_file = tmp_path / "test.session"
|
||||
session_file.touch()
|
||||
|
||||
# Remove the current event loop entirely
|
||||
# In Python 3.12+, get_event_loop() in a non-main thread raises RuntimeError
|
||||
mocker.patch("asyncio.get_event_loop", side_effect=RuntimeError("no current event loop"))
|
||||
new_loop_mock = mocker.MagicMock()
|
||||
new_loop_mock.is_closed.return_value = False
|
||||
mocker.patch("asyncio.new_event_loop", return_value=new_loop_mock)
|
||||
set_loop = mocker.patch("asyncio.set_event_loop")
|
||||
|
||||
lazy_module = get_lazy_module("telethon_extractor")
|
||||
lazy_module.load({"telethon_extractor": {"session_file": str(session_file), "api_id": 123, "api_hash": "ABC"}})
|
||||
|
||||
# a new event loop should have been created and set
|
||||
asyncio.new_event_loop.assert_called_once()
|
||||
set_loop.assert_called_once_with(new_loop_mock)
|
||||
|
||||
238
tests/extractors/test_twitter_dropin.py
Normal file
238
tests/extractors/test_twitter_dropin.py
Normal file
@@ -0,0 +1,238 @@
|
||||
"""
|
||||
Tests for the Twitter dropin extractor with fxtwitter fallback
|
||||
"""
|
||||
|
||||
import pytest
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
from auto_archiver.modules.generic_extractor.twitter import Twitter
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def twitter_dropin():
|
||||
return Twitter()
|
||||
|
||||
|
||||
class TestTwitterFxTwitterFallback:
|
||||
"""Test the fxtwitter API fallback functionality."""
|
||||
|
||||
@pytest.fixture
|
||||
def mock_fxtwitter_video_response(self):
|
||||
return {
|
||||
"code": 200,
|
||||
"message": "OK",
|
||||
"tweet": {
|
||||
"url": "https://x.com/user/status/123456789",
|
||||
"id": "123456789",
|
||||
"text": "Test tweet with video",
|
||||
"author": {
|
||||
"id": "111",
|
||||
"name": "Test User",
|
||||
"screen_name": "testuser",
|
||||
},
|
||||
"created_at": "Sun Feb 08 18:45:00 +0000 2026",
|
||||
"media": {
|
||||
"all": [
|
||||
{
|
||||
"type": "video",
|
||||
"url": "https://video.twimg.com/test.mp4",
|
||||
"variants": [
|
||||
{"url": "https://video.twimg.com/test.m3u8", "content_type": "application/x-mpegURL"},
|
||||
{
|
||||
"url": "https://video.twimg.com/test_480.mp4",
|
||||
"content_type": "video/mp4",
|
||||
"bitrate": 632000,
|
||||
},
|
||||
{
|
||||
"url": "https://video.twimg.com/test_720.mp4",
|
||||
"content_type": "video/mp4",
|
||||
"bitrate": 2176000,
|
||||
},
|
||||
],
|
||||
}
|
||||
],
|
||||
"videos": [
|
||||
{
|
||||
"url": "https://video.twimg.com/test.mp4",
|
||||
"variants": [
|
||||
{"url": "https://video.twimg.com/test.m3u8", "content_type": "application/x-mpegURL"},
|
||||
{
|
||||
"url": "https://video.twimg.com/test_480.mp4",
|
||||
"content_type": "video/mp4",
|
||||
"bitrate": 632000,
|
||||
},
|
||||
{
|
||||
"url": "https://video.twimg.com/test_720.mp4",
|
||||
"content_type": "video/mp4",
|
||||
"bitrate": 2176000,
|
||||
},
|
||||
],
|
||||
}
|
||||
],
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
@pytest.fixture
|
||||
def mock_fxtwitter_photo_response(self):
|
||||
return {
|
||||
"code": 200,
|
||||
"message": "OK",
|
||||
"tweet": {
|
||||
"url": "https://x.com/user/status/123456790",
|
||||
"id": "123456790",
|
||||
"text": "Test tweet with photo",
|
||||
"author": {
|
||||
"id": "111",
|
||||
"name": "Test User",
|
||||
"screen_name": "testuser",
|
||||
},
|
||||
"created_at": "Mon Feb 09 10:30:00 +0000 2026",
|
||||
"media": {
|
||||
"all": [
|
||||
{
|
||||
"type": "photo",
|
||||
"url": "https://pbs.twimg.com/media/test.jpg?name=orig",
|
||||
}
|
||||
],
|
||||
"photos": [
|
||||
{
|
||||
"type": "photo",
|
||||
"url": "https://pbs.twimg.com/media/test.jpg?name=orig",
|
||||
}
|
||||
],
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
def test_fetch_fxtwitter_video(self, twitter_dropin, mock_fxtwitter_video_response):
|
||||
"""Test fetching a tweet with video via fxtwitter API."""
|
||||
with patch("requests.get") as mock_get:
|
||||
mock_response = Mock()
|
||||
mock_response.status_code = 200
|
||||
mock_response.json.return_value = mock_fxtwitter_video_response
|
||||
mock_get.return_value = mock_response
|
||||
|
||||
result = twitter_dropin._fetch_fxtwitter("123456789")
|
||||
|
||||
assert result["user"]["name"] == "Test User"
|
||||
assert result["created_at"] == "Sun Feb 08 18:45:00 +0000 2026"
|
||||
assert result["full_text"] == "Test tweet with video"
|
||||
assert len(result["entities"]["media"]) == 1
|
||||
assert result["entities"]["media"][0]["type"] == "video"
|
||||
assert "video_info" in result["entities"]["media"][0]
|
||||
assert len(result["entities"]["media"][0]["video_info"]["variants"]) == 3
|
||||
|
||||
def test_fetch_fxtwitter_photo(self, twitter_dropin, mock_fxtwitter_photo_response):
|
||||
"""Test fetching a tweet with photo via fxtwitter API."""
|
||||
with patch("requests.get") as mock_get:
|
||||
mock_response = Mock()
|
||||
mock_response.status_code = 200
|
||||
mock_response.json.return_value = mock_fxtwitter_photo_response
|
||||
mock_get.return_value = mock_response
|
||||
|
||||
result = twitter_dropin._fetch_fxtwitter("123456790")
|
||||
|
||||
assert result["user"]["name"] == "Test User"
|
||||
assert result["created_at"] == "Mon Feb 09 10:30:00 +0000 2026"
|
||||
assert result["full_text"] == "Test tweet with photo"
|
||||
assert len(result["entities"]["media"]) == 1
|
||||
assert result["entities"]["media"][0]["type"] == "photo"
|
||||
assert result["entities"]["media"][0]["media_url_https"] == "https://pbs.twimg.com/media/test.jpg?name=orig"
|
||||
|
||||
def test_fetch_fxtwitter_no_media(self, twitter_dropin):
|
||||
"""Test fetching a text-only tweet via fxtwitter API."""
|
||||
mock_response_data = {
|
||||
"code": 200,
|
||||
"message": "OK",
|
||||
"tweet": {
|
||||
"id": "123456791",
|
||||
"text": "Just text, no media",
|
||||
"author": {"name": "Text Only User"},
|
||||
"created_at": "Tue Feb 10 12:00:00 +0000 2026",
|
||||
"media": {},
|
||||
},
|
||||
}
|
||||
with patch("requests.get") as mock_get:
|
||||
mock_response = Mock()
|
||||
mock_response.status_code = 200
|
||||
mock_response.json.return_value = mock_response_data
|
||||
mock_get.return_value = mock_response
|
||||
|
||||
result = twitter_dropin._fetch_fxtwitter("123456791")
|
||||
|
||||
assert result["user"]["name"] == "Text Only User"
|
||||
assert result["full_text"] == "Just text, no media"
|
||||
assert result["entities"]["media"] == []
|
||||
|
||||
def test_fetch_fxtwitter_api_error(self, twitter_dropin):
|
||||
"""Test handling of fxtwitter API errors."""
|
||||
with patch("requests.get") as mock_get:
|
||||
mock_response = Mock()
|
||||
mock_response.status_code = 404
|
||||
mock_get.return_value = mock_response
|
||||
|
||||
with pytest.raises(Exception):
|
||||
twitter_dropin._fetch_fxtwitter("nonexistent")
|
||||
|
||||
|
||||
class TestTwitterChooseVariant:
|
||||
"""Test the video variant selection logic."""
|
||||
|
||||
def test_choose_highest_quality_video(self, twitter_dropin):
|
||||
"""Test that the highest quality video variant is selected."""
|
||||
variants = [
|
||||
{"url": "https://video.twimg.com/vid/320x240/test.mp4", "content_type": "video/mp4"},
|
||||
{"url": "https://video.twimg.com/vid/1280x720/test.mp4", "content_type": "video/mp4"},
|
||||
{"url": "https://video.twimg.com/vid/640x480/test.mp4", "content_type": "video/mp4"},
|
||||
]
|
||||
|
||||
result = twitter_dropin.choose_variant(variants)
|
||||
|
||||
assert result["url"] == "https://video.twimg.com/vid/1280x720/test.mp4"
|
||||
|
||||
def test_choose_variant_fallback_for_non_mp4(self, twitter_dropin):
|
||||
"""Test fallback when no mp4 variant is available."""
|
||||
variants = [
|
||||
{"url": "https://video.twimg.com/test.m3u8", "content_type": "application/x-mpegURL"},
|
||||
]
|
||||
|
||||
result = twitter_dropin.choose_variant(variants)
|
||||
|
||||
assert result["url"] == "https://video.twimg.com/test.m3u8"
|
||||
|
||||
def test_choose_variant_prefers_mp4(self, twitter_dropin):
|
||||
"""Test that mp4 is preferred over other formats when quality is equal."""
|
||||
variants = [
|
||||
{"url": "https://video.twimg.com/test.m3u8", "content_type": "application/x-mpegURL"},
|
||||
{"url": "https://video.twimg.com/vid/1280x720/test.mp4", "content_type": "video/mp4"},
|
||||
]
|
||||
|
||||
result = twitter_dropin.choose_variant(variants)
|
||||
|
||||
assert result["content_type"] == "video/mp4"
|
||||
|
||||
|
||||
@pytest.mark.download
|
||||
class TestTwitterFxTwitterLive:
|
||||
"""Live integration tests for fxtwitter API - requires network access."""
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"tweet_id,expected_media_type",
|
||||
[
|
||||
("2020569571682312581", "video"), # Video tweet
|
||||
("2020410438198890618", "video"), # Video tweet
|
||||
("2020341585502957801", "photo"), # Photo tweet
|
||||
],
|
||||
)
|
||||
def test_fetch_real_tweets(self, twitter_dropin, tweet_id, expected_media_type):
|
||||
"""Test fetching real tweets from fxtwitter API."""
|
||||
result = twitter_dropin._fetch_fxtwitter(tweet_id)
|
||||
|
||||
assert result["user"]["name"] # Author should be non-empty
|
||||
assert result["created_at"] # Should have timestamp
|
||||
assert result["full_text"] # Should have text content
|
||||
|
||||
media = result["entities"]["media"]
|
||||
assert len(media) >= 1
|
||||
assert media[0]["type"] == expected_media_type
|
||||
70
tests/feeders/test_cli_feeder.py
Normal file
70
tests/feeders/test_cli_feeder.py
Normal file
@@ -0,0 +1,70 @@
|
||||
"""
|
||||
Tests for the CLIFeeder module
|
||||
"""
|
||||
|
||||
import pytest
|
||||
|
||||
from auto_archiver.modules.cli_feeder.cli_feeder import CLIFeeder
|
||||
from auto_archiver.core.consts import SetupError
|
||||
from auto_archiver.core.metadata import Metadata
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def cli_feeder_instance():
|
||||
"""Create a CLIFeeder instance with mocked config."""
|
||||
|
||||
def _create(urls):
|
||||
feeder = CLIFeeder()
|
||||
# Mock the config structure that cli_feeder expects
|
||||
feeder.config = {"urls": urls}
|
||||
feeder.name = "cli_feeder"
|
||||
feeder.tmp_dir = "/tmp"
|
||||
return feeder
|
||||
|
||||
return _create
|
||||
|
||||
|
||||
class TestCLIFeeder:
|
||||
"""Test the CLIFeeder functionality."""
|
||||
|
||||
def test_iter_yields_metadata_for_urls(self, cli_feeder_instance):
|
||||
"""Test that iteration yields Metadata objects for each URL."""
|
||||
urls = ["https://example.com/1", "https://example.com/2", "https://example.com/3"]
|
||||
feeder = cli_feeder_instance(urls)
|
||||
feeder.setup()
|
||||
|
||||
items = list(feeder)
|
||||
|
||||
assert len(items) == 3
|
||||
assert all(isinstance(item, Metadata) for item in items)
|
||||
assert items[0].get_url() == "https://example.com/1"
|
||||
assert items[1].get_url() == "https://example.com/2"
|
||||
assert items[2].get_url() == "https://example.com/3"
|
||||
|
||||
def test_iter_single_url(self, cli_feeder_instance):
|
||||
"""Test iteration with a single URL."""
|
||||
feeder = cli_feeder_instance(["https://example.com/single"])
|
||||
feeder.setup()
|
||||
|
||||
items = list(feeder)
|
||||
|
||||
assert len(items) == 1
|
||||
assert items[0].get_url() == "https://example.com/single"
|
||||
|
||||
def test_setup_raises_without_urls(self, cli_feeder_instance):
|
||||
"""Test that setup raises SetupError when no URLs provided."""
|
||||
feeder = cli_feeder_instance([])
|
||||
|
||||
with pytest.raises(SetupError) as exc_info:
|
||||
feeder.setup()
|
||||
|
||||
assert "No URLs provided" in str(exc_info.value)
|
||||
|
||||
def test_setup_raises_with_none_urls(self, cli_feeder_instance):
|
||||
"""Test that setup raises SetupError when urls is None."""
|
||||
feeder = cli_feeder_instance(None)
|
||||
|
||||
with pytest.raises(SetupError) as exc_info:
|
||||
feeder.setup()
|
||||
|
||||
assert "No URLs provided" in str(exc_info.value)
|
||||
43
tests/formatters/test_mute_formatter.py
Normal file
43
tests/formatters/test_mute_formatter.py
Normal file
@@ -0,0 +1,43 @@
|
||||
"""
|
||||
Tests for the MuteFormatter module
|
||||
"""
|
||||
|
||||
import pytest
|
||||
from auto_archiver.core.metadata import Metadata
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mute_formatter(setup_module):
|
||||
return setup_module("mute_formatter")
|
||||
|
||||
|
||||
class TestMuteFormatter:
|
||||
"""Test the MuteFormatter functionality."""
|
||||
|
||||
def test_format_returns_none(self, mute_formatter, make_item):
|
||||
"""Test that format always returns None (mutes output)."""
|
||||
item = make_item("https://example.com/test")
|
||||
item.set("title", "Test Title")
|
||||
|
||||
result = mute_formatter.format(item)
|
||||
|
||||
assert result is None
|
||||
|
||||
def test_format_with_empty_metadata(self, mute_formatter):
|
||||
"""Test format with empty metadata."""
|
||||
item = Metadata().set_url("https://example.com/empty")
|
||||
|
||||
result = mute_formatter.format(item)
|
||||
|
||||
assert result is None
|
||||
|
||||
def test_format_with_media(self, mute_formatter, make_item):
|
||||
"""Test that format still returns None even with media attached."""
|
||||
from auto_archiver.core.media import Media
|
||||
|
||||
item = make_item("https://example.com/with-media")
|
||||
item.add_media(Media(filename="test.mp4"))
|
||||
|
||||
result = mute_formatter.format(item)
|
||||
|
||||
assert result is None
|
||||
@@ -86,6 +86,22 @@ def test_media_management(basic_metadata, media_file):
|
||||
assert basic_metadata.get_media_by_id("m1") == media1
|
||||
|
||||
|
||||
def test_remove_duplicate_skips_missing_files(basic_metadata, media_file, tmp_path):
|
||||
"""Missing files should be dropped instead of crashing with FileNotFoundError."""
|
||||
real_file = tmp_path / "exists.txt"
|
||||
real_file.write_text("content")
|
||||
valid = media_file(filename=str(real_file), hash_value="abc")
|
||||
missing = media_file(filename="/nonexistent/path/gone.mp4")
|
||||
|
||||
basic_metadata.add_media(valid, "valid")
|
||||
basic_metadata.add_media(missing, "missing")
|
||||
|
||||
assert len(basic_metadata.media) == 2
|
||||
basic_metadata.remove_duplicate_media_by_hash()
|
||||
assert len(basic_metadata.media) == 1
|
||||
assert basic_metadata.get_media_by_id("valid") == valid
|
||||
|
||||
|
||||
def test_success():
|
||||
m = Metadata()
|
||||
assert not m.is_success()
|
||||
|
||||
259
tests/test_none_filename_handling.py
Normal file
259
tests/test_none_filename_handling.py
Normal file
@@ -0,0 +1,259 @@
|
||||
"""
|
||||
Tests for handling Media objects with None filename.
|
||||
|
||||
When download_from_url fails, it returns None. Various enrichers and
|
||||
the metadata deduplication logic must gracefully handle Media objects
|
||||
where filename is None, rather than crashing with TypeError.
|
||||
"""
|
||||
|
||||
from datetime import datetime, timezone
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
from auto_archiver.core.metadata import Metadata, Media
|
||||
from auto_archiver.modules.hash_enricher import HashEnricher
|
||||
from auto_archiver.modules.meta_enricher import MetaEnricher
|
||||
|
||||
|
||||
# ── HashEnricher ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestHashEnricherNoneFilename:
|
||||
"""hash_enricher should skip media with None filename without crashing."""
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup(self, setup_module):
|
||||
self.enricher = setup_module(HashEnricher, {"algorithm": "SHA-256", "chunksize": 100})
|
||||
|
||||
def test_skips_none_filename(self):
|
||||
m = Metadata().set_url("https://example.com")
|
||||
media = Media(filename=None)
|
||||
media.set("src", "https://example.com/video.mp4")
|
||||
m.add_media(media)
|
||||
|
||||
# Should not raise
|
||||
self.enricher.enrich(m)
|
||||
# No hash should be set
|
||||
assert m.media[0].get("hash") is None
|
||||
|
||||
def test_hashes_valid_skips_none(self, tmp_path):
|
||||
"""Mix of valid and None-filename media: only valid ones get hashed."""
|
||||
valid_file = tmp_path / "test.txt"
|
||||
valid_file.write_text("hello world")
|
||||
|
||||
m = Metadata().set_url("https://example.com")
|
||||
m.add_media(Media(filename=str(valid_file)))
|
||||
m.add_media(Media(filename=None))
|
||||
|
||||
self.enricher.enrich(m)
|
||||
|
||||
assert m.media[0].get("hash") is not None
|
||||
assert m.media[1].get("hash") is None
|
||||
|
||||
def test_all_none_filenames(self):
|
||||
"""All media have None filename – enricher should not crash."""
|
||||
m = Metadata().set_url("https://example.com")
|
||||
m.add_media(Media(filename=None))
|
||||
m.add_media(Media(filename=None))
|
||||
|
||||
self.enricher.enrich(m)
|
||||
|
||||
assert len(m.media) == 2
|
||||
for media in m.media:
|
||||
assert media.get("hash") is None
|
||||
|
||||
|
||||
# ── MetaEnricher ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestMetaEnricherNoneFilename:
|
||||
"""meta_enricher should skip media with None filename without crashing."""
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup(self, setup_module):
|
||||
self.enricher = setup_module(MetaEnricher, {})
|
||||
|
||||
def test_skips_none_filename(self):
|
||||
m = Metadata().set_url("https://example.com")
|
||||
m.set("_processed_at", datetime.now(timezone.utc))
|
||||
media = Media(filename=None)
|
||||
media.set("src", "https://example.com/video.mp4")
|
||||
m.add_media(media)
|
||||
|
||||
# Should not raise
|
||||
self.enricher.enrich(m)
|
||||
assert m.get("total_bytes") == 0
|
||||
|
||||
def test_sizes_valid_skips_none(self, tmp_path):
|
||||
"""Mix of valid and None-filename media: only valid ones get sized."""
|
||||
valid_file = tmp_path / "test.txt"
|
||||
valid_file.write_text("A" * 500)
|
||||
|
||||
m = Metadata().set_url("https://example.com")
|
||||
m.set("_processed_at", datetime.now(timezone.utc))
|
||||
m.add_media(Media(filename=str(valid_file)))
|
||||
m.add_media(Media(filename=None))
|
||||
|
||||
self.enricher.enrich(m)
|
||||
|
||||
assert m.media[0].get("bytes") == 500
|
||||
assert m.media[1].get("bytes") is None
|
||||
assert m.get("total_bytes") == 500
|
||||
|
||||
|
||||
# ── Metadata.remove_duplicate_media_by_hash ───────────────────────────
|
||||
|
||||
|
||||
class TestRemoveDuplicateMediaNoneFilename:
|
||||
"""remove_duplicate_media_by_hash should keep media with None filename."""
|
||||
|
||||
def test_none_filename_kept(self):
|
||||
m = Metadata().set_url("https://example.com")
|
||||
none_media = Media(filename=None)
|
||||
none_media.set("src", "https://example.com/video.mp4")
|
||||
m.add_media(none_media)
|
||||
|
||||
m.remove_duplicate_media_by_hash()
|
||||
|
||||
assert len(m.media) == 1
|
||||
assert m.media[0].filename is None
|
||||
|
||||
def test_none_and_valid_mixed(self, tmp_path):
|
||||
"""None-filename media is kept alongside valid-filename media."""
|
||||
valid_file = tmp_path / "test.txt"
|
||||
valid_file.write_text("content")
|
||||
|
||||
m = Metadata().set_url("https://example.com")
|
||||
m.add_media(Media(filename=str(valid_file)))
|
||||
none_media = Media(filename=None)
|
||||
none_media.set("src", "https://example.com/video.mp4")
|
||||
m.add_media(none_media)
|
||||
|
||||
m.remove_duplicate_media_by_hash()
|
||||
|
||||
assert len(m.media) == 2
|
||||
|
||||
def test_multiple_none_filename_all_kept(self):
|
||||
"""Multiple None-filename media are all kept (can't deduplicate without file)."""
|
||||
m = Metadata().set_url("https://example.com")
|
||||
m.add_media(Media(filename=None))
|
||||
m.add_media(Media(filename=None))
|
||||
|
||||
m.remove_duplicate_media_by_hash()
|
||||
|
||||
assert len(m.media) == 2
|
||||
|
||||
|
||||
# ── Twitter dropin create_metadata ────────────────────────────────────
|
||||
|
||||
|
||||
class TestTwitterDropinNoneFilename:
|
||||
"""Twitter dropin should skip media when download_from_url returns None."""
|
||||
|
||||
@pytest.fixture
|
||||
def twitter_dropin(self):
|
||||
from auto_archiver.modules.generic_extractor.twitter import Twitter
|
||||
|
||||
return Twitter()
|
||||
|
||||
def test_create_metadata_skips_failed_photo_download(self, twitter_dropin):
|
||||
"""When download_from_url returns None for a photo, it's not added to media."""
|
||||
tweet = {
|
||||
"user": {"name": "Test User"},
|
||||
"created_at": "Sun Feb 08 18:45:00 +0000 2026",
|
||||
"full_text": "Test tweet with photo",
|
||||
"entities": {
|
||||
"media": [
|
||||
{"type": "photo", "media_url_https": "https://pbs.twimg.com/media/test.jpg"},
|
||||
]
|
||||
},
|
||||
}
|
||||
|
||||
mock_archiver = MagicMock()
|
||||
mock_archiver.download_from_url.return_value = None # simulate failed download
|
||||
|
||||
result = twitter_dropin.create_metadata(tweet, None, mock_archiver, "https://x.com/test/status/123")
|
||||
|
||||
# The result should have no media since the download failed
|
||||
assert len(result.media) == 0
|
||||
|
||||
def test_create_metadata_skips_failed_video_download(self, twitter_dropin):
|
||||
"""When download_from_url returns None for a video, it's not added to media."""
|
||||
tweet = {
|
||||
"user": {"name": "Test User"},
|
||||
"created_at": "Sun Feb 08 18:45:00 +0000 2026",
|
||||
"full_text": "Test tweet with video",
|
||||
"entities": {
|
||||
"media": [
|
||||
{
|
||||
"type": "video",
|
||||
"video_info": {
|
||||
"variants": [
|
||||
{
|
||||
"url": "https://video.twimg.com/vid/1280x720/test.mp4",
|
||||
"content_type": "video/mp4",
|
||||
},
|
||||
]
|
||||
},
|
||||
},
|
||||
]
|
||||
},
|
||||
}
|
||||
|
||||
mock_archiver = MagicMock()
|
||||
mock_archiver.download_from_url.return_value = None
|
||||
|
||||
result = twitter_dropin.create_metadata(tweet, None, mock_archiver, "https://x.com/test/status/123")
|
||||
|
||||
assert len(result.media) == 0
|
||||
|
||||
def test_create_metadata_keeps_successful_download(self, twitter_dropin, tmp_path):
|
||||
"""When download_from_url succeeds, media is added."""
|
||||
tweet = {
|
||||
"user": {"name": "Test User"},
|
||||
"created_at": "Sun Feb 08 18:45:00 +0000 2026",
|
||||
"full_text": "Test tweet with photo",
|
||||
"entities": {
|
||||
"media": [
|
||||
{"type": "photo", "media_url_https": "https://pbs.twimg.com/media/test.jpg"},
|
||||
]
|
||||
},
|
||||
}
|
||||
|
||||
test_file = tmp_path / "test.jpg"
|
||||
test_file.write_text("fake image data")
|
||||
|
||||
mock_archiver = MagicMock()
|
||||
mock_archiver.download_from_url.return_value = str(test_file)
|
||||
|
||||
result = twitter_dropin.create_metadata(tweet, None, mock_archiver, "https://x.com/test/status/123")
|
||||
|
||||
assert len(result.media) == 1
|
||||
assert result.media[0].filename == str(test_file)
|
||||
|
||||
def test_create_metadata_mixed_downloads(self, twitter_dropin, tmp_path):
|
||||
"""One download succeeds, one fails – only successful one is kept."""
|
||||
tweet = {
|
||||
"user": {"name": "Test User"},
|
||||
"created_at": "Sun Feb 08 18:45:00 +0000 2026",
|
||||
"full_text": "Test tweet with two photos",
|
||||
"entities": {
|
||||
"media": [
|
||||
{"type": "photo", "media_url_https": "https://pbs.twimg.com/media/test1.jpg"},
|
||||
{"type": "photo", "media_url_https": "https://pbs.twimg.com/media/test2.jpg"},
|
||||
]
|
||||
},
|
||||
}
|
||||
|
||||
test_file = tmp_path / "test1.jpg"
|
||||
test_file.write_text("fake image data")
|
||||
|
||||
mock_archiver = MagicMock()
|
||||
# First call succeeds, second fails
|
||||
mock_archiver.download_from_url.side_effect = [str(test_file), None]
|
||||
|
||||
result = twitter_dropin.create_metadata(tweet, None, mock_archiver, "https://x.com/test/status/123")
|
||||
|
||||
assert len(result.media) == 1
|
||||
assert result.media[0].filename == str(test_file)
|
||||
@@ -1,5 +1,6 @@
|
||||
import pytest
|
||||
from argparse import ArgumentParser, ArgumentTypeError
|
||||
from requests.exceptions import SSLError
|
||||
from auto_archiver.core.orchestrator import ArchivingOrchestrator
|
||||
from auto_archiver.version import __version__
|
||||
from auto_archiver.core.config import read_yaml, store_yaml
|
||||
@@ -256,3 +257,34 @@ def test_load_failed_extractor_cleanup(test_args, mocker, caplog):
|
||||
assert "Error during setup of modules: Test exception" in caplog.text
|
||||
# make sure the 'cleanup' is called
|
||||
assert "cleanup" in caplog.text
|
||||
|
||||
|
||||
def test_check_for_updates_ssl_error(orchestrator, mocker):
|
||||
"""check_for_updates should not raise when the HTTP request fails."""
|
||||
mocker.patch(
|
||||
"auto_archiver.core.orchestrator.requests.get",
|
||||
side_effect=SSLError("SSL handshake failed"),
|
||||
)
|
||||
# should not raise
|
||||
orchestrator.check_for_updates()
|
||||
|
||||
|
||||
def test_check_for_updates_timeout(orchestrator, mocker):
|
||||
"""check_for_updates should not raise on connection timeout."""
|
||||
from requests.exceptions import ConnectionError
|
||||
|
||||
mocker.patch(
|
||||
"auto_archiver.core.orchestrator.requests.get",
|
||||
side_effect=ConnectionError("Connection refused"),
|
||||
)
|
||||
orchestrator.check_for_updates()
|
||||
|
||||
|
||||
def test_check_for_updates_new_version_available(orchestrator, mocker):
|
||||
"""check_for_updates should not raise when a newer version exists."""
|
||||
mocker.patch(
|
||||
"auto_archiver.core.orchestrator.requests.get",
|
||||
return_value=mocker.Mock(json=lambda: {"info": {"version": "99.0.0"}}),
|
||||
)
|
||||
# should complete without error
|
||||
orchestrator.check_for_updates()
|
||||
|
||||
@@ -14,6 +14,7 @@ from auto_archiver.utils.misc import (
|
||||
calculate_file_hash,
|
||||
random_str,
|
||||
get_timestamp,
|
||||
ydl_entry_to_filename,
|
||||
)
|
||||
|
||||
|
||||
@@ -139,3 +140,47 @@ class TestMiscUtils:
|
||||
|
||||
def test_invalid_timestamp_returns_none(self):
|
||||
assert get_timestamp("invalid-date") is None
|
||||
|
||||
|
||||
class TestYdlEntryToFilename:
|
||||
"""Tests for ydl_entry_to_filename, especially .part file filtering."""
|
||||
|
||||
def _make_mock_ydl(self, prepared_filename):
|
||||
class MockYDL:
|
||||
def prepare_filename(self, entry):
|
||||
return prepared_filename
|
||||
|
||||
return MockYDL()
|
||||
|
||||
def test_returns_exact_file_if_exists(self, tmp_path):
|
||||
video = tmp_path / "video.mp4"
|
||||
video.write_bytes(b"data")
|
||||
ydl = self._make_mock_ydl(str(video))
|
||||
assert ydl_entry_to_filename(ydl, {}) == str(video)
|
||||
|
||||
def test_skips_part_file_returns_complete(self, tmp_path):
|
||||
"""Simulates yt-dlp leaving a .part file from a failed format
|
||||
while a complete .webm exists."""
|
||||
(tmp_path / "f5U3IKfoSYs.f399.mp4.part").write_bytes(b"incomplete")
|
||||
webm = tmp_path / "f5U3IKfoSYs.webm"
|
||||
webm.write_bytes(b"complete video")
|
||||
|
||||
# ydl.prepare_filename returns the expected .mp4 which doesn't exist
|
||||
ydl = self._make_mock_ydl(str(tmp_path / "f5U3IKfoSYs.mp4"))
|
||||
result = ydl_entry_to_filename(ydl, {})
|
||||
|
||||
assert result == str(webm)
|
||||
assert not result.endswith(".part")
|
||||
|
||||
def test_skips_part_file_returns_false_if_no_other_match(self, tmp_path):
|
||||
"""Only a .part file exists — should return False."""
|
||||
(tmp_path / "video.f399.mp4.part").write_bytes(b"incomplete")
|
||||
|
||||
ydl = self._make_mock_ydl(str(tmp_path / "video.mp4"))
|
||||
assert ydl_entry_to_filename(ydl, {}) is False
|
||||
|
||||
def test_returns_false_when_no_files_match(self, tmp_path):
|
||||
(tmp_path / "unrelated.txt").write_bytes(b"data")
|
||||
|
||||
ydl = self._make_mock_ydl(str(tmp_path / "video.mp4"))
|
||||
assert ydl_entry_to_filename(ydl, {}) is False
|
||||
|
||||
Reference in New Issue
Block a user