""" Tests for the ``ordr3.services`` module, backed by in-memory fakes """

from datetime import datetime, timedelta

import pytest

from ordr3.repo import AbstractOrderRepository


class FakeOrderRepository(AbstractOrderRepository):
    """ Repository implementation for testing

    Orders, users and reset tokens are kept in plain in-memory sets; the
    vendor lookup is a fixed dictionary with a single known vendor.

    All ``get_*`` methods use ``next()`` without a default and therefore
    raise ``StopIteration`` if no matching item is stored.
    """

    def __init__(self, session):
        # ``session`` is accepted to match the constructor signature used
        # by the tests, but it is never used by this fake.
        self._orders = set()
        self._users = set()
        self._vendors = {"sa": "Sigma Aldrich"}
        self._tokens = set()

    def add_order(self, order):
        """ add an order to the datastore """
        self._orders.add(order)

    def get_order(self, reference):
        """ retrieve an order from the datastore by its id """
        return next(o for o in self._orders if o.id == reference)

    def delete_order(self, order):
        """ removes an order from the datastore """
        self._orders.remove(order)

    def list_orders(self):
        """ list orders, sorted by descending creation date """
        return sorted(self._orders, reverse=True, key=lambda x: x.created_on)

    def add_user(self, user):
        """ add a user to the datastore """
        self._users.add(user)

    def delete_user(self, user):
        """ removes a user from the datastore """
        self._users.remove(user)

    def get_user(self, reference):
        """ retrieve a user from the datastore by its id """
        return next(o for o in self._users if o.id == reference)

    def get_user_by_username(self, reference):
        """ retrieve a user from the datastore by username """
        return next(o for o in self._users if o.username == reference)

    def get_user_by_email(self, reference):
        """ retrieve a user from the datastore by email """
        return next(o for o in self._users if o.email == reference)

    def list_users(self):
        """ list users, sorted by username """
        return sorted(self._users, key=lambda x: x.username)

    def search_vendor(self, reference):
        """ search for a vendor by a canonical search term

        Returns the vendor name or None if the term is unknown.
        """
        return self._vendors.get(reference)

    def add_reset_token(self, token):
        """ add a password reset token """
        self._tokens.add(token)

    def delete_reset_token(self, token):
        """ deletes a password reset token """
        self._tokens.remove(token)

    def get_reset_token(self, reference):
        """ retrieve a password reset token by its token identifier """
        return next(t for t in self._tokens if t.token == reference)

    def clear_stale_reset_tokens(self):
        """ removes invalid reset tokens

        A token is kept only if its ``valid_until`` lies after the current
        (naive) UTC time.
        """
        now = datetime.utcnow()
        self._tokens = {t for t in self._tokens if t.valid_until > now}


class FakePasslibContext:
    """ Stand-in for a passlib context, treats the "hash" as plain text """

    def __init__(self, needs_update):
        self.needs_update = needs_update

    def verify_and_update(self, password, hash):
        """ compare password and hash for equality

        Returns a tuple ``(verified, new_hash)``:
        ``(False, None)`` if password and hash differ, ``(True, None)`` if
        they match and no update is needed, otherwise ``(True, <reversed
        password>)`` to simulate a rehash.
        """
        if password != hash:
            return False, None
        if self.needs_update:
            return True, password[::-1]
        return True, None


class FakeEventQueue(list):
    """ A list that records every emitted event """

    def emit(self, event):
        """ store the event at the end of the queue """
        self.append(event)


@pytest.fixture
def prefilled_repo():
    """ a fake repository filled with orders for the consumables tests

    Order ids are assigned sequentially, starting at 0, in the order the
    orders are added below.
    """
    from itertools import count
    from ordr3.models import OrderItem, OrderStatus

    order_ids = count()
    catalog = {
        1: "Ethanol",
        2: "Aceton",
        3: "NaOH",
        4: "Coffee",
        5: "Water",
    }

    def _create_order(item, date, status):
        order = OrderItem(
            next(order_ids), catalog[item], item, "", "", "", "", ""
        )
        order.created_on = date
        order.status = status
        return order

    month = timedelta(days=30)
    today = datetime.now()

    repo = FakeOrderRepository(session=None)

    # should be consumables
    repo.add_order(_create_order(1, today - month * 1, OrderStatus.ORDERED))
    repo.add_order(_create_order(1, today - month * 2, OrderStatus.ORDERED))
    repo.add_order(_create_order(1, today - month * 3, OrderStatus.COMPLETED))
    repo.add_order(_create_order(2, today - month * 1, OrderStatus.ORDERED))
    repo.add_order(_create_order(2, today - month * 2, OrderStatus.ORDERED))
    repo.add_order(_create_order(2, today - month * 3, OrderStatus.COMPLETED))

    # no consumable, only two repeats
    repo.add_order(_create_order(3, today - month * 1, OrderStatus.ORDERED))
    repo.add_order(_create_order(3, today - month * 2, OrderStatus.ORDERED))

    # no consumable, only two repeats in the last two years
    repo.add_order(_create_order(4, today - month * 1, OrderStatus.ORDERED))
    repo.add_order(_create_order(4, today - month * 2, OrderStatus.ORDERED))
    repo.add_order(_create_order(4, today - month * 50, OrderStatus.ORDERED))

    # no consumable, one order on hold
    repo.add_order(_create_order(5, today - month * 1, OrderStatus.ORDERED))
    repo.add_order(_create_order(5, today - month * 2, OrderStatus.ORDERED))
    repo.add_order(_create_order(5, today - month * 3, OrderStatus.HOLD))

    return repo


def test_service_find_consumables(prefilled_repo):
    from ordr3.services import find_consumables

    result = find_consumables(prefilled_repo)

    assert len(result) == 2
    assert [o.id for o in result] == [3, 0]


def test_create_log_entry(prefilled_repo):
    from ordr3.services import create_log_entry
    from ordr3.models import OrderStatus, User

    order = prefilled_repo.get_order(1)
    user = User(*list("ABCDEFG"))

    create_log_entry(order, OrderStatus.APPROVAL, user)

    assert len(order.log) == 1
    log_entry = order.log[0]
    assert log_entry.order_id == order.id
    assert log_entry.status == OrderStatus.APPROVAL
    assert log_entry.by == "B"
    assert isinstance(log_entry.date, datetime)
    assert order.status == log_entry.status
    assert order.created_by == log_entry.by
    assert order.created_on == log_entry.date


@pytest.mark.parametrize(
    "update,new_hash",
    [
        (False, "1234"),
        (True, "4321"),
    ],
)
def test_verify_username_and_password_valid(update, new_hash):
    from ordr3.models import User, UserRole
    from ordr3.services import verify_credentials

    user = User(2, "Me", "Jane", "Doe", "jane.doe", "1234", UserRole.USER)
    repo = FakeOrderRepository(None)
    repo.add_user(user)
    pass_ctx = FakePasslibContext(update)

    user = verify_credentials(repo, pass_ctx, user.username, user.password)

    assert user is not None
    assert user.password == new_hash


@pytest.mark.parametrize(
    "name,pwd",
    [
        ("You", "1234"),
        ("Me", "abcd"),
    ],
)
def test_verify_username_and_password_invalid(name, pwd):
    from ordr3.models import User, UserRole
    from ordr3.services import verify_credentials

    user = User(2, "Me", "Jane", "Doe", "jane.doe", "1234", UserRole.USER)
    repo = FakeOrderRepository(None)
    repo.add_user(user)
    pass_ctx = FakePasslibContext(False)

    assert verify_credentials(repo, pass_ctx, name, pwd) is None


@pytest.mark.parametrize(
    "input,expected",
    [("no url", "no url"), ("http://company.com/path", "company.com")],
)
def test_vendor_from_url(input, expected):
    from ordr3.services import _vendor_from_url

    assert _vendor_from_url(input) == expected


@pytest.mark.parametrize(
    "input,expected",
    [
        ("no domain", "no domain"),
        ("company.de", "company"),
        ("company.com", "company"),
        ("company.eu", "company"),
        ("company.co.uk", "company.co.uk"),
    ],
)
def test_vendor_with_common_domain(input, expected):
    from ordr3.services import _vendor_with_common_domain

    assert _vendor_with_common_domain(input) == expected


@pytest.mark.parametrize(
    "input,name,found",
    [
        ("Happy Company", "Happy Company", False),
        ("Company Cleanup\n", "Company Cleanup", False),
        ("http://url-company.it/", "url-company.it", False),
        ("http://url-company.de/", "url-company", False),
        ("domain.eu", "domain", False),
        ("sa", "Sigma Aldrich", True),
        ("http://SA.com/some/path", "Sigma Aldrich", True),
    ],
)
def test_check_vendor_name(input, name, found):
    from ordr3.services import check_vendor_name

    repo = FakeOrderRepository(None)

    result = check_vendor_name(repo, input)

    assert result.name == name
    assert result.found == found


def test__check_have_i_been_pwned_ok():
    from ordr3.services import _check_have_i_been_pwned

    queue = FakeEventQueue()

    assert not _check_have_i_been_pwned("21BD2x", queue)
    assert len(queue) == 0


def test__check_have_i_been_pwned_not_ok():
    from ordr3.services import _check_have_i_been_pwned

    queue = FakeEventQueue()

    assert _check_have_i_been_pwned(
        "21BD2008F2FF3F9F3AE0A2072D19CD17E971B33A", queue
    )
    assert len(queue) == 1
    assert queue[0].text.startswith("This password appears in a breach")


def test__check_have_i_been_pwned_request_exception():
    from ordr3.services import _check_have_i_been_pwned

    queue = FakeEventQueue()

    assert not _check_have_i_been_pwned("xxxxx", queue)
    assert len(queue) == 0


def test_set_new_password_ok(monkeypatch):
    from ordr3 import services
    from ordr3.models import User
    from ordr3.security import get_passlib_context

    user = User(*list("ABCDEFG"))
    queue = FakeEventQueue()
    monkeypatch.setattr(
        services, "_check_have_i_been_pwned", lambda x, y: False
    )

    result = services.set_new_password(user, "1234567890123456", queue)

    assert result
    assert get_passlib_context().verify("1234567890123456", user.password)
    assert len(queue) == 0


def test_set_new_password_to_short(monkeypatch):
    from ordr3 import services
    from ordr3.models import User
    from ordr3.security import get_passlib_context

    user = User(*list("ABCDEFG"))
    queue = FakeEventQueue()
    monkeypatch.setattr(
        services, "_check_have_i_been_pwned", lambda x, y: False
    )

    result = services.set_new_password(user, "1", queue)

    assert not result
    assert get_passlib_context().verify("1", user.password)
    assert len(queue) == 1
    assert queue[0].text.startswith("Your password is quite short")


def test_set_new_password_breached(monkeypatch):
    from ordr3 import services
    from ordr3.models import User
    from ordr3.security import get_passlib_context

    user = User(*list("ABCDEFG"))
    queue = FakeEventQueue()
    monkeypatch.setattr(
        services, "_check_have_i_been_pwned", lambda x, y: True
    )

    result = services.set_new_password(user, "1234567890123456", queue)

    assert not result
    assert get_passlib_context().verify("1234567890123456", user.password)
    # no item in the queue: the monkeypatched check emits no event
    assert len(queue) == 0


def test_set_new_password_to_short_and_breached(monkeypatch):
    from ordr3 import services
    from ordr3.models import User
    from ordr3.security import get_passlib_context

    user = User(*list("ABCDEFG"))
    queue = FakeEventQueue()
    monkeypatch.setattr(
        services, "_check_have_i_been_pwned", lambda x, y: True
    )

    result = services.set_new_password(user, "1", queue)

    assert not result
    assert get_passlib_context().verify("1", user.password)
    # only one item in the queue: the monkeypatched check emits no event
    assert len(queue) == 1
    assert queue[0].text.startswith("Your password is quite short")


def test_get_user_from_reset_token_ok():
    from ordr3 import services
    from ordr3.models import PasswordResetToken, User

    repo = FakeOrderRepository(None)
    user = User(*list("ABCDEFG"))
    repo.add_user(user)
    token = PasswordResetToken("identifier", "A")
    repo.add_reset_token(token)

    result = services.get_user_from_reset_token(repo, "identifier")

    assert result == user


def test_get_user_from_reset_token_wrong_token():
    from ordr3 import services
    from ordr3.models import PasswordResetToken, User

    repo = FakeOrderRepository(None)
    user = User(*list("ABCDEFG"))
    repo.add_user(user)
    token = PasswordResetToken("identifier", "A")
    repo.add_reset_token(token)

    result = services.get_user_from_reset_token(repo, "wrong identifier")

    assert result is None


def test_get_user_from_reset_token_invalid_token():
    from ordr3 import services
    from ordr3.models import PasswordResetToken, User

    repo = FakeOrderRepository(None)
    user = User(*list("ABCDEFG"))
    repo.add_user(user)
    # The expiry is built from local time, while the fake repository
    # compares against UTC. A margin larger than any possible UTC offset
    # keeps the token expired under both clocks, so the test does not
    # depend on the timezone of the machine running it.
    valid_until = datetime.now() - timedelta(days=2)
    token = PasswordResetToken("identifier", "A", valid_until)
    repo.add_reset_token(token)

    result = services.get_user_from_reset_token(repo, "identifier")

    assert result is None


def test_get_user_from_reset_token_unknown_user():
    from ordr3 import services
    from ordr3.models import PasswordResetToken, User

    repo = FakeOrderRepository(None)
    user = User(*list("ABCDEFG"))
    repo.add_user(user)
    token = PasswordResetToken("identifier", "B")
    repo.add_reset_token(token)

    result = services.get_user_from_reset_token(repo, "identifier")

    assert result is None


def test_create_token_for_user():
    from ordr3.services import create_token_for_user
    from ordr3.models import PasswordResetToken, User

    repo = FakeOrderRepository(None)
    user = User(*list("ABCDEFG"))

    result = create_token_for_user(repo, user)

    assert isinstance(result, PasswordResetToken)
    assert result.user_id == "A"
    assert repo._tokens == {result}