dockerization complete

This commit is contained in:
msramalho
2022-11-08 15:55:33 +00:00
parent 81eadd4672
commit 390b84eb22
33 changed files with 3 additions and 4 deletions

0
src/__init__.py Normal file
View File

11
src/archivers/__init__.py Normal file
View File

@@ -0,0 +1,11 @@
# we need to explicitly expose the available imports here
from .base_archiver import Archiver, ArchiveResult
from .telegram_archiver import TelegramArchiver
from .telethon_archiver import TelethonArchiver
from .tiktok_archiver import TiktokArchiver
from .wayback_archiver import WaybackArchiver
from .youtubedl_archiver import YoutubeDLArchiver
from .twitter_archiver import TwitterArchiver
from .vk_archiver import VkArchiver
from .twitter_api_archiver import TwitterApiArchiver
from .instagram_archiver import InstagramArchiver

View File

@@ -0,0 +1,360 @@
import os, datetime, shutil, hashlib, time, requests, re, mimetypes, subprocess
from dataclasses import dataclass
from abc import ABC, abstractmethod
from urllib.parse import urlparse
from random import randrange
import ffmpeg
from loguru import logger
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.common.by import By
from slugify import slugify
from configs import Config
from storages import Storage
from utils import mkdir_if_not_exists
@dataclass
class ArchiveResult:
status: str
cdn_url: str = None
thumbnail: str = None
thumbnail_index: str = None
duration: float = None
title: str = None
timestamp: datetime.datetime = None
screenshot: str = None
wacz: str = None
hash: str = None
class Archiver(ABC):
name = "default"
retry_regex = r"retrying at (\d+)$"
def __init__(self, storage: Storage, config: Config):
self.storage = storage
self.driver = config.webdriver
self.hash_algorithm = config.hash_algorithm
self.browsertrix = config.browsertrix_config
self.is_docker = config.is_docker
def __str__(self):
return self.__class__.__name__
def __repr__(self):
return self.__str__()
@abstractmethod
def download(self, url, check_if_exists=False): pass
def get_netloc(self, url):
return urlparse(url).netloc
def generate_media_page_html(self, url, urls_info: dict, object, thumbnail=None):
"""
Generates an index.html page where each @urls_info is displayed
"""
page = f'''<html><head><title>{url}</title><meta charset="UTF-8"></head>
<body>
<h2>Archived media from {self.name}</h2>
<h3><a href="{url}">{url}</a></h3><ul>'''
for url_info in urls_info:
mime_global = self._guess_file_type(url_info["key"])
preview = ""
if mime_global == "image":
preview = f'<img src="{url_info["cdn_url"]}" style="max-height:200px;max-width:400px;"></img>'
elif mime_global == "video":
preview = f'<video src="{url_info["cdn_url"]}" controls style="max-height:400px;max-width:400px;"></video>'
page += f'''<li><a href="{url_info['cdn_url']}">{preview}{url_info['key']}</a>: {url_info['hash']}</li>'''
page += f"</ul><h2>{self.name} object data:</h2><code>{object}</code>"
page += f"</body></html>"
page_key = self.get_html_key(url)
page_filename = os.path.join(Storage.TMP_FOLDER, page_key)
with open(page_filename, "w") as f:
f.write(page)
page_hash = self.get_hash(page_filename)
self.storage.upload(page_filename, page_key, extra_args={
'ACL': 'public-read', 'ContentType': 'text/html'})
page_cdn = self.storage.get_cdn_url(page_key)
return (page_cdn, page_hash, thumbnail)
def _guess_file_type(self, path: str):
"""
Receives a URL or filename and returns global mimetype like 'image' or 'video'
see https://developer.mozilla.org/en-US/docs/Web/HTTP/Basics_of_HTTP/MIME_types/Common_types
"""
mime = mimetypes.guess_type(path)[0]
if mime is not None:
return mime.split("/")[0]
return ""
def download_from_url(self, url, to_filename):
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36'
}
d = requests.get(url, headers=headers)
with open(to_filename, 'wb') as f:
f.write(d.content)
def generate_media_page(self, urls, url, object):
"""
For a list of media urls, fetch them, upload them
and call self.generate_media_page_html with them
"""
thumbnail = None
uploaded_media = []
for media_url in urls:
key = self._get_key_from_url(media_url, ".jpg")
filename = os.path.join(Storage.TMP_FOLDER, key)
self.download_from_url(media_url, filename)
self.storage.upload(filename, key)
hash = self.get_hash(filename)
cdn_url = self.storage.get_cdn_url(key)
if thumbnail is None:
thumbnail = cdn_url
uploaded_media.append({'cdn_url': cdn_url, 'key': key, 'hash': hash})
return self.generate_media_page_html(url, uploaded_media, object, thumbnail=thumbnail)
def get_key(self, filename):
"""
returns a key in the format "[archiverName]_[filename]" includes extension
"""
tail = os.path.split(filename)[1] # returns filename.ext from full path
_id, extension = os.path.splitext(tail) # returns [filename, .ext]
if 'unknown_video' in _id:
_id = _id.replace('unknown_video', 'jpg')
# long filenames can cause problems, so trim them if necessary
if len(_id) > 128:
_id = _id[-128:]
return f'{self.name}_{_id}{extension}'
def get_html_key(self, url):
return self._get_key_from_url(url, ".html")
def _get_key_from_url(self, url, with_extension: str = None, append_datetime: bool = False):
"""
Receives a URL and returns a slugified version of the URL path
if a string is passed in @with_extension the slug is appended with it if there is no "." in the slug
if @append_date is true, the key adds a timestamp after the URL slug and before the extension
"""
url_path = urlparse(url).path
path, ext = os.path.splitext(url_path)
slug = slugify(path)
if append_datetime:
slug += "-" + slugify(datetime.datetime.utcnow().isoformat())
if len(ext):
slug += ext
if with_extension is not None:
if "." not in slug:
slug += with_extension
return self.get_key(slug)
def get_hash(self, filename):
with open(filename, "rb") as f:
bytes = f.read() # read entire file as bytes
logger.debug(f'Hash algorithm is {self.hash_algorithm}')
if self.hash_algorithm == "SHA-256": hash = hashlib.sha256(bytes)
elif self.hash_algorithm == "SHA3-512": hash = hashlib.sha3_512(bytes)
else: raise Exception(f"Unknown Hash Algorithm of {self.hash_algorithm}")
return hash.hexdigest()
def get_screenshot(self, url):
logger.debug(f"getting screenshot for {url=}")
key = self._get_key_from_url(url, ".png", append_datetime=True)
filename = os.path.join(Storage.TMP_FOLDER, key)
# Accept cookies popup dismiss for ytdlp video
if 'facebook.com' in url:
try:
logger.debug(f'Trying fb click accept cookie popup for {url}')
self.driver.get("http://www.facebook.com")
foo = self.driver.find_element(By.XPATH, "//button[@data-cookiebanner='accept_only_essential_button']")
foo.click()
logger.debug(f'fb click worked')
# linux server needs a sleep otherwise facebook cookie won't have worked and we'll get a popup on next page
time.sleep(2)
except:
logger.warning(f'Failed on fb accept cookies for url {url}')
try:
self.driver.get(url)
time.sleep(6)
except TimeoutException:
logger.info("TimeoutException loading page for screenshot")
self.driver.save_screenshot(filename)
self.storage.upload(filename, key, extra_args={'ACL': 'public-read', 'ContentType': 'image/png'})
return self.storage.get_cdn_url(key)
def get_wacz(self, url):
if not self.browsertrix.enabled:
logger.debug(f"Browsertrix WACZ generation is not enabled, skipping.")
return
if self.is_docker:
# TODO: figure out support for browsertrix in docker
# see: https://github.com/bellingcat/auto-archiver/issues/66
logger.warning(f"Browsertrix WACZ is not yet supported when using DOCKER.")
return
logger.debug(f"getting wacz for {url}")
key = self._get_key_from_url(url, ".wacz", append_datetime=True)
collection = re.sub('[^0-9a-zA-Z]+', '', key.replace(".wacz", ""))
browsertrix_home = os.path.join(os.getcwd(), "browsertrix-tmp")
cmd = [
"docker", "run",
"--rm", # delete container once it has completed running
"-v", f"{browsertrix_home}:/crawls/",
# "-it", # this leads to "the input device is not a TTY"
"webrecorder/browsertrix-crawler", "crawl",
"--url", url,
"--scopeType", "page",
"--generateWACZ",
"--text",
"--collection", collection,
"--behaviors", "autoscroll,autoplay,autofetch,siteSpecific",
"--behaviorTimeout", str(self.browsertrix.timeout_seconds),
"--timeout", str(self.browsertrix.timeout_seconds)
]
if not os.path.isdir(browsertrix_home):
os.mkdir(browsertrix_home)
if self.browsertrix.profile:
shutil.copyfile(self.browsertrix.profile, os.path.join(browsertrix_home, "profile.tar.gz"))
cmd.extend(["--profile", "/crawls/profile.tar.gz"])
try:
logger.info(f"Running browsertrix-crawler: {' '.join(cmd)}")
subprocess.run(cmd, check=True)
except Exception as e:
logger.error(f"WACZ generation failed: {e}")
return
filename = os.path.join(browsertrix_home, "collections", collection, f"{collection}.wacz")
# do not crash if upload fails
try:
self.storage.upload(filename, key, extra_args={
'ACL': 'public-read', 'ContentType': 'application/zip'})
except FileNotFoundError as e:
logger.warning(f"Unable to locate and upload WACZ {filename=}, {key=}")
# clean up the local browsertrix files
try:
shutil.rmtree(browsertrix_home)
except PermissionError:
logger.warn(f"Unable to clean up browsertrix-crawler files in {browsertrix_home}")
return self.storage.get_cdn_url(key)
def get_thumbnails(self, filename, key, duration=None):
thumbnails_folder = os.path.splitext(filename)[0] + os.path.sep
key_folder = key.split('.')[0] + os.path.sep
mkdir_if_not_exists(thumbnails_folder)
fps = 0.5
if duration is not None:
duration = float(duration)
if duration < 60:
fps = 10.0 / duration
elif duration < 120:
fps = 20.0 / duration
else:
fps = 40.0 / duration
stream = ffmpeg.input(filename)
stream = ffmpeg.filter(stream, 'fps', fps=fps).filter('scale', 512, -1)
stream.output(thumbnails_folder + 'out%d.jpg').run()
thumbnails = os.listdir(thumbnails_folder)
cdn_urls = []
for fname in thumbnails:
if fname[-3:] == 'jpg':
thumbnail_filename = thumbnails_folder + fname
key = os.path.join(key_folder, fname)
self.storage.upload(thumbnail_filename, key)
cdn_url = self.storage.get_cdn_url(key)
cdn_urls.append(cdn_url)
if len(cdn_urls) == 0:
return ('', '')
key_thumb = cdn_urls[int(len(cdn_urls) * 0.1)]
index_page = f'''<html><head><title>{filename}</title><meta charset="UTF-8"></head>
<body>'''
for t in cdn_urls:
index_page += f'<img src="{t}" />'
index_page += f"</body></html>"
index_fname = thumbnails_folder + 'index.html'
with open(index_fname, 'w') as f:
f.write(index_page)
thumb_index = key_folder + 'index.html'
self.storage.upload(index_fname, thumb_index, extra_args={
'ACL': 'public-read', 'ContentType': 'text/html'})
shutil.rmtree(thumbnails_folder)
thumb_index_cdn_url = self.storage.get_cdn_url(thumb_index)
return (key_thumb, thumb_index_cdn_url)
def signal_retry_in(self, min_seconds=1800, max_seconds=7200, **kwargs):
"""
sets state to retry in random between (min_seconds, max_seconds)
"""
now = datetime.datetime.now().timestamp()
retry_at = int(now + randrange(min_seconds, max_seconds))
logger.debug(f"signaling {retry_at=}")
return ArchiveResult(status=f'retrying at {retry_at}', **kwargs)
def is_retry(status):
return re.search(Archiver.retry_regex, status) is not None
def should_retry_from_status(status):
"""
checks status against message in signal_retry_in
returns true if enough time has elapsed, false otherwise
"""
match = re.search(Archiver.retry_regex, status)
if match:
retry_at = int(match.group(1))
now = datetime.datetime.now().timestamp()
should_retry = now >= retry_at
logger.debug(f"{should_retry=} since {now=} and {retry_at=}")
return should_retry
return False
def remove_retry(status):
"""
transforms the status from retry into something else
"""
new_status = re.sub(Archiver.retry_regex, "failed: too many retries", status, 0)
logger.debug(f"removing retry message at {status=}, got {new_status=}")
return new_status

View File

@@ -0,0 +1,140 @@
import re, os, shutil, html, traceback
import instaloader # https://instaloader.github.io/as-module.html
from loguru import logger
from .base_archiver import Archiver, ArchiveResult
from configs import Config
from storages import Storage
class InstagramArchiver(Archiver):
"""
Uses Instaloader to download either a post (inc images, videos, text) or as much as possible from a profile (posts, stories, highlights, )
"""
name = "instagram"
DOWNLOAD_FOLDER = "instaloader"
# NB: post should be tested before profile
# https://regex101.com/r/MGPquX/1
post_pattern = re.compile(r"(?:(?:http|https):\/\/)?(?:www.)?(?:instagram.com|instagr.am|instagr.com)\/(?:p|reel)\/(\w+)")
# https://regex101.com/r/6Wbsxa/1
profile_pattern = re.compile(r"(?:(?:http|https):\/\/)?(?:www.)?(?:instagram.com|instagr.am|instagr.com)\/(\w+)")
def __init__(self, storage: Storage, config: Config):
super().__init__(storage, config)
self.insta = instaloader.Instaloader(download_geotags=True, download_comments=True, compress_json=False, dirname_pattern=self.DOWNLOAD_FOLDER, filename_pattern="{date_utc}_UTC_{target}__{typename}")
if config.instagram_config:
try:
self.insta.load_session_from_file(config.instagram_config.username, config.instagram_config.session_file)
except Exception as e:
logger.error(f"Unable to login from session file: {e}\n{traceback.format_exc()}")
try:
self.insta.login(config.instagram_config.username, config.instagram_config.
password)
#TODO: wait for this issue to be fixed https://github.com/instaloader/instaloader/issues/1758
self.insta.save_session_to_file(config.instagram_config.session_file)
except Exception as e2:
logger.error(f"Unable to finish login (retrying from file): {e2}\n{traceback.format_exc()}")
def download(self, url, check_if_exists=False):
post_matches = self.post_pattern.findall(url)
profile_matches = self.profile_pattern.findall(url)
# return if not a valid instagram link
if not len(post_matches) and not len(profile_matches):
return
# check if already uploaded
key = self.get_html_key(url)
if check_if_exists and self.storage.exists(key):
# only s3 storage supports storage.exists as not implemented on gd
cdn_url = self.storage.get_cdn_url(key)
screenshot = self.get_screenshot(url)
wacz = self.get_wacz(url)
return ArchiveResult(status='already archived', cdn_url=cdn_url, screenshot=screenshot, wacz=wacz)
try:
# process if post
if len(post_matches):
return self.download_post(url, post_matches[0])
# process if profile
if len(profile_matches):
return self.download_profile(url, profile_matches[0])
finally:
shutil.rmtree(self.DOWNLOAD_FOLDER, ignore_errors=True)
def download_post(self, url, post_id):
logger.debug(f"Instagram {post_id=} detected in {url=}")
post = instaloader.Post.from_shortcode(self.insta.context, post_id)
if self.insta.download_post(post, target=post.owner_username):
return self.upload_downloaded_content(url, post.title, post._asdict(), post.date)
def download_profile(self, url, username):
# gets posts, posts where username is tagged, igtv postss, stories, and highlights
logger.debug(f"Instagram {username=} detected in {url=}")
profile = instaloader.Profile.from_username(self.insta.context, username)
try:
for post in profile.get_posts():
try: self.insta.download_post(post, target=f"profile_post_{post.owner_username}")
except Exception as e: logger.error(f"Failed to download post: {post.shortcode}: {e}")
except Exception as e: logger.error(f"Failed profile.get_posts: {e}")
try:
for post in profile.get_tagged_posts():
try: self.insta.download_post(post, target=f"tagged_post_{post.owner_username}")
except Exception as e: logger.error(f"Failed to download tagged post: {post.shortcode}: {e}")
except Exception as e: logger.error(f"Failed profile.get_tagged_posts: {e}")
try:
for post in profile.get_igtv_posts():
try: self.insta.download_post(post, target=f"igtv_post_{post.owner_username}")
except Exception as e: logger.error(f"Failed to download igtv post: {post.shortcode}: {e}")
except Exception as e: logger.error(f"Failed profile.get_igtv_posts: {e}")
try:
for story in self.insta.get_stories([profile.userid]):
for item in story.get_items():
try: self.insta.download_storyitem(item, target=f"story_item_{story.owner_username}")
except Exception as e: logger.error(f"Failed to download story item: {item}: {e}")
except Exception as e: logger.error(f"Failed get_stories: {e}")
try:
for highlight in self.insta.get_highlights(profile.userid):
for item in highlight.get_items():
try: self.insta.download_storyitem(item, target=f"highlight_item_{highlight.owner_username}")
except Exception as e: logger.error(f"Failed to download highlight item: {item}: {e}")
except Exception as e: logger.error(f"Failed get_highlights: {e}")
return self.upload_downloaded_content(url, f"@{username}", profile._asdict(), None)
def upload_downloaded_content(self, url, title, content, date):
status = "success"
try:
uploaded_media = []
for f in os.listdir(self.DOWNLOAD_FOLDER):
if os.path.isfile((filename := os.path.join(self.DOWNLOAD_FOLDER, f))):
key = self.get_key(filename)
self.storage.upload(filename, key)
hash = self.get_hash(filename)
cdn_url = self.storage.get_cdn_url(key)
uploaded_media.append({'cdn_url': cdn_url, 'key': key, 'hash': hash})
assert len(uploaded_media) > 1, "No uploaded media found"
uploaded_media.sort(key=lambda m:m["key"], reverse=True)
page_cdn, page_hash, _ = self.generate_media_page_html(url, uploaded_media, html.escape(str(content)))
except Exception as e:
logger.error(f"Could not fetch instagram post {url} due to: {e}")
status = "error"
finally:
shutil.rmtree(self.DOWNLOAD_FOLDER, ignore_errors=True)
if status == "success":
screenshot = self.get_screenshot(url)
wacz = self.get_wacz(url)
return ArchiveResult(status=status, cdn_url=page_cdn, title=title, timestamp=date, hash=page_hash, screenshot=screenshot, wacz=wacz)

View File

@@ -0,0 +1,89 @@
import os, requests, re
import html
from bs4 import BeautifulSoup
from loguru import logger
from .base_archiver import Archiver, ArchiveResult
from storages import Storage
class TelegramArchiver(Archiver):
name = "telegram"
def download(self, url, check_if_exists=False):
# detect URLs that we definitely cannot handle
if 't.me' != self.get_netloc(url):
return False
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36'
}
status = "success"
original_url = url
# TODO: check if we can do this more resilient to variable URLs
if url[-8:] != "?embed=1":
url += "?embed=1"
screenshot = self.get_screenshot(url)
wacz = self.get_wacz(url)
t = requests.get(url, headers=headers)
s = BeautifulSoup(t.content, 'html.parser')
video = s.find("video")
if video is None:
logger.warning("could not find video")
image_tags = s.find_all(class_="js-message_photo")
images = []
for im in image_tags:
urls = [u.replace("'", "") for u in re.findall(r'url\((.*?)\)', im['style'])]
images += urls
page_cdn, page_hash, thumbnail = self.generate_media_page(images, url, html.escape(str(t.content)))
time_elements = s.find_all('time')
timestamp = time_elements[0].get('datetime') if len(time_elements) else None
return ArchiveResult(status="success", cdn_url=page_cdn, screenshot=screenshot, hash=page_hash, thumbnail=thumbnail, timestamp=timestamp, wacz=wacz)
video_url = video.get('src')
video_id = video_url.split('/')[-1].split('?')[0]
key = self.get_key(video_id)
filename = os.path.join(Storage.TMP_FOLDER, key)
if check_if_exists and self.storage.exists(key):
status = 'already archived'
v = requests.get(video_url, headers=headers)
with open(filename, 'wb') as f:
f.write(v.content)
if status != 'already archived':
self.storage.upload(filename, key)
hash = self.get_hash(filename)
# extract duration from HTML
try:
duration = s.find_all('time')[0].contents[0]
if ':' in duration:
duration = float(duration.split(
':')[0]) * 60 + float(duration.split(':')[1])
else:
duration = float(duration)
except:
duration = ""
# process thumbnails
key_thumb, thumb_index = self.get_thumbnails(
filename, key, duration=duration)
os.remove(filename)
cdn_url = self.storage.get_cdn_url(key)
return ArchiveResult(status=status, cdn_url=cdn_url, thumbnail=key_thumb, thumbnail_index=thumb_index,
duration=duration, title=original_url, timestamp=s.find_all('time')[1].get('datetime'), hash=hash, screenshot=screenshot, wacz=wacz)

View File

@@ -0,0 +1,125 @@
import os, re, html
from loguru import logger
from telethon.sync import TelegramClient
from telethon.errors import ChannelInvalidError
from storages import Storage
from .base_archiver import Archiver, ArchiveResult
from configs import Config
from utils import getattr_or
class TelethonArchiver(Archiver):
name = "telethon"
link_pattern = re.compile(r"https:\/\/t\.me(\/c){0,1}\/(.+)\/(\d+)")
def __init__(self, storage: Storage, config: Config):
super().__init__(storage, config)
if config.telegram_config:
c = config.telegram_config
self.client = TelegramClient("./anon", c.api_id, c.api_hash)
self.bot_token = c.bot_token
def _get_media_posts_in_group(self, chat, original_post, max_amp=10):
"""
Searches for Telegram posts that are part of the same group of uploads
The search is conducted around the id of the original post with an amplitude
of `max_amp` both ways
Returns a list of [post] where each post has media and is in the same grouped_id
"""
if getattr_or(original_post, "grouped_id") is None:
return [original_post] if getattr_or(original_post, "media") else []
search_ids = [i for i in range(original_post.id - max_amp, original_post.id + max_amp + 1)]
posts = self.client.get_messages(chat, ids=search_ids)
media = []
for post in posts:
if post is not None and post.grouped_id == original_post.grouped_id and post.media is not None:
media.append(post)
return media
def download(self, url, check_if_exists=False):
if not hasattr(self, "client"):
logger.warning('Missing Telethon config')
return False
# detect URLs that we definitely cannot handle
matches = self.link_pattern.findall(url)
if not len(matches):
return False
status = "success"
# app will ask (stall for user input!) for phone number and auth code if anon.session not found
with self.client.start(bot_token=self.bot_token):
matches = list(matches[0])
chat, post_id = matches[1], matches[2]
post_id = int(post_id)
try:
post = self.client.get_messages(chat, ids=post_id)
except ValueError as e:
logger.error(f"Could not fetch telegram {url} possibly it's private: {e}")
return False
except ChannelInvalidError as e:
logger.error(f"Could not fetch telegram {url}. This error can be fixed if you setup a bot_token in addition to api_id and api_hash: {e}")
return False
if post is None: return False
media_posts = self._get_media_posts_in_group(chat, post)
logger.debug(f'got {len(media_posts)=} for {url=}')
screenshot = self.get_screenshot(url)
wacz = self.get_wacz(url)
if len(media_posts) > 0:
key = self.get_html_key(url)
if check_if_exists and self.storage.exists(key):
# only s3 storage supports storage.exists as not implemented on gd
cdn_url = self.storage.get_cdn_url(key)
return ArchiveResult(status='already archived', cdn_url=cdn_url, title=post.message, timestamp=post.date, screenshot=screenshot, wacz=wacz)
key_thumb, thumb_index = None, None
group_id = post.grouped_id if post.grouped_id is not None else post.id
uploaded_media = []
message = post.message
for mp in media_posts:
if len(mp.message) > len(message): message = mp.message
# media can also be in entities
if mp.entities:
other_media_urls = [e.url for e in mp.entities if hasattr(e, "url") and e.url and self._guess_file_type(e.url) in ["video", "image"]]
logger.debug(f"Got {len(other_media_urls)} other medial urls from {mp.id=}: {other_media_urls}")
for om_url in other_media_urls:
filename = os.path.join(Storage.TMP_FOLDER, f'{chat}_{group_id}_{self._get_key_from_url(om_url)}')
self.download_from_url(om_url, filename)
key = filename.split(Storage.TMP_FOLDER)[1]
self.storage.upload(filename, key)
hash = self.get_hash(filename)
cdn_url = self.storage.get_cdn_url(key)
uploaded_media.append({'cdn_url': cdn_url, 'key': key, 'hash': hash})
filename_dest = os.path.join(Storage.TMP_FOLDER, f'{chat}_{group_id}', str(mp.id))
filename = self.client.download_media(mp.media, filename_dest)
if not filename:
logger.debug(f"Empty media found, skipping {str(mp)=}")
continue
key = filename.split(Storage.TMP_FOLDER)[1]
self.storage.upload(filename, key)
hash = self.get_hash(filename)
cdn_url = self.storage.get_cdn_url(key)
uploaded_media.append({'cdn_url': cdn_url, 'key': key, 'hash': hash})
if key_thumb is None:
key_thumb, thumb_index = self.get_thumbnails(filename, key)
os.remove(filename)
page_cdn, page_hash, _ = self.generate_media_page_html(url, uploaded_media, html.escape(str(post)))
return ArchiveResult(status=status, cdn_url=page_cdn, title=message, timestamp=post.date, hash=page_hash, screenshot=screenshot, thumbnail=key_thumb, thumbnail_index=thumb_index, wacz=wacz)
page_cdn, page_hash, _ = self.generate_media_page_html(url, [], html.escape(str(post)))
return ArchiveResult(status=status, cdn_url=page_cdn, title=post.message, timestamp=getattr_or(post, "date"), hash=page_hash, screenshot=screenshot, wacz=wacz)

View File

@@ -0,0 +1,72 @@
import os, traceback
import tiktok_downloader
from loguru import logger
from .base_archiver import Archiver, ArchiveResult
from storages import Storage
class TiktokArchiver(Archiver):
name = "tiktok"
def download(self, url, check_if_exists=False):
if 'tiktok.com' not in url:
return False
status = 'success'
try:
info = tiktok_downloader.info_post(url)
key = self.get_key(f'{info.id}.mp4')
filename = os.path.join(Storage.TMP_FOLDER, key)
logger.info(f'found video {key=}')
if check_if_exists and self.storage.exists(key):
status = 'already archived'
media = tiktok_downloader.snaptik(url).get_media()
if len(media) <= 0:
if status == 'already archived':
return ArchiveResult(status='Could not download media, but already archived', cdn_url=self.storage.get_cdn_url(key))
else:
return ArchiveResult(status='Could not download media')
logger.info(f'downloading video {key=}')
media[0].download(filename)
if status != 'already archived':
logger.info(f'uploading video {key=}')
self.storage.upload(filename, key)
try:
key_thumb, thumb_index = self.get_thumbnails(filename, key, duration=info.duration)
except Exception as e:
logger.error(e)
key_thumb = ''
thumb_index = 'error creating thumbnails'
hash = self.get_hash(filename)
screenshot = self.get_screenshot(url)
wacz = self.get_wacz(url)
try: os.remove(filename)
except FileNotFoundError:
logger.info(f'tmp file not found thus not deleted {filename}')
cdn_url = self.storage.get_cdn_url(key)
timestamp = info.create.isoformat() if hasattr(info, "create") else None
return ArchiveResult(status=status, cdn_url=cdn_url, thumbnail=key_thumb,
thumbnail_index=thumb_index, duration=getattr(info, "duration", 0), title=getattr(info, "caption", ""),
timestamp=timestamp, hash=hash, screenshot=screenshot, wacz=wacz)
except tiktok_downloader.Except.InvalidUrl as e:
status = 'Invalid URL'
logger.warning(f'Invalid URL on {url} {e}\n{traceback.format_exc()}')
return ArchiveResult(status=status)
except:
error = traceback.format_exc()
status = 'Other Tiktok error: ' + str(error)
logger.warning(f'Other Tiktok error' + str(error))
return ArchiveResult(status=status)

View File

@@ -0,0 +1,75 @@
import json
from datetime import datetime
from loguru import logger
from pytwitter import Api
from storages.base_storage import Storage
from configs import Config
from .base_archiver import ArchiveResult
from .twitter_archiver import TwitterArchiver
class TwitterApiArchiver(TwitterArchiver):
name = "twitter_api"
def __init__(self, storage: Storage, config: Config):
super().__init__(storage, config)
c = config.twitter_config
if c.bearer_token:
self.api = Api(bearer_token=c.bearer_token)
elif c.consumer_key and c.consumer_secret and c.access_token and c.access_secret:
self.api = Api(
consumer_key=c.consumer_key, consumer_secret=c.consumer_secret, access_token=c.access_token, access_secret=c.access_secret)
def download(self, url, check_if_exists=False):
if not hasattr(self, "api"):
logger.warning('Missing Twitter API config')
return False
username, tweet_id = self.get_username_tweet_id(url)
if not username: return False
tweet = self.api.get_tweet(tweet_id, expansions=["attachments.media_keys"], media_fields=["type", "duration_ms", "url", "variants"], tweet_fields=["attachments", "author_id", "created_at", "entities", "id", "text", "possibly_sensitive"])
timestamp = datetime.strptime(tweet.data.created_at, "%Y-%m-%dT%H:%M:%S.%fZ")
# check if exists
key = self.get_html_key(url)
if check_if_exists and self.storage.exists(key):
# only s3 storage supports storage.exists as not implemented on gd
cdn_url = self.storage.get_cdn_url(key)
screenshot = self.get_screenshot(url)
return ArchiveResult(status='already archived', cdn_url=cdn_url, title=tweet.data.text, timestamp=timestamp, screenshot=screenshot)
urls = []
if tweet.includes:
for m in tweet.includes.media:
if m.url:
urls.append(m.url)
elif hasattr(m, "variants"):
var_url = self.choose_variant(m.variants)
urls.append(var_url)
else:
urls.append(None) # will trigger error
for u in urls:
if u is None:
logger.debug(f"Should not have gotten None url for {tweet.includes.media=} so going to download_alternative in twitter_archiver")
return self.download_alternative(url, tweet_id)
logger.debug(f"found {urls=}")
output = json.dumps({
"id": tweet.data.id,
"text": tweet.data.text,
"created_at": tweet.data.created_at,
"author_id": tweet.data.author_id,
"geo": tweet.data.geo,
"lang": tweet.data.lang,
"media": urls
}, ensure_ascii=False, indent=4)
screenshot = self.get_screenshot(url)
wacz = self.get_wacz(url)
page_cdn, page_hash, thumbnail = self.generate_media_page(urls, url, output)
return ArchiveResult(status="success", cdn_url=page_cdn, screenshot=screenshot, hash=page_hash, thumbnail=thumbnail, timestamp=timestamp, title=tweet.data.text, wacz=wacz)

View File

@@ -0,0 +1,105 @@
import html, re, requests
from datetime import datetime
from loguru import logger
from snscrape.modules.twitter import TwitterTweetScraper, Video, Gif, Photo
from .base_archiver import Archiver, ArchiveResult
class TwitterArchiver(Archiver):
"""
This Twitter Archiver uses unofficial scraping methods, and it works as
an alternative to TwitterApiArchiver when no API credentials are provided.
"""
name = "twitter"
link_pattern = re.compile(r"twitter.com\/(?:\#!\/)?(\w+)\/status(?:es)?\/(\d+)")
def get_username_tweet_id(self, url):
# detect URLs that we definitely cannot handle
matches = self.link_pattern.findall(url)
if not len(matches): return False, False
username, tweet_id = matches[0] # only one URL supported
logger.debug(f"Found {username=} and {tweet_id=} in {url=}")
return username, tweet_id
def download(self, url, check_if_exists=False):
username, tweet_id = self.get_username_tweet_id(url)
if not username: return False
scr = TwitterTweetScraper(tweet_id)
try:
tweet = next(scr.get_items())
except Exception as ex:
logger.warning(f"can't get tweet: {type(ex).__name__} occurred. args: {ex.args}")
return self.download_alternative(url, tweet_id)
if tweet.media is None:
logger.debug(f'No media found, archiving tweet text only')
screenshot = self.get_screenshot(url)
wacz = self.get_wacz(url)
page_cdn, page_hash, _ = self.generate_media_page_html(url, [], html.escape(tweet.json()))
return ArchiveResult(status="success", cdn_url=page_cdn, title=tweet.content, timestamp=tweet.date, hash=page_hash, screenshot=screenshot, wacz=wacz)
urls = []
for media in tweet.media:
if type(media) == Video:
variant = max(
[v for v in media.variants if v.bitrate], key=lambda v: v.bitrate)
urls.append(variant.url)
elif type(media) == Gif:
urls.append(media.variants[0].url)
elif type(media) == Photo:
urls.append(media.fullUrl.replace('name=large', 'name=orig'))
else:
logger.warning(f"Could not get media URL of {media}")
page_cdn, page_hash, thumbnail = self.generate_media_page(urls, url, tweet.json())
screenshot = self.get_screenshot(url)
wacz = self.get_wacz(url)
return ArchiveResult(status="success", cdn_url=page_cdn, screenshot=screenshot, hash=page_hash, thumbnail=thumbnail, timestamp=tweet.date, title=tweet.content, wacz=wacz)
def download_alternative(self, url, tweet_id):
# https://stackoverflow.com/a/71867055/6196010
logger.debug(f"Trying twitter hack for {url=}")
hack_url = f"https://cdn.syndication.twimg.com/tweet?id={tweet_id}"
r = requests.get(hack_url)
if r.status_code != 200: return False
tweet = r.json()
urls = []
for p in tweet["photos"]:
urls.append(p["url"])
# 1 tweet has 1 video max
if "video" in tweet:
v = tweet["video"]
urls.append(self.choose_variant(v.get("variants", [])))
logger.debug(f"Twitter hack got {urls=}")
timestamp = datetime.strptime(tweet["created_at"], "%Y-%m-%dT%H:%M:%S.%fZ")
screenshot = self.get_screenshot(url)
wacz = self.get_wacz(url)
page_cdn, page_hash, thumbnail = self.generate_media_page(urls, url, r.text)
return ArchiveResult(status="success", cdn_url=page_cdn, screenshot=screenshot, hash=page_hash, thumbnail=thumbnail, timestamp=timestamp, title=tweet["text"], wacz=wacz)
def choose_variant(self, variants):
# choosing the highest quality possible
variant, width, height = None, 0, 0
for var in variants:
if var["type"] == "video/mp4":
width_height = re.search(r"\/(\d+)x(\d+)\/", var["src"])
if width_height:
w, h = int(width_height[1]), int(width_height[2])
if w > width or h > height:
width, height = w, h
variant = var.get("src", variant)
else:
variant = var.get("src") if not variant else variant
return variant

View File

@@ -0,0 +1,74 @@
import re, json, mimetypes, os
from loguru import logger
from vk_url_scraper import VkScraper, DateTimeEncoder
from storages import Storage
from .base_archiver import Archiver, ArchiveResult
from configs import Config
class VkArchiver(Archiver):
""""
VK videos are handled by YTDownloader, this archiver gets posts text and images.
Currently only works for /wall posts
"""
name = "vk"
wall_pattern = re.compile(r"(wall.{0,1}\d+_\d+)")
photo_pattern = re.compile(r"(photo.{0,1}\d+_\d+)")
def __init__(self, storage: Storage, config: Config):
super().__init__(storage, config)
if config.vk_config != None:
self.vks = VkScraper(config.vk_config.username, config.vk_config.password)
def download(self, url, check_if_exists=False):
if not hasattr(self, "vks") or self.vks is None:
logger.debug("VK archiver was not supplied with credentials.")
return False
key = self.get_html_key(url)
# if check_if_exists and self.storage.exists(key):
# screenshot = self.get_screenshot(url)
# cdn_url = self.storage.get_cdn_url(key)
# return ArchiveResult(status="already archived", cdn_url=cdn_url, screenshot=screenshot)
results = self.vks.scrape(url) # some urls can contain multiple wall/photo/... parts and all will be fetched
if len(results) == 0:
return False
def dump_payload(p): return json.dumps(p, ensure_ascii=False, indent=4, cls=DateTimeEncoder)
textual_output = ""
title, datetime = results[0]["text"], results[0]["datetime"]
urls_found = []
for res in results:
textual_output += f"id: {res['id']}<br>time utc: {res['datetime']}<br>text: {res['text']}<br>payload: {dump_payload(res['payload'])}<br><hr/><br>"
title = res["text"] if len(title) == 0 else title
datetime = res["datetime"] if not datetime else datetime
for attachments in res["attachments"].values():
urls_found.extend(attachments)
# we don't call generate_media_page which downloads urls because it cannot download vk video urls
thumbnail, thumbnail_index = None, None
uploaded_media = []
filenames = self.vks.download_media(results, Storage.TMP_FOLDER)
for filename in filenames:
key = self.get_key(filename)
self.storage.upload(filename, key)
hash = self.get_hash(filename)
cdn_url = self.storage.get_cdn_url(key)
try:
_type = mimetypes.guess_type(filename)[0].split("/")[0]
if _type == "image" and thumbnail is None:
thumbnail = cdn_url
if _type == "video" and (thumbnail is None or thumbnail_index is None):
thumbnail, thumbnail_index = self.get_thumbnails(filename, key)
except Exception as e:
logger.warning(f"failed to get thumb for {filename=} with {e=}")
uploaded_media.append({'cdn_url': cdn_url, 'key': key, 'hash': hash})
page_cdn, page_hash, thumbnail = self.generate_media_page_html(url, uploaded_media, textual_output, thumbnail=thumbnail)
# # if multiple wall/photos/videos are present the screenshot will only grab the 1st
screenshot = self.get_screenshot(url)
wacz = self.get_wacz(url)
return ArchiveResult(status="success", cdn_url=page_cdn, screenshot=screenshot, hash=page_hash, thumbnail=thumbnail, thumbnail_index=thumbnail_index, timestamp=datetime, title=title, wacz=wacz)

View File

@@ -0,0 +1,89 @@
import time, requests
from loguru import logger
from bs4 import BeautifulSoup
from storages import Storage
from .base_archiver import Archiver, ArchiveResult
from configs import Config
class WaybackArchiver(Archiver):
"""
This archiver could implement a check_if_exists by going to "https://web.archive.org/web/{url}"
but that might not be desirable since the webpage might have been archived a long time ago and thus have changed
"""
name = "wayback"
def __init__(self, storage: Storage, config: Config):
super(WaybackArchiver, self).__init__(storage, config)
self.config = config.wayback_config
self.seen_urls = {}
def download(self, url, check_if_exists=False):
if self.config is None:
logger.error('Missing Wayback config')
return False
if check_if_exists:
if url in self.seen_urls: return self.seen_urls[url]
screenshot = self.get_screenshot(url)
wacz = self.get_wacz(url)
logger.debug(f"POSTing {url=} to web.archive.org")
ia_headers = {
"Accept": "application/json",
"Authorization": f"LOW {self.config.key}:{self.config.secret}"
}
r = requests.post('https://web.archive.org/save/', headers=ia_headers, data={'url': url})
if r.status_code != 200:
logger.warning(f"Internet archive failed with status of {r.status_code}")
return ArchiveResult(status="Internet archive failed", screenshot=screenshot, wacz=wacz)
if 'job_id' not in r.json() and 'message' in r.json():
return self.custom_retry(r.json(), screenshot=screenshot, wacz=wacz)
job_id = r.json()['job_id']
logger.debug(f"GETting status for {job_id=} on {url=}")
status_r = requests.get(f'https://web.archive.org/save/status/{job_id}', headers=ia_headers)
retries = 0
# TODO: make the job queue parallel -> consider propagation of results back to sheet though
# wait 90-120 seconds for the archive job to finish
while (status_r.status_code != 200 or status_r.json()['status'] == 'pending') and retries < 30:
time.sleep(3)
try:
logger.debug(f"GETting status for {job_id=} on {url=} [{retries=}]")
status_r = requests.get(f'https://web.archive.org/save/status/{job_id}', headers=ia_headers)
except:
time.sleep(1)
retries += 1
if status_r.status_code != 200:
return ArchiveResult(status=f"Internet archive failed: check https://web.archive.org/save/status/{job_id}", screenshot=screenshot, wacz=wacz)
status_json = status_r.json()
if status_json['status'] != 'success':
return self.custom_retry(status_json, screenshot=screenshot, wacz=wacz)
archive_url = f"https://web.archive.org/web/{status_json['timestamp']}/{status_json['original_url']}"
try:
req = requests.get(archive_url)
parsed = BeautifulSoup(req.content, 'html.parser')
title = parsed.find_all('title')[0].text
if title == 'Wayback Machine':
title = 'Could not get title'
except:
title = "Could not get title"
self.seen_urls[url] = ArchiveResult(status='success', cdn_url=archive_url, title=title, screenshot=screenshot, wacz=wacz)
return self.seen_urls[url]
def custom_retry(self, json_data, **kwargs):
logger.warning(f"Internet archive failed json \n {json_data}")
if "please try again" in str(json_data).lower():
return self.signal_retry_in(**kwargs)
if "this host has been already captured" in str(json_data).lower():
return self.signal_retry_in(**kwargs, min_seconds=86400, max_seconds=129600) # 24h to 36h later
return ArchiveResult(status=f"Internet archive failed: {json_data}", **kwargs)

View File

@@ -0,0 +1,118 @@
import os, datetime
import yt_dlp
from loguru import logger
from .base_archiver import Archiver, ArchiveResult
from storages import Storage
from configs import Config
class YoutubeDLArchiver(Archiver):
name = "youtube_dl"
ydl_opts = {'outtmpl': f'{Storage.TMP_FOLDER}%(id)s.%(ext)s', 'quiet': False}
def __init__(self, storage: Storage, config: Config):
super().__init__(storage, config)
self.fb_cookie = config.facebook_cookie
def download(self, url, check_if_exists=False):
netloc = self.get_netloc(url)
if netloc in ['facebook.com', 'www.facebook.com'] and self.fb_cookie:
logger.debug('Using Facebook cookie')
yt_dlp.utils.std_headers['cookie'] = self.fb_cookie
ydl = yt_dlp.YoutubeDL(YoutubeDLArchiver.ydl_opts)
cdn_url = None
status = 'success'
try:
info = ydl.extract_info(url, download=False)
except yt_dlp.utils.DownloadError as e:
logger.debug(f'No video - Youtube normal control flow: {e}')
return False
except Exception as e:
logger.debug(f'ytdlp exception which is normal for example a facebook page with images only will cause a IndexError: list index out of range. Exception here is: \n {e}')
return False
if info.get('is_live', False):
logger.warning("Live streaming media, not archiving now")
return ArchiveResult(status="Streaming media")
if 'twitter.com' in netloc:
if 'https://twitter.com/' in info['webpage_url']:
logger.info('Found https://twitter.com/ in the download url from Twitter')
else:
logger.info('Found a linked video probably in a link in a tweet - not getting that video as there may be images in the tweet')
return False
if check_if_exists:
if 'entries' in info:
if len(info['entries']) > 1:
logger.warning('YoutubeDLArchiver succeeded but cannot archive channels or pages with multiple videos')
return False
elif len(info['entries']) == 0:
logger.warning(
'YoutubeDLArchiver succeeded but did not find video')
return False
filename = ydl.prepare_filename(info['entries'][0])
else:
filename = ydl.prepare_filename(info)
key = self.get_key(filename)
if self.storage.exists(key):
status = 'already archived'
cdn_url = self.storage.get_cdn_url(key)
# sometimes this results in a different filename, so do this again
info = ydl.extract_info(url, download=True)
# TODO: add support for multiple videos
if 'entries' in info:
if len(info['entries']) > 1:
logger.warning(
'YoutubeDLArchiver cannot archive channels or pages with multiple videos')
return False
else:
info = info['entries'][0]
filename = ydl.prepare_filename(info)
if not os.path.exists(filename):
filename = filename.split('.')[0] + '.mkv'
if status != 'already archived':
key = self.get_key(filename)
self.storage.upload(filename, key)
# filename ='tmp/sDE-qZdi8p8.webm'
# key ='SM0022/youtube_dl_sDE-qZdi8p8.webm'
cdn_url = self.storage.get_cdn_url(key)
hash = self.get_hash(filename)
screenshot = self.get_screenshot(url)
wacz = self.get_wacz(url)
# get duration
duration = info.get('duration')
# get thumbnails
try:
key_thumb, thumb_index = self.get_thumbnails(filename, key, duration=duration)
except:
key_thumb = ''
thumb_index = 'Could not generate thumbnails'
os.remove(filename)
timestamp = None
if 'timestamp' in info and info['timestamp'] is not None:
timestamp = datetime.datetime.utcfromtimestamp(info['timestamp']).replace(tzinfo=datetime.timezone.utc).isoformat()
elif 'upload_date' in info and info['upload_date'] is not None:
timestamp = datetime.datetime.strptime(info['upload_date'], '%Y%m%d').replace(tzinfo=datetime.timezone.utc)
return ArchiveResult(status=status, cdn_url=cdn_url, thumbnail=key_thumb, thumbnail_index=thumb_index, duration=duration,
title=info['title'] if 'title' in info else None, timestamp=timestamp, hash=hash, screenshot=screenshot, wacz=wacz)

171
src/auto_archive.py Normal file
View File

@@ -0,0 +1,171 @@
import os, datetime, traceback, random, tempfile
from loguru import logger
from slugify import slugify
from urllib.parse import quote
from archivers import TelethonArchiver, TelegramArchiver, TiktokArchiver, YoutubeDLArchiver, TwitterArchiver, TwitterApiArchiver, VkArchiver, WaybackArchiver, InstagramArchiver, ArchiveResult, Archiver
from utils import GWorksheet, expand_url
from configs import Config
from storages import Storage
random.seed()
def update_sheet(gw, row, url, result: ArchiveResult):
cell_updates = []
row_values = gw.get_row(row)
def batch_if_valid(col, val, final_value=None):
final_value = final_value or val
if val and gw.col_exists(col) and gw.get_cell(row_values, col) == '':
cell_updates.append((row, col, final_value))
cell_updates.append((row, 'status', result.status))
batch_if_valid('archive', result.cdn_url)
batch_if_valid('date', True, datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc).isoformat())
batch_if_valid('thumbnail', result.thumbnail, f'=IMAGE("{result.thumbnail}")')
batch_if_valid('thumbnail_index', result.thumbnail_index)
batch_if_valid('title', result.title)
batch_if_valid('duration', result.duration, str(result.duration))
batch_if_valid('screenshot', result.screenshot)
batch_if_valid('hash', result.hash)
if result.wacz is not None:
batch_if_valid('wacz', result.wacz)
batch_if_valid('replaywebpage', f'https://replayweb.page/?source={quote(result.wacz)}#view=pages&url={quote(url)}')
if result.timestamp is not None:
if type(result.timestamp) == int:
timestamp_string = datetime.datetime.fromtimestamp(result.timestamp).replace(tzinfo=datetime.timezone.utc).isoformat()
elif type(result.timestamp) == str:
timestamp_string = result.timestamp
else:
timestamp_string = result.timestamp.isoformat()
batch_if_valid('timestamp', timestamp_string)
gw.batch_set_cell(cell_updates)
def missing_required_columns(gw: GWorksheet):
missing = False
for required_col in ['url', 'status']:
if not gw.col_exists(required_col):
logger.warning(f'Required column for {required_col}: "{gw.columns[required_col]}" not found, skipping worksheet {gw.wks.title}')
missing = True
return missing
def should_process_sheet(c, sheet_name):
if len(c.worksheet_allow) and sheet_name not in c.worksheet_allow:
# ALLOW rules exist AND sheet name not explicitly allowed
return False
if len(c.worksheet_block) and sheet_name in c.worksheet_block:
# BLOCK rules exist AND sheet name is blocked
return False
return True
def process_sheet(c: Config):
sh = c.gsheets_client.open(c.sheet)
# loop through worksheets to check
for ii, wks in enumerate(sh.worksheets()):
if not should_process_sheet(c, wks.title):
logger.info(f'Ignoring worksheet "{wks.title}" due to allow/block configurations')
continue
logger.info(f'Opening worksheet {ii=}: {wks.title=} {c.header=}')
gw = GWorksheet(wks, header_row=c.header, columns=c.column_names)
if missing_required_columns(gw): continue
# archives will default to being in a folder 'doc_name/worksheet_name'
default_folder = os.path.join(slugify(c.sheet), slugify(wks.title))
c.set_folder(default_folder)
storage = c.get_storage()
# loop through rows in worksheet
for row in range(1 + c.header, gw.count_rows() + 1):
url = gw.get_cell(row, 'url')
original_status = gw.get_cell(row, 'status')
status = gw.get_cell(row, 'status', fresh=original_status in ['', None] and url != '')
is_retry = False
if url == '' or status not in ['', None]:
is_retry = Archiver.should_retry_from_status(status)
if not is_retry: continue
# All checks done - archival process starts here
try:
gw.set_cell(row, 'status', 'Archive in progress')
url = expand_url(url)
c.set_folder(gw.get_cell_or_default(row, 'folder', default_folder, when_empty_use_default=True))
# make a new driver so each spreadsheet row is idempotent
c.recreate_webdriver()
# order matters, first to succeed excludes remaining
active_archivers = [
TelethonArchiver(storage, c),
TiktokArchiver(storage, c),
TwitterApiArchiver(storage, c),
InstagramArchiver(storage, c),
YoutubeDLArchiver(storage, c),
TelegramArchiver(storage, c),
TwitterArchiver(storage, c),
VkArchiver(storage, c),
WaybackArchiver(storage, c)
]
for archiver in active_archivers:
logger.debug(f'Trying {archiver} on {row=}')
try:
result = archiver.download(url, check_if_exists=c.check_if_exists)
except KeyboardInterrupt as e: raise e # so the higher level catch can catch it
except Exception as e:
result = False
logger.error(f'Got unexpected error in row {row} with {archiver.name} for {url=}: {e}\n{traceback.format_exc()}')
if result:
success = result.status in ['success', 'already archived']
result.status = f"{archiver.name}: {result.status}"
if success:
logger.success(f'{archiver.name} succeeded on {row=}, {url=}')
break
# only 1 retry possible for now
if is_retry and Archiver.is_retry(result.status):
result.status = Archiver.remove_retry(result.status)
logger.warning(f'{archiver.name} did not succeed on {row=}, final status: {result.status}')
if result:
update_sheet(gw, row, url, result)
else:
gw.set_cell(row, 'status', 'failed: no archiver')
except KeyboardInterrupt:
# catches keyboard interruptions to do a clean exit
logger.warning(f"caught interrupt on {row=}, {url=}")
gw.set_cell(row, 'status', '')
c.destroy_webdriver()
exit()
except Exception as e:
logger.error(f'Got unexpected error in row {row} for {url=}: {e}\n{traceback.format_exc()}')
gw.set_cell(row, 'status', 'failed: unexpected error (see logs)')
logger.success(f'Finished worksheet {wks.title}')
@logger.catch
def main():
c = Config()
c.parse()
logger.info(f'Opening document {c.sheet} for header {c.header}')
with tempfile.TemporaryDirectory(dir="./") as tmpdir:
Storage.TMP_FOLDER = tmpdir
process_sheet(c)
c.destroy_webdriver()
if __name__ == '__main__':
main()

29
src/auto_auto_archive.py Normal file
View File

@@ -0,0 +1,29 @@
import tempfile
import auto_archive
from loguru import logger
from configs import Config
from storages import Storage
def main():
c = Config()
c.parse()
logger.info(f'Opening document {c.sheet} to look for sheet names to archive')
gc = c.gsheets_client
sh = gc.open(c.sheet)
wks = sh.get_worksheet(0)
values = wks.get_all_values()
with tempfile.TemporaryDirectory(dir="./") as tmpdir:
Storage.TMP_FOLDER = tmpdir
for i in range(11, len(values)):
c.sheet = values[i][0]
logger.info(f"Processing {c.sheet}")
auto_archive.process_sheet(c)
c.destroy_webdriver()
if __name__ == "__main__":
main()

0
src/cli.py Normal file
View File

7
src/configs/__init__.py Normal file
View File

@@ -0,0 +1,7 @@
from .config import Config
from .selenium_config import SeleniumConfig
from .telethon_config import TelethonConfig
from .wayback_config import WaybackConfig
from .twitter_api_config import TwitterApiConfig
from .vk_config import VkConfig
from .instagram_config import InstagramConfig

View File

@@ -0,0 +1,7 @@
from dataclasses import dataclass
@dataclass
class BrowsertrixConfig:
enabled: bool
profile: str
timeout_seconds: str

306
src/configs/config.py Normal file
View File

@@ -0,0 +1,306 @@
import argparse, yaml, json, os
import gspread
from loguru import logger
from selenium import webdriver
from dataclasses import asdict
from selenium.common.exceptions import TimeoutException
from utils import GWorksheet, getattr_or
from .wayback_config import WaybackConfig
from .telethon_config import TelethonConfig
from .selenium_config import SeleniumConfig
from .vk_config import VkConfig
from .twitter_api_config import TwitterApiConfig
from .browsertrix_config import BrowsertrixConfig
from .instagram_config import InstagramConfig
from storages import S3Config, S3Storage, GDStorage, GDConfig, LocalStorage, LocalConfig
class Config:
"""
Controls the current execution parameters and manages API configurations
Usage:
c = Config() # initializes the argument parser
c.parse() # parses the values and initializes the Services and API clients
# you can then access the Services and APIs like 'c.s3_config'
All the configurations available as cmd line options, when included, will
override the configurations in the config.yaml file.
Configurations are split between:
1. "secrets" containing API keys for generating services - not kept in memory
2. "execution" containing specific execution configurations
"""
AVAILABLE_STORAGES = {"s3", "gd", "local"}
def __init__(self):
self.parser = self.get_argument_parser()
self.folder = ""
self.is_docker = bool(os.environ.get("IS_DOCKER", 0))
def parse(self):
self.args = self.parser.parse_args()
logger.success(f'Command line arguments parsed successfully')
self.config_file = self.args.config
self.read_config_yaml()
logger.info(f'APIs and Services initialized:\n{self}')
def read_config_yaml(self):
with open(self.config_file, "r", encoding="utf-8") as inf:
self.config = yaml.safe_load(inf)
# ----------------------EXECUTION - execution configurations
execution = self.config.get("execution", {})
self.sheet = getattr_or(self.args, "sheet", execution.get("sheet"))
assert self.sheet is not None, "'sheet' must be provided either through command line or configuration file"
def ensure_set(l):
# always returns a set of strings, can receive a set or a string
l = l if isinstance(l, list) else [l]
return set([x for x in l if isinstance(x, str) and len(x) > 0])
self.worksheet_allow = ensure_set(execution.get("worksheet_allow", []))
self.worksheet_block = ensure_set(execution.get("worksheet_block", []))
self.header = int(getattr_or(self.args, "header", execution.get("header", 1)))
self.storage = getattr_or(self.args, "storage", execution.get("storage", "s3"))
self.save_logs = getattr(self.args, "save_logs") or execution.get("save_logs", False)
if self.save_logs:
self.set_log_files()
self.check_if_exists = getattr(self.args, "check_if_exists") or execution.get("check_if_exists", False)
# Column names come from config and can be overwritten by CMD
# in the end all are considered as lower case
config_column_names = execution.get("column_names", {})
self.column_names = {}
for k in GWorksheet.COLUMN_NAMES.keys():
self.column_names[k] = getattr_or(self.args, k, config_column_names.get(k, GWorksheet.COLUMN_NAMES[k])).lower()
# selenium driver
selenium_configs = execution.get("selenium", {})
self.selenium_config = SeleniumConfig(
timeout_seconds=int(selenium_configs.get("timeout_seconds", SeleniumConfig.timeout_seconds)),
window_width=int(selenium_configs.get("window_width", SeleniumConfig.window_width)),
window_height=int(selenium_configs.get("window_height", SeleniumConfig.window_height))
)
self.webdriver = "not initialized"
# browsertrix config
browsertrix_configs = execution.get("browsertrix", {})
if len(browsertrix_profile := browsertrix_configs.get("profile", "")):
browsertrix_profile = os.path.abspath(browsertrix_profile)
self.browsertrix_config = BrowsertrixConfig(
enabled=bool(browsertrix_configs.get("enabled", False)),
profile=browsertrix_profile,
timeout_seconds=browsertrix_configs.get("timeout_seconds", "90")
)
self.hash_algorithm = execution.get("hash_algorithm", "SHA-256")
# ---------------------- SECRETS - APIs and service configurations
secrets = self.config.get("secrets", {})
# assert selected storage credentials exist
for key, name in [("s3", "s3"), ("gd", "google_drive"), ("local", "local")]:
assert self.storage != key or name in secrets, f"selected storage '{key}' requires secrets.'{name}' in {self.config_file}"
# google sheets config
self.gsheets_client = gspread.service_account(
filename=secrets.get("google_sheets", {}).get("service_account", 'service_account.json')
)
# facebook config
self.facebook_cookie = secrets.get("facebook", {}).get("cookie", None)
# s3 config
if "s3" in secrets:
s3 = secrets["s3"]
self.s3_config = S3Config(
bucket=s3["bucket"],
region=s3["region"],
key=s3["key"],
secret=s3["secret"],
endpoint_url=s3.get("endpoint_url", S3Config.endpoint_url),
cdn_url=s3.get("cdn_url", S3Config.cdn_url),
key_path=s3.get("key_path", S3Config.key_path),
private=getattr_or(self.args, "s3-private", s3.get("private", S3Config.private))
)
# GDrive config
if "google_drive" in secrets:
gd = secrets["google_drive"]
self.gd_config = GDConfig(
root_folder_id=gd.get("root_folder_id"),
oauth_token_filename=gd.get("oauth_token_filename"),
service_account=gd.get("service_account", GDConfig.service_account)
)
if "local" in secrets:
self.local_config = LocalConfig(
save_to=secrets["local"].get("save_to", LocalConfig.save_to),
)
# wayback machine config
if "wayback" in secrets:
self.wayback_config = WaybackConfig(
key=secrets["wayback"]["key"],
secret=secrets["wayback"]["secret"],
)
else:
self.wayback_config = None
logger.debug(f"'wayback' key not present in the {self.config_file=}")
# telethon config
if "telegram" in secrets:
self.telegram_config = TelethonConfig(
api_id=secrets["telegram"]["api_id"],
api_hash=secrets["telegram"]["api_hash"],
bot_token=secrets["telegram"].get("bot_token", None),
session_file=secrets["telegram"].get("session_file", "./anon")
)
else:
self.telegram_config = None
logger.debug(f"'telegram' key not present in the {self.config_file=}")
# twitter config
if "twitter" in secrets:
self.twitter_config = TwitterApiConfig(
bearer_token=secrets["twitter"].get("bearer_token"),
consumer_key=secrets["twitter"].get("consumer_key"),
consumer_secret=secrets["twitter"].get("consumer_secret"),
access_token=secrets["twitter"].get("access_token"),
access_secret=secrets["twitter"].get("access_secret"),
)
else:
self.twitter_config = None
logger.debug(f"'twitter' key not present in the {self.config_file=}")
# vk config
if "vk" in secrets:
self.vk_config = VkConfig(
username=secrets["vk"]["username"],
password=secrets["vk"]["password"],
session_file=secrets["vk"].get("session_file", "./vk_config.v2.json")
)
else:
self.vk_config = None
logger.debug(f"'vk' key not present in the {self.config_file=}")
# instagram config
if "instagram" in secrets:
self.instagram_config = InstagramConfig(
username=secrets["instagram"]["username"],
password=secrets["instagram"]["password"],
session_file=secrets["instagram"].get("session_file", "instaloader.session")
)
else:
self.instagram_config = None
logger.debug(f"'instagram' key not present in the {self.config_file=}")
del self.config["secrets"] # delete to prevent leaks
def set_log_files(self):
# called only when config.execution.save_logs=true
logger.add("logs/1trace.log", level="TRACE")
logger.add("logs/2info.log", level="INFO")
logger.add("logs/3success.log", level="SUCCESS")
logger.add("logs/4warning.log", level="WARNING")
logger.add("logs/5error.log", level="ERROR")
def get_argument_parser(self):
"""
Creates the CMD line arguments. 'python auto_archive.py --help'
"""
parser = argparse.ArgumentParser(description='Automatically archive social media posts, videos, and images from a Google Sheets document. The command line arguments will always override the configurations in the provided YAML config file (--config), only some high-level options are allowed via the command line and the YAML configuration file is the preferred method. The sheet must have the "url" and "status" for the archiver to work. ')
parser.add_argument('--config', action='store', dest='config', help='the filename of the YAML configuration file (defaults to \'config.yaml\')', default='config.yaml')
parser.add_argument('--storage', action='store', dest='storage', help='which storage to use [execution.storage in config.yaml]', choices=Config.AVAILABLE_STORAGES)
parser.add_argument('--sheet', action='store', dest='sheet', help='the name of the google sheets document [execution.sheet in config.yaml]')
parser.add_argument('--header', action='store', dest='header', help='1-based index for the header row [execution.header in config.yaml]')
parser.add_argument('--check-if-exists', action='store_true', dest='check_if_exists', help='when possible checks if the URL has been archived before and does not archive the same URL twice [exceution.check_if_exists]')
parser.add_argument('--save-logs', action='store_true', dest='save_logs', help='creates or appends execution logs to files logs/LEVEL.log [exceution.save_logs]')
parser.add_argument('--s3-private', action='store_true', help='Store content without public access permission (only for storage=s3) [secrets.s3.private in config.yaml]')
for k, v in GWorksheet.COLUMN_NAMES.items():
help = f"the name of the column to FILL WITH {k} (default='{v}')"
if k in ["url", "folder"]:
help = f"the name of the column to READ {k} FROM (default='{v}')"
parser.add_argument(f'--col-{k}', action='store', dest=k, help=help)
return parser
def set_folder(self, folder):
"""
update the folder in each of the storages
"""
self.folder = folder
logger.info(f"setting folder to {folder}")
# s3
if hasattr(self, "s3_config"): self.s3_config.folder = folder
if hasattr(self, "s3_storage"): self.s3_storage.folder = folder
# gdrive
if hasattr(self, "gd_config"): self.gd_config.folder = folder
if hasattr(self, "gd_storage"): self.gd_storage.folder = folder
# local
if hasattr(self, "local_config"): self.local_config.folder = folder
if hasattr(self, "local_storage"): self.local_storage.folder = folder
def get_storage(self):
"""
returns the configured type of storage, creating if needed
"""
if self.storage == "s3":
self.s3_storage = getattr_or(self, "s3_storage", S3Storage(self.s3_config))
return self.s3_storage
elif self.storage == "gd":
self.gd_storage = getattr_or(self, "gd_storage", GDStorage(self.gd_config))
return self.gd_storage
elif self.storage == "local":
self.local_storage = getattr_or(self, "local_storage", LocalStorage(self.local_config))
return self.local_storage
raise f"storage {self.storage} not implemented, available: {Config.AVAILABLE_STORAGES}"
def destroy_webdriver(self):
if self.webdriver is not None and type(self.webdriver) != str:
self.webdriver.close()
self.webdriver.quit()
del self.webdriver
def recreate_webdriver(self):
options = webdriver.FirefoxOptions()
options.headless = True
options.set_preference('network.protocol-handler.external.tg', False)
try:
new_webdriver = webdriver.Firefox(options=options)
# only destroy if creation is successful
self.destroy_webdriver()
self.webdriver = new_webdriver
self.webdriver.set_window_size(self.selenium_config.window_width,
self.selenium_config.window_height)
self.webdriver.set_page_load_timeout(self.selenium_config.timeout_seconds)
except TimeoutException as e:
logger.error(f"failed to get new webdriver, possibly due to insufficient system resources or timeout settings: {e}")
def __str__(self) -> str:
return json.dumps({
"config_file": self.config_file,
"sheet": self.sheet,
"worksheet_allow": list(self.worksheet_allow),
"worksheet_block": list(self.worksheet_block),
"storage": self.storage,
"header": self.header,
"check_if_exists": self.check_if_exists,
"hash_algorithm": self.hash_algorithm,
"browsertrix_config": asdict(self.browsertrix_config),
"save_logs": self.save_logs,
"selenium_config": asdict(self.selenium_config),
"selenium_webdriver": self.webdriver != None,
"s3_config": hasattr(self, "s3_config"),
"s3_private": getattr_or(getattr(self, "s3_config", {}), "private", None),
"gd_config": hasattr(self, "gd_config"),
"local_config": hasattr(self, "local_config"),
"wayback_config": self.wayback_config != None,
"telegram_config": self.telegram_config != None,
"twitter_config": self.twitter_config != None,
"vk_config": self.vk_config != None,
"gsheets_client": self.gsheets_client != None,
"column_names": self.column_names,
}, ensure_ascii=False, indent=4)

View File

@@ -0,0 +1,9 @@
from dataclasses import dataclass
@dataclass
class InstagramConfig:
username: str
password: str
session_file: str

View File

@@ -0,0 +1,8 @@
from dataclasses import dataclass
@dataclass
class SeleniumConfig:
timeout_seconds: int = 120
window_width: int = 1400
window_height: int = 2000

View File

@@ -0,0 +1,10 @@
from dataclasses import dataclass
@dataclass
class TelethonConfig:
api_id: str
api_hash: str
bot_token: str
session_file: str

View File

@@ -0,0 +1,11 @@
from dataclasses import dataclass
@dataclass
class TwitterApiConfig:
bearer_token: str
consumer_key: str
consumer_secret: str
access_token: str
access_secret: str

9
src/configs/vk_config.py Normal file
View File

@@ -0,0 +1,9 @@
from dataclasses import dataclass
@dataclass
class VkConfig:
username: str
password: str
session_file: str

View File

@@ -0,0 +1,8 @@
from dataclasses import dataclass
@dataclass
class WaybackConfig:
key: str
secret: str

5
src/storages/__init__.py Normal file
View File

@@ -0,0 +1,5 @@
# we need to explicitly expose the available imports here
from .base_storage import Storage
from .local_storage import LocalStorage, LocalConfig
from .s3_storage import S3Config, S3Storage
from .gd_storage import GDConfig, GDStorage

View File

@@ -0,0 +1,24 @@
from loguru import logger
from abc import ABC, abstractmethod
from pathlib import Path
class Storage(ABC):
TMP_FOLDER = "tmp/"
@abstractmethod
def __init__(self, config): pass
@abstractmethod
def get_cdn_url(self, key): pass
@abstractmethod
def exists(self, key): pass
@abstractmethod
def uploadf(self, file, key, **kwargs): pass
def upload(self, filename: str, key: str, **kwargs):
logger.debug(f'[{self.__class__.__name__}] uploading file {filename} with key {key}')
with open(filename, 'rb') as f:
self.uploadf(f, key, **kwargs)

185
src/storages/gd_storage.py Normal file
View File

@@ -0,0 +1,185 @@
import os, time
from loguru import logger
from .base_storage import Storage
from dataclasses import dataclass
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from google.oauth2 import service_account
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
@dataclass
class GDConfig:
root_folder_id: str
oauth_token_filename: str
service_account: str = "service_account.json"
folder: str = "default"
class GDStorage(Storage):
def __init__(self, config: GDConfig):
self.folder = config.folder
self.root_folder_id = config.root_folder_id
SCOPES=['https://www.googleapis.com/auth/drive']
token_file = config.oauth_token_filename
if token_file is not None:
"""
Tokens are refreshed after 1 hour
however keep working for 7 days (tbc)
so as long as the job doesn't last for 7 days
then this method of refreshing only once per run will work
see this link for details on the token
https://davemateer.com/2022/04/28/google-drive-with-python#tokens
"""
logger.debug(f'Using GD OAuth token {token_file}')
creds = Credentials.from_authorized_user_file(token_file, SCOPES)
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
logger.debug('Requesting new GD OAuth token')
creds.refresh(Request())
else:
raise Exception("Problem with creds - create the token again")
# Save the credentials for the next run
with open(token_file, 'w') as token:
logger.debug('Saving new GD OAuth token')
token.write(creds.to_json())
else:
logger.debug('GD OAuth Token valid')
else:
gd_service_account = config.service_account
logger.debug(f'Using GD Service Account {gd_service_account}')
creds = service_account.Credentials.from_service_account_file(gd_service_account, scopes=SCOPES)
self.service = build('drive', 'v3', credentials=creds)
def get_cdn_url(self, key):
"""
only support files saved in a folder for GD
S3 supports folder and all stored in the root
"""
key = self.clean_key(key)
full_name = os.path.join(self.folder, key)
parent_id, folder_id = self.root_folder_id, None
path_parts = full_name.split(os.path.sep)
filename = path_parts[-1]
logger.info(f"looking for folders for {path_parts[0:-1]} before uploading {filename=}")
for folder in path_parts[0:-1]:
folder_id = self._get_id_from_parent_and_name(parent_id, folder, use_mime_type=True, raise_on_missing=True)
parent_id = folder_id
# get id of file inside folder (or sub folder)
file_id = self._get_id_from_parent_and_name(folder_id, filename)
return f"https://drive.google.com/file/d/{file_id}/view?usp=sharing"
def exists(self, key):
try:
self.get_cdn_url(key)
return True
except: return False
def uploadf(self, file: str, key: str, **_kwargs):
"""
1. for each sub-folder in the path check if exists or create
2. upload file to root_id/other_paths.../filename
"""
key = self.clean_key(key)
full_name = os.path.join(self.folder, key)
parent_id, upload_to = self.root_folder_id, None
path_parts = full_name.split(os.path.sep)
filename = path_parts[-1]
logger.info(f"checking folders {path_parts[0:-1]} exist (or creating) before uploading {filename=}")
for folder in path_parts[0:-1]:
upload_to = self._get_id_from_parent_and_name(parent_id, folder, use_mime_type=True, raise_on_missing=False)
if upload_to is None:
upload_to = self._mkdir(folder, parent_id)
parent_id = upload_to
# upload file to gd
logger.debug(f'uploading {filename=} to folder id {upload_to}')
file_metadata = {
'name': [filename],
'parents': [upload_to]
}
media = MediaFileUpload(file, resumable=True)
gd_file = self.service.files().create(body=file_metadata, media_body=media, fields='id').execute()
logger.debug(f'uploadf: uploaded file {gd_file["id"]} succesfully in folder={upload_to}')
def upload(self, filename: str, key: str, **kwargs):
# GD only requires the filename not a file reader
self.uploadf(filename, key, **kwargs)
def clean_key(self, key):
# GDrive does not work well with trailing forward slashes and some keys come with that
if key.startswith('/'):
logger.debug(f'Found and fixed a leading "/" for {key=}')
return key[1:]
return key
# gets the Drive folderID if it is there
def _get_id_from_parent_and_name(self, parent_id: str, name: str, retries: int = 1, sleep_seconds: int = 10, use_mime_type: bool = False, raise_on_missing: bool = True, use_cache=False):
"""
Retrieves the id of a folder or file from its @name and the @parent_id folder
Optionally does multiple @retries and sleeps @sleep_seconds between them
If @use_mime_type will restrict search to "mimeType='application/vnd.google-apps.folder'"
If @raise_on_missing will throw error when not found, or returns None
Will remember previous calls to avoid duplication if @use_cache - might not have all edge cases tested, so use at own risk
Returns the id of the file or folder from its name as a string
"""
# cache logic
if use_cache:
self.api_cache = getattr(self, "api_cache", {})
cache_key = f"{parent_id}_{name}_{use_mime_type}"
if cache_key in self.api_cache:
logger.debug(f"cache hit for {cache_key=}")
return self.api_cache[cache_key]
# API logic
debug_header: str = f"[searching {name=} in {parent_id=}]"
query_string = f"'{parent_id}' in parents and name = '{name}' and trashed = false "
if use_mime_type:
query_string += f" and mimeType='application/vnd.google-apps.folder' "
for attempt in range(retries):
results = self.service.files().list(
q=query_string,
spaces='drive', # ie not appDataFolder or photos
fields='files(id, name)'
).execute()
items = results.get('files', [])
if len(items) > 0:
logger.debug(f"{debug_header} found {len(items)} matches, returning last of {','.join([i['id'] for i in items])}")
_id = items[-1]['id']
if use_cache: self.api_cache[cache_key] = _id
return _id
else:
logger.debug(f'{debug_header} not found, attempt {attempt+1}/{retries}.')
if attempt < retries - 1:
logger.debug(f'sleeping for {sleep_seconds} second(s)')
time.sleep(sleep_seconds)
if raise_on_missing:
raise ValueError(f'{debug_header} not found after {retries} attempt(s)')
return None
def _mkdir(self, name: str, parent_id: str):
"""
Creates a new GDrive folder @name inside folder @parent_id
Returns id of the created folder
"""
logger.debug(f'Creating new folder with {name=} inside {parent_id=}')
file_metadata = {
'name': [name],
'mimeType': 'application/vnd.google-apps.folder',
'parents': [parent_id]
}
gd_folder = self.service.files().create(body=file_metadata, fields='id').execute()
return gd_folder.get('id')

View File

@@ -0,0 +1,31 @@
import os
from dataclasses import dataclass
from .base_storage import Storage
from utils import mkdir_if_not_exists
@dataclass
class LocalConfig:
folder: str = ""
save_to: str = "./"
class LocalStorage(Storage):
def __init__(self, config:LocalConfig):
self.folder = config.folder
self.save_to = config.save_to
mkdir_if_not_exists(self.save_to)
def get_cdn_url(self, key):
full_path = os.path.join(self.save_to, self.folder, key)
mkdir_if_not_exists(os.path.join(*full_path.split(os.path.sep)[0:-1]))
return os.path.abspath(full_path)
def exists(self, key):
return os.path.isfile(self.get_cdn_url(key))
def uploadf(self, file, key, **kwargs):
path = self.get_cdn_url(key)
with open(path, "wb") as outf:
outf.write(file.read())

View File

@@ -0,0 +1,80 @@
import uuid, os, mimetypes
from dataclasses import dataclass
import boto3
from botocore.errorfactory import ClientError
from .base_storage import Storage
from dataclasses import dataclass
from loguru import logger
@dataclass
class S3Config:
bucket: str
region: str
key: str
secret: str
folder: str = ""
endpoint_url: str = "https://{region}.digitaloceanspaces.com"
cdn_url: str = "https://{bucket}.{region}.cdn.digitaloceanspaces.com/{key}"
private: bool = False
key_path: str = "default" # 'default' uses full naming, 'random' uses generated uuid
class S3Storage(Storage):
def __init__(self, config: S3Config):
self.bucket = config.bucket
self.region = config.region
self.folder = config.folder
self.private = config.private
self.cdn_url = config.cdn_url
self.key_path = config.key_path
self.key_dict = {}
self.s3 = boto3.client(
's3',
region_name=config.region,
endpoint_url=config.endpoint_url.format(region=config.region),
aws_access_key_id=config.key,
aws_secret_access_key=config.secret
)
def _get_path(self, key):
"""
Depends on the self.key_path configuration:
* random - assigns a random UUID which can be used in conjunction with "private=false" to have unguessable documents publicly available -> self.folder/randomUUID
* default -> defaults to self.folder/key
"""
# defaults to /key
final_key = key
if self.key_path == "random":
if key not in self.key_dict:
ext = os.path.splitext(key)[1]
self.key_dict[key] = f"{str(uuid.uuid4())}{ext}"
final_key = self.key_dict[key]
return os.path.join(self.folder, final_key)
def get_cdn_url(self, key):
return self.cdn_url.format(bucket=self.bucket, region=self.region, key=self._get_path(key))
def exists(self, key):
try:
self.s3.head_object(Bucket=self.bucket, Key=self._get_path(key))
return True
except ClientError:
return False
def uploadf(self, file, key, **kwargs):
extra_args = kwargs.get("extra_args", {})
if not self.private and 'ACL' not in extra_args:
extra_args['ACL'] = 'public-read'
if 'ContentType' not in extra_args:
try:
extra_args['ContentType'] = mimetypes.guess_type(key)[0]
except Exception as e:
logger.error(f"Unable to get mimetype for {key=}, error: {e}")
self.s3.upload_fileobj(file, Bucket=self.bucket, Key=self._get_path(key), ExtraArgs=extra_args)

3
src/utils/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
# we need to explicitly expose the available imports here
from .gworksheet import *
from .misc import *

109
src/utils/gworksheet.py Normal file
View File

@@ -0,0 +1,109 @@
from gspread import utils
class GWorksheet:
"""
This class makes read/write operations to the a worksheet easier.
It can read the headers from a custom row number, but the row references
should always include the offset of the header.
eg: if header=4, row 5 will be the first with data.
"""
COLUMN_NAMES = {
'url': 'link',
'status': 'archive status',
'folder': 'destination folder',
'archive': 'archive location',
'date': 'archive date',
'thumbnail': 'thumbnail',
'thumbnail_index': 'thumbnail index',
'timestamp': 'upload timestamp',
'title': 'upload title',
'duration': 'duration',
'screenshot': 'screenshot',
'hash': 'hash',
'wacz': 'wacz',
'replaywebpage': 'replaywebpage',
}
def __init__(self, worksheet, columns=COLUMN_NAMES, header_row=1):
self.wks = worksheet
self.columns = columns
self.values = self.wks.get_values()
if len(self.values) > 0:
self.headers = [v.lower() for v in self.values[header_row - 1]]
else:
self.headers = []
def _check_col_exists(self, col: str):
if col not in self.columns:
raise Exception(f'Column {col} is not in the configured column names: {self.columns.keys()}')
def _col_index(self, col: str):
self._check_col_exists(col)
return self.headers.index(self.columns[col])
def col_exists(self, col: str):
self._check_col_exists(col)
return self.columns[col] in self.headers
def count_rows(self):
return len(self.values)
def get_row(self, row: int):
# row is 1-based
return self.values[row - 1]
def get_values(self):
return self.values
def get_cell(self, row, col: str, fresh=False):
"""
returns the cell value from (row, col),
where row can be an index (1-based) OR list of values
as received from self.get_row(row)
if fresh=True, the sheet is queried again for this cell
"""
col_index = self._col_index(col)
if fresh:
return self.wks.cell(row, col_index + 1).value
if type(row) == int:
row = self.get_row(row)
if col_index >= len(row):
return ''
return row[col_index]
def get_cell_or_default(self, row, col: str, default: str = None, fresh=False, when_empty_use_default=True):
"""
return self.get_cell or default value on error (eg: column is missing)
"""
try:
val = self.get_cell(row, col, fresh)
if when_empty_use_default and val.strip() == "":
return default
return val
except:
return default
def set_cell(self, row: int, col: str, val):
# row is 1-based
col_index = self._col_index(col) + 1
self.wks.update_cell(row, col_index, val)
def batch_set_cell(self, cell_updates):
"""
receives a list of [(row:int, col:str, val)] and batch updates it, the parameters are the same as in the self.set_cell() method
"""
cell_updates = [
{
'range': self.to_a1(row, col),
'values': [[val]]
}
for row, col, val in cell_updates
]
self.wks.batch_update(cell_updates, value_input_option='USER_ENTERED')
def to_a1(self, row: int, col: str):
# row is 1-based
return utils.rowcol_to_a1(row, self._col_index(col) + 1)

31
src/utils/misc.py Normal file
View File

@@ -0,0 +1,31 @@
import os, json, requests
from datetime import datetime
from loguru import logger
def mkdir_if_not_exists(folder):
if not os.path.exists(folder):
os.makedirs(folder)
def expand_url(url):
# expand short URL links
if 'https://t.co/' in url:
try:
r = requests.get(url)
logger.debug(f'Expanded url {url} to {r.url}')
return r.url
except:
logger.error(f'Failed to expand url {url}')
return url
def getattr_or(o: object, prop: str, default=None):
try:
res = getattr(o, prop)
if res is None: raise
return res
except:
return default