Browse Source

added BaseListResource and Pagination classes

master
Holger Frey 7 years ago
parent
commit
ab499958f0
  1. 356
      ordr2/resources/base.py
  2. 160
      tests/resources/base.py

356
ordr2/resources/base.py

@ -1,9 +1,10 @@
''' Base resource classes ''' ''' Base resource classes '''
from sqlalchemy import asc, desc
class BaseResource(object): class BaseResource(object):
''' ''' Base resouce class for location aware resources
Base resouce class for location aware resources
:param str name: :param str name:
url path segment that identifies this resource in its lineage url path segment that identifies this resource in its lineage
@ -58,3 +59,354 @@ class BaseResource(object):
''' '''
node_class = self.nodes[key] node_class = self.nodes[key]
return node_class(key, self) return node_class(key, self)
class Pagination(object):
''' calculates pagination information
:param int current: current page number
:param int count: total number of items
:param int items: number of items displayed per pages
:param int window_size:
size of pagination window
lets assume the current page is 10 and window size is 7
self.window = [7, 8, 9, 10, 11, 12, 13]
'''
def __init__(self, current, count, items, window_size):
''' calculates pagination information
:param int current: current page number
:param int count: total number of items
:param int items: number of items displayed per pages
:param int window_size:
size of pagination window
lets assume the current page is 10 and window size is 7
self.window = [7, 8, 9, 10, 11, 12, 13]
'''
self.count = count #: total number of items
self.items_per_page = items #: items displayed per page
self.first = None #: number of first page
self.current = None #: number of current (displayed) page
self.last = None #: number of last page
self.previous = None #: number of previous page
self.next = None #: number of next page
self.window = [] #: page window
if count > 0:
# only do the calculation if there are items to be paginated
self.calculate(current, window_size)
def calculate(self, current, window_size):
# calculate number of pages
pages = (self.count - 1) // self.items_per_page + 1
# set the number of the first and last page
self.first = 1
self.last = max(self.first, pages)
# set current, previous and next page
self.current = self.is_valid(current, default=self.first)
self.previous = self.is_valid(self.current - 1)
self.next = self.is_valid(self.current + 1)
# window calculations
# example: lets assume the current page is 10 and window size is 7
# self.window = [7, 8, 9, 10, 11, 12, 13]
half_window = window_size // 2
start = self.current - half_window
end = self.current + half_window
calculated_window = range(start, end + 1)
self.window = [p for p in calculated_window if self.is_valid(p)]
def is_valid(self, page, default=None):
''' checks if the given page is valid, returns default if not
:param int page: the page number to test
:param default: the value to return if the test fails
'''
if self.count and (self.first <= page <= self.last):
return page
return default
class BaseListResource(BaseResource):
''' Base resorce class for listings of other resources
The BaseListResource represents a view to a list of resources, like
products, users, etc. This includes pagination, sorting and filtering of
the resources.
Inherited classes must at least set one parameter and implement three
mehtods:
- ``child_resource_class``
- ``set_base_query()``
The base query is used for basic filtering that should be applied in
all cases; for example to filter out any products that are not in stock
or ariticles in draft mode. This query is also used when traversing
to child resources
- ``set_filtered_query()``
The filtered query extends the base query and applies filters for a
specific view, like show only blue cheeses. This query is used for
calulating pagination and sorting is applied when listing child
resources.
- ``get_sort_by_field(sort_by)``
add a sorting instruction to the query
'''
#: GET key for page number
get_key_page = 'p'
#: GET key for items per page
get_key_items_per_page = 'n'
#: GET key for order by column
get_key_sort_by = 'o'
#: GET key for order by direction
get_key_sort_dir = 'd'
#: default items per page
default_items_per_page = 12
#: default size of pagination window
default_window_size = 7
#: default sort direction
default_sort_dir = 'asc'
#: default sort by
default_sort_by = None
#: class of child resources
child_resource_class = None
def __init__(self, name, parent):
''' Instance creation '''
super().__init__(name, parent)
self.base_query = None #: base database query
self.filtered_query = None #: database query with reuest filters
self.filters = {} #: applied view filters
self.sort_by = None #: applied sorting filed
self.sort_dir = None #: applied sorting direction
self.pages = None #: pagination info
self.set_base_query()
self.set_filtered_query()
self.calculate_pagination()
def set_base_query(self):
''' setup of the basic database query
The base query is used for basic filtering that should be applied in
all cases; for example to filter out any products that are not in stock
or ariticles in draft mode. This query is also used when traversing
to child resources
This method must be implemented in a inherited class::
def set_base_query(self):
self.base_query = (
self.request.dbsession
.query(Cheeses)
.filter_by(in_stock==True)
)
'''
raise NotImplementedError
def set_filtered_query(self):
''' setup of the database query for a specific view
the filtered query extends the base query and applies filters for a
specific view, like show only blue cheeses. This query is used for
calulating pagination and sorting is applied when listing child
resources.
This method must be implemented in a inherited class::
def set_filtered_query(self):
query = self.base_query
# filter by type of cheese
query = self._filter_by('type', Cheese.type, query)
query = self._filter_by('age', Cheeses.age, query)
self.filtered_query = query
'''
raise NotImplementedError
def get_sort_by_field(self, sort_by):
''' returns the SQLalchemy model field to sort by
:param str sort_by:
a lowercase identifier for the field to sort by
:returns:
SQLalchemy model field or None
This method must be implemented in a inherited class::
def get_sort_by_field(self, sort_by):
if sort_by == 'type':
return Cheese.type
'''
raise NotImplementedError
def _filter_by(self, get_key, model_field, query):
''' helper method to appyl a simple filter to a query
it also records the filters applied to the view query in `self.filters`
:param str get_key:
name of the GET key to query
:param model_field:
field of SQLalchemy model the filter should be applied on
:param query:
SQLalchemy query object
:returns:
SQLalchemy query object with applied filter
'''
filter_value = self.request.GET.get(get_key, None)
if filter_value:
query = query.filter_by(model_field=filter_value)
self.active_filters[get_key] = filter_value
return query
def prepare_sorted_query(self, query):
''' add sorting to the query '''
# first the sorting information from request.GET is used
sort_by = self.request.GET.get(self.get_key_sort_by, None)
sort_field = self.get_sort_by_field(sort_by)
if sort_field:
direction = self.request.GET.get(self.get_key_sort_dir, 'asc')
direction = 'asc' if direction.lower() == 'asc' else 'desc'
sort_func = asc if direction == 'asc' else desc
query = query.order_by(sort_func(sort_field))
self.sort_by = sort_by
self.sort_dir = direction
# default sorting is applied if not already sorted by this field
if self.default_sort_by and self.sort_by != self.default_sort_by:
sort_field = self.get_sort_by_field(self.default_sort_by)
sort_func = asc if self.default_sort_dir == 'asc' else desc
query = query.order_by(sort_func(sort_field))
if not self.sort_by:
self.sort_by = self.default_sort_by
self.sort_dir = self.default_sort_dir
return query
def calculate_pagination(self):
''' calculates the pagination info '''
current_page = self._get_int(self.get_key_page, 1)
items_per_page = self._get_int(
self.get_key_items_per_page,
self.default_items_per_page
)
self.pages = Pagination(
current_page,
self.filtered_query.count(),
items_per_page,
self.default_window_size
)
def items(self):
''' returns the items of the current page as resources'''
if not self.pages.count:
return []
# add the sorting
query = self.prepare_sorted_query(self.filtered_query)
# add offset and limit limit
offset = (self.pages.current - 1) * self.pages.items_per_page
query = query.offset(offset).limit(self.pages.items_per_page)
# return a list of resources representing the items found by the query
return [
self._child_resource(item)
for item
in query.all()
]
def __getitem__(self, key):
''' returns a child resource representing a sqlalchemy model '''
model = self.base_query.get(key)
if model:
return self._child_resource(model)
return super().__getitem__(key)
def _child_resource(self, item):
''' helper function that returns an SQLalchemy model as resource '''
return self.child_resource_class(item.id, self, model=item)
def _get_int(self, key, default):
''' returns the value of GET[key] as integer or default
:param str key: the key for the value of request.GET
:param int default: the default value, returned if conversion fails
:rtype: int
'''
try:
return int(self.request.GET[key])
except (KeyError, ValueError, TypeError):
return default
def query_params(self, override=None):
''' query parameters for the active filters, sorting and page
:param dict override:
values that override the current filter, sorting or page settings
Example::
# current page, sorting, etc.
current = context.query_params()
current == {'p':1, 'n':12, 'o':'name', 'd':'asc', 'type':'brie'}
# next page with same sorting but type filter removed
next = context.query_params({'p':2, 'type': None})
next == {'p':2, 'n':12, 'o':'name', 'd':'asc'}
'''
params = {}
if self.pages.current:
params[self.get_key_page] = self.pages.current
params[self.get_key_items_per_page] = self.pages.items_per_page
if self.sort_by:
params[self.get_key_sort_by] = self.sort_by
params[self.get_key_sort_dir] = self.sort_dir
for key, value in self.active_filters.items():
params[key] = value
if override:
params.update(override)
return {k: v for k, v in params.items() if v is not None}
def resource_url(self, resource, *args, override=None, **kwargs):
''' resource url for a context with query parameters for current view
:param resource: resource or view name to generate the url for
:type resource: :class:`BaseResource` or ``str``
:param list args: elements for url construction
:param dict override: overriding query params, see ``query_params()``
if the resource provided is a string, the current instance is
prepended::
# this
context.resoure_url('edit')
# is the same as
current_params = context.query_params()
url = request.resource_url(context, 'edit', query=current_params)
'''
if isinstance(resource, str):
chain = [self, resource] + args
else:
chain = [resource] + args
kwargs['query'] = self.query_params(override)
return self.request.resource_url(*chain, **kwargs)

160
tests/resources/base.py

@ -1,8 +1,11 @@
''' Tests for ordr2.resources.base ''' ''' Tests for ordr2.resources.base '''
import pytest import pytest
from pyramid.testing import DummyRequest, DummyResource
# test for base resources
def test_base_resource_init(): def test_base_resource_init():
''' test __init__ function of base resource ''' ''' test __init__ function of base resource '''
from ordr2.resources import BaseResource, RootResource from ordr2.resources import BaseResource, RootResource
@ -50,3 +53,160 @@ def test_base_resource_getitem_raises_key_error():
with pytest.raises(KeyError): with pytest.raises(KeyError):
resource = root['unknown'] resource = root['unknown']
# tests for pagination class
def test_pagination_init():
''' test object creation of Pagination class with zero total items'''
from ordr2.resources.base import Pagination
pages = Pagination(1, 0, 25, 7)
assert pages.count == 0
assert pages.items_per_page == 25
assert pages.first is None
assert pages.current is None
assert pages.last is None
assert pages.previous is None
assert pages.next is None
assert pages.window == []
def test_pagination_init_auto_calc():
''' test object creation with more then zero total items'''
from ordr2.resources.base import Pagination
pages = Pagination(1, 1, 25, 7)
assert pages.count == 1
assert pages.items_per_page == 25
assert pages.first is not None
assert pages.current is not None
assert pages.last is not None
@pytest.mark.parametrize(
'count, page, items, expected', [
(1, 1, 3, [1, None, 1, None, 1]),
(2, 1, 3, [1, None, 1, None, 1]),
(3, 1, 3, [1, None, 1, None, 1]),
(4, 1, 3, [1, None, 1, 2, 2]),
(5, 1, 3, [1, None, 1, 2, 2]),
(6, 1, 3, [1, None, 1, 2, 2]),
(7, 1, 3, [1, None, 1, 2, 3]),
(1, 1, 10, [1, None, 1, None, 1]),
(2, 1, 10, [1, None, 1, None, 1]),
(9, 1, 10, [1, None, 1, None, 1]),
(10, 1, 10, [1, None, 1, None, 1]),
(11, 1, 10, [1, None, 1, 2, 2]),
(11, 2, 10, [1, 1, 2, None, 2]),
(20, 2, 10, [1, 1, 2, None, 2]),
(21, 2, 10, [1, 1, 2, 3, 3]),
(100, 5, 10, [1, 4, 5, 6, 10]),
(10, 5, 10, [1, None, 1, None, 1]),
]
)
def test_pagination_calculate_page_numbers(count, page, items, expected):
''' calculation of related pages '''
from ordr2.resources.base import Pagination
pages = Pagination(None, 0, items, None)
pages.count = count
pages.calculate(page, 7)
result = [
pages.first,
pages.previous,
pages.current,
pages.next,
pages.last
]
assert result == expected
@pytest.mark.parametrize(
'count, page, size, expected', [
(100, 1, 7, [1, 2, 3, 4]),
(100, 2, 7, [1, 2, 3, 4, 5]),
(100, 3, 7, [1, 2, 3, 4, 5, 6]),
(100, 4, 7, [1, 2, 3, 4, 5, 6, 7]),
(100, 5, 7, [2, 3, 4, 5, 6, 7, 8]),
(100, 6, 7, [3, 4, 5, 6, 7, 8, 9]),
(100, 7, 7, [4, 5, 6, 7, 8, 9, 10]),
(100, 8, 7, [5, 6, 7, 8, 9, 10]),
(100, 9, 7, [6, 7, 8, 9, 10]),
(100, 10, 7, [7, 8, 9, 10]),
(100, 1, 5, [1, 2, 3]),
(100, 2, 5, [1, 2, 3, 4]),
(100, 3, 5, [1, 2, 3, 4, 5]),
(100, 4, 5, [2, 3, 4, 5, 6]),
(100, 5, 5, [3, 4, 5, 6, 7]),
(100, 6, 5, [4, 5, 6, 7, 8]),
(100, 7, 5, [5, 6, 7, 8, 9]),
(100, 8, 5, [6, 7, 8, 9, 10]),
(100, 9, 5, [7, 8, 9, 10]),
(100, 10, 5, [8, 9, 10]),
]
)
def test_pagination_calculate_window(count, page, size, expected):
''' calculation of related pages '''
from ordr2.resources.base import Pagination
pages = Pagination(None, 0, 10, None)
pages.count = count
pages.calculate(page, size)
assert pages.window == expected
@pytest.mark.parametrize(
'first, last, check, expected', [
(1, 1, 0, None),
(1, 1, 1, 1),
(1, 1, 2, None),
(2, 4, 1, None),
(2, 4, 2, 2),
(2, 4, 3, 3),
(2, 4, 4, 4),
(2, 4, 5, None),
]
)
def test_pagination_is_valid(first, last, check, expected):
''' check is_valid function without default value '''
from ordr2.resources.base import Pagination
pages = Pagination(None, 0, 10, None)
pages.count = True
pages.first = first
pages.last = last
assert pages.is_valid(check) == expected
def test_pagination_is_valid_returns_default():
''' check is_valid function with default value '''
from ordr2.resources.base import Pagination
pages = Pagination(None, 0, 10, None)
pages.count = True
pages.first = 1
pages.last = 1
assert pages.is_valid(2, 'default') == 'default'
# test BaseListResource
def test_base_list_init():
''' Test object creation of base class
all other tests must be implemented in a child class of BaseListResource
'''
from ordr2.resources.base import BaseListResource
request = DummyRequest()
parent = DummyResource(request=request)
with pytest.raises(NotImplementedError):
resource = BaseListResource('John Cleese', parent)