Browse Source

added events

funding-tag
Holger Frey 5 years ago
parent
commit
84659986e6
  1. 1
      ordr3/__init__.py
  2. 47
      ordr3/events.py
  3. 3
      ordr3/models.py
  4. 70
      ordr3/services.py
  5. 12
      ordr3/views/__init__.py
  6. 5
      ordr3/views/account.py
  7. 83
      tests/test_services.py

1
ordr3/__init__.py

@ -28,6 +28,7 @@ def main(global_config, **settings):
config.set_root_factory(resources.Root) config.set_root_factory(resources.Root)
config.include(".adapters") config.include(".adapters")
config.include(".events")
config.include(".resources") config.include(".resources")
config.include(".routes") config.include(".routes")
config.include(".security") config.include(".security")

47
ordr3/events.py

@ -0,0 +1,47 @@
from pyramid.events import subscriber
class Ordr3Event:
def __init__(self):
self.request = None
class FlashMessage(Ordr3Event):
def __init__(self, channel, text, more):
super().__init__()
self.channel = channel
self.text = text
self.more = more
@classmethod
def info(cls, text, more=""):
return cls("info", text, more)
@classmethod
def warning(cls, text, more=""):
return cls("warning", text, more)
@classmethod
def error(cls, text, more=""):
return cls("error", text, more)
@subscriber(FlashMessage)
def handle_flash_message(message):
if message.request is None:
return
session = message.request.session
session.flash(message, message.channel, allow_duplicate=False)
def emit(request, event):
event.request = request
request.registry.notify(event)
def includeme(config):
""" initializing Events sytem for the Pyramid app
Activate this setup using ``config.include('ordr2.events')``.
"""
config.add_request_method(emit, "emit")

3
ordr3/models.py

@ -1,8 +1,5 @@
import enum import enum
from datetime import datetime from datetime import datetime
from collections import namedtuple
FlashMessage = namedtuple("FlashMessage", ["message", "description"])
@enum.unique @enum.unique

70
ordr3/services.py

@ -5,7 +5,7 @@ from urllib.parse import urlparse
import requests import requests
from . import models, security from . import events, models, security
CONSUMABLE_STATI = { CONSUMABLE_STATI = {
models.OrderStatus.ORDERED, models.OrderStatus.ORDERED,
@ -13,6 +13,24 @@ CONSUMABLE_STATI = {
} }
MSG_SHORT_PASSWORD = events.FlashMessage.warning(
f"Your password is quite short.",
(
'Have a look at <a href="https://www.xkcd.com/936/">'
"this comic strip</a> why longer passwords are better."
),
)
MSG_PWNED_PASSWORD = events.FlashMessage.warning(
"This password appears in a breach or has been compromised.",
(
"Please consider changing your password. "
'<a href="/breached">Read this page</a> on why you '
"see this message."
),
)
def find_consumables(repo, repeat=3, days=365 * 2): def find_consumables(repo, repeat=3, days=365 * 2):
""" search for orders that are requested often """ """ search for orders that are requested often """
unsorted = _find_consumables(repo, repeat, days) unsorted = _find_consumables(repo, repeat, days)
@ -90,43 +108,31 @@ def check_vendor_name(repo, to_check):
return CheckVendorResult(vendor, True) return CheckVendorResult(vendor, True)
def set_new_password(user, password): def set_new_password(user, password, event_queue):
crypt_context = security.get_passlib_context() crypt_context = security.get_passlib_context()
user.password = crypt_context.hash(password) user.password = crypt_context.hash(password)
messages = [] length_ok = len(password) >= 16
if not length_ok:
pwd_length = len(password) event_queue.emit(MSG_SHORT_PASSWORD)
if pwd_length < 16:
messages.append( return length_ok and not check_have_i_been_pwned(password, event_queue)
models.FlashMessage(
f"Your password is quite short.",
( def check_have_i_been_pwned(password, event_queue):
f"Your Password has only {pwd_length} characters. " """ public function for checking haveibeenpwned
'Have a look at <a href="https://www.xkcd.com/936/">'
"this comic strip</a> why longer passwords are better." this is just a small shim to eas testing"""
), password_hash = hashlib.sha1(password.encode()).hexdigest()
) return _check_have_i_been_pwned(password_hash, event_queue)
)
sha1_hash = hashlib.sha1(password.encode()).hexdigest()
if have_i_been_pwned(sha1_hash):
messages.append(
models.FlashMessage(
"This password appears in a breach or has been compromised.",
(
"Please consider changing your password. "
'<a href="/breached">Read this page</a> on why you '
"see this message."
),
)
)
return messages def _check_have_i_been_pwned(password_hash, event_queue):
""" chech haveibeenpwned for password leak
This function does the heavy lifting for check_have_i_been_pwned()
"""
def have_i_been_pwned(password_hash):
head, tails = password_hash[:5], password_hash[5:].lower() head, tails = password_hash[:5], password_hash[5:].lower()
url = f"https://api.pwnedpasswords.com/range/{head}" url = f"https://api.pwnedpasswords.com/range/{head}"
headers = {"Add-Padding": "true"} headers = {"Add-Padding": "true"}
@ -137,6 +143,8 @@ def have_i_been_pwned(password_hash):
return False return False
lower_case_lines = (line.lower() for line in response.text.splitlines()) lower_case_lines = (line.lower() for line in response.text.splitlines())
for line in lower_case_lines: for line in lower_case_lines:
print(tails, line, line.startswith(tails))
if line.startswith(tails): if line.startswith(tails):
event_queue.emit(MSG_PWNED_PASSWORD)
return True return True
return False return False

12
ordr3/views/__init__.py

@ -4,20 +4,8 @@ some view helpers are defined here
""" """
from .. import models
def flash(request, channel, message, description=""):
""" small wrapper around request.session.flash """
msg = models.FlashMessage(message, description)
request.session.flash(msg, channel, allow_duplicate=False)
def includeme(config): def includeme(config):
""" adding request helpers """ adding request helpers
Activate this setup using ``config.include('ordr3.views')``. Activate this setup using ``config.include('ordr3.views')``.
""" """
# this allows to use the request object like this:
# request.flash(channel, message, description)
config.add_request_method(flash, "flash")

5
ordr3/views/account.py

@ -94,12 +94,9 @@ def register_new_user(context, request):
email=appstruct["email"], email=appstruct["email"],
role=models.UserRole.NEW, role=models.UserRole.NEW,
) )
warnings = services.set_new_password(account, appstruct["password"]) services.set_new_password(account, appstruct["password"])
request.repo.add_user(account) request.repo.add_user(account)
for message in warnings:
request.flash("warning", message.message, message.description)
return HTTPFound(request.resource_path(request.root, "registered")) return HTTPFound(request.resource_path(request.root, "registered"))

83
tests/test_services.py

@ -63,6 +63,11 @@ class FakePasslibContext:
return True, None return True, None
class FakeEventQueue(list):
def emit(self, event):
self.append(event)
@pytest.fixture @pytest.fixture
def prefilled_repo(): def prefilled_repo():
from itertools import count from itertools import count
@ -218,22 +223,34 @@ def test_check_vendor_name(input, name, found):
assert result.found == found assert result.found == found
def test_have_i_been_pwned_ok(): def test__check_have_i_been_pwned_ok():
from ordr3.services import have_i_been_pwned from ordr3.services import _check_have_i_been_pwned
queue = FakeEventQueue()
assert not _check_have_i_been_pwned("21BD2x", queue)
assert len(queue) == 0
assert not have_i_been_pwned("21BD2x")
def test__check_have_i_been_pwned_not_ok():
from ordr3.services import _check_have_i_been_pwned
def test_have_i_been_pwned_not_ok(): queue = FakeEventQueue()
from ordr3.services import have_i_been_pwned
assert have_i_been_pwned("21BD2008F2FF3F9F3AE0A2072D19CD17E971B33A") assert _check_have_i_been_pwned(
"21BD2008F2FF3F9F3AE0A2072D19CD17E971B33A", queue
)
assert len(queue) == 1
assert queue[0].text.startswith("This password appears in a breach")
def test_have_i_been_pwned_request_exception(): def test__check_have_i_been_pwned_request_exception():
from ordr3.services import have_i_been_pwned from ordr3.services import _check_have_i_been_pwned
assert not have_i_been_pwned("xxxxx") queue = FakeEventQueue()
assert not _check_have_i_been_pwned("xxxxx", queue)
assert len(queue) == 0
def test_set_new_password_ok(monkeypatch): def test_set_new_password_ok(monkeypatch):
@ -242,11 +259,16 @@ def test_set_new_password_ok(monkeypatch):
from ordr3.security import get_passlib_context from ordr3.security import get_passlib_context
user = User(*list("ABCDEFG")) user = User(*list("ABCDEFG"))
monkeypatch.setattr(services, "have_i_been_pwned", lambda x: False) queue = FakeEventQueue()
result = services.set_new_password(user, "1234567890123456") monkeypatch.setattr(
services, "_check_have_i_been_pwned", lambda x, y: False
)
result = services.set_new_password(user, "1234567890123456", queue)
assert result
assert get_passlib_context().verify("1234567890123456", user.password) assert get_passlib_context().verify("1234567890123456", user.password)
assert result == [] assert len(queue) == 0
def test_set_new_password_to_short(monkeypatch): def test_set_new_password_to_short(monkeypatch):
@ -255,12 +277,17 @@ def test_set_new_password_to_short(monkeypatch):
from ordr3.security import get_passlib_context from ordr3.security import get_passlib_context
user = User(*list("ABCDEFG")) user = User(*list("ABCDEFG"))
monkeypatch.setattr(services, "have_i_been_pwned", lambda x: False) queue = FakeEventQueue()
result = services.set_new_password(user, "1") monkeypatch.setattr(
services, "_check_have_i_been_pwned", lambda x, y: False
)
result = services.set_new_password(user, "1", queue)
assert not result
assert get_passlib_context().verify("1", user.password) assert get_passlib_context().verify("1", user.password)
assert len(result) == 1 assert len(queue) == 1
assert result[0].message.startswith("Your password is quite short") assert queue[0].text.startswith("Your password is quite short")
def test_set_new_password_breached(monkeypatch): def test_set_new_password_breached(monkeypatch):
@ -269,12 +296,15 @@ def test_set_new_password_breached(monkeypatch):
from ordr3.security import get_passlib_context from ordr3.security import get_passlib_context
user = User(*list("ABCDEFG")) user = User(*list("ABCDEFG"))
monkeypatch.setattr(services, "have_i_been_pwned", lambda x: True) queue = FakeEventQueue()
result = services.set_new_password(user, "1234567890123456") monkeypatch.setattr(
services, "_check_have_i_been_pwned", lambda x, y: True
)
result = services.set_new_password(user, "1234567890123456", queue)
assert not result
assert get_passlib_context().verify("1234567890123456", user.password) assert get_passlib_context().verify("1234567890123456", user.password)
assert len(result) == 1 assert len(queue) == 0 # no item in que due to monkeypatch
assert result[0].message.startswith("This password appears in a breach")
def test_set_new_password_to_short_and_breached(monkeypatch): def test_set_new_password_to_short_and_breached(monkeypatch):
@ -283,10 +313,13 @@ def test_set_new_password_to_short_and_breached(monkeypatch):
from ordr3.security import get_passlib_context from ordr3.security import get_passlib_context
user = User(*list("ABCDEFG")) user = User(*list("ABCDEFG"))
monkeypatch.setattr(services, "have_i_been_pwned", lambda x: True) queue = FakeEventQueue()
result = services.set_new_password(user, "1") monkeypatch.setattr(
services, "_check_have_i_been_pwned", lambda x, y: True
)
result = services.set_new_password(user, "1", queue)
assert not result
assert get_passlib_context().verify("1", user.password) assert get_passlib_context().verify("1", user.password)
assert len(result) == 2 assert len(queue) == 1 # only one item in que due to monkeypatch
assert result[0].message.startswith("Your password is quite short") assert queue[0].text.startswith("Your password is quite short")
assert result[1].message.startswith("This password appears in a breach")

Loading…
Cancel
Save