|
|
|
@ -12,6 +12,7 @@ class FakeOrderRepository(AbstractOrderRepository):
@@ -12,6 +12,7 @@ class FakeOrderRepository(AbstractOrderRepository):
|
|
|
|
|
self._orders = set() |
|
|
|
|
self._users = set() |
|
|
|
|
self._vendors = {"sa": "Sigma Aldrich"} |
|
|
|
|
self._tokens = set() |
|
|
|
|
|
|
|
|
|
def add_order(self, order): |
|
|
|
|
""" add an order to the datastore """ |
|
|
|
@ -49,6 +50,23 @@ class FakeOrderRepository(AbstractOrderRepository):
@@ -49,6 +50,23 @@ class FakeOrderRepository(AbstractOrderRepository):
|
|
|
|
|
""" search for a vendor by a canonical search term """ |
|
|
|
|
return self._vendors.get(reference, None) |
|
|
|
|
|
|
|
|
|
def add_reset_token(self, token): |
|
|
|
|
""" add an password reset token """ |
|
|
|
|
self._tokens.add(token) |
|
|
|
|
|
|
|
|
|
def delete_reset_token(self, token): |
|
|
|
|
""" deletes a password reset token """ |
|
|
|
|
self._tokens.remove(token) |
|
|
|
|
|
|
|
|
|
def get_reset_token(self, reference): |
|
|
|
|
""" add an password reset token """ |
|
|
|
|
return next(t for t in self._tokens if t.token == reference) |
|
|
|
|
|
|
|
|
|
def clean_stale_reset_tokens(self): |
|
|
|
|
""" removes invalid reset tokens """ |
|
|
|
|
now = datetime.utcnow() |
|
|
|
|
self._tokens = {t for t in self._tokens if t.valid_until > now} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class FakePasslibContext: |
|
|
|
|
def __init__(self, needs_update): |
|
|
|
@ -323,3 +341,65 @@ def test_set_new_password_to_short_and_breached(monkeypatch):
@@ -323,3 +341,65 @@ def test_set_new_password_to_short_and_breached(monkeypatch):
|
|
|
|
|
assert get_passlib_context().verify("1", user.password) |
|
|
|
|
assert len(queue) == 1 # only one item in que due to monkeypatch |
|
|
|
|
assert queue[0].text.startswith("Your password is quite short") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_get_user_from_reset_token_ok(): |
|
|
|
|
from ordr3 import services |
|
|
|
|
from ordr3.models import PasswordResetToken, User |
|
|
|
|
|
|
|
|
|
repo = FakeOrderRepository(None) |
|
|
|
|
user = User(*list("ABCDEFG")) |
|
|
|
|
repo.add_user(user) |
|
|
|
|
token = PasswordResetToken("identifier", "A") |
|
|
|
|
repo.add_reset_token(token) |
|
|
|
|
|
|
|
|
|
result = services.get_user_from_reset_token(repo, "identifier") |
|
|
|
|
|
|
|
|
|
assert result == user |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_get_user_from_reset_token_wrong_token(): |
|
|
|
|
from ordr3 import services |
|
|
|
|
from ordr3.models import PasswordResetToken, User |
|
|
|
|
|
|
|
|
|
repo = FakeOrderRepository(None) |
|
|
|
|
user = User(*list("ABCDEFG")) |
|
|
|
|
repo.add_user(user) |
|
|
|
|
token = PasswordResetToken("identifier", "A") |
|
|
|
|
repo.add_reset_token(token) |
|
|
|
|
|
|
|
|
|
result = services.get_user_from_reset_token(repo, "wrong identifier") |
|
|
|
|
|
|
|
|
|
assert result is None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_get_user_from_reset_token_invalid_token(): |
|
|
|
|
from ordr3 import services |
|
|
|
|
from ordr3.models import PasswordResetToken, User |
|
|
|
|
from datetime import datetime, timedelta |
|
|
|
|
|
|
|
|
|
repo = FakeOrderRepository(None) |
|
|
|
|
user = User(*list("ABCDEFG")) |
|
|
|
|
repo.add_user(user) |
|
|
|
|
valid_until = datetime.now() - timedelta(hours=2) |
|
|
|
|
token = PasswordResetToken("identifier", "A", valid_until) |
|
|
|
|
repo.add_reset_token(token) |
|
|
|
|
|
|
|
|
|
result = services.get_user_from_reset_token(repo, "identifier") |
|
|
|
|
|
|
|
|
|
assert result is None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_get_user_from_reset_token_unknown_user(): |
|
|
|
|
from ordr3 import services |
|
|
|
|
from ordr3.models import PasswordResetToken, User |
|
|
|
|
|
|
|
|
|
repo = FakeOrderRepository(None) |
|
|
|
|
user = User(*list("ABCDEFG")) |
|
|
|
|
repo.add_user(user) |
|
|
|
|
token = PasswordResetToken("identifier", "B") |
|
|
|
|
repo.add_reset_token(token) |
|
|
|
|
|
|
|
|
|
result = services.get_user_from_reset_token(repo, "identifier") |
|
|
|
|
|
|
|
|
|
assert result is None |
|
|
|
|