Compare commits

..

14 Commits

Author SHA1 Message Date
msramalho
a455728673 version bump 2024-04-16 12:44:42 +01:00
msramalho
8d4357a22c closes #135 2024-04-16 12:44:32 +01:00
Jett Chen
cf8691bad7 Add yt-dlp based archiving for TwitterArchiver (#138)
* Add ytdlp archiving capability

* Add type annotation

* version bump

---------

Co-authored-by: msramalho <19508417+msramalho@users.noreply.github.com>
2024-04-15 19:54:55 +01:00
R. Miles McCain
f603400d0d Add direct Atlos integration (#137)
* Add Atlos feeder

* Add Atlos db

* Add Atlos storage

* Fix Atlos storages

* Fix Atlos feeder

* Only include URLs in Atlos feeder once they're processed

* Remove print

* Add Atlos documentation to README

* Formatting fixes

* Don't archive existing material

* avoid KeyError in atlos_db

* version bump

---------

Co-authored-by: msramalho <19508417+msramalho@users.noreply.github.com>
2024-04-15 19:25:17 +01:00
msramalho
eb37f0b45b version bump 2024-04-15 19:02:54 +01:00
msramalho
75497f5773 minor bug fix when using an archiver_enricher in enrichers only 2024-04-15 19:02:40 +01:00
msramalho
623e555713 dependencies updates 2024-04-15 19:02:20 +01:00
msramalho
9c7824de57 browsertrix docker updates 2024-04-15 19:01:55 +01:00
msramalho
f4827770e6 adds instagram no stories as success, and fix for telethon-based archivers. 2024-03-05 14:49:10 +00:00
msramalho
601572d76e strip url 2024-02-29 11:54:01 +00:00
msramalho
d21e79a272 general security updates 2024-02-29 11:40:30 +00:00
msramalho
ccf5f857ef adds configurable limits to instagram/youtube 2024-02-25 15:14:17 +00:00
msramalho
7de317d1b5 avoiding exception 2024-02-23 15:54:33 +00:00
msramalho
70075a1e5e improving insta archiver 2024-02-23 15:37:28 +00:00
30 changed files with 895 additions and 442 deletions

3
.gitignore vendored
View File

@@ -27,4 +27,5 @@ instaloader.session
orchestration.yaml
auto_archiver.egg-info*
logs*
*.csv
*.csv
archived/

View File

@@ -1,4 +1,4 @@
FROM webrecorder/browsertrix-crawler:latest
FROM webrecorder/browsertrix-crawler:1.0.4
ENV RUNNING_IN_DOCKER=1
@@ -19,9 +19,8 @@ RUN pip install --upgrade pip && \
COPY Pipfile* ./
# install from pipenv, with browsertrix-only requirements
RUN pipenv install && \
pipenv install pywb uwsgi
RUN pipenv install
# doing this at the end helps during development, builds are quick
COPY ./src/ .

828
Pipfile.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -177,6 +177,38 @@ To use Google Drive storage you need the id of the shared folder in the `config.
#### Telethon + Instagram with telegram bot
The first time you run, you will be prompted to do a authentication with the phone number associated, alternatively you can put your `anon.session` in the root.
#### Atlos
When integrating with [Atlos](https://atlos.org), you will need to provide an API token in your configuration. You can learn more about Atlos and how to get an API token [here](https://docs.atlos.org/technical/api). You will have to provide this token to the `atlos_feeder`, `atlos_storage`, and `atlos_db` steps in your orchestration file. If you use a custom or self-hosted Atlos instance, you can also specify the `atlos_url` option to point to your custom instance's URL. For example:
```yaml
# orchestration.yaml content
steps:
feeder: atlos_feeder
archivers: # order matters
- youtubedl_archiver
enrichers:
- thumbnail_enricher
- hash_enricher
formatter: html_formatter
storages:
- atlos_storage
databases:
- console_db
- atlos_db
configurations:
atlos_feeder:
atlos_url: "https://platform.atlos.org" # optional
api_token: "...your API token..."
atlos_db:
atlos_url: "https://platform.atlos.org" # optional
api_token: "...your API token..."
atlos_storage:
atlos_url: "https://platform.atlos.org" # optional
api_token: "...your API token..."
hash_enricher:
algorithm: "SHA-256"
```
## Running on Google Sheets Feeder (gsheet_feeder)
The `--gsheet_feeder.sheet` property is the name of the Google Sheet to check for URLs.

View File

@@ -7,6 +7,7 @@ steps:
# - telegram_archiver
# - twitter_archiver
# - twitter_api_archiver
# - instagram_api_archiver
# - instagram_tbot_archiver
# - instagram_archiver
# - tiktok_archiver

View File

@@ -22,6 +22,7 @@ class InstagramAPIArchiver(Archiver):
super().__init__(config)
self.assert_valid_string("access_token")
self.assert_valid_string("api_endpoint")
self.full_profile_max_posts = int(self.full_profile_max_posts)
if self.api_endpoint[-1] == "/": self.api_endpoint = self.api_endpoint[:-1]
self.full_profile = bool(self.full_profile)
@@ -33,6 +34,7 @@ class InstagramAPIArchiver(Archiver):
"access_token": {"default": None, "help": "a valid instagrapi-api token"},
"api_endpoint": {"default": None, "help": "API endpoint to use"},
"full_profile": {"default": False, "help": "if true, will download all posts, tagged posts, stories, and highlights for a profile, if false, will only download the profile pic and information."},
"full_profile_max_posts": {"default": 0, "help": "Use to limit the number of posts to download when full_profile is true. 0 means no limit. limit is applied softly since posts are fetched in batch, once to: posts, tagged posts, and highlights"},
"minimize_json_output": {"default": True, "help": "if true, will remove empty values from the json output"},
}
@@ -102,23 +104,22 @@ class InstagramAPIArchiver(Archiver):
logger.error(f"Error downloading stories for {username}: {e}")
# download all posts
self.download_all_posts(result, user_id)
try:
self.download_all_posts(result, user_id)
except Exception as e:
result.append("errors", f"Error downloading posts for {username}")
logger.error(f"Error downloading posts for {username}: {e}")
# download all tagged
self.download_all_tagged(result, user_id)
try:
self.download_all_tagged(result, user_id)
except Exception as e:
result.append("errors", f"Error downloading tagged posts for {username}")
logger.error(f"Error downloading tagged posts for {username}: {e}")
# download all highlights
try:
count_highlights = 0
highlights = self.call_api(f"v1/user/highlights", {"user_id": user_id})
for h in highlights:
try:
h_info = self._download_highlights_reusable(result, h.get("pk"))
count_highlights += len(h_info.get("items", []))
except Exception as e:
result.append("errors", f"Error downloading highlight id{h.get('pk')} for {username}")
logger.error(f"Error downloading highlight id{h.get('pk')} for {username}: {e}")
result.set("#highlights", count_highlights)
self.download_all_highlights(result, username, user_id)
except Exception as e:
result.append("errors", f"Error downloading highlights for {username}")
logger.error(f"Error downloading highlights for {username}: {e}")
@@ -127,6 +128,21 @@ class InstagramAPIArchiver(Archiver):
result.set_url(url) # reset as scrape_item modifies it
return result.success("insta profile")
def download_all_highlights(self, result, username, user_id):
count_highlights = 0
highlights = self.call_api(f"v1/user/highlights", {"user_id": user_id})
for h in highlights:
try:
h_info = self._download_highlights_reusable(result, h.get("pk"))
count_highlights += len(h_info.get("items", []))
except Exception as e:
result.append("errors", f"Error downloading highlight id{h.get('pk')} for {username}")
logger.error(f"Error downloading highlight id{h.get('pk')} for {username}: {e}")
if self.full_profile_max_posts and count_highlights >= self.full_profile_max_posts:
logger.info(f"HIGHLIGHTS reached full_profile_max_posts={self.full_profile_max_posts}")
break
result.set("#highlights", count_highlights)
def download_post(self, result: Metadata, code: str = None, id: str = None, context: str = None) -> Metadata:
if id:
post = self.call_api(f"v1/media/by/id", {"id": id})
@@ -170,12 +186,13 @@ class InstagramAPIArchiver(Archiver):
def download_stories(self, result: Metadata, username: str) -> Metadata:
now = datetime.now().strftime("%Y-%m-%d_%H-%M")
stories = self._download_stories_reusable(result, username)
if stories == []: return result.success("insta no story")
result.set_title(f"stories {username} at {now}").set("#stories", len(stories))
return result.success(f"insta stories {now}")
def _download_stories_reusable(self, result: Metadata, username: str) -> list[dict]:
stories = self.call_api(f"v1/user/stories/by/username", {"username": username})
assert stories, f"Stories for {username} not found"
if not stories or not len(stories): return []
stories = stories[::-1] # newest to oldest
for s in tqdm(stories, desc="downloading stories", unit="story"):
@@ -192,7 +209,7 @@ class InstagramAPIArchiver(Archiver):
post_count = 0
while end_cursor != "":
posts = self.call_api(f"v1/user/medias/chunk", {"user_id": user_id, "end_cursor": end_cursor})
if not len(posts): break
if not len(posts) or not type(posts) == list or len(posts) != 2: break
posts, end_cursor = posts[0], posts[1]
logger.info(f"parsing {len(posts)} posts, next {end_cursor=}")
@@ -203,6 +220,9 @@ class InstagramAPIArchiver(Archiver):
logger.error(f"Error downloading post, skipping {p.get('id')}: {e}")
pbar.update(1)
post_count+=1
if self.full_profile_max_posts and post_count >= self.full_profile_max_posts:
logger.info(f"POSTS reached full_profile_max_posts={self.full_profile_max_posts}")
break
result.set("#posts", post_count)
def download_all_tagged(self, result: Metadata, user_id: str):
@@ -225,6 +245,9 @@ class InstagramAPIArchiver(Archiver):
logger.error(f"Error downloading tagged post, skipping {p.get('id')}: {e}")
pbar.update(1)
tagged_count+=1
if self.full_profile_max_posts and tagged_count >= self.full_profile_max_posts:
logger.info(f"TAGS reached full_profile_max_posts={self.full_profile_max_posts}")
break
result.set("#tagged", tagged_count)
@@ -243,8 +266,8 @@ class InstagramAPIArchiver(Archiver):
if self.minimize_json_output:
del item["clips_metadata"]
if code := item.get("code"):
result.set("url", f"https://www.instagram.com/p/{code}/")
if code := item.get("code") and not result.get("url"):
result.set_url(f"https://www.instagram.com/p/{code}/")
resources = item.get("resources", item.get("carousel_media", []))
item, media, media_id = self.scrape_media(item, context)

View File

@@ -42,7 +42,7 @@ class InstagramTbotArchiver(Archiver):
# make a copy of the session that is used exclusively with this archiver instance
new_session_file = os.path.join("secrets/", f"instabot-{time.strftime('%Y-%m-%d')}{random_str(8)}.session")
shutil.copy(self.session_file + ".session", new_session_file)
self.session_file = new_session_file
self.session_file = new_session_file.replace(".session", "")
try:
self.client = TelegramClient(self.session_file, self.api_id, self.api_hash)
@@ -54,8 +54,9 @@ class InstagramTbotArchiver(Archiver):
def cleanup(self) -> None:
logger.info(f"CLEANUP {self.name}.")
if os.path.exists(self.session_file):
os.remove(self.session_file)
session_file_name = self.session_file + ".session"
if os.path.exists(session_file_name):
os.remove(session_file_name)
def download(self, item: Metadata) -> Metadata:
url = item.get_url()

View File

@@ -49,7 +49,7 @@ class TelethonArchiver(Archiver):
# make a copy of the session that is used exclusively with this archiver instance
new_session_file = os.path.join("secrets/", f"telethon-{time.strftime('%Y-%m-%d')}{random_str(8)}.session")
shutil.copy(self.session_file + ".session", new_session_file)
self.session_file = new_session_file
self.session_file = new_session_file.replace(".session", "")
# initiate the client
self.client = TelegramClient(self.session_file, self.api_id, self.api_hash)
@@ -101,8 +101,9 @@ class TelethonArchiver(Archiver):
def cleanup(self) -> None:
logger.info(f"CLEANUP {self.name}.")
if os.path.exists(self.session_file):
os.remove(self.session_file)
session_file_name = self.session_file + ".session"
if os.path.exists(session_file_name):
os.remove(session_file_name)
def download(self, item: Metadata) -> Metadata:
"""

View File

@@ -2,6 +2,8 @@ import re, requests, mimetypes, json
from datetime import datetime
from loguru import logger
from snscrape.modules.twitter import TwitterTweetScraper, Video, Gif, Photo
from yt_dlp import YoutubeDL
from yt_dlp.extractor.twitter import TwitterIE
from slugify import slugify
from . import Archiver
@@ -98,7 +100,9 @@ class TwitterArchiver(Archiver):
hack_url = f"https://cdn.syndication.twimg.com/tweet-result?id={tweet_id}"
r = requests.get(hack_url)
if r.status_code != 200: return False
if r.status_code != 200 or r.json()=={}:
logger.warning(f"Failed to get tweet information from {hack_url}, trying ytdl")
return self.download_ytdl(item, url, tweet_id)
tweet = r.json()
urls = []
@@ -108,7 +112,7 @@ class TwitterArchiver(Archiver):
# 1 tweet has 1 video max
if "video" in tweet:
v = tweet["video"]
urls.append(self.choose_variant(v.get("variants", [])))
urls.append(self.choose_variant(v.get("variants", []))['url'])
logger.debug(f"Twitter hack got {urls=}")
@@ -125,6 +129,38 @@ class TwitterArchiver(Archiver):
result.set_title(tweet.get("text")).set_content(json.dumps(tweet, ensure_ascii=False)).set_timestamp(datetime.strptime(tweet["created_at"], "%Y-%m-%dT%H:%M:%S.%fZ"))
return result.success("twitter-hack")
def download_ytdl(self, item: Metadata, url:str, tweet_id:str) -> Metadata:
downloader = YoutubeDL()
tie = TwitterIE(downloader)
tweet = tie._extract_status(tweet_id)
result = Metadata()
result\
.set_title(tweet.get('full_text', ''))\
.set_content(json.dumps(tweet, ensure_ascii=False))\
.set_timestamp(datetime.strptime(tweet["created_at"], "%a %b %d %H:%M:%S %z %Y"))
if not tweet.get("entities", {}).get("media"):
logger.debug('No media found, archiving tweet text only')
return result
for i, tw_media in enumerate(tweet["entities"]["media"]):
media = Media(filename="")
mimetype = ""
if tw_media["type"] == "photo":
media.set("src", UrlUtil.twitter_best_quality_url(tw_media['media_url_https']))
mimetype = "image/jpeg"
elif tw_media["type"] == "video":
variant = self.choose_variant(tw_media['video_info']['variants'])
media.set("src", variant['url'])
mimetype = variant['content_type']
elif tw_media["type"] == "animated_gif":
variant = tw_media['video_info']['variants'][0]
media.set("src", variant['url'])
mimetype = variant['content_type']
ext = mimetypes.guess_extension(mimetype)
media.filename = self.download_from_url(media.get("src"), f'{slugify(url)}_{i}{ext}', item)
result.add_media(media)
return result.success("twitter-ytdl")
def get_username_tweet_id(self, url):
# detect URLs that we definitely cannot handle
@@ -140,13 +176,13 @@ class TwitterArchiver(Archiver):
# choosing the highest quality possible
variant, width, height = None, 0, 0
for var in variants:
if var.get("type", "") == "video/mp4":
width_height = re.search(r"\/(\d+)x(\d+)\/", var["src"])
if var.get("content_type", "") == "video/mp4":
width_height = re.search(r"\/(\d+)x(\d+)\/", var["url"])
if width_height:
w, h = int(width_height[1]), int(width_height[2])
if w > width or h > height:
width, height = w, h
variant = var.get("src", variant)
variant = var
else:
variant = var.get("src") if not variant else variant
variant = var if not variant else variant
return variant

View File

@@ -15,6 +15,8 @@ class YoutubeDLArchiver(Archiver):
self.livestreams = bool(self.livestreams)
self.live_from_start = bool(self.live_from_start)
self.end_means_success = bool(self.end_means_success)
self.allow_playlist = bool(self.allow_playlist)
self.max_downloads = self.max_downloads
@staticmethod
def configs() -> dict:
@@ -26,6 +28,8 @@ class YoutubeDLArchiver(Archiver):
"live_from_start": {"default": False, "help": "if set, will download live streams from their earliest available moment, otherwise starts now."},
"proxy": {"default": "", "help": "http/socks (https seems to not work atm) proxy to use for the webdriver, eg https://proxy-user:password@proxy-ip:port"},
"end_means_success": {"default": True, "help": "if True, any archived content will mean a 'success', if False this archiver will not return a 'success' stage; this is useful for cases when the yt-dlp will archive a video but ignore other types of content like images or text only pages that the subsequent archivers can retrieve."},
'allow_playlist': {"default": False, "help": "If True will also download playlists, set to False if the expectation is to download a single video."},
"max_downloads": {"default": "inf", "help": "Use to limit the number of videos to download when a channel or long page is being extracted. 'inf' means no limit."},
}
def download(self, item: Metadata) -> Metadata:
@@ -35,7 +39,7 @@ class YoutubeDLArchiver(Archiver):
logger.debug('Using Facebook cookie')
yt_dlp.utils.std_headers['cookie'] = self.facebook_cookie
ydl_options = {'outtmpl': os.path.join(ArchivingContext.get_tmp_dir(), f'%(id)s.%(ext)s'), 'quiet': False, 'noplaylist': True, 'writesubtitles': self.subtitles, 'writeautomaticsub': self.subtitles, "live_from_start": self.live_from_start, "proxy": self.proxy}
ydl_options = {'outtmpl': os.path.join(ArchivingContext.get_tmp_dir(), f'%(id)s.%(ext)s'), 'quiet': False, 'noplaylist': not self.allow_playlist , 'writesubtitles': self.subtitles, 'writeautomaticsub': self.subtitles, "live_from_start": self.live_from_start, "proxy": self.proxy, "max_downloads": self.max_downloads, "playlistend": self.max_downloads}
ydl = yt_dlp.YoutubeDL(ydl_options) # allsubtitles and subtitleslangs not working as expected, so default lang is always "en"
try:
@@ -52,7 +56,8 @@ class YoutubeDLArchiver(Archiver):
return False
# this time download
ydl = yt_dlp.YoutubeDL({**ydl_options, "getcomments": self.comments})
ydl = yt_dlp.YoutubeDL({**ydl_options, "getcomments": self.comments})
#TODO: for playlist or long lists of videos, how to download one at a time so they can be stored before the next one is downloaded?
info = ydl.extract_info(url, download=True)
if "entries" in info:

View File

@@ -25,10 +25,11 @@ class Media:
_mimetype: str = None # eg: image/jpeg
_stored: bool = field(default=False, repr=False, metadata=config(exclude=lambda _: True)) # always exclude
def store(self: Media, override_storages: List = None, url: str = "url-not-available"):
# stores the media into the provided/available storages [Storage]
# repeats the process for its properties, in case they have inner media themselves
# for now it only goes down 1 level but it's easy to make it recursive if needed
def store(self: Media, override_storages: List = None, url: str = "url-not-available", metadata: Any = None):
# 'Any' typing for metadata to avoid circular imports. Stores the media
# into the provided/available storages [Storage] repeats the process for
# its properties, in case they have inner media themselves for now it
# only goes down 1 level but it's easy to make it recursive if needed.
storages = override_storages or ArchivingContext.get("storages")
if not len(storages):
logger.warning(f"No storages found in local context or provided directly for {self.filename}.")
@@ -36,7 +37,7 @@ class Media:
for s in storages:
for any_media in self.all_inner_media(include_self=True):
s.store(any_media, url)
s.store(any_media, url, metadata=metadata)
def all_inner_media(self, include_self=False):
""" Media can be inside media properties, examples include transformations on original media.

View File

@@ -48,7 +48,7 @@ class Metadata:
self.remove_duplicate_media_by_hash()
storages = override_storages or ArchivingContext.get("storages")
for media in self.media:
media.store(override_storages=storages, url=self.get_url())
media.store(override_storages=storages, url=self.get_url(), metadata=self)
def set(self, key: str, val: Any) -> Metadata:
self.metadata[key] = val

View File

@@ -1,5 +1,7 @@
from __future__ import annotations
from typing import Generator, Union, List
from urllib.parse import urlparse
from ipaddress import ip_address
from .context import ArchivingContext
@@ -26,7 +28,7 @@ class ArchivingOrchestrator:
ArchivingContext.set("storages", self.storages, keep_on_reset=True)
try:
for a in self.archivers: a.setup()
for a in self.all_archivers_for_setup(): a.setup()
except (KeyboardInterrupt, Exception) as e:
logger.error(f"Error during setup of archivers: {e}\n{traceback.format_exc()}")
self.cleanup()
@@ -34,7 +36,7 @@ class ArchivingOrchestrator:
def cleanup(self)->None:
logger.info("Cleaning up")
for a in self.archivers: a.cleanup()
for a in self.all_archivers_for_setup(): a.cleanup()
def feed(self) -> Generator[Metadata]:
for item in self.feeder:
@@ -60,7 +62,9 @@ class ArchivingOrchestrator:
exit()
except Exception as e:
logger.error(f'Got unexpected error on item {item}: {e}\n{traceback.format_exc()}')
for d in self.databases: d.failed(item)
for d in self.databases:
if type(e) == AssertionError: d.failed(item, str(e))
else: d.failed(item)
def archive(self, result: Metadata) -> Union[Metadata, None]:
@@ -73,7 +77,8 @@ class ArchivingOrchestrator:
5. Store all downloaded/generated media
6. Call selected Formatter and store formatted if needed
"""
original_url = result.get_url()
original_url = result.get_url().strip()
self.assert_valid_url(original_url)
# 1 - sanitize - each archiver is responsible for cleaning/expanding its own URLs
url = original_url
@@ -115,7 +120,7 @@ class ArchivingOrchestrator:
# 6 - format and store formatted if needed
if (final_media := self.formatter.format(result)):
final_media.store(url=url)
final_media.store(url=url, metadata=result)
result.set_final_media(final_media)
if result.is_empty():
@@ -128,3 +133,26 @@ class ArchivingOrchestrator:
logger.error(f"ERROR database {d.name}: {e}: {traceback.format_exc()}")
return result
def assert_valid_url(self, url: str) -> bool:
"""
Blocks localhost, private, reserved, and link-local IPs and all non-http/https schemes.
"""
assert url.startswith("http://") or url.startswith("https://"), f"Invalid URL scheme"
parsed = urlparse(url)
assert parsed.scheme in ["http", "https"], f"Invalid URL scheme"
assert parsed.hostname, f"Invalid URL hostname"
assert parsed.hostname != "localhost", f"Invalid URL"
try: # special rules for IP addresses
ip = ip_address(parsed.hostname)
except ValueError: pass
else:
assert ip.is_global, f"Invalid IP used"
assert not ip.is_reserved, f"Invalid IP used"
assert not ip.is_link_local, f"Invalid IP used"
assert not ip.is_private, f"Invalid IP used"
def all_archivers_for_setup(self) -> List[Archiver]:
return self.archivers + [e for e in self.enrichers if isinstance(e, Archiver)]

View File

@@ -2,4 +2,5 @@ from .database import Database
from .gsheet_db import GsheetsDb
from .console_db import ConsoleDb
from .csv_db import CSVDb
from .api_db import AAApiDb
from .api_db import AAApiDb
from .atlos_db import AtlosDb

View File

@@ -0,0 +1,79 @@
import os
from typing import Union
from loguru import logger
from csv import DictWriter
from dataclasses import asdict
import requests
from . import Database
from ..core import Metadata
from ..utils import get_atlos_config_options
class AtlosDb(Database):
"""
Outputs results to Atlos
"""
name = "atlos_db"
def __init__(self, config: dict) -> None:
# without this STEP.__init__ is not called
super().__init__(config)
@staticmethod
def configs() -> dict:
return get_atlos_config_options()
def failed(self, item: Metadata, reason: str) -> None:
"""Update DB accordingly for failure"""
# If the item has no Atlos ID, there's nothing for us to do
if not item.metadata.get("atlos_id"):
logger.info(f"Item {item.get_url()} has no Atlos ID, skipping")
return
requests.post(
f"{self.atlos_url}/api/v2/source_material/metadata/{item.metadata['atlos_id']}/auto_archiver",
headers={"Authorization": f"Bearer {self.api_token}"},
json={"metadata": {"processed": True, "status": "error", "error": reason}},
).raise_for_status()
logger.info(
f"Stored failure for {item.get_url()} (ID {item.metadata['atlos_id']}) on Atlos: {reason}"
)
def fetch(self, item: Metadata) -> Union[Metadata, bool]:
"""check and fetch if the given item has been archived already, each
database should handle its own caching, and configuration mechanisms"""
return False
def _process_metadata(self, item: Metadata) -> dict:
"""Process metadata for storage on Atlos. Will convert any datetime
objects to ISO format."""
return {
k: v.isoformat() if hasattr(v, "isoformat") else v
for k, v in item.metadata.items()
}
def done(self, item: Metadata, cached: bool = False) -> None:
"""archival result ready - should be saved to DB"""
if not item.metadata.get("atlos_id"):
logger.info(f"Item {item.get_url()} has no Atlos ID, skipping")
return
requests.post(
f"{self.atlos_url}/api/v2/source_material/metadata/{item.metadata['atlos_id']}/auto_archiver",
headers={"Authorization": f"Bearer {self.api_token}"},
json={
"metadata": dict(
processed=True,
status="success",
results=self._process_metadata(item),
)
},
).raise_for_status()
logger.info(
f"Stored success for {item.get_url()} (ID {item.metadata['atlos_id']}) on Atlos"
)

View File

@@ -21,8 +21,8 @@ class ConsoleDb(Database):
def started(self, item: Metadata) -> None:
logger.warning(f"STARTED {item}")
def failed(self, item: Metadata) -> None:
logger.error(f"FAILED {item}")
def failed(self, item: Metadata, reason:str) -> None:
logger.error(f"FAILED {item}: {reason}")
def aborted(self, item: Metadata) -> None:
logger.warning(f"ABORTED {item}")

View File

@@ -22,7 +22,7 @@ class Database(Step, ABC):
"""signals the DB that the given item archival has started"""
pass
def failed(self, item: Metadata) -> None:
def failed(self, item: Metadata, reason:str) -> None:
"""update DB accordingly for failure"""
pass

View File

@@ -29,9 +29,9 @@ class GsheetsDb(Database):
gw, row = self._retrieve_gsheet(item)
gw.set_cell(row, 'status', 'Archive in progress')
def failed(self, item: Metadata) -> None:
def failed(self, item: Metadata, reason:str) -> None:
logger.error(f"FAILED {item}")
self._safe_status_update(item, 'Archive failed')
self._safe_status_update(item, f'Archive failed {reason}')
def aborted(self, item: Metadata) -> None:
logger.warning(f"ABORTED {item}")
@@ -102,6 +102,11 @@ class GsheetsDb(Database):
def _retrieve_gsheet(self, item: Metadata) -> Tuple[GWorksheet, int]:
# TODO: to make gsheet_db less coupled with gsheet_feeder's "gsheet" parameter, this method could 1st try to fetch "gsheet" from ArchivingContext and, if missing, manage its own singleton - not needed for now
gw: GWorksheet = ArchivingContext.get("gsheet").get("worksheet")
row: int = ArchivingContext.get("gsheet").get("row")
if gsheet := ArchivingContext.get("gsheet"):
gw: GWorksheet = gsheet.get("worksheet")
row: int = gsheet.get("row")
elif self.sheet_id:
print(self.sheet_id)
return gw, row

View File

@@ -27,7 +27,10 @@ class SSLEnricher(Enricher):
if not to_enrich.media and self.skip_when_nothing_archived: return
url = to_enrich.get_url()
domain = urlparse(url).netloc
parsed = urlparse(url)
assert parsed.scheme in ["https"], f"Invalid URL scheme {url=}"
domain = parsed.netloc
logger.debug(f"fetching SSL certificate for {domain=} in {url=}")
cert = ssl.get_server_certificate((domain, 443))

View File

@@ -75,14 +75,16 @@ class WaczArchiverEnricher(Enricher, Archiver):
"--url", url,
"--scopeType", "page",
"--generateWACZ",
"--text",
"--text", "to-pages",
"--screenshot", "fullPage",
"--collection", collection,
"--id", collection,
"--saveState", "never",
"--behaviors", "autoscroll,autoplay,autofetch,siteSpecific",
"--behaviorTimeout", str(self.timeout),
"--timeout", str(self.timeout)]
"--timeout", str(self.timeout),
"--blockAds" # TODO: test
]
if self.docker_in_docker:
cmd.extend(["--cwd", self.cwd_dind])
@@ -110,9 +112,9 @@ class WaczArchiverEnricher(Enricher, Archiver):
try:
logger.info(f"Running browsertrix-crawler: {' '.join(cmd)}")
my_env = os.environ.copy()
if self.socks_proxy_host and self.socks_proxy_port:
logger.debug("Using SOCKS proxy for browsertrix-crawler")
my_env = os.environ.copy()
my_env["SOCKS_HOST"] = self.socks_proxy_host
my_env["SOCKS_PORT"] = str(self.socks_proxy_port)
subprocess.run(cmd, check=True, env=my_env)
@@ -161,7 +163,7 @@ class WaczArchiverEnricher(Enricher, Archiver):
"""
Receives a .wacz archive, and extracts all relevant media from it, adding them to to_enrich.
"""
logger.info(f"WACZ extract_media flag is set, extracting media from {wacz_filename=}")
logger.info(f"WACZ extract_media or extract_screenshot flag is set, extracting media from {wacz_filename=}")
# unzipping the .wacz
tmp_dir = ArchivingContext.get_tmp_dir()
@@ -182,10 +184,11 @@ class WaczArchiverEnricher(Enricher, Archiver):
# get media out of .warc
counter = 0
seen_urls = set()
import json
with open(warc_filename, 'rb') as warc_stream:
for record in ArchiveIterator(warc_stream):
# only include fetched resources
if record.rec_type == "resource" and self.extract_screenshot: # screenshots
if record.rec_type == "resource" and record.content_type == "image/png" and self.extract_screenshot: # screenshots
fn = os.path.join(tmp_dir, f"warc-file-{counter}.png")
with open(fn, "wb") as outf: outf.write(record.raw_stream.read())
m = Media(filename=fn)
@@ -231,4 +234,4 @@ class WaczArchiverEnricher(Enricher, Archiver):
to_enrich.add_media(m, warc_fn)
counter += 1
seen_urls.add(record_url)
logger.info(f"WACZ extract_media finished, found {counter} relevant media file(s)")
logger.info(f"WACZ extract_media/extract_screenshot finished, found {counter} relevant media file(s)")

View File

@@ -44,7 +44,7 @@ class WhisperEnricher(Enricher):
job_results = {}
for i, m in enumerate(to_enrich.media):
if m.is_video() or m.is_audio():
m.store(url=url)
m.store(url=url, metadata=to_enrich)
try:
job_id = self.submit_job(m)
job_results[job_id] = False

View File

@@ -1,3 +1,4 @@
from.feeder import Feeder
from .gsheet_feeder import GsheetsFeeder
from .cli_feeder import CLIFeeder
from .cli_feeder import CLIFeeder
from .atlos_feeder import AtlosFeeder

View File

@@ -0,0 +1,56 @@
from loguru import logger
import requests
from . import Feeder
from ..core import Metadata, ArchivingContext
from ..utils import get_atlos_config_options
class AtlosFeeder(Feeder):
name = "atlos_feeder"
def __init__(self, config: dict) -> None:
# without this STEP.__init__ is not called
super().__init__(config)
if type(self.api_token) != str:
raise Exception("Atlos Feeder did not receive an Atlos API token")
@staticmethod
def configs() -> dict:
return get_atlos_config_options()
def __iter__(self) -> Metadata:
# Get all the urls from the Atlos API
count = 0
cursor = None
while True:
response = requests.get(
f"{self.atlos_url}/api/v2/source_material",
headers={"Authorization": f"Bearer {self.api_token}"},
params={"cursor": cursor},
)
data = response.json()
response.raise_for_status()
cursor = data["next"]
for item in data["results"]:
if (
item["source_url"] not in [None, ""]
and (
item["metadata"]
.get("auto_archiver", {})
.get("processed", False)
!= True
)
and item["visibility"] == "visible"
and item["status"] not in ["processing", "pending"]
):
yield Metadata().set_url(item["source_url"]).set(
"atlos_id", item["id"]
)
count += 1
if len(data["results"]) == 0 or cursor is None:
break
logger.success(f"Processed {count} URL(s)")

View File

@@ -21,7 +21,7 @@ class HtmlFormatter(Formatter):
def __init__(self, config: dict) -> None:
# without this STEP.__init__ is not called
super().__init__(config)
self.environment = Environment(loader=FileSystemLoader(os.path.join(pathlib.Path(__file__).parent.resolve(), "templates/")))
self.environment = Environment(loader=FileSystemLoader(os.path.join(pathlib.Path(__file__).parent.resolve(), "templates/")), autoescape=True)
# JinjaHelper class static methods are added as filters
self.environment.filters.update({
k: v.__func__ for k, v in JinjaHelpers.__dict__.items() if isinstance(v, staticmethod)

View File

@@ -1,4 +1,5 @@
from .storage import Storage
from .s3 import S3Storage
from .local import LocalStorage
from .gd import GDriveStorage
from .gd import GDriveStorage
from .atlos import AtlosStorage

View File

@@ -0,0 +1,74 @@
import os
from typing import IO, List, Optional
from loguru import logger
import requests
import hashlib
from ..core import Media, Metadata
from ..storages import Storage
from ..utils import get_atlos_config_options
class AtlosStorage(Storage):
name = "atlos_storage"
def __init__(self, config: dict) -> None:
super().__init__(config)
@staticmethod
def configs() -> dict:
return dict(Storage.configs(), **get_atlos_config_options())
def get_cdn_url(self, _media: Media) -> str:
# It's not always possible to provide an exact URL, because it's
# possible that the media once uploaded could have been copied to
# another project.
return self.atlos_url
def _hash(self, media: Media) -> str:
# Hash the media file using sha-256. We don't use the existing auto archiver
# hash because there's no guarantee that the configuerer is using sha-256, which
# is how Atlos hashes files.
sha256 = hashlib.sha256()
with open(media.filename, "rb") as f:
while True:
buf = f.read(4096)
if not buf: break
sha256.update(buf)
return sha256.hexdigest()
def upload(self, media: Media, metadata: Optional[Metadata]=None, **_kwargs) -> bool:
atlos_id = metadata.get("atlos_id")
if atlos_id is None:
logger.error(f"No Atlos ID found in metadata; can't store {media.filename} on Atlos")
return False
media_hash = self._hash(media)
# Check whether the media has already been uploaded
source_material = requests.get(
f"{self.atlos_url}/api/v2/source_material/{atlos_id}",
headers={"Authorization": f"Bearer {self.api_token}"},
).json()["result"]
existing_media = [x["file_hash_sha256"] for x in source_material.get("artifacts", [])]
if media_hash in existing_media:
logger.info(f"{media.filename} with SHA256 {media_hash} already uploaded to Atlos")
return True
# Upload the media to the Atlos API
requests.post(
f"{self.atlos_url}/api/v2/source_material/upload/{atlos_id}",
headers={"Authorization": f"Bearer {self.api_token}"},
params={
"title": media.properties
},
files={"file": (os.path.basename(media.filename), open(media.filename, "rb"))},
).raise_for_status()
logger.info(f"Uploaded {media.filename} to Atlos with ID {atlos_id} and title {media.key}")
return True
# must be implemented even if unused
def uploadf(self, file: IO[bytes], key: str, **kwargs: dict) -> bool: pass

View File

@@ -1,12 +1,12 @@
from __future__ import annotations
from abc import abstractmethod
from dataclasses import dataclass
from typing import IO
from typing import IO, Optional
import os
from ..utils.misc import random_str
from ..core import Media, Step, ArchivingContext
from ..core import Media, Step, ArchivingContext, Metadata
from ..enrichers import HashEnricher
from loguru import logger
from slugify import slugify
@@ -43,12 +43,12 @@ class Storage(Step):
# only for typing...
return Step.init(name, config, Storage)
def store(self, media: Media, url: str) -> None:
def store(self, media: Media, url: str, metadata: Optional[Metadata]=None) -> None:
if media.is_stored():
logger.debug(f"{media.key} already stored, skipping")
return
self.set_key(media, url)
self.upload(media)
self.upload(media, metadata=metadata)
media.add_url(self.get_cdn_url(media))
@abstractmethod

View File

@@ -3,4 +3,5 @@ from .gworksheet import GWorksheet
from .misc import *
from .webdriver import Webdriver
from .gsheet import Gsheets
from .url import UrlUtil
from .url import UrlUtil
from .atlos import get_atlos_config_options

View File

@@ -0,0 +1,13 @@
def get_atlos_config_options():
return {
"api_token": {
"default": None,
"help": "An Atlos API token. For more information, see https://docs.atlos.org/technical/api/",
"cli_set": lambda cli_val, _: cli_val
},
"atlos_url": {
"default": "https://platform.atlos.org",
"help": "The URL of your Atlos instance (e.g., https://platform.atlos.org), without a trailing slash.",
"cli_set": lambda cli_val, _: cli_val
},
}

View File

@@ -1,9 +1,9 @@
_MAJOR = "0"
_MINOR = "9"
_MINOR = "11"
# On main and in a nightly release the patch should be one ahead of the last
# released build.
_PATCH = "8"
_PATCH = "3"
# This is mainly for nightly builds which have the suffix ".dev$DATE". See
# https://semver.org/#is-v123-a-semantic-version for the semantics.
_SUFFIX = ""