Commit da677b59 authored by janis.streib's avatar janis.streib 🦉
Browse files

Clenaup of python middleware

parent a3c21846
Pipeline #164810 passed with stages
in 9 minutes and 35 seconds
from json import JSONEncoder
from flask import Flask, url_for, jsonify
from flask import Flask
from beaker.middleware import SessionMiddleware
from kitnet.lib.netdb import PostgreSQL
import sys
import os
import inspect
import subprocess
from .model import DBObject, MetaDBObject, SimpleKITUser
import importlib
......@@ -26,7 +24,7 @@ session_opts = {
class ModMetaData(object):
def __init__(self, name, version, contact_email, printable_name, mod_path, gitlab_url=None, search_func=None,
search_opts_func=None, is_tool=False):
search_opts_func=None):
self.version = version
self.contact_mail = contact_email
self.gitlab_url = gitlab_url
......@@ -35,7 +33,6 @@ class ModMetaData(object):
self.name = name
self.search_func = search_func
self.search_opts_func = search_opts_func if search_func is not None else None
self.is_tool = is_tool
class CustomJSONEncoder(JSONEncoder):
......@@ -62,6 +59,7 @@ class CustomJSONEncoder(JSONEncoder):
class MyApp(Flask):
pass
# Define the WSGI application object
app = MyApp(__name__)
app.json_encoder = CustomJSONEncoder
......@@ -95,10 +93,11 @@ def load_mods():
mod = importlib.import_module(m['mod'])
MODS.append(mod.METADATA)
app.register_blueprint(blueprint=getattr(mod, m['mod'].split('.')[-1]),
url_prefix='{prefix}{url}'.format(prefix='/tools' if mod.METADATA.is_tool else '',
url=m['url']))
url_prefix='{prefix}{url}'.format(prefix='', url=m['url']))
def finalize_init():
db_conn.close()
from . import views
......@@ -6,60 +6,12 @@ import ipaddress
import requests
from dataclasses import dataclass
class BooleanSearchFilter(object):
def __init__(self, name, default, form_name):
self.name = name
self.default = default
self.form_name = form_name
# Database/general objects
class DBObject(object):
TABLE = None
def get_event_log(self, db, connection, bnd_entity_id, filter_login_name=None, top_n=5):
if self.TABLE is None or self.TABLE.startswith('META'):
raise NotImplementedError()
stmt = """
select
fact.log_ts as timestamp,
evdt.short_name as evlog_dflt_typ_short_name,
evdt.name as evlog_dflt_typ_name,
evdt.confirmed_action as evlog_dflt_typ_conf_action,
cm.login_name as cntl_mgr_login_name,
cm.first_name as cntl_mgr_first_name,
cm.last_name as cntl_mgr_last_name,
cm.email as cntl_mgr_email,
cp.name as cntl_pkg_name,
ct.name as cntl_tab_name,
ct.cont_sing1 as cntl_tab_sing_1_title,
ct.cont_plur1 as cntl_tab_plur1_title,
ct.cont_cmpnd as cntl_tab_cmpnd_title,
fact.log_data as log_data
from evlog.calc_top_n_log_ta(
in_cntl_tab_name => {bnd_entity_name},
in_evlog_dflt_typ_short_name => {bnd_log_typ},
in_cntl_mgr_login_name => {bnd_log_mgr},
in_oltp_pk => (select key_nr from dns.get_range_rec({bnd_entity_id})),
in_top_n => {bnd_top_n}
) AS fact
JOIN evlog_row2dwh dim_row ON fact.dwh_row_key_nr = dim_row.dwh_key_nr
JOIN evlog_dflt_typ2dwh dim_typ ON fact.dwh_typ_key_nr = dim_typ.dwh_key_nr
JOIN cntl_mgr2dwh dim_mgr ON fact.dwh_mgr_key_nr = dim_mgr.dwh_key_nr
JOIN cntl_tab ct ON ct.key_nr = dim_row.oltp_tab_key_nr
JOIN cntl_pkg cp ON cp.key_nr = ct.cntl_pkg_key_nr
JOIN evlog_dflt_typ evdt ON evdt.key_nr = dim_typ.oltp_key_nr
LEFT JOIN cntl_mgr cm ON cm.key_nr = dim_mgr.oltp_key_nr
where fact.evlog_dflt_key_nr is null
order by fact.log_ts desc, ct.name
"""
bindings = {'bnd_log_mgr': None, 'bnd_log_typ': filter_login_name, 'bnd_top_n': top_n,
'bnd_entity_name': self.TABLE,
'bnd_entity_id': bnd_entity_id}
res = db.execute(connection, stmt, bindings)
return [EVLogEntry(mgr=DBMgr(**_marshal_dict(r, DBMgr)), **r) for r in res]
class MetaDBObject(DBObject):
TABLE = "META_"
......@@ -73,37 +25,6 @@ def _marshal_dict(dict, obj: DBObject):
return _marshal_dict_str(dict, obj.TABLE.lower())
class RRType(MetaDBObject):
def __init__(self, name, is_intern):
self.name = name
self.is_intern = is_intern
@staticmethod
def get_all(db, connection):
rr_types = db.execute(connection, "SELECT name, is_intern FROM dns_rr_type order by name", {})
return [RRType(**r) for r in rr_types]
class EVLogEntry(DBObject):
TABLE = 'v_evlog_dflt_top_n'
def __init__(self, timestamp=None, evlog_dflt_typ_short_name=None, evlog_dflt_typ_name=None,
evlog_dflt_typ_conf_action=None, mgr=None, cntl_pkg_name=None, cntl_tab_name=None,
cntl_tab_sing1_title=None, cntl_tab_plur1_title=None, cntl_tab_cmpnd_title=None, log_data=None,
**args):
self.timestamp = timestamp
self.evlog_dflt_typ_short_name = evlog_dflt_typ_short_name
self.evlog_dflt_typ_name = evlog_dflt_typ_name
self.evlog_dflt_typ_conf_action = evlog_dflt_typ_conf_action
self.cntl_pkg_name = cntl_pkg_name
self.cntl_tab_name = cntl_tab_name
self.cntl_tab_sing1_title = cntl_tab_sing1_title
self.cntl_tab_plur1_title = cntl_tab_plur1_title
self.cntl_tab_cmpnd_title = cntl_tab_cmpnd_title
self.log_data = log_data
self.mgr = mgr
class Mgr(object):
is_scc_itb = False
......@@ -145,7 +66,6 @@ class SimpleKITUser(Mgr):
pass
class DBMgr(Mgr, DBObject):
TABLE = 'cntl_mgr'
......@@ -189,12 +109,12 @@ class DBMgr(Mgr, DBObject):
in_report_stmt_pos => false,
in_is_dry_mode => false
);
""".format(dict=json.dumps({'name': 'is_own', 'old_value': True}).replace('{', '{{').replace('}', '}}')).replace(
""".format(
dict=json.dumps({'name': 'is_own', 'old_value': True}).replace('{', '{{').replace('}', '}}')).replace(
'{}', '{{}}')
res = db.execute(connection, query, {'login_name': self.login_name})
return [BCD(**r) for r in json.loads(res[0][0])]
def get_areas(self, db, connection):
res = db.execute(connection,
DBNetArea.FULL_SELECT +
......@@ -292,46 +212,6 @@ SELECT out_token_text as token, out_token_pk as pk FROM cntl.create_sess_auth({l
return APIToken(**res, login_name=self.login_name, description=descr)
class IntType(MetaDBObject):
def __init__(self, name, description, dns_rr_type_name, rrd_auto_dml_fqdn_text=None, **kvargs):
self.name = name
self.description = description
self.dns_rr_type_name = dns_rr_type_name
self.left = self.right = self.right_bits = None
self.is_set = False
self.rrd_auto_dml_fqdn_text = rrd_auto_dml_fqdn_text
if name is not None:
# TODO use dns_rr_def.r.unq instead of split
split = name.split(':')
self.left = split[0] + ':' + split[1]
self.right = split[2]
self.right_bits = self.right.split(',')[1]
self.is_set = len(self.right_bits) > 0 and (self.right_bits[-1] == '0' or self.right_bits[-1] == '2')
def __eq__(self, other):
if type(other) is not IntType:
return False
if other.name is None and self.name is None:
return other.dns_rr_type_name == self.dns_rr_type_name
return other.name == self.name
class FqdnNameType(object):
def __init__(self, name, description, **kvargs):
self.name = name
self.description = description
@staticmethod
def get_all(db, connection):
res = db.execute(connection, """
select
vnt.name, vnt.description
from v_dns_dbnt vnt
""", {})
return [FqdnNameType(**r) for r in res]
class NetArea(object):
def __init__(self, name, suffix, **args):
self.name = name if not name.endswith('/') else name[:-1]
......@@ -367,6 +247,7 @@ class NetArea(object):
def add_mgrs(self):
raise NotImplementedError()
@dataclass
class VLAN(DBObject):
id: int = None
......@@ -379,7 +260,9 @@ class VLAN(DBObject):
@classmethod
def get_by_bcd(cls, db, connection, bcd_name):
res = db.execute(connection, "select nd_vlan.id, nd_bcd.key_nr as bcd, nd_vlan.descr as description, nd_net_instnc.name as net_instnc from nd_vlan inner join nd_bcd on nd_vlan.nd_bcd_key_nr = nd_bcd.key_nr inner join nd_net_instnc on nd_vlan.nd_net_instnc_key_nr = nd_net_instnc.key_nr WHERE nd_bcd.name = {name}", {'name': bcd_name})
res = db.execute(connection,
"select nd_vlan.id, nd_bcd.key_nr as bcd, nd_vlan.descr as description, nd_net_instnc.name as net_instnc from nd_vlan inner join nd_bcd on nd_vlan.nd_bcd_key_nr = nd_bcd.key_nr inner join nd_net_instnc on nd_vlan.nd_net_instnc_key_nr = nd_net_instnc.key_nr WHERE nd_bcd.name = {name}",
{'name': bcd_name})
if len(res) == 0:
return None
return [cls(**r) for r in res]
......@@ -622,313 +505,6 @@ class Net(DBObject):
self.descr = descr
class RR(MetaDBObject):
TABLE = 'meta_rr'
def __init__(self, rr_data, rr_data_unref, dbrt_name, rr_data_target_ipaddr=None, rr_data_target_fqdn=None,
rr_type=None, host_is_nws=None, force_auto_dml=False, **args):
self.rr_data = rr_data
self.force_auto_dml = force_auto_dml
self.rr_data_unref = rr_data_unref
self.rr_data_target = rr_data_target_ipaddr if rr_data_target_fqdn is None else dns.name.from_text(
rr_data_target_fqdn).to_unicode()
self.host_is_nws = host_is_nws
self.inttype = IntType(name=dbrt_name, description=None, dns_rr_type_name=rr_type)
@staticmethod
def get(fqdn, inttype, rr_data, login_name, db, connection):
res = db.execute(connection,
"""
SELECT
(rrt_rec).name as META_rr_rr_type,
(rrt_rec).force_auto_dml as META_rr_force_auto_dml,
host_is_nws as META_rr_host_is_nws,
fqdn as dns_ntree_fqdn,
fqdn_description as dns_ntree_description,
rr_data as META_rr_rr_data,
rr_data_unref as META_rr_rr_data_unref,
rr_data_target_ipaddr as META_rr_rr_data_target_ipaddr,
rr_data_target_fqdn as META_rr_rr_data_target_fqdn,
(dbrt_rec).name as META_rr_dbrt_name
FROM dns.list_rr(in_login_name:={login_name},
in_listopt_global:={global}, in_fqdn:={fqdn}, in_inttype_list:={inttype}) WHERE
rr_data={rr_data}
ORDER BY rr_data_target_ipaddr, META_rr_rr_data""",
{'global': DBMgr.get_by_login_name(login_name=login_name, connection=connection,
db=db).has_permission('dns.ro'),
'login_name': login_name, 'fqdn': fqdn, 'inttype': [inttype],
'rr_data': rr_data})
if len(res) == 0:
return None
return {'NTree': NTree(**_marshal_dict(res[0], NTree)), 'rr': RR(**_marshal_dict(res[0], RR))}
@staticmethod
def get_by_target_fqdn(login_name, target_fqdn, db, connection):
res = db.execute(connection,
"SELECT fqdn as dns_ntree_fqdn, (rrt_rec).name as META_rr_rr_type, rr_data as META_rr_rr_data,"
"(rrt_rec).force_auto_dml as META_rr_force_auto_dml,"
"rr_data_target_ipaddr as META_rr_rr_data_target_ipaddr,"
"(dbrt_rec).name as META_rr_dbrt_name,"
"rr_data_unref as META_rr_rr_data_unref,"
"(select array_agg(name || suffix) from dns_range where key_nr = any(target_range_pk_list)) as META_range_list,"
"host_is_nws as META_rr_host_is_nws, "
"fqdn_description as dns_ntree_description,"
"rr_data_target_fqdn as META_rr_rr_data_target_fqdn, "
"(fqdn_dbnt_rec).name as dns_ntree_dbnt_name FROM "
"dns.list_rr(in_login_name:={login_name}, in_target_fqdn:={target_fqdn}, in_listopt_global:={global}) "
"ORDER BY META_rr_rr_type",
{'login_name': login_name, 'target_fqdn': target_fqdn,
'global': DBMgr.get_by_login_name(login_name=login_name, connection=connection,
db=db).has_permission('dns.ro')})
return [{
'nws_range': (
DBNetArea.get_by_name(print_name=r['meta_range_list'][0], db=db, connection=connection).get_nat_range(
db=db,
connection=connection)
if r['meta_range_list'] is not None and r['meta_rr_host_is_nws'] else None
),
'rr': RR(**_marshal_dict(r, RR)), 'NTree': NTree(**_marshal_dict(r, NTree)),
'ranges': (r['meta_range_list'] if r['meta_range_list'] is not None else list())} for r in res]
@staticmethod
def get_by_fqdn(login_name, fqdn, db, connection):
res = db.execute(connection,
"SELECT fqdn as dns_ntree_fqdn, (rrt_rec).name as META_rr_rr_type,"
"(rrt_rec).force_auto_dml as META_rr_force_auto_dml,"
"rr_data as META_rr_rr_data,"
"rr_data_target_ipaddr as META_rr_rr_data_target_ipaddr,"
"(dbrt_rec).name as META_rr_dbrt_name,"
"host_is_nws as META_rr_host_is_nws, "
"rr_data_unref as META_rr_rr_data_unref,"
"(select array_agg(name || suffix) from dns_range where key_nr = any(target_range_pk_list)) as META_range_list,"
"fqdn_description as dns_ntree_description,"
"rr_data_target_fqdn as META_rr_rr_data_target_fqdn, "
"(fqdn_dbnt_rec).name as dns_ntree_dbnt_name FROM "
"dns.list_rr(in_login_name:={login_name}, in_fqdn:={fqdn}, in_listopt_global:={global}) "
"ORDER BY META_rr_rr_type",
{'login_name': login_name, 'fqdn': fqdn,
'global': DBMgr.get_by_login_name(login_name=login_name, connection=connection,
db=db).has_permission('dns.ro')})
return [{
'nws_range': (
DBNetArea.get_by_name(print_name=r['meta_range_list'][0], db=db, connection=connection).get_nat_range(
db=db,
connection=connection)
if r['meta_range_list'] is not None and r['meta_rr_host_is_nws'] else None
),
'rr': RR(**_marshal_dict(r, RR)), 'NTree': NTree(**_marshal_dict(r, NTree)),
'ranges': (r['meta_range_list'] if r['meta_range_list'] is not None else list())} for r in res]
class NTree(DBObject):
TABLE = "dns_ntree"
def __init__(self, key_nr=None, parent_key_nr=None, parent_fqdn=None, parent_is_d=None, is_d=None, is_a=None,
is_dhcp=None, dbnt_name=None,
is_rad=None, is_wildcard=None, dns_name_type_name=None, name=None, description=None, fqdn=None,
fqdn_hash=None, parent_zone_fqdn_hash=None, dns_zone_key_nr=None, dns_parent_zone_key_nr=None,
hierarchy_pk_list=None, fqdn_sub_count=None, fqdn_has_rr=None, **args):
self.key_nr = key_nr
self.parent_key_nr = parent_key_nr
self.parent_is_d = parent_is_d
self.is_d = is_d
self.is_a = is_a
self.inttype = dbnt_name
self.is_dhcp = is_dhcp
self.is_wildcard = is_wildcard
self.name = name
self.description = description if not description == '' else None
self.fqdn = dns.name.from_text(fqdn).to_unicode() if fqdn else None
self.dns_zone_key_nr = dns_zone_key_nr
self.dns_parent_zone_key_nr = dns_parent_zone_key_nr
self.parent_fqdn = dns.name.from_text(parent_fqdn).to_unicode() if parent_fqdn and not fqdn == '.' else None
self.fqdn_sub_count = fqdn_sub_count
self.fqdn_has_rr = fqdn_has_rr
self.dbnt_name = dbnt_name
def update_fqdn_description(self, descr):
if self.description == descr:
return None
return self.update_fqdn_description_deferrable(old_description=self.description, new_description=descr,
fqdn=self.fqdn)
def update(self, new_fqdn, new_description, new_inttype):
return NTree.update_deferrable(old_fqdn=self.fqdn, new_fqdn=new_fqdn, old_description=self.description,
new_description=new_description, old_inttype=self.inttype,
new_inttype=new_inttype)
class DNSAddr(DBObject):
TABLE = 'dns_addr'
def __init__(self, key_nr=None, ip_addr=None, addr_type=None, is_reserved=None, is_rr_usable=None, is_dhcp=None,
dns_range_key_nr=None, nd_ip_subnet_key_nr=None, **args):
self.key_nr = key_nr
self.ip_addr = ip_addr
self.dns_range_ley_nr = dns_range_key_nr
self.is_reserved = is_reserved
self.is_dhcp = is_dhcp
self.is_rr_usable = is_rr_usable
def __eq__(self, other):
return (type(other) == str and other == str(self.ip_addr)) or (
type(self) == type(other) and self.key_nr == other.key_nr)
class NDPPort(DBObject):
TABLE = 'nd_p_port'
def __init__(self, name, module=None, **kwargs):
self.name = name
self.module = module
@staticmethod
def get_nd_p_port_by_vlan(db, connection, vlan_id, ni_name):
res = db.execute(connection,
"""
select nd_mdl_typ_class.name as root_nd_mdl_typ_class_name, root_typ.name as root_nd_mdl_typ_name, dst.name as nd_p_port_name, root.name as root_nd_mdl_name, mod.name as base_nd_mdl_name,
nd_room.name as nd_room_name, nd_room.nr as nd_room_nr, nd_floor.nr as nd_floor_nr, nd_floor.descr as nd_floor_descr,
nd_bldg.nr as nd_bldg_nr, nd_bldg.name as nd_bldg_name from nd_dev
INNER JOIN dns_ntree on nd_dev.dns_ntree_key_nr=dns_ntree.key_nr
inner join nd_l_port on nd_dev.key_nr=nd_l_port.nd_dev_key_nr
inner join nd_vlan on nd_l_port.nd_vlan_key_nr=nd_vlan.key_nr
inner join nd_net_instnc on nd_vlan.nd_net_instnc_key_nr=nd_net_instnc.key_nr
inner join nd_l2p_port on nd_l_port.key_nr=nd_l2p_port.nd_l_port_key_nr
inner join nd_p_port src on nd_l2p_port.nd_p_port_key_nr = src.key_nr
inner join nd_p_port dst on src.nd_dest_p_port_key_nr=dst.key_nr
inner join nd_mdl mod on dst.nd_mdl_key_nr=mod.key_nr
inner join nd_mdl root on mod.nd_root_mdl_key_nr=root.key_nr
inner join nd_room on root.nd_room_key_nr=nd_room.key_nr
inner join nd_floor on nd_room.nd_floor_key_nr=nd_floor.key_nr
inner join nd_bldg on nd_room.nd_bldg_key_nr=nd_bldg.key_nr
inner join nd_mdl_typ root_typ on root.nd_mdl_typ_key_nr = root_typ.key_nr
inner join nd_mdl_typ_class on root_typ.nd_mdl_typ_class_key_nr=nd_mdl_typ_class.key_nr
where nd_net_instnc.name = {ni_name} and nd_vlan.id = {vlan_id}
""",
{'ni_name': ni_name, 'vlan_id': vlan_id})
if len(res) == 0:
return None
return [NDPPort(
**_marshal_dict(r, NDPPort),
module=NDModule(**_marshal_dict(_marshal_dict_str(r, 'base'), NDModule),
root_module=NDModule(**_marshal_dict(_marshal_dict_str(r, 'root'), NDModule),
room=NDRoom(
**_marshal_dict(r, NDRoom)),
floor=NDFloor(
**_marshal_dict(r, NDFloor)),
building=NDBuilding(
**_marshal_dict(r, NDBuilding)),
module_type=NDModuleType(
**_marshal_dict(_marshal_dict_str(r, 'root'), NDModuleType),
type_class=NDModuleTypeClass(
**_marshal_dict(_marshal_dict_str(r, 'root'),
NDModuleTypeClass)
)
)
),
)
) for r in res]
@staticmethod
def get_nd_p_port_by_dev_fqdn_and_port_name(db, connection, fqdn, port):
res = db.execute(connection,
"""
select nd_mdl_typ_class.name as root_nd_mdl_typ_class_name, root_typ.name as root_nd_mdl_typ_name, dst.name as nd_p_port_name, root.name as root_nd_mdl_name, mod.name as base_nd_mdl_name,
nd_room.name as nd_room_name, nd_room.nr as nd_room_nr, nd_floor.nr as nd_floor_nr, nd_floor.descr as nd_floor_descr,
nd_bldg.nr as nd_bldg_nr, nd_bldg.name as nd_bldg_name from nd_dev
INNER JOIN dns_ntree on nd_dev.dns_ntree_key_nr=dns_ntree.key_nr
inner join nd_l_port on nd_dev.key_nr=nd_l_port.nd_dev_key_nr
inner join nd_l2p_port on nd_l_port.key_nr=nd_l2p_port.nd_l_port_key_nr
inner join nd_p_port src on nd_l2p_port.nd_p_port_key_nr = src.key_nr
inner join nd_p_port dst on src.nd_dest_p_port_key_nr=dst.key_nr
inner join nd_mdl mod on dst.nd_mdl_key_nr=mod.key_nr
inner join nd_mdl root on mod.nd_root_mdl_key_nr=root.key_nr
inner join nd_room on root.nd_room_key_nr=nd_room.key_nr
inner join nd_floor on nd_room.nd_floor_key_nr=nd_floor.key_nr
inner join nd_bldg on nd_room.nd_bldg_key_nr=nd_bldg.key_nr
inner join nd_mdl_typ root_typ on root.nd_mdl_typ_key_nr = root_typ.key_nr
inner join nd_mdl_typ_class on root_typ.nd_mdl_typ_class_key_nr=nd_mdl_typ_class.key_nr
where dns_ntree.fqdn = {fqdn} and nd_l_port.name={port}
""",
{'port': port, 'fqdn': fqdn})
if len(res) == 0:
return None
res = res[0]
return NDPPort(
**_marshal_dict(res, NDPPort),
module=NDModule(**_marshal_dict(_marshal_dict_str(res, 'base'), NDModule),
root_module=NDModule(**_marshal_dict(_marshal_dict_str(res, 'root'), NDModule),
room=NDRoom(
**_marshal_dict(res, NDRoom)),
floor=NDFloor(
**_marshal_dict(res, NDFloor)),
building=NDBuilding(
**_marshal_dict(res, NDBuilding)),
module_type=NDModuleType(
**_marshal_dict(_marshal_dict_str(res, 'root'), NDModuleType),
type_class=NDModuleTypeClass(
**_marshal_dict(_marshal_dict_str(res, 'root'),
NDModuleTypeClass)
)
)
),
)
)
class NDBuilding(DBObject):
TABLE = 'nd_bldg'
def __init__(self, nr, name=None, **kwargs):
self.nr = nr
self.name = name
class NDFloor(DBObject):
TABLE = 'nd_floor'
def __init__(self, nr, descr=None, **kwargs):
self.nr = nr
self.description = descr
class NDRoom(DBObject):
TABLE = 'nd_room'
def __init__(self, nr, name=None, **kwargs):
self.nr = nr
self.name = name
class NDModule(DBObject):
TABLE = 'nd_mdl'
def __init__(self, name, descr=None, root_module=None, room=None, floor=None, building=None, module_type=None,
**kwargs):
self.name = name
self.description = descr
self.root_module = root_module
self.room = room
self.building = building
self.floor = floor
self.module_type = module_type
class NDModuleType(DBObject):
TABLE = 'nd_mdl_typ'
def __init__(self, name, type_class=None, **kwargs):
self.name = name
self.type_class = type_class
class NDModuleTypeClass(DBObject):
TABLE = 'nd_mdl_typ_class'
def __init__(self, name, **kwargs):
self.name = name
class APIToken(MetaDBObject):
def __init__(self, pk=None, token=None, login_name=None, description=None):
......@@ -967,34 +543,3 @@ class APIToken(MetaDBObject):
class VDNSRecords(DBObject):
TABLE = 'pdns_rr'
# API objects
class APINetArea(NetArea):
def __init__(self, name, suffix, facility_short, net):
self.name = name
self.suffix = suffix
self.facility = Facility(short_name=facility_short)
self.net = Net(net=ipaddress.IPv6Network(net) if ':' in net else ipaddress.IPv4Network(net))
class APIMgr(Mgr):
__areas__ = list()
@staticmethod
def get_by_login_name(login_name, api_cert, api_key):
r = requests.get("https://www-net.scc.kit.edu/api/2.0/dns/mgr/list", params={'login_name': login_name},
cert=(api_cert, api_key))
res = r.json()
if len(res) == 0:
return None
mgr = APIMgr(**res[0]['mgr'])
mgr.__areas__ = [
APINetArea(name=r['range'].split('/')[0] + '/', suffix=int(r['range'].split('/')[1]),
net=r['subnet_cidr_spec'],
facility_short=r['ou']) for r in res]
return mgr
def get_areas(self, *args, **kwargs):
return self.__areas__
......@@ -2,7 +2,6 @@ from flask import Blueprint
from . import dhcp_leases_model
from net_suite import app, get_git_version, ModMetaData
# we are assuming, that at the time of loading the app is initializes and the config is loaded
bb_name = 'dhcp_leases'
dhcp_leases = Blueprint(bb_name, __name__, template_folder='templates')
......
......@@ -5,13 +5,14 @@ from . import dhcp_leases_model
from net_suite import db
from net_suite.model import BCD