from datetime import datetime, timedelta

import pytest

from ordr3.repo import AbstractOrderRepository
from ordr3.models import Vendor, VendorAggregate


class FakeOrderRepository(AbstractOrderRepository):
    """In-memory repository implementation for testing.

    Orders, users, vendors and reset tokens are each kept in a plain
    ``set``.  The single-item lookups use ``next()`` on a generator, so
    they raise ``StopIteration`` when nothing matches (``search_vendor``
    is the exception and returns ``None``).
    """

    def __init__(self, session):
        # ``session`` is accepted for signature compatibility only; the
        # fake never touches it.
        self._orders = set()
        self._users = set()
        self._vendors = set()
        self._tokens = set()

    def add_order(self, order):
        """Add an order to the datastore."""
        self._orders.add(order)

    def get_order(self, reference):
        """Retrieve the order whose ``id`` equals ``reference``.

        Raises ``StopIteration`` if no such order is stored.
        """
        return next(o for o in self._orders if o.id == reference)

    def delete_order(self, order):
        """Remove an order from the datastore.

        Raises ``KeyError`` if the order is not stored.
        """
        self._orders.remove(order)

    def list_consumable_candidates(self, limit_date, statuses):
        """List matching orders, sorted by descending creation date.

        Only orders created after ``limit_date`` whose status is contained
        in ``statuses`` are returned.
        """
        newer_orders = (o for o in self._orders if o.created_on > limit_date)
        valid_orders = (o for o in newer_orders if o.status in statuses)
        return sorted(valid_orders, reverse=True, key=lambda x: x.created_on)

    def add_user(self, user):
        """Add a user to the datastore."""
        self._users.add(user)

    def delete_user(self, user):
        """Remove a user from the datastore.

        Raises ``KeyError`` if the user is not stored.
        """
        self._users.remove(user)

    def get_user(self, reference):
        """Retrieve the user whose ``id`` equals ``reference``.

        Raises ``StopIteration`` if no such user is stored.
        """
        return next(o for o in self._users if o.id == reference)

    def get_user_by_username(self, reference):
        """Retrieve a user from the datastore by username.

        Raises ``StopIteration`` if no such user is stored.
        """
        return next(o for o in self._users if o.username == reference)

    def get_user_by_email(self, reference):
        """Retrieve a user from the datastore by email.

        Raises ``StopIteration`` if no such user is stored.
        """
        return next(o for o in self._users if o.email == reference)

    def search_vendor(self, reference):
        """Search for a vendor by its search term.

        Returns the vendor whose ``term`` equals ``reference`` or ``None``
        if there is no match.
        """
        try:
            return next(v for v in self._vendors if v.term == reference)
        except StopIteration:
            return None

    def get_vendor_aggregates(self, reference):
        """Return a ``VendorAggregate`` for the vendor named ``reference``.

        The aggregate is built from the vendor name and all search terms
        stored for that name, sorted case-insensitively.  Raises
        ``StopIteration`` if no vendor with that name is stored.
        """
        vendors = [v for v in self._vendors if v.name == reference]
        terms = sorted((v.term for v in vendors), key=lambda x: x.lower())
        first_vendor = next(iter(vendors))
        return VendorAggregate(first_vendor.name, terms)

    def update_vendors(self, old_vendor, new_name, new_terms):
        """Update the autocorrect values of vendors.

        Every entry carrying the name of ``old_vendor`` and every entry
        whose term appears in ``new_terms`` is dropped; afterwards one
        ``Vendor(term, new_name)`` is stored per term in ``new_terms``.
        """
        vendors_to_delete = {
            v for v in self._vendors if v.name == old_vendor.name
        }
        terms_to_delete = {v for v in self._vendors if v.term in new_terms}
        self._vendors = self._vendors - vendors_to_delete - terms_to_delete
        for new_term in new_terms:
            self._vendors.add(Vendor(new_term, new_name))

    def add_reset_token(self, token):
        """Add a password reset token."""
        self._tokens.add(token)

    def delete_reset_token(self, token):
        """Delete a password reset token.

        Raises ``KeyError`` if the token is not stored.
        """
        self._tokens.remove(token)

    def get_reset_token(self, reference):
        """Retrieve the reset token whose ``token`` equals ``reference``.

        Raises ``StopIteration`` if no such token is stored.
        """
        return next(t for t in self._tokens if t.token == reference)

    def clear_stale_reset_tokens(self):
        """Remove reset tokens whose ``valid_until`` is not in the future."""
        # NOTE(review): ``datetime.utcnow()`` returns a naive datetime and
        # is deprecated since Python 3.12.  It is kept because
        # ``valid_until`` is presumably a naive UTC value as well -- confirm
        # against ``PasswordResetToken`` before switching to aware datetimes.
        now = datetime.utcnow()
        self._tokens = {t for t in self._tokens if t.valid_until > now}


class FakePasslibContext:
    """Stand-in for a passlib context that compares plain strings."""

    def __init__(self, needs_update):
        # whether a successful verification should also report a new hash
        self.needs_update = needs_update

    def verify_and_update(self, password, hash):
        """Return ``(verified, new_hash)`` for ``password`` and ``hash``.

        The password counts as verified when it equals ``hash``.  When
        verified and ``needs_update`` is set, the reversed password is
        returned as the new hash; in all other cases the new hash is
        ``None``.
        """
        if password != hash:
            return False, None
        if self.needs_update:
            return True, password[::-1]
        else:
            return True, None


class FakeEventQueue(list):
    """List-backed event queue that records every emitted event."""

    def emit(self, event):
        """Append ``event`` to the queue."""
        self.append(event)


@pytest.fixture
def prefilled_repo():
    """Provide a fake repository with fourteen orders and one vendor.

    Order ids are assigned consecutively starting at 0, in the order the
    ``add_order`` calls appear below.
    """
    from itertools import count
    from ordr3.models import OrderItem, OrderStatus

    i = count()
    catalog = {1: "Ethanol", 2: "Aceton", 3: "NaOH", 4: "Coffee", 5: "Water"}

    def _create_order(item, date, status):
        order = OrderItem(next(i), catalog[item], item, "", "", "", "", "")
        order.created_on = date
        order.status = status
        return order

    month = timedelta(days=30)
    today = datetime.now()

    repo = FakeOrderRepository(session=None)

    # should be consumables
    repo.add_order(_create_order(1, today - month * 1, OrderStatus.ORDERED))
    repo.add_order(_create_order(1, today - month * 2, OrderStatus.ORDERED))
    repo.add_order(_create_order(1, today - month * 3, OrderStatus.COMPLETED))

    repo.add_order(_create_order(2, today - month * 1, OrderStatus.ORDERED))
    repo.add_order(_create_order(2, today - month * 2, OrderStatus.ORDERED))
    repo.add_order(_create_order(2, today - month * 3, OrderStatus.COMPLETED))

    # no consumable, only two repeats
    repo.add_order(_create_order(3, today - month * 1, OrderStatus.ORDERED))
    repo.add_order(_create_order(3, today - month * 2, OrderStatus.ORDERED))

    # no consumable, only two repeats in the last two years
    repo.add_order(_create_order(4, today - month * 1, OrderStatus.ORDERED))
    repo.add_order(_create_order(4, today - month * 2, OrderStatus.ORDERED))
    repo.add_order(_create_order(4, today - month * 50, OrderStatus.ORDERED))

    # no consumable, one order on hold
    repo.add_order(_create_order(5, today - month * 1, OrderStatus.ORDERED))
    repo.add_order(_create_order(5, today - month * 2, OrderStatus.ORDERED))
    repo.add_order(_create_order(5, today - month * 3, OrderStatus.HOLD))

    repo._vendors.add(Vendor("sa", "Sigma Aldrich"))

    return repo


def test_service_find_consumables(prefilled_repo):
    """Only the two thrice-ordered items are reported as consumables."""
    from ordr3.services import find_consumables

    result = find_consumables(prefilled_repo)

    assert len(result) == 2
    assert [o.id for o in result] == [3, 0]


def test_create_log_entry_noteworthy(prefilled_repo):
    """A status change by a different user is reported as noteworthy."""
    from ordr3.services import create_log_entry
    from ordr3.models import OrderStatus, User

    order = prefilled_repo.get_order(1)
    user_1 = User(*list("ABCDEFG"))
    user_2 = User(*list("AXCDEFG"))

    # first log entry, order.created_by is automatically set
    create_log_entry(order, OrderStatus.OPEN, user_1)
    # second log entry
    result = create_log_entry(order, OrderStatus.APPROVAL, user_2)

    assert result is True
    assert len(order.log) == 2

    first_entry, second_entry = order.log
    assert first_entry.order_id == order.id
    assert first_entry.status == OrderStatus.OPEN
    assert first_entry.by == "B"
    assert isinstance(first_entry.date, datetime)

    assert order.created_by == first_entry.by
    assert order.created_on == first_entry.date
    assert order.status == second_entry.status


def test_create_log_entry_not_noteworthy_same_user(prefilled_repo):
    """A status change by the order's creator is not noteworthy."""
    from ordr3.services import create_log_entry
    from ordr3.models import OrderStatus, User

    order = prefilled_repo.get_order(1)
    user = User(*list("ABCDEFG"))
    order.created_by = user.username

    result = create_log_entry(order, OrderStatus.APPROVAL, user)

    assert result is False


def test_create_log_entry_not_noteworthy_same_status(prefilled_repo):
    """Logging the status the order already has is not noteworthy."""
    from ordr3.services import create_log_entry
    from ordr3.models import OrderStatus, User

    order = prefilled_repo.get_order(1)
    user = User(*list("ABCDEFG"))

    result = create_log_entry(order, OrderStatus.ORDERED, user)

    assert result is False


@pytest.mark.parametrize(
    "update,new_hash", [(False, "1234"), (True, "4321")]  # noqa: E231
)
def test_verify_username_and_password_valid(update, new_hash):
    """Valid credentials return the user, re-hashed when requested."""
    from ordr3.models import User, UserRole
    from ordr3.services import verify_credentials

    user = User(2, "Me", "Jane", "Doe", "jane.doe", "1234", UserRole.USER)
    repo = FakeOrderRepository(None)
    repo.add_user(user)
    pass_ctx = FakePasslibContext(update)

    user = verify_credentials(repo, pass_ctx, user.username, user.password)

    assert user is not None
    assert user.password == new_hash


@pytest.mark.parametrize(
    "name,pwd", [("You", "1234"), ("Me", "abcd")]  # noqa: E231
)
def test_verify_username_and_password_invalid(name, pwd):
    """An unknown username or a wrong password yields ``None``."""
    from ordr3.models import User, UserRole
    from ordr3.services import verify_credentials

    user = User(2, "Me", "Jane", "Doe", "jane.doe", "1234", UserRole.USER)
    repo = FakeOrderRepository(None)
    repo.add_user(user)
    pass_ctx = FakePasslibContext(False)

    assert verify_credentials(repo, pass_ctx, name, pwd) is None


@pytest.mark.parametrize(
    "text,expected",
    [("no url", "no url"), ("http://company.com/path", "company.com")],
)
def test_vendor_from_url(text, expected):
    """A URL is reduced to its host part, other text is left untouched."""
    from ordr3.services import _vendor_from_url

    assert _vendor_from_url(text) == expected


@pytest.mark.parametrize(
    "text,expected",
    [
        ("no domain", "no domain"),
        ("company.de", "company"),
        ("company.com", "company"),
        ("company.eu", "company"),
        ("company.co.uk", "company.co.uk"),
    ],
)
def test_vendor_with_common_domain(text, expected):
    """The ``.de``, ``.com`` and ``.eu`` endings are stripped."""
    from ordr3.services import _vendor_with_common_domain

    assert _vendor_with_common_domain(text) == expected


@pytest.mark.parametrize(
    "text,name,found",
    [
        ("Happy Company", "Happy Company", False),
        ("Company Cleanup\n", "Company Cleanup", False),
        ("http://url-company.it/", "url-company.it", False),
        ("http://url-company.de/", "url-company", False),
        ("domain.eu", "domain", False),
        ("sa", "Sigma Aldrich", True),
        ("http://SA.com/some/path", "Sigma Aldrich", True),
    ],
)
def test_check_vendor_name(prefilled_repo, text, name, found):
    """Vendor input is cleaned up and resolved against known vendors."""
    from ordr3.services import check_vendor_name

    result = check_vendor_name(prefilled_repo, text)

    assert result.name == name
    assert result.found == found


# NOTE(review): the three tests below call the real
# ``_check_have_i_been_pwned``; presumably it performs a network request --
# confirm whether these are meant to run without connectivity.
def test__check_have_i_been_pwned_ok():
    """A hash not reported as breached emits no event."""
    from ordr3.services import _check_have_i_been_pwned

    queue = FakeEventQueue()

    assert not _check_have_i_been_pwned("21BD2x", queue)
    assert len(queue) == 0


def test__check_have_i_been_pwned_not_ok():
    """A hash reported as breached emits exactly one warning event."""
    from ordr3.services import _check_have_i_been_pwned

    queue = FakeEventQueue()

    assert _check_have_i_been_pwned(
        "21BD2008F2FF3F9F3AE0A2072D19CD17E971B33A", queue
    )
    assert len(queue) == 1
    assert queue[0].text.startswith("This password appears in a breach")


def test__check_have_i_been_pwned_request_exception():
    """The input ``"xxxxx"`` is not reported and emits no event."""
    from ordr3.services import _check_have_i_been_pwned

    queue = FakeEventQueue()

    assert not _check_have_i_been_pwned("xxxxx", queue)
    assert len(queue) == 0


def test_set_new_password_ok(monkeypatch):
    """A long, unbreached password is set without any warning."""
    from ordr3 import services
    from ordr3.models import User
    from ordr3.security import get_passlib_context

    user = User(*list("ABCDEFG"))
    queue = FakeEventQueue()
    monkeypatch.setattr(
        services, "_check_have_i_been_pwned", lambda x, y: False
    )

    result = services.set_new_password(user, "1234567890123456", queue)

    assert result
    assert get_passlib_context().verify("1234567890123456", user.password)
    assert len(queue) == 0


def test_set_new_password_to_short(monkeypatch):
    """A short password is still set, but one warning is emitted."""
    from ordr3 import services
    from ordr3.models import User
    from ordr3.security import get_passlib_context

    user = User(*list("ABCDEFG"))
    queue = FakeEventQueue()
    monkeypatch.setattr(
        services, "_check_have_i_been_pwned", lambda x, y: False
    )

    result = services.set_new_password(user, "1", queue)

    assert not result
    assert get_passlib_context().verify("1", user.password)
    assert len(queue) == 1
    assert queue[0].text.startswith("Your password is quite short")


def test_set_new_password_breached(monkeypatch):
    """A breached password is still set, but the result is falsy."""
    from ordr3 import services
    from ordr3.models import User
    from ordr3.security import get_passlib_context

    user = User(*list("ABCDEFG"))
    queue = FakeEventQueue()
    monkeypatch.setattr(
        services, "_check_have_i_been_pwned", lambda x, y: True
    )

    result = services.set_new_password(user, "1234567890123456", queue)

    assert not result
    assert get_passlib_context().verify("1234567890123456", user.password)
    assert len(queue) == 0  # no item in queue due to monkeypatch


def test_set_new_password_to_short_and_breached(monkeypatch):
    """A short and breached password is set with a single warning."""
    from ordr3 import services
    from ordr3.models import User
    from ordr3.security import get_passlib_context

    user = User(*list("ABCDEFG"))
    queue = FakeEventQueue()
    monkeypatch.setattr(
        services, "_check_have_i_been_pwned", lambda x, y: True
    )

    result = services.set_new_password(user, "1", queue)

    assert not result
    assert get_passlib_context().verify("1", user.password)
    assert len(queue) == 1  # only one item in queue due to monkeypatch
    assert queue[0].text.startswith("Your password is quite short")


def test_get_user_from_reset_token_ok():
    """A stored token resolves to the user it was issued for."""
    from ordr3 import services
    from ordr3.models import PasswordResetToken, User

    repo = FakeOrderRepository(None)
    user = User(*list("ABCDEFG"))
    repo.add_user(user)
    token = PasswordResetToken("identifier", "A")
    repo.add_reset_token(token)

    result = services.get_user_from_reset_token(repo, "identifier")

    assert result == user


def test_get_user_from_reset_token_wrong_token():
    """An identifier that matches no stored token yields ``None``."""
    from ordr3 import services
    from ordr3.models import PasswordResetToken, User

    repo = FakeOrderRepository(None)
    user = User(*list("ABCDEFG"))
    repo.add_user(user)
    token = PasswordResetToken("identifier", "A")
    repo.add_reset_token(token)

    result = services.get_user_from_reset_token(repo, "wrong identifier")

    assert result is None


def test_get_user_from_reset_token_invalid_token():
    """A token whose ``valid_until`` lies in the past yields ``None``."""
    from ordr3 import services
    from ordr3.models import PasswordResetToken, User

    repo = FakeOrderRepository(None)
    user = User(*list("ABCDEFG"))
    repo.add_user(user)
    valid_until = datetime.now() - timedelta(hours=2)
    token = PasswordResetToken("identifier", "A", valid_until)
    repo.add_reset_token(token)

    result = services.get_user_from_reset_token(repo, "identifier")

    assert result is None


def test_get_user_from_reset_token_unknown_user():
    """A token that references an unknown user id yields ``None``."""
    from ordr3 import services
    from ordr3.models import PasswordResetToken, User

    repo = FakeOrderRepository(None)
    user = User(*list("ABCDEFG"))
    repo.add_user(user)
    token = PasswordResetToken("identifier", "B")
    repo.add_reset_token(token)

    result = services.get_user_from_reset_token(repo, "identifier")

    assert result is None


def test_create_token_for_user():
    """Creating a token stores it in the repository and returns it."""
    from ordr3.services import create_token_for_user
    from ordr3.models import PasswordResetToken, User

    repo = FakeOrderRepository(None)
    user = User(*list("ABCDEFG"))

    result = create_token_for_user(repo, user)

    assert isinstance(result, PasswordResetToken)
    assert result.user_id == "A"
    assert repo._tokens == {result}