Merge main

This commit is contained in:
Patrick Robertson
2025-02-20 10:29:57 +00:00
50 changed files with 2381 additions and 669 deletions

View File

@@ -3,12 +3,14 @@ pytest conftest file, for shared fixtures and configuration
"""
import os
import pickle
from datetime import datetime, timezone
from tempfile import TemporaryDirectory
from typing import Dict, Tuple
import hashlib
import pytest
from auto_archiver.core.metadata import Metadata
from auto_archiver.core.module import get_module, _LAZY_LOADED_MODULES
from auto_archiver.core.module import ModuleFactory
# Test names inserted into this list will be run last. This is useful for expensive/costly tests
# that you only want to run if everything else succeeds (e.g. API calls). The order here is important
@@ -20,19 +22,19 @@ TESTS_TO_RUN_LAST = ['test_twitter_api_archiver']
def setup_module(request):
def _setup_module(module_name, config={}):
module_factory = ModuleFactory()
if isinstance(module_name, type):
# get the module name:
# if the class does not have a .name, use the name of the parent folder
module_name = module_name.__module__.rsplit(".",2)[-2]
m = get_module(module_name, {module_name: config})
m = module_factory.get_module(module_name, {module_name: config})
# add the tmp_dir to the module
tmp_dir = TemporaryDirectory()
m.tmp_dir = tmp_dir.name
def cleanup():
_LAZY_LOADED_MODULES.pop(module_name)
tmp_dir.cleanup()
request.addfinalizer(cleanup)
@@ -122,10 +124,36 @@ def pytest_runtest_setup(item):
def unpickle():
    """
    Returns a helper function that unpickles a file
    ** gets the file from the test data directory: tests/data/ **

    The data directory is resolved relative to this file instead of the
    current working directory, so the helper works wherever pytest is
    launched from.
    """
    test_data_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "data")

    def _unpickle(path):
        # NOTE: pickle.load can execute arbitrary code; only point this at
        # trusted fixture files that live in the repository.
        with open(os.path.join(test_data_dir, path), "rb") as f:
            return pickle.load(f)

    return _unpickle
@pytest.fixture
def mock_binary_dependencies(mocker):
    """Patch ``shutil.which`` so every binary lookup reports a fake path.

    Returns the patch mock, which answers ``/usr/bin/fake_binary`` for any name.
    """
    return mocker.patch("shutil.which", return_value="/usr/bin/fake_binary")
@pytest.fixture
def sample_datetime():
    """Provide a fixed timezone-aware timestamp: 2023-01-01 12:00 UTC."""
    return datetime(year=2023, month=1, day=1, hour=12, minute=0, tzinfo=timezone.utc)
@pytest.fixture(autouse=True)
def mock_sleep(mocker):
    """Replace ``time.sleep`` with a mock for every test so nothing really waits."""
    sleep_patch = mocker.patch("time.sleep")
    return sleep_patch
@pytest.fixture
def metadata():
    """Build a Metadata item with a fixed ``_processed_at`` value and an example URL."""
    item = Metadata()
    # Keep this order: "_processed_at" is set before the URL.
    item.set("_processed_at", "2021-01-01T00:00:00")
    item.set_url("https://example.com")
    return item

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,59 @@
import pytest
from auto_archiver.core import Metadata
from auto_archiver.modules.api_db import AAApiDb
@pytest.fixture
def api_db(setup_module):
    """Build an AAApiDb module through ``setup_module`` with fixed test settings."""
    return setup_module(
        AAApiDb,
        {
            "api_endpoint": "https://api.example.com",
            "api_token": "test-token",
            "public": False,
            "author_id": "Someone",
            "group_id": "123",
            "use_api_cache": True,
            "store_results": True,
            "tags": "[]",
        },
    )
def test_fetch_no_cache(api_db, metadata):
    """fetch() returns None when ``use_api_cache`` is switched off."""
    api_db.use_api_cache = False
    outcome = api_db.fetch(metadata)
    assert outcome is None
def test_fetch_fail_status(api_db, metadata, mocker):
    """fetch() returns False and logs the failure when the API answers 400."""
    get_patch = mocker.patch("auto_archiver.modules.api_db.api_db.requests.get")
    error_log = mocker.patch("loguru.logger.error")
    response = get_patch.return_value
    response.status_code = 400
    response.json.return_value = {}

    outcome = api_db.fetch(metadata)

    assert outcome is False
    error_log.assert_called_once_with("AA API FAIL (400): {}")
def test_fetch(api_db, metadata, mocker):
    """fetch() returns a result equal to the input metadata on a 200 response."""
    get_patch = mocker.patch("auto_archiver.modules.api_db.api_db.requests.get")
    datetime_patch = mocker.patch("auto_archiver.core.metadata.datetime.datetime")
    datetime_patch.now.return_value = "2021-01-01T00:00:00"

    stored_result = {
        "media": [],
        "metadata": {"_processed_at": "2021-01-01T00:00:00", "url": "https://example.com"},
        "status": "no archiver",
    }
    response = get_patch.return_value
    response.status_code = 200
    response.json.return_value = [{"result": {}}, {"result": stored_result}]

    assert api_db.fetch(metadata) == metadata
def test_done_success(api_db, metadata, mocker):
    """done() submits the archive result to the API with a single POST."""
    post_patch = mocker.patch("auto_archiver.modules.api_db.api_db.requests.post")
    post_patch.return_value.status_code = 201
    expected_payload = {
        "author_id": "Someone",
        "url": "https://example.com",
        "public": False,
        "group_id": "123",
        # NOTE(review): the fixture configures tags as the string "[]" and the
        # payload carries its two characters - confirm this is intended.
        "tags": ["[", "]"],
        "result": '{"status": "no archiver", "metadata": {"_processed_at": "2021-01-01T00:00:00", "url": "https://example.com"}, "media": []}',
    }

    api_db.done(metadata)

    # assert_called_once_with also fails when the mock was called more than once.
    post_patch.assert_called_once_with(
        "https://api.example.com/interop/submit-archive",
        json=expected_payload,
        headers={"Authorization": "Bearer test-token"},
    )

View File

@@ -0,0 +1,110 @@
import pytest
from datetime import datetime
from auto_archiver.core import Metadata
from auto_archiver.modules.atlos_db import AtlosDb
class FakeAPIResponse:
    """Minimal stand-in for an HTTP response object used by these tests."""

    def __init__(self, data: dict, raise_error: bool = False) -> None:
        """Store the payload and whether ``raise_for_status`` should fail.

        Args:
            data: payload handed back by ``json()``.
            raise_error: when True, ``raise_for_status()`` raises.
        """
        self._data = data
        self.raise_error = raise_error

    def json(self) -> dict:
        """Return the payload given at construction time."""
        return self._data

    def raise_for_status(self) -> None:
        """Raise ``Exception("HTTP error")`` if the response was built as failing."""
        if self.raise_error:
            raise Exception("HTTP error")
@pytest.fixture
def atlos_db(setup_module) -> AtlosDb:
    """Build the ``atlos_db`` module through ``setup_module`` with a test token and URL."""
    return setup_module(
        "atlos_db",
        {
            "api_token": "abc123",
            "atlos_url": "https://platform.atlos.org",
        },
    )
def test_failed_no_atlos_id(atlos_db, metadata, mocker):
    """failed() makes no POST request when the item has no atlos_id."""
    post_patch = mocker.patch("requests.post")

    atlos_db.failed(metadata, "failure reason")

    assert not post_patch.called
def test_failed_with_atlos_id(atlos_db, metadata, mocker):
    """failed() posts the error to the item's Atlos endpoint when atlos_id is set."""
    metadata.set("atlos_id", 42)
    post_patch = mocker.patch(
        "requests.post", return_value=FakeAPIResponse({}, raise_error=False)
    )

    atlos_db.failed(metadata, "failure reason")

    post_patch.assert_called_once_with(
        f"{atlos_db.atlos_url}/api/v2/source_material/metadata/42/auto_archiver",
        headers={"Authorization": f"Bearer {atlos_db.api_token}"},
        json={
            "metadata": {"processed": True, "status": "error", "error": "failure reason"}
        },
    )
def test_failed_http_error(atlos_db, metadata, mocker):
    """failed() lets the exception from ``raise_for_status`` propagate."""
    metadata.set("atlos_id", 42)
    failing_response = FakeAPIResponse({}, raise_error=True)
    mocker.patch("requests.post", return_value=failing_response)

    with pytest.raises(Exception, match="HTTP error"):
        atlos_db.failed(metadata, "failure reason")
def test_fetch_returns_false(atlos_db):
    """fetch() returns False for an empty Metadata item."""
    outcome = atlos_db.fetch(Metadata())
    assert outcome is False
def test_done_no_atlos_id(atlos_db, mocker):
    """done() makes no POST request when the item has no atlos_id."""
    post_patch = mocker.patch("requests.post")
    item = Metadata().set_url("http://example.com")

    atlos_db.done(item)

    assert not post_patch.called
def test_done_with_atlos_id(atlos_db, metadata, mocker):
    """done() posts a success payload to the item's Atlos endpoint when atlos_id is set."""
    stamp = datetime.now()
    metadata.set("atlos_id", 99)
    metadata.set("timestamp", stamp)
    post_patch = mocker.patch(
        "requests.post", return_value=FakeAPIResponse({}, raise_error=False)
    )

    atlos_db.done(metadata)

    # Snapshot the metadata only after done() has run; the timestamp is
    # expected in ISO-8601 text form rather than as a datetime object.
    expected_results = {**metadata.metadata, "timestamp": stamp.isoformat()}
    post_patch.assert_called_once_with(
        f"{atlos_db.atlos_url}/api/v2/source_material/metadata/99/auto_archiver",
        headers={"Authorization": f"Bearer {atlos_db.api_token}"},
        json={
            "metadata": {
                "processed": True,
                "status": "success",
                "results": expected_results,
            }
        },
    )
def test_done_http_error(atlos_db, metadata, mocker):
    """done() lets the exception from ``raise_for_status`` propagate."""
    metadata.set("atlos_id", 123)
    failing_response = FakeAPIResponse({}, raise_error=True)
    mocker.patch("requests.post", return_value=failing_response)

    with pytest.raises(Exception, match="HTTP error"):
        atlos_db.done(metadata)

View File

@@ -1,6 +1,4 @@
from datetime import datetime, timezone
from unittest.mock import MagicMock, patch
import pytest
from auto_archiver.core import Metadata, Media
@@ -9,8 +7,8 @@ from auto_archiver.modules.gsheet_feeder import GWorksheet
@pytest.fixture
def mock_gworksheet():
mock_gworksheet = MagicMock(spec=GWorksheet)
def mock_gworksheet(mocker):
mock_gworksheet = mocker.MagicMock(spec=GWorksheet)
mock_gworksheet.col_exists.return_value = True
mock_gworksheet.get_cell.return_value = ""
mock_gworksheet.get_row.return_value = {}
@@ -18,14 +16,14 @@ def mock_gworksheet():
@pytest.fixture
def mock_metadata():
metadata: Metadata = MagicMock(spec=Metadata)
def mock_metadata(mocker):
metadata: Metadata = mocker.MagicMock(spec=Metadata)
metadata.get_url.return_value = "http://example.com"
metadata.status = "done"
metadata.get_title.return_value = "Example Title"
metadata.get.return_value = "Example Content"
metadata.get_timestamp.return_value = "2025-01-01T00:00:00"
metadata.get_final_media.return_value = MagicMock(spec=Media)
metadata.get_final_media.return_value = mocker.MagicMock(spec=Media)
metadata.get_all_media.return_value = []
metadata.get_media_by_id.return_value = None
metadata.get_first_image.return_value = None
@@ -47,21 +45,21 @@ def metadata():
@pytest.fixture
def mock_media():
def mock_media(mocker):
"""Fixture for a mock Media object."""
mock_media = MagicMock(spec=Media)
mock_media = mocker.MagicMock(spec=Media)
mock_media.urls = ["http://example.com/media"]
mock_media.get.return_value = "not-calculated"
return mock_media
@pytest.fixture
def gsheets_db(mock_gworksheet, setup_module):
def gsheets_db(mock_gworksheet, setup_module, mocker):
db = setup_module("gsheet_db", {
"allow_worksheets": "set()",
"block_worksheets": "set()",
"use_sheet_names_in_stored_paths": "True",
})
db._retrieve_gsheet = MagicMock(return_value=(mock_gworksheet, 1))
db._retrieve_gsheet = mocker.MagicMock(return_value=(mock_gworksheet, 1))
return db
@@ -109,27 +107,26 @@ def test_aborted(gsheets_db, mock_metadata, mock_gworksheet):
mock_gworksheet.set_cell.assert_called_once_with(1, 'status', '')
def test_done(gsheets_db, metadata, mock_gworksheet, expected_calls):
with patch("auto_archiver.modules.gsheet_db.gsheet_db.get_current_timestamp", return_value='2025-02-01T00:00:00+00:00'):
gsheets_db.done(metadata)
def test_done(gsheets_db, metadata, mock_gworksheet, expected_calls, mocker):
mocker.patch("auto_archiver.modules.gsheet_db.gsheet_db.get_current_timestamp", return_value='2025-02-01T00:00:00+00:00')
gsheets_db.done(metadata)
mock_gworksheet.batch_set_cell.assert_called_once_with(expected_calls)
def test_done_cached(gsheets_db, metadata, mock_gworksheet):
with patch("auto_archiver.modules.gsheet_db.gsheet_db.get_current_timestamp", return_value='2025-02-01T00:00:00+00:00'):
gsheets_db.done(metadata, cached=True)
def test_done_cached(gsheets_db, metadata, mock_gworksheet, mocker):
mocker.patch("auto_archiver.modules.gsheet_db.gsheet_db.get_current_timestamp", return_value='2025-02-01T00:00:00+00:00')
gsheets_db.done(metadata, cached=True)
# Verify the status message includes "[cached]"
call_args = mock_gworksheet.batch_set_cell.call_args[0][0]
assert any(call[2].startswith("[cached]") for call in call_args)
def test_done_missing_media(gsheets_db, metadata, mock_gworksheet):
def test_done_missing_media(gsheets_db, metadata, mock_gworksheet, mocker):
# clear media from metadata
metadata.media = []
with patch("auto_archiver.modules.gsheet_db.gsheet_db.get_current_timestamp",
return_value='2025-02-01T00:00:00+00:00'):
gsheets_db.done(metadata)
mocker.patch("auto_archiver.modules.gsheet_db.gsheet_db.get_current_timestamp", return_value='2025-02-01T00:00:00+00:00')
gsheets_db.done(metadata)
# Verify nothing media-related gets updated
call_args = mock_gworksheet.batch_set_cell.call_args[0][0]
media_fields = {'archive', 'screenshot', 'thumbnail', 'wacz', 'replaywebpage'}

View File

@@ -2,7 +2,7 @@ import pytest
from auto_archiver.modules.hash_enricher import HashEnricher
from auto_archiver.core import Metadata, Media
from auto_archiver.core.module import get_module_lazy
from auto_archiver.core.module import ModuleFactory
@pytest.mark.parametrize("algorithm, filename, expected_hash", [
("SHA-256", "tests/data/testfile_1.txt", "1b4f0e9851971998e732078544c96b36c3d01cedf7caa332359d6f1d83567014"),
@@ -22,7 +22,7 @@ def test_default_config_values(setup_module):
def test_config():
# test default config
c = get_module_lazy('hash_enricher').configs
c = ModuleFactory().get_module_lazy('hash_enricher').configs
assert c["algorithm"]["default"] == "SHA-256"
assert c["chunksize"]["default"] == 16000000
assert c["algorithm"]["choices"] == ["SHA-256", "SHA3-512"]

View File

@@ -1,6 +1,5 @@
import datetime
from datetime import datetime, timedelta, timezone
from unittest.mock import MagicMock, patch
import pytest
@@ -9,29 +8,21 @@ from auto_archiver.modules.meta_enricher import MetaEnricher
@pytest.fixture
def mock_metadata():
def mock_metadata(mocker):
"""Creates a mock Metadata object."""
mock: Metadata = MagicMock(spec=Metadata)
mock: Metadata = mocker.MagicMock(spec=Metadata)
mock.get_url.return_value = "https://example.com"
mock.is_empty.return_value = False # Default to not empty
mock.get_all_media.return_value = []
return mock
@pytest.fixture
def mock_media():
def mock_media(mocker):
"""Creates a mock Media object."""
mock: Media = MagicMock(spec=Media)
mock: Media = mocker.MagicMock(spec=Media)
mock.filename = "mock_file.txt"
return mock
@pytest.fixture
def metadata():
m = Metadata()
m.set_url("https://example.com")
m.set_title("Test Title")
m.set_content("Test Content")
return m
@pytest.fixture(autouse=True)
def meta_enricher(setup_module):
@@ -90,14 +81,14 @@ def test_enrich_file_sizes_no_media(meta_enricher, metadata):
assert metadata.get("total_size") == "0.0 bytes"
def test_enrich_archive_duration(meta_enricher, metadata):
def test_enrich_archive_duration(meta_enricher, metadata, mocker):
# Set fixed "processed at" time in the past
processed_at = datetime.now(timezone.utc) - timedelta(minutes=10, seconds=30)
metadata.set("_processed_at", processed_at)
# patch datetime
with patch("datetime.datetime") as mock_datetime:
mock_now = datetime.now(timezone.utc)
mock_datetime.now.return_value = mock_now
meta_enricher.enrich_archive_duration(metadata)
mock_datetime = mocker.patch("datetime.datetime")
mock_now = datetime.now(timezone.utc)
mock_datetime.now.return_value = mock_now
meta_enricher.enrich_archive_duration(metadata)
assert metadata.get("archive_duration_seconds") == 630

View File

@@ -0,0 +1,88 @@
import pytest
from auto_archiver.core import Media
@pytest.fixture
def mock_media(mocker):
    """Provide a ``Media``-spec'd MagicMock whose filename is ``mock_file.txt``."""
    media_double = mocker.MagicMock(spec=Media)
    media_double.filename = "mock_file.txt"
    return media_double
@pytest.fixture
def enricher(setup_module, mock_binary_dependencies):
    """Build the ``metadata_enricher`` module with an empty config.

    ``mock_binary_dependencies`` is requested so ``shutil.which`` is patched first.
    """
    module = setup_module("metadata_enricher", {})
    return module
@pytest.mark.parametrize(
    "output,expected",
    [
        ("Key1: Value1\nKey2: Value2", {"Key1": "Value1", "Key2": "Value2"}),
        ("InvalidLine", {}),
        ("", {}),
    ],
)
def test_get_metadata(enricher, output, expected, mocker):
    """get_metadata() turns ``Key: Value`` lines of exiftool output into a dict."""
    run_patch = mocker.patch("subprocess.run")
    completed = run_patch.return_value
    completed.stdout, completed.stderr, completed.returncode = output, "", 0

    assert enricher.get_metadata("test.jpg") == expected
    run_patch.assert_called_once_with(
        ["exiftool", "test.jpg"], capture_output=True, text=True
    )
def test_get_metadata_exiftool_not_found(enricher, mocker):
    """get_metadata() returns an empty dict when running exiftool raises FileNotFoundError."""
    mocker.patch("subprocess.run", side_effect=FileNotFoundError)
    assert enricher.get_metadata("test.jpg") == {}
def test_enrich_sets_metadata(enricher, mocker):
    """enrich() stores metadata only on media for which get_metadata() found something."""
    first = mocker.Mock(filename="img1.jpg")
    second = mocker.Mock(filename="img2.jpg")
    item = mocker.Mock()
    item.media = [first, second]

    def fake_get_metadata(filename):
        # Only the first image yields any metadata.
        if filename == "img1.jpg":
            return {"key": "value"}
        return {}

    enricher.get_metadata = fake_get_metadata

    enricher.enrich(item)

    first.set.assert_called_once_with("metadata", {"key": "value"})
    second.set.assert_not_called()
    assert item.media == [first, second]
def test_enrich_empty_media(enricher, mocker):
    """enrich() completes without raising when the item has no media."""
    item = mocker.Mock()
    item.media = []
    # Passing means no exception was raised.
    enricher.enrich(item)
def test_get_metadata_error_handling(enricher, mocker):
    """get_metadata() logs an error and returns an empty dict on an unexpected exception."""
    error_log = mocker.patch("loguru.logger.error")
    mocker.patch("subprocess.run", side_effect=Exception("Test error"))

    outcome = enricher.get_metadata("test.jpg")

    assert outcome == {}
    logged_message = error_log.call_args[0][0]
    assert "Error occurred: " in logged_message
def test_metadata_pickle(enricher, unpickle, mocker):
    """enrich() on a pickled input reproduces the metadata of the pickled expected result."""
    run_patch = mocker.patch("subprocess.run")
    # The subprocess result, the input item and the expectation all come from pickles.
    run_patch.return_value = unpickle("metadata_enricher_exif.pickle")
    item = unpickle("metadata_enricher_ytshort_input.pickle")
    expected = unpickle("metadata_enricher_ytshort_expected.pickle")

    enricher.enrich(item)

    wanted, got = expected.media, item.media
    assert len(wanted) == len(got)
    assert got[0].properties.get("metadata") == wanted[0].properties.get("metadata")

View File

@@ -0,0 +1,78 @@
import pytest
from PIL import UnidentifiedImageError
from auto_archiver.core import Metadata, Media
from auto_archiver.modules.pdq_hash_enricher import PdqHashEnricher
@pytest.fixture
def enricher(setup_module):
    """Build the ``pdq_hash_enricher`` module with an empty config."""
    module = setup_module("pdq_hash_enricher", {})
    return module
@pytest.fixture
def metadata_with_images():
    """Build a Metadata item for example.com holding two image media entries."""
    item = Metadata()
    item.set_url("https://example.com")
    for filename, key in (("image1.jpg", "image1"), ("image2.jpg", "image2")):
        item.add_media(Media(filename=filename, key=key))
    return item
def test_successful_enrich(metadata_with_images, mocker):
    """enrich() stores a ``pdq_hash`` on every image media item."""
    # mocker undoes its patches automatically at test teardown. Its patches
    # must not be wrapped in ``with``: pytest-mock does not support that usage
    # and warns about it.
    mocker.patch("pdqhash.compute", return_value=([1, 0, 1, 0] * 64, 100))
    mocker.patch("PIL.Image.open")
    mocker.patch.object(Media, "is_image", return_value=True)

    enricher = PdqHashEnricher()
    enricher.enrich(metadata_with_images)

    # Ensure the hash is set for image media
    for media in metadata_with_images.media:
        assert media.get("pdq_hash") is not None
def test_enrich_skip_non_image(metadata_with_images, mocker):
    """enrich() never computes a PDQ hash when no media is an image."""
    compute_patch = mocker.patch("pdqhash.compute")
    mocker.patch.object(Media, "is_image", return_value=False)

    PdqHashEnricher().enrich(metadata_with_images)

    assert not compute_patch.called
def test_enrich_handles_corrupted_image(metadata_with_images, mocker):
    """enrich() logs one error per media and computes no hash when images cannot be opened."""
    mocker.patch("PIL.Image.open", side_effect=UnidentifiedImageError("Corrupted image"))
    compute_patch = mocker.patch("pdqhash.compute")
    error_log = mocker.patch("loguru.logger.error")

    PdqHashEnricher().enrich(metadata_with_images)

    media_count = len(metadata_with_images.media)
    assert error_log.call_count == media_count
    compute_patch.assert_not_called()
@pytest.mark.parametrize(
    "media_id, should_have_hash",
    [
        ("screenshot", False),
        ("warc-file-123", False),
        ("regular-image", True),
    ]
)
def test_enrich_excludes_by_filetype(media_id, should_have_hash, mocker):
    """enrich() only hashes media whose ``id`` is not one of the excluded kinds."""
    mocker.patch("pdqhash.compute", return_value=([1, 0, 1, 0] * 64, 100))
    mocker.patch("PIL.Image.open")
    mocker.patch.object(Media, "is_image", return_value=True)

    item = Metadata()
    item.set_url("https://example.com")
    item.add_media(Media(filename="image.jpg").set("id", media_id))

    PdqHashEnricher().enrich(item)

    has_hash = item.media[0].get("pdq_hash") is not None
    assert has_hash == should_have_hash

View File

@@ -0,0 +1,195 @@
import base64
import pytest
from selenium.common.exceptions import TimeoutException
from auto_archiver.core import Metadata, Media
from auto_archiver.modules.screenshot_enricher import ScreenshotEnricher
@pytest.fixture
def mock_selenium_env(mocker):
    """Patch the Selenium/webdriver calls the enricher touches, all in one place.

    Yields a ``(driver, driver_class, options)`` tuple: the mock driver instance,
    the patched ``CookieSettingDriver`` class and the mock ``FirefoxOptions``
    instance.
    """
    # `shutil.which` only finds geckodriver; every other lookup yields None.
    mocker.patch(
        "shutil.which",
        side_effect=lambda dep: "/mock/geckodriver" if dep == "geckodriver" else None,
    )
    driver_class = mocker.patch("auto_archiver.utils.webdriver.CookieSettingDriver")
    mocker.patch(
        "selenium.webdriver.common.selenium_manager.SeleniumManager.binary_paths",
        return_value={
            "driver_path": "/mock/driver",
            "browser_path": "/mock/browser",
        },
    )
    mocker.patch("pathlib.Path.is_file", return_value=True)
    popen = mocker.patch("subprocess.Popen")
    mocker.patch("selenium.webdriver.common.service.Service.is_connectable", return_value=True)
    firefox_options = mocker.patch("selenium.webdriver.FirefoxOptions")

    # `subprocess.Popen` hands back a process whose poll() returns None.
    process = mocker.MagicMock()
    process.poll.return_value = None
    popen.return_value = process

    # Instantiating `CookieSettingDriver` returns this mock driver.
    driver = mocker.MagicMock()
    driver_class.return_value = driver

    # Instantiating `FirefoxOptions` returns this mock options object.
    options = mocker.MagicMock()
    firefox_options.return_value = options

    yield driver, driver_class, options
@pytest.fixture
def common_patches(tmp_path, mocker):
    """Apply patches shared by several tests: auth-wall check, path joining and sleep."""
    screenshot_path = str(tmp_path / "test.png")
    mocker.patch("auto_archiver.utils.url.is_auth_wall", return_value=False)
    # Every os.path.join call resolves to the same file inside tmp_path.
    mocker.patch("os.path.join", return_value=screenshot_path)
    mocker.patch("time.sleep")
    yield
@pytest.fixture
def screenshot_enricher(setup_module, mock_binary_dependencies) -> ScreenshotEnricher:
    """Build the ``screenshot_enricher`` module with fixed test settings."""
    return setup_module(
        "screenshot_enricher",
        {
            "width": 1280,
            "height": 720,
            "timeout": 60,
            "sleep_before_screenshot": 4,
            "http_proxy": "",
            "save_to_pdf": "False",
            "print_options": {},
        },
    )
@pytest.fixture
def metadata_with_video():
    """Build a Metadata item for example.com holding one video media with id ``video1``."""
    item = Metadata()
    item.set_url("https://example.com")
    video = Media(filename="video.mp4").set("id", "video1")
    item.add_media(video)
    return item
def test_enrich_adds_screenshot(
    screenshot_enricher,
    metadata_with_video,
    mock_selenium_env,
    common_patches,
    tmp_path,
):
    """enrich() opens the URL, saves a screenshot and appends it as a media item."""
    driver, driver_class, options = mock_selenium_env
    expected_file = str(tmp_path / "test.png")

    screenshot_enricher.enrich(metadata_with_video)

    driver_class.assert_called_once_with(
        cookies=None,
        cookiejar=None,
        facebook_accept_cookies=False,
        options=options,
    )
    # Calls made on the driver instance returned by the patched class
    driver.get.assert_called_once_with("https://example.com")
    driver.save_screenshot.assert_called_once_with(expected_file)

    # Original video plus the new screenshot
    media = metadata_with_video.media
    assert len(media) == 2
    assert media[1].properties.get("id") == "screenshot"
@pytest.mark.parametrize(
    "url,is_auth",
    [
        ("https://example.com", False),
        ("https://private.com", True),
    ],
)
def test_enrich_auth_wall(
    screenshot_enricher,
    metadata_with_video,
    mock_selenium_env,
    common_patches,
    url,
    is_auth,
    mocker
):
    """enrich() skips the screenshot for auth-walled URLs and takes it otherwise."""
    driver, _driver_class, _options = mock_selenium_env
    # Override the auth-wall patch from common_patches with this case's answer.
    mocker.patch("auto_archiver.utils.url.is_auth_wall", return_value=is_auth)
    metadata_with_video.set_url(url)

    screenshot_enricher.enrich(metadata_with_video)

    media = metadata_with_video.media
    if not is_auth:
        driver.get.assert_called_once_with(url)
        assert len(media) == 2
        assert media[1].properties.get("id") == "screenshot"
    else:
        driver.get.assert_not_called()
        assert len(media) == 1
        assert media[0].properties.get("id") == "video1"
def test_handle_timeout_exception(
    screenshot_enricher, metadata_with_video, mock_selenium_env, mocker
):
    """enrich() logs an info message and adds no media when page loading times out."""
    driver, _driver_class, _options = mock_selenium_env
    driver.get.side_effect = TimeoutException
    info_log = mocker.patch("loguru.logger.info")

    screenshot_enricher.enrich(metadata_with_video)

    info_log.assert_called_once_with("TimeoutException loading page for screenshot")
    assert len(metadata_with_video.media) == 1
def test_handle_general_exception(
    screenshot_enricher, metadata_with_video, mock_selenium_env, mocker
):
    """enrich() logs an error and adds no media when saving the screenshot fails."""
    driver, _driver_class, _options = mock_selenium_env
    # Loading the page works; taking the screenshot raises.
    driver.get.return_value = None
    driver.save_screenshot.side_effect = Exception("Unexpected Error")
    error_log = mocker.patch("loguru.logger.error")

    screenshot_enricher.enrich(metadata_with_video)

    error_log.assert_called_once_with(
        "Got error while loading webdriver for screenshot enricher: Unexpected Error"
    )
    # Only the original video remains.
    assert len(metadata_with_video.media) == 1
def test_pdf_creation(mocker, screenshot_enricher, metadata_with_video, mock_selenium_env):
    """With save_to_pdf enabled, enrich() adds both a screenshot and a PDF media item."""
    driver, _driver_class, _options = mock_selenium_env
    screenshot_enricher.save_to_pdf = True
    # print_page hands the PDF back as base64-encoded text.
    driver.print_page.return_value = base64.b64encode(b"fake_pdf_content").decode("utf-8")

    # os.path.join yields only its last argument, so the file name is predictable.
    mocker.patch("os.path.join", side_effect=lambda *args: f"{args[-1]}")
    mocker.patch(
        "auto_archiver.modules.screenshot_enricher.screenshot_enricher.random_str",
        return_value="fixed123",
    )
    open_patch = mocker.patch("builtins.open", new_callable=mocker.mock_open)
    mocker.patch("loguru.logger.error")

    screenshot_enricher.enrich(metadata_with_video)

    # Screenshot taken and PDF printed
    driver.save_screenshot.assert_called_once()
    driver.print_page.assert_called_once_with(driver.print_options)
    # The PDF file was opened for binary writing
    open_patch.assert_any_call("pdf_fixed123.pdf", "wb")

    # Original video, then screenshot, then PDF
    media = metadata_with_video.media
    assert len(media) == 3
    assert media[1].properties.get("id") == "screenshot"
    assert media[2].properties.get("id") == "pdf"
@pytest.fixture(autouse=True)
def cleanup_files(tmp_path):
    """After each test, unlink every entry found directly inside ``tmp_path``."""
    yield
    for leftover in tmp_path.iterdir():
        leftover.unlink()

View File

@@ -0,0 +1,54 @@
import ssl
import pytest
from auto_archiver.core import Metadata, Media
@pytest.fixture
def enricher(setup_module):
    """Build the ``ssl_enricher`` module with ``skip_when_nothing_archived`` set."""
    return setup_module(
        "ssl_enricher",
        {
            "skip_when_nothing_archived": "True",
        },
    )
@pytest.fixture
def metadata():
    """Build a Metadata item for https://example.com holding two text-file media."""
    item = Metadata()
    item.set_url("https://example.com")
    for path in ("tests/data/testfile_1.txt", "tests/data/testfile_2.txt"):
        item.add_media(Media(path))
    return item
def test_http_raises(metadata, enricher):
    """enrich() raises AssertionError with an "Invalid URL scheme" message for http URLs."""
    metadata.set_url("http://example.com")
    # `match=` checks the message as part of pytest.raises itself. An assert
    # placed after the raising call inside the `with` body would never run.
    with pytest.raises(AssertionError, match="Invalid URL scheme"):
        enricher.enrich(metadata)
def test_empty_metadata(metadata, enricher):
    """enrich() returns None when the item holds no media."""
    metadata.media = []
    outcome = enricher.enrich(metadata)
    assert outcome is None
def test_ssl_enrich(metadata, enricher, mocker):
    """enrich() fetches the certificate, writes it to a .pem file and adds it as media."""
    cert_patch = mocker.patch("ssl.get_server_certificate", return_value="TEST_CERT")
    open_patch = mocker.patch("builtins.open", mocker.mock_open())
    initial_count = len(metadata.media)

    enricher.enrich(metadata)

    cert_patch.assert_called_once_with(("example.com", 443))
    open_patch.assert_called_once_with(f"{enricher.tmp_dir}/example-com.pem", "w")
    # return_value is the file handle mock_open hands out on every call.
    open_patch.return_value.write.assert_called_once_with("TEST_CERT")
    assert len(metadata.media) == initial_count + 1
    # The certificate file shows up among the media
    filenames = [media.filename for media in metadata.media]
    assert any(name.endswith("example-com.pem") for name in filenames)
def test_ssl_error_handling(enricher, metadata, mocker):
    """enrich() lets an SSLError from the certificate lookup propagate."""
    failure = ssl.SSLError("SSL error")
    mocker.patch("ssl.get_server_certificate", side_effect=failure)

    with pytest.raises(ssl.SSLError, match="SSL error"):
        enricher.enrich(metadata)

View File

@@ -0,0 +1,148 @@
import pytest
from auto_archiver.core import Metadata, Media
from auto_archiver.modules.thumbnail_enricher import ThumbnailEnricher
@pytest.fixture
def thumbnail_enricher(setup_module, mock_binary_dependencies) -> ThumbnailEnricher:
    """Build the ``thumbnail_enricher`` module: 60 thumbnails per minute, at most 4."""
    return setup_module(
        "thumbnail_enricher",
        {
            "thumbnails_per_minute": 60,
            "max_thumbnails": 4,
        },
    )
@pytest.fixture
def metadata_with_video():
    """Build a Metadata item for example.com holding one video media with id ``video1``."""
    item = Metadata()
    item.set_url("https://example.com")
    video = Media(filename="video.mp4").set("id", "video1")
    item.add_media(video)
    return item
@pytest.fixture
def mock_ffmpeg_environment(mocker):
    """Patch every ffmpeg-related call in one place.

    Returns a dict with the mocks tests assert on: ``mock_ffmpeg_input``,
    ``mock_makedirs``, ``mock_output`` and ``mock_probe``.
    """
    mock_ffmpeg_input = mocker.patch("ffmpeg.input")
    mock_makedirs = mocker.patch("os.makedirs")
    # Treat every media item as a video. (The original line ended in a stray
    # comma, which wrapped the call in a throwaway one-element tuple.)
    mocker.patch.object(Media, "is_video", return_value=True)
    mock_probe = mocker.patch(
        "ffmpeg.probe",
        return_value={
            "streams": [
                {"codec_type": "video", "duration": "120"}
            ]  # Default 2-minute duration, but can override in tests
        },
    )
    # This is the object reached through ffmpeg.input(...).filter(...).output(...).
    mock_output = mocker.MagicMock()
    mock_ffmpeg_input.return_value.filter.return_value.output.return_value = (
        mock_output
    )
    return {
        "mock_ffmpeg_input": mock_ffmpeg_input,
        "mock_makedirs": mock_makedirs,
        "mock_output": mock_output,
        "mock_probe": mock_probe,
    }
@pytest.mark.parametrize("thumbnails_per_minute, max_thumbnails, expected_count", [
    (10, 5, 5),  # Capped at max_thumbnails
    (1, 10, 2),  # Less than max_thumbnails
    (60, 7, 7),  # Matches exactly
])
def test_enrich_thumbnail_limits(
    thumbnail_enricher, metadata_with_video, mock_ffmpeg_environment,
    thumbnails_per_minute, max_thumbnails, expected_count
):
    """For the default 120s probe duration, enrich() makes the expected number of thumbnails."""
    thumbnail_enricher.max_thumbnails = max_thumbnails
    thumbnail_enricher.thumbnails_per_minute = thumbnails_per_minute

    thumbnail_enricher.enrich(metadata_with_video)

    run_calls = mock_ffmpeg_environment["mock_output"].run.call_count
    assert run_calls == expected_count
    produced = metadata_with_video.media[0].get("thumbnails")
    assert len(produced) == expected_count
def test_enrich_handles_probe_failure(thumbnail_enricher, metadata_with_video, mocker):
    """enrich() logs an error and creates no thumbnails when ffmpeg.probe raises."""
    mocker.patch("ffmpeg.probe", side_effect=Exception("Probe error"))
    mocker.patch("os.makedirs")
    mock_logger = mocker.patch("loguru.logger.error")
    mocker.patch.object(Media, "is_video", return_value=True)

    thumbnail_enricher.enrich(metadata_with_video)

    # Ensure error was logged (plain literal: the message has no placeholders)
    mock_logger.assert_called_with(
        "error getting duration of video video.mp4: Probe error"
    )
    # Ensure no thumbnails were created
    thumbnails = metadata_with_video.media[0].get("thumbnails")
    assert thumbnails is None
def test_enrich_skips_non_video_files(thumbnail_enricher, metadata_with_video, mocker):
    """enrich() never calls ffmpeg.input when the media is not a video."""
    input_patch = mocker.patch("ffmpeg.input")
    mocker.patch.object(Media, "is_video", return_value=False)

    thumbnail_enricher.enrich(metadata_with_video)

    assert not input_patch.called
@pytest.mark.parametrize("thumbnails_per_minute,max_thumbnails,expected_count", [
    (60, 5, 5),  # caught by max
    (60, 20, 10),  # caught by t/min
    (0, 20, 1),  # test min caught (1)
    (11, 20, 1),  # test min caught (1)
    (12, 20, 2),  # test caught by t/min
])
def test_enrich_handles_short_video(
    thumbnail_enricher, metadata_with_video, mock_ffmpeg_environment, thumbnails_per_minute, max_thumbnails, expected_count, mocker
):
    """For a 10-second video, enrich() makes the expected number of thumbnails."""
    # Replace the fixture's default probe result with a 10-second stream.
    fake_duration = 10
    short_stream = {"codec_type": "video", "duration": str(fake_duration)}
    mocker.patch("ffmpeg.probe", return_value={"streams": [short_stream]})

    thumbnail_enricher.max_thumbnails = max_thumbnails
    thumbnail_enricher.thumbnails_per_minute = thumbnails_per_minute
    thumbnail_enricher.enrich(metadata_with_video)

    run_calls = mock_ffmpeg_environment["mock_output"].run.call_count
    assert run_calls == expected_count
    produced = metadata_with_video.media[0].get("thumbnails")
    assert len(produced) == expected_count
def test_uses_existing_duration(
    thumbnail_enricher, metadata_with_video, mock_ffmpeg_environment
):
    """enrich() does not probe a video whose media already carries a duration."""
    video = metadata_with_video.media[0]
    video.set("duration", 60)

    thumbnail_enricher.enrich(metadata_with_video)

    mocks = mock_ffmpeg_environment
    mocks["mock_probe"].assert_not_called()
    assert mocks["mock_output"].run.call_count == 4
def test_enrich_metadata_structure(thumbnail_enricher, metadata_with_video, mock_ffmpeg_environment, mocker):
    """Verify the ids, timestamps and filenames of thumbnails for a 120s video."""
    duration_seconds = 120
    mocker.patch(
        "ffmpeg.probe",
        return_value={"streams": [{"codec_type": "video", "duration": str(duration_seconds)}]},
    )
    thumbnail_enricher.thumbnails_per_minute = 2
    thumbnail_enricher.max_thumbnails = 4

    thumbnail_enricher.enrich(metadata_with_video)

    video = metadata_with_video.media[0]
    generated = video.get("thumbnails")

    # The media's own properties are kept and the probed duration is recorded.
    assert video.get("id") == "video1"
    assert video.get("duration") == duration_seconds

    assert generated is not None
    assert len(generated) == 4

    # Four thumbnails across 120 seconds are expected at evenly spaced offsets.
    wanted_timestamps = ("24.000s", "48.000s", "72.000s", "96.000s")
    for position, (thumb, wanted) in enumerate(zip(generated, wanted_timestamps)):
        assert thumb.filename is not None
        assert thumb.properties.get("id") == f"thumbnail_{position}"
        assert thumb.properties.get("timestamp") == wanted

View File

@@ -0,0 +1,112 @@
import os
from zipfile import ZipFile
import pytest
from auto_archiver.core import Metadata, Media
@pytest.fixture
def wacz_enricher(setup_module, mock_binary_dependencies):
    """Build a ``wacz_enricher`` module through ``setup_module`` with a fixed config."""
    return setup_module(
        "wacz_enricher",
        {
            "profile": None,
            "docker_commands": None,
            "timeout": 120,
            "extract_media": False,
            "extract_screenshot": True,
            "socks_proxy_host": None,
            "socks_proxy_port": None,
            "proxy_server": None,
        },
    )
def test_setup_without_docker(wacz_enricher, mocker):
    """With only RUNNING_IN_DOCKER set, setup() must leave docker_in_docker falsy."""
    # clear=True drops every other variable, so only RUNNING_IN_DOCKER is visible.
    environment = {"RUNNING_IN_DOCKER": "1"}
    mocker.patch.dict(os.environ, environment, clear=True)

    wacz_enricher.setup()

    assert not wacz_enricher.docker_in_docker
def test_setup_with_docker(wacz_enricher, mocker):
    """With only WACZ_ENABLE_DOCKER set, setup() must leave use_docker truthy."""
    # clear=True drops every other variable, so only WACZ_ENABLE_DOCKER is visible.
    environment = {"WACZ_ENABLE_DOCKER": "1"}
    mocker.patch.dict(os.environ, environment, clear=True)

    wacz_enricher.setup()

    assert wacz_enricher.use_docker
def test_already_ran(wacz_enricher, metadata, mocker):
    """enrich() returns True and logs a notice when a browsertrix media already exists."""
    existing_wacz = Media("test.wacz")
    metadata.add_media(existing_wacz, id="browsertrix")
    info_log = mocker.patch("loguru.logger.info")

    outcome = wacz_enricher.enrich(metadata)

    assert outcome is True
    # First positional argument of the most recent logger.info call.
    last_message = info_log.call_args[0][0]
    assert "WACZ enricher had already been executed" in last_message
def test_basic_call_execution(wacz_enricher, mocker):
    """enrich() invokes subprocess.run with a command that carries the target URL."""
    completed = mocker.Mock(returncode=0)
    run_mock = mocker.patch("subprocess.run", return_value=completed)
    item = Metadata().set_url("https://example.com")

    wacz_enricher.enrich(item)

    assert run_mock.called
    # The command is the first positional argument; join it to search for the flag.
    command_line = " ".join(run_mock.call_args[0][0])
    assert "--url https://example.com" in command_line
def test_download_success(wacz_enricher, mocker) -> None:
    """download() hands back a Metadata marked successful when enrich() reports True."""
    item = Metadata().set_url("https://example.com")
    # Short-circuit the real enrichment; only download()'s own handling is under test.
    mocker.patch.object(wacz_enricher, "enrich", return_value=True)

    outcome = wacz_enricher.download(item)

    assert outcome is not None
    assert isinstance(outcome, Metadata)
    assert outcome.status == "wacz: success"
def test_enrich_already_executed(wacz_enricher, mocker) -> None:
    """enrich() returns True and logs a notice if a browsertrix media is already attached."""
    info_log = mocker.patch("loguru.logger.info")
    item = Metadata().set_url("https://example.com")
    item.add_media(Media(filename="some_file.wacz"), id="browsertrix")

    outcome = wacz_enricher.enrich(item)

    assert outcome is True
    last_message = info_log.call_args[0][0]
    assert "WACZ enricher had already been executed:" in last_message
def test_enrich_subprocess_exception(wacz_enricher, mocker, tmp_path) -> None:
    """enrich() returns False when subprocess.run raises."""
    wacz_enricher.tmp_dir = str(tmp_path)
    wacz_enricher.extract_media = False
    wacz_enricher.extract_screenshot = True

    mocker.patch("auto_archiver.utils.misc.random_str", return_value="TESTCOL")
    mocker.patch("subprocess.run", side_effect=Exception("fail"))

    item = Metadata().set_url("https://example.com")
    outcome = wacz_enricher.enrich(item)

    assert outcome is False
def test_extract_media(wacz_enricher, metadata, tmp_path, mocker) -> None:
    """extract_media_from_wacz() adds a ``browsertrix-screenshot`` media item."""
    wacz_enricher.tmp_dir = str(tmp_path)

    # A genuine (if tiny) zip archive must exist on disk so ZipFile can open it.
    archive_path = tmp_path / "dummy.wacz"
    with ZipFile(archive_path, "w") as archive:
        archive.writestr("dummy.txt", "test content")

    mocker.patch("os.listdir", return_value=[])

    # Every subsequent open() call yields this hand-written WARC payload.
    warc_record = b"".join(
        (
            b"WARC/1.0\r\n",
            b"WARC-Type: resource\r\n",
            b"Content-Type: image/png\r\n",
            b"WARC-Target-URI: http://example.com/image.png\r\n",
            b"Content-Length: 12\r\n",
            b"\r\n",
            b"image-bytes",
            b"\r\n\r\nWARC/1.0\r\n\r\n",
        )
    )
    mocker.patch("builtins.open", mocker.mock_open(read_data=warc_record))

    metadata.add_media(Media("something.wacz"), "browsertrix")
    wacz_enricher.extract_media_from_wacz(metadata, str(archive_path))

    assert len(metadata.media) == 2
    assert metadata.media[1].properties.get("id") == "browsertrix-screenshot"

View File

@@ -0,0 +1,168 @@
import json
import requests
import pytest
from auto_archiver.modules.wayback_extractor_enricher import WaybackExtractorEnricher
from auto_archiver.core import Metadata
@pytest.fixture
def mock_is_auth_wall(mocker):
    """Return a helper that patches ``is_auth_wall`` to a chosen boolean."""

    def _patch(return_value: bool):
        target = "auto_archiver.utils.url.is_auth_wall"
        return mocker.patch(target, return_value=return_value)

    return _patch
@pytest.fixture
def mock_post_success(mocker):
    """Return a helper that patches ``requests.post`` with a canned response.

    The helper accepts:
        json_data: payload returned by ``response.json()``. Only ``None``
            selects the default ``{"job_id": "job123"}``; an explicitly
            passed empty dict is honoured as-is.
        status_code: value exposed as ``response.status_code`` (default 200).

    It returns the mock that replaced ``requests.post``.
    """

    def _mock_post(json_data: dict = None, status_code: int = 200):
        # Compare against None rather than relying on truthiness: `{}` is
        # falsy, and callers pass it to simulate a reply with no job id.
        if json_data is None:
            json_data = {"job_id": "job123"}
        resp = mocker.Mock(status_code=status_code)
        resp.json.return_value = json_data
        return mocker.patch("requests.post", return_value=resp)

    return _mock_post
@pytest.fixture
def mock_get_success(mocker):
    """Return a helper that patches ``requests.get`` with a canned response.

    The helper accepts:
        json_data: payload returned by ``response.json()``. Only ``None``
            selects the default "success" status payload; an explicitly
            passed empty dict is honoured as-is.
        status_code: value exposed as ``response.status_code`` (default 200).

    It returns the mock that replaced ``requests.get``.
    """

    def _mock_get(json_data: dict = None, status_code: int = 200):
        # Compare against None rather than relying on truthiness so that an
        # empty dict is not silently replaced by the default payload.
        if json_data is None:
            json_data = {
                "status": "success",
                "timestamp": "20250101010101",
                "original_url": "https://example.com",
            }
        resp = mocker.Mock(status_code=status_code)
        resp.json.return_value = json_data
        return mocker.patch("requests.get", return_value=resp)

    return _mock_get
@pytest.fixture
def wayback_extractor_enricher(setup_module) -> WaybackExtractorEnricher:
    """Build a ``wayback_extractor_enricher`` module through ``setup_module`` with dummy credentials."""
    return setup_module(
        "wayback_extractor_enricher",
        {
            "timeout": 5,
            "if_not_archived_within": None,
            "key": "somekey",
            "secret": "secret",
            "proxy_http": None,
            "proxy_https": None,
        },
    )
def test_download_success(
    wayback_extractor_enricher,
    mock_is_auth_wall,
    mock_post_success,
    mock_get_success,
):
    """download() stores the archived snapshot URL under the ``wayback`` key."""
    mock_is_auth_wall(False)
    mock_post_success()
    mock_get_success()

    # A bare Metadata carrying only a URL is enough for download() to work with.
    item = Metadata().set_url("https://example.com")
    outcome = wayback_extractor_enricher.download(item)

    expected_snapshot = "https://web.archive.org/web/20250101010101/https://example.com"
    assert outcome.get("wayback") == expected_snapshot
def test_enrich_auth_wall(wayback_extractor_enricher, metadata, mock_is_auth_wall):
    """enrich() returns None when the URL is reported as an auth wall."""
    mock_is_auth_wall(True)

    outcome = wayback_extractor_enricher.enrich(metadata)

    assert outcome is None
def test_enrich_already_enriched(wayback_extractor_enricher, metadata):
    """enrich() returns True when the metadata already carries a ``wayback`` value."""
    metadata.set("wayback", "existing")

    outcome = wayback_extractor_enricher.enrich(metadata)

    assert outcome is True
def test_enrich_post_failure(
    wayback_extractor_enricher,
    metadata,
    mock_is_auth_wall,
    mock_post_success,
):
    """A 500 reply to the POST makes enrich() return False and record the failure."""
    mock_is_auth_wall(False)
    mock_post_success(json_data={"error": "server error"}, status_code=500)

    outcome = wayback_extractor_enricher.enrich(metadata)

    assert outcome is False
    recorded = metadata.get("wayback")
    assert "Internet archive failed with status of 500" in recorded
def test_enrich_post_json_decode_error(
    wayback_extractor_enricher,
    metadata,
    mock_is_auth_wall,
    mocker,
):
    """enrich() returns False when the POST reply body cannot be decoded as JSON."""
    mock_is_auth_wall(False)

    # A 200 response whose .json() raises, with a text body available instead.
    broken_response = mocker.Mock(status_code=200, text="invalid json")
    broken_response.json.side_effect = json.decoder.JSONDecodeError("msg", "doc", 0)
    mocker.patch("requests.post", return_value=broken_response)

    outcome = wayback_extractor_enricher.enrich(metadata)

    assert outcome is False
def test_enrich_no_job_id(
    wayback_extractor_enricher,
    metadata,
    mock_is_auth_wall,
    mock_post_success,
):
    """enrich() is expected to return False when the POST helper is given an empty payload."""
    mock_is_auth_wall(False)
    mock_post_success(json_data={})

    outcome = wayback_extractor_enricher.enrich(metadata)

    assert outcome is False
def test_enrich_get_success(
    wayback_extractor_enricher,
    metadata,
    mock_is_auth_wall,
    mock_post_success,
    mock_get_success,
):
    """A successful POST and status GET yield True plus both wayback URLs on the metadata."""
    mock_is_auth_wall(False)
    mock_post_success()
    mock_get_success()

    outcome = wayback_extractor_enricher.enrich(metadata)

    assert outcome is True
    expected_snapshot = "https://web.archive.org/web/20250101010101/https://example.com"
    expected_listing = "https://web.archive.org/web/*/https://example.com"
    assert metadata.get("wayback") == expected_snapshot
    assert metadata.get("check wayback") == expected_listing
def test_enrich_get_failure(
    wayback_extractor_enricher,
    metadata,
    mock_is_auth_wall,
    mock_post_success,
    mock_get_success,
):
    """enrich() returns False when the status GET replies 400 with a failed status."""
    mock_is_auth_wall(False)
    mock_post_success()
    mock_get_success(json_data={"status": "failed"}, status_code=400)

    outcome = wayback_extractor_enricher.enrich(metadata)

    assert outcome is False
def test_enrich_get_request_exception(
    wayback_extractor_enricher,
    metadata,
    mock_is_auth_wall,
    mock_post_success,
    mocker,
):
    """If the status GET raises, enrich() still returns True and keeps the job id."""
    mock_is_auth_wall(False)
    mock_post_success()
    get_failure = requests.exceptions.RequestException("error")
    mocker.patch("requests.get", side_effect=get_failure)
    # time.sleep is replaced with a no-op so the test does not wait on it.
    mocker.patch("time.sleep", return_value=None)

    outcome = wayback_extractor_enricher.enrich(metadata)

    # The job_id from the POST is still recorded on the metadata.
    assert outcome is True
    assert metadata.get("wayback").get("job_id") == "job123"
def test_enrich_get_json_decode_error(
    wayback_extractor_enricher,
    metadata,
    mock_is_auth_wall,
    mock_post_success,
    mocker,
):
    """If the status GET body is not JSON, enrich() still returns True and keeps the job id."""
    mock_is_auth_wall(False)
    mock_post_success()

    # A response whose .json() raises, with a text body available instead.
    broken_response = mocker.Mock()
    broken_response.text = "invalid json"
    broken_response.json.side_effect = json.decoder.JSONDecodeError("msg", "doc", 0)
    mocker.patch("requests.get", return_value=broken_response)
    # time.sleep is replaced with a no-op so the test does not wait on it.
    mocker.patch("time.sleep", return_value=None)

    outcome = wayback_extractor_enricher.enrich(metadata)

    # The job_id from the POST is still recorded on the metadata.
    assert outcome is True
    assert metadata.get("wayback").get("job_id") == "job123"

View File

@@ -0,0 +1,133 @@
import pytest
from auto_archiver.core import Metadata, Media
from auto_archiver.modules.s3_storage import S3Storage
from auto_archiver.modules.whisper_enricher import WhisperEnricher
# CDN URL returned by the mocked S3 storage and attached to test media in this module.
TEST_S3_URL = "http://cdn.example.com/test.mp4"
@pytest.fixture
def enricher(mocker):
    """Yield a ``(WhisperEnricher, mocked S3Storage)`` pair with S3 and API settings stubbed."""
    settings = {
        "api_endpoint": "http://testapi",
        "api_key": "whisper-key",
        "include_srt": False,
        "timeout": 5,
        "action": "translate",
        "steps": {"storages": ["s3_storage"]},
    }

    s3_double = mocker.MagicMock(spec=S3Storage)
    s3_double.get_cdn_url.return_value = TEST_S3_URL

    whisper = WhisperEnricher()
    whisper.name = "whisper_enricher"
    whisper.display_name = "Whisper Enricher"
    whisper.config_setup({whisper.name: settings})

    # setup() is deliberately not called; the storage wiring is assigned by hand.
    whisper.stores = settings["steps"]["storages"]
    whisper.s3 = s3_double

    yield whisper, s3_double
@pytest.fixture
def metadata():
    """Provide a Metadata object with a test URL and title set."""
    item = Metadata()
    item.set_url("http://test.url")
    item.set_title("test title")
    return item
@pytest.fixture
def mock_requests(mocker):
    """Patch ``requests`` in the whisper enricher module; POST replies 201 with a job id."""
    created = mocker.MagicMock()
    created.status_code = 201
    created.json.return_value = {"id": "job123"}

    requests_double = mocker.patch("auto_archiver.modules.whisper_enricher.whisper_enricher.requests")
    requests_double.post.return_value = created

    yield requests_double
def test_successful_job_submission(enricher, metadata, mock_requests, mocker):
    """Run enrich() end to end against mocked HTTP calls and check the stored transcript."""
    whisper, s3_double = enricher

    # The CDN URL reported by the storage must match the URL placed on the media.
    s3_double.get_cdn_url.return_value = TEST_S3_URL

    video = Media("test.mp4")
    video.mimetype = "video/mp4"
    video.add_url(s3_double.get_cdn_url.return_value)
    metadata.media = [video]

    # Reply to the job status request.
    status_reply = mocker.MagicMock()
    status_reply.status_code = 200
    status_reply.json.return_value = {"status": "success", "meta": {}}

    # Reply to the artifacts request.
    artifacts_reply = mocker.MagicMock()
    artifacts_reply.status_code = 200
    artifacts_reply.json.return_value = [
        {"data": [{"start": 0, "end": 5, "text": "test transcript"}]},
    ]

    # GET calls are answered in order: status first, artifacts second.
    mock_requests.get.side_effect = [status_reply, artifacts_reply]

    whisper.enrich(metadata)

    mock_requests.post.assert_called_once_with(
        "http://testapi/jobs",
        json={"url": "http://cdn.example.com/test.mp4", "type": "translate"},
        headers={"Authorization": "Bearer whisper-key"},
    )
    assert mock_requests.get.call_count == 2

    whisper_result = metadata.media[0].get("whisper_model")
    assert "artifact_0_text" in whisper_result
    assert whisper_result == {
        "artifact_0_text": "test transcript",
        "job_artifacts_check": "http://testapi/jobs/job123/artifacts",
        "job_id": "job123",
        "job_status_check": "http://testapi/jobs/job123",
    }
def test_submit_job(enricher, mocker):
    """submit_job() returns the id from a 201 reply."""
    whisper, _ = enricher

    video = Media("test.mp4")
    video.add_url(TEST_S3_URL)

    created = mocker.MagicMock()
    created.status_code = 201
    created.json.return_value = {"id": "job123"}
    requests_double = mocker.patch("auto_archiver.modules.whisper_enricher.whisper_enricher.requests")
    requests_double.post.return_value = created

    assert whisper.submit_job(video) == "job123"
def test_submit_raises_status(enricher, mocker):
    """submit_job() raises AssertionError with a descriptive message on a 400 reply."""
    whisper, _ = enricher

    video = Media("test.mp4")
    video.add_url(TEST_S3_URL)

    rejected = mocker.MagicMock()
    rejected.status_code = 400
    rejected.json.return_value = {"id": "job123"}
    requests_double = mocker.patch("auto_archiver.modules.whisper_enricher.whisper_enricher.requests")
    requests_double.post.return_value = rejected

    with pytest.raises(AssertionError) as exc_info:
        whisper.submit_job(video)

    # Inspect the message only after the context manager has captured the error.
    expected_message = "calling the whisper api http://testapi returned a non-success code: 400"
    assert str(exc_info.value) == expected_message
def test_submit_job_fails(enricher):
    """submit_job() raises AssertionError for media whose URL is not the S3 CDN URL."""
    whisper, _ = enricher

    video = Media("test.mp4")
    video.add_url("http://cdn.wrongurl.com/test.mp4")

    with pytest.raises(AssertionError):
        whisper.submit_job(video)

View File

@@ -1,15 +1,12 @@
from datetime import datetime
from typing import Type
import pytest
from unittest.mock import patch, MagicMock
from auto_archiver.core import Metadata
from auto_archiver.modules.instagram_api_extractor.instagram_api_extractor import InstagramAPIExtractor
from .test_extractor_base import TestExtractorBase
@pytest.fixture
def mock_user_response():
return {
@@ -115,74 +112,74 @@ class TestInstagramAPIExtractor(TestExtractorBase):
# test gets text (metadata title)
pass
def test_download_profile_basic(self, metadata, mock_user_response):
def test_download_profile_basic(self, metadata, mock_user_response, mocker):
"""Test basic profile download without full_profile"""
with patch.object(self.extractor, 'call_api') as mock_call, \
patch.object(self.extractor, 'download_from_url') as mock_download:
# Mock API responses
mock_call.return_value = mock_user_response
mock_download.return_value = "profile.jpg"
mock_call = mocker.patch.object(self.extractor, 'call_api')
mock_download = mocker.patch.object(self.extractor, 'download_from_url')
# Mock API responses
mock_call.return_value = mock_user_response
mock_download.return_value = "profile.jpg"
result = self.extractor.download_profile(metadata, "test_user")
assert result.status == "insta profile: success"
assert result.get_title() == "Test User"
assert result.get("data") == self.extractor.cleanup_dict(mock_user_response["user"])
# Verify profile picture download
mock_call.assert_called_once_with("v2/user/by/username", {"username": "test_user"})
mock_download.assert_called_once_with("http://example.com/profile.jpg")
assert len(result.media) == 1
assert result.media[0].filename == "profile.jpg"
result = self.extractor.download_profile(metadata, "test_user")
assert result.status == "insta profile: success"
assert result.get_title() == "Test User"
assert result.get("data") == self.extractor.cleanup_dict(mock_user_response["user"])
# Verify profile picture download
mock_call.assert_called_once_with("v2/user/by/username", {"username": "test_user"})
mock_download.assert_called_once_with("http://example.com/profile.jpg")
assert len(result.media) == 1
assert result.media[0].filename == "profile.jpg"
def test_download_profile_full(self, metadata, mock_user_response, mock_story_response):
def test_download_profile_full(self, metadata, mock_user_response, mock_story_response, mocker):
"""Test full profile download with stories/posts"""
with patch.object(self.extractor, 'call_api') as mock_call, \
patch.object(self.extractor, 'download_all_posts') as mock_posts, \
patch.object(self.extractor, 'download_all_highlights') as mock_highlights, \
patch.object(self.extractor, 'download_all_tagged') as mock_tagged, \
patch.object(self.extractor, '_download_stories_reusable') as mock_stories:
mock_call = mocker.patch.object(self.extractor, 'call_api')
mock_posts = mocker.patch.object(self.extractor, 'download_all_posts')
mock_highlights = mocker.patch.object(self.extractor, 'download_all_highlights')
mock_tagged = mocker.patch.object(self.extractor, 'download_all_tagged')
mock_stories = mocker.patch.object(self.extractor, '_download_stories_reusable')
self.extractor.full_profile = True
mock_call.side_effect = [
mock_user_response,
mock_story_response
]
mock_highlights.return_value = None
mock_stories.return_value = mock_story_response
mock_posts.return_value = None
mock_tagged.return_value = None
self.extractor.full_profile = True
mock_call.side_effect = [
mock_user_response,
mock_story_response
]
mock_highlights.return_value = None
mock_stories.return_value = mock_story_response
mock_posts.return_value = None
mock_tagged.return_value = None
result = self.extractor.download_profile(metadata, "test_user")
assert result.get("#stories") == len(mock_story_response)
mock_posts.assert_called_once_with(result, "123")
assert "errors" not in result.metadata
result = self.extractor.download_profile(metadata, "test_user")
assert result.get("#stories") == len(mock_story_response)
mock_posts.assert_called_once_with(result, "123")
assert "errors" not in result.metadata
def test_download_profile_not_found(self, metadata):
def test_download_profile_not_found(self, metadata, mocker):
"""Test profile not found error"""
with patch.object(self.extractor, 'call_api') as mock_call:
mock_call.return_value = {"user": None}
with pytest.raises(AssertionError) as exc_info:
self.extractor.download_profile(metadata, "invalid_user")
assert "User invalid_user not found" in str(exc_info.value)
mock_call = mocker.patch.object(self.extractor, 'call_api')
mock_call.return_value = {"user": None}
with pytest.raises(AssertionError) as exc_info:
self.extractor.download_profile(metadata, "invalid_user")
assert "User invalid_user not found" in str(exc_info.value)
def test_download_profile_error_handling(self, metadata, mock_user_response):
def test_download_profile_error_handling(self, metadata, mock_user_response, mocker):
"""Test error handling in full profile mode"""
with (patch.object(self.extractor, 'call_api') as mock_call, \
patch.object(self.extractor, 'download_all_highlights') as mock_highlights, \
patch.object(self.extractor, 'download_all_tagged') as mock_tagged, \
patch.object(self.extractor, '_download_stories_reusable') as stories_tagged, \
patch.object(self.extractor, 'download_all_posts') as mock_posts
):
self.extractor.full_profile = True
mock_call.side_effect = [
mock_user_response,
Exception("Stories API failed"),
Exception("Posts API failed")
]
mock_highlights.return_value = None
mock_tagged.return_value = None
stories_tagged.return_value = None
mock_posts.return_value = None
result = self.extractor.download_profile(metadata, "test_user")
mock_call = mocker.patch.object(self.extractor, 'call_api')
mock_highlights = mocker.patch.object(self.extractor, 'download_all_highlights')
mock_tagged = mocker.patch.object(self.extractor, 'download_all_tagged')
stories_tagged = mocker.patch.object(self.extractor, '_download_stories_reusable')
mock_posts = mocker.patch.object(self.extractor, 'download_all_posts')
assert result.is_success()
assert "Error downloading stories for test_user" in result.metadata["errors"]
self.extractor.full_profile = True
mock_call.side_effect = [
mock_user_response,
Exception("Stories API failed"),
Exception("Posts API failed")
]
mock_highlights.return_value = None
mock_tagged.return_value = None
stories_tagged.return_value = None
mock_posts.return_value = None
result = self.extractor.download_profile(metadata, "test_user")
assert result.is_success()
assert "Error downloading stories for test_user" in result.metadata["errors"]

View File

@@ -1,94 +1,108 @@
import os
from typing import Type
from unittest.mock import patch, MagicMock
import pytest
from auto_archiver.core import Metadata
from auto_archiver.core.extractor import Extractor
from auto_archiver.modules.instagram_tbot_extractor import InstagramTbotExtractor
from tests.extractors.test_extractor_base import TestExtractorBase
TESTFILES = os.path.join(os.path.dirname(__file__), "testfiles")
@pytest.fixture
def session_file(tmpdir):
    """Create a mock ``test_session.session`` file and return its path without the extension.

    The extension-less base path is built first and the suffix appended when
    writing. Stripping ".session" from the full path afterwards with
    ``str.replace`` would also remove any ".session" occurring in the
    temporary directory path.
    """
    base_path = os.path.join(tmpdir, "test_session")
    with open(f"{base_path}.session", "w") as handle:
        handle.write("mock_session_data")
    return base_path
def patch_extractor_methods(request, setup_module, mocker):
mocker.patch.object(InstagramTbotExtractor, '_prepare_session_file', return_value=None)
mocker.patch.object(InstagramTbotExtractor, '_initialize_telegram_client', return_value=None)
yield
@pytest.fixture(autouse=True)
def patch_extractor_methods(request, setup_module):
with patch.object(InstagramTbotExtractor, '_prepare_session_file', return_value=None), \
patch.object(InstagramTbotExtractor, '_initialize_telegram_client', return_value=None):
if hasattr(request, 'cls') and hasattr(request.cls, 'config'):
request.cls.extractor = setup_module("instagram_tbot_extractor", request.cls.config)
yield
@pytest.fixture
def metadata_sample():
m = Metadata()
m.set_title("Test Title")
m.set_timestamp("2021-01-01T00:00:00Z")
m.set_timestamp("2021-01-01T00:00:00")
m.set_url("https://www.instagram.com/p/1234567890")
return m
class TestInstagramTbotExtractor:
@pytest.fixture
def mock_telegram_client(mocker):
"""Fixture to mock TelegramClient interactions."""
mock_client = mocker.patch("auto_archiver.modules.instagram_tbot_extractor.client")
instance = mocker.MagicMock()
mock_client.return_value = instance
return instance
@pytest.fixture
def extractor(setup_module, patch_extractor_methods, mocker):
extractor_module = "instagram_tbot_extractor"
extractor: InstagramTbotExtractor
config = {
"api_id": 12345,
"api_hash": "test_api_hash",
"session_file": "test_session",
"timeout": 4
}
extractor = setup_module(extractor_module, config)
extractor.client = mocker.MagicMock()
extractor.session_file = "test_session"
return extractor
def test_non_instagram_url(extractor, metadata_sample):
    """download() returns False for a YouTube URL."""
    metadata_sample.set_url("https://www.youtube.com")

    outcome = extractor.download(metadata_sample)

    assert outcome is False
def test_download_success(extractor, metadata_sample, mocker):
    """download() succeeds and uses the processed message text as the title."""
    sent = (mocker.MagicMock(), 101)
    mocker.patch.object(extractor, "_send_url_to_bot", return_value=sent)
    mocker.patch.object(extractor, "_process_messages", return_value="Sample Instagram post caption")

    outcome = extractor.download(metadata_sample)

    assert outcome.is_success()
    assert outcome.status == "insta-via-bot: success"
    assert outcome.metadata.get("title") == "Sample Instagram post caption"
def test_download_invalid(extractor, metadata_sample, mocker):
    """download() returns False when the processed text is the 'must enter a URL' message."""
    sent = (mocker.MagicMock(), 101)
    mocker.patch.object(extractor, "_send_url_to_bot", return_value=sent)
    mocker.patch.object(extractor, "_process_messages", return_value="You must enter a URL to a post")

    outcome = extractor.download(metadata_sample)

    assert outcome is False
@pytest.mark.skip(reason="Requires authentication.")
class TestInstagramTbotExtractorReal(TestExtractorBase):
# To run these tests set the TELEGRAM_API_ID and TELEGRAM_API_HASH environment variables, and ensure the session file exists.
# Note these are true at this point in time, but changes to source media could be reason for failure.
extractor_module = "instagram_tbot_extractor"
extractor: InstagramTbotExtractor
config = {
"api_id": os.environ.get("TELEGRAM_API_ID"),
"api_hash": os.environ.get("TELEGRAM_API_HASH"),
"session_file": "secrets/anon-insta",
}
@pytest.fixture
def mock_telegram_client(self):
"""Fixture to mock TelegramClient interactions."""
with patch("auto_archiver.modules.instagram_tbot_extractor._initialize_telegram_client") as mock_client:
instance = MagicMock()
mock_client.return_value = instance
yield instance
def test_extractor_is_initialized(self):
assert self.extractor is not None
@patch("time.sleep")
@pytest.mark.parametrize("url, expected_status, bot_responses", [
("https://www.instagram.com/p/C4QgLbrIKXG", "insta-via-bot: success", [MagicMock(id=101, media=None, message="Are you new to Bellingcat? - The way we share our investigations is different. 💭\nWe want you to read our story but also learn ou")]),
("https://www.instagram.com/reel/DEVLK8qoIbg/", "insta-via-bot: success", [MagicMock(id=101, media=None, message="Our volunteer community is at the centre of many incredible Bellingcat investigations and tools. Stephanie Ladel is one such vol")]),
# todo tbot not working for stories :(
("https://www.instagram.com/stories/bellingcatofficial/3556336382743057476/", False, [MagicMock(id=101, media=None, message="Media not found or unavailable")]),
("https://www.youtube.com/watch?v=ymCMy8OffHM", False, []),
("https://www.instagram.com/p/INVALID", False, [MagicMock(id=101, media=None, message="You must enter a URL to a post")]),
@pytest.mark.parametrize("url, expected_status, message, len_media", [
("https://www.instagram.com/p/C4QgLbrIKXG", "insta-via-bot: success",
"Are you new to Bellingcat? - The way we share our investigations is different. 💭\nWe want you to read our story but also learn ou",
6),
("https://www.instagram.com/reel/DEVLK8qoIbg/", "insta-via-bot: success",
"Our volunteer community is at the centre of many incredible Bellingcat investigations and tools. Stephanie Ladel is one such vol",
3),
# instagram tbot not working (potentially intermittently?) for stories - replace with a live story to retest
# ("https://www.instagram.com/stories/bellingcatofficial/3556336382743057476/", False, "Media not found or unavailable"),
# Seems to be working intermittently for highlights
# ("https://www.instagram.com/stories/highlights/17868810693068139/", "insta-via-bot: success", None, 50),
# Marking invalid url as success
("https://www.instagram.com/p/INVALID", "insta-via-bot: success", "Media not found or unavailable", 0),
("https://www.youtube.com/watch?v=ymCMy8OffHM", False, None, 0),
])
def test_download(self, mock_sleep, url, expected_status, bot_responses, metadata_sample):
def test_download(self, url, expected_status, message, len_media, metadata_sample):
"""Test the `download()` method with various Instagram URLs."""
metadata_sample.set_url(url)
self.extractor.client = MagicMock()
result = self.extractor.download(metadata_sample)
pass
# TODO fully mock or use as authenticated test
# if expected_status:
# assert result.is_success()
# assert result.status == expected_status
# assert result.metadata.get("title") in [msg.message[:128] for msg in bot_responses if msg.message]
# else:
# assert result is False
# Test story
# Test expired story
# Test requires login/ access (?)
# Test post
# Test multiple images?
if expected_status:
assert result.is_success()
assert result.status == expected_status
assert result.metadata.get("title") == message
assert len(result.media) == len_media
else:
assert result is False

View File

@@ -0,0 +1,108 @@
import pytest
from auto_archiver.modules.atlos_feeder import AtlosFeeder
class FakeAPIResponse:
    """Minimal stand-in for an HTTP response object.

    Exposes ``json()`` and ``raise_for_status()``.
    """

    def __init__(self, data: dict, raise_error: bool = False) -> None:
        """Store the JSON payload and whether ``raise_for_status`` should fail."""
        self.raise_error = raise_error
        self._data = data

    def json(self) -> dict:
        """Return the payload given at construction time."""
        return self._data

    def raise_for_status(self) -> None:
        """Raise ``Exception("HTTP error")`` if the response was built with ``raise_error``."""
        if not self.raise_error:
            return
        raise Exception("HTTP error")
@pytest.fixture
def atlos_feeder(setup_module) -> AtlosFeeder:
    """Build an ``atlos_feeder`` module through ``setup_module`` with a dummy token and URL."""
    return setup_module(
        "atlos_feeder",
        {
            "api_token": "abc123",
            "atlos_url": "https://platform.atlos.org",
        },
    )
@pytest.fixture
def mock_atlos_api(mocker):
    """Return a helper that makes ``requests.get`` serve the given payloads in order."""

    def _install(responses):
        # One FakeAPIResponse per payload; each requests.get call consumes the next one.
        queued = [FakeAPIResponse(payload) for payload in responses]
        mocker.patch("requests.get", side_effect=queued)

    return _install
def test_atlos_feeder_iter_yields_valid_metadata(atlos_feeder, mock_atlos_api):
    """Only the unprocessed item with a non-empty source URL is yielded."""

    def entry(source_url, item_id, processed):
        # Build one result record in the shape used throughout these tests.
        return {
            "source_url": source_url,
            "id": item_id,
            "metadata": {"auto_archiver": {"processed": processed}},
            "visibility": "visible",
            "status": "complete",
        }

    page = {
        "next": None,
        "results": [
            entry("http://example.com", 1, False),
            entry("", 2, False),  # empty source_url
            entry("http://example.org", 3, True),  # already processed
        ],
    }
    mock_atlos_api([page])

    yielded = list(atlos_feeder)

    assert len(yielded) == 1
    assert yielded[0].get_url() == "http://example.com"
    assert yielded[0].get("atlos_id") == 1
def test_atlos_feeder_multiple_pages(atlos_feeder, mock_atlos_api):
    """Items from two consecutive result pages are yielded in order."""

    def entry(source_url, item_id):
        # Build one unprocessed, visible, complete result record.
        return {
            "source_url": source_url,
            "id": item_id,
            "metadata": {"auto_archiver": {"processed": False}},
            "visibility": "visible",
            "status": "complete",
        }

    first_page = {"next": "cursor2", "results": [entry("http://example1.com", 10)]}
    second_page = {"next": None, "results": [entry("http://example2.com", 20)]}
    mock_atlos_api([first_page, second_page])

    yielded = list(atlos_feeder)

    assert len(yielded) == 2
    first, second = yielded
    assert first.get_url() == "http://example1.com"
    assert first.get("atlos_id") == 10
    assert second.get_url() == "http://example2.com"
    assert second.get("atlos_id") == 20
def test_atlos_feeder_no_results(atlos_feeder, mock_atlos_api):
    """An empty result page with no next cursor yields nothing."""
    empty_page = {"next": None, "results": []}
    mock_atlos_api([empty_page])

    yielded = list(atlos_feeder)

    assert yielded == []
def test_atlos_feeder_http_error(atlos_feeder, mocker):
    """Iterating the feeder propagates the error raised by raise_for_status()."""
    failing_response = FakeAPIResponse({"next": None, "results": []}, raise_error=True)
    mocker.patch("requests.get", return_value=failing_response)

    with pytest.raises(Exception, match="HTTP error"):
        list(atlos_feeder)

View File

@@ -2,27 +2,23 @@ from typing import Type
import gspread
import pytest
from unittest.mock import patch, MagicMock
from auto_archiver.modules.gsheet_feeder import GsheetsFeeder
from auto_archiver.core import Metadata, Feeder
def test_setup_without_sheet_and_sheet_id(setup_module):
def test_setup_without_sheet_and_sheet_id(setup_module, mocker):
# Ensure setup() raises AssertionError if neither sheet nor sheet_id is set.
with patch("gspread.service_account"):
with pytest.raises(AssertionError):
setup_module(
"gsheet_feeder",
{"service_account": "dummy.json", "sheet": None, "sheet_id": None},
)
mocker.patch("gspread.service_account")
with pytest.raises(AssertionError):
setup_module(
"gsheet_feeder",
{"service_account": "dummy.json", "sheet": None, "sheet_id": None},
)
@pytest.fixture
def gsheet_feeder(setup_module) -> GsheetsFeeder:
with patch("gspread.service_account"):
feeder = setup_module(
"gsheet_feeder",
{
def gsheet_feeder(setup_module, mocker) -> GsheetsFeeder:
config: dict = {
"service_account": "dummy.json",
"sheet": "test-auto-archiver",
"sheet_id": None,
@@ -46,9 +42,13 @@ def gsheet_feeder(setup_module) -> GsheetsFeeder:
"allow_worksheets": set(),
"block_worksheets": set(),
"use_sheet_names_in_stored_paths": True,
},
)
feeder.gsheets_client = MagicMock()
}
mocker.patch("gspread.service_account")
feeder = setup_module(
"gsheet_feeder",
config
)
feeder.gsheets_client = mocker.MagicMock()
return feeder
@@ -129,56 +129,56 @@ def test__set_metadata_with_folder(gsheet_feeder: GsheetsFeeder):
],
)
def test_open_sheet_with_name_or_id(
setup_module, sheet, sheet_id, expected_method, expected_arg, description
setup_module, sheet, sheet_id, expected_method, expected_arg, description, mocker
):
"""Ensure open_sheet() correctly opens by name or ID based on configuration."""
with patch("gspread.service_account") as mock_service_account:
mock_client = MagicMock()
mock_service_account.return_value = mock_client
mock_client.open.return_value = "MockSheet"
mock_client.open_by_key.return_value = "MockSheet"
mock_service_account = mocker.patch("gspread.service_account")
mock_client = mocker.MagicMock()
mock_service_account.return_value = mock_client
mock_client.open.return_value = "MockSheet"
mock_client.open_by_key.return_value = "MockSheet"
# Setup module with parameterized values
feeder = setup_module(
"gsheet_feeder",
{"service_account": "dummy.json", "sheet": sheet, "sheet_id": sheet_id},
)
sheet_result = feeder.open_sheet()
# Validate the correct method was called
getattr(mock_client, expected_method).assert_called_once_with(
expected_arg
), f"Failed: {description}"
assert sheet_result == "MockSheet", f"Failed: {description}"
# Setup module with parameterized values
feeder = setup_module(
"gsheet_feeder",
{"service_account": "dummy.json", "sheet": sheet, "sheet_id": sheet_id},
)
sheet_result = feeder.open_sheet()
# Validate the correct method was called
getattr(mock_client, expected_method).assert_called_once_with(
expected_arg
), f"Failed: {description}"
assert sheet_result == "MockSheet", f"Failed: {description}"
@pytest.mark.usefixtures("setup_module")
def test_open_sheet_with_sheet_id(setup_module):
def test_open_sheet_with_sheet_id(setup_module, mocker):
"""Ensure open_sheet() correctly opens a sheet by ID."""
with patch("gspread.service_account") as mock_service_account:
mock_client = MagicMock()
mock_service_account.return_value = mock_client
mock_client.open_by_key.return_value = "MockSheet"
feeder = setup_module(
"gsheet_feeder",
{"service_account": "dummy.json", "sheet": None, "sheet_id": "ABC123"},
)
sheet = feeder.open_sheet()
mock_client.open_by_key.assert_called_once_with("ABC123")
assert sheet == "MockSheet"
mock_service_account = mocker.patch("gspread.service_account")
mock_client = mocker.MagicMock()
mock_service_account.return_value = mock_client
mock_client.open_by_key.return_value = "MockSheet"
feeder = setup_module(
"gsheet_feeder",
{"service_account": "dummy.json", "sheet": None, "sheet_id": "ABC123"},
)
sheet = feeder.open_sheet()
mock_client.open_by_key.assert_called_once_with("ABC123")
assert sheet == "MockSheet"
def test_should_process_sheet(setup_module):
with patch("gspread.service_account"):
gdb = setup_module(
"gsheet_feeder",
{
"service_account": "dummy.json",
"sheet": "TestSheet",
"sheet_id": None,
"allow_worksheets": {"TestSheet", "Sheet2"},
"block_worksheets": {"Sheet3"},
},
)
def test_should_process_sheet(setup_module, mocker):
mocker.patch("gspread.service_account")
gdb = setup_module(
"gsheet_feeder",
{
"service_account": "dummy.json",
"sheet": "TestSheet",
"sheet_id": None,
"allow_worksheets": {"TestSheet", "Sheet2"},
"block_worksheets": {"Sheet3"},
},
)
assert gdb.should_process_sheet("TestSheet") == True
assert gdb.should_process_sheet("Sheet3") == False
# False if allow_worksheets is set

View File

@@ -1,13 +1,13 @@
# Note this isn't a feeder, but contained as utility of the gsheet feeder module
import pytest
from unittest.mock import MagicMock
from auto_archiver.modules.gsheet_feeder import GWorksheet
class TestGWorksheet:
@pytest.fixture
def mock_worksheet(self):
mock_ws = MagicMock()
def mock_worksheet(self, mocker):
mock_ws = mocker.MagicMock()
mock_ws.get_values.return_value = [
["Link", "Archive Status", "Archive Location", "Archive Date"],
["url1", "archived", "filepath1", "2023-01-01"],
@@ -136,8 +136,8 @@ class TestGWorksheet:
assert gworksheet.to_a1(row, col) == expected
# Test empty worksheet
def test_empty_worksheet_initialization(self):
mock_ws = MagicMock()
def test_empty_worksheet_initialization(self, mocker):
mock_ws = mocker.MagicMock()
mock_ws.get_values.return_value = []
g = GWorksheet(mock_ws)
assert g.headers == []

View File

@@ -1,6 +1,5 @@
from typing import Type
import pytest
from unittest.mock import MagicMock, patch
from auto_archiver.core import Media
from auto_archiver.modules.s3_storage import S3Storage
@@ -11,7 +10,6 @@ class TestS3Storage:
"""
module_name: str = "s3_storage"
storage: Type[S3Storage]
s3: MagicMock
config: dict = {
"path_generator": "flat",
"filename_generator": "static",
@@ -25,13 +23,14 @@ class TestS3Storage:
"private": False,
}
@patch('boto3.client')
@pytest.fixture(autouse=True)
def setup_storage(self, setup_module):
def setup_storage(self, setup_module, mocker):
self.s3 = S3Storage()
self.storage = setup_module(self.module_name, self.config)
def test_client_initialization(self):
"""Test that S3 client is initialized with correct parameters"""
assert self.storage.s3 is not None
assert self.storage.s3.meta.region_name == 'test-region'
@@ -44,81 +43,63 @@ class TestS3Storage:
media.key = "another/path.jpg"
assert self.storage.get_cdn_url(media) == "https://cdn.example.com/another/path.jpg"
def test_uploadf_sets_acl_public(self):
def test_uploadf_sets_acl_public(self, mocker):
media = Media("test.txt")
mock_file = MagicMock()
with patch.object(self.storage.s3, 'upload_fileobj') as mock_s3_upload, \
patch.object(self.storage, 'is_upload_needed', return_value=True):
self.storage.uploadf(mock_file, media)
mock_s3_upload.assert_called_once_with(
mock_file,
Bucket='test-bucket',
Key=media.key,
ExtraArgs={'ACL': 'public-read', 'ContentType': 'text/plain'}
)
mock_file = mocker.MagicMock()
mock_s3_upload = mocker.patch.object(self.storage.s3, 'upload_fileobj')
mocker.patch.object(self.storage, 'is_upload_needed', return_value=True)
self.storage.uploadf(mock_file, media)
mock_s3_upload.assert_called_once_with(
mock_file,
Bucket='test-bucket',
Key=media.key,
ExtraArgs={'ACL': 'public-read', 'ContentType': 'text/plain'}
)
def test_upload_decision_logic(self):
def test_upload_decision_logic(self, mocker):
"""Test is_upload_needed under different conditions"""
media = Media("test.txt")
# Test default state (random_no_duplicate=False)
assert self.storage.is_upload_needed(media) is True
# Set duplicate checking config to true:
self.storage.random_no_duplicate = True
with patch('auto_archiver.modules.s3_storage.s3_storage.calculate_file_hash') as mock_calc_hash, \
patch.object(self.storage, 'file_in_folder') as mock_file_in_folder:
mock_calc_hash.return_value = 'beepboop123beepboop123beepboop123'
mock_file_in_folder.return_value = 'existing_key.txt'
# Test duplicate result
assert self.storage.is_upload_needed(media) is False
assert media.key == 'existing_key.txt'
mock_file_in_folder.assert_called_with(
# (first 24 chars of hash)
'no-dups/beepboop123beepboop123be'
)
mock_calc_hash = mocker.patch('auto_archiver.modules.s3_storage.s3_storage.calculate_file_hash', return_value='beepboop123beepboop123beepboop123')
mock_file_in_folder = mocker.patch.object(self.storage, 'file_in_folder', return_value='existing_key.txt')
assert self.storage.is_upload_needed(media) is False
assert media.key == 'existing_key.txt'
mock_file_in_folder.assert_called_with('no-dups/beepboop123beepboop123be')
@patch.object(S3Storage, 'file_in_folder')
def test_skips_upload_when_duplicate_exists(self, mock_file_in_folder):
def test_skips_upload_when_duplicate_exists(self, mocker):
"""Test that upload skips when file_in_folder finds existing object"""
self.storage.random_no_duplicate = True
mock_file_in_folder.return_value = "existing_folder/existing_file.txt"
# Create test media with calculated hash
mock_file_in_folder = mocker.patch.object(S3Storage, 'file_in_folder', return_value="existing_folder/existing_file.txt")
media = Media("test.txt")
media.key = "original_path.txt"
with patch('auto_archiver.modules.s3_storage.s3_storage.calculate_file_hash') as mock_calculate_hash:
mock_calculate_hash.return_value = "beepboop123beepboop123beepboop123"
# Verify upload
assert self.storage.is_upload_needed(media) is False
assert media.key == "existing_folder/existing_file.txt"
assert media.get("previously archived") is True
with patch.object(self.storage.s3, 'upload_fileobj') as mock_upload:
result = self.storage.uploadf(None, media)
mock_upload.assert_not_called()
assert result is True
mock_calculate_hash = mocker.patch('auto_archiver.modules.s3_storage.s3_storage.calculate_file_hash', return_value="beepboop123beepboop123beepboop123")
assert self.storage.is_upload_needed(media) is False
assert media.key == "existing_folder/existing_file.txt"
assert media.get("previously archived") is True
mock_upload = mocker.patch.object(self.storage.s3, 'upload_fileobj')
result = self.storage.uploadf(None, media)
mock_upload.assert_not_called()
assert result is True
@patch.object(S3Storage, 'is_upload_needed')
def test_uploads_with_correct_parameters(self, mock_upload_needed):
def test_uploads_with_correct_parameters(self, mocker):
media = Media("test.txt")
media.key = "original_key.txt"
mock_upload_needed.return_value = True
mocker.patch.object(S3Storage, 'is_upload_needed', return_value=True)
media.mimetype = 'image/png'
mock_file = MagicMock()
mock_file = mocker.MagicMock()
mock_upload = mocker.patch.object(self.storage.s3, 'upload_fileobj')
self.storage.uploadf(mock_file, media)
mock_upload.assert_called_once_with(
mock_file,
Bucket='test-bucket',
Key='original_key.txt',
ExtraArgs={
'ACL': 'public-read',
'ContentType': 'image/png'
}
)
with patch.object(self.storage.s3, 'upload_fileobj') as mock_upload:
self.storage.uploadf(mock_file, media)
# verify call occured with these params
mock_upload.assert_called_once_with(
mock_file,
Bucket='test-bucket',
Key='original_key.txt',
ExtraArgs={
'ACL': 'public-read',
'ContentType': 'image/png'
}
)
def test_file_in_folder_exists(self):
with patch.object(self.storage.s3, 'list_objects') as mock_list_objects:
mock_list_objects.return_value = {'Contents': [{'Key': 'path/to/file.txt'}]}
assert self.storage.file_in_folder('path/to/') == 'path/to/file.txt'
def test_file_in_folder_exists(self, mocker):
mock_list_objects = mocker.patch.object(self.storage.s3, 'list_objects', return_value={'Contents': [{'Key': 'path/to/file.txt'}]})
assert self.storage.file_in_folder('path/to/') == 'path/to/file.txt'

View File

@@ -0,0 +1,142 @@
import os
import hashlib
import pytest
from auto_archiver.core import Media, Metadata
from auto_archiver.modules.atlos_storage import AtlosStorage
class FakeAPIResponse:
    """Minimal stand-in for an HTTP response object.

    Only the two methods the tests rely on are provided: ``json()`` and
    ``raise_for_status()``.
    """

    def __init__(self, data: dict, raise_error: bool = False) -> None:
        # Whether raise_for_status() should simulate a failed request.
        self.raise_error = raise_error
        # Payload handed back verbatim by json().
        self._data = data

    def json(self) -> dict:
        """Return the payload supplied at construction time."""
        return self._data

    def raise_for_status(self) -> None:
        """Raise ``Exception("HTTP error")`` if configured to fail, else do nothing."""
        if not self.raise_error:
            return
        raise Exception("HTTP error")
@pytest.fixture
def atlos_storage(setup_module) -> AtlosStorage:
    """Build the ``atlos_storage`` module with a dummy token and URL."""
    return setup_module(
        "atlos_storage",
        {
            "api_token": "abc123",
            "atlos_url": "https://platform.atlos.org",
        },
    )
@pytest.fixture
def media(tmp_path) -> Media:
    """Return a Media whose file is a real temporary file containing ``b"media content"``."""
    source = tmp_path / "media.txt"
    source.write_bytes(b"media content")

    item = Media(filename=str(source))
    item.properties = {"something": "Title"}
    item.key = "key"
    return item
def test_get_cdn_url(atlos_storage: AtlosStorage) -> None:
    """get_cdn_url() hands back the storage's configured atlos_url."""
    dummy = Media(filename="dummy.mp4")
    assert atlos_storage.get_cdn_url(dummy) == atlos_storage.atlos_url
def test_hash(tmp_path, atlos_storage: AtlosStorage) -> None:
    """_hash() matches the SHA-256 hex digest of the media file's bytes."""
    payload = b"hello world"
    target = tmp_path / "test.txt"
    target.write_bytes(payload)

    # Construct with a placeholder name, then point the media at the real file.
    item = Media(filename="dummy.mp4")
    item.filename = str(target)

    digest = hashlib.sha256(payload).hexdigest()
    assert atlos_storage._hash(item) == digest
def test_upload_no_atlos_id(tmp_path, atlos_storage: AtlosStorage, media: Media, mocker) -> None:
    """upload() returns False and never POSTs when metadata has no atlos_id."""
    post_spy = mocker.patch("requests.post")
    bare_metadata = Metadata()  # deliberately left without an atlos_id

    outcome = atlos_storage.upload(media, bare_metadata)

    assert outcome is False
    post_spy.assert_not_called()
def test_upload_already_uploaded(atlos_storage: AtlosStorage,
                                 metadata: Metadata,
                                 media: Media,
                                 tmp_path,
                                 mocker) -> None:
    """upload() returns True without POSTing when the file's hash is already listed."""
    metadata.set("atlos_id", 101)

    # Hash of the same bytes the media file contains, so the GET response
    # advertises an artifact identical to the one being uploaded.
    known_hash = hashlib.sha256(b"media content").hexdigest()
    listing = FakeAPIResponse({
        "result": {"artifacts": [{"file_hash_sha256": known_hash}]}
    })
    get_spy = mocker.patch("requests.get", return_value=listing)
    post_spy = mocker.patch("requests.post")

    outcome = atlos_storage.upload(media, metadata)

    assert outcome is True
    get_spy.assert_called_once()
    post_spy.assert_not_called()
def test_upload_not_uploaded(tmp_path, atlos_storage: AtlosStorage,
                             metadata: Metadata,
                             media: Media,
                             mocker) -> None:
    """upload() POSTs the file when no listed artifact has a matching hash."""
    metadata.set("atlos_id", 202)
    listing = FakeAPIResponse({
        "result": {"artifacts": [{"file_hash_sha256": "different_hash"}]}
    })
    get_spy = mocker.patch("requests.get", return_value=listing)
    post_spy = mocker.patch(
        "requests.post",
        return_value=FakeAPIResponse({}, raise_error=False),
    )

    outcome = atlos_storage.upload(media, metadata)

    assert outcome is True
    get_spy.assert_called_once()
    post_spy.assert_called_once()

    sent = post_spy.call_args
    sent_kwargs = sent.kwargs
    assert sent_kwargs["headers"] == {"Authorization": f"Bearer {atlos_storage.api_token}"}
    assert sent_kwargs["params"] == {"title": media.properties}

    # The URL may have been passed by keyword or as the first positional argument.
    target_url = sent_kwargs.get("url") or sent.args[0]
    assert target_url == f"{atlos_storage.atlos_url}/api/v2/source_material/upload/202"

    # The first element of the "file" tuple is the uploaded file's base name.
    uploaded_name = sent_kwargs["files"]["file"][0]
    assert uploaded_name == os.path.basename(media.filename)
def test_upload_post_http_error(tmp_path,
                                atlos_storage: AtlosStorage,
                                metadata: Metadata,
                                media: Media,
                                mocker) -> None:
    """An HTTP error raised by the POST response propagates out of upload()."""
    metadata.set("atlos_id", 303)

    # GET reports no existing artifacts, so upload() proceeds to the POST.
    empty_listing = FakeAPIResponse({
        "result": {"artifacts": []}
    })
    mocker.patch("requests.get", return_value=empty_listing)
    mocker.patch("requests.post", return_value=FakeAPIResponse({}, raise_error=True))

    with pytest.raises(Exception, match="HTTP error"):
        atlos_storage.upload(media, metadata)
def test_uploadf_not_implemented(atlos_storage: AtlosStorage) -> None:
    """uploadf() is a no-op for this storage and returns None."""
    assert atlos_storage.uploadf(None, "dummy") is None

View File

@@ -1,44 +1,57 @@
from typing import Type
import pytest
from unittest.mock import MagicMock, patch
from oauth2client import service_account
from auto_archiver.core import Media
from auto_archiver.modules.gdrive_storage import GDriveStorage
from auto_archiver.core.metadata import Metadata
from tests.storages.test_storage_base import TestStorageBase
class TestGDriveStorage:
"""
Test suite for GDriveStorage.
"""
@pytest.fixture
def gdrive_storage(setup_module, mocker):
module_name: str = "gdrive_storage"
storage: Type[GDriveStorage]
storage: GDriveStorage
config: dict = {'path_generator': 'url',
'filename_generator': 'static',
'root_folder_id': "fake_root_folder_id",
'oauth_token': None,
'service_account': 'fake_service_account.json'
}
@pytest.fixture(autouse=True)
def gdrive(self, setup_module):
with patch('google.oauth2.service_account.Credentials.from_service_account_file') as mock_creds:
self.storage = setup_module(self.module_name, self.config)
def test_initialize_fails_with_non_existent_creds(self):
"""
Test that the Google Drive service raises a FileNotFoundError when the service account file does not exist.
"""
# Act and Assert
with pytest.raises(FileNotFoundError) as exc_info:
self.storage.setup()
assert "No such file or directory" in str(exc_info.value)
mocker.patch('google.oauth2.service_account.Credentials.from_service_account_file')
return setup_module(module_name, config)
def test_path_parts(self):
media = Media(filename="test.jpg")
media.key = "folder1/folder2/test.jpg"
def test_initialize_fails_with_non_existent_creds(setup_module):
"""Test that the Google Drive service raises a FileNotFoundError when the service account file does not exist.
(and isn't mocked)
"""
config: dict = {'path_generator': 'url',
'filename_generator': 'static',
'root_folder_id': "fake_root_folder_id",
'oauth_token': None,
'service_account': 'fake_service_account.json'
}
with pytest.raises(FileNotFoundError) as exc_info:
setup_module("gdrive_storage", config)
assert "No such file or directory" in str(exc_info.value)
def test_get_id_from_parent_and_name(gdrive_storage, mocker):
"""Test _get_id_from_parent_and_name returns correct id from an API result."""
fake_list = mocker.MagicMock()
fake_list.execute.return_value = {"files": [{"id": "123", "name": "testname"}]}
fake_service = mocker.MagicMock()
# mock the files.list return value
fake_service.files.return_value.list.return_value = fake_list
gdrive_storage.service = fake_service
result = gdrive_storage._get_id_from_parent_and_name("parent", "mock", retries=1, use_mime_type=False)
assert result == "123"
def test_path_parts():
media = Media(filename="test.jpg")
media.key = "folder1/folder2/test.jpg"
@pytest.mark.skip(reason="Requires real credentials")

View File

@@ -0,0 +1,54 @@
import os
from pathlib import Path
import pytest
from auto_archiver.core import Media
from auto_archiver.modules.local_storage import LocalStorage
@pytest.fixture
def local_storage(setup_module, tmp_path) -> LocalStorage:
    """Build the ``local_storage`` module with an isolated output directory.

    ``save_to`` points inside pytest's per-test ``tmp_path`` rather than
    ``./local_archive``, so tests that call ``upload()`` do not leave files
    in the current working directory between runs.
    """
    configs: dict = {
        "path_generator": "flat",
        "filename_generator": "static",
        "save_to": str(tmp_path / "local_archive"),
        "save_absolute": False,
    }
    return setup_module("local_storage", configs)
@pytest.fixture
def sample_media(tmp_path) -> Media:
    """Return a Media keyed ``subdir/test.txt`` whose source is a temporary text file."""
    source = tmp_path / "source.txt"
    source.write_text("test content")
    return Media(key="subdir/test.txt", filename=str(source))
def test_get_cdn_url_relative(local_storage):
    """With save_absolute off, the URL is save_to joined with the media key."""
    item = Media(key="test.txt", filename="dummy.txt")
    wanted = os.path.join(local_storage.save_to, item.key)
    assert local_storage.get_cdn_url(item) == wanted
def test_get_cdn_url_absolute(local_storage):
    """With save_absolute on, the URL is the absolute form of the joined path."""
    item = Media(key="test.txt", filename="dummy.txt")
    local_storage.save_absolute = True
    joined = os.path.join(local_storage.save_to, item.key)
    assert local_storage.get_cdn_url(item) == os.path.abspath(joined)
def test_upload_file_contents_and_metadata(local_storage, sample_media):
    """upload() reports success and the destination text equals the source text."""
    destination = Path(os.path.join(local_storage.save_to, sample_media.key))
    uploaded = local_storage.upload(sample_media)
    assert uploaded is True
    source_text = Path(sample_media.filename).read_text()
    assert source_text == destination.read_text()
def test_upload_nonexistent_source(local_storage):
    """Uploading a Media whose source file is missing raises FileNotFoundError."""
    missing = Media(key="missing.txt", filename="nonexistent.txt")
    with pytest.raises(FileNotFoundError):
        local_storage.upload(missing)

View File

@@ -162,4 +162,25 @@ def test_get_context():
def test_choose_most_complete():
pass
m_more = Metadata()
m_more.set_title("Title 1")
m_more.set_content("Content 1")
m_more.set_url("https://example.com")
m_less = Metadata()
m_less.set_title("Title 2")
m_less.set_content("Content 2")
m_less.set_url("https://example.com")
m_less.set_context("key", "value")
res = Metadata.choose_most_complete([m_more, m_less])
assert res.metadata.get("title") == "Title 1"
def test_choose_most_complete_from_pickles(unpickle):
    """The post-enrichment pickle wins over the pre-enrichment one and an empty Metadata."""
    # Pickled snapshots taken before and after an enricher has run.
    # Only the media of the winner is compared, not the whole object.
    before = unpickle("metadata_enricher_ytshort_input.pickle")
    after = unpickle("metadata_enricher_ytshort_expected.pickle")

    # An empty Metadata goes first because the implementation iterates
    # `for r in results[1:]:`.
    candidates = [Metadata(), after, before]
    chosen = Metadata.choose_most_complete(candidates)
    assert chosen.media == after.media

View File

@@ -1,24 +1,18 @@
import sys
import pytest
from auto_archiver.core.module import get_module_lazy, BaseModule, LazyBaseModule, _LAZY_LOADED_MODULES
from auto_archiver.core.module import ModuleFactory, LazyBaseModule
from auto_archiver.core.base_module import BaseModule
@pytest.fixture
def example_module():
import auto_archiver
module_factory = ModuleFactory()
previous_path = auto_archiver.modules.__path__
auto_archiver.modules.__path__.append("tests/data/test_modules/")
module = get_module_lazy("example_module")
yield module
# cleanup
try:
del module._manifest
except AttributeError:
pass
del _LAZY_LOADED_MODULES["example_module"]
sys.modules.pop("auto_archiver.modules.example_module.example_module", None)
auto_archiver.modules.__path__ = previous_path
return module_factory.get_module_lazy("example_module")
def test_get_module_lazy(example_module):
assert example_module.name == "example_module"
@@ -46,12 +40,14 @@ def test_module_dependency_check_loads_module(example_module):
# monkey patch the manifest to include a nonexistent dependency
example_module.manifest["dependencies"]["python"] = ["hash_enricher"]
module_factory = example_module.module_factory
loaded_module = example_module.load({})
assert loaded_module is not None
# check the dependency is loaded
assert _LAZY_LOADED_MODULES["hash_enricher"] is not None
assert _LAZY_LOADED_MODULES["hash_enricher"]._instance is not None
assert module_factory._lazy_modules["hash_enricher"] is not None
assert module_factory._lazy_modules["hash_enricher"]._instance is not None
def test_load_module(example_module):
@@ -69,7 +65,7 @@ def test_load_module(example_module):
@pytest.mark.parametrize("module_name", ["local_storage", "generic_extractor", "html_formatter", "csv_db"])
def test_load_modules(module_name):
# test that specific modules can be loaded
module = get_module_lazy(module_name)
module = ModuleFactory().get_module_lazy(module_name)
assert module is not None
assert isinstance(module, LazyBaseModule)
assert module.name == module_name
@@ -86,7 +82,7 @@ def test_load_modules(module_name):
@pytest.mark.parametrize("module_name", ["local_storage", "generic_extractor", "html_formatter", "csv_db"])
def test_lazy_base_module(module_name):
lazy_module = get_module_lazy(module_name)
lazy_module = ModuleFactory().get_module_lazy(module_name)
assert lazy_module is not None
assert isinstance(lazy_module, LazyBaseModule)

View File

@@ -4,7 +4,7 @@ from argparse import ArgumentParser, ArgumentTypeError
from auto_archiver.core.orchestrator import ArchivingOrchestrator
from auto_archiver.version import __version__
from auto_archiver.core.config import read_yaml, store_yaml
from auto_archiver.core.module import _LAZY_LOADED_MODULES
TEST_ORCHESTRATION = "tests/data/test_orchestration.yaml"
TEST_MODULES = "tests/data/test_modules/"
@@ -17,22 +17,7 @@ def test_args():
@pytest.fixture
def orchestrator():
yield ArchivingOrchestrator()
# hack - the loguru logger starts with one logger, but if orchestrator has run before
# it'll remove the default logger, add it back in:
from loguru import logger
if not logger._core.handlers.get(0):
logger._core.handlers_count = 0
logger.add(sys.stderr)
# and remove the custom logger
if logger._core.handlers.get(1):
logger.remove(1)
# delete out any loaded modules
_LAZY_LOADED_MODULES.clear()
return ArchivingOrchestrator()
@pytest.fixture
def basic_parser(orchestrator) -> ArgumentParser:

144
tests/utils/test_misc.py Normal file
View File

@@ -0,0 +1,144 @@
import hashlib
import json
from datetime import datetime, timezone
import pytest
from auto_archiver.utils.misc import (
mkdir_if_not_exists,
expand_url,
getattr_or,
DateTimeEncoder,
dump_payload,
get_datetime_from_str,
update_nested_dict,
calculate_file_hash,
random_str,
get_timestamp
)
@pytest.fixture
def sample_file(tmp_path):
    """Create ``test.txt`` containing "test content" in a temp dir and return its path."""
    target = tmp_path / "test.txt"
    target.write_text("test content")
    return target
class TestDirectoryUtils:
    """Checks for mkdir_if_not_exists."""

    def test_mkdir_creates_new_directory(self, tmp_path):
        """A missing directory is created."""
        target = tmp_path / "new_folder"
        mkdir_if_not_exists(target)
        assert target.exists()
        assert target.is_dir()

    def test_mkdir_exists_quietly(self, tmp_path):
        """Calling it on an existing directory does not raise."""
        target = tmp_path / "existing"
        target.mkdir()
        mkdir_if_not_exists(target)
        assert target.exists()
class TestURLExpansion:
    """Checks for expand_url with requests.get mocked out."""

    @pytest.mark.parametrize("input_url,expected", [
        ("https://example.com", "https://example.com"),
        ("https://t.co/test", "https://expanded.url")
    ])
    def test_expand_url(self, input_url, expected, mocker):
        """Each input URL maps to its expected output given the mocked response."""
        fake_response = mocker.Mock()
        fake_response.url = "https://expanded.url"
        mocker.patch('requests.get', return_value=fake_response)

        assert expand_url(input_url) == expected

    def test_expand_url_handles_errors(self, caplog, mocker):
        """When the request raises, the original URL is returned and the failure is logged."""
        mocker.patch('requests.get', side_effect=Exception("Connection error"))
        url = "https://t.co/error"

        assert expand_url(url) == url
        assert f"Failed to expand url {url}" in caplog.text
class TestAttributeHandling:
    """Checks for getattr_or."""

    class Sample:
        """Object with one set attribute and one attribute explicitly set to None."""
        exists = "value"
        none = None

    @pytest.mark.parametrize("obj,attr,default,expected", [
        (Sample(), "exists", "default", "value"),
        (Sample(), "none", "default", "default"),
        (Sample(), "missing", "default", "default"),
        (None, "anything", "fallback", "fallback"),
    ])
    def test_getattr_or(self, obj, attr, default, expected):
        """The attribute value is returned, or the default in each fallback case."""
        outcome = getattr_or(obj, attr, default)
        assert outcome == expected
class TestDateTimeHandling:
    """Checks for DateTimeEncoder, dump_payload and get_datetime_from_str."""

    def test_datetime_encoder(self, sample_datetime):
        """A datetime is serialised as its str() form."""
        encoded = json.dumps({"dt": sample_datetime}, cls=DateTimeEncoder)
        decoded = json.loads(encoded)
        assert decoded["dt"] == str(sample_datetime)

    def test_dump_payload(self, sample_datetime):
        """The dumped payload contains the datetime's str() form."""
        dumped = dump_payload({"timestamp": sample_datetime})
        assert str(sample_datetime) in dumped

    @pytest.mark.parametrize("dt_str,fmt,expected", [
        ("2023-01-01 12:00:00+00:00", None, datetime(2023, 1, 1, 12, 0, tzinfo=timezone.utc)),
        ("20230101 120000", "%Y%m%d %H%M%S", datetime(2023, 1, 1, 12, 0)),
        ("invalid", None, None),
    ])
    def test_datetime_from_string(self, dt_str, fmt, expected):
        """Parsing returns the expected datetime, or None for an unparseable string."""
        parsed = get_datetime_from_str(dt_str, fmt)
        if expected is not None:
            # Compare wall-clock fields only: the expected value takes on
            # whatever tzinfo the parsed result carries.
            assert parsed == expected.replace(tzinfo=parsed.tzinfo)
        else:
            assert parsed is None
class TestDictUtils:
    """Checks for update_nested_dict."""

    @pytest.mark.parametrize("original,update,expected", [
        ({"a": 1}, {"b": 2}, {"a": 1, "b": 2}),
        ({"nested": {"a": 1}}, {"nested": {"b": 2}}, {"nested": {"a": 1, "b": 2}}),
        ({"a": {"b": {"c": 1}}}, {"a": {"b": {"c": 2}}}, {"a": {"b": {"c": 2}}}),
    ])
    def test_update_nested_dict(self, original, update, expected):
        """The original dict is updated in place to the expected merged result."""
        update_nested_dict(original, update)
        assert original == expected
class TestHashingUtils:
    """Checks that calculate_file_hash agrees with hashlib's SHA-256."""

    def test_file_hashing(self, sample_file):
        """A small text file hashes to the SHA-256 of its bytes."""
        digest = hashlib.sha256(b"test content").hexdigest()
        assert calculate_file_hash(str(sample_file)) == digest

    def test_large_file_hashing(self, tmp_path):
        """A 32MB file hashes to the SHA-256 of its bytes."""
        payload = b"0" * 16_000_000 * 2  # 32MB
        target = tmp_path / "large.bin"
        target.write_bytes(payload)

        digest = hashlib.sha256(payload).hexdigest()
        assert calculate_file_hash(str(target)) == digest
class TestMiscUtils:
    """Checks for random_str and get_timestamp."""

    def test_random_str_length(self):
        """The returned string has exactly the requested length."""
        for wanted in (8, 16, 32):
            generated = random_str(wanted)
            assert len(generated) == wanted

    def test_random_str_raises_too_long(self):
        """Requesting 64 characters raises AssertionError with a fixed message."""
        with pytest.raises(AssertionError) as exc_info:
            random_str(64)
        message = str(exc_info.value)
        assert "length must be less than 32 as UUID4 is used" == message

    def test_random_str_uniqueness(self):
        """Two consecutive calls return different strings."""
        first = random_str()
        second = random_str()
        assert first != second

    @pytest.mark.parametrize("ts_input,utc,iso,expected_type", [
        (datetime.now(), True, True, str),
        ("2023-01-01T12:00:00+00:00", False, False, datetime),
        (1672574400, True, True, str),
    ])
    def test_timestamp_parsing(self, ts_input, utc, iso, expected_type):
        """The result type is str or datetime as expected for each input and flag combination."""
        parsed = get_timestamp(ts_input, utc=utc, iso=iso)
        assert isinstance(parsed, expected_type)

    def test_invalid_timestamp_returns_none(self):
        """An unparseable string gives None."""
        assert get_timestamp("invalid-date") is None