diff --git a/ordr2/models/account.py b/ordr2/models/account.py index a7915b3..db98378 100644 --- a/ordr2/models/account.py +++ b/ordr2/models/account.py @@ -1,18 +1,23 @@ ''' User Account and Roles Models ''' import enum +import uuid -from datetime import datetime +from datetime import datetime, timedelta from passlib.context import CryptContext from sqlalchemy import ( Column, Date, + DateTime, Enum, + ForeignKey, Integer, Text, + Unicode ) +from sqlalchemy.orm import relationship -from .meta import Base +from .meta import Base, JsonEncoder #: create a crypt context for password hashes @@ -21,6 +26,8 @@ from .meta import Base passlib_context = CryptContext() +# non-database models + class Role(enum.Enum): ''' roles of user accounts ''' @@ -52,6 +59,21 @@ class Role(enum.Enum): return self.value.capitalize() +class TokenSubject(enum.Enum): + ''' Token Subjects for changing user accounts ''' + + #: validate email address of freshly registered user + USER_REGISTRATION = 'user_registration' + + #: validate email change of active user + CHANGE_EMAIL = 'change_email' + + #: reset a forgotten password + RESET_PASSWORD = 'reset_password' + + +# database driven models + class User(Base): ''' A user of the application ''' @@ -71,6 +93,13 @@ class User(Base): email = Column(Text, nullable=False, unique=True) date_created = Column(Date, nullable=False, default=datetime.utcnow) + #: tokens for new user registration, email change and forgotten passwords + tokens = relationship( + 'Token', + back_populates='owner', + cascade="all, delete-orphan" + ) + @property def principal(self): ''' returns the principal identifier for the user ''' @@ -119,6 +148,76 @@ class User(Base): # password match, return True return True + def issue_token(self, request, subject, payload=None): + ''' issues a token for mail change, password reset or user verification + + :param request: + the current request object + :type request: + pyramid.request.Request + :param subject: + what the token is used for + :type subject: + ordr2.models.account.TokenSubject + :param **payload: + etra data to store with the token, must be JSON serializable + :rtype: + (str) unique hash to access the token + ''' + token = Token.issue(request, self, subject, payload) + return token.hash + def __str__(self): ''' string representation ''' return str(self.username) + + +class Token(Base): + ''' Tokens for mail change, account verification and password reset ''' + + __tablename__ = 'tokens' + hash = Column(Unicode, primary_key=True) + subject = Column(Enum(TokenSubject), nullable=False) + expires = Column(DateTime, nullable=False) + payload = Column(JsonEncoder, nullable=False) + owner_id = Column(Integer, ForeignKey('users.id')) + owner = relationship('User', back_populates='tokens') + + @classmethod + def issue(cls, request, owner, subject, payload=None): + ''' issues a token for mail change, password reset or user verification + + if the expiry keys for the token is not set in the app configuration, + the token will expire in five minutes. + + to set the expiry time in the conig use `token_expiry.` prefix followed + by the value of the token subject and a time in minutes. For example, + to give an active user two hours time to verify an email address change + use `token_expiry.change_email = 120` + + :param request: + the current request object + :type request: + pyramid.request.Request + :param subject: + what the token is used for + :type subject: + ordr2.models.account.TokenSubject + :param owner: + account the token is issued for + :type subject: + ordr2.models.account.User + :param **payload: + etra data to store with the token, must be JSON serializable + :rtype: + ordr2.models.account.Token + ''' + settings_key = 'token_expiry.' + subject.value + minutes = request.registry.settings.get(settings_key, 5) + expires = datetime.utcnow() + timedelta(minutes=int(minutes)) + return cls( + hash=uuid.uuid4().hex, + subject=subject, + payload=payload, + owner=owner, + expires=expires) diff --git a/ordr2/models/meta.py b/ordr2/models/meta.py index 24292e7..9bc1605 100644 --- a/ordr2/models/meta.py +++ b/ordr2/models/meta.py @@ -1,5 +1,10 @@ +''' SQL conventions and custom Encoders ''' + +import json + from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.schema import MetaData +from sqlalchemy.types import TypeDecorator, Unicode # Recommended naming convention used by Alembic, as various different database # providers will autogenerate vastly different names making migrations more @@ -15,3 +20,21 @@ NAMING_CONVENTION = { # setup metadata conventions and base declarative class metadata = MetaData(naming_convention=NAMING_CONVENTION) Base = declarative_base(metadata=metadata) + + +class JsonEncoder(TypeDecorator): + ''' Represents an a data structure as a json-encoded string. ''' + + impl = Unicode + + def process_bind_param(self, value, dialect): + ''' inbound (to database) ''' + if value is not None: + value = json.dumps(value) + return value + + def process_result_value(self, value, dialect): + ''' outbound (from database) ''' + if value is not None: + value = json.loads(value) + return value diff --git a/tests/models/account.py b/tests/models/account.py index 1b0c778..a2789c5 100644 --- a/tests/models/account.py +++ b/tests/models/account.py @@ -2,8 +2,13 @@ import pytest +from datetime import datetime, timedelta +from pyramid.testing import DummyRequest -# tests for users.Role +from .. import get_user, app_config + + +# tests for account.Role def test_role_principals(): ''' test Role.principal, a caluclated property ''' @@ -29,7 +34,7 @@ def test_role_str(): assert str(Role.INACTIVE) == 'Inactive' -# tests for users.User +# tests for account.User def test_user_principal(): ''' test the user principal calculated property ''' @@ -132,3 +137,47 @@ def test_user_string_representation(): user = User(username='FooBar') assert str(user) == 'FooBar' + + +def test_user_issue_token(app_config): + from ordr2.models.account import Token, TokenSubject + + request = DummyRequest() + request.registry.settings['token_expiry.change_email'] = 10 + user = get_user('user') + payload = {'test-key': 'test-data'} + hash = user.issue_token(request, TokenSubject.CHANGE_EMAIL, payload) + token = user.tokens[0] + + expected_expires = datetime.utcnow() + timedelta(minutes=10) + # one second drift is still considered ok + assert token.expires.timestamp() == pytest.approx( + expected_expires.timestamp(), + abs=1 + ) + assert token.hash == hash + assert token.owner == user + assert token.payload == payload + assert token.subject == TokenSubject.CHANGE_EMAIL + + +def test_token_issue(app_config): + from ordr2.models.account import Token, TokenSubject + + request = DummyRequest() + request.registry.settings['token_expiry.change_email'] = 10 + user = get_user('user') + payload = {'test-key': 'test-data'} + result = Token.issue(request, user, TokenSubject.CHANGE_EMAIL, payload) + + expected_expires = datetime.utcnow() + timedelta(minutes=10) + # one second drift is still considered ok + assert result.expires.timestamp() == pytest.approx( + expected_expires.timestamp(), + abs=1 + ) + assert len(result.hash) == 32 + assert result.owner == user + assert result.payload == payload + assert result.subject == TokenSubject.CHANGE_EMAIL + assert user.tokens[0] == result diff --git a/tests/models/meta.py b/tests/models/meta.py new file mode 100644 index 0000000..ebc6355 --- /dev/null +++ b/tests/models/meta.py @@ -0,0 +1,47 @@ +''' Tests for ordr2.models.meta ''' + +import pytest + + +# tests for JsonEncoder + +def test_json_encoder_bind_value(): + ''' test encoding to database for a given value ''' + from ordr2.models.meta import JsonEncoder + + encoder = JsonEncoder() + result = encoder.process_bind_param({'a': 'b'}, None) + + assert result == '{"a": "b"}' + + +def test_json_encoder_bind_none(): + ''' test encoding to database if the given value is None ''' + from ordr2.models.meta import JsonEncoder + + encoder = JsonEncoder() + result = encoder.process_bind_param(None, None) + + assert result is None + + +def test_json_encoder_result_value(): + ''' test encoding from database for a stored value ''' + from ordr2.models.meta import JsonEncoder + + json_string = '{"a": "b"}' + encoder = JsonEncoder() + result = encoder.process_result_value(json_string, None) + + assert result == {'a': 'b'} + + +def test_json_encoder_result_none(): + ''' test encoding from database if the stored value is None ''' + from ordr2.models.meta import JsonEncoder + + encoder = JsonEncoder() + result = encoder.process_result_value(None, None) + + assert result is None +