''' Models for User Accounts and Roles '''

import enum
import uuid

from datetime import datetime, timedelta
from sqlalchemy import (
    Column,
    Date,
    DateTime,
    Enum,
    ForeignKey,
    Integer,
    Text,
    Unicode
    )
from sqlalchemy.orm import relationship

from .meta import Base, JsonEncoder


class Role(enum.Enum):
    ''' the roles a user account can have '''

    UNVALIDATED = 'unvalidated'  #: new account, email not yet validated
    NEW = 'new'                  #: new account, email validated, not active
    USER = 'user'                #: standard user, may place and view orders
    PURCHASER = 'purchaser'      #: privileged user, may edit orders
    ADMIN = 'admin'              #: fully privileged user
    INACTIVE = 'inactive'        #: account no longer active ("deleted")

    @property
    def principal(self):
        ''' principal identifier of the role

        The identifier is the lowercase member name prefixed with
        ``role:``, e.g. ``role:admin``.
        '''
        return 'role:{}'.format(self.name.lower())

    def __str__(self):
        ''' the capitalized member name, e.g. ``Admin`` '''
        return self.name.capitalize()


class TokenSubject(enum.Enum):
    ''' the purposes an email token can be issued for '''

    REGISTRATION = 'registration'      #: validate email of a new user
    RESET_PASSWORD = 'reset_password'  #: reset a forgotten password


# database driven models

class User(Base):
    ''' An account of the application, stored in the ``users`` table '''

    __tablename__ = 'users'

    #: primary key
    id = Column(Integer, primary_key=True)
    #: unique user name
    username = Column(Text, nullable=False, unique=True)
    #: hashed password, written by :meth:`set_password`
    password_hash = Column(Text, nullable=False)
    #: role of the account, a member of :class:`Role`
    role = Column(Enum(Role), nullable=False)

    first_name = Column(Text, nullable=False)
    last_name = Column(Text, nullable=False)
    #: unique email address
    email = Column(Text, nullable=False, unique=True)
    #: creation date, defaults to the current UTC time on insert
    date_created = Column(Date, nullable=False, default=datetime.utcnow)

    #: tokens issued for this account; removed together with the account
    #: or when detached from it (``delete-orphan`` cascade)
    tokens = relationship(
        'Token',
        back_populates='owner',
        cascade="all, delete-orphan"
        )

    @property
    def principal(self):
        ''' principal identifier of the user, e.g. ``user:42`` '''
        return 'user:' + str(self.id)

    @property
    def principals(self):
        ''' lists every principal identifier that applies to the user

        The list starts with the user's own principal, followed by the
        principal of the assigned role. Higher roles additionally receive
        the principals of the roles they include: a purchaser also counts
        as a user, an admin also counts as a purchaser and as a user.
        '''
        role = self.role
        collected = [self.principal, role.principal]
        # roles that are implicitly included in a higher role
        included = {
            Role.PURCHASER: (Role.USER, ),
            Role.ADMIN: (Role.PURCHASER, Role.USER),
            }
        for lower_role in included.get(role, ()):
            collected.append(lower_role.principal)
        return collected

    @property
    def is_active(self):
        ''' True if the role is one of USER, PURCHASER or ADMIN '''
        active_roles = {Role.USER, Role.PURCHASER, Role.ADMIN}
        return self.role in active_roles

    def set_password(self, password):
        ''' stores the hash of a password in :attr:`password_hash`

        The hash is computed by ``password_context`` from
        :mod:`ordr.security`.

        :param password: the clear text password to hash
        '''
        # imported at call time, not at module import time
        from ordr.security import password_context

        hashed = password_context.hash(password)
        self.password_hash = hashed

    def check_password(self, password):
        ''' verifies a password against the stored password hash

        ``password_context.verify_and_update`` reports whether the password
        matches and may hand back a replacement hash (according to the
        original author: when the hashing algorithm is considered
        deprecated). A replacement hash is stored in
        :attr:`password_hash`.

        :param password: the clear text password to check
        :rtype: bool
        :returns: True if the password matches, False otherwise
        '''
        # imported at call time, not at module import time
        from ordr.security import password_context

        matches, replacement = password_context.verify_and_update(
            password,
            self.password_hash
            )
        if matches:
            if replacement:
                # keep the re-computed hash for the next check
                self.password_hash = replacement
            return True
        return False

    def issue_token(self, request, subject, payload=None):
        ''' issues a token owned by this user

        This is a shortcut for :meth:`Token.issue` with the user as owner.

        :param pyramid.request.Request request: the current request object
        :param ordr.models.account.TokenSubject subject: kind of token
        :param payload: extra data to store with the token, JSON serializable
        :rtype: ordr.models.account.Token
        :returns: the newly created token
        '''
        return Token.issue(request, self, subject, payload)

    def __str__(self):
        ''' the user name as string '''
        return str(self.username)


class Token(Base):
    ''' Email tokens for user accounts, stored in the ``tokens`` table

    The available kinds of tokens are listed in :class:`TokenSubject`.
    '''

    __tablename__ = 'tokens'

    #: hash identifier of the token, primary key
    hash = Column(Unicode, primary_key=True)
    #: kind of token, a member of :class:`TokenSubject`
    subject = Column(Enum(TokenSubject), nullable=False)
    #: the token expires at this date and time
    expires = Column(DateTime, nullable=False)
    #: optional additional data attached to the token
    payload = Column(JsonEncoder, nullable=True)
    #: id of the user the token belongs to
    owner_id = Column(Integer, ForeignKey('users.id'))

    #: the user the token belongs to
    owner = relationship('User', back_populates='tokens')

    @classmethod
    def issue(cls, request, owner, subject, payload=None):
        ''' creates a new token for an account

        The lifetime of the token is read from the application settings.
        The settings key is the prefix ``token_expiry.`` followed by the
        lowercase name of the token subject, its value is a time in minutes.
        For example, to give a user two hours to verify a registration use
        ``token_expiry.registration = 120``. If the key is not set, the
        token expires after five minutes.

        The expiry time is calculated from the current UTC time and the
        token hash is a random UUID in hex notation.

        :param pyramid.request.Request request: the current request object
        :param ordr.models.account.User owner: account the token is for
        :param ordr.models.account.TokenSubject subject: kind of token
        :param payload: extra data to store with the token, JSON serializable
        :rtype: ordr.models.account.Token
        :returns: the new token instance
        '''
        settings = request.registry.settings
        lifetime_key = 'token_expiry.{}'.format(subject.name.lower())
        # settings values may be strings, so convert to int
        lifetime = int(settings.get(lifetime_key, 5))
        expiry = datetime.utcnow() + timedelta(minutes=lifetime)
        return cls(
            hash=uuid.uuid4().hex,
            subject=subject,
            payload=payload,
            owner=owner,
            expires=expiry
            )

    @classmethod
    def retrieve(cls, request, hash, subject=None):
        ''' looks up a valid token in the database

        The database is searched for a token with the given hash. If a
        subject is given, only tokens of this kind are considered.

        A token that has already expired is marked for deletion in the
        database session of the request and is not returned.

        :param pyramid.request.Request request: the current request object
        :param str hash: token hash
        :param ordr.models.account.TokenSubject subject: kind of token
        :rtype: ordr.models.account.Token or None
        :returns: the token or None if it is unknown or has expired
        '''
        session = request.dbsession
        candidates = session.query(cls).filter_by(hash=hash)
        if subject:
            candidates = candidates.filter_by(subject=subject)

        found = candidates.first()
        if found is None:
            return None

        expired = found.expires < datetime.utcnow()
        if expired:
            # expired tokens are removed on access
            session.delete(found)
            return None
        return found