mirror of
https://github.com/bellingcat/auto-archiver.git
synced 2026-06-07 19:08:30 +03:00
adding missing tests (no download)
This commit is contained in:
1
tests/core/__init__.py
Normal file
1
tests/core/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
# Core module tests
|
||||
198
tests/core/test_media.py
Normal file
198
tests/core/test_media.py
Normal file
@@ -0,0 +1,198 @@
|
||||
"""
|
||||
Tests for the Media class from auto_archiver.core.media
|
||||
"""
|
||||
|
||||
import pytest
|
||||
from unittest.mock import Mock, patch
|
||||
from auto_archiver.core.media import Media
|
||||
|
||||
|
||||
class TestMediaBasics:
    """Exercise Media construction, the key property, get/set and add_url."""

    def test_media_creation_with_filename(self):
        """A Media built from a filename exposes it and starts with empty urls/properties."""
        fresh = Media(filename="test.mp4")
        assert fresh.filename == "test.mp4"
        assert fresh.urls == []
        assert fresh.properties == {}

    def test_media_key_property(self):
        """The ``key`` property reflects the ``_key`` given to the constructor."""
        keyed = Media(filename="test.mp4", _key="my_key")
        assert keyed.key == "my_key"

    def test_media_set_get_properties(self):
        """``set`` hands back the same instance and ``get`` honours an optional default."""
        item = Media(filename="test.mp4")
        returned = item.set("author", "John Doe")
        # set() returns the instance itself so calls can be chained
        assert returned is item
        assert item.get("author") == "John Doe"
        assert item.get("nonexistent") is None
        assert item.get("nonexistent", "default") == "default"

    def test_media_add_url(self):
        """Each ``add_url`` call makes one more URL visible through ``urls``."""
        item = Media(filename="test.mp4")
        item.add_url("https://example.com/test.mp4")
        assert "https://example.com/test.mp4" in item.urls
        item.add_url("https://cdn.example.com/test.mp4")
        assert len(item.urls) == 2
|
||||
|
||||
|
||||
class TestMediaMimetype:
    """Check how Media reports its mimetype and lets it be overridden."""

    @pytest.mark.parametrize(
        "filename,expected_mimetype",
        [
            ("video.mp4", "video/mp4"),
            ("image.jpg", "image/jpeg"),
            ("image.png", "image/png"),
            ("audio.mp3", "audio/mpeg"),
            ("document.pdf", "application/pdf"),
            ("text.txt", "text/plain"),
        ],
    )
    def test_mimetype_detection(self, filename, expected_mimetype):
        """The mimetype reported for a filename matches the expected value."""
        detected = Media(filename=filename).mimetype
        assert detected == expected_mimetype

    def test_mimetype_setter(self):
        """Assigning to ``mimetype`` replaces whatever would be reported otherwise."""
        custom = Media(filename="file.unknown")
        custom.mimetype = "custom/type"
        assert custom.mimetype == "custom/type"

    def test_mimetype_empty_filename(self):
        """An empty filename yields an empty mimetype string."""
        nameless = Media(filename="")
        assert nameless.mimetype == ""
|
||||
|
||||
|
||||
class TestMediaTypeChecks:
    """Check the is_video / is_audio / is_image predicates on Media."""

    @pytest.mark.parametrize(
        "filename,is_video,is_audio,is_image",
        [
            ("video.mp4", True, False, False),
            ("video.avi", True, False, False),
            ("audio.mp3", False, True, False),
            ("audio.wav", False, True, False),
            ("image.jpg", False, False, True),
            ("image.png", False, False, True),
            ("document.pdf", False, False, False),
        ],
    )
    def test_type_checks(self, filename, is_video, is_audio, is_image):
        """Each predicate returns the flag expected for the given filename."""
        subject = Media(filename=filename)

        video_flag = subject.is_video()
        assert video_flag == is_video

        audio_flag = subject.is_audio()
        assert audio_flag == is_audio

        image_flag = subject.is_image()
        assert image_flag == is_image
|
||||
|
||||
|
||||
class TestMediaStore:
    """Check Media.store with and without storages supplied."""

    def test_store_with_no_storages(self, caplog):
        """Storing with an empty storage list emits a "No storages found" log entry."""
        target = Media(filename="test.mp4")
        fake_metadata = Mock()
        target.store(fake_metadata, storages=[])
        assert "No storages found" in caplog.text

    def test_store_with_storage(self):
        """A supplied storage has its ``store`` method invoked exactly once."""
        target = Media(filename="test.mp4")
        fake_metadata = Mock()
        backend = Mock()
        target.store(fake_metadata, url="https://example.com", storages=[backend])
        backend.store.assert_called_once()
|
||||
|
||||
|
||||
class TestMediaInnerMedia:
    """Check Media.all_inner_media over flat, nested and list-valued properties."""

    def test_all_inner_media_no_nested(self):
        """Without nested media only the media itself can be yielded."""
        lone = Media(filename="test.mp4")

        without_self = list(lone.all_inner_media(include_self=False))
        assert len(without_self) == 0

        with_self = list(lone.all_inner_media(include_self=True))
        assert len(with_self) == 1
        assert with_self[0] is lone

    def test_all_inner_media_with_nested(self):
        """Media stored as properties are found through two levels of nesting."""
        root = Media(filename="parent.mp4")
        middle = Media(filename="child.jpg")
        leaf = Media(filename="grandchild.png")

        # Build the chain bottom-up: leaf under middle, middle under root.
        middle.set("thumbnail", leaf)
        root.set("preview", middle)

        found = list(root.all_inner_media(include_self=False))
        assert len(found) == 2
        assert middle in found
        assert leaf in found

    def test_all_inner_media_with_list_property(self):
        """Media held inside a list-valued property are yielded too."""
        root = Media(filename="parent.mp4")
        frame_a = Media(filename="frame1.jpg")
        frame_b = Media(filename="frame2.jpg")

        root.set("frames", [frame_a, frame_b])

        found = list(root.all_inner_media(include_self=False))
        assert len(found) == 2
        assert frame_a in found
        assert frame_b in found
|
||||
|
||||
|
||||
class TestMediaIsStored:
    """Check Media.is_stored against a context that configures two storages."""

    @staticmethod
    def _context_with_two_storages():
        """Build a Mock whose ``config`` lists the ``s3`` and ``local`` storages."""
        ctx = Mock()
        ctx.config = {"steps": {"storages": ["s3", "local"]}}
        return ctx

    def test_is_stored_no_urls(self):
        """A media with no URLs is reported as not stored."""
        candidate = Media(filename="test.mp4")
        ctx = self._context_with_two_storages()
        assert candidate.is_stored(ctx) is False

    def test_is_stored_partial_urls(self):
        """One URL for two configured storages is reported as not stored."""
        candidate = Media(filename="test.mp4")
        candidate.add_url("https://s3.example.com/test.mp4")
        ctx = self._context_with_two_storages()
        assert candidate.is_stored(ctx) is False

    def test_is_stored_full_urls(self):
        """Two URLs for two configured storages is reported as stored."""
        candidate = Media(filename="test.mp4")
        for location in ("https://s3.example.com/test.mp4", "file:///local/test.mp4"):
            candidate.add_url(location)
        ctx = self._context_with_two_storages()
        assert candidate.is_stored(ctx) is True
|
||||
|
||||
|
||||
class TestMediaValidVideo:
    """Check Media.is_valid_video with ``ffmpeg.probe`` patched out."""

    def test_is_valid_video_with_valid_probe(self):
        """A probed stream with a non-zero ``duration_ts`` counts as valid."""
        clip = Media(filename="test.mp4")
        probe_result = {"streams": [{"duration_ts": 1000}]}

        with patch("ffmpeg.probe", return_value=probe_result):
            assert clip.is_valid_video() is True

    def test_is_valid_video_with_no_duration(self):
        """A probed stream whose ``duration_ts`` is zero counts as invalid."""
        clip = Media(filename="test.mp4")
        probe_result = {"streams": [{"duration_ts": 0}]}

        with patch("ffmpeg.probe", return_value=probe_result):
            assert clip.is_valid_video() is False

    def test_is_valid_video_with_ffmpeg_error(self):
        """When probing raises, the outcome follows the patched file size."""
        clip = Media(filename="test.mp4")
        probe_failure = Exception("ffmpeg error")

        with patch("ffmpeg.probe", side_effect=probe_failure):
            # Falls back to file size check, small file
            with patch("os.path.getsize", return_value=100):
                assert clip.is_valid_video() is False

            # Falls back to file size check, larger file
            with patch("os.path.getsize", return_value=30000):
                assert clip.is_valid_video() is True
|
||||
98
tests/core/test_validators.py
Normal file
98
tests/core/test_validators.py
Normal file
@@ -0,0 +1,98 @@
|
||||
"""
|
||||
Tests for validators module from auto_archiver.core.validators
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import pytest
|
||||
|
||||
from auto_archiver.core.validators import positive_number, valid_file, json_loader
|
||||
|
||||
|
||||
class TestPositiveNumber:
    """Check the positive_number validator on accepted and rejected inputs."""

    @pytest.mark.parametrize(
        "value,expected",
        [
            (0, 0),
            (1, 1),
            (100, 100),
            (0.5, 0.5),
            (999999, 999999),
        ],
    )
    def test_positive_values(self, value, expected):
        """Zero and positive numbers are returned unchanged."""
        outcome = positive_number(value)
        assert outcome == expected

    @pytest.mark.parametrize(
        "value",
        [
            -1,
            -100,
            -0.5,
            -999999,
        ],
    )
    def test_negative_values_raise_error(self, value):
        """Negative numbers raise ArgumentTypeError mentioning "not a positive number"."""
        with pytest.raises(argparse.ArgumentTypeError) as exc_info:
            positive_number(value)
        message = str(exc_info.value)
        assert "not a positive number" in message
|
||||
|
||||
|
||||
class TestValidFile:
    """Check the valid_file validator against existing, missing and directory paths."""

    def test_valid_file_exists(self, tmp_path):
        """An existing regular file is accepted and its path returned unchanged."""
        test_file = tmp_path / "test.txt"
        test_file.write_text("test content")
        result = valid_file(str(test_file))
        assert result == str(test_file)

    def test_valid_file_not_exists(self, tmp_path):
        """A path that does not exist raises ArgumentTypeError ("does not exist")."""
        # Use a name inside pytest's fresh per-test directory: it is guaranteed
        # to be absent, unlike a hard-coded absolute path on the host filesystem.
        missing = tmp_path / "nonexistent" / "file.txt"
        with pytest.raises(argparse.ArgumentTypeError) as exc_info:
            valid_file(str(missing))
        assert "does not exist" in str(exc_info.value)

    def test_valid_file_directory_not_file(self, tmp_path):
        """A directory is rejected with the same "does not exist" error."""
        # A directory is not a file
        with pytest.raises(argparse.ArgumentTypeError) as exc_info:
            valid_file(str(tmp_path))
        assert "does not exist" in str(exc_info.value)
|
||||
|
||||
|
||||
class TestJsonLoader:
    """Check the json_loader validator on well-formed and malformed input."""

    @pytest.mark.parametrize(
        "json_str,expected",
        [
            ('{"key": "value"}', {"key": "value"}),
            ('{"number": 123}', {"number": 123}),
            ('{"list": [1, 2, 3]}', {"list": [1, 2, 3]}),
            ('{"nested": {"inner": "value"}}', {"nested": {"inner": "value"}}),
            ("[]", []),
            ("[1, 2, 3]", [1, 2, 3]),
            ('"string"', "string"),
            ("123", 123),
            ("true", True),
            ("false", False),
            ("null", None),
        ],
    )
    def test_valid_json(self, json_str, expected):
        """Well-formed JSON text is decoded to the equivalent Python value."""
        decoded = json_loader(json_str)
        assert decoded == expected

    @pytest.mark.parametrize(
        "invalid_json",
        [
            "{invalid}",
            "{'single': 'quotes'}",
            "{missing: quotes}",
            '{"unclosed": "brace"',
            "",
        ],
    )
    def test_invalid_json_raises_error(self, invalid_json):
        """Malformed JSON text raises json.JSONDecodeError."""
        expected_error = json.JSONDecodeError
        with pytest.raises(expected_error):
            json_loader(invalid_json)
|
||||
62
tests/databases/test_console_db.py
Normal file
62
tests/databases/test_console_db.py
Normal file
@@ -0,0 +1,62 @@
|
||||
"""
|
||||
Tests for the ConsoleDb module
|
||||
"""
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.fixture
def console_db(setup_module):
    """Provide the module obtained from ``setup_module("console_db")``."""
    module = setup_module("console_db")
    return module
|
||||
|
||||
|
||||
class TestConsoleDb:
    """Check the log output produced by each ConsoleDb lifecycle hook."""

    def test_started_logs_info(self, console_db, make_item, caplog):
        """started() logs text containing "STARTED" and the item's host."""
        entry = make_item("https://example.com/test")

        with caplog.at_level("INFO"):
            console_db.started(entry)

        captured = caplog.text
        assert "STARTED" in captured
        assert "example.com" in captured

    def test_failed_logs_error(self, console_db, make_item, caplog):
        """failed() logs text containing "FAILED" and the supplied reason."""
        entry = make_item("https://example.com/test")
        why = "Connection timeout"

        with caplog.at_level("ERROR"):
            console_db.failed(entry, why)

        captured = caplog.text
        assert "FAILED" in captured
        assert "Connection timeout" in captured

    def test_aborted_logs_warning(self, console_db, make_item, caplog):
        """aborted() logs text containing "ABORTED"."""
        entry = make_item("https://example.com/test")

        with caplog.at_level("WARNING"):
            console_db.aborted(entry)

        assert "ABORTED" in caplog.text

    def test_done_logs_success(self, console_db, make_item, caplog):
        """done() logs text containing "DONE"."""
        entry = make_item("https://example.com/test")

        with caplog.at_level("INFO"):
            console_db.done(entry)

        assert "DONE" in caplog.text

    def test_done_cached(self, console_db, make_item, caplog):
        """done() called with cached=True also logs text containing "DONE"."""
        entry = make_item("https://example.com/test")

        with caplog.at_level("INFO"):
            console_db.done(entry, cached=True)

        assert "DONE" in caplog.text
|
||||
72
tests/enrichers/test_json_enricher.py
Normal file
72
tests/enrichers/test_json_enricher.py
Normal file
@@ -0,0 +1,72 @@
|
||||
"""
|
||||
Tests for the JsonEnricher module
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.fixture
def json_enricher(setup_module):
    """Provide the module obtained from ``setup_module("json_enricher")``."""
    module = setup_module("json_enricher")
    return module
|
||||
|
||||
|
||||
class TestJsonEnricher:
    """Check the ``metadata_json`` media that JsonEnricher.enrich attaches."""

    @staticmethod
    def _read_json(media):
        """Parse the file behind *media* as UTF-8 encoded JSON."""
        with open(media.filename, "r", encoding="utf-8") as handle:
            return json.load(handle)

    def test_enrich_creates_json_file(self, json_enricher, make_item):
        """enrich() attaches a media whose file is named metadata.json and exists."""
        entry = make_item("https://example.com/test")
        entry.set("title", "Test Title")
        entry.set("description", "Test description")

        json_enricher.enrich(entry)

        # The enricher is expected to register its output under the id 'metadata_json'
        produced = entry.get_media_by_id("metadata_json")
        assert produced is not None
        assert produced.filename.endswith("metadata.json")
        assert os.path.exists(produced.filename)

    def test_enrich_json_content(self, json_enricher, make_item):
        """The written JSON carries the item's title, custom field and URL."""
        entry = make_item("https://example.com/test")
        entry.set("title", "Test Title")
        entry.set("custom_field", "custom_value")

        json_enricher.enrich(entry)

        payload = self._read_json(entry.get_media_by_id("metadata_json"))

        # Values are read from the nested "metadata" mapping of the written document
        meta = payload["metadata"]
        assert meta["title"] == "Test Title"
        assert meta["custom_field"] == "custom_value"
        assert meta["url"] == "https://example.com/test"

    def test_enrich_handles_special_characters(self, json_enricher, make_item):
        """Non-ASCII characters in the title are still present after a UTF-8 read."""
        entry = make_item("https://example.com/test")
        entry.set("title", "Test with émojis 🎉 and üñíçödé")

        json_enricher.enrich(entry)

        payload = self._read_json(entry.get_media_by_id("metadata_json"))

        # The title lives under the nested "metadata" mapping
        stored_title = payload["metadata"]["title"]
        assert "émojis 🎉" in stored_title
        assert "üñíçödé" in stored_title

    def test_enrich_empty_metadata(self, json_enricher, make_item):
        """An item with nothing but a URL still gets a JSON file on disk."""
        entry = make_item("https://example.com/minimal")

        json_enricher.enrich(entry)

        produced = entry.get_media_by_id("metadata_json")
        assert produced is not None
        assert os.path.exists(produced.filename)
|
||||
70
tests/feeders/test_cli_feeder.py
Normal file
70
tests/feeders/test_cli_feeder.py
Normal file
@@ -0,0 +1,70 @@
|
||||
"""
|
||||
Tests for the CLIFeeder module
|
||||
"""
|
||||
|
||||
import pytest
|
||||
|
||||
from auto_archiver.modules.cli_feeder.cli_feeder import CLIFeeder
|
||||
from auto_archiver.core.consts import SetupError
|
||||
from auto_archiver.core.metadata import Metadata
|
||||
|
||||
|
||||
@pytest.fixture
def cli_feeder_instance(tmp_path):
    """Return a factory that builds a CLIFeeder configured with the given URLs.

    The feeder is instantiated directly and its ``config``, ``name`` and
    ``tmp_dir`` attributes are assigned by hand. ``tmp_dir`` points at pytest's
    per-test ``tmp_path`` instead of a hard-coded ``/tmp``, so the fixture is
    portable and does not reference a directory shared with other processes.
    """

    def _create(urls):
        feeder = CLIFeeder()
        # Mock the config structure that cli_feeder expects
        feeder.config = {"urls": urls}
        feeder.name = "cli_feeder"
        # Kept as a string, matching the type of the previous "/tmp" value.
        feeder.tmp_dir = str(tmp_path)
        return feeder

    return _create
|
||||
|
||||
|
||||
class TestCLIFeeder:
    """Check CLIFeeder iteration and its setup-time URL validation."""

    def test_iter_yields_metadata_for_urls(self, cli_feeder_instance):
        """Iterating yields one Metadata per configured URL, in order."""
        urls = ["https://example.com/1", "https://example.com/2", "https://example.com/3"]
        feeder = cli_feeder_instance(urls)
        feeder.setup()

        produced = list(feeder)

        assert len(produced) == 3
        assert all(isinstance(entry, Metadata) for entry in produced)
        wanted = (
            "https://example.com/1",
            "https://example.com/2",
            "https://example.com/3",
        )
        for position, address in enumerate(wanted):
            assert produced[position].get_url() == address

    def test_iter_single_url(self, cli_feeder_instance):
        """A single configured URL yields exactly one item carrying that URL."""
        feeder = cli_feeder_instance(["https://example.com/single"])
        feeder.setup()

        produced = list(feeder)

        assert len(produced) == 1
        only = produced[0]
        assert only.get_url() == "https://example.com/single"

    def test_setup_raises_without_urls(self, cli_feeder_instance):
        """setup() raises SetupError ("No URLs provided") for an empty URL list."""
        feeder = cli_feeder_instance([])

        with pytest.raises(SetupError) as exc_info:
            feeder.setup()

        message = str(exc_info.value)
        assert "No URLs provided" in message

    def test_setup_raises_with_none_urls(self, cli_feeder_instance):
        """setup() raises SetupError ("No URLs provided") when urls is None."""
        feeder = cli_feeder_instance(None)

        with pytest.raises(SetupError) as exc_info:
            feeder.setup()

        message = str(exc_info.value)
        assert "No URLs provided" in message
|
||||
43
tests/formatters/test_mute_formatter.py
Normal file
43
tests/formatters/test_mute_formatter.py
Normal file
@@ -0,0 +1,43 @@
|
||||
"""
|
||||
Tests for the MuteFormatter module
|
||||
"""
|
||||
|
||||
import pytest
|
||||
from auto_archiver.core.metadata import Metadata
|
||||
|
||||
|
||||
@pytest.fixture
def mute_formatter(setup_module):
    """Provide the module obtained from ``setup_module("mute_formatter")``."""
    module = setup_module("mute_formatter")
    return module
|
||||
|
||||
|
||||
class TestMuteFormatter:
    """Check that MuteFormatter.format returns None for every kind of item."""

    def test_format_returns_none(self, mute_formatter, make_item):
        """An item carrying a title is formatted to None."""
        entry = make_item("https://example.com/test")
        entry.set("title", "Test Title")

        formatted = mute_formatter.format(entry)

        assert formatted is None

    def test_format_with_empty_metadata(self, mute_formatter):
        """A bare Metadata holding only a URL is formatted to None."""
        bare = Metadata().set_url("https://example.com/empty")

        formatted = mute_formatter.format(bare)

        assert formatted is None

    def test_format_with_media(self, mute_formatter, make_item):
        """An item with an attached Media is still formatted to None."""
        from auto_archiver.core.media import Media

        entry = make_item("https://example.com/with-media")
        attachment = Media(filename="test.mp4")
        entry.add_media(attachment)

        formatted = mute_formatter.format(entry)

        assert formatted is None
|
||||
Reference in New Issue
Block a user