Files
auto-archiver/archivers/twitter_archiver.py
Ed Summers 3b87dffe6b Add browsertrix-crawler capture
The [browsertrix-crawler] utility is a browser-based crawler that can
crawl one or more pages. browsertrix-crawler creates archives in the
[WACZ] format which is essentially a standardized ZIP file (similar to DOCX, EPUB, JAR, etc) which can then be replayed using the [ReplayWeb.page] web
component, or unzipped to get the original WARC data (the ISO standard
format used by the Internet Archive Wayback Machine).

This PR adds browsertrix-crawler to archiver classes where screenshots are made made. The WACZ is uploaded to storage and then added to a new column in the spreadsheet. A column can be added that will display the WACZ, loaded from cloud storage (S3, digitalocean, etc) using the client side ReplayWeb page. You can see an example of the spreadsheet here:

https://docs.google.com/spreadsheets/d/1Tk-iJWzT9Sx2-YccuPttL9HcMdZEnhv_OR7Bc6tfeu8/edit#gid=0

browsertrix-crawler requires Docker to be installed. If Docker is not
installed an error message will be logged and things continue as normal.

[browsertrix-crawler]: https://github.com/webrecorder/browsertrix-crawler
[WACZ]: https://specs.webrecorder.net/wacz/latest/
[ReplayWeb.page]: https://replayweb.page
2022-09-25 19:46:29 +00:00

105 lines
4.3 KiB
Python

import html, re, requests
from datetime import datetime
from loguru import logger
from snscrape.modules.twitter import TwitterTweetScraper, Video, Gif, Photo
from .base_archiver import Archiver, ArchiveResult
class TwitterArchiver(Archiver):
"""
This Twitter Archiver uses unofficial scraping methods, and it works as
an alternative to TwitterApiArchiver when no API credentials are provided.
"""
name = "twitter"
link_pattern = re.compile(r"twitter.com\/(?:\#!\/)?(\w+)\/status(?:es)?\/(\d+)")
def get_username_tweet_id(self, url):
# detect URLs that we definitely cannot handle
matches = self.link_pattern.findall(url)
if not len(matches): return False, False
username, tweet_id = matches[0] # only one URL supported
logger.debug(f"Found {username=} and {tweet_id=} in {url=}")
return username, tweet_id
def download(self, url, check_if_exists=False):
username, tweet_id = self.get_username_tweet_id(url)
if not username: return False
scr = TwitterTweetScraper(tweet_id)
try:
tweet = next(scr.get_items())
except Exception as ex:
logger.warning(f"can't get tweet: {type(ex).__name__} occurred. args: {ex.args}")
return self.download_alternative(url, tweet_id)
if tweet.media is None:
logger.debug(f'No media found, archiving tweet text only')
screenshot = self.get_screenshot(url)
wacz = self.get_wacz(url)
page_cdn, page_hash, _ = self.generate_media_page_html(url, [], html.escape(tweet.json()))
return ArchiveResult(status="success", cdn_url=page_cdn, title=tweet.content, timestamp=tweet.date, hash=page_hash, screenshot=screenshot, wacz=wacz)
urls = []
for media in tweet.media:
if type(media) == Video:
variant = max(
[v for v in media.variants if v.bitrate], key=lambda v: v.bitrate)
urls.append(variant.url)
elif type(media) == Gif:
urls.append(media.variants[0].url)
elif type(media) == Photo:
urls.append(media.fullUrl.replace('name=large', 'name=orig'))
else:
logger.warning(f"Could not get media URL of {media}")
page_cdn, page_hash, thumbnail = self.generate_media_page(urls, url, tweet.json())
screenshot = self.get_screenshot(url)
wacz = self.get_wacz(url)
return ArchiveResult(status="success", cdn_url=page_cdn, screenshot=screenshot, hash=page_hash, thumbnail=thumbnail, timestamp=tweet.date, title=tweet.content, wacz=wacz)
def download_alternative(self, url, tweet_id):
# https://stackoverflow.com/a/71867055/6196010
logger.debug(f"Trying twitter hack for {url=}")
hack_url = f"https://cdn.syndication.twimg.com/tweet?id={tweet_id}"
r = requests.get(hack_url)
if r.status_code != 200: return False
tweet = r.json()
urls = []
for p in tweet["photos"]:
urls.append(p["url"])
# 1 tweet has 1 video max
if "video" in tweet:
v = tweet["video"]
urls.append(self.choose_variant(v.get("variants", [])))
logger.debug(f"Twitter hack got {urls=}")
timestamp = datetime.strptime(tweet["created_at"], "%Y-%m-%dT%H:%M:%S.%fZ")
screenshot = self.get_screenshot(url)
page_cdn, page_hash, thumbnail = self.generate_media_page(urls, url, r.text)
return ArchiveResult(status="success", cdn_url=page_cdn, screenshot=screenshot, hash=page_hash, thumbnail=thumbnail, timestamp=timestamp, title=tweet["text"])
def choose_variant(self, variants):
# choosing the highest quality possible
variant, width, height = None, 0, 0
for var in variants:
if var["type"] == "video/mp4":
width_height = re.search(r"\/(\d+)x(\d+)\/", var["src"])
if width_height:
w, h = int(width_height[1]), int(width_height[2])
if w > width or h > height:
width, height = w, h
variant = var.get("src", variant)
else:
variant = var.get("src") if not variant else variant
return variant