Commit b14e7ab6 authored by jens.kleineheismann's avatar jens.kleineheismann

Usage redesign: introduce KitGvProvider ...

to provide objects. Removed basicConfig()
Usage/Configuration should be simple AND thread safe now.
parent 9012b335
......@@ -18,28 +18,25 @@ USAGE
=====
::
from kitgvapi import basicConfig, Oe, Group, User
from kitgvapi import KitGvProvider
basicConfig('myusername', 'mypassword')
iism = Oe('IISM')
gv = KitGvProvider()
gv.auth('myusername', 'mypassword')
iism = gv.getOe('IISM')
for g in iism.get_groups():
print g.name
iism.create_group('IISM-EM-newgroup', 'Neue Gruppe')
staff = Group('IISM-EM-OU-Staff')
staff = gv.getGroup('IISM-EM-OU-Staff')
for u in staff.get_users(effective=True):
print "%s %s (%s)" % (u.firstname, u.lastname, u.samaccount)
print "{fn} {ln} ({un})".format(fn=u.firstname, ln=u.lastname, un=u.samaccount)
for g in staff.get_groups():
print "%s (%s)" % (g.name, g.description)
print "{gn} ({desc})".format(gn=g.name, desc=g.description)
me = User('zy8373')
me = gv.getUser('zy8373')
others = me.oe.get_users()
Note:
-----
basicConfig() is not thread safe and is subject to change.
LICENCE
=======
......
from .client import Client
from .config import basicConfig
from .clients import RestClient
from .models import Group, Oe, User
from .provider import KitGvProvider
import json
import requests
from .config import CONFIG
from .config import DEFAULTS, SYMBOLS
from .exceptions import NotFoundError, MissingConfigError
class ClientConfig(object):
class RestClientConfig(object):
def __init__(self):
self.url_schema = CONFIG['URL_SCHEMA']
self.server = CONFIG['SERVER']
self.base_uri = CONFIG['BASE_URI']
self.username = CONFIG['USERNAME']
self.password = CONFIG['PASSWORD']
if not self.username:
self.credentials = None
elif isinstance(self.username, MissingConfigError):
raise self.username
elif isinstance(self.password, MissingConfigError):
raise self.password
else:
self.credentials = requests.auth.HTTPBasicAuth(self.username, self.password)
class Client(object):
self.url_schema = DEFAULTS['URL_SCHEMA']
self.server = DEFAULTS['SERVER']
self.base_uri = DEFAULTS['BASE_URI']
self._username = DEFAULTS['USERNAME']
self._password = DEFAULTS['PASSWORD']
self._credentials = None
def get_username(self):
if isinstance(self._username, MissingConfigError):
raise self._username
return self._username
def set_username(self, username):
self._username = username
self._credentials = None
def get_password(self):
if isinstance(self._password, MissingConfigError):
raise self._password
return self._password
def set_password(self, password):
self._password = password
self._credentials = None
def get_credentials(self):
if self._credentials is None:
u = self.username
if u:
self._credentials = requests.auth.HTTPBasicAuth(u, self.password)
return self._credentials
username = property(get_username, set_username)
password = property(get_password, set_password)
credentials = property(get_credentials)
class RestClient(object):
def __init__(self, config=None):
self.config = config
if self.config is None:
self.config = ClientConfig()
self.config = RestClientConfig()
def _get(self, url):
r = requests.get(url, verify=True, auth=self.config.credentials)
if r.status_code != requests.codes.ok:
r.raise_for_status()
return r
def _post(self, url, data=None, headers=None):
if data is None:
data = {}
if headers is None:
headers = {}
if 'content-type' not in headers:
headers['content-type'] = 'application/json'
r = requests.post(url, verify=True, auth=self.config.credentials, data=data, headers=headers)
if r.status_code != requests.codes.ok:
r.raise_for_status()
return r
def read(self, path):
def get(self, path):
url = self.config.url_schema + self.config.server + self.config.base_uri + path
try:
r = self._get(url)
r = requests.get(url, verify=True, auth=self.config.credentials)
r.raise_for_status()
except requests.exceptions.HTTPError as e:
if e.response.status_code == 404:
raise NotFoundError("Endpoint returned Not Found")
else:
raise e
j = r.json()
return j
d = r.json()
return d
def write(self, path, data=None):
def post(self, path, data=None):
url = self.config.url_schema + self.config.server + self.config.base_uri + path
if data is not None:
data = json.dumps(data)
r = self._post(url, data)
headers = {'content-type': 'application/json'}
r = requests.post(url, verify=True, auth=self.config.credentials, data=data, headers=headers)
r.raise_for_status()
return r
def get_oes(self, pk_only=False):
path = '/oe/list'
l = self.read(path)
l = self.get(path)
if pk_only:
l = [d[CONFIG['OE_PK_ATTRIB']] for d in l]
l = [d[SYMBOLS['OE_PK_ATTRIB']] for d in l]
return l
def get_oe(self, pk):
path = '/oe/name/{pk}'.format(pk=pk)
return self.read(path)
return self.get(path)
def get_oe_groups(self, pk, pk_only=False):
path = '/groups/{pk}'.format(pk=pk)
l = self.read(path)
l = self.get(path)
if pk_only:
l = [d[CONFIG['GROUP_PK_ATTRIB']] for d in l]
l = [d[SYMBOLS['GROUP_PK_ATTRIB']] for d in l]
return l
def get_oe_users(self, pk, pk_only=False):
path = '/oe/name/{pk}/identities'.format(pk=pk)
l = self.read(path)
l = self.get(path)
if pk_only:
l = [d[CONFIG['USER_PK_ATTRIB']] for d in l]
l = [d[SYMBOLS['USER_PK_ATTRIB']] for d in l]
return l
def get_user(self, pk):
path = '/identities/get/{pk}'.format(pk=pk)
return self.read(path)
return self.get(path)
def get_user_groups(self, pk, pk_only=False):
path = '/identities/get/{pk}/memberof'.format(pk=pk)
l = self.read(path)
l = self.get(path)
if pk_only:
l = [d[CONFIG['GROUP_PK_ATTRIB']] for d in l]
l = [d[SYMBOLS['GROUP_PK_ATTRIB']] for d in l]
return l
def get_group(self, pk):
group_pk = pk
oe_pk = pk.partition('-')[0]
path = '/groups/{oe}/{group}'.format(oe=oe_pk, group=group_pk)
return self.read(path)
return self.get(path)
def get_group_users(self, pk, pk_only=True, effective=False):
if not pk_only:
raise NotImplemented('Not supported')
group_pk = pk
oe_pk = pk.partition('-')[0]
quant = 'members'
res = 'members'
if effective:
quant = 'effective' + quant
path = '/groups/{oe}/{group}/{quant}'.format(oe=oe_pk, group=group_pk, quant=quant)
return self.read(path)
res = 'effective' + res
path = '/groups/{oe}/{group}/{res}'.format(oe=oe_pk, group=group_pk, res=res)
return self.get(path)
def get_group_groups(self, pk, pk_only=True):
if not pk_only:
......@@ -123,21 +131,20 @@ class Client(object):
group_pk = pk
oe_pk = pk.partition('-')[0]
path = '/groups/{oe}/{group}/subgroups'.format(oe=oe_pk, group=group_pk)
return self.read(path)
def add_group_group(self, super_pk, sub_pk):
oe_pk = super_pk.partition('-')[0]
path = '/groups/{oe}/{superg}/subgroups/{subg}'.format(oe=oe_pk, superg=super_pk, subg=sub_pk)
self.write(path)
return self.get(path)
def add_group_user(self, group_pk, user_pk):
oe_pk = group_pk.partition('-')[0]
path = '/groups/{oe}/{group}/members/{user}'.format(oe=oe_pk, group=group_pk, user=user_pk)
self.write(path)
self.post(path)
def add_group_group(self, super_pk, sub_pk):
oe_pk = super_pk.partition('-')[0]
path = '/groups/{oe}/{superg}/subgroups/{subg}'.format(oe=oe_pk, superg=super_pk, subg=sub_pk)
self.post(path)
def create_group(self, name, description):
oe_pk = name.partition('-')[0]
path = '/groups/{oe}'.format(oe=oe_pk)
data = {'name': name, 'description': description}
self.write(path, data=data)
self.post(path, data=data)
from .exceptions import MissingConfigError
CONFIG = {
'GROUP_PK_ATTRIB': 'name',
'OE_PK_ATTRIB': 'kurz',
'USER_PK_ATTRIB': 'samaccount',
'URL_SCHEMA': 'https://',
'SERVER': 'kit-idm-03.scc.kit.edu',
'BASE_URI': '/itbportal-rest-war/rest',
'USERNAME': MissingConfigError('No username set'),
'PASSWORD': MissingConfigError('No password set'),
SYMBOLS = {
'GROUP_PK_ATTRIB': 'name',
'OE_PK_ATTRIB': 'kurz',
'USER_PK_ATTRIB': 'samaccount',
}
def basicConfig(username, password):
CONFIG['USERNAME'] = username
CONFIG['PASSWORD'] = password
DEFAULTS = {
'URL_SCHEMA': 'https://',
'SERVER': 'kit-idm-03.scc.kit.edu',
'BASE_URI': '/itbportal-rest-war/rest',
'USERNAME': MissingConfigError('No username set'),
'PASSWORD': MissingConfigError('No password set'),
}
class KitGvApiError(Exception):
pass
pass
class MissingConfigError(KitGvApiError):
pass
pass
class MissingClientError(KitGvApiError):
pass
pass
class MissingPkError(KitGvApiError):
pass
pass
class NotFoundError(KitGvApiError):
pass
pass
from .client import Client
from .config import CONFIG
from .clients import RestClient
from .config import SYMBOLS
from .exceptions import MissingClientError, MissingPkError
class Model(object):
def __init__(self, pk=None, client=None, data=None):
self.pk = pk
self.client = client
self.is_loaded = False
if self.client is None:
self.client = Client()
if data is not None:
self._load_data_attribs(data)
self.is_loaded = True
if self.pk is not None:
self.load()
def _load_data_attribs(self, data=None):
raise NotImplementedError('Abstract method')
def load(self, reload=False):
if self.is_loaded and not reload:
return
if self.client is None:
raise MissingClientError()
if self.pk is None:
raise MissingPkError()
self._load_data_attribs()
self.is_loaded = True
def _save_data_attribs(self):
raise NotImplementedError('Abstract method')
def save(self):
if self.client is None:
raise MissingClientError()
self._save_data_attribs()
def __init__(self, pk=None, client=None, data=None):
self.pk = pk
self.client = client
self.is_loaded = False
if self.client is None:
self.client = RestClient()
if data is not None:
self._load_data_attribs(data)
self.is_loaded = True
if self.pk is not None:
self.load()
def _load_data_attribs(self, data=None):
raise NotImplementedError('Abstract method')
def load(self, reload=False):
if self.is_loaded and not reload:
return
if self.client is None:
raise MissingClientError()
if self.pk is None:
raise MissingPkError()
self._load_data_attribs()
self.is_loaded = True
def _save_data_attribs(self):
raise NotImplementedError('Abstract method')
def save(self):
if self.client is None:
raise MissingClientError()
self._save_data_attribs()
class Group(Model):
def _load_data_attribs(self, data=None):
if data is None:
data = self.client.get_group(pk=self.pk)
self.name = data['name']
self.description = data['description']
self.gidNumber = data['gidNumber']
self.readOnly = data['readOnly']
self.verteiler = data['verteiler']
self.eMailAddress = data['eMailAddress']
self.oe_pk = data['oe']
self.pk = getattr(self, CONFIG['GROUP_PK_ATTRIB'])
def _save_data_attribs(self):
raise NotImplemented('Not supported')
@property
def oe(self):
return Oe(client=self.client, pk=self.oe_pk)
def get_users(self, effective=False):
self.load()
l = self.client.get_group_users(pk=self.pk, effective=effective)
for i in l:
y = User(client=self.client, pk=i)
yield y
def add_user(self, pk):
self.client.add_group_user(group_pk=self.pk, user_pk=pk)
def remove_user(self, pk):
raise NotImplemented('Not yet')
def get_groups(self):
self.load()
l = self.client.get_group_groups(pk=self.pk)
for i in l:
y = Group(client=self.client, pk=i)
yield y
def add_group(self, pk):
self.client.add_group_group(super_pk=self.pk, sub_pk=pk)
def remove_group(self, pk):
raise NotImplemented('Not yet')
def _load_data_attribs(self, data=None):
if data is None:
data = self.client.get_group(pk=self.pk)
self.name = data['name']
self.description = data['description']
self.gidNumber = data['gidNumber']
self.readOnly = data['readOnly']
self.verteiler = data['verteiler']
self.eMailAddress = data['eMailAddress']
self.oe_pk = data['oe']
self.pk = getattr(self, SYMBOLS['GROUP_PK_ATTRIB'])
def _save_data_attribs(self):
raise NotImplementedError('Not supported')
@property
def oe(self):
return Oe(client=self.client, pk=self.oe_pk)
def get_users(self, effective=False):
self.load()
l = self.client.get_group_users(pk=self.pk, effective=effective)
for i in l:
y = User(client=self.client, pk=i)
yield y
def add_user(self, pk):
self.client.add_group_user(group_pk=self.pk, user_pk=pk)
def remove_user(self, pk):
raise NotImplementedError('Not yet')
def get_groups(self):
self.load()
l = self.client.get_group_groups(pk=self.pk)
for i in l:
y = Group(client=self.client, pk=i)
yield y
def add_group(self, pk):
self.client.add_group_group(super_pk=self.pk, sub_pk=pk)
def remove_group(self, pk):
raise NotImplementedError('Not yet')
class Oe(Model):
def _load_data_attribs(self, data=None):
if data is None:
data = self.client.get_oe(pk=self.pk)
self.kurz = data['kurz']
self.lang = data['lang']
self.pk = getattr(self, CONFIG['OE_PK_ATTRIB'])
def _save_data_attribs(self):
raise NotImplemented('Not supported')
def get_groups(self):
self.load()
l = self.client.get_oe_groups(pk=self.pk)
for d in l:
y = Group(client=self.client, data=d)
yield y
def get_users(self):
self.load()
l = self.client.get_oe_users(pk=self.pk)
for d in l:
y = User(client=self.client, data=d)
yield y
def create_group(self, name, description):
self.client.create_group(name, description)
def delete_group(self, pk):
raise NotImplemented('Not yet')
def _load_data_attribs(self, data=None):
if data is None:
data = self.client.get_oe(pk=self.pk)
self.kurz = data['kurz']
self.lang = data['lang']
self.pk = getattr(self, SYMBOLS['OE_PK_ATTRIB'])
def _save_data_attribs(self):
raise NotImplementedError('Not supported')
def get_groups(self):
self.load()
l = self.client.get_oe_groups(pk=self.pk)
for d in l:
y = Group(client=self.client, data=d)
yield y
def get_users(self):
self.load()
l = self.client.get_oe_users(pk=self.pk)
for d in l:
y = User(client=self.client, data=d)
yield y
def create_group(self, name, description):
self.client.create_group(name, description)
def delete_group(self, pk):
raise NotImplementedError('Not yet')
class User(Model):
def _load_data_attribs(self, data=None):
if data is None:
data = self.client.get_user(pk=self.pk)
self.samaccount = data['samaccount']
self.firstname = data['firstname']
self.lastname = data['lastname']
self.oe = Oe(client=self.client, data=data['oe'])
self.pk = getattr(self, CONFIG['USER_PK_ATTRIB'])
def _save_data_attribs(self):
raise NotImplemented('Not supported')
def get_groups(self):
self.load()
l = self.client.get_user_groups(pk=self.pk)
for d in l:
y = Group(client=self.client, data=d)
yield y
def _load_data_attribs(self, data=None):
if data is None:
data = self.client.get_user(pk=self.pk)
self.samaccount = data['samaccount']
self.firstname = data['firstname']
self.lastname = data['lastname']
self.oe = Oe(client=self.client, data=data['oe'])
self.pk = getattr(self, SYMBOLS['USER_PK_ATTRIB'])
def _save_data_attribs(self):
raise NotImplementedError('Not supported')
def get_groups(self):
self.load()
l = self.client.get_user_groups(pk=self.pk)
for d in l:
y = Group(client=self.client, data=d)
yield y
from .clients import RestClient
from .models import Group, Oe, User
class KitGvProvider(object):
def __init__(self, config=None, client=None):
if client is None:
self.client = RestClient(config=config)
else:
self.client = client
def auth(self, username, password):
self.client.config.username = username
self.client.config.password = password
def getOe(self, pk):
return Oe(client=self.client, pk=pk)
def getGroup(self, pk):
return Group(client=self.client, pk=pk)
def newGroup(self, name, description):
raise NotImplementedError("Not sure. Use KitGvProvider().getOe().create_group() instead.")
def getUser(self, pk):
return User(client=self.client, pk=pk)
......@@ -8,7 +8,7 @@ def readme():
setup(
name='python-kitgvapi',
version='0.3.dev0',
version='1.0.dev0',
description=('Python bindings for KIT Gruppenverwaltung REST API'),
long_description=readme(),
author='Jens Kleineheismann',
......
Markdown is supported
0%