Merge branch 'main' into small_issues

This commit is contained in:
Patrick Robertson
2025-01-12 11:47:55 +01:00
17 changed files with 397 additions and 51 deletions

View File

@@ -2,6 +2,7 @@ steps:
# only 1 feeder allowed
feeder: gsheet_feeder # defaults to cli_feeder
archivers: # order matters, uncomment to activate
- bluesky_archiver
# - vk_archiver
# - telethon_archiver
# - telegram_archiver
@@ -94,9 +95,33 @@ configurations:
password: "vk pass"
session_file: "secrets/vk_config.v2.json"
youtubedl_archiver:
subtitles: true
# use one of the following two methods to authenticate in youtube - either provide a cookies file or use the cookies of the given browser
# for more information, see https://github.com/yt-dlp/yt-dlp/wiki/FAQ#how-do-i-pass-cookies-to-yt-dlp
# cookie_file: "secrets/youtube_cookies.txt"
# cookies_from_browser: firefox
# proxy: socks5://proxy-user:password@proxy-ip:port
screenshot_enricher:
width: 1280
height: 2300
# to save as pdf, uncomment the following lines and adjust the print options
# save_to_pdf: true
# print_options:
# for all options see https://www.selenium.dev/selenium/docs/api/py/webdriver/selenium.webdriver.common.print_page_options.html
# background: true
# orientation: "portrait"
# scale: 1
# page_width: 8.5in
# page_height: 11in
# margin_top: 0.4in
# margin_bottom: 0.4in
# margin_left: 0.4in
# margin_right: 0.4in
# page_ranges: ""
# shrink_to_fit: true
wayback_archiver_enricher:
timeout: 10
key: "wayback key"

View File

@@ -8,4 +8,5 @@ from .tiktok_archiver import TiktokArchiver
from .telegram_archiver import TelegramArchiver
from .vk_archiver import VkArchiver
from .youtubedl_archiver import YoutubeDLArchiver
from .instagram_api_archiver import InstagramAPIArchiver
from .instagram_api_archiver import InstagramAPIArchiver
from .bluesky_archiver import BlueskyArchiver

View File

@@ -48,6 +48,8 @@ class Archiver(Step):
"""
downloads a URL to provided filename, or inferred from URL, returns local filename
"""
# TODO: should we refactor to use requests.get(url, stream=True) and write to file in chunks? compare approaches
# TODO: should we guess the extension?
if not to_filename:
to_filename = url.split('/')[-1].split('?')[0]
if len(to_filename) > 64:

View File

@@ -0,0 +1,119 @@
import os
import re, requests, mimetypes
from loguru import logger
from . import Archiver
from ..core import Metadata, Media, ArchivingContext
class BlueskyArchiver(Archiver):
"""
Uses an unauthenticated Bluesky API to archive posts including metadata, images and videos. Relies on `public.api.bsky.app/xrpc` and `bsky.social/xrpc`. Avoids ATProto to avoid auth.
Some inspiration from https://github.com/yt-dlp/yt-dlp/blob/master/yt_dlp/extractor/bluesky.py
"""
name = "bluesky_archiver"
BSKY_POST = re.compile(r"/profile/([^/]+)/post/([a-zA-Z0-9]+)")
def __init__(self, config: dict) -> None:
super().__init__(config)
@staticmethod
def configs() -> dict:
return {}
def download(self, item: Metadata) -> Metadata:
url = item.get_url()
if not re.search(self.BSKY_POST, url):
return False
logger.debug(f"Identified a Bluesky post: {url}, archiving...")
result = Metadata()
# fetch post info and update result
post = self._get_post_from_uri(url)
logger.debug(f"Extracted post info: {post['record']['text']}")
result.set_title(post["record"]["text"])
result.set_timestamp(post["record"]["createdAt"])
for k, v in self._get_post_data(post).items():
if v: result.set(k, v)
# download if embeds present (1 video XOR >=1 images)
for media in self._download_bsky_embeds(post):
result.add_media(media)
logger.debug(f"Downloaded {len(result.media)} media files")
return result.success("bluesky")
def _get_post_from_uri(self, post_uri: str) -> dict:
"""
Calls a public (no auth needed) Bluesky API to get a post from its uri, uses .getPostThread as it brings author info as well (unlike .getPost).
"""
post_match = re.search(self.BSKY_POST, post_uri)
username = post_match.group(1)
post_id = post_match.group(2)
at_uri = f'at://{username}/app.bsky.feed.post/{post_id}'
r = requests.get(f"https://public.api.bsky.app/xrpc/app.bsky.feed.getPostThread?uri={at_uri}&depth=0&parent_height=0")
r.raise_for_status()
thread = r.json()
assert thread["thread"]["$type"] == "app.bsky.feed.defs#threadViewPost"
return thread["thread"]["post"]
def _download_bsky_embeds(self, post: dict) -> list[Media]:
"""
Iterates over image(s) or video in a Bluesky post and downloads them
"""
media = []
embed = post.get("record", {}).get("embed", {})
image_medias = embed.get("images", []) + embed.get("media", {}).get("images", [])
video_medias = [e for e in [embed.get("video"), embed.get("media", {}).get("video")] if e]
for image_media in image_medias:
image_media = self._download_bsky_file_as_media(image_media["image"]["ref"]["$link"], post["author"]["did"])
media.append(image_media)
for video_media in video_medias:
video_media = self._download_bsky_file_as_media(video_media["ref"]["$link"], post["author"]["did"])
media.append(video_media)
return media
def _download_bsky_file_as_media(self, cid: str, did: str) -> Media:
"""
Uses the Bluesky API to download a file by its `cid` and `did`.
"""
# TODO: replace with self.download_from_url once that function has been cleaned-up
file_url = f"https://bsky.social/xrpc/com.atproto.sync.getBlob?cid={cid}&did={did}"
response = requests.get(file_url, stream=True)
response.raise_for_status()
ext = mimetypes.guess_extension(response.headers["Content-Type"])
filename = os.path.join(ArchivingContext.get_tmp_dir(), f"{cid}{ext}")
with open(filename, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
media = Media(filename=filename)
media.set("src", file_url)
return media
def _get_post_data(self, post: dict) -> dict:
"""
Extracts relevant information returned by the .getPostThread api call (excluding text/created_at): author, mentions, tags, links.
"""
author = post["author"]
if "labels" in author and not author["labels"]: del author["labels"]
if "associated" in author: del author["associated"]
mentions, tags, links = [], [], []
facets = post.get("record", {}).get("facets", [])
for f in facets:
for feature in f["features"]:
if feature["$type"] == "app.bsky.richtext.facet#mention":
mentions.append(feature["did"])
elif feature["$type"] == "app.bsky.richtext.facet#tag":
tags.append(feature["tag"])
elif feature["$type"] == "app.bsky.richtext.facet#link":
links.append(feature["uri"])
res = {"author": author}
if mentions: res["mentions"] = mentions
if tags: res["tags"] = tags
if links: res["links"] = links
return res

View File

@@ -2,7 +2,6 @@ import re, requests, mimetypes, json
from typing import Union
from datetime import datetime
from loguru import logger
from snscrape.modules.twitter import TwitterTweetScraper, Video, Gif, Photo
from yt_dlp import YoutubeDL
from yt_dlp.extractor.twitter import TwitterIE
from slugify import slugify
@@ -49,7 +48,7 @@ class TwitterArchiver(Archiver):
username, tweet_id = self.get_username_tweet_id(url)
if not username: return False
strategies = [self.download_yt_dlp, self.download_snscrape, self.download_syndication]
strategies = [self.download_yt_dlp, self.download_syndication]
for strategy in strategies:
logger.debug(f"Trying {strategy.__name__} for {url=}")
try:
@@ -61,45 +60,6 @@ class TwitterArchiver(Archiver):
logger.warning(f"No free strategy worked for {url}")
return False
def download_snscrape(self, item: Metadata, url: str, tweet_id: str) -> Union[Metadata|bool]:
scr = TwitterTweetScraper(tweet_id)
try:
tweet = next(scr.get_items())
except Exception as ex:
logger.warning(f"SNSCRAPE FAILED, can't get tweet: {type(ex).__name__} occurred. args: {ex.args}")
return False
result = Metadata()
result.set_title(tweet.content).set_content(tweet.json()).set_timestamp(tweet.date)
if tweet.media is None:
logger.debug(f'No media found, archiving tweet text only')
return result
for i, tweet_media in enumerate(tweet.media):
media = Media(filename="")
mimetype = ""
if type(tweet_media) == Video:
variant = max(
[v for v in tweet_media.variants if v.bitrate], key=lambda v: v.bitrate)
media.set("src", variant.url).set("duration", tweet_media.duration)
mimetype = variant.contentType
elif type(tweet_media) == Gif:
variant = tweet_media.variants[0]
media.set("src", variant.url)
mimetype = variant.contentType
elif type(tweet_media) == Photo:
media.set("src", UrlUtil.twitter_best_quality_url(tweet_media.fullUrl))
mimetype = "image/jpeg"
else:
logger.warning(f"Could not get media URL of {tweet_media}")
continue
ext = mimetypes.guess_extension(mimetype)
media.filename = self.download_from_url(media.get("src"), f'{slugify(url)}_{i}{ext}')
result.add_media(media)
return result.success("twitter-snscrape")
def download_syndication(self, item: Metadata, url: str, tweet_id: str) -> Union[Metadata|bool]:
"""
Hack alternative working again.

View File

@@ -30,6 +30,8 @@ class YoutubeDLArchiver(Archiver):
"end_means_success": {"default": True, "help": "if True, any archived content will mean a 'success', if False this archiver will not return a 'success' stage; this is useful for cases when the yt-dlp will archive a video but ignore other types of content like images or text only pages that the subsequent archivers can retrieve."},
'allow_playlist': {"default": False, "help": "If True will also download playlists, set to False if the expectation is to download a single video."},
"max_downloads": {"default": "inf", "help": "Use to limit the number of videos to download when a channel or long page is being extracted. 'inf' means no limit."},
"cookies_from_browser": {"default": None, "help": "optional browser for ytdl to extract cookies from, can be one of: brave, chrome, chromium, edge, firefox, opera, safari, vivaldi, whale"},
"cookie_file": {"default": None, "help": "optional cookie file to use for Youtube, see instructions here on how to export from your browser: https://github.com/yt-dlp/yt-dlp/wiki/FAQ#how-do-i-pass-cookies-to-yt-dlp"},
}
def download(self, item: Metadata) -> Metadata:
@@ -38,8 +40,17 @@ class YoutubeDLArchiver(Archiver):
if item.netloc in ['facebook.com', 'www.facebook.com'] and self.facebook_cookie:
logger.debug('Using Facebook cookie')
yt_dlp.utils.std_headers['cookie'] = self.facebook_cookie
ydl_options = {'outtmpl': os.path.join(ArchivingContext.get_tmp_dir(), f'%(id)s.%(ext)s'), 'quiet': False, 'noplaylist': not self.allow_playlist , 'writesubtitles': self.subtitles, 'writeautomaticsub': self.subtitles, "live_from_start": self.live_from_start, "proxy": self.proxy, "max_downloads": self.max_downloads, "playlistend": self.max_downloads}
if item.netloc in ['youtube.com', 'www.youtube.com']:
if self.cookies_from_browser:
logger.debug(f'Extracting cookies from browser {self.cookies_from_browser} for Youtube')
ydl_options['cookiesfrombrowser'] = (self.cookies_from_browser,)
elif self.cookie_file:
logger.debug(f'Using cookies from file {self.cookie_file}')
ydl_options['cookiefile'] = self.cookie_file
ydl = yt_dlp.YoutubeDL(ydl_options) # allsubtitles and subtitleslangs not working as expected, so default lang is always "en"
try:

View File

@@ -1,5 +1,7 @@
from loguru import logger
import time, os
import base64
from selenium.common.exceptions import TimeoutException
@@ -18,22 +20,31 @@ class ScreenshotEnricher(Enricher):
"timeout": {"default": 60, "help": "timeout for taking the screenshot"},
"sleep_before_screenshot": {"default": 4, "help": "seconds to wait for the pages to load before taking screenshot"},
"http_proxy": {"default": "", "help": "http proxy to use for the webdriver, eg http://proxy-user:password@proxy-ip:port"},
"save_to_pdf": {"default": False, "help": "save the page as pdf along with the screenshot. PDF saving options can be adjusted with the 'print_options' parameter"},
"print_options": {"default": {}, "help": "options to pass to the pdf printer"}
}
def enrich(self, to_enrich: Metadata) -> None:
url = to_enrich.get_url()
if UrlUtil.is_auth_wall(url):
logger.debug(f"[SKIP] SCREENSHOT since url is behind AUTH WALL: {url=}")
return
logger.debug(f"Enriching screenshot for {url=}")
with Webdriver(self.width, self.height, self.timeout, 'facebook.com' in url, http_proxy=self.http_proxy) as driver:
with Webdriver(self.width, self.height, self.timeout, 'facebook.com' in url, http_proxy=self.http_proxy, print_options=self.print_options) as driver:
try:
driver.get(url)
time.sleep(int(self.sleep_before_screenshot))
screenshot_file = os.path.join(ArchivingContext.get_tmp_dir(), f"screenshot_{random_str(8)}.png")
driver.save_screenshot(screenshot_file)
to_enrich.add_media(Media(filename=screenshot_file), id="screenshot")
if self.save_to_pdf:
pdf_file = os.path.join(ArchivingContext.get_tmp_dir(), f"pdf_{random_str(8)}.pdf")
pdf = driver.print_page(driver.print_options)
with open(pdf_file, "wb") as f:
f.write(base64.b64decode(pdf))
to_enrich.add_media(Media(filename=pdf_file), id="pdf")
except TimeoutException:
logger.info("TimeoutException loading page for screenshot")
except Exception as e:

View File

@@ -1,3 +1,4 @@
import json
from loguru import logger
import time, requests
@@ -70,11 +71,16 @@ class WaybackArchiverEnricher(Enricher, Archiver):
return False
# check job status
job_id = r.json().get('job_id')
if not job_id:
logger.error(f"Wayback failed with {r.json()}")
try:
job_id = r.json().get('job_id')
if not job_id:
logger.error(f"Wayback failed with {r.json()}")
return False
except json.decoder.JSONDecodeError as e:
logger.error(f"Expected a JSON with job_id from Wayback and got {r.text}")
return False
# waits at most timeout seconds until job is completed, otherwise only enriches the job_id information
start_time = time.time()
wayback_url = False
@@ -92,6 +98,9 @@ class WaybackArchiverEnricher(Enricher, Archiver):
except requests.exceptions.RequestException as e:
logger.warning(f"RequestException: fetching status for {url=} due to: {e}")
break
except json.decoder.JSONDecodeError as e:
logger.error(f"Expected a JSON from Wayback and got {r.text} for {url=}")
break
except Exception as e:
logger.warning(f"error fetching status for {url=} due to: {e}")
if not wayback_url:

View File

@@ -286,11 +286,11 @@
// logic for enabled/disabled greyscale
// Get references to the checkboxes and images/videos
const safeImageViewCheckbox = document.getElementById('safe-media-view');
const imagesVideos = document.querySelectorAll('img, video');
const visualPreviews = document.querySelectorAll('img, video,embed');
// Function to toggle grayscale effect
function toggleGrayscale() {
imagesVideos.forEach(element => {
visualPreviews.forEach(element => {
if (safeImageViewCheckbox.checked) {
// Enable grayscale effect
element.style.filter = 'grayscale(1)';
@@ -307,7 +307,7 @@
safeImageViewCheckbox.addEventListener('change', toggleGrayscale);
// Handle the hover effect using JavaScript
imagesVideos.forEach(element => {
visualPreviews.forEach(element => {
element.addEventListener('mouseenter', () => {
// Disable grayscale effect on hover
element.style.filter = 'none';

View File

@@ -32,6 +32,10 @@ No URL available for {{ m.key }}.
Your browser does not support the video element.
</video>
</div>
{% elif 'application/pdf' in m.mimetype %}
<div>
<embed src="{{ url }}" width="100%" height="400px"/>
</div>
{% elif 'audio' in m.mimetype %}
<div>
<audio controls>

View File

@@ -2,18 +2,24 @@ from __future__ import annotations
from selenium import webdriver
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.common.proxy import Proxy, ProxyType
from selenium.webdriver.common.print_page_options import PrintOptions
from loguru import logger
from selenium.webdriver.common.by import By
import time
class Webdriver:
def __init__(self, width: int, height: int, timeout_seconds: int, facebook_accept_cookies: bool = False, http_proxy: str = "") -> webdriver:
def __init__(self, width: int, height: int, timeout_seconds: int, facebook_accept_cookies: bool = False, http_proxy: str = "", print_options: dict = {}) -> webdriver:
self.width = width
self.height = height
self.timeout_seconds = timeout_seconds
self.facebook_accept_cookies = facebook_accept_cookies
self.http_proxy = http_proxy
# create and set print options
self.print_options = PrintOptions()
for k, v in print_options.items():
setattr(self.print_options, k, v)
def __enter__(self) -> webdriver:
options = webdriver.FirefoxOptions()
@@ -24,6 +30,7 @@ class Webdriver:
self.driver = webdriver.Firefox(options=options)
self.driver.set_window_size(self.width, self.height)
self.driver.set_page_load_timeout(self.timeout_seconds)
self.driver.print_options = self.print_options
except TimeoutException as e:
logger.error(f"failed to get new webdriver, possibly due to insufficient system resources or timeout settings: {e}")

4
tests/__init__.py Normal file
View File

@@ -0,0 +1,4 @@
import unittest
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,7 @@
import tempfile
from auto_archiver.core.context import ArchivingContext
ArchivingContext.reset(full_reset=True)
ArchivingContext.set_tmp_dir(tempfile.gettempdir())

View File

@@ -0,0 +1,22 @@
from auto_archiver.core import Metadata
class TestArchiverBase(object):
archiver_class = None
config = None
def setUp(self):
assert self.archiver_class is not None, "self.archiver_class must be set on the subclass"
assert self.config is not None, "self.config must be a dict set on the subclass"
self.archiver = self.archiver_class(self.config)
def create_item(self, url, **kwargs):
item = Metadata().set_url(url)
for key, value in kwargs.items():
item.set(key, value)
return item
def assertValidResponseMetadata(self, test_response, title, timestamp):
self.assertTrue(test_response.is_success())
self.assertEqual(title, test_response.get_title())
self.assertTrue(timestamp, test_response.get("timestamp"))

View File

@@ -0,0 +1,69 @@
from auto_archiver.archivers.bluesky_archiver import BlueskyArchiver
from .test_archiver_base import TestArchiverBase
import unittest
class TestBlueskyArchiver(TestArchiverBase, unittest.TestCase):
"""Tests Bluesky Archiver
Note that these tests will download API responses from the bluesky API, so they may be slow.
This is an intended feature, as we want to test to ensure the bluesky API format hasn't changed,
and also test the archiver's ability to download media.
"""
archiver_class = BlueskyArchiver
config = {}
def test_download_media_with_images(self):
# url https://bsky.app/profile/colborne.bsky.social/post/3lec2bqjc5s2y
post = self.archiver._get_post_from_uri("https://bsky.app/profile/colborne.bsky.social/post/3lec2bqjc5s2y")
# just make sure bsky haven't changed their format, images should be under "record/embed/media/images"
# there should be 2 images
self.assertTrue("record" in post)
self.assertTrue("embed" in post["record"])
self.assertTrue("media" in post["record"]["embed"])
self.assertTrue("images" in post["record"]["embed"]["media"])
self.assertEqual(len(post["record"]["embed"]["media"]["images"]), 2)
# try downloading the media files
media = self.archiver._download_bsky_embeds(post)
self.assertEqual(len(media), 2)
# check the IDs
self.assertTrue("bafkreiflrkfihcvwlhka5tb2opw2qog6gfvywsdzdlibveys2acozh75tq" in media[0].get('src'))
self.assertTrue("bafkreibsprmwchf7r6xcstqkdvvuj3ijw7efciw7l3y4crxr4cmynseo7u" in media[1].get('src'))
def test_download_post_with_single_image(self):
# url https://bsky.app/profile/bellingcat.com/post/3lcxcpgt6j42l
post = self.archiver._get_post_from_uri("https://bsky.app/profile/bellingcat.com/post/3lcxcpgt6j42l")
# just make sure bsky haven't changed their format, images should be under "record/embed/images"
# there should be 1 image
self.assertTrue("record" in post)
self.assertTrue("embed" in post["record"])
self.assertTrue("images" in post["record"]["embed"])
self.assertEqual(len(post["record"]["embed"]["images"]), 1)
media = self.archiver._download_bsky_embeds(post)
self.assertEqual(len(media), 1)
# check the ID
self.assertTrue("bafkreihljdtomy4yulx4nfxuqdatlgvdg45vxdmjzzhclsd4ludk7zfma4" in media[0].get('src'))
def test_download_post_with_video(self):
# url https://bsky.app/profile/bellingcat.com/post/3le2l4gsxlk2i
post = self.archiver._get_post_from_uri("https://bsky.app/profile/bellingcat.com/post/3le2l4gsxlk2i")
# just make sure bsky haven't changed their format, video should be under "record/embed/video"
self.assertTrue("record" in post)
self.assertTrue("embed" in post["record"])
self.assertTrue("video" in post["record"]["embed"])
media = self.archiver._download_bsky_embeds(post)
self.assertEqual(len(media), 1)
# check the ID
self.assertTrue("bafkreiaiskn2nt5cxjnxbgcqqcrnurvkr2ni3unekn6zvhvgr5nrqg6u2q" in media[0].get('src'))

View File

@@ -0,0 +1,95 @@
import unittest
import datetime
from auto_archiver.archivers.twitter_archiver import TwitterArchiver
from .test_archiver_base import TestArchiverBase
class TestTwitterArchiver(TestArchiverBase, unittest.TestCase):
archiver_class = TwitterArchiver
config = {}
def test_sanitize_url(self):
# should expand t.co URLs
t_co_url = "https://t.co/yl3oOJatFp"
t_co_resolved_url = "https://www.bellingcat.com/category/resources/"
self.assertEqual(t_co_resolved_url, self.archiver.sanitize_url(t_co_url))
# shouldn't alter valid x URLs
x_url = "https://x.com/bellingcat/status/1874097816571961839"
self.assertEqual(x_url, self.archiver.sanitize_url(x_url))
# shouldn't alter valid twitter.com URLs
twitter_url = "https://twitter.com/bellingcat/status/1874097816571961839"
self.assertEqual(twitter_url, self.archiver.sanitize_url(twitter_url))
# should strip tracking params
tracking_url = "https://twitter.com/bellingcat/status/1874097816571961839?s=20&t=3d0g4ZQis7dCbSDg-mE7-w"
self.assertEqual("https://twitter.com/bellingcat/status/1874097816571961839", self.archiver.sanitize_url(tracking_url))
# shouldn't alter non-twitter/x URLs
test_url = "https://www.bellingcat.com/category/resources/"
self.assertEqual(test_url, self.archiver.sanitize_url(test_url))
# shouldn't strip params from non-twitter/x URLs
test_url = "https://www.bellingcat.com/category/resources/?s=20&t=3d0g4ZQis7dCbSDg-mE7-w"
self.assertEqual(test_url, self.archiver.sanitize_url(test_url))
def test_get_username_tweet_id_from_url(self):
# test valid twitter URL
url = "https://twitter.com/bellingcat/status/1874097816571961839"
username, tweet_id = self.archiver.get_username_tweet_id(url)
self.assertEqual("bellingcat", username)
self.assertEqual("1874097816571961839", tweet_id)
# test valid x URL
url = "https://x.com/bellingcat/status/1874097816571961839"
username, tweet_id = self.archiver.get_username_tweet_id(url)
self.assertEqual("bellingcat", username)
self.assertEqual("1874097816571961839", tweet_id)
# test invalid URL
# TODO: should this return None, False or raise an exception? Right now it returns False
url = "https://www.bellingcat.com/category/resources/"
username, tweet_id = self.archiver.get_username_tweet_id(url)
self.assertFalse(username)
self.assertFalse(tweet_id)
def test_youtube_dlp_archiver(self):
url = "https://x.com/bellingcat/status/1874097816571961839"
post = self.archiver.download_yt_dlp(self.create_item(url), url, "1874097816571961839")
self.assertTrue(post)
self.assertValidResponseMetadata(
post,
"As 2024 comes to a close, heres some examples of what Bellingcat investigated per month in our 10th year! 🧵",
datetime.datetime(2024, 12, 31, 14, 18, 33, tzinfo=datetime.timezone.utc)
)
breakpoint()
def test_download_media_with_images(self):
# url https://twitter.com/MeCookieMonster/status/1617921633456640001?s=20&t=3d0g4ZQis7dCbSDg-mE7-w
post = self.archiver.download()
# just make sure twitter haven't changed their format, images should be under "record/embed/media/images"
# there should be 2 images
self.assertTrue("record" in post)
self.assertTrue("embed" in post["record"])
self.assertTrue("media" in post["record"]["embed"])
self.assertTrue("images" in post["record"]["embed"]["media"])
self.assertEqual(len(post["record"]["embed"]["media"]["images"]), 2)
# try downloading the media files
media = self.archiver.download(post)
self.assertEqual(len(media), 2)
# check the IDs
self.assertTrue("bafkreiflrkfihcvwlhka5tb2opw2qog6gfvywsdzdlibveys2acozh75tq" in media[0].get('src'))
self.assertTrue("bafkreibsprmwchf7r6xcstqkdvvuj3ijw7efciw7l3y4crxr4cmynseo7u" in media[1].get('src'))

View File