mirror of
https://github.com/bellingcat/auto-archiver-api.git
synced 2026-06-12 05:28:34 +03:00
further worker/web separation and tests fixed
This commit is contained in:
128
app/tests/web/endpoints/test_default.py
Normal file
128
app/tests/web/endpoints/test_default.py
Normal file
@@ -0,0 +1,128 @@
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
from fastapi.testclient import TestClient
|
||||
import pytest
|
||||
from app.shared.config import VERSION
|
||||
from app.tests.web.db.test_crud import test_data
|
||||
|
||||
|
||||
def test_endpoint_home(client_with_auth):
|
||||
r = client_with_auth.get("/")
|
||||
assert r.status_code == 200
|
||||
j = r.json()
|
||||
assert "version" in j and j["version"] == VERSION
|
||||
assert "breakingChanges" in j
|
||||
assert "groups" not in j
|
||||
|
||||
|
||||
@patch("app.web.endpoints.default.bearer_security", new_callable=AsyncMock)
|
||||
@patch("app.web.endpoints.default.get_user_auth", new_callable=AsyncMock, return_value="test@example.com")
|
||||
@patch("app.web.endpoints.default.crud.get_user_groups", return_value=["group1", "group2"])
|
||||
def test_endpoint_home_with_groups(m1, m2, m3, client_with_auth):
|
||||
r = client_with_auth.get("/")
|
||||
assert r.status_code == 200
|
||||
j = r.json()
|
||||
assert "version" in j and j["version"] == VERSION
|
||||
assert "breakingChanges" in j
|
||||
assert "groups" in j
|
||||
assert j["groups"] == ["group1", "group2"]
|
||||
|
||||
|
||||
@patch("app.web.endpoints.default.bearer_security", new_callable=AsyncMock)
|
||||
@patch("app.web.endpoints.default.get_user_auth", new_callable=AsyncMock, return_value="test@example.com")
|
||||
@patch("app.web.endpoints.default.crud.get_user_groups", side_effect=Exception('mocked error'))
|
||||
def test_endpoint_home_with_groups_exception(m1, m2, m3, client_with_auth): # mocks call that triggers an internal error
|
||||
r = client_with_auth.get("/")
|
||||
assert r.status_code == 200
|
||||
j = r.json()
|
||||
assert "version" in j and j["version"] == VERSION
|
||||
assert "breakingChanges" in j
|
||||
assert "groups" not in j
|
||||
|
||||
|
||||
def test_endpoint_health(client_with_auth):
|
||||
r = client_with_auth.get("/health")
|
||||
assert r.status_code == 200
|
||||
assert r.json() == {"status": "ok"}
|
||||
|
||||
|
||||
def test_endpoint_active_no_auth(client, test_no_auth):
|
||||
test_no_auth(client.get, "/user/active")
|
||||
|
||||
|
||||
def test_endpoint_active(app):
|
||||
m_user_state = MagicMock()
|
||||
|
||||
from app.web.security import get_user_state
|
||||
app.dependency_overrides[get_user_state] = lambda: m_user_state
|
||||
|
||||
# inactive user
|
||||
m_user_state.active = False
|
||||
client = TestClient(app)
|
||||
r = client.get("/user/active")
|
||||
assert r.status_code == 200
|
||||
assert r.json() == {"active": False}
|
||||
|
||||
# active user
|
||||
m_user_state.active = True
|
||||
client = TestClient(app)
|
||||
r = client.get("/user/active")
|
||||
assert r.status_code == 200
|
||||
assert r.json() == {"active": True}
|
||||
|
||||
|
||||
|
||||
def test_no_serve_local_archive_by_default(client_with_auth):
|
||||
r = client_with_auth.get("/app/local_archive_test/temp.txt")
|
||||
assert r.status_code == 404
|
||||
|
||||
|
||||
def test_favicon(client_with_auth):
|
||||
r = client_with_auth.get("/favicon.ico")
|
||||
assert r.status_code == 200
|
||||
assert r.headers["content-type"] == "image/vnd.microsoft.icon"
|
||||
|
||||
|
||||
def test_endpoint_test_prometheus_no_auth(client, test_no_auth):
|
||||
test_no_auth(client.get, "/metrics")
|
||||
|
||||
|
||||
def test_endpoint_test_prometheus_no_user_auth(client_with_auth, test_no_auth):
|
||||
test_no_auth(client_with_auth.get, "/metrics")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_prometheus_metrics(test_data, client_with_token, get_settings):
|
||||
# before metrics calculation
|
||||
r = client_with_token.get("/metrics")
|
||||
assert r.status_code == 200
|
||||
assert r.headers["content-type"] == "text/plain; version=0.0.4; charset=utf-8"
|
||||
assert "disk_utilization" in r.text
|
||||
assert "database_metrics" in r.text
|
||||
assert "exceptions" in r.text
|
||||
assert "worker_exceptions_total" in r.text
|
||||
assert 'disk_utilization{type="used"}' not in r.text
|
||||
|
||||
# after metrics calculation
|
||||
from app.web.utils.metrics import measure_regular_metrics
|
||||
await measure_regular_metrics(get_settings.DATABASE_PATH, 60 * 60 * 24 * 31 * 12 * 100)
|
||||
r2 = client_with_token.get("/metrics")
|
||||
assert 'disk_utilization{type="used"}' in r2.text
|
||||
assert 'disk_utilization{type="free"}' in r2.text
|
||||
assert 'disk_utilization{type="database"}' in r2.text
|
||||
assert 'database_metrics{query="count_archives"} 100.0' in r2.text
|
||||
assert 'database_metrics{query="count_archive_urls"} 1000.0' in r2.text
|
||||
assert 'database_metrics{query="count_users"} 3.0' in r2.text
|
||||
assert 'database_metrics_counter_total{query="count_by_user",user="rick@example.com"} 34.0' in r2.text
|
||||
assert 'database_metrics_counter_total{query="count_by_user",user="morty@example.com"} 33.0' in r2.text
|
||||
assert 'database_metrics_counter_total{query="count_by_user",user="jerry@example.com"} 33.0' in r2.text
|
||||
|
||||
# 30s window, should not change the gauges nor the total in the counters
|
||||
from app.web.utils.metrics import measure_regular_metrics
|
||||
await measure_regular_metrics(get_settings.DATABASE_PATH, 30)
|
||||
r3 = client_with_token.get("/metrics")
|
||||
assert 'database_metrics{query="count_archives"} 100.0' in r3.text
|
||||
assert 'database_metrics{query="count_archive_urls"} 1000.0' in r3.text
|
||||
assert 'database_metrics{query="count_users"} 3.0' in r3.text
|
||||
assert 'database_metrics_counter_total{query="count_by_user",user="rick@example.com"} 34.0' in r3.text
|
||||
assert 'database_metrics_counter_total{query="count_by_user",user="morty@example.com"} 33.0' in r3.text
|
||||
assert 'database_metrics_counter_total{query="count_by_user",user="jerry@example.com"} 33.0' in r3.text
|
||||
40
app/tests/web/endpoints/test_interoperability.py
Normal file
40
app/tests/web/endpoints/test_interoperability.py
Normal file
@@ -0,0 +1,40 @@
|
||||
from datetime import datetime
|
||||
import json
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from app.shared.config import ALLOW_ANY_EMAIL
|
||||
from app.web.db import crud
|
||||
|
||||
|
||||
def test_submit_manual_archive_unauthenticated(client, test_no_auth):
|
||||
test_no_auth(client.post, "/interop/submit-archive")
|
||||
|
||||
|
||||
def test_submit_manual_archive_not_user_auth(client_with_auth, test_no_auth):
|
||||
test_no_auth(client_with_auth.post, "/interop/submit-archive")
|
||||
|
||||
|
||||
@patch("app.web.endpoints.interoperability.business_logic", return_value=MagicMock(get_store_archive_until=MagicMock(return_value=datetime)))
|
||||
def test_submit_manual_archive(m1, client_with_token, db_session):
|
||||
# normal workflow
|
||||
aa_metadata = json.dumps({"status": "test: success", "metadata": {"url": "http://example.com"}, "media": [{"filename": "fn1", "urls": ["http://example.s3.com"]}]})
|
||||
r = client_with_token.post("/interop/submit-archive", json={"result": aa_metadata, "public": True, "author_id": "jerry@gmail.com", "group_id": "spaceship", "tags": ["test"], "url": "http://example.com"})
|
||||
assert r.status_code == 201
|
||||
assert "id" in r.json()
|
||||
|
||||
inserted = crud.get_archive(db_session, r.json()["id"], ALLOW_ANY_EMAIL)
|
||||
assert inserted.url == "http://example.com"
|
||||
assert inserted.group_id == "spaceship"
|
||||
assert inserted.author_id == "jerry@gmail.com"
|
||||
assert sorted([t.id for t in inserted.tags]) == sorted(["test", "manual"])
|
||||
assert inserted.public
|
||||
assert type(inserted.result) == dict
|
||||
assert [u.url for u in inserted.urls] == ["http://example.s3.com"]
|
||||
assert type(inserted.store_until) == datetime
|
||||
|
||||
|
||||
# cannot have the same URL twice
|
||||
aa_metadata = json.dumps({"status": "test: success", "metadata": {"url": "http://example.com"}, "media": [{"filename": "fn1", "urls": ["http://example.com", "http://example.com"]}]})
|
||||
r = client_with_token.post("/interop/submit-archive", json={"result": aa_metadata, "public": False, "author_id": "jerry@gmail.com", "tags": ["test"], "url": "http://example.com"})
|
||||
assert r.status_code == 422
|
||||
assert r.json() == {"detail": "Cannot insert into DB due to integrity error, likely duplicate urls."}
|
||||
193
app/tests/web/endpoints/test_sheet.py
Normal file
193
app/tests/web/endpoints/test_sheet.py
Normal file
@@ -0,0 +1,193 @@
|
||||
from datetime import datetime
|
||||
import json
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
from app.shared.schemas import TaskResult
|
||||
|
||||
|
||||
def test_endpoints_no_auth(client, test_no_auth):
|
||||
test_no_auth(client.post, "/sheet/create")
|
||||
test_no_auth(client.get, "/sheet/mine")
|
||||
test_no_auth(client.delete, "/sheet/123-sheet-id")
|
||||
test_no_auth(client.post, "/sheet/123-sheet-id/archive")
|
||||
|
||||
|
||||
def test_create_sheet_endpoint(app_with_auth, db_session):
|
||||
client_with_auth = TestClient(app_with_auth)
|
||||
good_data = {
|
||||
"id": "123-sheet-id",
|
||||
"name": "Test Sheet",
|
||||
"group_id": "spaceship",
|
||||
"frequency": "daily"
|
||||
}
|
||||
|
||||
# with good data
|
||||
response = client_with_auth.post("/sheet/create", json=good_data)
|
||||
assert response.status_code == 201
|
||||
j = response.json()
|
||||
assert datetime.fromisoformat(j.pop("created_at"))
|
||||
assert datetime.fromisoformat(j.pop("last_url_archived_at"))
|
||||
assert j.pop("author_id") == 'morty@example.com'
|
||||
assert j == good_data
|
||||
|
||||
# already exists
|
||||
response = client_with_auth.post("/sheet/create", json=good_data)
|
||||
assert response.status_code == 400
|
||||
assert response.json() == {"detail": "Sheet with this ID is already being archived."}
|
||||
|
||||
# bad group
|
||||
bad_data = good_data.copy()
|
||||
bad_data["group_id"] = "not a group"
|
||||
response = client_with_auth.post("/sheet/create", json=bad_data)
|
||||
assert response.status_code == 403
|
||||
assert response.json() == {"detail": "User does not have access to this group."}
|
||||
|
||||
# switch to jerry who's got less quota/permissions
|
||||
from app.web.security import get_user_state
|
||||
from app.web.db.user_state import UserState
|
||||
app_with_auth.dependency_overrides[get_user_state] = lambda: UserState(db_session, "jerry@example.com")
|
||||
client_jerry = TestClient(app_with_auth)
|
||||
|
||||
# frequency not allowed
|
||||
jerry_data = good_data.copy()
|
||||
jerry_data["group_id"] = "animated-characters"
|
||||
jerry_data["frequency"] = "hourly"
|
||||
jerry_data["id"] = "jerry-sheet-id"
|
||||
response = client_jerry.post("/sheet/create", json=jerry_data)
|
||||
assert response.status_code == 422
|
||||
assert response.json() == {"detail": "Invalid frequency selected for this group."}
|
||||
|
||||
jerry_data["frequency"] = "daily"
|
||||
# success for the first sheet, bad quota on second
|
||||
response = client_jerry.post("/sheet/create", json=jerry_data)
|
||||
assert response.status_code == 201
|
||||
|
||||
response = client_jerry.post("/sheet/create", json=jerry_data)
|
||||
assert response.status_code == 429
|
||||
assert response.json() == {"detail": "User has reached their sheet quota for this group."}
|
||||
|
||||
|
||||
def test_get_user_sheets_endpoint(client_with_auth, db_session):
|
||||
# no data
|
||||
response = client_with_auth.get("/sheet/mine")
|
||||
assert response.status_code == 200
|
||||
assert response.json() == []
|
||||
|
||||
# with data
|
||||
from app.shared.db import models
|
||||
db_session.add(
|
||||
models.Sheet(id="123", name="Test Sheet 1", author_id="morty@example.com", group_id="spaceship", frequency="hourly")
|
||||
)
|
||||
db_session.commit()
|
||||
db_session.add_all([
|
||||
models.Sheet(id="456", name="Test Sheet 2", author_id="morty@example.com", group_id="interdimensional", frequency="daily"),
|
||||
models.Sheet(id="789", name="Test Sheet 3", author_id="rick@example.com", group_id="interdimensional", frequency="hourly"),
|
||||
])
|
||||
db_session.commit()
|
||||
|
||||
response = client_with_auth.get("/sheet/mine")
|
||||
assert response.status_code == 200
|
||||
r = response.json()
|
||||
assert isinstance(r, list)
|
||||
assert len(r) == 2
|
||||
assert datetime.fromisoformat(r[0].pop("created_at"))
|
||||
assert datetime.fromisoformat(r[0].pop("last_url_archived_at"))
|
||||
assert datetime.fromisoformat(r[1].pop("created_at"))
|
||||
assert datetime.fromisoformat(r[1].pop("last_url_archived_at"))
|
||||
assert r[0] == {
|
||||
'id': '123',
|
||||
'author_id': 'morty@example.com',
|
||||
'frequency': 'hourly',
|
||||
'group_id': 'spaceship',
|
||||
'name': 'Test Sheet 1',
|
||||
}
|
||||
assert r[1] == {
|
||||
'id': '456',
|
||||
'author_id': 'morty@example.com',
|
||||
'frequency': 'daily',
|
||||
'group_id': 'interdimensional',
|
||||
'name': 'Test Sheet 2',
|
||||
}
|
||||
|
||||
|
||||
def test_delete_sheet_endpoint(client_with_auth, db_session):
|
||||
# missing sheet
|
||||
response = client_with_auth.delete("/sheet/123-sheet-id")
|
||||
assert response.status_code == 200
|
||||
assert response.json() == {
|
||||
"id": "123-sheet-id",
|
||||
"deleted": False
|
||||
}
|
||||
|
||||
# add sheets for deletion
|
||||
from app.shared.db import models
|
||||
db_session.add_all([
|
||||
models.Sheet(id="123-sheet-id", name="Test Sheet 1", author_id="morty@example.com", group_id="interdimensional", frequency="daily"),
|
||||
models.Sheet(id="456-sheet-id", name="Test Sheet 2", author_id="rick@example.com", group_id="spaceship", frequency="hourly"),
|
||||
])
|
||||
db_session.commit()
|
||||
|
||||
# morty can delete his
|
||||
response = client_with_auth.delete("/sheet/123-sheet-id")
|
||||
assert response.status_code == 200
|
||||
assert response.json() == {"id": "123-sheet-id", "deleted": True}
|
||||
# but only once
|
||||
response = client_with_auth.delete("/sheet/123-sheet-id")
|
||||
assert response.status_code == 200
|
||||
assert response.json() == {"id": "123-sheet-id", "deleted": False}
|
||||
# and not rick's
|
||||
response = client_with_auth.delete("/sheet/456-sheet-id")
|
||||
assert response.status_code == 200
|
||||
assert response.json() == {"id": "456-sheet-id", "deleted": False}
|
||||
|
||||
|
||||
class TestArchiveUserSheetEndpoint:
|
||||
@patch("app.web.endpoints.sheet.celery", return_value=MagicMock())
|
||||
def test_normal_flow(self, m_celery, client_with_auth, db_session):
|
||||
from app.shared.db import models
|
||||
db_session.add(models.Sheet(id="123-sheet-id", name="Test Sheet 1", author_id="morty@example.com", group_id="spaceship", frequency="hourly"))
|
||||
db_session.commit()
|
||||
|
||||
m_signature = MagicMock()
|
||||
m_signature.delay.return_value = TaskResult(id="123-taskid", status="PENDING", result="")
|
||||
m_celery.signature.return_value = m_signature
|
||||
|
||||
r = client_with_auth.post("/sheet/123-sheet-id/archive")
|
||||
assert r.status_code == 201
|
||||
assert r.json() == {"id": "123-taskid"}
|
||||
m_celery.signature.assert_called_once()
|
||||
m_signature.delay.assert_called_once()
|
||||
|
||||
def test_token_auth(self, client_with_token, test_no_auth):
|
||||
test_no_auth(client_with_token.post, "/sheet/123-sheet-id/archive")
|
||||
|
||||
def test_missing_data(self, client_with_auth):
|
||||
r = client_with_auth.post("/sheet/123-sheet-id/archive")
|
||||
assert r.status_code == 403
|
||||
assert r.json() == {"detail": "No access to this sheet."}
|
||||
|
||||
def test_no_access(self, client_with_auth, db_session):
|
||||
from app.shared.db import models
|
||||
db_session.add(models.Sheet(id="123-sheet-id", name="Test Sheet 1", author_id="rick@example.com", group_id="spaceship", frequency="hourly"))
|
||||
db_session.commit()
|
||||
r = client_with_auth.post("/sheet/123-sheet-id/archive")
|
||||
assert r.status_code == 403
|
||||
assert r.json() == {"detail": "No access to this sheet."}
|
||||
|
||||
def test_user_not_in_group(self, client_with_auth, db_session):
|
||||
from app.shared.db import models
|
||||
db_session.add(models.Sheet(id="123-sheet-id", name="Test Sheet 1", author_id="morty@example.com", group_id="interdimensional", frequency="hourly"))
|
||||
db_session.commit()
|
||||
r = client_with_auth.post("/sheet/123-sheet-id/archive")
|
||||
assert r.status_code == 403
|
||||
assert r.json() == {"detail": "User does not have access to this group."}
|
||||
|
||||
def test_user_cannot_manually_trigger(self, client_with_auth, db_session):
|
||||
from app.shared.db import models
|
||||
db_session.add(models.Sheet(id="123-sheet-id", name="Test Sheet 1", author_id="morty@example.com", group_id="default", frequency="hourly"))
|
||||
db_session.commit()
|
||||
r = client_with_auth.post("/sheet/123-sheet-id/archive")
|
||||
assert r.status_code == 429
|
||||
assert r.json() == {"detail": "User cannot manually trigger sheet archiving in this group."}
|
||||
51
app/tests/web/endpoints/test_task.py
Normal file
51
app/tests/web/endpoints/test_task.py
Normal file
@@ -0,0 +1,51 @@
|
||||
from unittest.mock import patch
|
||||
|
||||
|
||||
def test_endpoint_task_status_no_auth(client, test_no_auth):
|
||||
test_no_auth(client.get, "/task/test-task-id")
|
||||
|
||||
|
||||
@patch("app.web.endpoints.task.AsyncResult")
|
||||
def test_get_status_success(mock_async_result, client_with_auth):
|
||||
mock_async_result.return_value.status = "SUCCESS"
|
||||
mock_async_result.return_value.result = {"data": "some result"}
|
||||
|
||||
response = client_with_auth.get("/task/test-task-id")
|
||||
|
||||
assert response.status_code == 200
|
||||
assert response.json() == {
|
||||
"id": "test-task-id",
|
||||
"status": "SUCCESS",
|
||||
"result": {"data": "some result"}
|
||||
}
|
||||
|
||||
|
||||
@patch("app.web.endpoints.task.AsyncResult")
|
||||
def test_get_status_failure(mock_async_result, client_with_auth):
|
||||
|
||||
mock_async_result.return_value.status = "FAILURE"
|
||||
mock_async_result.return_value.result = Exception("Some error")
|
||||
|
||||
response = client_with_auth.get("/task/test-task-id")
|
||||
|
||||
assert response.status_code == 200
|
||||
assert response.json() == {
|
||||
"id": "test-task-id",
|
||||
"status": "FAILURE",
|
||||
"result": {"error": "Some error"}
|
||||
}
|
||||
|
||||
|
||||
@patch("app.web.endpoints.task.AsyncResult")
|
||||
def test_get_status_pending(mock_async_result, client_with_auth):
|
||||
mock_async_result.return_value.status = "PENDING"
|
||||
mock_async_result.return_value.result = None
|
||||
|
||||
response = client_with_auth.get("/task/test-task-id")
|
||||
|
||||
assert response.status_code == 200
|
||||
assert response.json() == {
|
||||
"id": "test-task-id",
|
||||
"status": "PENDING",
|
||||
"result": None
|
||||
}
|
||||
194
app/tests/web/endpoints/test_url.py
Normal file
194
app/tests/web/endpoints/test_url.py
Normal file
@@ -0,0 +1,194 @@
|
||||
import json
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from app.shared.schemas import ArchiveCreate, TaskResult
|
||||
|
||||
|
||||
def test_archive_url_unauthenticated(client, test_no_auth):
|
||||
test_no_auth(client.post, "/url/archive")
|
||||
|
||||
|
||||
@patch("app.web.endpoints.url.UserState")
|
||||
@patch("app.web.endpoints.url.celery", return_value=MagicMock())
|
||||
def test_archive_url(m_celery, m2, client_with_auth):
|
||||
m_signature = MagicMock()
|
||||
m_signature.delay.return_value = TaskResult(id="123-456-789", status="PENDING", result="")
|
||||
m_celery.signature.return_value = m_signature
|
||||
|
||||
m_user_state = MagicMock()
|
||||
m2.return_value = m_user_state
|
||||
|
||||
# url is too short
|
||||
response = client_with_auth.post("/url/archive", json={"url": "bad"})
|
||||
assert response.status_code == 422
|
||||
assert response.json()["detail"][0]["msg"] == 'String should have at least 5 characters'
|
||||
m_celery.signature.assert_not_called()
|
||||
|
||||
# url is invalid
|
||||
response = client_with_auth.post("/url/archive", json={"url": "example.com"})
|
||||
assert response.status_code == 400
|
||||
assert response.json()["detail"] == "Invalid URL received."
|
||||
|
||||
# valid request
|
||||
m_user_state.has_quota_max_monthly_urls.return_value = True
|
||||
m_user_state.has_quota_max_monthly_mbs.return_value = True
|
||||
response = client_with_auth.post("/url/archive", json={"url": "https://example.com"})
|
||||
assert response.status_code == 201
|
||||
assert response.json() == {'id': '123-456-789'}
|
||||
m_celery.signature.assert_called_once()
|
||||
m_signature.delay.assert_called_once()
|
||||
called_val = m_celery.signature.call_args
|
||||
assert called_val[0][0] == "create_archive_task"
|
||||
assert json.loads(called_val[1]['args'][0]) == {"id": None, "url": "https://example.com", "result": None, "public": False, "author_id": "rick@example.com", "group_id": "default", "tags": None, "sheet_id": None, "store_until": None, "urls": None}
|
||||
m_user_state.has_quota_max_monthly_urls.assert_called_once()
|
||||
m_user_state.has_quota_max_monthly_mbs.assert_called_once()
|
||||
m_user_state.in_group.assert_called_once_with("default")
|
||||
|
||||
# user is not in group
|
||||
m_user_state.in_group.return_value = False
|
||||
response = client_with_auth.post("/url/archive", json={"url": "https://example.com", "group_id": "new-group"})
|
||||
assert response.status_code == 403
|
||||
assert response.json()["detail"] == "User does not have access to this group."
|
||||
m_user_state.in_group.assert_called_with("new-group")
|
||||
|
||||
# user is in group
|
||||
m_user_state.in_group.return_value = True
|
||||
response = client_with_auth.post("/url/archive", json={"url": "https://example.com", "group_id": "spaceship"})
|
||||
assert response.status_code == 201
|
||||
assert response.json() == {'id': '123-456-789'}
|
||||
assert m_celery.signature.call_count == 2
|
||||
assert m_signature.delay.call_count == 2
|
||||
called_val = m_celery.signature.call_args
|
||||
assert json.loads(called_val[1]['args'][0])["group_id"] == "spaceship"
|
||||
m_user_state.in_group.assert_called_with("spaceship")
|
||||
|
||||
# user is over monthly URL quota
|
||||
m_user_state.has_quota_max_monthly_urls.return_value = False
|
||||
m_user_state.has_quota_max_monthly_mbs.return_value = True
|
||||
response = client_with_auth.post("/url/archive", json={"url": "https://example.com", "group_id": "spaceship"})
|
||||
assert response.status_code == 429
|
||||
assert response.json()["detail"] == "User has reached their monthly URL quota."
|
||||
m_user_state.has_quota_max_monthly_urls.assert_called_with("spaceship")
|
||||
|
||||
# user is over monthly MB quota
|
||||
m_user_state.has_quota_max_monthly_urls.return_value = True
|
||||
m_user_state.has_quota_max_monthly_mbs.return_value = False
|
||||
response = client_with_auth.post("/url/archive", json={"url": "https://example.com", "group_id": "spacesuit"})
|
||||
assert response.status_code == 429
|
||||
assert response.json()["detail"] == "User has reached their monthly MB quota."
|
||||
m_user_state.has_quota_max_monthly_mbs.assert_called_with("spacesuit")
|
||||
assert m_celery.signature.call_count == 2
|
||||
assert m_signature.delay.call_count == 2
|
||||
|
||||
|
||||
@patch("app.web.endpoints.url.UserState")
|
||||
def test_archive_url_quotas(m1, client_with_auth):
|
||||
m_user_state = MagicMock()
|
||||
m1.return_value = m_user_state
|
||||
|
||||
# misses on monthly URLs quota
|
||||
m_user_state.has_quota_max_monthly_urls.return_value = False
|
||||
response = client_with_auth.post("/url/archive", json={"url": "https://example.com"})
|
||||
assert response.status_code == 429
|
||||
assert response.json()["detail"] == "User has reached their monthly URL quota."
|
||||
m_user_state.has_quota_max_monthly_urls.assert_called_once()
|
||||
|
||||
# misses on monthly MBs quota
|
||||
m_user_state.has_quota_max_monthly_urls.return_value = True
|
||||
m_user_state.has_quota_max_monthly_mbs.return_value = False
|
||||
response = client_with_auth.post("/url/archive", json={"url": "https://example.com"})
|
||||
assert response.status_code == 429
|
||||
assert response.json()["detail"] == "User has reached their monthly MB quota."
|
||||
m_user_state.has_quota_max_monthly_mbs.assert_called_once()
|
||||
|
||||
|
||||
@patch("app.web.endpoints.url.celery", return_value=MagicMock())
|
||||
def test_archive_url_with_api_token(m_celery, client_with_token):
|
||||
m_signature = MagicMock()
|
||||
m_signature.delay.return_value = TaskResult(id="123-456-789", status="PENDING", result="")
|
||||
m_celery.signature.return_value = m_signature
|
||||
response = client_with_token.post("/url/archive", json={"url": "https://example.com"})
|
||||
assert response.status_code == 201
|
||||
assert response.json() == {'id': '123-456-789'}
|
||||
m_celery.signature.assert_called_once()
|
||||
m_signature.delay.assert_called_once()
|
||||
called_val = m_celery.signature.call_args
|
||||
assert called_val[0][0] == "create_archive_task"
|
||||
|
||||
|
||||
def test_search_by_url_unauthenticated(client, test_no_auth):
|
||||
test_no_auth(client.get, "/url/search")
|
||||
|
||||
|
||||
def test_search_by_url(client_with_auth, client_with_token, db_session):
|
||||
# tests the search endpoint, including through some db data for the endpoint params
|
||||
response = client_with_auth.get("/url/search")
|
||||
assert response.status_code == 422
|
||||
assert response.json()["detail"][0]["msg"] == "Field required"
|
||||
|
||||
response = client_with_auth.get("/url/search?url=https://example.com")
|
||||
assert response.status_code == 200
|
||||
assert response.json() == []
|
||||
|
||||
from app.shared import schemas
|
||||
from app.shared.db import worker_crud
|
||||
for i in range(11):
|
||||
#TODO: fix as this method is gone to shared.db
|
||||
worker_crud.create_task(db_session, ArchiveCreate(id=f"url-456-{i}", url="https://example.com" if i < 10 else "https://something-else.com", result={}, public=True, author_id="rick@example.com"), [], [])
|
||||
# NB: this insertion is too fast for the ordering to be correct as they are within the same second
|
||||
|
||||
response = client_with_auth.get("/url/search?url=https://example.com")
|
||||
assert response.status_code == 200
|
||||
assert len(j := response.json()) == 10
|
||||
assert "url-456-0" in [i["id"] for i in j]
|
||||
assert "url-456-9" in [i["id"] for i in j]
|
||||
assert "url-456-10" not in [i["id"] for i in j]
|
||||
assert j[0].keys() == schemas.ArchiveResult.model_fields.keys()
|
||||
|
||||
response = client_with_auth.get("/url/search?url=https://example.com&limit=5")
|
||||
assert response.status_code == 200
|
||||
assert len(response.json()) == 5
|
||||
|
||||
response = client_with_auth.get("/url/search?url=https://example.com&skip=5&limit=2")
|
||||
assert response.status_code == 200
|
||||
assert len(response.json()) == 2
|
||||
|
||||
response = client_with_auth.get("/url/search?url=https://example.com&archived_before=2010-01-01")
|
||||
assert response.status_code == 200
|
||||
assert len(response.json()) == 0
|
||||
|
||||
response = client_with_auth.get("/url/search?url=https://example.com&archived_after=2010-01-01")
|
||||
assert response.status_code == 200
|
||||
assert len(response.json()) == 10
|
||||
|
||||
# API token will also work
|
||||
response = client_with_token.get("/url/search?url=https://example.com&archived_after=2010-01-01")
|
||||
assert response.status_code == 200
|
||||
assert len(response.json()) == 10
|
||||
|
||||
|
||||
@patch("app.web.endpoints.url.UserState")
|
||||
def test_search_no_read_access(mock_user_state, client_with_auth):
|
||||
mock_user_state.return_value.read = False
|
||||
mock_user_state.return_value.read_public = False
|
||||
|
||||
response = client_with_auth.get("/url/search?url=https://example.com")
|
||||
assert response.status_code == 403
|
||||
assert response.json() == {"detail": "User does not have read access."}
|
||||
|
||||
|
||||
def test_delete_task_unauthenticated(client, test_no_auth):
|
||||
test_no_auth(client.delete, "/url/123-456-789")
|
||||
|
||||
|
||||
def test_delete_task(client_with_auth, db_session):
|
||||
response = client_with_auth.delete("/url/delete-123-456-789")
|
||||
assert response.status_code == 200
|
||||
assert response.json() == {"id": "delete-123-456-789", "deleted": False}
|
||||
|
||||
from app.shared.db import worker_crud
|
||||
worker_crud.create_task(db_session, ArchiveCreate(id="delete-123-456-789", url="https://example.com", result={}, public=True, author_id="morty@example.com"), [], [])
|
||||
|
||||
response = client_with_auth.delete("/url/delete-123-456-789")
|
||||
assert response.status_code == 200
|
||||
assert response.json() == {"id": "delete-123-456-789", "deleted": True}
|
||||
Reference in New Issue
Block a user