You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
178 lines
5.0 KiB
178 lines
5.0 KiB
import uuid |
|
import hashlib |
|
from datetime import datetime, timedelta |
|
from collections import namedtuple |
|
from urllib.parse import urlparse |
|
|
|
import requests |
|
|
|
from . import events, models, security |
|
|
|
CONSUMABLE_STATI = {models.OrderStatus.ORDERED, models.OrderStatus.COMPLETED} |
|
|
|
|
|
MSG_SHORT_PASSWORD = events.FlashMessage.warning( |
|
"Your password is quite short.", |
|
( |
|
'Have a look at <a href="https://www.xkcd.com/936/">' |
|
"this comic strip</a> why longer passwords are better." |
|
), |
|
) |
|
|
|
MSG_PWNED_PASSWORD = events.FlashMessage.warning( |
|
"This password appears in a breach or has been compromised.", |
|
( |
|
"Please consider changing your password. " |
|
'<a href="/breached">Read this page</a> on why you ' |
|
"see this message." |
|
), |
|
) |
|
|
|
|
|
def find_consumables(repo, repeat=3, days=365 * 2): |
|
"""search for orders that are requested often""" |
|
unsorted = _find_consumables(repo, repeat, days) |
|
return sorted(unsorted, key=lambda o: o.cas_description) |
|
|
|
|
|
def _find_consumables(repo, repeat=3, days=365 * 2): |
|
"""helper function for find_consumables() implementation""" |
|
limit_date = datetime.utcnow() - timedelta(days=days) |
|
relevant = repo.list_consumable_candidates(limit_date, CONSUMABLE_STATI) |
|
counter = {} |
|
for order in relevant: |
|
item = counter.setdefault( |
|
order.catalog_nr, models.ProposedConsumable(order) |
|
) |
|
item.times = item.times + 1 |
|
if item.times == repeat: |
|
yield item.order |
|
|
|
|
|
def create_log_entry(order, status, user): |
|
old_status = order.status |
|
|
|
if old_status != status: |
|
# only change to a new status |
|
log_entry = models.LogEntry(order.id, status, user.username) |
|
order.add_to_log(log_entry) |
|
# is this noteworthy? |
|
return order.created_by != user.username |
|
|
|
# it's not noteworthy |
|
return False |
|
|
|
|
|
def verify_credentials(repo, pass_ctx, username, password): |
|
try: |
|
user = repo.get_user_by_username(username) |
|
except StopIteration: |
|
# user was not found |
|
return None |
|
valid, new_hash = pass_ctx.verify_and_update(password, user.password) |
|
if not valid: |
|
# password did not match |
|
return None |
|
if new_hash: |
|
# we need to update the password hash to a algorithm |
|
user.password = new_hash |
|
return user |
|
|
|
|
|
def _vendor_from_url(vendor): |
|
parsed = urlparse(vendor) |
|
if parsed.netloc != "": |
|
return parsed.netloc |
|
else: |
|
return vendor |
|
|
|
|
|
def _vendor_with_common_domain(vendor): |
|
for tld in (".eu", ".com", ".de"): |
|
if vendor.endswith(tld): |
|
parts = vendor.split(".") |
|
return parts[-2] |
|
return vendor |
|
|
|
|
|
def canonical_vendor_name(vendor): |
|
cleaned = " ".join(vendor.strip().split()) |
|
tmp = _vendor_from_url(cleaned) |
|
return _vendor_with_common_domain(tmp) |
|
|
|
|
|
CheckVendorResult = namedtuple("CheckVendorResult", ["name", "found"]) |
|
|
|
|
|
def check_vendor_name(repo, to_check): |
|
# remove unused whitespace |
|
cleaned = " ".join(to_check.strip().split()) |
|
tmp = _vendor_from_url(cleaned) |
|
canonical_name = canonical_vendor_name(tmp) |
|
|
|
vendor = repo.search_vendor(canonical_name.lower()) |
|
|
|
if vendor is None: |
|
return CheckVendorResult(canonical_name, False) |
|
else: |
|
return CheckVendorResult(vendor.name, True) |
|
|
|
|
|
def set_new_password(user, password, event_queue): |
|
crypt_context = security.get_passlib_context() |
|
user.password = crypt_context.hash(password) |
|
|
|
length_ok = len(password) >= 16 |
|
if not length_ok: |
|
event_queue.emit(MSG_SHORT_PASSWORD) |
|
|
|
has_been_pwned = check_have_i_been_pwned(password, event_queue) |
|
|
|
return length_ok and not has_been_pwned |
|
|
|
|
|
def check_have_i_been_pwned(password, event_queue): |
|
"""public function for checking haveibeenpwned |
|
|
|
this is just a small shim to eas testing""" |
|
password_hash = hashlib.sha1(password.encode()).hexdigest() # noqa: S303 |
|
return _check_have_i_been_pwned(password_hash, event_queue) |
|
|
|
|
|
def _check_have_i_been_pwned(password_hash, event_queue): |
|
"""chech haveibeenpwned for password leak |
|
|
|
This function does the heavy lifting for check_have_i_been_pwned() |
|
""" |
|
|
|
head, tails = password_hash[:5], password_hash[5:].lower() |
|
url = f"https://api.pwnedpasswords.com/range/{head}" |
|
headers = {"Add-Padding": "true"} |
|
try: |
|
response = requests.get(url, headers=headers, timeout=2) |
|
response.raise_for_status() |
|
except requests.RequestException: |
|
return False |
|
lower_case_lines = (line.lower() for line in response.text.splitlines()) |
|
for line in lower_case_lines: |
|
if line.startswith(tails): |
|
event_queue.emit(MSG_PWNED_PASSWORD) |
|
return True |
|
return False |
|
|
|
|
|
def create_token_for_user(repo, user): |
|
reference = str(uuid.uuid4()) |
|
token = models.PasswordResetToken(reference, user.id) |
|
repo.add_reset_token(token) |
|
return token |
|
|
|
|
|
def get_user_from_reset_token(repo, identifier): |
|
try: |
|
token = repo.get_reset_token(identifier) |
|
if token.is_valid: |
|
return repo.get_user(token.user_id) |
|
except StopIteration: |
|
pass |
|
return None
|
|
|