Browse Source

working on modernizing

pull/1/head
Holger Frey 3 years ago
parent
commit
6021ceec0e
  1. 171
      elab_users/authz.py
  2. 24
      elab_users/constants.py
  3. 77
      elab_users/users.py
  4. 140
      manage_scrap.py
  5. 9
      run.py
  6. 2
      test-data/authz
  7. 4
      tests/test_elab_users.py

171
elab_users/authz.py

@ -0,0 +1,171 @@
import re
import configparser
from .users import ElabUser
from .constants import (
USERS,
ADMINS,
ALUMNI,
READ_ACL,
WRITE_ACL,
RESTRICTED,
SVN_SUFFIX,
GROUP_DEFAULTS,
)
RE_LIST_SEPARATORS = re.compile("[\t ,;]+")
def format_ini_option(key, value):
"""formats a key value pair for writing an ini file"""
return " = ".join((key, str(value).replace("\n", "\n\t")))
class AuthzConfigParser(configparser.ConfigParser):
"""custom functions for parsing the "authz" file as used at cpi
there is a dict of users defined, the journals themselves can be accessed
via the sections functionality of the ConfigParser base class
"""
def __init__(self):
"""initialization of the class"""
self.elab_users = {}
super().__init__()
def optionxform(self, value):
"""reset the method to use cases sensitive names"""
return str(value)
def read(self, path):
"""set up the acl defaults after reading the file"""
super().read(path)
self._extract_user_info_from_config()
def write_to_file(self, path):
with open(path, "w") as filehandle:
self.write(filehandle)
def write(self, fp):
"""Write an .ini-format representation of the configuration state.
this is adapted from the original library file. changes:
- no default section
- group-section at top
- rest of section sorted by name
"""
sorted_keys = sorted(self._sections.keys())
sorting = ["groups"]
sorting.extend([k for k in sorted_keys if k != "groups"])
for section in sorting:
fp.write("[%s]\n" % section)
acls = {
k: v
for k, v in self._sections[section].items()
if k != "__name__"
}
if section != "groups":
for group in (ADMINS, USERS, RESTRICTED, ALUMNI):
group_id = "@" + group
acl_value = acls.pop(group_id, GROUP_DEFAULTS[group])
key = format_ini_option(group_id, acl_value)
fp.write("%s\n" % (key))
for (key, value) in acls.items():
if (value is not None) or (self._optcre == self.OPTCRE):
key = format_ini_option(key, value)
fp.write("%s\n" % (key))
fp.write("\n")
def _extract_user_info_from_config(self):
"""extracts the user information from the config file
the information of the journals can be accessed via get_journal_info
"""
# first parse the group definitions
self._extract_group_definitions()
# walk through the sections to get individual acl information
self._extract_individual_acls()
def _extract_group_definitions(self):
"""extracts the group information from the config file"""
# first parse the group definitions
for group, userlist in self.items("groups"):
if group not in GROUP_DEFAULTS:
raise KeyError(f"Undefined group {group} in authz file")
for username in RE_LIST_SEPARATORS.split(userlist):
if username in self.elab_users:
raise Exception(
(
f"Found duplicate entry for user "
f"{username} in authz file"
)
)
self.elab_users[username] = ElabUser(username, group)
def _extract_individual_acls(self):
"""extracts the acl information from the elab section"""
elabs = (item for item in self.sections() if item.endswith(SVN_SUFFIX))
for elab in elabs:
for (user_or_group, acl) in self.items(elab):
print(elab, "-", user_or_group, "-", acl)
if user_or_group in self.elab_users:
# a nicer name for the lab journal
belongs_to = elab[: -len(SVN_SUFFIX)]
# a acl entry for a user
if acl.lower() == WRITE_ACL:
self.elab_users[user_or_group].write_acl.append(
belongs_to
)
elif acl.lower() == READ_ACL:
self.elab_users[user_or_group].read_acl.append(
belongs_to
)
def group_users(self):
"""uses the list of users to group them by their group name"""
groups = {key: [] for key in GROUP_DEFAULTS.keys()}
for user in self.elab_users.values():
if user.group not in groups:
raise KeyError(
f"found unknown group {user.group} for user {user.name}"
)
groups[user.group].append(user.name)
return groups
def add_journal_acl_for(self, username, group):
"""sets the acls for a new user an the corresponding journal"""
self.elab_users[username] = ElabUser(username, group)
journal_path = username + SVN_SUFFIX
self.add_section(journal_path)
self.set(journal_path, username, WRITE_ACL)
for group, acl in GROUP_DEFAULTS.items():
self.set(journal_path, "@" + group, acl)
self._update_user_group_config()
def move_user_to_alumni(self, name):
"""moves a user to the alumni group and removes the acl privileges"""
user = self.elab_users[name]
user.group = ALUMNI
for access_to in user.write_acl:
self.remove_option(access_to + SVN_SUFFIX, user.name)
for access_to in user.read_acl:
self.remove_option(access_to + SVN_SUFFIX, user.name)
self._update_user_group_config()
def _update_user_group_config(self):
"""updates the config settings of the groups section"""
groups = self.group_users()
for group, userlist in groups.items():
self.set("groups", group, ", ".join(sorted(userlist)))
def get_journal_info(self, elab):
"""returns read and write access info of an lab journal"""
if not elab.endswith(SVN_SUFFIX):
elab = elab + SVN_SUFFIX
if not self.has_section(elab):
return None
info = {WRITE_ACL: [], READ_ACL: []}
for (user_or_group, acl) in self.items(elab):
if acl in (WRITE_ACL, READ_ACL):
info[acl].append(user_or_group)
return info

24
elab_users/constants.py

@ -0,0 +1,24 @@
from pathlib import Path
MOUNT_PATH = Path("/mnt") / "nfs-data-store-1" / "drive"
REPO_PATH = MOUNT_PATH / "svn-repository"
AUTHZ_PATH = REPO_PATH / "authz"
HTPWD_PATH = REPO_PATH / ".htpasswd"
ADMINS = "administrators"
USERS = "users"
RESTRICTED = "restricted"
ALUMNI = "alumni"
NO_ACL = ""
READ_ACL = "r"
WRITE_ACL = "rw"
GROUP_DEFAULTS = {
ADMINS: WRITE_ACL,
USERS: READ_ACL,
RESTRICTED: NO_ACL,
ALUMNI: NO_ACL,
}
SVN_SUFFIX = ":/"

77
elab_users/users.py

@ -0,0 +1,77 @@
import os
import random
import string
import tempfile
import subprocess # noqa: S404
from pathlib import Path
from datetime import datetime
from dataclasses import dataclass
@dataclass
class ElabUser:
name: str
group: str
write_acl = []
read_acl = []
def __str__(self):
"""return a string representation"""
return self.name
def set_new_password(self, htpasswd_path, length=10, handler=subprocess):
"""sets a new random password for a user"""
characters = string.ascii_letters + string.digits
password = "".join(
random.choice(characters) for i in range(length) # noqa: S311
)
handler.check_call(
["htpasswd", "-b", htpasswd_path, self.name, password]
)
return password
def delete_password(self, htpasswd_path, handler=subprocess):
"""deletes a password for a user"""
# if the user was not added to the password db, the removal will show
# an error message that is confusing to the user - at least it
# confused me - so redirect this to /dev/null
with open(os.devnull, "wb") as devnull:
handler.check_call(
["htpasswd", "-D", htpasswd_path, self.name], stderr=devnull
)
def create_new_repository(self, data_dir, handler=subprocess):
"""creates a repository for a user and checks in some stuff"""
# create the new repository
new_repo = data_dir / self.name
handler.check_call(
["svnadmin", "create", new_repo], stderr=handler.STDOUT
)
with tempfile.TemporaryDirectory() as tmpdir:
tmpdir = Path(tmpdir)
# check out a temporary working copy
handler.check_call(
["svn", "checkout", f"file://{new_repo}", tmpdir]
)
# create subfolders
today = datetime.now()
year_path = tmpdir / f"{today.year:0>4}"
year_path.mkdir()
for month in range(today.month, 13):
month_path = year_path / f"{month:0>2}"
month_path.mkdir()
handler.check_call(["touch", month_path / ".empty"])
# copy some examples
for temp in ("experiment", "synthesis", "toc"):
filename = f"template-{temp}.doc"
in_file = data_dir / filename
out_file = tmpdir / filename
handler.check_call(["cp", in_file, out_file])
# add and commit the changes
handler.check_call(
"svn", "add", tmpdir / "*", shell=True # noqa: S604
)
handler.check_call(
["svn", "commit", "-m", f"New User: {self.name}", tmpdir]
)

140
manage_scrap.py

@ -0,0 +1,140 @@
#!/usr/bin/python
# imports of modules
import optparse
import subprocess
import sys
if __name__ == "__main__":
# create configparser instance
config = AuthzConfigParser()
# read config file
config.read(AUTHZ_PATH)
# command line interface:
# no option: display info
# -g display users in a group
# -a add regular user
# -r add restricted user
# -m move to alumni
# -p reset user password
parser = optparse.OptionParser(
usage="usage: %prog [option] name",
description="shows and manipulates svn access rights",
epilog="to grant a restricted user access to another folder, you have to carefully edit the authz file")
parser.add_option("-g", "--groupinfo", action="store_const", dest="what",
const="g", help="display users in a group")
parser.add_option("-a", "--add", action="store_const", dest="what",
const="a", help="add a regular user")
parser.add_option("-r", "--restricted", action="store_const", dest="what",
const="r", help="add a restricted user")
parser.add_option("-m", "--move", action="store_const", dest="what",
const="m", help="move a user to alumni")
parser.add_option("-p", "--password", action="store_const", dest="what",
const="p", help="reset a user password")
options, args = parser.parse_args()
if len(args)==0:
# no arguments? then display all the users!
groups = config.group_users()
for name, usernames in groups.items():
print "Users in group '%s':" % name
for name in sorted(usernames):
print " " + name
sys.exit()
if len(args)>1:
# more than one usename? not here, john boy
sys.exit("please provide only one name")
name = args[0]
if options.what == "g":
# show group information
groups = config.group_users()
if name not in groups:
sys.exit("Group not found")
print "Users in group '%s':" % name
for usernamename in sorted(groups[name]):
print " " + usernamename
sys.exit()
if options.what in ("a", "r"):
# add a user, restricted or regular
if name in config.elab_users:
sys.exit("Username '%s' already in use" % name)
group = RESTRICTED if options.what == "r" else USERS
config.add_journal_acl_for(name, group)
create_new_repository(name)
#subprocess.check_call(SVN_DIR_CREATOR + " " + name, shell=True)
password = set_new_password(name)
print "New password for :"
print "username: " + name
print "password: " + password
print "url: https://svn.cpi.imtek.uni-freiburg.de/" + name
config.write_to_file()
sys.exit()
# from here downwards we need already existent usernames
if name not in config.elab_users:
sys.exit("User '%s' not found, use this without a name to get a list of users." % name)
if options.what == "m":
# move user to alumni
user = config.elab_users[name]
if user.group == ALUMNI:
sys.exit("User '%s' is already in group '%s'" % (name, ALUMNI))
if user.group == ADMINS:
sys.exit("User '%s' is in group '%s', will not moved to '%s'" % (name, ADMINS, ALUMNI))
config.move_user_to_alumni(name)
config.write_to_file()
delete_password(name)
sys.exit()
if options.what == "p":
# reset a password
password = set_new_password(name)
print "New password for :"
print "username: " + name
print "password: " + password
sys.exit()
# no option, just a name:
user = config.elab_users[name]
print "User %s is in group '%s':" % (name, user.group)
# print the write acls for a user
if user.group == ADMINS:
print " Write access is granted to all journals."
elif user.write_acl:
write_acl = [ username + SVN_SUFFIX for username in user.write_acl ]
print " Write access is granted to '%s'. " % "', '".join(write_acl)
else:
print " Write access is NOT granted to any journals"
# print the read acls for a user
if user.group == ADMINS:
print " Read access is granted to all journals."
elif user.group == USERS:
print " Read access is granted to (nearly) all journals."
elif user.read_acl:
read_acl = [ username + SVN_SUFFIX for username in user.read_acl ]
print " Read access is granted to '%s'. " % "', '".join(read_acl)
else:
print " Read access is NOT granted to any journals"
info = config.get_journal_info(name)
# print the write acls for a journal
print "Labjournal %s%s" % (name, SVN_SUFFIX)
if info[WRITE_ACL]:
print " Write access granted to: " + ", ".join(info[WRITE_ACL])
else:
print " No write access granted to anybody"
# print the read acls for a journal
if info[READ_ACL]:
print " Read access granted to: " + ", ".join(info[READ_ACL])
else:
print " No read access granted to anybody"

9
run.py

@ -0,0 +1,9 @@
import elab_users.authz
f = "test-data/authz"
p = elab_users.authz.AuthzConfigParser()
p.read(f)
print(p.group_users())
print(p.get_journal_info("CamillaOestevold"))
print(p.get_journal_info("AlexanderDietz"))

2
test-data/authz

@ -15,7 +15,7 @@ restricted = BeniPrasser, JuliaSaar, SimonZunker, UrmilShah, YongZhou
@users = r @users = r
@restricted = @restricted =
@alumni = @alumni =
AlexanderDietz= r AlexanderDietz= rw
[AlexeyKopyshev:/] [AlexeyKopyshev:/]
@administrators= rw @administrators= rw

4
tests/test_elab_users.py

@ -25,7 +25,7 @@ import pytest
def test_example_unittest(): def test_example_unittest():
""" example unittest """example unittest
will be run by 'make test' and 'make testall' but not 'make coverage' will be run by 'make test' and 'make testall' but not 'make coverage'
""" """
@ -34,7 +34,7 @@ def test_example_unittest():
@pytest.mark.fun @pytest.mark.fun
def test_example_functional_test(): def test_example_functional_test():
""" example unittest """example unittest
will be by 'make coverage' and 'make testall' but not 'make test' will be by 'make coverage' and 'make testall' but not 'make test'
""" """

Loading…
Cancel
Save