Compare commits

...

15 Commits

Author SHA1 Message Date
Patrick Robertson
7d972ee9b8 Merge pull request #258 from bellingcat/version_bump
Version bump
2025-03-18 12:18:09 +00:00
Patrick Robertson
b64826dc16 Merge pull request #257 from bellingcat/standardise_parsedates
Standardise parse dates to get_datetime_from_str
2025-03-18 12:17:51 +00:00
Patrick Robertson
23e74803ee Version bump 2025-03-18 10:52:23 +00:00
Patrick Robertson
d03ecdb037 Standardise parse dates to get_datetime_from_str 2025-03-18 10:22:58 +00:00
Patrick Robertson
a5ebbf4726 Merge pull request #256 from bellingcat/dropin_cleanup
Refactor the dropin 'is_suitable' method + fix for tikwm
2025-03-18 10:08:24 +00:00
Patrick Robertson
89e387030d Tests for suitable URLs for tikwm 2025-03-18 10:04:03 +00:00
Patrick Robertson
8ec053ed1b Refactor the dropin 'is_suitable' method + fix tikwm implementation
Makes it easier to maintain/understand.
2025-03-18 09:14:14 +00:00
Patrick Robertson
3ea02c115e Merge pull request #254 from bellingcat/rtd_docs
Add info on building RTD versions + automated building of tagged versions
2025-03-17 13:01:20 +00:00
Patrick Robertson
3d4056ef70 Merge pull request #223 from bellingcat/facebook_extractor
Create facebook dropin - working for images + text.
2025-03-17 12:45:05 +00:00
Patrick Robertson
51041bf91e Merge pull request #253 from bellingcat/settings_page
Update material version, minify code
2025-03-17 11:59:37 +00:00
Patrick Robertson
0765640bff Fix up tiktok dropin for slightly modified generic_extractor format 2025-03-17 10:31:22 +00:00
Patrick Robertson
06b1f4c0ca Fix lingering merge conflict issues 2025-03-17 10:12:55 +00:00
Patrick Robertson
59b910ec30 Merge main 2025-03-17 10:05:11 +00:00
Patrick Robertson
7e360240bf Copy ytdlp code into AA project - seems like ytdlp won't be merged anytime soon 2025-03-17 09:57:05 +00:00
Patrick Robertson
f8e846d59a Create facebook dropin - working for images + text. CAVEAT: only gets the first ~100 chars of the post at the moment 2025-02-25 11:44:35 +00:00
11 changed files with 287 additions and 35 deletions

View File

@@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
[project]
name = "auto-archiver"
version = "0.13.6"
version = "0.13.7"
description = "Automatically archive links to videos, images, and social media content from Google Sheets (and more)."
requires-python = ">=3.10,<3.13"

View File

@@ -1,3 +1,4 @@
from typing import Type
from yt_dlp.extractor.common import InfoExtractor
from auto_archiver.core.metadata import Metadata
from auto_archiver.core.extractor import Extractor
@@ -24,6 +25,8 @@ class GenericDropin:
"""
extractor: Type[Extractor] = None
def extract_post(self, url: str, ie_instance: InfoExtractor):
"""
This method should return the post data from the url.
@@ -55,3 +58,19 @@ class GenericDropin:
This method should download any additional media from the post.
"""
return metadata
def suitable(self, url, info_extractor: InfoExtractor):
"""
A method to allow dropins to override their InfoExtractor's 'suitable' method.
Dropins should override this method and return True if the url is suitable for the extractor
(based on being able to parse other URLs). See the `suitable_extractors` method in the
`GenericExtractor` class for how this is implemented.
The default behaviour of this method is to return the result of the InfoExtractor's 'suitable' method.
### Example: An example of where this is useful is for the FacebookIE extractor in yt-dlp. By default,
it's 'suitable' method only returns True for video URLs. However, we can override this method in the
Facebook dropin to return True for all Facebook URLs (photo/post types). This way, the Facebook dropin
can be used for all Facebook URLs.
"""
return info_extractor.suitable(url)

View File

@@ -1,17 +1,154 @@
import re
from .dropin import GenericDropin
from auto_archiver.core.metadata import Metadata
from yt_dlp.extractor.facebook import FacebookIE
# TODO: Remove if / when https://github.com/yt-dlp/yt-dlp/pull/12275 is merged
from yt_dlp.utils import (
clean_html,
get_element_by_id,
traverse_obj,
get_first,
merge_dicts,
int_or_none,
parse_count,
)
def _extract_metadata(self, webpage, video_id):
post_data = [
self._parse_json(j, video_id, fatal=False)
for j in re.findall(r"data-sjs>({.*?ScheduledServerJS.*?})</script>", webpage)
]
post = (
traverse_obj(
post_data,
(..., "require", ..., ..., ..., "__bbox", "require", ..., ..., ..., "__bbox", "result", "data"),
expected_type=dict,
)
or []
)
media = traverse_obj(
post,
(
...,
"attachments",
...,
lambda k, v: (k == "media" and str(v["id"]) == video_id and v["__typename"] == "Video"),
),
expected_type=dict,
)
title = get_first(media, ("title", "text"))
description = get_first(media, ("creation_story", "comet_sections", "message", "story", "message", "text"))
page_title = title or self._html_search_regex(
(
r'<h2\s+[^>]*class="uiHeaderTitle"[^>]*>(?P<content>[^<]*)</h2>',
r'(?s)<span class="fbPhotosPhotoCaption".*?id="fbPhotoPageCaption"><span class="hasCaption">(?P<content>.*?)</span>',
self._meta_regex("og:title"),
self._meta_regex("twitter:title"),
r"<title>(?P<content>.+?)</title>",
),
webpage,
"title",
default=None,
group="content",
)
description = description or self._html_search_meta(
["description", "og:description", "twitter:description"], webpage, "description", default=None
)
uploader_data = (
get_first(media, ("owner", {dict}))
or get_first(
post, ("video", "creation_story", "attachments", ..., "media", lambda k, v: k == "owner" and v["name"])
)
or get_first(post, (..., "video", lambda k, v: k == "owner" and v["name"]))
or get_first(post, ("node", "actors", ..., {dict}))
or get_first(post, ("event", "event_creator", {dict}))
or get_first(post, ("video", "creation_story", "short_form_video_context", "video_owner", {dict}))
or {}
)
uploader = uploader_data.get("name") or (
clean_html(get_element_by_id("fbPhotoPageAuthorName", webpage))
or self._search_regex(
(r'ownerName\s*:\s*"([^"]+)"', *self._og_regexes("title")), webpage, "uploader", fatal=False
)
)
timestamp = int_or_none(self._search_regex(r'<abbr[^>]+data-utime=["\'](\d+)', webpage, "timestamp", default=None))
thumbnail = self._html_search_meta(["og:image", "twitter:image"], webpage, "thumbnail", default=None)
# some webpages contain unretrievable thumbnail urls
# like https://lookaside.fbsbx.com/lookaside/crawler/media/?media_id=10155168902769113&get_thumbnail=1
# in https://www.facebook.com/yaroslav.korpan/videos/1417995061575415/
if thumbnail and not re.search(r"\.(?:jpg|png)", thumbnail):
thumbnail = None
info_dict = {
"description": description,
"uploader": uploader,
"uploader_id": uploader_data.get("id"),
"timestamp": timestamp,
"thumbnail": thumbnail,
"view_count": parse_count(
self._search_regex(
(r'\bviewCount\s*:\s*["\']([\d,.]+)', r'video_view_count["\']\s*:\s*(\d+)'),
webpage,
"view count",
default=None,
)
),
"concurrent_view_count": get_first(
post, (("video", (..., ..., "attachments", ..., "media")), "liveViewerCount", {int_or_none})
),
**traverse_obj(
post,
(
lambda _, v: video_id in v["url"],
"feedback",
{
"like_count": ("likers", "count", {int}),
"comment_count": ("total_comment_count", {int}),
"repost_count": ("share_count_reduced", {parse_count}),
},
),
get_all=False,
),
}
info_json_ld = self._search_json_ld(webpage, video_id, default={})
info_json_ld["title"] = (
re.sub(r"\s*\|\s*Facebook$", "", title or info_json_ld.get("title") or page_title or "")
or (description or "").replace("\n", " ")
or f"Facebook video #{video_id}"
)
return merge_dicts(info_json_ld, info_dict)
class Facebook(GenericDropin):
def extract_post(self, url: str, ie_instance):
video_id = ie_instance._match_valid_url(url).group("id")
ie_instance._download_webpage(url.replace("://m.facebook.com/", "://www.facebook.com/"), video_id)
webpage = ie_instance._download_webpage(url, ie_instance._match_valid_url(url).group("id"))
def extract_post(self, url: str, ie_instance: FacebookIE):
post_id_regex = r"(?P<id>pfbid[A-Za-z0-9]+|\d+|t\.(\d+\/\d+))"
post_id = re.search(post_id_regex, url).group("id")
webpage = ie_instance._download_webpage(url.replace("://m.facebook.com/", "://www.facebook.com/"), post_id)
# TODO: fix once https://github.com/yt-dlp/yt-dlp/pull/12275 is merged
post_data = ie_instance._extract_metadata(webpage)
# TODO: For long posts, this _extract_metadata only seems to return the first 100 or so characters, followed by ...
# TODO: If/when https://github.com/yt-dlp/yt-dlp/pull/12275 is merged, uncomment next line and delete the one after
# post_data = ie_instance._extract_metadata(webpage, post_id)
post_data = _extract_metadata(ie_instance, webpage, post_id)
return post_data
def create_metadata(self, post: dict, ie_instance, archiver, url):
metadata = archiver.create_metadata(url)
metadata.set_title(post.get("title")).set_content(post.get("description")).set_post_data(post)
return metadata
def create_metadata(self, post: dict, ie_instance: FacebookIE, archiver, url):
result = Metadata()
result.set_content(post.get("description", ""))
result.set_title(post.get("title", ""))
result.set("author", post.get("uploader", ""))
result.set_url(url)
return result
def suitable(self, url, info_extractor: FacebookIE):
regex = r"(?:https?://(?:[\w-]+\.)?(?:facebook\.com||facebookwkhpilnemxj7asaniu7vnjjbiltxjqhye3mhbshg7kx5tfyd\.onion)/)"
return re.match(regex, url)
def skip_ytdlp_download(self, url: str, is_instance: FacebookIE):
"""
Skip using the ytdlp download method for Facebook *photo* posts, they have a URL with an id of t.XXXXX/XXXXX
"""
if re.search(r"/t.\d+/\d+", url):
return True

View File

@@ -13,6 +13,8 @@ from loguru import logger
from auto_archiver.core.extractor import Extractor
from auto_archiver.core import Metadata, Media
from auto_archiver.utils import get_datetime_from_str
from .dropin import GenericDropin
class SkipYtdlp(Exception):
@@ -67,7 +69,14 @@ class GenericExtractor(Extractor):
"""
Returns a list of valid extractors for the given URL"""
for info_extractor in yt_dlp.YoutubeDL()._ies.values():
if info_extractor.suitable(url) and info_extractor.working():
if not info_extractor.working():
continue
# check if there's a dropin and see if that declares whether it's suitable
dropin: GenericDropin = self.dropin_for_name(info_extractor.ie_key())
if dropin and dropin.suitable(url, info_extractor):
yield info_extractor
elif info_extractor.suitable(url):
yield info_extractor
def suitable(self, url: str) -> bool:
@@ -188,9 +197,13 @@ class GenericExtractor(Extractor):
result = self.download_additional_media(video_data, info_extractor, result)
# keep both 'title' and 'fulltitle', but prefer 'title', falling back to 'fulltitle' if it doesn't exist
result.set_title(video_data.pop("title", video_data.pop("fulltitle", "")))
result.set_url(url)
if "description" in video_data:
if not result.get_title():
result.set_title(video_data.pop("title", video_data.pop("fulltitle", "")))
if not result.get("url"):
result.set_url(url)
if "description" in video_data and not result.get_content():
result.set_content(video_data["description"])
# extract comments if enabled
if self.comments:
@@ -207,11 +220,11 @@ class GenericExtractor(Extractor):
)
# then add the common metadata
if timestamp := video_data.pop("timestamp", None):
if timestamp := video_data.pop("timestamp", None) and not result.get("timestamp"):
timestamp = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc).isoformat()
result.set_timestamp(timestamp)
if upload_date := video_data.pop("upload_date", None):
upload_date = datetime.datetime.strptime(upload_date, "%Y%m%d").replace(tzinfo=datetime.timezone.utc)
if upload_date := video_data.pop("upload_date", None) and not result.get("upload_date"):
upload_date = get_datetime_from_str(upload_date, "%Y%m%d").replace(tzinfo=datetime.timezone.utc)
result.set("upload_date", upload_date)
# then clean away any keys we don't want
@@ -240,7 +253,8 @@ class GenericExtractor(Extractor):
return False
post_data = dropin.extract_post(url, ie_instance)
return dropin.create_metadata(post_data, ie_instance, self, url)
result = dropin.create_metadata(post_data, ie_instance, self, url)
return self.add_metadata(post_data, info_extractor, url, result)
def get_metadata_for_video(
self, data: dict, info_extractor: Type[InfoExtractor], url: str, ydl: yt_dlp.YoutubeDL
@@ -285,7 +299,7 @@ class GenericExtractor(Extractor):
return self.add_metadata(data, info_extractor, url, result)
def dropin_for_name(self, dropin_name: str, additional_paths=[], package=__package__) -> Type[InfoExtractor]:
def dropin_for_name(self, dropin_name: str, additional_paths=[], package=__package__) -> GenericDropin:
dropin_name = dropin_name.lower()
if dropin_name == "generic":
@@ -296,6 +310,7 @@ class GenericExtractor(Extractor):
def _load_dropin(dropin):
dropin_class = getattr(dropin, dropin_class_name)()
dropin.extractor = self
return self._dropins.setdefault(dropin_name, dropin_class)
try:
@@ -340,7 +355,7 @@ class GenericExtractor(Extractor):
dropin_submodule = self.dropin_for_name(info_extractor.ie_key())
try:
if dropin_submodule and dropin_submodule.skip_ytdlp_download(info_extractor, url):
if dropin_submodule and dropin_submodule.skip_ytdlp_download(url, info_extractor):
logger.debug(f"Skipping using ytdlp to download files for {info_extractor.ie_key()}")
raise SkipYtdlp()
@@ -359,7 +374,7 @@ class GenericExtractor(Extractor):
if not isinstance(e, SkipYtdlp):
logger.debug(
f'Issue using "{info_extractor.IE_NAME}" extractor to download video (error: {repr(e)}), attempting to use extractor to get post data instead'
f'Issue using "{info_extractor.IE_NAME}" extractor to download video (error: {repr(e)}), attempting to use dropin to get post data instead'
)
try:

View File

@@ -1,5 +1,8 @@
import requests
from loguru import logger
from yt_dlp.extractor.tiktok import TikTokIE, TikTokLiveIE, TikTokVMIE, TikTokUserIE
from auto_archiver.core import Metadata, Media
from datetime import datetime, timezone
from .dropin import GenericDropin
@@ -13,6 +16,11 @@ class Tiktok(GenericDropin):
TIKWM_ENDPOINT = "https://www.tikwm.com/api/?url={url}"
def suitable(self, url, info_extractor) -> bool:
"""This dropin (which uses Tikvm) is suitable for *all* Tiktok type URLs - videos, lives, VMs, and users.
Return the 'suitable' method from the TikTokIE class."""
return any(extractor().suitable(url) for extractor in (TikTokIE, TikTokLiveIE, TikTokVMIE, TikTokUserIE))
def extract_post(self, url: str, ie_instance):
logger.debug(f"Using Tikwm API to attempt to download tiktok video from {url=}")
@@ -38,6 +46,9 @@ class Tiktok(GenericDropin):
api_data["video_url"] = video_url
return api_data
def keys_to_clean(self, video_data: dict, info_extractor):
return ["video_url", "title", "create_time", "author", "cover", "origin_cover", "ai_dynamic_cover", "duration"]
def create_metadata(self, post: dict, ie_instance, archiver, url):
# prepare result, start by downloading video
result = Metadata()
@@ -54,17 +65,17 @@ class Tiktok(GenericDropin):
logger.error(f"failed to download video from {video_url}")
return False
video_media = Media(video_downloaded)
if duration := post.pop("duration", None):
if duration := post.get("duration", None):
video_media.set("duration", duration)
result.add_media(video_media)
# add remaining metadata
result.set_title(post.pop("title", ""))
result.set_title(post.get("title", ""))
if created_at := post.pop("create_time", None):
if created_at := post.get("create_time", None):
result.set_timestamp(datetime.fromtimestamp(created_at, tz=timezone.utc))
if author := post.pop("author", None):
if author := post.get("author", None):
result.set("author", author)
result.set("api_data", post)

View File

@@ -1,13 +1,12 @@
import re
import mimetypes
import json
from datetime import datetime
from loguru import logger
from slugify import slugify
from auto_archiver.core.metadata import Metadata, Media
from auto_archiver.utils import url as UrlUtil
from auto_archiver.utils import url as UrlUtil, get_datetime_from_str
from auto_archiver.core.extractor import Extractor
from .dropin import GenericDropin, InfoExtractor
@@ -38,7 +37,7 @@ class Twitter(GenericDropin):
try:
if not tweet.get("user") or not tweet.get("created_at"):
raise ValueError("Error retreiving post. Are you sure it exists?")
timestamp = datetime.strptime(tweet["created_at"], "%a %b %d %H:%M:%S %z %Y")
timestamp = get_datetime_from_str(tweet["created_at"], "%a %b %d %H:%M:%S %z %Y")
except (ValueError, KeyError) as ex:
logger.warning(f"Unable to parse tweet: {str(ex)}\nRetreived tweet data: {tweet}")
return False

View File

@@ -20,7 +20,7 @@
"save_absolute": {
"default": False,
"type": "bool",
"help": "whether the path to the stored file is absolute or relative in the output result inc. formatters (WARN: leaks the file structure)",
"help": "whether the path to the stored file is absolute or relative in the output result inc. formatters (Warning: saving an absolute path will show your computer's file structure)",
},
},
"description": """

View File

@@ -2,7 +2,6 @@ import json
import re
import mimetypes
import requests
from datetime import datetime
from loguru import logger
from pytwitter import Api
@@ -10,6 +9,7 @@ from slugify import slugify
from auto_archiver.core import Extractor
from auto_archiver.core import Metadata, Media
from auto_archiver.utils import get_datetime_from_str
class TwitterApiExtractor(Extractor):
@@ -91,7 +91,7 @@ class TwitterApiExtractor(Extractor):
result = Metadata()
result.set_title(tweet.data.text)
result.set_timestamp(datetime.strptime(tweet.data.created_at, "%Y-%m-%dT%H:%M:%S.%fZ"))
result.set_timestamp(get_datetime_from_str(tweet.data.created_at, "%Y-%m-%dT%H:%M:%S.%fZ"))
urls = []
if tweet.includes:

View File

@@ -118,7 +118,7 @@ def pytest_runtest_setup(item):
pytest.xfail(f"previous test failed ({test_name})")
@pytest.fixture()
@pytest.fixture
def unpickle():
"""
Returns a helper function that unpickles a file

View File

@@ -40,6 +40,22 @@ class TestGenericExtractor(TestExtractorBase):
path = os.path.join(dirname(dirname(__file__)), "data/")
assert self.extractor.dropin_for_name("dropin", additional_paths=[path])
@pytest.mark.parametrize(
"url, suitable_extractors",
[
("https://www.youtube.com/watch?v=5qap5aO4i9A", ["youtube"]),
("https://www.tiktok.com/@funnycats0ftiktok/video/7345101300750748970?lang=en", ["tiktok"]),
("https://www.instagram.com/p/CU1J9JYJ9Zz/", ["instagram"]),
("https://www.facebook.com/nytimes/videos/10160796550110716", ["facebook"]),
("https://www.facebook.com/BylineFest/photos/t.100057299682816/927879487315946/", ["facebook"]),
],
)
def test_suitable_extractors(self, url, suitable_extractors):
suitable_extractors = suitable_extractors + ["generic"] # the generic is valid for all
extractors = list(self.extractor.suitable_extractors(url))
assert len(extractors) == len(suitable_extractors)
assert [e.ie_key().lower() for e in extractors] == suitable_extractors
@pytest.mark.parametrize(
"url, is_suitable",
[
@@ -55,7 +71,7 @@ class TestGenericExtractor(TestExtractorBase):
("https://google.com", True),
],
)
def test_suitable_urls(self, make_item, url, is_suitable):
def test_suitable_urls(self, url, is_suitable):
"""
Note: expected behaviour is to return True for all URLs, as YoutubeDLArchiver should be able to handle all URLs
This behaviour may be changed in the future (e.g. if we want the youtubedl archiver to just handle URLs it has extractors for,
@@ -245,3 +261,32 @@ class TestGenericExtractor(TestExtractorBase):
self.assertValidResponseMetadata(post, title, timestamp)
assert len(post.media) == 1
assert post.media[0].hash == image_hash
@pytest.mark.download
def test_download_facebook_video(self, make_item):
post = self.extractor.download(make_item("https://www.facebook.com/bellingcat/videos/588371253839133"))
assert len(post.media) == 2
assert post.media[0].filename.endswith("588371253839133.mp4")
assert post.media[0].mimetype == "video/mp4"
assert post.media[1].filename.endswith(".jpg")
assert post.media[1].mimetype == "image/jpeg"
assert "Bellingchat Premium is with Kolina Koltai" in post.get_title()
@pytest.mark.download
def test_download_facebook_image(self, make_item):
post = self.extractor.download(
make_item("https://www.facebook.com/BylineFest/photos/t.100057299682816/927879487315946/")
)
assert len(post.media) == 1
assert post.media[0].filename.endswith(".png")
assert "Byline Festival - BylineFest Partner" == post.get_title()
@pytest.mark.download
def test_download_facebook_text_only(self, make_item):
url = "https://www.facebook.com/bellingcat/posts/pfbid02rzpwZxAZ8bLkAX8NvHv4DWAidFaqAUfJMbo9vWkpwxL7uMUWzWMiizXLWRSjwihVl"
post = self.extractor.download(make_item(url))
assert "Bellingcat researcher Kolina Koltai delves deeper into Clothoff" in post.get("content")
assert post.get_title() == "Bellingcat"

View File

@@ -4,6 +4,8 @@ import pytest
import yt_dlp
from auto_archiver.modules.generic_extractor.generic_extractor import GenericExtractor
from auto_archiver.modules.generic_extractor.tiktok import Tiktok, TikTokIE
from .test_extractor_base import TestExtractorBase
@@ -17,11 +19,16 @@ def skip_ytdlp_own_methods(mocker):
)
@pytest.fixture()
@pytest.fixture
def mock_get(mocker):
return mocker.patch("auto_archiver.modules.generic_extractor.tiktok.requests.get")
@pytest.fixture
def tiktok_dropin() -> Tiktok:
return Tiktok()
class TestTiktokTikwmExtractor(TestExtractorBase):
"""
Test suite for TestTiktokTikwmExtractor.
@@ -34,6 +41,25 @@ class TestTiktokTikwmExtractor(TestExtractorBase):
VALID_EXAMPLE_URL = "https://www.tiktok.com/@example/video/1234"
@pytest.mark.parametrize(
"url, is_suitable",
[
("https://bellingcat.com", False),
("https://youtube.com", False),
("https://tiktok.co/", False),
("https://tiktok.com/", False),
("https://www.tiktok.com/", False),
("https://api.cool.tiktok.com/", False),
(VALID_EXAMPLE_URL, True),
("https://www.tiktok.com/@bbcnews/video/7478038212070411542", True),
("https://www.tiktok.com/@ggs68taiwan.official/video/7441821351142362375", True),
("https://www.tiktok.com/t/ZP8YQ8e5j/", True),
("https://vt.tiktok.com/ZSMTJeqRP/", True),
],
)
def test_is_suitable(self, url, is_suitable, tiktok_dropin):
assert tiktok_dropin.suitable(url, TikTokIE()) == is_suitable
def test_invalid_json_responses(self, mock_get, make_item, caplog):
mock_get.return_value.status_code = 200
mock_get.return_value.json.side_effect = ValueError