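Apply Ruff lint auto-fixes across src: split multi-module imports onto separate lines, drop the `f` prefix from strings without placeholders, remove unused imports and unused `as e` exception bindings, and rewrite `not x in y` as `x not in y`.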

This commit is contained in:
erinhmclark
2025-03-10 19:03:45 +00:00
parent 85abe1837a
commit ca44a40b88
23 changed files with 66 additions and 46 deletions

View File

@@ -135,7 +135,7 @@ class GDriveStorage(Storage):
debug_header: str = f"[searching {name=} in {parent_id=}]"
query_string = f"'{parent_id}' in parents and name = '{name}' and trashed = false "
if use_mime_type:
query_string += f" and mimeType='application/vnd.google-apps.folder' "
query_string += " and mimeType='application/vnd.google-apps.folder' "
for attempt in range(retries):
results = (

View File

@@ -1,4 +1,5 @@
import datetime, os
import datetime
import os
import importlib
import subprocess
from typing import Generator, Type
@@ -386,7 +387,7 @@ class GenericExtractor(Extractor):
item.set("replaced_url", url)
ydl_options = {
"outtmpl": os.path.join(self.tmp_dir, f"%(id)s.%(ext)s"),
"outtmpl": os.path.join(self.tmp_dir, "%(id)s.%(ext)s"),
"quiet": False,
"noplaylist": not self.allow_playlist,
"writesubtitles": self.subtitles,

View File

@@ -1,4 +1,6 @@
import re, mimetypes, json
import re
import mimetypes
import json
from datetime import datetime
from loguru import logger
@@ -35,7 +37,7 @@ class Twitter(GenericDropin):
result = Metadata()
try:
if not tweet.get("user") or not tweet.get("created_at"):
raise ValueError(f"Error retreiving post. Are you sure it exists?")
raise ValueError("Error retreiving post. Are you sure it exists?")
timestamp = datetime.strptime(tweet["created_at"], "%a %b %d %H:%M:%S %z %Y")
except (ValueError, KeyError) as ex:
logger.warning(f"Unable to parse tweet: {str(ex)}\nRetreived tweet data: {tweet}")

View File

@@ -20,7 +20,7 @@ from slugify import slugify
from auto_archiver.core import Feeder, Database, Media
from auto_archiver.core import Metadata
from auto_archiver.modules.gsheet_feeder_db import GWorksheet
from auto_archiver.utils.misc import calculate_file_hash, get_current_timestamp
from auto_archiver.utils.misc import get_current_timestamp
class GsheetsFeederDB(Feeder, Database):

View File

@@ -1,5 +1,7 @@
from __future__ import annotations
import mimetypes, os, pathlib
import mimetypes
import os
import pathlib
from jinja2 import Environment, FileSystemLoader
from urllib.parse import quote
from loguru import logger

View File

@@ -95,7 +95,7 @@ class InstagramAPIExtractor(Extractor):
result.set_title(user.get("full_name", username)).set("data", user)
if pic_url := user.get("profile_pic_url_hd", user.get("profile_pic_url")):
filename = self.download_from_url(pic_url)
result.add_media(Media(filename=filename), id=f"profile_picture")
result.add_media(Media(filename=filename), id="profile_picture")
if self.full_profile:
user_id = user.get("pk")
@@ -133,7 +133,7 @@ class InstagramAPIExtractor(Extractor):
def download_all_highlights(self, result, username, user_id):
count_highlights = 0
highlights = self.call_api(f"v1/user/highlights", {"user_id": user_id})
highlights = self.call_api("v1/user/highlights", {"user_id": user_id})
for h in highlights:
try:
h_info = self._download_highlights_reusable(result, h.get("pk"))
@@ -151,9 +151,9 @@ class InstagramAPIExtractor(Extractor):
def download_post(self, result: Metadata, code: str = None, id: str = None, context: str = None) -> Metadata:
if id:
post = self.call_api(f"v1/media/by/id", {"id": id})
post = self.call_api("v1/media/by/id", {"id": id})
else:
post = self.call_api(f"v1/media/by/code", {"code": code})
post = self.call_api("v1/media/by/code", {"code": code})
assert post, f"Post {id or code} not found"
if caption_text := post.get("caption_text"):
@@ -173,7 +173,7 @@ class InstagramAPIExtractor(Extractor):
return result.success("insta highlights")
def _download_highlights_reusable(self, result: Metadata, id: str) -> dict:
full_h = self.call_api(f"v2/highlight/by/id", {"id": id})
full_h = self.call_api("v2/highlight/by/id", {"id": id})
h_info = full_h.get("response", {}).get("reels", {}).get(f"highlight:{id}")
assert h_info, f"Highlight {id} not found: {full_h=}"
@@ -200,7 +200,7 @@ class InstagramAPIExtractor(Extractor):
return result.success(f"insta stories {now}")
def _download_stories_reusable(self, result: Metadata, username: str) -> list[dict]:
stories = self.call_api(f"v1/user/stories/by/username", {"username": username})
stories = self.call_api("v1/user/stories/by/username", {"username": username})
if not stories or not len(stories):
return []
stories = stories[::-1] # newest to oldest
@@ -219,7 +219,7 @@ class InstagramAPIExtractor(Extractor):
post_count = 0
while end_cursor != "":
posts = self.call_api(f"v1/user/medias/chunk", {"user_id": user_id, "end_cursor": end_cursor})
posts = self.call_api("v1/user/medias/chunk", {"user_id": user_id, "end_cursor": end_cursor})
if not len(posts) or not type(posts) == list or len(posts) != 2:
break
posts, end_cursor = posts[0], posts[1]
@@ -244,7 +244,7 @@ class InstagramAPIExtractor(Extractor):
tagged_count = 0
while next_page_id != None:
resp = self.call_api(f"v2/user/tag/medias", {"user_id": user_id, "page_id": next_page_id})
resp = self.call_api("v2/user/tag/medias", {"user_id": user_id, "page_id": next_page_id})
posts = resp.get("response", {}).get("items", [])
if not len(posts):
break

View File

@@ -4,7 +4,9 @@ highlights, and tagged posts. Authentication is required via username/password o
"""
import re, os, shutil
import re
import os
import shutil
import instaloader
from loguru import logger
@@ -36,9 +38,9 @@ class InstagramExtractor(Extractor):
)
try:
self.insta.load_session_from_file(self.username, self.session_file)
except Exception as e:
except Exception:
try:
logger.debug(f"Session file failed", exc_info=True)
logger.debug("Session file failed", exc_info=True)
logger.info("No valid session file found - Attempting login with use and password.")
self.insta.login(self.username, self.password)
self.insta.save_session_to_file(self.session_file)

View File

@@ -51,7 +51,7 @@ class InstagramTbotExtractor(Extractor):
"""Initializes the Telegram client."""
try:
self.client = TelegramClient(self.session_file, self.api_id, self.api_hash)
except OperationalError as e:
except OperationalError:
logger.error(
f"Unable to access the {self.session_file} session. "
"Ensure that you don't use the same session file here and in telethon_extractor. "
@@ -68,7 +68,7 @@ class InstagramTbotExtractor(Extractor):
def download(self, item: Metadata) -> Metadata:
url = item.get_url()
if not "instagram.com" in url:
if "instagram.com" not in url:
return False
result = Metadata()

View File

@@ -1,5 +1,6 @@
from loguru import logger
import time, os
import time
import os
import base64
from selenium.common.exceptions import TimeoutException

View File

@@ -1,4 +1,5 @@
import ssl, os
import ssl
import os
from slugify import slugify
from urllib.parse import urlparse
from loguru import logger

View File

@@ -1,4 +1,6 @@
import requests, re, html
import requests
import re
import html
from bs4 import BeautifulSoup
from loguru import logger

View File

@@ -10,7 +10,9 @@ from telethon.errors.rpcerrorlist import (
)
from loguru import logger
from tqdm import tqdm
import re, time, os
import re
import time
import os
from auto_archiver.core import Extractor
from auto_archiver.core import Metadata, Media
@@ -63,11 +65,11 @@ class TelethonExtractor(Extractor):
logger.warning(
f"please add the property id='{ent.id}' to the 'channel_invites' configuration where {invite=}, not doing so can lead to a minutes-long setup time due to telegram's rate limiting."
)
except ValueError as e:
except ValueError:
logger.info(f"joining new channel {invite=}")
try:
self.client(ImportChatInviteRequest(match.group(2)))
except UserAlreadyParticipantError as e:
except UserAlreadyParticipantError:
logger.info(f"already joined {invite=}")
except InviteRequestSentError:
logger.warning(f"already sent a join request with {invite} still no answer")

View File

@@ -7,7 +7,8 @@ and identify important moments without watching the entire video.
"""
import ffmpeg, os
import ffmpeg
import os
from loguru import logger
from auto_archiver.core import Enricher

View File

@@ -1,6 +1,8 @@
import jsonlines
import mimetypes
import os, shutil, subprocess
import os
import shutil
import subprocess
from zipfile import ZipFile
from loguru import logger
from warcio.archiveiterator import ArchiveIterator
@@ -186,7 +188,6 @@ class WaczExtractorEnricher(Enricher, Extractor):
# get media out of .warc
counter = 0
seen_urls = set()
import json
with open(warc_filename, "rb") as warc_stream:
for record in ArchiveIterator(warc_stream):

View File

@@ -1,6 +1,7 @@
import json
from loguru import logger
import time, requests
import time
import requests
from auto_archiver.core import Extractor, Enricher
from auto_archiver.utils import url as UrlUtil
@@ -57,7 +58,7 @@ class WaybackExtractorEnricher(Enricher, Extractor):
if not job_id:
logger.error(f"Wayback failed with {r.json()}")
return False
except json.decoder.JSONDecodeError as e:
except json.decoder.JSONDecodeError:
logger.error(f"Expected a JSON with job_id from Wayback and got {r.text}")
return False
@@ -80,7 +81,7 @@ class WaybackExtractorEnricher(Enricher, Extractor):
except requests.exceptions.RequestException as e:
logger.warning(f"RequestException: fetching status for {url=} due to: {e}")
break
except json.decoder.JSONDecodeError as e:
except json.decoder.JSONDecodeError:
logger.error(f"Expected a JSON from Wayback and got {r.text} for {url=}")
break
except Exception as e:

View File

@@ -1,5 +1,6 @@
import traceback
import requests, time
import requests
import time
from loguru import logger
from auto_archiver.core import Enricher
@@ -16,7 +17,7 @@ class WhisperEnricher(Enricher):
def setup(self) -> None:
self.stores = self.config["steps"]["storages"]
self.s3 = self.module_factory.get_module("s3_storage", self.config)
if not "s3_storage" in self.stores:
if "s3_storage" not in self.stores:
logger.error(
"WhisperEnricher: To use the WhisperEnricher you need to use S3Storage so files are accessible publicly to the whisper service being called."
)