You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
470 lines
15 KiB
470 lines
15 KiB
from datetime import datetime, timedelta |
|
|
|
import pytest |
|
|
|
from ordr3.models import Vendor, VendorAggregate |
|
from ordr3.repo import AbstractOrderRepository |
|
|
|
|
|
class FakeOrderRepository(AbstractOrderRepository):
    """In-memory repository used as a test double.

    Orders, users, vendors and password reset tokens are kept in plain
    sets. Single-item lookups raise ``StopIteration`` when nothing
    matches, except ``search_vendor`` which returns ``None``.
    """

    def __init__(self, session):  # noqa: ARG002
        # the session argument is accepted but not used by this fake
        self._orders = set()
        self._users = set()
        self._vendors = set()
        self._tokens = set()

    def add_order(self, order):
        """store an order"""
        self._orders.add(order)

    def get_order(self, reference):
        """return the stored order whose ``id`` equals the reference"""
        matching = (order for order in self._orders if order.id == reference)
        return next(matching)

    def delete_order(self, order):
        """remove an order from the store"""
        self._orders.remove(order)

    def list_consumable_candidates(self, limit_date, statuses):
        """list orders created after ``limit_date`` with a listed status

        The result is a list sorted by creation date, newest first.
        """
        candidates = [
            order
            for order in self._orders
            if order.created_on > limit_date and order.status in statuses
        ]
        candidates.sort(key=lambda order: order.created_on, reverse=True)
        return candidates

    def add_user(self, user):
        """store a user"""
        self._users.add(user)

    def delete_user(self, user):
        """remove a user from the store"""
        self._users.remove(user)

    def get_user(self, reference):
        """return the stored user whose ``id`` equals the reference"""
        return next(filter(lambda user: user.id == reference, self._users))

    def get_user_by_username(self, reference):
        """return the stored user whose ``username`` equals the reference"""
        matching = (
            user for user in self._users if user.username == reference
        )
        return next(matching)

    def get_user_by_email(self, reference):
        """return the stored user whose ``email`` equals the reference"""
        return next(filter(lambda user: user.email == reference, self._users))

    def search_vendor(self, reference):
        """return the vendor whose ``term`` equals the reference, or None"""
        matching = (
            vendor for vendor in self._vendors if vendor.term == reference
        )
        return next(matching, None)

    def get_vendor_aggregates(self, reference):
        """collect all search terms stored for the vendor name ``reference``

        Returns a ``VendorAggregate`` built from the vendor name and its
        terms, sorted case-insensitively. Raises ``StopIteration`` when no
        stored vendor carries that name.
        """
        matches = [
            vendor for vendor in self._vendors if vendor.name == reference
        ]
        sorted_terms = sorted(
            (vendor.term for vendor in matches),
            key=lambda term: term.lower(),
        )
        representative = next(iter(matches))
        return VendorAggregate(representative.name, sorted_terms)

    def update_vendors(self, old_vendor, new_name, new_terms):
        """replace the entries of a vendor with a new name and new terms

        Every entry sharing the name of ``old_vendor`` and every entry
        whose term is among ``new_terms`` is dropped; afterwards one
        ``Vendor(term, new_name)`` is added per new term.
        """
        same_name = {
            vendor
            for vendor in self._vendors
            if vendor.name == old_vendor.name
        }
        same_term = {
            vendor for vendor in self._vendors if vendor.term in new_terms
        }
        # rebind to a fresh set instead of mutating the existing one
        self._vendors = self._vendors.difference(same_name, same_term)
        self._vendors.update(Vendor(term, new_name) for term in new_terms)

    def add_reset_token(self, token):
        """store a password reset token"""
        self._tokens.add(token)

    def delete_reset_token(self, token):
        """remove a password reset token from the store"""
        self._tokens.remove(token)

    def get_reset_token(self, reference):
        """return the stored token whose ``token`` equals the reference"""
        return next(
            filter(lambda entry: entry.token == reference, self._tokens)
        )

    def clear_stale_reset_tokens(self):
        """keep only tokens whose ``valid_until`` lies after utcnow()"""
        cutoff = datetime.utcnow()
        self._tokens = {
            entry for entry in self._tokens if entry.valid_until > cutoff
        }
|
|
|
|
|
class FakePasslibContext:
    """Test double for a password context that compares values directly.

    A password verifies when it equals the given hash. If
    ``needs_update`` is truthy, a successful check additionally returns
    the reversed password as the replacement hash.
    """

    def __init__(self, needs_update):
        self.needs_update = needs_update

    def verify_and_update(self, password, hash):
        """return a ``(verified, new_hash)`` tuple for the password

        ``new_hash`` is ``None`` unless the check succeeded and the
        context was created with a truthy ``needs_update``.
        """
        if password != hash:
            return False, None
        replacement = password[::-1] if self.needs_update else None
        return True, replacement
|
|
|
|
|
class FakeEventQueue(list):
    """List-based event queue that records emitted events in order."""

    def emit(self, event):
        """append the event to the end of the queue"""
        super().append(event)
|
|
|
|
|
@pytest.fixture
def prefilled_repo():
    """Provide a fake repository seeded with 14 orders and one vendor.

    Orders receive the ids 0..13 in the order they are listed below;
    their creation dates are counted back from now in 30-day steps.
    """
    from ordr3.models import OrderItem, OrderStatus, Vendor

    catalog = {1: "Ethanol", 2: "Aceton", 3: "NaOH", 4: "Coffee", 5: "Water"}
    month = timedelta(days=30)
    today = datetime.now()

    # (catalog key, age in months, status) - list position is the order id
    order_specs = [
        # should be consumables
        (1, 1, OrderStatus.ORDERED),
        (1, 2, OrderStatus.ORDERED),
        (1, 3, OrderStatus.COMPLETED),
        (2, 1, OrderStatus.ORDERED),
        (2, 2, OrderStatus.ORDERED),
        (2, 3, OrderStatus.COMPLETED),
        # no consumable, only two repeats
        (3, 1, OrderStatus.ORDERED),
        (3, 2, OrderStatus.ORDERED),
        # no consumable, only two repeats in the last two years
        (4, 1, OrderStatus.ORDERED),
        (4, 2, OrderStatus.ORDERED),
        (4, 50, OrderStatus.ORDERED),
        # no consumable, one order on hold
        (5, 1, OrderStatus.ORDERED),
        (5, 2, OrderStatus.ORDERED),
        (5, 3, OrderStatus.HOLD),
    ]

    repo = FakeOrderRepository(session=None)
    for order_id, (item, months_ago, status) in enumerate(order_specs):
        order = OrderItem(order_id, catalog[item], item, "", "", "", "", "")
        order.created_on = today - month * months_ago
        order.status = status
        repo.add_order(order)

    repo._vendors.add(Vendor("sa", "Sigma Aldrich"))

    return repo
|
|
|
|
|
def test_service_find_consumables(prefilled_repo):
    """find_consumables yields exactly the orders with ids 3 and 0."""
    from ordr3.services import find_consumables

    consumables = find_consumables(prefilled_repo)

    assert len(consumables) == 2
    found_ids = [order.id for order in consumables]
    assert found_ids == [3, 0]
|
|
|
|
|
def test_create_log_entry_noteworthy(prefilled_repo):
    """A status change made by a different user is reported as True."""
    from ordr3.models import OrderStatus, User
    from ordr3.services import create_log_entry

    order = prefilled_repo.get_order(1)
    creator = User(*"ABCDEFG")
    approver = User(*"AXCDEFG")

    # the first entry also fills in order.created_by and order.created_on
    create_log_entry(order, OrderStatus.OPEN, creator)
    # the second entry comes from a user with another username
    noteworthy = create_log_entry(order, OrderStatus.APPROVAL, approver)

    assert noteworthy is True
    assert len(order.log) == 2
    opened, approved = order.log
    assert opened.order_id == order.id
    assert opened.status == OrderStatus.OPEN
    assert opened.by == "B"
    assert isinstance(opened.date, datetime)
    assert order.created_by == opened.by
    assert order.created_on == opened.date
    assert order.status == approved.status
|
|
|
|
|
def test_create_log_entry_not_noteworthy_same_user(prefilled_repo):
    """A status change by the order's own creator is reported as False."""
    from ordr3.models import OrderStatus, User
    from ordr3.services import create_log_entry

    order = prefilled_repo.get_order(1)
    creator = User(*"ABCDEFG")
    order.created_by = creator.username

    noteworthy = create_log_entry(order, OrderStatus.APPROVAL, creator)

    assert noteworthy is False
|
|
|
|
|
def test_create_log_entry_not_noteworthy_same_status(prefilled_repo):
    """Logging the status the order already has is reported as False.

    The fixture creates the order with id 1 in status ORDERED.
    """
    from ordr3.models import OrderStatus, User
    from ordr3.services import create_log_entry

    order = prefilled_repo.get_order(1)
    someone = User(*"ABCDEFG")

    noteworthy = create_log_entry(order, OrderStatus.ORDERED, someone)

    assert noteworthy is False
|
|
|
|
|
@pytest.mark.parametrize(
    "update,new_hash",
    [(False, "1234"), (True, "4321")],
)
def test_verify_username_and_password_valid(update, new_hash):
    """Valid credentials return the user, with a new hash if required.

    The fake password context hands back the reversed password as the
    replacement hash when it was created with ``update`` set.
    """
    from ordr3.models import User, UserRole
    from ordr3.services import verify_credentials

    account = User(2, "Me", "Jane", "Doe", "jane.doe", "1234", UserRole.USER)
    repo = FakeOrderRepository(None)
    repo.add_user(account)
    pass_ctx = FakePasslibContext(update)

    verified = verify_credentials(
        repo, pass_ctx, account.username, account.password
    )

    assert verified is not None
    assert verified.password == new_hash
|
|
|
|
|
@pytest.mark.parametrize(
    "name,pwd",
    [("You", "1234"), ("Me", "abcd")],
)
def test_verify_username_and_password_invalid(name, pwd):
    """An unknown username or a wrong password yields None."""
    from ordr3.models import User, UserRole
    from ordr3.services import verify_credentials

    account = User(2, "Me", "Jane", "Doe", "jane.doe", "1234", UserRole.USER)
    repo = FakeOrderRepository(None)
    repo.add_user(account)
    pass_ctx = FakePasslibContext(False)

    verified = verify_credentials(repo, pass_ctx, name, pwd)

    assert verified is None
|
|
|
|
|
@pytest.mark.parametrize(
    "input,expected",
    [("no url", "no url"), ("http://company.com/path", "company.com")],
)
def test_vendor_from_url(input, expected):
    """A URL is reduced to its host part; other text passes unchanged."""
    from ordr3.services import _vendor_from_url

    actual = _vendor_from_url(input)

    assert actual == expected
|
|
|
|
|
@pytest.mark.parametrize(
    "input,expected",
    [
        ("no domain", "no domain"),
        ("company.de", "company"),
        ("company.com", "company"),
        ("company.eu", "company"),
        ("company.co.uk", "company.co.uk"),
    ],
)
def test_vendor_with_common_domain(input, expected):
    """The endings .de, .com and .eu are stripped; other input is kept."""
    from ordr3.services import _vendor_with_common_domain

    actual = _vendor_with_common_domain(input)

    assert actual == expected
|
|
|
|
|
@pytest.mark.parametrize(
    "input,name,found",
    [
        ("Happy Company", "Happy Company", False),
        ("Company Cleanup\n", "Company Cleanup", False),
        ("http://url-company.it/", "url-company.it", False),
        ("http://url-company.de/", "url-company", False),
        ("domain.eu", "domain", False),
        ("sa", "Sigma Aldrich", True),
        ("http://SA.com/some/path", "Sigma Aldrich", True),
    ],
)
def test_check_vendor_name(prefilled_repo, input, name, found):
    """check_vendor_name cleans the input and resolves known vendors.

    The fixture repository knows one vendor: term "sa" for
    "Sigma Aldrich".
    """
    from ordr3.services import check_vendor_name

    checked = check_vendor_name(prefilled_repo, input)

    assert checked.name == name
    assert checked.found == found
|
|
|
|
|
def test__check_have_i_been_pwned_ok():
    """A value not reported as breached emits no event.

    NOTE(review): presumably this queries the real breach lookup
    service - confirm whether the test needs network access.
    """
    from ordr3.services import _check_have_i_been_pwned

    events = FakeEventQueue()

    breached = _check_have_i_been_pwned("21BD2x", events)

    assert not breached
    assert len(events) == 0
|
|
|
|
|
def test__check_have_i_been_pwned_not_ok():
    """A breached value is reported and exactly one warning is emitted.

    NOTE(review): the 40-character hex argument is presumably the SHA-1
    digest of a known breached password - confirm against the service.
    """
    from ordr3.services import _check_have_i_been_pwned

    events = FakeEventQueue()

    breached = _check_have_i_been_pwned(
        "21BD2008F2FF3F9F3AE0A2072D19CD17E971B33A", events
    )

    assert breached
    assert len(events) == 1
    warning = events[0]
    assert warning.text.startswith("This password appears in a breach")
|
|
|
|
|
def test__check_have_i_been_pwned_request_exception():
    """A failing lookup counts as "not breached" and emits no event.

    NOTE(review): presumably "xxxxx" makes the lookup request fail -
    confirm against the service implementation.
    """
    from ordr3.services import _check_have_i_been_pwned

    events = FakeEventQueue()

    breached = _check_have_i_been_pwned("xxxxx", events)

    assert not breached
    assert len(events) == 0
|
|
|
|
|
def test_set_new_password_ok(monkeypatch):
    """A 16-character, non-breached password is accepted without events."""
    from ordr3 import services
    from ordr3.models import User
    from ordr3.security import get_passlib_context

    account = User(*"ABCDEFG")
    events = FakeEventQueue()
    # replace the breach lookup with a stub that reports "not breached"
    monkeypatch.setattr(
        services, "_check_have_i_been_pwned", lambda x, y: False
    )

    accepted = services.set_new_password(account, "1234567890123456", events)

    assert accepted
    pass_ctx = get_passlib_context()
    assert pass_ctx.verify("1234567890123456", account.password)
    assert len(events) == 0
|
|
|
|
|
def test_set_new_password_to_short(monkeypatch):
    """A one-character password is stored but flagged as too short."""
    from ordr3 import services
    from ordr3.models import User
    from ordr3.security import get_passlib_context

    account = User(*"ABCDEFG")
    events = FakeEventQueue()
    # replace the breach lookup with a stub that reports "not breached"
    monkeypatch.setattr(
        services, "_check_have_i_been_pwned", lambda x, y: False
    )

    accepted = services.set_new_password(account, "1", events)

    assert not accepted
    # the password is set even though the call reports a problem
    pass_ctx = get_passlib_context()
    assert pass_ctx.verify("1", account.password)
    assert len(events) == 1
    warning = events[0]
    assert warning.text.startswith("Your password is quite short")
|
|
|
|
|
def test_set_new_password_breached(monkeypatch):
    """A password reported as breached is stored but not accepted."""
    from ordr3 import services
    from ordr3.models import User
    from ordr3.security import get_passlib_context

    account = User(*"ABCDEFG")
    events = FakeEventQueue()
    # replace the breach lookup with a stub that reports "breached"
    monkeypatch.setattr(
        services, "_check_have_i_been_pwned", lambda x, y: True
    )

    accepted = services.set_new_password(account, "1234567890123456", events)

    assert not accepted
    pass_ctx = get_passlib_context()
    assert pass_ctx.verify("1234567890123456", account.password)
    # the stub emits nothing, so the queue stays empty
    assert len(events) == 0
|
|
|
|
|
def test_set_new_password_to_short_and_breached(monkeypatch):
    """A short and breached password is stored but not accepted."""
    from ordr3 import services
    from ordr3.models import User
    from ordr3.security import get_passlib_context

    account = User(*"ABCDEFG")
    events = FakeEventQueue()
    # replace the breach lookup with a stub that reports "breached"
    monkeypatch.setattr(
        services, "_check_have_i_been_pwned", lambda x, y: True
    )

    accepted = services.set_new_password(account, "1", events)

    assert not accepted
    pass_ctx = get_passlib_context()
    assert pass_ctx.verify("1", account.password)
    # the stub emits nothing, so only the length warning is queued
    assert len(events) == 1
    warning = events[0]
    assert warning.text.startswith("Your password is quite short")
|
|
|
|
|
def test_get_user_from_reset_token_ok():
    """A stored token that references a stored user resolves to it."""
    from ordr3 import services
    from ordr3.models import PasswordResetToken, User

    repo = FakeOrderRepository(None)
    account = User(*"ABCDEFG")
    repo.add_user(account)
    # the token's second argument "A" equals the user's first argument
    repo.add_reset_token(PasswordResetToken("identifier", "A"))

    found = services.get_user_from_reset_token(repo, "identifier")

    assert found == account
|
|
|
|
|
def test_get_user_from_reset_token_wrong_token():
    """An identifier that matches no stored token yields None."""
    from ordr3 import services
    from ordr3.models import PasswordResetToken, User

    repo = FakeOrderRepository(None)
    account = User(*"ABCDEFG")
    repo.add_user(account)
    repo.add_reset_token(PasswordResetToken("identifier", "A"))

    found = services.get_user_from_reset_token(repo, "wrong identifier")

    assert found is None
|
|
|
|
|
def test_get_user_from_reset_token_invalid_token():
    """A reset token whose validity has expired yields None."""
    from ordr3 import services
    from ordr3.models import PasswordResetToken, User

    repo = FakeOrderRepository(None)
    user = User(*list("ABCDEFG"))
    repo.add_user(user)
    # Expire the token by two days instead of two hours. This file mixes
    # local time (here) with datetime.utcnow() (fake repository), so a
    # margin smaller than the UTC offset could leave the token valid
    # depending on the machine's time zone.
    valid_until = datetime.now() - timedelta(days=2)
    token = PasswordResetToken("identifier", "A", valid_until)
    repo.add_reset_token(token)

    result = services.get_user_from_reset_token(repo, "identifier")

    assert result is None
|
|
|
|
|
def test_get_user_from_reset_token_unknown_user():
    """A token that references a user id not in the repo yields None."""
    from ordr3 import services
    from ordr3.models import PasswordResetToken, User

    repo = FakeOrderRepository(None)
    account = User(*"ABCDEFG")
    repo.add_user(account)
    # the token references "B" while the only stored user was built with "A"
    repo.add_reset_token(PasswordResetToken("identifier", "B"))

    found = services.get_user_from_reset_token(repo, "identifier")

    assert found is None
|
|
|
|
|
def test_create_token_for_user():
    """The created token references the user and is stored in the repo."""
    from ordr3.models import PasswordResetToken, User
    from ordr3.services import create_token_for_user

    repo = FakeOrderRepository(None)
    account = User(*"ABCDEFG")

    token = create_token_for_user(repo, account)

    assert isinstance(token, PasswordResetToken)
    assert token.user_id == "A"
    assert repo._tokens == {token}
|
|
|