Holger Frey
7 years ago
6 changed files with 309 additions and 0 deletions
@ -0,0 +1,54 @@ |
|||||||
|
''' Schemas (sub) package, for form rendering and validation ''' |
||||||
|
|
||||||
|
import colander |
||||||
|
import deform |
||||||
|
|
||||||
|
from deform.renderer import configure_zpt_renderer |
||||||
|
|
||||||
|
from .helpers import ( |
||||||
|
deferred_csrf_default, |
||||||
|
deferred_csrf_validator |
||||||
|
) |
||||||
|
|
||||||
|
# Base Schema |
||||||
|
|
||||||
|
class CSRFSchema(colander.Schema): |
||||||
|
''' base class for schemas with csrf validation ''' |
||||||
|
|
||||||
|
csrf_token = colander.SchemaNode( |
||||||
|
colander.String(), |
||||||
|
default=deferred_csrf_default, |
||||||
|
validator=deferred_csrf_validator, |
||||||
|
widget=deform.widget.HiddenWidget(), |
||||||
|
) |
||||||
|
|
||||||
|
@classmethod |
||||||
|
def as_form(cls, request, **kwargs): |
||||||
|
''' returns the schema as a form |
||||||
|
|
||||||
|
:param request: |
||||||
|
the current request |
||||||
|
:type request: |
||||||
|
pyramid.request.Request |
||||||
|
:param kwargs: |
||||||
|
additional parameters for the form rendering. |
||||||
|
url is not set, the current context and view name will be used to |
||||||
|
constuct a url for the form |
||||||
|
''' |
||||||
|
url = kwargs.pop('url', None) |
||||||
|
if not url: |
||||||
|
url = request.resource_url(request.context, request.view_name) |
||||||
|
schema = cls().bind(request=request) |
||||||
|
form = deform.Form(schema, action=url, **kwargs) |
||||||
|
return form |
||||||
|
|
||||||
|
|
||||||
|
def includeme(config): |
||||||
|
''' |
||||||
|
Initialize the form schemas |
||||||
|
|
||||||
|
Activate this setup using ``config.include('ordr2.schemas')``. |
||||||
|
|
||||||
|
''' |
||||||
|
# Make Deform widgets aware of our widget template paths |
||||||
|
configure_zpt_renderer(['ordr2:templates/deform']) |
@ -0,0 +1,64 @@ |
|||||||
|
''' helper functions for schemas ''' |
||||||
|
|
||||||
|
import colander |
||||||
|
import deform |
||||||
|
|
||||||
|
from pyramid.csrf import get_csrf_token, check_csrf_token |
||||||
|
|
||||||
|
from ordr2.models import User |
||||||
|
|
||||||
|
|
||||||
|
@colander.deferred |
||||||
|
def deferred_csrf_default(node, kw): |
||||||
|
''' sets the current csrf token ''' |
||||||
|
request = kw.get('request') |
||||||
|
return get_csrf_token(request) |
||||||
|
|
||||||
|
|
||||||
|
@colander.deferred |
||||||
|
def deferred_csrf_validator(node, kw): |
||||||
|
''' validates a submitted csrf token ''' |
||||||
|
def validate_csrf(node, value): |
||||||
|
request = kw.get('request') |
||||||
|
if not check_csrf_token(request, raises=False): |
||||||
|
raise colander.Invalid(node, 'Bad CSRF token') |
||||||
|
return validate_csrf |
||||||
|
|
||||||
|
|
||||||
|
@colander.deferred |
||||||
|
def deferred_unique_username_validator(node, kw): |
||||||
|
''' checks if an username is not registered already ''' |
||||||
|
|
||||||
|
def validate_unique_username(node, value): |
||||||
|
request = kw.get('request') |
||||||
|
user = request.dbsession.query(User).filter_by(username=value).first() |
||||||
|
if user is not None: |
||||||
|
raise colander.Invalid(node, 'User name already registered') |
||||||
|
return validate_unique_username |
||||||
|
|
||||||
|
|
||||||
|
@colander.deferred |
||||||
|
def deferred_unique_email_validator(node, kw): |
||||||
|
''' checks if an email is not registered already ''' |
||||||
|
email_validator = colander.Email() |
||||||
|
|
||||||
|
def validate_unique_email(node, value): |
||||||
|
email_validator(node, value) # raises exception on invalid address |
||||||
|
request = kw.get('request') |
||||||
|
user = request.dbsession.query(User).filter_by(email=value).first() |
||||||
|
if user not in (None, request.context.model): |
||||||
|
# allow existing email addresses if |
||||||
|
# it belongs to the user that is currently edited |
||||||
|
raise colander.Invalid(node, 'Email address in use') |
||||||
|
return validate_unique_email |
||||||
|
|
||||||
|
|
||||||
|
@colander.deferred |
||||||
|
def deferred_password_validator(node, kw): |
||||||
|
''' checks password confirmation for settings ''' |
||||||
|
|
||||||
|
def validate_password_confirmation(node, value): |
||||||
|
request = kw.get('request') |
||||||
|
if request.user is None or not request.user.check_password(value): |
||||||
|
raise colander.Invalid(node, 'Wrong password') |
||||||
|
return validate_password_confirmation |
@ -0,0 +1,31 @@ |
|||||||
|
''' Test package for ordr2.schemas ''' |
||||||
|
|
||||||
|
from pyramid.testing import DummyRequest, DummyResource |
||||||
|
|
||||||
|
from .. import app_config |
||||||
|
|
||||||
|
|
||||||
|
def test_csrf_schema_form_with_custom_url(app_config): |
||||||
|
''' test for creation with custom url ''' |
||||||
|
from ordr2.schemas import CSRFSchema |
||||||
|
|
||||||
|
request = DummyRequest() |
||||||
|
form = CSRFSchema.as_form(request, url='/Nudge/Nudge') |
||||||
|
|
||||||
|
assert form.action == '/Nudge/Nudge' |
||||||
|
assert form.buttons == [] |
||||||
|
|
||||||
|
|
||||||
|
def test_csrf_schema_form_with_automatic_url(app_config): |
||||||
|
''' test for creation with custom url ''' |
||||||
|
from deform.form import Button |
||||||
|
from ordr2.schemas import CSRFSchema |
||||||
|
root = DummyResource() |
||||||
|
context = DummyResource('Crunchy', root) |
||||||
|
|
||||||
|
request = DummyRequest(context=context, view_name='Frog') |
||||||
|
form = CSRFSchema.as_form(request, buttons=['submit']) |
||||||
|
|
||||||
|
assert 'http://example.com/Crunchy/Frog' == form.action |
||||||
|
assert len(form.buttons) == 1 |
||||||
|
assert form.buttons[0].type == 'submit' |
@ -0,0 +1,158 @@ |
|||||||
|
''' Tests for ordr2.schemas.helpers ''' |
||||||
|
|
||||||
|
import pytest |
||||||
|
|
||||||
|
from pyramid.testing import DummyRequest, DummyResource |
||||||
|
|
||||||
|
from .. import app_config, dbsession, get_user |
||||||
|
|
||||||
|
|
||||||
|
def test_deferred_csrf_default(app_config): |
||||||
|
''' deferred_csrf_default should return a csrf token ''' |
||||||
|
from ordr2.schemas.helpers import deferred_csrf_default |
||||||
|
from pyramid.csrf import get_csrf_token |
||||||
|
|
||||||
|
request = DummyRequest() |
||||||
|
token = deferred_csrf_default(None, {'request': request}) |
||||||
|
|
||||||
|
assert token == get_csrf_token(request) |
||||||
|
|
||||||
|
|
||||||
|
def test_deferred_csrf_validator_ok(app_config): |
||||||
|
''' test deferred_csrf_validator with valid csrf token ''' |
||||||
|
from ordr2.schemas.helpers import deferred_csrf_validator |
||||||
|
from pyramid.csrf import get_csrf_token |
||||||
|
|
||||||
|
request = DummyRequest() |
||||||
|
token = get_csrf_token(request) |
||||||
|
request.POST = {'csrf_token': token} |
||||||
|
validation_func = deferred_csrf_validator(None, {'request': request}) |
||||||
|
|
||||||
|
assert validation_func(None, None) is None |
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('post', [{}, {'csrf_token': 'Albatross!'}]) |
||||||
|
def test_deferred_csrf_validator_fails_on_no_csrf_token(app_config, post): |
||||||
|
''' test deferred_csrf_validator with invalid or missing csrf token ''' |
||||||
|
from ordr2.schemas.helpers import deferred_csrf_validator |
||||||
|
from colander import Invalid |
||||||
|
|
||||||
|
request = DummyRequest() |
||||||
|
request.POST = post |
||||||
|
validation_func = deferred_csrf_validator(None, {'request': request}) |
||||||
|
|
||||||
|
with pytest.raises(Invalid): |
||||||
|
assert validation_func(None, None) is None |
||||||
|
|
||||||
|
|
||||||
|
def test_deferred_unique_username_validator_ok(dbsession): |
||||||
|
''' unknown usernames should not raise an invalidation error ''' |
||||||
|
from ordr2.schemas.helpers import deferred_unique_username_validator |
||||||
|
|
||||||
|
request = DummyRequest(dbsession=dbsession) |
||||||
|
user = get_user('user') |
||||||
|
dbsession.add(user) |
||||||
|
validation_func = deferred_unique_username_validator( |
||||||
|
None, |
||||||
|
{'request': request} |
||||||
|
) |
||||||
|
|
||||||
|
assert validation_func(None, 'AnneElk') is None |
||||||
|
|
||||||
|
|
||||||
|
def test_deferred_unique_username_validator_fails(dbsession): |
||||||
|
''' known username should raise an invalidation error ''' |
||||||
|
from ordr2.schemas.helpers import deferred_unique_username_validator |
||||||
|
from colander import Invalid |
||||||
|
|
||||||
|
request = DummyRequest(dbsession=dbsession) |
||||||
|
user = get_user('user') |
||||||
|
dbsession.add(user) |
||||||
|
validation_func = deferred_unique_username_validator( |
||||||
|
None, |
||||||
|
{'request': request} |
||||||
|
) |
||||||
|
|
||||||
|
with pytest.raises(Invalid): |
||||||
|
assert validation_func(None, 'TerryGilliam') is None |
||||||
|
|
||||||
|
|
||||||
|
def test_deferred_unique_email_validator_ok(dbsession): |
||||||
|
''' unknown emails should not raise an invalidation error ''' |
||||||
|
from ordr2.schemas.helpers import deferred_unique_email_validator |
||||||
|
|
||||||
|
context = DummyResource(model=None) |
||||||
|
request = DummyRequest(dbsession=dbsession, context=context) |
||||||
|
user = get_user('user') |
||||||
|
dbsession.add(user) |
||||||
|
validation_func = deferred_unique_email_validator( |
||||||
|
None, |
||||||
|
{'request': request} |
||||||
|
) |
||||||
|
|
||||||
|
assert validation_func(None, 'elk@example.com') is None |
||||||
|
|
||||||
|
|
||||||
|
def test_deferred_unique_email_validator_ok_belongs_to_same_user(dbsession): |
||||||
|
''' known emails of a user might not raise an error |
||||||
|
|
||||||
|
if a user is edited and the mail address is not change, no invalidation |
||||||
|
error should be raised |
||||||
|
''' |
||||||
|
from ordr2.schemas.helpers import deferred_unique_email_validator |
||||||
|
|
||||||
|
user = get_user('user') |
||||||
|
context = DummyResource(model=user) |
||||||
|
request = DummyRequest(dbsession=dbsession, context=context) |
||||||
|
dbsession.add(user) |
||||||
|
validation_func = deferred_unique_email_validator( |
||||||
|
None, |
||||||
|
{'request': request} |
||||||
|
) |
||||||
|
|
||||||
|
assert validation_func(None, user.email) is None |
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize('email', ['', 'gilliam@example.com', 'malformed']) |
||||||
|
def test_deferred_unique_email_validator_fails(dbsession, email): |
||||||
|
''' known, empty or malformed emails should raise an invalidation error ''' |
||||||
|
from ordr2.schemas.helpers import deferred_unique_email_validator |
||||||
|
from colander import Invalid |
||||||
|
|
||||||
|
context = DummyResource(model=None) |
||||||
|
request = DummyRequest(dbsession=dbsession, context=context) |
||||||
|
user = get_user('user') |
||||||
|
dbsession.add(user) |
||||||
|
validation_func = deferred_unique_email_validator( |
||||||
|
None, |
||||||
|
{'request': request} |
||||||
|
) |
||||||
|
|
||||||
|
with pytest.raises(Invalid): |
||||||
|
assert validation_func(None, email) is None |
||||||
|
|
||||||
|
|
||||||
|
def test_deferred_password_validator_ok(app_config): |
||||||
|
''' correct password should not raise invalidation error ''' |
||||||
|
from ordr2.schemas.helpers import deferred_password_validator |
||||||
|
from pyramid.csrf import get_csrf_token |
||||||
|
|
||||||
|
user = get_user('user') |
||||||
|
request = DummyRequest(user=user) |
||||||
|
validation_func = deferred_password_validator(None, {'request': request}) |
||||||
|
|
||||||
|
assert validation_func(None, 'Terry') is None |
||||||
|
|
||||||
|
|
||||||
|
def test_deferred_password_validator_fails(app_config): |
||||||
|
''' incorrect password should raise invalidation error ''' |
||||||
|
from ordr2.schemas.helpers import deferred_password_validator |
||||||
|
from colander import Invalid |
||||||
|
|
||||||
|
user = get_user('user') |
||||||
|
request = DummyRequest(user=user) |
||||||
|
validation_func = deferred_password_validator(None, {'request': request}) |
||||||
|
|
||||||
|
with pytest.raises(Invalid): |
||||||
|
assert validation_func(None, 'Wrong Password') is None |
||||||
|
|
Reference in new issue