Create manifest files for archiver modules.

This commit is contained in:
erinhmclark
2025-01-21 22:29:50 +00:00
committed by Patrick Robertson
parent 4830f99300
commit 7b3a1468cd
23 changed files with 467 additions and 129 deletions

View File

@@ -0,0 +1,30 @@
{
"name": "Instagram API Archiver",
"type": ["extractor"],
"entry_point": "instagram_api_archiver:InstagramApiArchiver",
"depends": ["core"],
"external_dependencies":
{"python": ["requests",
"loguru",
"retrying",
"tqdm",],
},
"no_setup_required": False,
"configs": {
"access_token": {"default": None, "help": "a valid instagrapi-api token"},
"api_endpoint": {"default": None, "help": "API endpoint to use"},
"full_profile": {
"default": False,
"help": "if true, will download all posts, tagged posts, stories, and highlights for a profile, if false, will only download the profile pic and information.",
},
"full_profile_max_posts": {
"default": 0,
"help": "Use to limit the number of posts to download when full_profile is true. 0 means no limit. limit is applied softly since posts are fetched in batch, once to: posts, tagged posts, and highlights",
},
"minimize_json_output": {
"default": True,
"help": "if true, will remove empty values from the json output",
},
},
"description": "",
}

View File

@@ -0,0 +1,426 @@
"""
The `instagram_api_archiver` module provides tools for archiving various types of Instagram content
using the [Instagrapi API](https://github.com/subzeroid/instagrapi).
Connects to an Instagrapi API deployment and allows for downloading Instagram user profiles,
posts, stories, highlights, and tagged content. It offers advanced configuration options for filtering
data, reducing JSON output size, and handling large profiles.
"""
import re
from datetime import datetime
import requests
from loguru import logger
from retrying import retry
from tqdm import tqdm
from auto_archiver.archivers import Archiver
from auto_archiver.core import Media
from auto_archiver.core import Metadata
class InstagramAPIArchiver(Archiver):
"""
Uses an https://github.com/subzeroid/instagrapi API deployment to fetch instagram posts data
# TODO: improvement collect aggregates of locations[0].location and mentions for all posts
"""
name = "instagram_api_archiver"
global_pattern = re.compile(
r"(?:(?:http|https):\/\/)?(?:www.)?(?:instagram.com)\/(stories(?:\/highlights)?|p|reel)?\/?([^\/\?]*)\/?(\d+)?"
)
def __init__(self, config: dict) -> None:
super().__init__(config)
self.assert_valid_string("access_token")
self.assert_valid_string("api_endpoint")
self.full_profile_max_posts = int(self.full_profile_max_posts)
if self.api_endpoint[-1] == "/":
self.api_endpoint = self.api_endpoint[:-1]
self.full_profile = bool(self.full_profile)
self.minimize_json_output = bool(self.minimize_json_output)
@staticmethod
def configs() -> dict:
return {
"access_token": {"default": None, "help": "a valid instagrapi-api token"},
"api_endpoint": {"default": None, "help": "API endpoint to use"},
"full_profile": {
"default": False,
"help": "if true, will download all posts, tagged posts, stories, and highlights for a profile, if false, will only download the profile pic and information.",
},
"full_profile_max_posts": {
"default": 0,
"help": "Use to limit the number of posts to download when full_profile is true. 0 means no limit. limit is applied softly since posts are fetched in batch, once to: posts, tagged posts, and highlights",
},
"minimize_json_output": {
"default": True,
"help": "if true, will remove empty values from the json output",
},
}
def download(self, item: Metadata) -> Metadata:
url = item.get_url()
url.replace("instagr.com", "instagram.com").replace(
"instagr.am", "instagram.com"
)
insta_matches = self.global_pattern.findall(url)
logger.info(f"{insta_matches=}")
if not len(insta_matches) or len(insta_matches[0]) != 3:
return
if len(insta_matches) > 1:
logger.warning(
f"Multiple instagram matches found in {url=}, using the first one"
)
return
g1, g2, g3 = insta_matches[0][0], insta_matches[0][1], insta_matches[0][2]
if g1 == "":
return self.download_profile(item, g2)
elif g1 == "p":
return self.download_post(item, g2, context="post")
elif g1 == "reel":
return self.download_post(item, g2, context="reel")
elif g1 == "stories/highlights":
return self.download_highlights(item, g2)
elif g1 == "stories":
if len(g3):
return self.download_post(item, id=g3, context="story")
return self.download_stories(item, g2)
else:
logger.warning(f"Unknown instagram regex group match {g1=} found in {url=}")
return
@retry(wait_random_min=1000, wait_random_max=3000, stop_max_attempt_number=5)
def call_api(self, path: str, params: dict) -> dict:
headers = {"accept": "application/json", "x-access-key": self.access_token}
logger.debug(f"calling {self.api_endpoint}/{path} with {params=}")
return requests.get(
f"{self.api_endpoint}/{path}", headers=headers, params=params
).json()
def cleanup_dict(self, d: dict | list) -> dict:
# repeats 3 times to remove nested empty values
if not self.minimize_json_output:
return d
if type(d) == list:
return [self.cleanup_dict(v) for v in d]
if type(d) != dict:
return d
return {
k: clean_v
for k, v in d.items()
if (clean_v := self.cleanup_dict(v))
not in [0.0, 0, [], {}, "", None, "null"]
and k not in ["x", "y", "width", "height"]
}
def download_profile(self, result: Metadata, username: str) -> Metadata:
# download basic profile info
url = result.get_url()
user = self.call_api("v2/user/by/username", {"username": username}).get("user")
assert user, f"User {username} not found"
user = self.cleanup_dict(user)
result.set_title(user.get("full_name", username)).set("data", user)
if pic_url := user.get("profile_pic_url_hd", user.get("profile_pic_url")):
filename = self.download_from_url(pic_url)
result.add_media(Media(filename=filename), id=f"profile_picture")
if self.full_profile:
user_id = user.get("pk")
# download all stories
try:
stories = self._download_stories_reusable(result, username)
result.set("#stories", len(stories))
except Exception as e:
result.append("errors", f"Error downloading stories for {username}")
logger.error(f"Error downloading stories for {username}: {e}")
# download all posts
try:
self.download_all_posts(result, user_id)
except Exception as e:
result.append("errors", f"Error downloading posts for {username}")
logger.error(f"Error downloading posts for {username}: {e}")
# download all tagged
try:
self.download_all_tagged(result, user_id)
except Exception as e:
result.append(
"errors", f"Error downloading tagged posts for {username}"
)
logger.error(f"Error downloading tagged posts for {username}: {e}")
# download all highlights
try:
self.download_all_highlights(result, username, user_id)
except Exception as e:
result.append("errors", f"Error downloading highlights for {username}")
logger.error(f"Error downloading highlights for {username}: {e}")
result.set_url(url) # reset as scrape_item modifies it
return result.success("insta profile")
def download_all_highlights(self, result, username, user_id):
count_highlights = 0
highlights = self.call_api(f"v1/user/highlights", {"user_id": user_id})
for h in highlights:
try:
h_info = self._download_highlights_reusable(result, h.get("pk"))
count_highlights += len(h_info.get("items", []))
except Exception as e:
result.append(
"errors",
f"Error downloading highlight id{h.get('pk')} for {username}",
)
logger.error(
f"Error downloading highlight id{h.get('pk')} for {username}: {e}"
)
if (
self.full_profile_max_posts
and count_highlights >= self.full_profile_max_posts
):
logger.info(
f"HIGHLIGHTS reached full_profile_max_posts={self.full_profile_max_posts}"
)
break
result.set("#highlights", count_highlights)
def download_post(
self, result: Metadata, code: str = None, id: str = None, context: str = None
) -> Metadata:
if id:
post = self.call_api(f"v1/media/by/id", {"id": id})
else:
post = self.call_api(f"v1/media/by/code", {"code": code})
assert post, f"Post {id or code} not found"
if caption_text := post.get("caption_text"):
result.set_title(caption_text)
post = self.scrape_item(result, post, context)
if post.get("taken_at"):
result.set_timestamp(post.get("taken_at"))
return result.success(f"insta {context or 'post'}")
def download_highlights(self, result: Metadata, id: str) -> Metadata:
h_info = self._download_highlights_reusable(result, id)
items = len(h_info.get("items", []))
del h_info["items"]
result.set_title(h_info.get("title")).set("data", h_info).set("#reels", items)
return result.success("insta highlights")
def _download_highlights_reusable(self, result: Metadata, id: str) -> dict:
full_h = self.call_api(f"v2/highlight/by/id", {"id": id})
h_info = full_h.get("response", {}).get("reels", {}).get(f"highlight:{id}")
assert h_info, f"Highlight {id} not found: {full_h=}"
if (
cover_media := h_info.get("cover_media", {})
.get("cropped_image_version", {})
.get("url")
):
filename = self.download_from_url(cover_media)
result.add_media(Media(filename=filename), id=f"cover_media highlight {id}")
items = h_info.get("items", [])[::-1] # newest to oldest
for h in tqdm(items, desc="downloading highlights", unit="highlight"):
try:
self.scrape_item(result, h, "highlight")
except Exception as e:
result.append("errors", f"Error downloading highlight {h.get('id')}")
logger.error(
f"Error downloading highlight, skipping {h.get('id')}: {e}"
)
return h_info
def download_stories(self, result: Metadata, username: str) -> Metadata:
now = datetime.now().strftime("%Y-%m-%d_%H-%M")
stories = self._download_stories_reusable(result, username)
if stories == []:
return result.success("insta no story")
result.set_title(f"stories {username} at {now}").set("#stories", len(stories))
return result.success(f"insta stories {now}")
def _download_stories_reusable(self, result: Metadata, username: str) -> list[dict]:
stories = self.call_api(f"v1/user/stories/by/username", {"username": username})
if not stories or not len(stories):
return []
stories = stories[::-1] # newest to oldest
for s in tqdm(stories, desc="downloading stories", unit="story"):
try:
self.scrape_item(result, s, "story")
except Exception as e:
result.append("errors", f"Error downloading story {s.get('id')}")
logger.error(f"Error downloading story, skipping {s.get('id')}: {e}")
return stories
def download_all_posts(self, result: Metadata, user_id: str):
end_cursor = None
pbar = tqdm(desc="downloading posts")
post_count = 0
while end_cursor != "":
posts = self.call_api(
f"v1/user/medias/chunk", {"user_id": user_id, "end_cursor": end_cursor}
)
if not len(posts) or not type(posts) == list or len(posts) != 2:
break
posts, end_cursor = posts[0], posts[1]
logger.info(f"parsing {len(posts)} posts, next {end_cursor=}")
for p in posts:
try:
self.scrape_item(result, p, "post")
except Exception as e:
result.append("errors", f"Error downloading post {p.get('id')}")
logger.error(f"Error downloading post, skipping {p.get('id')}: {e}")
pbar.update(1)
post_count += 1
if (
self.full_profile_max_posts
and post_count >= self.full_profile_max_posts
):
logger.info(
f"POSTS reached full_profile_max_posts={self.full_profile_max_posts}"
)
break
result.set("#posts", post_count)
def download_all_tagged(self, result: Metadata, user_id: str):
next_page_id = ""
pbar = tqdm(desc="downloading tagged posts")
tagged_count = 0
while next_page_id != None:
resp = self.call_api(
f"v2/user/tag/medias", {"user_id": user_id, "page_id": next_page_id}
)
posts = resp.get("response", {}).get("items", [])
if not len(posts):
break
next_page_id = resp.get("next_page_id")
logger.info(f"parsing {len(posts)} tagged posts, next {next_page_id=}")
for p in posts:
try:
self.scrape_item(result, p, "tagged")
except Exception as e:
result.append(
"errors", f"Error downloading tagged post {p.get('id')}"
)
logger.error(
f"Error downloading tagged post, skipping {p.get('id')}: {e}"
)
pbar.update(1)
tagged_count += 1
if (
self.full_profile_max_posts
and tagged_count >= self.full_profile_max_posts
):
logger.info(
f"TAGS reached full_profile_max_posts={self.full_profile_max_posts}"
)
break
result.set("#tagged", tagged_count)
### reusable parsing utils below
def scrape_item(self, result: Metadata, item: dict, context: str = None) -> dict:
"""
receives a Metadata and an API dict response
fetches the media and adds it to the Metadata
cleans and returns the API dict
context can be used to give specific id prefixes to media
"""
if "clips_metadata" in item:
if reusable_text := item.get("clips_metadata", {}).get(
"reusable_text_attribute_string"
):
item["clips_metadata_text"] = reusable_text
if self.minimize_json_output:
del item["clips_metadata"]
if code := item.get("code") and not result.get("url"):
result.set_url(f"https://www.instagram.com/p/{code}/")
resources = item.get("resources", item.get("carousel_media", []))
item, media, media_id = self.scrape_media(item, context)
# if resources are present take the main media from the first resource
if not media and len(resources):
_, media, media_id = self.scrape_media(resources[0], context)
resources = resources[1:]
assert media, f"Image/video not found in {item=}"
# posts with multiple items contain a resources list
resources_metadata = Metadata()
for r in resources:
self.scrape_item(resources_metadata, r)
if not resources_metadata.is_empty():
media.set("other media", resources_metadata.media)
result.add_media(media, id=media_id)
return item
def scrape_media(self, item: dict, context: str) -> tuple[dict, Media, str]:
# remove unnecessary info
if self.minimize_json_output:
for k in [
"image_versions",
"video_versions",
"video_dash_manifest",
"image_versions2",
"video_versions2",
]:
if k in item:
del item[k]
item = self.cleanup_dict(item)
image_media = None
if image_url := item.get("thumbnail_url"):
filename = self.download_from_url(image_url, verbose=False)
image_media = Media(filename=filename)
# retrieve video info
best_id = item.get("id", item.get("pk"))
taken_at = item.get("taken_at", item.get("taken_at_ts"))
code = item.get("code")
caption_text = item.get("caption_text")
if "carousel_media" in item:
del item["carousel_media"]
if video_url := item.get("video_url"):
filename = self.download_from_url(video_url, verbose=False)
video_media = Media(filename=filename)
if taken_at:
video_media.set("date", taken_at)
if code:
video_media.set("url", f"https://www.instagram.com/p/{code}")
if caption_text:
video_media.set("text", caption_text)
video_media.set("preview", [image_media])
video_media.set("data", [item])
return item, video_media, f"{context or 'video'} {best_id}"
elif image_media:
if taken_at:
image_media.set("date", taken_at)
if code:
image_media.set("url", f"https://www.instagram.com/p/{code}")
if caption_text:
image_media.set("text", caption_text)
image_media.set("data", [item])
return item, image_media, f"{context or 'image'} {best_id}"
return item, None, None

View File

@@ -0,0 +1,33 @@
{
"name": "Instagram Archiver",
"type": ["extractor"],
"entry_point": "instagram_archiver:InstagramArchiver",
"depends": ["core"],
"external_dependencies": {
"python": ["instaloader",
"loguru",],
},
"no_setup_required": False,
"configs": {
"username": {"default": None, "help": "a valid Instagram username"},
"password": {
"default": None,
"help": "the corresponding Instagram account password",
},
"download_folder": {
"default": "instaloader",
"help": "name of a folder to temporarily download content to",
},
"session_file": {
"default": "secrets/instaloader.session",
"help": "path to the instagram session which saves session credentials",
},
# TODO: fine-grain
# "download_stories": {"default": True, "help": "if the link is to a user profile: whether to get stories information"},
},
"description": """Uses the Instaloader library to download content from Instagram. This class handles both individual posts
and user profiles, downloading as much information as possible, including images, videos, text, stories,
highlights, and tagged posts. Authentication is required via username/password or a session file.
""",
}

View File

@@ -0,0 +1,148 @@
""" Uses the Instaloader library to download content from Instagram. This class handles both individual posts
and user profiles, downloading as much information as possible, including images, videos, text, stories,
highlights, and tagged posts. Authentication is required via username/password or a session file.
"""
import re, os, shutil, traceback
import instaloader # https://instaloader.github.io/as-module.html
from loguru import logger
from auto_archiver.archivers import Archiver
from auto_archiver.core import Metadata
from auto_archiver.core import Media
class InstagramArchiver(Archiver):
"""
Uses Instaloader to download either a post (inc images, videos, text) or as much as possible from a profile (posts, stories, highlights, ...)
"""
name = "instagram_archiver"
# NB: post regex should be tested before profile
# https://regex101.com/r/MGPquX/1
post_pattern = re.compile(r"(?:(?:http|https):\/\/)?(?:www.)?(?:instagram.com|instagr.am|instagr.com)\/(?:p|reel)\/(\w+)")
# https://regex101.com/r/6Wbsxa/1
profile_pattern = re.compile(r"(?:(?:http|https):\/\/)?(?:www.)?(?:instagram.com|instagr.am|instagr.com)\/(\w+)")
# TODO: links to stories
def __init__(self, config: dict) -> None:
super().__init__(config)
# TODO: refactor how configuration validation is done
self.assert_valid_string("username")
self.assert_valid_string("password")
self.assert_valid_string("download_folder")
self.assert_valid_string("session_file")
self.insta = instaloader.Instaloader(
download_geotags=True, download_comments=True, compress_json=False, dirname_pattern=self.download_folder, filename_pattern="{date_utc}_UTC_{target}__{typename}"
)
try:
self.insta.load_session_from_file(self.username, self.session_file)
except Exception as e:
logger.error(f"Unable to login from session file: {e}\n{traceback.format_exc()}")
try:
self.insta.login(self.username, config.instagram_self.password)
# TODO: wait for this issue to be fixed https://github.com/instaloader/instaloader/issues/1758
self.insta.save_session_to_file(self.session_file)
except Exception as e2:
logger.error(f"Unable to finish login (retrying from file): {e2}\n{traceback.format_exc()}")
@staticmethod
def configs() -> dict:
return {
"username": {"default": None, "help": "a valid Instagram username"},
"password": {"default": None, "help": "the corresponding Instagram account password"},
"download_folder": {"default": "instaloader", "help": "name of a folder to temporarily download content to"},
"session_file": {"default": "secrets/instaloader.session", "help": "path to the instagram session which saves session credentials"},
#TODO: fine-grain
# "download_stories": {"default": True, "help": "if the link is to a user profile: whether to get stories information"},
}
def download(self, item: Metadata) -> Metadata:
url = item.get_url()
# detect URLs that we definitely cannot handle
post_matches = self.post_pattern.findall(url)
profile_matches = self.profile_pattern.findall(url)
# return if not a valid instagram link
if not len(post_matches) and not len(profile_matches): return
result = None
try:
os.makedirs(self.download_folder, exist_ok=True)
# process if post
if len(post_matches):
result = self.download_post(url, post_matches[0])
# process if profile
elif len(profile_matches):
result = self.download_profile(url, profile_matches[0])
except Exception as e:
logger.error(f"Failed to download with instagram archiver due to: {e}, make sure your account credentials are valid.")
finally:
shutil.rmtree(self.download_folder, ignore_errors=True)
return result
def download_post(self, url: str, post_id: str) -> Metadata:
logger.debug(f"Instagram {post_id=} detected in {url=}")
post = instaloader.Post.from_shortcode(self.insta.context, post_id)
if self.insta.download_post(post, target=post.owner_username):
return self.process_downloads(url, post.title, post._asdict(), post.date)
def download_profile(self, url: str, username: str) -> Metadata:
# gets posts, posts where username is tagged, igtv postss, stories, and highlights
logger.debug(f"Instagram {username=} detected in {url=}")
profile = instaloader.Profile.from_username(self.insta.context, username)
try:
for post in profile.get_posts():
try: self.insta.download_post(post, target=f"profile_post_{post.owner_username}")
except Exception as e: logger.error(f"Failed to download post: {post.shortcode}: {e}")
except Exception as e: logger.error(f"Failed profile.get_posts: {e}")
try:
for post in profile.get_tagged_posts():
try: self.insta.download_post(post, target=f"tagged_post_{post.owner_username}")
except Exception as e: logger.error(f"Failed to download tagged post: {post.shortcode}: {e}")
except Exception as e: logger.error(f"Failed profile.get_tagged_posts: {e}")
try:
for post in profile.get_igtv_posts():
try: self.insta.download_post(post, target=f"igtv_post_{post.owner_username}")
except Exception as e: logger.error(f"Failed to download igtv post: {post.shortcode}: {e}")
except Exception as e: logger.error(f"Failed profile.get_igtv_posts: {e}")
try:
for story in self.insta.get_stories([profile.userid]):
for item in story.get_items():
try: self.insta.download_storyitem(item, target=f"story_item_{story.owner_username}")
except Exception as e: logger.error(f"Failed to download story item: {item}: {e}")
except Exception as e: logger.error(f"Failed get_stories: {e}")
try:
for highlight in self.insta.get_highlights(profile.userid):
for item in highlight.get_items():
try: self.insta.download_storyitem(item, target=f"highlight_item_{highlight.owner_username}")
except Exception as e: logger.error(f"Failed to download highlight item: {item}: {e}")
except Exception as e: logger.error(f"Failed get_highlights: {e}")
return self.process_downloads(url, f"@{username}", profile._asdict(), None)
def process_downloads(self, url, title, content, date):
result = Metadata()
result.set_title(title).set_content(str(content)).set_timestamp(date)
try:
all_media = []
for f in os.listdir(self.download_folder):
if os.path.isfile((filename := os.path.join(self.download_folder, f))):
if filename[-4:] == ".txt": continue
all_media.append(Media(filename))
assert len(all_media) > 1, "No uploaded media found"
all_media.sort(key=lambda m: m.filename, reverse=True)
for m in all_media:
result.add_media(m)
return result.success("instagram")
except Exception as e:
logger.error(f"Could not fetch instagram post {url} due to: {e}")

View File

@@ -0,0 +1,35 @@
{
"name": "Instagram Telegram Bot Archiver",
"type": ["extractor"],
"entry_point": "instagram_tbot_archiver:InstagramTbotArchiver",
"depends": ["core", "utils"],
"external_dependencies": {"python": ["loguru",
"telethon",],
},
"requires_setup": True,
"configs": {
"api_id": {"default": None, "help": "telegram API_ID value, go to https://my.telegram.org/apps"},
"api_hash": {"default": None, "help": "telegram API_HASH value, go to https://my.telegram.org/apps"},
"session_file": {"default": "secrets/anon-insta", "help": "optional, records the telegram login session for future usage, '.session' will be appended to the provided value."},
"timeout": {"default": 45, "help": "timeout to fetch the instagram content in seconds."},
},
"description": """
The `InstagramTbotArchiver` module uses a Telegram bot (`instagram_load_bot`) to fetch and archive Instagram content,
such as posts and stories. It leverages the Telethon library to interact with the Telegram API, sending Instagram URLs
to the bot and downloading the resulting media and metadata. The downloaded content is stored as `Media` objects and
returned as part of a `Metadata` object.
### Features
- Supports archiving Instagram posts and stories through the Telegram bot.
- Downloads and saves media files (e.g., images, videos) in a temporary directory.
- Captures and returns metadata, including titles and descriptions, as a `Metadata` object.
- Automatically manages Telegram session files for secure access.
### Setup
To use the `InstagramTbotArchiver`, you need to provide the following configuration settings:
- **API ID and Hash**: Telegram API credentials obtained from [my.telegram.org/apps](https://my.telegram.org/apps).
- **Session File**: Optional path to store the Telegram session file for future use.
""",
}

View File

@@ -0,0 +1,107 @@
"""
InstagramTbotArchiver Module
This module provides functionality to archive Instagram content (posts, stories, etc.) using a Telegram bot (`instagram_load_bot`).
It interacts with the Telegram API via the Telethon library to send Instagram URLs to the bot, which retrieves the
relevant media and metadata. The fetched content is saved as `Media` objects in a temporary directory and returned as a
`Metadata` object.
"""
import os
import shutil
import time
from sqlite3 import OperationalError
from loguru import logger
from telethon.sync import TelegramClient
from auto_archiver.archivers import Archiver
from auto_archiver.core import Metadata, Media, ArchivingContext
from auto_archiver.utils import random_str
class InstagramTbotArchiver(Archiver):
"""
calls a telegram bot to fetch instagram posts/stories... and gets available media from it
https://github.com/adw0rd/instagrapi
https://t.me/instagram_load_bot
"""
name = "instagram_tbot_archiver"
def __init__(self, config: dict) -> None:
super().__init__(config)
self.assert_valid_string("api_id")
self.assert_valid_string("api_hash")
self.timeout = int(self.timeout)
@staticmethod
def configs() -> dict:
return {
"api_id": {"default": None, "help": "telegram API_ID value, go to https://my.telegram.org/apps"},
"api_hash": {"default": None, "help": "telegram API_HASH value, go to https://my.telegram.org/apps"},
"session_file": {"default": "secrets/anon-insta", "help": "optional, records the telegram login session for future usage, '.session' will be appended to the provided value."},
"timeout": {"default": 45, "help": "timeout to fetch the instagram content in seconds."},
}
def setup(self) -> None:
"""
1. makes a copy of session_file that is removed in cleanup
2. checks if the session file is valid
"""
logger.info(f"SETUP {self.name} checking login...")
# make a copy of the session that is used exclusively with this archiver instance
new_session_file = os.path.join("secrets/", f"instabot-{time.strftime('%Y-%m-%d')}{random_str(8)}.session")
shutil.copy(self.session_file + ".session", new_session_file)
self.session_file = new_session_file.replace(".session", "")
try:
self.client = TelegramClient(self.session_file, self.api_id, self.api_hash)
except OperationalError as e:
logger.error(f"Unable to access the {self.session_file} session, please make sure you don't use the same session file here and in telethon_archiver. if you do then disable at least one of the archivers for the 1st time you setup telethon session: {e}")
with self.client.start():
logger.success(f"SETUP {self.name} login works.")
def cleanup(self) -> None:
logger.info(f"CLEANUP {self.name}.")
session_file_name = self.session_file + ".session"
if os.path.exists(session_file_name):
os.remove(session_file_name)
def download(self, item: Metadata) -> Metadata:
url = item.get_url()
if not "instagram.com" in url: return False
result = Metadata()
tmp_dir = ArchivingContext.get_tmp_dir()
with self.client.start():
chat = self.client.get_entity("instagram_load_bot")
since_id = self.client.send_message(entity=chat, message=url).id
attempts = 0
seen_media = []
message = ""
time.sleep(3)
# media is added before text by the bot so it can be used as a stop-logic mechanism
while attempts < (self.timeout - 3) and (not message or not len(seen_media)):
attempts += 1
time.sleep(1)
for post in self.client.iter_messages(chat, min_id=since_id):
since_id = max(since_id, post.id)
if post.media and post.id not in seen_media:
filename_dest = os.path.join(tmp_dir, f'{chat.id}_{post.id}')
media = self.client.download_media(post.media, filename_dest)
if media:
result.add_media(Media(media))
seen_media.append(post.id)
if post.message: message += post.message
if "You must enter a URL to a post" in message:
logger.debug(f"invalid link {url=} for {self.name}: {message}")
return False
if message:
result.set_content(message).set_title(message[:128])
return result.success("insta-via-bot")

View File

@@ -0,0 +1,26 @@
{
"name": "Telegram Archiver",
"type": ["extractor"],
"entry_point": "telegram_archiver:TelegramArchiver",
"requires_setup": False,
"depends": ["core"],
"external_dependencies": {
"python": [
"requests",
"bs4",
"loguru",
],
},
"description": """
The `TelegramArchiver` retrieves publicly available media content from Telegram message links without requiring login credentials.
It processes URLs to fetch images and videos embedded in Telegram messages, ensuring a structured output using `Metadata`
and `Media` objects. Recommended for scenarios where login-based archiving is not viable, although `telethon_archiver`
is advised for more comprehensive functionality.
### Features
- Extracts images and videos from public Telegram message links (`t.me`).
- Processes HTML content of messages to retrieve embedded media.
- Sets structured metadata, including timestamps, content, and media details.
- Does not require user authentication for Telegram.
""",
}

View File

@@ -0,0 +1,73 @@
import requests, re, html
from bs4 import BeautifulSoup
from loguru import logger
from auto_archiver.archivers import Archiver
from auto_archiver.core import Metadata, Media
class TelegramArchiver(Archiver):
"""
Archiver for telegram that does not require login, but the telethon_archiver is much more advised,
will only return if at least one image or one video is found
"""
name = "telegram_archiver"
def __init__(self, config: dict) -> None:
super().__init__(config)
@staticmethod
def configs() -> dict:
return {}
def download(self, item: Metadata) -> Metadata:
url = item.get_url()
# detect URLs that we definitely cannot handle
if 't.me' != item.netloc:
return False
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36'
}
# TODO: check if we can do this more resilient to variable URLs
if url[-8:] != "?embed=1":
url += "?embed=1"
t = requests.get(url, headers=headers)
s = BeautifulSoup(t.content, 'html.parser')
result = Metadata()
result.set_content(html.escape(str(t.content)))
if (timestamp := (s.find_all('time') or [{}])[0].get('datetime')):
result.set_timestamp(timestamp)
video = s.find("video")
if video is None:
logger.warning("could not find video")
image_tags = s.find_all(class_="tgme_widget_message_photo_wrap")
image_urls = []
for im in image_tags:
urls = [u.replace("'", "") for u in re.findall(r'url\((.*?)\)', im['style'])]
image_urls += urls
if not len(image_urls): return False
for img_url in image_urls:
result.add_media(Media(self.download_from_url(img_url)))
else:
video_url = video.get('src')
m_video = Media(self.download_from_url(video_url))
# extract duration from HTML
try:
duration = s.find_all('time')[0].contents[0]
if ':' in duration:
duration = float(duration.split(
':')[0]) * 60 + float(duration.split(':')[1])
else:
duration = float(duration)
m_video.set("duration", duration)
except: pass
result.add_media(m_video)
return result.success("telegram")

View File

@@ -0,0 +1,48 @@
# TODO rm dependency on json
{
"name": "telethon_archiver",
"type": ["extractor"],
"entry_point": "telethon_archiver:TelethonArchiver",
"requires_setup": True,
"depends": [""],
"external_dependencies": {
"python": ["telethon",
"loguru",
"tqdm",
],
"bin": [""]
},
"configs": {
"api_id": {"default": None, "help": "telegram API_ID value, go to https://my.telegram.org/apps"},
"api_hash": {"default": None, "help": "telegram API_HASH value, go to https://my.telegram.org/apps"},
"bot_token": {"default": None, "help": "optional, but allows access to more content such as large videos, talk to @botfather"},
"session_file": {"default": "secrets/anon", "help": "optional, records the telegram login session for future usage, '.session' will be appended to the provided value."},
"join_channels": {"default": True, "help": "disables the initial setup with channel_invites config, useful if you have a lot and get stuck"},
"channel_invites": {
"default": {},
"help": "(JSON string) private channel invite links (format: t.me/joinchat/HASH OR t.me/+HASH) and (optional but important to avoid hanging for minutes on startup) channel id (format: CHANNEL_ID taken from a post url like https://t.me/c/CHANNEL_ID/1), the telegram account will join any new channels on setup",
# TODO
#"cli_set": lambda cli_val, cur_val: dict(cur_val, **json.loads(cli_val))
}
},
"description": """
The `TelethonArchiver` uses the Telethon library to archive posts and media from Telegram channels and groups.
It supports private and public channels, downloading grouped posts with media, and can join channels using invite links
if provided in the configuration.
### Features
- Fetches posts and metadata from Telegram channels and groups, including private channels.
- Downloads media attachments (e.g., images, videos, audio) from individual posts or grouped posts.
- Handles channel invites to join channels dynamically during setup.
- Utilizes Telethon's capabilities for reliable Telegram interactions.
- Outputs structured metadata and media using `Metadata` and `Media` objects.
### Setup
To use the `TelethonArchiver`, you must configure the following:
- **API ID and API Hash**: Obtain these from [my.telegram.org](https://my.telegram.org/apps).
- **Session File**: Optional, but records login sessions for future use (default: `secrets/anon.session`).
- **Bot Token**: Optional, allows access to additional content (e.g., large videos) but limits private channel archiving.
- **Channel Invites**: Optional, specify a JSON string of invite links to join channels during setup.
"""
}

View File

@@ -0,0 +1,187 @@
import shutil
from telethon.sync import TelegramClient
from telethon.errors import ChannelInvalidError
from telethon.tl.functions.messages import ImportChatInviteRequest
from telethon.errors.rpcerrorlist import UserAlreadyParticipantError, FloodWaitError, InviteRequestSentError, InviteHashExpiredError
from loguru import logger
from tqdm import tqdm
import re, time, json, os
from auto_archiver.archivers import Archiver
from auto_archiver.core import Metadata, Media, ArchivingContext
from auto_archiver.utils import random_str
class TelethonArchiver(Archiver):
name = "telethon_archiver"
link_pattern = re.compile(r"https:\/\/t\.me(\/c){0,1}\/(.+)\/(\d+)")
invite_pattern = re.compile(r"t.me(\/joinchat){0,1}\/\+?(.+)")
def __init__(self, config: dict) -> None:
super().__init__(config)
self.assert_valid_string("api_id")
self.assert_valid_string("api_hash")
@staticmethod
def configs() -> dict:
return {
"api_id": {"default": None, "help": "telegram API_ID value, go to https://my.telegram.org/apps"},
"api_hash": {"default": None, "help": "telegram API_HASH value, go to https://my.telegram.org/apps"},
"bot_token": {"default": None, "help": "optional, but allows access to more content such as large videos, talk to @botfather"},
"session_file": {"default": "secrets/anon", "help": "optional, records the telegram login session for future usage, '.session' will be appended to the provided value."},
"join_channels": {"default": True, "help": "disables the initial setup with channel_invites config, useful if you have a lot and get stuck"},
"channel_invites": {
"default": {},
"help": "(JSON string) private channel invite links (format: t.me/joinchat/HASH OR t.me/+HASH) and (optional but important to avoid hanging for minutes on startup) channel id (format: CHANNEL_ID taken from a post url like https://t.me/c/CHANNEL_ID/1), the telegram account will join any new channels on setup",
"cli_set": lambda cli_val, cur_val: dict(cur_val, **json.loads(cli_val))
}
}
def setup(self) -> None:
"""
1. makes a copy of session_file that is removed in cleanup
2. trigger login process for telegram or proceed if already saved in a session file
3. joins channel_invites where needed
"""
logger.info(f"SETUP {self.name} checking login...")
# make a copy of the session that is used exclusively with this archiver instance
new_session_file = os.path.join("secrets/", f"telethon-{time.strftime('%Y-%m-%d')}{random_str(8)}.session")
shutil.copy(self.session_file + ".session", new_session_file)
self.session_file = new_session_file.replace(".session", "")
# initiate the client
self.client = TelegramClient(self.session_file, self.api_id, self.api_hash)
with self.client.start():
logger.success(f"SETUP {self.name} login works.")
if self.join_channels and len(self.channel_invites):
logger.info(f"SETUP {self.name} joining channels...")
with self.client.start():
# get currently joined channels
# https://docs.telethon.dev/en/stable/modules/custom.html#module-telethon.tl.custom.dialog
joined_channel_ids = [c.id for c in self.client.get_dialogs() if c.is_channel]
logger.info(f"already part of {len(joined_channel_ids)} channels")
i = 0
pbar = tqdm(desc=f"joining {len(self.channel_invites)} invite links", total=len(self.channel_invites))
while i < len(self.channel_invites):
channel_invite = self.channel_invites[i]
channel_id = channel_invite.get("id", False)
invite = channel_invite["invite"]
if (match := self.invite_pattern.search(invite)):
try:
if channel_id:
ent = self.client.get_entity(int(channel_id)) # fails if not a member
else:
ent = self.client.get_entity(invite) # fails if not a member
logger.warning(f"please add the property id='{ent.id}' to the 'channel_invites' configuration where {invite=}, not doing so can lead to a minutes-long setup time due to telegram's rate limiting.")
except ValueError as e:
logger.info(f"joining new channel {invite=}")
try:
self.client(ImportChatInviteRequest(match.group(2)))
except UserAlreadyParticipantError as e:
logger.info(f"already joined {invite=}")
except InviteRequestSentError:
logger.warning(f"already sent a join request with {invite} still no answer")
except InviteHashExpiredError:
logger.warning(f"{invite=} has expired please find a more recent one")
except Exception as e:
logger.error(f"could not join channel with {invite=} due to {e}")
except FloodWaitError as e:
logger.warning(f"got a flood error, need to wait {e.seconds} seconds")
time.sleep(e.seconds)
continue
else:
logger.warning(f"Invalid invite link {invite}")
i += 1
pbar.update()
def cleanup(self) -> None:
logger.info(f"CLEANUP {self.name}.")
session_file_name = self.session_file + ".session"
if os.path.exists(session_file_name):
os.remove(session_file_name)
def download(self, item: Metadata) -> Metadata:
"""
if this url is archivable will download post info and look for other posts from the same group with media.
can handle private/public channels
"""
url = item.get_url()
# detect URLs that we definitely cannot handle
match = self.link_pattern.search(url)
logger.debug(f"TELETHON: {match=}")
if not match: return False
is_private = match.group(1) == "/c"
chat = int(match.group(2)) if is_private else match.group(2)
post_id = int(match.group(3))
result = Metadata()
# NB: not using bot_token since then private channels cannot be archived: self.client.start(bot_token=self.bot_token)
with self.client.start():
# with self.client.start(bot_token=self.bot_token):
try:
post = self.client.get_messages(chat, ids=post_id)
except ValueError as e:
logger.error(f"Could not fetch telegram {url} possibly it's private: {e}")
return False
except ChannelInvalidError as e:
logger.error(f"Could not fetch telegram {url}. This error may be fixed if you setup a bot_token in addition to api_id and api_hash (but then private channels will not be archived, we need to update this logic to handle both): {e}")
return False
logger.debug(f"TELETHON GOT POST {post=}")
if post is None: return False
media_posts = self._get_media_posts_in_group(chat, post)
logger.debug(f'got {len(media_posts)=} for {url=}')
tmp_dir = ArchivingContext.get_tmp_dir()
group_id = post.grouped_id if post.grouped_id is not None else post.id
title = post.message
for mp in media_posts:
if len(mp.message) > len(title): title = mp.message # save the longest text found (usually only 1)
# media can also be in entities
if mp.entities:
other_media_urls = [e.url for e in mp.entities if hasattr(e, "url") and e.url and self._guess_file_type(e.url) in ["video", "image", "audio"]]
if len(other_media_urls):
logger.debug(f"Got {len(other_media_urls)} other media urls from {mp.id=}: {other_media_urls}")
for i, om_url in enumerate(other_media_urls):
filename = self.download_from_url(om_url, f'{chat}_{group_id}_{i}')
result.add_media(Media(filename=filename), id=f"{group_id}_{i}")
filename_dest = os.path.join(tmp_dir, f'{chat}_{group_id}', str(mp.id))
filename = self.client.download_media(mp.media, filename_dest)
if not filename:
logger.debug(f"Empty media found, skipping {str(mp)=}")
continue
result.add_media(Media(filename))
result.set_title(title).set_timestamp(post.date).set("api_data", post.to_dict())
if post.message != title:
result.set_content(post.message)
return result.success("telethon")
def _get_media_posts_in_group(self, chat, original_post, max_amp=10):
"""
Searches for Telegram posts that are part of the same group of uploads
The search is conducted around the id of the original post with an amplitude
of `max_amp` both ways
Returns a list of [post] where each post has media and is in the same grouped_id
"""
if getattr(original_post, "grouped_id", None) is None:
return [original_post] if getattr(original_post, "media", False) else []
search_ids = [i for i in range(original_post.id - max_amp, original_post.id + max_amp + 1)]
posts = self.client.get_messages(chat, ids=search_ids)
media = []
for post in posts:
if post is not None and post.grouped_id == original_post.grouped_id and post.media is not None:
media.append(post)
return media

View File

@@ -0,0 +1,45 @@
{
"name": "Twitter API Archiver",
"type": ["extractor"],
"entry_point": "twitter_api_archiver:TwitterApiArchiver",
"requires_setup": True,
"depends": ["core"],
"external_dependencies": {
"python": ["requests",
"loguru",
"pytwitter",
"slugify",],
"bin": [""]
},
"configs": {
"bearer_token": {"default": None, "help": "[deprecated: see bearer_tokens] twitter API bearer_token which is enough for archiving, if not provided you will need consumer_key, consumer_secret, access_token, access_secret"},
"bearer_tokens": {"default": [], "help": " a list of twitter API bearer_token which is enough for archiving, if not provided you will need consumer_key, consumer_secret, access_token, access_secret, if provided you can still add those for better rate limits. CSV of bearer tokens if provided via the command line", "cli_set": lambda cli_val, cur_val: list(set(cli_val.split(",")))},
"consumer_key": {"default": None, "help": "twitter API consumer_key"},
"consumer_secret": {"default": None, "help": "twitter API consumer_secret"},
"access_token": {"default": None, "help": "twitter API access_token"},
"access_secret": {"default": None, "help": "twitter API access_secret"},
},
"description": """
The `TwitterApiArchiver` fetches tweets and associated media using the Twitter API.
It supports multiple API configurations for extended rate limits and reliable access.
Features include URL expansion, media downloads (e.g., images, videos), and structured output
via `Metadata` and `Media` objects. Requires Twitter API credentials such as bearer tokens
or consumer key/secret and access token/secret.
### Features
- Fetches tweets and their metadata, including text, creation timestamp, and author information.
- Downloads media attachments (e.g., images, videos) in high quality.
- Supports multiple API configurations for improved rate limiting.
- Expands shortened URLs (e.g., `t.co` links).
- Outputs structured metadata and media using `Metadata` and `Media` objects.
### Setup
To use the `TwitterApiArchiver`, you must provide valid Twitter API credentials via configuration:
- **Bearer Token(s)**: A single token or a list for rate-limited API access.
- **Consumer Key and Secret**: Required for user-authenticated API access.
- **Access Token and Secret**: Complements the consumer key for enhanced API capabilities.
Credentials can be obtained by creating a Twitter developer account at [Twitter Developer Platform](https://developer.twitter.com/en).
"""
,
}

View File

@@ -0,0 +1,143 @@
import json
import re
import mimetypes
import requests
from datetime import datetime
from loguru import logger
from pytwitter import Api
from slugify import slugify
from auto_archiver.archivers import Archiver
from auto_archiver.core import Metadata,Media
class TwitterApiArchiver(Archiver):
name = "twitter_api_archiver"
link_pattern = re.compile(r"(?:twitter|x).com\/(?:\#!\/)?(\w+)\/status(?:es)?\/(\d+)")
def __init__(self, config: dict) -> None:
super().__init__(config)
self.api_index = 0
self.apis = []
if len(self.bearer_tokens):
self.apis.extend([Api(bearer_token=bearer_token) for bearer_token in self.bearer_tokens])
if self.bearer_token:
self.assert_valid_string("bearer_token")
self.apis.append(Api(bearer_token=self.bearer_token))
if self.consumer_key and self.consumer_secret and self.access_token and self.access_secret:
self.assert_valid_string("consumer_key")
self.assert_valid_string("consumer_secret")
self.assert_valid_string("access_token")
self.assert_valid_string("access_secret")
self.apis.append(Api(consumer_key=self.consumer_key, consumer_secret=self.consumer_secret,
access_token=self.access_token, access_secret=self.access_secret))
assert self.api_client is not None, "Missing Twitter API configurations, please provide either AND/OR (consumer_key, consumer_secret, access_token, access_secret) to use this archiver, you can provide both for better rate-limit results."
@staticmethod
def configs() -> dict:
return {
"bearer_token": {"default": None, "help": "[deprecated: see bearer_tokens] twitter API bearer_token which is enough for archiving, if not provided you will need consumer_key, consumer_secret, access_token, access_secret"},
"bearer_tokens": {"default": [], "help": " a list of twitter API bearer_token which is enough for archiving, if not provided you will need consumer_key, consumer_secret, access_token, access_secret, if provided you can still add those for better rate limits. CSV of bearer tokens if provided via the command line", "cli_set": lambda cli_val, cur_val: list(set(cli_val.split(",")))},
"consumer_key": {"default": None, "help": "twitter API consumer_key"},
"consumer_secret": {"default": None, "help": "twitter API consumer_secret"},
"access_token": {"default": None, "help": "twitter API access_token"},
"access_secret": {"default": None, "help": "twitter API access_secret"},
}
@property # getter .mimetype
def api_client(self) -> str:
return self.apis[self.api_index]
def sanitize_url(self, url: str) -> str:
# expand URL if t.co and clean tracker GET params
if 'https://t.co/' in url:
try:
r = requests.get(url, timeout=30)
logger.debug(f'Expanded url {url} to {r.url}')
url = r.url
except:
logger.error(f'Failed to expand url {url}')
return url
def download(self, item: Metadata) -> Metadata:
# call download retry until success or no more apis
while self.api_index < len(self.apis):
if res := self.download_retry(item): return res
self.api_index += 1
self.api_index = 0
return False
def get_username_tweet_id(self, url):
# detect URLs that we definitely cannot handle
matches = self.link_pattern.findall(url)
if not len(matches): return False, False
username, tweet_id = matches[0] # only one URL supported
logger.debug(f"Found {username=} and {tweet_id=} in {url=}")
return username, tweet_id
def download_retry(self, item: Metadata) -> Metadata:
url = item.get_url()
# detect URLs that we definitely cannot handle
username, tweet_id = self.get_username_tweet_id(url)
if not username: return False
try:
tweet = self.api_client.get_tweet(tweet_id, expansions=["attachments.media_keys"], media_fields=["type", "duration_ms", "url", "variants"], tweet_fields=["attachments", "author_id", "created_at", "entities", "id", "text", "possibly_sensitive"])
logger.debug(tweet)
except Exception as e:
logger.error(f"Could not get tweet: {e}")
return False
result = Metadata()
result.set_title(tweet.data.text)
result.set_timestamp(datetime.strptime(tweet.data.created_at, "%Y-%m-%dT%H:%M:%S.%fZ"))
urls = []
if tweet.includes:
for i, m in enumerate(tweet.includes.media):
media = Media(filename="")
if m.url and len(m.url):
media.set("src", m.url)
media.set("duration", (m.duration_ms or 1) // 1000)
mimetype = "image/jpeg"
elif hasattr(m, "variants"):
variant = self.choose_variant(m.variants)
if not variant: continue
media.set("src", variant.url)
mimetype = variant.content_type
else:
continue
logger.info(f"Found media {media}")
ext = mimetypes.guess_extension(mimetype)
media.filename = self.download_from_url(media.get("src"), f'{slugify(url)}_{i}{ext}')
result.add_media(media)
result.set_content(json.dumps({
"id": tweet.data.id,
"text": tweet.data.text,
"created_at": tweet.data.created_at,
"author_id": tweet.data.author_id,
"geo": tweet.data.geo,
"lang": tweet.data.lang,
"media": urls
}, ensure_ascii=False, indent=4))
return result.success("twitter-api")
def choose_variant(self, variants):
"""
Chooses the highest quality variable possible out of a list of variants
"""
variant, bit_rate = None, -1
for var in variants:
if var.content_type == "video/mp4":
if var.bit_rate > bit_rate:
bit_rate = var.bit_rate
variant = var
else:
variant = var if not variant else variant
return variant

View File

@@ -0,0 +1,37 @@
{
"name": "VKontakte Archiver",
"type": ["extractor"],
"entry_point": "vk_archiver:VKArchiver",
"requires_setup": True,
"depends": ["core", "utils"],
"external_dependencies": {
"python": ["loguru",
"vk_url_scraper"],
},
"configs": {
"username": {"default": None, "help": "valid VKontakte username"},
"password": {"default": None, "help": "valid VKontakte password"},
"session_file": {"default": "secrets/vk_config.v2.json", "help": "valid VKontakte password"},
},
"description": """
The `VkArchiver` fetches posts, text, and images from VK (VKontakte) social media pages.
This archiver is specialized for `/wall` posts and uses the `VkScraper` library to extract
and download content. Note that VK videos are handled separately by the `YTDownloader`.
### Features
- Extracts text, timestamps, and metadata from VK `/wall` posts.
- Downloads associated images and attaches them to the resulting `Metadata` object.
- Processes multiple segments of VK URLs that contain mixed content (e.g., wall, photo).
- Outputs structured metadata and media using `Metadata` and `Media` objects.
### Setup
To use the `VkArchiver`, you must provide valid VKontakte login credentials and session information:
- **Username**: A valid VKontakte account username.
- **Password**: The corresponding password for the VKontakte account.
- **Session File**: Optional. Path to a session configuration file (`.json`) for persistent VK login.
Credentials can be set in the configuration file or directly via environment variables. Ensure you
have access to the VKontakte API by creating an account at [VKontakte](https://vk.com/).
"""
,
}

View File

@@ -0,0 +1,53 @@
from loguru import logger
from vk_url_scraper import VkScraper
from auto_archiver.utils.misc import dump_payload
from auto_archiver.archivers import Archiver
from auto_archiver.core import Metadata, Media, ArchivingContext
class VkArchiver(Archiver):
""""
VK videos are handled by YTDownloader, this archiver gets posts text and images.
Currently only works for /wall posts
"""
name = "vk_archiver"
def __init__(self, config: dict) -> None:
super().__init__(config)
self.assert_valid_string("username")
self.assert_valid_string("password")
self.vks = VkScraper(self.username, self.password, session_file=self.session_file)
@staticmethod
def configs() -> dict:
return {
"username": {"default": None, "help": "valid VKontakte username"},
"password": {"default": None, "help": "valid VKontakte password"},
"session_file": {"default": "secrets/vk_config.v2.json", "help": "valid VKontakte password"},
}
def download(self, item: Metadata) -> Metadata:
url = item.get_url()
if "vk.com" not in item.netloc: return False
# some urls can contain multiple wall/photo/... parts and all will be fetched
vk_scrapes = self.vks.scrape(url)
if not len(vk_scrapes): return False
logger.debug(f"VK: got {len(vk_scrapes)} scraped instances")
result = Metadata()
for scrape in vk_scrapes:
if not result.get_title():
result.set_title(scrape["text"])
if not result.get_timestamp():
result.set_timestamp(scrape["datetime"])
result.set_content(dump_payload(vk_scrapes))
filenames = self.vks.download_media(vk_scrapes, ArchivingContext.get_tmp_dir())
for filename in filenames:
result.add_media(Media(filename))
return result.success("vk")