Ruff format with defaults.

This commit is contained in:
erinhmclark
2025-03-10 18:44:54 +00:00
parent cbb0414e5f
commit 85abe1837a
155 changed files with 2539 additions and 1908 deletions

View File

@@ -1,7 +1,8 @@
""" Auto Archiver Utilities. """
"""Auto Archiver Utilities."""
# we need to explicitly expose the available imports here
from .misc import *
from .webdriver import Webdriver
# handy utils from ytdlp
from yt_dlp.utils import (clean_html, traverse_obj, strip_or_none, url_or_none)
from yt_dlp.utils import clean_html, traverse_obj, strip_or_none, url_or_none

View File

@@ -16,20 +16,21 @@ def mkdir_if_not_exists(folder):
def expand_url(url, timeout: float = 30.0):
    """Resolve a shortened ``t.co`` link to the URL it redirects to.

    Args:
        url: the URL to inspect; only URLs containing ``https://t.co/`` are
            requested, everything else is returned untouched.
        timeout: seconds to wait for the HTTP request before giving up, so a
            stalled server cannot block the caller indefinitely.

    Returns:
        The final URL reported by the HTTP response, or the original ``url``
        when it is not a short link or the request fails.
    """
    # expand short URL links
    if "https://t.co/" not in url:
        return url
    try:
        r = requests.get(url, timeout=timeout)
    except Exception as e:
        # best-effort: any request failure falls back to the unexpanded URL,
        # but KeyboardInterrupt/SystemExit are no longer swallowed
        logger.error(f"Failed to expand url {url}: {e}")
        return url
    logger.debug(f"Expanded url {url} to {r.url}")
    return r.url
def getattr_or(o: object, prop: str, default=None):
    """Return attribute ``prop`` of ``o``, or ``default`` if unavailable.

    Args:
        o: the object to read from.
        prop: name of the attribute to look up.
        default: value returned when the attribute is missing, when reading
            it raises, or when its value is ``None``.

    Returns:
        The attribute value (falsy values other than ``None`` are returned
        as-is), otherwise ``default``.
    """
    try:
        res = getattr(o, prop)
    except Exception:
        # covers missing attributes as well as properties that raise on access
        return default
    return default if res is None else res
@@ -61,18 +62,19 @@ def random_str(length: int = 32) -> str:
return str(uuid.uuid4()).replace("-", "")[:length]
def calculate_file_hash(filename: str, hash_algo=hashlib.sha256, chunksize: int = 16000000) -> str:
    """Compute the hex digest of a file's contents.

    The file is read in binary mode in pieces of at most ``chunksize`` bytes,
    so the whole file is never held in memory at once.

    Args:
        filename: path of the file to hash.
        hash_algo: zero-argument callable returning a hash object exposing
            ``update`` and ``hexdigest`` (e.g. ``hashlib.sha256``).
        chunksize: maximum number of bytes read per iteration.

    Returns:
        The hexadecimal digest string of the file contents.
    """
    # named `digest` rather than `hash` to avoid shadowing the builtin
    digest = hash_algo()
    with open(filename, "rb") as f:
        # an empty read marks end of file and ends the loop
        while buf := f.read(chunksize):
            digest.update(buf)
    return digest.hexdigest()
def get_datetime_from_str(dt_str: str, fmt: str | None = None, dayfirst=True) -> datetime | None:
""" parse a datetime string with option of passing a specific format
"""parse a datetime string with option of passing a specific format
Args:
dt_str: the datetime string to parse
@@ -88,19 +90,24 @@ def get_datetime_from_str(dt_str: str, fmt: str | None = None, dayfirst=True) ->
def get_timestamp(ts, utc=True, iso=True, dayfirst=True) -> str | datetime | None:
""" Consistent parsing of timestamps.
"""Consistent parsing of timestamps.
Args:
If utc=True, the timezone is set to UTC,
if iso=True, the output is an iso string
Use dayfirst to signify between date formats which put the date vs month first:
e.g. DD/MM/YYYY vs MM/DD/YYYY
"""
if not ts: return
"""
if not ts:
return
try:
if isinstance(ts, str): ts = parse_dt(ts, dayfirst=dayfirst)
if isinstance(ts, (int, float)): ts = datetime.fromtimestamp(ts)
if utc: ts = ts.replace(tzinfo=timezone.utc)
if iso: return ts.isoformat()
if isinstance(ts, str):
ts = parse_dt(ts, dayfirst=dayfirst)
if isinstance(ts, (int, float)):
ts = datetime.fromtimestamp(ts)
if utc:
ts = ts.replace(tzinfo=timezone.utc)
if iso:
return ts.isoformat()
return ts
except Exception as e:
logger.error(f"Unable to parse timestamp {ts}: {e}")

View File

@@ -4,8 +4,8 @@ from ipaddress import ip_address
AUTHWALL_URLS = [
re.compile(r"https:\/\/t\.me(\/c)\/(.+)\/(\d+)"), # telegram private channels
re.compile(r"https:\/\/www\.instagram\.com"), # instagram
re.compile(r"https:\/\/t\.me(\/c)\/(.+)\/(\d+)"), # telegram private channels
re.compile(r"https:\/\/www\.instagram\.com"), # instagram
]
@@ -14,17 +14,16 @@ def check_url_or_raise(url: str) -> bool | ValueError:
Blocks localhost, private, reserved, and link-local IPs and all non-http/https schemes.
"""
if not (url.startswith("http://") or url.startswith("https://")):
raise ValueError(f"Invalid URL scheme for url {url}")
parsed = urlparse(url)
if not parsed.hostname:
raise ValueError(f"Invalid URL hostname for url {url}")
if parsed.hostname == "localhost":
raise ValueError(f"Localhost URLs cannot be parsed for security reasons (for url {url})")
if parsed.scheme not in ["http", "https"]:
raise ValueError(f"Invalid URL scheme, only http and https supported (for url {url})")
@@ -32,7 +31,7 @@ def check_url_or_raise(url: str) -> bool | ValueError:
ip = ip_address(parsed.hostname)
except ValueError:
pass
else:
if not ip.is_global:
raise ValueError(f"IP address {ip} is not globally reachable")
@@ -42,18 +41,21 @@ def check_url_or_raise(url: str) -> bool | ValueError:
raise ValueError(f"Link-local IP address {ip} used")
if ip.is_private:
raise ValueError(f"Private IP address {ip} used")
return True
def domain_for_url(url: str) -> str:
    """Return the network-location part of ``url``.

    SECURITY: the domain is parsed with urllib to avoid any potential
    security issues.
    """
    parts = urlparse(url)
    return parts.netloc
def clean(url: str) -> str:
    """Return ``url`` unchanged; no cleaning is applied by this function."""
    return url
def is_auth_wall(url: str) -> bool:
"""
checks if URL is behind an authentication wall meaning steps like wayback, wacz, ... may not work
@@ -64,13 +66,15 @@ def is_auth_wall(url: str) -> bool:
return False
def remove_get_parameters(url: str) -> str:
    """Return ``url`` with its query string removed.

    Example: http://example.com/file.mp4?t=1 -> http://example.com/file.mp4
    Useful for mimetypes to work.
    """
    without_query = urlparse(url)._replace(query="")
    return urlunparse(without_query)
def is_relevant_url(url: str) -> bool:
"""
Detect if a detected media URL is recurring and therefore irrelevant to a specific archive. Useful, for example, for the enumeration of the media files in WARC files which include profile pictures, favicons, etc.
@@ -78,42 +82,59 @@ def is_relevant_url(url: str) -> bool:
clean_url = remove_get_parameters(url)
# favicons
if "favicon" in url: return False
if "favicon" in url:
return False
# ignore icons
if clean_url.endswith(".ico"): return False
if clean_url.endswith(".ico"):
return False
# ignore SVGs
if remove_get_parameters(url).endswith(".svg"): return False
if remove_get_parameters(url).endswith(".svg"):
return False
# twitter profile pictures
if "twimg.com/profile_images" in url: return False
if "twimg.com" in url and "/default_profile_images" in url: return False
if "twimg.com/profile_images" in url:
return False
if "twimg.com" in url and "/default_profile_images" in url:
return False
# instagram profile pictures
if "https://scontent.cdninstagram.com/" in url and "150x150" in url: return False
if "https://scontent.cdninstagram.com/" in url and "150x150" in url:
return False
# instagram recurring images
if "https://static.cdninstagram.com/rsrc.php/" in url: return False
if "https://static.cdninstagram.com/rsrc.php/" in url:
return False
# telegram
if "https://telegram.org/img/emoji/" in url: return False
if "https://telegram.org/img/emoji/" in url:
return False
# youtube
if "https://www.youtube.com/s/gaming/emoji/" in url: return False
if "https://yt3.ggpht.com" in url and "default-user=" in url: return False
if "https://www.youtube.com/s/search/audio/" in url: return False
if "https://www.youtube.com/s/gaming/emoji/" in url:
return False
if "https://yt3.ggpht.com" in url and "default-user=" in url:
return False
if "https://www.youtube.com/s/search/audio/" in url:
return False
# ok
if " https://ok.ru/res/i/" in url: return False
if " https://ok.ru/res/i/" in url:
return False
# vk
if "https://vk.com/emoji/" in url: return False
if "vk.com/images/" in url: return False
if "vk.com/images/reaction/" in url: return False
if "https://vk.com/emoji/" in url:
return False
if "vk.com/images/" in url:
return False
if "vk.com/images/reaction/" in url:
return False
# wikipedia
if "wikipedia.org/static" in url: return False
if "wikipedia.org/static" in url:
return False
return True
def twitter_best_quality_url(url: str) -> str:
"""
some twitter image URLs point to a less-than best quality

View File

@@ -1,10 +1,11 @@
""" This Webdriver class acts as a context manager for the selenium webdriver. """
"""This Webdriver class acts as a context manager for the selenium webdriver."""
from __future__ import annotations
import os
import time
#import domain_for_url
# import domain_for_url
from urllib.parse import urlparse, urlunparse
from http.cookiejar import MozillaCookieJar
@@ -19,16 +20,15 @@ from loguru import logger
class CookieSettingDriver(webdriver.Firefox):
facebook_accept_cookies: bool
cookies: str
cookiejar: MozillaCookieJar
def __init__(self, cookies, cookiejar, facebook_accept_cookies, *args, **kwargs):
if os.environ.get('RUNNING_IN_DOCKER'):
if os.environ.get("RUNNING_IN_DOCKER"):
# Selenium doesn't support linux-aarch64 driver, we need to set this manually
kwargs['service'] = webdriver.FirefoxService(executable_path='/usr/local/bin/geckodriver')
kwargs["service"] = webdriver.FirefoxService(executable_path="/usr/local/bin/geckodriver")
super(CookieSettingDriver, self).__init__(*args, **kwargs)
self.cookies = cookies
self.cookiejar = cookiejar
@@ -38,42 +38,43 @@ class CookieSettingDriver(webdriver.Firefox):
if self.cookies or self.cookiejar:
# set up the driver to make it not 'cookie averse' (needs a context/URL)
# get the 'robots.txt' file which should be quick and easy
robots_url = urlunparse(urlparse(url)._replace(path='/robots.txt', query='', fragment=''))
robots_url = urlunparse(urlparse(url)._replace(path="/robots.txt", query="", fragment=""))
super(CookieSettingDriver, self).get(robots_url)
if self.cookies:
# an explicit cookie is set for this site, use that first
for cookie in self.cookies.split(";"):
for name, value in cookie.split("="):
self.driver.add_cookie({'name': name, 'value': value})
self.driver.add_cookie({"name": name, "value": value})
elif self.cookiejar:
domain = urlparse(url).netloc.lstrip("www.")
for cookie in self.cookiejar:
if domain in cookie.domain:
try:
self.add_cookie({
'name': cookie.name,
'value': cookie.value,
'path': cookie.path,
'domain': cookie.domain,
'secure': bool(cookie.secure),
'expiry': cookie.expires
})
self.add_cookie(
{
"name": cookie.name,
"value": cookie.value,
"path": cookie.path,
"domain": cookie.domain,
"secure": bool(cookie.secure),
"expiry": cookie.expires,
}
)
except Exception as e:
logger.warning(f"Failed to add cookie to webdriver: {e}")
if self.facebook_accept_cookies:
try:
logger.debug(f'Trying fb click accept cookie popup.')
logger.debug(f"Trying fb click accept cookie popup.")
super(CookieSettingDriver, self).get("http://www.facebook.com")
essential_only = self.find_element(By.XPATH, "//span[contains(text(), 'Decline optional cookies')]")
essential_only.click()
logger.debug(f'fb click worked')
logger.debug(f"fb click worked")
# linux server needs a sleep otherwise facebook cookie won't have worked and we'll get a popup on next page
time.sleep(2)
except Exception as e:
logger.warning(f'Failed on fb accept cookies.', e)
logger.warning(f"Failed on fb accept cookies.", e)
# now get the actual URL
super(CookieSettingDriver, self).get(url)
@@ -87,9 +88,14 @@ class CookieSettingDriver(webdriver.Firefox):
pass
else:
# for all other sites, try and use some common button text to reject/accept cookies
for text in ["Refuse non-essential cookies", "Decline optional cookies", "Reject additional cookies", "Reject all", "Accept all cookies"]:
for text in [
"Refuse non-essential cookies",
"Decline optional cookies",
"Reject additional cookies",
"Reject all",
"Accept all cookies",
]:
try:
xpath = f"//*[contains(translate(text(), 'ABCDEFGHIJKLMNOPQRSTUVWXYZ', 'abcdefghijklmnopqrstuvwxyz'), '{text.lower()}')]"
WebDriverWait(self, 5).until(EC.element_to_be_clickable((By.XPATH, xpath))).click()
@@ -97,11 +103,18 @@ class CookieSettingDriver(webdriver.Firefox):
except selenium_exceptions.WebDriverException:
pass
class Webdriver:
def __init__(self, width: int, height: int, timeout_seconds: int,
facebook_accept_cookies: bool = False, http_proxy: str = "",
print_options: dict = {}, auth: dict = {}) -> webdriver:
def __init__(
self,
width: int,
height: int,
timeout_seconds: int,
facebook_accept_cookies: bool = False,
http_proxy: str = "",
print_options: dict = {},
auth: dict = {},
) -> webdriver:
self.width = width
self.height = height
self.timeout_seconds = timeout_seconds
@@ -116,23 +129,29 @@ class Webdriver:
def __enter__(self) -> webdriver:
    """Create the headless Firefox driver for this context and return it.

    Builds the Firefox options, instantiates a ``CookieSettingDriver`` with
    the cookie settings found in ``self.auth``, then applies the window
    size, page-load timeout and print options stored on this instance.

    NOTE(review): a selenium ``TimeoutException`` is only logged; if it is
    raised before ``self.driver`` is assigned, the final ``return`` reads an
    attribute that may not exist yet — confirm this is intended.
    """
    firefox_options = webdriver.FirefoxOptions()
    for argument in ("--headless", f"--proxy-server={self.http_proxy}"):
        firefox_options.add_argument(argument)
    firefox_options.set_preference("network.protocol-handler.external.tg", False)
    # if facebook cookie popup is present, force the browser to English since then it's easier to click the 'Decline optional cookies' option
    if self.facebook_accept_cookies:
        firefox_options.add_argument("--lang=en")

    try:
        driver_kwargs = {
            "cookies": self.auth.get("cookies"),
            "cookiejar": self.auth.get("cookies_jar"),
            "facebook_accept_cookies": self.facebook_accept_cookies,
            "options": firefox_options,
        }
        self.driver = CookieSettingDriver(**driver_kwargs)
        self.driver.set_window_size(self.width, self.height)
        self.driver.set_page_load_timeout(self.timeout_seconds)
        self.driver.print_options = self.print_options
    except selenium_exceptions.TimeoutException as e:
        logger.error(
            f"failed to get new webdriver, possibly due to insufficient system resources or timeout settings: {e}"
        )

    return self.driver
def __exit__(self, exc_type, exc_val, exc_tb):
self.driver.close()
self.driver.quit()