Standardize router names (#70)

This commit is contained in:
Michael Plunkett
2025-04-02 13:14:09 -05:00
committed by GitHub
parent 1a3546c09e
commit de6800ea54
14 changed files with 61 additions and 54 deletions

View File

@@ -0,0 +1,193 @@
from http import HTTPStatus
from unittest.mock import MagicMock
import pytest
from fastapi.testclient import TestClient
from loguru import logger
from app.shared.schemas import Usage, UsageResponse
from app.shared.user_groups import GroupInfo
from app.web.config import VERSION
from app.web.security import get_user_state
from app.web.utils.metrics import measure_regular_metrics
def test_endpoint_home(client_with_auth):
r = client_with_auth.get("/")
assert r.status_code == HTTPStatus.OK
j = r.json()
assert "version" in j and j["version"] == VERSION
assert "breakingChanges" in j
assert "groups" not in j
def test_endpoint_health(client_with_auth):
r = client_with_auth.get("/health")
assert r.status_code == HTTPStatus.OK
assert r.json() == {"status": "ok"}
def test_endpoint_active_no_auth(client, test_no_auth):
test_no_auth(client.get, "/user/active")
def test_endpoint_active(app):
m_user_state = MagicMock()
app.dependency_overrides[get_user_state] = lambda: m_user_state
# inactive user
m_user_state.active = False
client = TestClient(app)
r = client.get("/user/active")
assert r.status_code == HTTPStatus.OK
assert r.json() == {"active": False}
# active user
m_user_state.active = True
client = TestClient(app)
r = client.get("/user/active")
assert r.status_code == HTTPStatus.OK
assert r.json() == {"active": True}
def test_no_serve_local_archive_by_default(client_with_auth):
r = client_with_auth.get("/app/local_archive_test/temp.txt")
assert r.status_code == HTTPStatus.NOT_FOUND
def test_favicon(client_with_auth):
r = client_with_auth.get("/favicon.ico")
assert r.status_code == HTTPStatus.OK
assert r.headers["content-type"] == "image/vnd.microsoft.icon"
def test_endpoint_test_prometheus_no_auth(client, test_no_auth):
test_no_auth(client.get, "/metrics")
def test_endpoint_test_prometheus_no_user_auth(client_with_auth, test_no_auth):
test_no_auth(client_with_auth.get, "/metrics")
@pytest.mark.asyncio
async def test_prometheus_metrics(test_data, client_with_token, get_settings):
# before metrics calculation
r = client_with_token.get("/metrics")
assert r.status_code == HTTPStatus.OK
assert (
r.headers["content-type"] == "text/plain; version=0.0.4; charset=utf-8"
)
assert "disk_utilization" in r.text
assert "database_metrics" in r.text
assert "exceptions" in r.text
assert "worker_exceptions_total" in r.text
assert 'disk_utilization{type="used"}' not in r.text
# after metrics calculation
await measure_regular_metrics(
get_settings.DATABASE_PATH, 60 * 60 * 24 * 31 * 12 * 100
)
r2 = client_with_token.get("/metrics")
assert 'disk_utilization{type="used"}' in r2.text
assert 'disk_utilization{type="free"}' in r2.text
assert 'disk_utilization{type="database"}' in r2.text
assert 'database_metrics{query="count_archives"} 100.0' in r2.text
assert 'database_metrics{query="count_archive_urls"} 1000.0' in r2.text
assert 'database_metrics{query="count_users"} 3.0' in r2.text
assert (
'database_metrics_counter_total{query="count_by_user",user="rick@example.com"} 34.0'
in r2.text
)
assert (
'database_metrics_counter_total{query="count_by_user",user="morty@example.com"} 33.0'
in r2.text
)
assert (
'database_metrics_counter_total{query="count_by_user",user="jerry@example.com"} 33.0'
in r2.text
)
# 30s window, should not change the gauges nor the total in the counters
await measure_regular_metrics(get_settings.DATABASE_PATH, 30)
r3 = client_with_token.get("/metrics")
assert 'database_metrics{query="count_archives"} 100.0' in r3.text
assert 'database_metrics{query="count_archive_urls"} 1000.0' in r3.text
assert 'database_metrics{query="count_users"} 3.0' in r3.text
assert (
'database_metrics_counter_total{query="count_by_user",user="rick@example.com"} 34.0'
in r3.text
)
assert (
'database_metrics_counter_total{query="count_by_user",user="morty@example.com"} 33.0'
in r3.text
)
assert (
'database_metrics_counter_total{query="count_by_user",user="jerry@example.com"} 33.0'
in r3.text
)
def test_endpoint_get_user_permissions_no_user_auth(client, test_no_auth):
test_no_auth(client.get, "/user/permissions")
def test_endpoint_get_user_permissions(app):
m_user_state = MagicMock()
rv = {
"all": GroupInfo(read=True),
"group1": GroupInfo(archive_url=True),
}
logger.info(rv)
m_user_state.permissions = rv
app.dependency_overrides[get_user_state] = lambda: m_user_state
client = TestClient(app)
r = client.get("/user/permissions")
assert r.status_code == HTTPStatus.OK
response = r.json()
assert response.keys() == {"all", "group1"}
assert response["all"]["read"]
assert response["group1"]["read"] == []
assert response["group1"]["archive_url"]
assert response["all"]["archive_url"] is False
def test_endpoint_get_user_usage_no_user_auth(client, test_no_auth):
test_no_auth(client.get, "/user/usage")
def test_endpoint_get_user_usage_inactive(app):
m_user_state = MagicMock()
m_user_state.active = False
app.dependency_overrides[get_user_state] = lambda: m_user_state
client = TestClient(app)
r = client.get("/user/usage")
assert r.status_code == HTTPStatus.FORBIDDEN
assert r.json() == {"detail": "User is not active."}
def test_endpoint_get_user_usage_active(app):
m_user_state = MagicMock()
m_user_state.active = True
mock_usage = UsageResponse(
monthly_urls=1,
monthly_mbs=2,
total_sheets=3,
groups={
"group1": Usage(monthly_urls=4, monthly_mbs=5, total_sheets=6),
"group2": Usage(monthly_urls=7, monthly_mbs=8, total_sheets=9),
},
)
m_user_state.usage.return_value = mock_usage
app.dependency_overrides[get_user_state] = lambda: m_user_state
client = TestClient(app)
r = client.get("/user/usage")
assert r.status_code == HTTPStatus.OK
assert UsageResponse(**r.json()) == mock_usage

View File

@@ -0,0 +1,147 @@
import json
from datetime import datetime
from http import HTTPStatus
from unittest.mock import MagicMock, patch
from app.shared.db import models
def test_submit_manual_archive_unauthenticated(client, test_no_auth):
test_no_auth(client.post, "/interop/submit-archive")
def test_submit_manual_archive_not_user_auth(client_with_auth, test_no_auth):
test_no_auth(client_with_auth.post, "/interop/submit-archive")
@patch(
"app.web.routers.interoperability.business_logic",
return_value=MagicMock(
get_store_archive_until=MagicMock(return_value=datetime)
),
)
def test_submit_manual_archive(m1, client_with_token, db_session):
# normal workflow
aa_metadata = json.dumps(
{
"status": "test: success",
"metadata": {"url": "http://example.com"},
"media": [{"filename": "fn1", "urls": ["http://example.s3.com"]}],
}
)
r = client_with_token.post(
"/interop/submit-archive",
json={
"result": aa_metadata,
"public": True,
"author_id": "jerry@gmail.com",
"group_id": "spaceship",
"tags": ["test"],
"url": "http://example.com",
},
)
assert r.status_code == HTTPStatus.CREATED
assert "id" in r.json()
inserted = (
db_session.query(models.Archive)
.filter(models.Archive.id == r.json()["id"])
.first()
)
assert inserted.url == "http://example.com"
assert inserted.group_id == "spaceship"
assert inserted.author_id == "jerry@gmail.com"
assert sorted([t.id for t in inserted.tags]) == sorted(["test", "manual"])
assert inserted.public
assert isinstance(inserted.result, dict)
assert [u.url for u in inserted.urls] == ["http://example.s3.com"]
assert isinstance(inserted.store_until, datetime)
# cannot have the same URL twice
aa_metadata = json.dumps(
{
"status": "test: success",
"metadata": {"url": "http://example.com"},
"media": [
{
"filename": "fn1",
"urls": ["http://example.com", "http://example.com"],
}
],
}
)
r = client_with_token.post(
"/interop/submit-archive",
json={
"result": aa_metadata,
"public": False,
"author_id": "jerry@gmail.com",
"tags": ["test"],
"url": "http://example.com",
},
)
assert r.status_code == HTTPStatus.UNPROCESSABLE_ENTITY
assert r.json() == {
"detail": "Cannot insert into DB due to integrity error, likely duplicate urls."
}
# test with invalid JSON
def test_submit_manual_archive_invalid_json(client_with_token):
r = client_with_token.post(
"/interop/submit-archive",
json={
"result": "invalid json",
"public": False,
"author_id": "jer",
"tags": ["test"],
"url": "http://example.com",
},
)
assert r.status_code == HTTPStatus.UNPROCESSABLE_ENTITY
assert r.json() == {"detail": "Invalid JSON in result field."}
@patch(
"app.web.routers.interoperability.business_logic.get_store_archive_until",
side_effect=AssertionError("AssertionError"),
)
def test_submit_manual_archive_no_store_until(
m_sau, client_with_token, db_session
):
aa_metadata = json.dumps(
{
"status": "test: success",
"metadata": {"url": "http://example.com"},
"media": [{"filename": "fn1", "urls": ["http://example.s3.com"]}],
}
)
r = client_with_token.post(
"/interop/submit-archive",
json={
"result": aa_metadata,
"public": True,
"author_id": "jerry@gmail.com",
"group_id": "spaceship",
"tags": ["test"],
"url": "http://example.com",
},
)
assert r.status_code == HTTPStatus.CREATED
assert len(r.json()["id"]) == 36
res = (
db_session.query(models.Archive)
.filter(models.Archive.id == r.json()["id"])
.first()
)
assert res.store_until is None
# testing that store_until = None is not comparable with datetime, and will always return False
res = (
db_session.query(models.Archive)
.filter(
models.Archive.id == r.json()["id"],
models.Archive.store_until < datetime.now(),
)
.first()
)
assert res is None

View File

@@ -0,0 +1,268 @@
from datetime import datetime
from http import HTTPStatus
from unittest.mock import MagicMock, patch
from fastapi.testclient import TestClient
from app.shared.constants import STATUS_PENDING
from app.shared.db import models
from app.shared.schemas import TaskResult
from app.web.db.user_state import UserState
from app.web.security import get_user_state
def test_endpoints_no_auth(client, test_no_auth):
test_no_auth(client.post, "/sheet/create")
test_no_auth(client.get, "/sheet/mine")
test_no_auth(client.delete, "/sheet/123-sheet-id")
test_no_auth(client.post, "/sheet/123-sheet-id/archive")
def test_create_sheet_endpoint(app_with_auth, db_session):
client_with_auth = TestClient(app_with_auth)
good_data = {
"id": "123-sheet-id",
"name": "Test Sheet",
"group_id": "spaceship",
"frequency": "daily",
}
# with good data
response = client_with_auth.post("/sheet/create", json=good_data)
assert response.status_code == HTTPStatus.CREATED
j = response.json()
assert datetime.fromisoformat(j.pop("created_at"))
assert datetime.fromisoformat(j.pop("last_url_archived_at"))
assert j.pop("author_id") == "morty@example.com"
assert j == good_data
# already exists
response = client_with_auth.post("/sheet/create", json=good_data)
assert response.status_code == HTTPStatus.BAD_REQUEST
assert response.json() == {
"detail": "Sheet with this ID is already being archived."
}
# bad group
bad_data = good_data.copy()
bad_data["group_id"] = "not a group"
response = client_with_auth.post("/sheet/create", json=bad_data)
assert response.status_code == HTTPStatus.FORBIDDEN
assert response.json() == {
"detail": "User does not have access to this group."
}
# switch to jerry who's got less quota/permissions
app_with_auth.dependency_overrides[get_user_state] = lambda: UserState(
db_session, "jerry@example.com"
)
client_jerry = TestClient(app_with_auth)
# frequency not allowed
jerry_data = good_data.copy()
jerry_data["group_id"] = "animated-characters"
jerry_data["frequency"] = "hourly"
jerry_data["id"] = "jerry-sheet-id"
response = client_jerry.post("/sheet/create", json=jerry_data)
assert response.status_code == HTTPStatus.UNPROCESSABLE_ENTITY
assert response.json() == {
"detail": "Invalid frequency selected for this group."
}
jerry_data["frequency"] = "daily"
# success for the first sheet, bad quota on second
response = client_jerry.post("/sheet/create", json=jerry_data)
assert response.status_code == HTTPStatus.CREATED
response = client_jerry.post("/sheet/create", json=jerry_data)
assert response.status_code == HTTPStatus.TOO_MANY_REQUESTS
assert response.json() == {
"detail": "User has reached their sheet quota for this group."
}
def test_get_user_sheets_endpoint(client_with_auth, db_session):
# no data
response = client_with_auth.get("/sheet/mine")
assert response.status_code == HTTPStatus.OK
assert response.json() == []
# with data
db_session.add(
models.Sheet(
id="123",
name="Test Sheet 1",
author_id="morty@example.com",
group_id="spaceship",
frequency="hourly",
)
)
db_session.commit()
db_session.add_all(
[
models.Sheet(
id="456",
name="Test Sheet 2",
author_id="morty@example.com",
group_id="interdimensional",
frequency="daily",
),
models.Sheet(
id="789",
name="Test Sheet 3",
author_id="rick@example.com",
group_id="interdimensional",
frequency="hourly",
),
]
)
db_session.commit()
response = client_with_auth.get("/sheet/mine")
assert response.status_code == HTTPStatus.OK
r = response.json()
assert isinstance(r, list)
assert len(r) == 2
assert datetime.fromisoformat(r[0].pop("created_at"))
assert datetime.fromisoformat(r[0].pop("last_url_archived_at"))
assert datetime.fromisoformat(r[1].pop("created_at"))
assert datetime.fromisoformat(r[1].pop("last_url_archived_at"))
assert r[0] == {
"id": "123",
"author_id": "morty@example.com",
"frequency": "hourly",
"group_id": "spaceship",
"name": "Test Sheet 1",
}
assert r[1] == {
"id": "456",
"author_id": "morty@example.com",
"frequency": "daily",
"group_id": "interdimensional",
"name": "Test Sheet 2",
}
def test_delete_sheet_endpoint(client_with_auth, db_session):
# missing sheet
response = client_with_auth.delete("/sheet/123-sheet-id")
assert response.status_code == HTTPStatus.OK
assert response.json() == {"id": "123-sheet-id", "deleted": False}
# add sheets for deletion
db_session.add_all(
[
models.Sheet(
id="123-sheet-id",
name="Test Sheet 1",
author_id="morty@example.com",
group_id="interdimensional",
frequency="daily",
),
models.Sheet(
id="456-sheet-id",
name="Test Sheet 2",
author_id="rick@example.com",
group_id="spaceship",
frequency="hourly",
),
]
)
db_session.commit()
# morty can delete his
response = client_with_auth.delete("/sheet/123-sheet-id")
assert response.status_code == HTTPStatus.OK
assert response.json() == {"id": "123-sheet-id", "deleted": True}
# but only once
response = client_with_auth.delete("/sheet/123-sheet-id")
assert response.status_code == HTTPStatus.OK
assert response.json() == {"id": "123-sheet-id", "deleted": False}
# and not Rick's
response = client_with_auth.delete("/sheet/456-sheet-id")
assert response.status_code == HTTPStatus.OK
assert response.json() == {"id": "456-sheet-id", "deleted": False}
class TestArchiveUserSheetEndpoint:
@patch("app.web.routers.sheet.celery", return_value=MagicMock())
def test_normal_flow(self, m_celery, client_with_auth, db_session):
db_session.add(
models.Sheet(
id="123-sheet-id",
name="Test Sheet 1",
author_id="morty@example.com",
group_id="spaceship",
frequency="hourly",
)
)
db_session.commit()
m_signature = MagicMock()
m_signature.apply_async.return_value = TaskResult(
id="123-taskid", status=STATUS_PENDING, result=""
)
m_celery.signature.return_value = m_signature
r = client_with_auth.post("/sheet/123-sheet-id/archive")
assert r.status_code == HTTPStatus.CREATED
assert r.json() == {"id": "123-taskid"}
m_celery.signature.assert_called_once()
m_signature.apply_async.assert_called_once()
def test_token_auth(self, client_with_token, test_no_auth):
test_no_auth(client_with_token.post, "/sheet/123-sheet-id/archive")
def test_missing_data(self, client_with_auth):
r = client_with_auth.post("/sheet/123-sheet-id/archive")
assert r.status_code == HTTPStatus.FORBIDDEN
assert r.json() == {"detail": "No access to this sheet."}
def test_no_access(self, client_with_auth, db_session):
db_session.add(
models.Sheet(
id="123-sheet-id",
name="Test Sheet 1",
author_id="rick@example.com",
group_id="spaceship",
frequency="hourly",
)
)
db_session.commit()
r = client_with_auth.post("/sheet/123-sheet-id/archive")
assert r.status_code == HTTPStatus.FORBIDDEN
assert r.json() == {"detail": "No access to this sheet."}
def test_user_not_in_group(self, client_with_auth, db_session):
db_session.add(
models.Sheet(
id="123-sheet-id",
name="Test Sheet 1",
author_id="morty@example.com",
group_id="interdimensional",
frequency="hourly",
)
)
db_session.commit()
r = client_with_auth.post("/sheet/123-sheet-id/archive")
assert r.status_code == HTTPStatus.FORBIDDEN
assert r.json() == {
"detail": "User does not have access to this group."
}
def test_user_cannot_manually_trigger(self, client_with_auth, db_session):
db_session.add(
models.Sheet(
id="123-sheet-id",
name="Test Sheet 1",
author_id="morty@example.com",
group_id="default",
frequency="hourly",
)
)
db_session.commit()
r = client_with_auth.post("/sheet/123-sheet-id/archive")
assert r.status_code == HTTPStatus.TOO_MANY_REQUESTS
assert r.json() == {
"detail": "User cannot manually trigger sheet archiving in this group."
}

View File

@@ -0,0 +1,53 @@
from http import HTTPStatus
from unittest.mock import patch
from app.shared.constants import STATUS_FAILURE, STATUS_PENDING, STATUS_SUCCESS
def test_endpoint_task_status_no_auth(client, test_no_auth):
test_no_auth(client.get, "/task/test-task-id")
@patch("app.web.routers.task.AsyncResult")
def test_get_status_success(mock_async_result, client_with_auth):
mock_async_result.return_value.status = STATUS_SUCCESS
mock_async_result.return_value.result = {"data": "some result"}
response = client_with_auth.get("/task/test-task-id")
assert response.status_code == HTTPStatus.OK
assert response.json() == {
"id": "test-task-id",
"status": STATUS_SUCCESS,
"result": {"data": "some result"},
}
@patch("app.web.routers.task.AsyncResult")
def test_get_status_failure(mock_async_result, client_with_auth):
mock_async_result.return_value.status = STATUS_FAILURE
mock_async_result.return_value.result = Exception("Some error")
response = client_with_auth.get("/task/test-task-id")
assert response.status_code == HTTPStatus.OK
assert response.json() == {
"id": "test-task-id",
"status": STATUS_FAILURE,
"result": {"error": "Some error"},
}
@patch("app.web.routers.task.AsyncResult")
def test_get_status_pending(mock_async_result, client_with_auth):
mock_async_result.return_value.status = STATUS_PENDING
mock_async_result.return_value.result = None
response = client_with_auth.get("/task/test-task-id")
assert response.status_code == HTTPStatus.OK
assert response.json() == {
"id": "test-task-id",
"status": STATUS_PENDING,
"result": None,
}

View File

@@ -0,0 +1,312 @@
import json
from http import HTTPStatus
from unittest.mock import MagicMock, patch
from app.shared import schemas
from app.shared.constants import STATUS_PENDING
from app.shared.db import worker_crud
from app.shared.schemas import ArchiveCreate, TaskResult
from app.web.config import ALLOW_ANY_EMAIL
def test_archive_url_unauthenticated(client, test_no_auth):
test_no_auth(client.post, "/url/archive")
@patch("app.web.routers.url.UserState")
@patch("app.web.routers.url.celery", return_value=MagicMock())
def test_archive_url(m_celery, m2, client_with_auth):
m_signature = MagicMock()
m_signature.apply_async.return_value = TaskResult(
id="123-456-789", status=STATUS_PENDING, result=""
)
m_celery.signature.return_value = m_signature
m_user_state = MagicMock()
m2.return_value = m_user_state
# url is too short
response = client_with_auth.post("/url/archive", json={"url": "bad"})
assert response.status_code == HTTPStatus.UNPROCESSABLE_ENTITY
assert (
response.json()["detail"][0]["msg"]
== "String should have at least 5 characters"
)
m_celery.signature.assert_not_called()
# url is invalid
response = client_with_auth.post(
"/url/archive", json={"url": "example.com"}
)
assert response.status_code == HTTPStatus.BAD_REQUEST
assert response.json()["detail"] == "Invalid URL received."
# valid request
m_user_state.has_quota_max_monthly_urls.return_value = True
m_user_state.has_quota_max_monthly_mbs.return_value = True
response = client_with_auth.post(
"/url/archive", json={"url": "https://example.com"}
)
assert response.status_code == HTTPStatus.CREATED
assert response.json() == {"id": "123-456-789"}
m_celery.signature.assert_called_once()
m_signature.apply_async.assert_called_once()
called_val = m_celery.signature.call_args
assert called_val[0][0] == "create_archive_task"
assert json.loads(called_val[1]["args"][0]) == {
"id": None,
"url": "https://example.com",
"result": None,
"public": False,
"author_id": "rick@example.com",
"group_id": "default",
"tags": None,
"sheet_id": None,
"store_until": None,
"urls": None,
}
m_user_state.has_quota_max_monthly_urls.assert_called_once()
m_user_state.has_quota_max_monthly_mbs.assert_called_once()
m_user_state.in_group.assert_called_once_with("default")
# user is not in group
m_user_state.in_group.return_value = False
response = client_with_auth.post(
"/url/archive",
json={"url": "https://example.com", "group_id": "new-group"},
)
assert response.status_code == HTTPStatus.FORBIDDEN
assert (
response.json()["detail"] == "User does not have access to this group."
)
m_user_state.in_group.assert_called_with("new-group")
# user is in group
m_user_state.in_group.return_value = True
response = client_with_auth.post(
"/url/archive",
json={"url": "https://example.com", "group_id": "spaceship"},
)
assert response.status_code == HTTPStatus.CREATED
assert response.json() == {"id": "123-456-789"}
assert m_celery.signature.call_count == 2
assert m_signature.apply_async.call_count == 2
called_val = m_celery.signature.call_args
assert json.loads(called_val[1]["args"][0])["group_id"] == "spaceship"
m_user_state.in_group.assert_called_with("spaceship")
# user is over monthly URL quota
m_user_state.has_quota_max_monthly_urls.return_value = False
m_user_state.has_quota_max_monthly_mbs.return_value = True
response = client_with_auth.post(
"/url/archive",
json={"url": "https://example.com", "group_id": "spaceship"},
)
assert response.status_code == HTTPStatus.TOO_MANY_REQUESTS
assert (
response.json()["detail"] == "User has reached their monthly URL quota."
)
m_user_state.has_quota_max_monthly_urls.assert_called_with("spaceship")
# user is over monthly MB quota
m_user_state.has_quota_max_monthly_urls.return_value = True
m_user_state.has_quota_max_monthly_mbs.return_value = False
response = client_with_auth.post(
"/url/archive",
json={"url": "https://example.com", "group_id": "spacesuit"},
)
assert response.status_code == HTTPStatus.TOO_MANY_REQUESTS
assert (
response.json()["detail"] == "User has reached their monthly MB quota."
)
m_user_state.has_quota_max_monthly_mbs.assert_called_with("spacesuit")
assert m_celery.signature.call_count == 2
assert m_signature.apply_async.call_count == 2
@patch("app.web.routers.url.UserState")
def test_archive_url_quotas(m1, client_with_auth):
m_user_state = MagicMock()
m1.return_value = m_user_state
# misses on monthly URLs quota
m_user_state.has_quota_max_monthly_urls.return_value = False
response = client_with_auth.post(
"/url/archive", json={"url": "https://example.com"}
)
assert response.status_code == HTTPStatus.TOO_MANY_REQUESTS
assert (
response.json()["detail"] == "User has reached their monthly URL quota."
)
m_user_state.has_quota_max_monthly_urls.assert_called_once()
# misses on monthly MBs quota
m_user_state.has_quota_max_monthly_urls.return_value = True
m_user_state.has_quota_max_monthly_mbs.return_value = False
response = client_with_auth.post(
"/url/archive", json={"url": "https://example.com"}
)
assert response.status_code == HTTPStatus.TOO_MANY_REQUESTS
assert (
response.json()["detail"] == "User has reached their monthly MB quota."
)
m_user_state.has_quota_max_monthly_mbs.assert_called_once()
@patch("app.web.routers.url.celery", return_value=MagicMock())
def test_archive_url_with_api_token(m_celery, client_with_token):
m_signature = MagicMock()
m_signature.apply_async.return_value = TaskResult(
id="123-456-789", status=STATUS_PENDING, result=""
)
m_celery.signature.return_value = m_signature
response = client_with_token.post(
"/url/archive",
json={"url": "https://example.com", "author_id": "someone@example.com"},
)
assert response.status_code == HTTPStatus.CREATED
assert response.json() == {"id": "123-456-789"}
m_celery.signature.assert_called_once()
m_signature.apply_async.assert_called_once()
called_val = m_celery.signature.call_args
assert called_val[0][0] == "create_archive_task"
assert json.loads(called_val[1]["args"][0]) == {
"id": None,
"url": "https://example.com",
"result": None,
"public": False,
"author_id": "someone@example.com",
"group_id": "default",
"tags": None,
"sheet_id": None,
"store_until": None,
"urls": None,
}
# missing id should use ALLOW_ANY_EMAIL
response = client_with_token.post(
"/url/archive", json={"url": "https://example.com", "author_id": None}
)
assert response.status_code == HTTPStatus.CREATED
called_val = m_celery.signature.call_args
assert called_val[0][0] == "create_archive_task"
assert json.loads(called_val[1]["args"][0]) == {
"id": None,
"url": "https://example.com",
"result": None,
"public": False,
"author_id": ALLOW_ANY_EMAIL,
"group_id": "default",
"tags": None,
"sheet_id": None,
"store_until": None,
"urls": None,
}
def test_search_by_url_unauthenticated(client, test_no_auth):
test_no_auth(client.get, "/url/search")
def test_search_by_url(client_with_auth, client_with_token, db_session):
# tests the search endpoint, including through some db data for the endpoint params
response = client_with_auth.get("/url/search")
assert response.status_code == HTTPStatus.UNPROCESSABLE_ENTITY
assert response.json()["detail"][0]["msg"] == "Field required"
response = client_with_auth.get("/url/search?url=https://example.com")
assert response.status_code == HTTPStatus.OK
assert response.json() == []
for i in range(11):
worker_crud.create_archive(
db_session,
ArchiveCreate(
id=f"url-456-{i}",
url="https://example.com"
if i < 10
else "https://something-else.com",
result={},
public=True,
author_id="rick@example.com",
),
[],
[],
)
# NB: this insertion is too fast for the ordering to be correct as they are within the same second
response = client_with_auth.get("/url/search?url=https://example.com")
assert response.status_code == HTTPStatus.OK
assert len(j := response.json()) == 10
assert "url-456-0" in [i["id"] for i in j]
assert "url-456-9" in [i["id"] for i in j]
assert "url-456-10" not in [i["id"] for i in j]
assert j[0].keys() == schemas.ArchiveResult.model_fields.keys()
response = client_with_auth.get(
"/url/search?url=https://example.com&limit=5"
)
assert response.status_code == HTTPStatus.OK
assert len(response.json()) == 5
response = client_with_auth.get(
"/url/search?url=https://example.com&skip=5&limit=2"
)
assert response.status_code == HTTPStatus.OK
assert len(response.json()) == 2
response = client_with_auth.get(
"/url/search?url=https://example.com&archived_before=2010-01-01"
)
assert response.status_code == HTTPStatus.OK
assert len(response.json()) == 0
response = client_with_auth.get(
"/url/search?url=https://example.com&archived_after=2010-01-01"
)
assert response.status_code == HTTPStatus.OK
assert len(response.json()) == 10
# API token will also work
response = client_with_token.get(
"/url/search?url=https://example.com&archived_after=2010-01-01"
)
assert response.status_code == HTTPStatus.OK
assert len(response.json()) == 10
@patch("app.web.routers.url.UserState")
def test_search_no_read_access(mock_user_state, client_with_auth):
mock_user_state.return_value.read = False
mock_user_state.return_value.read_public = False
response = client_with_auth.get("/url/search?url=https://example.com")
assert response.status_code == HTTPStatus.FORBIDDEN
assert response.json() == {"detail": "User does not have read access."}
def test_delete_task_unauthenticated(client, test_no_auth):
test_no_auth(client.delete, "/url/123-456-789")
def test_delete_task(client_with_auth, db_session):
response = client_with_auth.delete("/url/delete-123-456-789")
assert response.status_code == HTTPStatus.OK
assert response.json() == {"id": "delete-123-456-789", "deleted": False}
worker_crud.create_archive(
db_session,
ArchiveCreate(
id="delete-123-456-789",
url="https://example.com",
result={},
public=True,
author_id="morty@example.com",
),
[],
[],
)
response = client_with_auth.delete("/url/delete-123-456-789")
assert response.status_code == HTTPStatus.OK
assert response.json() == {"id": "delete-123-456-789", "deleted": True}