Browse Source

added service to verify credentials

funding-tag
Holger Frey 5 years ago
parent
commit
017f15645e
  1. 56
      ordr3/repo.py
  2. 18
      ordr3/services.py
  3. 68
      tests/test_repo.py
  4. 65
      tests/test_services.py

56
ordr3/repo.py

@ -13,35 +13,85 @@ class AbstractOrderRepository(abc.ABC): @@ -13,35 +13,85 @@ class AbstractOrderRepository(abc.ABC):
@abc.abstractmethod
def add_order(self, order):
raise NotImplementedError
""" add an order to the datastore """
@abc.abstractmethod
def get_order(self, reference):
raise NotImplementedError
""" get an order from the datastore by primary key """
@abc.abstractmethod
def list_orders(self):
raise NotImplementedError
""" list orders orderd by date, descending """
@abc.abstractmethod
def add_user(self, user):
""" add a user to the datastore """
@abc.abstractmethod
def get_user(self, reference):
""" get a user from the datastore by primary key """
@abc.abstractmethod
def get_user_by_username(self, reference):
""" get a user from the datastore by username """
@abc.abstractmethod
def get_user_by_email(self, reference):
""" get a user from the datastore by email """
@abc.abstractmethod
def list_users(self):
""" list users orderd by username """
class SqlAlchemyRepository(AbstractOrderRepository):
""" Repository implementation for SQLAlchemy """
def _add_item_to_db(self, item):
""" add any item to the database """
self.session.add(item)
self.session.flush()
def add_order(self, order):
""" add an order to the database """
self._add_item_to_db(order)
def get_order(self, reference):
""" get an order from the database by primary key """
return (
self.session.query(models.OrderItem).filter_by(id=reference).one()
)
def list_orders(self):
""" list orders orderd by date, descending """
return (
self.session.query(models.OrderItem)
.order_by(models.OrderItem.created_on.desc())
.all()
)
def add_user(self, user):
""" add a user to the database """
self._add_item_to_db(user)
def get_user(self, reference):
""" get a user from the database by primary key """
return self.session.query(models.User).filter_by(id=reference).one()
def get_user_by_username(self, reference):
""" get a user from the database by username """
return (
self.session.query(models.User).filter_by(username=reference).one()
)
def get_user_by_email(self, reference):
""" get a user from the database by email """
return self.session.query(models.User).filter_by(email=reference).one()
def list_users(self):
""" list users orderd by username """
return (
self.session.query(models.User)
.order_by(models.User.username)
.all()
)

18
ordr3/services.py

@ -1,5 +1,7 @@ @@ -1,5 +1,7 @@
from datetime import datetime
from sqlalchemy.orm.exc import NoResultFound
from . import models
CONSUMABLE_STATI = {
@ -34,3 +36,19 @@ def _find_consumables(repo, repeat=3, days=365 * 2): @@ -34,3 +36,19 @@ def _find_consumables(repo, repeat=3, days=365 * 2):
def create_log_entry(order, status, user):
log_entry = models.LogEntry(order.id, status, user.username, user.id)
order.add_to_log(log_entry)
def verify_credentials(repo, pass_ctx, username, password):
try:
user = repo.get_user_by_username(username)
except (NoResultFound, StopIteration):
# user was not found
return None
valid, new_hash = pass_ctx.verify_and_update(password, user.password)
if not valid:
# password did not match
return None
if new_hash:
# we need to update the password hash to a algorithm
user.password = new_hash
return user

68
tests/test_repo.py

@ -42,6 +42,16 @@ def example_orders(): @@ -42,6 +42,16 @@ def example_orders():
]
@pytest.fixture()
def example_users():
from ordr3.models import User, UserRole
return [
User(1, "You", "Jim", "Smith", "jim.smith", "abcd", UserRole.ADMIN,),
User(2, "Me", "Jane", "Doe", "jane.doe", "1234", UserRole.USER),
]
def test_sql_repo_add_order(session, example_orders):
from ordr3.repo import SqlAlchemyRepository
from ordr3.models import OrderItem
@ -76,3 +86,61 @@ def test_sql_repo_list_orders(session, example_orders): @@ -76,3 +86,61 @@ def test_sql_repo_list_orders(session, example_orders):
session.flush()
assert repo.list_orders() == [later, earlier]
def test_sql_repo_add_user(session, example_users):
from ordr3.repo import SqlAlchemyRepository
from ordr3.models import User
repo = SqlAlchemyRepository(session)
repo.add_user(example_users[0])
session.flush()
user = session.query(User).first()
assert user == example_users[0]
def test_sql_repo_get_user(session, example_users):
from ordr3.repo import SqlAlchemyRepository
repo = SqlAlchemyRepository(session)
repo.add_user(example_users[0])
repo.add_user(example_users[1])
session.flush()
assert example_users[1] == repo.get_user(2)
def test_sql_repo_get_user_by_username(session, example_users):
from ordr3.repo import SqlAlchemyRepository
repo = SqlAlchemyRepository(session)
repo.add_user(example_users[0])
repo.add_user(example_users[1])
session.flush()
assert example_users[1] == repo.get_user_by_username("Me")
def test_sql_repo_get_user_by_email(session, example_users):
from ordr3.repo import SqlAlchemyRepository
repo = SqlAlchemyRepository(session)
repo.add_user(example_users[0])
repo.add_user(example_users[1])
session.flush()
assert example_users[1] == repo.get_user_by_email("jane.doe")
def test_sql_repo_list_users(session, example_users):
from ordr3.repo import SqlAlchemyRepository
later, earlier = example_users
repo = SqlAlchemyRepository(session)
repo.add_user(later)
repo.add_user(earlier)
session.flush()
assert repo.list_users() == [earlier, later]

65
tests/test_services.py

@ -10,6 +10,7 @@ class FakeOrderRepository(AbstractOrderRepository): @@ -10,6 +10,7 @@ class FakeOrderRepository(AbstractOrderRepository):
def __init__(self, session):
self._orders = set()
self._users = set()
def add_order(self, order):
""" add an order to the datastore """
@ -22,6 +23,38 @@ class FakeOrderRepository(AbstractOrderRepository): @@ -22,6 +23,38 @@ class FakeOrderRepository(AbstractOrderRepository):
def list_orders(self):
return sorted(self._orders, reverse=True, key=lambda x: x.created_on)
def add_user(self, user):
""" add a user to the datastore """
self._users.add(user)
def get_user(self, reference):
""" retrieve a user from the datastore """
return next(o for o in self._users if o.id == reference)
def get_user_by_username(self, reference):
""" retrieve a user from the datastore by username """
return next(o for o in self._users if o.username == reference)
def get_user_by_email(self, reference):
""" retrieve a user from the datastore by email """
return next(o for o in self._users if o.email == reference)
def list_users(self):
return sorted(self._users, key=lambda x: x.username)
class FakePasslibContext:
def __init__(self, needs_update):
self.needs_update = needs_update
def verify_and_update(self, password, hash):
if password != hash:
return False, None
if self.needs_update:
return True, password[::-1]
else:
return True, None
@pytest.fixture
def prefilled_repo():
@ -97,3 +130,35 @@ def test_create_log_entry(prefilled_repo): @@ -97,3 +130,35 @@ def test_create_log_entry(prefilled_repo):
assert order.status == log_entry.status
assert order.created_by == log_entry.by
assert order.created_on == log_entry.date
@pytest.mark.parametrize(
"update,new_hash", [(False, "1234"), (True, "4321"),], # noqa: E231
)
def test_verify_username_and_password_valid(update, new_hash):
from ordr3.models import User, UserRole
from ordr3.services import verify_credentials
user = User(2, "Me", "Jane", "Doe", "jane.doe", "1234", UserRole.USER)
repo = FakeOrderRepository(None)
repo.add_user(user)
pass_ctx = FakePasslibContext(update)
user = verify_credentials(repo, pass_ctx, user.username, user.password)
assert user is not None
assert user.password == new_hash
@pytest.mark.parametrize(
"name,pwd", [("You", "1234"), ("Me", "abcd"),], # noqa: E231
)
def test_verify_username_and_password_invalid(name, pwd):
from ordr3.models import User, UserRole
from ordr3.services import verify_credentials
user = User(2, "Me", "Jane", "Doe", "jane.doe", "1234", UserRole.USER)
repo = FakeOrderRepository(None)
repo.add_user(user)
pass_ctx = FakePasslibContext(False)
assert verify_credentials(repo, pass_ctx, name, pwd) is None

Loading…
Cancel
Save