diff --git a/ordr3/__init__.py b/ordr3/__init__.py
index 45a50f4..c923877 100644
--- a/ordr3/__init__.py
+++ b/ordr3/__init__.py
@@ -28,6 +28,7 @@ def main(global_config, **settings):
config.set_root_factory(resources.Root)
config.include(".adapters")
+ config.include(".events")
config.include(".resources")
config.include(".routes")
config.include(".security")
diff --git a/ordr3/events.py b/ordr3/events.py
new file mode 100644
index 0000000..9675e34
--- /dev/null
+++ b/ordr3/events.py
@@ -0,0 +1,47 @@
+from pyramid.events import subscriber
+
+
+class Ordr3Event:
+ def __init__(self):
+ self.request = None
+
+
+class FlashMessage(Ordr3Event):
+ def __init__(self, channel, text, more):
+ super().__init__()
+ self.channel = channel
+ self.text = text
+ self.more = more
+
+ @classmethod
+ def info(cls, text, more=""):
+ return cls("info", text, more)
+
+ @classmethod
+ def warning(cls, text, more=""):
+ return cls("warning", text, more)
+
+ @classmethod
+ def error(cls, text, more=""):
+ return cls("error", text, more)
+
+
+@subscriber(FlashMessage)
+def handle_flash_message(message):
+ if message.request is None:
+ return
+ session = message.request.session
+ session.flash(message, message.channel, allow_duplicate=False)
+
+
+def emit(request, event):
+ event.request = request
+ request.registry.notify(event)
+
+
+def includeme(config):
+ """ initializing Events sytem for the Pyramid app
+
+ Activate this setup using ``config.include('ordr2.events')``.
+ """
+ config.add_request_method(emit, "emit")
diff --git a/ordr3/models.py b/ordr3/models.py
index 22908f3..21b33d3 100644
--- a/ordr3/models.py
+++ b/ordr3/models.py
@@ -1,8 +1,5 @@
import enum
from datetime import datetime
-from collections import namedtuple
-
-FlashMessage = namedtuple("FlashMessage", ["message", "description"])
@enum.unique
diff --git a/ordr3/services.py b/ordr3/services.py
index 2ceb2bf..191ad4b 100644
--- a/ordr3/services.py
+++ b/ordr3/services.py
@@ -5,7 +5,7 @@ from urllib.parse import urlparse
import requests
-from . import models, security
+from . import events, models, security
CONSUMABLE_STATI = {
models.OrderStatus.ORDERED,
@@ -13,6 +13,24 @@ CONSUMABLE_STATI = {
}
+MSG_SHORT_PASSWORD = events.FlashMessage.warning(
+ f"Your password is quite short.",
+ (
+ 'Have a look at '
+ "this comic strip why longer passwords are better."
+ ),
+)
+
+MSG_PWNED_PASSWORD = events.FlashMessage.warning(
+ "This password appears in a breach or has been compromised.",
+ (
+ "Please consider changing your password. "
+ 'Read this page on why you '
+ "see this message."
+ ),
+)
+
+
def find_consumables(repo, repeat=3, days=365 * 2):
""" search for orders that are requested often """
unsorted = _find_consumables(repo, repeat, days)
@@ -90,43 +108,31 @@ def check_vendor_name(repo, to_check):
return CheckVendorResult(vendor, True)
-def set_new_password(user, password):
+def set_new_password(user, password, event_queue):
crypt_context = security.get_passlib_context()
user.password = crypt_context.hash(password)
- messages = []
-
- pwd_length = len(password)
- if pwd_length < 16:
- messages.append(
- models.FlashMessage(
- f"Your password is quite short.",
- (
- f"Your Password has only {pwd_length} characters. "
- 'Have a look at '
- "this comic strip why longer passwords are better."
- ),
- )
- )
+ length_ok = len(password) >= 16
+ if not length_ok:
+ event_queue.emit(MSG_SHORT_PASSWORD)
+
+ return length_ok and not check_have_i_been_pwned(password, event_queue)
+
+
+def check_have_i_been_pwned(password, event_queue):
+ """ public function for checking haveibeenpwned
+
+ this is just a small shim to eas testing"""
+ password_hash = hashlib.sha1(password.encode()).hexdigest()
+ return _check_have_i_been_pwned(password_hash, event_queue)
- sha1_hash = hashlib.sha1(password.encode()).hexdigest()
- if have_i_been_pwned(sha1_hash):
-
- messages.append(
- models.FlashMessage(
- "This password appears in a breach or has been compromised.",
- (
- "Please consider changing your password. "
- 'Read this page on why you '
- "see this message."
- ),
- )
- )
- return messages
+def _check_have_i_been_pwned(password_hash, event_queue):
+ """ chech haveibeenpwned for password leak
+ This function does the heavy lifting for check_have_i_been_pwned()
+ """
-def have_i_been_pwned(password_hash):
head, tails = password_hash[:5], password_hash[5:].lower()
url = f"https://api.pwnedpasswords.com/range/{head}"
headers = {"Add-Padding": "true"}
@@ -137,6 +143,8 @@ def have_i_been_pwned(password_hash):
return False
lower_case_lines = (line.lower() for line in response.text.splitlines())
for line in lower_case_lines:
+ print(tails, line, line.startswith(tails))
if line.startswith(tails):
+ event_queue.emit(MSG_PWNED_PASSWORD)
return True
return False
diff --git a/ordr3/views/__init__.py b/ordr3/views/__init__.py
index b987ab8..8afae8c 100644
--- a/ordr3/views/__init__.py
+++ b/ordr3/views/__init__.py
@@ -4,20 +4,8 @@ some view helpers are defined here
"""
-from .. import models
-
-
-def flash(request, channel, message, description=""):
- """ small wrapper around request.session.flash """
- msg = models.FlashMessage(message, description)
- request.session.flash(msg, channel, allow_duplicate=False)
-
-
def includeme(config):
""" adding request helpers
Activate this setup using ``config.include('ordr3.views')``.
"""
- # this allows to use the request object like this:
- # request.flash(channel, message, description)
- config.add_request_method(flash, "flash")
diff --git a/ordr3/views/account.py b/ordr3/views/account.py
index cfdb2fd..6ede2eb 100644
--- a/ordr3/views/account.py
+++ b/ordr3/views/account.py
@@ -94,12 +94,9 @@ def register_new_user(context, request):
email=appstruct["email"],
role=models.UserRole.NEW,
)
- warnings = services.set_new_password(account, appstruct["password"])
+ services.set_new_password(account, appstruct["password"])
request.repo.add_user(account)
- for message in warnings:
- request.flash("warning", message.message, message.description)
-
return HTTPFound(request.resource_path(request.root, "registered"))
diff --git a/tests/test_services.py b/tests/test_services.py
index 82d1b28..2270b58 100644
--- a/tests/test_services.py
+++ b/tests/test_services.py
@@ -63,6 +63,11 @@ class FakePasslibContext:
return True, None
+class FakeEventQueue(list):
+ def emit(self, event):
+ self.append(event)
+
+
@pytest.fixture
def prefilled_repo():
from itertools import count
@@ -218,22 +223,34 @@ def test_check_vendor_name(input, name, found):
assert result.found == found
-def test_have_i_been_pwned_ok():
- from ordr3.services import have_i_been_pwned
+def test__check_have_i_been_pwned_ok():
+ from ordr3.services import _check_have_i_been_pwned
+
+ queue = FakeEventQueue()
+
+ assert not _check_have_i_been_pwned("21BD2x", queue)
+ assert len(queue) == 0
- assert not have_i_been_pwned("21BD2x")
+def test__check_have_i_been_pwned_not_ok():
+ from ordr3.services import _check_have_i_been_pwned
-def test_have_i_been_pwned_not_ok():
- from ordr3.services import have_i_been_pwned
+ queue = FakeEventQueue()
- assert have_i_been_pwned("21BD2008F2FF3F9F3AE0A2072D19CD17E971B33A")
+ assert _check_have_i_been_pwned(
+ "21BD2008F2FF3F9F3AE0A2072D19CD17E971B33A", queue
+ )
+ assert len(queue) == 1
+ assert queue[0].text.startswith("This password appears in a breach")
-def test_have_i_been_pwned_request_exception():
- from ordr3.services import have_i_been_pwned
+def test__check_have_i_been_pwned_request_exception():
+ from ordr3.services import _check_have_i_been_pwned
- assert not have_i_been_pwned("xxxxx")
+ queue = FakeEventQueue()
+
+ assert not _check_have_i_been_pwned("xxxxx", queue)
+ assert len(queue) == 0
def test_set_new_password_ok(monkeypatch):
@@ -242,11 +259,16 @@ def test_set_new_password_ok(monkeypatch):
from ordr3.security import get_passlib_context
user = User(*list("ABCDEFG"))
- monkeypatch.setattr(services, "have_i_been_pwned", lambda x: False)
- result = services.set_new_password(user, "1234567890123456")
+ queue = FakeEventQueue()
+ monkeypatch.setattr(
+ services, "_check_have_i_been_pwned", lambda x, y: False
+ )
+
+ result = services.set_new_password(user, "1234567890123456", queue)
+ assert result
assert get_passlib_context().verify("1234567890123456", user.password)
- assert result == []
+ assert len(queue) == 0
def test_set_new_password_to_short(monkeypatch):
@@ -255,12 +277,17 @@ def test_set_new_password_to_short(monkeypatch):
from ordr3.security import get_passlib_context
user = User(*list("ABCDEFG"))
- monkeypatch.setattr(services, "have_i_been_pwned", lambda x: False)
- result = services.set_new_password(user, "1")
+ queue = FakeEventQueue()
+ monkeypatch.setattr(
+ services, "_check_have_i_been_pwned", lambda x, y: False
+ )
+ result = services.set_new_password(user, "1", queue)
+
+ assert not result
assert get_passlib_context().verify("1", user.password)
- assert len(result) == 1
- assert result[0].message.startswith("Your password is quite short")
+ assert len(queue) == 1
+ assert queue[0].text.startswith("Your password is quite short")
def test_set_new_password_breached(monkeypatch):
@@ -269,12 +296,15 @@ def test_set_new_password_breached(monkeypatch):
from ordr3.security import get_passlib_context
user = User(*list("ABCDEFG"))
- monkeypatch.setattr(services, "have_i_been_pwned", lambda x: True)
- result = services.set_new_password(user, "1234567890123456")
+ queue = FakeEventQueue()
+ monkeypatch.setattr(
+ services, "_check_have_i_been_pwned", lambda x, y: True
+ )
+ result = services.set_new_password(user, "1234567890123456", queue)
+ assert not result
assert get_passlib_context().verify("1234567890123456", user.password)
- assert len(result) == 1
- assert result[0].message.startswith("This password appears in a breach")
+ assert len(queue) == 0 # no item in que due to monkeypatch
def test_set_new_password_to_short_and_breached(monkeypatch):
@@ -283,10 +313,13 @@ def test_set_new_password_to_short_and_breached(monkeypatch):
from ordr3.security import get_passlib_context
user = User(*list("ABCDEFG"))
- monkeypatch.setattr(services, "have_i_been_pwned", lambda x: True)
- result = services.set_new_password(user, "1")
+ queue = FakeEventQueue()
+ monkeypatch.setattr(
+ services, "_check_have_i_been_pwned", lambda x, y: True
+ )
+ result = services.set_new_password(user, "1", queue)
+ assert not result
assert get_passlib_context().verify("1", user.password)
- assert len(result) == 2
- assert result[0].message.startswith("Your password is quite short")
- assert result[1].message.startswith("This password appears in a breach")
+ assert len(queue) == 1 # only one item in que due to monkeypatch
+ assert queue[0].text.startswith("Your password is quite short")