You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
184 lines
6.8 KiB
184 lines
6.8 KiB
import re |
|
import configparser |
|
|
|
from .users import ElabUser |
|
from .constants import ( |
|
USERS, |
|
ADMINS, |
|
ALUMNI, |
|
READ_ACL, |
|
WRITE_ACL, |
|
RESTRICTED, |
|
SVN_SUFFIX, |
|
GROUP_DEFAULTS, |
|
) |
|
|
|
RE_LIST_SEPARATORS = re.compile("[\t ,;]+") |
|
|
|
|
|
def format_ini_option(key, value): |
|
"""formats a key value pair for writing an ini file""" |
|
return " = ".join((key, str(value).replace("\n", "\n\t"))) |
|
|
|
|
|
class AuthzConfigParser(configparser.ConfigParser): |
|
"""custom functions for parsing the "authz" file as used at cpi |
|
|
|
there is a dict of users defined, the journals themselves can be accessed |
|
via the sections functionality of the ConfigParser base class |
|
""" |
|
|
|
def __init__(self): |
|
"""initialization of the class""" |
|
self.elab_users = {} |
|
self.original_path = None |
|
super().__init__() |
|
|
|
def optionxform(self, value): |
|
"""reset the method to use cases sensitive names""" |
|
return str(value) |
|
|
|
def read(self, path): |
|
"""set up the acl defaults after reading the file""" |
|
super().read(path) |
|
self.original_path = path |
|
self._extract_user_info_from_config() |
|
|
|
@classmethod |
|
def from_file(cls, path): |
|
instance = cls() |
|
instance.read(path) |
|
return instance |
|
|
|
def write_to_file(self, path=None): |
|
path = path or self.original_path |
|
if not path: |
|
raise IOError("No path specified") |
|
with open(path, "w") as filehandle: |
|
self.write(filehandle) |
|
|
|
def write(self, fp): |
|
"""Write an .ini-format representation of the configuration state. |
|
|
|
this is adapted from the original library file. changes: |
|
- no default section |
|
- group-section at top |
|
- rest of section sorted by name |
|
""" |
|
sorted_keys = sorted(self._sections.keys()) |
|
sorting = ["groups"] |
|
sorting.extend([k for k in sorted_keys if k != "groups"]) |
|
for section in sorting: |
|
fp.write("[%s]\n" % section) |
|
acls = { |
|
k: v |
|
for k, v in self._sections[section].items() |
|
if k != "__name__" |
|
} |
|
if section != "groups": |
|
for group in (ADMINS, USERS, RESTRICTED, ALUMNI): |
|
group_id = "@" + group |
|
acl_value = acls.pop(group_id, GROUP_DEFAULTS[group]) |
|
key = format_ini_option(group_id, acl_value) |
|
fp.write("%s\n" % (key)) |
|
for (key, value) in acls.items(): |
|
if (value is not None) or (self._optcre == self.OPTCRE): |
|
key = format_ini_option(key, value) |
|
fp.write("%s\n" % (key)) |
|
fp.write("\n") |
|
|
|
def _extract_user_info_from_config(self): |
|
"""extracts the user information from the config file |
|
|
|
the information of the journals can be accessed via get_journal_info |
|
""" |
|
# first parse the group definitions |
|
self._extract_group_definitions() |
|
# walk through the sections to get individual acl information |
|
self._extract_individual_acls() |
|
|
|
def _extract_group_definitions(self): |
|
"""extracts the group information from the config file""" |
|
# first parse the group definitions |
|
for group, userlist in self.items("groups"): |
|
if group not in GROUP_DEFAULTS: |
|
raise KeyError(f"Undefined group {group} in authz file") |
|
for username in RE_LIST_SEPARATORS.split(userlist): |
|
if username in self.elab_users: |
|
raise Exception( |
|
( |
|
f"Found duplicate entry for user " |
|
f"{username} in authz file" |
|
) |
|
) |
|
self.elab_users[username] = ElabUser(username, group) |
|
|
|
def _extract_individual_acls(self): |
|
"""extracts the acl information from the elab section""" |
|
elabs = (item for item in self.sections() if item.endswith(SVN_SUFFIX)) |
|
for elab in elabs: |
|
for (user_or_group, acl) in self.items(elab): |
|
print(elab, "-", user_or_group, "-", acl) |
|
if user_or_group in self.elab_users: |
|
# a nicer name for the lab journal |
|
belongs_to = elab[: -len(SVN_SUFFIX)] |
|
# a acl entry for a user |
|
if acl.lower() == WRITE_ACL: |
|
self.elab_users[user_or_group].write_acl.append( |
|
belongs_to |
|
) |
|
elif acl.lower() == READ_ACL: |
|
self.elab_users[user_or_group].read_acl.append( |
|
belongs_to |
|
) |
|
|
|
def group_users(self): |
|
"""uses the list of users to group them by their group name""" |
|
groups = {key: [] for key in GROUP_DEFAULTS.keys()} |
|
for user in self.elab_users.values(): |
|
if user.group not in groups: |
|
raise KeyError( |
|
f"found unknown group {user.group} for user {user.name}" |
|
) |
|
groups[user.group].append(user.name) |
|
return groups |
|
|
|
def add_journal_acl_for(self, username, group): |
|
"""sets the acls for a new user an the corresponding journal""" |
|
self.elab_users[username] = ElabUser(username, group) |
|
journal_path = username + SVN_SUFFIX |
|
self.add_section(journal_path) |
|
self.set(journal_path, username, WRITE_ACL) |
|
for group, acl in GROUP_DEFAULTS.items(): |
|
self.set(journal_path, "@" + group, acl) |
|
self._update_user_group_config() |
|
return self.elab_users[username] |
|
|
|
def move_user_to_alumni(self, name): |
|
"""moves a user to the alumni group and removes the acl privileges""" |
|
user = self.elab_users[name] |
|
user.group = ALUMNI |
|
for access_to in user.write_acl: |
|
self.remove_option(access_to + SVN_SUFFIX, user.name) |
|
for access_to in user.read_acl: |
|
self.remove_option(access_to + SVN_SUFFIX, user.name) |
|
self._update_user_group_config() |
|
return user |
|
|
|
def _update_user_group_config(self): |
|
"""updates the config settings of the groups section""" |
|
groups = self.group_users() |
|
for group, userlist in groups.items(): |
|
self.set("groups", group, ", ".join(sorted(userlist))) |
|
|
|
def get_journal_info(self, elab): |
|
"""returns read and write access info of an lab journal""" |
|
if not elab.endswith(SVN_SUFFIX): |
|
elab = elab + SVN_SUFFIX |
|
if not self.has_section(elab): |
|
return None |
|
info = {WRITE_ACL: [], READ_ACL: []} |
|
for (user_or_group, acl) in self.items(elab): |
|
if acl in (WRITE_ACL, READ_ACL): |
|
info[acl].append(user_or_group) |
|
return info
|
|
|