From f0158ffd9c1cc040bdbf875a11eeae7d0f69f88f Mon Sep 17 00:00:00 2001 From: msramalho <19508417+msramalho@users.noreply.github.com> Date: Fri, 23 Feb 2024 14:08:17 +0000 Subject: [PATCH] adds tagged posts and better parsing --- .../archivers/instagram_api_archiver.py | 47 +++++++++++++++---- 1 file changed, 39 insertions(+), 8 deletions(-) diff --git a/src/auto_archiver/archivers/instagram_api_archiver.py b/src/auto_archiver/archivers/instagram_api_archiver.py index edde209..ea64ca7 100644 --- a/src/auto_archiver/archivers/instagram_api_archiver.py +++ b/src/auto_archiver/archivers/instagram_api_archiver.py @@ -73,9 +73,9 @@ class InstagramAPIArchiver(Archiver): if type(d) == list: return [self.cleanup_dict(v) for v in d] if type(d) != dict: return d return { - k: self.cleanup_dict(v) if type(v) in [dict, list] else v + k: clean_v for k, v in d.items() - if v not in [0.0, 0, [], {}, "", None, "null"] and + if (clean_v := self.cleanup_dict(v)) not in [0.0, 0, [], {}, "", None, "null"] and k not in ["x", "y", "width", "height"] } @@ -93,9 +93,6 @@ class InstagramAPIArchiver(Archiver): if self.full_profile: user_id = user.get("pk") - # download all posts - self.download_all_posts(result, user_id) - # download all stories try: stories = self._download_stories_reusable(result, username) @@ -104,6 +101,12 @@ class InstagramAPIArchiver(Archiver): result.append("errors", f"Error downloading stories for {username}") logger.error(f"Error downloading stories for {username}: {e}") + # download all posts + self.download_all_posts(result, user_id) + + # download all tagged + self.download_all_tagged(result, user_id) + # download all highlights try: count_highlights = 0 @@ -120,6 +123,7 @@ class InstagramAPIArchiver(Archiver): result.append("errors", f"Error downloading highlights for {username}") logger.error(f"Error downloading highlights for {username}: {e}") + result.set_url(url) # reset as scrape_item modifies it return result.success("insta profile") @@ -200,6 +204,28 @@ class InstagramAPIArchiver(Archiver): pbar.update(1) post_count+=1 result.set("#posts", post_count) + + def download_all_tagged(self, result: Metadata, user_id: str): + next_page_id = "" + pbar = tqdm(desc="downloading tagged posts") + + tagged_count = 0 + while next_page_id != None: + resp = self.call_api(f"v2/user/tag/medias", {"user_id": user_id, "page_id": next_page_id}) + posts = resp.get("response", {}).get("items", []) + if not len(posts): break + next_page_id = resp.get("next_page_id") + + logger.info(f"parsing {len(posts)} tagged posts, next {next_page_id=}") + + for p in posts: + try: self.scrape_item(result, p, "tagged") + except Exception as e: + result.append("errors", f"Error downloading tagged post {p.get('id')}") + logger.error(f"Error downloading tagged post, skipping {p.get('id')}: {e}") + pbar.update(1) + tagged_count+=1 + result.set("#tagged", tagged_count) ### reusable parsing utils below @@ -220,7 +246,7 @@ class InstagramAPIArchiver(Archiver): if code := item.get("code"): result.set("url", f"https://www.instagram.com/p/{code}/") - resources = item.get("resources", []) + resources = item.get("resources", item.get("carousel_media", [])) item, media, media_id = self.scrape_media(item, context) # if resources are present take the main media from the first resource if not media and len(resources): @@ -242,7 +268,7 @@ class InstagramAPIArchiver(Archiver): def scrape_media(self, item: dict, context:str) -> tuple[dict, Media, str]: # remove unnecessary info if self.minimize_json_output: - for k in ["image_versions", "video_versions", "video_dash_manifest"]: + for k in ["image_versions", "video_versions", "video_dash_manifest", "image_versions2", "video_versions2"]: if k in item: del item[k] item = self.cleanup_dict(item) @@ -253,19 +279,24 @@ class InstagramAPIArchiver(Archiver): # retrieve video info best_id = item.get('id', item.get('pk')) - taken_at = item.get("taken_at") + taken_at = item.get("taken_at", item.get("taken_at_ts")) code = item.get("code") + caption_text = item.get("caption_text") + if "carousel_media" in item: del item["carousel_media"] + if video_url := item.get("video_url"): filename = self.download_from_url(video_url, verbose=False) video_media = Media(filename=filename) if taken_at: video_media.set("date", taken_at) if code: video_media.set("url", f"https://www.instagram.com/p/{code}") + if caption_text: video_media.set("text", caption_text) video_media.set("preview", [image_media]) video_media.set("data", [item]) return item, video_media, f"{context or 'video'} {best_id}" elif image_media: if taken_at: image_media.set("date", taken_at) if code: image_media.set("url", f"https://www.instagram.com/p/{code}") + if caption_text: image_media.set("text", caption_text) image_media.set("data", [item]) return item, image_media, f"{context or 'image'} {best_id}"