bugs, worker cleanup

This commit is contained in:
msramalho
2025-02-08 15:20:27 +00:00
parent a97333c4d6
commit ad6dc4db43
9 changed files with 72 additions and 88 deletions

View File

@@ -73,7 +73,7 @@ async def archive_sheets_cronjob(frequency: str, interval: int, current_time_uni
sheets = await crud.get_sheets_by_id_hash(db, frequency, interval, current_time_unit)
for s in sheets:
task = celery.signature("create_sheet_task", args=[schemas.SubmitSheet(sheet_id=s.id, author_id=s.author_id, group=s.group_id).model_dump_json()]).apply_async()
task = celery.signature("create_sheet_task", args=[schemas.SubmitSheet(sheet_id=s.id, author_id=s.author_id, group_id=s.group_id).model_dump_json()]).apply_async()
triggered_jobs.append({"sheet_id": s.id, "task_id": task.id})
logger.info(f"[CRON {frequency.upper()}:{current_time_unit}] Triggered {len(triggered_jobs)} sheet tasks: {triggered_jobs}")

View File

@@ -21,5 +21,6 @@ async def logging_middleware(request: Request, call_next):
except Exception as e:
from utils.metrics import EXCEPTION_COUNTER
EXCEPTION_COUNTER.labels(type=e.__class__.__name__).inc()
logger.info(f"{request.client.host}:{request.client.port} {request.method} {request.url._url} - {e.__class__.__name__} {e}")
log_error(e)
raise e

View File

@@ -50,8 +50,8 @@ def search_archives_by_url(db: Session, url: str, email: str, skip: int = 0, lim
def search_archives_by_email(db: Session, email: str, skip: int = 0, limit: int = 100):
return base_query(db).filter(models.Archive.author_id == email).order_by(models.Archive.created_at.desc()).offset(skip).limit(get_limit(limit)).all()
def create_task(db: Session, task: schemas.ArchiveCreate, tags: list[models.Tag], urls: list[models.ArchiveUrl]):
#TODO: rename task to archive
def create_task(db: Session, task: schemas.ArchiveCreate, tags: list[models.Tag], urls: list[models.ArchiveUrl]) -> models.Archive:
db_task = models.Archive(id=task.id, url=task.url, result=task.result, public=task.public, author_id=task.author_id, group_id=task.group_id, sheet_id=task.sheet_id)
db_task.tags = tags
db_task.urls = urls
@@ -234,8 +234,8 @@ def upsert_user_groups(db: Session):
# --------------- SHEET
def create_sheet(db: Session, sheet_id: str, sheet_name: str, email: str, group_id: str, frequency: str):
db_sheet = models.Sheet(id=sheet_id, name=sheet_name, author_id=email, group_id=group_id, frequency=frequency)
def create_sheet(db: Session, sheet_id: str, name: str, email: str, group_id: str, frequency: str):
db_sheet = models.Sheet(id=sheet_id, name=name, author_id=email, group_id=group_id, frequency=frequency)
db.add(db_sheet)
db.commit()
db.refresh(db_sheet)
@@ -250,7 +250,6 @@ def get_user_sheets(db: Session, email: str) -> list[models.Sheet]:
return db.query(models.Sheet).filter(models.Sheet.author_id == email).order_by(models.Sheet.last_url_archived_at.desc()).all()
async def get_sheets_by_id_hash(db: AsyncSession, frequency: str, modulo: str, id_hash: str) -> list[models.Sheet]:
result = await db.execute(
select(models.Sheet).filter(models.Sheet.frequency == frequency)
@@ -288,3 +287,15 @@ def delete_sheet(db: Session, sheet_id: str, email: str) -> bool:
db.delete(db_sheet)
db.commit()
return db_sheet is not None
#--- Celery worker tasks
def insert_result_into_db(db: Session, archive: schemas.ArchiveCreate) -> models.Archive:
# create and load user, tags, if needed
create_or_get_user(db, archive.author_id)
db_tags = [create_tag(db, tag) for tag in archive.tags]
# insert everything
db_task = create_task(db, task=archive, tags=db_tags, urls=archive.urls)
return db_task

View File

@@ -4,22 +4,12 @@ from pydantic import BaseModel
from datetime import datetime
class Tag(BaseModel):
id: str
created_at: datetime
model_config = {"from_attributes": True}
__hash__ = object.__hash__
class SubmitSheet(BaseModel):
sheet_name: str | None = None
sheet_id: str | None = None
header: int = 1
public: bool = False
sheet_id: str | None
author_id: str | None = None
group_id: str | None
tags: set[str] | None = set()
columns: dict | None = {} # TODO: implement
columns: dict | None = {} # TODO: implement/remove
class SubmitManual(BaseModel):
@@ -85,7 +75,7 @@ class ArchiveTrigger(BaseModel):
url: Annotated[str, Len(min_length=5)]
public: bool = False
group_id: Annotated[str, Len(min_length=1)] = "default"
tags: set[Tag] | None = None
tags: set[str] | None = None
class ArchiveCreate(ArchiveTrigger):
id: str | None = None
@@ -107,4 +97,10 @@ class Usage(BaseModel):
total_sheets: int = 0
class UsageResponse(Usage):
groups: dict[str, Usage]
groups: dict[str, Usage]
class CelerySheetTask(BaseModel):
success: bool
sheet_id: str
time: datetime
stats: dict

View File

@@ -18,7 +18,6 @@ celery = get_celery()
@task_router.get("/{task_id}", summary="Check the status of an async task by its id, works for URLs and Sheet tasks.")
def get_status(task_id, email=Depends(get_token_or_user_auth)) -> schemas.TaskResult:
logger.info(f"status check for user {email} task {task_id}")
task = AsyncResult(task_id, app=celery)
try:
if task.status == "FAILURE":

0
src/shared/__init__.py Normal file
View File

View File

@@ -64,7 +64,7 @@ class Test_create_archive_task():
class Test_create_sheet_task():
URL = "https://example-live.com"
sheet = schemas.SubmitSheet(sheet_name="Sheet", sheet_id="123", author_id="rick@example.com", group_id=None)
sheet = schemas.SubmitSheet(sheet_id="123", author_id="rick@example.com", group_id=None)
# @patch("worker.main.insert_result_into_db")
@patch("worker.main.models.generate_uuid", return_value="constant-uuid")
@@ -82,7 +82,6 @@ class Test_create_sheet_task():
m_orch_generator.return_value = m_orch
res = create_sheet_task(self.sheet.model_dump_json())
print(res)
assert res["archived"] == 1
assert res["failed"] == 0
assert len(res["errors"]) == 0

View File

@@ -142,8 +142,6 @@ def app_factory(settings = get_settings()):
def archive_sheet(sheet: schemas.SubmitSheet, email=Depends(get_user_auth), db: Session = Depends(get_db_dependency)):
logger.info(f"SHEET TASK for {sheet=}")
sheet.author_id = email
if not sheet.sheet_name and not sheet.sheet_id:
raise HTTPException(status_code=422, detail=f"sheet name or id is required")
if not crud.is_user_in_group(db, email, sheet.group_id):
raise HTTPException(status_code=403, detail="User does not have access to this group.")
task = celery.signature("create_sheet_task", args=[sheet.model_dump_json()]).delay()
@@ -151,11 +149,9 @@ def app_factory(settings = get_settings()):
@app.post("/sheet_service", status_code=201, deprecated=True) # DEPRECATED
def archive_sheet_service(sheet: schemas.SubmitSheet, auth=Depends(token_api_key_auth), db: Session = Depends(get_db_dependency)):
def archive_sheet_service(sheet: schemas.SubmitSheet, auth=Depends(token_api_key_auth)):
logger.info(f"SHEET TASK for {sheet=}")
sheet.author_id = sheet.author_id or "api-endpoint"
if not sheet.sheet_name and not sheet.sheet_id:
raise HTTPException(status_code=422, detail=f"sheet name or id is required")
task = celery.signature("create_sheet_task", args=[sheet.model_dump_json()]).delay()
return JSONResponse({"id": task.id})

View File

@@ -1,6 +1,6 @@
import traceback, yaml, datetime
from typing import List, Set
import traceback, datetime
from typing import List
from celery.signals import task_failure
from auto_archiver import Config, ArchivingOrchestrator, Metadata
@@ -23,6 +23,8 @@ Redis = get_redis()
USER_GROUPS_FILENAME = settings.USER_GROUPS_FILENAME
# TODO: after release, as it requires updating past entries with sheet_id where tag is used, drop tags
@celery.task(name="create_archive_task", bind=True, autoretry_for=(Exception,), retry_backoff=True, retry_kwargs={'max_retries': 0})
def create_archive_task(self, archive_json: str):
@@ -32,35 +34,39 @@ def create_archive_task(self, archive_json: str):
# call auto-archiver
orchestrator = load_orchestrator(archive.group_id)
result = orchestrator.feed_item(Metadata().set_url(archive.url))
# prepare for DB
assert result, f"UNABLE TO archive: {archive.url}"
# prepare and insert in DB
archive.id = self.request.id
archive.urls = get_all_urls(result)
archive.result = json.loads(result.to_json())
insert_result_into_db(archive)
return archive.result.to_dict() # TODO: is return used?
return archive.result
@celery.task(name="create_sheet_task", bind=True)
def create_sheet_task(self, sheet_json: str):
sheet = schemas.SubmitSheet.model_validate_json(sheet_json)
sheet.tags.add("gsheet")
logger.info(f"SHEET START {sheet=}")
# TODO: drop sheet_name and use only sheet_id (new endpoints/models)
orchestrator = load_orchestrator(sheet.group_id, {"configurations": {"gsheet_feeder": {"sheet": sheet.sheet_name, "sheet_id": sheet.sheet_id, "header": sheet.header}}})
orchestrator = load_orchestrator(sheet.group_id, True, {"configurations": {"gsheet_feeder": {"sheet_id": sheet.sheet_id}}})
stats = {"archived": 0, "failed": 0, "errors": []}
for result in orchestrator.feed():
if not result:
logger.error("Got empty result from feeder, an internal error must have occurred.")
continue
try:
# TODO: remove public from sheet in new refactor
#TODO: use new insert_result_into_db
insert_result_into_db(result, sheet.tags, sheet.public, sheet.group_id, sheet.author_id, models.generate_uuid(), sheet.sheet_id)
assert result, f"UNABLE TO archive: {result.get_url()}"
archive = schemas.ArchiveCreate(
author_id=sheet.author_id,
url=result.get_url(),
group_id=sheet.group_id,
tags=sheet.tags,
id=models.generate_uuid(),
result=json.loads(result.to_json()),
sheet_id=sheet.sheet_id,
urls=get_all_urls(result)
)
insert_result_into_db(archive)
stats["archived"] += 1
except exc.IntegrityError as e:
logger.warning(f"cached result detected: {e}")
@@ -75,33 +81,17 @@ def create_sheet_task(self, sheet_json: str):
crud.update_sheet_last_url_archived_at(session, sheet.sheet_id)
logger.info(f"SHEET DONE {sheet=}")
# TODO: use data model
return {"success": True, "sheet": sheet.sheet_name, "sheet_id": sheet.sheet_id, "time": datetime.datetime.now().isoformat(), **stats}
# TODO: is this used anywhere? maybe drop it
return schemas.CelerySheetTask(success=True, sheet_id=sheet.sheet_id, time=datetime.datetime.now().isoformat(), stats=stats).model_dump()
@task_failure.connect(sender=create_sheet_task)
@task_failure.connect(sender=create_archive_task)
def task_failure_notifier(sender, **kwargs):
# automatically capture exceptions in the worker tasks
logger.warning(f"⚠️ worker task failed: {sender.name}")
traceback_msg = "\n".join(traceback.format_list(traceback.extract_tb(kwargs['traceback'])))
log_error(kwargs['exception'], traceback_msg, f"task_failure: {sender.name}")
redis_publish_exception(kwargs['exception'], sender.name, traceback_msg)
def read_user_groups():
# read yaml safely
with open(USER_GROUPS_FILENAME) as inf:
try:
return yaml.safe_load(inf)
except yaml.YAMLError as e:
logger.error(f"could not open user groups filename {USER_GROUPS_FILENAME}: {e}")
raise e
def load_orchestrator(group_id: str, overwrite_configs: dict = {}) -> ArchivingOrchestrator:
def load_orchestrator(group_id: str, orchestrator_for_sheet: bool = False, overwrite_configs: dict = {}) -> ArchivingOrchestrator:
with get_db() as session:
orchestrator_fn = crud.get_group(session, group_id).orchestrator
group = crud.get_group(session, group_id)
if orchestrator_for_sheet:
orchestrator_fn = group.orchestrator_sheet
else:
orchestrator_fn = crud.get_group(session, group_id).orchestrator
assert orchestrator_fn, f"no orchestrator found for {group_id}"
config = Config()
@@ -111,29 +101,11 @@ def load_orchestrator(group_id: str, overwrite_configs: dict = {}) -> ArchivingO
def insert_result_into_db(archive: schemas.ArchiveCreate) -> str:
with get_db() as session:
# create and load user, tags, if needed
crud.create_or_get_user(session, archive.author_id)
db_tags = [crud.create_tag(session, tag) for tag in archive.tags]
# insert everything
db_task = crud.create_task(session, task=archive, tags=db_tags, urls=archive.urls)
logger.debug(f"Added {db_task.id=} to database on {db_task.created_at} ({db_task.author_id})")
db_task = crud.insert_result_into_db(session, archive)
logger.debug(f"[ARCHIVE STORED] {db_task.author_id} {db_task.url}")
return db_task.id
def insert_result_into_db(result: Metadata, tags: Set[str], public: bool, group_id: str, author_id: str, task_id: str, sheet_id: str = "") -> str:
logger.info(f"INSERTING {public=} {group_id=} {author_id=} {tags=} into {task_id}")
assert result, f"UNABLE TO archive: {result.get_url() if result else result}"
with get_db() as session:
# urls are created by get_all_urls
# create author_id if needed
crud.create_or_get_user(session, author_id)
# create DB TAGs if needed
db_tags = [crud.create_tag(session, tag) for tag in tags]
# insert archive
db_task = crud.create_task(session, task=schemas.ArchiveCreate(id=task_id, url=result.get_url(), result=json.loads(result.to_json()), public=public, author_id=author_id, group_id=group_id, sheet_id=sheet_id), tags=db_tags, urls=get_all_urls(result))
logger.debug(f"Added {db_task.id=} to database on {db_task.created_at} ({db_task.author_id})")
return db_task.id
# TODO: this should live within the auto-archiver
def get_all_urls(result: Metadata) -> List[models.ArchiveUrl]:
db_urls = []
@@ -166,4 +138,14 @@ def redis_publish_exception(exception, task_name, traceback: str = ""):
exception_data = {"task": task_name, "type": exception.__class__.__name__, "exception": exception, "traceback": traceback}
Redis.publish(REDIS_EXCEPTIONS_CHANNEL, json.dumps(exception_data, default=str))
except Exception as e:
log_error(e, f"[CRITICAL] Could not publish to {REDIS_EXCEPTIONS_CHANNEL}")
log_error(e, f"[CRITICAL] Could not publish to {REDIS_EXCEPTIONS_CHANNEL}")
@task_failure.connect(sender=create_sheet_task)
@task_failure.connect(sender=create_archive_task)
def task_failure_notifier(sender, **kwargs):
# automatically capture exceptions in the worker tasks
logger.warning(f"⚠️ worker task failed: {sender.name}")
traceback_msg = "\n".join(traceback.format_list(traceback.extract_tb(kwargs['traceback'])))
log_error(kwargs['exception'], traceback_msg, f"task_failure: {sender.name}")
redis_publish_exception(kwargs['exception'], sender.name, traceback_msg)