This commit is contained in:
msramalho
2025-06-18 16:40:55 +01:00
parent 4a36e6f6b0
commit 592dc30415
3 changed files with 66 additions and 41 deletions

View File

@@ -22,7 +22,7 @@
"full_profile_max_posts": { "full_profile_max_posts": {
"default": 0, "default": 0,
"type": "int", "type": "int",
"help": "Use to limit the number of posts to download when full_profile is true. 0 means no limit. limit is applied softly since posts are fetched in batch, once to: posts, tagged posts, and highlights", "help": "Use to limit the number of posts to download when full_profile is true or when a URL for multiple posts is passed (like /stories /highlights ...). 0 means no limit. when full_profile is true the order of downloaded content is stories -> posts -> tagged posts -> highlights, so a value of 10 could download 2 stories, 7 posts, 1 tagged posts, and 0 highlights.",
}, },
"minimize_json_output": { "minimize_json_output": {
"default": True, "default": True,

View File

@@ -8,8 +8,10 @@ data, reducing JSON output size, and handling large profiles.
""" """
import math
import re import re
from datetime import datetime from datetime import datetime
import traceback
import requests import requests
from loguru import logger from loguru import logger
@@ -35,10 +37,12 @@ class InstagramAPIExtractor(Extractor):
def setup(self) -> None: def setup(self) -> None:
if self.api_endpoint[-1] == "/": if self.api_endpoint[-1] == "/":
self.api_endpoint = self.api_endpoint[:-1] self.api_endpoint = self.api_endpoint[:-1]
self.full_profile_max_posts = int(self.full_profile_max_posts or 0)
if self.full_profile_max_posts == 0:
self.full_profile_max_posts = math.inf
def download(self, item: Metadata) -> Metadata: def download(self, item: Metadata) -> Metadata:
url = item.get_url() url = item.get_url()
url.replace("instagr.com", "instagram.com").replace("instagr.am", "instagram.com") url.replace("instagr.com", "instagram.com").replace("instagr.am", "instagram.com")
insta_matches = self.valid_url.findall(url) insta_matches = self.valid_url.findall(url)
logger.info(f"{insta_matches=}") logger.info(f"{insta_matches=}")
@@ -97,57 +101,74 @@ class InstagramAPIExtractor(Extractor):
filename = self.download_from_url(pic_url) filename = self.download_from_url(pic_url)
result.add_media(Media(filename=filename), id="profile_picture") result.add_media(Media(filename=filename), id="profile_picture")
count_posts = 0
if self.full_profile: if self.full_profile:
user_id = user.get("pk") user_id = user.get("pk")
# download all stories # download all stories
try: try:
stories = self._download_stories_reusable(result, username) stories = self._download_stories_reusable(
result, username, max_to_download=self.full_profile_max_posts - count_posts
)
count_posts += len(stories)
result.set("#stories", len(stories)) result.set("#stories", len(stories))
except Exception as e: except Exception as e:
result.append("errors", f"Error downloading stories for {username}") result.append("errors", f"Error downloading stories for {username}")
logger.error(f"Error downloading stories for {username}: {e}") logger.error(f"Error downloading stories for {username}: {e} {traceback.format_exc()}")
# download all posts # download all posts
try: try:
self.download_all_posts(result, user_id) if count_posts < self.full_profile_max_posts:
count_posts += self.download_all_posts(
result, user_id, max_to_download=self.full_profile_max_posts - count_posts
)
except Exception as e: except Exception as e:
result.append("errors", f"Error downloading posts for {username}") result.append("errors", f"Error downloading posts for {username}")
logger.error(f"Error downloading posts for {username}: {e}") logger.error(f"Error downloading posts for {username}: {e} {traceback.format_exc()}")
# download all tagged # download all tagged
try: try:
self.download_all_tagged(result, user_id) if count_posts < self.full_profile_max_posts:
count_posts += self.download_all_tagged(
result, user_id, max_to_download=self.full_profile_max_posts - count_posts
)
except Exception as e: except Exception as e:
result.append("errors", f"Error downloading tagged posts for {username}") result.append("errors", f"Error downloading tagged posts for {username}")
logger.error(f"Error downloading tagged posts for {username}: {e}") logger.error(f"Error downloading tagged posts for {username}: {e} {traceback.format_exc()}")
# download all highlights # download all highlights
try: try:
self.download_all_highlights(result, username, user_id) if count_posts < self.full_profile_max_posts:
count_posts += self.download_all_highlights(
result, username, user_id, max_to_download=self.full_profile_max_posts - count_posts
)
except Exception as e: except Exception as e:
result.append("errors", f"Error downloading highlights for {username}") result.append("errors", f"Error downloading highlights for {username}")
logger.error(f"Error downloading highlights for {username}: {e}") logger.error(f"Error downloading highlights for {username}: {e} {traceback.format_exc()}")
result.set_url(url) # reset as scrape_item modifies it result.set_url(url) # reset as scrape_item modifies it
return result.success("insta profile") return result.success("insta profile")
def download_all_highlights(self, result, username, user_id): def download_all_highlights(self, result, username, user_id, max_to_download: int) -> int:
count_highlights = 0 count_highlights = 0
highlights = self.call_api("v1/user/highlights", {"user_id": user_id}) highlights = self.call_api("v1/user/highlights", {"user_id": user_id})
highlights = highlights[: min(max_to_download, len(highlights))] # newest to oldest
for h in highlights: for h in highlights:
try: try:
h_info = self._download_highlights_reusable(result, h.get("pk")) h_info = self._download_highlights_reusable(result, h.get("pk"), max_to_download=max_to_download)
count_highlights += len(h_info.get("items", [])) count_highlights += len(h_info.get("items", []))
except Exception as e: except Exception as e:
result.append( result.append(
"errors", "errors",
f"Error downloading highlight id{h.get('pk')} for {username}", f"Error downloading highlight id{h.get('pk')} for {username}",
) )
logger.error(f"Error downloading highlight id{h.get('pk')} for {username}: {e}") logger.error(
if self.full_profile_max_posts and count_highlights >= self.full_profile_max_posts: f"Error downloading highlight id{h.get('pk')} for {username}: {e} {traceback.format_exc()}"
logger.info(f"HIGHLIGHTS reached full_profile_max_posts={self.full_profile_max_posts}") )
if count_highlights >= max_to_download:
logger.debug(f"HIGHLIGHTS reached max_to_download={self.full_profile_max_posts}")
break break
result.set("#highlights", count_highlights) result.set("#highlights", count_highlights)
return count_highlights
def download_post(self, result: Metadata, code: str = None, id: str = None, context: str = None) -> Metadata: def download_post(self, result: Metadata, code: str = None, id: str = None, context: str = None) -> Metadata:
if id: if id:
@@ -166,13 +187,13 @@ class InstagramAPIExtractor(Extractor):
return result.success(f"insta {context or 'post'}") return result.success(f"insta {context or 'post'}")
def download_highlights(self, result: Metadata, id: str) -> Metadata: def download_highlights(self, result: Metadata, id: str) -> Metadata:
h_info = self._download_highlights_reusable(result, id) h_info = self._download_highlights_reusable(result, id, self.full_profile_max_posts)
items = len(h_info.get("items", [])) items = len(h_info.get("items", []))
del h_info["items"] del h_info["items"]
result.set_title(h_info.get("title")).set("data", h_info).set("#reels", items) result.set_title(h_info.get("title")).set("data", h_info).set("#reels", items)
return result.success("insta highlights") return result.success("insta highlights")
def _download_highlights_reusable(self, result: Metadata, id: str) -> dict: def _download_highlights_reusable(self, result: Metadata, id: str, max_to_download: int) -> dict:
full_h = self.call_api("v2/highlight/by/id", {"id": id}) full_h = self.call_api("v2/highlight/by/id", {"id": id})
h_info = full_h.get("response", {}).get("reels", {}).get(f"highlight:{id}") h_info = full_h.get("response", {}).get("reels", {}).get(f"highlight:{id}")
assert h_info, f"Highlight {id} not found: {full_h=}" assert h_info, f"Highlight {id} not found: {full_h=}"
@@ -182,38 +203,39 @@ class InstagramAPIExtractor(Extractor):
result.add_media(Media(filename=filename), id=f"cover_media highlight {id}") result.add_media(Media(filename=filename), id=f"cover_media highlight {id}")
items = h_info.get("items", [])[::-1] # newest to oldest items = h_info.get("items", [])[::-1] # newest to oldest
items = items[: min(max_to_download, len(items))]
for h in tqdm(items, desc="downloading highlights", unit="highlight"): for h in tqdm(items, desc="downloading highlights", unit="highlight"):
try: try:
self.scrape_item(result, h, "highlight") self.scrape_item(result, h, "highlight")
except Exception as e: except Exception as e:
result.append("errors", f"Error downloading highlight {h.get('id')}") result.append("errors", f"Error downloading highlight {h.get('id')}")
logger.error(f"Error downloading highlight, skipping {h.get('id')}: {e}") logger.error(f"Error downloading highlight, skipping {h.get('id')}: {e} {traceback.format_exc()}")
return h_info return h_info
def download_stories(self, result: Metadata, username: str) -> Metadata: def download_stories(self, result: Metadata, username: str) -> Metadata:
now = datetime.now().strftime("%Y-%m-%d_%H-%M") now = datetime.now().strftime("%Y-%m-%d_%H-%M")
stories = self._download_stories_reusable(result, username) stories = self._download_stories_reusable(result, username, max_to_download=self.full_profile_max_posts)
if stories == []: if stories == []:
return result.success("insta no story") return result.success("insta no story")
result.set_title(f"stories {username} at {now}").set("#stories", len(stories)) result.set_title(f"stories {username} at {now}").set("#stories", len(stories))
return result.success(f"insta stories {now}") return result.success(f"insta stories {now}")
def _download_stories_reusable(self, result: Metadata, username: str) -> list[dict]: def _download_stories_reusable(self, result: Metadata, username: str, max_to_download: int) -> list[dict]:
stories = self.call_api("v1/user/stories/by/username", {"username": username}) stories = self.call_api("v1/user/stories/by/username", {"username": username})
if not stories or not len(stories): if not stories or not len(stories):
return [] return []
stories = stories[::-1] # newest to oldest stories = stories[::-1][: min(max_to_download, len(stories))] # newest to oldest
for s in tqdm(stories, desc="downloading stories", unit="story"): for s in tqdm(stories, desc="downloading stories", unit="story"):
try: try:
self.scrape_item(result, s, "story") self.scrape_item(result, s, "story")
except Exception as e: except Exception as e:
result.append("errors", f"Error downloading story {s.get('id')}") result.append("errors", f"Error downloading story {s.get('id')}")
logger.error(f"Error downloading story, skipping {s.get('id')}: {e}") logger.error(f"Error downloading story, skipping {s.get('id')}: {e} {traceback.format_exc()}")
return stories return stories
def download_all_posts(self, result: Metadata, user_id: str): def download_all_posts(self, result: Metadata, user_id: str, max_to_download: int) -> int:
end_cursor = None end_cursor = None
pbar = tqdm(desc="downloading posts") pbar = tqdm(desc="downloading posts")
@@ -223,22 +245,23 @@ class InstagramAPIExtractor(Extractor):
if not posts or not isinstance(posts, list) or len(posts) != 2: if not posts or not isinstance(posts, list) or len(posts) != 2:
break break
posts, end_cursor = posts[0], posts[1] posts, end_cursor = posts[0], posts[1]
logger.info(f"parsing {len(posts)} posts, next {end_cursor=}") posts = posts[: min(max_to_download, len(posts))]
logger.info(f"parsing {len(posts)} posts, next {end_cursor=} {post_count=} {max_to_download=}")
for p in posts: for p in posts:
try: try:
self.scrape_item(result, p, "post") self.scrape_item(result, p, "post")
except Exception as e: except Exception as e:
result.append("errors", f"Error downloading post {p.get('id')}") result.append("errors", f"Error downloading post {p.get('id')}")
logger.error(f"Error downloading post, skipping {p.get('id')}: {e}") logger.error(f"Error downloading post, skipping {p.get('id')}: {e} {traceback.format_exc()}")
pbar.update(1) pbar.update(1)
post_count += 1 post_count += 1
if self.full_profile_max_posts and post_count >= self.full_profile_max_posts: if post_count >= max_to_download:
logger.info(f"POSTS reached full_profile_max_posts={self.full_profile_max_posts}") logger.info(f"POSTS reached max_to_download={self.full_profile_max_posts}")
break break
result.set("#posts", post_count) result.set("#posts", post_count)
return post_count
def download_all_tagged(self, result: Metadata, user_id: str): def download_all_tagged(self, result: Metadata, user_id: str, max_to_download: int) -> int:
next_page_id = "" next_page_id = ""
pbar = tqdm(desc="downloading tagged posts") pbar = tqdm(desc="downloading tagged posts")
@@ -251,21 +274,22 @@ class InstagramAPIExtractor(Extractor):
next_page_id = resp.get("next_page_id") next_page_id = resp.get("next_page_id")
logger.info(f"parsing {len(posts)} tagged posts, next {next_page_id=}") logger.info(f"parsing {len(posts)} tagged posts, next {next_page_id=}")
posts = posts[: min(max_to_download, len(posts))]
for p in posts: for p in posts:
try: try:
self.scrape_item(result, p, "tagged") self.scrape_item(result, p, "tagged")
except Exception as e: except Exception as e:
result.append("errors", f"Error downloading tagged post {p.get('id')}") result.append("errors", f"Error downloading tagged post {p.get('id')}")
logger.error(f"Error downloading tagged post, skipping {p.get('id')}: {e}") logger.error(f"Error downloading tagged post, skipping {p.get('id')}: {e} {traceback.format_exc()}")
pbar.update(1) pbar.update(1)
tagged_count += 1 tagged_count += 1
if self.full_profile_max_posts and tagged_count >= self.full_profile_max_posts: if tagged_count >= max_to_download:
logger.info(f"TAGS reached full_profile_max_posts={self.full_profile_max_posts}") logger.info(f"TAGS reached max_to_download={self.full_profile_max_posts}")
break break
result.set("#tagged", tagged_count) result.set("#tagged", tagged_count)
return tagged_count
### reusable parsing utils below # reusable parsing utils below
def scrape_item(self, result: Metadata, item: dict, context: str = None) -> dict: def scrape_item(self, result: Metadata, item: dict, context: str = None) -> dict:
""" """

View File

@@ -1,4 +1,5 @@
from datetime import datetime from datetime import datetime
import math
import pytest import pytest
@@ -147,14 +148,14 @@ class TestInstagramAPIExtractor(TestExtractorBase):
self.extractor.full_profile = True self.extractor.full_profile = True
mock_call.side_effect = [mock_user_response, mock_story_response] mock_call.side_effect = [mock_user_response, mock_story_response]
mock_highlights.return_value = None mock_highlights.return_value = 1
mock_stories.return_value = mock_story_response mock_stories.return_value = mock_story_response
mock_posts.return_value = None mock_posts.return_value = 2
mock_tagged.return_value = None mock_tagged.return_value = 3
result = self.extractor.download_profile(metadata, "test_user") result = self.extractor.download_profile(metadata, "test_user")
assert result.get("#stories") == len(mock_story_response) assert result.get("#stories") == len(mock_story_response)
mock_posts.assert_called_once_with(result, "123") mock_posts.assert_called_once_with(result, "123", max_to_download=math.inf)
assert "errors" not in result.metadata assert "errors" not in result.metadata
def test_download_profile_not_found(self, metadata, mocker): def test_download_profile_not_found(self, metadata, mocker):
@@ -175,10 +176,10 @@ class TestInstagramAPIExtractor(TestExtractorBase):
self.extractor.full_profile = True self.extractor.full_profile = True
mock_call.side_effect = [mock_user_response, Exception("Stories API failed"), Exception("Posts API failed")] mock_call.side_effect = [mock_user_response, Exception("Stories API failed"), Exception("Posts API failed")]
mock_highlights.return_value = None mock_highlights.return_value = 1
mock_tagged.return_value = None mock_tagged.return_value = 2
stories_tagged.return_value = None stories_tagged.return_value = None
mock_posts.return_value = None mock_posts.return_value = 4
result = self.extractor.download_profile(metadata, "test_user") result = self.extractor.download_profile(metadata, "test_user")
assert result.is_success() assert result.is_success()