import re import configparser from .users import ElabUser from .constants import ( USERS, ADMINS, ALUMNI, READ_ACL, WRITE_ACL, RESTRICTED, SVN_SUFFIX, GROUP_DEFAULTS, ) RE_LIST_SEPARATORS = re.compile("[\t ,;]+") def format_ini_option(key, value): """formats a key value pair for writing an ini file""" return " = ".join((key, str(value).replace("\n", "\n\t"))) class AuthzConfigParser(configparser.ConfigParser): """custom functions for parsing the "authz" file as used at cpi there is a dict of users defined, the journals themselves can be accessed via the sections functionality of the ConfigParser base class """ def __init__(self): """initialization of the class""" self.elab_users = {} self.original_path = None super().__init__() def optionxform(self, value): """reset the method to use cases sensitive names""" return str(value) def read(self, path): """set up the acl defaults after reading the file""" super().read(path) self.original_path = path self._extract_user_info_from_config() @classmethod def from_file(cls, path): instance = cls() instance.read(path) return instance def write_to_file(self, path=None): path = path or self.original_path if not path: raise IOError("No path specified") with open(path, "w") as filehandle: self.write(filehandle) def write(self, fp): """Write an .ini-format representation of the configuration state. this is adapted from the original library file. changes: - no default section - group-section at top - rest of section sorted by name """ sorted_keys = sorted(self._sections.keys()) sorting = ["groups"] sorting.extend([k for k in sorted_keys if k != "groups"]) for section in sorting: fp.write("[%s]\n" % section) acls = { k: v for k, v in self._sections[section].items() if k != "__name__" } if section != "groups": for group in (ADMINS, USERS, RESTRICTED, ALUMNI): group_id = "@" + group acl_value = acls.pop(group_id, GROUP_DEFAULTS[group]) key = format_ini_option(group_id, acl_value) fp.write("%s\n" % (key)) for (key, value) in acls.items(): if (value is not None) or (self._optcre == self.OPTCRE): key = format_ini_option(key, value) fp.write("%s\n" % (key)) fp.write("\n") def _extract_user_info_from_config(self): """extracts the user information from the config file the information of the journals can be accessed via get_journal_info """ # first parse the group definitions self._extract_group_definitions() # walk through the sections to get individual acl information self._extract_individual_acls() def _extract_group_definitions(self): """extracts the group information from the config file""" # first parse the group definitions for group, userlist in self.items("groups"): if group not in GROUP_DEFAULTS: raise KeyError(f"Undefined group {group} in authz file") for username in RE_LIST_SEPARATORS.split(userlist): if username in self.elab_users: raise Exception( ( f"Found duplicate entry for user " f"{username} in authz file" ) ) self.elab_users[username] = ElabUser(username, group) def _extract_individual_acls(self): """extracts the acl information from the elab section""" elabs = (item for item in self.sections() if item.endswith(SVN_SUFFIX)) for elab in elabs: for (user_or_group, acl) in self.items(elab): if user_or_group in self.elab_users: # a nicer name for the lab journal belongs_to = elab[: -len(SVN_SUFFIX)] # a acl entry for a user if acl.lower() == WRITE_ACL: self.elab_users[user_or_group].write_acl.add( belongs_to ) elif acl.lower() == READ_ACL: self.elab_users[user_or_group].read_acl.add(belongs_to) def group_users(self): """uses the list of users to group them by their group name""" groups = {key: [] for key in GROUP_DEFAULTS.keys()} for user in self.elab_users.values(): if user.group not in groups: raise KeyError( f"found unknown group {user.group} for user {user.name}" ) groups[user.group].append(user.name) return groups def add_journal_acl_for(self, username, group): """sets the acls for a new user an the corresponding journal""" self.elab_users[username] = ElabUser(username, group) journal_path = username + SVN_SUFFIX self.add_section(journal_path) self.set(journal_path, username, WRITE_ACL) for group, acl in GROUP_DEFAULTS.items(): self.set(journal_path, f"@{group}", acl) self._update_user_group_config() return self.elab_users[username] def move_user_to_alumni(self, name): """moves a user to the alumni group and removes the acl privileges""" user = self.elab_users[name] user.group = ALUMNI for access_to in user.write_acl: self.remove_option(access_to + SVN_SUFFIX, user.name) for access_to in user.read_acl: self.remove_option(access_to + SVN_SUFFIX, user.name) user.write_acl = set() user.read_acl = set() self._update_user_group_config() return user def _update_user_group_config(self): """updates the config settings of the groups section""" groups = self.group_users() for group, userlist in groups.items(): self.set("groups", group, ", ".join(sorted(userlist))) def get_journal_info(self, elab): """returns read and write access info of an lab journal""" if not elab.endswith(SVN_SUFFIX): elab = elab + SVN_SUFFIX if not self.has_section(elab): return None info = {WRITE_ACL: [], READ_ACL: []} for (user_or_group, acl) in self.items(elab): if acl in (WRITE_ACL, READ_ACL): info[acl].append(user_or_group) return info