Browse Source

updated test to reflect new pyramid version

funding-tag
Holger Frey 4 years ago
parent
commit
d5727c6273
  1. 8
      Dockerfile
  2. 5
      Makefile
  3. BIN
      ordr3.sqlite
  4. 8
      ordr3/adapters.py
  5. 10
      ordr3/events.py
  6. 14
      ordr3/models.py
  7. 76
      ordr3/repo.py
  8. 34
      ordr3/resources.py
  9. 10
      ordr3/schemas/account.py
  10. 8
      ordr3/schemas/base.py
  11. 12
      ordr3/schemas/orders.py
  12. 2
      ordr3/scripts/migrate_db.py
  13. 89
      ordr3/security.py
  14. 6
      ordr3/services.py
  15. 2
      ordr3/views/account.py
  16. 6
      ordr3/views/orders.py
  17. 1
      pyproject.toml
  18. 4
      tests/functional/conftest.py
  19. 21
      tests/functional/test_login.py
  20. 23
      tests/functional/test_my_account.py
  21. 75
      tests/functional/test_order.py
  22. 47
      tests/functional/test_order_list.py
  23. 40
      tests/functional/test_password_reset.py
  24. 25
      tests/functional/test_registration.py
  25. 43
      tests/functional/test_user_edit.py
  26. 30
      tests/functional/test_vendors.py
  27. 34
      tests/test_services.py

8
Dockerfile

@ -30,14 +30,6 @@ WORKDIR /app @@ -30,14 +30,6 @@ WORKDIR /app
RUN pip install --upgrade pip
RUN pip install gunicorn
RUN pip install wheel
RUN pip install -r requirements.txt
RUN flit install --pth-file
# switch back to root to remove header files
#USER root
#RUN apk del libc-dev libffi-dev openssl-dev python3-dev
# switch to the created user to run the application
#USER deploy
CMD ["gunicorn", "--paster", "/app/production.ini", "-b", "0.0.0.0:8000"]

5
Makefile

@ -62,6 +62,9 @@ test: lint ## run tests quickly with the default Python @@ -62,6 +62,9 @@ test: lint ## run tests quickly with the default Python
testall: lint ## run tests quickly with the default Python
pytest tests
testfun: lint ## run tests quickly with the default Python
pytest tests -x -m "fun"
coverage: lint ## full test suite, check code coverage and open coverage report
pytest tests --cov=ordr3 -m "fun"
coverage html
@ -86,5 +89,5 @@ repo: devenv ## complete project setup with development environment and git repo @@ -86,5 +89,5 @@ repo: devenv ## complete project setup with development environment and git repo
git commit -m "import of project template"
git remote add origin https://git.cpi.imtek.uni-freiburg.de/CPI/ordr3.git
git push -u origin main --no-verify
.venv/bin/pre-commit install --install-hooks

BIN
ordr3.sqlite

Binary file not shown.

8
ordr3/adapters.py

@ -91,7 +91,7 @@ reset_token_table = Table( @@ -91,7 +91,7 @@ reset_token_table = Table(
def start_mappers():
""" maps data base tables to model objects """
"""maps data base tables to model objects"""
mapper(
models.OrderItem,
order_table,
@ -108,12 +108,12 @@ def start_mappers(): @@ -108,12 +108,12 @@ def start_mappers():
def get_engine(settings, prefix="sqlalchemy."):
""" returns a sqlalchemy engine from a settings dict """
"""returns a sqlalchemy engine from a settings dict"""
return engine_from_config(settings, prefix)
def get_session_factory(engine):
""" returns a sqlalchemy session factory for a db enging """
"""returns a sqlalchemy session factory for a db enging"""
factory = sessionmaker()
factory.configure(bind=engine)
return factory
@ -148,7 +148,7 @@ def get_tm_session(session_factory, transaction_manager): @@ -148,7 +148,7 @@ def get_tm_session(session_factory, transaction_manager):
def get_repo_with_session(session_factory, request):
""" returns an sql alchemy repository with database session configured """
"""returns an sql alchemy repository with database session configured"""
# request.tm is the transaction manager used by pyramid_tm
session = get_tm_session(session_factory, request.tm)
return repo.SqlAlchemyRepository(session)

10
ordr3/events.py

@ -35,7 +35,7 @@ class FlashMessage(Ordr3Event): @@ -35,7 +35,7 @@ class FlashMessage(Ordr3Event):
class EmailNotification(Ordr3Event):
""" base class for user notifications """
"""base class for user notifications"""
subject = None
template = None
@ -47,21 +47,21 @@ class EmailNotification(Ordr3Event): @@ -47,21 +47,21 @@ class EmailNotification(Ordr3Event):
class AccountActivationEmail(EmailNotification):
""" user notification for account activation """
"""user notification for account activation"""
subject = "[ordr] Your account was activated"
template = "ordr3:templates/emails/activation.jinja2"
class OrderStatusChangeEmail(EmailNotification):
""" user notification for order status change """
"""user notification for order status change"""
subject = "[ordr] Order Status Change"
template = "ordr3:templates/emails/status_change.jinja2"
class PasswordResetEmail(EmailNotification):
""" user notification for password reset link """
"""user notification for password reset link"""
subject = "[ordr] Password Reset"
template = "ordr3:templates/emails/password_reset.jinja2"
@ -79,7 +79,7 @@ def handle_flash_message_event(event): @@ -79,7 +79,7 @@ def handle_flash_message_event(event):
@subscriber(EmailNotification)
def notify_user(event):
""" notify a user about an event """
"""notify a user about an event"""
body = render(
event.template, {"user": event.user, "data": event.data}, event.request
)

14
ordr3/models.py

@ -33,7 +33,7 @@ class UserRole(enum.Enum): @@ -33,7 +33,7 @@ class UserRole(enum.Enum):
@property
def principal(self):
""" returns the principal identifier of the role """
"""returns the principal identifier of the role"""
return "role:" + self.name.lower()
@ -49,7 +49,7 @@ class Model: @@ -49,7 +49,7 @@ class Model:
class OrderItem(Model):
""" an ordered item """
"""an ordered item"""
# properties
id = None
@ -115,7 +115,7 @@ class OrderItem(Model): @@ -115,7 +115,7 @@ class OrderItem(Model):
return self.status not in (OrderStatus.OPEN, OrderStatus.HOLD)
def add_to_log(self, log_entry):
""" adds a log item to the status log """
"""adds a log item to the status log"""
if len(self.log) == 0:
self.created_by = log_entry.by
self.created_on = log_entry.date
@ -124,7 +124,7 @@ class OrderItem(Model): @@ -124,7 +124,7 @@ class OrderItem(Model):
class LogEntry(Model):
""" an entry in the order log """
"""an entry in the order log"""
order_id = None
status = None
@ -145,7 +145,7 @@ class LogEntry(Model): @@ -145,7 +145,7 @@ class LogEntry(Model):
class ProposedConsumable:
""" counting orders to find out if they are consumables """
"""counting orders to find out if they are consumables"""
def __init__(self, order):
self.order = order
@ -153,7 +153,7 @@ class ProposedConsumable: @@ -153,7 +153,7 @@ class ProposedConsumable:
class Vendor(Model):
""" a model for finding vendor names and their search terms """
"""a model for finding vendor names and their search terms"""
term = None
name = None
@ -203,7 +203,7 @@ class User(Model): @@ -203,7 +203,7 @@ class User(Model):
@property
def is_active(self):
""" check if it is an active user account """
"""check if it is an active user account"""
return self.role in {UserRole.USER, UserRole.PURCHASER, UserRole.ADMIN}
def __hash__(self):

76
ordr3/repo.py

@ -10,107 +10,107 @@ from . import models @@ -10,107 +10,107 @@ from . import models
class RepoItemNotFound(StopIteration):
""" repo error for a not found item """
"""repo error for a not found item"""
pass
class AbstractOrderRepository(abc.ABC):
""" Abstract base class for a datastore """
"""Abstract base class for a datastore"""
def __init__(self, session):
self.session = session
@abc.abstractmethod
def add_order(self, order):
""" add an order to the datastore """
"""add an order to the datastore"""
@abc.abstractmethod
def get_order(self, reference):
""" get an order from the datastore by primary key """
"""get an order from the datastore by primary key"""
@abc.abstractmethod
def delete_order(self, order):
""" remove an order from the datastore """
"""remove an order from the datastore"""
@abc.abstractmethod
def list_consumable_candidates(self, limit_date, statuses):
""" list orders that might be consumables """
"""list orders that might be consumables"""
@abc.abstractmethod
def add_user(self, user):
""" add a user to the datastore """
"""add a user to the datastore"""
@abc.abstractmethod
def delete_user(self, user):
""" removes a user from the datastore """
"""removes a user from the datastore"""
@abc.abstractmethod
def get_user(self, reference):
""" get a user from the datastore by primary key """
"""get a user from the datastore by primary key"""
@abc.abstractmethod
def get_user_by_username(self, reference):
""" get a user from the datastore by username """
"""get a user from the datastore by username"""
@abc.abstractmethod
def get_user_by_email(self, reference):
""" get a user from the datastore by email """
"""get a user from the datastore by email"""
@abc.abstractmethod
def search_vendor(self, reference):
""" search for a vendor by its canonical name """
"""search for a vendor by its canonical name"""
@abc.abstractmethod
def get_vendor_aggregates(self, reference):
""" list a all canonical names of vendors """
"""list a all canonical names of vendors"""
@abc.abstractmethod
def update_vendors(self, old_vendor, new_name, new_terms):
""" update autocorrect values of vendors """
"""update autocorrect values of vendors"""
@abc.abstractmethod
def add_reset_token(self, token):
""" add an password reset token """
"""add an password reset token"""
@abc.abstractmethod
def get_reset_token(self, reference):
""" add an password reset token """
"""add an password reset token"""
@abc.abstractmethod
def delete_reset_token(self, token):
""" deletes a password reset token """
"""deletes a password reset token"""
@abc.abstractmethod
def clear_stale_reset_tokens(self):
""" removes invalid reset tokens """
"""removes invalid reset tokens"""
class SqlAlchemyRepository(AbstractOrderRepository):
""" Repository implementation for SQLAlchemy """
"""Repository implementation for SQLAlchemy"""
def _add_item_to_db(self, item):
""" add any item to the database """
"""add any item to the database"""
self.session.add(item)
self.session.flush()
def _delete_item_from_db(self, item):
""" add any item to the database """
"""add any item to the database"""
self.session.delete(item)
self.session.flush()
def add_order(self, order):
""" add an order to the database """
"""add an order to the database"""
self._add_item_to_db(order)
def delete_order(self, order):
""" remove an order from the datastore """
"""remove an order from the datastore"""
for log_entry in order.log:
self.session.delete(log_entry)
self._delete_item_from_db(order)
def get_order(self, reference):
""" get an order from the database by primary key """
"""get an order from the database by primary key"""
try:
return (
self.session.query(models.OrderItem)
@ -121,7 +121,7 @@ class SqlAlchemyRepository(AbstractOrderRepository): @@ -121,7 +121,7 @@ class SqlAlchemyRepository(AbstractOrderRepository):
raise RepoItemNotFound from exc
def list_consumable_candidates(self, limit_date, statuses):
""" list orders that might be consumables """
"""list orders that might be consumables"""
return (
self.session.query(models.OrderItem)
.filter(models.OrderItem.created_on > limit_date)
@ -131,15 +131,15 @@ class SqlAlchemyRepository(AbstractOrderRepository): @@ -131,15 +131,15 @@ class SqlAlchemyRepository(AbstractOrderRepository):
)
def add_user(self, user):
""" add a user to the database """
"""add a user to the database"""
self._add_item_to_db(user)
def delete_user(self, user):
""" removes a user from the datastore """
"""removes a user from the datastore"""
self._delete_item_from_db(user)
def get_user(self, reference):
""" get a user from the database by primary key """
"""get a user from the database by primary key"""
try:
return (
self.session.query(models.User).filter_by(id=reference).one()
@ -148,7 +148,7 @@ class SqlAlchemyRepository(AbstractOrderRepository): @@ -148,7 +148,7 @@ class SqlAlchemyRepository(AbstractOrderRepository):
raise RepoItemNotFound from exc
def get_user_by_username(self, reference):
""" get a user from the database by username """
"""get a user from the database by username"""
try:
return (
self.session.query(models.User)
@ -159,7 +159,7 @@ class SqlAlchemyRepository(AbstractOrderRepository): @@ -159,7 +159,7 @@ class SqlAlchemyRepository(AbstractOrderRepository):
raise RepoItemNotFound from exc
def get_user_by_email(self, reference):
""" get a user from the database by email """
"""get a user from the database by email"""
try:
return (
self.session.query(models.User)
@ -170,7 +170,7 @@ class SqlAlchemyRepository(AbstractOrderRepository): @@ -170,7 +170,7 @@ class SqlAlchemyRepository(AbstractOrderRepository):
raise RepoItemNotFound from exc
def count_new_users(self):
""" count the number of new users that need approval """
"""count the number of new users that need approval"""
return (
self.session.query(models.User)
.filter(models.User.role == models.UserRole.NEW)
@ -178,7 +178,7 @@ class SqlAlchemyRepository(AbstractOrderRepository): @@ -178,7 +178,7 @@ class SqlAlchemyRepository(AbstractOrderRepository):
)
def search_vendor(self, reference):
""" search for a vendor by its canonical name """
"""search for a vendor by its canonical name"""
return (
self.session.query(models.Vendor)
.filter_by(term=reference)
@ -186,7 +186,7 @@ class SqlAlchemyRepository(AbstractOrderRepository): @@ -186,7 +186,7 @@ class SqlAlchemyRepository(AbstractOrderRepository):
)
def get_vendor_aggregates(self, reference):
""" list a all canonical names of vendors """
"""list a all canonical names of vendors"""
vendors = (
self.session.query(models.Vendor).filter_by(name=reference).all()
)
@ -196,7 +196,7 @@ class SqlAlchemyRepository(AbstractOrderRepository): @@ -196,7 +196,7 @@ class SqlAlchemyRepository(AbstractOrderRepository):
return models.VendorAggregate(vendors[0].name, terms)
def update_vendors(self, old_vendor, new_name, new_terms):
""" update autocorrect values of vendors """
"""update autocorrect values of vendors"""
# remove old vendor autocorrect values
self.session.query(models.Vendor).filter(
models.Vendor.name == old_vendor.name
@ -213,15 +213,15 @@ class SqlAlchemyRepository(AbstractOrderRepository): @@ -213,15 +213,15 @@ class SqlAlchemyRepository(AbstractOrderRepository):
self._add_item_to_db(new_vendor)
def add_reset_token(self, token):
""" add an password reset token """
"""add an password reset token"""
self._add_item_to_db(token)
def delete_reset_token(self, token):
""" deletes a password reset token """
"""deletes a password reset token"""
self._delete_item_from_db(token)
def get_reset_token(self, reference):
""" get a passowrd reset token from the database"""
"""get a passowrd reset token from the database"""
try:
return (
self.session.query(models.PasswordResetToken)
@ -235,7 +235,7 @@ class SqlAlchemyRepository(AbstractOrderRepository): @@ -235,7 +235,7 @@ class SqlAlchemyRepository(AbstractOrderRepository):
raise RepoItemNotFound from exc
def clear_stale_reset_tokens(self):
""" removes invalid reset tokens """
"""removes invalid reset tokens"""
self.session.query(models.PasswordResetToken).filter(
models.PasswordResetToken.valid_until < datetime.utcnow()
).delete()

34
ordr3/resources.py

@ -2,11 +2,11 @@ @@ -2,11 +2,11 @@
import abc
from pyramid.security import DENY_ALL, Allow, Everyone, Authenticated
from pyramid.authorization import DENY_ALL, Allow, Everyone, Authenticated
class BaseResource(abc.ABC):
""" Base Resource for all other resources """
"""Base Resource for all other resources"""
__parent__ = None # required by pyramid for location aware resources
__name__ = None # required by pyramid for location aware resources
@ -23,17 +23,17 @@ class BaseResource(abc.ABC): @@ -23,17 +23,17 @@ class BaseResource(abc.ABC):
@abc.abstractmethod
def __acl__(self):
""" Access controll list """
"""Access controll list"""
def __getitem__(self, key):
""" returns child resources """
"""returns child resources"""
child_node_class = self.nodes[key]
return child_node_class(key, self)
class User(BaseResource):
def __acl__(self):
""" access controll list """
"""access controll list"""
acl = [(Allow, "role:admin", "view"), (Allow, "role:admin", "edit")]
if not self.model.is_active:
acl.append((Allow, "role:admin", "delete"))
@ -42,17 +42,17 @@ class User(BaseResource): @@ -42,17 +42,17 @@ class User(BaseResource):
@classmethod
def from_model(cls, model, parent):
""" initializes a resource from an SQLalchemy object """
"""initializes a resource from an SQLalchemy object"""
return cls(model.username, parent, model)
class UserList(BaseResource):
def __acl__(self):
""" access controll list """
"""access controll list"""
return [(Allow, "role:admin", "view"), DENY_ALL]
def __getitem__(self, key):
""" returns child resources """
"""returns child resources"""
try:
user = self.request.repo.get_user_by_username(key)
return User.from_model(user, self)
@ -62,7 +62,7 @@ class UserList(BaseResource): @@ -62,7 +62,7 @@ class UserList(BaseResource):
class Order(BaseResource):
def __acl__(self):
""" access controll list """
"""access controll list"""
acl = [
(Allow, "role:user", "view"),
(Allow, "role:purchaser", "edit"),
@ -84,7 +84,7 @@ class Order(BaseResource): @@ -84,7 +84,7 @@ class Order(BaseResource):
class OrderList(BaseResource):
def __acl__(self):
""" access controll list """
"""access controll list"""
return [
(Allow, "role:user", "view"),
(Allow, "role:user", "add"),
@ -94,7 +94,7 @@ class OrderList(BaseResource): @@ -94,7 +94,7 @@ class OrderList(BaseResource):
]
def __getitem__(self, key):
""" returns child resources """
"""returns child resources"""
try:
order = self.request.repo.get_order(key)
return Order.from_model(order, self)
@ -104,7 +104,7 @@ class OrderList(BaseResource): @@ -104,7 +104,7 @@ class OrderList(BaseResource):
class Vendor(BaseResource):
def __acl__(self):
""" access controll list """
"""access controll list"""
acl = [
(Allow, "role:admin", "view"),
(Allow, "role:admin", "edit"),
@ -114,17 +114,17 @@ class Vendor(BaseResource): @@ -114,17 +114,17 @@ class Vendor(BaseResource):
@classmethod
def from_model(cls, model, parent):
""" initializes a resource from an model object """
"""initializes a resource from an model object"""
return cls(model.name, parent, model)
class VendorList(BaseResource):
def __acl__(self):
""" access controll list """
"""access controll list"""
return [(Allow, "role:admin", "view"), DENY_ALL]
def __getitem__(self, key):
""" returns child resources """
"""returns child resources"""
try:
aggregagtes = self.request.repo.get_vendor_aggregates(key)
return Vendor.from_model(aggregagtes, self)
@ -133,7 +133,7 @@ class VendorList(BaseResource): @@ -133,7 +133,7 @@ class VendorList(BaseResource):
class Root(BaseResource):
""" Root resource """
"""Root resource"""
__name__ = None
__parent__ = None
@ -144,7 +144,7 @@ class Root(BaseResource): @@ -144,7 +144,7 @@ class Root(BaseResource):
self.request = request
def __acl__(self):
""" access controll list """
"""access controll list"""
return [
(Allow, Everyone, "login"),
(Allow, Everyone, "logout"),

10
ordr3/schemas/account.py

@ -11,7 +11,7 @@ ROLES = [(role.name, role.name.capitalize()) for role in UserRole] @@ -11,7 +11,7 @@ ROLES = [(role.name, role.name.capitalize()) for role in UserRole]
@colander.deferred
def deferred_unique_username_validator(node, kw):
""" checks if an username is not registered already """
"""checks if an username is not registered already"""
def validate_unique_username(node, value):
request = kw.get("request")
@ -26,7 +26,7 @@ def deferred_unique_username_validator(node, kw): @@ -26,7 +26,7 @@ def deferred_unique_username_validator(node, kw):
@colander.deferred
def deferred_unique_email_validator(node, kw):
""" checks if an email is not registered already """
"""checks if an email is not registered already"""
email_validator = colander.Email()
def validate_unique_email(node, value):
@ -42,7 +42,7 @@ def deferred_unique_email_validator(node, kw): @@ -42,7 +42,7 @@ def deferred_unique_email_validator(node, kw):
class RegistrationSchema(CSRFSchema):
""" new user registration """
"""new user registration"""
user_name = colander.SchemaNode(
colander.String(),
@ -120,7 +120,7 @@ class ResetPasswordSchema(CSRFSchema): @@ -120,7 +120,7 @@ class ResetPasswordSchema(CSRFSchema):
class MyAccountSchema(CSRFSchema):
""" edit the own account """
"""edit the own account"""
user_name = colander.SchemaNode(
colander.String(),
@ -147,7 +147,7 @@ class MyAccountSchema(CSRFSchema): @@ -147,7 +147,7 @@ class MyAccountSchema(CSRFSchema):
class EditAccountSchema(CSRFSchema):
""" edit an account """
"""edit an account"""
user_name = colander.SchemaNode(
colander.String(),

8
ordr3/schemas/base.py

@ -7,14 +7,14 @@ from pyramid.csrf import get_csrf_token, check_csrf_token @@ -7,14 +7,14 @@ from pyramid.csrf import get_csrf_token, check_csrf_token
@colander.deferred
def deferred_csrf_default(node, kw):
""" sets the current csrf token """
"""sets the current csrf token"""
request = kw.get("request")
return get_csrf_token(request)
@colander.deferred
def deferred_csrf_validator(node, kw):
""" validates a submitted csrf token """
"""validates a submitted csrf token"""
def validate_csrf(node, value):
request = kw.get("request")
@ -28,7 +28,7 @@ def deferred_csrf_validator(node, kw): @@ -28,7 +28,7 @@ def deferred_csrf_validator(node, kw):
class CSRFSchema(colander.Schema):
""" base class for schemas with csrf validation """
"""base class for schemas with csrf validation"""
csrf_token = colander.SchemaNode(
colander.String(),
@ -39,7 +39,7 @@ class CSRFSchema(colander.Schema): @@ -39,7 +39,7 @@ class CSRFSchema(colander.Schema):
@classmethod
def as_form(cls, request, **kwargs):
""" returns the schema as a form """
"""returns the schema as a form"""
url = kwargs.pop("url", None)
if not url:
url = request.resource_url(request.context, request.view_name)

12
ordr3/schemas/orders.py

@ -78,7 +78,7 @@ class OrderItem(colander.Schema): @@ -78,7 +78,7 @@ class OrderItem(colander.Schema):
class MoneyInputSchema(colander.Schema):
""" custom schema for structured money and currency input """
"""custom schema for structured money and currency input"""
amount = colander.SchemaNode(
colander.Decimal(),
@ -99,7 +99,7 @@ class MoneyInputSchema(colander.Schema): @@ -99,7 +99,7 @@ class MoneyInputSchema(colander.Schema):
)
def __init__(self, *args, **kwargs):
""" define the custom schema templates """
"""define the custom schema templates"""
if "widget" not in kwargs:
readonly = kwargs.pop("readonly", False)
kwargs["widget"] = deform.widget.MappingWidget(
@ -168,7 +168,7 @@ class OrderOptionals(colander.Schema): @@ -168,7 +168,7 @@ class OrderOptionals(colander.Schema):
class EditOrderSchema(CSRFSchema):
""" edit an order """
"""edit an order"""
status = OrderStatus()
item = OrderItem()
@ -177,7 +177,7 @@ class EditOrderSchema(CSRFSchema): @@ -177,7 +177,7 @@ class EditOrderSchema(CSRFSchema):
@classmethod
def as_form(cls, request, **override):
""" returns the schema as a form """
"""returns the schema as a form"""
vendor_autocorrect_url = override.pop("autocorrect_url")
settings = {
@ -204,7 +204,7 @@ class EditOrderSchema(CSRFSchema): @@ -204,7 +204,7 @@ class EditOrderSchema(CSRFSchema):
class AddOrderSchema(CSRFSchema):
""" add an order """
"""add an order"""
item = OrderItem()
pricing = OrderPricing()
@ -212,7 +212,7 @@ class AddOrderSchema(CSRFSchema): @@ -212,7 +212,7 @@ class AddOrderSchema(CSRFSchema):
@classmethod
def as_form(cls, request, **override):
""" returns the schema as a form """
"""returns the schema as a form"""
vendor_autocorrect_url = override.pop("autocorrect_url")
settings = {

2
ordr3/scripts/migrate_db.py

@ -226,7 +226,7 @@ vendor_map = { @@ -226,7 +226,7 @@ vendor_map = {
def _query_table(cursor, table):
cursor.execute(f"SELECT * FROM {table}")
cursor.execute("SELECT * FROM :year", {"table": table})
columns = [d[0] for d in cursor.description]
return (dict(zip(columns, values)) for values in cursor)

89
ordr3/security.py

@ -1,48 +1,80 @@ @@ -1,48 +1,80 @@
""" User Authentication and Authorization """
from passlib.context import CryptContext
from pyramid.security import Everyone, Authenticated
from sqlalchemy.orm.exc import NoResultFound
from pyramid.authorization import ACLAuthorizationPolicy
from pyramid.authentication import AuthTktAuthenticationPolicy
from pyramid.authorization import Everyone, ACLHelper, Authenticated
from pyramid.authentication import AuthTktCookieHelper
class AuthenticationPolicy(AuthTktAuthenticationPolicy):
""" How to authenticate users """
class SecurityPolicy:
def __init__(self, secret):
self.helper = AuthTktCookieHelper(secret)
def authenticated_userid(self, request):
"""returns the id of an authenticated user
def identity(self, request):
# define our simple identity as None or a dict with
# userid and principals keys
identity = self.helper.identify(request)
if identity is None:
return None
userid = identity[
"userid"
] # identical to the deprecated request.unauthenticated_userid
# verify the userid, just like we did before with groupfinder
try:
user = request.repo.get_user(userid)
if not user.is_active:
return None
except NoResultFound:
return None
heavy lifting done in get_user() attached to request
"""
user = request.user
if user is not None:
# assuming the userid is valid, return a map with userid and principals
return {
"userid": user.id,
"principals": user.principals,
}
def authenticated_userid(self, request):
# defer to the identity logic to determine if the user id logged in
# and return None if they are not
identity = request.identity
try:
user = request.repo.get_user(identity["userid"])
return user.id
except (NoResultFound, TypeError):
return None
def effective_principals(self, request):
""" returns a list of principals for the user """
principals = [Everyone]
user = request.user
if user is not None:
principals.append(Authenticated)
principals.extend(user.principals)
return principals
def permits(self, request, context, permission):
# use the identity to build a list of principals, and pass them
# to the ACLHelper to determine allowed/denied
identity = request.identity
principals = {Everyone}
if identity is not None:
principals.add(Authenticated)
principals.update(identity["principals"])
return ACLHelper().permits(context, principals, permission)
def remember(self, request, userid, **kw):
return self.helper.remember(request, userid, **kw)
def forget(self, request, **kw):
return self.helper.forget(request, **kw)
def get_user(request):
""" retrieves the user object by the unauthenticated user id """
user_id = request.unauthenticated_userid
if user_id is None:
"""retrieves the user object by the unauthenticated user id"""
identity = request.identity
if identity is None:
return None
try:
user = request.repo.get_user(user_id)
user = request.repo.get_user(identity["userid"])
return user if user.is_active else None
except NoResultFound:
except (NoResultFound, TypeError):
return None
def get_passlib_context():
""" configures a passlib context and returns it """
"""configures a passlib context and returns it"""
ctx = CryptContext(
schemes=["argon2", "bcrypt"], default="argon2", deprecated=["bcrypt"]
)
@ -55,9 +87,6 @@ def includeme(config): @@ -55,9 +87,6 @@ def includeme(config):
Activate this setup using ``config.include('ordr2.security')``.
"""
settings = config.get_settings()
authn_policy = AuthenticationPolicy(
settings["auth.secret"], hashalg="sha512"
)
config.set_authentication_policy(authn_policy)
config.set_authorization_policy(ACLAuthorizationPolicy())
policy = SecurityPolicy(settings["auth.secret"])
config.set_security_policy(policy)
config.add_request_method(get_user, "user", reify=True)

6
ordr3/services.py

@ -30,13 +30,13 @@ MSG_PWNED_PASSWORD = events.FlashMessage.warning( @@ -30,13 +30,13 @@ MSG_PWNED_PASSWORD = events.FlashMessage.warning(
def find_consumables(repo, repeat=3, days=365 * 2):
""" search for orders that are requested often """
"""search for orders that are requested often"""
unsorted = _find_consumables(repo, repeat, days)
return sorted(unsorted, key=lambda o: o.cas_description)
def _find_consumables(repo, repeat=3, days=365 * 2):
""" helper function for find_consumables() implementation """
"""helper function for find_consumables() implementation"""
limit_date = datetime.utcnow() - timedelta(days=days)
relevant = repo.list_consumable_candidates(limit_date, CONSUMABLE_STATI)
counter = {}
@ -135,7 +135,7 @@ def check_have_i_been_pwned(password, event_queue): @@ -135,7 +135,7 @@ def check_have_i_been_pwned(password, event_queue):
"""public function for checking haveibeenpwned
this is just a small shim to eas testing"""
password_hash = hashlib.sha1(password.encode()).hexdigest()
password_hash = hashlib.sha1(password.encode()).hexdigest() # noqa: S303
return _check_have_i_been_pwned(password_hash, event_queue)

2
ordr3/views/account.py

@ -51,7 +51,7 @@ def check_credentials(context, request): @@ -51,7 +51,7 @@ def check_credentials(context, request):
context="ordr3:resources.Root", name="logout", permission="logout"
)
def logout(context, request):
""" logout of a user """
"""logout of a user"""
return HTTPFound(
request.resource_path(request.root, "login"), headers=forget(request)
)

6
ordr3/views/orders.py

@ -331,7 +331,7 @@ def new_order(context, request): @@ -331,7 +331,7 @@ def new_order(context, request):
renderer="ordr3:templates/orders/add.jinja2",
)
def place_new_order(context, request):
""" process the reorder form """
"""process the reorder form"""
autocorrect_url = request.resource_url(context, "vendor")
form = orders.AddOrderSchema.as_form(
request, autocorrect_url=autocorrect_url
@ -407,7 +407,7 @@ def edit_order(context, request): @@ -407,7 +407,7 @@ def edit_order(context, request):
renderer="ordr3:templates/orders/edit.jinja2",
)
def do_edit_order(context, request):
""" process the edit order form """
"""process the edit order form"""
autocorrect_url = request.resource_url(context.__parent__, "vendor")
form = orders.EditOrderSchema.as_form(
request, autocorrect_url=autocorrect_url
@ -471,7 +471,7 @@ def reorder(context, request): @@ -471,7 +471,7 @@ def reorder(context, request):
renderer="ordr3:templates/orders/reorder.jinja2",
)
def place_reorder(context, request):
""" process the reorder form """
"""process the reorder form"""
autocorrect_url = request.resource_url(context.__parent__, "vendor")
form = orders.AddOrderSchema.as_form(
request, autocorrect_url=autocorrect_url

1
pyproject.toml

@ -55,6 +55,7 @@ test = [ @@ -55,6 +55,7 @@ test = [
"pytest-mock",
"pytest-randomly",
"tox",
"webtest",
]
dev = [
"black",

4
tests/functional/conftest.py

@ -58,9 +58,10 @@ def _example_data(_sqlite_repo): @@ -58,9 +58,10 @@ def _example_data(_sqlite_repo):
from ordr3 import models, security
today = datetime.utcnow()
crypt_context = security.get_passlib_context()
nested = _sqlite_repo.session.begin_nested()
user = models.User(
1,
"TestUser",
@ -198,6 +199,7 @@ def _example_data(_sqlite_repo): @@ -198,6 +199,7 @@ def _example_data(_sqlite_repo):
_sqlite_repo.session.add(models.Vendor("merck", "Merck"))
_sqlite_repo.session.add(models.Vendor("merk", "Merck"))
nested.commit()
_sqlite_repo.session.flush()

21
tests/functional/test_login.py

@ -1,5 +1,9 @@ @@ -1,5 +1,9 @@
import pytest
@pytest.mark.fun
def test_login_ok(testapp):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
form = response.form
@ -9,8 +13,9 @@ def test_login_ok(testapp): @@ -9,8 +13,9 @@ def test_login_ok(testapp):
assert "My Orders" in response
@pytest.mark.fun
def test_login_wrong_username(testapp):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
form = response.form
@ -20,8 +25,9 @@ def test_login_wrong_username(testapp): @@ -20,8 +25,9 @@ def test_login_wrong_username(testapp):
assert "Credentials are invalid" in response
@pytest.mark.fun
def test_login_wrong_password(testapp):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
form = response.form
@ -31,8 +37,9 @@ def test_login_wrong_password(testapp): @@ -31,8 +37,9 @@ def test_login_wrong_password(testapp):
assert "Credentials are invalid" in response
@pytest.mark.fun
def test_login_fails_inactive_user(testapp):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
form = response.form
@ -42,8 +49,9 @@ def test_login_fails_inactive_user(testapp): @@ -42,8 +49,9 @@ def test_login_fails_inactive_user(testapp):
assert "Credentials are invalid" in response
@pytest.mark.fun
def test_logout(testapp):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
form = response.form
@ -52,10 +60,11 @@ def test_logout(testapp): @@ -52,10 +60,11 @@ def test_logout(testapp):
response = form.submit("submit").follow()
assert "My Orders" in response
response = testapp.get("/logout", status=302).follow(status=200)
response = testapp.get("/logout", status=302).follow()
assert "Please Log In" in response
@pytest.mark.fun
def test_breached_faq(testapp):
response = testapp.get("/breached")
assert "haveibeenpwned" in response

23
tests/functional/test_my_account.py

@ -1,8 +1,12 @@ @@ -1,8 +1,12 @@
import pytest
@pytest.mark.fun
def test_my_account_edit(testapp, login_as):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = login_as("TestUser", "jon").follow(status=200)
response = login_as("TestUser", "jon").follow()
assert "My Orders" in response
response = testapp.get("/myaccount")
@ -23,11 +27,12 @@ def test_my_account_edit(testapp, login_as): @@ -23,11 +27,12 @@ def test_my_account_edit(testapp, login_as):
assert "terry@example.com" in response
@pytest.mark.fun
def test_my_account_edit_cancel(testapp, login_as):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = login_as("TestUser", "jon").follow(status=200)
response = login_as("TestUser", "jon").follow()
assert "My Orders" in response
response = testapp.get("/myaccount")
@ -48,11 +53,12 @@ def test_my_account_edit_cancel(testapp, login_as): @@ -48,11 +53,12 @@ def test_my_account_edit_cancel(testapp, login_as):
assert "terry@example.com" not in response
@pytest.mark.fun
def test_my_account_edit_form_error(testapp, login_as):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = login_as("TestUser", "jon").follow(status=200)
response = login_as("TestUser", "jon").follow()
assert "My Orders" in response
response = testapp.get("/myaccount")
@ -64,11 +70,12 @@ def test_my_account_edit_form_error(testapp, login_as): @@ -64,11 +70,12 @@ def test_my_account_edit_form_error(testapp, login_as):
assert "There was a problem with your submission" in response
@pytest.mark.fun
def test_my_account_reset_password(testapp, login_as, parse_latest_mail):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = login_as("TestUser", "jon").follow(status=200)
response = login_as("TestUser", "jon").follow()
assert "My Orders" in response
response = testapp.get("/myaccount")

75
tests/functional/test_order.py

@ -1,8 +1,12 @@ @@ -1,8 +1,12 @@
import pytest
@pytest.mark.fun
def test_view_order(testapp, login_as):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = login_as("TestAdmin", "jane").follow(status=200)
response = login_as("TestAdmin", "jane").follow()
assert "My Orders" in response
response = testapp.get("/orders/3/view/")
@ -25,11 +29,12 @@ def test_view_order(testapp, login_as): @@ -25,11 +29,12 @@ def test_view_order(testapp, login_as):
assert "TestUser" in response
@pytest.mark.fun
def test_add_order_ok(testapp, login_as, contains):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = login_as("TestAdmin", "jane").follow(status=200)
response = login_as("TestAdmin", "jane").follow()
assert "My Orders" in response
assert contains(
response,
@ -79,11 +84,12 @@ def test_add_order_ok(testapp, login_as, contains): @@ -79,11 +84,12 @@ def test_add_order_ok(testapp, login_as, contains):
assert "28.35" in response
@pytest.mark.fun
def test_add_order_validation_error(testapp, login_as, contains):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = login_as("TestAdmin", "jane").follow(status=200)
response = login_as("TestAdmin", "jane").follow()
assert "My Orders" in response
assert contains(
response,
@ -110,11 +116,12 @@ def test_add_order_validation_error(testapp, login_as, contains): @@ -110,11 +116,12 @@ def test_add_order_validation_error(testapp, login_as, contains):
)
@pytest.mark.fun
def test_add_order_cancel(testapp, login_as, contains):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = login_as("TestAdmin", "jane").follow(status=200)
response = login_as("TestAdmin", "jane").follow()
assert "My Orders" in response
assert contains(
response,
@ -153,11 +160,12 @@ def test_add_order_cancel(testapp, login_as, contains): @@ -153,11 +160,12 @@ def test_add_order_cancel(testapp, login_as, contains):
)
@pytest.mark.fun
def test_edit_order_ok(testapp, login_as, contains, parse_latest_mail):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = login_as("TestAdmin", "jane").follow(status=200)
response = login_as("TestAdmin", "jane").follow()
assert "My Orders" in response
assert contains(
response,
@ -214,11 +222,12 @@ def test_edit_order_ok(testapp, login_as, contains, parse_latest_mail): @@ -214,11 +222,12 @@ def test_edit_order_ok(testapp, login_as, contains, parse_latest_mail):
assert "- new status: Hold" in parsed.body
@pytest.mark.fun
def test_edit_order_form_error(testapp, login_as, contains):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = login_as("TestAdmin", "jane").follow(status=200)
response = login_as("TestAdmin", "jane").follow()
assert "My Orders" in response
response = testapp.get("/orders/3/edit/")
@ -251,11 +260,12 @@ def test_edit_order_form_error(testapp, login_as, contains): @@ -251,11 +260,12 @@ def test_edit_order_form_error(testapp, login_as, contains):
)
@pytest.mark.fun
def test_edit_order_cancel(testapp, login_as, contains):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = login_as("TestAdmin", "jane").follow(status=200)
response = login_as("TestAdmin", "jane").follow()
assert "My Orders" in response
response = testapp.get("/orders/3/edit/")
@ -286,11 +296,12 @@ def test_edit_order_cancel(testapp, login_as, contains): @@ -286,11 +296,12 @@ def test_edit_order_cancel(testapp, login_as, contains):
)
@pytest.mark.fun
def test_edit_order_purchaser_vs_user(testapp, login_as, contains):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = login_as("TestAdmin", "jane").follow(status=200)
response = login_as("TestAdmin", "jane").follow()
assert "My Orders" in response
response = testapp.get("/orders/4/edit/")
@ -298,7 +309,7 @@ def test_edit_order_purchaser_vs_user(testapp, login_as, contains): @@ -298,7 +309,7 @@ def test_edit_order_purchaser_vs_user(testapp, login_as, contains):
status = soup.find("select", {"id": "deformField3"})
assert not status.has_attr("readonly")
response = login_as("TestUser", "jon").follow(status=200)
response = login_as("TestUser", "jon").follow()
assert "My Orders" in response
response = testapp.get("/orders/4/edit/")
@ -307,11 +318,12 @@ def test_edit_order_purchaser_vs_user(testapp, login_as, contains): @@ -307,11 +318,12 @@ def test_edit_order_purchaser_vs_user(testapp, login_as, contains):
assert status.has_attr("readonly")
@pytest.mark.fun
def test_delete_order_ok(testapp, login_as, contains):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = login_as("TestAdmin", "jane").follow(status=200)
response = login_as("TestAdmin", "jane").follow()
assert "My Orders" in response
assert contains(response, Eppis=True, Ethanol=True, NaCl=True)
# don't check for "Spritzen", the term will apear in the flash message
@ -332,11 +344,12 @@ def test_delete_order_ok(testapp, login_as, contains): @@ -332,11 +344,12 @@ def test_delete_order_ok(testapp, login_as, contains):
assert "/orders/4/edit" not in response
@pytest.mark.fun
def test_delete_order_cancel(testapp, login_as, contains):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = login_as("TestAdmin", "jane").follow(status=200)
response = login_as("TestAdmin", "jane").follow()
assert "My Orders" in response
assert contains(response, Eppis=True, Ethanol=True, NaCl=True)
# don't check for "Spritzen", the term will apear in the flash message
@ -357,11 +370,12 @@ def test_delete_order_cancel(testapp, login_as, contains): @@ -357,11 +370,12 @@ def test_delete_order_cancel(testapp, login_as, contains):
assert "/orders/4/edit" in response
@pytest.mark.fun
def test_delete_order_no_confirm(testapp, login_as, contains):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = login_as("TestAdmin", "jane").follow(status=200)
response = login_as("TestAdmin", "jane").follow()
assert "My Orders" in response
assert contains(response, Eppis=True, Ethanol=True, NaCl=True)
# don't check for "Spritzen", the term will apear in the flash message
@ -382,11 +396,12 @@ def test_delete_order_no_confirm(testapp, login_as, contains): @@ -382,11 +396,12 @@ def test_delete_order_no_confirm(testapp, login_as, contains):
assert "/orders/4/edit" in response
@pytest.mark.fun
def test_reorder_ok(testapp, login_as, contains):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = login_as("TestAdmin", "jane").follow(status=200)
response = login_as("TestAdmin", "jane").follow()
assert "My Orders" in response
assert "1138,50" not in response
assert "/orders/5/edit" not in response
@ -412,11 +427,12 @@ def test_reorder_ok(testapp, login_as, contains): @@ -412,11 +427,12 @@ def test_reorder_ok(testapp, login_as, contains):
)
@pytest.mark.fun
def test_reorder_cancel(testapp, login_as, contains):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = login_as("TestAdmin", "jane").follow(status=200)
response = login_as("TestAdmin", "jane").follow()
assert "My Orders" in response
assert "1138,50" not in response
assert "/orders/5/edit" not in response
@ -437,11 +453,12 @@ def test_reorder_cancel(testapp, login_as, contains): @@ -437,11 +453,12 @@ def test_reorder_cancel(testapp, login_as, contains):
)
@pytest.mark.fun
def test_reorder_form_error(testapp, login_as, contains):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = login_as("TestAdmin", "jane").follow(status=200)
response = login_as("TestAdmin", "jane").follow()
assert "My Orders" in response
assert "1138,50" not in response
assert "/orders/5/edit" not in response

47
tests/functional/test_order_list.py

@ -1,8 +1,12 @@ @@ -1,8 +1,12 @@
import pytest
@pytest.mark.fun
def test_order_list(testapp, login_as, contains):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = login_as("TestAdmin", "jane").follow(status=200)
response = login_as("TestAdmin", "jane").follow()
assert "My Orders" in response
assert contains(
response, Eppis=True, Ethanol=True, NaCl=True, Spritzen=True
@ -53,11 +57,12 @@ def test_order_list(testapp, login_as, contains): @@ -53,11 +57,12 @@ def test_order_list(testapp, login_as, contains):
)
@pytest.mark.fun
def test_multi_edit_ok(testapp, login_as, parse_latest_mail, contains):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = login_as("TestAdmin", "jane").follow(status=200)
response = login_as("TestAdmin", "jane").follow()
assert "My Orders" in response
form = response.forms[1]
@ -87,28 +92,30 @@ def test_multi_edit_ok(testapp, login_as, parse_latest_mail, contains): @@ -87,28 +92,30 @@ def test_multi_edit_ok(testapp, login_as, parse_latest_mail, contains):
assert "- new status: Hold" in parsed.body
@pytest.mark.fun
def test_multi_edit_no_orders_selected(testapp, login_as, contains):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = login_as("TestAdmin", "jane").follow(status=200)
response = login_as("TestAdmin", "jane").follow()
assert "My Orders" in response
form = response.forms[1]
form.action = "/orders/batch-edit/"
response = form.submit().follow(status=200)
response = form.submit().follow()
assert "My Orders" in response
assert contains(
response, Eppis=True, Ethanol=True, NaCl=True, Spritzen=True
)
@pytest.mark.fun
def test_multi_edit_cancel(testapp, login_as, contains):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = login_as("TestAdmin", "jane").follow(status=200)
response = login_as("TestAdmin", "jane").follow()
assert "My Orders" in response
form = response.forms[1]
@ -144,11 +151,12 @@ def test_multi_edit_cancel(testapp, login_as, contains): @@ -144,11 +151,12 @@ def test_multi_edit_cancel(testapp, login_as, contains):
)
@pytest.mark.fun
def test_multi_delete_ok(testapp, login_as, contains):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = login_as("TestAdmin", "jane").follow(status=200)
response = login_as("TestAdmin", "jane").follow()
assert "My Orders" in response
form = response.forms[1]
@ -173,11 +181,12 @@ def test_multi_delete_ok(testapp, login_as, contains): @@ -173,11 +181,12 @@ def test_multi_delete_ok(testapp, login_as, contains):
)
@pytest.mark.fun
def test_multi_delete_no_orders(testapp, login_as, contains):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = login_as("TestAdmin", "jane").follow(status=200)
response = login_as("TestAdmin", "jane").follow()
assert "My Orders" in response
form = response.forms[1]
@ -186,18 +195,19 @@ def test_multi_delete_no_orders(testapp, login_as, contains): @@ -186,18 +195,19 @@ def test_multi_delete_no_orders(testapp, login_as, contains):
select_checkboxes[0].checked = True
select_checkboxes[1].checked = True
response = form.submit().follow(status=200)
response = form.submit().follow()
assert "My Orders" in response
assert contains(
response, Eppis=True, Ethanol=True, NaCl=True, Spritzen=True
)
@pytest.mark.fun
def test_multi_delete_no_confirm(testapp, login_as, contains):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = login_as("TestAdmin", "jane").follow(status=200)
response = login_as("TestAdmin", "jane").follow()
assert "My Orders" in response
form = response.forms[1]
@ -222,11 +232,12 @@ def test_multi_delete_no_confirm(testapp, login_as, contains): @@ -222,11 +232,12 @@ def test_multi_delete_no_confirm(testapp, login_as, contains):
)
@pytest.mark.fun
def test_multi_delete_cancel(testapp, login_as, contains):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = login_as("TestAdmin", "jane").follow(status=200)
response = login_as("TestAdmin", "jane").follow()
assert "My Orders" in response
form = response.forms[1]

40
tests/functional/test_password_reset.py

@ -1,5 +1,9 @@ @@ -1,5 +1,9 @@
import pytest
@pytest.mark.fun
def test_password_reset(testapp, parse_latest_mail):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
form = response.form
@ -27,14 +31,14 @@ def test_password_reset(testapp, parse_latest_mail): @@ -27,14 +31,14 @@ def test_password_reset(testapp, parse_latest_mail):
response = form.submit("Reset_Password").follow()
assert "You changed your Password." in response
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
form = response.form
form["username"] = "TestAdmin"
form["password"] = "jane"
response = form.submit("Log In")
assert "Credentials are invalid" in response
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
form = response.form
form["username"] = "TestAdmin"
form["password"] = "jixx"
@ -42,8 +46,9 @@ def test_password_reset(testapp, parse_latest_mail): @@ -42,8 +46,9 @@ def test_password_reset(testapp, parse_latest_mail):
assert "My Orders" in response
@pytest.mark.fun
def test_password_cancel_forgot_password(testapp):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = testapp.get("/forgot", status=200)
@ -55,10 +60,11 @@ def test_password_cancel_forgot_password(testapp): @@ -55,10 +60,11 @@ def test_password_cancel_forgot_password(testapp):
assert "Please Log In" in response
@pytest.mark.fun
def test_password_reset_user_or_email_not_found(testapp):
from pyramid_mailer import get_mailer
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = testapp.get("/forgot", status=200)
@ -74,8 +80,9 @@ def test_password_reset_user_or_email_not_found(testapp): @@ -74,8 +80,9 @@ def test_password_reset_user_or_email_not_found(testapp):
assert len(mailer.outbox) == 0
@pytest.mark.fun
def test_password_reset_cancel_after_token(testapp, parse_latest_mail):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = testapp.get("/forgot", status=200)
@ -94,10 +101,10 @@ def test_password_reset_cancel_after_token(testapp, parse_latest_mail): @@ -94,10 +101,10 @@ def test_password_reset_cancel_after_token(testapp, parse_latest_mail):
form = response.form
form["new_password"] = "jixx"
response = form.submit("Cancel").follow(status=302).follow(status=200)
response = form.submit("Cancel").follow(status=302).follow()
assert "Please Log In" in response
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
form = response.form
form["username"] = "TestAdmin"
form["password"] = "jane"
@ -105,8 +112,9 @@ def test_password_reset_cancel_after_token(testapp, parse_latest_mail): @@ -105,8 +112,9 @@ def test_password_reset_cancel_after_token(testapp, parse_latest_mail):
assert "My Orders" in response
@pytest.mark.fun
def test_password_reset_empty_password(testapp, parse_latest_mail):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = testapp.get("/forgot", status=200)
@ -129,18 +137,18 @@ def test_password_reset_empty_password(testapp, parse_latest_mail): @@ -129,18 +137,18 @@ def test_password_reset_empty_password(testapp, parse_latest_mail):
assert "There was a problem with your submission" in response
@pytest.mark.fun
def test_password_reset_invalid_token(testapp):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = (
testapp.get("/reset?t=invalid").follow(status=302).follow(status=200)
)
response = testapp.get("/reset?t=invalid").follow(status=302).follow()
assert "Please Log In" in response
@pytest.mark.fun
def test_password_reset_form_invalid_token(testapp, parse_latest_mail):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = testapp.get("/forgot", status=200)
@ -160,9 +168,7 @@ def test_password_reset_form_invalid_token(testapp, parse_latest_mail): @@ -160,9 +168,7 @@ def test_password_reset_form_invalid_token(testapp, parse_latest_mail):
form = response.form
form.action = "/reset?t=invalid"
form["new_password"] = "jixx"
response = (
form.submit("Reset_Password").follow(status=302).follow(status=200)
)
response = form.submit("Reset_Password").follow(status=302).follow()
assert "Please Log In" in response
form = response.form

25
tests/functional/test_registration.py

@ -1,5 +1,9 @@ @@ -1,5 +1,9 @@
import pytest
@pytest.mark.fun
def test_registration_procedure(testapp, login_as, parse_latest_mail):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = testapp.get("/registration", status=200)
@ -39,14 +43,15 @@ def test_registration_procedure(testapp, login_as, parse_latest_mail): @@ -39,14 +43,15 @@ def test_registration_procedure(testapp, login_as, parse_latest_mail):
parsed = parse_latest_mail()
assert "Your account was activated" in parsed.body
response = login_as("TestNew", "eric").follow(status=200)
response = login_as("TestNew", "eric").follow()
assert "My Orders" in response
@pytest.mark.fun
def test_registration_procedure_form_error(
testapp, login_as, parse_latest_mail
):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = testapp.get("/registration", status=200)
@ -57,10 +62,11 @@ def test_registration_procedure_form_error( @@ -57,10 +62,11 @@ def test_registration_procedure_form_error(
assert "There was a problem with your submission" in response
@pytest.mark.fun
def test_registration_procedure_not_unique_username(
testapp, login_as, parse_latest_mail
):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = testapp.get("/registration", status=200)
@ -77,10 +83,11 @@ def test_registration_procedure_not_unique_username( @@ -77,10 +83,11 @@ def test_registration_procedure_not_unique_username(
assert "There was a problem with your submission" in response
@pytest.mark.fun
def test_registration_procedure_not_unique_email(
testapp, login_as, parse_latest_mail
):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = testapp.get("/registration", status=200)
@ -97,10 +104,11 @@ def test_registration_procedure_not_unique_email( @@ -97,10 +104,11 @@ def test_registration_procedure_not_unique_email(
assert "There was a problem with your submission" in response
@pytest.mark.fun
def test_registration_procedure_bad_csrf_token(
testapp, login_as, parse_latest_mail
):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = testapp.get("/registration", status=200)
@ -117,8 +125,9 @@ def test_registration_procedure_bad_csrf_token( @@ -117,8 +125,9 @@ def test_registration_procedure_bad_csrf_token(
form.submit("Create_Account", status=400)
@pytest.mark.fun
def test_registration_procedure_canceled(testapp, login_as, parse_latest_mail):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = testapp.get("/registration", status=200)
@ -130,5 +139,5 @@ def test_registration_procedure_canceled(testapp, login_as, parse_latest_mail): @@ -130,5 +139,5 @@ def test_registration_procedure_canceled(testapp, login_as, parse_latest_mail):
form["last_name"] = "Idle"
form["email"] = "eric@example.com"
form["password"] = "eric"
response = form.submit("Cancel").follow(status=302).follow(status=200)
response = form.submit("Cancel").follow(status=302).follow()
assert "Please Log In" in response

43
tests/functional/test_user_edit.py

@ -1,8 +1,12 @@ @@ -1,8 +1,12 @@
import pytest
@pytest.mark.fun
def test_user_edit(testapp, login_as):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = login_as("TestAdmin", "jane").follow(status=200)
response = login_as("TestAdmin", "jane").follow()
assert "My Orders" in response
response = testapp.get("/users/TestUser/edit")
@ -21,11 +25,12 @@ def test_user_edit(testapp, login_as): @@ -21,11 +25,12 @@ def test_user_edit(testapp, login_as):
assert "terry@example.com" in response
@pytest.mark.fun
def test_user_edit_cancel(testapp, login_as):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = login_as("TestAdmin", "jane").follow(status=200)
response = login_as("TestAdmin", "jane").follow()
assert "My Orders" in response
response = testapp.get("/users/TestUser/edit")
@ -44,11 +49,12 @@ def test_user_edit_cancel(testapp, login_as): @@ -44,11 +49,12 @@ def test_user_edit_cancel(testapp, login_as):
assert "terry@example.com" not in response
@pytest.mark.fun
def test_user_edit_form_error(testapp, login_as):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = login_as("TestAdmin", "jane").follow(status=200)
response = login_as("TestAdmin", "jane").follow()
assert "My Orders" in response
response = testapp.get("/users/TestUser/edit")
@ -58,22 +64,24 @@ def test_user_edit_form_error(testapp, login_as): @@ -58,22 +64,24 @@ def test_user_edit_form_error(testapp, login_as):
assert "There was a problem with your submission" in response
@pytest.mark.fun
def test_user_edit_invalid_user(testapp, login_as):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = login_as("TestAdmin", "jane").follow(status=200)
response = login_as("TestAdmin", "jane").follow()
assert "My Orders" in response
response = testapp.get("/users/Unknown/edit").follow()
assert "My Orders" in response
@pytest.mark.fun
def test_user_edit_reset_password(testapp, login_as, parse_latest_mail):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = login_as("TestAdmin", "jane").follow(status=200)
response = login_as("TestAdmin", "jane").follow()
assert "My Orders" in response
response = testapp.get("/users/TestUser/edit")
@ -91,11 +99,12 @@ def test_user_edit_reset_password(testapp, login_as, parse_latest_mail): @@ -91,11 +99,12 @@ def test_user_edit_reset_password(testapp, login_as, parse_latest_mail):
assert parsed.link.startswith("http://localhost/reset?t=")
@pytest.mark.fun
def test_user_delete(testapp, login_as, parse_latest_mail):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = login_as("TestAdmin", "jane").follow(status=200)
response = login_as("TestAdmin", "jane").follow()
assert "My Orders" in response
response = testapp.get("/users/TestInactive/edit")
@ -113,11 +122,12 @@ def test_user_delete(testapp, login_as, parse_latest_mail): @@ -113,11 +122,12 @@ def test_user_delete(testapp, login_as, parse_latest_mail):
assert "TestInactive" not in response
@pytest.mark.fun
def test_user_delete_cancel(testapp, login_as, parse_latest_mail):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = login_as("TestAdmin", "jane").follow(status=200)
response = login_as("TestAdmin", "jane").follow()
assert "My Orders" in response
response = testapp.get("/users/TestInactive/edit")
@ -135,11 +145,12 @@ def test_user_delete_cancel(testapp, login_as, parse_latest_mail): @@ -135,11 +145,12 @@ def test_user_delete_cancel(testapp, login_as, parse_latest_mail):
assert "TestInactive" in response
@pytest.mark.fun
def test_user_delete_no_confirm(testapp, login_as, parse_latest_mail):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = login_as("TestAdmin", "jane").follow(status=200)
response = login_as("TestAdmin", "jane").follow()
assert "My Orders" in response
response = testapp.get("/users/TestInactive/edit")

30
tests/functional/test_vendors.py

@ -1,6 +1,7 @@ @@ -1,6 +1,7 @@
import pytest
@pytest.mark.fun
@pytest.mark.parametrize(
"vendor,returned,found",
[
@ -16,21 +17,22 @@ import pytest @@ -16,21 +17,22 @@ import pytest
],
)
def test_check_vendor_name(testapp, login_as, vendor, returned, found):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = login_as("TestAdmin", "jane").follow(status=200)
response = login_as("TestAdmin", "jane").follow()
assert "My Orders" in response
response = testapp.post("/orders/vendor", {"vendor": vendor}, xhr=True)
assert response.json == {"name": returned, "found": found}
@pytest.mark.fun
def test_vendor_list(testapp, login_as):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = login_as("TestAdmin", "jane").follow(status=200)
response = login_as("TestAdmin", "jane").follow()
assert "My Orders" in response
response = testapp.get("/vendors")
@ -40,11 +42,12 @@ def test_vendor_list(testapp, login_as): @@ -40,11 +42,12 @@ def test_vendor_list(testapp, login_as):
assert "Merck" in response
@pytest.mark.fun
def test_vendor_edit_ok(testapp, login_as):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = login_as("TestAdmin", "jane").follow(status=200)
response = login_as("TestAdmin", "jane").follow()
assert "My Orders" in response
response = testapp.get("/vendors")
@ -79,11 +82,12 @@ def test_vendor_edit_ok(testapp, login_as): @@ -79,11 +82,12 @@ def test_vendor_edit_ok(testapp, login_as):
assert response.json == {"name": "vr", "found": False}
@pytest.mark.fun
def test_vendor_edit_cancel(testapp, login_as):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = login_as("TestAdmin", "jane").follow(status=200)
response = login_as("TestAdmin", "jane").follow()
assert "My Orders" in response
response = testapp.get("/vendors")
@ -112,11 +116,12 @@ def test_vendor_edit_cancel(testapp, login_as): @@ -112,11 +116,12 @@ def test_vendor_edit_cancel(testapp, login_as):
assert set(terms) == {"merck", "merk"}
@pytest.mark.fun
def test_vendor_edit_form_error(testapp, login_as):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = login_as("TestAdmin", "jane").follow(status=200)
response = login_as("TestAdmin", "jane").follow()
assert "My Orders" in response
response = testapp.get("/vendors")
@ -142,11 +147,12 @@ def test_vendor_edit_form_error(testapp, login_as): @@ -142,11 +147,12 @@ def test_vendor_edit_form_error(testapp, login_as):
assert "Merck" in response
@pytest.mark.fun
def test_vendor_edit_unknonw_vendor(testapp, login_as):
response = testapp.get("/", status=302).follow(status=200)
response = testapp.get("/", status=302).follow()
assert "Please Log In" in response
response = login_as("TestAdmin", "jane").follow(status=200)
response = login_as("TestAdmin", "jane").follow()
assert "My Orders" in response
response = testapp.get("/vendors")

34
tests/test_services.py

@ -7,7 +7,7 @@ from ordr3.models import Vendor, VendorAggregate @@ -7,7 +7,7 @@ from ordr3.models import Vendor, VendorAggregate
class FakeOrderRepository(AbstractOrderRepository):
""" Repository implementation for testing """
"""Repository implementation for testing"""
def __init__(self, session):
self._orders = set()
@ -16,59 +16,59 @@ class FakeOrderRepository(AbstractOrderRepository): @@ -16,59 +16,59 @@ class FakeOrderRepository(AbstractOrderRepository):
self._tokens = set()
def add_order(self, order):
""" add an order to the datastore """
"""add an order to the datastore"""
self._orders.add(order)
def get_order(self, reference):
""" retrieve an order from the datastore """
"""retrieve an order from the datastore"""
return next(o for o in self._orders if o.id == reference)
def delete_order(self, order):
""" removes a user from the datastore """
"""removes a user from the datastore"""
self._orders.remove(order)
def list_consumable_candidates(self, limit_date, statuses):
""" list orders, sorted by descending creation date """
"""list orders, sorted by descending creation date"""
newer_orders = (o for o in self._orders if o.created_on > limit_date)
valid_orders = (o for o in newer_orders if o.status in statuses)
return sorted(valid_orders, reverse=True, key=lambda x: x.created_on)
def add_user(self, user):
""" add a user to the datastore """
"""add a user to the datastore"""
self._users.add(user)
def delete_user(self, user):
""" removes a user from the datastore """
"""removes a user from the datastore"""
self._users.remove(user)
def get_user(self, reference):
""" retrieve a user from the datastore """
"""retrieve a user from the datastore"""
return next(o for o in self._users if o.id == reference)
def get_user_by_username(self, reference):
""" retrieve a user from the datastore by username """
"""retrieve a user from the datastore by username"""
return next(o for o in self._users if o.username == reference)
def get_user_by_email(self, reference):
""" retrieve a user from the datastore by email """
"""retrieve a user from the datastore by email"""
return next(o for o in self._users if o.email == reference)
def search_vendor(self, reference):
""" search for a vendor by a canonical search term """
"""search for a vendor by a canonical search term"""
try:
return next(v for v in self._vendors if v.term == reference)
except StopIteration:
return None
def get_vendor_aggregates(self, reference):
""" list a all canonical names of vendors """
"""list a all canonical names of vendors"""
vendors = [v for v in self._vendors if v.name == reference]
terms = sorted((v.term for v in vendors), key=lambda x: x.lower())
first_vendor = next(iter(vendors))
return VendorAggregate(first_vendor.name, terms)
def update_vendors(self, old_vendor, new_name, new_terms):
""" update autocorrect values of vendors """
"""update autocorrect values of vendors"""
vendors_to_delete = {
v for v in self._vendors if v.name == old_vendor.name
}
@ -78,19 +78,19 @@ class FakeOrderRepository(AbstractOrderRepository): @@ -78,19 +78,19 @@ class FakeOrderRepository(AbstractOrderRepository):
self._vendors.add(Vendor(new_term, new_name))
def add_reset_token(self, token):
""" add an password reset token """
"""add an password reset token"""
self._tokens.add(token)
def delete_reset_token(self, token):
""" deletes a password reset token """
"""deletes a password reset token"""
self._tokens.remove(token)
def get_reset_token(self, reference):
""" add an password reset token """
"""add an password reset token"""
return next(t for t in self._tokens if t.token == reference)
def clear_stale_reset_tokens(self):
""" removes invalid reset tokens """
"""removes invalid reset tokens"""
now = datetime.utcnow()
self._tokens = {t for t in self._tokens if t.valid_until > now}

Loading…
Cancel
Save