diff --git a/ordr/models/__init__.py b/ordr/models/__init__.py index 4b5351e..d0e6e5d 100644 --- a/ordr/models/__init__.py +++ b/ordr/models/__init__.py @@ -5,7 +5,7 @@ import zope.sqlalchemy # import or define all models here to ensure they are attached to the # Base.metadata prior to any initialization routines -from .mymodel import MyModel # flake8: noqa +from .account import Role, Token, TokenSubject, User # flake8: noqa # run configure_mappers after defining all of the models to ensure # all relationships can be setup diff --git a/ordr/models/account.py b/ordr/models/account.py index ad1eb18..d9439c3 100644 --- a/ordr/models/account.py +++ b/ordr/models/account.py @@ -1,6 +1,23 @@ ''' Models for User Accounts and Roles ''' import enum +import uuid + +from datetime import datetime, timedelta +from sqlalchemy import ( + Column, + Date, + DateTime, + Enum, + ForeignKey, + Integer, + Text, + Unicode + ) +from sqlalchemy.orm import relationship + + +from .meta import Base, JsonEncoder class Role(enum.Enum): @@ -21,3 +38,159 @@ class Role(enum.Enum): def __str__(self): ''' string representation ''' return self.name.capitalize() + + +class TokenSubject(enum.Enum): + ''' Email Token Subjects for user accounts ''' + + REGISTRATION = 'registration' #: validate email for new user + RESET_PASSWORD = 'reset_password' #: reset a forgotten password + + +# database driven models + +class User(Base): + ''' A user of the application ''' + + __tablename__ = 'users' + + #: primary key + id = Column(Integer, primary_key=True) + + #: unique user name + username = Column(Text, nullable=False, unique=True) + + #: hashed password, see :mod:`ordr.security` + password_hash = Column(Text, nullable=False) + + #: role of the user, see :class:`ordr.models.account.Role` + role = Column(Enum(Role), nullable=False) + + first_name = Column(Text, nullable=False) + last_name = Column(Text, nullable=False) + email = Column(Text, nullable=False, unique=True) + date_created = Column(Date, nullable=False, default=datetime.utcnow) + + #: tokens for new user registration and forgotten passwords + tokens = relationship( + 'Token', + back_populates='owner', + cascade="all, delete-orphan" + ) + + @property + def principal(self): + ''' returns the principal identifier for the user ''' + return 'user:{}'.format(self.id) + + @property + def all_principals(self): + ''' returns all principal identifiers for the user including roles ''' + principals = [self.principal, self.role.principal] + if self.role is Role.PURCHASER: + # a purchaser is also a user + principals.append(Role.USER.principal) + elif self.role is Role.ADMIN: + # an admin is also a purchaser and a user + principals.append(Role.PURCHASER.principal) + principals.append(Role.USER.principal) + return principals + + @property + def is_active(self): + ''' is true if the user has an active role ''' + return self.role in {Role.USER, Role.PURCHASER, Role.ADMIN} + + def set_password(self, password): + ''' hashes a password using :mod:`ordr.security.passlib_context` ''' + from ordr.security import password_context + self.password_hash = password_context.hash(password) + + def check_password(self, password): + ''' checks a password against a stored password hash + + if the password algorithm is considered deprecated, the stored hash + will be updated using the current algorithm + ''' + from ordr.security import password_context + ok, new_hash = password_context.verify_and_update( + password, + self.password_hash + ) + + if not ok: + # password does not match, return False + return False + elif new_hash: + # algorithm is deprecated, update hash with new algorithm + self.password_hash = new_hash + + # password match, return True + return True + + def issue_token(self, request, subject, payload=None): + ''' issues a token for mail change, password reset or user verification + + :param pyramid.request.Request request: the current request object + :param ordr.models.account.TokenSubject subject: kind of token + :param payload: extra data to store with the token, JSON serializable + :rtype: (str) unique hash to access the token + ''' + return Token.issue(request, self, subject, payload) + + def __str__(self): + ''' string representation ''' + return str(self.username) + + +class Token(Base): + ''' Tokens for mail change, account verification and password reset ''' + + __tablename__ = 'tokens' + + #: hash identifyer of the token + hash = Column(Unicode, primary_key=True) + + #: :class:`ordr.models.account.TokenSubject` + subject = Column(Enum(TokenSubject), nullable=False) + + #: token expires at this date and time + expires = Column(DateTime, nullable=False) + + #: additional data to attach to a token + payload = Column(JsonEncoder, nullable=True) + + #: the user_id the token belongs to + owner_id = Column(Integer, ForeignKey('users.id')) + + #: the user the token belongs to + owner = relationship('User', back_populates='tokens') + + @classmethod + def issue(cls, request, owner, subject, payload=None): + ''' issues a token for password reset or user verification + + if the expiry keys for the token is not set in the app configuration, + the token will expire in five minutes. + + to set the expiry time in the conig use `token_expiry.` prefix followed + by the lowercase name of the token subject and a time in minutes. For + example, to give an active user two hours time to verify a registration + use `token_expiry.registration = 120` + + :param pyramid.request.Request request: the current request object + :param ordr2.models.account.TokenSubject subject: kind of token + :param ordr2.models.account.User owner: account the token is issued for + :param payload: extra data to store with the token, JSON serializable + :rtype: ordr2.models.account.Token + ''' + settings_key = 'token_expiry.' + subject.name.lower() + minutes = request.registry.settings.get(settings_key, 5) + expires = datetime.utcnow() + timedelta(minutes=int(minutes)) + return cls( + hash=uuid.uuid4().hex, + subject=subject, + payload=payload, + owner=owner, + expires=expires + ) diff --git a/ordr/models/mymodel.py b/ordr/models/mymodel.py deleted file mode 100644 index 8c9d701..0000000 --- a/ordr/models/mymodel.py +++ /dev/null @@ -1,18 +0,0 @@ -from sqlalchemy import ( - Column, - Index, - Integer, - Text, - ) - -from .meta import Base - - -class MyModel(Base): - __tablename__ = 'models' - id = Column(Integer, primary_key=True) - name = Column(Text) - value = Column(Integer) - - -Index('my_index', MyModel.name, unique=True, mysql_length=255) diff --git a/ordr/scripts/initializedb.py b/ordr/scripts/initializedb.py index 7307ecc..6539938 100644 --- a/ordr/scripts/initializedb.py +++ b/ordr/scripts/initializedb.py @@ -15,7 +15,7 @@ from ..models import ( get_session_factory, get_tm_session, ) -from ..models import MyModel +# from ..models import Role, Token, TokenSubject, User def usage(argv): @@ -41,5 +41,4 @@ def main(argv=sys.argv): with transaction.manager: dbsession = get_tm_session(session_factory, transaction.manager) - model = MyModel(name='one', value=1) - dbsession.add(model) + dbsession.add() diff --git a/ordr/tests/_functional/__init__.py b/ordr/tests/_functional/__init__.py index adb6358..6c9c852 100644 --- a/ordr/tests/_functional/__init__.py +++ b/ordr/tests/_functional/__init__.py @@ -15,6 +15,11 @@ class CustomTestApp(webtest.TestApp): pass +def create_users(dbsession): + ''' create example users ''' + pass + + @pytest.fixture(scope='module') def testapp(): ''' fixture for using webtest ''' @@ -32,7 +37,7 @@ def testapp(): with transaction.manager: # set up test data here dbsession = get_tm_session(session_factory, transaction.manager) - # create_users(dbsession) + create_users(dbsession) yield testapp diff --git a/ordr/tests/account.py b/ordr/tests/account.py index 068b9cb..b509cc0 100644 --- a/ordr/tests/account.py +++ b/ordr/tests/account.py @@ -1,12 +1,10 @@ -'UNVALIDATED' -'NEW' -'USER' -'PURCHASER' -'ADMIN' -'INACTIVE' - import pytest +from datetime import datetime, timedelta +from pyramid.testing import DummyRequest + +from ordr.tests import app_config # noqa: F401 + @pytest.mark.parametrize( 'key,result', [('NEW', 'role:new'), ('USER', 'role:user')] @@ -24,4 +22,139 @@ def test_role__str__(key, result): from ordr.models.account import Role subject = Role[key] assert str(subject) == result + + +@pytest.mark.parametrize('id_', [1, 2, 5, 123]) +def test_user_principal(id_): + from ordr.models.account import User + user = User(id=id_) + assert user.principal == f'user:{id_}' + + +@pytest.mark.parametrize( + 'name, principals', [ + ('UNVALIDATED', ['role:unvalidated']), + ('NEW', ['role:new']), + ('USER', ['role:user']), + ('PURCHASER', ['role:purchaser', 'role:user']), + ('ADMIN', ['role:admin', 'role:purchaser', 'role:user']), + ('INACTIVE', ['role:inactive']), + ] + ) +def test_user_all_principals(name, principals): + from ordr.models.account import User, Role + user = User(id=1, role=Role[name]) + expected = ['user:1'] + expected.extend(principals) + assert expected == user.all_principals + +@pytest.mark.parametrize( + 'name, expected', [ + ('UNVALIDATED', False), + ('NEW', False), + ('USER', True), + ('PURCHASER', True), + ('ADMIN', True), + ('INACTIVE', False), + ] + ) +def test_user_is_active(name, expected): + from ordr.models.account import User, Role + user = User(id=1, role=Role[name]) + assert expected == user.is_active + + +def test_user_set_password(): + from ordr.models.account import User + from ordr.security import password_context + password_context.update(schemes=['argon2']) + user = User() + assert user.password_hash is None + user.set_password('password') + assert user.password_hash.startswith('$argon2') + + +@pytest.mark.parametrize( + 'password,expected', [ + ('', False), + ('wrong', False), + ('password', True), + ] + ) +def test_user_check_password(password, expected): + from ordr.models.account import User + from ordr.security import password_context + password_context.update(schemes=['argon2']) + hash = ('$argon2i$v=19$m=512,t=2,p=2$' + 'YcyZMyak9D7nvFfKmVOq1Q$fnzNh58HWfvxHvRDGjhTqA' + ) + user = User(password_hash=hash) + assert user.check_password(password) == expected + + +def test_user_check_password_updates_old_sheme(): + from ordr.models.account import User + from ordr.security import password_context + password_context.update( + schemes=['argon2', 'bcrypt'], + default='argon2', + deprecated='auto' + ) + old_hash = '$2b$12$6ljSfpLaXBeEVOeaP1scUe6IAa0cztM.UBbjc1PdrI4j0vwgoYgpi' + user = User(password_hash=old_hash) + assert user.check_password('password') + assert user.password_hash.startswith('$argon2') + assert user.check_password('password') + + +def test_user__str__(): + from ordr.models.account import User + user = User(username='Eric Idle') + assert str(user) == 'Eric Idle' + + +def test_user_issue_token(app_config): # noqa: F811 + from ordr.models.account import User, Token, TokenSubject + request = DummyRequest() + user = User() + token = user.issue_token(request, TokenSubject.REGISTRATION, {'foo': 1}) + assert isinstance(token, Token) + assert token.hash is not None + assert token.subject == TokenSubject.REGISTRATION + assert token.payload == {'foo': 1} + assert token.owner == user + + +def test_token_issue_token(app_config): # noqa: F811 + from ordr.models.account import User, Token, TokenSubject + request = DummyRequest() + user = User() + token = Token.issue(request, user, TokenSubject.REGISTRATION, {'foo': 1}) + expected_expires = datetime.utcnow() + timedelta(minutes=5) + assert isinstance(token, Token) + assert token.hash is not None + assert token.subject == TokenSubject.REGISTRATION + assert token.payload == {'foo': 1} + assert token.owner == user + assert token.expires.timestamp() == pytest.approx( + expected_expires.timestamp(), + abs=1 + ) + + +@pytest.mark.parametrize( # noqa: F811 + 'subject,delta', [('REGISTRATION', 5), ('RESET_PASSWORD', 10)] + ) +def test_token_issue_token_time_from_settings(app_config, subject, delta): + from ordr.models.account import User, Token, TokenSubject + request = DummyRequest() + request.registry.settings['token_expiry.reset_password'] = 10 + user = User() + token_subject = TokenSubject[subject] + token = Token.issue(request, user, token_subject, None) + expected_expires = datetime.utcnow() + timedelta(minutes=delta) + assert token.expires.timestamp() == pytest.approx( + expected_expires.timestamp(), + abs=1 + ) diff --git a/ordr/tests/resources/__init__.py b/ordr/tests/resources/__init__.py index 8b13789..52dec3b 100644 --- a/ordr/tests/resources/__init__.py +++ b/ordr/tests/resources/__init__.py @@ -1 +1 @@ - +''' test package for resources ''' diff --git a/setup.py b/setup.py index cf2bff3..c8b4fe2 100644 --- a/setup.py +++ b/setup.py @@ -9,6 +9,8 @@ with open(os.path.join(here, 'CHANGES.txt')) as f: CHANGES = f.read() requires = [ + 'argon2_cffi', + 'bcrypt', 'passlib', 'plaster_pastedeploy', 'pyramid >= 1.9a',