diff --git a/.github/workflows/tests-download.yaml b/.github/workflows/tests-download.yaml index 2a1de73..fc31f42 100644 --- a/.github/workflows/tests-download.yaml +++ b/.github/workflows/tests-download.yaml @@ -35,4 +35,4 @@ jobs: run: poetry install --no-interaction --with dev - name: Run Download Tests - run: poetry run pytest -ra -v -m "download" + run: poetry run pytest -ra -v -x -m "download" diff --git a/poetry.lock b/poetry.lock index b7e811a..adb2726 100644 --- a/poetry.lock +++ b/poetry.lock @@ -630,18 +630,6 @@ future = "*" [package.extras] dev = ["Sphinx (==2.1.0)", "future (==0.17.1)", "numpy (==1.16.4)", "pytest (==4.6.1)", "pytest-mock (==1.10.4)", "tox (==3.12.1)"] -[[package]] -name = "filetype" -version = "1.2.0" -description = "Infer file type and MIME type of any file/buffer. No external dependencies." -optional = false -python-versions = "*" -groups = ["main"] -files = [ - {file = "filetype-1.2.0-py2.py3-none-any.whl", hash = "sha256:7ce71b6880181241cf7ac8697a2f1eb6a8bd9b429f7ad6d27b8db9ba5f1c2d25"}, - {file = "filetype-1.2.0.tar.gz", hash = "sha256:66b56cd6474bf41d8c54660347d37afcc3f7d1970648de365c102ef77548aadb"}, -] - [[package]] name = "future" version = "1.0.0" @@ -2387,4 +2375,4 @@ test = ["pytest (>=8.1,<9.0)", "pytest-rerunfailures (>=14.0,<15.0)"] [metadata] lock-version = "2.1" python-versions = ">=3.10,<3.13" -content-hash = "1c421ff71f62bd25d3c25efd6c6b49d95446243e352a4111fd9e7462c4aeb704" +content-hash = "99800b85fc1678ba4eca510a3c01ba273f229644b08c711a2e466845794abf38" diff --git a/pyproject.toml b/pyproject.toml index 5a27cd6..995024a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -56,7 +56,6 @@ dependencies = [ "retrying (>=0.0.0)", "tsp-client (>=0.0.0)", "certvalidator (>=0.0.0)", - "filetype (>=1.2.0,<2.0.0)", ] [tool.poetry.group.dev.dependencies] diff --git a/src/auto_archiver/archivers/__init__.py b/src/auto_archiver/archivers/__init__.py index 996ca3b..24dde91 100644 --- a/src/auto_archiver/archivers/__init__.py +++ b/src/auto_archiver/archivers/__init__.py @@ -1,12 +1,9 @@ from .archiver import Archiver from .telethon_archiver import TelethonArchiver -from .twitter_archiver import TwitterArchiver from .twitter_api_archiver import TwitterApiArchiver from .instagram_archiver import InstagramArchiver from .instagram_tbot_archiver import InstagramTbotArchiver -from .tiktok_archiver import TiktokArchiver from .telegram_archiver import TelegramArchiver from .vk_archiver import VkArchiver -from .youtubedl_archiver import YoutubeDLArchiver -from .instagram_api_archiver import InstagramAPIArchiver -from .bluesky_archiver import BlueskyArchiver \ No newline at end of file +from .base_archiver.base_archiver import BaseArchiver as YoutubeDLArchiver +from .instagram_api_archiver import InstagramAPIArchiver \ No newline at end of file diff --git a/src/auto_archiver/archivers/archiver.py b/src/auto_archiver/archivers/archiver.py index 6fed8b7..911389a 100644 --- a/src/auto_archiver/archivers/archiver.py +++ b/src/auto_archiver/archivers/archiver.py @@ -2,7 +2,7 @@ from __future__ import annotations from pathlib import Path from abc import abstractmethod from dataclasses import dataclass -import filetype +import mimetypes import os import mimetypes, requests from loguru import logger @@ -68,21 +68,17 @@ class Archiver(Step): 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36' } try: - d = requests.get(url, stream=True, headers=headers) + d = requests.get(url, stream=True, headers=headers, timeout=30) d.raise_for_status() - # Peek at the first 256 bytes - first_256 = d.raw.read(256) - - # Use filetype to guess the extension if there isn't already one + # get mimetype from the response headers if not Path(to_filename).suffix: - guessed = filetype.guess(first_256) - extension = guessed.extension if guessed else None + content_type = d.headers.get('Content-Type') + extension = mimetypes.guess_extension(content_type) if extension: - to_filename += f".{extension}" + to_filename += extension with open(to_filename, 'wb') as f: - f.write(first_256) for chunk in d.iter_content(chunk_size=8192): f.write(chunk) return to_filename diff --git a/src/auto_archiver/archivers/base_archiver/__init__.py b/src/auto_archiver/archivers/base_archiver/__init__.py new file mode 100644 index 0000000..15ee4eb --- /dev/null +++ b/src/auto_archiver/archivers/base_archiver/__init__.py @@ -0,0 +1 @@ +from .base_archiver import BaseArchiver \ No newline at end of file diff --git a/src/auto_archiver/archivers/base_archiver/base_archiver.py b/src/auto_archiver/archivers/base_archiver/base_archiver.py new file mode 100644 index 0000000..b1cbabd --- /dev/null +++ b/src/auto_archiver/archivers/base_archiver/base_archiver.py @@ -0,0 +1,296 @@ +import datetime, os, yt_dlp, pysubs2 +from typing import Type +from yt_dlp.extractor.common import InfoExtractor + +from loguru import logger + +from . import bluesky, twitter +from auto_archiver.archivers.archiver import Archiver +from ...core import Metadata, Media, ArchivingContext + + +class BaseArchiver(Archiver): + name = "youtubedl_archiver" #left as is for backwards compat + + def __init__(self, config: dict) -> None: + super().__init__(config) + self.subtitles = bool(self.subtitles) + self.comments = bool(self.comments) + self.livestreams = bool(self.livestreams) + self.live_from_start = bool(self.live_from_start) + self.end_means_success = bool(self.end_means_success) + self.allow_playlist = bool(self.allow_playlist) + self.max_downloads = self.max_downloads + + @staticmethod + def configs() -> dict: + return { + "facebook_cookie": {"default": None, "help": "optional facebook cookie to have more access to content, from browser, looks like 'cookie: datr= xxxx'"}, + "subtitles": {"default": True, "help": "download subtitles if available"}, + "comments": {"default": False, "help": "download all comments if available, may lead to large metadata"}, + "livestreams": {"default": False, "help": "if set, will download live streams, otherwise will skip them; see --max-filesize for more control"}, + "live_from_start": {"default": False, "help": "if set, will download live streams from their earliest available moment, otherwise starts now."}, + "proxy": {"default": "", "help": "http/socks (https seems to not work atm) proxy to use for the webdriver, eg https://proxy-user:password@proxy-ip:port"}, + "end_means_success": {"default": True, "help": "if True, any archived content will mean a 'success', if False this archiver will not return a 'success' stage; this is useful for cases when the yt-dlp will archive a video but ignore other types of content like images or text only pages that the subsequent archivers can retrieve."}, + 'allow_playlist': {"default": False, "help": "If True will also download playlists, set to False if the expectation is to download a single video."}, + "max_downloads": {"default": "inf", "help": "Use to limit the number of videos to download when a channel or long page is being extracted. 'inf' means no limit."}, + "cookies_from_browser": {"default": None, "help": "optional browser for ytdl to extract cookies from, can be one of: brave, chrome, chromium, edge, firefox, opera, safari, vivaldi, whale"}, + "cookie_file": {"default": None, "help": "optional cookie file to use for Youtube, see instructions here on how to export from your browser: https://github.com/yt-dlp/yt-dlp/wiki/FAQ#how-do-i-pass-cookies-to-yt-dlp"}, + } + + def download_additional_media(self, extractor_key: str, video_data: dict, metadata: Metadata) -> Metadata: + """ + Downloads additional media like images, comments, subtitles, etc. + + Creates a 'media' object and attaches it to the metadata object. + """ + + # Just get the main thumbnail. More thumbnails are available in + # video_data['thumbnails'] should they be required + thumbnail_url = video_data.get('thumbnail') + if thumbnail_url: + try: + cover_image_path = self.download_from_url(thumbnail_url) + media = Media(cover_image_path) + metadata.add_media(media, id="cover") + except Exception as e: + logger.error(f"Error downloading cover image {thumbnail_url}: {e}") + + return metadata + + def keys_to_clean(self, extractor_key: str, video_data: dict) -> dict: + """ + Clean up the video data to make it more readable and remove unnecessary keys that ytdlp adds + """ + + base_keys = ['formats', 'thumbnail', 'display_id', 'epoch', 'requested_downloads', + 'duration_string', 'thumbnails', 'http_headers', 'webpage_url_basename', 'webpage_url_domain', + 'extractor', 'extractor_key', 'playlist', 'playlist_index', 'duration_string', 'protocol', 'requested_subtitles', + 'format_id', 'acodec', 'vcodec', 'ext', 'epoch', '_has_drm', 'filesize', 'audio_ext', 'video_ext', 'vbr', 'abr', + 'resolution', 'dynamic_range', 'aspect_ratio', 'cookies', 'format', 'quality', 'preference', 'artists', + 'channel_id', 'subtitles', 'tbr', 'url', 'original_url', 'automatic_captions', 'playable_in_embed', 'live_status', + '_format_sort_fields', 'chapters', 'requested_formats', 'format_note', + 'audio_channels', 'asr', 'fps', 'was_live', 'is_live', 'heatmap', 'age_limit', 'stretched_ratio'] + if extractor_key == 'TikTok': + # Tiktok: only has videos so a valid ytdlp `video_data` object is returned. Base keys are enough + return base_keys + [] + elif extractor_key == "Bluesky": + # bluesky API response for non video URLs is already clean, nothing to add + return base_keys + [] + + return base_keys + + def add_metadata(self, extractor_key: str, video_data: dict, url:str, result: Metadata) -> Metadata: + """ + Creates a Metadata object from the give video_data + """ + + # first add the media + result = self.download_additional_media(extractor_key, video_data, result) + + # keep both 'title' and 'fulltitle', but prefer 'title', falling back to 'fulltitle' if it doesn't exist + result.set_title(video_data.pop('title', video_data.pop('fulltitle', ""))) + + # then add the platform specific additional metadata + for key, mapping in self.video_data_metadata_mapping(extractor_key, video_data).items(): + if isinstance(mapping, str): + result.set(key, eval(f"video_data{mapping}")) + elif callable(mapping): + result.set(key, mapping(video_data)) + result.set_url(url) + + # extract comments if enabled + if self.comments: + result.set("comments", [{ + "text": c["text"], + "author": c["author"], + "timestamp": datetime.datetime.fromtimestamp(c.get("timestamp"), tz = datetime.timezone.utc) + } for c in video_data.get("comments", [])]) + + # then add the common metadata + if timestamp := video_data.pop("timestamp", None): + timestamp = datetime.datetime.fromtimestamp(timestamp, tz = datetime.timezone.utc).isoformat() + result.set_timestamp(timestamp) + if upload_date := video_data.pop("upload_date", None): + upload_date = datetime.datetime.strptime(upload_date, '%Y%m%d').replace(tzinfo=datetime.timezone.utc) + result.set("upload_date", upload_date) + + # then clean away any keys we don't want + for clean_key in self.keys_to_clean(extractor_key, video_data): + video_data.pop(clean_key, None) + + # then add the rest of the video data + for k, v in video_data.items(): + if v: + result.set(k, v) + + return result + + def video_data_metadata_mapping(self, extractor_key: str, video_data: dict) -> dict: + """ + Returns a key->value mapping to map from the yt-dlp produced 'video_data' to the Metadata object. + Can be either a string for direct mapping, or a function, or a lambda. + """ + return {} + + def suitable_extractors(self, url: str) -> list[str]: + """ + Returns a list of valid extractors for the given URL""" + for info_extractor in yt_dlp.YoutubeDL()._ies.values(): + if info_extractor.suitable(url) and info_extractor.working(): + yield info_extractor + + def suitable(self, url: str) -> bool: + """ + Checks for valid URLs out of all ytdlp extractors. + Returns False for the GenericIE, which as labelled by yt-dlp: 'Generic downloader that works on some sites' + """ + return any(self.suitable_extractors(url)) + + def create_metadata_for_post(self, info_extractor: InfoExtractor, video_data: dict, url: str) -> Metadata: + """ + Standardizes the output of the ytdlp InfoExtractor to a common format + """ + if info_extractor.ie_key() == 'Bluesky': + return bluesky.create_metadata(video_data, self, url) + if info_extractor.ie_key() == 'Twitter': + return twitter.create_metadata(video_data, self, url) + + def get_metatdata_for_post(self, info_extractor: Type[InfoExtractor], url: str, ydl: yt_dlp.YoutubeDL) -> Metadata: + """ + Calls into the ytdlp InfoExtract subclass to use the prive _extract_post method to get the post metadata. + """ + + ie_instance = info_extractor(downloader=ydl) + post_data = None + + if info_extractor.ie_key() == 'Bluesky': + # bluesky kwargs are handle, video_id + handle, video_id = ie_instance._match_valid_url(url).group('handle', 'id') + post_data = ie_instance._extract_post(handle=handle, post_id=video_id) + elif info_extractor.ie_key() == 'Twitter': + # twitter kwargs are tweet_id + twid = ie_instance._match_valid_url(url).group('id') + # TODO: if ytdlp PR https://github.com/yt-dlp/yt-dlp/pull/12098 is merged, change to _extract_post + post_data = ie_instance._extract_status(twid=twid) + + elif info_extractor.ie_key() == 'TikTok': + pass + + else: + # lame attempt at trying to get data for an unknown extractor + # TODO: test some more video platforms and see if there's any improvement to be made + try: + post_data = ie_instance._extract_post(url) + except (NotImplementedError, AttributeError) as e: + logger.debug(f"Extractor {info_extractor.ie_key()} does not support extracting post info: {e}") + return False + + return self.create_metadata_for_post(ie_instance, post_data, url) + + def get_metatdata_for_video(self, info: dict, info_extractor: Type[InfoExtractor], url: str, ydl: yt_dlp.YoutubeDL) -> Metadata: + + # this time download + ydl.params['getcomments'] = self.comments + #TODO: for playlist or long lists of videos, how to download one at a time so they can be stored before the next one is downloaded? + info = ydl.extract_info(url, ie_key=info_extractor.ie_key(), download=True) + if "entries" in info: + entries = info.get("entries", []) + if not len(entries): + logger.warning('YoutubeDLArchiver could not find any video') + return False + else: entries = [info] + + extractor_key = info['extractor_key'] + result = Metadata() + + for entry in entries: + try: + filename = ydl.prepare_filename(entry) + if not os.path.exists(filename): + filename = filename.split('.')[0] + '.mkv' + + new_media = Media(filename) + for x in ["duration", "original_url", "fulltitle", "description", "upload_date"]: + if x in entry: new_media.set(x, entry[x]) + + # read text from subtitles if enabled + if self.subtitles: + for lang, val in (info.get('requested_subtitles') or {}).items(): + try: + subs = pysubs2.load(val.get('filepath'), encoding="utf-8") + text = " ".join([line.text for line in subs]) + new_media.set(f"subtitles_{lang}", text) + except Exception as e: + logger.error(f"Error loading subtitle file {val.get('filepath')}: {e}") + result.add_media(new_media) + except Exception as e: + logger.error(f"Error processing entry {entry}: {e}") + + return self.add_metadata(extractor_key, info, url, result) + + def download_for_extractor(self, info_extractor: Type[InfoExtractor], url: str, ydl: yt_dlp.YoutubeDL) -> Metadata: + """ + Tries to download the given url using the specified extractor + + It first tries to use ytdlp directly to download the video. If the post is not a video, it will then try to + use the extractor's _extract_post method to get the post metadata if possible. + """ + # when getting info without download, we also don't need the comments + ydl.params['getcomments'] = False + result = False + + try: + # don't download since it can be a live stream + info = ydl.extract_info(url, ie_key=info_extractor.ie_key(), download=False) + if info.get('is_live', False) and not self.livestreams: + logger.warning("Livestream detected, skipping due to 'livestreams' configuration setting") + return False + # it's a valid video, that the youtubdedl can download out of the box + result = self.get_metatdata_for_video(info, info_extractor, url, ydl) + + except yt_dlp.utils.DownloadError as e: + logger.debug(f'No video found, attempting to use extractor directly: {e}') + result = self.get_metatdata_for_post(info_extractor, url, ydl) + except Exception as e: + logger.debug(f'ytdlp exception which is normal for example a facebook page with images only will cause a IndexError: list index out of range. Exception is: \n {e}') + return False + + if result: + extractor_name = "yt-dlp" + if info_extractor: + extractor_name += f"_{info_extractor.ie_key()}" + + if self.end_means_success: + result.success(extractor_name) + else: + result.status = extractor_name + + return result + + def download(self, item: Metadata) -> Metadata: + url = item.get_url() + + if item.netloc in ['facebook.com', 'www.facebook.com'] and self.facebook_cookie: + logger.debug('Using Facebook cookie') + yt_dlp.utils.std_headers['cookie'] = self.facebook_cookie + + ydl_options = {'outtmpl': os.path.join(ArchivingContext.get_tmp_dir(), f'%(id)s.%(ext)s'), 'quiet': False, 'noplaylist': not self.allow_playlist , 'writesubtitles': self.subtitles, 'writeautomaticsub': self.subtitles, "live_from_start": self.live_from_start, "proxy": self.proxy, "max_downloads": self.max_downloads, "playlistend": self.max_downloads} + + if item.netloc in ['youtube.com', 'www.youtube.com']: + if self.cookies_from_browser: + logger.debug(f'Extracting cookies from browser {self.cookies_from_browser} for Youtube') + ydl_options['cookiesfrombrowser'] = (self.cookies_from_browser,) + elif self.cookie_file: + logger.debug(f'Using cookies from file {self.cookie_file}') + ydl_options['cookiefile'] = self.cookie_file + + ydl = yt_dlp.YoutubeDL(ydl_options) # allsubtitles and subtitleslangs not working as expected, so default lang is always "en" + + for info_extractor in self.suitable_extractors(url): + result = self.download_for_extractor(info_extractor, url, ydl) + if result: + return result + + + return False diff --git a/src/auto_archiver/archivers/base_archiver/bluesky.py b/src/auto_archiver/archivers/base_archiver/bluesky.py new file mode 100644 index 0000000..176808b --- /dev/null +++ b/src/auto_archiver/archivers/base_archiver/bluesky.py @@ -0,0 +1,88 @@ +import os +import mimetypes + +import requests +from loguru import logger + +from auto_archiver.core.context import ArchivingContext +from auto_archiver.archivers.archiver import Archiver +from auto_archiver.core.metadata import Metadata, Media + + +def create_metadata(post: dict, archiver: Archiver, url: str) -> Metadata: + result = Metadata() + result.set_url(url) + result.set_title(post["record"]["text"]) + result.set_timestamp(post["record"]["createdAt"]) + for k, v in _get_post_data(post).items(): + if v: result.set(k, v) + + # download if embeds present (1 video XOR >=1 images) + for media in _download_bsky_embeds(post): + result.add_media(media) + logger.debug(f"Downloaded {len(result.media)} media files") + + return result + +def _download_bsky_embeds(post: dict) -> list[Media]: + """ + Iterates over image(s) or video in a Bluesky post and downloads them + """ + media = [] + embed = post.get("record", {}).get("embed", {}) + image_medias = embed.get("images", []) + embed.get("media", {}).get("images", []) + video_medias = [e for e in [embed.get("video"), embed.get("media", {}).get("video")] if e] + + for image_media in image_medias: + image_media = _download_bsky_file_as_media(image_media["image"]["ref"]["$link"], post["author"]["did"]) + media.append(image_media) + for video_media in video_medias: + video_media = _download_bsky_file_as_media(video_media["ref"]["$link"], post["author"]["did"]) + media.append(video_media) + return media + +def _download_bsky_file_as_media(cid: str, did: str) -> Media: + """ + Uses the Bluesky API to download a file by its `cid` and `did`. + """ + # TODO: replace with self.download_from_url once that function has been cleaned-up + file_url = f"https://bsky.social/xrpc/com.atproto.sync.getBlob?cid={cid}&did={did}" + response = requests.get(file_url, stream=True) + response.raise_for_status() + ext = mimetypes.guess_extension(response.headers["Content-Type"]) + filename = os.path.join(ArchivingContext.get_tmp_dir(), f"{cid}{ext}") + with open(filename, "wb") as f: + for chunk in response.iter_content(chunk_size=8192): + f.write(chunk) + media = Media(filename=filename) + media.set("src", file_url) + return media + +def _get_post_data(post: dict) -> dict: + """ + Extracts relevant information returned by the .getPostThread api call (excluding text/created_at): author, mentions, tags, links. + """ + author = post["author"] + if "labels" in author and not author["labels"]: + del author["labels"] + if "associated" in author: + del author["associated"] + + mentions, tags, links = [], [], [] + facets = post.get("record", {}).get("facets", []) + for f in facets: + for feature in f["features"]: + if feature["$type"] == "app.bsky.richtext.facet#mention": + mentions.append(feature["did"]) + elif feature["$type"] == "app.bsky.richtext.facet#tag": + tags.append(feature["tag"]) + elif feature["$type"] == "app.bsky.richtext.facet#link": + links.append(feature["uri"]) + res = {"author": author} + if mentions: + res["mentions"] = mentions + if tags: + res["tags"] = tags + if links: + res["links"] = links + return res \ No newline at end of file diff --git a/src/auto_archiver/archivers/base_archiver/twitter.py b/src/auto_archiver/archivers/base_archiver/twitter.py new file mode 100644 index 0000000..8cc323c --- /dev/null +++ b/src/auto_archiver/archivers/base_archiver/twitter.py @@ -0,0 +1,62 @@ +import re, mimetypes, json +from datetime import datetime + +from loguru import logger +from slugify import slugify + +from auto_archiver.core.metadata import Metadata, Media +from auto_archiver.utils import UrlUtil +from auto_archiver.archivers.archiver import Archiver + + +def choose_variant(variants): + # choosing the highest quality possible + variant, width, height = None, 0, 0 + for var in variants: + if var.get("content_type", "") == "video/mp4": + width_height = re.search(r"\/(\d+)x(\d+)\/", var["url"]) + if width_height: + w, h = int(width_height[1]), int(width_height[2]) + if w > width or h > height: + width, height = w, h + variant = var + else: + variant = var if not variant else variant + return variant + +def create_metadata(tweet: dict, archiver: Archiver, url: str) -> Metadata: + result = Metadata() + try: + if not tweet.get("user") or not tweet.get("created_at"): + raise ValueError(f"Error retreiving post. Are you sure it exists?") + timestamp = datetime.strptime(tweet["created_at"], "%a %b %d %H:%M:%S %z %Y") + except (ValueError, KeyError) as ex: + logger.warning(f"Unable to parse tweet: {str(ex)}\nRetreived tweet data: {tweet}") + return False + + result\ + .set_title(tweet.get('full_text', ''))\ + .set_content(json.dumps(tweet, ensure_ascii=False))\ + .set_timestamp(timestamp) + if not tweet.get("entities", {}).get("media"): + logger.debug('No media found, archiving tweet text only') + result.status = "twitter-ytdl" + return result + for i, tw_media in enumerate(tweet["entities"]["media"]): + media = Media(filename="") + mimetype = "" + if tw_media["type"] == "photo": + media.set("src", UrlUtil.twitter_best_quality_url(tw_media['media_url_https'])) + mimetype = "image/jpeg" + elif tw_media["type"] == "video": + variant = choose_variant(tw_media['video_info']['variants']) + media.set("src", variant['url']) + mimetype = variant['content_type'] + elif tw_media["type"] == "animated_gif": + variant = tw_media['video_info']['variants'][0] + media.set("src", variant['url']) + mimetype = variant['content_type'] + ext = mimetypes.guess_extension(mimetype) + media.filename = archiver.download_from_url(media.get("src"), f'{slugify(url)}_{i}{ext}') + result.add_media(media) + return result \ No newline at end of file diff --git a/src/auto_archiver/archivers/bluesky_archiver.py b/src/auto_archiver/archivers/bluesky_archiver.py deleted file mode 100644 index 534fba2..0000000 --- a/src/auto_archiver/archivers/bluesky_archiver.py +++ /dev/null @@ -1,119 +0,0 @@ -import os -import re, requests, mimetypes -from loguru import logger - - -from . import Archiver -from ..core import Metadata, Media, ArchivingContext - - -class BlueskyArchiver(Archiver): - """ - Uses an unauthenticated Bluesky API to archive posts including metadata, images and videos. Relies on `public.api.bsky.app/xrpc` and `bsky.social/xrpc`. Avoids ATProto to avoid auth. - - Some inspiration from https://github.com/yt-dlp/yt-dlp/blob/master/yt_dlp/extractor/bluesky.py - """ - name = "bluesky_archiver" - BSKY_POST = re.compile(r"/profile/([^/]+)/post/([a-zA-Z0-9]+)") - - def __init__(self, config: dict) -> None: - super().__init__(config) - - @staticmethod - def configs() -> dict: - return {} - - def download(self, item: Metadata) -> Metadata: - url = item.get_url() - if not re.search(self.BSKY_POST, url): - return False - - logger.debug(f"Identified a Bluesky post: {url}, archiving...") - result = Metadata() - - # fetch post info and update result - post = self._get_post_from_uri(url) - logger.debug(f"Extracted post info: {post['record']['text']}") - result.set_title(post["record"]["text"]) - result.set_timestamp(post["record"]["createdAt"]) - for k, v in self._get_post_data(post).items(): - if v: result.set(k, v) - - # download if embeds present (1 video XOR >=1 images) - for media in self._download_bsky_embeds(post): - result.add_media(media) - logger.debug(f"Downloaded {len(result.media)} media files") - - return result.success("bluesky") - - def _get_post_from_uri(self, post_uri: str) -> dict: - """ - Calls a public (no auth needed) Bluesky API to get a post from its uri, uses .getPostThread as it brings author info as well (unlike .getPost). - """ - post_match = re.search(self.BSKY_POST, post_uri) - username = post_match.group(1) - post_id = post_match.group(2) - at_uri = f'at://{username}/app.bsky.feed.post/{post_id}' - r = requests.get(f"https://public.api.bsky.app/xrpc/app.bsky.feed.getPostThread?uri={at_uri}&depth=0&parent_height=0") - r.raise_for_status() - thread = r.json() - assert thread["thread"]["$type"] == "app.bsky.feed.defs#threadViewPost" - return thread["thread"]["post"] - - def _download_bsky_embeds(self, post: dict) -> list[Media]: - """ - Iterates over image(s) or video in a Bluesky post and downloads them - """ - media = [] - embed = post.get("record", {}).get("embed", {}) - image_medias = embed.get("images", []) + embed.get("media", {}).get("images", []) - video_medias = [e for e in [embed.get("video"), embed.get("media", {}).get("video")] if e] - - for image_media in image_medias: - image_media = self._download_bsky_file_as_media(image_media["image"]["ref"]["$link"], post["author"]["did"]) - media.append(image_media) - for video_media in video_medias: - video_media = self._download_bsky_file_as_media(video_media["ref"]["$link"], post["author"]["did"]) - media.append(video_media) - return media - - def _download_bsky_file_as_media(self, cid: str, did: str) -> Media: - """ - Uses the Bluesky API to download a file by its `cid` and `did`. - """ - # TODO: replace with self.download_from_url once that function has been cleaned-up - file_url = f"https://bsky.social/xrpc/com.atproto.sync.getBlob?cid={cid}&did={did}" - response = requests.get(file_url, stream=True) - response.raise_for_status() - ext = mimetypes.guess_extension(response.headers["Content-Type"]) - filename = os.path.join(ArchivingContext.get_tmp_dir(), f"{cid}{ext}") - with open(filename, "wb") as f: - for chunk in response.iter_content(chunk_size=8192): - f.write(chunk) - media = Media(filename=filename) - media.set("src", file_url) - return media - - def _get_post_data(self, post: dict) -> dict: - """ - Extracts relevant information returned by the .getPostThread api call (excluding text/created_at): author, mentions, tags, links. - """ - author = post["author"] - if "labels" in author and not author["labels"]: del author["labels"] - if "associated" in author: del author["associated"] - - mentions, tags, links = [], [], [] - facets = post.get("record", {}).get("facets", []) - for f in facets: - for feature in f["features"]: - if feature["$type"] == "app.bsky.richtext.facet#mention": - mentions.append(feature["did"]) - elif feature["$type"] == "app.bsky.richtext.facet#tag": - tags.append(feature["tag"]) - elif feature["$type"] == "app.bsky.richtext.facet#link": - links.append(feature["uri"]) - res = {"author": author} - if mentions: res["mentions"] = mentions - if tags: res["tags"] = tags - if links: res["links"] = links - return res diff --git a/src/auto_archiver/archivers/tiktok_archiver.py b/src/auto_archiver/archivers/tiktok_archiver.py deleted file mode 100644 index fac67d1..0000000 --- a/src/auto_archiver/archivers/tiktok_archiver.py +++ /dev/null @@ -1,55 +0,0 @@ -import json, os, traceback -from loguru import logger - - -from . import Archiver -from ..core import Metadata, Media, ArchivingContext -from ..utils.misc import random_str - - -class TiktokArchiver(Archiver): - name = "tiktok_archiver" - - def __init__(self, config: dict) -> None: - super().__init__(config) - - @staticmethod - def configs() -> dict: - return {} - - def download(self, item: Metadata) -> Metadata: - url = item.get_url() - if 'tiktok.com' not in url: - return False - - result = Metadata() - try: - info = tiktok_downloader.info_post(url) - result.set_title(info.desc) - result.set_timestamp(info.create_time) - result.set_content(json.dumps({ - "cover": info.cover, - "author": info.author, - "music_title": info.author, - "caption": getattr(info, "caption", info.desc), - }, ensure_ascii=False, indent=4)) - except: - error = traceback.format_exc() - logger.warning(f'Other Tiktok error {error}') - - try: - filename = os.path.join(ArchivingContext.get_tmp_dir(), f'{random_str(8)}.mp4') - tiktok_media = tiktok_downloader.snaptik(url).get_media() - - if len(tiktok_media) <= 0: - logger.debug(f"TikTok: could not get media from {url=}") - return False - - logger.info(f'downloading video {filename=}') - tiktok_media[0].download(filename) - - result.add_media(Media(filename)) - return result.success("tiktok") - except: - error = traceback.format_exc() - logger.warning(f'Other Tiktok error {error}') diff --git a/src/auto_archiver/archivers/twitter_api_archiver.py b/src/auto_archiver/archivers/twitter_api_archiver.py index a8c4673..d1e4dee 100644 --- a/src/auto_archiver/archivers/twitter_api_archiver.py +++ b/src/auto_archiver/archivers/twitter_api_archiver.py @@ -1,17 +1,19 @@ - -import json, mimetypes +import json +import re +import mimetypes +import requests from datetime import datetime + from loguru import logger from pytwitter import Api from slugify import slugify from . import Archiver -from .twitter_archiver import TwitterArchiver from ..core import Metadata,Media - -class TwitterApiArchiver(TwitterArchiver, Archiver): +class TwitterApiArchiver(Archiver): name = "twitter_api_archiver" + link_pattern = re.compile(r"(?:twitter|x).com\/(?:\#!\/)?(\w+)\/status(?:es)?\/(\d+)") def __init__(self, config: dict) -> None: super().__init__(config) @@ -47,6 +49,17 @@ class TwitterApiArchiver(TwitterArchiver, Archiver): def api_client(self) -> str: return self.apis[self.api_index] + def sanitize_url(self, url: str) -> str: + # expand URL if t.co and clean tracker GET params + if 'https://t.co/' in url: + try: + r = requests.get(url, timeout=30) + logger.debug(f'Expanded url {url} to {r.url}') + url = r.url + except: + logger.error(f'Failed to expand url {url}') + return url + def download(self, item: Metadata) -> Metadata: # call download retry until success or no more apis @@ -56,6 +69,16 @@ class TwitterApiArchiver(TwitterArchiver, Archiver): self.api_index = 0 return False + def get_username_tweet_id(self, url): + # detect URLs that we definitely cannot handle + matches = self.link_pattern.findall(url) + if not len(matches): return False, False + + username, tweet_id = matches[0] # only one URL supported + logger.debug(f"Found {username=} and {tweet_id=} in {url=}") + + return username, tweet_id + def download_retry(self, item: Metadata) -> Metadata: url = item.get_url() # detect URLs that we definitely cannot handle @@ -102,10 +125,13 @@ class TwitterApiArchiver(TwitterArchiver, Archiver): "lang": tweet.data.lang, "media": urls }, ensure_ascii=False, indent=4)) - return result.success("twitter") + return result.success("twitter-api") def choose_variant(self, variants): - # choosing the highest quality possible + + """ + Chooses the highest quality variable possible out of a list of variants + """ variant, bit_rate = None, -1 for var in variants: if var.content_type == "video/mp4": diff --git a/src/auto_archiver/archivers/twitter_archiver.py b/src/auto_archiver/archivers/twitter_archiver.py deleted file mode 100644 index 995910b..0000000 --- a/src/auto_archiver/archivers/twitter_archiver.py +++ /dev/null @@ -1,209 +0,0 @@ -import re, requests, mimetypes, json, math -from typing import Union -from datetime import datetime -from loguru import logger -from yt_dlp import YoutubeDL -from yt_dlp.extractor.twitter import TwitterIE -from slugify import slugify - -from . import Archiver -from ..core import Metadata, Media -from ..utils import UrlUtil - - -class TwitterArchiver(Archiver): - """ - This Twitter Archiver uses unofficial scraping methods. - """ - - name = "twitter_archiver" - link_pattern = re.compile(r"(?:twitter|x).com\/(?:\#!\/)?(\w+)\/status(?:es)?\/(\d+)") - link_clean_pattern = re.compile(r"(.+(?:twitter|x)\.com\/.+\/\d+)(\?)*.*") - - def __init__(self, config: dict) -> None: - super().__init__(config) - - @staticmethod - def configs() -> dict: - return {} - - def sanitize_url(self, url: str) -> str: - # expand URL if t.co and clean tracker GET params - if 'https://t.co/' in url: - try: - r = requests.get(url, timeout=30) - logger.debug(f'Expanded url {url} to {r.url}') - url = r.url - except: - logger.error(f'Failed to expand url {url}') - # https://twitter.com/MeCookieMonster/status/1617921633456640001?s=20&t=3d0g4ZQis7dCbSDg-mE7-w - return self.link_clean_pattern.sub("\\1", url) - - def download(self, item: Metadata) -> Metadata: - """ - if this url is archivable will download post info and look for other posts from the same group with media. - can handle private/public channels - """ - url = item.get_url() - username, tweet_id = self.get_username_tweet_id(url) - if not username: return False - - strategies = [self.download_yt_dlp, self.download_syndication] - for strategy in strategies: - logger.debug(f"Trying {strategy.__name__} for {url=}") - try: - result = strategy(item, url, tweet_id) - if result: return result - except Exception as ex: - logger.error(f"Failed to download {url} with {strategy.__name__}: {type(ex).__name__} occurred. args: {ex.args}") - - logger.warning(f"No free strategy worked for {url}") - return False - - - def generate_token(self, tweet_id: str) -> str: - """Generates the syndication token for a tweet ID. - - Taken from https://github.com/JustAnotherArchivist/snscrape/issues/996#issuecomment-2211358215 - And Vercel's code: https://github.com/vercel/react-tweet/blob/main/packages/react-tweet/src/api/fetch-tweet.ts#L27 - """ - - # Perform the division and multiplication by π - result = (int(tweet_id) / 1e15) * math.pi - fractional_part = result % 1 - - # Convert to base 36 - base_36 = '' - while result >= 1: - base_36 = "0123456789abcdefghijklmnopqrstuvwxyz"[int(result % 36)] + base_36 - result = math.floor(result / 36) - - # Append fractional part in base 36 - while fractional_part > 0 and len(base_36) < 11: # Limit to avoid infinite loop - fractional_part *= 36 - digit = int(fractional_part) - base_36 += "0123456789abcdefghijklmnopqrstuvwxyz"[digit] - fractional_part -= digit - - # Remove leading zeros and dots - return base_36.replace('0', '').replace('.', '') - - - - def download_syndication(self, item: Metadata, url: str, tweet_id: str) -> Union[Metadata|bool]: - """ - Downloads tweets using Twitter's own embed API (Hack). - - Background on method can be found at: - https://github.com/JustAnotherArchivist/snscrape/issues/996#issuecomment-1615937362 - https://github.com/JustAnotherArchivist/snscrape/issues/996#issuecomment-2211358215 - next to test: https://cdn.embedly.com/widgets/media.html?&schema=twitter&url=https://twitter.com/bellingcat/status/1674700676612386816 - """ - - hack_url = "https://cdn.syndication.twimg.com/tweet-result" - params = { - 'id': tweet_id, - 'token': self.generate_token(tweet_id) - } - - r = requests.get(hack_url, params=params, timeout=10) - if r.status_code != 200 or r.json()=={}: - logger.warning(f"SyndicationHack: Failed to get tweet information from {hack_url}.") - return False - - result = Metadata() - tweet = r.json() - - if tweet.get('__typename') == 'TweetTombstone': - logger.error(f"Failed to get tweet {tweet_id}: {tweet['tombstone']['text']['text']}") - return False - - urls = [] - for p in tweet.get("photos", []): - urls.append(p["url"]) - - # 1 tweet has 1 video max - if "video" in tweet: - v = tweet["video"] - urls.append(self.choose_variant(v.get("variants", []))['url']) - - logger.debug(f"Twitter hack got media {urls=}") - - for i, u in enumerate(urls): - media = Media(filename="") - u = UrlUtil.twitter_best_quality_url(u) - media.set("src", u) - ext = "" - if (mtype := mimetypes.guess_type(UrlUtil.remove_get_parameters(u))[0]): - ext = mimetypes.guess_extension(mtype) - - media.filename = self.download_from_url(u, f'{slugify(url)}_{i}{ext}') - result.add_media(media) - - result.set_title(tweet.get("text")).set_content(json.dumps(tweet, ensure_ascii=False)).set_timestamp(datetime.strptime(tweet["created_at"], "%Y-%m-%dT%H:%M:%S.%fZ")) - return result.success("twitter-syndication") - - def download_yt_dlp(self, item: Metadata, url: str, tweet_id: str) -> Union[Metadata|bool]: - downloader = YoutubeDL() - tie = TwitterIE(downloader) - tweet = tie._extract_status(tweet_id) - result = Metadata() - try: - if not tweet.get("user") or not tweet.get("created_at"): - raise ValueError(f"Error retreiving post with id {tweet_id}. Are you sure it exists?") - timestamp = datetime.strptime(tweet["created_at"], "%a %b %d %H:%M:%S %z %Y") - except (ValueError, KeyError) as ex: - logger.warning(f"Unable to parse tweet: {str(ex)}\nRetreived tweet data: {tweet}") - return False - - result\ - .set_title(tweet.get('full_text', ''))\ - .set_content(json.dumps(tweet, ensure_ascii=False))\ - .set_timestamp(timestamp) - if not tweet.get("entities", {}).get("media"): - logger.debug('No media found, archiving tweet text only') - result.status = "twitter-ytdl" - return result - for i, tw_media in enumerate(tweet["entities"]["media"]): - media = Media(filename="") - mimetype = "" - if tw_media["type"] == "photo": - media.set("src", UrlUtil.twitter_best_quality_url(tw_media['media_url_https'])) - mimetype = "image/jpeg" - elif tw_media["type"] == "video": - variant = self.choose_variant(tw_media['video_info']['variants']) - media.set("src", variant['url']) - mimetype = variant['content_type'] - elif tw_media["type"] == "animated_gif": - variant = tw_media['video_info']['variants'][0] - media.set("src", variant['url']) - mimetype = variant['content_type'] - ext = mimetypes.guess_extension(mimetype) - media.filename = self.download_from_url(media.get("src"), f'{slugify(url)}_{i}{ext}', item) - result.add_media(media) - return result.success("twitter-ytdl") - - def get_username_tweet_id(self, url): - # detect URLs that we definitely cannot handle - matches = self.link_pattern.findall(url) - if not len(matches): return False, False - - username, tweet_id = matches[0] # only one URL supported - logger.debug(f"Found {username=} and {tweet_id=} in {url=}") - - return username, tweet_id - - def choose_variant(self, variants): - # choosing the highest quality possible - variant, width, height = None, 0, 0 - for var in variants: - if var.get("content_type", "") == "video/mp4": - width_height = re.search(r"\/(\d+)x(\d+)\/", var["url"]) - if width_height: - w, h = int(width_height[1]), int(width_height[2]) - if w > width or h > height: - width, height = w, h - variant = var - else: - variant = var if not variant else variant - return variant diff --git a/src/auto_archiver/archivers/youtubedl_archiver.py b/src/auto_archiver/archivers/youtubedl_archiver.py index 97ad569..1bc8966 100644 --- a/src/auto_archiver/archivers/youtubedl_archiver.py +++ b/src/auto_archiver/archivers/youtubedl_archiver.py @@ -1,221 +1,2 @@ -import datetime, os, yt_dlp, pysubs2 -from loguru import logger - -from . import Archiver -from ..core import Metadata, Media, ArchivingContext - - -class YoutubeDLArchiver(Archiver): - name = "youtubedl_archiver" - - def __init__(self, config: dict) -> None: - super().__init__(config) - self.subtitles = bool(self.subtitles) - self.comments = bool(self.comments) - self.livestreams = bool(self.livestreams) - self.live_from_start = bool(self.live_from_start) - self.end_means_success = bool(self.end_means_success) - self.allow_playlist = bool(self.allow_playlist) - self.max_downloads = self.max_downloads - - @staticmethod - def configs() -> dict: - return { - "facebook_cookie": {"default": None, "help": "optional facebook cookie to have more access to content, from browser, looks like 'cookie: datr= xxxx'"}, - "subtitles": {"default": True, "help": "download subtitles if available"}, - "comments": {"default": False, "help": "download all comments if available, may lead to large metadata"}, - "livestreams": {"default": False, "help": "if set, will download live streams, otherwise will skip them; see --max-filesize for more control"}, - "live_from_start": {"default": False, "help": "if set, will download live streams from their earliest available moment, otherwise starts now."}, - "proxy": {"default": "", "help": "http/socks (https seems to not work atm) proxy to use for the webdriver, eg https://proxy-user:password@proxy-ip:port"}, - "end_means_success": {"default": True, "help": "if True, any archived content will mean a 'success', if False this archiver will not return a 'success' stage; this is useful for cases when the yt-dlp will archive a video but ignore other types of content like images or text only pages that the subsequent archivers can retrieve."}, - 'allow_playlist': {"default": False, "help": "If True will also download playlists, set to False if the expectation is to download a single video."}, - "max_downloads": {"default": "inf", "help": "Use to limit the number of videos to download when a channel or long page is being extracted. 'inf' means no limit."}, - "cookies_from_browser": {"default": None, "help": "optional browser for ytdl to extract cookies from, can be one of: brave, chrome, chromium, edge, firefox, opera, safari, vivaldi, whale"}, - "cookie_file": {"default": None, "help": "optional cookie file to use for Youtube, see instructions here on how to export from your browser: https://github.com/yt-dlp/yt-dlp/wiki/FAQ#how-do-i-pass-cookies-to-yt-dlp"}, - } - - def download_additional_media(self, ie: str, video_data: dict, metadata: Metadata) -> Metadata: - """ - Downloads additional media like images, comments, subtitles, etc. - - Creates a 'media' object and attaches it to the metadata object. - """ - - # TODO: should we download all thumbnails, or just the chosen thumbnail? - - # Right now, just getting the single thumbnail - thumbnail_url = video_data.get('thumbnail') - if thumbnail_url: - try: - cover_image_path = self.download_from_url(thumbnail_url) - media = Media(cover_image_path) - metadata.add_media(media, id="cover") - except Exception as e: - logger.error(f"Error downloading cover image {thumbnail_url}: {e}") - - return metadata - - def keys_to_clean(self, ie: str, video_data: dict) -> dict: - """ - Clean up the video data to make it more readable and remove unnecessary keys that ytdlp adds - """ - - base_keys = ['formats', 'thumbnail', 'display_id', 'epoch', 'requested_downloads', - 'duration_string', 'thumbnails', 'http_headers', 'webpage_url_basename', 'webpage_url_domain', - 'extractor', 'extractor_key', 'playlist', 'playlist_index', 'duration_string', 'protocol', 'requested_subtitles', - 'format_id', 'acodec', 'vcodec', 'ext', 'epoch', '_has_drm', 'filesize', 'audio_ext', 'video_ext', 'vbr', 'abr', - 'resolution', 'dynamic_range', 'aspect_ratio', 'cookies', 'format', 'quality', 'preference', 'artists', - 'channel_id', 'subtitles', 'tbr', 'url', 'original_url', 'automatic_captions', 'playable_in_embed', 'live_status', - '_format_sort_fields', 'chapters', 'uploader_id', 'uploader_url', 'requested_formats', 'format_note', - 'audio_channels', 'asr', 'fps', 'was_live', 'is_live', 'heatmap', 'age_limit', 'stretched_ratio'] - if ie == 'TikTok': - return base_keys + [] - - return base_keys - - def add_metadata(self, ie: str, video_data: dict, url:str, result: Metadata) -> Metadata: - """ - Creates a Metadata object from the give video_data - """ - - # first add the media - result = self.download_additional_media(ie, video_data, result) - - # keep the full title, no need for the shortened title (?) - video_data['title'] = video_data.pop('fulltitle', video_data.get('title')) - result.set_title(video_data.pop('title', url)) - - # then add the platform specific additional metadata - for key, mapping in self.video_data_metadata_mapping(ie, video_data).items(): - if isinstance(mapping, str): - result.set(key, eval(f"video_data{mapping}")) - elif callable(mapping): - result.set(key, mapping(video_data)) - result.set_url(url) - - # extract comments if enabled - if self.comments: - result.set("comments", [{ - "text": c["text"], - "author": c["author"], - "timestamp": datetime.datetime.fromtimestamp(c.get("timestamp"), tz = datetime.timezone.utc) - } for c in video_data.get("comments", [])]) - - # then add the common metadata - if (timestamp := video_data.pop("timestamp", None)): - timestamp = datetime.datetime.fromtimestamp(timestamp, tz = datetime.timezone.utc).isoformat() - result.set_timestamp(timestamp) - if (upload_date := video_data.pop("upload_date", None)): - upload_date = datetime.datetime.strptime(upload_date, '%Y%m%d').replace(tzinfo=datetime.timezone.utc) - result.set("upload_date", upload_date) - - # then clean away any keys we don't want - for clean_key in self.keys_to_clean(ie, video_data): - video_data.pop(clean_key, None) - - # then add the rest of the video data - for k, v in video_data.items(): - if v: - result.set(k, v) - - return result - - def video_data_metadata_mapping(self, ie: str, video_data: dict) -> dict: - """ - Returns a key->value mapping to map from the yt-dlp produced 'video_data' to the Metadata object. - Can be either a string for direct mapping, or a function, or a lambda. - """ - return {} - - def suitable(self, item: Metadata) -> bool: - """ - Checks for valid URLs out of all ytdlp extractors. - Returns False for the GenericIE, which as labelled by yt-dlp: 'Generic downloader that works on some sites' - """ - url = item.get_url() - for ie_key, ie in yt_dlp.YoutubeDL()._ies.items(): - # Note: this will return True for *all* URLs due to the 'generic' extractor from ytdlp (valid for all URLs). - # should we check for the 'GenericIE' extractor and return False? - # if ie.IE_NAME == 'generic'... - leaving it in for now, since we also want the ability to download from generic sites - # perhaps one solution is to return 'False' initially, and then if no other installed archivers work, we try again using the generic one - if ie.suitable(url) and ie.working(): - return True - return False - - def download(self, item: Metadata) -> Metadata: - url = item.get_url() - - if item.netloc in ['facebook.com', 'www.facebook.com'] and self.facebook_cookie: - logger.debug('Using Facebook cookie') - yt_dlp.utils.std_headers['cookie'] = self.facebook_cookie - - ydl_options = {'outtmpl': os.path.join(ArchivingContext.get_tmp_dir(), f'%(id)s.%(ext)s'), 'quiet': False, 'noplaylist': not self.allow_playlist , 'writesubtitles': self.subtitles, 'writeautomaticsub': self.subtitles, "live_from_start": self.live_from_start, "proxy": self.proxy, "max_downloads": self.max_downloads, "playlistend": self.max_downloads} - - if item.netloc in ['youtube.com', 'www.youtube.com']: - if self.cookies_from_browser: - logger.debug(f'Extracting cookies from browser {self.cookies_from_browser} for Youtube') - ydl_options['cookiesfrombrowser'] = (self.cookies_from_browser,) - elif self.cookie_file: - logger.debug(f'Using cookies from file {self.cookie_file}') - ydl_options['cookiefile'] = self.cookie_file - - ydl = yt_dlp.YoutubeDL(ydl_options) # allsubtitles and subtitleslangs not working as expected, so default lang is always "en" - - try: - # don't download since it can be a live stream - info = ydl.extract_info(url, download=False) - if info.get('is_live', False) and not self.livestreams: - logger.warning("Livestream detected, skipping due to 'livestreams' configuration setting") - return False - except yt_dlp.utils.DownloadError as e: - logger.debug(f'No video - Youtube normal control flow: {e}') - return False - except Exception as e: - logger.debug(f'ytdlp exception which is normal for example a facebook page with images only will cause a IndexError: list index out of range. Exception is: \n {e}') - return False - - # this time download - ydl = yt_dlp.YoutubeDL({**ydl_options, "getcomments": self.comments}) - #TODO: for playlist or long lists of videos, how to download one at a time so they can be stored before the next one is downloaded? - info = ydl.extract_info(url, download=True) - if "entries" in info: - entries = info.get("entries", []) - if not len(entries): - logger.warning('YoutubeDLArchiver could not find any video') - return False - else: entries = [info] - - ie = info['extractor_key'] - result = Metadata() - - for entry in entries: - try: - filename = ydl.prepare_filename(entry) - if not os.path.exists(filename): - filename = filename.split('.')[0] + '.mkv' - - new_media = Media(filename) - for x in ["duration", "original_url", "fulltitle", "description", "upload_date"]: - if x in entry: new_media.set(x, entry[x]) - - # read text from subtitles if enabled - if self.subtitles: - for lang, val in (info.get('requested_subtitles') or {}).items(): - try: - subs = pysubs2.load(val.get('filepath'), encoding="utf-8") - text = " ".join([line.text for line in subs]) - new_media.set(f"subtitles_{lang}", text) - except Exception as e: - logger.error(f"Error loading subtitle file {val.get('filepath')}: {e}") - result.add_media(new_media) - except Exception as e: - logger.error(f"Error processing entry {entry}: {e}") - - result = self.add_metadata(ie, info, url, result) - extractor_name = "yt-dlp" - if ie: - extractor_name += f"--{ie}IE" - - if self.end_means_success: result.success(extractor_name) - else: result.status = extractor_name - return result +# temporary hack, as we implement module +from .youtubedl_archiver import * diff --git a/tests/archivers/test_base_archiver.py b/tests/archivers/test_base_archiver.py new file mode 100644 index 0000000..e58ca30 --- /dev/null +++ b/tests/archivers/test_base_archiver.py @@ -0,0 +1,141 @@ +import pytest +from pathlib import Path +import datetime + +from auto_archiver.archivers.base_archiver import BaseArchiver + +from .test_archiver_base import TestArchiverBase + +class TestBaseArchiver(TestArchiverBase): + """Tests Base Archiver + """ + archiver_class = BaseArchiver + config = { + 'subtitles': False, + 'comments': False, + 'livestreams': False, + 'live_from_start': False, + 'end_means_success': True, + 'allow_playlist': False, + 'max_downloads': "inf", + 'proxy': None, + 'cookies_from_browser': False, + 'cookie_file': None, + } + + @pytest.mark.parametrize("url, is_suitable", [ + ("https://www.youtube.com/watch?v=5qap5aO4i9A", True), + ("https://www.tiktok.com/@funnycats0ftiktok/video/7345101300750748970?lang=en", True), + ("https://www.instagram.com/p/CU1J9JYJ9Zz/", True), + ("https://www.facebook.com/nytimes/videos/10160796550110716", True), + ("https://www.twitch.tv/videos/1167226570", True), + ("https://bellingcat.com/news/2021/10/08/ukrainian-soldiers-are-being-killed-by-landmines-in-the-donbas/", True), + ("https://google.com", True)]) + def test_suitable_urls(self, make_item, url, is_suitable): + """ + Note: expected behaviour is to return True for all URLs, as YoutubeDLArchiver should be able to handle all URLs + This behaviour may be changed in the future (e.g. if we want the youtubedl archiver to just handle URLs it has extractors for, + and then if and only if all archivers fails, does it fall back to the generic archiver) + """ + assert self.archiver.suitable(url) == is_suitable + + @pytest.mark.download + def test_download_tiktok(self, make_item): + item = make_item("https://www.tiktok.com/@funnycats0ftiktok/video/7345101300750748970") + result = self.archiver.download(item) + assert result.get_url() == "https://www.tiktok.com/@funnycats0ftiktok/video/7345101300750748970" + + @pytest.mark.download + def test_youtube_download(self, make_item): + # url https://www.youtube.com/watch?v=5qap5aO4i9A + item = make_item("https://www.youtube.com/watch?v=J---aiyznGQ") + result = self.archiver.download(item) + assert result.get_url() == "https://www.youtube.com/watch?v=J---aiyznGQ" + assert result.get_title() == "Keyboard Cat! - THE ORIGINAL!" + assert result.get('description') == "Buy NEW Keyboard Cat Merch! https://keyboardcat.creator-spring.com\n\nxo Keyboard Cat memes make your day better!\nhttp://www.keyboardcatstore.com/\nhttps://www.facebook.com/thekeyboardcat\nhttp://www.charlieschmidt.com/" + assert len(result.media) == 2 + assert Path(result.media[0].filename).name == "J---aiyznGQ.webm" + assert Path(result.media[1].filename).name == "hqdefault.jpg" + + @pytest.mark.download + def test_bluesky_download_multiple_images(self, make_item): + item = make_item("https://bsky.app/profile/colborne.bsky.social/post/3lec2bqjc5s2y") + result = self.archiver.download(item) + assert result is not False + + @pytest.mark.skip("ytdlp supports bluesky, but there's currently no way to extract info from pages without videos") + @pytest.mark.download + def test_bluesky_download_single_image(self, make_item): + item = make_item("https://bsky.app/profile/colborne.bsky.social/post/3lcxcpgt6j42l") + result = self.archiver.download(item) + assert result is not False + + @pytest.mark.download + def test_bluesky_download_no_media(self, make_item): + item = make_item("https://bsky.app/profile/bellingcat.com/post/3lfphwmcs4c2z") + result = self.archiver.download(item) + assert result is not False + + @pytest.mark.download + def test_bluesky_download_video(self, make_item): + item = make_item("https://bsky.app/profile/bellingcat.com/post/3le2l4gsxlk2i") + result = self.archiver.download(item) + assert result is not False + + @pytest.mark.download + def test_twitter_download_nonexistend_tweet(self, make_item): + # this tweet does not exist + url = "https://x.com/Bellingcat/status/17197025860711058" + response = self.archiver.download(make_item(url)) + assert not response + + @pytest.mark.download + def test_twitter_download_malformed_tweetid(self, make_item): + # this tweet does not exist + url = "https://x.com/Bellingcat/status/1719702a586071100058" + response = self.archiver.download(make_item(url)) + assert not response + + @pytest.mark.download + def test_twitter_download_tweet_no_media(self, make_item): + + item = make_item("https://twitter.com/MeCookieMonster/status/1617921633456640001?s=20&t=3d0g4ZQis7dCbSDg-mE7-w") + post = self.archiver.download(item) + + self.assertValidResponseMetadata( + post, + "Onion rings are just vegetable donuts.", + datetime.datetime(2023, 1, 24, 16, 25, 51, tzinfo=datetime.timezone.utc), + "yt-dlp_Twitter: success" + ) + + @pytest.mark.download + def test_twitter_download_video(self, make_item): + url = "https://x.com/bellingcat/status/1871552600346415571" + post = self.archiver.download(make_item(url)) + self.assertValidResponseMetadata( + post, + "Bellingcat - This month's Bellingchat Premium is with @KolinaKoltai. She reveals how she investigated a platform allowing users to create AI-generated child sexual abuse material and explains why it's crucial to investigate the people behind these services", + datetime.datetime(2024, 12, 24, 13, 44, 46, tzinfo=datetime.timezone.utc) + ) + + @pytest.mark.xfail(reason="Currently failing, sensitive content requires logged in users/cookies - not yet implemented") + @pytest.mark.download + @pytest.mark.parametrize("url, title, timestamp, image_hash", [ + ("https://x.com/SozinhoRamalho/status/1876710769913450647", "ignore tweet, testing sensitivity warning nudity", datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc), "image_hash"), + ("https://x.com/SozinhoRamalho/status/1876710875475681357", "ignore tweet, testing sensitivity warning violence", datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc), "image_hash"), + ("https://x.com/SozinhoRamalho/status/1876711053813227618", "ignore tweet, testing sensitivity warning sensitive", datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc), "image_hash"), + ("https://x.com/SozinhoRamalho/status/1876711141314801937", "ignore tweet, testing sensitivity warning nudity, violence, sensitivity", datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc), "image_hash"), + ]) + def test_twitter_download_sensitive_media(self, url, title, timestamp, image_hash, make_item): + + """Download tweets with sensitive media""" + + post = self.archiver.download(make_item(url)) + self.assertValidResponseMetadata( + post, + title, + timestamp + ) + assert len(post.media) == 1 + assert post.media[0].hash == image_hash \ No newline at end of file diff --git a/tests/archivers/test_bluesky_archiver.py b/tests/archivers/test_bluesky_archiver.py deleted file mode 100644 index c9e1c81..0000000 --- a/tests/archivers/test_bluesky_archiver.py +++ /dev/null @@ -1,73 +0,0 @@ -import pytest - -from auto_archiver.archivers.bluesky_archiver import BlueskyArchiver -from .test_archiver_base import TestArchiverBase - -class TestBlueskyArchiver(TestArchiverBase): - """Tests Bluesky Archiver - - Note that these tests will download API responses from the bluesky API, so they may be slow. - This is an intended feature, as we want to test to ensure the bluesky API format hasn't changed, - and also test the archiver's ability to download media. - """ - - archiver_class = BlueskyArchiver - config = {} - - @pytest.mark.download - def test_download_media_with_images(self): - # url https://bsky.app/profile/colborne.bsky.social/post/3lec2bqjc5s2y - post = self.archiver._get_post_from_uri("https://bsky.app/profile/colborne.bsky.social/post/3lec2bqjc5s2y") - - # just make sure bsky haven't changed their format, images should be under "record/embed/media/images" - # there should be 2 images - assert "record" in post - assert "embed" in post["record"] - assert "media" in post["record"]["embed"] - assert "images" in post["record"]["embed"]["media"] - assert len(post["record"]["embed"]["media"]["images"]) == 2 - - # try downloading the media files - media = self.archiver._download_bsky_embeds(post) - assert len(media) == 2 - - # check the IDs - assert "bafkreiflrkfihcvwlhka5tb2opw2qog6gfvywsdzdlibveys2acozh75tq" in media[0].get('src') - assert "bafkreibsprmwchf7r6xcstqkdvvuj3ijw7efciw7l3y4crxr4cmynseo7u" in media[1].get('src') - - @pytest.mark.download - def test_download_post_with_single_image(self): - # url https://bsky.app/profile/bellingcat.com/post/3lcxcpgt6j42l - post = self.archiver._get_post_from_uri("https://bsky.app/profile/bellingcat.com/post/3lcxcpgt6j42l") - - # just make sure bsky haven't changed their format, images should be under "record/embed/images" - # there should be 1 image - assert "record" in post - assert "embed" in post["record"] - assert "images" in post["record"]["embed"] - assert len(post["record"]["embed"]["images"]) == 1 - - media = self.archiver._download_bsky_embeds(post) - assert len(media) == 1 - - # check the ID - assert "bafkreihljdtomy4yulx4nfxuqdatlgvdg45vxdmjzzhclsd4ludk7zfma4" in media[0].get('src') - - - @pytest.mark.download - def test_download_post_with_video(self): - # url https://bsky.app/profile/bellingcat.com/post/3le2l4gsxlk2i - post = self.archiver._get_post_from_uri("https://bsky.app/profile/bellingcat.com/post/3le2l4gsxlk2i") - - # just make sure bsky haven't changed their format, video should be under "record/embed/video" - assert "record" in post - assert "embed" in post["record"] - assert "video" in post["record"]["embed"] - - media = self.archiver._download_bsky_embeds(post) - assert len(media) == 1 - - # check the ID - assert "bafkreiaiskn2nt5cxjnxbgcqqcrnurvkr2ni3unekn6zvhvgr5nrqg6u2q" in media[0].get('src') - - diff --git a/tests/archivers/test_tiktok_archiver.py b/tests/archivers/test_tiktok_archiver.py deleted file mode 100644 index 8905c75..0000000 --- a/tests/archivers/test_tiktok_archiver.py +++ /dev/null @@ -1,17 +0,0 @@ -import pytest - -from .test_archiver_base import TestArchiverBase -from auto_archiver.archivers.tiktok_archiver import TiktokArchiver - -class TestBlueskyArchiver(TestArchiverBase): - - archiver_class = TiktokArchiver - config = {} - - @pytest.mark.xfail(reason="Tiktok API is not working") - @pytest.mark.download - def test_download_video(self, make_item): - # cat video - url = "https://www.tiktok.com/@funnycats0ftiktok/video/7345101300750748970?lang=en" - item = self.archiver.download(make_item(url)) - assert item.success \ No newline at end of file diff --git a/tests/archivers/test_twitter_archiver.py b/tests/archivers/test_twitter_api_archiver.py similarity index 55% rename from tests/archivers/test_twitter_archiver.py rename to tests/archivers/test_twitter_api_archiver.py index 17af2f2..fae1780 100644 --- a/tests/archivers/test_twitter_archiver.py +++ b/tests/archivers/test_twitter_api_archiver.py @@ -1,19 +1,31 @@ +import os import datetime + import pytest -from auto_archiver.archivers.twitter_archiver import TwitterArchiver - +from pytwitter.models.media import MediaVariant from .test_archiver_base import TestArchiverBase +from auto_archiver.archivers import TwitterApiArchiver -class TestTwitterArchiver(TestArchiverBase): - archiver_class = TwitterArchiver - config = {} +@pytest.mark.incremental +class TestTwitterApiArchiver(TestArchiverBase): + + archiver_class = TwitterApiArchiver + config = { + "bearer_tokens": [], + "bearer_token": os.environ.get("TWITTER_BEARER_TOKEN"), + "consumer_key": os.environ.get("TWITTER_CONSUMER_KEY"), + "consumer_secret": os.environ.get("TWITTER_CONSUMER_SECRET"), + "access_token": os.environ.get("TWITTER_ACCESS_TOKEN"), + "access_secret": os.environ.get("TWITTER_ACCESS_SECRET"), + } + @pytest.mark.parametrize("url, expected", [ ("https://t.co/yl3oOJatFp", "https://www.bellingcat.com/category/resources/"), # t.co URL ("https://x.com/bellingcat/status/1874097816571961839", "https://x.com/bellingcat/status/1874097816571961839"), # x.com urls unchanged ("https://twitter.com/bellingcat/status/1874097816571961839", "https://twitter.com/bellingcat/status/1874097816571961839"), # twitter urls unchanged - ("https://twitter.com/bellingcat/status/1874097816571961839?s=20&t=3d0g4ZQis7dCbSDg-mE7-w", "https://twitter.com/bellingcat/status/1874097816571961839"), # strip tracking params + ("https://twitter.com/bellingcat/status/1874097816571961839?s=20&t=3d0g4ZQis7dCbSDg-mE7-w", "https://twitter.com/bellingcat/status/1874097816571961839?s=20&t=3d0g4ZQis7dCbSDg-mE7-w"), # don't strip params from twitter urls (changed Jan 2025) ("https://www.bellingcat.com/category/resources/", "https://www.bellingcat.com/category/resources/"), # non-twitter/x urls unchanged ("https://www.bellingcat.com/category/resources/?s=20&t=3d0g4ZQis7dCbSDg-mE7-w", "https://www.bellingcat.com/category/resources/?s=20&t=3d0g4ZQis7dCbSDg-mE7-w"), # shouldn't strip params from non-twitter/x URLs ]) @@ -25,64 +37,25 @@ class TestTwitterArchiver(TestArchiverBase): ("https://x.com/bellingcat/status/1874097816571961839", "bellingcat", "1874097816571961839"), ("https://www.bellingcat.com/category/resources/", False, False) ]) - def test_get_username_tweet_id_from_url(self, url, exptected_username, exptected_tweetid): username, tweet_id = self.archiver.get_username_tweet_id(url) assert exptected_username == username assert exptected_tweetid == tweet_id - + def test_choose_variants(self): # taken from the response for url https://x.com/bellingcat/status/1871552600346415571 - variant_list = [{'content_type': 'application/x-mpegURL', 'url': 'https://video.twimg.com/ext_tw_video/1871551993677852672/pu/pl/ovWo7ux-bKROwYIC.m3u8?tag=12&v=e1b'}, - {'bitrate': 256000, 'content_type': 'video/mp4', 'url': 'https://video.twimg.com/ext_tw_video/1871551993677852672/pu/vid/avc1/480x270/OqZIrKV0LFswMvxS.mp4?tag=12'}, - {'bitrate': 832000, 'content_type': 'video/mp4', 'url': 'https://video.twimg.com/ext_tw_video/1871551993677852672/pu/vid/avc1/640x360/uiDZDSmZ8MZn9hsi.mp4?tag=12'}, - {'bitrate': 2176000, 'content_type': 'video/mp4', 'url': 'https://video.twimg.com/ext_tw_video/1871551993677852672/pu/vid/avc1/1280x720/6Y340Esh568WZnRZ.mp4?tag=12'} + variant_list = [MediaVariant(content_type='application/x-mpegURL', url='https://video.twimg.com/ext_tw_video/1871551993677852672/pu/pl/ovWo7ux-bKROwYIC.m3u8?tag=12&v=e1b'), + MediaVariant(bit_rate=256000, content_type='video/mp4', url='https://video.twimg.com/ext_tw_video/1871551993677852672/pu/vid/avc1/480x270/OqZIrKV0LFswMvxS.mp4?tag=12'), + MediaVariant(bit_rate=832000, content_type='video/mp4', url='https://video.twimg.com/ext_tw_video/1871551993677852672/pu/vid/avc1/640x360/uiDZDSmZ8MZn9hsi.mp4?tag=12'), + MediaVariant(bit_rate=2176000, content_type='video/mp4', url='https://video.twimg.com/ext_tw_video/1871551993677852672/pu/vid/avc1/1280x720/6Y340Esh568WZnRZ.mp4?tag=12') ] chosen_variant = self.archiver.choose_variant(variant_list) assert chosen_variant == variant_list[3] - @pytest.mark.parametrize("tweet_id, expected_token", [ - ("1874097816571961839", "4jjngwkifa"), - ("1674700676612386816", "42586mwa3uv"), - ("1877747914073620506", "4jv4aahw36n"), - ("1876710769913450647", "4jruzjz5lux"), - ("1346554693649113090", "39ibqxei7mo") - ]) - def test_reverse_engineer_token(self, tweet_id, expected_token): - # see Vercel's implementation here: https://github.com/vercel/react-tweet/blob/main/packages/react-tweet/src/api/fetch-tweet.ts#L27C1-L31C2 - # and the discussion here: https://github.com/JustAnotherArchivist/snscrape/issues/996#issuecomment-2211358215 - - generated_token = self.archiver.generate_token(tweet_id) - assert expected_token == generated_token @pytest.mark.download - def test_youtube_dlp_archiver(self, make_item): - - url = "https://x.com/bellingcat/status/1874097816571961839" - post = self.archiver.download_yt_dlp(make_item(url), url, "1874097816571961839") - assert post - self.assertValidResponseMetadata( - post, - "As 2024 comes to a close, here’s some examples of what Bellingcat investigated per month in our 10th year! 🧵", - datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc), - "twitter-ytdl" - ) - - @pytest.mark.download - def test_syndication_archiver(self, make_item): - - url = "https://x.com/bellingcat/status/1874097816571961839" - post = self.archiver.download_syndication(make_item(url), url, "1874097816571961839") - assert post - self.assertValidResponseMetadata( - post, - "As 2024 comes to a close, here’s some examples of what Bellingcat investigated per month in our 10th year! 🧵", - datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc) - ) - - @pytest.mark.download - def test_download_nonexistend_tweet(self, make_item): + def test_download_nonexistent_tweet(self, make_item): # this tweet does not exist url = "https://x.com/Bellingcat/status/17197025860711058" response = self.archiver.download(make_item(url)) @@ -105,9 +78,9 @@ class TestTwitterArchiver(TestArchiverBase): post, "Onion rings are just vegetable donuts.", datetime.datetime(2023, 1, 24, 16, 25, 51, tzinfo=datetime.timezone.utc), - "twitter-ytdl" + "twitter-api: success" ) - + @pytest.mark.download def test_download_video(self, make_item): url = "https://x.com/bellingcat/status/1871552600346415571" @@ -118,14 +91,13 @@ class TestTwitterArchiver(TestArchiverBase): datetime.datetime(2024, 12, 24, 13, 44, 46, tzinfo=datetime.timezone.utc) ) - @pytest.mark.xfail(reason="Currently failing, sensitive content requires logged in users/cookies - not yet implemented") - @pytest.mark.download @pytest.mark.parametrize("url, title, timestamp, image_hash", [ - ("https://x.com/SozinhoRamalho/status/1876710769913450647", "ignore tweet, testing sensitivity warning nudity", datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc), "image_hash"), - ("https://x.com/SozinhoRamalho/status/1876710875475681357", "ignore tweet, testing sensitivity warning violence", datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc), "image_hash"), - ("https://x.com/SozinhoRamalho/status/1876711053813227618", "ignore tweet, testing sensitivity warning sensitive", datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc), "image_hash"), - ("https://x.com/SozinhoRamalho/status/1876711141314801937", "ignore tweet, testing sensitivity warning nudity, violence, sensitivity", datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc), "image_hash"), + ("https://x.com/SozinhoRamalho/status/1876710769913450647", "ignore tweet, testing sensitivity warning nudity https://t.co/t3u0hQsSB1", datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc), "image_hash"), + ("https://x.com/SozinhoRamalho/status/1876710875475681357", "ignore tweet, testing sensitivity warning violence https://t.co/syYDSkpjZD", datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc), "image_hash"), + ("https://x.com/SozinhoRamalho/status/1876711053813227618", "ignore tweet, testing sensitivity warning sensitive https://t.co/XE7cRdjzYq", datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc), "image_hash"), + ("https://x.com/SozinhoRamalho/status/1876711141314801937", "ignore tweet, testing sensitivity warning nudity, violence, sensitivity https://t.co/YxCFbbhYE3", datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc), "image_hash"), ]) + @pytest.mark.download def test_download_sensitive_media(self, url, title, timestamp, image_hash, make_item): """Download tweets with sensitive media""" diff --git a/tests/archivers/test_youtubedl_archiver.py b/tests/archivers/test_youtubedl_archiver.py deleted file mode 100644 index bb5a8d2..0000000 --- a/tests/archivers/test_youtubedl_archiver.py +++ /dev/null @@ -1,71 +0,0 @@ -import pytest -from pathlib import Path - -from auto_archiver.archivers.youtubedl_archiver import YoutubeDLArchiver - -from .test_archiver_base import TestArchiverBase - -class TestYoutubeDLArchiver(TestArchiverBase): - """Tests YoutubeDL Archiver - """ - archiver_class = YoutubeDLArchiver - config = { - 'subtitles': False, - 'comments': False, - 'livestreams': False, - 'live_from_start': False, - 'end_means_success': True, - 'allow_playlist': False, - 'max_downloads': "inf", - 'proxy': None, - 'cookies_from_browser': False, - 'cookie_file': None, - } - - @pytest.mark.parametrize("url, is_suitable", [ - ("https://www.youtube.com/watch?v=5qap5aO4i9A", True), - ("https://www.tiktok.com/@funnycats0ftiktok/video/7345101300750748970?lang=en", True), - ("https://www.instagram.com/p/CU1J9JYJ9Zz/", True), - ("https://www.facebook.com/nytimes/videos/10160796550110716", True), - ("https://www.twitch.tv/videos/1167226570", True), - ("https://bellingcat.com/news/2021/10/08/ukrainian-soldiers-are-being-killed-by-landmines-in-the-donbas/", True), - ("https://google.com", True)]) - def test_suitable_urls(self, make_item, url, is_suitable): - """ - Note: expected behaviour is to return True for all URLs, as YoutubeDLArchiver should be able to handle all URLs - This behaviour may be changed in the future (e.g. if we want the youtubedl archiver to just handle URLs it has extractors for, - and then if and only if all archivers fails, does it fall back to the generic archiver) - """ - assert self.archiver.suitable(make_item(url)) == is_suitable - - @pytest.mark.download - def test_download_tiktok(self, make_item): - item = make_item("https://www.tiktok.com/@funnycats0ftiktok/video/7345101300750748970") - result = self.archiver.download(item) - assert result.get_url() == "https://www.tiktok.com/@funnycats0ftiktok/video/7345101300750748970" - - @pytest.mark.download - def test_download_youtube(self, make_item): - # url https://www.youtube.com/watch?v=5qap5aO4i9A - item = make_item("https://www.youtube.com/watch?v=J---aiyznGQ") - result = self.archiver.download(item) - assert result.get_url() == "https://www.youtube.com/watch?v=J---aiyznGQ" - assert result.get_title() == "Keyboard Cat! - THE ORIGINAL!" - assert result.get('description') == "Buy NEW Keyboard Cat Merch! https://keyboardcat.creator-spring.com\n\nxo Keyboard Cat memes make your day better!\nhttp://www.keyboardcatstore.com/\nhttps://www.facebook.com/thekeyboardcat\nhttp://www.charlieschmidt.com/" - assert len(result.media) == 2 - assert Path(result.media[0].filename).name == "J---aiyznGQ.webm" - assert Path(result.media[1].filename).name == "hqdefault.jpg" - - @pytest.mark.skip("ytdlp supports bluesky, but there's currently no way to extract info from pages without videos") - @pytest.mark.download - def test_download_bluesky_with_images(self, make_item): - item = make_item("https://bsky.app/profile/colborne.bsky.social/post/3lec2bqjc5s2y") - result = self.archiver.download(item) - assert result is not False - - @pytest.mark.skip("ytdlp supports twitter, but there's currently no way to extract info from pages without videos") - @pytest.mark.download - def test_download_twitter_textonly(self, make_item): - item = make_item("https://x.com/bellingcat/status/1874097816571961839") - result = self.archiver.download(item) - assert result is not False \ No newline at end of file diff --git a/tests/conftest.py b/tests/conftest.py index 87d4ac0..553b573 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,6 +1,19 @@ +""" +pytest conftest file, for shared fixtures and configuration +""" + +from typing import Dict, Tuple + import pytest from auto_archiver.core.metadata import Metadata +# Test names inserted into this list will be run last. This is useful for expensive/costly tests +# that you only want to run if everything else succeeds (e.g. API calls). The order here is important +# what comes first will be run first (at the end of all other tests not mentioned) +# format is the name of the module (python file) without the .py extension +TESTS_TO_RUN_LAST = ['test_twitter_api_archiver'] + + @pytest.fixture def make_item(): def _make_item(url: str, **kwargs) -> Metadata: @@ -9,4 +22,61 @@ def make_item(): item.set(key, value) return item - return _make_item \ No newline at end of file + return _make_item + + + +def pytest_collection_modifyitems(items): + module_mapping = {item: item.module.__name__.split(".")[-1] for item in items} + + sorted_items = items.copy() + # Iteratively move tests of each module to the end of the test queue + for module in TESTS_TO_RUN_LAST: + if module in module_mapping.values(): + for item in sorted_items: + if module_mapping[item] == module: + sorted_items.remove(item) + sorted_items.append(item) + + items[:] = sorted_items + + + +# Incremental testing - fail tests in a class if any previous test fails +# taken from https://docs.pytest.org/en/latest/example/simple.html#incremental-testing-test-steps + +# store history of failures per test class name and per index in parametrize (if parametrize used) +_test_failed_incremental: Dict[str, Dict[Tuple[int, ...], str]] = {} + +def pytest_runtest_makereport(item, call): + if "incremental" in item.keywords: + # incremental marker is used + if call.excinfo is not None: + # the test has failed + # retrieve the class name of the test + cls_name = str(item.cls) + # retrieve the index of the test (if parametrize is used in combination with incremental) + parametrize_index = ( + tuple(item.callspec.indices.values()) + if hasattr(item, "callspec") + else () + ) + # retrieve the name of the test function + test_name = item.originalname or item.name + # store in _test_failed_incremental the original name of the failed test + _test_failed_incremental.setdefault(cls_name, {}).setdefault( + parametrize_index, test_name + ) + + +def pytest_runtest_setup(item): + if "incremental" in item.keywords: + # retrieve the class name of the test + cls_name = str(item.cls) + # check if a previous test has failed for this class + if cls_name in _test_failed_incremental: + # retrieve the name of the first test function to fail for this class name and index + test_name = _test_failed_incremental[cls_name].get((), None) + # if name found, test has failed for the combination of class name & test name + if test_name is not None: + pytest.xfail(f"previous test failed ({test_name})") \ No newline at end of file