Add documentation, pre-commit hook, more make commands and

This commit is contained in:
erinhmclark
2025-03-13 13:21:32 +00:00
parent 6e52a534e7
commit e76551ba22
21 changed files with 558 additions and 270 deletions

View File

@@ -22,7 +22,9 @@ TESTS_TO_RUN_LAST = ["test_twitter_api_archiver"]
@pytest.fixture
def setup_module(request):
def _setup_module(module_name, config={}):
def _setup_module(module_name, config=None):
if config is None:
config = {}
module_factory = ModuleFactory()
if isinstance(module_name, type):

View File

@@ -24,17 +24,20 @@ class TestTiktokTikwmExtractor(TestExtractorBase):
mock_logger = mocker.patch("auto_archiver.modules.tiktok_tikwm_extractor.tiktok_tikwm_extractor.logger")
return mock_get, mock_logger
@pytest.mark.parametrize("url,valid_url", [
("https://bellingcat.com", False),
("https://youtube.com", False),
("https://tiktok.co/", False),
("https://tiktok.com/", False),
("https://www.tiktok.com/", False),
("https://api.cool.tiktok.com/", False),
(VALID_EXAMPLE_URL, True),
("https://www.tiktok.com/@bbcnews/video/7478038212070411542", True),
("https://www.tiktok.com/@ggs68taiwan.official/video/7441821351142362375", True),
])
@pytest.mark.parametrize(
"url,valid_url",
[
("https://bellingcat.com", False),
("https://youtube.com", False),
("https://tiktok.co/", False),
("https://tiktok.com/", False),
("https://www.tiktok.com/", False),
("https://api.cool.tiktok.com/", False),
(VALID_EXAMPLE_URL, True),
("https://www.tiktok.com/@bbcnews/video/7478038212070411542", True),
("https://www.tiktok.com/@ggs68taiwan.official/video/7441821351142362375", True),
],
)
def test_valid_urls(self, mocker, make_item, url, valid_url):
mock_get, mock_logger = self.get_mockers(mocker)
if valid_url:
@@ -53,17 +56,20 @@ class TestTiktokTikwmExtractor(TestExtractorBase):
mock_logger.error.assert_called_once()
assert mock_logger.error.call_args[0][0].startswith("failed to parse JSON response")
mock_get.return_value.json.side_effect = Exception
with pytest.raises(Exception):
mock_get.return_value.json.side_effect = ValueError
with pytest.raises(ValueError):
self.extractor.download(make_item(self.VALID_EXAMPLE_URL))
mock_get.assert_called()
assert mock_get.call_count == 2
assert mock_get.return_value.json.call_count == 2
@pytest.mark.parametrize("response", [
({"msg": "failure"}),
({"msg": "success"}),
])
@pytest.mark.parametrize(
"response",
[
({"msg": "failure"}),
({"msg": "success"}),
],
)
def test_unsuccessful_responses(self, mocker, make_item, response):
mock_get, mock_logger = self.get_mockers(mocker)
mock_get.return_value.status_code = 200
@@ -74,11 +80,14 @@ class TestTiktokTikwmExtractor(TestExtractorBase):
mock_logger.error.assert_called_once()
assert mock_logger.error.call_args[0][0].startswith("failed to get a valid response")
@pytest.mark.parametrize("response,has_vid", [
({"data": {"id": 123}}, False),
({"data": {"wmplay": "url"}}, True),
({"data": {"play": "url"}}, True),
])
@pytest.mark.parametrize(
"response,has_vid",
[
({"data": {"id": 123}}, False),
({"data": {"wmplay": "url"}}, True),
({"data": {"play": "url"}}, True),
],
)
def test_correct_extraction(self, mocker, make_item, response, has_vid):
mock_get, mock_logger = self.get_mockers(mocker)
mock_get.return_value.status_code = 200
@@ -102,16 +111,19 @@ class TestTiktokTikwmExtractor(TestExtractorBase):
def test_correct_data_extracted(self, mocker, make_item):
mock_get, _ = self.get_mockers(mocker)
mock_get.return_value.status_code = 200
mock_get.return_value.json.return_value = {"msg": "success", "data": {
"wmplay": "url",
"origin_cover": "cover.jpg",
"title": "Title",
"id": 123,
"duration": 60,
"create_time": 1736301699,
"author": "Author",
"other": "data"
}}
mock_get.return_value.json.return_value = {
"msg": "success",
"data": {
"wmplay": "url",
"origin_cover": "cover.jpg",
"title": "Title",
"id": 123,
"duration": 60,
"create_time": 1736301699,
"author": "Author",
"other": "data",
},
}
result = self.extractor.download(make_item(self.VALID_EXAMPLE_URL))
assert result.is_success()
@@ -129,9 +141,12 @@ class TestTiktokTikwmExtractor(TestExtractorBase):
result = self.extractor.download(make_item(url))
assert result.is_success()
assert len(result.media) == 2
assert result.get_title() == "The A23a iceberg is one of the world's oldest and it's so big you can see it from space. #Iceberg #A23a #Antarctica #Ice #ClimateChange #DavidAttenborough #Ocean #Sea #SouthGeorgia #BBCNews "
assert (
result.get_title()
== "The A23a iceberg is one of the world's oldest and it's so big you can see it from space. #Iceberg #A23a #Antarctica #Ice #ClimateChange #DavidAttenborough #Ocean #Sea #SouthGeorgia #BBCNews "
)
assert result.get("author").get("unique_id") == "bbcnews"
assert result.get("api_data").get("id") == '7478038212070411542'
assert result.get("api_data").get("id") == "7478038212070411542"
assert result.media[1].get("duration") == 59
assert result.get("timestamp") == datetime.fromtimestamp(1741122000, tz=timezone.utc)
@@ -149,6 +164,6 @@ class TestTiktokTikwmExtractor(TestExtractorBase):
assert len(result.media) == 2
assert result.get_title() == "Căng nhất lúc này #ggs68 #ggs68taiwan #taiwan #dailoan #tiktoknews"
assert result.get("author").get("id") == "7197400619475649562"
assert result.get("api_data").get("id") == '7441821351142362375'
assert result.get("api_data").get("id") == "7441821351142362375"
assert result.media[1].get("duration") == 34
assert result.get("timestamp") == datetime.fromtimestamp(1732684060, tz=timezone.utc)

View File

@@ -8,6 +8,7 @@ class TestS3Storage:
"""
Test suite for S3Storage.
"""
module_name: str = "s3_storage"
storage: Type[S3Storage]
config: dict = {
@@ -32,10 +33,10 @@ class TestS3Storage:
"""Test that S3 client is initialized with correct parameters"""
assert self.storage.s3 is not None
assert self.storage.s3.meta.region_name == 'test-region'
assert self.storage.s3.meta.region_name == "test-region"
def test_get_cdn_url_generation(self):
"""Test CDN URL formatting """
"""Test CDN URL formatting"""
media = Media("test.txt")
media._key = "path/to/file.txt"
url = self.storage.get_cdn_url(media)
@@ -46,14 +47,14 @@ class TestS3Storage:
def test_uploadf_sets_acl_public(self, mocker):
media = Media("test.txt")
mock_file = mocker.MagicMock()
mock_s3_upload = mocker.patch.object(self.storage.s3, 'upload_fileobj')
mocker.patch.object(self.storage, 'is_upload_needed', return_value=True)
mock_s3_upload = mocker.patch.object(self.storage.s3, "upload_fileobj")
mocker.patch.object(self.storage, "is_upload_needed", return_value=True)
self.storage.uploadf(mock_file, media)
mock_s3_upload.assert_called_once_with(
mock_file,
Bucket='test-bucket',
Bucket="test-bucket",
Key=media.key,
ExtraArgs={'ACL': 'public-read', 'ContentType': 'text/plain'}
ExtraArgs={"ACL": "public-read", "ContentType": "text/plain"},
)
def test_upload_decision_logic(self, mocker):
@@ -61,23 +62,29 @@ class TestS3Storage:
media = Media("test.txt")
assert self.storage.is_upload_needed(media) is True
self.storage.random_no_duplicate = True
mocker.patch('auto_archiver.modules.s3_storage.s3_storage.calculate_file_hash', return_value='beepboop123beepboop123beepboop123')
mock_file_in_folder = mocker.patch.object(self.storage, 'file_in_folder', return_value='existing_key.txt')
mocker.patch(
"auto_archiver.modules.s3_storage.s3_storage.calculate_file_hash",
return_value="beepboop123beepboop123beepboop123",
)
mock_file_in_folder = mocker.patch.object(self.storage, "file_in_folder", return_value="existing_key.txt")
assert self.storage.is_upload_needed(media) is False
assert media.key == 'existing_key.txt'
mock_file_in_folder.assert_called_with('no-dups/beepboop123beepboop123be')
assert media.key == "existing_key.txt"
mock_file_in_folder.assert_called_with("no-dups/beepboop123beepboop123be")
def test_skips_upload_when_duplicate_exists(self, mocker):
"""Test that upload skips when file_in_folder finds existing object"""
self.storage.random_no_duplicate = True
mocker.patch.object(S3Storage, 'file_in_folder', return_value="existing_folder/existing_file.txt")
mocker.patch.object(S3Storage, "file_in_folder", return_value="existing_folder/existing_file.txt")
media = Media("test.txt")
media._key = "original_path.txt"
mocker.patch('auto_archiver.modules.s3_storage.s3_storage.calculate_file_hash', return_value="beepboop123beepboop123beepboop123")
mocker.patch(
"auto_archiver.modules.s3_storage.s3_storage.calculate_file_hash",
return_value="beepboop123beepboop123beepboop123",
)
assert self.storage.is_upload_needed(media) is False
assert media.key == "existing_folder/existing_file.txt"
assert media.get("previously archived") is True
mock_upload = mocker.patch.object(self.storage.s3, 'upload_fileobj')
mock_upload = mocker.patch.object(self.storage.s3, "upload_fileobj")
result = self.storage.uploadf(None, media)
mock_upload.assert_not_called()
assert result is True
@@ -85,21 +92,18 @@ class TestS3Storage:
def test_uploads_with_correct_parameters(self, mocker):
media = Media("test.txt")
media._key = "original_key.txt"
mocker.patch.object(S3Storage, 'is_upload_needed', return_value=True)
media.mimetype = 'image/png'
mocker.patch.object(S3Storage, "is_upload_needed", return_value=True)
media.mimetype = "image/png"
mock_file = mocker.MagicMock()
mock_upload = mocker.patch.object(self.storage.s3, 'upload_fileobj')
mock_upload = mocker.patch.object(self.storage.s3, "upload_fileobj")
self.storage.uploadf(mock_file, media)
mock_upload.assert_called_once_with(
mock_file,
Bucket='test-bucket',
Key='original_key.txt',
ExtraArgs={
'ACL': 'public-read',
'ContentType': 'image/png'
}
Bucket="test-bucket",
Key="original_key.txt",
ExtraArgs={"ACL": "public-read", "ContentType": "image/png"},
)
def test_file_in_folder_exists(self, mocker):
mocker.patch.object(self.storage.s3, 'list_objects', return_value={'Contents': [{'Key': 'path/to/file.txt'}]})
assert self.storage.file_in_folder('path/to/') == 'path/to/file.txt'
mocker.patch.object(self.storage.s3, "list_objects", return_value={"Contents": [{"Key": "path/to/file.txt"}]})
assert self.storage.file_in_folder("path/to/") == "path/to/file.txt"

View File

@@ -1,4 +1,3 @@
import os
from pathlib import Path
@@ -8,6 +7,7 @@ from auto_archiver.core import Media, Metadata
from auto_archiver.modules.local_storage import LocalStorage
from auto_archiver.core.consts import SetupError
@pytest.fixture
def local_storage(setup_module, tmp_path) -> LocalStorage:
save_to = tmp_path / "local_archive"
@@ -20,6 +20,7 @@ def local_storage(setup_module, tmp_path) -> LocalStorage:
}
return setup_module("local_storage", configs)
@pytest.fixture
def sample_media(tmp_path) -> Media:
"""Fixture creating a Media object with temporary source file"""
@@ -27,9 +28,11 @@ def sample_media(tmp_path) -> Media:
src_file.write_text("test content")
return Media(filename=str(src_file))
def test_too_long_save_path(setup_module):
with pytest.raises(SetupError):
setup_module("local_storage", {"save_to": "long"*100})
setup_module("local_storage", {"save_to": "long" * 100})
def test_get_cdn_url_relative(local_storage):
local_storage.filename_generator = "random"
@@ -38,6 +41,7 @@ def test_get_cdn_url_relative(local_storage):
expected = os.path.join(local_storage.save_to, media.key)
assert local_storage.get_cdn_url(media) == expected
def test_get_cdn_url_absolute(local_storage):
local_storage.filename_generator = "random"
@@ -47,14 +51,14 @@ def test_get_cdn_url_absolute(local_storage):
expected = os.path.abspath(os.path.join(local_storage.save_to, media.key))
assert local_storage.get_cdn_url(media) == expected
def test_upload_file_contents_and_metadata(local_storage, sample_media):
local_storage.store(sample_media, "https://example.com", Metadata())
dest = os.path.join(local_storage.save_to, sample_media.key)
assert Path(sample_media.filename).read_text() == Path(dest).read_text()
def test_upload_nonexistent_source(local_storage):
media = Media(_key="missing.txt", filename="nonexistent.txt")
with pytest.raises(FileNotFoundError):
local_storage.upload(media)

View File

@@ -6,32 +6,28 @@ from auto_archiver.core.metadata import Metadata, Media
from auto_archiver.core.storage import Storage
from auto_archiver.core.module import ModuleFactory
class TestStorageBase(object):
class TestStorageBase(object):
module_name: str = None
config: dict = None
@pytest.fixture(autouse=True)
def setup_storage(self, setup_module):
assert (
self.module_name is not None
), "self.module_name must be set on the subclass"
assert self.module_name is not None, "self.module_name must be set on the subclass"
assert self.config is not None, "self.config must be a dict set on the subclass"
self.storage: Type[Storage] = setup_module(
self.module_name, self.config
)
self.storage: Type[Storage] = setup_module(self.module_name, self.config)
class TestBaseStorage(Storage):
name = "test_storage"
def get_cdn_url(self, media):
return "cdn_url"
def uploadf(self, file, key, **kwargs):
return True
@pytest.fixture
def dummy_file(tmp_path):
# create dummy.txt file
@@ -39,16 +35,18 @@ def dummy_file(tmp_path):
dummy_file.write_text("test content")
return str(dummy_file)
@pytest.fixture
def storage_base():
def _storage_base(config):
storage_base = TestBaseStorage()
storage_base.config_setup({TestBaseStorage.name : config})
storage_base.config_setup({TestBaseStorage.name: config})
storage_base.module_factory = ModuleFactory()
return storage_base
return _storage_base
@pytest.mark.parametrize(
"path_generator, filename_generator, url, expected_key",
[
@@ -58,11 +56,11 @@ def storage_base():
("url", "random", "https://example.com/file/", "folder/https-example-com-file/pretend-random.txt"),
("random", "static", "https://example.com/file/", "folder/pretend-random/6ae8a75555209fd6c44157c0.txt"),
("random", "random", "https://example.com/file/", "folder/pretend-random/pretend-random.txt"),
],
)
def test_storage_name_generation(storage_base, path_generator, filename_generator, url,
expected_key, mocker, tmp_path, dummy_file):
def test_storage_name_generation(
storage_base, path_generator, filename_generator, url, expected_key, mocker, tmp_path, dummy_file
):
mock_random = mocker.patch("auto_archiver.core.storage.random_str")
mock_random.return_value = "pretend-random"
@@ -89,10 +87,10 @@ def test_really_long_name(storage_base, dummy_file):
}
storage: Storage = storage_base(config)
url = f"https://example.com/{'file'*100}"
url = f"https://example.com/{'file' * 100}"
media = Media(filename=dummy_file)
storage.set_key(media, url, Metadata())
assert media.key == f"https-example-com-{'file'*13}/6ae8a75555209fd6c44157c0.txt"
assert media.key == f"https-example-com-{'file' * 13}/6ae8a75555209fd6c44157c0.txt"
def test_storage_loads_hash_enricher(storage_base, dummy_file):