mirror of
https://github.com/bellingcat/auto-archiver.git
synced 2026-06-09 11:58:28 +03:00
Compare commits
15 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7d972ee9b8 | ||
|
|
b64826dc16 | ||
|
|
23e74803ee | ||
|
|
d03ecdb037 | ||
|
|
a5ebbf4726 | ||
|
|
89e387030d | ||
|
|
8ec053ed1b | ||
|
|
3ea02c115e | ||
|
|
3d4056ef70 | ||
|
|
51041bf91e | ||
|
|
0765640bff | ||
|
|
06b1f4c0ca | ||
|
|
59b910ec30 | ||
|
|
7e360240bf | ||
|
|
f8e846d59a |
@@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
|
||||
|
||||
[project]
|
||||
name = "auto-archiver"
|
||||
version = "0.13.6"
|
||||
version = "0.13.7"
|
||||
description = "Automatically archive links to videos, images, and social media content from Google Sheets (and more)."
|
||||
|
||||
requires-python = ">=3.10,<3.13"
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
from typing import Type
|
||||
from yt_dlp.extractor.common import InfoExtractor
|
||||
from auto_archiver.core.metadata import Metadata
|
||||
from auto_archiver.core.extractor import Extractor
|
||||
@@ -24,6 +25,8 @@ class GenericDropin:
|
||||
|
||||
"""
|
||||
|
||||
extractor: Type[Extractor] = None
|
||||
|
||||
def extract_post(self, url: str, ie_instance: InfoExtractor):
|
||||
"""
|
||||
This method should return the post data from the url.
|
||||
@@ -55,3 +58,19 @@ class GenericDropin:
|
||||
This method should download any additional media from the post.
|
||||
"""
|
||||
return metadata
|
||||
|
||||
def suitable(self, url, info_extractor: InfoExtractor):
|
||||
"""
|
||||
A method to allow dropins to override their InfoExtractor's 'suitable' method.
|
||||
Dropins should override this method and return True if the url is suitable for the extractor
|
||||
(based on being able to parse other URLs). See the `suitable_extractors` method in the
|
||||
`GenericExtractor` class for how this is implemented.
|
||||
|
||||
The default behaviour of this method is to return the result of the InfoExtractor's 'suitable' method.
|
||||
|
||||
### Example: An example of where this is useful is for the FacebookIE extractor in yt-dlp. By default,
|
||||
it's 'suitable' method only returns True for video URLs. However, we can override this method in the
|
||||
Facebook dropin to return True for all Facebook URLs (photo/post types). This way, the Facebook dropin
|
||||
can be used for all Facebook URLs.
|
||||
"""
|
||||
return info_extractor.suitable(url)
|
||||
|
||||
@@ -1,17 +1,154 @@
|
||||
import re
|
||||
from .dropin import GenericDropin
|
||||
from auto_archiver.core.metadata import Metadata
|
||||
from yt_dlp.extractor.facebook import FacebookIE
|
||||
|
||||
# TODO: Remove if / when https://github.com/yt-dlp/yt-dlp/pull/12275 is merged
|
||||
from yt_dlp.utils import (
|
||||
clean_html,
|
||||
get_element_by_id,
|
||||
traverse_obj,
|
||||
get_first,
|
||||
merge_dicts,
|
||||
int_or_none,
|
||||
parse_count,
|
||||
)
|
||||
|
||||
|
||||
def _extract_metadata(self, webpage, video_id):
|
||||
post_data = [
|
||||
self._parse_json(j, video_id, fatal=False)
|
||||
for j in re.findall(r"data-sjs>({.*?ScheduledServerJS.*?})</script>", webpage)
|
||||
]
|
||||
post = (
|
||||
traverse_obj(
|
||||
post_data,
|
||||
(..., "require", ..., ..., ..., "__bbox", "require", ..., ..., ..., "__bbox", "result", "data"),
|
||||
expected_type=dict,
|
||||
)
|
||||
or []
|
||||
)
|
||||
media = traverse_obj(
|
||||
post,
|
||||
(
|
||||
...,
|
||||
"attachments",
|
||||
...,
|
||||
lambda k, v: (k == "media" and str(v["id"]) == video_id and v["__typename"] == "Video"),
|
||||
),
|
||||
expected_type=dict,
|
||||
)
|
||||
title = get_first(media, ("title", "text"))
|
||||
description = get_first(media, ("creation_story", "comet_sections", "message", "story", "message", "text"))
|
||||
page_title = title or self._html_search_regex(
|
||||
(
|
||||
r'<h2\s+[^>]*class="uiHeaderTitle"[^>]*>(?P<content>[^<]*)</h2>',
|
||||
r'(?s)<span class="fbPhotosPhotoCaption".*?id="fbPhotoPageCaption"><span class="hasCaption">(?P<content>.*?)</span>',
|
||||
self._meta_regex("og:title"),
|
||||
self._meta_regex("twitter:title"),
|
||||
r"<title>(?P<content>.+?)</title>",
|
||||
),
|
||||
webpage,
|
||||
"title",
|
||||
default=None,
|
||||
group="content",
|
||||
)
|
||||
description = description or self._html_search_meta(
|
||||
["description", "og:description", "twitter:description"], webpage, "description", default=None
|
||||
)
|
||||
uploader_data = (
|
||||
get_first(media, ("owner", {dict}))
|
||||
or get_first(
|
||||
post, ("video", "creation_story", "attachments", ..., "media", lambda k, v: k == "owner" and v["name"])
|
||||
)
|
||||
or get_first(post, (..., "video", lambda k, v: k == "owner" and v["name"]))
|
||||
or get_first(post, ("node", "actors", ..., {dict}))
|
||||
or get_first(post, ("event", "event_creator", {dict}))
|
||||
or get_first(post, ("video", "creation_story", "short_form_video_context", "video_owner", {dict}))
|
||||
or {}
|
||||
)
|
||||
uploader = uploader_data.get("name") or (
|
||||
clean_html(get_element_by_id("fbPhotoPageAuthorName", webpage))
|
||||
or self._search_regex(
|
||||
(r'ownerName\s*:\s*"([^"]+)"', *self._og_regexes("title")), webpage, "uploader", fatal=False
|
||||
)
|
||||
)
|
||||
timestamp = int_or_none(self._search_regex(r'<abbr[^>]+data-utime=["\'](\d+)', webpage, "timestamp", default=None))
|
||||
thumbnail = self._html_search_meta(["og:image", "twitter:image"], webpage, "thumbnail", default=None)
|
||||
# some webpages contain unretrievable thumbnail urls
|
||||
# like https://lookaside.fbsbx.com/lookaside/crawler/media/?media_id=10155168902769113&get_thumbnail=1
|
||||
# in https://www.facebook.com/yaroslav.korpan/videos/1417995061575415/
|
||||
if thumbnail and not re.search(r"\.(?:jpg|png)", thumbnail):
|
||||
thumbnail = None
|
||||
info_dict = {
|
||||
"description": description,
|
||||
"uploader": uploader,
|
||||
"uploader_id": uploader_data.get("id"),
|
||||
"timestamp": timestamp,
|
||||
"thumbnail": thumbnail,
|
||||
"view_count": parse_count(
|
||||
self._search_regex(
|
||||
(r'\bviewCount\s*:\s*["\']([\d,.]+)', r'video_view_count["\']\s*:\s*(\d+)'),
|
||||
webpage,
|
||||
"view count",
|
||||
default=None,
|
||||
)
|
||||
),
|
||||
"concurrent_view_count": get_first(
|
||||
post, (("video", (..., ..., "attachments", ..., "media")), "liveViewerCount", {int_or_none})
|
||||
),
|
||||
**traverse_obj(
|
||||
post,
|
||||
(
|
||||
lambda _, v: video_id in v["url"],
|
||||
"feedback",
|
||||
{
|
||||
"like_count": ("likers", "count", {int}),
|
||||
"comment_count": ("total_comment_count", {int}),
|
||||
"repost_count": ("share_count_reduced", {parse_count}),
|
||||
},
|
||||
),
|
||||
get_all=False,
|
||||
),
|
||||
}
|
||||
|
||||
info_json_ld = self._search_json_ld(webpage, video_id, default={})
|
||||
info_json_ld["title"] = (
|
||||
re.sub(r"\s*\|\s*Facebook$", "", title or info_json_ld.get("title") or page_title or "")
|
||||
or (description or "").replace("\n", " ")
|
||||
or f"Facebook video #{video_id}"
|
||||
)
|
||||
return merge_dicts(info_json_ld, info_dict)
|
||||
|
||||
|
||||
class Facebook(GenericDropin):
|
||||
def extract_post(self, url: str, ie_instance):
|
||||
video_id = ie_instance._match_valid_url(url).group("id")
|
||||
ie_instance._download_webpage(url.replace("://m.facebook.com/", "://www.facebook.com/"), video_id)
|
||||
webpage = ie_instance._download_webpage(url, ie_instance._match_valid_url(url).group("id"))
|
||||
def extract_post(self, url: str, ie_instance: FacebookIE):
|
||||
post_id_regex = r"(?P<id>pfbid[A-Za-z0-9]+|\d+|t\.(\d+\/\d+))"
|
||||
post_id = re.search(post_id_regex, url).group("id")
|
||||
webpage = ie_instance._download_webpage(url.replace("://m.facebook.com/", "://www.facebook.com/"), post_id)
|
||||
|
||||
# TODO: fix once https://github.com/yt-dlp/yt-dlp/pull/12275 is merged
|
||||
post_data = ie_instance._extract_metadata(webpage)
|
||||
# TODO: For long posts, this _extract_metadata only seems to return the first 100 or so characters, followed by ...
|
||||
|
||||
# TODO: If/when https://github.com/yt-dlp/yt-dlp/pull/12275 is merged, uncomment next line and delete the one after
|
||||
# post_data = ie_instance._extract_metadata(webpage, post_id)
|
||||
post_data = _extract_metadata(ie_instance, webpage, post_id)
|
||||
return post_data
|
||||
|
||||
def create_metadata(self, post: dict, ie_instance, archiver, url):
|
||||
metadata = archiver.create_metadata(url)
|
||||
metadata.set_title(post.get("title")).set_content(post.get("description")).set_post_data(post)
|
||||
return metadata
|
||||
def create_metadata(self, post: dict, ie_instance: FacebookIE, archiver, url):
|
||||
result = Metadata()
|
||||
result.set_content(post.get("description", ""))
|
||||
result.set_title(post.get("title", ""))
|
||||
result.set("author", post.get("uploader", ""))
|
||||
result.set_url(url)
|
||||
return result
|
||||
|
||||
def suitable(self, url, info_extractor: FacebookIE):
|
||||
regex = r"(?:https?://(?:[\w-]+\.)?(?:facebook\.com||facebookwkhpilnemxj7asaniu7vnjjbiltxjqhye3mhbshg7kx5tfyd\.onion)/)"
|
||||
return re.match(regex, url)
|
||||
|
||||
def skip_ytdlp_download(self, url: str, is_instance: FacebookIE):
|
||||
"""
|
||||
Skip using the ytdlp download method for Facebook *photo* posts, they have a URL with an id of t.XXXXX/XXXXX
|
||||
"""
|
||||
if re.search(r"/t.\d+/\d+", url):
|
||||
return True
|
||||
|
||||
@@ -13,6 +13,8 @@ from loguru import logger
|
||||
|
||||
from auto_archiver.core.extractor import Extractor
|
||||
from auto_archiver.core import Metadata, Media
|
||||
from auto_archiver.utils import get_datetime_from_str
|
||||
from .dropin import GenericDropin
|
||||
|
||||
|
||||
class SkipYtdlp(Exception):
|
||||
@@ -67,7 +69,14 @@ class GenericExtractor(Extractor):
|
||||
"""
|
||||
Returns a list of valid extractors for the given URL"""
|
||||
for info_extractor in yt_dlp.YoutubeDL()._ies.values():
|
||||
if info_extractor.suitable(url) and info_extractor.working():
|
||||
if not info_extractor.working():
|
||||
continue
|
||||
|
||||
# check if there's a dropin and see if that declares whether it's suitable
|
||||
dropin: GenericDropin = self.dropin_for_name(info_extractor.ie_key())
|
||||
if dropin and dropin.suitable(url, info_extractor):
|
||||
yield info_extractor
|
||||
elif info_extractor.suitable(url):
|
||||
yield info_extractor
|
||||
|
||||
def suitable(self, url: str) -> bool:
|
||||
@@ -188,9 +197,13 @@ class GenericExtractor(Extractor):
|
||||
result = self.download_additional_media(video_data, info_extractor, result)
|
||||
|
||||
# keep both 'title' and 'fulltitle', but prefer 'title', falling back to 'fulltitle' if it doesn't exist
|
||||
result.set_title(video_data.pop("title", video_data.pop("fulltitle", "")))
|
||||
result.set_url(url)
|
||||
if "description" in video_data:
|
||||
if not result.get_title():
|
||||
result.set_title(video_data.pop("title", video_data.pop("fulltitle", "")))
|
||||
|
||||
if not result.get("url"):
|
||||
result.set_url(url)
|
||||
|
||||
if "description" in video_data and not result.get_content():
|
||||
result.set_content(video_data["description"])
|
||||
# extract comments if enabled
|
||||
if self.comments:
|
||||
@@ -207,11 +220,11 @@ class GenericExtractor(Extractor):
|
||||
)
|
||||
|
||||
# then add the common metadata
|
||||
if timestamp := video_data.pop("timestamp", None):
|
||||
if timestamp := video_data.pop("timestamp", None) and not result.get("timestamp"):
|
||||
timestamp = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc).isoformat()
|
||||
result.set_timestamp(timestamp)
|
||||
if upload_date := video_data.pop("upload_date", None):
|
||||
upload_date = datetime.datetime.strptime(upload_date, "%Y%m%d").replace(tzinfo=datetime.timezone.utc)
|
||||
if upload_date := video_data.pop("upload_date", None) and not result.get("upload_date"):
|
||||
upload_date = get_datetime_from_str(upload_date, "%Y%m%d").replace(tzinfo=datetime.timezone.utc)
|
||||
result.set("upload_date", upload_date)
|
||||
|
||||
# then clean away any keys we don't want
|
||||
@@ -240,7 +253,8 @@ class GenericExtractor(Extractor):
|
||||
return False
|
||||
|
||||
post_data = dropin.extract_post(url, ie_instance)
|
||||
return dropin.create_metadata(post_data, ie_instance, self, url)
|
||||
result = dropin.create_metadata(post_data, ie_instance, self, url)
|
||||
return self.add_metadata(post_data, info_extractor, url, result)
|
||||
|
||||
def get_metadata_for_video(
|
||||
self, data: dict, info_extractor: Type[InfoExtractor], url: str, ydl: yt_dlp.YoutubeDL
|
||||
@@ -285,7 +299,7 @@ class GenericExtractor(Extractor):
|
||||
|
||||
return self.add_metadata(data, info_extractor, url, result)
|
||||
|
||||
def dropin_for_name(self, dropin_name: str, additional_paths=[], package=__package__) -> Type[InfoExtractor]:
|
||||
def dropin_for_name(self, dropin_name: str, additional_paths=[], package=__package__) -> GenericDropin:
|
||||
dropin_name = dropin_name.lower()
|
||||
|
||||
if dropin_name == "generic":
|
||||
@@ -296,6 +310,7 @@ class GenericExtractor(Extractor):
|
||||
|
||||
def _load_dropin(dropin):
|
||||
dropin_class = getattr(dropin, dropin_class_name)()
|
||||
dropin.extractor = self
|
||||
return self._dropins.setdefault(dropin_name, dropin_class)
|
||||
|
||||
try:
|
||||
@@ -340,7 +355,7 @@ class GenericExtractor(Extractor):
|
||||
dropin_submodule = self.dropin_for_name(info_extractor.ie_key())
|
||||
|
||||
try:
|
||||
if dropin_submodule and dropin_submodule.skip_ytdlp_download(info_extractor, url):
|
||||
if dropin_submodule and dropin_submodule.skip_ytdlp_download(url, info_extractor):
|
||||
logger.debug(f"Skipping using ytdlp to download files for {info_extractor.ie_key()}")
|
||||
raise SkipYtdlp()
|
||||
|
||||
@@ -359,7 +374,7 @@ class GenericExtractor(Extractor):
|
||||
|
||||
if not isinstance(e, SkipYtdlp):
|
||||
logger.debug(
|
||||
f'Issue using "{info_extractor.IE_NAME}" extractor to download video (error: {repr(e)}), attempting to use extractor to get post data instead'
|
||||
f'Issue using "{info_extractor.IE_NAME}" extractor to download video (error: {repr(e)}), attempting to use dropin to get post data instead'
|
||||
)
|
||||
|
||||
try:
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
import requests
|
||||
from loguru import logger
|
||||
|
||||
from yt_dlp.extractor.tiktok import TikTokIE, TikTokLiveIE, TikTokVMIE, TikTokUserIE
|
||||
|
||||
from auto_archiver.core import Metadata, Media
|
||||
from datetime import datetime, timezone
|
||||
from .dropin import GenericDropin
|
||||
@@ -13,6 +16,11 @@ class Tiktok(GenericDropin):
|
||||
|
||||
TIKWM_ENDPOINT = "https://www.tikwm.com/api/?url={url}"
|
||||
|
||||
def suitable(self, url, info_extractor) -> bool:
|
||||
"""This dropin (which uses Tikvm) is suitable for *all* Tiktok type URLs - videos, lives, VMs, and users.
|
||||
Return the 'suitable' method from the TikTokIE class."""
|
||||
return any(extractor().suitable(url) for extractor in (TikTokIE, TikTokLiveIE, TikTokVMIE, TikTokUserIE))
|
||||
|
||||
def extract_post(self, url: str, ie_instance):
|
||||
logger.debug(f"Using Tikwm API to attempt to download tiktok video from {url=}")
|
||||
|
||||
@@ -38,6 +46,9 @@ class Tiktok(GenericDropin):
|
||||
api_data["video_url"] = video_url
|
||||
return api_data
|
||||
|
||||
def keys_to_clean(self, video_data: dict, info_extractor):
|
||||
return ["video_url", "title", "create_time", "author", "cover", "origin_cover", "ai_dynamic_cover", "duration"]
|
||||
|
||||
def create_metadata(self, post: dict, ie_instance, archiver, url):
|
||||
# prepare result, start by downloading video
|
||||
result = Metadata()
|
||||
@@ -54,17 +65,17 @@ class Tiktok(GenericDropin):
|
||||
logger.error(f"failed to download video from {video_url}")
|
||||
return False
|
||||
video_media = Media(video_downloaded)
|
||||
if duration := post.pop("duration", None):
|
||||
if duration := post.get("duration", None):
|
||||
video_media.set("duration", duration)
|
||||
result.add_media(video_media)
|
||||
|
||||
# add remaining metadata
|
||||
result.set_title(post.pop("title", ""))
|
||||
result.set_title(post.get("title", ""))
|
||||
|
||||
if created_at := post.pop("create_time", None):
|
||||
if created_at := post.get("create_time", None):
|
||||
result.set_timestamp(datetime.fromtimestamp(created_at, tz=timezone.utc))
|
||||
|
||||
if author := post.pop("author", None):
|
||||
if author := post.get("author", None):
|
||||
result.set("author", author)
|
||||
|
||||
result.set("api_data", post)
|
||||
|
||||
@@ -1,13 +1,12 @@
|
||||
import re
|
||||
import mimetypes
|
||||
import json
|
||||
from datetime import datetime
|
||||
|
||||
from loguru import logger
|
||||
from slugify import slugify
|
||||
|
||||
from auto_archiver.core.metadata import Metadata, Media
|
||||
from auto_archiver.utils import url as UrlUtil
|
||||
from auto_archiver.utils import url as UrlUtil, get_datetime_from_str
|
||||
from auto_archiver.core.extractor import Extractor
|
||||
|
||||
from .dropin import GenericDropin, InfoExtractor
|
||||
@@ -38,7 +37,7 @@ class Twitter(GenericDropin):
|
||||
try:
|
||||
if not tweet.get("user") or not tweet.get("created_at"):
|
||||
raise ValueError("Error retreiving post. Are you sure it exists?")
|
||||
timestamp = datetime.strptime(tweet["created_at"], "%a %b %d %H:%M:%S %z %Y")
|
||||
timestamp = get_datetime_from_str(tweet["created_at"], "%a %b %d %H:%M:%S %z %Y")
|
||||
except (ValueError, KeyError) as ex:
|
||||
logger.warning(f"Unable to parse tweet: {str(ex)}\nRetreived tweet data: {tweet}")
|
||||
return False
|
||||
|
||||
@@ -20,7 +20,7 @@
|
||||
"save_absolute": {
|
||||
"default": False,
|
||||
"type": "bool",
|
||||
"help": "whether the path to the stored file is absolute or relative in the output result inc. formatters (WARN: leaks the file structure)",
|
||||
"help": "whether the path to the stored file is absolute or relative in the output result inc. formatters (Warning: saving an absolute path will show your computer's file structure)",
|
||||
},
|
||||
},
|
||||
"description": """
|
||||
|
||||
@@ -2,7 +2,6 @@ import json
|
||||
import re
|
||||
import mimetypes
|
||||
import requests
|
||||
from datetime import datetime
|
||||
|
||||
from loguru import logger
|
||||
from pytwitter import Api
|
||||
@@ -10,6 +9,7 @@ from slugify import slugify
|
||||
|
||||
from auto_archiver.core import Extractor
|
||||
from auto_archiver.core import Metadata, Media
|
||||
from auto_archiver.utils import get_datetime_from_str
|
||||
|
||||
|
||||
class TwitterApiExtractor(Extractor):
|
||||
@@ -91,7 +91,7 @@ class TwitterApiExtractor(Extractor):
|
||||
|
||||
result = Metadata()
|
||||
result.set_title(tweet.data.text)
|
||||
result.set_timestamp(datetime.strptime(tweet.data.created_at, "%Y-%m-%dT%H:%M:%S.%fZ"))
|
||||
result.set_timestamp(get_datetime_from_str(tweet.data.created_at, "%Y-%m-%dT%H:%M:%S.%fZ"))
|
||||
|
||||
urls = []
|
||||
if tweet.includes:
|
||||
|
||||
@@ -118,7 +118,7 @@ def pytest_runtest_setup(item):
|
||||
pytest.xfail(f"previous test failed ({test_name})")
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
@pytest.fixture
|
||||
def unpickle():
|
||||
"""
|
||||
Returns a helper function that unpickles a file
|
||||
|
||||
@@ -40,6 +40,22 @@ class TestGenericExtractor(TestExtractorBase):
|
||||
path = os.path.join(dirname(dirname(__file__)), "data/")
|
||||
assert self.extractor.dropin_for_name("dropin", additional_paths=[path])
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"url, suitable_extractors",
|
||||
[
|
||||
("https://www.youtube.com/watch?v=5qap5aO4i9A", ["youtube"]),
|
||||
("https://www.tiktok.com/@funnycats0ftiktok/video/7345101300750748970?lang=en", ["tiktok"]),
|
||||
("https://www.instagram.com/p/CU1J9JYJ9Zz/", ["instagram"]),
|
||||
("https://www.facebook.com/nytimes/videos/10160796550110716", ["facebook"]),
|
||||
("https://www.facebook.com/BylineFest/photos/t.100057299682816/927879487315946/", ["facebook"]),
|
||||
],
|
||||
)
|
||||
def test_suitable_extractors(self, url, suitable_extractors):
|
||||
suitable_extractors = suitable_extractors + ["generic"] # the generic is valid for all
|
||||
extractors = list(self.extractor.suitable_extractors(url))
|
||||
assert len(extractors) == len(suitable_extractors)
|
||||
assert [e.ie_key().lower() for e in extractors] == suitable_extractors
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"url, is_suitable",
|
||||
[
|
||||
@@ -55,7 +71,7 @@ class TestGenericExtractor(TestExtractorBase):
|
||||
("https://google.com", True),
|
||||
],
|
||||
)
|
||||
def test_suitable_urls(self, make_item, url, is_suitable):
|
||||
def test_suitable_urls(self, url, is_suitable):
|
||||
"""
|
||||
Note: expected behaviour is to return True for all URLs, as YoutubeDLArchiver should be able to handle all URLs
|
||||
This behaviour may be changed in the future (e.g. if we want the youtubedl archiver to just handle URLs it has extractors for,
|
||||
@@ -245,3 +261,32 @@ class TestGenericExtractor(TestExtractorBase):
|
||||
self.assertValidResponseMetadata(post, title, timestamp)
|
||||
assert len(post.media) == 1
|
||||
assert post.media[0].hash == image_hash
|
||||
|
||||
@pytest.mark.download
|
||||
def test_download_facebook_video(self, make_item):
|
||||
post = self.extractor.download(make_item("https://www.facebook.com/bellingcat/videos/588371253839133"))
|
||||
assert len(post.media) == 2
|
||||
assert post.media[0].filename.endswith("588371253839133.mp4")
|
||||
assert post.media[0].mimetype == "video/mp4"
|
||||
|
||||
assert post.media[1].filename.endswith(".jpg")
|
||||
assert post.media[1].mimetype == "image/jpeg"
|
||||
|
||||
assert "Bellingchat Premium is with Kolina Koltai" in post.get_title()
|
||||
|
||||
@pytest.mark.download
|
||||
def test_download_facebook_image(self, make_item):
|
||||
post = self.extractor.download(
|
||||
make_item("https://www.facebook.com/BylineFest/photos/t.100057299682816/927879487315946/")
|
||||
)
|
||||
|
||||
assert len(post.media) == 1
|
||||
assert post.media[0].filename.endswith(".png")
|
||||
assert "Byline Festival - BylineFest Partner" == post.get_title()
|
||||
|
||||
@pytest.mark.download
|
||||
def test_download_facebook_text_only(self, make_item):
|
||||
url = "https://www.facebook.com/bellingcat/posts/pfbid02rzpwZxAZ8bLkAX8NvHv4DWAidFaqAUfJMbo9vWkpwxL7uMUWzWMiizXLWRSjwihVl"
|
||||
post = self.extractor.download(make_item(url))
|
||||
assert "Bellingcat researcher Kolina Koltai delves deeper into Clothoff" in post.get("content")
|
||||
assert post.get_title() == "Bellingcat"
|
||||
|
||||
@@ -4,6 +4,8 @@ import pytest
|
||||
import yt_dlp
|
||||
|
||||
from auto_archiver.modules.generic_extractor.generic_extractor import GenericExtractor
|
||||
from auto_archiver.modules.generic_extractor.tiktok import Tiktok, TikTokIE
|
||||
|
||||
from .test_extractor_base import TestExtractorBase
|
||||
|
||||
|
||||
@@ -17,11 +19,16 @@ def skip_ytdlp_own_methods(mocker):
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
@pytest.fixture
|
||||
def mock_get(mocker):
|
||||
return mocker.patch("auto_archiver.modules.generic_extractor.tiktok.requests.get")
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def tiktok_dropin() -> Tiktok:
|
||||
return Tiktok()
|
||||
|
||||
|
||||
class TestTiktokTikwmExtractor(TestExtractorBase):
|
||||
"""
|
||||
Test suite for TestTiktokTikwmExtractor.
|
||||
@@ -34,6 +41,25 @@ class TestTiktokTikwmExtractor(TestExtractorBase):
|
||||
|
||||
VALID_EXAMPLE_URL = "https://www.tiktok.com/@example/video/1234"
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"url, is_suitable",
|
||||
[
|
||||
("https://bellingcat.com", False),
|
||||
("https://youtube.com", False),
|
||||
("https://tiktok.co/", False),
|
||||
("https://tiktok.com/", False),
|
||||
("https://www.tiktok.com/", False),
|
||||
("https://api.cool.tiktok.com/", False),
|
||||
(VALID_EXAMPLE_URL, True),
|
||||
("https://www.tiktok.com/@bbcnews/video/7478038212070411542", True),
|
||||
("https://www.tiktok.com/@ggs68taiwan.official/video/7441821351142362375", True),
|
||||
("https://www.tiktok.com/t/ZP8YQ8e5j/", True),
|
||||
("https://vt.tiktok.com/ZSMTJeqRP/", True),
|
||||
],
|
||||
)
|
||||
def test_is_suitable(self, url, is_suitable, tiktok_dropin):
|
||||
assert tiktok_dropin.suitable(url, TikTokIE()) == is_suitable
|
||||
|
||||
def test_invalid_json_responses(self, mock_get, make_item, caplog):
|
||||
mock_get.return_value.status_code = 200
|
||||
mock_get.return_value.json.side_effect = ValueError
|
||||
|
||||
Reference in New Issue
Block a user