You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
411 lines
11 KiB
411 lines
11 KiB
import pytest |
|
|
|
|
|
@pytest.fixture
def example_order_data():
    """Provide raw argument tuples for two sample orders.

    Each tuple is meant to be unpacked positionally into ``OrderItem``
    (as the ``example_orders`` fixture does).  The first describes an open
    solvent order, the second a completed order for disposables.
    """
    from datetime import datetime

    from ordr3.models import OrderCategory, OrderStatus

    ethanol = (
        1,
        "Ethanol",
        "1-23",
        "VWR",
        OrderCategory.SOLVENT,
        "5 l",
        20.0,
        2,
        "EUR",
        "DFG",
        "urgent",
        datetime(2020, 2, 3, 15, 14, 13),
        "me",
        OrderStatus.OPEN,
    )
    gloves = (
        2,
        "Gloves",
        "12-3",
        "Carl Roth",
        OrderCategory.DISPOSABLE,
        "100 St.",
        40.0,
        5,
        "USD",
        "BMBF",
        "no comment",
        datetime(2020, 2, 4, 15, 14, 13),
        "you",
        OrderStatus.COMPLETED,
    )
    return [ethanol, gloves]
|
|
|
|
|
@pytest.fixture
def example_orders(example_order_data):
    """Build one ``OrderItem`` for every tuple in ``example_order_data``."""
    from ordr3.models import OrderItem

    orders = []
    for fields in example_order_data:
        orders.append(OrderItem(*fields))
    return orders
|
|
|
|
|
@pytest.fixture
def example_consumables(example_order_data):
    """Provide six orders of increasing age for the consumables tests.

    The two sample data tuples are repeated three times.  Every resulting
    ``OrderItem`` gets a sequential id (starting at 1) and a creation date
    that lies ``id * 150`` days in the past.
    """
    from datetime import datetime, timedelta, timezone

    from ordr3.models import OrderItem

    # ``datetime.utcnow()`` is deprecated since Python 3.12.  Take an aware
    # "now" and strip the tzinfo so the value stays a naive UTC timestamp,
    # exactly like before.  It is taken once so that all offsets are
    # relative to the same instant.
    now = datetime.now(timezone.utc).replace(tzinfo=None)

    consumables = []
    for number, fields in enumerate(example_order_data * 3, start=1):
        order = OrderItem(*fields)
        order.id = number
        order.created_on = now - timedelta(days=number * 150)
        consumables.append(order)

    return consumables
|
|
|
|
|
@pytest.fixture
def example_users():
    """Provide two sample users: an admin (id 1) and a regular user (id 2)."""
    from ordr3.models import User, UserRole

    admin = User(
        1, "You", "Jim", "Smith", "jim.smith", "abcd", UserRole.ADMIN
    )
    regular = User(
        2, "Me", "Jane", "Doe", "jane.doe", "1234", UserRole.USER
    )
    return [admin, regular]
|
|
|
|
|
@pytest.fixture
def example_tokens():
    """Provide two password reset tokens.

    The first token carries a date two days in the future, the second a
    date two days in the past.
    """
    from datetime import datetime, timedelta, timezone

    from ordr3.models import PasswordResetToken

    # ``datetime.utcnow()`` is deprecated since Python 3.12.  The tzinfo is
    # stripped again so the dates stay naive UTC timestamps as before; both
    # dates are derived from the same instant.
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    two_days = timedelta(days=2)
    valid = now + two_days
    invalid = now - two_days

    return [
        PasswordResetToken("valid_token", 1, valid),
        PasswordResetToken("invalid_token", 2, invalid),
    ]
|
|
|
|
|
@pytest.fixture
def example_vendors():
    """Provide four vendor entries, two of them named "Sigma Aldrich"."""
    from ordr3.models import Vendor

    # (first argument, second argument) pairs passed to ``Vendor``
    pairs = [
        ("sb", "Sigma Aldrich"),
        ("sa", "Sigma Aldrich"),
        ("vw", "VWR"),
        ("me", "Merck"),
    ]
    return [Vendor(first, second) for first, second in pairs]
|
|
|
|
|
def test_sql_repo_add_order(session, example_orders):
    """An order added through the repository can be queried back."""
    from ordr3.models import OrderItem
    from ordr3.repo import SqlAlchemyRepository

    repository = SqlAlchemyRepository(session)
    repository.add_order(example_orders[0])
    session.flush()

    stored = session.query(OrderItem).first()

    assert stored == example_orders[0]
|
|
|
|
|
def test_sql_repo_delete_order(session, example_orders):
    """Deleting an order leaves only the other order and its log entries."""
    from ordr3.models import LogEntry, OrderItem, OrderStatus, User
    from ordr3.repo import SqlAlchemyRepository
    from ordr3.services import create_log_entry

    repository = SqlAlchemyRepository(session)
    repository.add_order(example_orders[0])
    repository.add_order(example_orders[1])
    # a throwaway user: one single-letter string per constructor argument
    author = User(*"ABCDEFG")
    for order in (example_orders[0], example_orders[1]):
        create_log_entry(order, OrderStatus.APPROVAL, author)
    session.flush()

    repository.delete_order(example_orders[0])
    session.flush()

    remaining_orders = session.query(OrderItem).all()
    assert remaining_orders == [example_orders[1]]
    remaining_logs = session.query(LogEntry).all()
    assert remaining_logs == example_orders[1].log
|
|
|
|
|
def test_sql_repo_get_order(session, example_orders):
    """``get_order`` returns the stored order with the requested id."""
    from ordr3.repo import SqlAlchemyRepository

    repository = SqlAlchemyRepository(session)
    for position in (0, 1):
        repository.add_order(example_orders[position])
    session.flush()

    assert example_orders[1] == repository.get_order(2)
|
|
|
|
|
def test_sql_repo_get_order_raises_exception(session, example_orders):
    """``get_order`` raises ``RepoItemNotFound`` for an id that is not stored."""
    from ordr3.repo import RepoItemNotFound, SqlAlchemyRepository

    repository = SqlAlchemyRepository(session)
    repository.add_order(example_orders[0])
    session.flush()

    # only the first example order was added, so id 2 is unknown
    missing_id = 2
    with pytest.raises(RepoItemNotFound):
        repository.get_order(missing_id)
|
|
|
|
|
def test_sql_list_consumable_candidates(session, example_consumables):
    """Only the orders with ids 2 and 4 are listed as consumable candidates.

    The repository is queried with a limit date two years in the past and
    ``OrderStatus.COMPLETED`` as the only allowed status.
    """
    from datetime import datetime, timedelta, timezone

    from ordr3.models import OrderStatus
    from ordr3.repo import SqlAlchemyRepository

    repo = SqlAlchemyRepository(session)
    for example in example_consumables:
        repo.add_order(example)
    session.flush()

    # ``datetime.utcnow()`` is deprecated since Python 3.12; dropping the
    # tzinfo keeps the limit a naive UTC timestamp as before.
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    limit_date = now - timedelta(days=2 * 365)
    states = {OrderStatus.COMPLETED}

    result = repo.list_consumable_candidates(limit_date, states)

    assert len(result) == 2
    assert [o.id for o in result] == [2, 4]
|
|
|
|
|
def test_sql_repo_add_user(session, example_users):
    """A user added through the repository can be queried back."""
    from ordr3.models import User
    from ordr3.repo import SqlAlchemyRepository

    repository = SqlAlchemyRepository(session)
    repository.add_user(example_users[0])
    session.flush()

    stored = session.query(User).first()

    assert stored == example_users[0]
|
|
|
|
|
def test_sql_repo_delte_user(session, example_users):
    """Deleting one of two stored users leaves only the other one."""
    from ordr3.models import User
    from ordr3.repo import SqlAlchemyRepository

    repository = SqlAlchemyRepository(session)
    for position in (0, 1):
        repository.add_user(example_users[position])
    session.flush()

    repository.delete_user(example_users[0])

    remaining = session.query(User).all()
    assert remaining == [example_users[1]]
|
|
|
|
|
def test_sql_repo_get_user(session, example_users):
    """``get_user`` returns the stored user with the requested id."""
    from ordr3.repo import SqlAlchemyRepository

    repository = SqlAlchemyRepository(session)
    for position in (0, 1):
        repository.add_user(example_users[position])
    session.flush()

    assert example_users[1] == repository.get_user(2)
|
|
|
|
|
def test_sql_repo_get_user_raises_exception(session, example_users):
    """``get_user`` raises ``RepoItemNotFound`` for an id that is not stored."""
    from ordr3.repo import RepoItemNotFound, SqlAlchemyRepository

    repository = SqlAlchemyRepository(session)
    repository.add_user(example_users[0])
    session.flush()

    # only the first example user was added, so id 2 is unknown
    missing_id = 2
    with pytest.raises(RepoItemNotFound):
        repository.get_user(missing_id)
|
|
|
|
|
def test_sql_repo_get_user_by_username(session, example_users):
    """``get_user_by_username`` finds the stored user named "Me"."""
    from ordr3.repo import SqlAlchemyRepository

    repository = SqlAlchemyRepository(session)
    for position in (0, 1):
        repository.add_user(example_users[position])
    session.flush()

    assert example_users[1] == repository.get_user_by_username("Me")
|
|
|
|
|
def test_sql_repo_get_user_by_username_exception(session, example_users):
    """``get_user_by_username`` raises ``RepoItemNotFound`` if nothing matches."""
    from ordr3.repo import RepoItemNotFound, SqlAlchemyRepository

    repository = SqlAlchemyRepository(session)
    repository.add_user(example_users[0])
    session.flush()

    unknown = "unknown user name"
    with pytest.raises(RepoItemNotFound):
        repository.get_user_by_username(unknown)
|
|
|
|
|
def test_sql_repo_get_user_by_email(session, example_users):
    """``get_user_by_email`` finds the stored user for "jane.doe"."""
    from ordr3.repo import SqlAlchemyRepository

    repository = SqlAlchemyRepository(session)
    for position in (0, 1):
        repository.add_user(example_users[position])
    session.flush()

    assert example_users[1] == repository.get_user_by_email("jane.doe")
|
|
|
|
|
def test_sql_repo_get_user_by_email_exception(session, example_users):
    """``get_user_by_email`` raises ``RepoItemNotFound`` if nothing matches."""
    from ordr3.repo import RepoItemNotFound, SqlAlchemyRepository

    repository = SqlAlchemyRepository(session)
    repository.add_user(example_users[0])
    session.flush()

    unknown = "unknown email"
    with pytest.raises(RepoItemNotFound):
        repository.get_user_by_email(unknown)
|
|
|
|
|
def test_sql_repo_count_new_users(session, example_users):
    """``count_new_users`` reports 1 when one of two users has role NEW."""
    from ordr3.models import UserRole
    from ordr3.repo import SqlAlchemyRepository

    repository = SqlAlchemyRepository(session)
    # switch the first example user to the NEW role before storing it
    example_users[0].role = UserRole.NEW
    for position in (0, 1):
        repository.add_user(example_users[position])
    session.flush()

    new_user_count = repository.count_new_users()
    assert new_user_count == 1
|
|
|
|
|
def test_sql_search_vendor(session, example_users):  # noqa: ARG001
    """``search_vendor`` returns the matching entry, or None if unknown."""
    from ordr3.models import Vendor
    from ordr3.repo import SqlAlchemyRepository

    repository = SqlAlchemyRepository(session)
    sigma = Vendor("sa", "Sigma Aldrich")
    session.add(sigma)
    session.flush()

    found = repository.search_vendor("sa")
    assert found == sigma
    not_found = repository.search_vendor("unknown")
    assert not_found is None
|
|
|
|
|
def test_sql_get_vendor_aggregates(session, example_vendors):
    """``get_vendor_aggregates`` returns the name with terms "sa" and "sb"."""
    from ordr3.models import VendorAggregate
    from ordr3.repo import SqlAlchemyRepository

    repository = SqlAlchemyRepository(session)
    session.add_all(example_vendors)
    session.flush()

    aggregate = repository.get_vendor_aggregates("Sigma Aldrich")

    assert isinstance(aggregate, VendorAggregate)
    assert aggregate.name == "Sigma Aldrich"
    # the expected list is in a different order than the fixture entries
    assert aggregate.terms == ["sa", "sb"]
|
|
|
|
|
def test_sql_get_vendor_aggregates_raises_error(session):
    """``get_vendor_aggregates`` raises ``RepoItemNotFound`` without data."""
    from ordr3.repo import RepoItemNotFound, SqlAlchemyRepository

    repository = SqlAlchemyRepository(session)

    # this test adds no vendors before the lookup
    vendor_name = "Sigma Aldrich"
    with pytest.raises(RepoItemNotFound):
        repository.get_vendor_aggregates(vendor_name)
|
|
|
|
|
def test_sql_update_vendors(session, example_vendors):
    """``update_vendors`` replaces the entries of the old vendor.

    After the call the terms "me", "sa" and "sx" are expected to belong to
    "ACME", "vw" still belongs to "VWR" and no entry for "sb" is left.
    """
    from ordr3.models import Vendor, VendorAggregate
    from ordr3.repo import SqlAlchemyRepository

    repository = SqlAlchemyRepository(session)
    session.add_all(example_vendors)
    session.flush()

    previous = VendorAggregate("Sigma Aldrich", None)
    new_terms = {"sa", "me", "sx"}

    repository.update_vendors(previous, "ACME", new_terms)

    stored = session.query(Vendor).order_by(Vendor.term).all()

    expected = [
        Vendor("me", "ACME"),
        Vendor("sa", "ACME"),
        Vendor("sx", "ACME"),
        Vendor("vw", "VWR"),
    ]
    assert stored == expected
|
|
|
|
|
def test_sql_repo_add_reset_token(session, example_tokens):
    """A reset token added through the repository can be queried back."""
    from ordr3.models import PasswordResetToken
    from ordr3.repo import SqlAlchemyRepository

    repository = SqlAlchemyRepository(session)
    repository.add_reset_token(example_tokens[0])
    session.flush()

    stored = session.query(PasswordResetToken).first()

    assert stored == example_tokens[0]
|
|
|
|
|
def test_sql_repo_delete_reset_token(session, example_tokens):
    """Deleting one of two stored reset tokens leaves only the other one."""
    from ordr3.models import PasswordResetToken
    from ordr3.repo import SqlAlchemyRepository

    repository = SqlAlchemyRepository(session)
    for position in (0, 1):
        repository.add_reset_token(example_tokens[position])
    session.flush()

    repository.delete_reset_token(example_tokens[0])

    remaining = session.query(PasswordResetToken).all()
    assert remaining == [example_tokens[1]]
|
|
|
|
|
def test_sql_repo_get_reset_token(session, example_tokens):
    """``get_reset_token`` returns the stored token for "valid_token"."""
    from ordr3.repo import SqlAlchemyRepository

    repository = SqlAlchemyRepository(session)
    for position in (0, 1):
        repository.add_reset_token(example_tokens[position])
    session.flush()

    found = repository.get_reset_token("valid_token")

    assert found == example_tokens[0]
|
|
|
|
|
def test_sql_repo_get_reset_token_raises_exception(session, example_tokens):
    """``get_reset_token`` raises ``RepoItemNotFound`` for an unknown token."""
    from ordr3.repo import RepoItemNotFound, SqlAlchemyRepository

    repository = SqlAlchemyRepository(session)
    repository.add_reset_token(example_tokens[0])
    session.flush()

    unknown = "unknown token"
    with pytest.raises(RepoItemNotFound):
        repository.get_reset_token(unknown)
|
|
|
|
|
def test_sql_clear_stale_reset_tokens(session, example_tokens):
    """After ``clear_stale_reset_tokens`` only the first token is left."""
    from ordr3.models import PasswordResetToken
    from ordr3.repo import SqlAlchemyRepository

    repository = SqlAlchemyRepository(session)
    for position in (0, 1):
        repository.add_reset_token(example_tokens[position])
    session.flush()

    repository.clear_stale_reset_tokens()

    remaining = session.query(PasswordResetToken).all()
    assert remaining == [example_tokens[0]]
|
|
|