Files
auto-archiver/src/auto_archiver/utils/misc.py
2025-06-26 17:20:04 +01:00

130 lines
4.0 KiB
Python

import hashlib
import json
import mimetypes
import os
import uuid
from datetime import datetime, timezone
from dateutil.parser import parse as parse_dt
from auto_archiver.utils.custom_logger import logger
def mkdir_if_not_exists(folder):
if not os.path.exists(folder):
os.makedirs(folder)
def getattr_or(o: object, prop: str, default=None):
try:
res = getattr(o, prop)
if res is None:
raise
return res
except Exception:
return default
class DateTimeEncoder(json.JSONEncoder):
# to allow json.dump with datetimes do json.dumps(obj, cls=DateTimeEncoder)
def default(self, o):
if isinstance(o, datetime):
return str(o) # with timezone
return json.JSONEncoder.default(self, o)
def dump_payload(p):
return json.dumps(p, ensure_ascii=False, indent=4, cls=DateTimeEncoder)
def update_nested_dict(dictionary, update_dict):
# takes 2 dicts and overwrites the first with the second only on the changed values
for key, value in update_dict.items():
if key in dictionary and isinstance(value, dict) and isinstance(dictionary[key], dict):
update_nested_dict(dictionary[key], value)
else:
dictionary[key] = value
def random_str(length: int = 32) -> str:
assert length <= 32, "length must be less than 32 as UUID4 is used"
return str(uuid.uuid4()).replace("-", "")[:length]
def calculate_file_hash(filename: str, hash_algo=hashlib.sha256, chunksize: int = 16000000) -> str:
hash = hash_algo()
with open(filename, "rb") as f:
while True:
buf = f.read(chunksize)
if not buf:
break
hash.update(buf)
return hash.hexdigest()
def get_datetime_from_str(dt_str: str, fmt: str | None = None, dayfirst=True) -> datetime | None:
"""parse a datetime string with option of passing a specific format
Args:
dt_str: the datetime string to parse
fmt: the python date format of the datetime string, if None, dateutil.parser.parse is used
dayfirst: Use this to signify between date formats which put the day first, vs the month first:
e.g. DD/MM/YYYY vs MM/DD/YYYY
"""
try:
return datetime.strptime(dt_str, fmt) if fmt else parse_dt(dt_str, dayfirst=dayfirst)
except ValueError as e:
logger.error(f"Unable to parse datestring {dt_str}: {e}")
return None
def get_timestamp(ts, utc=True, iso=True, dayfirst=True) -> str | datetime | None:
"""Consistent parsing of timestamps.
Args:
If utc=True, the timezone is set to UTC,
if iso=True, the output is an iso string
Use dayfirst to signify between date formats which put the date vs month first:
e.g. DD/MM/YYYY vs MM/DD/YYYY
"""
if not ts:
return
try:
if isinstance(ts, str):
ts = parse_dt(ts, dayfirst=dayfirst)
if isinstance(ts, (int, float)):
ts = datetime.fromtimestamp(ts)
if utc:
ts = ts.replace(tzinfo=timezone.utc)
if iso:
return ts.isoformat()
return ts
except Exception as e:
logger.error(f"Unable to parse timestamp {ts}: {e}")
return None
def get_current_timestamp() -> str:
return get_timestamp(datetime.now())
def ydl_entry_to_filename(ydl, entry: dict) -> str:
import yt_dlp
ydl: yt_dlp.YoutubeDL
entry_url = entry.get("url")
filename = ydl.prepare_filename(entry)
if os.path.exists(filename):
return filename
base_filename, _ = os.path.splitext(filename) # '/get/path/to/file' ignore '.ext'
directory = os.path.dirname(base_filename) # '/get/path/to'
basename = os.path.basename(base_filename) # 'file'
for f in os.listdir(directory):
if (
f.startswith(basename)
or (entry_url and os.path.splitext(f)[0] in entry_url)
and "video/" in (mimetypes.guess_type(f)[0] or "")
):
return os.path.join(directory, f)
return False