test: add integration test for sharing

This commit is contained in:
Felix Spöttel
2023-06-29 14:59:43 +02:00
parent f01ea48f57
commit 05ebc17215
9 changed files with 236 additions and 219 deletions

View File

@@ -1,30 +1,12 @@
import pytest
from fastapi.testclient import TestClient
from sqlalchemy.orm import Session
import app.shared.db.models as models
from app.web.main import app
client = TestClient(app)
@pytest.fixture(name="mock_job", scope="function", autouse=False)
def mock_job(db_session: Session) -> models.Job:
job = models.Job(
url="https://example.com",
type=models.JobType.transcript,
status=models.JobStatus.create,
)
db_session.add(job)
db_session.flush()
return job
from app.web.main import app_factory
# POST /api/v1/jobs
# ---
def test_create_job_pass(auth_headers: dict[str, str]) -> None:
def test_create_job_pass(client, auth_headers: dict[str, str]):
res = client.post(
"/api/v1/jobs",
headers=auth_headers,
@@ -34,12 +16,12 @@ def test_create_job_pass(auth_headers: dict[str, str]) -> None:
assert isinstance(res.json()["id"], str)
def test_create_job_missing_body(auth_headers: dict[str, str]) -> None:
def test_create_job_missing_body(client, auth_headers: dict[str, str]):
res = client.post("/api/v1/jobs", headers=auth_headers, json={})
assert res.status_code == 422
def test_create_job_malformed_url(auth_headers: dict[str, str]) -> None:
def test_create_job_malformed_url(client, auth_headers: dict[str, str]):
res = client.post(
"/api/v1/jobs",
headers=auth_headers,
@@ -50,7 +32,7 @@ def test_create_job_malformed_url(auth_headers: dict[str, str]) -> None:
# GET /api/v1/jobs
# ---
def test_get_jobs_pass(auth_headers: dict[str, str], mock_job: models.Job) -> None:
def test_get_jobs_pass(client, auth_headers: dict[str, str], mock_job: models.Job):
res = client.get(
"/api/v1/jobs?type=transcribe",
headers=auth_headers,
@@ -61,7 +43,7 @@ def test_get_jobs_pass(auth_headers: dict[str, str], mock_job: models.Job) -> No
# GET /api/v1/jobs/:id
# ---
def test_get_job_pass(auth_headers: dict[str, str], mock_job: models.Job) -> None:
def test_get_job_pass(client, auth_headers: dict[str, str], mock_job: models.Job):
res = client.get(
f"/api/v1/jobs/{mock_job.id}",
headers=auth_headers,
@@ -70,7 +52,7 @@ def test_get_job_pass(auth_headers: dict[str, str], mock_job: models.Job) -> Non
assert res.json()["id"] == str(mock_job.id)
def test_get_job_not_found(auth_headers: dict[str, str], mock_job: models.Job) -> None:
def test_get_job_not_found(client, auth_headers: dict[str, str], mock_job):
res = client.get(
"/api/v1/jobs/c8ecf5ea-77cf-48a2-9ecd-199ef35e0ccb",
headers=auth_headers,
@@ -79,18 +61,29 @@ def test_get_job_not_found(auth_headers: dict[str, str], mock_job: models.Job) -
assert res.status_code == 404
# GET /api/v1/jobs/:id/artifacts
# ---
def test_get_artifacts_pass(
auth_headers: dict[str, str], db_session: Session, mock_job: models.Job
) -> None:
artifact = models.Artifact(
data=None, job_id=str(mock_job.id), type=models.ArtifactType.raw_transcript
def test_get_job_sharing_disabled(client, mock_job):
res = client.get(
f"/api/v1/jobs/{mock_job.id}",
headers={},
)
assert res.status_code == 401
def test_get_job_sharing_enabled(db_session, mock_job, sharing_enabled):
# HACK: delay construction until settings are patched.
client = TestClient(app_factory(lambda: db_session))
res = client.get(
f"/api/v1/jobs/{mock_job.id}",
headers={},
)
db_session.add(artifact)
db_session.flush()
assert res.status_code == 200
# GET /api/v1/jobs/:id/artifacts
# ---
def test_get_artifacts_pass(client, auth_headers, db_session, mock_job, mock_artifact):
res = client.get(
f"/api/v1/jobs/{mock_job.id}/artifacts",
headers=auth_headers,
@@ -98,12 +91,10 @@ def test_get_artifacts_pass(
assert res.status_code == 200
assert res.json()[0]["job_id"] == str(mock_job.id)
assert res.json()[0]["id"] == str(artifact.id)
assert res.json()[0]["id"] == str(mock_artifact.id)
def test_get_artifacts_not_found(
auth_headers: dict[str, str], mock_job: models.Job
) -> None:
def test_get_artifacts_not_found(client, auth_headers, mock_job):
res = client.get(
f"/api/v1/jobs/{mock_job.id}/artifacts",
headers=auth_headers,
@@ -115,12 +106,11 @@ def test_get_artifacts_not_found(
# DELETE /api/v1/jobs
# ---
def test_delete_job_pass(
auth_headers: dict[str, str], mock_job: models.Job, db_session: Session
) -> None:
def test_delete_job_pass(client, auth_headers, mock_job, db_session):
res = client.delete(
f"/api/v1/jobs/{mock_job.id}",
headers=auth_headers,
)
assert db_session.query(models.Job).count() == 0
assert res.status_code == 204