mirror of
https://github.com/bellingcat/auto-archiver.git
synced 2026-06-08 03:18:28 +03:00
Merge branch 'load_modules' into timestamping_rewrite
This commit is contained in:
@@ -1,6 +0,0 @@
|
||||
import tempfile
|
||||
|
||||
from auto_archiver.core.context import ArchivingContext
|
||||
|
||||
ArchivingContext.reset(full_reset=True)
|
||||
ArchivingContext.set_tmp_dir(tempfile.gettempdir())
|
||||
@@ -1,7 +1,9 @@
|
||||
"""
|
||||
pytest conftest file, for shared fixtures and configuration
|
||||
"""
|
||||
|
||||
import os
|
||||
import pickle
|
||||
from tempfile import TemporaryDirectory
|
||||
from typing import Dict, Tuple
|
||||
import hashlib
|
||||
import pytest
|
||||
@@ -23,13 +25,15 @@ def setup_module(request):
|
||||
# if the class does not have a .name, use the name of the parent folder
|
||||
module_name = module_name.__module__.rsplit(".",2)[-2]
|
||||
|
||||
m = get_module(module_name).load()
|
||||
m.name = module_name
|
||||
m.setup({module_name : config})
|
||||
m = get_module(module_name, {module_name: config})
|
||||
|
||||
# add the tmp_dir to the module
|
||||
tmp_dir = TemporaryDirectory()
|
||||
m.tmp_dir = tmp_dir.name
|
||||
|
||||
def cleanup():
|
||||
_LAZY_LOADED_MODULES.pop(module_name)
|
||||
tmp_dir.cleanup()
|
||||
request.addfinalizer(cleanup)
|
||||
|
||||
return m
|
||||
@@ -110,4 +114,18 @@ def pytest_runtest_setup(item):
|
||||
test_name = _test_failed_incremental[cls_name].get((), None)
|
||||
# if name found, test has failed for the combination of class name & test name
|
||||
if test_name is not None:
|
||||
pytest.xfail(f"previous test failed ({test_name})")
|
||||
pytest.xfail(f"previous test failed ({test_name})")
|
||||
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def unpickle():
|
||||
"""
|
||||
Returns a helper function that unpickles a file
|
||||
** gets the file from the test_files directory: tests/data/test_files **
|
||||
"""
|
||||
def _unpickle(path):
|
||||
test_data_dir = os.path.join(os.path.dirname(__file__), "data", "test_files")
|
||||
with open(os.path.join(test_data_dir, path), "rb") as f:
|
||||
return pickle.load(f)
|
||||
return _unpickle
|
||||
2
tests/data/csv_no_headers.csv
Normal file
2
tests/data/csv_no_headers.csv
Normal file
@@ -0,0 +1,2 @@
|
||||
https://example.com/1/,data 1
|
||||
https://example.com/2/,data 2
|
||||
|
3
tests/data/csv_with_headers.csv
Normal file
3
tests/data/csv_with_headers.csv
Normal file
@@ -0,0 +1,3 @@
|
||||
webpages,other data
|
||||
https://example.com/1/,data 1
|
||||
https://example.com/2/,data 2
|
||||
|
1
tests/data/test_modules/example_module/__init__.py
Normal file
1
tests/data/test_modules/example_module/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
from .example_module import ExampleModule
|
||||
11
tests/data/test_modules/example_module/__manifest__.py
Normal file
11
tests/data/test_modules/example_module/__manifest__.py
Normal file
@@ -0,0 +1,11 @@
|
||||
{
|
||||
"name": "Example Module",
|
||||
"type": ["extractor", "feeder", "formatter", "storage", "enricher", "database"],
|
||||
"requires_setup": False,
|
||||
"dependencies": {"python": ["loguru"]
|
||||
},
|
||||
"configs": {
|
||||
"csv_file": {"default": "db.csv", "help": "CSV file name"},
|
||||
"required_field": {"required": True, "help": "required field in the CSV file"},
|
||||
},
|
||||
}
|
||||
28
tests/data/test_modules/example_module/example_module.py
Normal file
28
tests/data/test_modules/example_module/example_module.py
Normal file
@@ -0,0 +1,28 @@
|
||||
from auto_archiver.core import Extractor, Enricher, Feeder, Database, Storage, Formatter, Metadata
|
||||
|
||||
class ExampleModule(Extractor, Enricher, Feeder, Database, Storage, Formatter):
|
||||
def download(self, item):
|
||||
print("download")
|
||||
|
||||
def __iter__(self):
|
||||
yield Metadata().set_url("https://example.com")
|
||||
|
||||
|
||||
def done(self, result):
|
||||
print("done")
|
||||
|
||||
def enrich(self, to_enrich):
|
||||
print("enrich")
|
||||
|
||||
def get_cdn_url(self, media):
|
||||
return "nice_url"
|
||||
|
||||
def save(self, item):
|
||||
print("save")
|
||||
|
||||
def uploadf(self, file, key, **kwargs):
|
||||
print("uploadf")
|
||||
|
||||
|
||||
def format(self, item):
|
||||
print("format")
|
||||
16
tests/data/test_orchestration.yaml
Normal file
16
tests/data/test_orchestration.yaml
Normal file
@@ -0,0 +1,16 @@
|
||||
steps:
|
||||
feeders:
|
||||
- example_module
|
||||
extractors:
|
||||
- example_module
|
||||
formatters:
|
||||
- example_module
|
||||
storages:
|
||||
- example_module
|
||||
databases:
|
||||
- example_module
|
||||
enrichers:
|
||||
- example_module
|
||||
|
||||
|
||||
# Global configuration
|
||||
142
tests/databases/test_gsheet_db.py
Normal file
142
tests/databases/test_gsheet_db.py
Normal file
@@ -0,0 +1,142 @@
|
||||
from datetime import datetime, timezone
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from auto_archiver.core import Metadata, Media
|
||||
from auto_archiver.modules.gsheet_db import GsheetsDb
|
||||
from auto_archiver.modules.gsheet_feeder import GWorksheet
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_gworksheet():
|
||||
mock_gworksheet = MagicMock(spec=GWorksheet)
|
||||
mock_gworksheet.col_exists.return_value = True
|
||||
mock_gworksheet.get_cell.return_value = ""
|
||||
mock_gworksheet.get_row.return_value = {}
|
||||
return mock_gworksheet
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_metadata():
|
||||
metadata: Metadata = MagicMock(spec=Metadata)
|
||||
metadata.get_url.return_value = "http://example.com"
|
||||
metadata.status = "done"
|
||||
metadata.get_title.return_value = "Example Title"
|
||||
metadata.get.return_value = "Example Content"
|
||||
metadata.get_timestamp.return_value = "2025-01-01T00:00:00"
|
||||
metadata.get_final_media.return_value = MagicMock(spec=Media)
|
||||
metadata.get_all_media.return_value = []
|
||||
metadata.get_media_by_id.return_value = None
|
||||
metadata.get_first_image.return_value = None
|
||||
return metadata
|
||||
|
||||
@pytest.fixture
|
||||
def metadata():
|
||||
metadata = Metadata()
|
||||
metadata.add_media(Media(filename="screenshot", urls=["http://example.com/screenshot.png"]))
|
||||
metadata.add_media(Media(filename="browsertrix", urls=["http://example.com/browsertrix.wacz"]))
|
||||
metadata.add_media(Media(filename="thumbnail", urls=["http://example.com/thumbnail.png"]))
|
||||
metadata.set_url("http://example.com")
|
||||
metadata.set_title("Example Title")
|
||||
metadata.set_content("Example Content")
|
||||
metadata.success("my-archiver")
|
||||
metadata.set("timestamp", "2025-01-01T00:00:00")
|
||||
metadata.set("date", "2025-02-04T18:22:24.909112+00:00")
|
||||
return metadata
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_media():
|
||||
"""Fixture for a mock Media object."""
|
||||
mock_media = MagicMock(spec=Media)
|
||||
mock_media.urls = ["http://example.com/media"]
|
||||
mock_media.get.return_value = "not-calculated"
|
||||
return mock_media
|
||||
|
||||
@pytest.fixture
|
||||
def gsheets_db(mock_gworksheet, setup_module):
|
||||
db = setup_module("gsheet_db", {
|
||||
"allow_worksheets": "set()",
|
||||
"block_worksheets": "set()",
|
||||
"use_sheet_names_in_stored_paths": "True",
|
||||
})
|
||||
db._retrieve_gsheet = MagicMock(return_value=(mock_gworksheet, 1))
|
||||
return db
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def fixed_timestamp():
|
||||
"""Fixture for a fixed timestamp."""
|
||||
return datetime(2025, 1, 1, tzinfo=timezone.utc)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def expected_calls(mock_media, fixed_timestamp):
|
||||
"""Fixture for the expected cell updates."""
|
||||
return [
|
||||
(1, 'status', 'my-archiver: success'),
|
||||
(1, 'archive', 'http://example.com/screenshot.png'),
|
||||
(1, 'date', '2025-02-01T00:00:00+00:00'),
|
||||
(1, 'title', 'Example Title'),
|
||||
(1, 'text', 'Example Content'),
|
||||
(1, 'timestamp', '2025-01-01T00:00:00+00:00'),
|
||||
(1, 'hash', 'not-calculated'),
|
||||
# (1, 'screenshot', 'http://example.com/screenshot.png'),
|
||||
# (1, 'thumbnail', '=IMAGE("http://example.com/thumbnail.png")'),
|
||||
# (1, 'wacz', 'http://example.com/browsertrix.wacz'),
|
||||
# (1, 'replaywebpage', 'https://replayweb.page/?source=http%3A%2F%2Fexample.com%2Fbrowsertrix.wacz#view=pages&url=')
|
||||
]
|
||||
|
||||
def test_retrieve_gsheet(gsheets_db, metadata, mock_gworksheet):
|
||||
gw, row = gsheets_db._retrieve_gsheet(metadata)
|
||||
assert gw == mock_gworksheet
|
||||
assert row == 1
|
||||
|
||||
|
||||
def test_started(gsheets_db, mock_metadata, mock_gworksheet):
|
||||
gsheets_db.started(mock_metadata)
|
||||
mock_gworksheet.set_cell.assert_called_once_with(1, 'status', 'Archive in progress')
|
||||
|
||||
def test_failed(gsheets_db, mock_metadata, mock_gworksheet):
|
||||
reason = "Test failure"
|
||||
gsheets_db.failed(mock_metadata, reason)
|
||||
mock_gworksheet.set_cell.assert_called_once_with(1, 'status', f'Archive failed {reason}')
|
||||
|
||||
|
||||
def test_aborted(gsheets_db, mock_metadata, mock_gworksheet):
|
||||
gsheets_db.aborted(mock_metadata)
|
||||
mock_gworksheet.set_cell.assert_called_once_with(1, 'status', '')
|
||||
|
||||
|
||||
def test_done(gsheets_db, metadata, mock_gworksheet, expected_calls):
|
||||
with patch("auto_archiver.modules.gsheet_db.gsheet_db.get_current_timestamp", return_value='2025-02-01T00:00:00+00:00'):
|
||||
gsheets_db.done(metadata)
|
||||
mock_gworksheet.batch_set_cell.assert_called_once_with(expected_calls)
|
||||
|
||||
|
||||
def test_done_cached(gsheets_db, metadata, mock_gworksheet):
|
||||
with patch("auto_archiver.modules.gsheet_db.gsheet_db.get_current_timestamp", return_value='2025-02-01T00:00:00+00:00'):
|
||||
gsheets_db.done(metadata, cached=True)
|
||||
|
||||
# Verify the status message includes "[cached]"
|
||||
call_args = mock_gworksheet.batch_set_cell.call_args[0][0]
|
||||
assert any(call[2].startswith("[cached]") for call in call_args)
|
||||
|
||||
|
||||
def test_done_missing_media(gsheets_db, metadata, mock_gworksheet):
|
||||
# clear media from metadata
|
||||
metadata.media = []
|
||||
with patch("auto_archiver.modules.gsheet_db.gsheet_db.get_current_timestamp",
|
||||
return_value='2025-02-01T00:00:00+00:00'):
|
||||
gsheets_db.done(metadata)
|
||||
# Verify nothing media-related gets updated
|
||||
call_args = mock_gworksheet.batch_set_cell.call_args[0][0]
|
||||
media_fields = {'archive', 'screenshot', 'thumbnail', 'wacz', 'replaywebpage'}
|
||||
assert all(call[1] not in media_fields for call in call_args)
|
||||
|
||||
def test_safe_status_update(gsheets_db, metadata, mock_gworksheet):
|
||||
gsheets_db._safe_status_update(metadata, "Test status")
|
||||
mock_gworksheet.set_cell.assert_called_once_with(1, 'status', 'Test status')
|
||||
|
||||
|
||||
@@ -2,7 +2,7 @@ import pytest
|
||||
|
||||
from auto_archiver.modules.hash_enricher import HashEnricher
|
||||
from auto_archiver.core import Metadata, Media
|
||||
from auto_archiver.core.module import get_module
|
||||
from auto_archiver.core.module import get_module_lazy
|
||||
|
||||
@pytest.mark.parametrize("algorithm, filename, expected_hash", [
|
||||
("SHA-256", "tests/data/testfile_1.txt", "1b4f0e9851971998e732078544c96b36c3d01cedf7caa332359d6f1d83567014"),
|
||||
@@ -12,7 +12,7 @@ from auto_archiver.core.module import get_module
|
||||
])
|
||||
def test_calculate_hash(algorithm, filename, expected_hash, setup_module):
|
||||
# test SHA-256
|
||||
he = setup_module(HashEnricher, {"algorithm": algorithm, "chunksize": 1})
|
||||
he = setup_module(HashEnricher, {"algorithm": algorithm, "chunksize": 100})
|
||||
assert he.calculate_hash(filename) == expected_hash
|
||||
|
||||
def test_default_config_values(setup_module):
|
||||
@@ -22,7 +22,7 @@ def test_default_config_values(setup_module):
|
||||
|
||||
def test_config():
|
||||
# test default config
|
||||
c = get_module('hash_enricher').configs
|
||||
c = get_module_lazy('hash_enricher').configs
|
||||
assert c["algorithm"]["default"] == "SHA-256"
|
||||
assert c["chunksize"]["default"] == 16000000
|
||||
assert c["algorithm"]["choices"] == ["SHA-256", "SHA3-512"]
|
||||
|
||||
103
tests/enrichers/test_meta_enricher.py
Normal file
103
tests/enrichers/test_meta_enricher.py
Normal file
@@ -0,0 +1,103 @@
|
||||
import datetime
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from auto_archiver.core import Metadata, Media
|
||||
from auto_archiver.modules.meta_enricher import MetaEnricher
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_metadata():
|
||||
"""Creates a mock Metadata object."""
|
||||
mock: Metadata = MagicMock(spec=Metadata)
|
||||
mock.get_url.return_value = "https://example.com"
|
||||
mock.is_empty.return_value = False # Default to not empty
|
||||
mock.get_all_media.return_value = []
|
||||
return mock
|
||||
|
||||
@pytest.fixture
|
||||
def mock_media():
|
||||
"""Creates a mock Media object."""
|
||||
mock: Media = MagicMock(spec=Media)
|
||||
mock.filename = "mock_file.txt"
|
||||
return mock
|
||||
|
||||
@pytest.fixture
|
||||
def metadata():
|
||||
m = Metadata()
|
||||
m.set_url("https://example.com")
|
||||
m.set_title("Test Title")
|
||||
m.set_content("Test Content")
|
||||
return m
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def meta_enricher(setup_module):
|
||||
return setup_module(MetaEnricher, {})
|
||||
|
||||
|
||||
def test_enrich_skips_empty_metadata(meta_enricher, mock_metadata):
|
||||
"""Test that enrich() does nothing when Metadata is empty."""
|
||||
mock_metadata.is_empty.return_value = True
|
||||
meta_enricher.enrich(mock_metadata)
|
||||
mock_metadata.get_url.assert_called_once()
|
||||
|
||||
|
||||
def test_enrich_file_sizes(meta_enricher, metadata, tmp_path):
|
||||
"""Test that enrich_file_sizes() calculates and sets file sizes correctly."""
|
||||
file1 = tmp_path / "testfile_1.txt"
|
||||
file2 = tmp_path / "testfile_2.txt"
|
||||
file1.write_text("A" * 1000)
|
||||
file2.write_text("B" * 2000)
|
||||
metadata.add_media(Media(str(file1)))
|
||||
metadata.add_media(Media(str(file2)))
|
||||
|
||||
meta_enricher.enrich_file_sizes(metadata)
|
||||
|
||||
# Verify individual media file sizes
|
||||
media1 = metadata.get_all_media()[0]
|
||||
media2 = metadata.get_all_media()[1]
|
||||
|
||||
assert media1.get("bytes") == 1000
|
||||
assert media1.get("size") == "1000.0 bytes"
|
||||
assert media2.get("bytes") == 2000
|
||||
assert media2.get("size") == "2.0 KB"
|
||||
|
||||
assert metadata.get("total_bytes") == 3000
|
||||
assert metadata.get("total_size") == "2.9 KB"
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"size, expected",
|
||||
[
|
||||
(500, "500.0 bytes"),
|
||||
(1024, "1.0 KB"),
|
||||
(2048, "2.0 KB"),
|
||||
(1048576, "1.0 MB"),
|
||||
(1073741824, "1.0 GB"),
|
||||
],
|
||||
)
|
||||
def test_human_readable_bytes(size, expected):
|
||||
"""Test that human_readable_bytes() converts sizes correctly."""
|
||||
enricher = MetaEnricher()
|
||||
assert enricher.human_readable_bytes(size) == expected
|
||||
|
||||
def test_enrich_file_sizes_no_media(meta_enricher, metadata):
|
||||
"""Test that enrich_file_sizes() handles empty media list gracefully."""
|
||||
meta_enricher.enrich_file_sizes(metadata)
|
||||
assert metadata.get("total_bytes") == 0
|
||||
assert metadata.get("total_size") == "0.0 bytes"
|
||||
|
||||
|
||||
def test_enrich_archive_duration(meta_enricher, metadata):
|
||||
# Set fixed "processed at" time in the past
|
||||
processed_at = datetime.now(timezone.utc) - timedelta(minutes=10, seconds=30)
|
||||
metadata.set("_processed_at", processed_at)
|
||||
# patch datetime
|
||||
with patch("datetime.datetime") as mock_datetime:
|
||||
mock_now = datetime.now(timezone.utc)
|
||||
mock_datetime.now.return_value = mock_now
|
||||
meta_enricher.enrich_archive_duration(metadata)
|
||||
|
||||
assert metadata.get("archive_duration_seconds") == 630
|
||||
@@ -1,18 +1,22 @@
|
||||
from typing import Type
|
||||
|
||||
import pytest
|
||||
|
||||
from auto_archiver.core.metadata import Metadata
|
||||
from auto_archiver.core.extractor import Extractor
|
||||
from auto_archiver.core.module import get_module
|
||||
|
||||
|
||||
class TestExtractorBase(object):
|
||||
|
||||
extractor_module: str = None
|
||||
config: dict = None
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup_archiver(self, setup_module):
|
||||
def setup_extractor(self, setup_module):
|
||||
assert self.extractor_module is not None, "self.extractor_module must be set on the subclass"
|
||||
assert self.config is not None, "self.config must be a dict set on the subclass"
|
||||
self.extractor: Extractor = setup_module(self.extractor_module, self.config)
|
||||
|
||||
self.extractor: Type[Extractor] = setup_module(self.extractor_module, self.config)
|
||||
|
||||
def assertValidResponseMetadata(self, test_response: Metadata, title: str, timestamp: str, status: str = ""):
|
||||
assert test_response is not False
|
||||
|
||||
188
tests/extractors/test_instagram_api_extractor.py
Normal file
188
tests/extractors/test_instagram_api_extractor.py
Normal file
@@ -0,0 +1,188 @@
|
||||
from datetime import datetime
|
||||
from typing import Type
|
||||
|
||||
import pytest
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
from auto_archiver.core import Metadata
|
||||
from auto_archiver.modules.instagram_api_extractor.instagram_api_extractor import InstagramAPIExtractor
|
||||
from .test_extractor_base import TestExtractorBase
|
||||
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_user_response():
|
||||
return {
|
||||
"user": {
|
||||
"pk": "123",
|
||||
"username": "test_user",
|
||||
"full_name": "Test User",
|
||||
"profile_pic_url_hd": "http://example.com/profile.jpg",
|
||||
"profile_pic_url": "http://example.com/profile_lowres.jpg"
|
||||
}
|
||||
}
|
||||
|
||||
@pytest.fixture
|
||||
def mock_post_response():
|
||||
return {
|
||||
"id": "post_123",
|
||||
"code": "abc123",
|
||||
"caption_text": "Test Caption",
|
||||
"taken_at": datetime.now().timestamp(),
|
||||
"video_url": "http://example.com/video.mp4",
|
||||
"thumbnail_url": "http://example.com/thumbnail.jpg"
|
||||
}
|
||||
|
||||
@pytest.fixture
|
||||
def mock_story_response():
|
||||
return [{
|
||||
"id": "story_123",
|
||||
"taken_at": datetime.now().timestamp(),
|
||||
"video_url": "http://example.com/story.mp4"
|
||||
}]
|
||||
|
||||
@pytest.fixture
|
||||
def mock_highlight_response():
|
||||
return {
|
||||
"response": {
|
||||
"reels": {
|
||||
"highlight:123": {
|
||||
"id": "123",
|
||||
"title": "Test Highlight",
|
||||
"items": [{
|
||||
"id": "item_123",
|
||||
"taken_at": datetime.now().timestamp(),
|
||||
"video_url": "http://example.com/highlight.mp4"
|
||||
}]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
# @pytest.mark.incremental
|
||||
class TestInstagramAPIExtractor(TestExtractorBase):
|
||||
"""
|
||||
Test suite for InstagramAPIExtractor.
|
||||
"""
|
||||
|
||||
extractor_module = "instagram_api_extractor"
|
||||
extractor: InstagramAPIExtractor
|
||||
|
||||
config = {
|
||||
"access_token": "test_access_token",
|
||||
"api_endpoint": "https://api.instagram.com/v1",
|
||||
"full_profile": False,
|
||||
# "full_profile_max_posts": 0,
|
||||
# "minimize_json_output": True,
|
||||
}
|
||||
|
||||
@pytest.fixture
|
||||
def metadata(self):
|
||||
m = Metadata()
|
||||
m.set_url("https://instagram.com/test_user")
|
||||
m.set("netloc", "instagram.com")
|
||||
return m
|
||||
|
||||
@pytest.mark.parametrize("url,expected", [
|
||||
("https://instagram.com/user", [("", "user", "")]),
|
||||
("https://instagr.am/p/post_id", []),
|
||||
("https://youtube.com", []),
|
||||
("https://www.instagram.com/reel/reel_id", [("reel", "reel_id", "")]),
|
||||
("https://instagram.com/stories/highlights/123", [("stories/highlights", "123", "")]),
|
||||
("https://instagram.com/stories/user/123", [("stories", "user", "123")]),
|
||||
])
|
||||
def test_url_parsing(self, url, expected):
|
||||
assert self.extractor.valid_url.findall(url) == expected
|
||||
|
||||
def test_initialize(self):
|
||||
assert self.extractor.api_endpoint[-1] != "/"
|
||||
|
||||
@pytest.mark.parametrize("input_dict,expected", [
|
||||
({"x": 0, "valid": "data"}, {"valid": "data"}),
|
||||
({"nested": {"y": None, "valid": [{}]}}, {"nested": {"valid": [{}]}}),
|
||||
])
|
||||
def test_cleanup_dict(self, input_dict, expected):
|
||||
assert self.extractor.cleanup_dict(input_dict) == expected
|
||||
|
||||
def test_download(self):
|
||||
pass
|
||||
|
||||
def test_download_post(self, metadata, mock_user_response):
|
||||
# test with context=reel
|
||||
# test with context=post
|
||||
# test with multiple images
|
||||
# test gets text (metadata title)
|
||||
pass
|
||||
|
||||
def test_download_profile_basic(self, metadata, mock_user_response):
|
||||
"""Test basic profile download without full_profile"""
|
||||
with patch.object(self.extractor, 'call_api') as mock_call, \
|
||||
patch.object(self.extractor, 'download_from_url') as mock_download:
|
||||
# Mock API responses
|
||||
mock_call.return_value = mock_user_response
|
||||
mock_download.return_value = "profile.jpg"
|
||||
|
||||
result = self.extractor.download_profile(metadata, "test_user")
|
||||
assert result.status == "insta profile: success"
|
||||
assert result.get_title() == "Test User"
|
||||
assert result.get("data") == self.extractor.cleanup_dict(mock_user_response["user"])
|
||||
# Verify profile picture download
|
||||
mock_call.assert_called_once_with("v2/user/by/username", {"username": "test_user"})
|
||||
mock_download.assert_called_once_with("http://example.com/profile.jpg")
|
||||
assert len(result.media) == 1
|
||||
assert result.media[0].filename == "profile.jpg"
|
||||
|
||||
def test_download_profile_full(self, metadata, mock_user_response, mock_story_response):
|
||||
"""Test full profile download with stories/posts"""
|
||||
with patch.object(self.extractor, 'call_api') as mock_call, \
|
||||
patch.object(self.extractor, 'download_all_posts') as mock_posts, \
|
||||
patch.object(self.extractor, 'download_all_highlights') as mock_highlights, \
|
||||
patch.object(self.extractor, 'download_all_tagged') as mock_tagged, \
|
||||
patch.object(self.extractor, '_download_stories_reusable') as mock_stories:
|
||||
|
||||
self.extractor.full_profile = True
|
||||
mock_call.side_effect = [
|
||||
mock_user_response,
|
||||
mock_story_response
|
||||
]
|
||||
mock_highlights.return_value = None
|
||||
mock_stories.return_value = mock_story_response
|
||||
mock_posts.return_value = None
|
||||
mock_tagged.return_value = None
|
||||
|
||||
result = self.extractor.download_profile(metadata, "test_user")
|
||||
assert result.get("#stories") == len(mock_story_response)
|
||||
mock_posts.assert_called_once_with(result, "123")
|
||||
assert "errors" not in result.metadata
|
||||
|
||||
def test_download_profile_not_found(self, metadata):
|
||||
"""Test profile not found error"""
|
||||
with patch.object(self.extractor, 'call_api') as mock_call:
|
||||
mock_call.return_value = {"user": None}
|
||||
with pytest.raises(AssertionError) as exc_info:
|
||||
self.extractor.download_profile(metadata, "invalid_user")
|
||||
assert "User invalid_user not found" in str(exc_info.value)
|
||||
|
||||
def test_download_profile_error_handling(self, metadata, mock_user_response):
|
||||
"""Test error handling in full profile mode"""
|
||||
with (patch.object(self.extractor, 'call_api') as mock_call, \
|
||||
patch.object(self.extractor, 'download_all_highlights') as mock_highlights, \
|
||||
patch.object(self.extractor, 'download_all_tagged') as mock_tagged, \
|
||||
patch.object(self.extractor, '_download_stories_reusable') as stories_tagged, \
|
||||
patch.object(self.extractor, 'download_all_posts') as mock_posts
|
||||
):
|
||||
self.extractor.full_profile = True
|
||||
mock_call.side_effect = [
|
||||
mock_user_response,
|
||||
Exception("Stories API failed"),
|
||||
Exception("Posts API failed")
|
||||
]
|
||||
mock_highlights.return_value = None
|
||||
mock_tagged.return_value = None
|
||||
stories_tagged.return_value = None
|
||||
mock_posts.return_value = None
|
||||
result = self.extractor.download_profile(metadata, "test_user")
|
||||
|
||||
assert result.is_success()
|
||||
assert "Error downloading stories for test_user" in result.metadata["errors"]
|
||||
21
tests/extractors/test_instagram_extractor.py
Normal file
21
tests/extractors/test_instagram_extractor.py
Normal file
@@ -0,0 +1,21 @@
|
||||
import pytest
|
||||
|
||||
from auto_archiver.modules.instagram_extractor import InstagramExtractor
|
||||
from .test_extractor_base import TestExtractorBase
|
||||
|
||||
class TestInstagramExtractor(TestExtractorBase):
|
||||
|
||||
extractor_module: str = 'instagram_extractor'
|
||||
config: dict = {}
|
||||
|
||||
@pytest.mark.parametrize("url", [
|
||||
"https://www.instagram.com/p/",
|
||||
"https://www.instagram.com/p/1234567890/",
|
||||
"https://www.instagram.com/reel/1234567890/",
|
||||
"https://www.instagram.com/username/",
|
||||
"https://www.instagram.com/username/stories/",
|
||||
"https://www.instagram.com/username/highlights/",
|
||||
])
|
||||
def test_regex_matches(self, url):
|
||||
# post
|
||||
assert InstagramExtractor.valid_url.match(url)
|
||||
94
tests/extractors/test_instagram_tbot_extractor.py
Normal file
94
tests/extractors/test_instagram_tbot_extractor.py
Normal file
@@ -0,0 +1,94 @@
|
||||
import os
|
||||
from typing import Type
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
from auto_archiver.core import Metadata
|
||||
from auto_archiver.core.extractor import Extractor
|
||||
from auto_archiver.modules.instagram_tbot_extractor import InstagramTbotExtractor
|
||||
from tests.extractors.test_extractor_base import TestExtractorBase
|
||||
|
||||
TESTFILES = os.path.join(os.path.dirname(__file__), "testfiles")
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def session_file(tmpdir):
|
||||
"""Fixture to create a test session file."""
|
||||
session_file = os.path.join(tmpdir, "test_session.session")
|
||||
with open(session_file, "w") as f:
|
||||
f.write("mock_session_data")
|
||||
return session_file.replace(".session", "")
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def patch_extractor_methods(request, setup_module):
|
||||
with patch.object(InstagramTbotExtractor, '_prepare_session_file', return_value=None), \
|
||||
patch.object(InstagramTbotExtractor, '_initialize_telegram_client', return_value=None):
|
||||
if hasattr(request, 'cls') and hasattr(request.cls, 'config'):
|
||||
request.cls.extractor = setup_module("instagram_tbot_extractor", request.cls.config)
|
||||
|
||||
yield
|
||||
|
||||
@pytest.fixture
|
||||
def metadata_sample():
|
||||
m = Metadata()
|
||||
m.set_title("Test Title")
|
||||
m.set_timestamp("2021-01-01T00:00:00Z")
|
||||
m.set_url("https://www.instagram.com/p/1234567890")
|
||||
return m
|
||||
|
||||
|
||||
class TestInstagramTbotExtractor:
|
||||
|
||||
extractor_module = "instagram_tbot_extractor"
|
||||
extractor: InstagramTbotExtractor
|
||||
config = {
|
||||
"api_id": 12345,
|
||||
"api_hash": "test_api_hash",
|
||||
"session_file": "test_session",
|
||||
}
|
||||
|
||||
@pytest.fixture
|
||||
def mock_telegram_client(self):
|
||||
"""Fixture to mock TelegramClient interactions."""
|
||||
with patch("auto_archiver.modules.instagram_tbot_extractor._initialize_telegram_client") as mock_client:
|
||||
instance = MagicMock()
|
||||
mock_client.return_value = instance
|
||||
yield instance
|
||||
|
||||
def test_extractor_is_initialized(self):
|
||||
assert self.extractor is not None
|
||||
|
||||
|
||||
@patch("time.sleep")
|
||||
@pytest.mark.parametrize("url, expected_status, bot_responses", [
|
||||
("https://www.instagram.com/p/C4QgLbrIKXG", "insta-via-bot: success", [MagicMock(id=101, media=None, message="Are you new to Bellingcat? - The way we share our investigations is different. 💭\nWe want you to read our story but also learn ou")]),
|
||||
("https://www.instagram.com/reel/DEVLK8qoIbg/", "insta-via-bot: success", [MagicMock(id=101, media=None, message="Our volunteer community is at the centre of many incredible Bellingcat investigations and tools. Stephanie Ladel is one such vol")]),
|
||||
# todo tbot not working for stories :(
|
||||
("https://www.instagram.com/stories/bellingcatofficial/3556336382743057476/", False, [MagicMock(id=101, media=None, message="Media not found or unavailable")]),
|
||||
("https://www.youtube.com/watch?v=ymCMy8OffHM", False, []),
|
||||
("https://www.instagram.com/p/INVALID", False, [MagicMock(id=101, media=None, message="You must enter a URL to a post")]),
|
||||
])
|
||||
def test_download(self, mock_sleep, url, expected_status, bot_responses, metadata_sample):
|
||||
"""Test the `download()` method with various Instagram URLs."""
|
||||
metadata_sample.set_url(url)
|
||||
self.extractor.client = MagicMock()
|
||||
result = self.extractor.download(metadata_sample)
|
||||
pass
|
||||
# TODO fully mock or use as authenticated test
|
||||
# if expected_status:
|
||||
# assert result.is_success()
|
||||
# assert result.status == expected_status
|
||||
# assert result.metadata.get("title") in [msg.message[:128] for msg in bot_responses if msg.message]
|
||||
# else:
|
||||
# assert result is False
|
||||
|
||||
|
||||
|
||||
|
||||
# Test story
|
||||
# Test expired story
|
||||
# Test requires login/ access (?)
|
||||
# Test post
|
||||
# Test multiple images?
|
||||
57
tests/feeders/test_csv_feeder.py
Normal file
57
tests/feeders/test_csv_feeder.py
Normal file
@@ -0,0 +1,57 @@
|
||||
import pytest
|
||||
|
||||
@pytest.fixture
|
||||
def headerless_csv_file():
|
||||
return "tests/data/csv_no_headers.csv"
|
||||
|
||||
@pytest.fixture
|
||||
def header_csv_file():
|
||||
return "tests/data/csv_with_headers.csv"
|
||||
|
||||
@pytest.fixture
|
||||
def header_csv_file_non_default_column():
|
||||
return "tests/data/csv_with_headers_non_default_column.csv"
|
||||
|
||||
|
||||
def test_csv_feeder_no_headers(headerless_csv_file, setup_module):
|
||||
from auto_archiver.modules.csv_feeder.csv_feeder import CSVFeeder
|
||||
|
||||
feeder = setup_module(CSVFeeder, {"files": [headerless_csv_file]})
|
||||
|
||||
urls = list(feeder)
|
||||
assert len(urls) == 2
|
||||
assert urls[0].get_url() == "https://example.com/1/"
|
||||
assert urls[1].get_url() == "https://example.com/2/"
|
||||
|
||||
def test_csv_feeder_with_headers(header_csv_file, setup_module):
|
||||
from auto_archiver.modules.csv_feeder.csv_feeder import CSVFeeder
|
||||
|
||||
feeder = setup_module(CSVFeeder, {"files": [header_csv_file]})
|
||||
|
||||
urls = list(feeder)
|
||||
assert len(urls) == 2
|
||||
assert urls[0].get_url() == "https://example.com/1/"
|
||||
assert urls[1].get_url() == "https://example.com/2/"
|
||||
|
||||
def test_csv_feeder_wrong_column(header_csv_file, setup_module, caplog):
|
||||
from auto_archiver.modules.csv_feeder.csv_feeder import CSVFeeder
|
||||
|
||||
|
||||
with caplog.at_level("WARNING"):
|
||||
feeder = setup_module(CSVFeeder, {"files": [header_csv_file], "column": 1})
|
||||
urls = list(feeder)
|
||||
|
||||
assert len(urls) == 0
|
||||
assert "Not a valid URL in row" in caplog.text
|
||||
assert len(caplog.records) == 2
|
||||
|
||||
|
||||
def test_csv_feeder_column_by_name(header_csv_file, setup_module):
|
||||
from auto_archiver.modules.csv_feeder.csv_feeder import CSVFeeder
|
||||
|
||||
feeder = setup_module(CSVFeeder, {"files": [header_csv_file], "column": "webpages"})
|
||||
|
||||
urls = list(feeder)
|
||||
assert len(urls) == 2
|
||||
assert urls[0].get_url() == "https://example.com/1/"
|
||||
assert urls[1].get_url() == "https://example.com/2/"
|
||||
273
tests/feeders/test_gsheet_feeder.py
Normal file
273
tests/feeders/test_gsheet_feeder.py
Normal file
@@ -0,0 +1,273 @@
|
||||
from typing import Type
|
||||
|
||||
import gspread
|
||||
import pytest
|
||||
from unittest.mock import patch, MagicMock
|
||||
from auto_archiver.modules.gsheet_feeder import GsheetsFeeder
|
||||
from auto_archiver.core import Metadata, Feeder
|
||||
|
||||
|
||||
def test_setup_without_sheet_and_sheet_id(setup_module):
|
||||
# Ensure setup() raises AssertionError if neither sheet nor sheet_id is set.
|
||||
with patch("gspread.service_account"):
|
||||
with pytest.raises(AssertionError):
|
||||
setup_module(
|
||||
"gsheet_feeder",
|
||||
{"service_account": "dummy.json", "sheet": None, "sheet_id": None},
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def gsheet_feeder(setup_module) -> GsheetsFeeder:
|
||||
with patch("gspread.service_account"):
|
||||
feeder = setup_module(
|
||||
"gsheet_feeder",
|
||||
{
|
||||
"service_account": "dummy.json",
|
||||
"sheet": "test-auto-archiver",
|
||||
"sheet_id": None,
|
||||
"header": 1,
|
||||
"columns": {
|
||||
"url": "link",
|
||||
"status": "archive status",
|
||||
"folder": "destination folder",
|
||||
"archive": "archive location",
|
||||
"date": "archive date",
|
||||
"thumbnail": "thumbnail",
|
||||
"timestamp": "upload timestamp",
|
||||
"title": "upload title",
|
||||
"text": "text content",
|
||||
"screenshot": "screenshot",
|
||||
"hash": "hash",
|
||||
"pdq_hash": "perceptual hashes",
|
||||
"wacz": "wacz",
|
||||
"replaywebpage": "replaywebpage",
|
||||
},
|
||||
"allow_worksheets": set(),
|
||||
"block_worksheets": set(),
|
||||
"use_sheet_names_in_stored_paths": True,
|
||||
},
|
||||
)
|
||||
feeder.gsheets_client = MagicMock()
|
||||
return feeder
|
||||
|
||||
|
||||
class MockWorksheet:
|
||||
"""
|
||||
mimics the bits we need from gworksheet
|
||||
"""
|
||||
|
||||
class SheetSheet:
|
||||
title = "TestSheet"
|
||||
|
||||
rows = [
|
||||
{"row": 2, "url": "http://example.com", "status": "", "folder": ""},
|
||||
{"row": 3, "url": "http://example.com", "status": "", "folder": ""},
|
||||
{"row": 4, "url": "", "status": "", "folder": ""},
|
||||
{"row": 5, "url": "https://another.com", "status": None, "folder": ""},
|
||||
{
|
||||
"row": 6,
|
||||
"url": "https://another.com",
|
||||
"status": "success",
|
||||
"folder": "some_folder",
|
||||
},
|
||||
]
|
||||
|
||||
def __init__(self):
|
||||
self.wks = self.SheetSheet()
|
||||
|
||||
def count_rows(self):
|
||||
if not self.rows:
|
||||
return 0
|
||||
return max(r["row"] for r in self.rows)
|
||||
|
||||
def get_cell(self, row, col_name, fresh=False):
|
||||
matching = next((r for r in self.rows if r["row"] == row), {})
|
||||
return matching.get(col_name, "")
|
||||
|
||||
def get_cell_or_default(self, row, col_name, default):
|
||||
matching = next((r for r in self.rows if r["row"] == row), {})
|
||||
return matching.get(col_name, default)
|
||||
|
||||
|
||||
def test__process_rows(gsheet_feeder: GsheetsFeeder):
|
||||
testworksheet = MockWorksheet()
|
||||
metadata_items = list(gsheet_feeder._process_rows(testworksheet))
|
||||
assert len(metadata_items) == 3
|
||||
assert isinstance(metadata_items[0], Metadata)
|
||||
assert metadata_items[0].get("url") == "http://example.com"
|
||||
|
||||
|
||||
def test__set_metadata(gsheet_feeder: GsheetsFeeder):
|
||||
worksheet = MockWorksheet()
|
||||
metadata = Metadata()
|
||||
gsheet_feeder._set_context(metadata, worksheet, 1)
|
||||
assert metadata.get_context("gsheet") == {"row": 1, "worksheet": worksheet}
|
||||
|
||||
|
||||
@pytest.mark.skip(reason="Not recognising folder column")
|
||||
def test__set_metadata_with_folder_pickled(gsheet_feeder: GsheetsFeeder, worksheet):
|
||||
gsheet_feeder._set_context(worksheet, 7)
|
||||
assert Metadata.get_context("gsheet") == {"row": 1, "worksheet": worksheet}
|
||||
|
||||
|
||||
def test__set_metadata_with_folder(gsheet_feeder: GsheetsFeeder):
|
||||
testworksheet = MockWorksheet()
|
||||
metadata = Metadata()
|
||||
testworksheet.wks.title = "TestSheet"
|
||||
gsheet_feeder._set_context(metadata, testworksheet, 6)
|
||||
assert metadata.get_context("gsheet") == {"row": 6, "worksheet": testworksheet}
|
||||
assert metadata.get_context("folder") == "some-folder/test-auto-archiver/testsheet"
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("setup_module")
|
||||
@pytest.mark.parametrize(
|
||||
"sheet, sheet_id, expected_method, expected_arg, description",
|
||||
[
|
||||
("TestSheet", None, "open", "TestSheet", "opening by sheet name"),
|
||||
(None, "ABC123", "open_by_key", "ABC123", "opening by sheet ID"),
|
||||
],
|
||||
)
|
||||
def test_open_sheet_with_name_or_id(
|
||||
setup_module, sheet, sheet_id, expected_method, expected_arg, description
|
||||
):
|
||||
"""Ensure open_sheet() correctly opens by name or ID based on configuration."""
|
||||
with patch("gspread.service_account") as mock_service_account:
|
||||
mock_client = MagicMock()
|
||||
mock_service_account.return_value = mock_client
|
||||
mock_client.open.return_value = "MockSheet"
|
||||
mock_client.open_by_key.return_value = "MockSheet"
|
||||
|
||||
# Setup module with parameterized values
|
||||
feeder = setup_module(
|
||||
"gsheet_feeder",
|
||||
{"service_account": "dummy.json", "sheet": sheet, "sheet_id": sheet_id},
|
||||
)
|
||||
sheet_result = feeder.open_sheet()
|
||||
# Validate the correct method was called
|
||||
getattr(mock_client, expected_method).assert_called_once_with(
|
||||
expected_arg
|
||||
), f"Failed: {description}"
|
||||
assert sheet_result == "MockSheet", f"Failed: {description}"
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("setup_module")
|
||||
def test_open_sheet_with_sheet_id(setup_module):
|
||||
"""Ensure open_sheet() correctly opens a sheet by ID."""
|
||||
with patch("gspread.service_account") as mock_service_account:
|
||||
mock_client = MagicMock()
|
||||
mock_service_account.return_value = mock_client
|
||||
mock_client.open_by_key.return_value = "MockSheet"
|
||||
feeder = setup_module(
|
||||
"gsheet_feeder",
|
||||
{"service_account": "dummy.json", "sheet": None, "sheet_id": "ABC123"},
|
||||
)
|
||||
sheet = feeder.open_sheet()
|
||||
mock_client.open_by_key.assert_called_once_with("ABC123")
|
||||
assert sheet == "MockSheet"
|
||||
|
||||
|
||||
def test_should_process_sheet(setup_module):
|
||||
with patch("gspread.service_account"):
|
||||
gdb = setup_module(
|
||||
"gsheet_feeder",
|
||||
{
|
||||
"service_account": "dummy.json",
|
||||
"sheet": "TestSheet",
|
||||
"sheet_id": None,
|
||||
"allow_worksheets": {"TestSheet", "Sheet2"},
|
||||
"block_worksheets": {"Sheet3"},
|
||||
},
|
||||
)
|
||||
assert gdb.should_process_sheet("TestSheet") == True
|
||||
assert gdb.should_process_sheet("Sheet3") == False
|
||||
# False if allow_worksheets is set
|
||||
assert gdb.should_process_sheet("AnotherSheet") == False
|
||||
|
||||
|
||||
@pytest.mark.skip(reason="Requires a real connection")
|
||||
class TestGSheetsFeederReal:
|
||||
"""Testing GSheetsFeeder class"""
|
||||
|
||||
module_name: str = "gsheet_feeder"
|
||||
feeder: GsheetsFeeder
|
||||
# You must follow the setup process explain in the docs for this to work
|
||||
config: dict = {
|
||||
"service_account": "secrets/service_account.json",
|
||||
"sheet": "test-auto-archiver",
|
||||
"sheet_id": None,
|
||||
"header": 1,
|
||||
"columns": {
|
||||
"url": "link",
|
||||
"status": "archive status",
|
||||
"folder": "destination folder",
|
||||
"archive": "archive location",
|
||||
"date": "archive date",
|
||||
"thumbnail": "thumbnail",
|
||||
"timestamp": "upload timestamp",
|
||||
"title": "upload title",
|
||||
"text": "text content",
|
||||
"screenshot": "screenshot",
|
||||
"hash": "hash",
|
||||
"pdq_hash": "perceptual hashes",
|
||||
"wacz": "wacz",
|
||||
"replaywebpage": "replaywebpage",
|
||||
},
|
||||
"allow_worksheets": set(),
|
||||
"block_worksheets": set(),
|
||||
"use_sheet_names_in_stored_paths": True,
|
||||
}
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup_feeder(self, setup_module):
|
||||
assert (
|
||||
self.module_name is not None
|
||||
), "self.module_name must be set on the subclass"
|
||||
assert self.config is not None, "self.config must be a dict set on the subclass"
|
||||
self.feeder: Type[Feeder] = setup_module(self.module_name, self.config)
|
||||
|
||||
def reset_test_sheet(self):
|
||||
"""Clears test sheet and re-adds headers to ensure consistent test results."""
|
||||
client = gspread.service_account(self.config["service_account"])
|
||||
sheet = client.open(self.config["sheet"])
|
||||
worksheet = sheet.get_worksheet(0)
|
||||
worksheet.clear()
|
||||
worksheet.append_row(["Link", "Archive Status"])
|
||||
|
||||
def test_setup(self):
|
||||
assert hasattr(self.feeder, "gsheets_client")
|
||||
|
||||
def test_open_sheet_real_connection(self):
|
||||
"""Ensure open_sheet() connects to a real Google Sheets instance."""
|
||||
sheet = self.feeder.open_sheet()
|
||||
assert sheet is not None, "open_sheet() should return a valid sheet instance"
|
||||
assert hasattr(
|
||||
sheet, "worksheets"
|
||||
), "Returned object should have worksheets method"
|
||||
|
||||
def test_iter_yields_metadata_real_data(self):
|
||||
"""Ensure __iter__() yields Metadata objects for real test sheet data."""
|
||||
self.reset_test_sheet()
|
||||
client = gspread.service_account(self.config["service_account"])
|
||||
sheet = client.open(self.config["sheet"])
|
||||
worksheet = sheet.get_worksheet(0)
|
||||
# Insert test rows as a temp method
|
||||
# Next we will refactor the feeder for better testing
|
||||
test_rows = [
|
||||
["https://example.com", ""],
|
||||
["", ""],
|
||||
["https://example.com", "done"],
|
||||
]
|
||||
worksheet.append_rows(test_rows)
|
||||
metadata_list = list(self.feeder)
|
||||
|
||||
# Validate that only the first row is processed
|
||||
assert len(metadata_list) == 1
|
||||
assert metadata_list[0].metadata.get("url") == "https://example.com"
|
||||
|
||||
|
||||
# TODO
|
||||
|
||||
# Test two sheets
|
||||
# test two sheets with different columns
|
||||
# test folder implementation
|
||||
144
tests/feeders/test_gworksheet.py
Normal file
144
tests/feeders/test_gworksheet.py
Normal file
@@ -0,0 +1,144 @@
|
||||
import pytest
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
from auto_archiver.modules.gsheet_feeder import GWorksheet
|
||||
|
||||
|
||||
class TestGWorksheet:
|
||||
@pytest.fixture
|
||||
def mock_worksheet(self):
|
||||
mock_ws = MagicMock()
|
||||
mock_ws.get_values.return_value = [
|
||||
["Link", "Archive Status", "Archive Location", "Archive Date"],
|
||||
["url1", "archived", "filepath1", "2023-01-01"],
|
||||
["url2", "pending", "filepath2", "2023-01-02"],
|
||||
]
|
||||
return mock_ws
|
||||
|
||||
@pytest.fixture
|
||||
def gworksheet(self, mock_worksheet):
|
||||
return GWorksheet(mock_worksheet)
|
||||
|
||||
# Test initialization and basic properties
|
||||
def test_initialization_sets_headers(self, gworksheet):
|
||||
assert gworksheet.headers == ["link", "archive status", "archive location", "archive date"]
|
||||
|
||||
def test_count_rows_returns_correct_value(self, gworksheet):
|
||||
# inc header row
|
||||
assert gworksheet.count_rows() == 3
|
||||
|
||||
# Test column validation and lookup
|
||||
@pytest.mark.parametrize(
|
||||
"col,expected_index",
|
||||
[
|
||||
("url", 0),
|
||||
("status", 1),
|
||||
("archive", 2),
|
||||
("date", 3),
|
||||
],
|
||||
)
|
||||
def test_col_index_returns_correct_index(self, gworksheet, col, expected_index):
|
||||
assert gworksheet._col_index(col) == expected_index
|
||||
|
||||
def test_check_col_exists_raises_for_invalid_column(self, gworksheet):
|
||||
with pytest.raises(Exception, match="Column invalid_col"):
|
||||
gworksheet._check_col_exists("invalid_col")
|
||||
|
||||
# Test data retrieval
|
||||
@pytest.mark.parametrize(
|
||||
"row,expected",
|
||||
[
|
||||
(1, ["Link", "Archive Status", "Archive Location", "Archive Date"]),
|
||||
(2, ["url1", "archived", "filepath1", "2023-01-01"]),
|
||||
(3, ["url2", "pending", "filepath2", "2023-01-02"]),
|
||||
],
|
||||
)
|
||||
def test_get_row_returns_correct_data(self, gworksheet, row, expected):
|
||||
assert gworksheet.get_row(row) == expected
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"row,col,expected",
|
||||
[
|
||||
(2, "url", "url1"),
|
||||
(2, "status", "archived"),
|
||||
(3, "date", "2023-01-02"),
|
||||
],
|
||||
)
|
||||
def test_get_cell_returns_correct_value(self, gworksheet, row, col, expected):
|
||||
assert gworksheet.get_cell(row, col) == expected
|
||||
|
||||
def test_get_cell_handles_fresh_data(self, mock_worksheet, gworksheet):
|
||||
mock_worksheet.cell.return_value.value = "fresh_value"
|
||||
result = gworksheet.get_cell(2, "url", fresh=True)
|
||||
assert result == "fresh_value"
|
||||
mock_worksheet.cell.assert_called_once_with(2, 1)
|
||||
|
||||
# Test edge cases and error handling
|
||||
@pytest.mark.parametrize(
|
||||
"when_empty,expected",
|
||||
[
|
||||
(True, "default"),
|
||||
(False, ""),
|
||||
],
|
||||
)
|
||||
def test_get_cell_or_default_handles_empty_values(
|
||||
self, mock_worksheet, when_empty, expected
|
||||
):
|
||||
mock_worksheet.get_values.return_value[1][0] = "" # Empty URL cell
|
||||
g = GWorksheet(mock_worksheet)
|
||||
assert (
|
||||
g.get_cell_or_default(
|
||||
2, "url", default="default", when_empty_use_default=when_empty
|
||||
)
|
||||
== expected
|
||||
)
|
||||
|
||||
def test_get_cell_or_default_handles_missing_columns(self, gworksheet):
|
||||
assert (
|
||||
gworksheet.get_cell_or_default(1, "invalid_col", default="safe") == "safe"
|
||||
)
|
||||
|
||||
# Test write operations
|
||||
def test_set_cell_updates_correct_position(self, mock_worksheet, gworksheet):
|
||||
gworksheet.set_cell(2, "url", "new_url")
|
||||
mock_worksheet.update_cell.assert_called_once_with(2, 1, "new_url")
|
||||
|
||||
def test_batch_set_cell_formats_requests_correctly(
|
||||
self, mock_worksheet, gworksheet
|
||||
):
|
||||
updates = [(2, "url", "new_url"), (3, "status", "processed")]
|
||||
gworksheet.batch_set_cell(updates)
|
||||
expected_batch = [
|
||||
{"range": "A2", "values": [["new_url"]]},
|
||||
{"range": "B3", "values": [["processed"]]},
|
||||
]
|
||||
mock_worksheet.batch_update.assert_called_once_with(
|
||||
expected_batch, value_input_option="USER_ENTERED"
|
||||
)
|
||||
|
||||
def test_batch_set_cell_truncates_long_values(self, mock_worksheet, gworksheet):
|
||||
long_value = "x" * 50000
|
||||
gworksheet.batch_set_cell([(1, "url", long_value)])
|
||||
submitted_value = mock_worksheet.batch_update.call_args[0][0][0]["values"][0][0]
|
||||
assert len(submitted_value) == 49999
|
||||
|
||||
# Test coordinate conversion
|
||||
@pytest.mark.parametrize(
|
||||
"row,col,expected",
|
||||
[
|
||||
(1, "url", "A1"),
|
||||
(2, "status", "B2"),
|
||||
(3, "archive", "C3"),
|
||||
(4, "date", "D4"),
|
||||
],
|
||||
)
|
||||
def test_to_a1_conversion(self, gworksheet, row, col, expected):
|
||||
assert gworksheet.to_a1(row, col) == expected
|
||||
|
||||
# Test empty worksheet
|
||||
def test_empty_worksheet_initialization(self):
|
||||
mock_ws = MagicMock()
|
||||
mock_ws.get_values.return_value = []
|
||||
g = GWorksheet(mock_ws)
|
||||
assert g.headers == []
|
||||
assert g.count_rows() == 0
|
||||
124
tests/storages/test_S3_storage.py
Normal file
124
tests/storages/test_S3_storage.py
Normal file
@@ -0,0 +1,124 @@
|
||||
from typing import Type
|
||||
import pytest
|
||||
from unittest.mock import MagicMock, patch
|
||||
from auto_archiver.core import Media
|
||||
from auto_archiver.modules.s3_storage import S3Storage
|
||||
|
||||
|
||||
class TestS3Storage:
|
||||
"""
|
||||
Test suite for S3Storage.
|
||||
"""
|
||||
module_name: str = "s3_storage"
|
||||
storage: Type[S3Storage]
|
||||
s3: MagicMock
|
||||
config: dict = {
|
||||
"path_generator": "flat",
|
||||
"filename_generator": "static",
|
||||
"bucket": "test-bucket",
|
||||
"region": "test-region",
|
||||
"key": "test-key",
|
||||
"secret": "test-secret",
|
||||
"random_no_duplicate": False,
|
||||
"endpoint_url": "https://{region}.example.com",
|
||||
"cdn_url": "https://cdn.example.com/{key}",
|
||||
"private": False,
|
||||
}
|
||||
|
||||
@patch('boto3.client')
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup_storage(self, setup_module):
|
||||
self.storage = setup_module(self.module_name, self.config)
|
||||
|
||||
def test_client_initialization(self):
|
||||
"""Test that S3 client is initialized with correct parameters"""
|
||||
assert self.storage.s3 is not None
|
||||
assert self.storage.s3.meta.region_name == 'test-region'
|
||||
|
||||
def test_get_cdn_url_generation(self):
|
||||
"""Test CDN URL formatting """
|
||||
media = Media("test.txt")
|
||||
media.key = "path/to/file.txt"
|
||||
url = self.storage.get_cdn_url(media)
|
||||
assert url == "https://cdn.example.com/path/to/file.txt"
|
||||
media.key = "another/path.jpg"
|
||||
assert self.storage.get_cdn_url(media) == "https://cdn.example.com/another/path.jpg"
|
||||
|
||||
def test_uploadf_sets_acl_public(self):
|
||||
media = Media("test.txt")
|
||||
mock_file = MagicMock()
|
||||
with patch.object(self.storage.s3, 'upload_fileobj') as mock_s3_upload, \
|
||||
patch.object(self.storage, 'is_upload_needed', return_value=True):
|
||||
self.storage.uploadf(mock_file, media)
|
||||
mock_s3_upload.assert_called_once_with(
|
||||
mock_file,
|
||||
Bucket='test-bucket',
|
||||
Key=media.key,
|
||||
ExtraArgs={'ACL': 'public-read', 'ContentType': 'text/plain'}
|
||||
)
|
||||
|
||||
def test_upload_decision_logic(self):
|
||||
"""Test is_upload_needed under different conditions"""
|
||||
media = Media("test.txt")
|
||||
# Test default state (random_no_duplicate=False)
|
||||
assert self.storage.is_upload_needed(media) is True
|
||||
# Set duplicate checking config to true:
|
||||
|
||||
self.storage.random_no_duplicate = True
|
||||
with patch('auto_archiver.modules.s3_storage.s3_storage.calculate_file_hash') as mock_calc_hash, \
|
||||
patch.object(self.storage, 'file_in_folder') as mock_file_in_folder:
|
||||
mock_calc_hash.return_value = 'beepboop123beepboop123beepboop123'
|
||||
mock_file_in_folder.return_value = 'existing_key.txt'
|
||||
# Test duplicate result
|
||||
assert self.storage.is_upload_needed(media) is False
|
||||
assert media.key == 'existing_key.txt'
|
||||
mock_file_in_folder.assert_called_with(
|
||||
# (first 24 chars of hash)
|
||||
'no-dups/beepboop123beepboop123be'
|
||||
)
|
||||
|
||||
|
||||
@patch.object(S3Storage, 'file_in_folder')
|
||||
def test_skips_upload_when_duplicate_exists(self, mock_file_in_folder):
|
||||
"""Test that upload skips when file_in_folder finds existing object"""
|
||||
self.storage.random_no_duplicate = True
|
||||
mock_file_in_folder.return_value = "existing_folder/existing_file.txt"
|
||||
# Create test media with calculated hash
|
||||
media = Media("test.txt")
|
||||
media.key = "original_path.txt"
|
||||
with patch('auto_archiver.modules.s3_storage.s3_storage.calculate_file_hash') as mock_calculate_hash:
|
||||
mock_calculate_hash.return_value = "beepboop123beepboop123beepboop123"
|
||||
# Verify upload
|
||||
assert self.storage.is_upload_needed(media) is False
|
||||
assert media.key == "existing_folder/existing_file.txt"
|
||||
assert media.get("previously archived") is True
|
||||
with patch.object(self.storage.s3, 'upload_fileobj') as mock_upload:
|
||||
result = self.storage.uploadf(None, media)
|
||||
mock_upload.assert_not_called()
|
||||
assert result is True
|
||||
|
||||
@patch.object(S3Storage, 'is_upload_needed')
|
||||
def test_uploads_with_correct_parameters(self, mock_upload_needed):
|
||||
media = Media("test.txt")
|
||||
media.key = "original_key.txt"
|
||||
mock_upload_needed.return_value = True
|
||||
media.mimetype = 'image/png'
|
||||
mock_file = MagicMock()
|
||||
|
||||
with patch.object(self.storage.s3, 'upload_fileobj') as mock_upload:
|
||||
self.storage.uploadf(mock_file, media)
|
||||
# verify call occured with these params
|
||||
mock_upload.assert_called_once_with(
|
||||
mock_file,
|
||||
Bucket='test-bucket',
|
||||
Key='original_key.txt',
|
||||
ExtraArgs={
|
||||
'ACL': 'public-read',
|
||||
'ContentType': 'image/png'
|
||||
}
|
||||
)
|
||||
|
||||
def test_file_in_folder_exists(self):
|
||||
with patch.object(self.storage.s3, 'list_objects') as mock_list_objects:
|
||||
mock_list_objects.return_value = {'Contents': [{'Key': 'path/to/file.txt'}]}
|
||||
assert self.storage.file_in_folder('path/to/') == 'path/to/file.txt'
|
||||
68
tests/storages/test_gdrive_storage.py
Normal file
68
tests/storages/test_gdrive_storage.py
Normal file
@@ -0,0 +1,68 @@
|
||||
from typing import Type
|
||||
import pytest
|
||||
from unittest.mock import MagicMock, patch
|
||||
from auto_archiver.core import Media
|
||||
from auto_archiver.modules.gdrive_storage import GDriveStorage
|
||||
from auto_archiver.core.metadata import Metadata
|
||||
from tests.storages.test_storage_base import TestStorageBase
|
||||
|
||||
|
||||
class TestGDriveStorage:
|
||||
"""
|
||||
Test suite for GDriveStorage.
|
||||
"""
|
||||
|
||||
module_name: str = "gdrive_storage"
|
||||
storage: Type[GDriveStorage]
|
||||
config: dict = {'path_generator': 'url',
|
||||
'filename_generator': 'static',
|
||||
'root_folder_id': "fake_root_folder_id",
|
||||
'oauth_token': None,
|
||||
'service_account': 'fake_service_account.json'
|
||||
}
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def gdrive(self, setup_module):
|
||||
with patch('google.oauth2.service_account.Credentials.from_service_account_file') as mock_creds:
|
||||
self.storage = setup_module(self.module_name, self.config)
|
||||
|
||||
def test_initialize_fails_with_non_existent_creds(self):
|
||||
"""
|
||||
Test that the Google Drive service raises a FileNotFoundError when the service account file does not exist.
|
||||
"""
|
||||
# Act and Assert
|
||||
with pytest.raises(FileNotFoundError) as exc_info:
|
||||
self.storage.setup()
|
||||
assert "No such file or directory" in str(exc_info.value)
|
||||
|
||||
|
||||
def test_path_parts(self):
|
||||
media = Media(filename="test.jpg")
|
||||
media.key = "folder1/folder2/test.jpg"
|
||||
|
||||
|
||||
@pytest.mark.skip(reason="Requires real credentials")
|
||||
@pytest.mark.download
|
||||
class TestGDriveStorageConnected(TestStorageBase):
|
||||
"""
|
||||
'Real' tests for GDriveStorage.
|
||||
"""
|
||||
|
||||
module_name: str = "gdrive_storage"
|
||||
storage: Type[GDriveStorage]
|
||||
config: dict = {'path_generator': 'url',
|
||||
'filename_generator': 'static',
|
||||
# TODO: replace with real root folder id
|
||||
'root_folder_id': "1TVY_oJt95_dmRSEdP9m5zFy7l50TeCSk",
|
||||
'oauth_token': None,
|
||||
'service_account': 'secrets/service_account.json'
|
||||
}
|
||||
|
||||
|
||||
def test_initialize_with_real_credentials(self):
|
||||
"""
|
||||
Test that the Google Drive service can be initialized with real credentials.
|
||||
"""
|
||||
assert self.storage.service is not None
|
||||
|
||||
|
||||
22
tests/storages/test_storage_base.py
Normal file
22
tests/storages/test_storage_base.py
Normal file
@@ -0,0 +1,22 @@
|
||||
from typing import Type
|
||||
|
||||
import pytest
|
||||
|
||||
from auto_archiver.core.metadata import Metadata
|
||||
from auto_archiver.core.storage import Storage
|
||||
|
||||
|
||||
class TestStorageBase(object):
|
||||
|
||||
module_name: str = None
|
||||
config: dict = None
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup_storage(self, setup_module):
|
||||
assert (
|
||||
self.module_name is not None
|
||||
), "self.module_name must be set on the subclass"
|
||||
assert self.config is not None, "self.config must be a dict set on the subclass"
|
||||
self.storage: Type[Storage] = setup_module(
|
||||
self.module_name, self.config
|
||||
)
|
||||
62
tests/test_implementation.py
Normal file
62
tests/test_implementation.py
Normal file
@@ -0,0 +1,62 @@
|
||||
import sys
|
||||
import pytest
|
||||
|
||||
from auto_archiver.__main__ import main
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def orchestration_file_path(tmp_path):
|
||||
return (tmp_path / "example_orch.yaml").as_posix()
|
||||
|
||||
@pytest.fixture
|
||||
def orchestration_file(orchestration_file_path):
|
||||
def _orchestration_file(content=''):
|
||||
with open(orchestration_file_path, "w") as f:
|
||||
f.write(content)
|
||||
return orchestration_file_path
|
||||
|
||||
return _orchestration_file
|
||||
|
||||
@pytest.fixture
|
||||
def autoarchiver(tmp_path, monkeypatch, request):
|
||||
def _autoarchiver(args=[]):
|
||||
|
||||
def cleanup():
|
||||
from loguru import logger
|
||||
if not logger._core.handlers.get(0):
|
||||
logger._core.handlers_count = 0
|
||||
logger.add(sys.stderr)
|
||||
|
||||
request.addfinalizer(cleanup)
|
||||
|
||||
# change dir to tmp_path
|
||||
monkeypatch.chdir(tmp_path)
|
||||
with monkeypatch.context() as m:
|
||||
m.setattr(sys, "argv", ["auto-archiver"] + args)
|
||||
return main()
|
||||
|
||||
return _autoarchiver
|
||||
|
||||
|
||||
def test_run_auto_archiver_no_args(caplog, autoarchiver):
|
||||
with pytest.raises(SystemExit):
|
||||
autoarchiver()
|
||||
|
||||
assert "provide at least one URL via the command line, or set up an alternative feeder" in caplog.text
|
||||
|
||||
def test_run_auto_archiver_invalid_file(caplog, autoarchiver):
|
||||
# exec 'auto-archiver' on the command lin
|
||||
with pytest.raises(SystemExit):
|
||||
autoarchiver(["--config", "nonexistent_file.yaml"])
|
||||
|
||||
assert "Make sure the file exists and try again, or run without th" in caplog.text
|
||||
|
||||
def test_run_auto_archiver_empty_file(caplog, autoarchiver, orchestration_file):
|
||||
# create a valid (empty) orchestration file
|
||||
path = orchestration_file(content="")
|
||||
# exec 'auto-archiver' on the command lin
|
||||
with pytest.raises(SystemExit):
|
||||
autoarchiver(["--config", path])
|
||||
|
||||
# should treat an empty file as if there is no file at all
|
||||
assert " No URLs provided. Please provide at least one URL via the com" in caplog.text
|
||||
165
tests/test_metadata.py
Normal file
165
tests/test_metadata.py
Normal file
@@ -0,0 +1,165 @@
|
||||
import pytest
|
||||
from datetime import datetime, timezone
|
||||
from dataclasses import dataclass
|
||||
from typing import Any
|
||||
from auto_archiver.core.metadata import Metadata
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def basic_metadata():
|
||||
m = Metadata()
|
||||
m.set_url("https://example.com")
|
||||
m.set("title", "Test Page")
|
||||
return m
|
||||
|
||||
|
||||
@dataclass
|
||||
class MockMedia:
|
||||
filename: str = ""
|
||||
mimetype: str = ""
|
||||
data: dict = None
|
||||
|
||||
def get(self, key: str, default: Any = None) -> Any:
|
||||
return self.data.get(key, default) if self.data else default
|
||||
|
||||
def set(self, key: str, value: Any) -> None:
|
||||
if not self.data:
|
||||
self.data = {}
|
||||
self.data[key] = value
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def media_file():
|
||||
def _create(filename="test.txt", mimetype="text/plain", hash_value=None):
|
||||
m = MockMedia(filename=filename, mimetype=mimetype)
|
||||
if hash_value:
|
||||
m.set("hash", hash_value)
|
||||
return m
|
||||
|
||||
return _create
|
||||
|
||||
|
||||
def test_initial_state():
|
||||
m = Metadata()
|
||||
assert m.status == "no archiver"
|
||||
assert m.metadata == {"_processed_at": m.get("_processed_at")}
|
||||
assert m.media == []
|
||||
assert isinstance(m.get("_processed_at"), datetime)
|
||||
|
||||
|
||||
def test_url_properties(basic_metadata):
|
||||
assert basic_metadata.get_url() == "https://example.com"
|
||||
assert basic_metadata.netloc == "example.com"
|
||||
|
||||
|
||||
def test_simple_merge(basic_metadata):
|
||||
right = Metadata(status="success")
|
||||
right.set("title", "Test Title")
|
||||
|
||||
basic_metadata.merge(right)
|
||||
assert basic_metadata.status == "success"
|
||||
assert basic_metadata.get("title") == "Test Title"
|
||||
|
||||
|
||||
def test_left_merge():
|
||||
left = (
|
||||
Metadata()
|
||||
.set("tags", ["a"])
|
||||
.set("stats", {"views": 10})
|
||||
.set("status", "success")
|
||||
)
|
||||
right = (
|
||||
Metadata()
|
||||
.set("tags", ["b"])
|
||||
.set("stats", {"likes": 5})
|
||||
.set("status", "no archiver")
|
||||
)
|
||||
|
||||
left.merge(right, overwrite_left=True)
|
||||
assert left.get("status") == "no archiver"
|
||||
assert left.get("tags") == ["a", "b"]
|
||||
assert left.get("stats") == {"views": 10, "likes": 5}
|
||||
|
||||
|
||||
def test_media_management(basic_metadata, media_file):
|
||||
media1 = media_file(hash_value="abc")
|
||||
media2 = media_file(hash_value="abc") # Duplicate
|
||||
media3 = media_file(hash_value="def")
|
||||
|
||||
basic_metadata.add_media(media1, "m1")
|
||||
basic_metadata.add_media(media2, "m2")
|
||||
basic_metadata.add_media(media3)
|
||||
|
||||
assert len(basic_metadata.media) == 3
|
||||
basic_metadata.remove_duplicate_media_by_hash()
|
||||
assert len(basic_metadata.media) == 2
|
||||
assert basic_metadata.get_media_by_id("m1") == media1
|
||||
|
||||
|
||||
def test_success():
|
||||
m = Metadata()
|
||||
assert not m.is_success()
|
||||
m.success("context")
|
||||
assert m.is_success()
|
||||
assert m.status == "context: success"
|
||||
|
||||
|
||||
def test_is_empty():
|
||||
m = Metadata()
|
||||
assert m.is_empty()
|
||||
# meaningless ids
|
||||
(
|
||||
m.set("url", "example.com")
|
||||
.set("total_bytes", 100)
|
||||
.set("archive_duration_seconds", 10)
|
||||
.set("_processed_at", datetime.now(timezone.utc))
|
||||
)
|
||||
assert m.is_empty()
|
||||
|
||||
|
||||
def test_store():
|
||||
pass
|
||||
|
||||
# Test Media operations
|
||||
|
||||
|
||||
# Test custom getter/setters
|
||||
|
||||
|
||||
def test_get_set_url():
|
||||
m = Metadata()
|
||||
m.set_url("http://example.com")
|
||||
assert m.get_url() == "http://example.com"
|
||||
with pytest.raises(AssertionError):
|
||||
m.set_url("")
|
||||
assert m.get("url") == "http://example.com"
|
||||
|
||||
|
||||
def test_set_content():
|
||||
m = Metadata()
|
||||
m.set_content("Some content")
|
||||
assert m.get("content") == "Some content"
|
||||
# Test appending
|
||||
m.set_content("New content")
|
||||
# Do we want to add a line break to the method?
|
||||
assert m.get("content") == "Some contentNew content"
|
||||
|
||||
|
||||
def test_choose_most_complex():
|
||||
pass
|
||||
|
||||
|
||||
def test_get_context():
|
||||
m = Metadata()
|
||||
m.set_context("somekey", "somevalue")
|
||||
assert m.get_context("somekey") == "somevalue"
|
||||
assert m.get_context("nonexistent") is None
|
||||
m.set_context("anotherkey", "anothervalue")
|
||||
# check the previous is retained
|
||||
assert m.get_context("somekey") == "somevalue"
|
||||
assert m.get_context("anotherkey") == "anothervalue"
|
||||
assert len(m._context) == 2
|
||||
|
||||
|
||||
def test_choose_most_complete():
|
||||
pass
|
||||
@@ -1,26 +1,92 @@
|
||||
import sys
|
||||
import pytest
|
||||
from auto_archiver.core.module import get_module, BaseModule, LazyBaseModule
|
||||
from auto_archiver.core.module import get_module_lazy, BaseModule, LazyBaseModule, _LAZY_LOADED_MODULES
|
||||
|
||||
@pytest.mark.parametrize("module_name", ["cli_feeder", "local_storage", "generic_extractor", "html_formatter", "csv_db"])
|
||||
@pytest.fixture
|
||||
def example_module():
|
||||
import auto_archiver
|
||||
|
||||
previous_path = auto_archiver.modules.__path__
|
||||
auto_archiver.modules.__path__.append("tests/data/test_modules/")
|
||||
|
||||
module = get_module_lazy("example_module")
|
||||
yield module
|
||||
# cleanup
|
||||
try:
|
||||
del module._manifest
|
||||
except AttributeError:
|
||||
pass
|
||||
del _LAZY_LOADED_MODULES["example_module"]
|
||||
sys.modules.pop("auto_archiver.modules.example_module.example_module", None)
|
||||
auto_archiver.modules.__path__ = previous_path
|
||||
|
||||
def test_get_module_lazy(example_module):
|
||||
assert example_module.name == "example_module"
|
||||
assert example_module.display_name == "Example Module"
|
||||
|
||||
assert example_module.manifest is not None
|
||||
|
||||
def test_python_dependency_check(example_module):
|
||||
# example_module requires loguru, which is not installed
|
||||
# monkey patch the manifest to include a nonexistnet dependency
|
||||
example_module.manifest["dependencies"]["python"] = ["does_not_exist"]
|
||||
|
||||
with pytest.raises(SystemExit) as load_error:
|
||||
example_module.load({})
|
||||
|
||||
assert load_error.value.code == 1
|
||||
|
||||
def test_binary_dependency_check(example_module):
|
||||
# example_module requires ffmpeg, which is not installed
|
||||
# monkey patch the manifest to include a nonexistnet dependency
|
||||
example_module.manifest["dependencies"]["binary"] = ["does_not_exist"]
|
||||
|
||||
def test_module_dependency_check_loads_module(example_module):
|
||||
# example_module requires cli_feeder, which is not installed
|
||||
# monkey patch the manifest to include a nonexistnet dependency
|
||||
example_module.manifest["dependencies"]["python"] = ["hash_enricher"]
|
||||
|
||||
loaded_module = example_module.load({})
|
||||
assert loaded_module is not None
|
||||
|
||||
# check the dependency is loaded
|
||||
assert _LAZY_LOADED_MODULES["hash_enricher"] is not None
|
||||
assert _LAZY_LOADED_MODULES["hash_enricher"]._instance is not None
|
||||
|
||||
def test_load_module(example_module):
|
||||
|
||||
# setup the module, and check that config is set to the default values
|
||||
loaded_module = example_module.load({})
|
||||
assert loaded_module is not None
|
||||
assert isinstance(loaded_module, BaseModule)
|
||||
assert loaded_module.name == "example_module"
|
||||
assert loaded_module.display_name == "Example Module"
|
||||
assert loaded_module.config["example_module"] == {"csv_file" : "db.csv"}
|
||||
|
||||
# check that the vlaue is set on the module itself
|
||||
assert loaded_module.csv_file == "db.csv"
|
||||
|
||||
@pytest.mark.parametrize("module_name", ["local_storage", "generic_extractor", "html_formatter", "csv_db"])
|
||||
def test_load_modules(module_name):
|
||||
# test that specific modules can be loaded
|
||||
module = get_module(module_name)
|
||||
module = get_module_lazy(module_name)
|
||||
assert module is not None
|
||||
assert isinstance(module, LazyBaseModule)
|
||||
assert module.name == module_name
|
||||
|
||||
loaded_module = module.load()
|
||||
loaded_module = module.load({})
|
||||
assert isinstance(loaded_module, BaseModule)
|
||||
assert loaded_module.name == module_name
|
||||
assert loaded_module.display_name == module.display_name
|
||||
|
||||
# test module setup
|
||||
loaded_module.setup(config={})
|
||||
|
||||
assert loaded_module.config == {}
|
||||
# check that default settings are applied
|
||||
default_config = module.configs
|
||||
assert loaded_module.name in loaded_module.config.keys()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("module_name", ["cli_feeder", "local_storage", "generic_extractor", "html_formatter", "csv_db"])
|
||||
@pytest.mark.parametrize("module_name", ["local_storage", "generic_extractor", "html_formatter", "csv_db"])
|
||||
def test_lazy_base_module(module_name):
|
||||
lazy_module = get_module(module_name)
|
||||
lazy_module = get_module_lazy(module_name)
|
||||
|
||||
assert lazy_module is not None
|
||||
assert isinstance(lazy_module, LazyBaseModule)
|
||||
|
||||
134
tests/test_orchestrator.py
Normal file
134
tests/test_orchestrator.py
Normal file
@@ -0,0 +1,134 @@
|
||||
import pytest
|
||||
import sys
|
||||
from argparse import ArgumentParser, ArgumentTypeError
|
||||
from auto_archiver.core.orchestrator import ArchivingOrchestrator
|
||||
from auto_archiver.version import __version__
|
||||
from auto_archiver.core.config import read_yaml, store_yaml
|
||||
from auto_archiver.core.module import _LAZY_LOADED_MODULES
|
||||
|
||||
TEST_ORCHESTRATION = "tests/data/test_orchestration.yaml"
|
||||
TEST_MODULES = "tests/data/test_modules/"
|
||||
|
||||
@pytest.fixture
|
||||
def test_args():
|
||||
return ["--config", TEST_ORCHESTRATION,
|
||||
"--module_paths", TEST_MODULES,
|
||||
"--example_module.required_field", "some_value"] # just set this for normal testing, we will remove it later
|
||||
|
||||
@pytest.fixture
|
||||
def orchestrator():
|
||||
yield ArchivingOrchestrator()
|
||||
# hack - the loguru logger starts with one logger, but if orchestrator has run before
|
||||
# it'll remove the default logger, add it back in:
|
||||
|
||||
from loguru import logger
|
||||
|
||||
if not logger._core.handlers.get(0):
|
||||
logger._core.handlers_count = 0
|
||||
logger.add(sys.stderr)
|
||||
# and remove the custom logger
|
||||
if logger._core.handlers.get(1):
|
||||
logger.remove(1)
|
||||
|
||||
# delete out any loaded modules
|
||||
_LAZY_LOADED_MODULES.clear()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def basic_parser(orchestrator) -> ArgumentParser:
|
||||
return orchestrator.setup_basic_parser()
|
||||
|
||||
def test_setup_orchestrator(orchestrator):
|
||||
assert orchestrator is not None
|
||||
|
||||
def test_parse_config():
|
||||
pass
|
||||
|
||||
def test_parse_basic(basic_parser):
|
||||
args = basic_parser.parse_args(["--config", TEST_ORCHESTRATION])
|
||||
assert args.config_file == TEST_ORCHESTRATION
|
||||
|
||||
@pytest.mark.parametrize("mode", ["simple", "full"])
|
||||
def test_mode(basic_parser, mode):
|
||||
args = basic_parser.parse_args(["--mode", mode])
|
||||
assert args.mode == mode
|
||||
|
||||
def test_mode_invalid(basic_parser, capsys):
|
||||
with pytest.raises(SystemExit) as exit_error:
|
||||
basic_parser.parse_args(["--mode", "invalid"])
|
||||
assert exit_error.value.code == 2
|
||||
assert "invalid choice" in capsys.readouterr().err
|
||||
|
||||
def test_version(basic_parser, capsys):
|
||||
with pytest.raises(SystemExit) as exit_error:
|
||||
basic_parser.parse_args(["--version"])
|
||||
assert exit_error.value.code == 0
|
||||
assert capsys.readouterr().out == f"{__version__}\n"
|
||||
|
||||
def test_help(orchestrator, basic_parser, capsys):
|
||||
|
||||
args = basic_parser.parse_args(["--help"])
|
||||
assert args.help == True
|
||||
|
||||
# test the show_help() on orchestrator
|
||||
with pytest.raises(SystemExit) as exit_error:
|
||||
orchestrator.show_help(args)
|
||||
|
||||
assert exit_error.value.code == 0
|
||||
assert "Usage: auto-archiver [--help] [--version] [--config CONFIG_FILE]" in capsys.readouterr().out
|
||||
|
||||
|
||||
def test_add_custom_modules_path(orchestrator, test_args):
|
||||
orchestrator.run(test_args)
|
||||
|
||||
import auto_archiver
|
||||
assert "tests/data/test_modules/" in auto_archiver.modules.__path__
|
||||
|
||||
def test_add_custom_modules_path_invalid(orchestrator, caplog, test_args):
|
||||
|
||||
orchestrator.run(test_args + # we still need to load the real path to get the example_module
|
||||
["--module_paths", "tests/data/invalid_test_modules/"])
|
||||
|
||||
assert caplog.records[0].message == "Path 'tests/data/invalid_test_modules/' does not exist. Skipping..."
|
||||
|
||||
|
||||
def test_check_required_values(orchestrator, caplog, test_args):
|
||||
# drop the example_module.required_field from the test_args
|
||||
test_args = test_args[:-2]
|
||||
|
||||
with pytest.raises(SystemExit) as exit_error:
|
||||
orchestrator.run(test_args)
|
||||
|
||||
assert caplog.records[1].message == "the following arguments are required: --example_module.required_field"
|
||||
|
||||
def test_get_required_values_from_config(orchestrator, test_args, tmp_path):
|
||||
|
||||
# load the default example yaml, add a required field, then run the orchestrator
|
||||
test_yaml = read_yaml(TEST_ORCHESTRATION)
|
||||
test_yaml['example_module'] = {'required_field': 'some_value'}
|
||||
# write it to a temp file
|
||||
tmp_file = (tmp_path / "temp_config.yaml").as_posix()
|
||||
store_yaml(test_yaml, tmp_file)
|
||||
|
||||
# run the orchestrator
|
||||
orchestrator.run(["--config", tmp_file, "--module_paths", TEST_MODULES])
|
||||
assert orchestrator.config is not None
|
||||
|
||||
def test_load_authentication_string(orchestrator, test_args):
|
||||
|
||||
orchestrator.run(test_args + ["--authentication", '{"facebook.com": {"username": "my_username", "password": "my_password"}}'])
|
||||
assert orchestrator.config['authentication'] == {"facebook.com": {"username": "my_username", "password": "my_password"}}
|
||||
|
||||
def test_load_authentication_string_concat_site(orchestrator, test_args):
|
||||
|
||||
orchestrator.run(test_args + ["--authentication", '{"x.com,twitter.com": {"api_key": "my_key"}}'])
|
||||
assert orchestrator.config['authentication'] == {"x.com": {"api_key": "my_key"},
|
||||
"twitter.com": {"api_key": "my_key"}}
|
||||
|
||||
def test_load_invalid_authentication_string(orchestrator, test_args):
|
||||
with pytest.raises(ArgumentTypeError):
|
||||
orchestrator.run(test_args + ["--authentication", "{\''invalid_json"])
|
||||
|
||||
def test_load_authentication_invalid_dict(orchestrator, test_args):
|
||||
with pytest.raises(ArgumentTypeError):
|
||||
orchestrator.run(test_args + ["--authentication", "[true, false]"])
|
||||
Reference in New Issue
Block a user