refactor youtubedlp archiver to work for all valid websites

1. Extract more metadata
2. Improve thumbnail extraction
3. Set up a framework for specific sites to provide more granular metadata processing
This commit is contained in:
Patrick Robertson
2025-01-15 17:39:47 +01:00
parent c3dd19f309
commit 4f2b9baa73
4 changed files with 182 additions and 21 deletions

View File

@@ -34,6 +34,14 @@ class Archiver(Step):
def sanitize_url(self, url: str) -> str:
    """
    Hook for tidying a URL before it is archived, e.g. dropping unnecessary
    URL parameters or unfurling redirect links.

    This implementation performs no cleaning and hands the URL back as-is.
    """
    cleaned_url = url
    return cleaned_url
def suitable(self, url: str) -> bool:
    """
    Tell whether this archiver is able to handle the given URL.

    This default accepts every URL; subclasses should override it with
    their own check.
    """
    can_handle = True
    return can_handle
def _guess_file_type(self, path: str) -> str:
"""

View File

@@ -33,7 +33,115 @@ class YoutubeDLArchiver(Archiver):
"cookies_from_browser": {"default": None, "help": "optional browser for ytdl to extract cookies from, can be one of: brave, chrome, chromium, edge, firefox, opera, safari, vivaldi, whale"},
"cookie_file": {"default": None, "help": "optional cookie file to use for Youtube, see instructions here on how to export from your browser: https://github.com/yt-dlp/yt-dlp/wiki/FAQ#how-do-i-pass-cookies-to-yt-dlp"},
}
def download_additional_media(self, ie: str, video_data: dict, metadata: Metadata) -> Metadata:
    """
    Fetch extra media that accompanies the video (images, comments,
    subtitles, etc.) and attach it to the metadata object.

    At the moment only the single thumbnail named by video_data['thumbnail']
    is downloaded; it is attached as a Media object with id "cover".
    A failed download is logged and otherwise ignored.
    """
    # TODO: should we download all thumbnails, or just the chosen thumbnail?
    cover_url = video_data.get('thumbnail')
    if not cover_url:
        # nothing to fetch, hand the metadata back untouched
        return metadata

    try:
        local_path = self.download_from_url(cover_url)
        metadata.add_media(Media(local_path), id="cover")
    except Exception as err:
        # best effort: a missing cover must not abort the whole archive run
        logger.error(f"Error downloading cover image {cover_url}: {err}")
    return metadata
def keys_to_clean(self, ie: str, video_data: dict) -> list:
    """
    List the keys that should be removed from the yt-dlp 'video_data' to make
    it more readable and to drop unnecessary keys that ytdlp adds.

    :param ie: extractor key of the yt-dlp extractor that produced the data,
        used to look up extractor-specific keys on top of the common ones
    :param video_data: the yt-dlp info dict (not inspected at the moment)
    :return: a new list of unique key names, common keys first
    """
    base_keys = [
        'formats', 'thumbnail', 'display_id', 'epoch', 'requested_downloads',
        'duration_string', 'thumbnails', 'http_headers', 'webpage_url_basename', 'webpage_url_domain',
        'extractor', 'extractor_key', 'playlist', 'playlist_index', 'protocol', 'requested_subtitles',
        'format_id', 'acodec', 'vcodec', 'ext', '_has_drm', 'filesize', 'audio_ext', 'video_ext', 'vbr', 'abr',
        'resolution', 'dynamic_range', 'aspect_ratio', 'cookies', 'format', 'quality', 'preference', 'artists',
        'channel_id', 'subtitles', 'tbr', 'url', 'original_url', 'automatic_captions', 'playable_in_embed', 'live_status',
        '_format_sort_fields', 'chapters', 'uploader_id', 'uploader_url', 'requested_formats', 'format_note',
        'audio_channels', 'asr', 'fps', 'was_live', 'is_live', 'heatmap', 'age_limit', 'stretched_ratio',
    ]
    # extra keys to strip for specific extractors; no extractor needs any yet,
    # the 'TikTok' entry is kept as the placeholder for site-specific additions
    extractor_keys = {
        'TikTok': [],
    }
    return base_keys + extractor_keys.get(ie, [])
def add_metadata(self, ie: str, video_data: dict, url: str, result: Metadata) -> Metadata:
    """
    Fills the given Metadata object from the given yt-dlp video_data.

    Note that video_data is modified in place: keys that are consumed here
    (title, timestamp, upload date, comments, keys_to_clean) are removed from it.

    :param ie: extractor key of the yt-dlp extractor that produced video_data
    :param video_data: the yt-dlp info dict
    :param url: the archived URL, also used as the title of last resort
    :param result: the Metadata object to populate
    :return: the populated Metadata object
    """
    # first add the media
    result = self.download_additional_media(ie, video_data, result)

    # keep the full title, no need for the shortened title (?)
    # both keys are consumed; fall back to the URL when neither holds a value
    full_title = video_data.pop('fulltitle', None)
    short_title = video_data.pop('title', None)
    result.set_title(full_title or short_title or url)

    # then add the platform specific additional metadata
    for key, mapping in self.video_data_metadata_mapping(ie, video_data).items():
        if isinstance(mapping, str):
            # NOTE(security): eval is only acceptable because the mapping strings
            # come from video_data_metadata_mapping (code), never from downloaded data
            result.set(key, eval(f"video_data{mapping}"))
        elif callable(mapping):
            result.set(key, mapping(video_data))
    result.set_url(url)

    # extract comments if enabled
    if self.comments:
        utc = datetime.timezone.utc
        # pop the raw list so the final copy loop cannot overwrite the formatted one
        raw_comments = video_data.pop("comments", None) or []
        result.set("comments", [{
            "text": c.get("text"),
            "author": c.get("author"),
            # comments without a timestamp are kept, with a None timestamp
            "timestamp": (datetime.datetime.fromtimestamp(c["timestamp"], tz=utc)
                          if c.get("timestamp") is not None else None),
        } for c in raw_comments])

    # then add the common metadata
    if (timestamp := video_data.pop("timestamp", None)):
        timestamp = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc).isoformat()
        result.set_timestamp(timestamp)
    if (upload_date := video_data.pop("upload_date", None)):
        upload_date = datetime.datetime.strptime(upload_date, '%Y%m%d').replace(tzinfo=datetime.timezone.utc)
        result.set("upload_date", upload_date)

    # then clean away any keys we don't want
    for clean_key in self.keys_to_clean(ie, video_data):
        video_data.pop(clean_key, None)

    # then add the rest of the video data, skipping empty values
    for k, v in video_data.items():
        if v:
            result.set(k, v)
    return result
def video_data_metadata_mapping(self, ie: str, video_data: dict) -> dict:
    """
    Provides the key->value mapping used to copy fields from the yt-dlp
    produced 'video_data' onto the Metadata object.

    Each value can be either a string for direct mapping, or a function, or a lambda.
    No mappings are defined here, so an empty dict is returned.
    """
    mapping = {}
    return mapping
def suitable(self, item: Metadata) -> bool:
    """
    Checks the item's URL against all ytdlp extractors.

    Returns True as soon as one extractor reports the URL as suitable and
    itself as working, False if none does. The GenericIE (labelled by yt-dlp
    as 'Generic downloader that works on some sites') is not excluded.
    """
    target_url = item.get_url()
    # Note: this is expected to return True for *all* URLs due to the 'generic' extractor from ytdlp (valid for all URLs).
    # Should we check for the 'GenericIE' extractor and return False?
    # It is left in for now, since we also want the ability to download from generic sites.
    # Perhaps one solution is to return 'False' initially, and then if no other installed
    # archivers work, we try again using the generic one.
    extractors = yt_dlp.YoutubeDL()._ies
    return any(ie.suitable(target_url) and ie.working() for ie in extractors.values())
def download(self, item: Metadata) -> Metadata:
url = item.get_url()
@@ -70,7 +178,6 @@ class YoutubeDLArchiver(Archiver):
ydl = yt_dlp.YoutubeDL({**ydl_options, "getcomments": self.comments})
#TODO: for playlist or long lists of videos, how to download one at a time so they can be stored before the next one is downloaded?
info = ydl.extract_info(url, download=True)
if "entries" in info:
entries = info.get("entries", [])
if not len(entries):
@@ -78,9 +185,9 @@ class YoutubeDLArchiver(Archiver):
return False
else: entries = [info]
ie = info['extractor_key']
result = Metadata()
result.set_title(info.get("title"))
if "description" in info: result.set_content(info["description"])
for entry in entries:
try:
filename = ydl.prepare_filename(entry)
@@ -104,22 +211,11 @@ class YoutubeDLArchiver(Archiver):
except Exception as e:
logger.error(f"Error processing entry {entry}: {e}")
# extract comments if enabled
if self.comments:
result.set("comments", [{
"text": c["text"],
"author": c["author"],
"timestamp": datetime.datetime.fromtimestamp(c.get("timestamp"), tz = datetime.timezone.utc)
} for c in info.get("comments", [])])
result = self.add_metadata(ie, info, url, result)
extractor_name = "yt-dlp"
if ie:
extractor_name += f"--{ie}IE"
if (timestamp := info.get("timestamp")):
#TODO: fix deprecated timestamp,
timestamp = datetime.datetime.fromtimestamp(timestamp, tz = datetime.timezone.utc).isoformat()
result.set_timestamp(timestamp)
if (upload_date := info.get("upload_date")):
upload_date = datetime.datetime.strptime(upload_date, '%Y%m%d').replace(tzinfo=datetime.timezone.utc)
result.set("upload_date", upload_date)
if self.end_means_success: result.success("yt-dlp")
else: result.status = "yt-dlp"
if self.end_means_success: result.success(extractor_name)
else: result.status = extractor_name
return result

View File

@@ -13,7 +13,7 @@ class TestArchiverBase(object):
def setup_archiver(self):
    """
    Builds the archiver under test and stores it on self.archiver.

    The subclass must set archiver_class and config. The archiver is
    constructed exactly once, with the config nested under the archiver's
    name: {archiver_class.name: config}.
    """
    assert self.archiver_class is not None, "self.archiver_class must be set on the subclass"
    assert self.config is not None, "self.config must be a dict set on the subclass"
    # single construction: the bare-config instance that used to be built first
    # was thrown away immediately by the name-keyed one
    self.archiver = self.archiver_class({self.archiver_class.name: self.config})
def assertValidResponseMetadata(self, test_response: Metadata, title: str, timestamp: str, status: str = ""):
assert test_response is not False

View File

@@ -0,0 +1,57 @@
import pytest
from pathlib import Path
from auto_archiver.archivers.youtubedl_archiver import YoutubeDLArchiver
from .test_archiver_base import TestArchiverBase
class TestYoutubeDLArchiver(TestArchiverBase):
    """Tests for the YoutubeDL archiver: URL suitability checks and real downloads."""

    archiver_class = YoutubeDLArchiver
    # archiver options used for every test in this class
    config = dict(
        subtitles=False,
        comments=False,
        livestreams=False,
        live_from_start=False,
        end_means_success=True,
        allow_playlist=False,
        max_downloads="inf",
        proxy=None,
        cookies_from_browser=False,
        cookie_file=None,
    )

    @pytest.mark.parametrize("url, is_suitable", [
        ("https://www.youtube.com/watch?v=5qap5aO4i9A", True),
        ("https://www.tiktok.com/@funnycats0ftiktok/video/7345101300750748970?lang=en", True),
        ("https://www.instagram.com/p/CU1J9JYJ9Zz/", True),
        ("https://www.facebook.com/nytimes/videos/10160796550110716", True),
        ("https://www.twitch.tv/videos/1167226570", True),
        ("https://bellingcat.com/news/2021/10/08/ukrainian-soldiers-are-being-killed-by-landmines-in-the-donbas/", True),
        ("https://google.com", True),
    ])
    def test_suitable_urls(self, make_item, url, is_suitable):
        """
        Every URL is currently expected to be reported as suitable, since the
        YoutubeDLArchiver should be able to handle all URLs.
        This may change in the future, e.g. if the youtubedl archiver is limited to
        URLs it has dedicated extractors for and only falls back to the generic
        extractor once all other archivers have failed.
        """
        item = make_item(url)
        assert self.archiver.suitable(item) == is_suitable

    @pytest.mark.download
    def test_download_tiktok(self, make_item):
        tiktok_url = "https://www.tiktok.com/@funnycats0ftiktok/video/7345101300750748970"
        archived = self.archiver.download(make_item(tiktok_url))
        assert archived.get_url() == tiktok_url

    @pytest.mark.download
    def test_download_youtube(self, make_item):
        video_url = "https://www.youtube.com/watch?v=J---aiyznGQ"
        expected_description = (
            "Buy NEW Keyboard Cat Merch! https://keyboardcat.creator-spring.com\n\n"
            "xo Keyboard Cat memes make your day better!\n"
            "http://www.keyboardcatstore.com/\n"
            "https://www.facebook.com/thekeyboardcat\n"
            "http://www.charlieschmidt.com/"
        )

        archived = self.archiver.download(make_item(video_url))

        assert archived.get_url() == video_url
        assert archived.get_title() == "Keyboard Cat! - THE ORIGINAL!"
        assert archived.get('description') == expected_description
        # expects two media entries: the video first, then the cover thumbnail
        assert len(archived.media) == 2
        media_names = [Path(entry.filename).name for entry in archived.media]
        assert media_names[0] == "J---aiyznGQ.webm"
        assert media_names[1] == "hqdefault.jpg"