Compare commits

...

22 Commits

Author SHA1 Message Date
R. Miles McCain
f603400d0d Add direct Atlos integration (#137)
* Add Atlos feeder

* Add Atlos db

* Add Atlos storage

* Fix Atlos storages

* Fix Atlos feeder

* Only include URLs in Atlos feeder once they're processed

* Remove print

* Add Atlos documentation to README

* Formatting fixes

* Don't archive existing material

* avoid KeyError in atlos_db

* version bump

---------

Co-authored-by: msramalho <19508417+msramalho@users.noreply.github.com>
2024-04-15 19:25:17 +01:00
msramalho
eb37f0b45b version bump 2024-04-15 19:02:54 +01:00
msramalho
75497f5773 minor bug fix when using an archiver_enricher in enrichers only 2024-04-15 19:02:40 +01:00
msramalho
623e555713 dependencies updates 2024-04-15 19:02:20 +01:00
msramalho
9c7824de57 browsertrix docker updates 2024-04-15 19:01:55 +01:00
msramalho
f4827770e6 adds instagram no stories as success, and fix for telethon-based archivers. 2024-03-05 14:49:10 +00:00
msramalho
601572d76e strip url 2024-02-29 11:54:01 +00:00
msramalho
d21e79a272 general security updates 2024-02-29 11:40:30 +00:00
msramalho
ccf5f857ef adds configurable limits to instagram/youtube 2024-02-25 15:14:17 +00:00
msramalho
7de317d1b5 avoiding exception 2024-02-23 15:54:33 +00:00
msramalho
70075a1e5e improving insta archiver 2024-02-23 15:37:28 +00:00
msramalho
5b9bc4919a version bump 2024-02-23 14:08:23 +00:00
msramalho
f0158ffd9c adds tagged posts and better parsing 2024-02-23 14:08:17 +00:00
msramalho
bfb35a43a9 adds more details from yt-dlp 2024-02-23 14:08:05 +00:00
msramalho
ef5b39c4f1 dind exception 2024-02-22 18:05:56 +00:00
msramalho
24ceafcb64 missing forward slash 2024-02-22 17:47:13 +00:00
msramalho
9fd4bb56a8 new attempt at dind wacz 2024-02-22 17:24:27 +00:00
msramalho
5324d562ba cleanup wacz patch 2024-02-21 18:14:30 +00:00
msramalho
5bf0a0206d version update 2024-02-21 17:26:07 +00:00
msramalho
4941823565 fix growing volume size in wacz_enricher 2024-02-21 17:25:55 +00:00
msramalho
27310c2911 fixes issue with api requests 2024-02-21 12:25:05 +00:00
msramalho
eb973ba42d v0.9.1 fixes to bad parsing in ssl certificates 2024-02-20 19:31:19 +00:00
31 changed files with 945 additions and 460 deletions

3
.gitignore vendored
View File

@@ -27,4 +27,5 @@ instaloader.session
orchestration.yaml orchestration.yaml
auto_archiver.egg-info* auto_archiver.egg-info*
logs* logs*
*.csv *.csv
archived/

View File

@@ -1,4 +1,4 @@
FROM webrecorder/browsertrix-crawler:latest FROM webrecorder/browsertrix-crawler:1.0.4
ENV RUNNING_IN_DOCKER=1 ENV RUNNING_IN_DOCKER=1
@@ -19,9 +19,8 @@ RUN pip install --upgrade pip && \
COPY Pipfile* ./ COPY Pipfile* ./
# install from pipenv, with browsertrix-only requirements # install from pipenv, with browsertrix-only requirements
RUN pipenv install && \ RUN pipenv install
pipenv install pywb uwsgi
# doing this at the end helps during development, builds are quick # doing this at the end helps during development, builds are quick
COPY ./src/ . COPY ./src/ .

828
Pipfile.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -177,6 +177,38 @@ To use Google Drive storage you need the id of the shared folder in the `config.
#### Telethon + Instagram with telegram bot #### Telethon + Instagram with telegram bot
The first time you run, you will be prompted to do a authentication with the phone number associated, alternatively you can put your `anon.session` in the root. The first time you run, you will be prompted to do a authentication with the phone number associated, alternatively you can put your `anon.session` in the root.
#### Atlos
When integrating with [Atlos](https://atlos.org), you will need to provide an API token in your configuration. You can learn more about Atlos and how to get an API token [here](https://docs.atlos.org/technical/api). You will have to provide this token to the `atlos_feeder`, `atlos_storage`, and `atlos_db` steps in your orchestration file. If you use a custom or self-hosted Atlos instance, you can also specify the `atlos_url` option to point to your custom instance's URL. For example:
```yaml
# orchestration.yaml content
steps:
feeder: atlos_feeder
archivers: # order matters
- youtubedl_archiver
enrichers:
- thumbnail_enricher
- hash_enricher
formatter: html_formatter
storages:
- atlos_storage
databases:
- console_db
- atlos_db
configurations:
atlos_feeder:
atlos_url: "https://platform.atlos.org" # optional
api_token: "...your API token..."
atlos_db:
atlos_url: "https://platform.atlos.org" # optional
api_token: "...your API token..."
atlos_storage:
atlos_url: "https://platform.atlos.org" # optional
api_token: "...your API token..."
hash_enricher:
algorithm: "SHA-256"
```
## Running on Google Sheets Feeder (gsheet_feeder) ## Running on Google Sheets Feeder (gsheet_feeder)
The `--gsheet_feeder.sheet` property is the name of the Google Sheet to check for URLs. The `--gsheet_feeder.sheet` property is the name of the Google Sheet to check for URLs.

View File

@@ -7,6 +7,7 @@ steps:
# - telegram_archiver # - telegram_archiver
# - twitter_archiver # - twitter_archiver
# - twitter_api_archiver # - twitter_api_archiver
# - instagram_api_archiver
# - instagram_tbot_archiver # - instagram_tbot_archiver
# - instagram_archiver # - instagram_archiver
# - tiktok_archiver # - tiktok_archiver

View File

@@ -22,6 +22,7 @@ class InstagramAPIArchiver(Archiver):
super().__init__(config) super().__init__(config)
self.assert_valid_string("access_token") self.assert_valid_string("access_token")
self.assert_valid_string("api_endpoint") self.assert_valid_string("api_endpoint")
self.full_profile_max_posts = int(self.full_profile_max_posts)
if self.api_endpoint[-1] == "/": self.api_endpoint = self.api_endpoint[:-1] if self.api_endpoint[-1] == "/": self.api_endpoint = self.api_endpoint[:-1]
self.full_profile = bool(self.full_profile) self.full_profile = bool(self.full_profile)
@@ -33,6 +34,7 @@ class InstagramAPIArchiver(Archiver):
"access_token": {"default": None, "help": "a valid instagrapi-api token"}, "access_token": {"default": None, "help": "a valid instagrapi-api token"},
"api_endpoint": {"default": None, "help": "API endpoint to use"}, "api_endpoint": {"default": None, "help": "API endpoint to use"},
"full_profile": {"default": False, "help": "if true, will download all posts, tagged posts, stories, and highlights for a profile, if false, will only download the profile pic and information."}, "full_profile": {"default": False, "help": "if true, will download all posts, tagged posts, stories, and highlights for a profile, if false, will only download the profile pic and information."},
"full_profile_max_posts": {"default": 0, "help": "Use to limit the number of posts to download when full_profile is true. 0 means no limit. limit is applied softly since posts are fetched in batch, once to: posts, tagged posts, and highlights"},
"minimize_json_output": {"default": True, "help": "if true, will remove empty values from the json output"}, "minimize_json_output": {"default": True, "help": "if true, will remove empty values from the json output"},
} }
@@ -73,9 +75,9 @@ class InstagramAPIArchiver(Archiver):
if type(d) == list: return [self.cleanup_dict(v) for v in d] if type(d) == list: return [self.cleanup_dict(v) for v in d]
if type(d) != dict: return d if type(d) != dict: return d
return { return {
k: self.cleanup_dict(v) if type(v) in [dict, list] else v k: clean_v
for k, v in d.items() for k, v in d.items()
if v not in [0.0, 0, [], {}, "", None, "null"] and if (clean_v := self.cleanup_dict(v)) not in [0.0, 0, [], {}, "", None, "null"] and
k not in ["x", "y", "width", "height"] k not in ["x", "y", "width", "height"]
} }
@@ -93,9 +95,6 @@ class InstagramAPIArchiver(Archiver):
if self.full_profile: if self.full_profile:
user_id = user.get("pk") user_id = user.get("pk")
# download all posts
self.download_all_posts(result, user_id)
# download all stories # download all stories
try: try:
stories = self._download_stories_reusable(result, username) stories = self._download_stories_reusable(result, username)
@@ -104,25 +103,46 @@ class InstagramAPIArchiver(Archiver):
result.append("errors", f"Error downloading stories for {username}") result.append("errors", f"Error downloading stories for {username}")
logger.error(f"Error downloading stories for {username}: {e}") logger.error(f"Error downloading stories for {username}: {e}")
# download all posts
try:
self.download_all_posts(result, user_id)
except Exception as e:
result.append("errors", f"Error downloading posts for {username}")
logger.error(f"Error downloading posts for {username}: {e}")
# download all tagged
try:
self.download_all_tagged(result, user_id)
except Exception as e:
result.append("errors", f"Error downloading tagged posts for {username}")
logger.error(f"Error downloading tagged posts for {username}: {e}")
# download all highlights # download all highlights
try: try:
count_highlights = 0 self.download_all_highlights(result, username, user_id)
highlights = self.call_api(f"v1/user/highlights", {"user_id": user_id})
for h in highlights:
try:
h_info = self._download_highlights_reusable(result, h.get("pk"))
count_highlights += len(h_info.get("items", []))
except Exception as e:
result.append("errors", f"Error downloading highlight id{h.get('pk')} for {username}")
logger.error(f"Error downloading highlight id{h.get('pk')} for {username}: {e}")
result.set("#highlights", count_highlights)
except Exception as e: except Exception as e:
result.append("errors", f"Error downloading highlights for {username}") result.append("errors", f"Error downloading highlights for {username}")
logger.error(f"Error downloading highlights for {username}: {e}") logger.error(f"Error downloading highlights for {username}: {e}")
result.set_url(url) # reset as scrape_item modifies it result.set_url(url) # reset as scrape_item modifies it
return result.success("insta profile") return result.success("insta profile")
def download_all_highlights(self, result, username, user_id):
count_highlights = 0
highlights = self.call_api(f"v1/user/highlights", {"user_id": user_id})
for h in highlights:
try:
h_info = self._download_highlights_reusable(result, h.get("pk"))
count_highlights += len(h_info.get("items", []))
except Exception as e:
result.append("errors", f"Error downloading highlight id{h.get('pk')} for {username}")
logger.error(f"Error downloading highlight id{h.get('pk')} for {username}: {e}")
if self.full_profile_max_posts and count_highlights >= self.full_profile_max_posts:
logger.info(f"HIGHLIGHTS reached full_profile_max_posts={self.full_profile_max_posts}")
break
result.set("#highlights", count_highlights)
def download_post(self, result: Metadata, code: str = None, id: str = None, context: str = None) -> Metadata: def download_post(self, result: Metadata, code: str = None, id: str = None, context: str = None) -> Metadata:
if id: if id:
post = self.call_api(f"v1/media/by/id", {"id": id}) post = self.call_api(f"v1/media/by/id", {"id": id})
@@ -166,12 +186,13 @@ class InstagramAPIArchiver(Archiver):
def download_stories(self, result: Metadata, username: str) -> Metadata: def download_stories(self, result: Metadata, username: str) -> Metadata:
now = datetime.now().strftime("%Y-%m-%d_%H-%M") now = datetime.now().strftime("%Y-%m-%d_%H-%M")
stories = self._download_stories_reusable(result, username) stories = self._download_stories_reusable(result, username)
if stories == []: return result.success("insta no story")
result.set_title(f"stories {username} at {now}").set("#stories", len(stories)) result.set_title(f"stories {username} at {now}").set("#stories", len(stories))
return result.success(f"insta stories {now}") return result.success(f"insta stories {now}")
def _download_stories_reusable(self, result: Metadata, username: str) -> list[dict]: def _download_stories_reusable(self, result: Metadata, username: str) -> list[dict]:
stories = self.call_api(f"v1/user/stories/by/username", {"username": username}) stories = self.call_api(f"v1/user/stories/by/username", {"username": username})
assert stories, f"Stories for {username} not found" if not stories or not len(stories): return []
stories = stories[::-1] # newest to oldest stories = stories[::-1] # newest to oldest
for s in tqdm(stories, desc="downloading stories", unit="story"): for s in tqdm(stories, desc="downloading stories", unit="story"):
@@ -188,7 +209,7 @@ class InstagramAPIArchiver(Archiver):
post_count = 0 post_count = 0
while end_cursor != "": while end_cursor != "":
posts = self.call_api(f"v1/user/medias/chunk", {"user_id": user_id, "end_cursor": end_cursor}) posts = self.call_api(f"v1/user/medias/chunk", {"user_id": user_id, "end_cursor": end_cursor})
if not len(posts): break if not len(posts) or not type(posts) == list or len(posts) != 2: break
posts, end_cursor = posts[0], posts[1] posts, end_cursor = posts[0], posts[1]
logger.info(f"parsing {len(posts)} posts, next {end_cursor=}") logger.info(f"parsing {len(posts)} posts, next {end_cursor=}")
@@ -199,7 +220,35 @@ class InstagramAPIArchiver(Archiver):
logger.error(f"Error downloading post, skipping {p.get('id')}: {e}") logger.error(f"Error downloading post, skipping {p.get('id')}: {e}")
pbar.update(1) pbar.update(1)
post_count+=1 post_count+=1
if self.full_profile_max_posts and post_count >= self.full_profile_max_posts:
logger.info(f"POSTS reached full_profile_max_posts={self.full_profile_max_posts}")
break
result.set("#posts", post_count) result.set("#posts", post_count)
def download_all_tagged(self, result: Metadata, user_id: str):
next_page_id = ""
pbar = tqdm(desc="downloading tagged posts")
tagged_count = 0
while next_page_id != None:
resp = self.call_api(f"v2/user/tag/medias", {"user_id": user_id, "page_id": next_page_id})
posts = resp.get("response", {}).get("items", [])
if not len(posts): break
next_page_id = resp.get("next_page_id")
logger.info(f"parsing {len(posts)} tagged posts, next {next_page_id=}")
for p in posts:
try: self.scrape_item(result, p, "tagged")
except Exception as e:
result.append("errors", f"Error downloading tagged post {p.get('id')}")
logger.error(f"Error downloading tagged post, skipping {p.get('id')}: {e}")
pbar.update(1)
tagged_count+=1
if self.full_profile_max_posts and tagged_count >= self.full_profile_max_posts:
logger.info(f"TAGS reached full_profile_max_posts={self.full_profile_max_posts}")
break
result.set("#tagged", tagged_count)
### reusable parsing utils below ### reusable parsing utils below
@@ -217,10 +266,10 @@ class InstagramAPIArchiver(Archiver):
if self.minimize_json_output: if self.minimize_json_output:
del item["clips_metadata"] del item["clips_metadata"]
if code := item.get("code"): if code := item.get("code") and not result.get("url"):
result.set("url", f"https://www.instagram.com/p/{code}/") result.set_url(f"https://www.instagram.com/p/{code}/")
resources = item.get("resources", []) resources = item.get("resources", item.get("carousel_media", []))
item, media, media_id = self.scrape_media(item, context) item, media, media_id = self.scrape_media(item, context)
# if resources are present take the main media from the first resource # if resources are present take the main media from the first resource
if not media and len(resources): if not media and len(resources):
@@ -242,7 +291,7 @@ class InstagramAPIArchiver(Archiver):
def scrape_media(self, item: dict, context:str) -> tuple[dict, Media, str]: def scrape_media(self, item: dict, context:str) -> tuple[dict, Media, str]:
# remove unnecessary info # remove unnecessary info
if self.minimize_json_output: if self.minimize_json_output:
for k in ["image_versions", "video_versions", "video_dash_manifest"]: for k in ["image_versions", "video_versions", "video_dash_manifest", "image_versions2", "video_versions2"]:
if k in item: del item[k] if k in item: del item[k]
item = self.cleanup_dict(item) item = self.cleanup_dict(item)
@@ -253,19 +302,24 @@ class InstagramAPIArchiver(Archiver):
# retrieve video info # retrieve video info
best_id = item.get('id', item.get('pk')) best_id = item.get('id', item.get('pk'))
taken_at = item.get("taken_at") taken_at = item.get("taken_at", item.get("taken_at_ts"))
code = item.get("code") code = item.get("code")
caption_text = item.get("caption_text")
if "carousel_media" in item: del item["carousel_media"]
if video_url := item.get("video_url"): if video_url := item.get("video_url"):
filename = self.download_from_url(video_url, verbose=False) filename = self.download_from_url(video_url, verbose=False)
video_media = Media(filename=filename) video_media = Media(filename=filename)
if taken_at: video_media.set("date", taken_at) if taken_at: video_media.set("date", taken_at)
if code: video_media.set("url", f"https://www.instagram.com/p/{code}") if code: video_media.set("url", f"https://www.instagram.com/p/{code}")
if caption_text: video_media.set("text", caption_text)
video_media.set("preview", [image_media]) video_media.set("preview", [image_media])
video_media.set("data", [item]) video_media.set("data", [item])
return item, video_media, f"{context or 'video'} {best_id}" return item, video_media, f"{context or 'video'} {best_id}"
elif image_media: elif image_media:
if taken_at: image_media.set("date", taken_at) if taken_at: image_media.set("date", taken_at)
if code: image_media.set("url", f"https://www.instagram.com/p/{code}") if code: image_media.set("url", f"https://www.instagram.com/p/{code}")
if caption_text: image_media.set("text", caption_text)
image_media.set("data", [item]) image_media.set("data", [item])
return item, image_media, f"{context or 'image'} {best_id}" return item, image_media, f"{context or 'image'} {best_id}"

View File

@@ -42,7 +42,7 @@ class InstagramTbotArchiver(Archiver):
# make a copy of the session that is used exclusively with this archiver instance # make a copy of the session that is used exclusively with this archiver instance
new_session_file = os.path.join("secrets/", f"instabot-{time.strftime('%Y-%m-%d')}{random_str(8)}.session") new_session_file = os.path.join("secrets/", f"instabot-{time.strftime('%Y-%m-%d')}{random_str(8)}.session")
shutil.copy(self.session_file + ".session", new_session_file) shutil.copy(self.session_file + ".session", new_session_file)
self.session_file = new_session_file self.session_file = new_session_file.replace(".session", "")
try: try:
self.client = TelegramClient(self.session_file, self.api_id, self.api_hash) self.client = TelegramClient(self.session_file, self.api_id, self.api_hash)

View File

@@ -49,7 +49,7 @@ class TelethonArchiver(Archiver):
# make a copy of the session that is used exclusively with this archiver instance # make a copy of the session that is used exclusively with this archiver instance
new_session_file = os.path.join("secrets/", f"telethon-{time.strftime('%Y-%m-%d')}{random_str(8)}.session") new_session_file = os.path.join("secrets/", f"telethon-{time.strftime('%Y-%m-%d')}{random_str(8)}.session")
shutil.copy(self.session_file + ".session", new_session_file) shutil.copy(self.session_file + ".session", new_session_file)
self.session_file = new_session_file self.session_file = new_session_file.replace(".session", "")
# initiate the client # initiate the client
self.client = TelegramClient(self.session_file, self.api_id, self.api_hash) self.client = TelegramClient(self.session_file, self.api_id, self.api_hash)

View File

@@ -15,6 +15,8 @@ class YoutubeDLArchiver(Archiver):
self.livestreams = bool(self.livestreams) self.livestreams = bool(self.livestreams)
self.live_from_start = bool(self.live_from_start) self.live_from_start = bool(self.live_from_start)
self.end_means_success = bool(self.end_means_success) self.end_means_success = bool(self.end_means_success)
self.allow_playlist = bool(self.allow_playlist)
self.max_downloads = self.max_downloads
@staticmethod @staticmethod
def configs() -> dict: def configs() -> dict:
@@ -26,6 +28,8 @@ class YoutubeDLArchiver(Archiver):
"live_from_start": {"default": False, "help": "if set, will download live streams from their earliest available moment, otherwise starts now."}, "live_from_start": {"default": False, "help": "if set, will download live streams from their earliest available moment, otherwise starts now."},
"proxy": {"default": "", "help": "http/socks (https seems to not work atm) proxy to use for the webdriver, eg https://proxy-user:password@proxy-ip:port"}, "proxy": {"default": "", "help": "http/socks (https seems to not work atm) proxy to use for the webdriver, eg https://proxy-user:password@proxy-ip:port"},
"end_means_success": {"default": True, "help": "if True, any archived content will mean a 'success', if False this archiver will not return a 'success' stage; this is useful for cases when the yt-dlp will archive a video but ignore other types of content like images or text only pages that the subsequent archivers can retrieve."}, "end_means_success": {"default": True, "help": "if True, any archived content will mean a 'success', if False this archiver will not return a 'success' stage; this is useful for cases when the yt-dlp will archive a video but ignore other types of content like images or text only pages that the subsequent archivers can retrieve."},
'allow_playlist': {"default": False, "help": "If True will also download playlists, set to False if the expectation is to download a single video."},
"max_downloads": {"default": "inf", "help": "Use to limit the number of videos to download when a channel or long page is being extracted. 'inf' means no limit."},
} }
def download(self, item: Metadata) -> Metadata: def download(self, item: Metadata) -> Metadata:
@@ -35,11 +39,11 @@ class YoutubeDLArchiver(Archiver):
logger.debug('Using Facebook cookie') logger.debug('Using Facebook cookie')
yt_dlp.utils.std_headers['cookie'] = self.facebook_cookie yt_dlp.utils.std_headers['cookie'] = self.facebook_cookie
ydl_options = {'outtmpl': os.path.join(ArchivingContext.get_tmp_dir(), f'%(id)s.%(ext)s'), 'quiet': False, 'noplaylist': True, 'writesubtitles': self.subtitles, 'writeautomaticsub': self.subtitles, "live_from_start": self.live_from_start, "proxy": self.proxy} ydl_options = {'outtmpl': os.path.join(ArchivingContext.get_tmp_dir(), f'%(id)s.%(ext)s'), 'quiet': False, 'noplaylist': not self.allow_playlist , 'writesubtitles': self.subtitles, 'writeautomaticsub': self.subtitles, "live_from_start": self.live_from_start, "proxy": self.proxy, "max_downloads": self.max_downloads, "playlistend": self.max_downloads}
ydl = yt_dlp.YoutubeDL(ydl_options) # allsubtitles and subtitleslangs not working as expected, so default lang is always "en" ydl = yt_dlp.YoutubeDL(ydl_options) # allsubtitles and subtitleslangs not working as expected, so default lang is always "en"
try: try:
# don'd download since it can be a live stream # don't download since it can be a live stream
info = ydl.extract_info(url, download=False) info = ydl.extract_info(url, download=False)
if info.get('is_live', False) and not self.livestreams: if info.get('is_live', False) and not self.livestreams:
logger.warning("Livestream detected, skipping due to 'livestreams' configuration setting") logger.warning("Livestream detected, skipping due to 'livestreams' configuration setting")
@@ -52,7 +56,8 @@ class YoutubeDLArchiver(Archiver):
return False return False
# this time download # this time download
ydl = yt_dlp.YoutubeDL({**ydl_options, "getcomments": self.comments}) ydl = yt_dlp.YoutubeDL({**ydl_options, "getcomments": self.comments})
#TODO: for playlist or long lists of videos, how to download one at a time so they can be stored before the next one is downloaded?
info = ydl.extract_info(url, download=True) info = ydl.extract_info(url, download=True)
if "entries" in info: if "entries" in info:
@@ -64,13 +69,17 @@ class YoutubeDLArchiver(Archiver):
result = Metadata() result = Metadata()
result.set_title(info.get("title")) result.set_title(info.get("title"))
if "description" in info: result.set_content(info["description"])
for entry in entries: for entry in entries:
try: try:
filename = ydl.prepare_filename(entry) filename = ydl.prepare_filename(entry)
if not os.path.exists(filename): if not os.path.exists(filename):
filename = filename.split('.')[0] + '.mkv' filename = filename.split('.')[0] + '.mkv'
new_media = Media(filename).set("duration", info.get("duration"))
new_media = Media(filename)
for x in ["duration", "original_url", "fulltitle", "description", "upload_date"]:
if x in entry: new_media.set(x, entry[x])
# read text from subtitles if enabled # read text from subtitles if enabled
if self.subtitles: if self.subtitles:
for lang, val in (info.get('requested_subtitles') or {}).items(): for lang, val in (info.get('requested_subtitles') or {}).items():

View File

@@ -25,10 +25,11 @@ class Media:
_mimetype: str = None # eg: image/jpeg _mimetype: str = None # eg: image/jpeg
_stored: bool = field(default=False, repr=False, metadata=config(exclude=lambda _: True)) # always exclude _stored: bool = field(default=False, repr=False, metadata=config(exclude=lambda _: True)) # always exclude
def store(self: Media, override_storages: List = None, url: str = "url-not-available"): def store(self: Media, override_storages: List = None, url: str = "url-not-available", metadata: Any = None):
# stores the media into the provided/available storages [Storage] # 'Any' typing for metadata to avoid circular imports. Stores the media
# repeats the process for its properties, in case they have inner media themselves # into the provided/available storages [Storage] repeats the process for
# for now it only goes down 1 level but it's easy to make it recursive if needed # its properties, in case they have inner media themselves for now it
# only goes down 1 level but it's easy to make it recursive if needed.
storages = override_storages or ArchivingContext.get("storages") storages = override_storages or ArchivingContext.get("storages")
if not len(storages): if not len(storages):
logger.warning(f"No storages found in local context or provided directly for {self.filename}.") logger.warning(f"No storages found in local context or provided directly for {self.filename}.")
@@ -36,7 +37,7 @@ class Media:
for s in storages: for s in storages:
for any_media in self.all_inner_media(include_self=True): for any_media in self.all_inner_media(include_self=True):
s.store(any_media, url) s.store(any_media, url, metadata=metadata)
def all_inner_media(self, include_self=False): def all_inner_media(self, include_self=False):
""" Media can be inside media properties, examples include transformations on original media. """ Media can be inside media properties, examples include transformations on original media.

View File

@@ -48,7 +48,7 @@ class Metadata:
self.remove_duplicate_media_by_hash() self.remove_duplicate_media_by_hash()
storages = override_storages or ArchivingContext.get("storages") storages = override_storages or ArchivingContext.get("storages")
for media in self.media: for media in self.media:
media.store(override_storages=storages, url=self.get_url()) media.store(override_storages=storages, url=self.get_url(), metadata=self)
def set(self, key: str, val: Any) -> Metadata: def set(self, key: str, val: Any) -> Metadata:
self.metadata[key] = val self.metadata[key] = val

View File

@@ -1,5 +1,7 @@
from __future__ import annotations from __future__ import annotations
from typing import Generator, Union, List from typing import Generator, Union, List
from urllib.parse import urlparse
from ipaddress import ip_address
from .context import ArchivingContext from .context import ArchivingContext
@@ -26,7 +28,7 @@ class ArchivingOrchestrator:
ArchivingContext.set("storages", self.storages, keep_on_reset=True) ArchivingContext.set("storages", self.storages, keep_on_reset=True)
try: try:
for a in self.archivers: a.setup() for a in self.all_archivers_for_setup(): a.setup()
except (KeyboardInterrupt, Exception) as e: except (KeyboardInterrupt, Exception) as e:
logger.error(f"Error during setup of archivers: {e}\n{traceback.format_exc()}") logger.error(f"Error during setup of archivers: {e}\n{traceback.format_exc()}")
self.cleanup() self.cleanup()
@@ -34,7 +36,7 @@ class ArchivingOrchestrator:
def cleanup(self)->None: def cleanup(self)->None:
logger.info("Cleaning up") logger.info("Cleaning up")
for a in self.archivers: a.cleanup() for a in self.all_archivers_for_setup(): a.cleanup()
def feed(self) -> Generator[Metadata]: def feed(self) -> Generator[Metadata]:
for item in self.feeder: for item in self.feeder:
@@ -60,7 +62,9 @@ class ArchivingOrchestrator:
exit() exit()
except Exception as e: except Exception as e:
logger.error(f'Got unexpected error on item {item}: {e}\n{traceback.format_exc()}') logger.error(f'Got unexpected error on item {item}: {e}\n{traceback.format_exc()}')
for d in self.databases: d.failed(item) for d in self.databases:
if type(e) == AssertionError: d.failed(item, str(e))
else: d.failed(item)
def archive(self, result: Metadata) -> Union[Metadata, None]: def archive(self, result: Metadata) -> Union[Metadata, None]:
@@ -73,7 +77,8 @@ class ArchivingOrchestrator:
5. Store all downloaded/generated media 5. Store all downloaded/generated media
6. Call selected Formatter and store formatted if needed 6. Call selected Formatter and store formatted if needed
""" """
original_url = result.get_url() original_url = result.get_url().strip()
self.assert_valid_url(original_url)
# 1 - sanitize - each archiver is responsible for cleaning/expanding its own URLs # 1 - sanitize - each archiver is responsible for cleaning/expanding its own URLs
url = original_url url = original_url
@@ -90,7 +95,9 @@ class ArchivingOrchestrator:
if cached_result: if cached_result:
logger.debug("Found previously archived entry") logger.debug("Found previously archived entry")
for d in self.databases: for d in self.databases:
d.done(cached_result, cached=True) try: d.done(cached_result, cached=True)
except Exception as e:
logger.error(f"ERROR database {d.name}: {e}: {traceback.format_exc()}")
return cached_result return cached_result
# 3 - call archivers until one succeeds # 3 - call archivers until one succeeds
@@ -113,13 +120,39 @@ class ArchivingOrchestrator:
# 6 - format and store formatted if needed # 6 - format and store formatted if needed
if (final_media := self.formatter.format(result)): if (final_media := self.formatter.format(result)):
final_media.store(url=url) final_media.store(url=url, metadata=result)
result.set_final_media(final_media) result.set_final_media(final_media)
if result.is_empty(): if result.is_empty():
result.status = "nothing archived" result.status = "nothing archived"
# signal completion to databases and archivers # signal completion to databases and archivers
for d in self.databases: d.done(result) for d in self.databases:
try: d.done(result)
except Exception as e:
logger.error(f"ERROR database {d.name}: {e}: {traceback.format_exc()}")
return result return result
def assert_valid_url(self, url: str) -> bool:
"""
Blocks localhost, private, reserved, and link-local IPs and all non-http/https schemes.
"""
assert url.startswith("http://") or url.startswith("https://"), f"Invalid URL scheme"
parsed = urlparse(url)
assert parsed.scheme in ["http", "https"], f"Invalid URL scheme"
assert parsed.hostname, f"Invalid URL hostname"
assert parsed.hostname != "localhost", f"Invalid URL"
try: # special rules for IP addresses
ip = ip_address(parsed.hostname)
except ValueError: pass
else:
assert ip.is_global, f"Invalid IP used"
assert not ip.is_reserved, f"Invalid IP used"
assert not ip.is_link_local, f"Invalid IP used"
assert not ip.is_private, f"Invalid IP used"
def all_archivers_for_setup(self) -> List[Archiver]:
return self.archivers + [e for e in self.enrichers if isinstance(e, Archiver)]

View File

@@ -2,4 +2,5 @@ from .database import Database
from .gsheet_db import GsheetsDb from .gsheet_db import GsheetsDb
from .console_db import ConsoleDb from .console_db import ConsoleDb
from .csv_db import CSVDb from .csv_db import CSVDb
from .api_db import AAApiDb from .api_db import AAApiDb
from .atlos_db import AtlosDb

View File

@@ -23,8 +23,7 @@ class AAApiDb(Database):
def configs() -> dict: def configs() -> dict:
return { return {
"api_endpoint": {"default": None, "help": "API endpoint where calls are made to"}, "api_endpoint": {"default": None, "help": "API endpoint where calls are made to"},
"api_secret": {"default": None, "help": "API Basic authentication secret [deprecating soon]"}, "api_token": {"default": None, "help": "API Bearer token."},
"api_token": {"default": None, "help": "API Bearer token, to be preferred over secret (Basic auth) going forward"},
"public": {"default": False, "help": "whether the URL should be publicly available via the API"}, "public": {"default": False, "help": "whether the URL should be publicly available via the API"},
"author_id": {"default": None, "help": "which email to assign as author"}, "author_id": {"default": None, "help": "which email to assign as author"},
"group_id": {"default": None, "help": "which group of users have access to the archive in case public=false as author"}, "group_id": {"default": None, "help": "which group of users have access to the archive in case public=false as author"},
@@ -59,7 +58,7 @@ class AAApiDb(Database):
logger.debug(f"saving archive of {item.get_url()} to the AA API.") logger.debug(f"saving archive of {item.get_url()} to the AA API.")
payload = {'result': item.to_json(), 'public': self.public, 'author_id': self.author_id, 'group_id': self.group_id, 'tags': list(self.tags)} payload = {'result': item.to_json(), 'public': self.public, 'author_id': self.author_id, 'group_id': self.group_id, 'tags': list(self.tags)}
headers = {"Authorization": f"Bearer {self.api_secret}"} headers = {"Authorization": f"Bearer {self.api_token}"}
response = requests.post(os.path.join(self.api_endpoint, "submit-archive"), json=payload, headers=headers) response = requests.post(os.path.join(self.api_endpoint, "submit-archive"), json=payload, headers=headers)
if response.status_code == 200: if response.status_code == 200:

View File

@@ -0,0 +1,79 @@
import os
from typing import Union
from loguru import logger
from csv import DictWriter
from dataclasses import asdict
import requests
from . import Database
from ..core import Metadata
from ..utils import get_atlos_config_options
class AtlosDb(Database):
"""
Outputs results to Atlos
"""
name = "atlos_db"
def __init__(self, config: dict) -> None:
# without this STEP.__init__ is not called
super().__init__(config)
@staticmethod
def configs() -> dict:
return get_atlos_config_options()
def failed(self, item: Metadata, reason: str) -> None:
"""Update DB accordingly for failure"""
# If the item has no Atlos ID, there's nothing for us to do
if not item.metadata.get("atlos_id"):
logger.info(f"Item {item.get_url()} has no Atlos ID, skipping")
return
requests.post(
f"{self.atlos_url}/api/v2/source_material/metadata/{item.metadata['atlos_id']}/auto_archiver",
headers={"Authorization": f"Bearer {self.api_token}"},
json={"metadata": {"processed": True, "status": "error", "error": reason}},
).raise_for_status()
logger.info(
f"Stored failure for {item.get_url()} (ID {item.metadata['atlos_id']}) on Atlos: {reason}"
)
def fetch(self, item: Metadata) -> Union[Metadata, bool]:
"""check and fetch if the given item has been archived already, each
database should handle its own caching, and configuration mechanisms"""
return False
def _process_metadata(self, item: Metadata) -> dict:
"""Process metadata for storage on Atlos. Will convert any datetime
objects to ISO format."""
return {
k: v.isoformat() if hasattr(v, "isoformat") else v
for k, v in item.metadata.items()
}
def done(self, item: Metadata, cached: bool = False) -> None:
"""archival result ready - should be saved to DB"""
if not item.metadata.get("atlos_id"):
logger.info(f"Item {item.get_url()} has no Atlos ID, skipping")
return
requests.post(
f"{self.atlos_url}/api/v2/source_material/metadata/{item.metadata['atlos_id']}/auto_archiver",
headers={"Authorization": f"Bearer {self.api_token}"},
json={
"metadata": dict(
processed=True,
status="success",
results=self._process_metadata(item),
)
},
).raise_for_status()
logger.info(
f"Stored success for {item.get_url()} (ID {item.metadata['atlos_id']}) on Atlos"
)

View File

@@ -21,8 +21,8 @@ class ConsoleDb(Database):
def started(self, item: Metadata) -> None: def started(self, item: Metadata) -> None:
logger.warning(f"STARTED {item}") logger.warning(f"STARTED {item}")
def failed(self, item: Metadata) -> None: def failed(self, item: Metadata, reason:str) -> None:
logger.error(f"FAILED {item}") logger.error(f"FAILED {item}: {reason}")
def aborted(self, item: Metadata) -> None: def aborted(self, item: Metadata) -> None:
logger.warning(f"ABORTED {item}") logger.warning(f"ABORTED {item}")

View File

@@ -22,7 +22,7 @@ class Database(Step, ABC):
"""signals the DB that the given item archival has started""" """signals the DB that the given item archival has started"""
pass pass
def failed(self, item: Metadata) -> None: def failed(self, item: Metadata, reason:str) -> None:
"""update DB accordingly for failure""" """update DB accordingly for failure"""
pass pass

View File

@@ -29,9 +29,9 @@ class GsheetsDb(Database):
gw, row = self._retrieve_gsheet(item) gw, row = self._retrieve_gsheet(item)
gw.set_cell(row, 'status', 'Archive in progress') gw.set_cell(row, 'status', 'Archive in progress')
def failed(self, item: Metadata) -> None: def failed(self, item: Metadata, reason:str) -> None:
logger.error(f"FAILED {item}") logger.error(f"FAILED {item}")
self._safe_status_update(item, 'Archive failed') self._safe_status_update(item, f'Archive failed {reason}')
def aborted(self, item: Metadata) -> None: def aborted(self, item: Metadata) -> None:
logger.warning(f"ABORTED {item}") logger.warning(f"ABORTED {item}")
@@ -102,6 +102,11 @@ class GsheetsDb(Database):
def _retrieve_gsheet(self, item: Metadata) -> Tuple[GWorksheet, int]: def _retrieve_gsheet(self, item: Metadata) -> Tuple[GWorksheet, int]:
# TODO: to make gsheet_db less coupled with gsheet_feeder's "gsheet" parameter, this method could 1st try to fetch "gsheet" from ArchivingContext and, if missing, manage its own singleton - not needed for now # TODO: to make gsheet_db less coupled with gsheet_feeder's "gsheet" parameter, this method could 1st try to fetch "gsheet" from ArchivingContext and, if missing, manage its own singleton - not needed for now
gw: GWorksheet = ArchivingContext.get("gsheet").get("worksheet") if gsheet := ArchivingContext.get("gsheet"):
row: int = ArchivingContext.get("gsheet").get("row") gw: GWorksheet = gsheet.get("worksheet")
row: int = gsheet.get("row")
elif self.sheet_id:
print(self.sheet_id)
return gw, row return gw, row

View File

@@ -27,7 +27,10 @@ class SSLEnricher(Enricher):
if not to_enrich.media and self.skip_when_nothing_archived: return if not to_enrich.media and self.skip_when_nothing_archived: return
url = to_enrich.get_url() url = to_enrich.get_url()
domain = urlparse(url).netloc parsed = urlparse(url)
assert parsed.scheme in ["https"], f"Invalid URL scheme {url=}"
domain = parsed.netloc
logger.debug(f"fetching SSL certificate for {domain=} in {url=}") logger.debug(f"fetching SSL certificate for {domain=} in {url=}")
cert = ssl.get_server_certificate((domain, 443)) cert = ssl.get_server_certificate((domain, 443))

View File

@@ -35,6 +35,22 @@ class WaczArchiverEnricher(Enricher, Archiver):
"socks_proxy_host": {"default": None, "help": "SOCKS proxy host for browsertrix-crawler, use in combination with socks_proxy_port. eg: user:password@host"}, "socks_proxy_host": {"default": None, "help": "SOCKS proxy host for browsertrix-crawler, use in combination with socks_proxy_port. eg: user:password@host"},
"socks_proxy_port": {"default": None, "help": "SOCKS proxy port for browsertrix-crawler, use in combination with socks_proxy_host. eg 1234"}, "socks_proxy_port": {"default": None, "help": "SOCKS proxy port for browsertrix-crawler, use in combination with socks_proxy_host. eg 1234"},
} }
def setup(self) -> None:
self.use_docker = os.environ.get('WACZ_ENABLE_DOCKER') or not os.environ.get('RUNNING_IN_DOCKER')
self.docker_in_docker = os.environ.get('WACZ_ENABLE_DOCKER') and os.environ.get('RUNNING_IN_DOCKER')
self.cwd_dind = f"/crawls/crawls{random_str(8)}"
self.browsertrix_home_host = os.environ.get('BROWSERTRIX_HOME_HOST')
self.browsertrix_home_container = os.environ.get('BROWSERTRIX_HOME_CONTAINER') or self.browsertrix_home_host
# create crawls folder if not exists, so it can be safely removed in cleanup
if self.docker_in_docker:
os.makedirs(self.cwd_dind, exist_ok=True)
def cleanup(self) -> None:
if self.docker_in_docker:
logger.debug(f"Removing {self.cwd_dind=}")
shutil.rmtree(self.cwd_dind, ignore_errors=True)
def download(self, item: Metadata) -> Metadata: def download(self, item: Metadata) -> Metadata:
# this new Metadata object is required to avoid duplication # this new Metadata object is required to avoid duplication
@@ -51,27 +67,30 @@ class WaczArchiverEnricher(Enricher, Archiver):
url = to_enrich.get_url() url = to_enrich.get_url()
collection = random_str(8) collection = random_str(8)
browsertrix_home_host = os.environ.get('BROWSERTRIX_HOME_HOST') or os.path.abspath(ArchivingContext.get_tmp_dir()) browsertrix_home_host = self.browsertrix_home_host or os.path.abspath(ArchivingContext.get_tmp_dir())
browsertrix_home_container = os.environ.get('BROWSERTRIX_HOME_CONTAINER') or browsertrix_home_host browsertrix_home_container = self.browsertrix_home_container or browsertrix_home_host
cmd = [ cmd = [
"crawl", "crawl",
"--url", url, "--url", url,
"--scopeType", "page", "--scopeType", "page",
"--generateWACZ", "--generateWACZ",
"--text", "--text", "to-pages",
"--screenshot", "fullPage", "--screenshot", "fullPage",
"--collection", collection, "--collection", collection,
"--id", collection, "--id", collection,
"--saveState", "never", "--saveState", "never",
"--behaviors", "autoscroll,autoplay,autofetch,siteSpecific", "--behaviors", "autoscroll,autoplay,autofetch,siteSpecific",
"--behaviorTimeout", str(self.timeout), "--behaviorTimeout", str(self.timeout),
"--timeout", str(self.timeout)] "--timeout", str(self.timeout),
"--blockAds" # TODO: test
]
if self.docker_in_docker:
cmd.extend(["--cwd", self.cwd_dind])
# call docker if explicitly enabled or we are running on the host (not in docker) # call docker if explicitly enabled or we are running on the host (not in docker)
use_docker = os.environ.get('WACZ_ENABLE_DOCKER') or not os.environ.get('RUNNING_IN_DOCKER') if self.use_docker:
if use_docker:
logger.debug(f"generating WACZ in Docker for {url=}") logger.debug(f"generating WACZ in Docker for {url=}")
logger.debug(f"{browsertrix_home_host=} {browsertrix_home_container=}") logger.debug(f"{browsertrix_home_host=} {browsertrix_home_container=}")
if self.docker_commands: if self.docker_commands:
@@ -93,9 +112,9 @@ class WaczArchiverEnricher(Enricher, Archiver):
try: try:
logger.info(f"Running browsertrix-crawler: {' '.join(cmd)}") logger.info(f"Running browsertrix-crawler: {' '.join(cmd)}")
my_env = os.environ.copy()
if self.socks_proxy_host and self.socks_proxy_port: if self.socks_proxy_host and self.socks_proxy_port:
logger.debug("Using SOCKS proxy for browsertrix-crawler") logger.debug("Using SOCKS proxy for browsertrix-crawler")
my_env = os.environ.copy()
my_env["SOCKS_HOST"] = self.socks_proxy_host my_env["SOCKS_HOST"] = self.socks_proxy_host
my_env["SOCKS_PORT"] = str(self.socks_proxy_port) my_env["SOCKS_PORT"] = str(self.socks_proxy_port)
subprocess.run(cmd, check=True, env=my_env) subprocess.run(cmd, check=True, env=my_env)
@@ -103,7 +122,10 @@ class WaczArchiverEnricher(Enricher, Archiver):
logger.error(f"WACZ generation failed: {e}") logger.error(f"WACZ generation failed: {e}")
return False return False
if use_docker:
if self.docker_in_docker:
wacz_fn = os.path.join(self.cwd_dind, "collections", collection, f"{collection}.wacz")
elif self.use_docker:
wacz_fn = os.path.join(browsertrix_home_container, "collections", collection, f"{collection}.wacz") wacz_fn = os.path.join(browsertrix_home_container, "collections", collection, f"{collection}.wacz")
else: else:
wacz_fn = os.path.join("collections", collection, f"{collection}.wacz") wacz_fn = os.path.join("collections", collection, f"{collection}.wacz")
@@ -116,7 +138,9 @@ class WaczArchiverEnricher(Enricher, Archiver):
if self.extract_media or self.extract_screenshot: if self.extract_media or self.extract_screenshot:
self.extract_media_from_wacz(to_enrich, wacz_fn) self.extract_media_from_wacz(to_enrich, wacz_fn)
if use_docker: if self.docker_in_docker:
jsonl_fn = os.path.join(self.cwd_dind, "collections", collection, "pages", "pages.jsonl")
elif self.use_docker:
jsonl_fn = os.path.join(browsertrix_home_container, "collections", collection, "pages", "pages.jsonl") jsonl_fn = os.path.join(browsertrix_home_container, "collections", collection, "pages", "pages.jsonl")
else: else:
jsonl_fn = os.path.join("collections", collection, "pages", "pages.jsonl") jsonl_fn = os.path.join("collections", collection, "pages", "pages.jsonl")
@@ -139,7 +163,7 @@ class WaczArchiverEnricher(Enricher, Archiver):
""" """
Receives a .wacz archive, and extracts all relevant media from it, adding them to to_enrich. Receives a .wacz archive, and extracts all relevant media from it, adding them to to_enrich.
""" """
logger.info(f"WACZ extract_media flag is set, extracting media from {wacz_filename=}") logger.info(f"WACZ extract_media or extract_screenshot flag is set, extracting media from {wacz_filename=}")
# unzipping the .wacz # unzipping the .wacz
tmp_dir = ArchivingContext.get_tmp_dir() tmp_dir = ArchivingContext.get_tmp_dir()
@@ -160,10 +184,11 @@ class WaczArchiverEnricher(Enricher, Archiver):
# get media out of .warc # get media out of .warc
counter = 0 counter = 0
seen_urls = set() seen_urls = set()
import json
with open(warc_filename, 'rb') as warc_stream: with open(warc_filename, 'rb') as warc_stream:
for record in ArchiveIterator(warc_stream): for record in ArchiveIterator(warc_stream):
# only include fetched resources # only include fetched resources
if record.rec_type == "resource" and self.extract_screenshot: # screenshots if record.rec_type == "resource" and record.content_type == "image/png" and self.extract_screenshot: # screenshots
fn = os.path.join(tmp_dir, f"warc-file-{counter}.png") fn = os.path.join(tmp_dir, f"warc-file-{counter}.png")
with open(fn, "wb") as outf: outf.write(record.raw_stream.read()) with open(fn, "wb") as outf: outf.write(record.raw_stream.read())
m = Media(filename=fn) m = Media(filename=fn)
@@ -209,4 +234,4 @@ class WaczArchiverEnricher(Enricher, Archiver):
to_enrich.add_media(m, warc_fn) to_enrich.add_media(m, warc_fn)
counter += 1 counter += 1
seen_urls.add(record_url) seen_urls.add(record_url)
logger.info(f"WACZ extract_media finished, found {counter} relevant media file(s)") logger.info(f"WACZ extract_media/extract_screenshot finished, found {counter} relevant media file(s)")

View File

@@ -44,7 +44,7 @@ class WhisperEnricher(Enricher):
job_results = {} job_results = {}
for i, m in enumerate(to_enrich.media): for i, m in enumerate(to_enrich.media):
if m.is_video() or m.is_audio(): if m.is_video() or m.is_audio():
m.store(url=url) m.store(url=url, metadata=to_enrich)
try: try:
job_id = self.submit_job(m) job_id = self.submit_job(m)
job_results[job_id] = False job_results[job_id] = False

View File

@@ -1,3 +1,4 @@
from.feeder import Feeder from.feeder import Feeder
from .gsheet_feeder import GsheetsFeeder from .gsheet_feeder import GsheetsFeeder
from .cli_feeder import CLIFeeder from .cli_feeder import CLIFeeder
from .atlos_feeder import AtlosFeeder

View File

@@ -0,0 +1,56 @@
from loguru import logger
import requests
from . import Feeder
from ..core import Metadata, ArchivingContext
from ..utils import get_atlos_config_options
class AtlosFeeder(Feeder):
name = "atlos_feeder"
def __init__(self, config: dict) -> None:
# without this STEP.__init__ is not called
super().__init__(config)
if type(self.api_token) != str:
raise Exception("Atlos Feeder did not receive an Atlos API token")
@staticmethod
def configs() -> dict:
return get_atlos_config_options()
def __iter__(self) -> Metadata:
# Get all the urls from the Atlos API
count = 0
cursor = None
while True:
response = requests.get(
f"{self.atlos_url}/api/v2/source_material",
headers={"Authorization": f"Bearer {self.api_token}"},
params={"cursor": cursor},
)
data = response.json()
response.raise_for_status()
cursor = data["next"]
for item in data["results"]:
if (
item["source_url"] not in [None, ""]
and (
item["metadata"]
.get("auto_archiver", {})
.get("processed", False)
!= True
)
and item["visibility"] == "visible"
and item["status"] not in ["processing", "pending"]
):
yield Metadata().set_url(item["source_url"]).set(
"atlos_id", item["id"]
)
count += 1
if len(data["results"]) == 0 or cursor is None:
break
logger.success(f"Processed {count} URL(s)")

View File

@@ -21,7 +21,7 @@ class HtmlFormatter(Formatter):
def __init__(self, config: dict) -> None: def __init__(self, config: dict) -> None:
# without this STEP.__init__ is not called # without this STEP.__init__ is not called
super().__init__(config) super().__init__(config)
self.environment = Environment(loader=FileSystemLoader(os.path.join(pathlib.Path(__file__).parent.resolve(), "templates/"))) self.environment = Environment(loader=FileSystemLoader(os.path.join(pathlib.Path(__file__).parent.resolve(), "templates/")), autoescape=True)
# JinjaHelper class static methods are added as filters # JinjaHelper class static methods are added as filters
self.environment.filters.update({ self.environment.filters.update({
k: v.__func__ for k, v in JinjaHelpers.__dict__.items() if isinstance(v, staticmethod) k: v.__func__ for k, v in JinjaHelpers.__dict__.items() if isinstance(v, staticmethod)

View File

@@ -177,14 +177,23 @@
} }
async function run() { async function run() {
await PreviewCertificates(); let setupFunctions = [
await PreviewText(); previewCertificates,
await enableCopyLogic(); previewText,
await enableCollapsibleLogic(); enableCopyLogic,
await setupSafeView(); enableCollapsibleLogic,
setupSafeView
];
setupFunctions.forEach(async f => {
try {
await f();
} catch (e) {
console.error(`Error in ${f.name}: ${e}`);
}
});
} }
async function PreviewCertificates() { async function previewCertificates() {
await Promise.all( await Promise.all(
Array.from(document.querySelectorAll(".pem-certificate")).map(async el => { Array.from(document.querySelectorAll(".pem-certificate")).map(async el => {
let certificate = await (await fetch(el.getAttribute("pem"))).text(); let certificate = await (await fetch(el.getAttribute("pem"))).text();
@@ -202,7 +211,7 @@
console.log("certificate preview done"); console.log("certificate preview done");
} }
async function PreviewText() { async function previewText() {
await Promise.all( await Promise.all(
Array.from(document.querySelectorAll(".text-preview")).map(async el => { Array.from(document.querySelectorAll(".text-preview")).map(async el => {
let textContent = await (await fetch(el.getAttribute("url"))).text(); let textContent = await (await fetch(el.getAttribute("url"))).text();

View File

@@ -1,4 +1,5 @@
from .storage import Storage from .storage import Storage
from .s3 import S3Storage from .s3 import S3Storage
from .local import LocalStorage from .local import LocalStorage
from .gd import GDriveStorage from .gd import GDriveStorage
from .atlos import AtlosStorage

View File

@@ -0,0 +1,74 @@
import os
from typing import IO, List, Optional
from loguru import logger
import requests
import hashlib
from ..core import Media, Metadata
from ..storages import Storage
from ..utils import get_atlos_config_options
class AtlosStorage(Storage):
name = "atlos_storage"
def __init__(self, config: dict) -> None:
super().__init__(config)
@staticmethod
def configs() -> dict:
return dict(Storage.configs(), **get_atlos_config_options())
def get_cdn_url(self, _media: Media) -> str:
# It's not always possible to provide an exact URL, because it's
# possible that the media once uploaded could have been copied to
# another project.
return self.atlos_url
def _hash(self, media: Media) -> str:
# Hash the media file using sha-256. We don't use the existing auto archiver
# hash because there's no guarantee that the configuerer is using sha-256, which
# is how Atlos hashes files.
sha256 = hashlib.sha256()
with open(media.filename, "rb") as f:
while True:
buf = f.read(4096)
if not buf: break
sha256.update(buf)
return sha256.hexdigest()
def upload(self, media: Media, metadata: Optional[Metadata]=None, **_kwargs) -> bool:
atlos_id = metadata.get("atlos_id")
if atlos_id is None:
logger.error(f"No Atlos ID found in metadata; can't store {media.filename} on Atlos")
return False
media_hash = self._hash(media)
# Check whether the media has already been uploaded
source_material = requests.get(
f"{self.atlos_url}/api/v2/source_material/{atlos_id}",
headers={"Authorization": f"Bearer {self.api_token}"},
).json()["result"]
existing_media = [x["file_hash_sha256"] for x in source_material.get("artifacts", [])]
if media_hash in existing_media:
logger.info(f"{media.filename} with SHA256 {media_hash} already uploaded to Atlos")
return True
# Upload the media to the Atlos API
requests.post(
f"{self.atlos_url}/api/v2/source_material/upload/{atlos_id}",
headers={"Authorization": f"Bearer {self.api_token}"},
params={
"title": media.properties
},
files={"file": (os.path.basename(media.filename), open(media.filename, "rb"))},
).raise_for_status()
logger.info(f"Uploaded {media.filename} to Atlos with ID {atlos_id} and title {media.key}")
return True
# must be implemented even if unused
def uploadf(self, file: IO[bytes], key: str, **kwargs: dict) -> bool: pass

View File

@@ -1,12 +1,12 @@
from __future__ import annotations from __future__ import annotations
from abc import abstractmethod from abc import abstractmethod
from dataclasses import dataclass from dataclasses import dataclass
from typing import IO from typing import IO, Optional
import os import os
from ..utils.misc import random_str from ..utils.misc import random_str
from ..core import Media, Step, ArchivingContext from ..core import Media, Step, ArchivingContext, Metadata
from ..enrichers import HashEnricher from ..enrichers import HashEnricher
from loguru import logger from loguru import logger
from slugify import slugify from slugify import slugify
@@ -43,12 +43,12 @@ class Storage(Step):
# only for typing... # only for typing...
return Step.init(name, config, Storage) return Step.init(name, config, Storage)
def store(self, media: Media, url: str) -> None: def store(self, media: Media, url: str, metadata: Optional[Metadata]=None) -> None:
if media.is_stored(): if media.is_stored():
logger.debug(f"{media.key} already stored, skipping") logger.debug(f"{media.key} already stored, skipping")
return return
self.set_key(media, url) self.set_key(media, url)
self.upload(media) self.upload(media, metadata=metadata)
media.add_url(self.get_cdn_url(media)) media.add_url(self.get_cdn_url(media))
@abstractmethod @abstractmethod

View File

@@ -3,4 +3,5 @@ from .gworksheet import GWorksheet
from .misc import * from .misc import *
from .webdriver import Webdriver from .webdriver import Webdriver
from .gsheet import Gsheets from .gsheet import Gsheets
from .url import UrlUtil from .url import UrlUtil
from .atlos import get_atlos_config_options

View File

@@ -0,0 +1,13 @@
def get_atlos_config_options():
return {
"api_token": {
"default": None,
"help": "An Atlos API token. For more information, see https://docs.atlos.org/technical/api/",
"cli_set": lambda cli_val, _: cli_val
},
"atlos_url": {
"default": "https://platform.atlos.org",
"help": "The URL of your Atlos instance (e.g., https://platform.atlos.org), without a trailing slash.",
"cli_set": lambda cli_val, _: cli_val
},
}

View File

@@ -1,9 +1,9 @@
_MAJOR = "0" _MAJOR = "0"
_MINOR = "9" _MINOR = "11"
# On main and in a nightly release the patch should be one ahead of the last # On main and in a nightly release the patch should be one ahead of the last
# released build. # released build.
_PATCH = "0" _PATCH = "1"
# This is mainly for nightly builds which have the suffix ".dev$DATE". See # This is mainly for nightly builds which have the suffix ".dev$DATE". See
# https://semver.org/#is-v123-a-semantic-version for the semantics. # https://semver.org/#is-v123-a-semantic-version for the semantics.
_SUFFIX = "" _SUFFIX = ""