Our custom ordering system
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

411 lines
11 KiB

import pytest
@pytest.fixture
def example_order_data():
from datetime import datetime
from ordr3.models import OrderCategory, OrderStatus
return [
(
1,
"Ethanol",
"1-23",
"VWR",
OrderCategory.SOLVENT,
"5 l",
20.0,
2,
"EUR",
"DFG",
"urgent",
datetime(2020, 2, 3, 15, 14, 13),
"me",
OrderStatus.OPEN,
),
(
2,
"Gloves",
"12-3",
"Carl Roth",
OrderCategory.DISPOSABLE,
"100 St.",
40.0,
5,
"USD",
"BMBF",
"no comment",
datetime(2020, 2, 4, 15, 14, 13),
"you",
OrderStatus.COMPLETED,
),
]
@pytest.fixture
def example_orders(example_order_data):
from ordr3.models import OrderItem
return [OrderItem(*data) for data in example_order_data]
@pytest.fixture
def example_consumables(example_order_data):
from datetime import datetime, timedelta
from ordr3.models import OrderItem
consumables = []
order_data = example_order_data * 3
for i, data in enumerate(order_data, start=1):
order = OrderItem(*data)
order.id = i
order.created_on = datetime.utcnow() - timedelta(days=i * 150)
consumables.append(order)
return consumables
@pytest.fixture
def example_users():
from ordr3.models import User, UserRole
return [
User(1, "You", "Jim", "Smith", "jim.smith", "abcd", UserRole.ADMIN),
User(2, "Me", "Jane", "Doe", "jane.doe", "1234", UserRole.USER),
]
@pytest.fixture
def example_tokens():
from datetime import datetime, timedelta
from ordr3.models import PasswordResetToken
valid = datetime.utcnow() + timedelta(days=2)
invalid = datetime.utcnow() - timedelta(days=2)
return [
PasswordResetToken("valid_token", 1, valid),
PasswordResetToken("invalid_token", 2, invalid),
]
@pytest.fixture
def example_vendors():
from ordr3.models import Vendor
return [
Vendor("sb", "Sigma Aldrich"),
Vendor("sa", "Sigma Aldrich"),
Vendor("vw", "VWR"),
Vendor("me", "Merck"),
]
def test_sql_repo_add_order(session, example_orders):
from ordr3.models import OrderItem
from ordr3.repo import SqlAlchemyRepository
repo = SqlAlchemyRepository(session)
repo.add_order(example_orders[0])
session.flush()
order = session.query(OrderItem).first()
assert order == example_orders[0]
def test_sql_repo_delete_order(session, example_orders):
from ordr3.models import LogEntry, OrderItem, OrderStatus, User
from ordr3.repo import SqlAlchemyRepository
from ordr3.services import create_log_entry
repo = SqlAlchemyRepository(session)
repo.add_order(example_orders[0])
repo.add_order(example_orders[1])
user = User(*list("ABCDEFG"))
create_log_entry(example_orders[0], OrderStatus.APPROVAL, user)
create_log_entry(example_orders[1], OrderStatus.APPROVAL, user)
session.flush()
repo.delete_order(example_orders[0])
session.flush()
assert session.query(OrderItem).all() == [example_orders[1]]
assert session.query(LogEntry).all() == example_orders[1].log
def test_sql_repo_get_order(session, example_orders):
from ordr3.repo import SqlAlchemyRepository
repo = SqlAlchemyRepository(session)
repo.add_order(example_orders[0])
repo.add_order(example_orders[1])
session.flush()
assert example_orders[1] == repo.get_order(2)
def test_sql_repo_get_order_raises_exception(session, example_orders):
from ordr3.repo import RepoItemNotFound, SqlAlchemyRepository
repo = SqlAlchemyRepository(session)
repo.add_order(example_orders[0])
session.flush()
with pytest.raises(RepoItemNotFound):
repo.get_order(2)
def test_sql_list_consumable_candidates(session, example_consumables):
from datetime import datetime, timedelta
from ordr3.models import OrderStatus
from ordr3.repo import SqlAlchemyRepository
repo = SqlAlchemyRepository(session)
for example in example_consumables:
repo.add_order(example)
session.flush()
limit_date = datetime.utcnow() - timedelta(days=2 * 365)
states = {OrderStatus.COMPLETED}
result = repo.list_consumable_candidates(limit_date, states)
assert len(result) == 2
assert [o.id for o in result] == [2, 4]
def test_sql_repo_add_user(session, example_users):
from ordr3.models import User
from ordr3.repo import SqlAlchemyRepository
repo = SqlAlchemyRepository(session)
repo.add_user(example_users[0])
session.flush()
user = session.query(User).first()
assert user == example_users[0]
def test_sql_repo_delte_user(session, example_users):
from ordr3.models import User
from ordr3.repo import SqlAlchemyRepository
repo = SqlAlchemyRepository(session)
repo.add_user(example_users[0])
repo.add_user(example_users[1])
session.flush()
repo.delete_user(example_users[0])
assert session.query(User).all() == [example_users[1]]
def test_sql_repo_get_user(session, example_users):
from ordr3.repo import SqlAlchemyRepository
repo = SqlAlchemyRepository(session)
repo.add_user(example_users[0])
repo.add_user(example_users[1])
session.flush()
assert example_users[1] == repo.get_user(2)
def test_sql_repo_get_user_raises_exception(session, example_users):
from ordr3.repo import RepoItemNotFound, SqlAlchemyRepository
repo = SqlAlchemyRepository(session)
repo.add_user(example_users[0])
session.flush()
with pytest.raises(RepoItemNotFound):
repo.get_user(2)
def test_sql_repo_get_user_by_username(session, example_users):
from ordr3.repo import SqlAlchemyRepository
repo = SqlAlchemyRepository(session)
repo.add_user(example_users[0])
repo.add_user(example_users[1])
session.flush()
assert example_users[1] == repo.get_user_by_username("Me")
def test_sql_repo_get_user_by_username_exception(session, example_users):
from ordr3.repo import RepoItemNotFound, SqlAlchemyRepository
repo = SqlAlchemyRepository(session)
repo.add_user(example_users[0])
session.flush()
with pytest.raises(RepoItemNotFound):
repo.get_user_by_username("unknown user name")
def test_sql_repo_get_user_by_email(session, example_users):
from ordr3.repo import SqlAlchemyRepository
repo = SqlAlchemyRepository(session)
repo.add_user(example_users[0])
repo.add_user(example_users[1])
session.flush()
assert example_users[1] == repo.get_user_by_email("jane.doe")
def test_sql_repo_get_user_by_email_exception(session, example_users):
from ordr3.repo import RepoItemNotFound, SqlAlchemyRepository
repo = SqlAlchemyRepository(session)
repo.add_user(example_users[0])
session.flush()
with pytest.raises(RepoItemNotFound):
repo.get_user_by_email("unknown email")
def test_sql_repo_count_new_users(session, example_users):
from ordr3.models import UserRole
from ordr3.repo import SqlAlchemyRepository
repo = SqlAlchemyRepository(session)
example_users[0].role = UserRole.NEW
repo.add_user(example_users[0])
repo.add_user(example_users[1])
session.flush()
assert repo.count_new_users() == 1
def test_sql_search_vendor(session, example_users): # noqa: ARG001
from ordr3.models import Vendor
from ordr3.repo import SqlAlchemyRepository
repo = SqlAlchemyRepository(session)
entry = Vendor("sa", "Sigma Aldrich")
session.add(entry)
session.flush()
assert repo.search_vendor("sa") == entry
assert repo.search_vendor("unknown") is None
def test_sql_get_vendor_aggregates(session, example_vendors):
from ordr3.models import VendorAggregate
from ordr3.repo import SqlAlchemyRepository
repo = SqlAlchemyRepository(session)
session.add_all(example_vendors)
session.flush()
result = repo.get_vendor_aggregates("Sigma Aldrich")
assert isinstance(result, VendorAggregate)
assert result.name == "Sigma Aldrich"
assert result.terms == ["sa", "sb"]
def test_sql_get_vendor_aggregates_raises_error(session):
from ordr3.repo import RepoItemNotFound, SqlAlchemyRepository
repo = SqlAlchemyRepository(session)
with pytest.raises(RepoItemNotFound):
repo.get_vendor_aggregates("Sigma Aldrich")
def test_sql_update_vendors(session, example_vendors):
from ordr3.models import Vendor, VendorAggregate
from ordr3.repo import SqlAlchemyRepository
repo = SqlAlchemyRepository(session)
session.add_all(example_vendors)
session.flush()
old_vendor = VendorAggregate("Sigma Aldrich", None)
repo.update_vendors(old_vendor, "ACME", {"sa", "me", "sx"})
result = session.query(Vendor).order_by(Vendor.term).all()
assert result == [
Vendor("me", "ACME"),
Vendor("sa", "ACME"),
Vendor("sx", "ACME"),
Vendor("vw", "VWR"),
]
def test_sql_repo_add_reset_token(session, example_tokens):
from ordr3.models import PasswordResetToken
from ordr3.repo import SqlAlchemyRepository
repo = SqlAlchemyRepository(session)
repo.add_reset_token(example_tokens[0])
session.flush()
token = session.query(PasswordResetToken).first()
assert token == example_tokens[0]
def test_sql_repo_delete_reset_token(session, example_tokens):
from ordr3.models import PasswordResetToken
from ordr3.repo import SqlAlchemyRepository
repo = SqlAlchemyRepository(session)
repo.add_reset_token(example_tokens[0])
repo.add_reset_token(example_tokens[1])
session.flush()
repo.delete_reset_token(example_tokens[0])
tokens = session.query(PasswordResetToken).all()
assert tokens == [example_tokens[1]]
def test_sql_repo_get_reset_token(session, example_tokens):
from ordr3.repo import SqlAlchemyRepository
repo = SqlAlchemyRepository(session)
repo.add_reset_token(example_tokens[0])
repo.add_reset_token(example_tokens[1])
session.flush()
token = repo.get_reset_token("valid_token")
assert token == example_tokens[0]
def test_sql_repo_get_reset_token_raises_exception(session, example_tokens):
from ordr3.repo import RepoItemNotFound, SqlAlchemyRepository
repo = SqlAlchemyRepository(session)
repo.add_reset_token(example_tokens[0])
session.flush()
with pytest.raises(RepoItemNotFound):
repo.get_reset_token("unknown token")
def test_sql_clear_stale_reset_tokens(session, example_tokens):
from ordr3.models import PasswordResetToken
from ordr3.repo import SqlAlchemyRepository
repo = SqlAlchemyRepository(session)
repo.add_reset_token(example_tokens[0])
repo.add_reset_token(example_tokens[1])
session.flush()
repo.clear_stale_reset_tokens()
tokens = session.query(PasswordResetToken).all()
assert tokens == [example_tokens[0]]