mirror of
https://github.com/bellingcat/auto-archiver.git
synced 2026-06-08 03:18:28 +03:00
Compare commits
27 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9e651bb849 | ||
|
|
6581bbe139 | ||
|
|
e633be1721 | ||
|
|
bc06de8e5c | ||
|
|
20fddce3a3 | ||
|
|
6efa439cdb | ||
|
|
ef77d1fc86 | ||
|
|
a57a5ee005 | ||
|
|
2582f567ac | ||
|
|
4e5c1a6218 | ||
|
|
12d9c469b2 | ||
|
|
792838f1a1 | ||
|
|
17c4ae15eb | ||
|
|
a08af07348 | ||
|
|
e54077f4e8 | ||
|
|
319c0528da | ||
|
|
ae0e53e434 | ||
|
|
82fc786d56 | ||
|
|
aa65299844 | ||
|
|
1b69ec1f00 | ||
|
|
304e5d40b1 | ||
|
|
3194fee95d | ||
|
|
0040810e2e | ||
|
|
63cfe34e23 | ||
|
|
23a88e3cf4 | ||
|
|
3cac160cc1 | ||
|
|
e9a92272c5 |
@@ -1,18 +1,17 @@
|
||||
FROM webrecorder/browsertrix-crawler:1.11.4 AS base
|
||||
FROM webrecorder/browsertrix-crawler:1.12.4 AS base
|
||||
|
||||
ENV RUNNING_IN_DOCKER=1 \
|
||||
LANG=C.UTF-8 \
|
||||
LC_ALL=C.UTF-8 \
|
||||
PYTHONDONTWRITEBYTECODE=1 \
|
||||
PYTHONFAULTHANDLER=1 \
|
||||
PATH="/root/.local/bin:$PATH"
|
||||
PYTHONFAULTHANDLER=1
|
||||
|
||||
|
||||
ARG TARGETARCH
|
||||
|
||||
# Installing system dependencies
|
||||
RUN apt-get update && \
|
||||
apt-get install -y --no-install-recommends gcc ffmpeg fonts-noto exiftool python3-tk
|
||||
apt-get install -y --no-install-recommends gcc ffmpeg fonts-noto exiftool python3-tk
|
||||
|
||||
# Poetry and runtime
|
||||
FROM base AS runtime
|
||||
|
||||
1428
poetry.lock
generated
1428
poetry.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
|
||||
|
||||
[project]
|
||||
name = "auto-archiver"
|
||||
version = "1.2.3"
|
||||
version = "1.2.7"
|
||||
description = "Automatically archive links to videos, images, and social media content from Google Sheets (and more)."
|
||||
|
||||
requires-python = ">=3.10,<3.13"
|
||||
|
||||
@@ -11,6 +11,7 @@ Key Functionalities:
|
||||
|
||||
from __future__ import annotations
|
||||
import hashlib
|
||||
import os
|
||||
from typing import Any, List, Union, Dict
|
||||
from dataclasses import dataclass, field
|
||||
from dataclasses_json import dataclass_json
|
||||
@@ -181,8 +182,14 @@ class Metadata:
|
||||
media_hashes = set()
|
||||
new_media = []
|
||||
for m in self.media:
|
||||
if not m.filename:
|
||||
new_media.append(m)
|
||||
continue
|
||||
h = m.get("hash")
|
||||
if not h:
|
||||
if not os.path.exists(m.filename):
|
||||
logger.warning(f"Skipping missing media file: {m.filename}")
|
||||
continue
|
||||
h = calculate_hash_in_chunks(hashlib.sha256(), int(1.6e7), m.filename)
|
||||
if len(h) and h in media_hashes:
|
||||
continue
|
||||
|
||||
@@ -467,7 +467,11 @@ Here's how that would look: \n\nsteps:\n extractors:\n - [your_extractor_name_
|
||||
return self.setup_complete_parser(basic_config, yaml_config, unused_args)
|
||||
|
||||
def check_for_updates(self):
|
||||
response = requests.get("https://pypi.org/pypi/auto-archiver/json").json()
|
||||
try:
|
||||
response = requests.get("https://pypi.org/pypi/auto-archiver/json", timeout=10).json()
|
||||
except Exception as e:
|
||||
logger.debug(f"Unable to check for updates: {e}")
|
||||
return
|
||||
latest_version = version.parse(response["info"]["version"])
|
||||
current_version = version.parse(__version__)
|
||||
# check version compared to current version
|
||||
|
||||
@@ -73,6 +73,7 @@ class AntibotExtractorEnricher(Extractor, Enricher):
|
||||
if self.enrich(result):
|
||||
result.status = "antibot"
|
||||
return result
|
||||
return False
|
||||
|
||||
def _prepare_user_data_dir(self):
|
||||
if self.user_data_dir:
|
||||
|
||||
@@ -39,12 +39,18 @@ class Bluesky(GenericDropin):
|
||||
media_url = "https://bsky.social/xrpc/com.atproto.sync.getBlob?cid={}&did={}"
|
||||
for image_media in image_medias:
|
||||
url = media_url.format(image_media["image"]["ref"]["$link"], post["author"]["did"])
|
||||
image_media = archiver.download_from_url(url)
|
||||
media.append(Media(image_media))
|
||||
filename = archiver.download_from_url(url)
|
||||
if filename:
|
||||
media.append(Media(filename))
|
||||
else:
|
||||
logger.warning(f"Failed to download Bluesky image from {url}")
|
||||
for video_media in video_medias:
|
||||
url = media_url.format(video_media["ref"]["$link"], post["author"]["did"])
|
||||
video_media = archiver.download_from_url(url)
|
||||
media.append(Media(video_media))
|
||||
filename = archiver.download_from_url(url)
|
||||
if filename:
|
||||
media.append(Media(filename))
|
||||
else:
|
||||
logger.warning(f"Failed to download Bluesky video from {url}")
|
||||
return media
|
||||
|
||||
def _get_post_data(self, post: dict) -> dict:
|
||||
|
||||
@@ -204,8 +204,11 @@ class GenericExtractor(Extractor):
|
||||
if thumbnail_url:
|
||||
try:
|
||||
cover_image_path = self.download_from_url(thumbnail_url)
|
||||
media = Media(cover_image_path)
|
||||
metadata.add_media(media, id="cover")
|
||||
if cover_image_path:
|
||||
media = Media(cover_image_path)
|
||||
metadata.add_media(media, id="cover")
|
||||
else:
|
||||
logger.warning(f"Failed to download cover image from {thumbnail_url}")
|
||||
except Exception as e:
|
||||
logger.error(f"Could not download cover image {thumbnail_url}: {e}")
|
||||
|
||||
@@ -572,6 +575,8 @@ class GenericExtractor(Extractor):
|
||||
"--live-from-start" if self.live_from_start else "--no-live-from-start",
|
||||
"--postprocessor-args",
|
||||
"ffmpeg:-bitexact", # ensure bitexact output to avoid mismatching hashes for same video
|
||||
"--js-runtimes",
|
||||
"node", # yt-dlp defaults to deno-only; node is available in the base image
|
||||
]
|
||||
|
||||
# proxy handling
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
from typing import Type
|
||||
|
||||
from auto_archiver.utils import traverse_obj
|
||||
from auto_archiver.utils.custom_logger import logger
|
||||
from auto_archiver.core.metadata import Metadata, Media
|
||||
from auto_archiver.core.extractor import Extractor
|
||||
from yt_dlp.extractor.common import InfoExtractor
|
||||
@@ -58,6 +59,9 @@ class Truth(GenericDropin):
|
||||
# add the media
|
||||
for media in post.get("media_attachments", []):
|
||||
filename = archiver.download_from_url(media["url"])
|
||||
if not filename:
|
||||
logger.warning(f"Failed to download media from {media['url']}")
|
||||
continue
|
||||
result.add_media(Media(filename), id=media.get("id"))
|
||||
|
||||
return result
|
||||
|
||||
@@ -157,5 +157,8 @@ class Twitter(GenericDropin):
|
||||
mimetype = variant["content_type"]
|
||||
ext = mimetypes.guess_extension(mimetype)
|
||||
media.filename = archiver.download_from_url(media.get("src"), f"{slugify(url)}_{i}{ext}")
|
||||
if not media.filename:
|
||||
logger.warning(f"Failed to download media from {media.get('src')}")
|
||||
continue
|
||||
result.add_media(media)
|
||||
return result
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
from .ghostarchive_enricher import GhostarchiveEnricher
|
||||
@@ -0,0 +1,58 @@
|
||||
{
|
||||
"name": "Ghost Archive Enricher",
|
||||
"type": ["enricher"],
|
||||
"entry_point": "ghostarchive_enricher::GhostarchiveEnricher",
|
||||
"requires_setup": False,
|
||||
"dependencies": {
|
||||
"python": ["loguru", "requests", "bs4", "seleniumbase"],
|
||||
},
|
||||
"configs": {
|
||||
"timeout": {
|
||||
"default": 120,
|
||||
"type": "int",
|
||||
"help": "seconds to wait for successful archive confirmation from Ghost Archive.",
|
||||
},
|
||||
"check_existing": {
|
||||
"default": True,
|
||||
"type": "bool",
|
||||
"help": "whether to search for an existing archive before submitting a new one.",
|
||||
},
|
||||
"proxy_http": {
|
||||
"default": None,
|
||||
"help": "http proxy to use for requests, eg http://proxy-user:password@proxy-ip:port",
|
||||
},
|
||||
"proxy_https": {
|
||||
"default": None,
|
||||
"help": "https proxy to use for requests, eg https://proxy-user:password@proxy-ip:port",
|
||||
},
|
||||
},
|
||||
"description": """
|
||||
Submits the current URL to [Ghost Archive](https://ghostarchive.org/) for archiving and returns the archived page URL.
|
||||
|
||||
Used as an **enricher** to add a Ghost Archive URL to items already extracted by other modules.
|
||||
|
||||
### Features
|
||||
- Archives any public URL using the Ghost Archive service.
|
||||
- Optionally checks for existing archives before submitting a new one.
|
||||
- Supports HTTP and HTTPS proxies for requests.
|
||||
- Parses HTML responses to extract archive URLs (Ghost Archive has no JSON API).
|
||||
|
||||
### Important
|
||||
- This module confirms that Ghost Archive accepted the URL submission and returned an archive link.
|
||||
It does **not** verify the contents or completeness of the archived page.
|
||||
|
||||
### Notes
|
||||
- Ghost Archive is a free service with no authentication required.
|
||||
- Archived pages must be smaller than 50 MB (including CSS, fonts, images, etc.).
|
||||
- Videos are archived up to 360p and must be under 100 MB and shorter than 30 minutes.
|
||||
- Archival may take up to 5 minutes depending on the queue and page complexity.
|
||||
- Archived content is stored indefinitely.
|
||||
- Ghost Archive does not archive pages that require authentication or form submission.
|
||||
|
||||
### Limitations
|
||||
- No official API — this module interacts with the Ghost Archive web interface.
|
||||
- The submission endpoint is protected by Cloudflare, so a headless browser (SeleniumBase) is used for new submissions.
|
||||
- Searching for existing archives uses plain HTTP requests and does not require a browser.
|
||||
- Rate limiting may apply; consider using a delay between requests if archiving many URLs.
|
||||
""",
|
||||
}
|
||||
@@ -0,0 +1,153 @@
|
||||
import time
|
||||
import re
|
||||
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
from seleniumbase import SB
|
||||
from auto_archiver.utils.custom_logger import logger
|
||||
from auto_archiver.utils import url as UrlUtil
|
||||
from auto_archiver.core import Enricher, Metadata
|
||||
|
||||
|
||||
class GhostarchiveEnricher(Enricher):
|
||||
"""
|
||||
Submits the current URL to Ghost Archive (ghostarchive.org) for archiving
|
||||
and stores the archived page URL as enrichment metadata.
|
||||
|
||||
Ghost Archive has no official API — this module interacts with the web form
|
||||
and parses HTML responses. The submission endpoint is protected by Cloudflare,
|
||||
so a headless browser (SeleniumBase) is used for archival submissions, while
|
||||
plain HTTP requests are used for searching existing archives.
|
||||
|
||||
Note: this module only confirms that Ghost Archive accepted the submission
|
||||
and returned an archive URL. It does not verify that the archived page
|
||||
content is complete or correctly rendered.
|
||||
"""
|
||||
|
||||
GHOSTARCHIVE_BASE = "https://ghostarchive.org"
|
||||
ARCHIVE_ENDPOINT = f"{GHOSTARCHIVE_BASE}/archive2"
|
||||
SEARCH_ENDPOINT = f"{GHOSTARCHIVE_BASE}/search"
|
||||
ARCHIVE_URL_PATTERN = re.compile(r"/archive/([A-Za-z0-9]+)")
|
||||
|
||||
def _get_proxies(self) -> dict:
|
||||
proxies = {}
|
||||
if self.proxy_http:
|
||||
proxies["http"] = self.proxy_http
|
||||
if self.proxy_https:
|
||||
proxies["https"] = self.proxy_https
|
||||
return proxies
|
||||
|
||||
def _get_headers(self) -> dict:
|
||||
return {
|
||||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
|
||||
}
|
||||
|
||||
def _normalize_archive_href(self, href: str) -> str | None:
|
||||
"""Normalize an archive link href to a full HTTPS URL, filtering out replay links."""
|
||||
if "/archive/" not in href or "/replay/" in href:
|
||||
return None
|
||||
if href.startswith("/"):
|
||||
return f"{self.GHOSTARCHIVE_BASE}{href}"
|
||||
if href.startswith("http://ghostarchive.org"):
|
||||
return href.replace("http://", "https://")
|
||||
if href.startswith("https://ghostarchive.org"):
|
||||
return href
|
||||
return None
|
||||
|
||||
def _search_existing(self, url: str) -> str | None:
|
||||
"""
|
||||
Search Ghost Archive for an existing archive of the given URL.
|
||||
Returns the archive URL if found, otherwise None.
|
||||
"""
|
||||
try:
|
||||
r = requests.get(
|
||||
self.SEARCH_ENDPOINT,
|
||||
params={"term": url},
|
||||
headers=self._get_headers(),
|
||||
proxies=self._get_proxies(),
|
||||
timeout=30,
|
||||
)
|
||||
if r.status_code != 200:
|
||||
logger.warning(f"Ghost Archive search returned status {r.status_code}")
|
||||
return None
|
||||
|
||||
soup = BeautifulSoup(r.text, "html.parser")
|
||||
for link in soup.find_all("a", href=True):
|
||||
archive_url = self._normalize_archive_href(link["href"])
|
||||
if archive_url:
|
||||
logger.info(f"Found existing Ghost Archive: {archive_url}")
|
||||
return archive_url
|
||||
|
||||
except requests.exceptions.RequestException as e:
|
||||
logger.warning(f"Ghost Archive search failed: {e}")
|
||||
|
||||
return None
|
||||
|
||||
def _submit_url(self, url: str) -> str | None:
|
||||
"""
|
||||
Submit a URL to Ghost Archive for archiving using a headless browser.
|
||||
The /archive2 endpoint is Cloudflare-protected, requiring JS execution.
|
||||
Returns the archive URL if successful, otherwise None.
|
||||
"""
|
||||
try:
|
||||
with SB(uc=True, headless=True) as sb:
|
||||
logger.debug("Opening Ghost Archive homepage in headless browser")
|
||||
sb.open(self.GHOSTARCHIVE_BASE)
|
||||
|
||||
# fill in the archive form and submit
|
||||
sb.type('input[name="archive"]', url)
|
||||
sb.click('input[type="submit"][value="Submit for archival"]')
|
||||
|
||||
# wait for navigation to /archive/{id} or timeout
|
||||
start_time = time.time()
|
||||
while time.time() - start_time < self.timeout:
|
||||
current_url = sb.get_current_url()
|
||||
if self.ARCHIVE_URL_PATTERN.search(current_url):
|
||||
archive_url = current_url.split("?")[0]
|
||||
logger.info(f"Ghost Archive saved: {archive_url}")
|
||||
return archive_url
|
||||
time.sleep(2)
|
||||
|
||||
# if we didn't redirect, try parsing the page source
|
||||
page_source = sb.get_page_source()
|
||||
return self._parse_archive_url(page_source)
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Ghost Archive submission failed: {e}")
|
||||
return None
|
||||
|
||||
def _parse_archive_url(self, html: str) -> str | None:
|
||||
"""Parse HTML response to find an archive URL."""
|
||||
soup = BeautifulSoup(html, "html.parser")
|
||||
for link in soup.find_all("a", href=True):
|
||||
archive_url = self._normalize_archive_href(link["href"])
|
||||
if archive_url:
|
||||
return archive_url
|
||||
return None
|
||||
|
||||
def enrich(self, to_enrich: Metadata) -> bool:
|
||||
url = to_enrich.get_url()
|
||||
if UrlUtil.is_auth_wall(url):
|
||||
logger.debug("[SKIP] Ghost Archive since url is behind AUTH WALL")
|
||||
return False
|
||||
|
||||
if to_enrich.get("ghostarchive"):
|
||||
logger.info(f"Ghost Archive enricher had already been executed: {to_enrich.get('ghostarchive')}")
|
||||
return True
|
||||
|
||||
# optionally check for existing archive first
|
||||
archive_url = None
|
||||
if self.check_existing:
|
||||
logger.debug(f"Searching Ghost Archive for existing archive of {url}")
|
||||
archive_url = self._search_existing(url)
|
||||
|
||||
if not archive_url:
|
||||
logger.debug(f"Submitting {url} to Ghost Archive")
|
||||
archive_url = self._submit_url(url)
|
||||
|
||||
if archive_url:
|
||||
to_enrich.set("ghostarchive", archive_url)
|
||||
return True
|
||||
|
||||
logger.warning(f"Ghost Archive failed to archive {url}")
|
||||
return False
|
||||
@@ -25,6 +25,9 @@ class HashEnricher(Enricher):
|
||||
logger.debug(f"Calculating media hashes with algo={self.algorithm}")
|
||||
|
||||
for i, m in enumerate(to_enrich.media):
|
||||
if not m.filename:
|
||||
logger.warning(f"Skipping hash for media without filename: {m}")
|
||||
continue
|
||||
if len(hd := self.calculate_hash(m.filename)):
|
||||
to_enrich.media[i].set("hash", f"{self.algorithm}:{hd}")
|
||||
|
||||
|
||||
@@ -99,7 +99,10 @@ class InstagramAPIExtractor(Extractor):
|
||||
result.set_title(user.get("full_name", username)).set("data", user)
|
||||
if pic_url := user.get("profile_pic_url_hd", user.get("profile_pic_url")):
|
||||
filename = self.download_from_url(pic_url)
|
||||
result.add_media(Media(filename=filename), id="profile_picture")
|
||||
if filename:
|
||||
result.add_media(Media(filename=filename), id="profile_picture")
|
||||
else:
|
||||
logger.warning(f"Failed to download profile picture from {pic_url}")
|
||||
|
||||
count_posts = 0
|
||||
if self.full_profile:
|
||||
@@ -202,7 +205,10 @@ class InstagramAPIExtractor(Extractor):
|
||||
|
||||
if cover_media := h_info.get("cover_media", {}).get("cropped_image_version", {}).get("url"):
|
||||
filename = self.download_from_url(cover_media)
|
||||
result.add_media(Media(filename=filename), id=f"cover_media highlight {id}")
|
||||
if filename:
|
||||
result.add_media(Media(filename=filename), id=f"cover_media highlight {id}")
|
||||
else:
|
||||
logger.warning(f"Failed to download cover media from {cover_media}")
|
||||
|
||||
items = h_info.get("items", [])[::-1] # newest to oldest
|
||||
items = items[: min(max_to_download, len(items))]
|
||||
@@ -345,7 +351,10 @@ class InstagramAPIExtractor(Extractor):
|
||||
image_media = None
|
||||
if image_url := item.get("thumbnail_url"):
|
||||
filename = self.download_from_url(image_url, verbose=False)
|
||||
image_media = Media(filename=filename)
|
||||
if filename:
|
||||
image_media = Media(filename=filename)
|
||||
else:
|
||||
logger.warning(f"Failed to download thumbnail from {image_url}")
|
||||
|
||||
# retrieve video info
|
||||
best_id = item.get("id", item.get("pk"))
|
||||
@@ -357,16 +366,19 @@ class InstagramAPIExtractor(Extractor):
|
||||
|
||||
if video_url := item.get("video_url"):
|
||||
filename = self.download_from_url(video_url, verbose=False)
|
||||
video_media = Media(filename=filename)
|
||||
if taken_at:
|
||||
video_media.set("date", taken_at)
|
||||
if code:
|
||||
video_media.set("url", f"https://www.instagram.com/p/{code}")
|
||||
if caption_text:
|
||||
video_media.set("text", caption_text)
|
||||
video_media.set("preview", [image_media])
|
||||
video_media.set("data", [item])
|
||||
return item, video_media, f"{context or 'video'} {best_id}"
|
||||
if filename:
|
||||
video_media = Media(filename=filename)
|
||||
if taken_at:
|
||||
video_media.set("date", taken_at)
|
||||
if code:
|
||||
video_media.set("url", f"https://www.instagram.com/p/{code}")
|
||||
if caption_text:
|
||||
video_media.set("text", caption_text)
|
||||
video_media.set("preview", [image_media])
|
||||
video_media.set("data", [item])
|
||||
return item, video_media, f"{context or 'video'} {best_id}"
|
||||
else:
|
||||
logger.warning(f"Failed to download video from {video_url}")
|
||||
elif image_media:
|
||||
if taken_at:
|
||||
image_media.set("date", taken_at)
|
||||
|
||||
@@ -25,6 +25,9 @@ class MetaEnricher(Enricher):
|
||||
logger.debug(f"Calculating archive file sizes for {len(to_enrich.media)} media files")
|
||||
total_size = 0
|
||||
for media in to_enrich.get_all_media():
|
||||
if not media.filename:
|
||||
logger.warning(f"Skipping file size for media without filename: {media}")
|
||||
continue
|
||||
file_stats = os.stat(media.filename)
|
||||
media.set("bytes", file_stats.st_size)
|
||||
media.set("size", self.human_readable_bytes(file_stats.st_size))
|
||||
|
||||
@@ -49,10 +49,18 @@ class TelegramExtractor(Extractor):
|
||||
if not len(image_urls):
|
||||
return False
|
||||
for img_url in image_urls:
|
||||
result.add_media(Media(self.download_from_url(img_url)))
|
||||
filename = self.download_from_url(img_url)
|
||||
if not filename:
|
||||
logger.warning(f"Failed to download image from {img_url}")
|
||||
continue
|
||||
result.add_media(Media(filename))
|
||||
else:
|
||||
video_url = video.get("src")
|
||||
m_video = Media(self.download_from_url(video_url))
|
||||
video_filename = self.download_from_url(video_url)
|
||||
if not video_filename:
|
||||
logger.warning(f"Failed to download video from {video_url}")
|
||||
return False
|
||||
m_video = Media(video_filename)
|
||||
# extract duration from HTML
|
||||
try:
|
||||
duration = s.find_all("time")[0].contents[0]
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import asyncio
|
||||
import os
|
||||
import shutil
|
||||
import re
|
||||
@@ -53,6 +54,16 @@ class TelethonExtractor(Extractor):
|
||||
logger.debug(f"Making a copy of the session file {base_session_filepath} to {self.session_file}.session")
|
||||
shutil.copy(base_session_filepath, f"{self.session_file}.session")
|
||||
|
||||
# ensure a running event loop exists (Needed when used by Celery workers which may close the default one)
|
||||
try:
|
||||
loop = asyncio.get_event_loop()
|
||||
if loop.is_closed():
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
except RuntimeError:
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
|
||||
# initiate the client
|
||||
self.client = TelegramClient(self.session_file, self.api_id, self.api_hash)
|
||||
|
||||
@@ -190,6 +201,9 @@ class TelethonExtractor(Extractor):
|
||||
)
|
||||
for i, om_url in enumerate(other_media_urls):
|
||||
filename = self.download_from_url(om_url, f"{chat}_{group_id}_{i}")
|
||||
if not filename:
|
||||
logger.warning(f"Failed to download media from {om_url}")
|
||||
continue
|
||||
result.add_media(Media(filename=filename), id=f"{group_id}_{i}")
|
||||
|
||||
filename_dest = os.path.join(self.tmp_dir, f"{chat}_{group_id}", str(mp.id))
|
||||
|
||||
@@ -114,6 +114,9 @@ class TwitterApiExtractor(Extractor):
|
||||
logger.info(f"Found media {media}")
|
||||
ext = mimetypes.guess_extension(mimetype)
|
||||
media.filename = self.download_from_url(media.get("src"), f"{slugify(url)}_{i}{ext}")
|
||||
if not media.filename:
|
||||
logger.warning(f"Failed to download media from {media.get('src')}")
|
||||
continue
|
||||
result.add_media(media)
|
||||
|
||||
result.set_content(
|
||||
|
||||
@@ -64,7 +64,6 @@ class DeletionIndicators:
|
||||
# YouTube deletion indicators
|
||||
YOUTUBE = [
|
||||
"This video isn't available anymore",
|
||||
"Video unavailable",
|
||||
"This video has been removed",
|
||||
"This video is no longer available",
|
||||
"This video is private",
|
||||
|
||||
@@ -120,6 +120,9 @@ def ydl_entry_to_filename(ydl, entry: dict) -> str:
|
||||
directory = os.path.dirname(base_filename) # '/get/path/to'
|
||||
basename = os.path.basename(base_filename) # 'file'
|
||||
for f in os.listdir(directory):
|
||||
# skip incomplete downloads left behind by yt-dlp
|
||||
if f.endswith(".part"):
|
||||
continue
|
||||
if (
|
||||
f.startswith(basename)
|
||||
or (entry_url and os.path.splitext(f)[0] in entry_url)
|
||||
|
||||
277
tests/enrichers/test_ghostarchive_enricher.py
Normal file
277
tests/enrichers/test_ghostarchive_enricher.py
Normal file
@@ -0,0 +1,277 @@
|
||||
import pytest
|
||||
import requests
|
||||
import os
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
from auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher import GhostarchiveEnricher
|
||||
|
||||
CI = os.getenv("GITHUB_ACTIONS", "") == "true"
|
||||
|
||||
# sample HTML responses for mocking
|
||||
SEARCH_HTML_FOUND = """
|
||||
<html><body>
|
||||
<h1>Archives for https://example.com</h1>
|
||||
<table>
|
||||
<tr><td><a href="http://ghostarchive.org/archive/Abc12">https://example.com</a></td></tr>
|
||||
</table>
|
||||
</body></html>
|
||||
"""
|
||||
|
||||
SEARCH_HTML_NOT_FOUND = """
|
||||
<html><body>
|
||||
<h1>Archives for https://example.com</h1>
|
||||
<p>Page 0 out of 0</p>
|
||||
<p>No archives for that site.</p>
|
||||
</body></html>
|
||||
"""
|
||||
|
||||
SAVE_RESPONSE_HTML_WITH_LINK = """
|
||||
<html><body>
|
||||
<h1>Archive saved</h1>
|
||||
<a href="/archive/Xyz99">View archive</a>
|
||||
</body></html>
|
||||
"""
|
||||
|
||||
ENRICHER_CONFIG = {
|
||||
"timeout": 120,
|
||||
"check_existing": True,
|
||||
"proxy_http": None,
|
||||
"proxy_https": None,
|
||||
}
|
||||
|
||||
|
||||
class TestGhostarchiveEnricher:
|
||||
"""Tests for Ghost Archive Enricher"""
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup_enricher(self, setup_module):
|
||||
self.enricher: GhostarchiveEnricher = setup_module("ghostarchive_enricher", ENRICHER_CONFIG)
|
||||
|
||||
def test_search_existing_found(self, mocker):
|
||||
"""When an existing archive is found, it should be returned."""
|
||||
mock_response = mocker.Mock()
|
||||
mock_response.status_code = 200
|
||||
mock_response.text = SEARCH_HTML_FOUND
|
||||
mocker.patch(
|
||||
"auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.requests.get", return_value=mock_response
|
||||
)
|
||||
|
||||
result = self.enricher._search_existing("https://example.com")
|
||||
assert result == "https://ghostarchive.org/archive/Abc12"
|
||||
|
||||
def test_search_existing_not_found(self, mocker):
|
||||
"""When no existing archive is found, None should be returned."""
|
||||
mock_response = mocker.Mock()
|
||||
mock_response.status_code = 200
|
||||
mock_response.text = SEARCH_HTML_NOT_FOUND
|
||||
mocker.patch(
|
||||
"auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.requests.get", return_value=mock_response
|
||||
)
|
||||
|
||||
result = self.enricher._search_existing("https://example.com")
|
||||
assert result is None
|
||||
|
||||
def test_search_existing_request_error(self, mocker):
|
||||
"""When search request fails, None should be returned."""
|
||||
mocker.patch(
|
||||
"auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.requests.get",
|
||||
side_effect=requests.exceptions.ConnectionError("connection failed"),
|
||||
)
|
||||
|
||||
result = self.enricher._search_existing("https://example.com")
|
||||
assert result is None
|
||||
|
||||
def test_search_existing_non_200(self, mocker):
|
||||
"""When search returns non-200, None should be returned."""
|
||||
mock_response = mocker.Mock()
|
||||
mock_response.status_code = 503
|
||||
mocker.patch(
|
||||
"auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.requests.get", return_value=mock_response
|
||||
)
|
||||
|
||||
result = self.enricher._search_existing("https://example.com")
|
||||
assert result is None
|
||||
|
||||
def test_submit_url_success_redirect(self, mocker):
|
||||
"""Successful submission via headless browser should return archive URL."""
|
||||
mock_sb = MagicMock()
|
||||
mock_sb.get_current_url.return_value = "https://ghostarchive.org/archive/NewId1"
|
||||
mock_sb.__enter__ = MagicMock(return_value=mock_sb)
|
||||
mock_sb.__exit__ = MagicMock(return_value=False)
|
||||
|
||||
mocker.patch("auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.SB", return_value=mock_sb)
|
||||
|
||||
result = self.enricher._submit_url("https://example.com")
|
||||
assert result == "https://ghostarchive.org/archive/NewId1"
|
||||
mock_sb.type.assert_called_once()
|
||||
mock_sb.click.assert_called_once()
|
||||
|
||||
def test_submit_url_success_redirect_strips_query(self, mocker):
|
||||
"""Redirect URL query params should be stripped."""
|
||||
mock_sb = MagicMock()
|
||||
mock_sb.get_current_url.return_value = "https://ghostarchive.org/archive/NewId1?wr=false"
|
||||
mock_sb.__enter__ = MagicMock(return_value=mock_sb)
|
||||
mock_sb.__exit__ = MagicMock(return_value=False)
|
||||
|
||||
mocker.patch("auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.SB", return_value=mock_sb)
|
||||
|
||||
result = self.enricher._submit_url("https://example.com")
|
||||
assert result == "https://ghostarchive.org/archive/NewId1"
|
||||
|
||||
def test_submit_url_success_html_fallback(self, mocker):
|
||||
"""When browser doesn't redirect, should parse page source for archive link."""
|
||||
mock_sb = MagicMock()
|
||||
mock_sb.get_current_url.return_value = "https://ghostarchive.org/archive2"
|
||||
mock_sb.get_page_source.return_value = SAVE_RESPONSE_HTML_WITH_LINK
|
||||
mock_sb.__enter__ = MagicMock(return_value=mock_sb)
|
||||
mock_sb.__exit__ = MagicMock(return_value=False)
|
||||
|
||||
# make timeout=0 so the polling loop exits immediately and falls through to HTML parsing
|
||||
self.enricher.timeout = 0
|
||||
mocker.patch("auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.SB", return_value=mock_sb)
|
||||
|
||||
result = self.enricher._submit_url("https://example.com")
|
||||
assert result == "https://ghostarchive.org/archive/Xyz99"
|
||||
|
||||
def test_submit_url_browser_error(self, mocker):
|
||||
"""Browser error during submission should return None."""
|
||||
mocker.patch(
|
||||
"auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.SB",
|
||||
side_effect=Exception("browser failed to start"),
|
||||
)
|
||||
|
||||
result = self.enricher._submit_url("https://example.com")
|
||||
assert result is None
|
||||
|
||||
def test_proxy_configuration(self, mocker):
|
||||
"""Proxies should be passed to search requests when configured."""
|
||||
self.enricher.proxy_http = "http://proxy:8080"
|
||||
self.enricher.proxy_https = "https://proxy:8443"
|
||||
|
||||
mock_get = mocker.patch(
|
||||
"auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.requests.get",
|
||||
)
|
||||
mock_response = mocker.Mock()
|
||||
mock_response.status_code = 200
|
||||
mock_response.text = SEARCH_HTML_FOUND
|
||||
mock_get.return_value = mock_response
|
||||
|
||||
result = self.enricher._search_existing("https://example.com")
|
||||
|
||||
call_kwargs = mock_get.call_args
|
||||
assert call_kwargs.kwargs.get("proxies") == {"http": "http://proxy:8080", "https": "https://proxy:8443"}
|
||||
assert result is not None
|
||||
|
||||
def test_parse_archive_url_with_replay_links(self):
|
||||
"""Parser should ignore /replay/ links and only return /archive/ links."""
|
||||
html = """
|
||||
<html><body>
|
||||
<a href="/archive/replay/w/id-abc/mp_/https://example.com">replay</a>
|
||||
<a href="/archive/Valid1">valid</a>
|
||||
</body></html>
|
||||
"""
|
||||
result = self.enricher._parse_archive_url(html)
|
||||
assert result == "https://ghostarchive.org/archive/Valid1"
|
||||
|
||||
def test_parse_archive_url_no_links(self):
|
||||
"""Parser should return None when no archive links found."""
|
||||
html = "<html><body><p>No archive here</p></body></html>"
|
||||
result = self.enricher._parse_archive_url(html)
|
||||
assert result is None
|
||||
|
||||
def test_enrich_sets_ghostarchive_on_metadata(self, mocker, make_item):
|
||||
"""enrich() should set 'ghostarchive' key on the metadata object."""
|
||||
mocker.patch.object(self.enricher, "_search_existing", return_value="https://ghostarchive.org/archive/Enr1")
|
||||
|
||||
item = make_item("https://example.com")
|
||||
result = self.enricher.enrich(item)
|
||||
|
||||
assert result is True
|
||||
assert item.get("ghostarchive") == "https://ghostarchive.org/archive/Enr1"
|
||||
|
||||
def test_enrich_skips_if_already_enriched(self, mocker, make_item):
|
||||
"""enrich() should skip if ghostarchive key is already set."""
|
||||
mock_search = mocker.patch.object(self.enricher, "_search_existing")
|
||||
|
||||
item = make_item("https://example.com", ghostarchive="https://ghostarchive.org/archive/Old1")
|
||||
result = self.enricher.enrich(item)
|
||||
|
||||
assert result is True
|
||||
mock_search.assert_not_called()
|
||||
|
||||
def test_enrich_returns_false_on_failure(self, mocker, make_item):
|
||||
"""enrich() should return False when both search and submit fail."""
|
||||
mocker.patch.object(self.enricher, "_search_existing", return_value=None)
|
||||
mocker.patch.object(self.enricher, "_submit_url", return_value=None)
|
||||
|
||||
item = make_item("https://example.com")
|
||||
result = self.enricher.enrich(item)
|
||||
|
||||
assert result is False
|
||||
|
||||
def test_enrich_skips_auth_wall(self, mocker, make_item):
|
||||
"""enrich() should skip URLs behind auth walls."""
|
||||
mocker.patch(
|
||||
"auto_archiver.modules.ghostarchive_enricher.ghostarchive_enricher.UrlUtil.is_auth_wall", return_value=True
|
||||
)
|
||||
|
||||
item = make_item("https://example.com/login")
|
||||
result = self.enricher.enrich(item)
|
||||
assert result is False
|
||||
|
||||
def test_enrich_with_existing_archive(self, mocker, make_item):
|
||||
"""enrich() should use existing archive when check_existing is True."""
|
||||
mocker.patch.object(self.enricher, "_search_existing", return_value="https://ghostarchive.org/archive/Exist1")
|
||||
mock_submit = mocker.patch.object(self.enricher, "_submit_url")
|
||||
|
||||
item = make_item("https://example.com")
|
||||
result = self.enricher.enrich(item)
|
||||
|
||||
assert result is True
|
||||
assert item.get("ghostarchive") == "https://ghostarchive.org/archive/Exist1"
|
||||
mock_submit.assert_not_called()
|
||||
|
||||
def test_enrich_submits_when_no_existing(self, mocker, make_item):
|
||||
"""enrich() should submit URL when no existing archive found."""
|
||||
mocker.patch.object(self.enricher, "_search_existing", return_value=None)
|
||||
mocker.patch.object(self.enricher, "_submit_url", return_value="https://ghostarchive.org/archive/New42")
|
||||
|
||||
item = make_item("https://example.com")
|
||||
result = self.enricher.enrich(item)
|
||||
|
||||
assert result is True
|
||||
assert item.get("ghostarchive") == "https://ghostarchive.org/archive/New42"
|
||||
|
||||
def test_enrich_skips_check_existing_when_disabled(self, mocker, make_item):
|
||||
"""enrich() should skip search when check_existing is False."""
|
||||
self.enricher.check_existing = False
|
||||
mock_search = mocker.patch.object(self.enricher, "_search_existing")
|
||||
mocker.patch.object(self.enricher, "_submit_url", return_value="https://ghostarchive.org/archive/Direct1")
|
||||
|
||||
item = make_item("https://example.com")
|
||||
result = self.enricher.enrich(item)
|
||||
|
||||
assert result is True
|
||||
mock_search.assert_not_called()
|
||||
|
||||
@pytest.mark.download
|
||||
def test_real_search_existing(self, setup_module):
|
||||
"""Integration test: search for an existing archive on Ghost Archive."""
|
||||
enricher = setup_module("ghostarchive_enricher", ENRICHER_CONFIG)
|
||||
# example.com is commonly archived
|
||||
result = enricher._search_existing("https://example.com")
|
||||
# we just check it doesn't crash; result may or may not be found
|
||||
assert result is None or result.startswith("https://ghostarchive.org/archive/")
|
||||
|
||||
@pytest.mark.download
|
||||
@pytest.mark.skipif(CI, reason="Avoid submitting a real task on every CI run")
|
||||
def test_real_submit_example_com(self, setup_module, make_item):
|
||||
"""Integration test: submit example.com to Ghost Archive and verify enrichment."""
|
||||
enricher = setup_module("ghostarchive_enricher", ENRICHER_CONFIG)
|
||||
item = make_item("https://example.com")
|
||||
result = enricher.enrich(item)
|
||||
|
||||
assert result is True
|
||||
archive_url = item.get("ghostarchive")
|
||||
assert archive_url is not None
|
||||
assert archive_url.startswith("https://ghostarchive.org/archive/")
|
||||
@@ -53,6 +53,7 @@ class TestAntibotExtractorEnricher(TestExtractorBase):
|
||||
}
|
||||
|
||||
@pytest.mark.download
|
||||
@pytest.mark.flaky(reruns=2, reruns_delay=5)
|
||||
@pytest.mark.parametrize(
|
||||
"url,in_title,in_text,image_count,video_count,skip_ci",
|
||||
[
|
||||
@@ -128,6 +129,7 @@ class TestAntibotExtractorEnricher(TestExtractorBase):
|
||||
item = make_item(url)
|
||||
result = self.extractor.download(item)
|
||||
|
||||
assert result, f"download() returned {result!r} — Selenium may have failed (e.g., window close timeout)"
|
||||
assert result.status == "antibot", "Expected status to be 'antibot'"
|
||||
|
||||
# Check title contains all required words (case-insensitive)
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import asyncio
|
||||
import os
|
||||
from datetime import date
|
||||
|
||||
@@ -60,3 +61,53 @@ def test_valid_url_regex(url, expected, get_lazy_module):
|
||||
def test_invite_pattern_regex(invite, expected, get_lazy_module):
|
||||
match = TelethonExtractor.invite_pattern.search(invite)
|
||||
assert bool(match) == expected
|
||||
|
||||
|
||||
def test_setup_with_closed_event_loop(get_lazy_module, tmp_path, mocker):
|
||||
"""
|
||||
Simulate the Celery worker scenario where the asyncio event loop is closed
|
||||
before setup() runs. The fix should create a new event loop so that
|
||||
TelegramClient.start() does not raise 'Event loop is closed'.
|
||||
"""
|
||||
# create a session file so setup doesn't fail on missing file
|
||||
session_file = tmp_path / "test.session"
|
||||
session_file.touch()
|
||||
|
||||
# close the current event loop to simulate a Celery worker environment
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
loop.close()
|
||||
|
||||
lazy_module = get_lazy_module("telethon_extractor")
|
||||
module = lazy_module.load(
|
||||
{"telethon_extractor": {"session_file": str(session_file), "api_id": 123, "api_hash": "ABC"}}
|
||||
)
|
||||
|
||||
# setup should have succeeded and a new open event loop should exist
|
||||
new_loop = asyncio.get_event_loop()
|
||||
assert not new_loop.is_closed()
|
||||
assert module.client is not None
|
||||
|
||||
|
||||
def test_setup_with_no_event_loop(get_lazy_module, tmp_path, mocker):
|
||||
"""
|
||||
Simulate the scenario where there is no current event loop at all
|
||||
(e.g. running in a non-main thread). The fix should create one.
|
||||
"""
|
||||
session_file = tmp_path / "test.session"
|
||||
session_file.touch()
|
||||
|
||||
# Remove the current event loop entirely
|
||||
# In Python 3.12+, get_event_loop() in a non-main thread raises RuntimeError
|
||||
mocker.patch("asyncio.get_event_loop", side_effect=RuntimeError("no current event loop"))
|
||||
new_loop_mock = mocker.MagicMock()
|
||||
new_loop_mock.is_closed.return_value = False
|
||||
mocker.patch("asyncio.new_event_loop", return_value=new_loop_mock)
|
||||
set_loop = mocker.patch("asyncio.set_event_loop")
|
||||
|
||||
lazy_module = get_lazy_module("telethon_extractor")
|
||||
lazy_module.load({"telethon_extractor": {"session_file": str(session_file), "api_id": 123, "api_hash": "ABC"}})
|
||||
|
||||
# a new event loop should have been created and set
|
||||
asyncio.new_event_loop.assert_called_once()
|
||||
set_loop.assert_called_once_with(new_loop_mock)
|
||||
|
||||
@@ -86,6 +86,22 @@ def test_media_management(basic_metadata, media_file):
|
||||
assert basic_metadata.get_media_by_id("m1") == media1
|
||||
|
||||
|
||||
def test_remove_duplicate_skips_missing_files(basic_metadata, media_file, tmp_path):
|
||||
"""Missing files should be dropped instead of crashing with FileNotFoundError."""
|
||||
real_file = tmp_path / "exists.txt"
|
||||
real_file.write_text("content")
|
||||
valid = media_file(filename=str(real_file), hash_value="abc")
|
||||
missing = media_file(filename="/nonexistent/path/gone.mp4")
|
||||
|
||||
basic_metadata.add_media(valid, "valid")
|
||||
basic_metadata.add_media(missing, "missing")
|
||||
|
||||
assert len(basic_metadata.media) == 2
|
||||
basic_metadata.remove_duplicate_media_by_hash()
|
||||
assert len(basic_metadata.media) == 1
|
||||
assert basic_metadata.get_media_by_id("valid") == valid
|
||||
|
||||
|
||||
def test_success():
|
||||
m = Metadata()
|
||||
assert not m.is_success()
|
||||
|
||||
259
tests/test_none_filename_handling.py
Normal file
259
tests/test_none_filename_handling.py
Normal file
@@ -0,0 +1,259 @@
|
||||
"""
|
||||
Tests for handling Media objects with None filename.
|
||||
|
||||
When download_from_url fails, it returns None. Various enrichers and
|
||||
the metadata deduplication logic must gracefully handle Media objects
|
||||
where filename is None, rather than crashing with TypeError.
|
||||
"""
|
||||
|
||||
from datetime import datetime, timezone
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
from auto_archiver.core.metadata import Metadata, Media
|
||||
from auto_archiver.modules.hash_enricher import HashEnricher
|
||||
from auto_archiver.modules.meta_enricher import MetaEnricher
|
||||
|
||||
|
||||
# ── HashEnricher ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestHashEnricherNoneFilename:
|
||||
"""hash_enricher should skip media with None filename without crashing."""
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup(self, setup_module):
|
||||
self.enricher = setup_module(HashEnricher, {"algorithm": "SHA-256", "chunksize": 100})
|
||||
|
||||
def test_skips_none_filename(self):
|
||||
m = Metadata().set_url("https://example.com")
|
||||
media = Media(filename=None)
|
||||
media.set("src", "https://example.com/video.mp4")
|
||||
m.add_media(media)
|
||||
|
||||
# Should not raise
|
||||
self.enricher.enrich(m)
|
||||
# No hash should be set
|
||||
assert m.media[0].get("hash") is None
|
||||
|
||||
def test_hashes_valid_skips_none(self, tmp_path):
|
||||
"""Mix of valid and None-filename media: only valid ones get hashed."""
|
||||
valid_file = tmp_path / "test.txt"
|
||||
valid_file.write_text("hello world")
|
||||
|
||||
m = Metadata().set_url("https://example.com")
|
||||
m.add_media(Media(filename=str(valid_file)))
|
||||
m.add_media(Media(filename=None))
|
||||
|
||||
self.enricher.enrich(m)
|
||||
|
||||
assert m.media[0].get("hash") is not None
|
||||
assert m.media[1].get("hash") is None
|
||||
|
||||
def test_all_none_filenames(self):
|
||||
"""All media have None filename – enricher should not crash."""
|
||||
m = Metadata().set_url("https://example.com")
|
||||
m.add_media(Media(filename=None))
|
||||
m.add_media(Media(filename=None))
|
||||
|
||||
self.enricher.enrich(m)
|
||||
|
||||
assert len(m.media) == 2
|
||||
for media in m.media:
|
||||
assert media.get("hash") is None
|
||||
|
||||
|
||||
# ── MetaEnricher ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestMetaEnricherNoneFilename:
|
||||
"""meta_enricher should skip media with None filename without crashing."""
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup(self, setup_module):
|
||||
self.enricher = setup_module(MetaEnricher, {})
|
||||
|
||||
def test_skips_none_filename(self):
|
||||
m = Metadata().set_url("https://example.com")
|
||||
m.set("_processed_at", datetime.now(timezone.utc))
|
||||
media = Media(filename=None)
|
||||
media.set("src", "https://example.com/video.mp4")
|
||||
m.add_media(media)
|
||||
|
||||
# Should not raise
|
||||
self.enricher.enrich(m)
|
||||
assert m.get("total_bytes") == 0
|
||||
|
||||
def test_sizes_valid_skips_none(self, tmp_path):
|
||||
"""Mix of valid and None-filename media: only valid ones get sized."""
|
||||
valid_file = tmp_path / "test.txt"
|
||||
valid_file.write_text("A" * 500)
|
||||
|
||||
m = Metadata().set_url("https://example.com")
|
||||
m.set("_processed_at", datetime.now(timezone.utc))
|
||||
m.add_media(Media(filename=str(valid_file)))
|
||||
m.add_media(Media(filename=None))
|
||||
|
||||
self.enricher.enrich(m)
|
||||
|
||||
assert m.media[0].get("bytes") == 500
|
||||
assert m.media[1].get("bytes") is None
|
||||
assert m.get("total_bytes") == 500
|
||||
|
||||
|
||||
# ── Metadata.remove_duplicate_media_by_hash ───────────────────────────
|
||||
|
||||
|
||||
class TestRemoveDuplicateMediaNoneFilename:
|
||||
"""remove_duplicate_media_by_hash should keep media with None filename."""
|
||||
|
||||
def test_none_filename_kept(self):
|
||||
m = Metadata().set_url("https://example.com")
|
||||
none_media = Media(filename=None)
|
||||
none_media.set("src", "https://example.com/video.mp4")
|
||||
m.add_media(none_media)
|
||||
|
||||
m.remove_duplicate_media_by_hash()
|
||||
|
||||
assert len(m.media) == 1
|
||||
assert m.media[0].filename is None
|
||||
|
||||
def test_none_and_valid_mixed(self, tmp_path):
|
||||
"""None-filename media is kept alongside valid-filename media."""
|
||||
valid_file = tmp_path / "test.txt"
|
||||
valid_file.write_text("content")
|
||||
|
||||
m = Metadata().set_url("https://example.com")
|
||||
m.add_media(Media(filename=str(valid_file)))
|
||||
none_media = Media(filename=None)
|
||||
none_media.set("src", "https://example.com/video.mp4")
|
||||
m.add_media(none_media)
|
||||
|
||||
m.remove_duplicate_media_by_hash()
|
||||
|
||||
assert len(m.media) == 2
|
||||
|
||||
def test_multiple_none_filename_all_kept(self):
|
||||
"""Multiple None-filename media are all kept (can't deduplicate without file)."""
|
||||
m = Metadata().set_url("https://example.com")
|
||||
m.add_media(Media(filename=None))
|
||||
m.add_media(Media(filename=None))
|
||||
|
||||
m.remove_duplicate_media_by_hash()
|
||||
|
||||
assert len(m.media) == 2
|
||||
|
||||
|
||||
# ── Twitter dropin create_metadata ────────────────────────────────────
|
||||
|
||||
|
||||
class TestTwitterDropinNoneFilename:
|
||||
"""Twitter dropin should skip media when download_from_url returns None."""
|
||||
|
||||
@pytest.fixture
|
||||
def twitter_dropin(self):
|
||||
from auto_archiver.modules.generic_extractor.twitter import Twitter
|
||||
|
||||
return Twitter()
|
||||
|
||||
def test_create_metadata_skips_failed_photo_download(self, twitter_dropin):
|
||||
"""When download_from_url returns None for a photo, it's not added to media."""
|
||||
tweet = {
|
||||
"user": {"name": "Test User"},
|
||||
"created_at": "Sun Feb 08 18:45:00 +0000 2026",
|
||||
"full_text": "Test tweet with photo",
|
||||
"entities": {
|
||||
"media": [
|
||||
{"type": "photo", "media_url_https": "https://pbs.twimg.com/media/test.jpg"},
|
||||
]
|
||||
},
|
||||
}
|
||||
|
||||
mock_archiver = MagicMock()
|
||||
mock_archiver.download_from_url.return_value = None # simulate failed download
|
||||
|
||||
result = twitter_dropin.create_metadata(tweet, None, mock_archiver, "https://x.com/test/status/123")
|
||||
|
||||
# The result should have no media since the download failed
|
||||
assert len(result.media) == 0
|
||||
|
||||
def test_create_metadata_skips_failed_video_download(self, twitter_dropin):
|
||||
"""When download_from_url returns None for a video, it's not added to media."""
|
||||
tweet = {
|
||||
"user": {"name": "Test User"},
|
||||
"created_at": "Sun Feb 08 18:45:00 +0000 2026",
|
||||
"full_text": "Test tweet with video",
|
||||
"entities": {
|
||||
"media": [
|
||||
{
|
||||
"type": "video",
|
||||
"video_info": {
|
||||
"variants": [
|
||||
{
|
||||
"url": "https://video.twimg.com/vid/1280x720/test.mp4",
|
||||
"content_type": "video/mp4",
|
||||
},
|
||||
]
|
||||
},
|
||||
},
|
||||
]
|
||||
},
|
||||
}
|
||||
|
||||
mock_archiver = MagicMock()
|
||||
mock_archiver.download_from_url.return_value = None
|
||||
|
||||
result = twitter_dropin.create_metadata(tweet, None, mock_archiver, "https://x.com/test/status/123")
|
||||
|
||||
assert len(result.media) == 0
|
||||
|
||||
def test_create_metadata_keeps_successful_download(self, twitter_dropin, tmp_path):
|
||||
"""When download_from_url succeeds, media is added."""
|
||||
tweet = {
|
||||
"user": {"name": "Test User"},
|
||||
"created_at": "Sun Feb 08 18:45:00 +0000 2026",
|
||||
"full_text": "Test tweet with photo",
|
||||
"entities": {
|
||||
"media": [
|
||||
{"type": "photo", "media_url_https": "https://pbs.twimg.com/media/test.jpg"},
|
||||
]
|
||||
},
|
||||
}
|
||||
|
||||
test_file = tmp_path / "test.jpg"
|
||||
test_file.write_text("fake image data")
|
||||
|
||||
mock_archiver = MagicMock()
|
||||
mock_archiver.download_from_url.return_value = str(test_file)
|
||||
|
||||
result = twitter_dropin.create_metadata(tweet, None, mock_archiver, "https://x.com/test/status/123")
|
||||
|
||||
assert len(result.media) == 1
|
||||
assert result.media[0].filename == str(test_file)
|
||||
|
||||
def test_create_metadata_mixed_downloads(self, twitter_dropin, tmp_path):
|
||||
"""One download succeeds, one fails – only successful one is kept."""
|
||||
tweet = {
|
||||
"user": {"name": "Test User"},
|
||||
"created_at": "Sun Feb 08 18:45:00 +0000 2026",
|
||||
"full_text": "Test tweet with two photos",
|
||||
"entities": {
|
||||
"media": [
|
||||
{"type": "photo", "media_url_https": "https://pbs.twimg.com/media/test1.jpg"},
|
||||
{"type": "photo", "media_url_https": "https://pbs.twimg.com/media/test2.jpg"},
|
||||
]
|
||||
},
|
||||
}
|
||||
|
||||
test_file = tmp_path / "test1.jpg"
|
||||
test_file.write_text("fake image data")
|
||||
|
||||
mock_archiver = MagicMock()
|
||||
# First call succeeds, second fails
|
||||
mock_archiver.download_from_url.side_effect = [str(test_file), None]
|
||||
|
||||
result = twitter_dropin.create_metadata(tweet, None, mock_archiver, "https://x.com/test/status/123")
|
||||
|
||||
assert len(result.media) == 1
|
||||
assert result.media[0].filename == str(test_file)
|
||||
@@ -1,5 +1,6 @@
|
||||
import pytest
|
||||
from argparse import ArgumentParser, ArgumentTypeError
|
||||
from requests.exceptions import SSLError
|
||||
from auto_archiver.core.orchestrator import ArchivingOrchestrator
|
||||
from auto_archiver.version import __version__
|
||||
from auto_archiver.core.config import read_yaml, store_yaml
|
||||
@@ -256,3 +257,34 @@ def test_load_failed_extractor_cleanup(test_args, mocker, caplog):
|
||||
assert "Error during setup of modules: Test exception" in caplog.text
|
||||
# make sure the 'cleanup' is called
|
||||
assert "cleanup" in caplog.text
|
||||
|
||||
|
||||
def test_check_for_updates_ssl_error(orchestrator, mocker):
|
||||
"""check_for_updates should not raise when the HTTP request fails."""
|
||||
mocker.patch(
|
||||
"auto_archiver.core.orchestrator.requests.get",
|
||||
side_effect=SSLError("SSL handshake failed"),
|
||||
)
|
||||
# should not raise
|
||||
orchestrator.check_for_updates()
|
||||
|
||||
|
||||
def test_check_for_updates_timeout(orchestrator, mocker):
|
||||
"""check_for_updates should not raise on connection timeout."""
|
||||
from requests.exceptions import ConnectionError
|
||||
|
||||
mocker.patch(
|
||||
"auto_archiver.core.orchestrator.requests.get",
|
||||
side_effect=ConnectionError("Connection refused"),
|
||||
)
|
||||
orchestrator.check_for_updates()
|
||||
|
||||
|
||||
def test_check_for_updates_new_version_available(orchestrator, mocker):
|
||||
"""check_for_updates should not raise when a newer version exists."""
|
||||
mocker.patch(
|
||||
"auto_archiver.core.orchestrator.requests.get",
|
||||
return_value=mocker.Mock(json=lambda: {"info": {"version": "99.0.0"}}),
|
||||
)
|
||||
# should complete without error
|
||||
orchestrator.check_for_updates()
|
||||
|
||||
@@ -14,6 +14,7 @@ from auto_archiver.utils.misc import (
|
||||
calculate_file_hash,
|
||||
random_str,
|
||||
get_timestamp,
|
||||
ydl_entry_to_filename,
|
||||
)
|
||||
|
||||
|
||||
@@ -139,3 +140,47 @@ class TestMiscUtils:
|
||||
|
||||
def test_invalid_timestamp_returns_none(self):
|
||||
assert get_timestamp("invalid-date") is None
|
||||
|
||||
|
||||
class TestYdlEntryToFilename:
|
||||
"""Tests for ydl_entry_to_filename, especially .part file filtering."""
|
||||
|
||||
def _make_mock_ydl(self, prepared_filename):
|
||||
class MockYDL:
|
||||
def prepare_filename(self, entry):
|
||||
return prepared_filename
|
||||
|
||||
return MockYDL()
|
||||
|
||||
def test_returns_exact_file_if_exists(self, tmp_path):
|
||||
video = tmp_path / "video.mp4"
|
||||
video.write_bytes(b"data")
|
||||
ydl = self._make_mock_ydl(str(video))
|
||||
assert ydl_entry_to_filename(ydl, {}) == str(video)
|
||||
|
||||
def test_skips_part_file_returns_complete(self, tmp_path):
|
||||
"""Simulates yt-dlp leaving a .part file from a failed format
|
||||
while a complete .webm exists."""
|
||||
(tmp_path / "f5U3IKfoSYs.f399.mp4.part").write_bytes(b"incomplete")
|
||||
webm = tmp_path / "f5U3IKfoSYs.webm"
|
||||
webm.write_bytes(b"complete video")
|
||||
|
||||
# ydl.prepare_filename returns the expected .mp4 which doesn't exist
|
||||
ydl = self._make_mock_ydl(str(tmp_path / "f5U3IKfoSYs.mp4"))
|
||||
result = ydl_entry_to_filename(ydl, {})
|
||||
|
||||
assert result == str(webm)
|
||||
assert not result.endswith(".part")
|
||||
|
||||
def test_skips_part_file_returns_false_if_no_other_match(self, tmp_path):
|
||||
"""Only a .part file exists — should return False."""
|
||||
(tmp_path / "video.f399.mp4.part").write_bytes(b"incomplete")
|
||||
|
||||
ydl = self._make_mock_ydl(str(tmp_path / "video.mp4"))
|
||||
assert ydl_entry_to_filename(ydl, {}) is False
|
||||
|
||||
def test_returns_false_when_no_files_match(self, tmp_path):
|
||||
(tmp_path / "unrelated.txt").write_bytes(b"data")
|
||||
|
||||
ydl = self._make_mock_ydl(str(tmp_path / "video.mp4"))
|
||||
assert ydl_entry_to_filename(ydl, {}) is False
|
||||
|
||||
Reference in New Issue
Block a user