diff --git a/src/auto_archiver/modules/instagram_api_extractor/__manifest__.py b/src/auto_archiver/modules/instagram_api_extractor/__manifest__.py index c40a5d8..2ecc6b9 100644 --- a/src/auto_archiver/modules/instagram_api_extractor/__manifest__.py +++ b/src/auto_archiver/modules/instagram_api_extractor/__manifest__.py @@ -22,7 +22,7 @@ "full_profile_max_posts": { "default": 0, "type": "int", - "help": "Use to limit the number of posts to download when full_profile is true. 0 means no limit. limit is applied softly since posts are fetched in batch, once to: posts, tagged posts, and highlights", + "help": "Use to limit the number of posts to download when full_profile is true or when a URL for multiple posts is passed (like /stories /highlights ...). 0 means no limit. when full_profile is true the order of downloaded content is stories -> posts -> tagged posts -> highlights, so a value of 10 could download 2 stories, 7 posts, 1 tagged posts, and 0 highlights.", }, "minimize_json_output": { "default": True, diff --git a/src/auto_archiver/modules/instagram_api_extractor/instagram_api_extractor.py b/src/auto_archiver/modules/instagram_api_extractor/instagram_api_extractor.py index 5f13ecf..1694ddc 100644 --- a/src/auto_archiver/modules/instagram_api_extractor/instagram_api_extractor.py +++ b/src/auto_archiver/modules/instagram_api_extractor/instagram_api_extractor.py @@ -8,8 +8,10 @@ data, reducing JSON output size, and handling large profiles. 
""" +import math import re from datetime import datetime +import traceback import requests from loguru import logger @@ -35,10 +37,12 @@ class InstagramAPIExtractor(Extractor): def setup(self) -> None: if self.api_endpoint[-1] == "/": self.api_endpoint = self.api_endpoint[:-1] + self.full_profile_max_posts = int(self.full_profile_max_posts or 0) + if self.full_profile_max_posts == 0: + self.full_profile_max_posts = math.inf def download(self, item: Metadata) -> Metadata: url = item.get_url() - url.replace("instagr.com", "instagram.com").replace("instagr.am", "instagram.com") insta_matches = self.valid_url.findall(url) logger.info(f"{insta_matches=}") @@ -97,57 +101,74 @@ class InstagramAPIExtractor(Extractor): filename = self.download_from_url(pic_url) result.add_media(Media(filename=filename), id="profile_picture") + count_posts = 0 if self.full_profile: user_id = user.get("pk") # download all stories try: - stories = self._download_stories_reusable(result, username) + stories = self._download_stories_reusable( + result, username, max_to_download=self.full_profile_max_posts - count_posts + ) + count_posts += len(stories) result.set("#stories", len(stories)) except Exception as e: result.append("errors", f"Error downloading stories for {username}") - logger.error(f"Error downloading stories for {username}: {e}") + logger.error(f"Error downloading stories for {username}: {e} {traceback.format_exc()}") # download all posts try: - self.download_all_posts(result, user_id) + if count_posts < self.full_profile_max_posts: + count_posts += self.download_all_posts( + result, user_id, max_to_download=self.full_profile_max_posts - count_posts + ) except Exception as e: result.append("errors", f"Error downloading posts for {username}") - logger.error(f"Error downloading posts for {username}: {e}") + logger.error(f"Error downloading posts for {username}: {e} {traceback.format_exc()}") # download all tagged try: - self.download_all_tagged(result, user_id) + if count_posts < 
self.full_profile_max_posts: + count_posts += self.download_all_tagged( + result, user_id, max_to_download=self.full_profile_max_posts - count_posts + ) except Exception as e: result.append("errors", f"Error downloading tagged posts for {username}") - logger.error(f"Error downloading tagged posts for {username}: {e}") + logger.error(f"Error downloading tagged posts for {username}: {e} {traceback.format_exc()}") # download all highlights try: - self.download_all_highlights(result, username, user_id) + if count_posts < self.full_profile_max_posts: + count_posts += self.download_all_highlights( + result, username, user_id, max_to_download=self.full_profile_max_posts - count_posts + ) except Exception as e: result.append("errors", f"Error downloading highlights for {username}") - logger.error(f"Error downloading highlights for {username}: {e}") + logger.error(f"Error downloading highlights for {username}: {e} {traceback.format_exc()}") result.set_url(url) # reset as scrape_item modifies it return result.success("insta profile") - def download_all_highlights(self, result, username, user_id): + def download_all_highlights(self, result, username, user_id, max_to_download: int) -> int: count_highlights = 0 highlights = self.call_api("v1/user/highlights", {"user_id": user_id}) + highlights = highlights[: min(max_to_download, len(highlights))] # newest to oldest for h in highlights: try: - h_info = self._download_highlights_reusable(result, h.get("pk")) + h_info = self._download_highlights_reusable(result, h.get("pk"), max_to_download - count_highlights) count_highlights += len(h_info.get("items", [])) except Exception as e: result.append( "errors", f"Error downloading highlight id{h.get('pk')} for {username}", ) - logger.error(f"Error downloading highlight id{h.get('pk')} for {username}: {e}") - if self.full_profile_max_posts and count_highlights >= self.full_profile_max_posts: - logger.info(f"HIGHLIGHTS reached full_profile_max_posts={self.full_profile_max_posts}") +
logger.error( + f"Error downloading highlight id{h.get('pk')} for {username}: {e} {traceback.format_exc()}" + ) + if count_highlights >= max_to_download: + logger.debug(f"HIGHLIGHTS reached max_to_download={max_to_download}") break result.set("#highlights", count_highlights) + return count_highlights def download_post(self, result: Metadata, code: str = None, id: str = None, context: str = None) -> Metadata: if id: @@ -166,13 +187,13 @@ class InstagramAPIExtractor(Extractor): return result.success(f"insta {context or 'post'}") def download_highlights(self, result: Metadata, id: str) -> Metadata: - h_info = self._download_highlights_reusable(result, id) + h_info = self._download_highlights_reusable(result, id, self.full_profile_max_posts) items = len(h_info.get("items", [])) del h_info["items"] result.set_title(h_info.get("title")).set("data", h_info).set("#reels", items) return result.success("insta highlights") - def _download_highlights_reusable(self, result: Metadata, id: str) -> dict: + def _download_highlights_reusable(self, result: Metadata, id: str, max_to_download: int) -> dict: full_h = self.call_api("v2/highlight/by/id", {"id": id}) h_info = full_h.get("response", {}).get("reels", {}).get(f"highlight:{id}") assert h_info, f"Highlight {id} not found: {full_h=}" @@ -182,38 +203,39 @@ class InstagramAPIExtractor(Extractor): result.add_media(Media(filename=filename), id=f"cover_media highlight {id}") items = h_info.get("items", [])[::-1] # newest to oldest + items = items[: min(max_to_download, len(items))] for h in tqdm(items, desc="downloading highlights", unit="highlight"): try: self.scrape_item(result, h, "highlight") except Exception as e: result.append("errors", f"Error downloading highlight {h.get('id')}") - logger.error(f"Error downloading highlight, skipping {h.get('id')}: {e}") + logger.error(f"Error downloading highlight, skipping {h.get('id')}: {e} {traceback.format_exc()}") return h_info def download_stories(self, result: Metadata,
username: str) -> Metadata: now = datetime.now().strftime("%Y-%m-%d_%H-%M") - stories = self._download_stories_reusable(result, username) + stories = self._download_stories_reusable(result, username, max_to_download=self.full_profile_max_posts) if stories == []: return result.success("insta no story") result.set_title(f"stories {username} at {now}").set("#stories", len(stories)) return result.success(f"insta stories {now}") - def _download_stories_reusable(self, result: Metadata, username: str) -> list[dict]: + def _download_stories_reusable(self, result: Metadata, username: str, max_to_download: int) -> list[dict]: stories = self.call_api("v1/user/stories/by/username", {"username": username}) if not stories or not len(stories): return [] - stories = stories[::-1] # newest to oldest + stories = stories[::-1][: min(max_to_download, len(stories))] # newest to oldest for s in tqdm(stories, desc="downloading stories", unit="story"): try: self.scrape_item(result, s, "story") except Exception as e: result.append("errors", f"Error downloading story {s.get('id')}") - logger.error(f"Error downloading story, skipping {s.get('id')}: {e}") + logger.error(f"Error downloading story, skipping {s.get('id')}: {e} {traceback.format_exc()}") return stories - def download_all_posts(self, result: Metadata, user_id: str): + def download_all_posts(self, result: Metadata, user_id: str, max_to_download: int) -> int: end_cursor = None pbar = tqdm(desc="downloading posts") @@ -223,22 +245,23 @@ class InstagramAPIExtractor(Extractor): if not posts or not isinstance(posts, list) or len(posts) != 2: break posts, end_cursor = posts[0], posts[1] - logger.info(f"parsing {len(posts)} posts, next {end_cursor=}") - + posts = posts[: min(max_to_download, len(posts))] + logger.info(f"parsing {len(posts)} posts, next {end_cursor=} {post_count=} {max_to_download=}") for p in posts: try: self.scrape_item(result, p, "post") except Exception as e: result.append("errors", f"Error downloading post 
{p.get('id')}") - logger.error(f"Error downloading post, skipping {p.get('id')}: {e}") + logger.error(f"Error downloading post, skipping {p.get('id')}: {e} {traceback.format_exc()}") pbar.update(1) post_count += 1 - if self.full_profile_max_posts and post_count >= self.full_profile_max_posts: - logger.info(f"POSTS reached full_profile_max_posts={self.full_profile_max_posts}") + if post_count >= max_to_download: + logger.info(f"POSTS reached max_to_download={max_to_download}") break result.set("#posts", post_count) + return post_count - def download_all_tagged(self, result: Metadata, user_id: str): + def download_all_tagged(self, result: Metadata, user_id: str, max_to_download: int) -> int: next_page_id = "" pbar = tqdm(desc="downloading tagged posts") @@ -251,21 +274,22 @@ class InstagramAPIExtractor(Extractor): next_page_id = resp.get("next_page_id") logger.info(f"parsing {len(posts)} tagged posts, next {next_page_id=}") - + posts = posts[: min(max_to_download, len(posts))] for p in posts: try: self.scrape_item(result, p, "tagged") except Exception as e: result.append("errors", f"Error downloading tagged post {p.get('id')}") - logger.error(f"Error downloading tagged post, skipping {p.get('id')}: {e}") + logger.error(f"Error downloading tagged post, skipping {p.get('id')}: {e} {traceback.format_exc()}") pbar.update(1) tagged_count += 1 - if self.full_profile_max_posts and tagged_count >= self.full_profile_max_posts: - logger.info(f"TAGS reached full_profile_max_posts={self.full_profile_max_posts}") + if tagged_count >= max_to_download: + logger.info(f"TAGS reached max_to_download={max_to_download}") break result.set("#tagged", tagged_count) + return tagged_count - ### reusable parsing utils below + # reusable parsing utils below def scrape_item(self, result: Metadata, item: dict, context: str = None) -> dict: """ diff --git a/tests/extractors/test_instagram_api_extractor.py b/tests/extractors/test_instagram_api_extractor.py index
d8a1cc0..8463c49 100644 --- a/tests/extractors/test_instagram_api_extractor.py +++ b/tests/extractors/test_instagram_api_extractor.py @@ -1,4 +1,5 @@ from datetime import datetime +import math import pytest @@ -147,14 +148,14 @@ class TestInstagramAPIExtractor(TestExtractorBase): self.extractor.full_profile = True mock_call.side_effect = [mock_user_response, mock_story_response] - mock_highlights.return_value = None + mock_highlights.return_value = 1 mock_stories.return_value = mock_story_response - mock_posts.return_value = None - mock_tagged.return_value = None + mock_posts.return_value = 2 + mock_tagged.return_value = 3 result = self.extractor.download_profile(metadata, "test_user") assert result.get("#stories") == len(mock_story_response) - mock_posts.assert_called_once_with(result, "123") + mock_posts.assert_called_once_with(result, "123", max_to_download=math.inf) assert "errors" not in result.metadata def test_download_profile_not_found(self, metadata, mocker): @@ -175,10 +176,10 @@ class TestInstagramAPIExtractor(TestExtractorBase): self.extractor.full_profile = True mock_call.side_effect = [mock_user_response, Exception("Stories API failed"), Exception("Posts API failed")] - mock_highlights.return_value = None - mock_tagged.return_value = None + mock_highlights.return_value = 1 + mock_tagged.return_value = 2 stories_tagged.return_value = None - mock_posts.return_value = None + mock_posts.return_value = 4 result = self.extractor.download_profile(metadata, "test_user") assert result.is_success()