mirror of
https://github.com/bellingcat/auto-archiver-api.git
synced 2026-06-11 13:08:34 +03:00
minor refactor and user_state tests
This commit is contained in:
@@ -29,9 +29,9 @@ def mock_settings():
|
||||
def test_db(get_settings: Settings):
|
||||
from app.shared.db import models
|
||||
from app.shared.db.database import make_engine
|
||||
from app.web.db.crud import get_user_groups
|
||||
from app.web.db.crud import get_user_group_names
|
||||
|
||||
get_user_groups.cache_clear()
|
||||
get_user_group_names.cache_clear()
|
||||
make_engine.cache_clear()
|
||||
engine = make_engine(get_settings.DATABASE_PATH)
|
||||
|
||||
|
||||
@@ -231,47 +231,6 @@ def test_count_by_users_since(test_data, db_session):
|
||||
assert cu[2].total == 33
|
||||
|
||||
|
||||
def test_is_user_in_group(test_data, db_session):
|
||||
from app.web.db import crud
|
||||
from app.web.config import ALLOW_ANY_EMAIL
|
||||
|
||||
# see user-groups.test.yaml
|
||||
test_pairs = [
|
||||
(ALLOW_ANY_EMAIL, "spaceship", True),
|
||||
(ALLOW_ANY_EMAIL, "non-existant!@#!%!", True),
|
||||
|
||||
("rick@example.com", "spaceship", True),
|
||||
("rick@example.com", "SPACESHIP", False),
|
||||
("rick@example.com", "interdimensional", True),
|
||||
("rick@example.com", "animated-characters", True),
|
||||
("rick@example.com", "the-jerrys-club", False),
|
||||
|
||||
("morty@example.com", "spaceship", True),
|
||||
("morty@example.com", "interdimensional", False),
|
||||
("morty@example.com", "the-jerrys-club", False),
|
||||
|
||||
("jerry@example.com", "spaceship", False),
|
||||
("jerry@example.com", "interdimensional", False),
|
||||
("jerry@example.com", "the-jerrys-club", False), # group not in 'groups'
|
||||
|
||||
("rick@example.com", "animated-characters", True),
|
||||
("morty@example.com", "animated-characters", True),
|
||||
("jerry@example.com", "animated-characters", True),
|
||||
("anyone@example.com", "animated-characters", True),
|
||||
("anyone@birdy.com", "animated-characters", True),
|
||||
|
||||
("summer@herself.com", "animated-characters", False),
|
||||
|
||||
("rick@example.com", "", False),
|
||||
("", "spaceship", False),
|
||||
("bademailexample.com", "spaceship", False),
|
||||
]
|
||||
for email, group, expected in test_pairs:
|
||||
print(f"{email} in {group} == {expected}")
|
||||
assert crud.is_user_in_group(email, group) == expected
|
||||
|
||||
|
||||
|
||||
def test_upsert_group(test_data, db_session):
|
||||
from app.web.db import crud
|
||||
|
||||
|
||||
433
app/tests/web/db/test_user_state.py
Normal file
433
app/tests/web/db/test_user_state.py
Normal file
@@ -0,0 +1,433 @@
|
||||
|
||||
from unittest.mock import MagicMock, PropertyMock, patch
|
||||
import pytest
|
||||
|
||||
from app.shared.db import models
|
||||
from app.shared.user_groups import GroupInfo, GroupPermissions
|
||||
from app.web.db.user_state import UserState
|
||||
|
||||
|
||||
def fresh_user_state():
|
||||
return UserState(None, email="test@example.com")
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def user_state():
|
||||
return fresh_user_state()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def user_state_with_groups(user_state):
|
||||
user_groups = [
|
||||
models.Group(id="no-permissions", permissions={}),
|
||||
models.Group(id="group1", description="this is g1", service_account_email="sa1@example.com", permissions={"read": ["group1", "no-permissions"], "read_public": True, "archive_url": True, "archive_sheet": True, "max_archive_lifespan_months": 24, "max_monthly_urls": 100, "max_monthly_mbs": 1000, "priority": "high"}),
|
||||
models.Group(id="group2", description="this is g2", service_account_email="sa2@example.com", permissions={"read": ["all"], "read_public": True, "archive_url": False, "archive_sheet": False, "max_archive_lifespan_months": -1, "max_monthly_urls": -1, "max_monthly_mbs": -1, "priority": "low", "sheet_frequency": {"daily"}}),
|
||||
]
|
||||
|
||||
with patch.object(UserState, 'user_groups', new_callable=PropertyMock, return_value=user_groups):
|
||||
yield user_state
|
||||
|
||||
|
||||
def test_permissions(user_state_with_groups):
|
||||
permissions = user_state_with_groups.permissions
|
||||
|
||||
assert permissions["all"].read == True
|
||||
assert permissions["all"].read_public == True
|
||||
assert permissions["all"].archive_url == True
|
||||
assert permissions["all"].archive_sheet == True
|
||||
assert permissions["all"].max_archive_lifespan_months == -1
|
||||
assert permissions["all"].max_monthly_urls == -1
|
||||
assert permissions["all"].max_monthly_mbs == -1
|
||||
assert permissions["all"].priority == "high"
|
||||
|
||||
assert permissions["group1"].read == set(["group1", "no-permissions"])
|
||||
assert permissions["group1"].read_public == True
|
||||
assert permissions["group1"].archive_url == True
|
||||
assert permissions["group1"].archive_sheet == True
|
||||
assert permissions["group1"].max_archive_lifespan_months == 24
|
||||
assert permissions["group1"].max_monthly_urls == 100
|
||||
assert permissions["group1"].max_monthly_mbs == 1000
|
||||
assert permissions["group1"].priority == "high"
|
||||
|
||||
assert permissions["group2"].read == set(["all"])
|
||||
assert permissions["group2"].read_public == True
|
||||
assert permissions["group2"].archive_url == False
|
||||
assert permissions["group2"].archive_sheet == False
|
||||
assert permissions["group2"].max_archive_lifespan_months == -1
|
||||
assert permissions["group2"].max_monthly_urls == -1
|
||||
assert permissions["group2"].max_monthly_mbs == -1
|
||||
assert permissions["group2"].priority == "low"
|
||||
|
||||
assert len(permissions) == 3
|
||||
|
||||
|
||||
def test_user_groups_names(user_state):
|
||||
with patch('app.web.db.crud.get_user_group_names', return_value=["group1", "group2"]) as mock:
|
||||
assert user_state.user_groups_names == ["group1", "group2", "default"]
|
||||
mock.assert_called_once_with(None, "test@example.com")
|
||||
|
||||
|
||||
def test_user_groups(user_state):
|
||||
with patch('app.web.db.crud.get_user_groups_by_name', return_value=[MagicMock(), MagicMock()]) as mock:
|
||||
user_state._user_groups_names = ["group1", "group2"]
|
||||
assert len(user_state.user_groups) == 2
|
||||
mock.assert_called_once_with(None, ["group1", "group2"])
|
||||
|
||||
|
||||
def test_read():
|
||||
us = fresh_user_state()
|
||||
|
||||
with patch.object(UserState, 'user_groups', new_callable=PropertyMock, return_value=[models.Group(id="no-permissions", permissions={})]) as mock:
|
||||
assert not hasattr(us, "_read")
|
||||
assert us.read == set()
|
||||
assert us._read == set()
|
||||
mock.assert_called_once()
|
||||
|
||||
us = fresh_user_state()
|
||||
with patch.object(UserState, 'user_groups', new_callable=PropertyMock, return_value=[models.Group(id="group1", permissions={"read": ["group1", "no-permissions"]})]):
|
||||
assert us.read == set(["group1", "no-permissions"])
|
||||
|
||||
us = fresh_user_state()
|
||||
with patch.object(UserState, 'user_groups', new_callable=PropertyMock, return_value=[models.Group(id="group1", permissions={"read": ["all"]})]):
|
||||
assert us.read == True
|
||||
|
||||
|
||||
def test_read_public():
|
||||
us = fresh_user_state()
|
||||
|
||||
with patch.object(UserState, 'user_groups', new_callable=PropertyMock, return_value=[models.Group(id="no-permissions", permissions={})]) as mock:
|
||||
assert not hasattr(us, "_read_public")
|
||||
assert us.read_public == False
|
||||
assert us._read_public == False
|
||||
mock.assert_called_once()
|
||||
# no new calls
|
||||
assert us.read_public == False
|
||||
mock.assert_called_once()
|
||||
|
||||
us = fresh_user_state()
|
||||
with patch.object(UserState, 'user_groups', new_callable=PropertyMock, return_value=[models.Group(id="group1", permissions={"read_public": True})]):
|
||||
assert us.read_public == True
|
||||
|
||||
us = fresh_user_state()
|
||||
with patch.object(UserState, 'user_groups', new_callable=PropertyMock, return_value=[models.Group(id="group1", permissions={"read_public": False})]):
|
||||
assert us.read_public == False
|
||||
|
||||
|
||||
def test_archive_url():
|
||||
us = fresh_user_state()
|
||||
with patch.object(UserState, 'user_groups', new_callable=PropertyMock, return_value=[models.Group(id="no-permissions", permissions={})]) as mock:
|
||||
assert not hasattr(us, "_archive_url")
|
||||
assert us.archive_url == False
|
||||
assert us._archive_url == False
|
||||
mock.assert_called_once()
|
||||
# no new calls
|
||||
assert us.archive_url == False
|
||||
mock.assert_called_once()
|
||||
|
||||
us = fresh_user_state()
|
||||
with patch.object(UserState, 'user_groups', new_callable=PropertyMock, return_value=[models.Group(id="group1", permissions={"archive_url": False})]):
|
||||
assert us.archive_url == False
|
||||
|
||||
us = fresh_user_state()
|
||||
with patch.object(UserState, 'user_groups', new_callable=PropertyMock, return_value=[models.Group(id="group1", permissions={"archive_url": True})]):
|
||||
assert us.archive_url == True
|
||||
|
||||
|
||||
def test_archive_sheet():
|
||||
us = fresh_user_state()
|
||||
with patch.object(UserState, 'user_groups', new_callable=PropertyMock, return_value=[models.Group(id="no-permissions", permissions={})]) as mock:
|
||||
assert not hasattr(us, "_archive_sheet")
|
||||
assert us.archive_sheet == False
|
||||
assert us._archive_sheet == False
|
||||
mock.assert_called_once()
|
||||
# no new calls
|
||||
assert us.archive_sheet == False
|
||||
mock.assert_called_once()
|
||||
|
||||
us = fresh_user_state()
|
||||
with patch.object(UserState, 'user_groups', new_callable=PropertyMock, return_value=[models.Group(id="group1", permissions={"archive_sheet": False})]):
|
||||
assert us.archive_sheet == False
|
||||
|
||||
us = fresh_user_state()
|
||||
with patch.object(UserState, 'user_groups', new_callable=PropertyMock, return_value=[models.Group(id="group1", permissions={"archive_sheet": True})]):
|
||||
assert us.archive_sheet == True
|
||||
|
||||
|
||||
def test_sheet_frequency():
|
||||
us = fresh_user_state()
|
||||
with patch.object(UserState, 'user_groups', new_callable=PropertyMock, return_value=[models.Group(id="no-permissions", permissions={})]) as mock:
|
||||
assert not hasattr(us, "_sheet_frequency")
|
||||
assert us.sheet_frequency == set()
|
||||
assert us._sheet_frequency == set()
|
||||
mock.assert_called_once()
|
||||
# no new calls
|
||||
assert us.sheet_frequency == set()
|
||||
mock.assert_called_once()
|
||||
|
||||
us = fresh_user_state()
|
||||
with patch.object(UserState, 'user_groups', new_callable=PropertyMock, return_value=[models.Group(id="group1", permissions={"sheet_frequency": ["daily", "hourly"]})]):
|
||||
assert us.sheet_frequency == {"daily", "hourly"}
|
||||
|
||||
us = fresh_user_state()
|
||||
with patch.object(UserState, 'user_groups', new_callable=PropertyMock, return_value=[models.Group(id="group1", permissions={"sheet_frequency": []})]):
|
||||
assert us.sheet_frequency == set()
|
||||
|
||||
|
||||
def test_max_archive_lifespan_months():
|
||||
us = fresh_user_state()
|
||||
default = GroupPermissions.model_fields["max_archive_lifespan_months"].default
|
||||
with patch.object(UserState, 'user_groups', new_callable=PropertyMock, return_value=[models.Group(id="no-permissions", permissions={})]) as mock:
|
||||
assert not hasattr(us, "_max_archive_lifespan_months")
|
||||
assert us.max_archive_lifespan_months == default
|
||||
assert us._max_archive_lifespan_months == default
|
||||
mock.assert_called_once()
|
||||
# no new calls
|
||||
assert us.max_archive_lifespan_months == default
|
||||
mock.assert_called_once()
|
||||
|
||||
us = fresh_user_state()
|
||||
with patch.object(UserState, 'user_groups', new_callable=PropertyMock, return_value=[models.Group(id="group1", permissions={"max_archive_lifespan_months": 24})]):
|
||||
assert us.max_archive_lifespan_months == 24
|
||||
|
||||
us = fresh_user_state()
|
||||
with patch.object(UserState, 'user_groups', new_callable=PropertyMock, return_value=[models.Group(id="group1", permissions={"max_archive_lifespan_months": 150}), models.Group(id="group2", permissions={"max_archive_lifespan_months": -1})]):
|
||||
assert us.max_archive_lifespan_months == -1
|
||||
|
||||
|
||||
def test_max_monthly_urls():
|
||||
us = fresh_user_state()
|
||||
default = GroupPermissions.model_fields["max_monthly_urls"].default
|
||||
with patch.object(UserState, 'user_groups', new_callable=PropertyMock, return_value=[models.Group(id="no-permissions", permissions={})]) as mock:
|
||||
assert not hasattr(us, "_max_monthly_urls")
|
||||
assert us.max_monthly_urls == default
|
||||
assert us._max_monthly_urls == default
|
||||
mock.assert_called_once()
|
||||
# no new calls
|
||||
assert us.max_monthly_urls == default
|
||||
mock.assert_called_once()
|
||||
|
||||
us = fresh_user_state()
|
||||
with patch.object(UserState, 'user_groups', new_callable=PropertyMock, return_value=[models.Group(id="group1", permissions={"max_monthly_urls": 100})]):
|
||||
assert us.max_monthly_urls == 100
|
||||
|
||||
us = fresh_user_state()
|
||||
with patch.object(UserState, 'user_groups', new_callable=PropertyMock, return_value=[models.Group(id="group1", permissions={"max_monthly_urls": 150}), models.Group(id="group2", permissions={"max_monthly_urls": -1})]):
|
||||
assert us.max_monthly_urls == -1
|
||||
|
||||
|
||||
def test_max_monthly_mbs():
|
||||
us = fresh_user_state()
|
||||
default = GroupPermissions.model_fields["max_monthly_mbs"].default
|
||||
with patch.object(UserState, 'user_groups', new_callable=PropertyMock, return_value=[models.Group(id="no-permissions", permissions={})]) as mock:
|
||||
assert not hasattr(us, "_max_monthly_mbs")
|
||||
assert us.max_monthly_mbs == default
|
||||
assert us._max_monthly_mbs == default
|
||||
mock.assert_called_once()
|
||||
# no new calls
|
||||
assert us.max_monthly_mbs == default
|
||||
mock.assert_called_once()
|
||||
|
||||
us = fresh_user_state()
|
||||
with patch.object(UserState, 'user_groups', new_callable=PropertyMock, return_value=[models.Group(id="group1", permissions={"max_monthly_mbs": 1000})]):
|
||||
assert us.max_monthly_mbs == 1000
|
||||
|
||||
us = fresh_user_state()
|
||||
with patch.object(UserState, 'user_groups', new_callable=PropertyMock, return_value=[models.Group(id="group1", permissions={"max_monthly_mbs": 1500}), models.Group(id="group2", permissions={"max_monthly_mbs": -1})]):
|
||||
assert us.max_monthly_mbs == -1
|
||||
|
||||
|
||||
def test_priority(user_state):
|
||||
default = GroupPermissions.model_fields["priority"].default
|
||||
with patch.object(UserState, 'user_groups', new_callable=PropertyMock, return_value=[models.Group(id="no-permissions", permissions={})]) as mock:
|
||||
assert not hasattr(user_state, "_priority")
|
||||
assert user_state.priority == default
|
||||
assert user_state._priority == default
|
||||
mock.assert_called_once()
|
||||
# no new calls
|
||||
assert user_state.priority == default
|
||||
mock.assert_called_once()
|
||||
|
||||
us = fresh_user_state()
|
||||
with patch.object(UserState, 'user_groups', new_callable=PropertyMock, return_value=[models.Group(id="group1", permissions={"priority": "high"})]):
|
||||
assert us.priority == "high"
|
||||
|
||||
us = fresh_user_state()
|
||||
with patch.object(UserState, 'user_groups', new_callable=PropertyMock, return_value=[models.Group(id="group1", permissions={"priority": "low"}), models.Group(id="group2", permissions={"priority": "medium"})]):
|
||||
assert us.priority == "low"
|
||||
|
||||
|
||||
def test_active():
|
||||
for read, read_public, archive_url, archive_sheet, is_active in [
|
||||
(False, False, False, False, False),
|
||||
(True, False, False, False, True),
|
||||
(False, True, False, False, True),
|
||||
(False, False, True, False, True),
|
||||
(False, False, False, True, True)
|
||||
]:
|
||||
us = fresh_user_state()
|
||||
with patch.object(UserState, 'read', new_callable=PropertyMock, return_value=read), \
|
||||
patch.object(UserState, 'read_public', new_callable=PropertyMock, return_value=read_public), \
|
||||
patch.object(UserState, 'archive_url', new_callable=PropertyMock, return_value=archive_url), \
|
||||
patch.object(UserState, 'archive_sheet', new_callable=PropertyMock, return_value=archive_sheet):
|
||||
assert us.active == is_active
|
||||
|
||||
|
||||
def test_in_group(user_state):
|
||||
with patch.object(UserState, 'user_groups_names', new_callable=PropertyMock, return_value=["group1", "group2"]):
|
||||
assert user_state.in_group("group1") == True
|
||||
assert user_state.in_group("group2") == True
|
||||
assert user_state.in_group("group3") == False
|
||||
|
||||
|
||||
def test_usage(db_session):
|
||||
user_state = UserState(db_session, email="test@example.com")
|
||||
user_sheets = [
|
||||
MagicMock(group_id="group1", sheet_count=5),
|
||||
MagicMock(group_id="group2", sheet_count=10),
|
||||
MagicMock(group_id="group3", sheet_count=100),
|
||||
]
|
||||
bytes = [1000000, 2000000, 3000000]
|
||||
urls_by_group = [
|
||||
MagicMock(group_id="group1", url_count=50, total_bytes=bytes[0]),
|
||||
MagicMock(group_id="group2", url_count=100, total_bytes=bytes[1]),
|
||||
MagicMock(group_id="group4", url_count=5, total_bytes=bytes[2]),
|
||||
]
|
||||
megabytes = int(sum(bytes) / 1024 / 1024)
|
||||
|
||||
with patch.object(db_session, 'query', side_effect=[
|
||||
MagicMock(filter=MagicMock(return_value=MagicMock(group_by=MagicMock(return_value=MagicMock(all=MagicMock(return_value=user_sheets)))))),
|
||||
MagicMock(filter=MagicMock(return_value=MagicMock(group_by=MagicMock(return_value=MagicMock(all=MagicMock(return_value=urls_by_group))))))
|
||||
]):
|
||||
usage_response = user_state.usage()
|
||||
|
||||
assert usage_response.monthly_urls == 155
|
||||
assert usage_response.monthly_mbs == megabytes
|
||||
assert usage_response.total_sheets == 115
|
||||
|
||||
assert usage_response.groups["group1"].monthly_urls == 50
|
||||
assert usage_response.groups["group1"].monthly_mbs == int(bytes[0] / 1024 / 1024)
|
||||
assert usage_response.groups["group1"].total_sheets == 5
|
||||
|
||||
assert usage_response.groups["group2"].monthly_urls == 100
|
||||
assert usage_response.groups["group2"].monthly_mbs == int(bytes[1] / 1024 / 1024)
|
||||
assert usage_response.groups["group2"].total_sheets == 10
|
||||
|
||||
assert usage_response.groups["group3"].monthly_urls == 0
|
||||
assert usage_response.groups["group3"].monthly_mbs == 0
|
||||
assert usage_response.groups["group3"].total_sheets == 100
|
||||
|
||||
assert usage_response.groups["group4"].monthly_urls == 5
|
||||
assert usage_response.groups["group4"].monthly_mbs == int(bytes[2] / 1024 / 1024)
|
||||
assert usage_response.groups["group4"].total_sheets == 0
|
||||
|
||||
|
||||
def test_has_quota_monthly_sheets(db_session):
|
||||
us = UserState(db_session, email="test@example.com")
|
||||
|
||||
test_cases = [
|
||||
({"unkonwn": GroupInfo(max_sheets=5)}, 1, False),
|
||||
({"group1": GroupInfo(max_sheets=-1)}, 1000, True),
|
||||
({"group1": GroupInfo(max_sheets=5)}, 3, True),
|
||||
({"group1": GroupInfo(max_sheets=5)}, 5, False),
|
||||
({"group1": GroupInfo(max_sheets=5)}, 6, False),
|
||||
]
|
||||
|
||||
for permissions, count, expected in test_cases:
|
||||
with patch.object(UserState, 'permissions', new_callable=PropertyMock, return_value=permissions):
|
||||
with patch.object(us.db, 'query', return_value=MagicMock(filter=MagicMock(return_value=MagicMock(count=MagicMock(return_value=count))))):
|
||||
assert us.has_quota_monthly_sheets("group1") == expected
|
||||
|
||||
|
||||
def test_has_quota_max_monthly_urls(db_session):
|
||||
us = UserState(db_session, email="test@example.com")
|
||||
|
||||
test_cases = [
|
||||
({"group1": GroupInfo(max_monthly_urls=-1)}, 1000, True),
|
||||
({"group1": GroupInfo(max_monthly_urls=100)}, 50, True),
|
||||
({"group1": GroupInfo(max_monthly_urls=100)}, 100, False),
|
||||
({"group1": GroupInfo(max_monthly_urls=100)}, 150, False),
|
||||
]
|
||||
|
||||
for permissions, count, expected in test_cases:
|
||||
with patch.object(UserState, 'permissions', new_callable=PropertyMock, return_value=permissions):
|
||||
with patch.object(us.db, 'query', return_value=MagicMock(filter=MagicMock(return_value=MagicMock(count=MagicMock(return_value=count))))):
|
||||
assert us.has_quota_max_monthly_urls("group1") == expected
|
||||
test_cases = [
|
||||
(-1, 1000, True),
|
||||
(100, 50, True),
|
||||
(100, 100, False),
|
||||
(100, 150, False),
|
||||
]
|
||||
|
||||
for max_urls, count, expected in test_cases:
|
||||
with patch.object(UserState, 'max_monthly_urls', new_callable=PropertyMock, return_value=max_urls):
|
||||
with patch.object(us.db, 'query', return_value=MagicMock(filter=MagicMock(return_value=MagicMock(count=MagicMock(return_value=count))))):
|
||||
assert us.has_quota_max_monthly_urls("") == expected
|
||||
|
||||
|
||||
def test_has_quota_max_monthly_mbs(db_session):
|
||||
us = UserState(db_session, email="test@example.com")
|
||||
|
||||
test_cases = [
|
||||
({"group1": GroupInfo(max_monthly_mbs=-1)}, 1000, True),
|
||||
({"group1": GroupInfo(max_monthly_mbs=100)}, 50, True),
|
||||
({"group1": GroupInfo(max_monthly_mbs=100)}, 100, False),
|
||||
({"group1": GroupInfo(max_monthly_mbs=100)}, 150, False),
|
||||
]
|
||||
|
||||
for permissions, mbs, expected in test_cases:
|
||||
with patch.object(UserState, 'permissions', new_callable=PropertyMock, return_value=permissions):
|
||||
with patch.object(us.db, 'query', return_value=MagicMock(filter=MagicMock(return_value=MagicMock(with_entities=MagicMock(return_value=MagicMock(scalar=MagicMock(return_value=mbs * 1024 * 1024))))))):
|
||||
assert us.has_quota_max_monthly_mbs("group1") == expected
|
||||
|
||||
test_cases = [
|
||||
(-1, 1000, True),
|
||||
(100, 50, True),
|
||||
(100, 100, False),
|
||||
(100, 150, False),
|
||||
]
|
||||
|
||||
for max_mbs, mbs, expected in test_cases:
|
||||
with patch.object(UserState, 'max_monthly_mbs', new_callable=PropertyMock, return_value=max_mbs):
|
||||
with patch.object(us.db, 'query', return_value=MagicMock(filter=MagicMock(return_value=MagicMock(with_entities=MagicMock(return_value=MagicMock(scalar=MagicMock(return_value=mbs * 1024 * 1024))))))):
|
||||
assert us.has_quota_max_monthly_mbs("") == expected
|
||||
|
||||
|
||||
def test_can_manually_trigger(user_state):
|
||||
permissions = {
|
||||
"group1": GroupInfo(manually_trigger_sheet=True),
|
||||
"group2": GroupInfo(manually_trigger_sheet=False),
|
||||
}
|
||||
|
||||
with patch.object(UserState, 'permissions', new_callable=PropertyMock, return_value=permissions):
|
||||
assert user_state.can_manually_trigger("group1") == True
|
||||
assert user_state.can_manually_trigger("group2") == False
|
||||
assert user_state.can_manually_trigger("group3") == False
|
||||
|
||||
|
||||
def test_is_sheet_frequency_allowed(user_state):
|
||||
permissions = {
|
||||
"group1": GroupInfo(sheet_frequency={"daily", "hourly"}),
|
||||
"group2": GroupInfo(sheet_frequency={"daily"}),
|
||||
}
|
||||
|
||||
with patch.object(UserState, 'permissions', new_callable=PropertyMock, return_value=permissions):
|
||||
assert user_state.is_sheet_frequency_allowed("group1", "daily") == True
|
||||
assert user_state.is_sheet_frequency_allowed("group1", "hourly") == True
|
||||
assert user_state.is_sheet_frequency_allowed("group1", "weekly") == False
|
||||
assert user_state.is_sheet_frequency_allowed("group2", "hourly") == False
|
||||
assert user_state.is_sheet_frequency_allowed("group2", "daily") == True
|
||||
assert user_state.is_sheet_frequency_allowed("group3", "daily") == False
|
||||
|
||||
|
||||
def test_priority_group(user_state):
|
||||
from app.web.utils.misc import convert_priority_to_queue_dict
|
||||
with patch.object(UserState, 'user_groups', new_callable=PropertyMock, return_value=[
|
||||
models.Group(id="group1", permissions={"priority": "high"}),
|
||||
models.Group(id="group2", permissions={"priority": "medium"}),
|
||||
models.Group(id="group3", permissions={"priority": "low"}),
|
||||
]):
|
||||
assert user_state.priority_group("group1") == convert_priority_to_queue_dict("high")
|
||||
assert user_state.priority_group("group2") == convert_priority_to_queue_dict("medium")
|
||||
assert user_state.priority_group("group3") == convert_priority_to_queue_dict("low")
|
||||
assert user_state.priority_group("group4") == convert_priority_to_queue_dict("low")
|
||||
@@ -5,15 +5,17 @@ from sqlalchemy import Column, or_, func, select
|
||||
from loguru import logger
|
||||
from datetime import datetime, timedelta
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from cachetools import LRUCache, cached
|
||||
from cachetools.keys import hashkey
|
||||
|
||||
from app.web.config import ALLOW_ANY_EMAIL
|
||||
from app.shared.db.database import get_db
|
||||
from app.shared.db import models
|
||||
from app.shared.settings import get_settings
|
||||
from app.shared.user_groups import UserGroups
|
||||
from app.shared.utils.misc import fnv1a_hash_mod
|
||||
from app.web.utils.misc import convert_priority_to_queue_dict
|
||||
|
||||
|
||||
DATABASE_QUERY_LIMIT = get_settings().DATABASE_QUERY_LIMIT
|
||||
|
||||
|
||||
@@ -33,7 +35,7 @@ def base_query(db: Session):
|
||||
def get_archive(db: Session, id: str, email: str):
|
||||
query = base_query(db).filter(models.Archive.id == id)
|
||||
if email != ALLOW_ANY_EMAIL:
|
||||
groups = get_user_groups(email)
|
||||
groups = get_user_group_names(db ,email)
|
||||
query = query.filter(or_(models.Archive.public == True, models.Archive.author_id == email, models.Archive.group_id.in_(groups)))
|
||||
return query.first()
|
||||
|
||||
@@ -42,7 +44,7 @@ def search_archives_by_url(db: Session, url: str, email: str, skip: int = 0, lim
|
||||
# searches for partial URLs, if email is * no ownership filtering happens
|
||||
query = base_query(db)
|
||||
if email != ALLOW_ANY_EMAIL:
|
||||
groups = get_user_groups(email)
|
||||
groups = get_user_group_names(db, email)
|
||||
query = query.filter(or_(models.Archive.public == True, models.Archive.author_id == email, models.Archive.group_id.in_(groups)))
|
||||
if absolute_search:
|
||||
query = query.filter(models.Archive.url == url)
|
||||
@@ -108,38 +110,39 @@ async def soft_delete_expired_archives(db: AsyncSession) -> dict:
|
||||
# --------------- TAG
|
||||
|
||||
|
||||
def is_user_in_group(email: str, group_name: str) -> models.Group:
|
||||
if email == ALLOW_ANY_EMAIL: return True
|
||||
return len(group_name) and len(email) and group_name in get_user_groups(email)
|
||||
|
||||
|
||||
async def get_group_priority_async(db: AsyncSession, group_id: str) -> dict:
|
||||
db_group = await db.get(models.Group, group_id)
|
||||
priority = db_group.permissions.get("priority", "low") if db_group else "low"
|
||||
return convert_priority_to_queue_dict(priority)
|
||||
|
||||
@lru_cache
|
||||
def get_user_groups(email: str) -> list[str]:
|
||||
|
||||
@cached(cache=LRUCache(maxsize=128), key=lambda db, email: hashkey(email))
|
||||
def get_user_group_names(db: Session, email: str) -> list[str]:
|
||||
"""
|
||||
given an email retrieves the user groups from the DB and then the email-domain groups from a global variable, the email does not need to belong to an existing user.
|
||||
"""
|
||||
if not email or not len(email) or "@" not in email: return []
|
||||
|
||||
with get_db() as db:
|
||||
# get user groups
|
||||
user_groups = db.query(models.association_table_user_groups).filter_by(user_id=email).with_entities(Column("group_id")).all()
|
||||
user_level_groups_names = [g[0] for g in user_groups]
|
||||
# get user groups
|
||||
user_groups = db.query(models.association_table_user_groups).filter_by(user_id=email).with_entities(Column("group_id")).all()
|
||||
user_level_groups_names = [g[0] for g in user_groups]
|
||||
|
||||
# get domain groups
|
||||
domain = email.split('@')[1]
|
||||
domain_level_groups = db.query(models.Group.id).filter(models.Group.domains.contains(domain)).with_entities(Column("id")).all()
|
||||
domain_level_groups_names = [g[0] for g in domain_level_groups]
|
||||
# get domain groups
|
||||
domain = email.split('@')[1]
|
||||
domain_level_groups = db.query(models.Group.id).filter(models.Group.domains.contains(domain)).with_entities(Column("id")).all()
|
||||
domain_level_groups_names = [g[0] for g in domain_level_groups]
|
||||
|
||||
return list(set(user_level_groups_names + domain_level_groups_names))
|
||||
return list(set(user_level_groups_names + domain_level_groups_names))
|
||||
|
||||
|
||||
def get_user_groups_by_name(db: Session, groups: list[str]) -> list[models.Group]:
|
||||
return db.query(models.Group).filter(
|
||||
models.Group.id.in_(groups)
|
||||
).all()
|
||||
|
||||
# --------------- INIT User-Groups
|
||||
|
||||
|
||||
def upsert_group(db: Session, group_name: str, description: str, orchestrator: str, orchestrator_sheet: str, service_account_email: str, permissions: dict, domains: list) -> models.Group:
|
||||
db_group = db.query(models.Group).filter(models.Group.id == group_name).first()
|
||||
if db_group is None:
|
||||
|
||||
@@ -47,15 +47,13 @@ class UserState:
|
||||
@property
|
||||
def user_groups_names(self):
|
||||
if not hasattr(self, '_user_groups_names'):
|
||||
self._user_groups_names = crud.get_user_groups(self.email) + ["default"]
|
||||
self._user_groups_names = crud.get_user_group_names(self.db, self.email) + ["default"]
|
||||
return self._user_groups_names
|
||||
|
||||
@property
|
||||
def user_groups(self):
|
||||
if not hasattr(self, '_user_groups'):
|
||||
self._user_groups = self.db.query(models.Group).filter(
|
||||
models.Group.id.in_(self.user_groups_names)
|
||||
).all()
|
||||
self._user_groups = crud.get_user_groups_by_name(self.db, self.user_groups_names)
|
||||
return self._user_groups
|
||||
|
||||
@property
|
||||
@@ -150,8 +148,9 @@ class UserState:
|
||||
self._priority = "low"
|
||||
for group in self.user_groups:
|
||||
if not group.permissions: continue
|
||||
if group.permissions.get("priority", "low") == "high":
|
||||
if group.permissions.get("priority", self._priority) == "high":
|
||||
self._priority = "high"
|
||||
break
|
||||
return self._priority
|
||||
|
||||
@property
|
||||
|
||||
Reference in New Issue
Block a user