Our custom ordering system
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

470 lines
15 KiB

from datetime import datetime, timedelta
import pytest
from ordr3.models import Vendor, VendorAggregate
from ordr3.repo import AbstractOrderRepository
class FakeOrderRepository(AbstractOrderRepository):
    """In-memory repository double backed by plain sets, for use in tests."""

    def __init__(self, session):  # noqa: ARG002
        """Create the empty stores; ``session`` is accepted but never used."""
        self._orders = set()
        self._users = set()
        self._vendors = set()
        self._tokens = set()

    @staticmethod
    def _first_by(items, attribute, value):
        """Return the first item whose ``attribute`` equals ``value``.

        Raises ``StopIteration`` when no item matches.
        """
        return next(
            item for item in items if getattr(item, attribute) == value
        )

    # -- orders ------------------------------------------------------------

    def add_order(self, order):
        """Store an order."""
        self._orders.add(order)

    def get_order(self, reference):
        """Return the order whose ``id`` equals ``reference``."""
        return self._first_by(self._orders, "id", reference)

    def delete_order(self, order):
        """Remove an order from the store."""
        self._orders.remove(order)

    def list_consumable_candidates(self, limit_date, statuses):
        """List orders newer than ``limit_date`` with one of ``statuses``.

        The result is sorted by creation date, newest first.
        """
        candidates = [
            order
            for order in self._orders
            if order.created_on > limit_date and order.status in statuses
        ]
        candidates.sort(key=lambda order: order.created_on, reverse=True)
        return candidates

    # -- users -------------------------------------------------------------

    def add_user(self, user):
        """Store a user."""
        self._users.add(user)

    def delete_user(self, user):
        """Remove a user from the store."""
        self._users.remove(user)

    def get_user(self, reference):
        """Return the user whose ``id`` equals ``reference``."""
        return self._first_by(self._users, "id", reference)

    def get_user_by_username(self, reference):
        """Return the user whose ``username`` equals ``reference``."""
        return self._first_by(self._users, "username", reference)

    def get_user_by_email(self, reference):
        """Return the user whose ``email`` equals ``reference``."""
        return self._first_by(self._users, "email", reference)

    # -- vendors -----------------------------------------------------------

    def search_vendor(self, reference):
        """Return the vendor stored under the search term, or ``None``."""
        hits = (v for v in self._vendors if v.term == reference)
        return next(hits, None)

    def get_vendor_aggregates(self, reference):
        """Collect all search terms stored for the vendor name ``reference``.

        The terms are sorted case-insensitively. Raises ``StopIteration``
        when no vendor carries that name.
        """
        same_name = [v for v in self._vendors if v.name == reference]
        canonical_name = next(iter(same_name)).name
        terms = sorted(
            (v.term for v in same_name), key=lambda term: term.lower()
        )
        return VendorAggregate(canonical_name, terms)

    def update_vendors(self, old_vendor, new_name, new_terms):
        """Replace the entries of ``old_vendor`` with ``new_terms``.

        Every vendor sharing the old name and every vendor already using
        one of the new terms is dropped; afterwards one vendor per new term
        is stored under ``new_name``.
        """

        def _is_superseded(vendor):
            return vendor.name == old_vendor.name or vendor.term in new_terms

        self._vendors = {v for v in self._vendors if not _is_superseded(v)}
        self._vendors.update(Vendor(term, new_name) for term in new_terms)

    # -- password reset tokens ---------------------------------------------

    def add_reset_token(self, token):
        """Store a password reset token."""
        self._tokens.add(token)

    def delete_reset_token(self, token):
        """Remove a password reset token from the store."""
        self._tokens.remove(token)

    def get_reset_token(self, reference):
        """Return the reset token whose ``token`` equals ``reference``."""
        return self._first_by(self._tokens, "token", reference)

    def clear_stale_reset_tokens(self):
        """Drop all tokens whose ``valid_until`` is not in the future.

        The comparison uses the naive UTC timestamp of ``datetime.utcnow()``.
        """
        cutoff = datetime.utcnow()
        stale = {t for t in self._tokens if not (t.valid_until > cutoff)}
        self._tokens = self._tokens - stale
class FakePasslibContext:
    """Test double for a passlib context that compares strings directly."""

    def __init__(self, needs_update):
        """Remember whether a successful check should return a new hash."""
        self.needs_update = needs_update

    def verify_and_update(self, password, hash):
        """Check ``password`` against ``hash`` by plain equality.

        Returns ``(False, None)`` on mismatch. On a match it returns
        ``(True, new_hash)``, where ``new_hash`` is the reversed password
        when ``needs_update`` is truthy and ``None`` otherwise.
        """
        if password == hash:
            replacement = password[::-1] if self.needs_update else None
            return True, replacement
        return False, None
class FakeEventQueue(list):
    """List-based event queue double that records every emitted event."""

    def emit(self, event):
        """Record ``event`` at the end of the queue."""
        super().append(event)
@pytest.fixture
def prefilled_repo():
    """Provide a fake repository seeded with 14 orders and one vendor.

    The orders are created in a fixed sequence and numbered by
    ``itertools.count`` starting at 0, so tests can rely on those numbers.
    """
    from itertools import count

    from ordr3.models import OrderItem, OrderStatus, Vendor

    catalog = {1: "Ethanol", 2: "Aceton", 3: "NaOH", 4: "Coffee", 5: "Water"}

    # (catalog number, age in 30-day months, status), in creation order
    seed = [
        # should be consumables
        (1, 1, OrderStatus.ORDERED),
        (1, 2, OrderStatus.ORDERED),
        (1, 3, OrderStatus.COMPLETED),
        (2, 1, OrderStatus.ORDERED),
        (2, 2, OrderStatus.ORDERED),
        (2, 3, OrderStatus.COMPLETED),
        # no consumable, only two repeats
        (3, 1, OrderStatus.ORDERED),
        (3, 2, OrderStatus.ORDERED),
        # no consumable, only two repeats in the last two years
        (4, 1, OrderStatus.ORDERED),
        (4, 2, OrderStatus.ORDERED),
        (4, 50, OrderStatus.ORDERED),
        # no consumable, one order on hold
        (5, 1, OrderStatus.ORDERED),
        (5, 2, OrderStatus.ORDERED),
        (5, 3, OrderStatus.HOLD),
    ]

    month = timedelta(days=30)
    today = datetime.now()
    numbering = count()

    repo = FakeOrderRepository(session=None)
    for item, age, status in seed:
        order = OrderItem(
            next(numbering), catalog[item], item, "", "", "", "", ""
        )
        order.created_on = today - month * age
        order.status = status
        repo.add_order(order)

    repo._vendors.add(Vendor("sa", "Sigma Aldrich"))
    return repo
def test_service_find_consumables(prefilled_repo):
    """find_consumables returns two orders, with ids 3 and 0 in that order."""
    from ordr3.services import find_consumables

    consumables = find_consumables(prefilled_repo)

    assert len(consumables) == 2
    found_ids = [order.id for order in consumables]
    assert found_ids == [3, 0]
def test_create_log_entry_noteworthy(prefilled_repo):
    """A second log entry by another user is stored and reported as True."""
    from ordr3.models import OrderStatus, User
    from ordr3.services import create_log_entry

    order = prefilled_repo.get_order(1)
    creator = User(*"ABCDEFG")
    approver = User(*"AXCDEFG")

    # first log entry, order.created_by is automatically set
    create_log_entry(order, OrderStatus.OPEN, creator)
    # second log entry
    noteworthy = create_log_entry(order, OrderStatus.APPROVAL, approver)

    assert noteworthy is True
    assert len(order.log) == 2

    first_entry, second_entry = order.log

    # the first entry mirrors the creation of the order
    assert first_entry.order_id == order.id
    assert first_entry.status == OrderStatus.OPEN
    assert first_entry.by == "B"
    assert isinstance(first_entry.date, datetime)
    assert order.created_by == first_entry.by
    assert order.created_on == first_entry.date

    # the order itself carries the status of the latest entry
    assert order.status == second_entry.status
def test_create_log_entry_not_noteworthy_same_user(prefilled_repo):
    """A status change by the order's own creator is reported as False."""
    from ordr3.models import OrderStatus, User
    from ordr3.services import create_log_entry

    creator = User(*"ABCDEFG")
    order = prefilled_repo.get_order(1)
    order.created_by = creator.username

    noteworthy = create_log_entry(order, OrderStatus.APPROVAL, creator)

    assert noteworthy is False
def test_create_log_entry_not_noteworthy_same_status(prefilled_repo):
    """Logging ORDERED for an order seeded as ORDERED is reported as False."""
    from ordr3.models import OrderStatus, User
    from ordr3.services import create_log_entry

    someone = User(*"ABCDEFG")
    order = prefilled_repo.get_order(1)

    noteworthy = create_log_entry(order, OrderStatus.ORDERED, someone)

    assert noteworthy is False
@pytest.mark.parametrize(
    "update,new_hash",
    [(False, "1234"), (True, "4321")],
)
def test_verify_username_and_password_valid(update, new_hash):
    """Valid credentials return the user, with the expected password hash.

    With ``update`` set, the fake context hands back the reversed password
    and the returned user is expected to carry it.
    """
    from ordr3.models import User, UserRole
    from ordr3.services import verify_credentials

    jane = User(2, "Me", "Jane", "Doe", "jane.doe", "1234", UserRole.USER)
    repo = FakeOrderRepository(None)
    repo.add_user(jane)
    context = FakePasslibContext(update)

    verified = verify_credentials(repo, context, jane.username, jane.password)

    assert verified is not None
    assert verified.password == new_hash
@pytest.mark.parametrize(
    "name,pwd",
    [("You", "1234"), ("Me", "abcd")],
)
def test_verify_username_and_password_invalid(name, pwd):
    """An unknown username or a wrong password both yield ``None``."""
    from ordr3.models import User, UserRole
    from ordr3.services import verify_credentials

    repo = FakeOrderRepository(None)
    repo.add_user(
        User(2, "Me", "Jane", "Doe", "jane.doe", "1234", UserRole.USER)
    )
    context = FakePasslibContext(False)

    outcome = verify_credentials(repo, context, name, pwd)

    assert outcome is None
@pytest.mark.parametrize(
    "input,expected",
    [("no url", "no url"), ("http://company.com/path", "company.com")],
)
def test_vendor_from_url(input, expected):
    """Non-URL text is returned as is; a URL is reduced to its host part."""
    from ordr3.services import _vendor_from_url

    vendor = _vendor_from_url(input)

    assert vendor == expected
@pytest.mark.parametrize(
    "input,expected",
    [
        ("no domain", "no domain"),
        ("company.de", "company"),
        ("company.com", "company"),
        ("company.eu", "company"),
        ("company.co.uk", "company.co.uk"),
    ],
)
def test_vendor_with_common_domain(input, expected):
    """The endings .de, .com and .eu are stripped; other input is kept."""
    from ordr3.services import _vendor_with_common_domain

    vendor = _vendor_with_common_domain(input)

    assert vendor == expected
@pytest.mark.parametrize(
    "input,name,found",
    [
        ("Happy Company", "Happy Company", False),
        ("Company Cleanup\n", "Company Cleanup", False),
        ("http://url-company.it/", "url-company.it", False),
        ("http://url-company.de/", "url-company", False),
        ("domain.eu", "domain", False),
        ("sa", "Sigma Aldrich", True),
        ("http://SA.com/some/path", "Sigma Aldrich", True),
    ],
)
def test_check_vendor_name(prefilled_repo, input, name, found):
    """check_vendor_name cleans the input and resolves the seeded vendor.

    Only inputs that boil down to the term "sa" are expected to be found,
    and they resolve to the name "Sigma Aldrich".
    """
    from ordr3.services import check_vendor_name

    checked = check_vendor_name(prefilled_repo, input)

    assert checked.name == name
    assert checked.found == found
def test__check_have_i_been_pwned_ok():
    """A value that is not reported as breached emits no event."""
    from ordr3.services import _check_have_i_been_pwned

    events = FakeEventQueue()

    breached = _check_have_i_been_pwned("21BD2x", events)

    assert not breached
    assert len(events) == 0
def test__check_have_i_been_pwned_not_ok():
    """A value reported as breached emits exactly one warning event."""
    from ordr3.services import _check_have_i_been_pwned

    events = FakeEventQueue()

    breached = _check_have_i_been_pwned(
        "21BD2008F2FF3F9F3AE0A2072D19CD17E971B33A", events
    )

    assert breached
    assert len(events) == 1
    warning = events[0]
    assert warning.text.startswith("This password appears in a breach")
def test__check_have_i_been_pwned_request_exception():
    """The input "xxxxx" is reported as not breached and emits no event.

    NOTE(review): the test name suggests this input makes the lookup request
    fail; nothing here forces that, so confirm against the service code.
    """
    from ordr3.services import _check_have_i_been_pwned

    events = FakeEventQueue()

    breached = _check_have_i_been_pwned("xxxxx", events)

    assert not breached
    assert len(events) == 0
def test_set_new_password_ok(monkeypatch):
    """A 16-character, non-breached password is accepted without events."""
    from ordr3 import services
    from ordr3.models import User
    from ordr3.security import get_passlib_context

    # replace the breach lookup with a stub that never reports a breach
    monkeypatch.setattr(
        services, "_check_have_i_been_pwned", lambda x, y: False
    )
    account = User(*"ABCDEFG")
    events = FakeEventQueue()

    accepted = services.set_new_password(account, "1234567890123456", events)

    assert accepted
    assert get_passlib_context().verify("1234567890123456", account.password)
    assert len(events) == 0
def test_set_new_password_to_short(monkeypatch):
    """A one-character password is stored but reported with a warning."""
    from ordr3 import services
    from ordr3.models import User
    from ordr3.security import get_passlib_context

    # replace the breach lookup with a stub that never reports a breach
    monkeypatch.setattr(
        services, "_check_have_i_been_pwned", lambda x, y: False
    )
    account = User(*"ABCDEFG")
    events = FakeEventQueue()

    accepted = services.set_new_password(account, "1", events)

    assert not accepted
    # the password is set nevertheless
    assert get_passlib_context().verify("1", account.password)
    assert len(events) == 1
    warning = events[0]
    assert warning.text.startswith("Your password is quite short")
def test_set_new_password_breached(monkeypatch):
    """A password flagged as breached is stored but not accepted."""
    from ordr3 import services
    from ordr3.models import User
    from ordr3.security import get_passlib_context

    # replace the breach lookup with a stub that always reports a breach
    monkeypatch.setattr(services, "_check_have_i_been_pwned", lambda x, y: True)
    account = User(*"ABCDEFG")
    events = FakeEventQueue()

    accepted = services.set_new_password(account, "1234567890123456", events)

    assert not accepted
    # the password is set nevertheless
    assert get_passlib_context().verify("1234567890123456", account.password)
    assert len(events) == 0  # no item in queue: the stub emits nothing
def test_set_new_password_to_short_and_breached(monkeypatch):
    """A short and breached password yields only the "short" warning."""
    from ordr3 import services
    from ordr3.models import User
    from ordr3.security import get_passlib_context

    # replace the breach lookup with a stub that always reports a breach
    monkeypatch.setattr(services, "_check_have_i_been_pwned", lambda x, y: True)
    account = User(*"ABCDEFG")
    events = FakeEventQueue()

    accepted = services.set_new_password(account, "1", events)

    assert not accepted
    # the password is set nevertheless
    assert get_passlib_context().verify("1", account.password)
    assert len(events) == 1  # only one item: the stub emits nothing
    warning = events[0]
    assert warning.text.startswith("Your password is quite short")
def test_get_user_from_reset_token_ok():
    """A stored token pointing at a stored user resolves to that user."""
    from ordr3 import services
    from ordr3.models import PasswordResetToken, User

    account = User(*"ABCDEFG")
    repo = FakeOrderRepository(None)
    repo.add_user(account)
    repo.add_reset_token(PasswordResetToken("identifier", "A"))

    resolved = services.get_user_from_reset_token(repo, "identifier")

    assert resolved == account
def test_get_user_from_reset_token_wrong_token():
    """Looking up an identifier that was never stored yields ``None``."""
    from ordr3 import services
    from ordr3.models import PasswordResetToken, User

    repo = FakeOrderRepository(None)
    repo.add_user(User(*"ABCDEFG"))
    repo.add_reset_token(PasswordResetToken("identifier", "A"))

    resolved = services.get_user_from_reset_token(repo, "wrong identifier")

    assert resolved is None
def test_get_user_from_reset_token_invalid_token():
    """A token whose validity has run out yields ``None``."""
    from datetime import datetime, timedelta

    from ordr3 import services
    from ordr3.models import PasswordResetToken, User

    repo = FakeOrderRepository(None)
    user = User(*list("ABCDEFG"))
    repo.add_user(user)

    # Expire the token by two days rather than two hours: the fake
    # repository compares token expiry against datetime.utcnow(), while this
    # timestamp is built from local time. A margin larger than any UTC
    # offset keeps the token expired regardless of the machine's time zone.
    valid_until = datetime.now() - timedelta(days=2)
    token = PasswordResetToken("identifier", "A", valid_until)
    repo.add_reset_token(token)

    result = services.get_user_from_reset_token(repo, "identifier")

    assert result is None
def test_get_user_from_reset_token_unknown_user():
    """A token referring to a user id that is not stored yields ``None``."""
    from ordr3 import services
    from ordr3.models import PasswordResetToken, User

    repo = FakeOrderRepository(None)
    repo.add_user(User(*"ABCDEFG"))
    # the token points at user id "B", the only stored user has id "A"
    repo.add_reset_token(PasswordResetToken("identifier", "B"))

    resolved = services.get_user_from_reset_token(repo, "identifier")

    assert resolved is None
def test_create_token_for_user():
    """The created token refers to the user and is stored in the repo."""
    from ordr3.models import PasswordResetToken, User
    from ordr3.services import create_token_for_user

    account = User(*"ABCDEFG")
    repo = FakeOrderRepository(None)

    token = create_token_for_user(repo, account)

    assert isinstance(token, PasswordResetToken)
    assert token.user_id == "A"
    assert repo._tokens == {token}