feat: add celery job queue

This commit is contained in:
Felix Spöttel
2023-01-06 16:48:06 +01:00
parent c90915ba40
commit e41c07fd4b
11 changed files with 83 additions and 56 deletions

View File

@@ -1,3 +1,4 @@
DATABASE_URI="postgresql://postgres:postgres@localhost:5432/whisper_api_test"
ENVIRONMENT="development"
API_SECRET="foo"
REDIS_URI="redis://localhost:6379/0"

View File

@@ -5,8 +5,8 @@ Revises:
Create Date: 2023-01-05 12:00:58.824773
"""
from alembic import op
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
@@ -26,9 +26,7 @@ def upgrade() -> None:
sa.Enum("Create", "Error", "Success", name="jobstatus"),
nullable=False,
),
sa.Column(
"type", sa.Enum("Transcript", name="jobtype"), nullable=False
),
sa.Column("type", sa.Enum("Transcript", name="jobtype"), nullable=False),
sa.Column(
"created_at",
sa.DateTime(),

View File

@@ -4,13 +4,10 @@ from pydantic import BaseSettings
class Settings(BaseSettings):
API_SECRET: str
DATABASE_URI: str
ENVIRONMENT: str
API_SECRET: str
class Config:
env_file = ".env"
env_file_encoding = "utf-8"
REDIS_URI: str
if "ENVIRONMENT" in os.environ and os.environ["ENVIRONMENT"] == "test":

View File

@@ -4,12 +4,11 @@ from uuid import UUID
from fastapi import APIRouter, Depends, FastAPI, HTTPException, Path
from pydantic import AnyHttpUrl, BaseModel
from sqlalchemy.orm import Session
from app.db.base import get_session
from app.db.dtos import Job, JobStatus, JobType
import app.db.dtos as dtos
import app.db.models as models
from .security import authenticate_api_key
from app.db.base import get_session
from app.utils.security import authenticate_api_key
app = FastAPI()
@@ -25,29 +24,35 @@ class TranscriptPayload(BaseModel):
url: AnyHttpUrl
@api_router.post("/transcripts", response_model=Job)
@api_router.post("/transcripts", response_model=dtos.Job)
def create_transcript(
payload: TranscriptPayload, session: Session = Depends(get_session)
) -> models.Job:
job = models.Job(url=payload.url, status=JobStatus.Create, type=JobType.Transcript)
job = models.Job(
url=payload.url, status=dtos.JobStatus.Create, type=dtos.JobType.Transcript
)
session.add(job)
session.flush()
return job
@api_router.get("/transcripts", response_model=List[Job])
@api_router.get("/transcripts", response_model=List[dtos.Job])
def get_transcripts(session: Session = Depends(get_session)) -> List[models.Job]:
return session.query(models.Job).filter(models.Job.type == JobType.Transcript).all()
return (
session.query(models.Job)
.filter(models.Job.type == dtos.JobType.Transcript)
.all()
)
@api_router.get("/transcripts/{id}", response_model=Job)
@api_router.get("/transcripts/{id}", response_model=dtos.Job)
def get_transcript(
id: UUID = Path(), session: Session = Depends(get_session)
) -> Optional[Job]:
) -> Optional[dtos.Job]:
job = (
session.query(models.Job)
.filter(models.Job.id == id)
.filter(models.Job.type == JobType.Transcript)
.filter(models.Job.type == dtos.JobType.Transcript)
.one_or_none()
)
if not job:
@@ -60,7 +65,7 @@ def delete_transcript(
id: UUID = Path(), session: Session = Depends(get_session)
) -> None:
session.query(models.Job).filter(models.Job.id == id).filter(
models.Job.type == JobType.Transcript
models.Job.type == dtos.JobType.Transcript
).delete()
return None

View File

@@ -1,12 +0,0 @@
from fastapi.testclient import TestClient
from sqlalchemy.orm import Session
from app.main import app
import app.db.models as models
client = TestClient(app)
def test_create_task(db_session: Session) -> None:
jobs = db_session.query(models.Job).all()
assert len(jobs) == 0

0
app/utils/__init__.py Normal file
View File

7
app/worker.py Normal file
View File

@@ -0,0 +1,7 @@
from celery import Celery
from .config import settings
celery = Celery(__name__)
celery.conf.broker_url = settings.REDIS_URI

View File

@@ -11,5 +11,3 @@ RUN pip install -U pip
RUN pip install .[test]
# The source code is mounted as a volume at /code, no need to copy.
ENTRYPOINT ["bash", "./app/start.sh"]

View File

@@ -1,27 +1,12 @@
version: "3.8"
services:
app:
container_name: whisper_api_app
build:
context: .
dockerfile: dev.Dockerfile
environment:
DATABASE_URI: postgresql://postgres:postgres@postgres/whisper_api
ENVIRONMENT: development
API_SECRET: foobar
ports:
- "8000:80"
networks:
- app
volumes:
- ./:/code
depends_on:
postgres:
condition: service_healthy
redis:
condition: service_started
x-app-variables: &app-variables
API_SECRET: a_very_secret_token
DATABASE_URI: postgresql://postgres:postgres@postgres/whisper_api
ENVIRONMENT: development
REDIS_URI: redis://redis:6379/0
services:
postgres:
container_name: whisper_api_postgres
image: postgres:15-alpine
@@ -44,6 +29,53 @@ services:
redis:
container_name: whisper_api_redis
image: redis:7-alpine
ports:
- 6379:6379
networks:
- app
app:
container_name: whisper_api_app
build:
context: .
dockerfile: dev.Dockerfile
command: bash ./app/start.sh
environment: *app-variables
ports:
- "8000:80"
networks:
- app
volumes:
- ./:/code
depends_on:
postgres:
condition: service_healthy
redis:
condition: service_started
worker:
build:
context: .
dockerfile: dev.Dockerfile
container_name: whisper_api_worker
command: celery --app=app.worker.celery worker --loglevel=info
volumes:
- ./:/code
environment: *app-variables
depends_on:
- app
- redis
networks:
- app
flower:
container_name: whisper_api_flower
image: mher/flower
command: celery --broker redis://redis:6379/0 flower --port=5555
ports:
- 5555:5555
depends_on:
- redis
networks:
- app

View File

@@ -5,6 +5,7 @@ version = "0.0.1"
dependencies=[
"alembic ==1.9.0",
"celery[redis] ==5.2.7",
"fastapi ==0.88.0",
"psycopg2 ==2.9.5",
"python-dotenv ==0.21.0",