You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
169 lines
4.7 KiB
169 lines
4.7 KiB
import uuid |
|
import hashlib |
|
from datetime import datetime |
|
from collections import namedtuple |
|
from urllib.parse import urlparse |
|
|
|
import requests |
|
|
|
from . import events, models, security |
|
|
|
CONSUMABLE_STATI = { |
|
models.OrderStatus.ORDERED, |
|
models.OrderStatus.COMPLETED, |
|
} |
|
|
|
|
|
MSG_SHORT_PASSWORD = events.FlashMessage.warning( |
|
f"Your password is quite short.", |
|
( |
|
'Have a look at <a href="https://www.xkcd.com/936/">' |
|
"this comic strip</a> why longer passwords are better." |
|
), |
|
) |
|
|
|
MSG_PWNED_PASSWORD = events.FlashMessage.warning( |
|
"This password appears in a breach or has been compromised.", |
|
( |
|
"Please consider changing your password. " |
|
'<a href="/breached">Read this page</a> on why you ' |
|
"see this message." |
|
), |
|
) |
|
|
|
|
|
def find_consumables(repo, repeat=3, days=365 * 2): |
|
""" search for orders that are requested often """ |
|
unsorted = _find_consumables(repo, repeat, days) |
|
return sorted(unsorted, key=lambda x: x.cas_description) |
|
|
|
|
|
def _find_consumables(repo, repeat=3, days=365 * 2): |
|
""" helper function for find_consumables() implementation """ |
|
now = datetime.now() |
|
by_date = ( |
|
o for o in repo.list_orders() if (now - o.created_on).days < days |
|
) |
|
relevant = (o for o in by_date if o.status in CONSUMABLE_STATI) |
|
counter = {} |
|
for order in relevant: |
|
item = counter.setdefault( |
|
order.catalog_nr, models.ProposedConsumable(order) |
|
) |
|
item.times = item.times + 1 |
|
if item.times == repeat: |
|
yield item.order |
|
|
|
|
|
def create_log_entry(order, status, user): |
|
log_entry = models.LogEntry(order.id, status, user.username) |
|
order.add_to_log(log_entry) |
|
|
|
|
|
def verify_credentials(repo, pass_ctx, username, password): |
|
try: |
|
user = repo.get_user_by_username(username) |
|
except StopIteration: |
|
# user was not found |
|
return None |
|
valid, new_hash = pass_ctx.verify_and_update(password, user.password) |
|
if not valid: |
|
# password did not match |
|
return None |
|
if new_hash: |
|
# we need to update the password hash to a algorithm |
|
user.password = new_hash |
|
return user |
|
|
|
|
|
def _vendor_from_url(vendor): |
|
parsed = urlparse(vendor) |
|
if parsed.netloc != "": |
|
return parsed.netloc |
|
else: |
|
return vendor |
|
|
|
|
|
def _vendor_with_common_domain(vendor): |
|
for tld in (".eu", ".com", ".de"): |
|
if vendor.endswith(tld): |
|
parts = vendor.split(".") |
|
return parts[-2] |
|
return vendor |
|
|
|
|
|
CheckVendorResult = namedtuple("CheckVendorResult", ["name", "found"]) |
|
|
|
|
|
def check_vendor_name(repo, to_check): |
|
# remove unused whitespace |
|
cleaned = " ".join(to_check.strip().split()) |
|
tmp = _vendor_from_url(cleaned) |
|
canonical_name = _vendor_with_common_domain(tmp) |
|
|
|
vendor = repo.search_vendor(canonical_name.lower()) |
|
|
|
if vendor is None: |
|
return CheckVendorResult(canonical_name, False) |
|
else: |
|
return CheckVendorResult(vendor, True) |
|
|
|
|
|
def set_new_password(user, password, event_queue): |
|
crypt_context = security.get_passlib_context() |
|
user.password = crypt_context.hash(password) |
|
|
|
length_ok = len(password) >= 16 |
|
if not length_ok: |
|
event_queue.emit(MSG_SHORT_PASSWORD) |
|
|
|
has_been_pwned = check_have_i_been_pwned(password, event_queue) |
|
|
|
return length_ok and not has_been_pwned |
|
|
|
|
|
def check_have_i_been_pwned(password, event_queue): |
|
""" public function for checking haveibeenpwned |
|
|
|
this is just a small shim to eas testing""" |
|
password_hash = hashlib.sha1(password.encode()).hexdigest() |
|
return _check_have_i_been_pwned(password_hash, event_queue) |
|
|
|
|
|
def _check_have_i_been_pwned(password_hash, event_queue): |
|
""" chech haveibeenpwned for password leak |
|
|
|
This function does the heavy lifting for check_have_i_been_pwned() |
|
""" |
|
|
|
head, tails = password_hash[:5], password_hash[5:].lower() |
|
url = f"https://api.pwnedpasswords.com/range/{head}" |
|
headers = {"Add-Padding": "true"} |
|
try: |
|
response = requests.get(url, headers=headers, timeout=2) |
|
response.raise_for_status() |
|
except requests.RequestException: |
|
return False |
|
lower_case_lines = (line.lower() for line in response.text.splitlines()) |
|
for line in lower_case_lines: |
|
if line.startswith(tails): |
|
event_queue.emit(MSG_PWNED_PASSWORD) |
|
return True |
|
return False |
|
|
|
|
|
def create_token_for_user(repo, user): |
|
reference = str(uuid.uuid4()) |
|
token = models.PasswordResetToken(reference, user.id) |
|
repo.add_reset_token(token) |
|
return token |
|
|
|
|
|
def get_user_from_reset_token(repo, identifier): |
|
try: |
|
token = repo.get_reset_token(identifier) |
|
if token.is_valid: |
|
return repo.get_user(token.user_id) |
|
except StopIteration: |
|
pass |
|
return None
|
|
|