Merge GSheet Feeder and Database.

This commit is contained in:
erinhmclark
2025-02-25 21:32:32 +00:00
parent 011ded2bde
commit 077b56c150
11 changed files with 259 additions and 286 deletions

View File

@@ -0,0 +1,2 @@
from .gworksheet import GWorksheet
from .gsheet_feeder_db import GsheetsFeederDB

View File

@@ -0,0 +1,92 @@
{
"name": "Google Sheets Feeder Database",
"type": ["feeder", "database"],
"entry_point": "gsheet_feeder_db::GsheetsFeederDB",
"requires_setup": True,
"dependencies": {
"python": ["loguru", "gspread", "slugify"],
},
"configs": {
"sheet": {"default": None, "help": "name of the sheet to archive"},
"sheet_id": {
"default": None,
"help": "the id of the sheet to archive (alternative to 'sheet' config)",
},
"header": {"default": 1, "help": "index of the header row (starts at 1)", "type": "int"},
"service_account": {
"default": "secrets/service_account.json",
"help": "service account JSON file path. Learn how to create one: https://gspread.readthedocs.io/en/latest/oauth2.html",
"required": True,
},
"columns": {
"default": {
"url": "link",
"status": "archive status",
"folder": "destination folder",
"archive": "archive location",
"date": "archive date",
"thumbnail": "thumbnail",
"timestamp": "upload timestamp",
"title": "upload title",
"text": "text content",
"screenshot": "screenshot",
"hash": "hash",
"pdq_hash": "perceptual hashes",
"wacz": "wacz",
"replaywebpage": "replaywebpage",
},
"help": "Custom names for the columns in your Google sheet. If you don't want to use the default column names, change them with this setting",
"type": "json_loader",
},
"allow_worksheets": {
"default": set(),
"help": "A list of worksheet names that should be processed (overrides worksheet_block), leave empty so all are allowed",
},
"block_worksheets": {
"default": set(),
"help": "A list of worksheet names for worksheets that should be explicitly blocked from being processed",
},
"use_sheet_names_in_stored_paths": {
"default": True,
"help": "if True the stored files path will include 'workbook_name/worksheet_name/...'",
"type": "bool",
},
"allow_worksheets": {
"default": set(),
"help": "(CSV) only worksheets whose name is included in allow are included (overrides worksheet_block), leave empty so all are allowed",
},
"block_worksheets": {
"default": set(),
"help": "(CSV) explicitly block some worksheets from being processed",
},
"use_sheet_names_in_stored_paths": {
"default": True,
"type": "bool",
"help": "if True the stored files path will include 'workbook_name/worksheet_name/...'",
}
},
"description": """
GsheetsFeederDatabase
A Google Sheets-based feeder and optional database for the Auto Archiver.
This reads data from Google Sheets and filters rows based on user-defined rules.
The filtered rows are processed into `Metadata` objects.
### Features
- Validates the sheet structure and filters rows based on input configurations.
- Processes only worksheets allowed by the `allow_worksheets` and `block_worksheets` configurations.
- Ensures only rows with valid URLs and unprocessed statuses are included for archival.
- Supports organizing stored files into folder paths based on sheet and worksheet names.
- If the database is enabled, this updates the Google Sheet with the status of the archived URLs, including in progress, success or failure, and method used.
- Saves metadata such as title, text, timestamp, hashes, screenshots, and media URLs to designated columns.
- Formats media-specific metadata, such as thumbnails and PDQ hashes for the sheet.
- Skips redundant updates for empty or invalid data fields.
### Setup
- Requires a Google Service Account JSON file for authentication, which should be stored in `secrets/gsheets_service_account.json`.
To set up a service account, follow the instructions [here](https://gspread.readthedocs.io/en/latest/oauth2.html).
- Define the `sheet` or `sheet_id` configuration to specify the sheet to archive.
- Customize the column names in your Google sheet using the `columns` configuration.
- The Google Sheet can be used soley as a feeder or as a feeder and database, but note you can't currently feed into the database from an alternate feeder.
""",
}

View File

@@ -0,0 +1,196 @@
"""
GsheetsFeeder: A Google Sheets-based feeder for the Auto Archiver.
This reads data from Google Sheets and filters rows based on user-defined rules.
The filtered rows are processed into `Metadata` objects.
### Key properties
- validates the sheet's structure and filters rows based on input configurations.
- Ensures only rows with valid URLs and unprocessed statuses are included.
"""
import os
from typing import Tuple, Union
from urllib.parse import quote
import gspread
from loguru import logger
from slugify import slugify
from auto_archiver.core import Feeder, Database, Media
from auto_archiver.core import Metadata
from auto_archiver.modules.gsheet_feeder_db import GWorksheet
from auto_archiver.utils.misc import calculate_file_hash, get_current_timestamp
class GsheetsFeederDB(Feeder, Database):
def setup(self) -> None:
self.gsheets_client = gspread.service_account(filename=self.service_account)
# TODO mv to validators
if not self.sheet and not self.sheet_id:
raise ValueError("You need to define either a 'sheet' name or a 'sheet_id' in your manifest.")
def open_sheet(self):
if self.sheet:
return self.gsheets_client.open(self.sheet)
else: # self.sheet_id
return self.gsheets_client.open_by_key(self.sheet_id)
def __iter__(self) -> Metadata:
sh = self.open_sheet()
for ii, worksheet in enumerate(sh.worksheets()):
if not self.should_process_sheet(worksheet.title):
logger.debug(f"SKIPPED worksheet '{worksheet.title}' due to allow/block rules")
continue
logger.info(f'Opening worksheet {ii=}: {worksheet.title=} header={self.header}')
gw = GWorksheet(worksheet, header_row=self.header, columns=self.columns)
if len(missing_cols := self.missing_required_columns(gw)):
logger.warning(f"SKIPPED worksheet '{worksheet.title}' due to missing required column(s) for {missing_cols}")
continue
# process and yield metadata here:
yield from self._process_rows(gw)
logger.success(f'Finished worksheet {worksheet.title}')
def _process_rows(self, gw: GWorksheet):
for row in range(1 + self.header, gw.count_rows() + 1):
url = gw.get_cell(row, 'url').strip()
if not len(url): continue
original_status = gw.get_cell(row, 'status')
status = gw.get_cell(row, 'status', fresh=original_status in ['', None])
# TODO: custom status parser(?) aka should_retry_from_status
if status not in ['', None]: continue
# All checks done - archival process starts here
m = Metadata().set_url(url)
self._set_context(m, gw, row)
yield m
def _set_context(self, m: Metadata, gw: GWorksheet, row: int) -> Metadata:
# TODO: Check folder value not being recognised
m.set_context("gsheet", {"row": row, "worksheet": gw})
if gw.get_cell_or_default(row, 'folder', "") is None:
folder = ''
else:
folder = slugify(gw.get_cell_or_default(row, 'folder', "").strip())
if len(folder):
if self.use_sheet_names_in_stored_paths:
m.set_context("folder", os.path.join(folder, slugify(self.sheet), slugify(gw.wks.title)))
else:
m.set_context("folder", folder)
def should_process_sheet(self, sheet_name: str) -> bool:
if len(self.allow_worksheets) and sheet_name not in self.allow_worksheets:
# ALLOW rules exist AND sheet name not explicitly allowed
return False
if len(self.block_worksheets) and sheet_name in self.block_worksheets:
# BLOCK rules exist AND sheet name is blocked
return False
return True
def missing_required_columns(self, gw: GWorksheet) -> list:
missing = []
for required_col in ['url', 'status']:
if not gw.col_exists(required_col):
missing.append(required_col)
return missing
def started(self, item: Metadata) -> None:
logger.warning(f"STARTED {item}")
gw, row = self._retrieve_gsheet(item)
gw.set_cell(row, "status", "Archive in progress")
def failed(self, item: Metadata, reason: str) -> None:
logger.error(f"FAILED {item}")
self._safe_status_update(item, f"Archive failed {reason}")
def aborted(self, item: Metadata) -> None:
logger.warning(f"ABORTED {item}")
self._safe_status_update(item, "")
def fetch(self, item: Metadata) -> Union[Metadata, bool]:
"""check if the given item has been archived already"""
return False
def done(self, item: Metadata, cached: bool = False) -> None:
"""archival result ready - should be saved to DB"""
logger.success(f"DONE {item.get_url()}")
gw, row = self._retrieve_gsheet(item)
# self._safe_status_update(item, 'done')
cell_updates = []
row_values = gw.get_row(row)
def batch_if_valid(col, val, final_value=None):
final_value = final_value or val
try:
if val and gw.col_exists(col) and gw.get_cell(row_values, col) == "":
cell_updates.append((row, col, final_value))
except Exception as e:
logger.error(f"Unable to batch {col}={final_value} due to {e}")
status_message = item.status
if cached:
status_message = f"[cached] {status_message}"
cell_updates.append((row, "status", status_message))
media: Media = item.get_final_media()
if hasattr(media, "urls"):
batch_if_valid("archive", "\n".join(media.urls))
batch_if_valid("date", True, get_current_timestamp())
batch_if_valid("title", item.get_title())
batch_if_valid("text", item.get("content", ""))
batch_if_valid("timestamp", item.get_timestamp())
if media:
batch_if_valid("hash", media.get("hash", "not-calculated"))
# merge all pdq hashes into a single string, if present
pdq_hashes = []
all_media = item.get_all_media()
for m in all_media:
if pdq := m.get("pdq_hash"):
pdq_hashes.append(pdq)
if len(pdq_hashes):
batch_if_valid("pdq_hash", ",".join(pdq_hashes))
if (screenshot := item.get_media_by_id("screenshot")) and hasattr(
screenshot, "urls"
):
batch_if_valid("screenshot", "\n".join(screenshot.urls))
if thumbnail := item.get_first_image("thumbnail"):
if hasattr(thumbnail, "urls"):
batch_if_valid("thumbnail", f'=IMAGE("{thumbnail.urls[0]}")')
if browsertrix := item.get_media_by_id("browsertrix"):
batch_if_valid("wacz", "\n".join(browsertrix.urls))
batch_if_valid(
"replaywebpage",
"\n".join(
[
f"https://replayweb.page/?source={quote(wacz)}#view=pages&url={quote(item.get_url())}"
for wacz in browsertrix.urls
]
),
)
gw.batch_set_cell(cell_updates)
def _safe_status_update(self, item: Metadata, new_status: str) -> None:
try:
gw, row = self._retrieve_gsheet(item)
gw.set_cell(row, "status", new_status)
except Exception as e:
logger.debug(f"Unable to update sheet: {e}")
def _retrieve_gsheet(self, item: Metadata) -> Tuple[GWorksheet, int]:
if gsheet := item.get_context("gsheet"):
gw: GWorksheet = gsheet.get("worksheet")
row: int = gsheet.get("row")
elif self.sheet_id:
logger.error(f"Unable to retrieve Gsheet for {item.get_url()}, GsheetDB must be used alongside GsheetFeeder.")
return gw, row

View File

@@ -0,0 +1,109 @@
from gspread import utils
class GWorksheet:
"""
This class makes read/write operations to the a worksheet easier.
It can read the headers from a custom row number, but the row references
should always include the offset of the header.
eg: if header=4, row 5 will be the first with data.
"""
COLUMN_NAMES = {
'url': 'link',
'status': 'archive status',
'folder': 'destination folder',
'archive': 'archive location',
'date': 'archive date',
'thumbnail': 'thumbnail',
'timestamp': 'upload timestamp',
'title': 'upload title',
'text': 'text content',
'screenshot': 'screenshot',
'hash': 'hash',
'pdq_hash': 'perceptual hashes',
'wacz': 'wacz',
'replaywebpage': 'replaywebpage',
}
def __init__(self, worksheet, columns=COLUMN_NAMES, header_row=1):
self.wks = worksheet
self.columns = columns
self.values = self.wks.get_values()
if len(self.values) > 0:
self.headers = [v.lower() for v in self.values[header_row - 1]]
else:
self.headers = []
def _check_col_exists(self, col: str):
if col not in self.columns:
raise Exception(f'Column {col} is not in the configured column names: {self.columns.keys()}')
def _col_index(self, col: str):
self._check_col_exists(col)
return self.headers.index(self.columns[col].lower())
def col_exists(self, col: str):
self._check_col_exists(col)
return self.columns[col].lower() in self.headers
def count_rows(self):
return len(self.values)
def get_row(self, row: int):
# row is 1-based
return self.values[row - 1]
def get_values(self):
return self.values
def get_cell(self, row, col: str, fresh=False):
"""
returns the cell value from (row, col),
where row can be an index (1-based) OR list of values
as received from self.get_row(row)
if fresh=True, the sheet is queried again for this cell
"""
col_index = self._col_index(col)
if fresh:
return self.wks.cell(row, col_index + 1).value
if type(row) == int:
row = self.get_row(row)
if col_index >= len(row):
return ''
return row[col_index]
def get_cell_or_default(self, row, col: str, default: str = None, fresh=False, when_empty_use_default=True):
"""
return self.get_cell or default value on error (eg: column is missing)
"""
try:
val = self.get_cell(row, col, fresh)
if when_empty_use_default and val.strip() == "":
return default
return val
except:
return default
def set_cell(self, row: int, col: str, val):
# row is 1-based
col_index = self._col_index(col) + 1
self.wks.update_cell(row, col_index, val)
def batch_set_cell(self, cell_updates):
"""
receives a list of [(row:int, col:str, val)] and batch updates it, the parameters are the same as in the self.set_cell() method
"""
cell_updates = [
{
'range': self.to_a1(row, col),
'values': [[str(val)[0:49999]]]
}
for row, col, val in cell_updates
]
self.wks.batch_update(cell_updates, value_input_option='USER_ENTERED')
def to_a1(self, row: int, col: str):
# row is 1-based
return utils.rowcol_to_a1(row, self._col_index(col) + 1)