Merge main into timestamping_enricher

This commit is contained in:
Patrick Robertson
2025-03-24 15:09:29 +04:00
219 changed files with 11049 additions and 2933 deletions

View File

@@ -4,34 +4,50 @@ from auto_archiver.modules.hash_enricher import HashEnricher
from auto_archiver.core import Metadata, Media
from auto_archiver.core.module import ModuleFactory
@pytest.mark.parametrize(
    "algorithm, filename, expected_hash",
    [
        ("SHA-256", "tests/data/testfile_1.txt", "1b4f0e9851971998e732078544c96b36c3d01cedf7caa332359d6f1d83567014"),
        ("SHA-256", "tests/data/testfile_2.txt", "60303ae22b998861bce3b28f33eec1be758a213c86c93c076dbe9f558c11c752"),
        (
            "SHA3-512",
            "tests/data/testfile_1.txt",
            "d2d8cc4f369b340130bd2b29b8b54e918b7c260c3279176da9ccaa37c96eb71735fc97568e892dc6220bf4ae0d748edb46bd75622751556393be3f482e6f794e",
        ),
        (
            "SHA3-512",
            "tests/data/testfile_2.txt",
            "e35970edaa1e0d8af7d948491b2da0450a49fd9cc1e83c5db4c6f175f9550cf341f642f6be8cfb0bfa476e4258e5088c5ad549087bf02811132ac2fa22b734c6",
        ),
    ],
)
def test_calculate_hash(algorithm, filename, expected_hash, setup_module):
    """Check ``calculate_hash`` against a known hex digest for each algorithm/file pair.

    The parametrization is applied exactly once; stacking a second, identical
    ``parametrize`` decorator on the same argument names would be rejected by pytest.
    """
    # The enricher is configured with the parametrized algorithm and a chunksize of 100.
    he = setup_module(HashEnricher, {"algorithm": algorithm, "chunksize": 100})
    assert he.calculate_hash(filename) == expected_hash
def test_default_config_values(setup_module):
    """A HashEnricher set up without overrides exposes the default algorithm and chunk size."""
    enricher = setup_module(HashEnricher)
    observed = (enricher.algorithm, enricher.chunksize)
    assert observed == ("SHA-256", 16000000)
def test_config():
    """Check the config declaration of the ``hash_enricher`` module.

    Reads ``configs`` from the lazily loaded module and verifies the defaults,
    the allowed algorithm choices and the help texts.
    """
    # The lookup is done once; the result is reused for every assertion below.
    c = ModuleFactory().get_module_lazy("hash_enricher").configs
    assert c["algorithm"]["default"] == "SHA-256"
    assert c["chunksize"]["default"] == 16000000
    assert c["algorithm"]["choices"] == ["SHA-256", "SHA3-512"]
    assert c["algorithm"]["help"] == "hash algorithm to use"
    assert (
        c["chunksize"]["help"]
        == "number of bytes to use when reading files in chunks (if this value is too large you will run out of RAM), default is 16MB"
    )
def test_hash_media(setup_module):
he = setup_module(HashEnricher, {"algorithm": "SHA-256", "chunksize": 1})
# generate metadata with two test files
@@ -46,4 +62,4 @@ def test_hash_media(setup_module):
he.enrich(m)
assert m.media[0].get("hash") == "SHA-256:1b4f0e9851971998e732078544c96b36c3d01cedf7caa332359d6f1d83567014"
assert m.media[1].get("hash") == "SHA-256:60303ae22b998861bce3b28f33eec1be758a213c86c93c076dbe9f558c11c752"
assert m.media[1].get("hash") == "SHA-256:60303ae22b998861bce3b28f33eec1be758a213c86c93c076dbe9f558c11c752"

View File

@@ -1,4 +1,3 @@
import datetime
from datetime import datetime, timedelta, timezone
import pytest
@@ -16,6 +15,7 @@ def mock_metadata(mocker):
mock.get_all_media.return_value = []
return mock
@pytest.fixture
def mock_media(mocker):
"""Creates a mock Media object."""
@@ -59,6 +59,7 @@ def test_enrich_file_sizes(meta_enricher, metadata, tmp_path):
assert metadata.get("total_bytes") == 3000
assert metadata.get("total_size") == "2.9 KB"
@pytest.mark.parametrize(
"size, expected",
[
@@ -74,6 +75,7 @@ def test_human_readable_bytes(size, expected):
enricher = MetaEnricher()
assert enricher.human_readable_bytes(size) == expected
def test_enrich_file_sizes_no_media(meta_enricher, metadata):
"""Test that enrich_file_sizes() handles empty media list gracefully."""
meta_enricher.enrich_file_sizes(metadata)
@@ -91,4 +93,4 @@ def test_enrich_archive_duration(meta_enricher, metadata, mocker):
mock_datetime.now.return_value = mock_now
meta_enricher.enrich_archive_duration(metadata)
assert metadata.get("archive_duration_seconds") == 630
assert metadata.get("archive_duration_seconds") == 630

View File

@@ -1,4 +1,3 @@
import pytest
from auto_archiver.core import Media
@@ -33,9 +32,7 @@ def test_get_metadata(enricher, output, expected, mocker):
result = enricher.get_metadata("test.jpg")
assert result == expected
mock_run.assert_called_once_with(
["exiftool", "test.jpg"], capture_output=True, text=True
)
mock_run.assert_called_once_with(["exiftool", "test.jpg"], capture_output=True, text=True)
def test_get_metadata_exiftool_not_found(enricher, mocker):
@@ -85,4 +82,3 @@ def test_metadata_pickle(enricher, unpickle, mocker):
actual_media = metadata.media
assert len(expected_media) == len(actual_media)
assert actual_media[0].properties.get("metadata") == expected_media[0].properties.get("metadata")

View File

@@ -0,0 +1,276 @@
import pytest
import hashlib
from opentimestamps.core.timestamp import Timestamp, DetachedTimestampFile
from opentimestamps.calendar import RemoteCalendar
from opentimestamps.core.notary import PendingAttestation, BitcoinBlockHeaderAttestation
from auto_archiver.core import Metadata, Media
# TODO: Remove once timestamping overhaul is merged
@pytest.fixture
def sample_media(tmp_path) -> Media:
    """Return a Media keyed ``subdir/test.txt`` whose file is a freshly written temp file."""
    source_path = tmp_path / "source.txt"
    source_path.write_text("test content")
    media = Media(_key="subdir/test.txt", filename=str(source_path))
    return media
@pytest.fixture
def sample_file_path(tmp_path):
    """Write a small text file under ``tmp_path`` and return its path as a string."""
    target = tmp_path / "test.txt"
    target.write_text("This is a test file content for OpenTimestamps")
    path_as_text = str(target)
    return path_as_text
@pytest.fixture
def detached_timestamp_file():
    """Build a DetachedTimestampFile carrying one pending and one Bitcoin attestation."""
    from opentimestamps.core.op import OpSHA256

    digest = hashlib.sha256(b"Test content").digest()
    stamp = Timestamp(digest)

    attestations = (
        PendingAttestation("https://example.calendar.com"),
        BitcoinBlockHeaderAttestation(783000),  # Some block height
    )
    for attestation in attestations:
        stamp.attestations.add(attestation)

    return DetachedTimestampFile(OpSHA256(), stamp)
@pytest.fixture
def verified_timestamp_file():
    """Build a DetachedTimestampFile whose only attestation is a Bitcoin block header one."""
    from opentimestamps.core.op import OpSHA256

    digest = hashlib.sha256(b"Verified content").digest()
    stamp = Timestamp(digest)

    # No pending attestation is added here, only the Bitcoin one.
    stamp.attestations.add(BitcoinBlockHeaderAttestation(783000))  # Some block height

    return DetachedTimestampFile(OpSHA256(), stamp)
@pytest.fixture
def pending_timestamp_file():
    """Build a DetachedTimestampFile that carries two pending attestations and nothing else."""
    from opentimestamps.core.op import OpSHA256

    digest = hashlib.sha256(b"Pending content").digest()
    stamp = Timestamp(digest)

    calendar_uris = ("https://example1.calendar.com", "https://example2.calendar.com")
    pending_attestations = [PendingAttestation(uri) for uri in calendar_uris]
    for pending in pending_attestations:
        stamp.attestations.add(pending)

    return DetachedTimestampFile(OpSHA256(), stamp)
@pytest.mark.download
def test_download_tsr(setup_module, mocker):
    """Submit a digest through a patched ``RemoteCalendar.submit`` and check the stubbed Timestamp is returned."""
    # RemoteCalendar.submit is replaced by a mock that hands back this timestamp.
    stubbed_timestamp = Timestamp(hashlib.sha256(b"test").digest())
    submit_patch = mocker.patch.object(RemoteCalendar, "submit", return_value=stubbed_timestamp)

    remote = RemoteCalendar("https://alice.btc.calendar.opentimestamps.org")
    digest = hashlib.sha256(b"Test file content").digest()
    outcome = remote.submit(digest)

    assert submit_patch.called
    assert isinstance(outcome, Timestamp)
    assert outcome == stubbed_timestamp
def test_verify_timestamp(setup_module, detached_timestamp_file):
    """``verify_timestamp`` reports both attestations of a mixed pending/Bitcoin timestamp."""
    enricher = setup_module("opentimestamps_enricher")
    info = enricher.verify_timestamp(detached_timestamp_file)

    # Overall summary
    assert info["attestation_count"] == 2
    assert info["verified"] is True
    assert len(info["attestations"]) == 2

    # One attestation of each status is expected
    statuses = [entry["status"] for entry in info["attestations"]]
    for expected_status in ("pending", "confirmed"):
        assert expected_status in statuses

    # The confirmed entry carries the block height given to the fixture
    confirmed = next(entry for entry in info["attestations"] if entry["status"] == "confirmed")
    assert confirmed["block_height"] == 783000
def test_verify_pending_only(setup_module, pending_timestamp_file):
    """A timestamp holding only pending attestations is reported as not verified."""
    enricher = setup_module("opentimestamps_enricher")
    info = enricher.verify_timestamp(pending_timestamp_file)

    assert info["attestation_count"] == 2
    assert info["verified"] is False

    # No attestation may have a status other than "pending"
    not_pending = [entry for entry in info["attestations"] if entry["status"] != "pending"]
    assert not not_pending

    # Both calendar URIs from the fixture must be reported
    reported_uris = [entry["uri"] for entry in info["attestations"]]
    for expected_uri in ("https://example1.calendar.com", "https://example2.calendar.com"):
        assert expected_uri in reported_uris
def test_verify_bitcoin_completed(setup_module, verified_timestamp_file):
    """Test verification of timestamps with completed Bitcoin attestations.

    The fixture carries a single Bitcoin block-header attestation, so the result
    must be verified, list exactly one attestation, and report no pending status.
    """
    ots = setup_module("opentimestamps_enricher")

    verification_info = ots.verify_timestamp(verified_timestamp_file)

    assert verification_info["attestation_count"] == 1
    assert verification_info["verified"] is True
    # Original check, kept as-is: it only looks at the keys of the result dict.
    assert "pending" not in verification_info

    # The per-attestation statuses are where "pending" is reported, so check them
    # explicitly as well.
    attestations = verification_info["attestations"]
    assert len(attestations) == 1
    assert all(a["status"] != "pending" for a in attestations)

    # Check that the attestation is a Bitcoin attestation
    attestation = attestations[0]
    assert attestation["status"] == "confirmed"
    assert attestation["block_height"] == 783000
def test_full_enriching(setup_module, sample_file_path, sample_media, mocker):
    """Run ``enrich`` end to end with calendar submission stubbed out."""

    def _confirmed_timestamp(digest):
        # Every submit call gets its own Timestamp with a Bitcoin attestation attached.
        stamped = Timestamp(digest)
        stamped.attestations.add(BitcoinBlockHeaderAttestation(783000))
        return stamped

    # Patch RemoteCalendar.submit so no network request is made
    mocker.patch.object(RemoteCalendar, "submit", side_effect=_confirmed_timestamp)

    enricher = setup_module("opentimestamps_enricher")

    # Metadata holding the sample file as its only media
    sample_media.filename = sample_file_path
    metadata = Metadata().set_url("https://example.com")
    metadata.add_media(sample_media)

    enricher.enrich(metadata)

    # Metadata-level flags
    assert metadata.get("opentimestamped") is True
    assert metadata.get("opentimestamps_count") == 1

    # Still a single parent media item: the original, now flagged as timestamped
    assert len(metadata.media) == 1
    original = metadata.media[0]
    assert original.get("opentimestamps") is True

    # The timestamp file is stored on the original media, not as a sibling
    assert len(original.get("opentimestamp_files")) == 1
    stamp_media = original.get("opentimestamp_files")[0]
    assert stamp_media.get("opentimestamps_version") is not None

    # Verification results recorded on the timestamp media
    assert stamp_media.get("verified") is True
    assert stamp_media.get("attestation_count") == 1
def test_full_enriching_one_calendar_error(
    setup_module, sample_file_path, sample_media, mocker, pending_timestamp_file
):
    """Enrichment still succeeds when the second of two calendar submissions raises."""
    # Timestamp returned by the first submit call, carrying a Bitcoin attestation
    good_timestamp = Timestamp(bytes.fromhex("583988e03646c26fa290c5c2408540a2f4e2aa9be087aa4546aefb531385b935"))
    good_timestamp.attestations.add(BitcoinBlockHeaderAttestation(783000))

    # First call returns the timestamp, second call raises
    mocker.patch.object(
        RemoteCalendar,
        "submit",
        side_effect=[good_timestamp, Exception("Calendar server error")],
    )

    calendar_urls = [
        "https://alice.btc.calendar.opentimestamps.org",
        "https://bob.btc.calendar.opentimestamps.org",
    ]
    enricher = setup_module("opentimestamps_enricher", {"calendar_urls": calendar_urls})

    # Metadata holding the sample file as its only media
    sample_media.filename = sample_file_path
    metadata = Metadata().set_url("https://example.com")
    metadata.add_media(sample_media)

    # Must complete despite the calendar error
    enricher.enrich(metadata)

    assert metadata.get("opentimestamped") is True
    assert metadata.get("opentimestamps_count") == 1  # only alice worked, not bob
def test_full_enriching_calendar_error(setup_module, sample_file_path, sample_media, mocker):
    """When every calendar submission raises, nothing is marked as timestamped."""
    # Every call to RemoteCalendar.submit raises
    mocker.patch.object(RemoteCalendar, "submit", side_effect=Exception("Calendar server error"))

    enricher = setup_module("opentimestamps_enricher")

    # Metadata holding the sample file as its only media
    sample_media.filename = sample_file_path
    metadata = Metadata().set_url("https://example.com")
    metadata.add_media(sample_media)

    # Must complete despite the calendar errors
    enricher.enrich(metadata)

    assert metadata.get("opentimestamped") is False
    assert metadata.get("opentimestamps_count") is None
def test_no_files_to_stamp(setup_module):
    """Enriching metadata without media leaves the timestamp flag unset and adds no media."""
    enricher = setup_module("opentimestamps_enricher")

    # Only a URL is set; no media is attached
    media_free = Metadata().set_url("https://example.com")
    enricher.enrich(media_free)

    assert media_free.get("opentimestamped") is None
    assert len(media_free.media) == 0

View File

@@ -14,23 +14,21 @@ def enricher(setup_module):
def metadata_with_images():
    """Return Metadata for https://example.com holding two image Media entries.

    Each image is added exactly once, using the ``_key`` keyword for the media key.
    """
    m = Metadata()
    m.set_url("https://example.com")
    m.add_media(Media(filename="image1.jpg", _key="image1"))
    m.add_media(Media(filename="image2.jpg", _key="image2"))
    return m
def test_successful_enrich(metadata_with_images, mocker):
    """Every media item receives a ``pdq_hash`` when hashing and image loading are stubbed."""
    # The patches are installed as plain calls, not through a ``with`` statement,
    # and no unused handle to the ``is_image`` patch is kept.
    mocker.patch("pdqhash.compute", return_value=([1, 0, 1, 0] * 64, 100))
    mocker.patch("PIL.Image.open")
    mocker.patch.object(Media, "is_image", return_value=True)

    enricher = PdqHashEnricher()
    enricher.enrich(metadata_with_images)

    # Ensure the hash is set for image media
    for media in metadata_with_images.media:
        assert media.get("pdq_hash") is not None
def test_enrich_skip_non_image(metadata_with_images, mocker):
@@ -59,7 +57,7 @@ def test_enrich_handles_corrupted_image(metadata_with_images, mocker):
("screenshot", False),
("warc-file-123", False),
("regular-image", True),
]
],
)
def test_enrich_excludes_by_filetype(media_id, should_have_hash, mocker):
metadata = Metadata()
@@ -75,4 +73,3 @@ def test_enrich_excludes_by_filetype(media_id, should_have_hash, mocker):
media_item = metadata.media[0]
assert (media_item.get("pdq_hash") is not None) == should_have_hash

View File

@@ -15,13 +15,15 @@ def mock_selenium_env(mocker):
mock_which = mocker.patch("shutil.which")
mock_driver_class = mocker.patch("auto_archiver.utils.webdriver.CookieSettingDriver")
mock_binary_paths = mocker.patch("selenium.webdriver.common.selenium_manager.SeleniumManager.binary_paths")
mock_is_file = mocker.patch("pathlib.Path.is_file", return_value=True)
mocker.patch("pathlib.Path.is_file", return_value=True)
mock_popen = mocker.patch("subprocess.Popen")
mock_is_connectable = mocker.patch("selenium.webdriver.common.service.Service.is_connectable", return_value=True)
mocker.patch("selenium.webdriver.common.service.Service.is_connectable", return_value=True)
mock_firefox_options = mocker.patch("selenium.webdriver.FirefoxOptions")
# Define side effect for `shutil.which`
def mock_which_side_effect(dep):
return "/mock/geckodriver" if dep == "geckodriver" else None
mock_which.side_effect = mock_which_side_effect
# Mock binary paths
@@ -83,8 +85,8 @@ def test_enrich_adds_screenshot(
mock_driver, mock_driver_class, mock_options_instance = mock_selenium_env
screenshot_enricher.enrich(metadata_with_video)
mock_driver_class.assert_called_once_with(
cookies=None,
cookiejar=None,
cookie=None,
cookie_jar=None,
facebook_accept_cookies=False,
options=mock_options_instance,
)
@@ -104,13 +106,7 @@ def test_enrich_adds_screenshot(
],
)
def test_enrich_auth_wall(
screenshot_enricher,
metadata_with_video,
mock_selenium_env,
common_patches,
url,
is_auth,
mocker
screenshot_enricher, metadata_with_video, mock_selenium_env, common_patches, url, is_auth, mocker
):
# Testing with and without is_auth_wall
mock_driver, mock_driver_class, _ = mock_selenium_env
@@ -128,9 +124,39 @@ def test_enrich_auth_wall(
assert metadata_with_video.media[1].properties.get("id") == "screenshot"
def test_handle_timeout_exception(
screenshot_enricher, metadata_with_video, mock_selenium_env, mocker
):
def test_skip_authwall_no_cookies(screenshot_enricher, caplog):
    """Enriching an instagram.com URL logs the screenshot skip message."""
    target = Metadata().set_url("https://instagram.com")
    with caplog.at_level("WARNING"):
        screenshot_enricher.enrich(target)
    skip_marker = "[SKIP] SCREENSHOT since url"
    assert skip_marker in caplog.text
@pytest.mark.parametrize(
    "auth",
    [
        {"cookie": "cookie"},
        {"cookies_jar": "cookie"},
    ],
)
def test_dont_skip_authwall_with_cookies(screenshot_enricher, caplog, mocker, mock_selenium_env, auth):
    """With cookie-style auth configured for the site, the skip message is not logged."""
    # Make is_auth_wall report an auth wall
    mocker.patch("auto_archiver.utils.url.is_auth_wall", return_value=True)
    # Install the parametrized auth entry for example.com
    screenshot_enricher.authentication = {"example.com": auth}

    target = Metadata().set_url("https://example.com")
    with caplog.at_level("WARNING"):
        screenshot_enricher.enrich(target)

    skip_marker = "[SKIP] SCREENSHOT since url"
    assert skip_marker not in caplog.text
def test_show_warning_wrong_auth_type(screenshot_enricher, caplog, mocker, mock_selenium_env):
    """Username/password auth on an auth-walled URL logs the cookie-only warning.

    ``mock_selenium_env`` is requested for its patching side effects only; its
    return value is not needed here, so it is not unpacked into unused locals.
    """
    mocker.patch("auto_archiver.utils.url.is_auth_wall", return_value=True)
    screenshot_enricher.authentication = {"example.com": {"username": "user", "password": "pass"}}
    with caplog.at_level("WARNING"):
        screenshot_enricher.enrich(Metadata().set_url("https://example.com"))
    assert "Screenshot enricher only supports cookie-type authentication" in caplog.text
def test_handle_timeout_exception(screenshot_enricher, metadata_with_video, mock_selenium_env, mocker):
mock_driver, mock_driver_class, mock_options_instance = mock_selenium_env
mock_driver.get.side_effect = TimeoutException
@@ -140,9 +166,7 @@ def test_handle_timeout_exception(
assert len(metadata_with_video.media) == 1
def test_handle_general_exception(
screenshot_enricher, metadata_with_video, mock_selenium_env, mocker
):
def test_handle_general_exception(screenshot_enricher, metadata_with_video, mock_selenium_env, mocker):
"""Test proper handling of unexpected general exceptions"""
mock_driver, mock_driver_class, mock_options_instance = mock_selenium_env
# Simulate a generic exception when save_screenshot is called
@@ -152,9 +176,7 @@ def test_handle_general_exception(
mock_log = mocker.patch("loguru.logger.error")
screenshot_enricher.enrich(metadata_with_video)
# Verify that the exception was logged with the log
mock_log.assert_called_once_with(
"Got error while loading webdriver for screenshot enricher: Unexpected Error"
)
mock_log.assert_called_once_with("Got error while loading webdriver for screenshot enricher: Unexpected Error")
# And no new media was added due to the error
assert len(metadata_with_video.media) == 1
@@ -167,13 +189,12 @@ def test_pdf_creation(mocker, screenshot_enricher, metadata_with_video, mock_sel
# Mock the print_page method to return base64-encoded content
mock_driver.print_page.return_value = base64.b64encode(b"fake_pdf_content").decode("utf-8")
# Patch functions with mocker
mock_os_path_join = mocker.patch("os.path.join", side_effect=lambda *args: f"{args[-1]}")
mock_random_str = mocker.patch(
mocker.patch("os.path.join", side_effect=lambda *args: f"{args[-1]}")
mocker.patch(
"auto_archiver.modules.screenshot_enricher.screenshot_enricher.random_str",
return_value="fixed123",
)
mock_open = mocker.patch("builtins.open", new_callable=mocker.mock_open)
mock_log_error = mocker.patch("loguru.logger.error")
screenshot_enricher.enrich(metadata_with_video)
# Verify screenshot and PDF creation

View File

@@ -51,4 +51,3 @@ def test_ssl_error_handling(enricher, metadata, mocker):
mocker.patch("ssl.get_server_certificate", side_effect=ssl.SSLError("SSL error"))
with pytest.raises(ssl.SSLError, match="SSL error"):
enricher.enrich(metadata)

View File

@@ -25,7 +25,7 @@ def mock_ffmpeg_environment(mocker):
# Mocking all the ffmpeg calls in one place
mock_ffmpeg_input = mocker.patch("ffmpeg.input")
mock_makedirs = mocker.patch("os.makedirs")
mocker.patch.object(Media, "is_video", return_value=True),
(mocker.patch.object(Media, "is_video", return_value=True),)
mock_probe = mocker.patch(
"ffmpeg.probe",
return_value={
@@ -35,9 +35,7 @@ def mock_ffmpeg_environment(mocker):
},
)
mock_output = mocker.MagicMock()
mock_ffmpeg_input.return_value.filter.return_value.output.return_value = (
mock_output
)
mock_ffmpeg_input.return_value.filter.return_value.output.return_value = mock_output
return {
"mock_ffmpeg_input": mock_ffmpeg_input,
@@ -47,14 +45,21 @@ def mock_ffmpeg_environment(mocker):
}
@pytest.mark.parametrize("thumbnails_per_minute, max_thumbnails, expected_count", [
(10, 5, 5), # Capped at max_thumbnails
(1, 10, 2), # Less than max_thumbnails
(60, 7, 7), # Matches exactly
])
@pytest.mark.parametrize(
"thumbnails_per_minute, max_thumbnails, expected_count",
[
(10, 5, 5), # Capped at max_thumbnails
(1, 10, 2), # Less than max_thumbnails
(60, 7, 7), # Matches exactly
],
)
def test_enrich_thumbnail_limits(
thumbnail_enricher, metadata_with_video, mock_ffmpeg_environment,
thumbnails_per_minute, max_thumbnails, expected_count
thumbnail_enricher,
metadata_with_video,
mock_ffmpeg_environment,
thumbnails_per_minute,
max_thumbnails,
expected_count,
):
thumbnail_enricher.thumbnails_per_minute = thumbnails_per_minute
thumbnail_enricher.max_thumbnails = max_thumbnails
@@ -65,8 +70,8 @@ def test_enrich_thumbnail_limits(
thumbnails = metadata_with_video.media[0].get("thumbnails")
assert len(thumbnails) == expected_count
def test_enrich_handles_probe_failure(thumbnail_enricher, metadata_with_video, mocker):
def test_enrich_handles_probe_failure(thumbnail_enricher, metadata_with_video, mocker):
mocker.patch("ffmpeg.probe", side_effect=Exception("Probe error"))
mocker.patch("os.makedirs")
mock_logger = mocker.patch("loguru.logger.error")
@@ -74,36 +79,43 @@ def test_enrich_handles_probe_failure(thumbnail_enricher, metadata_with_video, m
thumbnail_enricher.enrich(metadata_with_video)
# Ensure error was logged
mock_logger.assert_called_with(
f"error getting duration of video video.mp4: Probe error"
)
mock_logger.assert_called_with("error getting duration of video video.mp4: Probe error")
# Ensure no thumbnails were created
thumbnails = metadata_with_video.media[0].get("thumbnails")
assert thumbnails is None
def test_enrich_skips_non_video_files(thumbnail_enricher, metadata_with_video, mocker):
    """``ffmpeg.input`` is never called when ``Media.is_video`` reports False."""
    # Each patch is installed once and enrich runs once.
    mocker.patch.object(Media, "is_video", return_value=False)
    mock_ffmpeg = mocker.patch("ffmpeg.input")

    thumbnail_enricher.enrich(metadata_with_video)

    mock_ffmpeg.assert_not_called()
@pytest.mark.parametrize("thumbnails_per_minute,max_thumbnails,expected_count", [
(60, 5, 5), # caught by max
(60, 20, 10), # caught by t/min
(0, 20, 1), # test min caught (1)
(11, 20, 1), # test min caught (1)
(12, 20, 2), # test caught by t/min
])
@pytest.mark.parametrize(
"thumbnails_per_minute,max_thumbnails,expected_count",
[
(60, 5, 5), # caught by max
(60, 20, 10), # caught by t/min
(0, 20, 1), # test min caught (1)
(11, 20, 1), # test min caught (1)
(12, 20, 2), # test caught by t/min
],
)
def test_enrich_handles_short_video(
thumbnail_enricher, metadata_with_video, mock_ffmpeg_environment, thumbnails_per_minute, max_thumbnails, expected_count, mocker
thumbnail_enricher,
metadata_with_video,
mock_ffmpeg_environment,
thumbnails_per_minute,
max_thumbnails,
expected_count,
mocker,
):
# override mock duration
fake_duration = 10
mocker.patch(
"ffmpeg.probe",
return_value={ "streams": [{"codec_type": "video", "duration": str(fake_duration)}]},
return_value={"streams": [{"codec_type": "video", "duration": str(fake_duration)}]},
)
thumbnail_enricher.thumbnails_per_minute = thumbnails_per_minute
thumbnail_enricher.max_thumbnails = max_thumbnails
@@ -114,9 +126,7 @@ def test_enrich_handles_short_video(
assert len(thumbnails) == expected_count
def test_uses_existing_duration(
thumbnail_enricher, metadata_with_video, mock_ffmpeg_environment
):
def test_uses_existing_duration(thumbnail_enricher, metadata_with_video, mock_ffmpeg_environment):
metadata_with_video.media[0].set("duration", 60)
thumbnail_enricher.enrich(metadata_with_video)
mock_ffmpeg_environment["mock_probe"].assert_not_called()
@@ -125,7 +135,7 @@ def test_uses_existing_duration(
def test_enrich_metadata_structure(thumbnail_enricher, metadata_with_video, mock_ffmpeg_environment, mocker):
fake_duration = 120
mocker.patch("ffmpeg.probe", return_value={'streams': [{'codec_type': 'video', 'duration': str(fake_duration)}]})
mocker.patch("ffmpeg.probe", return_value={"streams": [{"codec_type": "video", "duration": str(fake_duration)}]})
thumbnail_enricher.thumbnails_per_minute = 2
thumbnail_enricher.max_thumbnails = 4

View File

@@ -4,6 +4,7 @@ from zipfile import ZipFile
import pytest
from auto_archiver.core import Metadata, Media
from auto_archiver.core.consts import SetupError
@pytest.fixture
@@ -22,6 +23,15 @@ def wacz_enricher(setup_module, mock_binary_dependencies):
return wacz
def test_raises_error_without_docker_installed(setup_module, mocker, caplog):
    """Setting up the WACZ enricher raises SetupError and logs why when docker cannot be found."""
    # shutil.which returns None for every lookup, i.e. docker looks uninstalled
    mocker.patch("shutil.which", return_value=None)

    with pytest.raises(SetupError):
        setup_module("wacz_extractor_enricher", {})

    expected_log = "requires external dependency 'docker' which is not available/setup"
    assert expected_log in caplog.text
def test_setup_without_docker(wacz_enricher, mocker):
mocker.patch.dict(os.environ, {"RUNNING_IN_DOCKER": "1"}, clear=True)
wacz_enricher.setup()

View File

@@ -5,37 +5,52 @@ from auto_archiver.modules.wayback_extractor_enricher import WaybackExtractorEnr
from auto_archiver.core import Metadata
@pytest.fixture(autouse=True)
def mock_sleep(mocker):
    """Patch ``time.sleep`` for every test in this module and return the mock."""
    sleep_patch = mocker.patch("time.sleep")
    return sleep_patch
@pytest.fixture
def mock_is_auth_wall(mocker):
    """Return a callable that patches ``is_auth_wall`` to a fixed answer."""

    def _patch_auth_wall(return_value: bool):
        # The caller decides what is_auth_wall reports.
        target = "auto_archiver.utils.url.is_auth_wall"
        return mocker.patch(target, return_value=return_value)

    return _patch_auth_wall
@pytest.fixture
def mock_post_success(mocker):
    """Fixture to mock POST requests with a successful response.

    Returns a callable ``_mock_post(json_data=None, status_code=200)`` that patches
    ``requests.post`` with a mock response whose ``json()`` returns ``json_data``.
    """

    def _mock_post(json_data: dict = None, status_code: int = 200):
        # Fall back to the default payload only when no payload was given. A
        # truthiness test (``json_data or {...}``) would also replace an explicit
        # empty dict, so a caller could not simulate a response without a job id.
        json_data = {"job_id": "job123"} if json_data is None else json_data
        resp = mocker.Mock(status_code=status_code)
        resp.json.return_value = json_data
        return mocker.patch("requests.post", return_value=resp)

    return _mock_post
@pytest.fixture
def mock_get_success(mocker):
    """Fixture to mock GET requests returning a completed archive status.

    Returns a callable ``_mock_get(json_data=None, status_code=200)`` that patches
    ``requests.get`` with a mock response whose ``json()`` returns ``json_data``.
    """

    def _mock_get(json_data: dict = None, status_code: int = 200):
        # Use the default payload only when none was supplied, so an explicit
        # empty dict is passed through unchanged (same handling as mock_post_success).
        if json_data is None:
            json_data = {
                "status": "success",
                "timestamp": "20250101010101",
                "original_url": "https://example.com",
            }
        resp = mocker.Mock(status_code=status_code)
        resp.json.return_value = json_data
        return mocker.patch("requests.get", return_value=resp)

    return _mock_get
@pytest.fixture
def wayback_extractor_enricher(setup_module) -> WaybackExtractorEnricher:
configs: dict = {
@@ -49,12 +64,7 @@ def wayback_extractor_enricher(setup_module) -> WaybackExtractorEnricher:
return setup_module("wayback_extractor_enricher", configs)
def test_download_success(
wayback_extractor_enricher,
mock_is_auth_wall,
mock_post_success,
mock_get_success
):
def test_download_success(wayback_extractor_enricher, mock_is_auth_wall, mock_post_success, mock_get_success):
mock_is_auth_wall(False)
mock_post_success()
mock_get_success()
@@ -63,34 +73,28 @@ def test_download_success(
result = wayback_extractor_enricher.download(metadata)
assert result.get("wayback") == "https://web.archive.org/web/20250101010101/https://example.com"
def test_enrich_auth_wall(wayback_extractor_enricher, metadata, mock_is_auth_wall):
    """``enrich`` returns None when ``is_auth_wall`` reports True."""
    mock_is_auth_wall(True)
    assert wayback_extractor_enricher.enrich(metadata) is None
def test_enrich_already_enriched(wayback_extractor_enricher, metadata):
    """``enrich`` returns True when the metadata already has a ``wayback`` value."""
    metadata.set("wayback", "existing")
    outcome = wayback_extractor_enricher.enrich(metadata)
    assert outcome is True
def test_enrich_post_failure(wayback_extractor_enricher, metadata, mock_is_auth_wall, mock_post_success):
    """A POST answered with status 500 makes ``enrich`` return False and records the failure."""
    mock_is_auth_wall(False)
    # The POST mock answers with an error payload and HTTP status 500.
    mock_post_success(json_data={"error": "server error"}, status_code=500)
    result = wayback_extractor_enricher.enrich(metadata)
    assert result is False
    # The failure message is stored under the "wayback" key of the metadata.
    assert "Internet archive failed with status of 500" in metadata.get("wayback")
def test_enrich_post_json_decode_error(
wayback_extractor_enricher,
metadata,
mock_is_auth_wall,
mocker
):
def test_enrich_post_json_decode_error(wayback_extractor_enricher, metadata, mock_is_auth_wall, mocker):
mock_is_auth_wall(False)
resp = mocker.Mock(status_code=200)
resp.json.side_effect = json.decoder.JSONDecodeError("msg", "doc", 0)
@@ -98,22 +102,15 @@ def test_enrich_post_json_decode_error(
mocker.patch("requests.post", return_value=resp)
assert wayback_extractor_enricher.enrich(metadata) is False
def test_enrich_no_job_id(wayback_extractor_enricher, metadata, mock_is_auth_wall, mock_post_success):
    """``enrich`` returns False when the POST response JSON is an empty dict."""
    mock_is_auth_wall(False)
    # An explicit empty payload: the response carries no "job_id" entry.
    mock_post_success(json_data={})
    assert wayback_extractor_enricher.enrich(metadata) is False
def test_enrich_get_success(
wayback_extractor_enricher,
metadata,
mock_is_auth_wall,
mock_post_success,
mock_get_success
wayback_extractor_enricher, metadata, mock_is_auth_wall, mock_post_success, mock_get_success
):
mock_is_auth_wall(False)
mock_post_success()
@@ -122,24 +119,18 @@ def test_enrich_get_success(
assert metadata.get("wayback") == "https://web.archive.org/web/20250101010101/https://example.com"
assert metadata.get("check wayback") == "https://web.archive.org/web/*/https://example.com"
def test_enrich_get_failure(
    wayback_extractor_enricher, metadata, mock_is_auth_wall, mock_post_success, mock_get_success
):
    """``enrich`` returns False when the GET status check answers 400 with a failed status."""
    mock_is_auth_wall(False)
    # The POST uses the fixture's default payload.
    mock_post_success()
    # The GET check reports a failed status with HTTP 400.
    mock_get_success(json_data={"status": "failed"}, status_code=400)
    assert wayback_extractor_enricher.enrich(metadata) is False
def test_enrich_get_request_exception(
wayback_extractor_enricher,
metadata,
mock_is_auth_wall,
mock_post_success,
mocker
wayback_extractor_enricher, metadata, mock_is_auth_wall, mock_post_success, mocker
):
mock_is_auth_wall(False)
mock_post_success()
@@ -149,12 +140,9 @@ def test_enrich_get_request_exception(
assert wayback_extractor_enricher.enrich(metadata) is True
assert metadata.get("wayback").get("job_id") == "job123"
def test_enrich_get_json_decode_error(
wayback_extractor_enricher,
metadata,
mock_is_auth_wall,
mock_post_success,
mocker
wayback_extractor_enricher, metadata, mock_is_auth_wall, mock_post_success, mocker
):
mock_is_auth_wall(False)
mock_post_success()

View File

@@ -7,6 +7,12 @@ from auto_archiver.modules.whisper_enricher import WhisperEnricher
TEST_S3_URL = "http://cdn.example.com/test.mp4"
@pytest.fixture(autouse=True)
def mock_sleep(mocker):
    """Patch ``time.sleep`` for every test in this module and return the mock."""
    sleep_patch = mocker.patch("time.sleep")
    return sleep_patch
@pytest.fixture
def enricher(mocker):
"""Fixture with mocked S3 and API dependencies"""
@@ -16,7 +22,7 @@ def enricher(mocker):
"include_srt": False,
"timeout": 5,
"action": "translate",
"steps": {"storages": ["s3_storage"]}
"steps": {"storages": ["s3_storage"]},
}
mock_s3 = mocker.MagicMock(spec=S3Storage)
mock_s3.get_cdn_url.return_value = TEST_S3_URL
@@ -25,7 +31,7 @@ def enricher(mocker):
instance.display_name = "Whisper Enricher"
instance.config_setup({instance.name: config})
# bypassing the setup method and mocking S3 setup
instance.stores = config['steps']['storages']
instance.stores = config["steps"]["storages"]
instance.s3 = mock_s3
yield instance, mock_s3
@@ -63,19 +69,14 @@ def test_successful_job_submission(enricher, metadata, mock_requests, mocker):
# Mock the complete API interaction chain
mock_status_response = mocker.MagicMock()
mock_status_response.status_code = 200
mock_status_response.json.return_value = {
"status": "success",
"meta": {}
}
mock_status_response.json.return_value = {"status": "success", "meta": {}}
mock_artifacts_response = mocker.MagicMock()
mock_artifacts_response.status_code = 200
mock_artifacts_response.json.return_value = [{
"data": [{"start": 0, "end": 5, "text": "test transcript"}]
}]
mock_artifacts_response.json.return_value = [{"data": [{"start": 0, "end": 5, "text": "test transcript"}]}]
# Set up mock response sequence
mock_requests.get.side_effect = [
mock_status_response, # First call: status check
mock_artifacts_response # Second call: artifacts check
mock_artifacts_response, # Second call: artifacts check
]
# Run enrichment (without opening file)
@@ -84,15 +85,17 @@ def test_successful_job_submission(enricher, metadata, mock_requests, mocker):
mock_requests.post.assert_called_once_with(
"http://testapi/jobs",
json={"url": "http://cdn.example.com/test.mp4", "type": "translate"},
headers={"Authorization": "Bearer whisper-key"}
headers={"Authorization": "Bearer whisper-key"},
)
# Verify job status checks
assert mock_requests.get.call_count == 2
assert "artifact_0_text" in metadata.media[0].get("whisper_model")
assert metadata.media[0].get("whisper_model") == {'artifact_0_text': 'test transcript',
'job_artifacts_check': 'http://testapi/jobs/job123/artifacts',
'job_id': 'job123',
'job_status_check': 'http://testapi/jobs/job123'}
assert metadata.media[0].get("whisper_model") == {
"artifact_0_text": "test transcript",
"job_artifacts_check": "http://testapi/jobs/job123/artifacts",
"job_id": "job123",
"job_status_check": "http://testapi/jobs/job123",
}
def test_submit_job(enricher, mocker):