Merge branch 'main' into linting_etc

# Conflicts:
#	src/auto_archiver/core/consts.py
#	src/auto_archiver/core/orchestrator.py
#	src/auto_archiver/core/storage.py
#	src/auto_archiver/modules/local_storage/local_storage.py
#	src/auto_archiver/modules/s3_storage/s3_storage.py
#	tests/storages/test_S3_storage.py
#	tests/storages/test_local_storage.py
#	tests/storages/test_storage_base.py
This commit is contained in:
erinhmclark
2025-03-11 10:39:47 +00:00
19 changed files with 614 additions and 320 deletions

View File

@@ -23,9 +23,9 @@
"help": "which group of users have access to the archive in case public=false as author",
},
"use_api_cache": {
"default": True,
"default": False,
"type": "bool",
"help": "if False then the API database will be queried prior to any archiving operations and stop if the link has already been archived",
"help": "if True then the API database will be queried prior to any archiving operations and stop if the link has already been archived",
},
"store_results": {
"default": True,

View File

@@ -17,7 +17,6 @@ class CLIFeeder(Feeder):
for url in urls:
logger.debug(f"Processing {url}")
m = Metadata().set_url(url)
m.set_context("folder", "cli")
yield m
logger.success(f"Processed {len(urls)} URL(s)")

View File

@@ -1,3 +1,4 @@
import shutil
from typing import IO
import os
@@ -5,25 +6,42 @@ from loguru import logger
from auto_archiver.core import Media
from auto_archiver.core import Storage
from auto_archiver.core.consts import SetupError
class LocalStorage(Storage):
def setup(self) -> None:
if len(self.save_to) > 200:
raise SetupError(f"Your save_to path is too long, this will cause issues saving files on your computer. Please use a shorter path.")
def get_cdn_url(self, media: Media) -> str:
# TODO: is this viable with Storage.configs on path/filename?
dest = os.path.join(self.save_to, media.key)
dest = media.key
if self.save_absolute:
dest = os.path.abspath(dest)
return dest
def set_key(self, media, url, metadata):
# clarify we want to save the file to the save_to folder
old_folder = metadata.get('folder', '')
metadata.set_context('folder', os.path.join(self.save_to, metadata.get('folder', '')))
super().set_key(media, url, metadata)
# don't impact other storages that might want a different 'folder' set
metadata.set_context('folder', old_folder)
def upload(self, media: Media, **kwargs) -> bool:
# override parent so that we can use shutil.copy2 and keep metadata
dest = os.path.join(self.save_to, media.key)
dest = media.key
os.makedirs(os.path.dirname(dest), exist_ok=True)
logger.debug(f"[{self.__class__.__name__}] storing file {media.filename} with key {media.key} to {dest}")
logger.debug(f'[{self.__class__.__name__}] storing file {media.filename} with key {media.key} to {dest}')
res = shutil.copy2(media.filename, dest)
logger.info(res)
return True
# must be implemented even if unused
def uploadf(self, file: IO[bytes], key: str, **kwargs: dict) -> bool:
pass
pass

View File

@@ -1,3 +1,4 @@
from typing import IO
import boto3
@@ -10,36 +11,33 @@ from auto_archiver.utils.misc import calculate_file_hash, random_str
NO_DUPLICATES_FOLDER = "no-dups/"
class S3Storage(Storage):
def setup(self) -> None:
self.s3 = boto3.client(
"s3",
's3',
region_name=self.region,
endpoint_url=self.endpoint_url.format(region=self.region),
aws_access_key_id=self.key,
aws_secret_access_key=self.secret,
aws_secret_access_key=self.secret
)
if self.random_no_duplicate:
logger.warning(
"random_no_duplicate is set to True, this will override `path_generator`, `filename_generator` and `folder`."
)
logger.warning("random_no_duplicate is set to True, this will override `path_generator`, `filename_generator` and `folder`.")
def get_cdn_url(self, media: Media) -> str:
return self.cdn_url.format(bucket=self.bucket, region=self.region, key=media.key)
def uploadf(self, file: IO[bytes], media: Media, **kwargs: dict) -> None:
if not self.is_upload_needed(media):
return True
if not self.is_upload_needed(media): return True
extra_args = kwargs.get("extra_args", {})
if not self.private and "ACL" not in extra_args:
extra_args["ACL"] = "public-read"
if not self.private and 'ACL' not in extra_args:
extra_args['ACL'] = 'public-read'
if "ContentType" not in extra_args:
if 'ContentType' not in extra_args:
try:
if media.mimetype:
extra_args["ContentType"] = media.mimetype
extra_args['ContentType'] = media.mimetype
except Exception as e:
logger.warning(f"Unable to get mimetype for {media.key=}, error: {e}")
self.s3.upload_fileobj(file, Bucket=self.bucket, Key=media.key, ExtraArgs=extra_args)
@@ -51,21 +49,21 @@ class S3Storage(Storage):
hd = calculate_file_hash(media.filename)
path = os.path.join(NO_DUPLICATES_FOLDER, hd[:24])
if existing_key := self.file_in_folder(path):
media.key = existing_key
if existing_key:=self.file_in_folder(path):
media._key = existing_key
media.set("previously archived", True)
logger.debug(f"skipping upload of {media.filename} because it already exists in {media.key}")
return False
_, ext = os.path.splitext(media.key)
media.key = os.path.join(path, f"{random_str(24)}{ext}")
media._key = os.path.join(path, f"{random_str(24)}{ext}")
return True
def file_in_folder(self, path: str) -> str:
def file_in_folder(self, path:str) -> str:
# checks if path exists and is not an empty folder
if not path.endswith("/"):
path = path + "/"
resp = self.s3.list_objects(Bucket=self.bucket, Prefix=path, Delimiter="/", MaxKeys=1)
if "Contents" in resp:
return resp["Contents"][0]["Key"]
return False
if not path.endswith('/'):
path = path + '/'
resp = self.s3.list_objects(Bucket=self.bucket, Prefix=path, Delimiter='/', MaxKeys=1)
if 'Contents' in resp:
return resp['Contents'][0]['Key']
return False

View File

@@ -0,0 +1 @@
from .tiktok_tikwm_extractor import TiktokTikwmExtractor

View File

@@ -0,0 +1,23 @@
{
"name": "Tiktok Tikwm Extractor",
"type": ["extractor"],
"requires_setup": False,
"dependencies": {
"python": ["loguru", "requests"],
"bin": []
},
"description": """
Uses an unofficial TikTok video download platform's API to download videos: https://tikwm.com/
This extractor complements the generic_extractor which can already get TikTok videos, but this one can extract special videos like those marked as sensitive.
### Features
- Downloads the video and, if possible, also the video cover.
- Stores extra metadata about the post like author information, and more as returned by tikwm.com.
### Notes
- If tikwm.com is down, this extractor will not work.
- If tikwm.com changes their API, this extractor may break.
- If no video is found, this extractor will consider the extraction failed.
"""
}

View File

@@ -0,0 +1,75 @@
import re
import requests
from loguru import logger
from datetime import datetime, timezone
from yt_dlp.extractor.tiktok import TikTokIE
from auto_archiver.core import Extractor
from auto_archiver.core import Metadata, Media
class TiktokTikwmExtractor(Extractor):
"""
Extractor for TikTok that uses an unofficial API and can capture content that requires a login, like sensitive content.
"""
TIKWM_ENDPOINT = "https://www.tikwm.com/api/?url={url}"
def download(self, item: Metadata) -> Metadata:
url = item.get_url()
if not re.match(TikTokIE._VALID_URL, url):
return False
endpoint = TiktokTikwmExtractor.TIKWM_ENDPOINT.format(url=url)
r = requests.get(endpoint)
if r.status_code != 200:
logger.error(f"unexpected status code '{r.status_code}' from tikwm.com for {url=}:")
return False
try:
json_response = r.json()
except ValueError:
logger.error(f"failed to parse JSON response from tikwm.com for {url=}")
return False
if not json_response.get('msg') == 'success' or not (api_data := json_response.get('data', {})):
logger.error(f"failed to get a valid response from tikwm.com for {url=}: {json_response}")
return False
# tries to get the non-watermarked version first
video_url = api_data.pop("play", api_data.pop("wmplay", None))
if not video_url:
logger.error(f"no valid video URL found in response from tikwm.com for {url=}")
return False
# prepare result, start by downloading video
result = Metadata()
# get the cover if possible
cover_url = api_data.pop("origin_cover", api_data.pop("cover", api_data.pop("ai_dynamic_cover", None)))
if cover_url and (cover_downloaded := self.download_from_url(cover_url)):
result.add_media(Media(cover_downloaded))
# get the video or fail
video_downloaded = self.download_from_url(video_url, f"vid_{api_data.get('id', '')}")
if not video_downloaded:
logger.error(f"failed to download video from {video_url}")
return False
video_media = Media(video_downloaded)
if duration := api_data.pop("duration", None):
video_media.set("duration", duration)
result.add_media(video_media)
# add remaining metadata
result.set_title(api_data.pop("title", ""))
if created_at := api_data.pop("create_time", None):
result.set_timestamp(datetime.fromtimestamp(created_at, tz=timezone.utc))
if (author := api_data.pop("author", None)):
result.set("author", author)
result.set("api_data", api_data)
return result.success("tikwm")