Merge tests from version with context.

This commit is contained in:
erinhmclark
2025-02-05 16:42:58 +00:00
parent 91ca325fd5
commit 52542812dc
13 changed files with 1022 additions and 33 deletions

View File

@@ -12,10 +12,11 @@ from auto_archiver.modules.gsheet_feeder import GWorksheet
class GsheetsDb(Database):
"""
NB: only works if GsheetFeeder is used.
could be updated in the future to support non-GsheetFeeder metadata
NB: only works if GsheetFeeder is used.
could be updated in the future to support non-GsheetFeeder metadata
"""
def started(self, item: Metadata) -> None:
logger.warning(f"STARTED {item}")
gw, row = self._retrieve_gsheet(item)
@@ -57,7 +58,7 @@ class GsheetsDb(Database):
media: Media = item.get_final_media()
if hasattr(media, "urls"):
batch_if_valid('archive', "\n".join(media.urls))
batch_if_valid('date', True, datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=datetime.timezone.utc).isoformat())
batch_if_valid('date', True, self._get_current_datetime_iso())
batch_if_valid('title', item.get_title())
batch_if_valid('text', item.get("content", ""))
batch_if_valid('timestamp', item.get_timestamp())
@@ -85,6 +86,12 @@ class GsheetsDb(Database):
gw.batch_set_cell(cell_updates)
@staticmethod
def _get_current_datetime_iso() -> str:
"""Helper method to generate the current datetime in ISO format."""
return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=datetime.timezone.utc).isoformat()
def _safe_status_update(self, item: Metadata, new_status: str) -> None:
try:
gw, row = self._retrieve_gsheet(item)
@@ -93,9 +100,11 @@ class GsheetsDb(Database):
logger.debug(f"Unable to update sheet: {e}")
def _retrieve_gsheet(self, item: Metadata) -> Tuple[GWorksheet, int]:
if gsheet := item.get_context("gsheet"):
gw: GWorksheet = gsheet.get("worksheet")
row: int = gsheet.get("row")
# todo doesn't exist, should be passed from
elif self.sheet_id:
print(self.sheet_id)

View File

@@ -34,19 +34,30 @@ class InstagramTbotExtractor(Extractor):
"""
super().setup(configs)
logger.info(f"SETUP {self.name} checking login...")
self._prepare_session_file()
self._initialize_telegram_client()
# make a copy of the session that is used exclusively with this archiver instance
def _prepare_session_file(self):
"""
Creates a copy of the session file for exclusive use with this archiver instance.
Ensures that a valid session file exists before proceeding.
"""
new_session_file = os.path.join("secrets/", f"instabot-{time.strftime('%Y-%m-%d')}{random_str(8)}.session")
if not os.path.exists(f"{self.session_file}.session"):
raise FileNotFoundError(f"session file {self.session_file}.session not found, "
f"to set this up run the setup script in scripts/telegram_setup.py")
raise FileNotFoundError(f"Session file {self.session_file}.session not found.")
shutil.copy(self.session_file + ".session", new_session_file)
self.session_file = new_session_file.replace(".session", "")
def _initialize_telegram_client(self):
"""Initializes the Telegram client."""
try:
self.client = TelegramClient(self.session_file, self.api_id, self.api_hash)
except OperationalError as e:
logger.error(f"Unable to access the {self.session_file} session, please make sure you don't use the same session file here and in telethon_extractor. if you do then disable at least one of the archivers for the 1st time you setup telethon session: {e}")
logger.error(
f"Unable to access the {self.session_file} session. "
"Ensure that you don't use the same session file here and in telethon_extractor. "
"If you do, disable at least one of the archivers for the first-time setup of the telethon session: {e}"
)
with self.client.start():
logger.success(f"SETUP {self.name} login works.")
@@ -63,32 +74,49 @@ class InstagramTbotExtractor(Extractor):
result = Metadata()
tmp_dir = self.tmp_dir
with self.client.start():
chat = self.client.get_entity("instagram_load_bot")
since_id = self.client.send_message(entity=chat, message=url).id
attempts = 0
seen_media = []
message = ""
time.sleep(3)
# media is added before text by the bot so it can be used as a stop-logic mechanism
while attempts < (self.timeout - 3) and (not message or not len(seen_media)):
attempts += 1
time.sleep(1)
for post in self.client.iter_messages(chat, min_id=since_id):
since_id = max(since_id, post.id)
if post.media and post.id not in seen_media:
filename_dest = os.path.join(tmp_dir, f'{chat.id}_{post.id}')
media = self.client.download_media(post.media, filename_dest)
if media:
result.add_media(Media(media))
seen_media.append(post.id)
if post.message: message += post.message
chat, since_id = self._send_url_to_bot(url)
message = self._process_messages(chat, since_id, tmp_dir, result)
if "You must enter a URL to a post" in message:
if "You must enter a URL to a post" in message:
logger.debug(f"invalid link {url=} for {self.name}: {message}")
return False
# # TODO: It currently returns this as a success - is that intentional?
# if "Media not found or unavailable" in message:
# logger.debug(f"invalid link {url=} for {self.name}: {message}")
# return False
if message:
result.set_content(message).set_title(message[:128])
return result.success("insta-via-bot")
def _send_url_to_bot(self, url: str):
"""
Sends the URL to the 'instagram_load_bot' and returns (chat, since_id).
"""
chat = self.client.get_entity("instagram_load_bot")
since_message = self.client.send_message(entity=chat, message=url)
return chat, since_message.id
def _process_messages(self, chat, since_id, tmp_dir, result):
attempts = 0
seen_media = []
message = ""
time.sleep(3)
# media is added before text by the bot so it can be used as a stop-logic mechanism
while attempts < (self.timeout - 3) and (not message or not len(seen_media)):
attempts += 1
time.sleep(1)
for post in self.client.iter_messages(chat, min_id=since_id):
since_id = max(since_id, post.id)
# Skip known filler message:
if post.message == 'The bot receives information through https://hikerapi.com/p/hJqpppqi':
continue
if post.media and post.id not in seen_media:
filename_dest = os.path.join(tmp_dir, f'{chat.id}_{post.id}')
media = self.client.download_media(post.media, filename_dest)
if media:
result.add_media(Media(media))
seen_media.append(post.id)
if post.message: message += post.message
return message.strip()

View File

@@ -1 +1 @@
from .telethon_extractor import TelethonArchiver
from .telethon_extractor import TelethonExtractor

View File

@@ -13,7 +13,7 @@ from auto_archiver.core import Metadata, Media
from auto_archiver.utils import random_str
class TelethonArchiver(Extractor):
class TelethonExtractor(Extractor):
valid_url = re.compile(r"https:\/\/t\.me(\/c){0,1}\/(.+)\/(\d+)")
invite_pattern = re.compile(r"t.me(\/joinchat){0,1}\/\+?(.+)")

View File

@@ -1,7 +1,8 @@
"""
pytest conftest file, for shared fixtures and configuration
"""
import os
import pickle
from tempfile import TemporaryDirectory
from typing import Dict, Tuple
import hashlib
@@ -113,4 +114,18 @@ def pytest_runtest_setup(item):
test_name = _test_failed_incremental[cls_name].get((), None)
# if name found, test has failed for the combination of class name & test name
if test_name is not None:
pytest.xfail(f"previous test failed ({test_name})")
pytest.xfail(f"previous test failed ({test_name})")
@pytest.fixture()
def unpickle():
"""
Returns a helper function that unpickles a file
** gets the file from the test_files directory: tests/data/test_files **
"""
def _unpickle(path):
test_data_dir = os.path.join(os.path.dirname(__file__), "data", "test_files")
with open(os.path.join(test_data_dir, path), "rb") as f:
return pickle.load(f)
return _unpickle

View File

@@ -0,0 +1,140 @@
from datetime import datetime, timezone
from unittest.mock import MagicMock, patch
import pytest
from auto_archiver.core import Metadata, Media
from auto_archiver.modules.gsheet_db import GsheetsDb
from auto_archiver.modules.gsheet_feeder import GWorksheet
@pytest.fixture
def mock_gworksheet():
mock_gworksheet = MagicMock(spec=GWorksheet)
mock_gworksheet.col_exists.return_value = True
mock_gworksheet.get_cell.return_value = ""
mock_gworksheet.get_row.return_value = {}
return mock_gworksheet
@pytest.fixture
def mock_metadata():
metadata: Metadata = MagicMock(spec=Metadata)
metadata.get_url.return_value = "http://example.com"
metadata.status = "done"
metadata.get_title.return_value = "Example Title"
metadata.get.return_value = "Example Content"
metadata.get_timestamp.return_value = "2025-01-01T00:00:00Z"
metadata.get_final_media.return_value = MagicMock(spec=Media)
metadata.get_all_media.return_value = []
metadata.get_media_by_id.return_value = None
metadata.get_first_image.return_value = None
return metadata
@pytest.fixture
def metadata():
metadata = Metadata()
metadata.add_media(Media(filename="screenshot", urls=["http://example.com/screenshot.png"]))
metadata.add_media(Media(filename="browsertrix", urls=["http://example.com/browsertrix.wacz"]))
metadata.add_media(Media(filename="thumbnail", urls=["http://example.com/thumbnail.png"]))
metadata.set_url("http://example.com")
metadata.set_title("Example Title")
metadata.set_content("Example Content")
metadata.success("my-archiver")
metadata.set("timestamp", "2025-01-01T00:00:00Z")
metadata.set("date", "2025-02-04T18:22:24.909112+00:00")
return metadata
@pytest.fixture
def mock_media():
"""Fixture for a mock Media object."""
mock_media = MagicMock(spec=Media)
mock_media.urls = ["http://example.com/media"]
mock_media.get.return_value = "not-calculated"
return mock_media
@pytest.fixture
def gsheets_db(mock_gworksheet, setup_module):
db = setup_module("gsheet_db", {
"allow_worksheets": "set()",
"block_worksheets": "set()",
"use_sheet_names_in_stored_paths": "True",
})
db._retrieve_gsheet = MagicMock(return_value=(mock_gworksheet, 1))
return db
@pytest.fixture
def fixed_timestamp():
"""Fixture for a fixed timestamp."""
return datetime(2025, 1, 1, tzinfo=timezone.utc)
@pytest.fixture
def expected_calls(mock_media, fixed_timestamp):
"""Fixture for the expected cell updates."""
return [
(1, 'status', 'my-archiver: success'),
(1, 'archive', 'http://example.com/screenshot.png'),
(1, 'date', '2025-02-01T00:00:00+00:00'),
(1, 'title', 'Example Title'),
(1, 'text', 'Example Content'),
(1, 'timestamp', '2025-01-01T00:00:00+00:00'),
(1, 'hash', 'not-calculated'),
# (1, 'screenshot', 'http://example.com/screenshot.png'),
# (1, 'thumbnail', '=IMAGE("http://example.com/thumbnail.png")'),
# (1, 'wacz', 'http://example.com/browsertrix.wacz'),
# (1, 'replaywebpage', 'https://replayweb.page/?source=http%3A%2F%2Fexample.com%2Fbrowsertrix.wacz#view=pages&url=')
]
def test_retrieve_gsheet(gsheets_db, metadata, mock_gworksheet):
gw, row = gsheets_db._retrieve_gsheet(metadata)
assert gw == mock_gworksheet
assert row == 1
def test_started(gsheets_db, mock_metadata, mock_gworksheet):
gsheets_db.started(mock_metadata)
mock_gworksheet.set_cell.assert_called_once_with(1, 'status', 'Archive in progress')
def test_failed(gsheets_db, mock_metadata, mock_gworksheet):
reason = "Test failure"
gsheets_db.failed(mock_metadata, reason)
mock_gworksheet.set_cell.assert_called_once_with(1, 'status', f'Archive failed {reason}')
def test_aborted(gsheets_db, mock_metadata, mock_gworksheet):
gsheets_db.aborted(mock_metadata)
mock_gworksheet.set_cell.assert_called_once_with(1, 'status', '')
def test_done(gsheets_db, metadata, mock_gworksheet, expected_calls):
with patch.object(gsheets_db, '_get_current_datetime_iso', return_value='2025-02-01T00:00:00+00:00'):
gsheets_db.done(metadata)
mock_gworksheet.batch_set_cell.assert_called_once_with(expected_calls)
def test_done_cached(gsheets_db, metadata, mock_gworksheet):
with patch.object(gsheets_db, '_get_current_datetime_iso', return_value='2025-02-01T00:00:00+00:00'):
gsheets_db.done(metadata, cached=True)
# Verify the status message includes "[cached]"
call_args = mock_gworksheet.batch_set_cell.call_args[0][0]
assert any(call[2].startswith("[cached]") for call in call_args)
def test_done_missing_media(gsheets_db, metadata, mock_gworksheet):
# clear media from metadata
metadata.media = []
with patch.object(gsheets_db, '_get_current_datetime_iso', return_value='2025-02-01T00:00:00+00:00'):
gsheets_db.done(metadata)
# Verify nothing media-related gets updated
call_args = mock_gworksheet.batch_set_cell.call_args[0][0]
media_fields = {'archive', 'screenshot', 'thumbnail', 'wacz', 'replaywebpage'}
assert all(call[1] not in media_fields for call in call_args)
def test_safe_status_update(gsheets_db, metadata, mock_gworksheet):
gsheets_db._safe_status_update(metadata, "Test status")
mock_gworksheet.set_cell.assert_called_once_with(1, 'status', 'Test status')

View File

@@ -0,0 +1,108 @@
from datetime import datetime
from typing import Type
import pytest
from unittest.mock import patch, MagicMock
from auto_archiver.core import Metadata
from auto_archiver.modules.instagram_api_extractor.instagram_api_extractor import InstagramAPIExtractor
from .test_extractor_base import TestExtractorBase
@pytest.fixture
def mock_user_response():
return {
"user": {
"pk": "123",
"username": "test_user",
"full_name": "Test User",
"profile_pic_url_hd": "http://example.com/profile.jpg",
"profile_pic_url": "http://example.com/profile_lowres.jpg"
}
}
@pytest.fixture
def mock_post_response():
return {
"id": "post_123",
"code": "abc123",
"caption_text": "Test Caption",
"taken_at": datetime.now().timestamp(),
"video_url": "http://example.com/video.mp4",
"thumbnail_url": "http://example.com/thumbnail.jpg"
}
@pytest.fixture
def mock_story_response():
return [{
"id": "story_123",
"taken_at": datetime.now().timestamp(),
"video_url": "http://example.com/story.mp4"
}]
@pytest.fixture
def mock_highlight_response():
return {
"response": {
"reels": {
"highlight:123": {
"id": "123",
"title": "Test Highlight",
"items": [{
"id": "item_123",
"taken_at": datetime.now().timestamp(),
"video_url": "http://example.com/highlight.mp4"
}]
}
}
}
}
# @pytest.mark.incremental
class TestInstagramAPIExtractor(TestExtractorBase):
"""
Test suite for InstagramAPIExtractor.
"""
extractor_module = "instagram_api_extractor"
extractor: InstagramAPIExtractor
config = {
"access_token": "test_access_token",
"api_endpoint": "https://api.instagram.com/v1",
# "full_profile": False,
# "full_profile_max_posts": 0,
# "minimize_json_output": True,
}
@pytest.mark.parametrize("url,expected", [
("https://instagram.com/user", [("", "user", "")]),
("https://instagr.am/p/post_id", []),
("https://youtube.com", []),
("https://www.instagram.com/reel/reel_id", [("reel", "reel_id", "")]),
("https://instagram.com/stories/highlights/123", [("stories/highlights", "123", "")]),
("https://instagram.com/stories/user/123", [("stories", "user", "123")]),
])
def test_url_parsing(self, url, expected):
assert self.extractor.valid_url.findall(url) == expected
def test_initialize(self):
self.extractor.initialise()
assert self.extractor.api_endpoint[-1] != "/"
@pytest.mark.parametrize("input_dict,expected", [
({"x": 0, "valid": "data"}, {"valid": "data"}),
({"nested": {"y": None, "valid": [{}]}}, {"nested": {"valid": [{}]}}),
])
def test_cleanup_dict(self, input_dict, expected):
assert self.extractor.cleanup_dict(input_dict) == expected
def test_download_post(self):
# test with context=reel
# test with context=post
# test with multiple images
# test gets text (metadata title)
pass

View File

@@ -0,0 +1,111 @@
import os
import pickle
from typing import Type
from unittest.mock import patch, MagicMock
import pytest
from auto_archiver.core.extractor import Extractor
from auto_archiver.modules.instagram_tbot_extractor import InstagramTbotExtractor
TESTFILES = os.path.join(os.path.dirname(__file__), "testfiles")
@pytest.fixture
def test_session_file(tmpdir):
"""Fixture to create a test session file."""
session_file = os.path.join(tmpdir, "test_session.session")
with open(session_file, "w") as f:
f.write("mock_session_data")
return session_file.replace(".session", "")
@pytest.mark.incremental
class TestInstagramTbotExtractor(object):
"""
Test suite for InstagramTbotExtractor.
"""
extractor_module = "instagram_tbot_extractor"
extractor: InstagramTbotExtractor
config = {
"api_id": 12345,
"api_hash": "test_api_hash",
# "session_file"
}
@pytest.fixture(autouse=True)
def setup_extractor(self, setup_module):
assert self.extractor_module is not None, "self.extractor_module must be set on the subclass"
assert self.config is not None, "self.config must be a dict set on the subclass"
extractor: Type[Extractor] = setup_module(self.extractor_module, self.config)
return extractor
@pytest.fixture
def mock_telegram_client(self):
"""Fixture to mock TelegramClient interactions."""
with patch("auto_archiver.modules.instagram_tbot_extractor._initialize_telegram_client") as mock_client:
instance = MagicMock()
mock_client.return_value = instance
yield instance
# @pytest.fixture
# def mock_session_file(self, temp_session_file):
# """Patch the extractors session file setup to use a temporary path."""
# with patch.object(InstagramTbotExtractor, "session_file", temp_session_file):
# with patch.object(InstagramTbotExtractor, "_prepare_session_file", return_value=None):
# yield # Mocks are applied for the duration of the test
@pytest.fixture
def metadata_sample(self):
"""Loads a Metadata object from a pickle file."""
with open(os.path.join(TESTFILES, "metadata_item.pkl"), "rb") as f:
return pickle.load(f)
@pytest.mark.download
@pytest.mark.parametrize("url, expected_status, bot_responses", [
("https://www.instagram.com/p/C4QgLbrIKXG", "insta-via-bot: success", [MagicMock(id=101, media=None, message="Are you new to Bellingcat? - The way we share our investigations is different. 💭\nWe want you to read our story but also learn ou")]),
("https://www.instagram.com/reel/DEVLK8qoIbg/", "insta-via-bot: success", [MagicMock(id=101, media=None, message="Our volunteer community is at the centre of many incredible Bellingcat investigations and tools. Stephanie Ladel is one such vol")]),
# todo tbot not working for stories :(
("https://www.instagram.com/stories/bellingcatofficial/3556336382743057476/", False, [MagicMock(id=101, media=None, message="Media not found or unavailable")]),
("https://www.youtube.com/watch?v=ymCMy8OffHM", False, []),
("https://www.instagram.com/p/INVALID", False, [MagicMock(id=101, media=None, message="You must enter a URL to a post")]),
])
def test_download(self, url, expected_status, bot_responses, metadata_sample):
"""Test the `download()` method with various Instagram URLs."""
metadata_sample.set_url(url)
self.extractor.initialise()
result = self.extractor.download(metadata_sample)
if expected_status:
assert result.is_success()
assert result.status == expected_status
assert result.metadata.get("title") in [msg.message[:128] for msg in bot_responses if msg.message]
else:
assert result is False
# self.extractor.cleanup()
# @patch.object(InstagramTbotExtractor, '_send_url_to_bot')
# @patch.object(InstagramTbotExtractor, '_process_messages')
# def test_download_invalid_link_returns_false(
# self, mock_process, mock_send, extractor, metadata_instagram
# ):
# # Setup Mocks
# # _send_url_to_bot -> simulate it returns (chat=MagicMock, since_id=100)
# mock_chat = MagicMock()
# mock_send.return_value = (mock_chat, 100)
# # _process_messages -> simulate it returns the text "You must enter a URL to a post"
# mock_process.return_value = "You must enter a URL to a post"
# result = extractor.download(metadata_instagram)
# assert result is False, "Should return False if message includes 'You must enter a URL to a post'"
# Test story
# Test expired story
# Test requires login/ access (?)
# Test post
# Test multiple images?

View File

@@ -0,0 +1,268 @@
from typing import Type
import gspread
import pytest
from unittest.mock import patch, MagicMock
from auto_archiver.modules.gsheet_feeder import GsheetsFeeder
from auto_archiver.core import Metadata, Feeder, ArchivingContext
def test_initialise_without_sheet_and_sheet_id(setup_module):
"""Ensure initialise() raises AssertionError if neither sheet nor sheet_id is set.
(shouldn't really be asserting in there)
"""
with patch("gspread.service_account"):
feeder = setup_module("gsheet_feeder",
{"service_account": "dummy.json",
"sheet": None,
"sheet_id": None})
with pytest.raises(AssertionError):
feeder.initialise()
@pytest.fixture
def gsheet_feeder(setup_module) -> GsheetsFeeder:
feeder = setup_module("gsheet_feeder",
{"service_account": "dummy.json",
"sheet": "test-auto-archiver",
"sheet_id": None,
"header": 1,
"columns": {
"url": "link",
"status": "archive status",
"folder": "destination folder",
"archive": "archive location",
"date": "archive date",
"thumbnail": "thumbnail",
"timestamp": "upload timestamp",
"title": "upload title",
"text": "text content",
"screenshot": "screenshot",
"hash": "hash",
"pdq_hash": "perceptual hashes",
"wacz": "wacz",
"replaywebpage": "replaywebpage",
},
"allow_worksheets": set(),
"block_worksheets": set(),
"use_sheet_names_in_stored_paths": True,
}
)
feeder.gsheets_client = MagicMock()
return feeder
@pytest.fixture()
def worksheet(unpickle):
# Load the worksheet data from the pickle file
# only works for simple usage, cant reauthenticate but give structure
return unpickle("test_worksheet.pickle")
class TestWorksheet():
"""
mimics the bits we need from gworksheet
"""
class SheetSheet:
title = "TestSheet"
rows = [
{ "row": 2, "url": "http://example.com", "status": "", "folder": "" },
{ "row": 3, "url": "http://example.com", "status": "", "folder": "" },
{ "row": 4, "url": "", "status": "", "folder": "" },
{ "row": 5, "url": "https://another.com", "status": None, "folder": "" },
{ "row": 6, "url": "https://another.com", "status": "success", "folder": "some_folder" },
]
def __init__(self):
self.wks = self.SheetSheet()
def count_rows(self):
if not self.rows:
return 0
return max(r["row"] for r in self.rows)
def get_cell(self, row, col_name, fresh=False):
matching = next((r for r in self.rows if r["row"] == row), {})
return matching.get(col_name, "")
def get_cell_or_default(self, row, col_name, default):
matching = next((r for r in self.rows if r["row"] == row), {})
return matching.get(col_name, default)
def test__process_rows(gsheet_feeder: GsheetsFeeder):
testworksheet = TestWorksheet()
metadata_items = list(gsheet_feeder._process_rows(testworksheet))
assert len(metadata_items) == 3
assert isinstance(metadata_items[0], Metadata)
assert metadata_items[0].get("url") == "http://example.com"
def test__set_metadata(gsheet_feeder: GsheetsFeeder, worksheet):
gsheet_feeder._set_context(worksheet, 1)
assert ArchivingContext.get("gsheet") == {"row": 1, "worksheet": worksheet}
@pytest.mark.skip(reason="Not recognising folder column")
def test__set_metadata_with_folder_pickled(gsheet_feeder: GsheetsFeeder, worksheet):
gsheet_feeder._set_context(worksheet, 7)
assert ArchivingContext.get("gsheet") == {"row": 1, "worksheet": worksheet}
def test__set_metadata_with_folder(gsheet_feeder: GsheetsFeeder):
testworksheet = TestWorksheet()
testworksheet.wks.title = "TestSheet"
gsheet_feeder._set_context(testworksheet, 6)
assert ArchivingContext.get("gsheet") == {"row": 6, "worksheet": testworksheet}
assert ArchivingContext.get("folder") == "some-folder/test-auto-archiver/testsheet"
@pytest.mark.usefixtures("setup_module")
@pytest.mark.parametrize("sheet, sheet_id, expected_method, expected_arg, description", [
("TestSheet", None, "open", "TestSheet", "opening by sheet name"),
(None, "ABC123", "open_by_key", "ABC123", "opening by sheet ID")
])
def test_open_sheet_with_name_or_id(setup_module, sheet, sheet_id, expected_method, expected_arg, description):
"""Ensure open_sheet() correctly opens by name or ID based on configuration."""
with patch("gspread.service_account") as mock_service_account:
mock_client = MagicMock()
mock_service_account.return_value = mock_client
mock_client.open.return_value = "MockSheet"
mock_client.open_by_key.return_value = "MockSheet"
# Setup module with parameterized values
feeder = setup_module("gsheet_feeder", {
"service_account": "dummy.json",
"sheet": sheet,
"sheet_id": sheet_id
})
feeder.initialise()
sheet_result = feeder.open_sheet()
# Validate the correct method was called
getattr(mock_client, expected_method).assert_called_once_with(expected_arg), f"Failed: {description}"
assert sheet_result == "MockSheet", f"Failed: {description}"
@pytest.mark.usefixtures("setup_module")
def test_open_sheet_with_sheet_id(setup_module):
"""Ensure open_sheet() correctly opens a sheet by ID."""
with patch("gspread.service_account") as mock_service_account:
mock_client = MagicMock()
mock_service_account.return_value = mock_client
mock_client.open_by_key.return_value = "MockSheet"
feeder = setup_module("gsheet_feeder",
{"service_account": "dummy.json",
"sheet": None,
"sheet_id": "ABC123"})
feeder.initialise()
sheet = feeder.open_sheet()
mock_client.open_by_key.assert_called_once_with("ABC123")
assert sheet == "MockSheet"
def test_should_process_sheet(setup_module):
gdb = setup_module("gsheet_feeder", {"service_account": "dummy.json",
"sheet": "TestSheet",
"sheet_id": None,
"allow_worksheets": {"TestSheet", "Sheet2"},
"block_worksheets": {"Sheet3"}}
)
assert gdb.should_process_sheet("TestSheet") == True
assert gdb.should_process_sheet("Sheet3") == False
# False if allow_worksheets is set
assert gdb.should_process_sheet("AnotherSheet") == False
@pytest.mark.skip
class TestGSheetsFeederReal:
""" Testing GSheetsFeeder class """
module_name: str = 'gsheet_feeder'
feeder: GsheetsFeeder
config: dict = {
# TODO: Create test creds
"service_account": "secrets/service_account.json",
"sheet": "test-auto-archiver",
"sheet_id": None,
"header": 1,
"columns": {
"url": "link",
"status": "archive status",
"folder": "destination folder",
"archive": "archive location",
"date": "archive date",
"thumbnail": "thumbnail",
"timestamp": "upload timestamp",
"title": "upload title",
"text": "text content",
"screenshot": "screenshot",
"hash": "hash",
"pdq_hash": "perceptual hashes",
"wacz": "wacz",
"replaywebpage": "replaywebpage",
},
"allow_worksheets": set(),
"block_worksheets": set(),
"use_sheet_names_in_stored_paths": True,
}
@pytest.fixture(autouse=True)
def setup_feeder(self, setup_module):
assert (
self.module_name is not None
), "self.module_name must be set on the subclass"
assert self.config is not None, "self.config must be a dict set on the subclass"
self.feeder: Type[Feeder] = setup_module(
self.module_name, self.config
)
def reset_test_sheet(self):
"""Clears test sheet and re-adds headers to ensure consistent test results."""
client = gspread.service_account(self.config["service_account"])
sheet = client.open(self.config["sheet"])
worksheet = sheet.get_worksheet(0)
worksheet.clear()
worksheet.append_row(["Link", "Archive Status"])
def test_initialise(self):
self.feeder.initialise()
assert hasattr(self.feeder, "gsheets_client")
@pytest.mark.download
def test_open_sheet_real_connection(self):
"""Ensure open_sheet() connects to a real Google Sheets instance."""
self.feeder.initialise()
sheet = self.feeder.open_sheet()
assert sheet is not None, "open_sheet() should return a valid sheet instance"
assert hasattr(sheet, "worksheets"), "Returned object should have worksheets method"
@pytest.mark.download
def test_iter_yields_metadata_real_data(self):
"""Ensure __iter__() yields Metadata objects for real test sheet data."""
self.reset_test_sheet()
client = gspread.service_account(self.config["service_account"])
sheet = client.open(self.config["sheet"])
worksheet = sheet.get_worksheet(0)
# Insert test rows as a temp method
# Next we will refactor the feeder for better testing
test_rows = [
["https://example.com", ""],
["", ""],
["https://example.com", "done"],
]
worksheet.append_rows(test_rows)
self.feeder.initialise()
metadata_list = list(self.feeder)
# Validate that only the first row is processed
assert len(metadata_list) == 1
assert metadata_list[0].metadata.get("url") == "https://example.com"
# TODO
# Test two sheets
# test two sheets with different columns
# test folder implementation

View File

@@ -0,0 +1,144 @@
import pytest
from unittest.mock import MagicMock
from auto_archiver.modules.gsheet_feeder import GWorksheet
class TestGWorksheet:
@pytest.fixture
def mock_worksheet(self):
mock_ws = MagicMock()
mock_ws.get_values.return_value = [
["Link", "Archive Status", "Archive Location", "Archive Date"],
["url1", "archived", "filepath1", "2023-01-01"],
["url2", "pending", "filepath2", "2023-01-02"],
]
return mock_ws
@pytest.fixture
def gworksheet(self, mock_worksheet):
return GWorksheet(mock_worksheet)
# Test initialization and basic properties
def test_initialization_sets_headers(self, gworksheet):
assert gworksheet.headers == ["link", "archive status", "archive location", "archive date"]
def test_count_rows_returns_correct_value(self, gworksheet):
# inc header row
assert gworksheet.count_rows() == 3
# Test column validation and lookup
@pytest.mark.parametrize(
"col,expected_index",
[
("url", 0),
("status", 1),
("archive", 2),
("date", 3),
],
)
def test_col_index_returns_correct_index(self, gworksheet, col, expected_index):
assert gworksheet._col_index(col) == expected_index
def test_check_col_exists_raises_for_invalid_column(self, gworksheet):
with pytest.raises(Exception, match="Column invalid_col"):
gworksheet._check_col_exists("invalid_col")
# Test data retrieval
@pytest.mark.parametrize(
"row,expected",
[
(1, ["Link", "Archive Status", "Archive Location", "Archive Date"]),
(2, ["url1", "archived", "filepath1", "2023-01-01"]),
(3, ["url2", "pending", "filepath2", "2023-01-02"]),
],
)
def test_get_row_returns_correct_data(self, gworksheet, row, expected):
assert gworksheet.get_row(row) == expected
@pytest.mark.parametrize(
"row,col,expected",
[
(2, "url", "url1"),
(2, "status", "archived"),
(3, "date", "2023-01-02"),
],
)
def test_get_cell_returns_correct_value(self, gworksheet, row, col, expected):
assert gworksheet.get_cell(row, col) == expected
def test_get_cell_handles_fresh_data(self, mock_worksheet, gworksheet):
mock_worksheet.cell.return_value.value = "fresh_value"
result = gworksheet.get_cell(2, "url", fresh=True)
assert result == "fresh_value"
mock_worksheet.cell.assert_called_once_with(2, 1)
# Test edge cases and error handling
@pytest.mark.parametrize(
"when_empty,expected",
[
(True, "default"),
(False, ""),
],
)
def test_get_cell_or_default_handles_empty_values(
self, mock_worksheet, when_empty, expected
):
mock_worksheet.get_values.return_value[1][0] = "" # Empty URL cell
g = GWorksheet(mock_worksheet)
assert (
g.get_cell_or_default(
2, "url", default="default", when_empty_use_default=when_empty
)
== expected
)
def test_get_cell_or_default_handles_missing_columns(self, gworksheet):
assert (
gworksheet.get_cell_or_default(1, "invalid_col", default="safe") == "safe"
)
# Test write operations
def test_set_cell_updates_correct_position(self, mock_worksheet, gworksheet):
gworksheet.set_cell(2, "url", "new_url")
mock_worksheet.update_cell.assert_called_once_with(2, 1, "new_url")
def test_batch_set_cell_formats_requests_correctly(
self, mock_worksheet, gworksheet
):
updates = [(2, "url", "new_url"), (3, "status", "processed")]
gworksheet.batch_set_cell(updates)
expected_batch = [
{"range": "A2", "values": [["new_url"]]},
{"range": "B3", "values": [["processed"]]},
]
mock_worksheet.batch_update.assert_called_once_with(
expected_batch, value_input_option="USER_ENTERED"
)
def test_batch_set_cell_truncates_long_values(self, mock_worksheet, gworksheet):
long_value = "x" * 50000
gworksheet.batch_set_cell([(1, "url", long_value)])
submitted_value = mock_worksheet.batch_update.call_args[0][0][0]["values"][0][0]
assert len(submitted_value) == 49999
# Test coordinate conversion
@pytest.mark.parametrize(
"row,col,expected",
[
(1, "url", "A1"),
(2, "status", "B2"),
(3, "archive", "C3"),
(4, "date", "D4"),
],
)
def test_to_a1_conversion(self, gworksheet, row, col, expected):
assert gworksheet.to_a1(row, col) == expected
# Test empty worksheet
def test_empty_worksheet_initialization(self):
mock_ws = MagicMock()
mock_ws.get_values.return_value = []
g = GWorksheet(mock_ws)
assert g.headers == []
assert g.count_rows() == 0

View File

@@ -0,0 +1,100 @@
from typing import Type
import pytest
from unittest.mock import MagicMock, patch, mock_open
from auto_archiver.core import Media
from auto_archiver.modules.s3_storage import s3_storage
from tests.storages.test_storage_base import TestStorageBase
class TestGDriveStorage:
"""
Test suite for GDriveStorage.
"""
module_name: str = "s3_storage"
storage: Type[s3_storage]
s3: MagicMock
config: dict = {
"path_generator": "flat",
"filename_generator": "static",
"bucket": "test-bucket",
"region": "test-region",
"key": "test-key",
"secret": "test-secret",
"random_no_duplicate": False,
"endpoint_url": "https://{region}.example.com",
"cdn_url": "https://cdn.example.com/{key}",
"private": False,
}
@patch('boto3.client')
@pytest.fixture(autouse=True)
def setup_storage(self, setup_module):
self.storage = setup_module(self.module_name, self.config)
self.storage.initialise()
@patch('boto3.client')
def test_client_initialization(self, mock_boto_client, setup_module):
"""Test that S3 client is initialized with correct parameters"""
self.storage.initialise()
mock_boto_client.assert_called_once_with(
's3',
region_name='test-region',
endpoint_url='https://test-region.example.com',
aws_access_key_id='test-key',
aws_secret_access_key='test-secret'
)
def test_get_cdn_url_generation(self):
"""Test CDN URL formatting """
media = Media("test.txt")
media.key = "path/to/file.txt"
url = self.storage.get_cdn_url(media)
assert url == "https://cdn.example.com/path/to/file.txt"
media.key = "another/path.jpg"
assert self.storage.get_cdn_url(media) == "https://cdn.example.com/another/path.jpg"
@patch.object(s3_storage.S3Storage, 'file_in_folder')
def test_skips_upload_when_duplicate_exists(self, mock_file_in_folder):
"""Test that upload skips when file_in_folder finds existing object"""
# Setup test-specific configuration
self.storage.random_no_duplicate = True
mock_file_in_folder.return_value = "existing_folder/existing_file.txt"
# Create test media with calculated hash
media = Media("test.txt")
media.key = "original_path.txt"
# Mock hash calculation
with patch.object(self.storage, 'calculate_hash') as mock_calculate_hash:
mock_calculate_hash.return_value = "testhash123"
# Verify upload
assert self.storage.is_upload_needed(media) is False
assert media.key == "existing_folder/existing_file.txt"
assert media.get("previously archived") is True
with patch.object(self.storage.s3, 'upload_fileobj') as mock_upload:
result = self.storage.uploadf(None, media)
mock_upload.assert_not_called()
assert result is True
@patch.object(s3_storage.S3Storage, 'is_upload_needed')
def test_uploads_with_correct_parameters(self, mock_upload_needed):
media = Media("test.txt")
mock_upload_needed.return_value = True
media.mimetype = 'image/png'
mock_file = MagicMock()
with patch.object(self.storage.s3, 'upload_fileobj') as mock_upload:
self.storage.uploadf(mock_file, media)
# Verify core upload parameters
mock_upload.assert_called_once_with(
mock_file,
Bucket='test-bucket',
# Key='original_key.txt',
Key=None,
ExtraArgs={
'ACL': 'public-read',
'ContentType': 'image/png'
}
)

View File

@@ -0,0 +1,43 @@
from typing import Type
import pytest
from unittest.mock import MagicMock, patch
from auto_archiver.core import Media
from auto_archiver.modules.gdrive_storage import GDriveStorage
from auto_archiver.core.metadata import Metadata
from tests.storages.test_storage_base import TestStorageBase
class TestGDriveStorage(TestStorageBase):
"""
Test suite for GDriveStorage.
"""
module_name: str = "gdrive_storage"
storage: Type[GDriveStorage]
config: dict = {'path_generator': 'url',
'filename_generator': 'static',
'root_folder_id': "fake_root_folder_id",
'oauth_token': None,
'service_account': 'fake_service_account.json'
}
@pytest.mark.skip(reason="Requires real credentials")
@pytest.mark.download
def test_initialize_with_real_credentials(self):
"""
Test that the Google Drive service can be initialized with real credentials.
"""
self.storage.service_account = 'secrets/service_account.json' # Path to real credentials
self.storage.initialise()
assert self.storage.service is not None
def test_initialize_fails_with_non_existent_creds(self):
"""
Test that the Google Drive service raises a FileNotFoundError when the service account file does not exist.
"""
# Act and Assert
with pytest.raises(FileNotFoundError) as exc_info:
self.storage.initialise()
assert "No such file or directory" in str(exc_info.value)

View File

@@ -0,0 +1,23 @@
from typing import Type
import pytest
from auto_archiver.core.context import ArchivingContext
from auto_archiver.core.metadata import Metadata
from auto_archiver.core.storage import Storage
class TestStorageBase(object):
module_name: str = None
config: dict = None
@pytest.fixture(autouse=True)
def setup_storage(self, setup_module):
assert (
self.module_name is not None
), "self.module_name must be set on the subclass"
assert self.config is not None, "self.config must be a dict set on the subclass"
self.storage: Type[Storage] = setup_module(
self.module_name, self.config
)