mirror of
https://github.com/bellingcat/auto-archiver-api.git
synced 2026-06-08 03:28:35 +03:00
fixing tests
This commit is contained in:
@@ -39,16 +39,17 @@ def test_archive_url(m_celery, m2, client_with_auth):
|
||||
m_signature.delay.assert_called_once()
|
||||
called_val = m_celery.signature.call_args
|
||||
assert called_val[0][0] == "create_archive_task"
|
||||
assert json.loads(called_val[1]['args'][0]) == {"id": None, "url": "https://example.com", "result": None, "public": True, "author_id": "rick@example.com", "group_id": None, "tags": [], "sheet_id": None}
|
||||
assert json.loads(called_val[1]['args'][0]) == {"id": None, "url": "https://example.com", "result": None, "public": False, "author_id": "rick@example.com", "group_id": "default", "tags": None, "sheet_id": None, "store_until": None, "urls": None}
|
||||
m_user_state.has_quota_max_monthly_urls.assert_called_once()
|
||||
m_user_state.has_quota_max_monthly_mbs.assert_called_once()
|
||||
m_user_state.in_group.assert_called_once_with("default")
|
||||
|
||||
# user is not in group
|
||||
m_user_state.in_group.return_value = False
|
||||
response = client_with_auth.post("/url/archive", json={"url": "https://example.com", "group_id": "new-group"})
|
||||
assert response.status_code == 403
|
||||
assert response.json()["detail"] == "User does not have access to this group."
|
||||
m_user_state.in_group.assert_called_once_with("new-group")
|
||||
m_user_state.in_group.assert_called_with("new-group")
|
||||
|
||||
# user is in group
|
||||
m_user_state.in_group.return_value = True
|
||||
@@ -131,7 +132,7 @@ def test_search_by_url(client_with_auth, client_with_token, db_session):
|
||||
|
||||
from db import crud, schemas
|
||||
for i in range(11):
|
||||
crud.create_task(db_session, ArchiveCreate(id=f"url-456-{i}", url="https://example.com" if i < 10 else "https://something-else.com", result={}, public=True, author_id="rick@example.com", group_id=None), [], [])
|
||||
crud.create_task(db_session, ArchiveCreate(id=f"url-456-{i}", url="https://example.com" if i < 10 else "https://something-else.com", result={}, public=True, author_id="rick@example.com"), [], [])
|
||||
# NB: this insertion is too fast for the ordering to be correct as they are within the same second
|
||||
|
||||
response = client_with_auth.get("/url/search?url=https://example.com")
|
||||
@@ -184,7 +185,7 @@ def test_delete_task(client_with_auth, db_session):
|
||||
assert response.json() == {"id": "delete-123-456-789", "deleted": False}
|
||||
|
||||
from db import crud
|
||||
crud.create_task(db_session, ArchiveCreate(id="delete-123-456-789", url="https://example.com", result={}, public=True, author_id="morty@example.com", group_id=None), [], [])
|
||||
crud.create_task(db_session, ArchiveCreate(id="delete-123-456-789", url="https://example.com", result={}, public=True, author_id="morty@example.com"), [], [])
|
||||
|
||||
response = client_with_auth.delete("/url/delete-123-456-789")
|
||||
assert response.status_code == 200
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
from datetime import datetime
|
||||
from unittest import mock
|
||||
|
||||
from unittest.mock import MagicMock, patch
|
||||
@@ -9,68 +10,74 @@ from auto_archiver import Metadata
|
||||
from auto_archiver.core import Media
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def worker_init():
|
||||
from worker.main import at_start
|
||||
at_start(None)
|
||||
|
||||
|
||||
class Test_create_archive_task():
|
||||
URL = "https://example-live.com"
|
||||
archive = schemas.ArchiveCreate(url=URL, tags=[], public=True, group_id=None, author_id="rick@example.com")
|
||||
archive = schemas.ArchiveCreate(url=URL, tags=["tag-celery"], public=True, author_id="rick@example.com", group_id="interstellar")
|
||||
|
||||
@patch("worker.main.insert_result_into_db")
|
||||
@patch("worker.main.is_group_invalid_for_user", return_value=None)
|
||||
# @patch("worker.main.choose_orchestrator")
|
||||
@patch("worker.main.get_store_until", return_value=datetime.now())
|
||||
@patch("worker.main.load_orchestrator")
|
||||
@patch("celery.app.task.Task.request")
|
||||
def test_success(self, m_req, m_choose, m_is_group, m_insert, worker_init, db_session):
|
||||
def test_success(self, m_req, m_load, m_store, m_insert, db_session):
|
||||
from worker.main import create_archive_task
|
||||
|
||||
m_req.id = "this-just-in"
|
||||
mock_orchestrator = self.mock_orchestrator_choice(m_choose)
|
||||
mock_orchestrator = self.mock_orchestrator_choice(m_load)
|
||||
|
||||
task = create_archive_task(self.archive.model_dump_json())
|
||||
|
||||
m_choose.assert_called_once()
|
||||
m_load.assert_called_once_with("interstellar")
|
||||
m_store.assert_called_once_with("interstellar")
|
||||
m_insert.assert_called_once()
|
||||
mock_orchestrator.feed_item.assert_called_once()
|
||||
|
||||
assert task["status"] == "success"
|
||||
assert task["metadata"]["url"] == self.URL
|
||||
assert len(task["media"]) == 0
|
||||
|
||||
@patch("worker.main.is_group_invalid_for_user", return_value=True)
|
||||
def test_raise_invalid(self, m_is_group, worker_init):
|
||||
def test_raise_invalid(self):
|
||||
from worker.main import create_archive_task
|
||||
with pytest.raises(Exception):
|
||||
create_archive_task(self.archive.model_dump_json())
|
||||
|
||||
@patch("worker.main.insert_result_into_db", side_effect=Exception)
|
||||
@patch("worker.main.is_group_invalid_for_user", return_value=False)
|
||||
# @patch("worker.main.choose_orchestrator")
|
||||
def test_raise_db_error(self, m_choose, m_is_group, m_insert, worker_init):
|
||||
@patch("worker.main.load_orchestrator")
|
||||
def test_raise_db_error(self, m_load, m_insert):
|
||||
from worker.main import create_archive_task
|
||||
mock_orchestrator = self.mock_orchestrator_choice(m_choose)
|
||||
mock_orchestrator = self.mock_orchestrator_choice(m_load)
|
||||
|
||||
with pytest.raises(Exception):
|
||||
create_archive_task(self.archive.model_dump_json())
|
||||
mock_orchestrator.feed_item.assert_called_once()
|
||||
|
||||
def mock_orchestrator_choice(self, m_choose):
|
||||
|
||||
@patch("worker.main.insert_result_into_db", return_value=None)
|
||||
@patch("worker.main.load_orchestrator")
|
||||
def test_raise_empty_result(self, m_load, m_insert):
|
||||
from worker.main import create_archive_task
|
||||
mock_orchestrator = self.mock_orchestrator_choice(m_load)
|
||||
|
||||
with pytest.raises(Exception) as e:
|
||||
create_archive_task(self.archive.model_dump_json())
|
||||
assert "UNABLE TO archive" in str(e)
|
||||
mock_orchestrator.feed_item.assert_called_once()
|
||||
|
||||
def mock_orchestrator_choice(self, m_load):
|
||||
mock_orchestrator = mock.MagicMock()
|
||||
mock_orchestrator.configure_mock(feed_item=mock.MagicMock(return_value=Metadata().set_url(self.URL).success()))
|
||||
m_choose.return_value = mock_orchestrator
|
||||
m_load.return_value = mock_orchestrator
|
||||
return mock_orchestrator
|
||||
|
||||
|
||||
class Test_create_sheet_task():
|
||||
URL = "https://example-live.com"
|
||||
sheet = schemas.SubmitSheet(sheet_id="123", author_id="rick@example.com", group_id=None)
|
||||
sheet = schemas.SubmitSheet(sheet_id="123", author_id="rick@example.com", group_id="interstellar", tags=["spaceship"])
|
||||
|
||||
# @patch("worker.main.insert_result_into_db")
|
||||
@patch("worker.main.models.generate_uuid", return_value="constant-uuid")
|
||||
@patch("worker.main.is_group_invalid_for_user", return_value=False)
|
||||
@patch("worker.main.ArchivingOrchestrator")
|
||||
def test_success(self, m_orch_generator, m_is_group, m_uuid, worker_init, db_session):
|
||||
@patch("worker.main.get_store_until", return_value=datetime.now())
|
||||
@patch("worker.main.load_orchestrator")
|
||||
def test_success(self, m_load, m_store, m_uuid, db_session):
|
||||
from worker.main import create_sheet_task
|
||||
|
||||
assert db_session.query(models.Archive).filter(models.Archive.url == self.URL).count() == 0
|
||||
@@ -79,50 +86,35 @@ class Test_create_sheet_task():
|
||||
mock_metadata.add_media(Media("fn1.txt", urls=["outcome1.com"]))
|
||||
m_orch = MagicMock()
|
||||
m_orch.feed.return_value = iter([False, mock_metadata, mock_metadata])
|
||||
m_orch_generator.return_value = m_orch
|
||||
m_load.return_value = m_orch
|
||||
|
||||
res = create_sheet_task(self.sheet.model_dump_json())
|
||||
assert res["archived"] == 1
|
||||
assert res["failed"] == 0
|
||||
assert len(res["errors"]) == 0
|
||||
assert res["sheet"] == "Sheet"
|
||||
|
||||
m_load.assert_called_once_with("interstellar", True, {'configurations': {'gsheet_feeder': {'sheet_id': '123'}}})
|
||||
m_orch.feed.assert_called_once()
|
||||
m_store.assert_called_with("interstellar")
|
||||
m_store.call_count == 2
|
||||
m_uuid.call_count == 2
|
||||
assert type(res) == dict
|
||||
assert res["stats"]["archived"] == 1
|
||||
assert res["stats"]["failed"] == 1
|
||||
assert len(res["stats"]["errors"]) == 1
|
||||
assert res["sheet_id"] == "123"
|
||||
assert res["success"] == True
|
||||
assert len(res["time"]) > 0
|
||||
assert res["success"]
|
||||
assert type(res["time"]) == datetime
|
||||
|
||||
# query created archive entry
|
||||
inserted = db_session.query(models.Archive).filter(models.Archive.url == self.URL).one()
|
||||
assert inserted is not None
|
||||
assert inserted.url == self.URL
|
||||
assert inserted.tags[0].id == "gsheet"
|
||||
|
||||
@patch("worker.main.insert_result_into_db", side_effect=Exception("some-error"))
|
||||
@patch("worker.main.models.generate_uuid", return_value="constant-uuid")
|
||||
@patch("worker.main.is_group_invalid_for_user", return_value=False)
|
||||
@patch("worker.main.ArchivingOrchestrator")
|
||||
def test_has_exception(self, m_orch_generator, m_is_group, m_uuid, worker_init, db_session):
|
||||
from worker.main import create_sheet_task
|
||||
|
||||
assert db_session.query(models.Archive).filter(models.Archive.url == self.URL).count() == 0
|
||||
|
||||
mock_metadata = Metadata().set_url(self.URL).success()
|
||||
mock_metadata.add_media(Media("fn1.txt", urls=["outcome1.com"]))
|
||||
m_orch = MagicMock()
|
||||
m_orch.feed.return_value = iter([mock_metadata])
|
||||
m_orch_generator.return_value = m_orch
|
||||
|
||||
res = create_sheet_task(self.sheet.model_dump_json())
|
||||
print(res)
|
||||
assert res["archived"] == 0
|
||||
assert res["failed"] == 1
|
||||
assert res["errors"] == ["some-error"]
|
||||
assert res["sheet_id"] == "123"
|
||||
assert res["success"] == True
|
||||
|
||||
assert db_session.query(models.Archive).filter(models.Archive.url == self.URL).count() == 0
|
||||
assert len(inserted.tags) == 1
|
||||
assert inserted.tags[0].id == "spaceship"
|
||||
assert inserted.group_id == "interstellar"
|
||||
assert inserted.author_id == "rick@example.com"
|
||||
assert inserted.public == False
|
||||
|
||||
|
||||
def test_get_all_urls(worker_init, db_session):
|
||||
def test_get_all_urls(db_session):
|
||||
from worker.main import get_all_urls
|
||||
from auto_archiver import Metadata
|
||||
|
||||
|
||||
Reference in New Issue
Block a user