mirror of
https://github.com/bellingcat/auto-archiver.git
synced 2026-06-08 03:18:28 +03:00
Merge branch 'main' into timestamping_rewrite
This commit is contained in:
@@ -3,12 +3,14 @@ pytest conftest file, for shared fixtures and configuration
|
||||
"""
|
||||
import os
|
||||
import pickle
|
||||
from datetime import datetime, timezone
|
||||
from tempfile import TemporaryDirectory
|
||||
from typing import Dict, Tuple
|
||||
import hashlib
|
||||
|
||||
import pytest
|
||||
from auto_archiver.core.metadata import Metadata
|
||||
from auto_archiver.core.module import get_module, _LAZY_LOADED_MODULES
|
||||
from auto_archiver.core.module import ModuleFactory
|
||||
|
||||
# Test names inserted into this list will be run last. This is useful for expensive/costly tests
|
||||
# that you only want to run if everything else succeeds (e.g. API calls). The order here is important
|
||||
@@ -20,19 +22,19 @@ TESTS_TO_RUN_LAST = ['test_twitter_api_archiver']
|
||||
def setup_module(request):
|
||||
def _setup_module(module_name, config={}):
|
||||
|
||||
module_factory = ModuleFactory()
|
||||
|
||||
if isinstance(module_name, type):
|
||||
# get the module name:
|
||||
# if the class does not have a .name, use the name of the parent folder
|
||||
module_name = module_name.__module__.rsplit(".",2)[-2]
|
||||
|
||||
m = get_module(module_name, {module_name: config})
|
||||
|
||||
m = module_factory.get_module(module_name, {module_name: config})
|
||||
# add the tmp_dir to the module
|
||||
tmp_dir = TemporaryDirectory()
|
||||
m.tmp_dir = tmp_dir.name
|
||||
|
||||
|
||||
def cleanup():
|
||||
_LAZY_LOADED_MODULES.pop(module_name)
|
||||
tmp_dir.cleanup()
|
||||
request.addfinalizer(cleanup)
|
||||
|
||||
@@ -122,10 +124,36 @@ def pytest_runtest_setup(item):
|
||||
def unpickle():
|
||||
"""
|
||||
Returns a helper function that unpickles a file
|
||||
** gets the file from the test_files directory: tests/data/test_files **
|
||||
** gets the file from the test_files directory: tests/data/ **
|
||||
"""
|
||||
def _unpickle(path):
|
||||
test_data_dir = os.path.join(os.path.dirname(__file__), "data", "test_files")
|
||||
with open(os.path.join(test_data_dir, path), "rb") as f:
|
||||
with open(os.path.join("tests/data", path), "rb") as f:
|
||||
return pickle.load(f)
|
||||
return _unpickle
|
||||
return _unpickle
|
||||
|
||||
|
||||
@pytest.fixture
def mock_binary_dependencies(mocker):
    """Patch ``shutil.which`` so that every binary lookup resolves to a fake path.

    Returns the mock that stands in for ``shutil.which``.
    """
    # Any binary asked for is reported as installed at this location.
    return mocker.patch("shutil.which", return_value="/usr/bin/fake_binary")
|
||||
|
||||
|
||||
@pytest.fixture
def sample_datetime():
    """Provide a fixed, timezone-aware (UTC) datetime: 1 January 2023, 12:00."""
    fixed_moment = datetime(year=2023, month=1, day=1, hour=12, minute=0, tzinfo=timezone.utc)
    return fixed_moment
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
def mock_sleep(mocker):
    """Replace ``time.sleep`` with a mock for every test so nothing actually waits.

    Returns the mock, so a test can inspect the sleep calls that were made.
    """
    sleep_mock = mocker.patch("time.sleep")
    return sleep_mock
|
||||
|
||||
|
||||
@pytest.fixture
def metadata():
    """Build a Metadata item with a fixed ``_processed_at`` value and an example URL."""
    item = Metadata()
    item.set("_processed_at", "2021-01-01T00:00:00")
    item.set_url("https://example.com")
    return item
|
||||
BIN
tests/data/metadata_enricher_exif.pickle
Normal file
BIN
tests/data/metadata_enricher_exif.pickle
Normal file
Binary file not shown.
BIN
tests/data/metadata_enricher_ytshort_expected.pickle
Normal file
BIN
tests/data/metadata_enricher_ytshort_expected.pickle
Normal file
Binary file not shown.
BIN
tests/data/metadata_enricher_ytshort_input.pickle
Normal file
BIN
tests/data/metadata_enricher_ytshort_input.pickle
Normal file
Binary file not shown.
@@ -1,11 +1,29 @@
|
||||
{
|
||||
# Display Name of your module
|
||||
"name": "Example Module",
|
||||
# The author of your module (optional)
|
||||
"author": "John Doe",
|
||||
# Optional version number, for your own versioning purposes
|
||||
"version": 2.0,
|
||||
# The type of the module, must be one (or more) of the built in module types
|
||||
"type": ["extractor", "feeder", "formatter", "storage", "enricher", "database"],
|
||||
# a boolean indicating whether or not a module requires additional user setup before it can be used
|
||||
# for example: adding API keys, installing additional software etc.
|
||||
"requires_setup": False,
|
||||
"dependencies": {"python": ["loguru"]
|
||||
},
|
||||
# a dictionary of dependencies for this module, that must be installed before the module is loaded.
|
||||
# Can be python dependencies (external packages, or other auto-archiver modules), or you can
|
||||
# provide external bin dependencies (e.g. ffmpeg, docker etc.)
|
||||
"dependencies": {
|
||||
"python": ["loguru"],
|
||||
"bin": ["bash"],
|
||||
},
|
||||
# configurations that this module takes. These are argparse-compliant dictionaries, that are
|
||||
# used to create command line arguments when the programme is run.
|
||||
# The full name of the config option will become: `module_name.config_name`
|
||||
"configs": {
|
||||
"csv_file": {"default": "db.csv", "help": "CSV file name"},
|
||||
"required_field": {"required": True, "help": "required field in the CSV file"},
|
||||
},
|
||||
# A description of the module, used for documentation
|
||||
"description": "This is an example module",
|
||||
}
|
||||
14
tests/data/test_service_account.json
Normal file
14
tests/data/test_service_account.json
Normal file
@@ -0,0 +1,14 @@
|
||||
{
|
||||
"type": "service_account",
|
||||
"project_id": "some-project-id",
|
||||
"private_key_id": "some-private-key-id",
|
||||
"private_key": "-----BEGIN PRIVATE KEY-----\nMIIEvAIBADANBgkqhkiG9w0BAQEFAASCBKYwggSiAgEAAoIBAQDPlcaFJgt7HzoC\n4z0b18PzI2R5c892mLnNwRO8DOKid5INt6z5RAWKDPdnIyHjRBx74qNZl6768pia\nztQNgnud7mKcmvOvGrpUbFx2BdAw8xTyAlRVMalOBhUS9RKvjP5WgSwR5EKwfvzy\nrGioC6ml/segz5EchSaIzgASwB17ir0w6IrymBxUeNelfzCGJpCRhqG5nG+eEjct\nUYU0QIyihRD1Lq0f3Z3D0xfTLLZ630iFBj/Wr0BCJHkl6hdVuGhnyn4S98sMX1Bd\ntaJF/lWi4jdt7SoXD3+FWv66kHPpFfINMpReuB9u0ogfYkORgiRBOMhYBkGGQjUG\nOnBTxEc3AgMBAAECgf9bKiK8DdSz0ALzQbRLhgj2B9485jHI49wjgINOyceZ23uS\nQYXaO+DFLcgLqBkVSGanuHMpU0+qCpeM0v9yXSTIW8RguWMnFd8ID/yLRktxfQa1\n1FAQh+NlF4/gnuUoM8N/FYSy6R5grfaxwU8Qfg66IQXUB52OezSVu5lxNO4G5Rwv\nJ2e/+XYBUv/H26BnQSmjFCzbJkdbtrOeThpaLwLexKcollvoHKGyus0jpWg4C9Ez\n9EJaE+on4nd+cM1Vd+dWaHXoZ9Db9IvxPBqFJE8fynap7RDBeZK678OuCvQntrp4\nrTsE9hW8073Jhl/LbhfbDC0lhFR0JUHygVGE01ECgYEA+g+ddpGGY90yhhM76bTr\nkU6WwislMmfS0WDdLPemNgzLwCtkC2vsQgzg/egxqkVF5dJ9upiFhVgpYxY7ap9U\nSGFemb6T1ASl/1yeNhd0yc4PZFsJ29k+kNgSIlJYm9KDCIMqS1wPoXvFQhbMitOf\n/gLCPugxl67c+qg6nfuODTkCgYEA1IPngESOJnV8oa2WReWrO6+u6xb/OhqdmBzI\n5yq1z3f5gb98XESZR/rCH2vAOmHIJPn3XdZHsznOuxhZwGr1oztiRIurLmBlxQoL\n7tq0jDOUVSD2yeyQwKt5LaBH94P598FiauGxXM4raREWKtcNBGoOX1u1+kEBsoL4\ntf10Z+8CgYEA3QFkB+ECR8y91KW3NAzEjj5JG/8J9wyv1IGpuQ5/hhG1Gni/CSEv\nRAkh6QaIrpZe+ooYuQwIJhwPKBYEGW4MDZSRCYzYFnCtTY5L/j6o55sJG4cipX3R\nwC5XiKIC0mUxjhpvDP+miPBdHNYNnT0AkH1btEF/YzIW+Coq9GnZ2HECgYAOOpax\ne+WYpZ0mphy9qVcBtA2eJ/gGx+ltWeAJuk5aCcpm6Y9GDkHFFAETYX+JaSqhbysk\n2UgLs/8nf8XioEa6GyvFMyTPAh1OSBHseDBGgt2XpZFgi7pVbCW87FJlPCzsbcJN\nLbdWY2d8rWwyihuRBBjaQaW5j8ixTxuf88xreQKBgQCST4Fr8C5CkpakTA+KOost\nLOlziUBm0534mTg7dTcOE1H1+gxtqpXlXcJylpGz1lUXRlHCIutN5iPJcN5cxFES\nsP7wBd7BhficsMKDiWPm9XbP2zXVZu0ldUxA1mONMsS1P4p7i3Dh4uzrRDmSkTUL\njUpppYDumg3oM7wSJ6sTQA==\n-----END PRIVATE KEY-----",
|
||||
"client_email": "some-email",
|
||||
"client_id": "some-client-email",
|
||||
"auth_uri": "https://example.com/o/oauth2/auth",
|
||||
"token_uri": "https://oauth2.example.com/token",
|
||||
"auth_provider_x509_cert_url": "https://www.example.com/oauth2/v1/certs",
|
||||
"client_x509_cert_url": "https://www.example.com/robot/v1/metadata/x509/some-email",
|
||||
"universe_domain": "example.com"
|
||||
}
|
||||
|
||||
59
tests/databases/test_api_db.py
Normal file
59
tests/databases/test_api_db.py
Normal file
@@ -0,0 +1,59 @@
|
||||
import pytest
|
||||
|
||||
from auto_archiver.core import Metadata
|
||||
from auto_archiver.modules.api_db import AAApiDb
|
||||
|
||||
|
||||
@pytest.fixture
def api_db(setup_module):
    """Create an AAApiDb module instance configured against a dummy API endpoint."""
    return setup_module(
        AAApiDb,
        {
            "api_endpoint": "https://api.example.com",
            "api_token": "test-token",
            "public": False,
            "author_id": "Someone",
            "group_id": "123",
            "use_api_cache": True,
            "store_results": True,
            "tags": "[]",
        },
    )
|
||||
|
||||
|
||||
def test_fetch_no_cache(api_db, metadata):
    """With the API cache switched off, fetch() yields None."""
    api_db.use_api_cache = False
    fetched = api_db.fetch(metadata)
    assert fetched is None
|
||||
|
||||
|
||||
def test_fetch_fail_status(api_db, metadata, mocker):
    """A 400 response makes fetch() return False and log exactly one error."""
    failing_response = mocker.patch("auto_archiver.modules.api_db.api_db.requests.get").return_value
    failing_response.status_code = 400
    failing_response.json.return_value = {}
    error_log = mocker.patch("loguru.logger.error")

    outcome = api_db.fetch(metadata)

    assert outcome is False
    error_log.assert_called_once_with("AA API FAIL (400): {}")
|
||||
|
||||
|
||||
def test_fetch(api_db, metadata, mocker):
    """A 200 response carrying results makes fetch() return an item equal to ``metadata``."""
    response = mocker.patch("auto_archiver.modules.api_db.api_db.requests.get").return_value
    # Pin "now" as reached through the metadata module to a fixed string.
    frozen_datetime = mocker.patch("auto_archiver.core.metadata.datetime.datetime")
    frozen_datetime.now.return_value = "2021-01-01T00:00:00"

    populated_result = {
        'media': [],
        'metadata': {'_processed_at': '2021-01-01T00:00:00', 'url': 'https://example.com'},
        'status': 'no archiver',
    }
    response.status_code = 200
    response.json.return_value = [{"result": {}}, {"result": populated_result}]

    assert api_db.fetch(metadata) == metadata
|
||||
|
||||
|
||||
def test_done_success(api_db, metadata, mocker):
    """done() submits the archive result to the API with a single POST request."""
    submit = mocker.patch("auto_archiver.modules.api_db.api_db.requests.post")
    submit.return_value.status_code = 201

    api_db.done(metadata)

    # The api_db fixture passes "tags" as the string "[]"; the expected payload
    # lists that string's two characters.
    expected_payload = {
        'author_id': 'Someone',
        'url': 'https://example.com',
        'public': False,
        'group_id': '123',
        'tags': ['[', ']'],
        'result': '{"status": "no archiver", "metadata": {"_processed_at": "2021-01-01T00:00:00", "url": "https://example.com"}, "media": []}',
    }
    submit.assert_called_once()
    submit.assert_called_once_with(
        "https://api.example.com/interop/submit-archive",
        json=expected_payload,
        headers={'Authorization': 'Bearer test-token'},
    )
|
||||
|
||||
110
tests/databases/test_atlos_db.py
Normal file
110
tests/databases/test_atlos_db.py
Normal file
@@ -0,0 +1,110 @@
|
||||
import pytest
|
||||
from datetime import datetime
|
||||
|
||||
from auto_archiver.core import Metadata
|
||||
from auto_archiver.modules.atlos_db import AtlosDb
|
||||
|
||||
|
||||
class FakeAPIResponse:
    """Minimal stand-in for an HTTP response object.

    Stores a payload and can be told to fail when ``raise_for_status`` is called.
    """

    def __init__(self, data: dict, raise_error: bool = False) -> None:
        self.raise_error = raise_error
        self._data = data

    def raise_for_status(self) -> None:
        """Raise ``Exception("HTTP error")`` if this response was built as a failure."""
        if not self.raise_error:
            return
        raise Exception("HTTP error")
|
||||
|
||||
|
||||
@pytest.fixture
def atlos_db(setup_module) -> AtlosDb:
    """Provide an ``atlos_db`` module instance set up with a dummy token and the Atlos URL."""
    return setup_module(
        "atlos_db",
        {
            "api_token": "abc123",
            "atlos_url": "https://platform.atlos.org",
        },
    )
|
||||
|
||||
|
||||
def test_failed_no_atlos_id(atlos_db, metadata, mocker):
    """failed() must not POST anything when the item carries no atlos_id."""
    post_spy = mocker.patch("requests.post")

    atlos_db.failed(metadata, "failure reason")

    post_spy.assert_not_called()
|
||||
|
||||
|
||||
def test_failed_with_atlos_id(atlos_db, metadata, mocker):
    """failed() reports the error with one POST when the item carries an atlos_id."""
    metadata.set("atlos_id", 42)
    response = FakeAPIResponse({}, raise_error=False)
    post_spy = mocker.patch("requests.post", return_value=response)

    atlos_db.failed(metadata, "failure reason")

    post_spy.assert_called_once_with(
        f"{atlos_db.atlos_url}/api/v2/source_material/metadata/42/auto_archiver",
        headers={"Authorization": f"Bearer {atlos_db.api_token}"},
        json={
            "metadata": {"processed": True, "status": "error", "error": "failure reason"}
        },
    )
|
||||
|
||||
|
||||
def test_failed_http_error(atlos_db, metadata, mocker):
    """failed() lets the exception raised by a failing response propagate."""
    metadata.set("atlos_id", 42)
    mocker.patch("requests.post", return_value=FakeAPIResponse({}, raise_error=True))

    with pytest.raises(Exception, match="HTTP error"):
        atlos_db.failed(metadata, "failure reason")
|
||||
|
||||
|
||||
def test_fetch_returns_false(atlos_db):
    """fetch() reports False for a fresh, empty Metadata item."""
    assert atlos_db.fetch(Metadata()) is False
|
||||
|
||||
|
||||
def test_done_no_atlos_id(atlos_db, mocker):
    """done() must not POST anything when the item carries no atlos_id."""
    post_spy = mocker.patch("requests.post")
    item_without_id = Metadata().set_url("http://example.com")

    atlos_db.done(item_without_id)

    post_spy.assert_not_called()
|
||||
|
||||
|
||||
def test_done_with_atlos_id(atlos_db, metadata, mocker):
    """done() posts a success payload carrying the item's metadata when an atlos_id is set."""
    stamp = datetime.now()
    metadata.set("atlos_id", 99)
    metadata.set("timestamp", stamp)
    post_spy = mocker.patch(
        "requests.post", return_value=FakeAPIResponse({}, raise_error=False)
    )

    atlos_db.done(metadata)

    # The posted results are expected to carry the timestamp in ISO text form.
    results = {**metadata.metadata, "timestamp": stamp.isoformat()}
    post_spy.assert_called_once_with(
        f"{atlos_db.atlos_url}/api/v2/source_material/metadata/99/auto_archiver",
        headers={"Authorization": f"Bearer {atlos_db.api_token}"},
        json={
            "metadata": {
                "processed": True,
                "status": "success",
                "results": results,
            }
        },
    )
|
||||
|
||||
|
||||
def test_done_http_error(atlos_db, metadata, mocker):
    """done() lets the exception raised by a failing response propagate."""
    metadata.set("atlos_id", 123)
    mocker.patch("requests.post", return_value=FakeAPIResponse({}, raise_error=True))

    with pytest.raises(Exception, match="HTTP error"):
        atlos_db.done(metadata)
|
||||
@@ -1,6 +1,4 @@
|
||||
from datetime import datetime, timezone
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from auto_archiver.core import Metadata, Media
|
||||
@@ -9,8 +7,8 @@ from auto_archiver.modules.gsheet_feeder import GWorksheet
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_gworksheet():
|
||||
mock_gworksheet = MagicMock(spec=GWorksheet)
|
||||
def mock_gworksheet(mocker):
|
||||
mock_gworksheet = mocker.MagicMock(spec=GWorksheet)
|
||||
mock_gworksheet.col_exists.return_value = True
|
||||
mock_gworksheet.get_cell.return_value = ""
|
||||
mock_gworksheet.get_row.return_value = {}
|
||||
@@ -18,14 +16,14 @@ def mock_gworksheet():
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_metadata():
|
||||
metadata: Metadata = MagicMock(spec=Metadata)
|
||||
def mock_metadata(mocker):
|
||||
metadata: Metadata = mocker.MagicMock(spec=Metadata)
|
||||
metadata.get_url.return_value = "http://example.com"
|
||||
metadata.status = "done"
|
||||
metadata.get_title.return_value = "Example Title"
|
||||
metadata.get.return_value = "Example Content"
|
||||
metadata.get_timestamp.return_value = "2025-01-01T00:00:00"
|
||||
metadata.get_final_media.return_value = MagicMock(spec=Media)
|
||||
metadata.get_final_media.return_value = mocker.MagicMock(spec=Media)
|
||||
metadata.get_all_media.return_value = []
|
||||
metadata.get_media_by_id.return_value = None
|
||||
metadata.get_first_image.return_value = None
|
||||
@@ -47,21 +45,21 @@ def metadata():
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_media():
|
||||
def mock_media(mocker):
|
||||
"""Fixture for a mock Media object."""
|
||||
mock_media = MagicMock(spec=Media)
|
||||
mock_media = mocker.MagicMock(spec=Media)
|
||||
mock_media.urls = ["http://example.com/media"]
|
||||
mock_media.get.return_value = "not-calculated"
|
||||
return mock_media
|
||||
|
||||
@pytest.fixture
|
||||
def gsheets_db(mock_gworksheet, setup_module):
|
||||
def gsheets_db(mock_gworksheet, setup_module, mocker):
|
||||
db = setup_module("gsheet_db", {
|
||||
"allow_worksheets": "set()",
|
||||
"block_worksheets": "set()",
|
||||
"use_sheet_names_in_stored_paths": "True",
|
||||
})
|
||||
db._retrieve_gsheet = MagicMock(return_value=(mock_gworksheet, 1))
|
||||
db._retrieve_gsheet = mocker.MagicMock(return_value=(mock_gworksheet, 1))
|
||||
return db
|
||||
|
||||
|
||||
@@ -109,27 +107,26 @@ def test_aborted(gsheets_db, mock_metadata, mock_gworksheet):
|
||||
mock_gworksheet.set_cell.assert_called_once_with(1, 'status', '')
|
||||
|
||||
|
||||
def test_done(gsheets_db, metadata, mock_gworksheet, expected_calls):
|
||||
with patch("auto_archiver.modules.gsheet_db.gsheet_db.get_current_timestamp", return_value='2025-02-01T00:00:00+00:00'):
|
||||
gsheets_db.done(metadata)
|
||||
def test_done(gsheets_db, metadata, mock_gworksheet, expected_calls, mocker):
|
||||
mocker.patch("auto_archiver.modules.gsheet_db.gsheet_db.get_current_timestamp", return_value='2025-02-01T00:00:00+00:00')
|
||||
gsheets_db.done(metadata)
|
||||
mock_gworksheet.batch_set_cell.assert_called_once_with(expected_calls)
|
||||
|
||||
|
||||
def test_done_cached(gsheets_db, metadata, mock_gworksheet):
|
||||
with patch("auto_archiver.modules.gsheet_db.gsheet_db.get_current_timestamp", return_value='2025-02-01T00:00:00+00:00'):
|
||||
gsheets_db.done(metadata, cached=True)
|
||||
def test_done_cached(gsheets_db, metadata, mock_gworksheet, mocker):
|
||||
mocker.patch("auto_archiver.modules.gsheet_db.gsheet_db.get_current_timestamp", return_value='2025-02-01T00:00:00+00:00')
|
||||
gsheets_db.done(metadata, cached=True)
|
||||
|
||||
# Verify the status message includes "[cached]"
|
||||
call_args = mock_gworksheet.batch_set_cell.call_args[0][0]
|
||||
assert any(call[2].startswith("[cached]") for call in call_args)
|
||||
|
||||
|
||||
def test_done_missing_media(gsheets_db, metadata, mock_gworksheet):
|
||||
def test_done_missing_media(gsheets_db, metadata, mock_gworksheet, mocker):
|
||||
# clear media from metadata
|
||||
metadata.media = []
|
||||
with patch("auto_archiver.modules.gsheet_db.gsheet_db.get_current_timestamp",
|
||||
return_value='2025-02-01T00:00:00+00:00'):
|
||||
gsheets_db.done(metadata)
|
||||
mocker.patch("auto_archiver.modules.gsheet_db.gsheet_db.get_current_timestamp", return_value='2025-02-01T00:00:00+00:00')
|
||||
gsheets_db.done(metadata)
|
||||
# Verify nothing media-related gets updated
|
||||
call_args = mock_gworksheet.batch_set_cell.call_args[0][0]
|
||||
media_fields = {'archive', 'screenshot', 'thumbnail', 'wacz', 'replaywebpage'}
|
||||
|
||||
@@ -2,7 +2,7 @@ import pytest
|
||||
|
||||
from auto_archiver.modules.hash_enricher import HashEnricher
|
||||
from auto_archiver.core import Metadata, Media
|
||||
from auto_archiver.core.module import get_module_lazy
|
||||
from auto_archiver.core.module import ModuleFactory
|
||||
|
||||
@pytest.mark.parametrize("algorithm, filename, expected_hash", [
|
||||
("SHA-256", "tests/data/testfile_1.txt", "1b4f0e9851971998e732078544c96b36c3d01cedf7caa332359d6f1d83567014"),
|
||||
@@ -22,7 +22,7 @@ def test_default_config_values(setup_module):
|
||||
|
||||
def test_config():
|
||||
# test default config
|
||||
c = get_module_lazy('hash_enricher').configs
|
||||
c = ModuleFactory().get_module_lazy('hash_enricher').configs
|
||||
assert c["algorithm"]["default"] == "SHA-256"
|
||||
assert c["chunksize"]["default"] == 16000000
|
||||
assert c["algorithm"]["choices"] == ["SHA-256", "SHA3-512"]
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
import datetime
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
@@ -9,29 +8,21 @@ from auto_archiver.modules.meta_enricher import MetaEnricher
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_metadata():
|
||||
def mock_metadata(mocker):
|
||||
"""Creates a mock Metadata object."""
|
||||
mock: Metadata = MagicMock(spec=Metadata)
|
||||
mock: Metadata = mocker.MagicMock(spec=Metadata)
|
||||
mock.get_url.return_value = "https://example.com"
|
||||
mock.is_empty.return_value = False # Default to not empty
|
||||
mock.get_all_media.return_value = []
|
||||
return mock
|
||||
|
||||
@pytest.fixture
|
||||
def mock_media():
|
||||
def mock_media(mocker):
|
||||
"""Creates a mock Media object."""
|
||||
mock: Media = MagicMock(spec=Media)
|
||||
mock: Media = mocker.MagicMock(spec=Media)
|
||||
mock.filename = "mock_file.txt"
|
||||
return mock
|
||||
|
||||
@pytest.fixture
|
||||
def metadata():
|
||||
m = Metadata()
|
||||
m.set_url("https://example.com")
|
||||
m.set_title("Test Title")
|
||||
m.set_content("Test Content")
|
||||
return m
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def meta_enricher(setup_module):
|
||||
@@ -90,14 +81,14 @@ def test_enrich_file_sizes_no_media(meta_enricher, metadata):
|
||||
assert metadata.get("total_size") == "0.0 bytes"
|
||||
|
||||
|
||||
def test_enrich_archive_duration(meta_enricher, metadata):
|
||||
def test_enrich_archive_duration(meta_enricher, metadata, mocker):
|
||||
# Set fixed "processed at" time in the past
|
||||
processed_at = datetime.now(timezone.utc) - timedelta(minutes=10, seconds=30)
|
||||
metadata.set("_processed_at", processed_at)
|
||||
# patch datetime
|
||||
with patch("datetime.datetime") as mock_datetime:
|
||||
mock_now = datetime.now(timezone.utc)
|
||||
mock_datetime.now.return_value = mock_now
|
||||
meta_enricher.enrich_archive_duration(metadata)
|
||||
mock_datetime = mocker.patch("datetime.datetime")
|
||||
mock_now = datetime.now(timezone.utc)
|
||||
mock_datetime.now.return_value = mock_now
|
||||
meta_enricher.enrich_archive_duration(metadata)
|
||||
|
||||
assert metadata.get("archive_duration_seconds") == 630
|
||||
88
tests/enrichers/test_metadata_enricher.py
Normal file
88
tests/enrichers/test_metadata_enricher.py
Normal file
@@ -0,0 +1,88 @@
|
||||
|
||||
import pytest
|
||||
|
||||
from auto_archiver.core import Media
|
||||
|
||||
|
||||
@pytest.fixture
def mock_media(mocker):
    """Provide a mock restricted to the Media spec, with filename ``mock_file.txt``."""
    media_double: Media = mocker.MagicMock(spec=Media)
    media_double.filename = "mock_file.txt"
    return media_double
|
||||
|
||||
|
||||
@pytest.fixture
def enricher(setup_module, mock_binary_dependencies):
    """Load the ``metadata_enricher`` module with an empty configuration.

    ``mock_binary_dependencies`` is requested so that its patching is in place
    before the module is set up.
    """
    empty_config = {}
    return setup_module("metadata_enricher", empty_config)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
    "output,expected",
    [
        ("Key1: Value1\nKey2: Value2", {"Key1": "Value1", "Key2": "Value2"}),
        ("InvalidLine", {}),
        ("", {}),
    ],
)
def test_get_metadata(enricher, output, expected, mocker):
    """get_metadata() turns the tool's ``Key: Value`` stdout lines into a dict."""
    tool_run = mocker.patch("subprocess.run")
    completed = tool_run.return_value
    completed.stdout = output
    completed.stderr = ""
    completed.returncode = 0

    parsed = enricher.get_metadata("test.jpg")

    assert parsed == expected
    tool_run.assert_called_once_with(["exiftool", "test.jpg"], capture_output=True, text=True)
|
||||
|
||||
|
||||
def test_get_metadata_exiftool_not_found(enricher, mocker):
    """get_metadata() returns an empty dict when running the tool raises FileNotFoundError."""
    mocker.patch("subprocess.run", side_effect=FileNotFoundError)

    assert enricher.get_metadata("test.jpg") == {}
|
||||
|
||||
|
||||
def test_enrich_sets_metadata(enricher, mocker):
    """enrich() stores metadata only on media for which get_metadata() found something."""

    def fake_get_metadata(f):
        # Only the first image yields any metadata.
        if f == "img1.jpg":
            return {"key": "value"}
        return {}

    with_data = mocker.Mock(filename="img1.jpg")
    without_data = mocker.Mock(filename="img2.jpg")
    item = mocker.Mock()
    item.media = [with_data, without_data]
    enricher.get_metadata = fake_get_metadata

    enricher.enrich(item)

    with_data.set.assert_called_once_with("metadata", {"key": "value"})
    without_data.set.assert_not_called()
    assert item.media == [with_data, without_data]
|
||||
|
||||
|
||||
def test_enrich_empty_media(enricher, mocker):
    """enrich() completes without raising when the item has no media."""
    item = mocker.Mock()
    item.media = []

    # Passing means no exception escaped.
    enricher.enrich(item)
|
||||
|
||||
|
||||
def test_get_metadata_error_handling(enricher, mocker):
    """An unexpected exception from the tool yields an empty dict and an error log entry."""
    mocker.patch("subprocess.run", side_effect=Exception("Test error"))
    error_log = mocker.patch("loguru.logger.error")

    outcome = enricher.get_metadata("test.jpg")

    assert outcome == {}
    logged_message = error_log.call_args[0][0]
    assert "Error occurred: " in logged_message
|
||||
|
||||
|
||||
def test_metadata_pickle(enricher, unpickle, mocker):
    """enrich() on a pickled input item matches the pickled expectation's media metadata."""
    # subprocess.run hands back a previously pickled value instead of running a tool.
    mocker.patch("subprocess.run").return_value = unpickle("metadata_enricher_exif.pickle")
    item = unpickle("metadata_enricher_ytshort_input.pickle")
    reference = unpickle("metadata_enricher_ytshort_expected.pickle")

    enricher.enrich(item)

    assert len(reference.media) == len(item.media)
    assert item.media[0].properties.get("metadata") == reference.media[0].properties.get("metadata")
|
||||
|
||||
78
tests/enrichers/test_pdq_hash_enricher.py
Normal file
78
tests/enrichers/test_pdq_hash_enricher.py
Normal file
@@ -0,0 +1,78 @@
|
||||
import pytest
|
||||
from PIL import UnidentifiedImageError
|
||||
|
||||
from auto_archiver.core import Metadata, Media
|
||||
from auto_archiver.modules.pdq_hash_enricher import PdqHashEnricher
|
||||
|
||||
|
||||
@pytest.fixture
def enricher(setup_module):
    """Load the ``pdq_hash_enricher`` module with an empty configuration."""
    empty_config = {}
    return setup_module("pdq_hash_enricher", empty_config)
|
||||
|
||||
|
||||
@pytest.fixture
def metadata_with_images():
    """Build a Metadata item for an example URL holding two image media entries."""
    item = Metadata()
    item.set_url("https://example.com")
    for filename, key in (("image1.jpg", "image1"), ("image2.jpg", "image2")):
        item.add_media(Media(filename=filename, key=key))
    return item
|
||||
|
||||
|
||||
def test_successful_enrich(metadata_with_images, mocker):
    """enrich() sets a ``pdq_hash`` on every media item when all are treated as images.

    ``pdqhash.compute``, ``PIL.Image.open`` and ``Media.is_image`` are replaced
    with mocks, so no real image files are needed.
    """
    # mocker patches are active as soon as they are created and are undone
    # automatically at teardown, so they are applied directly rather than being
    # entered as context managers.
    mocker.patch("pdqhash.compute", return_value=([1, 0, 1, 0] * 64, 100))
    mocker.patch("PIL.Image.open")
    mocker.patch.object(Media, "is_image", return_value=True)

    enricher = PdqHashEnricher()
    enricher.enrich(metadata_with_images)

    # Ensure the hash is set for image media
    for media in metadata_with_images.media:
        assert media.get("pdq_hash") is not None
|
||||
|
||||
|
||||
def test_enrich_skip_non_image(metadata_with_images, mocker):
    """enrich() never computes a hash when no media item counts as an image."""
    mocker.patch.object(Media, "is_image", return_value=False)
    compute_spy = mocker.patch("pdqhash.compute")

    PdqHashEnricher().enrich(metadata_with_images)

    compute_spy.assert_not_called()
|
||||
|
||||
|
||||
def test_enrich_handles_corrupted_image(metadata_with_images, mocker):
    """An unreadable image is logged as an error once per media item and never hashed."""
    mocker.patch("PIL.Image.open", side_effect=UnidentifiedImageError("Corrupted image"))
    compute_spy = mocker.patch("pdqhash.compute")
    error_log = mocker.patch("loguru.logger.error")

    PdqHashEnricher().enrich(metadata_with_images)

    media_count = len(metadata_with_images.media)
    assert error_log.call_count == media_count
    compute_spy.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
    "media_id, should_have_hash",
    [
        ("screenshot", False),
        ("warc-file-123", False),
        ("regular-image", True),
    ]
)
def test_enrich_excludes_by_filetype(media_id, should_have_hash, mocker):
    """Whether a media item receives a ``pdq_hash`` depends on its ``id`` property."""
    item = Metadata()
    item.set_url("https://example.com")
    item.add_media(Media(filename="image.jpg").set("id", media_id))

    mocker.patch("pdqhash.compute", return_value=([1, 0, 1, 0] * 64, 100))
    mocker.patch("PIL.Image.open")
    mocker.patch.object(Media, "is_image", return_value=True)

    PdqHashEnricher().enrich(item)

    got_hash = item.media[0].get("pdq_hash") is not None
    assert got_hash == should_have_hash
|
||||
|
||||
195
tests/enrichers/test_screenshot_enricher.py
Normal file
195
tests/enrichers/test_screenshot_enricher.py
Normal file
@@ -0,0 +1,195 @@
|
||||
import base64
|
||||
|
||||
import pytest
|
||||
from selenium.common.exceptions import TimeoutException
|
||||
|
||||
from auto_archiver.core import Metadata, Media
|
||||
from auto_archiver.modules.screenshot_enricher import ScreenshotEnricher
|
||||
|
||||
|
||||
@pytest.fixture
def mock_selenium_env(mocker):
    """Patch the Selenium driver, its options and the driver/binary checks in one place.

    Yields a ``(driver, driver_class, options)`` tuple: the mock driver instance,
    the patched ``CookieSettingDriver`` class, and the mock ``FirefoxOptions`` instance.
    """

    def fake_which(dep):
        # Only geckodriver is reported as present on the system.
        return "/mock/geckodriver" if dep == "geckodriver" else None

    mocker.patch("shutil.which", side_effect=fake_which)
    driver_cls = mocker.patch("auto_archiver.utils.webdriver.CookieSettingDriver")
    mocker.patch(
        "selenium.webdriver.common.selenium_manager.SeleniumManager.binary_paths",
        return_value={
            "driver_path": "/mock/driver",
            "browser_path": "/mock/browser",
        },
    )
    mocker.patch("pathlib.Path.is_file", return_value=True)

    # A fake process whose poll() returns None.
    process = mocker.MagicMock()
    process.poll.return_value = None
    mocker.patch("subprocess.Popen", return_value=process)

    mocker.patch("selenium.webdriver.common.service.Service.is_connectable", return_value=True)

    options = mocker.MagicMock()
    mocker.patch("selenium.webdriver.FirefoxOptions", return_value=options)

    driver = mocker.MagicMock()
    driver_cls.return_value = driver

    yield driver, driver_cls, options
|
||||
|
||||
|
||||
@pytest.fixture
def common_patches(tmp_path, mocker):
    """Apply the patches shared by several screenshot tests.

    Disables the auth-wall check, makes ``os.path.join`` return one fixed path
    under ``tmp_path``, and mocks ``time.sleep``.
    """
    screenshot_path = str(tmp_path / "test.png")
    mocker.patch("auto_archiver.utils.url.is_auth_wall", return_value=False)
    # Every os.path.join call returns this single path while the patch is active.
    mocker.patch("os.path.join", return_value=screenshot_path)
    mocker.patch("time.sleep")
    yield
|
||||
|
||||
|
||||
@pytest.fixture
def screenshot_enricher(setup_module, mock_binary_dependencies) -> ScreenshotEnricher:
    """Load the ``screenshot_enricher`` module with a fixed test configuration."""
    return setup_module(
        "screenshot_enricher",
        {
            "width": 1280,
            "height": 720,
            "timeout": 60,
            "sleep_before_screenshot": 4,
            "http_proxy": "",
            "save_to_pdf": "False",
            "print_options": {},
        },
    )
|
||||
|
||||
|
||||
@pytest.fixture
def metadata_with_video():
    """Build a Metadata item for an example URL holding one video media entry (id ``video1``)."""
    item = Metadata()
    item.set_url("https://example.com")
    video = Media(filename="video.mp4").set("id", "video1")
    item.add_media(video)
    return item
|
||||
|
||||
|
||||
def test_enrich_adds_screenshot(
    screenshot_enricher,
    metadata_with_video,
    mock_selenium_env,
    common_patches,
    tmp_path,
):
    """enrich() opens the URL, saves a screenshot and appends it as a second media entry."""
    driver, driver_cls, options = mock_selenium_env

    screenshot_enricher.enrich(metadata_with_video)

    driver_cls.assert_called_once_with(
        cookies=None,
        cookiejar=None,
        facebook_accept_cookies=False,
        options=options,
    )
    # Calls made on the driver instance returned by the patched class.
    driver.get.assert_called_once_with("https://example.com")
    driver.save_screenshot.assert_called_once_with(str(tmp_path / "test.png"))
    # Two entries: the original video plus the new screenshot.
    media = metadata_with_video.media
    assert len(media) == 2
    assert media[1].properties.get("id") == "screenshot"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
    "url,is_auth",
    [
        ("https://example.com", False),
        ("https://private.com", True),
    ],
)
def test_enrich_auth_wall(
    screenshot_enricher,
    metadata_with_video,
    mock_selenium_env,
    common_patches,
    url,
    is_auth,
    mocker
):
    """enrich() is exercised with ``is_auth_wall`` returning both False and True."""
    driver, _, _ = mock_selenium_env
    # Patched again here so the parametrized value overrides the one from common_patches.
    mocker.patch("auto_archiver.utils.url.is_auth_wall", return_value=is_auth)
    metadata_with_video.set_url(url)

    screenshot_enricher.enrich(metadata_with_video)

    media = metadata_with_video.media
    if not is_auth:
        driver.get.assert_called_once_with(url)
        assert len(media) == 2
        assert media[1].properties.get("id") == "screenshot"
        return

    # Auth wall: the page is never loaded and only the original video remains.
    driver.get.assert_not_called()
    assert len(media) == 1
    assert media[0].properties.get("id") == "video1"
|
||||
|
||||
|
||||
def test_handle_timeout_exception(
    screenshot_enricher, metadata_with_video, mock_selenium_env, mocker
):
    """A TimeoutException from driver.get is logged at info level and adds no media."""
    driver, _, _ = mock_selenium_env
    driver.get.side_effect = TimeoutException
    info_log = mocker.patch("loguru.logger.info")

    screenshot_enricher.enrich(metadata_with_video)

    info_log.assert_called_once_with("TimeoutException loading page for screenshot")
    assert len(metadata_with_video.media) == 1
|
||||
|
||||
|
||||
def test_handle_general_exception(
    screenshot_enricher, metadata_with_video, mock_selenium_env, mocker
):
    """An unexpected exception from save_screenshot is logged as an error and adds no media."""
    driver, _, _ = mock_selenium_env
    # Page load succeeds; taking the screenshot fails with a generic exception.
    driver.get.return_value = None
    driver.save_screenshot.side_effect = Exception("Unexpected Error")
    error_log = mocker.patch("loguru.logger.error")

    screenshot_enricher.enrich(metadata_with_video)

    error_log.assert_called_once_with(
        "Got error while loading webdriver for screenshot enricher: Unexpected Error"
    )
    # Only the original video is present.
    assert len(metadata_with_video.media) == 1
|
||||
|
||||
|
||||
def test_pdf_creation(mocker, screenshot_enricher, metadata_with_video, mock_selenium_env):
    """With save_to_pdf enabled, enrich() adds both a screenshot and a PDF media."""
    mock_driver, _, _ = mock_selenium_env
    # Override the save_to_pdf option set by the fixture.
    screenshot_enricher.save_to_pdf = True
    # print_page returns base64-encoded PDF content.
    mock_driver.print_page.return_value = base64.b64encode(b"fake_pdf_content").decode("utf-8")

    # Patches whose return handles are not inspected are applied without binding a name.
    # os.path.join collapses to its last argument so the expected file name is predictable.
    mocker.patch("os.path.join", side_effect=lambda *args: f"{args[-1]}")
    mocker.patch(
        "auto_archiver.modules.screenshot_enricher.screenshot_enricher.random_str",
        return_value="fixed123",
    )
    mock_open = mocker.patch("builtins.open", new_callable=mocker.mock_open)
    mocker.patch("loguru.logger.error")

    screenshot_enricher.enrich(metadata_with_video)

    # Verify screenshot and PDF creation.
    mock_driver.save_screenshot.assert_called_once()
    mock_driver.print_page.assert_called_once_with(mock_driver.print_options)
    # The PDF file was opened for binary writing.
    mock_open.assert_any_call("pdf_fixed123.pdf", "wb")

    # Original video + screenshot + PDF.
    media = metadata_with_video.media
    assert len(media) == 3
    assert media[1].properties.get("id") == "screenshot"
    assert media[2].properties.get("id") == "pdf"
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
def cleanup_files(tmp_path):
    """Empty ``tmp_path`` after every test.

    Sub-directories are removed recursively. Calling ``unlink()`` on a
    directory raises, which would turn a passing test into a teardown error.
    """
    import shutil

    yield
    for entry in tmp_path.iterdir():
        if entry.is_dir() and not entry.is_symlink():
            shutil.rmtree(entry)
        else:
            entry.unlink()
|
||||
54
tests/enrichers/test_ssl_enricher.py
Normal file
54
tests/enrichers/test_ssl_enricher.py
Normal file
@@ -0,0 +1,54 @@
|
||||
import ssl
|
||||
import pytest
|
||||
|
||||
from auto_archiver.core import Metadata, Media
|
||||
|
||||
|
||||
@pytest.fixture
def enricher(setup_module):
    """Build the ``ssl_enricher`` module with ``skip_when_nothing_archived`` set to "True"."""
    return setup_module("ssl_enricher", {"skip_when_nothing_archived": "True"})
|
||||
|
||||
|
||||
@pytest.fixture
def metadata():
    """Return a Metadata for https://example.com with two text-file media attached."""
    item = Metadata()
    item.set_url("https://example.com")
    for path in ("tests/data/testfile_1.txt", "tests/data/testfile_2.txt"):
        item.add_media(Media(path))
    return item
|
||||
|
||||
|
||||
def test_http_raises(metadata, enricher):
    """An http:// URL makes enrich() raise an AssertionError mentioning the scheme."""
    metadata.set_url("http://example.com")
    with pytest.raises(AssertionError) as raised:
        enricher.enrich(metadata)
    message = str(raised.value)
    assert "Invalid URL scheme" in message
|
||||
|
||||
|
||||
def test_empty_metadata(metadata, enricher):
    """With no media on the metadata, enrich() returns None."""
    metadata.media = []
    outcome = enricher.enrich(metadata)
    assert outcome is None
|
||||
|
||||
|
||||
def test_ssl_enrich(metadata, enricher, mocker):
    """enrich() fetches the certificate, writes it to a .pem file and adds it as media."""
    fetch_cert = mocker.patch("ssl.get_server_certificate", return_value="TEST_CERT")
    opener = mocker.patch("builtins.open", mocker.mock_open())
    initial_count = len(metadata.media)

    enricher.enrich(metadata)

    fetch_cert.assert_called_once_with(("example.com", 443))
    opener.assert_called_once_with(f"{enricher.tmp_dir}/example-com.pem", "w")
    # mock_open's return_value is the file handle handed to the code under test.
    opener.return_value.write.assert_called_once_with("TEST_CERT")
    assert len(metadata.media) == initial_count + 1
    # The certificate file must be among the media.
    pem_found = False
    for media in metadata.media:
        if media.filename.endswith("example-com.pem"):
            pem_found = True
            break
    assert pem_found
|
||||
|
||||
|
||||
def test_ssl_error_handling(enricher, metadata, mocker):
    """An ssl.SSLError raised while fetching the certificate propagates out of enrich()."""
    failure = ssl.SSLError("SSL error")
    mocker.patch("ssl.get_server_certificate", side_effect=failure)
    with pytest.raises(ssl.SSLError, match="SSL error"):
        enricher.enrich(metadata)
|
||||
|
||||
148
tests/enrichers/test_thumbnail_enricher.py
Normal file
148
tests/enrichers/test_thumbnail_enricher.py
Normal file
@@ -0,0 +1,148 @@
|
||||
import pytest
|
||||
from auto_archiver.core import Metadata, Media
|
||||
from auto_archiver.modules.thumbnail_enricher import ThumbnailEnricher
|
||||
|
||||
|
||||
@pytest.fixture
def thumbnail_enricher(setup_module, mock_binary_dependencies) -> ThumbnailEnricher:
    """Build the ``thumbnail_enricher`` module with 60 thumbnails/minute and a cap of 4."""
    return setup_module(
        "thumbnail_enricher",
        {"thumbnails_per_minute": 60, "max_thumbnails": 4},
    )
|
||||
|
||||
|
||||
@pytest.fixture
def metadata_with_video():
    """Return a Metadata for https://example.com holding one media with id ``video1``."""
    item = Metadata()
    item.set_url("https://example.com")
    video = Media(filename="video.mp4").set("id", "video1")
    item.add_media(video)
    return item
|
||||
|
||||
|
||||
@pytest.fixture
def mock_ffmpeg_environment(mocker):
    """Patch every ffmpeg entry point the thumbnail tests touch, in one place.

    Returns:
        dict with the created mocks under the keys ``mock_ffmpeg_input``,
        ``mock_makedirs``, ``mock_output`` and ``mock_probe``.
    """
    mock_ffmpeg_input = mocker.patch("ffmpeg.input")
    mock_makedirs = mocker.patch("os.makedirs")
    # Every media counts as a video. (A stray trailing comma used to turn this
    # statement into a discarded one-element tuple.)
    mocker.patch.object(Media, "is_video", return_value=True)
    mock_probe = mocker.patch(
        "ffmpeg.probe",
        return_value={
            "streams": [
                {"codec_type": "video", "duration": "120"}
            ]  # Default 2-minute duration, but can override in tests
        },
    )
    # The object returned at the end of ffmpeg.input(...).filter(...).output(...).
    mock_output = mocker.MagicMock()
    mock_ffmpeg_input.return_value.filter.return_value.output.return_value = (
        mock_output
    )

    return {
        "mock_ffmpeg_input": mock_ffmpeg_input,
        "mock_makedirs": mock_makedirs,
        "mock_output": mock_output,
        "mock_probe": mock_probe,
    }
|
||||
|
||||
|
||||
@pytest.mark.parametrize("thumbnails_per_minute, max_thumbnails, expected_count", [
    (10, 5, 5),  # Capped at max_thumbnails
    (1, 10, 2),  # Less than max_thumbnails
    (60, 7, 7),  # expected equals max_thumbnails
])
def test_enrich_thumbnail_limits(
    thumbnail_enricher, metadata_with_video, mock_ffmpeg_environment,
    thumbnails_per_minute, max_thumbnails, expected_count
):
    """The number of ffmpeg runs and stored thumbnails matches the expected count."""
    thumbnail_enricher.max_thumbnails = max_thumbnails
    thumbnail_enricher.thumbnails_per_minute = thumbnails_per_minute

    thumbnail_enricher.enrich(metadata_with_video)

    run_mock = mock_ffmpeg_environment["mock_output"].run
    assert run_mock.call_count == expected_count
    produced = metadata_with_video.media[0].get("thumbnails")
    assert len(produced) == expected_count
|
||||
|
||||
def test_enrich_handles_probe_failure(thumbnail_enricher, metadata_with_video, mocker):
    """A failing ffmpeg.probe is logged as an error and no thumbnails are stored."""
    mocker.patch("ffmpeg.probe", side_effect=Exception("Probe error"))
    mocker.patch("os.makedirs")
    mock_logger = mocker.patch("loguru.logger.error")
    mocker.patch.object(Media, "is_video", return_value=True)

    thumbnail_enricher.enrich(metadata_with_video)

    # Plain string: the expected message has no placeholders, so no f-prefix is needed.
    mock_logger.assert_called_with(
        "error getting duration of video video.mp4: Probe error"
    )
    # No thumbnails were created.
    thumbnails = metadata_with_video.media[0].get("thumbnails")
    assert thumbnails is None
|
||||
|
||||
|
||||
def test_enrich_skips_non_video_files(thumbnail_enricher, metadata_with_video, mocker):
    """When Media.is_video returns False, ffmpeg.input is never called."""
    mocker.patch.object(Media, "is_video", return_value=False)
    ffmpeg_input = mocker.patch("ffmpeg.input")

    thumbnail_enricher.enrich(metadata_with_video)

    ffmpeg_input.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("thumbnails_per_minute,max_thumbnails,expected_count", [
    (60, 5, 5),  # caught by max
    (60, 20, 10),  # caught by t/min
    (0, 20, 1),  # test min caught (1)
    (11, 20, 1),  # test min caught (1)
    (12, 20, 2),  # test caught by t/min
])
def test_enrich_handles_short_video(
    thumbnail_enricher, metadata_with_video, mock_ffmpeg_environment, thumbnails_per_minute, max_thumbnails, expected_count, mocker
):
    """Thumbnail counts for a 10-second video under several rate/cap combinations."""
    # Replace the fixture's default probe result with a 10-second stream.
    fake_duration = 10
    short_stream = {"codec_type": "video", "duration": str(fake_duration)}
    mocker.patch("ffmpeg.probe", return_value={"streams": [short_stream]})
    thumbnail_enricher.thumbnails_per_minute = thumbnails_per_minute
    thumbnail_enricher.max_thumbnails = max_thumbnails

    thumbnail_enricher.enrich(metadata_with_video)

    run_mock = mock_ffmpeg_environment["mock_output"].run
    assert run_mock.call_count == expected_count
    produced = metadata_with_video.media[0].get("thumbnails")
    assert len(produced) == expected_count
|
||||
|
||||
|
||||
def test_uses_existing_duration(
    thumbnail_enricher, metadata_with_video, mock_ffmpeg_environment
):
    """A duration already set on the media means ffmpeg.probe is not called."""
    video = metadata_with_video.media[0]
    video.set("duration", 60)

    thumbnail_enricher.enrich(metadata_with_video)

    mocks = mock_ffmpeg_environment
    mocks["mock_probe"].assert_not_called()
    assert mocks["mock_output"].run.call_count == 4
|
||||
|
||||
|
||||
def test_enrich_metadata_structure(thumbnail_enricher, metadata_with_video, mock_ffmpeg_environment, mocker):
    """Check the ids, timestamps and duration stored after enriching a 120-second video."""
    fake_duration = 120
    probe_result = {'streams': [{'codec_type': 'video', 'duration': str(fake_duration)}]}
    mocker.patch("ffmpeg.probe", return_value=probe_result)
    thumbnail_enricher.thumbnails_per_minute = 2
    thumbnail_enricher.max_thumbnails = 4

    thumbnail_enricher.enrich(metadata_with_video)

    media_item = metadata_with_video.media[0]
    thumbnails = media_item.get("thumbnails")

    # Properties on the video itself.
    assert media_item.get("id") == "video1"
    assert media_item.get("duration") == fake_duration

    # Evenly spaced timestamps.
    expected_timestamps = ["24.000s", "48.000s", "72.000s", "96.000s"]
    assert thumbnails is not None
    assert len(thumbnails) == 4

    position = 0
    for thumbnail in thumbnails:
        assert thumbnail.filename is not None
        assert thumbnail.properties.get("id") == f"thumbnail_{position}"
        assert thumbnail.properties.get("timestamp") == expected_timestamps[position]
        position += 1
|
||||
112
tests/enrichers/test_wacz_enricher.py
Normal file
112
tests/enrichers/test_wacz_enricher.py
Normal file
@@ -0,0 +1,112 @@
|
||||
import os
|
||||
from zipfile import ZipFile
|
||||
|
||||
import pytest
|
||||
|
||||
from auto_archiver.core import Metadata, Media
|
||||
|
||||
|
||||
@pytest.fixture
def wacz_enricher(setup_module, mock_binary_dependencies):
    """Build the ``wacz_enricher`` module with screenshot extraction on and media extraction off."""
    return setup_module(
        "wacz_enricher",
        {
            "profile": None,
            "docker_commands": None,
            "timeout": 120,
            "extract_media": False,
            "extract_screenshot": True,
            "socks_proxy_host": None,
            "socks_proxy_port": None,
            "proxy_server": None,
        },
    )
|
||||
|
||||
|
||||
def test_setup_without_docker(wacz_enricher, mocker):
    """With only RUNNING_IN_DOCKER set, setup() leaves ``docker_in_docker`` falsy."""
    env = {"RUNNING_IN_DOCKER": "1"}
    mocker.patch.dict(os.environ, env, clear=True)
    wacz_enricher.setup()
    assert not wacz_enricher.docker_in_docker
|
||||
|
||||
|
||||
def test_setup_with_docker(wacz_enricher, mocker):
    """With only WACZ_ENABLE_DOCKER set, setup() makes ``use_docker`` truthy."""
    env = {"WACZ_ENABLE_DOCKER": "1"}
    mocker.patch.dict(os.environ, env, clear=True)
    wacz_enricher.setup()
    assert wacz_enricher.use_docker
|
||||
|
||||
|
||||
def test_already_ran(wacz_enricher, metadata, mocker):
    """enrich() returns True and logs when a ``browsertrix`` media is already attached."""
    metadata.add_media(Media("test.wacz"), id="browsertrix")
    info_log = mocker.patch("loguru.logger.info")

    outcome = wacz_enricher.enrich(metadata)

    assert outcome is True
    last_message = info_log.call_args[0][0]
    assert "WACZ enricher had already been executed" in last_message
|
||||
|
||||
|
||||
def test_basic_call_execution(wacz_enricher, mocker):
    """enrich() invokes subprocess.run with a command that contains the target URL."""
    run_mock = mocker.patch("subprocess.run")
    run_mock.return_value = mocker.Mock(returncode=0)
    item = Metadata().set_url("https://example.com")

    wacz_enricher.enrich(item)

    assert run_mock.called
    # The URL must be passed on the command line.
    command_line = " ".join(run_mock.call_args[0][0])
    assert "--url https://example.com" in command_line
|
||||
|
||||
|
||||
def test_download_success(wacz_enricher, mocker) -> None:
    """download() returns a Metadata with a success status when enrich() returns True."""
    mocker.patch.object(wacz_enricher, "enrich", return_value=True)
    item = Metadata().set_url("https://example.com")

    outcome = wacz_enricher.download(item)

    assert outcome is not None
    assert isinstance(outcome, Metadata)
    assert outcome.status == "wacz: success"
|
||||
|
||||
|
||||
def test_enrich_already_executed(wacz_enricher, mocker) -> None:
    """enrich() returns True and logs when the metadata already has a ``browsertrix`` media."""
    info_log = mocker.patch("loguru.logger.info")
    item = Metadata().set_url("https://example.com")
    item.add_media(Media(filename="some_file.wacz"), id="browsertrix")

    outcome = wacz_enricher.enrich(item)

    assert outcome is True
    last_message = info_log.call_args[0][0]
    assert "WACZ enricher had already been executed:" in last_message
|
||||
|
||||
|
||||
def test_enrich_subprocess_exception(wacz_enricher, mocker, tmp_path) -> None:
    """enrich() returns False when subprocess.run raises."""
    wacz_enricher.tmp_dir = str(tmp_path)
    wacz_enricher.extract_media = False
    wacz_enricher.extract_screenshot = True
    mocker.patch("auto_archiver.utils.misc.random_str", return_value="TESTCOL")
    mocker.patch("subprocess.run", side_effect=Exception("fail"))
    item = Metadata().set_url("https://example.com")

    outcome = wacz_enricher.enrich(item)

    assert outcome is False
|
||||
|
||||
|
||||
def test_extract_media(wacz_enricher, metadata, tmp_path, mocker) -> None:
    """extract_media_from_wacz adds a ``browsertrix-screenshot`` media to the metadata."""
    wacz_enricher.tmp_dir = str(tmp_path)

    # Write a real zip archive before builtins.open is patched, so ZipFile can read it.
    archive_path = tmp_path / "dummy.wacz"
    with ZipFile(archive_path, "w") as archive:
        archive.writestr("dummy.txt", "test content")

    mocker.patch("os.listdir", return_value=[])
    # One WARC resource record with a PNG content type, followed by a bare record header.
    warc_data = b"".join(
        [
            b"WARC/1.0\r\n",
            b"WARC-Type: resource\r\n",
            b"Content-Type: image/png\r\n",
            b"WARC-Target-URI: http://example.com/image.png\r\n",
            b"Content-Length: 12\r\n",
            b"\r\n",
            b"image-bytes",
            b"\r\n\r\nWARC/1.0\r\n\r\n",
        ]
    )
    opener = mocker.mock_open(read_data=warc_data)
    mocker.patch("builtins.open", opener)
    metadata.add_media(Media("something.wacz"), "browsertrix")

    wacz_enricher.extract_media_from_wacz(metadata, str(archive_path))

    assert len(metadata.media) == 2
    assert metadata.media[1].properties.get("id") == "browsertrix-screenshot"
|
||||
168
tests/enrichers/test_wayback_enricher.py
Normal file
168
tests/enrichers/test_wayback_enricher.py
Normal file
@@ -0,0 +1,168 @@
|
||||
import json
|
||||
import requests
|
||||
import pytest
|
||||
from auto_archiver.modules.wayback_extractor_enricher import WaybackExtractorEnricher
|
||||
from auto_archiver.core import Metadata
|
||||
|
||||
|
||||
@pytest.fixture
def mock_is_auth_wall(mocker):
    """Return a callable that patches ``is_auth_wall`` to answer with the given boolean."""
    target = "auto_archiver.utils.url.is_auth_wall"

    def _apply(return_value: bool):
        # Hand back the created mock so a test can inspect its calls.
        return mocker.patch(target, return_value=return_value)

    return _apply
|
||||
|
||||
@pytest.fixture
def mock_post_success(mocker):
    """Return a callable that patches ``requests.post`` with a canned response.

    The callable accepts:
        json_data: payload returned by ``response.json()``. Only ``None``
            selects the default ``{"job_id": "job123"}``; an explicitly passed
            empty dict is kept as-is so a test can simulate a response
            without a job id.
        status_code: value of ``response.status_code`` (default 200).

    It returns the mock installed in place of ``requests.post``.
    """
    def _mock_post(json_data: dict = None, status_code: int = 200):
        # Compare against None: `json_data or default` would also replace `{}`.
        if json_data is None:
            json_data = {"job_id": "job123"}
        resp = mocker.Mock(status_code=status_code)
        resp.json.return_value = json_data
        return mocker.patch("requests.post", return_value=resp)
    return _mock_post
|
||||
|
||||
@pytest.fixture
def mock_get_success(mocker):
    """Return a callable that patches ``requests.get`` with a canned response.

    The callable accepts:
        json_data: payload returned by ``response.json()``. Only ``None``
            selects the default completed-archive payload; an explicitly
            passed empty dict is kept as-is.
        status_code: value of ``response.status_code`` (default 200).

    It returns the mock installed in place of ``requests.get``.
    """
    def _mock_get(json_data: dict = None, status_code: int = 200):
        # Compare against None: `json_data or default` would also replace `{}`.
        if json_data is None:
            json_data = {
                "status": "success",
                "timestamp": "20250101010101",
                "original_url": "https://example.com"
            }
        resp = mocker.Mock(status_code=status_code)
        resp.json.return_value = json_data
        return mocker.patch("requests.get", return_value=resp)
    return _mock_get
|
||||
|
||||
@pytest.fixture
def wayback_extractor_enricher(setup_module) -> WaybackExtractorEnricher:
    """Build the ``wayback_extractor_enricher`` module with dummy credentials and no proxies."""
    return setup_module(
        "wayback_extractor_enricher",
        {
            "timeout": 5,
            "if_not_archived_within": None,
            "key": "somekey",
            "secret": "secret",
            "proxy_http": None,
            "proxy_https": None,
        },
    )
|
||||
|
||||
|
||||
def test_download_success(
    wayback_extractor_enricher,
    mock_is_auth_wall,
    mock_post_success,
    mock_get_success
):
    """download() stores the archived snapshot URL under ``wayback``."""
    mock_is_auth_wall(False)
    mock_post_success()
    mock_get_success()
    # Basic metadata to allow merge.
    item = Metadata().set_url("https://example.com")

    outcome = wayback_extractor_enricher.download(item)

    snapshot = "https://web.archive.org/web/20250101010101/https://example.com"
    assert outcome.get("wayback") == snapshot
|
||||
|
||||
def test_enrich_auth_wall(wayback_extractor_enricher, metadata, mock_is_auth_wall):
    """enrich() returns None when the URL is reported as behind an auth wall."""
    mock_is_auth_wall(True)
    outcome = wayback_extractor_enricher.enrich(metadata)
    assert outcome is None
|
||||
|
||||
def test_enrich_already_enriched(wayback_extractor_enricher, metadata):
    """enrich() returns True when ``wayback`` is already set on the metadata."""
    metadata.set("wayback", "existing")
    outcome = wayback_extractor_enricher.enrich(metadata)
    assert outcome is True
|
||||
|
||||
def test_enrich_post_failure(
    wayback_extractor_enricher,
    metadata,
    mock_is_auth_wall,
    mock_post_success
):
    """A 500 from the POST makes enrich() return False and records the failure text."""
    mock_is_auth_wall(False)
    mock_post_success(json_data={"error": "server error"}, status_code=500)

    outcome = wayback_extractor_enricher.enrich(metadata)

    assert outcome is False
    recorded = metadata.get("wayback")
    assert "Internet archive failed with status of 500" in recorded
|
||||
|
||||
def test_enrich_post_json_decode_error(
    wayback_extractor_enricher,
    metadata,
    mock_is_auth_wall,
    mocker
):
    """enrich() returns False when the POST response body cannot be decoded as JSON."""
    mock_is_auth_wall(False)
    broken = mocker.Mock(status_code=200)
    broken.text = "invalid json"
    broken.json.side_effect = json.decoder.JSONDecodeError("msg", "doc", 0)
    mocker.patch("requests.post", return_value=broken)

    outcome = wayback_extractor_enricher.enrich(metadata)

    assert outcome is False
|
||||
|
||||
def test_enrich_no_job_id(
    wayback_extractor_enricher,
    metadata,
    mock_is_auth_wall,
    mock_post_success
):
    """enrich() is expected to return False when the POST is mocked with ``json_data={}``."""
    mock_is_auth_wall(False)
    mock_post_success(json_data={})

    outcome = wayback_extractor_enricher.enrich(metadata)

    assert outcome is False
|
||||
|
||||
def test_enrich_get_success(
    wayback_extractor_enricher,
    metadata,
    mock_is_auth_wall,
    mock_post_success,
    mock_get_success
):
    """A successful POST and GET make enrich() store both wayback links and return True."""
    mock_is_auth_wall(False)
    mock_post_success()
    mock_get_success()

    outcome = wayback_extractor_enricher.enrich(metadata)

    assert outcome is True
    expected_links = {
        "wayback": "https://web.archive.org/web/20250101010101/https://example.com",
        "check wayback": "https://web.archive.org/web/*/https://example.com",
    }
    for key, link in expected_links.items():
        assert metadata.get(key) == link
|
||||
|
||||
def test_enrich_get_failure(
    wayback_extractor_enricher,
    metadata,
    mock_is_auth_wall,
    mock_post_success,
    mock_get_success
):
    """enrich() returns False when the status GET answers 400 with a failed status."""
    mock_is_auth_wall(False)
    mock_post_success()
    mock_get_success(json_data={"status": "failed"}, status_code=400)

    outcome = wayback_extractor_enricher.enrich(metadata)

    assert outcome is False
|
||||
|
||||
def test_enrich_get_request_exception(
    wayback_extractor_enricher,
    metadata,
    mock_is_auth_wall,
    mock_post_success,
    mocker
):
    """A RequestException from the status GET still leaves the job id on the metadata."""
    mock_is_auth_wall(False)
    mock_post_success()
    failure = requests.exceptions.RequestException("error")
    mocker.patch("requests.get", side_effect=failure)
    mocker.patch("time.sleep", return_value=None)

    outcome = wayback_extractor_enricher.enrich(metadata)

    # The job_id information is still recorded.
    assert outcome is True
    assert metadata.get("wayback").get("job_id") == "job123"
|
||||
|
||||
def test_enrich_get_json_decode_error(
    wayback_extractor_enricher,
    metadata,
    mock_is_auth_wall,
    mock_post_success,
    mocker
):
    """An undecodable status response still leaves the job id on the metadata."""
    mock_is_auth_wall(False)
    mock_post_success()
    broken = mocker.Mock()
    broken.text = "invalid json"
    broken.json.side_effect = json.decoder.JSONDecodeError("msg", "doc", 0)
    mocker.patch("requests.get", return_value=broken)
    mocker.patch("time.sleep", return_value=None)

    outcome = wayback_extractor_enricher.enrich(metadata)

    # The job_id information is still recorded.
    assert outcome is True
    assert metadata.get("wayback").get("job_id") == "job123"
|
||||
133
tests/enrichers/test_whisper_enricher.py
Normal file
133
tests/enrichers/test_whisper_enricher.py
Normal file
@@ -0,0 +1,133 @@
|
||||
import pytest
|
||||
|
||||
from auto_archiver.core import Metadata, Media
|
||||
from auto_archiver.modules.s3_storage import S3Storage
|
||||
from auto_archiver.modules.whisper_enricher import WhisperEnricher
|
||||
|
||||
TEST_S3_URL = "http://cdn.example.com/test.mp4"
|
||||
|
||||
|
||||
@pytest.fixture
def enricher(mocker):
    """Yield ``(WhisperEnricher, mock S3Storage)`` with the S3 dependency mocked."""
    storages = ["s3_storage"]
    config = {
        "api_endpoint": "http://testapi",
        "api_key": "whisper-key",
        "include_srt": False,
        "timeout": 5,
        "action": "translate",
        "steps": {"storages": storages},
    }
    s3_double = mocker.MagicMock(spec=S3Storage)
    s3_double.get_cdn_url.return_value = TEST_S3_URL

    whisper = WhisperEnricher()
    whisper.name = "whisper_enricher"
    whisper.display_name = "Whisper Enricher"
    whisper.config_setup({whisper.name: config})
    # setup() is bypassed: the attributes below are assigned directly.
    whisper.stores = config["steps"]["storages"]
    whisper.s3 = s3_double
    yield whisper, s3_double
|
||||
|
||||
|
||||
@pytest.fixture
def metadata():
    """Return a Metadata with URL ``http://test.url`` and title ``test title``."""
    item = Metadata()
    item.set_url("http://test.url")
    item.set_title("test title")
    return item
|
||||
|
||||
|
||||
@pytest.fixture
def mock_requests(mocker):
    """Patch ``requests`` in the whisper module; POST answers 201 with id ``job123``."""
    created = mocker.MagicMock()
    created.status_code = 201
    created.json.return_value = {"id": "job123"}
    patched = mocker.patch("auto_archiver.modules.whisper_enricher.whisper_enricher.requests")
    patched.post.return_value = created
    yield patched
|
||||
|
||||
|
||||
def test_successful_job_submission(enricher, metadata, mock_requests, mocker):
    """enrich() submits a job, polls status and artifacts, and stores the transcript."""
    whisper, mock_s3 = enricher
    # The CDN URL reported by S3 must match the URL on the media.
    mock_s3.get_cdn_url.return_value = TEST_S3_URL

    video = Media("test.mp4")
    video.mimetype = "video/mp4"
    video.add_url(TEST_S3_URL)
    metadata.media = [video]

    # First GET: job status. Second GET: job artifacts.
    status_reply = mocker.MagicMock()
    status_reply.status_code = 200
    status_reply.json.return_value = {"status": "success", "meta": {}}
    artifacts_reply = mocker.MagicMock()
    artifacts_reply.status_code = 200
    artifacts_reply.json.return_value = [
        {"data": [{"start": 0, "end": 5, "text": "test transcript"}]}
    ]
    mock_requests.get.side_effect = [status_reply, artifacts_reply]

    # Run enrichment (without opening file).
    whisper.enrich(metadata)

    # Job submission.
    mock_requests.post.assert_called_once_with(
        "http://testapi/jobs",
        json={"url": "http://cdn.example.com/test.mp4", "type": "translate"},
        headers={"Authorization": "Bearer whisper-key"}
    )
    # One status check and one artifacts check.
    assert mock_requests.get.call_count == 2
    assert "artifact_0_text" in metadata.media[0].get("whisper_model")
    expected_model = {
        "artifact_0_text": "test transcript",
        "job_artifacts_check": "http://testapi/jobs/job123/artifacts",
        "job_id": "job123",
        "job_status_check": "http://testapi/jobs/job123",
    }
    assert metadata.media[0].get("whisper_model") == expected_model
|
||||
|
||||
|
||||
def test_submit_job(enricher, mocker):
    """submit_job() returns the id from a 201 response."""
    whisper, _ = enricher
    video = Media("test.mp4")
    video.add_url(TEST_S3_URL)

    created = mocker.MagicMock()
    created.status_code = 201
    created.json.return_value = {"id": "job123"}
    patched = mocker.patch("auto_archiver.modules.whisper_enricher.whisper_enricher.requests")
    patched.post.return_value = created

    assert whisper.submit_job(video) == "job123"
|
||||
|
||||
|
||||
def test_submit_raises_status(enricher, mocker):
    """submit_job() raises an AssertionError with the exact message on a 400 response."""
    whisper, _ = enricher
    video = Media("test.mp4")
    video.add_url(TEST_S3_URL)

    rejected = mocker.MagicMock()
    rejected.status_code = 400
    rejected.json.return_value = {"id": "job123"}
    patched = mocker.patch("auto_archiver.modules.whisper_enricher.whisper_enricher.requests")
    patched.post.return_value = rejected

    with pytest.raises(AssertionError) as raised:
        whisper.submit_job(video)
    message = str(raised.value)
    assert message == "calling the whisper api http://testapi returned a non-success code: 400"
|
||||
|
||||
|
||||
def test_submit_job_fails(enricher):
    """submit_job() raises an AssertionError for a media URL other than TEST_S3_URL.

    The mocked S3 storage from the ``enricher`` fixture reports TEST_S3_URL as
    its CDN URL; the media here carries a different URL.
    """
    whisper, _ = enricher
    m = Media("test.mp4")
    m.add_url("http://cdn.wrongurl.com/test.mp4")
    with pytest.raises(AssertionError):
        whisper.submit_job(m)
|
||||
@@ -9,6 +9,7 @@ import pytest
|
||||
from auto_archiver.modules.generic_extractor.generic_extractor import GenericExtractor
|
||||
from .test_extractor_base import TestExtractorBase
|
||||
|
||||
CI=os.getenv("GITHUB_ACTIONS", '') == 'true'
|
||||
class TestGenericExtractor(TestExtractorBase):
|
||||
"""Tests Generic Extractor
|
||||
"""
|
||||
@@ -77,10 +78,11 @@ class TestGenericExtractor(TestExtractorBase):
|
||||
result = self.extractor.download(item)
|
||||
assert not result
|
||||
|
||||
|
||||
@pytest.mark.skipif(CI, reason="Currently no way to authenticate when on CI. Youtube (yt-dlp) doesn't support logging in with username/password.")
|
||||
@pytest.mark.download
|
||||
def test_youtube_download(self, make_item):
|
||||
# url https://www.youtube.com/watch?v=5qap5aO4i9A
|
||||
|
||||
item = make_item("https://www.youtube.com/watch?v=J---aiyznGQ")
|
||||
result = self.extractor.download(item)
|
||||
assert result.get_url() == "https://www.youtube.com/watch?v=J---aiyznGQ"
|
||||
@@ -114,6 +116,7 @@ class TestGenericExtractor(TestExtractorBase):
|
||||
result = self.extractor.download(item)
|
||||
assert result is not False
|
||||
|
||||
@pytest.mark.skipif(CI, reason="Truth social blocks GH actions.")
|
||||
@pytest.mark.download
|
||||
def test_truthsocial_download_video(self, make_item):
|
||||
item = make_item("https://truthsocial.com/@DaynaTrueman/posts/110602446619561579")
|
||||
@@ -121,18 +124,21 @@ class TestGenericExtractor(TestExtractorBase):
|
||||
assert len(result.media) == 1
|
||||
assert result is not False
|
||||
|
||||
@pytest.mark.skipif(CI, reason="Truth social blocks GH actions.")
|
||||
@pytest.mark.download
|
||||
def test_truthsocial_download_no_media(self, make_item):
|
||||
item = make_item("https://truthsocial.com/@bbcnewa/posts/109598702184774628")
|
||||
result = self.extractor.download(item)
|
||||
assert result is not False
|
||||
|
||||
@pytest.mark.skipif(CI, reason="Truth social blocks GH actions.")
|
||||
@pytest.mark.download
|
||||
def test_truthsocial_download_poll(self, make_item):
|
||||
item = make_item("https://truthsocial.com/@CNN_US/posts/113724326568555098")
|
||||
result = self.extractor.download(item)
|
||||
assert result is not False
|
||||
|
||||
@pytest.mark.skipif(CI, reason="Truth social blocks GH actions.")
|
||||
@pytest.mark.download
|
||||
def test_truthsocial_download_single_image(self, make_item):
|
||||
item = make_item("https://truthsocial.com/@mariabartiromo/posts/113861116433335006")
|
||||
@@ -140,6 +146,7 @@ class TestGenericExtractor(TestExtractorBase):
|
||||
assert len(result.media) == 1
|
||||
assert result is not False
|
||||
|
||||
@pytest.mark.skipif(CI, reason="Truth social blocks GH actions.")
|
||||
@pytest.mark.download
|
||||
def test_truthsocial_download_multiple_images(self, make_item):
|
||||
item = make_item("https://truthsocial.com/@trrth/posts/113861302149349135")
|
||||
|
||||
@@ -1,15 +1,12 @@
|
||||
from datetime import datetime
|
||||
from typing import Type
|
||||
|
||||
import pytest
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
from auto_archiver.core import Metadata
|
||||
from auto_archiver.modules.instagram_api_extractor.instagram_api_extractor import InstagramAPIExtractor
|
||||
from .test_extractor_base import TestExtractorBase
|
||||
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_user_response():
|
||||
return {
|
||||
@@ -115,74 +112,74 @@ class TestInstagramAPIExtractor(TestExtractorBase):
|
||||
# test gets text (metadata title)
|
||||
pass
|
||||
|
||||
def test_download_profile_basic(self, metadata, mock_user_response):
|
||||
def test_download_profile_basic(self, metadata, mock_user_response, mocker):
|
||||
"""Test basic profile download without full_profile"""
|
||||
with patch.object(self.extractor, 'call_api') as mock_call, \
|
||||
patch.object(self.extractor, 'download_from_url') as mock_download:
|
||||
# Mock API responses
|
||||
mock_call.return_value = mock_user_response
|
||||
mock_download.return_value = "profile.jpg"
|
||||
mock_call = mocker.patch.object(self.extractor, 'call_api')
|
||||
mock_download = mocker.patch.object(self.extractor, 'download_from_url')
|
||||
# Mock API responses
|
||||
mock_call.return_value = mock_user_response
|
||||
mock_download.return_value = "profile.jpg"
|
||||
|
||||
result = self.extractor.download_profile(metadata, "test_user")
|
||||
assert result.status == "insta profile: success"
|
||||
assert result.get_title() == "Test User"
|
||||
assert result.get("data") == self.extractor.cleanup_dict(mock_user_response["user"])
|
||||
# Verify profile picture download
|
||||
mock_call.assert_called_once_with("v2/user/by/username", {"username": "test_user"})
|
||||
mock_download.assert_called_once_with("http://example.com/profile.jpg")
|
||||
assert len(result.media) == 1
|
||||
assert result.media[0].filename == "profile.jpg"
|
||||
result = self.extractor.download_profile(metadata, "test_user")
|
||||
assert result.status == "insta profile: success"
|
||||
assert result.get_title() == "Test User"
|
||||
assert result.get("data") == self.extractor.cleanup_dict(mock_user_response["user"])
|
||||
# Verify profile picture download
|
||||
mock_call.assert_called_once_with("v2/user/by/username", {"username": "test_user"})
|
||||
mock_download.assert_called_once_with("http://example.com/profile.jpg")
|
||||
assert len(result.media) == 1
|
||||
assert result.media[0].filename == "profile.jpg"
|
||||
|
||||
def test_download_profile_full(self, metadata, mock_user_response, mock_story_response):
|
||||
def test_download_profile_full(self, metadata, mock_user_response, mock_story_response, mocker):
|
||||
"""Test full profile download with stories/posts"""
|
||||
with patch.object(self.extractor, 'call_api') as mock_call, \
|
||||
patch.object(self.extractor, 'download_all_posts') as mock_posts, \
|
||||
patch.object(self.extractor, 'download_all_highlights') as mock_highlights, \
|
||||
patch.object(self.extractor, 'download_all_tagged') as mock_tagged, \
|
||||
patch.object(self.extractor, '_download_stories_reusable') as mock_stories:
|
||||
mock_call = mocker.patch.object(self.extractor, 'call_api')
|
||||
mock_posts = mocker.patch.object(self.extractor, 'download_all_posts')
|
||||
mock_highlights = mocker.patch.object(self.extractor, 'download_all_highlights')
|
||||
mock_tagged = mocker.patch.object(self.extractor, 'download_all_tagged')
|
||||
mock_stories = mocker.patch.object(self.extractor, '_download_stories_reusable')
|
||||
|
||||
self.extractor.full_profile = True
|
||||
mock_call.side_effect = [
|
||||
mock_user_response,
|
||||
mock_story_response
|
||||
]
|
||||
mock_highlights.return_value = None
|
||||
mock_stories.return_value = mock_story_response
|
||||
mock_posts.return_value = None
|
||||
mock_tagged.return_value = None
|
||||
self.extractor.full_profile = True
|
||||
mock_call.side_effect = [
|
||||
mock_user_response,
|
||||
mock_story_response
|
||||
]
|
||||
mock_highlights.return_value = None
|
||||
mock_stories.return_value = mock_story_response
|
||||
mock_posts.return_value = None
|
||||
mock_tagged.return_value = None
|
||||
|
||||
result = self.extractor.download_profile(metadata, "test_user")
|
||||
assert result.get("#stories") == len(mock_story_response)
|
||||
mock_posts.assert_called_once_with(result, "123")
|
||||
assert "errors" not in result.metadata
|
||||
result = self.extractor.download_profile(metadata, "test_user")
|
||||
assert result.get("#stories") == len(mock_story_response)
|
||||
mock_posts.assert_called_once_with(result, "123")
|
||||
assert "errors" not in result.metadata
|
||||
|
||||
def test_download_profile_not_found(self, metadata):
|
||||
def test_download_profile_not_found(self, metadata, mocker):
|
||||
"""Test profile not found error"""
|
||||
with patch.object(self.extractor, 'call_api') as mock_call:
|
||||
mock_call.return_value = {"user": None}
|
||||
with pytest.raises(AssertionError) as exc_info:
|
||||
self.extractor.download_profile(metadata, "invalid_user")
|
||||
assert "User invalid_user not found" in str(exc_info.value)
|
||||
mock_call = mocker.patch.object(self.extractor, 'call_api')
|
||||
mock_call.return_value = {"user": None}
|
||||
with pytest.raises(AssertionError) as exc_info:
|
||||
self.extractor.download_profile(metadata, "invalid_user")
|
||||
assert "User invalid_user not found" in str(exc_info.value)
|
||||
|
||||
def test_download_profile_error_handling(self, metadata, mock_user_response):
|
||||
def test_download_profile_error_handling(self, metadata, mock_user_response, mocker):
|
||||
"""Test error handling in full profile mode"""
|
||||
with (patch.object(self.extractor, 'call_api') as mock_call, \
|
||||
patch.object(self.extractor, 'download_all_highlights') as mock_highlights, \
|
||||
patch.object(self.extractor, 'download_all_tagged') as mock_tagged, \
|
||||
patch.object(self.extractor, '_download_stories_reusable') as stories_tagged, \
|
||||
patch.object(self.extractor, 'download_all_posts') as mock_posts
|
||||
):
|
||||
self.extractor.full_profile = True
|
||||
mock_call.side_effect = [
|
||||
mock_user_response,
|
||||
Exception("Stories API failed"),
|
||||
Exception("Posts API failed")
|
||||
]
|
||||
mock_highlights.return_value = None
|
||||
mock_tagged.return_value = None
|
||||
stories_tagged.return_value = None
|
||||
mock_posts.return_value = None
|
||||
result = self.extractor.download_profile(metadata, "test_user")
|
||||
mock_call = mocker.patch.object(self.extractor, 'call_api')
|
||||
mock_highlights = mocker.patch.object(self.extractor, 'download_all_highlights')
|
||||
mock_tagged = mocker.patch.object(self.extractor, 'download_all_tagged')
|
||||
stories_tagged = mocker.patch.object(self.extractor, '_download_stories_reusable')
|
||||
mock_posts = mocker.patch.object(self.extractor, 'download_all_posts')
|
||||
|
||||
assert result.is_success()
|
||||
assert "Error downloading stories for test_user" in result.metadata["errors"]
|
||||
self.extractor.full_profile = True
|
||||
mock_call.side_effect = [
|
||||
mock_user_response,
|
||||
Exception("Stories API failed"),
|
||||
Exception("Posts API failed")
|
||||
]
|
||||
mock_highlights.return_value = None
|
||||
mock_tagged.return_value = None
|
||||
stories_tagged.return_value = None
|
||||
mock_posts.return_value = None
|
||||
result = self.extractor.download_profile(metadata, "test_user")
|
||||
|
||||
assert result.is_success()
|
||||
assert "Error downloading stories for test_user" in result.metadata["errors"]
|
||||
@@ -1,94 +1,108 @@
|
||||
import os
|
||||
from typing import Type
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
from auto_archiver.core import Metadata
|
||||
from auto_archiver.core.extractor import Extractor
|
||||
from auto_archiver.modules.instagram_tbot_extractor import InstagramTbotExtractor
|
||||
from tests.extractors.test_extractor_base import TestExtractorBase
|
||||
|
||||
TESTFILES = os.path.join(os.path.dirname(__file__), "testfiles")
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def session_file(tmpdir):
|
||||
"""Fixture to create a test session file."""
|
||||
session_file = os.path.join(tmpdir, "test_session.session")
|
||||
with open(session_file, "w") as f:
|
||||
f.write("mock_session_data")
|
||||
return session_file.replace(".session", "")
|
||||
def patch_extractor_methods(request, setup_module, mocker):
|
||||
mocker.patch.object(InstagramTbotExtractor, '_prepare_session_file', return_value=None)
|
||||
mocker.patch.object(InstagramTbotExtractor, '_initialize_telegram_client', return_value=None)
|
||||
yield
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def patch_extractor_methods(request, setup_module):
|
||||
with patch.object(InstagramTbotExtractor, '_prepare_session_file', return_value=None), \
|
||||
patch.object(InstagramTbotExtractor, '_initialize_telegram_client', return_value=None):
|
||||
if hasattr(request, 'cls') and hasattr(request.cls, 'config'):
|
||||
request.cls.extractor = setup_module("instagram_tbot_extractor", request.cls.config)
|
||||
|
||||
yield
|
||||
|
||||
@pytest.fixture
|
||||
def metadata_sample():
|
||||
m = Metadata()
|
||||
m.set_title("Test Title")
|
||||
m.set_timestamp("2021-01-01T00:00:00Z")
|
||||
m.set_timestamp("2021-01-01T00:00:00")
|
||||
m.set_url("https://www.instagram.com/p/1234567890")
|
||||
return m
|
||||
|
||||
|
||||
class TestInstagramTbotExtractor:
|
||||
@pytest.fixture
|
||||
def mock_telegram_client(mocker):
|
||||
"""Fixture to mock TelegramClient interactions."""
|
||||
mock_client = mocker.patch("auto_archiver.modules.instagram_tbot_extractor.client")
|
||||
instance = mocker.MagicMock()
|
||||
mock_client.return_value = instance
|
||||
return instance
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def extractor(setup_module, patch_extractor_methods, mocker):
|
||||
extractor_module = "instagram_tbot_extractor"
|
||||
extractor: InstagramTbotExtractor
|
||||
config = {
|
||||
"api_id": 12345,
|
||||
"api_hash": "test_api_hash",
|
||||
"session_file": "test_session",
|
||||
"timeout": 4
|
||||
}
|
||||
extractor = setup_module(extractor_module, config)
|
||||
extractor.client = mocker.MagicMock()
|
||||
extractor.session_file = "test_session"
|
||||
return extractor
|
||||
|
||||
|
||||
def test_non_instagram_url(extractor, metadata_sample):
|
||||
metadata_sample.set_url("https://www.youtube.com")
|
||||
assert extractor.download(metadata_sample) is False
|
||||
|
||||
|
||||
def test_download_success(extractor, metadata_sample, mocker):
|
||||
mocker.patch.object(extractor, "_send_url_to_bot", return_value=(mocker.MagicMock(), 101))
|
||||
mocker.patch.object(extractor, "_process_messages", return_value="Sample Instagram post caption")
|
||||
result = extractor.download(metadata_sample)
|
||||
assert result.is_success()
|
||||
assert result.status == "insta-via-bot: success"
|
||||
assert result.metadata.get("title") == "Sample Instagram post caption"
|
||||
|
||||
|
||||
def test_download_invalid(extractor, metadata_sample, mocker):
|
||||
mocker.patch.object(extractor, "_send_url_to_bot", return_value=(mocker.MagicMock(), 101))
|
||||
mocker.patch.object(extractor, "_process_messages", return_value="You must enter a URL to a post")
|
||||
assert extractor.download(metadata_sample) is False
|
||||
|
||||
|
||||
@pytest.mark.skip(reason="Requires authentication.")
|
||||
class TestInstagramTbotExtractorReal(TestExtractorBase):
|
||||
# To run these tests set the TELEGRAM_API_ID and TELEGRAM_API_HASH environment variables, and ensure the session file exists.
|
||||
# Note these are true at this point in time, but changes to source media could be reason for failure.
|
||||
extractor_module = "instagram_tbot_extractor"
|
||||
extractor: InstagramTbotExtractor
|
||||
config = {
|
||||
"api_id": os.environ.get("TELEGRAM_API_ID"),
|
||||
"api_hash": os.environ.get("TELEGRAM_API_HASH"),
|
||||
"session_file": "secrets/anon-insta",
|
||||
}
|
||||
|
||||
@pytest.fixture
|
||||
def mock_telegram_client(self):
|
||||
"""Fixture to mock TelegramClient interactions."""
|
||||
with patch("auto_archiver.modules.instagram_tbot_extractor._initialize_telegram_client") as mock_client:
|
||||
instance = MagicMock()
|
||||
mock_client.return_value = instance
|
||||
yield instance
|
||||
|
||||
def test_extractor_is_initialized(self):
|
||||
assert self.extractor is not None
|
||||
|
||||
|
||||
@patch("time.sleep")
|
||||
@pytest.mark.parametrize("url, expected_status, bot_responses", [
|
||||
("https://www.instagram.com/p/C4QgLbrIKXG", "insta-via-bot: success", [MagicMock(id=101, media=None, message="Are you new to Bellingcat? - The way we share our investigations is different. 💭\nWe want you to read our story but also learn ou")]),
|
||||
("https://www.instagram.com/reel/DEVLK8qoIbg/", "insta-via-bot: success", [MagicMock(id=101, media=None, message="Our volunteer community is at the centre of many incredible Bellingcat investigations and tools. Stephanie Ladel is one such vol")]),
|
||||
# todo tbot not working for stories :(
|
||||
("https://www.instagram.com/stories/bellingcatofficial/3556336382743057476/", False, [MagicMock(id=101, media=None, message="Media not found or unavailable")]),
|
||||
("https://www.youtube.com/watch?v=ymCMy8OffHM", False, []),
|
||||
("https://www.instagram.com/p/INVALID", False, [MagicMock(id=101, media=None, message="You must enter a URL to a post")]),
|
||||
@pytest.mark.parametrize("url, expected_status, message, len_media", [
|
||||
("https://www.instagram.com/p/C4QgLbrIKXG", "insta-via-bot: success",
|
||||
"Are you new to Bellingcat? - The way we share our investigations is different. 💭\nWe want you to read our story but also learn ou",
|
||||
6),
|
||||
("https://www.instagram.com/reel/DEVLK8qoIbg/", "insta-via-bot: success",
|
||||
"Our volunteer community is at the centre of many incredible Bellingcat investigations and tools. Stephanie Ladel is one such vol",
|
||||
3),
|
||||
# instagram tbot not working (potentially intermittently?) for stories - replace with a live story to retest
|
||||
# ("https://www.instagram.com/stories/bellingcatofficial/3556336382743057476/", False, "Media not found or unavailable"),
|
||||
# Seems to be working intermittently for highlights
|
||||
# ("https://www.instagram.com/stories/highlights/17868810693068139/", "insta-via-bot: success", None, 50),
|
||||
# Marking invalid url as success
|
||||
("https://www.instagram.com/p/INVALID", "insta-via-bot: success", "Media not found or unavailable", 0),
|
||||
("https://www.youtube.com/watch?v=ymCMy8OffHM", False, None, 0),
|
||||
])
|
||||
def test_download(self, mock_sleep, url, expected_status, bot_responses, metadata_sample):
|
||||
def test_download(self, url, expected_status, message, len_media, metadata_sample):
|
||||
"""Test the `download()` method with various Instagram URLs."""
|
||||
metadata_sample.set_url(url)
|
||||
self.extractor.client = MagicMock()
|
||||
|
||||
result = self.extractor.download(metadata_sample)
|
||||
pass
|
||||
# TODO fully mock or use as authenticated test
|
||||
# if expected_status:
|
||||
# assert result.is_success()
|
||||
# assert result.status == expected_status
|
||||
# assert result.metadata.get("title") in [msg.message[:128] for msg in bot_responses if msg.message]
|
||||
# else:
|
||||
# assert result is False
|
||||
|
||||
|
||||
|
||||
|
||||
# Test story
|
||||
# Test expired story
|
||||
# Test requires login/ access (?)
|
||||
# Test post
|
||||
# Test multiple images?
|
||||
if expected_status:
|
||||
assert result.is_success()
|
||||
assert result.status == expected_status
|
||||
assert result.metadata.get("title") == message
|
||||
assert len(result.media) == len_media
|
||||
else:
|
||||
assert result is False
|
||||
|
||||
@@ -23,7 +23,6 @@ class TestTwitterApiExtractor(TestExtractorBase):
|
||||
}
|
||||
|
||||
@pytest.mark.parametrize("url, expected", [
|
||||
("https://t.co/yl3oOJatFp", "https://www.bellingcat.com/category/resources/"), # t.co URL
|
||||
("https://x.com/bellingcat/status/1874097816571961839", "https://x.com/bellingcat/status/1874097816571961839"), # x.com urls unchanged
|
||||
("https://twitter.com/bellingcat/status/1874097816571961839", "https://twitter.com/bellingcat/status/1874097816571961839"), # twitter urls unchanged
|
||||
("https://twitter.com/bellingcat/status/1874097816571961839?s=20&t=3d0g4ZQis7dCbSDg-mE7-w", "https://twitter.com/bellingcat/status/1874097816571961839?s=20&t=3d0g4ZQis7dCbSDg-mE7-w"), # don't strip params from twitter urls (changed Jan 2025)
|
||||
@@ -32,7 +31,11 @@ class TestTwitterApiExtractor(TestExtractorBase):
|
||||
])
|
||||
def test_sanitize_url(self, url, expected):
|
||||
assert expected == self.extractor.sanitize_url(url)
|
||||
|
||||
|
||||
@pytest.mark.download
|
||||
def test_sanitize_url_download(self):
|
||||
assert "https://www.bellingcat.com/category/resources/" == self.extractor.sanitize_url("https://t.co/yl3oOJatFp")
|
||||
|
||||
@pytest.mark.parametrize("url, exptected_username, exptected_tweetid", [
|
||||
("https://twitter.com/bellingcat/status/1874097816571961839", "bellingcat", "1874097816571961839"),
|
||||
("https://x.com/bellingcat/status/1874097816571961839", "bellingcat", "1874097816571961839"),
|
||||
|
||||
108
tests/feeders/test_atlos_feeder.py
Normal file
108
tests/feeders/test_atlos_feeder.py
Normal file
@@ -0,0 +1,108 @@
|
||||
import pytest
|
||||
from auto_archiver.modules.atlos_feeder import AtlosFeeder
|
||||
|
||||
|
||||
class FakeAPIResponse:
|
||||
"""Simulate a response object."""
|
||||
|
||||
def __init__(self, data: dict, raise_error: bool = False) -> None:
|
||||
self._data = data
|
||||
self.raise_error = raise_error
|
||||
|
||||
def json(self) -> dict:
|
||||
return self._data
|
||||
|
||||
def raise_for_status(self) -> None:
|
||||
if self.raise_error:
|
||||
raise Exception("HTTP error")
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def atlos_feeder(setup_module) -> AtlosFeeder:
|
||||
"""Fixture for AtlosFeeder."""
|
||||
configs: dict = {
|
||||
"api_token": "abc123",
|
||||
"atlos_url": "https://platform.atlos.org",
|
||||
}
|
||||
return setup_module("atlos_feeder", configs)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_atlos_api(mocker):
|
||||
"""Fixture to mock requests to Atlos API."""
|
||||
def _mock_responses(responses):
|
||||
mocker.patch(
|
||||
"requests.get",
|
||||
side_effect=[FakeAPIResponse(data) for data in responses],
|
||||
)
|
||||
return _mock_responses
|
||||
|
||||
|
||||
def test_atlos_feeder_iter_yields_valid_metadata(atlos_feeder, mock_atlos_api):
|
||||
"""Test valid items are yielded and invalid ones ignored."""
|
||||
mock_atlos_api([
|
||||
{
|
||||
"next": None,
|
||||
"results": [
|
||||
{"source_url": "http://example.com", "id": 1,
|
||||
"metadata": {"auto_archiver": {"processed": False}},
|
||||
"visibility": "visible", "status": "complete"},
|
||||
{"source_url": "", "id": 2,
|
||||
"metadata": {"auto_archiver": {"processed": False}},
|
||||
"visibility": "visible", "status": "complete"},
|
||||
{"source_url": "http://example.org", "id": 3,
|
||||
"metadata": {"auto_archiver": {"processed": True}},
|
||||
"visibility": "visible", "status": "complete"},
|
||||
],
|
||||
}
|
||||
])
|
||||
|
||||
items = list(atlos_feeder)
|
||||
assert len(items) == 1
|
||||
assert items[0].get_url() == "http://example.com"
|
||||
assert items[0].get("atlos_id") == 1
|
||||
|
||||
|
||||
def test_atlos_feeder_multiple_pages(atlos_feeder, mock_atlos_api):
|
||||
"""Test iteration over multiple pages with valid items."""
|
||||
mock_atlos_api([
|
||||
{
|
||||
"next": "cursor2",
|
||||
"results": [
|
||||
{"source_url": "http://example1.com", "id": 10,
|
||||
"metadata": {"auto_archiver": {"processed": False}},
|
||||
"visibility": "visible", "status": "complete"},
|
||||
],
|
||||
},
|
||||
{
|
||||
"next": None,
|
||||
"results": [
|
||||
{"source_url": "http://example2.com", "id": 20,
|
||||
"metadata": {"auto_archiver": {"processed": False}},
|
||||
"visibility": "visible", "status": "complete"},
|
||||
],
|
||||
},
|
||||
])
|
||||
|
||||
items = list(atlos_feeder)
|
||||
assert len(items) == 2
|
||||
assert items[0].get_url() == "http://example1.com"
|
||||
assert items[0].get("atlos_id") == 10
|
||||
assert items[1].get_url() == "http://example2.com"
|
||||
assert items[1].get("atlos_id") == 20
|
||||
|
||||
|
||||
def test_atlos_feeder_no_results(atlos_feeder, mock_atlos_api):
|
||||
"""Test iteration stops when no results are returned."""
|
||||
mock_atlos_api([{"next": None, "results": []}])
|
||||
assert list(atlos_feeder) == []
|
||||
|
||||
|
||||
def test_atlos_feeder_http_error(atlos_feeder, mocker):
|
||||
"""Test raises an exception on HTTP error."""
|
||||
mocker.patch(
|
||||
"requests.get",
|
||||
return_value=FakeAPIResponse({"next": None, "results": []}, raise_error=True),
|
||||
)
|
||||
with pytest.raises(Exception, match="HTTP error"):
|
||||
list(atlos_feeder)
|
||||
@@ -2,27 +2,23 @@ from typing import Type
|
||||
|
||||
import gspread
|
||||
import pytest
|
||||
from unittest.mock import patch, MagicMock
|
||||
from auto_archiver.modules.gsheet_feeder import GsheetsFeeder
|
||||
from auto_archiver.core import Metadata, Feeder
|
||||
|
||||
|
||||
def test_setup_without_sheet_and_sheet_id(setup_module):
|
||||
def test_setup_without_sheet_and_sheet_id(setup_module, mocker):
|
||||
# Ensure setup() raises AssertionError if neither sheet nor sheet_id is set.
|
||||
with patch("gspread.service_account"):
|
||||
with pytest.raises(AssertionError):
|
||||
setup_module(
|
||||
"gsheet_feeder",
|
||||
{"service_account": "dummy.json", "sheet": None, "sheet_id": None},
|
||||
)
|
||||
mocker.patch("gspread.service_account")
|
||||
with pytest.raises(AssertionError):
|
||||
setup_module(
|
||||
"gsheet_feeder",
|
||||
{"service_account": "dummy.json", "sheet": None, "sheet_id": None},
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def gsheet_feeder(setup_module) -> GsheetsFeeder:
|
||||
with patch("gspread.service_account"):
|
||||
feeder = setup_module(
|
||||
"gsheet_feeder",
|
||||
{
|
||||
def gsheet_feeder(setup_module, mocker) -> GsheetsFeeder:
|
||||
config: dict = {
|
||||
"service_account": "dummy.json",
|
||||
"sheet": "test-auto-archiver",
|
||||
"sheet_id": None,
|
||||
@@ -46,9 +42,13 @@ def gsheet_feeder(setup_module) -> GsheetsFeeder:
|
||||
"allow_worksheets": set(),
|
||||
"block_worksheets": set(),
|
||||
"use_sheet_names_in_stored_paths": True,
|
||||
},
|
||||
)
|
||||
feeder.gsheets_client = MagicMock()
|
||||
}
|
||||
mocker.patch("gspread.service_account")
|
||||
feeder = setup_module(
|
||||
"gsheet_feeder",
|
||||
config
|
||||
)
|
||||
feeder.gsheets_client = mocker.MagicMock()
|
||||
return feeder
|
||||
|
||||
|
||||
@@ -129,56 +129,56 @@ def test__set_metadata_with_folder(gsheet_feeder: GsheetsFeeder):
|
||||
],
|
||||
)
|
||||
def test_open_sheet_with_name_or_id(
|
||||
setup_module, sheet, sheet_id, expected_method, expected_arg, description
|
||||
setup_module, sheet, sheet_id, expected_method, expected_arg, description, mocker
|
||||
):
|
||||
"""Ensure open_sheet() correctly opens by name or ID based on configuration."""
|
||||
with patch("gspread.service_account") as mock_service_account:
|
||||
mock_client = MagicMock()
|
||||
mock_service_account.return_value = mock_client
|
||||
mock_client.open.return_value = "MockSheet"
|
||||
mock_client.open_by_key.return_value = "MockSheet"
|
||||
mock_service_account = mocker.patch("gspread.service_account")
|
||||
mock_client = mocker.MagicMock()
|
||||
mock_service_account.return_value = mock_client
|
||||
mock_client.open.return_value = "MockSheet"
|
||||
mock_client.open_by_key.return_value = "MockSheet"
|
||||
|
||||
# Setup module with parameterized values
|
||||
feeder = setup_module(
|
||||
"gsheet_feeder",
|
||||
{"service_account": "dummy.json", "sheet": sheet, "sheet_id": sheet_id},
|
||||
)
|
||||
sheet_result = feeder.open_sheet()
|
||||
# Validate the correct method was called
|
||||
getattr(mock_client, expected_method).assert_called_once_with(
|
||||
expected_arg
|
||||
), f"Failed: {description}"
|
||||
assert sheet_result == "MockSheet", f"Failed: {description}"
|
||||
# Setup module with parameterized values
|
||||
feeder = setup_module(
|
||||
"gsheet_feeder",
|
||||
{"service_account": "dummy.json", "sheet": sheet, "sheet_id": sheet_id},
|
||||
)
|
||||
sheet_result = feeder.open_sheet()
|
||||
# Validate the correct method was called
|
||||
getattr(mock_client, expected_method).assert_called_once_with(
|
||||
expected_arg
|
||||
), f"Failed: {description}"
|
||||
assert sheet_result == "MockSheet", f"Failed: {description}"
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("setup_module")
|
||||
def test_open_sheet_with_sheet_id(setup_module):
|
||||
def test_open_sheet_with_sheet_id(setup_module, mocker):
|
||||
"""Ensure open_sheet() correctly opens a sheet by ID."""
|
||||
with patch("gspread.service_account") as mock_service_account:
|
||||
mock_client = MagicMock()
|
||||
mock_service_account.return_value = mock_client
|
||||
mock_client.open_by_key.return_value = "MockSheet"
|
||||
feeder = setup_module(
|
||||
"gsheet_feeder",
|
||||
{"service_account": "dummy.json", "sheet": None, "sheet_id": "ABC123"},
|
||||
)
|
||||
sheet = feeder.open_sheet()
|
||||
mock_client.open_by_key.assert_called_once_with("ABC123")
|
||||
assert sheet == "MockSheet"
|
||||
mock_service_account = mocker.patch("gspread.service_account")
|
||||
mock_client = mocker.MagicMock()
|
||||
mock_service_account.return_value = mock_client
|
||||
mock_client.open_by_key.return_value = "MockSheet"
|
||||
feeder = setup_module(
|
||||
"gsheet_feeder",
|
||||
{"service_account": "dummy.json", "sheet": None, "sheet_id": "ABC123"},
|
||||
)
|
||||
sheet = feeder.open_sheet()
|
||||
mock_client.open_by_key.assert_called_once_with("ABC123")
|
||||
assert sheet == "MockSheet"
|
||||
|
||||
|
||||
def test_should_process_sheet(setup_module):
|
||||
with patch("gspread.service_account"):
|
||||
gdb = setup_module(
|
||||
"gsheet_feeder",
|
||||
{
|
||||
"service_account": "dummy.json",
|
||||
"sheet": "TestSheet",
|
||||
"sheet_id": None,
|
||||
"allow_worksheets": {"TestSheet", "Sheet2"},
|
||||
"block_worksheets": {"Sheet3"},
|
||||
},
|
||||
)
|
||||
def test_should_process_sheet(setup_module, mocker):
|
||||
mocker.patch("gspread.service_account")
|
||||
gdb = setup_module(
|
||||
"gsheet_feeder",
|
||||
{
|
||||
"service_account": "dummy.json",
|
||||
"sheet": "TestSheet",
|
||||
"sheet_id": None,
|
||||
"allow_worksheets": {"TestSheet", "Sheet2"},
|
||||
"block_worksheets": {"Sheet3"},
|
||||
},
|
||||
)
|
||||
assert gdb.should_process_sheet("TestSheet") == True
|
||||
assert gdb.should_process_sheet("Sheet3") == False
|
||||
# False if allow_worksheets is set
|
||||
|
||||
@@ -1,13 +1,13 @@
|
||||
# Note this isn't a feeder, but contained as utility of the gsheet feeder module
|
||||
import pytest
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
from auto_archiver.modules.gsheet_feeder import GWorksheet
|
||||
|
||||
|
||||
class TestGWorksheet:
|
||||
@pytest.fixture
|
||||
def mock_worksheet(self):
|
||||
mock_ws = MagicMock()
|
||||
def mock_worksheet(self, mocker):
|
||||
mock_ws = mocker.MagicMock()
|
||||
mock_ws.get_values.return_value = [
|
||||
["Link", "Archive Status", "Archive Location", "Archive Date"],
|
||||
["url1", "archived", "filepath1", "2023-01-01"],
|
||||
@@ -136,8 +136,8 @@ class TestGWorksheet:
|
||||
assert gworksheet.to_a1(row, col) == expected
|
||||
|
||||
# Test empty worksheet
|
||||
def test_empty_worksheet_initialization(self):
|
||||
mock_ws = MagicMock()
|
||||
def test_empty_worksheet_initialization(self, mocker):
|
||||
mock_ws = mocker.MagicMock()
|
||||
mock_ws.get_values.return_value = []
|
||||
g = GWorksheet(mock_ws)
|
||||
assert g.headers == []
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
from typing import Type
|
||||
import pytest
|
||||
from unittest.mock import MagicMock, patch
|
||||
from auto_archiver.core import Media
|
||||
from auto_archiver.modules.s3_storage import S3Storage
|
||||
|
||||
@@ -11,7 +10,6 @@ class TestS3Storage:
|
||||
"""
|
||||
module_name: str = "s3_storage"
|
||||
storage: Type[S3Storage]
|
||||
s3: MagicMock
|
||||
config: dict = {
|
||||
"path_generator": "flat",
|
||||
"filename_generator": "static",
|
||||
@@ -25,13 +23,14 @@ class TestS3Storage:
|
||||
"private": False,
|
||||
}
|
||||
|
||||
@patch('boto3.client')
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup_storage(self, setup_module):
|
||||
def setup_storage(self, setup_module, mocker):
|
||||
self.s3 = S3Storage()
|
||||
self.storage = setup_module(self.module_name, self.config)
|
||||
|
||||
def test_client_initialization(self):
|
||||
"""Test that S3 client is initialized with correct parameters"""
|
||||
|
||||
assert self.storage.s3 is not None
|
||||
assert self.storage.s3.meta.region_name == 'test-region'
|
||||
|
||||
@@ -44,81 +43,63 @@ class TestS3Storage:
|
||||
media.key = "another/path.jpg"
|
||||
assert self.storage.get_cdn_url(media) == "https://cdn.example.com/another/path.jpg"
|
||||
|
||||
def test_uploadf_sets_acl_public(self):
|
||||
def test_uploadf_sets_acl_public(self, mocker):
|
||||
media = Media("test.txt")
|
||||
mock_file = MagicMock()
|
||||
with patch.object(self.storage.s3, 'upload_fileobj') as mock_s3_upload, \
|
||||
patch.object(self.storage, 'is_upload_needed', return_value=True):
|
||||
self.storage.uploadf(mock_file, media)
|
||||
mock_s3_upload.assert_called_once_with(
|
||||
mock_file,
|
||||
Bucket='test-bucket',
|
||||
Key=media.key,
|
||||
ExtraArgs={'ACL': 'public-read', 'ContentType': 'text/plain'}
|
||||
)
|
||||
mock_file = mocker.MagicMock()
|
||||
mock_s3_upload = mocker.patch.object(self.storage.s3, 'upload_fileobj')
|
||||
mocker.patch.object(self.storage, 'is_upload_needed', return_value=True)
|
||||
self.storage.uploadf(mock_file, media)
|
||||
mock_s3_upload.assert_called_once_with(
|
||||
mock_file,
|
||||
Bucket='test-bucket',
|
||||
Key=media.key,
|
||||
ExtraArgs={'ACL': 'public-read', 'ContentType': 'text/plain'}
|
||||
)
|
||||
|
||||
def test_upload_decision_logic(self):
|
||||
def test_upload_decision_logic(self, mocker):
|
||||
"""Test is_upload_needed under different conditions"""
|
||||
media = Media("test.txt")
|
||||
# Test default state (random_no_duplicate=False)
|
||||
assert self.storage.is_upload_needed(media) is True
|
||||
# Set duplicate checking config to true:
|
||||
|
||||
self.storage.random_no_duplicate = True
|
||||
with patch('auto_archiver.modules.s3_storage.s3_storage.calculate_file_hash') as mock_calc_hash, \
|
||||
patch.object(self.storage, 'file_in_folder') as mock_file_in_folder:
|
||||
mock_calc_hash.return_value = 'beepboop123beepboop123beepboop123'
|
||||
mock_file_in_folder.return_value = 'existing_key.txt'
|
||||
# Test duplicate result
|
||||
assert self.storage.is_upload_needed(media) is False
|
||||
assert media.key == 'existing_key.txt'
|
||||
mock_file_in_folder.assert_called_with(
|
||||
# (first 24 chars of hash)
|
||||
'no-dups/beepboop123beepboop123be'
|
||||
)
|
||||
mock_calc_hash = mocker.patch('auto_archiver.modules.s3_storage.s3_storage.calculate_file_hash', return_value='beepboop123beepboop123beepboop123')
|
||||
mock_file_in_folder = mocker.patch.object(self.storage, 'file_in_folder', return_value='existing_key.txt')
|
||||
assert self.storage.is_upload_needed(media) is False
|
||||
assert media.key == 'existing_key.txt'
|
||||
mock_file_in_folder.assert_called_with('no-dups/beepboop123beepboop123be')
|
||||
|
||||
|
||||
@patch.object(S3Storage, 'file_in_folder')
|
||||
def test_skips_upload_when_duplicate_exists(self, mock_file_in_folder):
|
||||
def test_skips_upload_when_duplicate_exists(self, mocker):
|
||||
"""Test that upload skips when file_in_folder finds existing object"""
|
||||
self.storage.random_no_duplicate = True
|
||||
mock_file_in_folder.return_value = "existing_folder/existing_file.txt"
|
||||
# Create test media with calculated hash
|
||||
mock_file_in_folder = mocker.patch.object(S3Storage, 'file_in_folder', return_value="existing_folder/existing_file.txt")
|
||||
media = Media("test.txt")
|
||||
media.key = "original_path.txt"
|
||||
with patch('auto_archiver.modules.s3_storage.s3_storage.calculate_file_hash') as mock_calculate_hash:
|
||||
mock_calculate_hash.return_value = "beepboop123beepboop123beepboop123"
|
||||
# Verify upload
|
||||
assert self.storage.is_upload_needed(media) is False
|
||||
assert media.key == "existing_folder/existing_file.txt"
|
||||
assert media.get("previously archived") is True
|
||||
with patch.object(self.storage.s3, 'upload_fileobj') as mock_upload:
|
||||
result = self.storage.uploadf(None, media)
|
||||
mock_upload.assert_not_called()
|
||||
assert result is True
|
||||
mock_calculate_hash = mocker.patch('auto_archiver.modules.s3_storage.s3_storage.calculate_file_hash', return_value="beepboop123beepboop123beepboop123")
|
||||
assert self.storage.is_upload_needed(media) is False
|
||||
assert media.key == "existing_folder/existing_file.txt"
|
||||
assert media.get("previously archived") is True
|
||||
mock_upload = mocker.patch.object(self.storage.s3, 'upload_fileobj')
|
||||
result = self.storage.uploadf(None, media)
|
||||
mock_upload.assert_not_called()
|
||||
assert result is True
|
||||
|
||||
@patch.object(S3Storage, 'is_upload_needed')
|
||||
def test_uploads_with_correct_parameters(self, mock_upload_needed):
|
||||
def test_uploads_with_correct_parameters(self, mocker):
|
||||
media = Media("test.txt")
|
||||
media.key = "original_key.txt"
|
||||
mock_upload_needed.return_value = True
|
||||
mocker.patch.object(S3Storage, 'is_upload_needed', return_value=True)
|
||||
media.mimetype = 'image/png'
|
||||
mock_file = MagicMock()
|
||||
mock_file = mocker.MagicMock()
|
||||
mock_upload = mocker.patch.object(self.storage.s3, 'upload_fileobj')
|
||||
self.storage.uploadf(mock_file, media)
|
||||
mock_upload.assert_called_once_with(
|
||||
mock_file,
|
||||
Bucket='test-bucket',
|
||||
Key='original_key.txt',
|
||||
ExtraArgs={
|
||||
'ACL': 'public-read',
|
||||
'ContentType': 'image/png'
|
||||
}
|
||||
)
|
||||
|
||||
with patch.object(self.storage.s3, 'upload_fileobj') as mock_upload:
|
||||
self.storage.uploadf(mock_file, media)
|
||||
# verify call occured with these params
|
||||
mock_upload.assert_called_once_with(
|
||||
mock_file,
|
||||
Bucket='test-bucket',
|
||||
Key='original_key.txt',
|
||||
ExtraArgs={
|
||||
'ACL': 'public-read',
|
||||
'ContentType': 'image/png'
|
||||
}
|
||||
)
|
||||
|
||||
def test_file_in_folder_exists(self):
|
||||
with patch.object(self.storage.s3, 'list_objects') as mock_list_objects:
|
||||
mock_list_objects.return_value = {'Contents': [{'Key': 'path/to/file.txt'}]}
|
||||
assert self.storage.file_in_folder('path/to/') == 'path/to/file.txt'
|
||||
def test_file_in_folder_exists(self, mocker):
|
||||
mock_list_objects = mocker.patch.object(self.storage.s3, 'list_objects', return_value={'Contents': [{'Key': 'path/to/file.txt'}]})
|
||||
assert self.storage.file_in_folder('path/to/') == 'path/to/file.txt'
|
||||
|
||||
142
tests/storages/test_atlos_storage.py
Normal file
142
tests/storages/test_atlos_storage.py
Normal file
@@ -0,0 +1,142 @@
|
||||
import os
|
||||
import hashlib
|
||||
import pytest
|
||||
from auto_archiver.core import Media, Metadata
|
||||
from auto_archiver.modules.atlos_storage import AtlosStorage
|
||||
|
||||
|
||||
class FakeAPIResponse:
    """Minimal stand-in for an HTTP response object used by these tests.

    It exposes only ``json()`` and ``raise_for_status()``; the tests hand
    instances of it back from patched ``requests.get`` / ``requests.post``.
    """

    def __init__(self, data: dict, raise_error: bool = False) -> None:
        """Store the canned JSON payload and whether the status check fails."""
        self.raise_error = raise_error
        self._data = data

    def json(self) -> dict:
        """Return the payload given at construction time, unchanged."""
        return self._data

    def raise_for_status(self) -> None:
        """Raise ``Exception("HTTP error")`` when built with ``raise_error=True``."""
        if not self.raise_error:
            return
        raise Exception("HTTP error")
|
||||
|
||||
|
||||
@pytest.fixture
def atlos_storage(setup_module) -> AtlosStorage:
    """Build the ``atlos_storage`` module through the shared ``setup_module`` fixture.

    The module is configured with a dummy API token and the Atlos platform URL.
    """
    return setup_module(
        "atlos_storage",
        {
            "api_token": "abc123",
            "atlos_url": "https://platform.atlos.org",
        },
    )
|
||||
|
||||
|
||||
@pytest.fixture
def media(tmp_path) -> Media:
    """Return a ``Media`` backed by a real temporary file.

    The file ``media.txt`` is written under ``tmp_path`` with the bytes
    ``b"media content"``; the object also carries a properties dict and a key.
    """
    source = tmp_path / "media.txt"
    source.write_bytes(b"media content")

    item = Media(filename=str(source))
    item.properties = {"something": "Title"}
    item.key = "key"
    return item
|
||||
|
||||
|
||||
def test_get_cdn_url(atlos_storage: AtlosStorage) -> None:
    """get_cdn_url() should hand back the storage's configured ``atlos_url``."""
    dummy = Media(filename="dummy.mp4")
    assert atlos_storage.get_cdn_url(dummy) == atlos_storage.atlos_url
|
||||
|
||||
|
||||
def test_hash(tmp_path, atlos_storage: AtlosStorage) -> None:
    """_hash() should equal the SHA-256 hex digest of the media file's bytes."""
    payload = b"hello world"
    target = tmp_path / "test.txt"
    target.write_bytes(payload)

    # Construct with a placeholder name, then point the media at the real file.
    media = Media(filename="dummy.mp4")
    media.filename = str(target)

    digest = hashlib.sha256(payload).hexdigest()
    assert atlos_storage._hash(media) == digest
|
||||
|
||||
|
||||
def test_upload_no_atlos_id(tmp_path, atlos_storage: AtlosStorage, media: Media, mocker) -> None:
    """upload() should return False and never POST when no atlos_id is set."""
    post_mock = mocker.patch("requests.post")
    bare_metadata = Metadata()  # deliberately left without an atlos_id

    assert atlos_storage.upload(media, bare_metadata) is False
    post_mock.assert_not_called()
|
||||
|
||||
|
||||
def test_upload_already_uploaded(atlos_storage: AtlosStorage,
                                 metadata: Metadata,
                                 media: Media,
                                 tmp_path,
                                 mocker) -> None:
    """upload() should return True without POSTing when the hash is already listed."""
    metadata.set("atlos_id", 101)

    # Same bytes the ``media`` fixture writes, so the listed hash matches the file.
    known_hash = hashlib.sha256(b"media content").hexdigest()
    listing = FakeAPIResponse({
        "result": {"artifacts": [{"file_hash_sha256": known_hash}]}
    })
    get_mock = mocker.patch("requests.get", return_value=listing)
    post_mock = mocker.patch("requests.post")

    outcome = atlos_storage.upload(media, metadata)

    assert outcome is True
    get_mock.assert_called_once()
    post_mock.assert_not_called()
|
||||
|
||||
|
||||
def test_upload_not_uploaded(tmp_path, atlos_storage: AtlosStorage,
                             metadata: Metadata,
                             media: Media,
                             mocker) -> None:
    """upload() should POST the file when its hash is not among the listed artifacts."""
    metadata.set("atlos_id", 202)
    listing = FakeAPIResponse({
        "result": {"artifacts": [{"file_hash_sha256": "different_hash"}]}
    })
    get_mock = mocker.patch("requests.get", return_value=listing)
    post_mock = mocker.patch(
        "requests.post",
        return_value=FakeAPIResponse({}, raise_error=False),
    )

    assert atlos_storage.upload(media, metadata) is True
    get_mock.assert_called_once()
    post_mock.assert_called_once()

    sent = post_mock.call_args
    sent_kwargs = sent.kwargs
    assert sent_kwargs["headers"] == {"Authorization": f"Bearer {atlos_storage.api_token}"}
    assert sent_kwargs["params"] == {"title": media.properties}

    # The URL may have been passed by keyword or as the first positional argument.
    posted_url = sent_kwargs.get("url") or sent.args[0]
    assert posted_url == f"{atlos_storage.atlos_url}/api/v2/source_material/upload/202"

    # First element of the "file" tuple is the name sent to the server.
    uploaded_name = sent_kwargs["files"]["file"][0]
    assert uploaded_name == os.path.basename(media.filename)
|
||||
|
||||
|
||||
def test_upload_post_http_error(tmp_path,
                                atlos_storage: AtlosStorage,
                                metadata: Metadata,
                                media: Media,
                                mocker) -> None:
    """upload() should let the error raised by the POST response propagate."""
    metadata.set("atlos_id", 303)

    empty_listing = FakeAPIResponse({
        "result": {"artifacts": []}
    })
    failing_post = FakeAPIResponse({}, raise_error=True)
    mocker.patch("requests.get", return_value=empty_listing)
    mocker.patch("requests.post", return_value=failing_post)

    with pytest.raises(Exception, match="HTTP error"):
        atlos_storage.upload(media, metadata)
|
||||
|
||||
|
||||
def test_uploadf_not_implemented(atlos_storage: AtlosStorage) -> None:
    """uploadf() is expected to return None for AtlosStorage."""
    assert atlos_storage.uploadf(None, "dummy") is None
|
||||
@@ -1,44 +1,57 @@
|
||||
from typing import Type
|
||||
import pytest
|
||||
from unittest.mock import MagicMock, patch
|
||||
from oauth2client import service_account
|
||||
|
||||
from auto_archiver.core import Media
|
||||
from auto_archiver.modules.gdrive_storage import GDriveStorage
|
||||
from auto_archiver.core.metadata import Metadata
|
||||
from tests.storages.test_storage_base import TestStorageBase
|
||||
|
||||
|
||||
class TestGDriveStorage:
|
||||
"""
|
||||
Test suite for GDriveStorage.
|
||||
"""
|
||||
|
||||
@pytest.fixture
|
||||
def gdrive_storage(setup_module, mocker):
|
||||
module_name: str = "gdrive_storage"
|
||||
storage: Type[GDriveStorage]
|
||||
storage: GDriveStorage
|
||||
config: dict = {'path_generator': 'url',
|
||||
'filename_generator': 'static',
|
||||
'root_folder_id': "fake_root_folder_id",
|
||||
'oauth_token': None,
|
||||
'service_account': 'fake_service_account.json'
|
||||
}
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def gdrive(self, setup_module):
|
||||
with patch('google.oauth2.service_account.Credentials.from_service_account_file') as mock_creds:
|
||||
self.storage = setup_module(self.module_name, self.config)
|
||||
|
||||
def test_initialize_fails_with_non_existent_creds(self):
|
||||
"""
|
||||
Test that the Google Drive service raises a FileNotFoundError when the service account file does not exist.
|
||||
"""
|
||||
# Act and Assert
|
||||
with pytest.raises(FileNotFoundError) as exc_info:
|
||||
self.storage.setup()
|
||||
assert "No such file or directory" in str(exc_info.value)
|
||||
mocker.patch('google.oauth2.service_account.Credentials.from_service_account_file')
|
||||
return setup_module(module_name, config)
|
||||
|
||||
|
||||
def test_path_parts(self):
|
||||
media = Media(filename="test.jpg")
|
||||
media.key = "folder1/folder2/test.jpg"
|
||||
def test_initialize_fails_with_non_existent_creds(setup_module):
|
||||
"""Test that the Google Drive service raises a FileNotFoundError when the service account file does not exist.
|
||||
(and isn't mocked)
|
||||
"""
|
||||
config: dict = {'path_generator': 'url',
|
||||
'filename_generator': 'static',
|
||||
'root_folder_id': "fake_root_folder_id",
|
||||
'oauth_token': None,
|
||||
'service_account': 'fake_service_account.json'
|
||||
}
|
||||
with pytest.raises(FileNotFoundError) as exc_info:
|
||||
setup_module("gdrive_storage", config)
|
||||
assert "No such file or directory" in str(exc_info.value)
|
||||
|
||||
|
||||
def test_get_id_from_parent_and_name(gdrive_storage, mocker):
    """_get_id_from_parent_and_name should return the id found in the API listing."""
    # Stub the chain service.files().list(...).execute() with a single match.
    listing_request = mocker.MagicMock()
    listing_request.execute.return_value = {"files": [{"id": "123", "name": "testname"}]}

    drive_service = mocker.MagicMock()
    drive_service.files.return_value.list.return_value = listing_request
    gdrive_storage.service = drive_service

    found_id = gdrive_storage._get_id_from_parent_and_name(
        "parent", "mock", retries=1, use_mime_type=False
    )
    assert found_id == "123"
|
||||
|
||||
def test_path_parts():
|
||||
media = Media(filename="test.jpg")
|
||||
media.key = "folder1/folder2/test.jpg"
|
||||
|
||||
|
||||
|
||||
@pytest.mark.skip(reason="Requires real credentials")
|
||||
|
||||
54
tests/storages/test_local_storage.py
Normal file
54
tests/storages/test_local_storage.py
Normal file
@@ -0,0 +1,54 @@
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from auto_archiver.core import Media
|
||||
from auto_archiver.modules.local_storage import LocalStorage
|
||||
|
||||
|
||||
@pytest.fixture
def local_storage(setup_module) -> LocalStorage:
    """Build the ``local_storage`` module via the shared ``setup_module`` fixture.

    Configured with flat paths, static filenames and a relative ``save_to``.
    """
    return setup_module(
        "local_storage",
        {
            "path_generator": "flat",
            "filename_generator": "static",
            "save_to": "./local_archive",
            "save_absolute": False,
        },
    )
|
||||
|
||||
|
||||
@pytest.fixture
def sample_media(tmp_path) -> Media:
    """Return a Media whose source file exists under ``tmp_path``."""
    source = tmp_path / "source.txt"
    source.write_text("test content")
    return Media(key="subdir/test.txt", filename=str(source))
|
||||
|
||||
|
||||
def test_get_cdn_url_relative(local_storage):
    """With the fixture's defaults the URL is ``save_to`` joined with the key."""
    item = Media(key="test.txt", filename="dummy.txt")
    wanted = os.path.join(local_storage.save_to, item.key)
    assert local_storage.get_cdn_url(item) == wanted
|
||||
|
||||
|
||||
|
||||
def test_get_cdn_url_absolute(local_storage):
    """With ``save_absolute`` enabled the URL is the absolute form of the path."""
    item = Media(key="test.txt", filename="dummy.txt")
    local_storage.save_absolute = True

    relative = os.path.join(local_storage.save_to, item.key)
    assert local_storage.get_cdn_url(item) == os.path.abspath(relative)
|
||||
|
||||
def test_upload_file_contents_and_metadata(local_storage, sample_media):
    """upload() should return True and leave a copy with identical text at the destination."""
    # Destination is computed before uploading, from save_to and the media key.
    destination = Path(os.path.join(local_storage.save_to, sample_media.key))

    assert local_storage.upload(sample_media) is True

    original_text = Path(sample_media.filename).read_text()
    assert original_text == destination.read_text()
|
||||
|
||||
|
||||
def test_upload_nonexistent_source(local_storage):
    """Uploading a Media whose source file is missing should raise FileNotFoundError."""
    missing = Media(key="missing.txt", filename="nonexistent.txt")
    with pytest.raises(FileNotFoundError):
        local_storage.upload(missing)
|
||||
|
||||
|
||||
@@ -60,3 +60,15 @@ def test_run_auto_archiver_empty_file(caplog, autoarchiver, orchestration_file):
|
||||
|
||||
# should treat an empty file as if there is no file at all
|
||||
assert " No URLs provided. Please provide at least one URL via the com" in caplog.text
|
||||
|
||||
def test_call_autoarchiver_main(caplog, monkeypatch, tmp_path):
    """Calling main() with no arguments should exit and log the missing-URL message."""
    from auto_archiver.__main__ import main

    # Run from an empty temporary directory so the user's real config file
    # in the current working directory is never picked up.
    monkeypatch.chdir(tmp_path)

    with monkeypatch.context() as patched:
        patched.setattr(sys, "argv", ["auto-archiver"])
        with pytest.raises(SystemExit):
            main()

    assert "No URLs provided. Please provide at least one" in caplog.text
|
||||
@@ -162,4 +162,25 @@ def test_get_context():
|
||||
|
||||
|
||||
def test_choose_most_complete():
|
||||
pass
|
||||
m_more = Metadata()
|
||||
m_more.set_title("Title 1")
|
||||
m_more.set_content("Content 1")
|
||||
m_more.set_url("https://example.com")
|
||||
|
||||
m_less = Metadata()
|
||||
m_less.set_title("Title 2")
|
||||
m_less.set_content("Content 2")
|
||||
m_less.set_url("https://example.com")
|
||||
m_less.set_context("key", "value")
|
||||
|
||||
res = Metadata.choose_most_complete([m_more, m_less])
|
||||
assert res.metadata.get("title") == "Title 1"
|
||||
|
||||
def test_choose_most_complete_from_pickles(unpickle):
    """choose_most_complete should pick the post-enrichment metadata.

    Uses pickled Metadata captured before and after an enricher ran; only the
    resulting media are compared.
    """
    m_before_enriching = unpickle("metadata_enricher_ytshort_input.pickle")
    m_after_enriching = unpickle("metadata_enricher_ytshort_expected.pickle")

    # An empty Metadata goes first so the interesting candidates sit in the
    # tail of the list (the original note here: iterates `for r in results[1:]:`).
    candidates = [Metadata(), m_after_enriching, m_before_enriching]
    chosen = Metadata.choose_most_complete(candidates)

    assert chosen.media == m_after_enriching.media
|
||||
|
||||
@@ -1,24 +1,18 @@
|
||||
import sys
|
||||
import pytest
|
||||
from auto_archiver.core.module import get_module_lazy, BaseModule, LazyBaseModule, _LAZY_LOADED_MODULES
|
||||
from auto_archiver.core.module import ModuleFactory, LazyBaseModule
|
||||
from auto_archiver.core.base_module import BaseModule
|
||||
|
||||
@pytest.fixture
|
||||
def example_module():
|
||||
import auto_archiver
|
||||
|
||||
module_factory = ModuleFactory()
|
||||
|
||||
previous_path = auto_archiver.modules.__path__
|
||||
auto_archiver.modules.__path__.append("tests/data/test_modules/")
|
||||
|
||||
module = get_module_lazy("example_module")
|
||||
yield module
|
||||
# cleanup
|
||||
try:
|
||||
del module._manifest
|
||||
except AttributeError:
|
||||
pass
|
||||
del _LAZY_LOADED_MODULES["example_module"]
|
||||
sys.modules.pop("auto_archiver.modules.example_module.example_module", None)
|
||||
auto_archiver.modules.__path__ = previous_path
|
||||
return module_factory.get_module_lazy("example_module")
|
||||
|
||||
def test_get_module_lazy(example_module):
|
||||
assert example_module.name == "example_module"
|
||||
@@ -46,12 +40,14 @@ def test_module_dependency_check_loads_module(example_module):
|
||||
# monkey patch the manifest to include a nonexistent dependency
|
||||
example_module.manifest["dependencies"]["python"] = ["hash_enricher"]
|
||||
|
||||
module_factory = example_module.module_factory
|
||||
|
||||
loaded_module = example_module.load({})
|
||||
assert loaded_module is not None
|
||||
|
||||
# check the dependency is loaded
|
||||
assert _LAZY_LOADED_MODULES["hash_enricher"] is not None
|
||||
assert _LAZY_LOADED_MODULES["hash_enricher"]._instance is not None
|
||||
assert module_factory._lazy_modules["hash_enricher"] is not None
|
||||
assert module_factory._lazy_modules["hash_enricher"]._instance is not None
|
||||
|
||||
def test_load_module(example_module):
|
||||
|
||||
@@ -69,7 +65,7 @@ def test_load_module(example_module):
|
||||
@pytest.mark.parametrize("module_name", ["local_storage", "generic_extractor", "html_formatter", "csv_db"])
|
||||
def test_load_modules(module_name):
|
||||
# test that specific modules can be loaded
|
||||
module = get_module_lazy(module_name)
|
||||
module = ModuleFactory().get_module_lazy(module_name)
|
||||
assert module is not None
|
||||
assert isinstance(module, LazyBaseModule)
|
||||
assert module.name == module_name
|
||||
@@ -86,7 +82,7 @@ def test_load_modules(module_name):
|
||||
|
||||
@pytest.mark.parametrize("module_name", ["local_storage", "generic_extractor", "html_formatter", "csv_db"])
|
||||
def test_lazy_base_module(module_name):
|
||||
lazy_module = get_module_lazy(module_name)
|
||||
lazy_module = ModuleFactory().get_module_lazy(module_name)
|
||||
|
||||
assert lazy_module is not None
|
||||
assert isinstance(lazy_module, LazyBaseModule)
|
||||
|
||||
@@ -4,7 +4,7 @@ from argparse import ArgumentParser, ArgumentTypeError
|
||||
from auto_archiver.core.orchestrator import ArchivingOrchestrator
|
||||
from auto_archiver.version import __version__
|
||||
from auto_archiver.core.config import read_yaml, store_yaml
|
||||
from auto_archiver.core.module import _LAZY_LOADED_MODULES
|
||||
|
||||
|
||||
TEST_ORCHESTRATION = "tests/data/test_orchestration.yaml"
|
||||
TEST_MODULES = "tests/data/test_modules/"
|
||||
@@ -17,22 +17,7 @@ def test_args():
|
||||
|
||||
@pytest.fixture
|
||||
def orchestrator():
|
||||
yield ArchivingOrchestrator()
|
||||
# hack - the loguru logger starts with one logger, but if orchestrator has run before
|
||||
# it'll remove the default logger, add it back in:
|
||||
|
||||
from loguru import logger
|
||||
|
||||
if not logger._core.handlers.get(0):
|
||||
logger._core.handlers_count = 0
|
||||
logger.add(sys.stderr)
|
||||
# and remove the custom logger
|
||||
if logger._core.handlers.get(1):
|
||||
logger.remove(1)
|
||||
|
||||
# delete out any loaded modules
|
||||
_LAZY_LOADED_MODULES.clear()
|
||||
|
||||
return ArchivingOrchestrator()
|
||||
|
||||
@pytest.fixture
|
||||
def basic_parser(orchestrator) -> ArgumentParser:
|
||||
@@ -75,18 +60,36 @@ def test_help(orchestrator, basic_parser, capsys):
|
||||
orchestrator.show_help(args)
|
||||
|
||||
assert exit_error.value.code == 0
|
||||
assert "Usage: auto-archiver [--help] [--version] [--config CONFIG_FILE]" in capsys.readouterr().out
|
||||
|
||||
logs = capsys.readouterr().out
|
||||
assert "Usage: auto-archiver [--help] [--version] [--config CONFIG_FILE]" in logs
|
||||
|
||||
# basic config options
|
||||
assert "--version" in logs
|
||||
|
||||
# setting modules options
|
||||
assert "--feeders" in logs
|
||||
assert "--extractors" in logs
|
||||
|
||||
# authentication options
|
||||
assert "--authentication" in logs
|
||||
|
||||
# logging options
|
||||
assert "--logging.level" in logs
|
||||
|
||||
# individual module configs
|
||||
assert "--gsheet_feeder.sheet_id" in logs
|
||||
|
||||
|
||||
def test_add_custom_modules_path(orchestrator, test_args):
|
||||
orchestrator.run(test_args)
|
||||
orchestrator.setup_config(test_args)
|
||||
|
||||
import auto_archiver
|
||||
assert "tests/data/test_modules/" in auto_archiver.modules.__path__
|
||||
|
||||
def test_add_custom_modules_path_invalid(orchestrator, caplog, test_args):
|
||||
|
||||
orchestrator.run(test_args + # we still need to load the real path to get the example_module
|
||||
orchestrator.setup_config(test_args + # we still need to load the real path to get the example_module
|
||||
["--module_paths", "tests/data/invalid_test_modules/"])
|
||||
|
||||
assert caplog.records[0].message == "Path 'tests/data/invalid_test_modules/' does not exist. Skipping..."
|
||||
@@ -97,7 +100,7 @@ def test_check_required_values(orchestrator, caplog, test_args):
|
||||
test_args = test_args[:-2]
|
||||
|
||||
with pytest.raises(SystemExit) as exit_error:
|
||||
orchestrator.run(test_args)
|
||||
config = orchestrator.setup_config(test_args)
|
||||
|
||||
assert caplog.records[1].message == "the following arguments are required: --example_module.required_field"
|
||||
|
||||
@@ -111,24 +114,50 @@ def test_get_required_values_from_config(orchestrator, test_args, tmp_path):
|
||||
store_yaml(test_yaml, tmp_file)
|
||||
|
||||
# run the orchestrator
|
||||
orchestrator.run(["--config", tmp_file, "--module_paths", TEST_MODULES])
|
||||
assert orchestrator.config is not None
|
||||
config = orchestrator.setup_config(["--config", tmp_file, "--module_paths", TEST_MODULES])
|
||||
assert config is not None
|
||||
|
||||
def test_load_authentication_string(orchestrator, test_args):
|
||||
|
||||
orchestrator.run(test_args + ["--authentication", '{"facebook.com": {"username": "my_username", "password": "my_password"}}'])
|
||||
assert orchestrator.config['authentication'] == {"facebook.com": {"username": "my_username", "password": "my_password"}}
|
||||
config = orchestrator.setup_config(test_args + ["--authentication", '{"facebook.com": {"username": "my_username", "password": "my_password"}}'])
|
||||
assert config['authentication'] == {"facebook.com": {"username": "my_username", "password": "my_password"}}
|
||||
|
||||
def test_load_authentication_string_concat_site(orchestrator, test_args):
|
||||
|
||||
orchestrator.run(test_args + ["--authentication", '{"x.com,twitter.com": {"api_key": "my_key"}}'])
|
||||
assert orchestrator.config['authentication'] == {"x.com": {"api_key": "my_key"},
|
||||
config = orchestrator.setup_config(test_args + ["--authentication", '{"x.com,twitter.com": {"api_key": "my_key"}}'])
|
||||
assert config['authentication'] == {"x.com": {"api_key": "my_key"},
|
||||
"twitter.com": {"api_key": "my_key"}}
|
||||
|
||||
def test_load_invalid_authentication_string(orchestrator, test_args):
|
||||
with pytest.raises(ArgumentTypeError):
|
||||
orchestrator.run(test_args + ["--authentication", "{\''invalid_json"])
|
||||
orchestrator.setup_config(test_args + ["--authentication", "{\''invalid_json"])
|
||||
|
||||
def test_load_authentication_invalid_dict(orchestrator, test_args):
|
||||
with pytest.raises(ArgumentTypeError):
|
||||
orchestrator.run(test_args + ["--authentication", "[true, false]"])
|
||||
orchestrator.setup_config(test_args + ["--authentication", "[true, false]"])
|
||||
|
||||
def test_load_modules_from_commandline(orchestrator, test_args):
    """Each step type given on the command line should load exactly ``example_module``."""
    step_types = ("feeders", "extractors", "databases", "enrichers", "formatters")
    args = test_args + ["--feeders", "example_module", "--extractors", "example_module", "--databases", "example_module", "--enrichers", "example_module", "--formatters", "example_module"]

    orchestrator.setup(args)

    # First check that every step list holds a single module ...
    for step in step_types:
        assert len(getattr(orchestrator, step)) == 1

    # ... then that the single module is the one requested.
    for step in step_types:
        assert getattr(orchestrator, step)[0].name == "example_module"
|
||||
|
||||
def test_load_settings_for_module_from_commandline(orchestrator, test_args):
    """Dotted ``--module.option`` flags should end up in that module's config section."""
    gsheet_flags = [
        "--feeders", "gsheet_feeder",
        "--gsheet_feeder.sheet_id", "123",
        "--gsheet_feeder.service_account", "tests/data/test_service_account.json",
    ]

    orchestrator.setup(test_args + gsheet_flags)

    loaded_feeders = orchestrator.feeders
    assert len(loaded_feeders) == 1
    assert loaded_feeders[0].name == "gsheet_feeder"
    assert orchestrator.config['gsheet_feeder']['sheet_id'] == "123"
|
||||
144
tests/utils/test_misc.py
Normal file
144
tests/utils/test_misc.py
Normal file
@@ -0,0 +1,144 @@
|
||||
import hashlib
|
||||
import json
|
||||
from datetime import datetime, timezone
|
||||
|
||||
import pytest
|
||||
|
||||
from auto_archiver.utils.misc import (
|
||||
mkdir_if_not_exists,
|
||||
expand_url,
|
||||
getattr_or,
|
||||
DateTimeEncoder,
|
||||
dump_payload,
|
||||
get_datetime_from_str,
|
||||
update_nested_dict,
|
||||
calculate_file_hash,
|
||||
random_str,
|
||||
get_timestamp
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
def sample_file(tmp_path):
    """Create ``test.txt`` containing "test content" under ``tmp_path`` and return its path."""
    target = tmp_path / "test.txt"
    target.write_text("test content")
    return target
|
||||
|
||||
|
||||
class TestDirectoryUtils:
    """Tests for ``mkdir_if_not_exists``."""

    def test_mkdir_creates_new_directory(self, tmp_path):
        """A missing directory should exist, as a directory, after the call."""
        target = tmp_path / "new_folder"
        mkdir_if_not_exists(target)
        assert target.exists()
        assert target.is_dir()

    def test_mkdir_exists_quietly(self, tmp_path):
        """Calling it on an existing directory should not raise and should leave it in place."""
        already_there = tmp_path / "existing"
        already_there.mkdir()
        mkdir_if_not_exists(already_there)
        assert already_there.exists()
|
||||
|
||||
class TestURLExpansion:
    """Tests for ``expand_url`` with ``requests.get`` patched out."""

    @pytest.mark.parametrize(
        "input_url,expected",
        [
            ("https://example.com", "https://example.com"),
            ("https://t.co/test", "https://expanded.url"),
        ],
    )
    def test_expand_url(self, input_url, expected, mocker):
        """Check the returned URL for each input, given a canned response URL."""
        canned_response = mocker.Mock()
        canned_response.url = "https://expanded.url"
        mocker.patch('requests.get', return_value=canned_response)

        assert expand_url(input_url) == expected

    def test_expand_url_handles_errors(self, caplog, mocker):
        """When the request raises, the input URL comes back and a failure is logged."""
        url = "https://t.co/error"
        mocker.patch('requests.get', side_effect=Exception("Connection error"))

        assert expand_url(url) == url
        assert f"Failed to expand url {url}" in caplog.text
|
||||
|
||||
class TestAttributeHandling:
    """Tests for ``getattr_or``."""

    class Sample:
        """Tiny object with one set attribute and one attribute that is None."""

        exists = "value"
        none = None

    @pytest.mark.parametrize(
        "obj,attr,default,expected",
        [
            (Sample(), "exists", "default", "value"),
            (Sample(), "none", "default", "default"),
            (Sample(), "missing", "default", "default"),
            (None, "anything", "fallback", "fallback"),
        ],
    )
    def test_getattr_or(self, obj, attr, default, expected):
        """The attribute's value is returned, or the default in the other listed cases."""
        outcome = getattr_or(obj, attr, default)
        assert outcome == expected
|
||||
|
||||
class TestDateTimeHandling:
    """Tests for DateTimeEncoder, dump_payload and get_datetime_from_str."""

    def test_datetime_encoder(self, sample_datetime):
        """A datetime encoded with DateTimeEncoder round-trips as ``str(datetime)``."""
        encoded = json.dumps({"dt": sample_datetime}, cls=DateTimeEncoder)
        decoded = json.loads(encoded)
        assert decoded["dt"] == str(sample_datetime)

    def test_dump_payload(self, sample_datetime):
        """dump_payload output should contain the datetime's string form."""
        dumped = dump_payload({"timestamp": sample_datetime})
        assert str(sample_datetime) in dumped

    @pytest.mark.parametrize(
        "dt_str,fmt,expected",
        [
            ("2023-01-01 12:00:00+00:00", None, datetime(2023, 1, 1, 12, 0, tzinfo=timezone.utc)),
            ("20230101 120000", "%Y%m%d %H%M%S", datetime(2023, 1, 1, 12, 0)),
            ("invalid", None, None),
        ],
    )
    def test_datetime_from_string(self, dt_str, fmt, expected):
        """Parse each string; an unparseable one is expected to yield None."""
        parsed = get_datetime_from_str(dt_str, fmt)
        if expected is None:
            assert parsed is None
            return
        # Compare wall-clock fields only: the expected value adopts whatever
        # tzinfo the parsed result carries.
        assert parsed == expected.replace(tzinfo=parsed.tzinfo)
|
||||
|
||||
class TestDictUtils:
    """Tests for ``update_nested_dict``."""

    @pytest.mark.parametrize(
        "original,update,expected",
        [
            ({"a": 1}, {"b": 2}, {"a": 1, "b": 2}),
            ({"nested": {"a": 1}}, {"nested": {"b": 2}}, {"nested": {"a": 1, "b": 2}}),
            ({"a": {"b": {"c": 1}}}, {"a": {"b": {"c": 2}}}, {"a": {"b": {"c": 2}}}),
        ],
    )
    def test_update_nested_dict(self, original, update, expected):
        """The first dict is updated in place to the expected merged value."""
        update_nested_dict(original, update)
        assert original == expected
|
||||
|
||||
class TestHashingUtils:
    """Tests for ``calculate_file_hash`` against hashlib's SHA-256."""

    def test_file_hashing(self, sample_file):
        """The hash of the small sample file should match SHA-256 of its bytes."""
        wanted = hashlib.sha256(b"test content").hexdigest()
        assert calculate_file_hash(str(sample_file)) == wanted

    def test_large_file_hashing(self, tmp_path):
        """A 32MB file should hash to the same digest as its in-memory bytes."""
        blob = b"0" * 16_000_000 * 2  # 32MB
        big_file = tmp_path / "large.bin"
        big_file.write_bytes(blob)

        wanted = hashlib.sha256(blob).hexdigest()
        assert calculate_file_hash(str(big_file)) == wanted
|
||||
|
||||
class TestMiscUtils:
    """Tests for ``random_str`` and ``get_timestamp``."""

    def test_random_str_length(self):
        """The returned string should have exactly the requested length."""
        for size in (8, 16, 32):
            assert len(random_str(size)) == size

    def test_random_str_raises_too_long(self):
        """Asking for 64 characters should trip the length assertion."""
        with pytest.raises(AssertionError) as exc_info:
            random_str(64)
        # Checked after the block so the message comparison actually executes.
        assert "length must be less than 32 as UUID4 is used" == str(exc_info.value)

    def test_random_str_uniqueness(self):
        """Two consecutive calls should not return the same string."""
        first, second = random_str(), random_str()
        assert first != second

    @pytest.mark.parametrize(
        "ts_input,utc,iso,expected_type",
        [
            (datetime.now(), True, True, str),
            ("2023-01-01T12:00:00+00:00", False, False, datetime),
            (1672574400, True, True, str),
        ],
    )
    def test_timestamp_parsing(self, ts_input, utc, iso, expected_type):
        """Check only the type of the result for each input kind and flag combination."""
        parsed = get_timestamp(ts_input, utc=utc, iso=iso)
        assert isinstance(parsed, expected_type)

    def test_invalid_timestamp_returns_none(self):
        """An unparseable string should yield None."""
        assert get_timestamp("invalid-date") is None
|
||||
Reference in New Issue
Block a user