Browse Source

setting up pyramid app: security

funding-tag
Holger Frey 5 years ago
parent
commit
278774253e
  1. 14
      ordr3/adapters.py
  2. 16
      ordr3/models.py
  3. 1
      ordr3/repo.py
  4. 54
      ordr3/security.py
  5. 24
      tests/test_models.py

14
ordr3/adapters.py

@ -17,7 +17,7 @@ from sqlalchemy import ( @@ -17,7 +17,7 @@ from sqlalchemy import (
from sqlalchemy.orm import mapper, relationship, sessionmaker
from sqlalchemy.schema import MetaData
from . import models
from . import repo, models
# Recommended naming convention used by Alembic, as various different database
# providers will autogenerate vastly different names making migrations more
@ -132,6 +132,13 @@ def get_tm_session(session_factory, transaction_manager): @@ -132,6 +132,13 @@ def get_tm_session(session_factory, transaction_manager):
return dbsession
def get_repo_with_session(session_factory, request):
""" returns an sql alchemy repository with database session configured """
# request.tm is the transaction manager used by pyramid_tm
session = get_tm_session(session_factory, request.tm)
return repo.SqlAlchemyRepository(session)
def includeme(config):
"""
Initialize the model for a Pyramid app.
@ -153,9 +160,8 @@ def includeme(config): @@ -153,9 +160,8 @@ def includeme(config):
# make request.dbsession available for use in Pyramid
config.add_request_method(
# r.tm is the transaction manager used by pyramid_tm
lambda r: get_tm_session(session_factory, r.tm),
"dbsession",
lambda r: get_repo_with_session(session_factory, r),
"repo",
reify=True,
)

16
ordr3/models.py

@ -27,6 +27,11 @@ class UserRole(enum.Enum): @@ -27,6 +27,11 @@ class UserRole(enum.Enum):
ADMIN = 4
INACTIVE = 5
@property
def principal(self):
""" returns the principal identifier of the role """
return "role:" + self.name.lower()
class Model:
def __hash__(self):
@ -164,12 +169,17 @@ class User(Model): @@ -164,12 +169,17 @@ class User(Model):
def principals(self):
tmp = [self.principal]
if self.role in {UserRole.PURCHASER, UserRole.ADMIN}:
tmp.append("role:user")
tmp.append(UserRole.USER.principal)
if self.role == UserRole.ADMIN:
tmp.append("role:purchaser")
tmp.append("role:" + self.role.name.lower())
tmp.append(UserRole.PURCHASER.principal)
tmp.append(self.role.principal)
return tmp
@property
def is_active(self):
""" check if it is an active user account """
return self.role in {UserRole.USER, UserRole.PURCHASER, UserRole.ADMIN}
def __hash__(self):
items = sorted(self.__dict__.items())
content = ((k, v) for k, v in items if not k.startswith("_"))

1
ordr3/repo.py

@ -11,7 +11,6 @@ class AbstractOrderRepository(abc.ABC): @@ -11,7 +11,6 @@ class AbstractOrderRepository(abc.ABC):
def __init__(self, session):
self.session = session
@abc.abstractmethod
def add_order(self, order):
""" add an order to the datastore """

54
ordr3/security.py

@ -0,0 +1,54 @@ @@ -0,0 +1,54 @@
""" User Authentication and Authorization """
from pyramid.security import Everyone, Authenticated
from sqlalchemy.orm.exc import NoResultFound
from pyramid.authorization import ACLAuthorizationPolicy
from pyramid.authentication import AuthTktAuthenticationPolicy
class AuthenticationPolicy(AuthTktAuthenticationPolicy):
""" How to authenticate users """
def authenticated_userid(self, request):
""" returns the id of an authenticated user
heavy lifting done in get_user() attached to request
"""
user = request.user
if user is not None:
return user.id
def effective_principals(self, request):
""" returns a list of principals for the user """
principals = [Everyone]
user = request.user
if user is not None:
principals.append(Authenticated)
principals.extend(user.principals)
return principals
def get_user(request):
""" retrieves the user object by the unauthenticated user id """
user_id = request.unauthenticated_userid
if user_id is None:
return None
try:
user = request.repo.get_user(user_id)
return user if user.is_active else None
except NoResultFound:
return None
def includeme(config):
""" initializing authentication and authorization for the Pyramid app
Activate this setup using ``config.include('ordr2.security')``.
"""
settings = config.get_settings()
authn_policy = AuthenticationPolicy(
settings["auth.secret"], hashalg="sha512",
)
config.set_authentication_policy(authn_policy)
config.set_authorization_policy(ACLAuthorizationPolicy())
config.add_request_method(get_user, "user", reify=True)

24
tests/test_models.py

@ -116,9 +116,33 @@ def test_user_principals(role_str, roles): @@ -116,9 +116,33 @@ def test_user_principals(role_str, roles):
assert user.principals == ["user:A"] + roles
@pytest.mark.parametrize(
"role_str,expected",
[
("NEW", False),
("USER", True),
("PURCHASER", True),
("ADMIN", True),
("INACTIVE", False),
],
)
def test_user_is_active(role_str, expected):
from ordr3.models import User, UserRole
user = User(*list("ABCDEF"), UserRole[role_str])
assert user.is_active == expected
def test_user_repr():
from ordr3.models import User
user = User(*list("ABCDEFG"))
assert repr(user) == "<ordr3.models.User id=A, username=B>"
def test_user_role_rincipal():
from ordr3.models import UserRole
assert UserRole.INACTIVE.principal == "role:inactive"

Loading…
Cancel
Save