diff --git a/src/auto_archiver/modules/gsheet_db/gsheet_db.py b/src/auto_archiver/modules/gsheet_db/gsheet_db.py index 644015e..3bb27b7 100644 --- a/src/auto_archiver/modules/gsheet_db/gsheet_db.py +++ b/src/auto_archiver/modules/gsheet_db/gsheet_db.py @@ -104,7 +104,6 @@ class GsheetsDb(Database): if gsheet := item.get_context("gsheet"): gw: GWorksheet = gsheet.get("worksheet") row: int = gsheet.get("row") - # todo doesn't exist, should be passed from elif self.sheet_id: print(self.sheet_id) diff --git a/src/auto_archiver/modules/gsheet_feeder/gsheet_feeder.py b/src/auto_archiver/modules/gsheet_feeder/gsheet_feeder.py index d129182..a51574e 100644 --- a/src/auto_archiver/modules/gsheet_feeder/gsheet_feeder.py +++ b/src/auto_archiver/modules/gsheet_feeder/gsheet_feeder.py @@ -37,41 +37,48 @@ class GsheetsFeeder(Feeder): def __iter__(self) -> Metadata: sh = self.open_sheet() - for ii, wks in enumerate(sh.worksheets()): - if not self.should_process_sheet(wks.title): - logger.debug(f"SKIPPED worksheet '{wks.title}' due to allow/block rules") + for ii, worksheet in enumerate(sh.worksheets()): + if not self.should_process_sheet(worksheet.title): + logger.debug(f"SKIPPED worksheet '{worksheet.title}' due to allow/block rules") continue - - logger.info(f'Opening worksheet {ii=}: {wks.title=} header={self.header}') - gw = GWorksheet(wks, header_row=self.header, columns=self.columns) - + logger.info(f'Opening worksheet {ii=}: {worksheet.title=} header={self.header}') + gw = GWorksheet(worksheet, header_row=self.header, columns=self.columns) if len(missing_cols := self.missing_required_columns(gw)): logger.warning(f"SKIPPED worksheet '{wks.title}' due to missing required column(s) for {missing_cols}") continue - for row in range(1 + self.header, gw.count_rows() + 1): - url = gw.get_cell(row, 'url').strip() - if not len(url): continue + # process and yield metadata here: + yield from self._process_rows(gw) + logger.success(f'Finished worksheet {worksheet.title}') - original_status = gw.get_cell(row, 'status') - status = gw.get_cell(row, 'status', fresh=original_status in ['', None]) - # TODO: custom status parser(?) aka should_retry_from_status - if status not in ['', None]: continue + def _process_rows(self, gw: GWorksheet) -> Metadata: + for row in range(1 + self.header, gw.count_rows() + 1): + url = gw.get_cell(row, 'url').strip() + if not len(url): continue + original_status = gw.get_cell(row, 'status') + status = gw.get_cell(row, 'status', fresh=original_status in ['', None]) + # TODO: custom status parser(?) aka should_retry_from_status + if status not in ['', None]: continue - # All checks done - archival process starts here - m = Metadata().set_url(url) - if gw.get_cell_or_default(row, 'folder', "") is None: - folder = '' - else: - folder = slugify(gw.get_cell_or_default(row, 'folder', "").strip()) - if len(folder) and self.use_sheet_names_in_stored_paths: - folder = os.path.join(folder, slugify(self.sheet), slugify(wks.title)) + # All checks done - archival process starts here + m = Metadata().set_url(url) + self._set_context(m, gw, row) + yield m - m.set_context('folder', folder) - m.set_context('worksheet', {"row": row, "worksheet": gw}) - yield m + def _set_context(self, m: Metadata, gw: GWorksheet, row: int) -> Metadata: + # TODO: Check folder value not being recognised + m.set_context("gsheet", {"row": row, "worksheet": gw}) + + if gw.get_cell_or_default(row, 'folder', "") is None: + folder = '' + else: + folder = slugify(gw.get_cell_or_default(row, 'folder', "").strip()) + if len(folder): + if self.use_sheet_names_in_stored_paths: + m.set_context("folder", os.path.join(folder, slugify(self.sheet), slugify(gw.wks.title))) + else: + m.set_context("folder", folder) - logger.success(f'Finished worksheet {wks.title}') def should_process_sheet(self, sheet_name: str) -> bool: if len(self.allow_worksheets) and sheet_name not in self.allow_worksheets: diff --git a/tests/extractors/test_instagram_api_extractor.py b/tests/extractors/test_instagram_api_extractor.py index 7a19233..d3f7bd6 100644 --- a/tests/extractors/test_instagram_api_extractor.py +++ b/tests/extractors/test_instagram_api_extractor.py @@ -9,6 +9,7 @@ from auto_archiver.modules.instagram_api_extractor.instagram_api_extractor impor from .test_extractor_base import TestExtractorBase + @pytest.fixture def mock_user_response(): return { @@ -71,11 +72,18 @@ class TestInstagramAPIExtractor(TestExtractorBase): config = { "access_token": "test_access_token", "api_endpoint": "https://api.instagram.com/v1", - # "full_profile": False, + "full_profile": False, # "full_profile_max_posts": 0, # "minimize_json_output": True, } + @pytest.fixture + def metadata(self): + m = Metadata() + m.set_url("https://instagram.com/test_user") + m.set("netloc", "instagram.com") + return m + @pytest.mark.parametrize("url,expected", [ ("https://instagram.com/user", [("", "user", "")]), ("https://instagr.am/p/post_id", []), @@ -88,7 +96,6 @@ class TestInstagramAPIExtractor(TestExtractorBase): assert self.extractor.valid_url.findall(url) == expected def test_initialize(self): - self.extractor.initialise() assert self.extractor.api_endpoint[-1] != "/" @pytest.mark.parametrize("input_dict,expected", [ @@ -98,11 +105,85 @@ class TestInstagramAPIExtractor(TestExtractorBase): def test_cleanup_dict(self, input_dict, expected): assert self.extractor.cleanup_dict(input_dict) == expected - def test_download_post(self): + def test_download(self): + pass + + def test_download_post(self, metadata, mock_user_response): # test with context=reel # test with context=post # test with multiple images # test gets text (metadata title) + pass + def test_download_profile_basic(self, metadata, mock_user_response): + """Test basic profile download without full_profile""" + with patch.object(self.extractor, 'call_api') as mock_call, \ + patch.object(self.extractor, 'download_from_url') as mock_download: + # Mock API responses + mock_call.return_value = mock_user_response + mock_download.return_value = "profile.jpg" - pass \ No newline at end of file + result = self.extractor.download_profile(metadata, "test_user") + assert result.status == "insta profile: success" + assert result.get_title() == "Test User" + assert result.get("data") == self.extractor.cleanup_dict(mock_user_response["user"]) + # Verify profile picture download + mock_call.assert_called_once_with("v2/user/by/username", {"username": "test_user"}) + mock_download.assert_called_once_with("http://example.com/profile.jpg") + assert len(result.media) == 1 + assert result.media[0].filename == "profile.jpg" + + def test_download_profile_full(self, metadata, mock_user_response, mock_story_response): + """Test full profile download with stories/posts""" + with patch.object(self.extractor, 'call_api') as mock_call, \ + patch.object(self.extractor, 'download_all_posts') as mock_posts, \ + patch.object(self.extractor, 'download_all_highlights') as mock_highlights, \ + patch.object(self.extractor, 'download_all_tagged') as mock_tagged, \ + patch.object(self.extractor, '_download_stories_reusable') as mock_stories: + + self.extractor.full_profile = True + mock_call.side_effect = [ + mock_user_response, + mock_story_response + ] + mock_highlights.return_value = None + mock_stories.return_value = mock_story_response + mock_posts.return_value = None + mock_tagged.return_value = None + + result = self.extractor.download_profile(metadata, "test_user") + assert result.get("#stories") == len(mock_story_response) + mock_posts.assert_called_once_with(result, "123") + assert "errors" not in result.metadata + + def test_download_profile_not_found(self, metadata): + """Test profile not found error""" + with patch.object(self.extractor, 'call_api') as mock_call: + mock_call.return_value = {"user": None} + with pytest.raises(AssertionError) as exc_info: + self.extractor.download_profile(metadata, "invalid_user") + assert "User invalid_user not found" in str(exc_info.value) + + def test_download_profile_error_handling(self, metadata, mock_user_response): + """Test error handling in full profile mode""" + with (patch.object(self.extractor, 'call_api') as mock_call, \ + patch.object(self.extractor, 'download_all_highlights') as mock_highlights, \ + patch.object(self.extractor, 'download_all_tagged') as mock_tagged, \ + patch.object(self.extractor, '_download_stories_reusable') as stories_tagged, \ + patch.object(self.extractor, 'download_all_posts') as mock_posts + ): + self.extractor.full_profile = True + mock_call.side_effect = [ + mock_user_response, + Exception("Stories API failed"), + Exception("Posts API failed") + ] + mock_highlights.return_value = None + mock_tagged.return_value = None + stories_tagged.return_value = None + mock_posts.return_value = None + result = self.extractor.download_profile(metadata, "test_user") + + assert result.is_success() + assert "Error downloading stories for test_user" in result.metadata["errors"] + # assert "Error downloading posts for test_user" in result.metadata["errors"] \ No newline at end of file diff --git a/tests/feeders/test_gsheet_feeder.py b/tests/feeders/test_gsheet_feeder.py index dbd2416..62380f5 100644 --- a/tests/feeders/test_gsheet_feeder.py +++ b/tests/feeders/test_gsheet_feeder.py @@ -4,7 +4,7 @@ import gspread import pytest from unittest.mock import patch, MagicMock from auto_archiver.modules.gsheet_feeder import GsheetsFeeder -from auto_archiver.core import Metadata, Feeder, ArchivingContext +from auto_archiver.core import Metadata, Feeder def test_initialise_without_sheet_and_sheet_id(setup_module): @@ -100,21 +100,21 @@ def test__process_rows(gsheet_feeder: GsheetsFeeder): def test__set_metadata(gsheet_feeder: GsheetsFeeder, worksheet): gsheet_feeder._set_context(worksheet, 1) - assert ArchivingContext.get("gsheet") == {"row": 1, "worksheet": worksheet} + assert Metadata.get_context("gsheet") == {"row": 1, "worksheet": worksheet} @pytest.mark.skip(reason="Not recognising folder column") def test__set_metadata_with_folder_pickled(gsheet_feeder: GsheetsFeeder, worksheet): gsheet_feeder._set_context(worksheet, 7) - assert ArchivingContext.get("gsheet") == {"row": 1, "worksheet": worksheet} + assert Metadata.get_context("gsheet") == {"row": 1, "worksheet": worksheet} def test__set_metadata_with_folder(gsheet_feeder: GsheetsFeeder): testworksheet = TestWorksheet() testworksheet.wks.title = "TestSheet" gsheet_feeder._set_context(testworksheet, 6) - assert ArchivingContext.get("gsheet") == {"row": 6, "worksheet": testworksheet} - assert ArchivingContext.get("folder") == "some-folder/test-auto-archiver/testsheet" + assert Metadata.get_context("gsheet") == {"row": 6, "worksheet": testworksheet} + assert Metadata.get_context("folder") == "some-folder/test-auto-archiver/testsheet" @pytest.mark.usefixtures("setup_module") diff --git a/tests/test_metadata.py b/tests/test_metadata.py new file mode 100644 index 0000000..7270c80 --- /dev/null +++ b/tests/test_metadata.py @@ -0,0 +1,161 @@ +import pytest +from datetime import datetime, timezone +from dataclasses import dataclass +from typing import Any +from auto_archiver.core.metadata import Metadata + + +@pytest.fixture +def basic_metadata(): + m = Metadata() + m.set_url("https://example.com") + m.set("title", "Test Page") + return m + + +@dataclass +class MockMedia: + filename: str = "" + mimetype: str = "" + data: dict = None + + def get(self, key: str, default: Any = None) -> Any: + return self.data.get(key, default) if self.data else default + + def set(self, key: str, value: Any) -> None: + if not self.data: + self.data = {} + self.data[key] = value + + +@pytest.fixture +def media_file(): + def _create(filename="test.txt", mimetype="text/plain", hash_value=None): + m = MockMedia(filename=filename, mimetype=mimetype) + if hash_value: + m.set("hash", hash_value) + return m + + return _create + + +def test_initial_state(): + m = Metadata() + assert m.status == "no archiver" + assert m.metadata == {"_processed_at": m.get("_processed_at")} + assert m.media == [] + assert isinstance(m.get("_processed_at"), datetime) + + +def test_url_properties(basic_metadata): + assert basic_metadata.get_url() == "https://example.com" + assert basic_metadata.netloc == "example.com" + + +def test_simple_merge(basic_metadata): + right = Metadata(status="success") + right.set("title", "Test Title") + + basic_metadata.merge(right) + assert basic_metadata.status == "success" + assert basic_metadata.get("title") == "Test Title" + + +def test_left_merge(): + left = ( + Metadata() + .set("tags", ["a"]) + .set("stats", {"views": 10}) + .set("status", "success") + ) + right = ( + Metadata() + .set("tags", ["b"]) + .set("stats", {"likes": 5}) + .set("status", "no archiver") + ) + + left.merge(right, overwrite_left=True) + assert left.get("status") == "no archiver" + assert left.get("tags") == ["a", "b"] + assert left.get("stats") == {"views": 10, "likes": 5} + + +def test_media_management(basic_metadata, media_file): + media1 = media_file(hash_value="abc") + media2 = media_file(hash_value="abc") # Duplicate + media3 = media_file(hash_value="def") + + basic_metadata.add_media(media1, "m1") + basic_metadata.add_media(media2, "m2") + basic_metadata.add_media(media3) + + assert len(basic_metadata.media) == 3 + basic_metadata.remove_duplicate_media_by_hash() + assert len(basic_metadata.media) == 2 + assert basic_metadata.get_media_by_id("m1") == media1 + + +def test_success(): + m = Metadata() + assert not m.is_success() + m.success("context") + assert m.is_success() + assert m.status == "context: success" + + +def test_is_empty(): + m = Metadata() + assert m.is_empty() + # meaningless ids + ( + m.set("url", "example.com") + .set("total_bytes", 100) + .set("archive_duration_seconds", 10) + .set("_processed_at", datetime.now(timezone.utc)) + ) + assert m.is_empty() + + +def test_store(): + pass + +# Test Media operations + + +# Test custom getter/setters + + +def test_get_set_url(): + m = Metadata() + m.set_url("http://example.com") + assert m.get_url() == "http://example.com" + with pytest.raises(AssertionError): + m.set_url("") + assert m.get("url") == "http://example.com" + + +def test_set_content(): + m = Metadata() + m.set_content("Some content") + assert m.get("content") == "Some content" + # Test appending + m.set_content("New content") + # Do we want to add a line break to the method? + assert m.get("content") == "Some contentNew content" + + +def test_choose_most_complex(): + pass + + +def test_get_context(): + m = Metadata() + m.set_context("somekey", "somevalue") + assert m.get_context("somekey") == "somevalue" + assert m.get_context("nonexistent") is None + m.set_context("anotherkey", "anothervalue") + # check the previous is retained + assert m.get_context("somekey") == "somevalue" + assert m.get_context("anotherkey") == "anothervalue" + assert len(m._context) == 2