From 84659986e6d45e665ec0c4acad87592b88ca96c4 Mon Sep 17 00:00:00 2001 From: Holger Frey Date: Fri, 3 Apr 2020 09:25:11 +0200 Subject: [PATCH] added events --- ordr3/__init__.py | 1 + ordr3/events.py | 47 +++++++++++++++++++++++ ordr3/models.py | 3 -- ordr3/services.py | 70 +++++++++++++++++++--------------- ordr3/views/__init__.py | 12 ------ ordr3/views/account.py | 5 +-- tests/test_services.py | 83 ++++++++++++++++++++++++++++------------- 7 files changed, 146 insertions(+), 75 deletions(-) create mode 100644 ordr3/events.py diff --git a/ordr3/__init__.py b/ordr3/__init__.py index 45a50f4..c923877 100644 --- a/ordr3/__init__.py +++ b/ordr3/__init__.py @@ -28,6 +28,7 @@ def main(global_config, **settings): config.set_root_factory(resources.Root) config.include(".adapters") + config.include(".events") config.include(".resources") config.include(".routes") config.include(".security") diff --git a/ordr3/events.py b/ordr3/events.py new file mode 100644 index 0000000..9675e34 --- /dev/null +++ b/ordr3/events.py @@ -0,0 +1,47 @@ +from pyramid.events import subscriber + + +class Ordr3Event: + def __init__(self): + self.request = None + + +class FlashMessage(Ordr3Event): + def __init__(self, channel, text, more): + super().__init__() + self.channel = channel + self.text = text + self.more = more + + @classmethod + def info(cls, text, more=""): + return cls("info", text, more) + + @classmethod + def warning(cls, text, more=""): + return cls("warning", text, more) + + @classmethod + def error(cls, text, more=""): + return cls("error", text, more) + + +@subscriber(FlashMessage) +def handle_flash_message(message): + if message.request is None: + return + session = message.request.session + session.flash(message, message.channel, allow_duplicate=False) + + +def emit(request, event): + event.request = request + request.registry.notify(event) + + +def includeme(config): + """ initializing Events sytem for the Pyramid app + + Activate this setup using ``config.include('ordr2.events')``. + """ + config.add_request_method(emit, "emit") diff --git a/ordr3/models.py b/ordr3/models.py index 22908f3..21b33d3 100644 --- a/ordr3/models.py +++ b/ordr3/models.py @@ -1,8 +1,5 @@ import enum from datetime import datetime -from collections import namedtuple - -FlashMessage = namedtuple("FlashMessage", ["message", "description"]) @enum.unique diff --git a/ordr3/services.py b/ordr3/services.py index 2ceb2bf..191ad4b 100644 --- a/ordr3/services.py +++ b/ordr3/services.py @@ -5,7 +5,7 @@ from urllib.parse import urlparse import requests -from . import models, security +from . import events, models, security CONSUMABLE_STATI = { models.OrderStatus.ORDERED, @@ -13,6 +13,24 @@ CONSUMABLE_STATI = { } +MSG_SHORT_PASSWORD = events.FlashMessage.warning( + f"Your password is quite short.", + ( + 'Have a look at ' + "this comic strip why longer passwords are better." + ), +) + +MSG_PWNED_PASSWORD = events.FlashMessage.warning( + "This password appears in a breach or has been compromised.", + ( + "Please consider changing your password. " + 'Read this page on why you ' + "see this message." + ), +) + + def find_consumables(repo, repeat=3, days=365 * 2): """ search for orders that are requested often """ unsorted = _find_consumables(repo, repeat, days) @@ -90,43 +108,31 @@ def check_vendor_name(repo, to_check): return CheckVendorResult(vendor, True) -def set_new_password(user, password): +def set_new_password(user, password, event_queue): crypt_context = security.get_passlib_context() user.password = crypt_context.hash(password) - messages = [] - - pwd_length = len(password) - if pwd_length < 16: - messages.append( - models.FlashMessage( - f"Your password is quite short.", - ( - f"Your Password has only {pwd_length} characters. " - 'Have a look at ' - "this comic strip why longer passwords are better." - ), - ) - ) + length_ok = len(password) >= 16 + if not length_ok: + event_queue.emit(MSG_SHORT_PASSWORD) + + return length_ok and not check_have_i_been_pwned(password, event_queue) + + +def check_have_i_been_pwned(password, event_queue): + """ public function for checking haveibeenpwned + + this is just a small shim to eas testing""" + password_hash = hashlib.sha1(password.encode()).hexdigest() + return _check_have_i_been_pwned(password_hash, event_queue) - sha1_hash = hashlib.sha1(password.encode()).hexdigest() - if have_i_been_pwned(sha1_hash): - - messages.append( - models.FlashMessage( - "This password appears in a breach or has been compromised.", - ( - "Please consider changing your password. " - 'Read this page on why you ' - "see this message." - ), - ) - ) - return messages +def _check_have_i_been_pwned(password_hash, event_queue): + """ chech haveibeenpwned for password leak + This function does the heavy lifting for check_have_i_been_pwned() + """ -def have_i_been_pwned(password_hash): head, tails = password_hash[:5], password_hash[5:].lower() url = f"https://api.pwnedpasswords.com/range/{head}" headers = {"Add-Padding": "true"} @@ -137,6 +143,8 @@ def have_i_been_pwned(password_hash): return False lower_case_lines = (line.lower() for line in response.text.splitlines()) for line in lower_case_lines: + print(tails, line, line.startswith(tails)) if line.startswith(tails): + event_queue.emit(MSG_PWNED_PASSWORD) return True return False diff --git a/ordr3/views/__init__.py b/ordr3/views/__init__.py index b987ab8..8afae8c 100644 --- a/ordr3/views/__init__.py +++ b/ordr3/views/__init__.py @@ -4,20 +4,8 @@ some view helpers are defined here """ -from .. import models - - -def flash(request, channel, message, description=""): - """ small wrapper around request.session.flash """ - msg = models.FlashMessage(message, description) - request.session.flash(msg, channel, allow_duplicate=False) - - def includeme(config): """ adding request helpers Activate this setup using ``config.include('ordr3.views')``. """ - # this allows to use the request object like this: - # request.flash(channel, message, description) - config.add_request_method(flash, "flash") diff --git a/ordr3/views/account.py b/ordr3/views/account.py index cfdb2fd..6ede2eb 100644 --- a/ordr3/views/account.py +++ b/ordr3/views/account.py @@ -94,12 +94,9 @@ def register_new_user(context, request): email=appstruct["email"], role=models.UserRole.NEW, ) - warnings = services.set_new_password(account, appstruct["password"]) + services.set_new_password(account, appstruct["password"]) request.repo.add_user(account) - for message in warnings: - request.flash("warning", message.message, message.description) - return HTTPFound(request.resource_path(request.root, "registered")) diff --git a/tests/test_services.py b/tests/test_services.py index 82d1b28..2270b58 100644 --- a/tests/test_services.py +++ b/tests/test_services.py @@ -63,6 +63,11 @@ class FakePasslibContext: return True, None +class FakeEventQueue(list): + def emit(self, event): + self.append(event) + + @pytest.fixture def prefilled_repo(): from itertools import count @@ -218,22 +223,34 @@ def test_check_vendor_name(input, name, found): assert result.found == found -def test_have_i_been_pwned_ok(): - from ordr3.services import have_i_been_pwned +def test__check_have_i_been_pwned_ok(): + from ordr3.services import _check_have_i_been_pwned + + queue = FakeEventQueue() + + assert not _check_have_i_been_pwned("21BD2x", queue) + assert len(queue) == 0 - assert not have_i_been_pwned("21BD2x") +def test__check_have_i_been_pwned_not_ok(): + from ordr3.services import _check_have_i_been_pwned -def test_have_i_been_pwned_not_ok(): - from ordr3.services import have_i_been_pwned + queue = FakeEventQueue() - assert have_i_been_pwned("21BD2008F2FF3F9F3AE0A2072D19CD17E971B33A") + assert _check_have_i_been_pwned( + "21BD2008F2FF3F9F3AE0A2072D19CD17E971B33A", queue + ) + assert len(queue) == 1 + assert queue[0].text.startswith("This password appears in a breach") -def test_have_i_been_pwned_request_exception(): - from ordr3.services import have_i_been_pwned +def test__check_have_i_been_pwned_request_exception(): + from ordr3.services import _check_have_i_been_pwned - assert not have_i_been_pwned("xxxxx") + queue = FakeEventQueue() + + assert not _check_have_i_been_pwned("xxxxx", queue) + assert len(queue) == 0 def test_set_new_password_ok(monkeypatch): @@ -242,11 +259,16 @@ def test_set_new_password_ok(monkeypatch): from ordr3.security import get_passlib_context user = User(*list("ABCDEFG")) - monkeypatch.setattr(services, "have_i_been_pwned", lambda x: False) - result = services.set_new_password(user, "1234567890123456") + queue = FakeEventQueue() + monkeypatch.setattr( + services, "_check_have_i_been_pwned", lambda x, y: False + ) + + result = services.set_new_password(user, "1234567890123456", queue) + assert result assert get_passlib_context().verify("1234567890123456", user.password) - assert result == [] + assert len(queue) == 0 def test_set_new_password_to_short(monkeypatch): @@ -255,12 +277,17 @@ def test_set_new_password_to_short(monkeypatch): from ordr3.security import get_passlib_context user = User(*list("ABCDEFG")) - monkeypatch.setattr(services, "have_i_been_pwned", lambda x: False) - result = services.set_new_password(user, "1") + queue = FakeEventQueue() + monkeypatch.setattr( + services, "_check_have_i_been_pwned", lambda x, y: False + ) + result = services.set_new_password(user, "1", queue) + + assert not result assert get_passlib_context().verify("1", user.password) - assert len(result) == 1 - assert result[0].message.startswith("Your password is quite short") + assert len(queue) == 1 + assert queue[0].text.startswith("Your password is quite short") def test_set_new_password_breached(monkeypatch): @@ -269,12 +296,15 @@ def test_set_new_password_breached(monkeypatch): from ordr3.security import get_passlib_context user = User(*list("ABCDEFG")) - monkeypatch.setattr(services, "have_i_been_pwned", lambda x: True) - result = services.set_new_password(user, "1234567890123456") + queue = FakeEventQueue() + monkeypatch.setattr( + services, "_check_have_i_been_pwned", lambda x, y: True + ) + result = services.set_new_password(user, "1234567890123456", queue) + assert not result assert get_passlib_context().verify("1234567890123456", user.password) - assert len(result) == 1 - assert result[0].message.startswith("This password appears in a breach") + assert len(queue) == 0 # no item in que due to monkeypatch def test_set_new_password_to_short_and_breached(monkeypatch): @@ -283,10 +313,13 @@ def test_set_new_password_to_short_and_breached(monkeypatch): from ordr3.security import get_passlib_context user = User(*list("ABCDEFG")) - monkeypatch.setattr(services, "have_i_been_pwned", lambda x: True) - result = services.set_new_password(user, "1") + queue = FakeEventQueue() + monkeypatch.setattr( + services, "_check_have_i_been_pwned", lambda x, y: True + ) + result = services.set_new_password(user, "1", queue) + assert not result assert get_passlib_context().verify("1", user.password) - assert len(result) == 2 - assert result[0].message.startswith("Your password is quite short") - assert result[1].message.startswith("This password appears in a breach") + assert len(queue) == 1 # only one item in que due to monkeypatch + assert queue[0].text.startswith("Your password is quite short")