mirror of
https://github.com/bellingcat/auto-archiver.git
synced 2026-06-08 03:18:28 +03:00
Added tests, updated instagram_tbot_extractor.py raise failure.
This commit is contained in:
@@ -3,6 +3,7 @@ pytest conftest file, for shared fixtures and configuration
|
||||
"""
|
||||
import os
|
||||
import pickle
|
||||
from datetime import datetime, timezone
|
||||
from tempfile import TemporaryDirectory
|
||||
from typing import Dict, Tuple
|
||||
import hashlib
|
||||
@@ -138,3 +139,9 @@ def mock_binary_dependencies():
|
||||
# Mock all binary dependencies as available
|
||||
mock_shutil_which.return_value = "/usr/bin/fake_binary"
|
||||
yield mock_shutil_which
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def sample_datetime():
|
||||
return datetime(2023, 1, 1, 12, 0, tzinfo=timezone.utc)
|
||||
|
||||
|
||||
69
tests/databases/test_api_db.py
Normal file
69
tests/databases/test_api_db.py
Normal file
@@ -0,0 +1,69 @@
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
from auto_archiver.core import Metadata
|
||||
from auto_archiver.modules.api_db import AAApiDb
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def api_db(setup_module):
|
||||
configs: dict = {
|
||||
"api_endpoint": "https://api.example.com",
|
||||
"api_token": "test-token",
|
||||
"public": False,
|
||||
"author_id": "Someone",
|
||||
"group_id": "123",
|
||||
"use_api_cache": True,
|
||||
"store_results": True,
|
||||
"tags": "[]",
|
||||
}
|
||||
return setup_module(AAApiDb, configs)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def metadata():
|
||||
metadata = Metadata()
|
||||
metadata.set("_processed_at", "2021-01-01T00:00:00")
|
||||
metadata.set_url("https://example.com")
|
||||
return metadata
|
||||
|
||||
|
||||
def test_fetch_no_cache(api_db, metadata):
|
||||
# Test fetch
|
||||
api_db.use_api_cache = False
|
||||
assert api_db.fetch(metadata) is None
|
||||
|
||||
|
||||
def test_fetch_fail_status(api_db, metadata):
|
||||
# Test response fail in fetch method
|
||||
with patch("auto_archiver.modules.api_db.api_db.requests.get") as mock_get:
|
||||
mock_get.return_value.status_code = 400
|
||||
mock_get.return_value.json.return_value = {}
|
||||
with patch("loguru.logger.error") as mock_error:
|
||||
assert api_db.fetch(metadata) is False
|
||||
mock_error.assert_called_once_with("AA API FAIL (400): {}")
|
||||
|
||||
|
||||
def test_fetch(api_db, metadata):
|
||||
# Test successful fetch method
|
||||
with patch("auto_archiver.modules.api_db.api_db.requests.get") as mock_get,\
|
||||
patch("auto_archiver.core.metadata.datetime.datetime") as mock_datetime:
|
||||
mock_datetime.now.return_value = "2021-01-01T00:00:00"
|
||||
mock_get.return_value.status_code = 200
|
||||
mock_get.return_value.json.return_value = [{"result": {}}, {"result":
|
||||
{'media': [], 'metadata': {'_processed_at': '2021-01-01T00:00:00', 'url': 'https://example.com'},
|
||||
'status': 'no archiver'}}]
|
||||
assert api_db.fetch(metadata) == metadata
|
||||
|
||||
|
||||
def test_done_success(api_db, metadata):
|
||||
with patch("auto_archiver.modules.api_db.api_db.requests.post") as mock_post:
|
||||
mock_post.return_value.status_code = 201
|
||||
api_db.done(metadata)
|
||||
mock_post.assert_called_once()
|
||||
mock_post.assert_called_once_with("https://api.example.com/interop/submit-archive",
|
||||
json={'author_id': 'Someone', 'url': 'https://example.com',
|
||||
'public': False, 'group_id': '123', 'tags': ['[', ']'], 'result': '{"status": "no archiver", "metadata": {"_processed_at": "2021-01-01T00:00:00", "url": "https://example.com"}, "media": []}'},
|
||||
headers={'Authorization': 'Bearer test-token'})
|
||||
|
||||
@@ -1,11 +1,9 @@
|
||||
import os
|
||||
from typing import Type
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
from auto_archiver.core import Metadata
|
||||
from auto_archiver.core.extractor import Extractor
|
||||
from auto_archiver.modules.instagram_tbot_extractor import InstagramTbotExtractor
|
||||
from tests.extractors.test_extractor_base import TestExtractorBase
|
||||
|
||||
@@ -13,82 +11,103 @@ TESTFILES = os.path.join(os.path.dirname(__file__), "testfiles")
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def session_file(tmpdir):
|
||||
"""Fixture to create a test session file."""
|
||||
session_file = os.path.join(tmpdir, "test_session.session")
|
||||
with open(session_file, "w") as f:
|
||||
f.write("mock_session_data")
|
||||
return session_file.replace(".session", "")
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def patch_extractor_methods(request, setup_module):
|
||||
with patch.object(InstagramTbotExtractor, '_prepare_session_file', return_value=None), \
|
||||
patch.object(InstagramTbotExtractor, '_initialize_telegram_client', return_value=None):
|
||||
if hasattr(request, 'cls') and hasattr(request.cls, 'config'):
|
||||
request.cls.extractor = setup_module("instagram_tbot_extractor", request.cls.config)
|
||||
|
||||
yield
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def mock_sleep():
|
||||
"""Globally mock time.sleep to avoid delays."""
|
||||
with patch("time.sleep") as mock_sleep:
|
||||
yield mock_sleep
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def metadata_sample():
|
||||
m = Metadata()
|
||||
m.set_title("Test Title")
|
||||
m.set_timestamp("2021-01-01T00:00:00Z")
|
||||
m.set_timestamp("2021-01-01T00:00:00")
|
||||
m.set_url("https://www.instagram.com/p/1234567890")
|
||||
return m
|
||||
|
||||
|
||||
class TestInstagramTbotExtractor:
|
||||
@pytest.fixture
|
||||
def mock_telegram_client():
|
||||
"""Fixture to mock TelegramClient interactions."""
|
||||
with patch("auto_archiver.modules.instagram_tbot_extractor.client") as mock_client:
|
||||
instance = MagicMock()
|
||||
mock_client.return_value = instance
|
||||
yield instance
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def extractor(setup_module, patch_extractor_methods):
|
||||
extractor_module = "instagram_tbot_extractor"
|
||||
extractor: InstagramTbotExtractor
|
||||
config = {
|
||||
"api_id": 12345,
|
||||
"api_hash": "test_api_hash",
|
||||
"session_file": "test_session",
|
||||
"timeout": 4
|
||||
}
|
||||
extractor = setup_module(extractor_module, config)
|
||||
extractor.client = MagicMock()
|
||||
extractor.session_file = "test_session"
|
||||
return extractor
|
||||
|
||||
|
||||
def test_non_instagram_url(extractor, metadata_sample):
|
||||
metadata_sample.set_url("https://www.youtube.com")
|
||||
assert extractor.download(metadata_sample) is False
|
||||
|
||||
def test_download_success(extractor, metadata_sample):
|
||||
with patch.object(extractor, "_send_url_to_bot", return_value=(MagicMock(), 101)), \
|
||||
patch.object(extractor, "_process_messages", return_value="Sample Instagram post caption"):
|
||||
result = extractor.download(metadata_sample)
|
||||
assert result.is_success()
|
||||
assert result.status == "insta-via-bot: success"
|
||||
assert result.metadata.get("title") == "Sample Instagram post caption"
|
||||
|
||||
|
||||
def test_download_invalid(extractor, metadata_sample):
|
||||
with patch.object(extractor, "_send_url_to_bot", return_value=(MagicMock(), 101)), \
|
||||
patch.object(extractor, "_process_messages", return_value="You must enter a URL to a post"):
|
||||
assert extractor.download(metadata_sample) is False
|
||||
|
||||
|
||||
|
||||
@pytest.mark.skip(reason="Requires authentication.")
|
||||
class TestInstagramTbotExtractorReal(TestExtractorBase):
|
||||
# To run these tests set the TELEGRAM_API_ID and TELEGRAM_API_HASH environment variables, and ensure the session file exists.
|
||||
# Note these are true at this point in time, but changes to source media could be reason for failure.
|
||||
extractor_module = "instagram_tbot_extractor"
|
||||
extractor: InstagramTbotExtractor
|
||||
config = {
|
||||
"api_id": os.environ.get("TELEGRAM_API_ID"),
|
||||
"api_hash": os.environ.get("TELEGRAM_API_HASH"),
|
||||
"session_file": "secrets/anon-insta",
|
||||
}
|
||||
|
||||
@pytest.fixture
|
||||
def mock_telegram_client(self):
|
||||
"""Fixture to mock TelegramClient interactions."""
|
||||
with patch("auto_archiver.modules.instagram_tbot_extractor._initialize_telegram_client") as mock_client:
|
||||
instance = MagicMock()
|
||||
mock_client.return_value = instance
|
||||
yield instance
|
||||
|
||||
def test_extractor_is_initialized(self):
|
||||
assert self.extractor is not None
|
||||
|
||||
|
||||
@patch("time.sleep")
|
||||
@pytest.mark.parametrize("url, expected_status, bot_responses", [
|
||||
("https://www.instagram.com/p/C4QgLbrIKXG", "insta-via-bot: success", [MagicMock(id=101, media=None, message="Are you new to Bellingcat? - The way we share our investigations is different. 💭\nWe want you to read our story but also learn ou")]),
|
||||
("https://www.instagram.com/reel/DEVLK8qoIbg/", "insta-via-bot: success", [MagicMock(id=101, media=None, message="Our volunteer community is at the centre of many incredible Bellingcat investigations and tools. Stephanie Ladel is one such vol")]),
|
||||
# todo tbot not working for stories :(
|
||||
("https://www.instagram.com/stories/bellingcatofficial/3556336382743057476/", False, [MagicMock(id=101, media=None, message="Media not found or unavailable")]),
|
||||
("https://www.youtube.com/watch?v=ymCMy8OffHM", False, []),
|
||||
("https://www.instagram.com/p/INVALID", False, [MagicMock(id=101, media=None, message="You must enter a URL to a post")]),
|
||||
@pytest.mark.parametrize("url, expected_status, message, len_media", [
|
||||
("https://www.instagram.com/p/C4QgLbrIKXG", "insta-via-bot: success", "Are you new to Bellingcat? - The way we share our investigations is different. 💭\nWe want you to read our story but also learn ou", 6),
|
||||
("https://www.instagram.com/reel/DEVLK8qoIbg/", "insta-via-bot: success", "Our volunteer community is at the centre of many incredible Bellingcat investigations and tools. Stephanie Ladel is one such vol", 3),
|
||||
# instagram tbot not working (potentially intermittently?) for stories - replace with a live story to retest
|
||||
# ("https://www.instagram.com/stories/bellingcatofficial/3556336382743057476/", False, "Media not found or unavailable"),
|
||||
# Seems to be working intermittently for highlights
|
||||
# ("https://www.instagram.com/stories/highlights/17868810693068139/", "insta-via-bot: success", None, 50),
|
||||
# Marking invalid url as success
|
||||
("https://www.instagram.com/p/INVALID", "insta-via-bot: success", "Media not found or unavailable", 0),
|
||||
("https://www.youtube.com/watch?v=ymCMy8OffHM", False, None, 0),
|
||||
])
|
||||
def test_download(self, mock_sleep, url, expected_status, bot_responses, metadata_sample):
|
||||
def test_download(self, url, expected_status, message, len_media, metadata_sample):
|
||||
"""Test the `download()` method with various Instagram URLs."""
|
||||
metadata_sample.set_url(url)
|
||||
self.extractor.client = MagicMock()
|
||||
|
||||
result = self.extractor.download(metadata_sample)
|
||||
pass
|
||||
# TODO fully mock or use as authenticated test
|
||||
# if expected_status:
|
||||
# assert result.is_success()
|
||||
# assert result.status == expected_status
|
||||
# assert result.metadata.get("title") in [msg.message[:128] for msg in bot_responses if msg.message]
|
||||
# else:
|
||||
# assert result is False
|
||||
|
||||
|
||||
|
||||
|
||||
# Test story
|
||||
# Test expired story
|
||||
# Test requires login/ access (?)
|
||||
# Test post
|
||||
# Test multiple images?
|
||||
if expected_status:
|
||||
assert result.is_success()
|
||||
assert result.status == expected_status
|
||||
assert result.metadata.get("title") == message
|
||||
assert len(result.media) == len_media
|
||||
else:
|
||||
assert result is False
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
# Note this isn't a feeder, but contained as utility of the gsheet feeder module
|
||||
import pytest
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
|
||||
146
tests/utils/test_misc.py
Normal file
146
tests/utils/test_misc.py
Normal file
@@ -0,0 +1,146 @@
|
||||
import hashlib
|
||||
import json
|
||||
from datetime import datetime, timezone
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from auto_archiver.utils.misc import (
|
||||
mkdir_if_not_exists,
|
||||
expand_url,
|
||||
getattr_or,
|
||||
DateTimeEncoder,
|
||||
dump_payload,
|
||||
get_datetime_from_str,
|
||||
update_nested_dict,
|
||||
calculate_file_hash,
|
||||
random_str,
|
||||
get_timestamp
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def sample_file(tmp_path):
|
||||
file_path = tmp_path / "test.txt"
|
||||
file_path.write_text("test content")
|
||||
return file_path
|
||||
|
||||
|
||||
class TestDirectoryUtils:
|
||||
def test_mkdir_creates_new_directory(self, tmp_path):
|
||||
new_dir = tmp_path / "new_folder"
|
||||
mkdir_if_not_exists(new_dir)
|
||||
assert new_dir.exists()
|
||||
assert new_dir.is_dir()
|
||||
|
||||
def test_mkdir_exists_quietly(self, tmp_path):
|
||||
existing_dir = tmp_path / "existing"
|
||||
existing_dir.mkdir()
|
||||
mkdir_if_not_exists(existing_dir)
|
||||
assert existing_dir.exists()
|
||||
|
||||
class TestURLExpansion:
|
||||
@pytest.mark.parametrize("input_url,expected", [
|
||||
("https://example.com", "https://example.com"),
|
||||
("https://t.co/test", "https://expanded.url")
|
||||
])
|
||||
def test_expand_url(self, input_url, expected):
|
||||
mock_response = Mock()
|
||||
mock_response.url = "https://expanded.url"
|
||||
with patch('requests.get', return_value=mock_response):
|
||||
|
||||
result = expand_url(input_url)
|
||||
assert result == expected
|
||||
|
||||
def test_expand_url_handles_errors(self, caplog):
|
||||
with patch('requests.get', side_effect=Exception("Connection error")):
|
||||
url = "https://t.co/error"
|
||||
result = expand_url(url)
|
||||
assert result == url
|
||||
assert f"Failed to expand url {url}" in caplog.text
|
||||
|
||||
class TestAttributeHandling:
|
||||
class Sample:
|
||||
exists = "value"
|
||||
none = None
|
||||
|
||||
@pytest.mark.parametrize("obj,attr,default,expected", [
|
||||
(Sample(), "exists", "default", "value"),
|
||||
(Sample(), "none", "default", "default"),
|
||||
(Sample(), "missing", "default", "default"),
|
||||
(None, "anything", "fallback", "fallback"),
|
||||
])
|
||||
def test_getattr_or(self, obj, attr, default, expected):
|
||||
# Test gets attribute or returns a default value
|
||||
assert getattr_or(obj, attr, default) == expected
|
||||
|
||||
class TestDateTimeHandling:
|
||||
def test_datetime_encoder(self, sample_datetime):
|
||||
result = json.dumps({"dt": sample_datetime}, cls=DateTimeEncoder)
|
||||
loaded = json.loads(result)
|
||||
assert loaded["dt"] == str(sample_datetime)
|
||||
|
||||
def test_dump_payload(self, sample_datetime):
|
||||
payload = {"timestamp": sample_datetime}
|
||||
result = dump_payload(payload)
|
||||
assert str(sample_datetime) in result
|
||||
|
||||
@pytest.mark.parametrize("dt_str,fmt,expected", [
|
||||
("2023-01-01 12:00:00+00:00", None, datetime(2023, 1, 1, 12, 0, tzinfo=timezone.utc)),
|
||||
("20230101 120000", "%Y%m%d %H%M%S", datetime(2023, 1, 1, 12, 0)),
|
||||
("invalid", None, None),
|
||||
])
|
||||
def test_datetime_from_string(self, dt_str, fmt, expected):
|
||||
result = get_datetime_from_str(dt_str, fmt)
|
||||
if expected is None:
|
||||
assert result is None
|
||||
else:
|
||||
assert result == expected.replace(tzinfo=result.tzinfo)
|
||||
|
||||
class TestDictUtils:
|
||||
@pytest.mark.parametrize("original,update,expected", [
|
||||
({"a": 1}, {"b": 2}, {"a": 1, "b": 2}),
|
||||
({"nested": {"a": 1}}, {"nested": {"b": 2}}, {"nested": {"a": 1, "b": 2}}),
|
||||
({"a": {"b": {"c": 1}}}, {"a": {"b": {"c": 2}}}, {"a": {"b": {"c": 2}}}),
|
||||
])
|
||||
def test_update_nested_dict(self, original, update, expected):
|
||||
update_nested_dict(original, update)
|
||||
assert original == expected
|
||||
|
||||
class TestHashingUtils:
|
||||
def test_file_hashing(self, sample_file):
|
||||
expected = hashlib.sha256(b"test content").hexdigest()
|
||||
assert calculate_file_hash(str(sample_file)) == expected
|
||||
|
||||
def test_large_file_hashing(self, tmp_path):
|
||||
file_path = tmp_path / "large.bin"
|
||||
content = b"0" * 16_000_000 * 2 # 32MB
|
||||
file_path.write_bytes(content)
|
||||
|
||||
expected = hashlib.sha256(content).hexdigest()
|
||||
assert calculate_file_hash(str(file_path)) == expected
|
||||
|
||||
class TestMiscUtils:
|
||||
def test_random_str_length(self):
|
||||
for length in [8, 16, 32]:
|
||||
assert len(random_str(length)) == length
|
||||
|
||||
def test_random_str_raises_too_long(self):
|
||||
with pytest.raises(AssertionError) as exc_info:
|
||||
random_str(64)
|
||||
assert "length must be less than 32 as UUID4 is used" == str(exc_info.value)
|
||||
|
||||
def test_random_str_uniqueness(self):
|
||||
assert random_str() != random_str()
|
||||
|
||||
@pytest.mark.parametrize("ts_input,utc,iso,expected_type", [
|
||||
(datetime.now(), True, True, str),
|
||||
("2023-01-01T12:00:00+00:00", False, False, datetime),
|
||||
(1672574400, True, True, str),
|
||||
])
|
||||
def test_timestamp_parsing(self, ts_input, utc, iso, expected_type):
|
||||
result = get_timestamp(ts_input, utc=utc, iso=iso)
|
||||
assert isinstance(result, expected_type)
|
||||
|
||||
def test_invalid_timestamp_returns_none(self):
|
||||
assert get_timestamp("invalid-date") is None
|
||||
Reference in New Issue
Block a user