Commit 89d739b9 authored by Lukas Burgey's avatar Lukas Burgey

Restructure RabbitMQInstance

parent f4167cb8
......@@ -2,7 +2,7 @@
from rest_framework import generics, views, status
from rest_framework.authentication import TokenAuthentication
from rest_framework.response import Response
from ..models import RABBITMQ_INSTANCE
from ..models import default_rabbitmq_instance
from .serializers import SiteSerializer, ServiceSerializer
# authentication class for the client api
......@@ -27,7 +27,7 @@ class ConfigurationView(generics.ListAPIView):
# we update the rabbitmq permission here, so the
# client can access all of his services, even new ones
RABBITMQ_INSTANCE.update_site(site)
default_rabbitmq_instance().update_site(site)
return site.services.all()
......
......@@ -24,12 +24,16 @@ class RabbitMQInstance(models.Model):
max_length=150,
default='localhost',
)
vhost = models.CharField(
max_length=150,
default='%2f',
)
exchange = models.CharField(
max_length=150,
default='deployments',
)
port = models.IntegerField(
default=15672
default=15672,
)
path = models.CharField(
max_length=150,
......@@ -50,93 +54,77 @@ class RabbitMQInstance(models.Model):
def __str__(self):
return self.host
def msg(self, msg):
def _msg(self, msg):
return '[RabbitMQ:{}] {}'.format(self.host, msg)
@property
def auth(self):
return HTTPBasicAuth(
self.username,
self.password
self.password,
)
@property
def vhost(self):
return '%2f'
# singletons
rabbitmq_connection = None
rabbitmq_channel = None
def _connection_parameters(self):
return pika.ConnectionParameters(
host=self.host,
ssl=True,
)
@property
def connection(self):
if (
self.rabbitmq_connection is None
or self.rabbitmq_connection.is_closed
or self.rabbitmq_connection.is_closing
):
rabbitmqconnection_properties = pika.ConnectionParameters(
host=self.host,
ssl=True,
)
self.rabbitmq_connection = pika.BlockingConnection(
rabbitmqconnection_properties
)
LOGGER.debug(self.msg('opened connection'))
return self.rabbitmq_connection
LOGGER.debug(self._msg('opened connection'))
return pika.BlockingConnection(
self._connection_parameters,
)
@property
def channel(self):
if (
self.rabbitmq_channel is None
or self.rabbitmq_channel.is_closed
or self.rabbitmq_channel.is_closing
):
self.rabbitmq_channel = self.connection.channel()
self.rabbitmq_channel.exchange_declare(
exchange=self.exchange,
durable=True,
exchange_type='topic')
self.rabbitmq_channel.confirm_delivery()
LOGGER.debug(self.msg('opened channel'))
return self.rabbitmq_channel
def get_uri(self, path):
api = 'http://{}:{}/{}'.format(
rabbitmq_channel = self.connection.channel()
rabbitmq_channel.exchange_declare(
exchange=self.exchange,
durable=True,
auto_delete=False,
exchange_type='topic',
)
rabbitmq_channel.confirm_delivery()
LOGGER.debug(self._msg('opened channel'))
return rabbitmq_channel
def _get_api_uri(self, path):
return 'http://{}:{}/{}/{}'.format(
self.host,
self.port,
self.path,
path,
)
return '{}/{}'.format(api, path)
def rest_get(self, api_path):
def _rest_get(self, api_path):
req = requests.get(
self.get_uri(api_path),
self._get_api_uri(api_path),
auth=self.auth)
req.raise_for_status()
return req.json()
# send a rest call with path and data to the rest interface of
# the rabbitmq instance
def rest_put(self, api_path, data):
def _rest_put(self, api_path, data):
req = requests.put(
self.get_uri(api_path),
self._get_api_uri(api_path),
json=data,
auth=self.auth)
req.raise_for_status()
return req
def rest_del(self, api_path):
def _rest_del(self, api_path):
req = requests.delete(
self.get_uri(api_path),
self._get_api_uri(api_path),
auth=self.auth)
req.raise_for_status()
return req
def set_topic_permissions(self, site):
def _set_topic_permissions(self, site):
username = site.client.username
path = 'topic-permissions/{}/{}/'.format(
self.vhost,
......@@ -161,10 +149,10 @@ class RabbitMQInstance(models.Model):
'read': r'^service\.({})$'.format(services),
}
return self.rest_put(path, set_topic_permission_data)
return self._rest_put(path, set_topic_permission_data)
# set permissions for the user
def set_permissions(self, site):
def _set_permissions(self, site):
username = site.client.username
path = 'permissions/{}/{}/'.format(
self.vhost,
......@@ -177,10 +165,10 @@ class RabbitMQInstance(models.Model):
'read': permission,
}
return self.rest_put(path, set_permission_data)
return self._rest_put(path, set_permission_data)
# create user at the rabbitmq instance
def create_user(self, site):
def _create_user(self, site):
username = site.client.username
path = 'users/{}/'.format(username)
......@@ -189,41 +177,42 @@ class RabbitMQInstance(models.Model):
'tags': '',
}
return self.rest_put(path, user_creation_data)
return self._rest_put(path, user_creation_data)
# delete user at the rabbitmq instance
def delete_user(self, site):
def _delete_user(self, site):
username = site.client.username
path = 'users/{}/'.format(username)
return self.rest_del(path)
return self._rest_del(path)
def _disconnect(self):
LOGGER.debug(self._msg('closing connection'))
self.connection.close()
# PUBLIC API
def register_site(self, site):
self.create_user(site)
self.set_permissions(site)
self.set_topic_permissions(site)
LOGGER.info(self.msg('registered {}'.format(site.client)))
self._create_user(site)
self._set_permissions(site)
self._set_topic_permissions(site)
LOGGER.info(self._msg('registered {}'.format(site.client)))
def update_site(self, site):
self.set_topic_permissions(site)
LOGGER.info(self.msg('updated permissions for {}'.format(site.client)))
self._set_topic_permissions(site)
LOGGER.info(self._msg('updated permissions for {}'.format(site.client)))
def deregister_site(self, site):
LOGGER.info(self.msg('deregistered {}'.format(site.client)))
# TODO implement
LOGGER.info(self._msg('deregistered {}'.format(site.client)))
def is_client_connected(self, site):
connections = self.rest_get("connections/")
connections = self._rest_get("connections/")
clients_for_site = [c
for c in connections
if c['user'] == site.client.username]
return len(clients_for_site) > 0
def disconnect(self):
LOGGER.debug(self.msg('closing connection'))
self.connection.close()
def online_clients(self, service):
return [site
for site in service.site.all()
......@@ -240,7 +229,8 @@ class RabbitMQInstance(models.Model):
)
RABBITMQ_INSTANCE = RabbitMQInstance.objects.filter(is_active=True).first()
def default_rabbitmq_instance():
return RabbitMQInstance.objects.filter(is_active=True).first()
class User(AbstractUser):
......@@ -299,14 +289,14 @@ class User(AbstractUser):
else:
raise Exception()
def msg(self, msg):
def _msg(self, msg):
return '[{}] {}'.format(self, msg)
# oidcuser: withdraw and delete all credentials and delete the user
def remove(self):
if self.user_type == 'oidcuser':
self.deactivate()
LOGGER.info(self.msg('Deleting'))
LOGGER.info(self._msg('Deleting'))
# TODO: deleting the user brings problems:
# the deletion cascades down to DeploymentTask and DeploymentTaskItem
......@@ -315,7 +305,7 @@ class User(AbstractUser):
def activate(self):
if self._is_active:
LOGGER.error(self.msg('already activated'))
LOGGER.error(self._msg('already activated'))
return
if self.user_type == 'oidcuser':
......@@ -326,12 +316,12 @@ class User(AbstractUser):
for dep in self.deployments.all():
dep.activate()
LOGGER.info(self.msg('activated'))
LOGGER.info(self._msg('activated'))
# oidcuser: withdraw all credentials
def deactivate(self):
if not self._is_active:
LOGGER.error(self.msg('already deactivated'))
LOGGER.error(self._msg('already deactivated'))
return
if self.user_type == 'oidcuser':
......@@ -342,7 +332,7 @@ class User(AbstractUser):
for dep in self.deployments.all():
dep.deactivate()
LOGGER.info(self.msg('deactivated'))
LOGGER.info(self._msg('deactivated'))
def construct_user(user_info):
......@@ -414,7 +404,7 @@ class SSHPublicKey(models.Model):
editable=False,
)
def msg(self, msg):
def _msg(self, msg):
return '[SSHPublicKey:{}] {}'.format(self, msg)
# does not directly delete the key if the key is deployed or withdrawn
......@@ -422,11 +412,11 @@ class SSHPublicKey(models.Model):
# the receiver 'delete_withdrawn_ssh_key' does the actual deletion
def delete_key(self):
if (not self.tasks.exists() and not self.deployments.exists()):
LOGGER.info(self.msg('Direct deletion of key'))
LOGGER.info(self._msg('Direct deletion of key'))
self.delete()
return
LOGGER.info(self.msg('Deletion of key started'))
LOGGER.info(self._msg('Deletion of key started'))
self.deleted = True
self.save()
......@@ -437,7 +427,7 @@ class SSHPublicKey(models.Model):
# when a key is withdrawn by a client we try to finally delete it
def try_final_deletion(self):
if (self.deleted and not self.tasks.exists()):
LOGGER.info(self.msg(
LOGGER.info(self._msg(
'All clients have withdrawn this key. Final deletion'))
self.delete()
return
......@@ -491,27 +481,27 @@ class Deployment(models.Model):
def __str__(self):
return '{}:{}'.format(self.service, self.user)
def msg(self, msg):
def _msg(self, msg):
return '[Deployment:{}] {}'.format(self, msg)
# deploy credentials which were deployed prior to deactivation
def activate(self):
if self.is_active:
LOGGER.error(self.msg('already active'))
LOGGER.error(self._msg('already active'))
return
LOGGER.debug(self.msg(str(self.ssh_keys.all())))
LOGGER.debug(self._msg(str(self.ssh_keys.all())))
for key in self.ssh_keys.all():
self._deploy_key(key)
self.is_active = True
self.save()
LOGGER.info(self.msg('activated'))
LOGGER.info(self._msg('activated'))
# withdraw all credentials
def deactivate(self):
if not self.is_active:
LOGGER.error(self.msg('already deactivated'))
LOGGER.error(self._msg('already deactivated'))
return
self.is_active = False
......@@ -520,13 +510,13 @@ class Deployment(models.Model):
for key in self.ssh_keys.all():
self._withdraw_key(key)
LOGGER.info(self.msg('deactivated'))
LOGGER.info(self._msg('deactivated'))
# only deploy the key
def _deploy_key(self, key):
# delete outstanding tasks which are made obsolete by this task
for withdrawal in self.withdrawals.filter(key=key):
LOGGER.debug(withdrawal.msg('now obsolete'))
LOGGER.debug(withdrawal._msg('now obsolete'))
withdrawal.delete()
# generate task
......@@ -536,7 +526,7 @@ class Deployment(models.Model):
key=key,
)
task.save()
LOGGER.debug(task.msg('generated'))
LOGGER.debug(task._msg('generated'))
# generate task items
for site in self.service.site.all():
......@@ -545,7 +535,7 @@ class Deployment(models.Model):
site=site,
)
deploy.save()
LOGGER.debug(deploy.msg('generated'))
LOGGER.debug(deploy._msg('generated'))
# publish the task
task.publish()
......@@ -553,7 +543,7 @@ class Deployment(models.Model):
def _withdraw_key(self, key):
# delete outstanding tasks which are made obsolete by this task
for deploy in self.deploys.filter(key=key):
LOGGER.debug(deploy.msg("now obsolete"))
LOGGER.debug(deploy._msg("now obsolete"))
deploy.delete()
# generate task
......@@ -563,7 +553,7 @@ class Deployment(models.Model):
key=key,
)
task.save()
LOGGER.debug(task.msg('generated'))
LOGGER.debug(task._msg('generated'))
# generate task items
for site in self.service.site.all():
......@@ -572,7 +562,7 @@ class Deployment(models.Model):
site=site,
)
withdrawal.save()
LOGGER.debug(withdrawal.msg('generated'))
LOGGER.debug(withdrawal._msg('generated'))
# publish the task
task.publish()
......@@ -580,7 +570,7 @@ class Deployment(models.Model):
# deploy key and track changes in the key lists
def deploy_key(self, key):
if not self.is_active:
LOGGER.error(self.msg('cannot deploy while deactivated'))
LOGGER.error(self._msg('cannot deploy while deactivated'))
raise Exception('deployment deactivated')
self.ssh_keys.add(key)
......@@ -594,7 +584,7 @@ class Deployment(models.Model):
# withdraw key and track changes in the key lists
def withdraw_key(self, key):
if not self.is_active:
LOGGER.error(self.msg('cannot withdraw while deactivated'))
LOGGER.error(self._msg('cannot withdraw while deactivated'))
raise Exception('deployment deactivated')
self.ssh_keys.remove(key)
......@@ -642,7 +632,7 @@ class DeploymentTask(models.Model):
self.action,
)
def msg(self, msg):
def _msg(self, msg):
return '[DeploymentTask:{}] {}'.format(self, msg)
def publish(self):
......@@ -650,7 +640,8 @@ class DeploymentTask(models.Model):
from .clientapi.serializers import DeploymentTaskSerializer
msg = json.dumps(DeploymentTaskSerializer(self).data)
RABBITMQ_INSTANCE.publish_by_service(
# FIXME select the rabbitmq instance more meaningful
default_rabbitmq_instance().publish_by_service(
self.service,
msg,
)
......@@ -658,7 +649,7 @@ class DeploymentTask(models.Model):
# the client acked the receipt and execution of the task for his site
def item_finished(self, site):
item = self.task_items.get(site=site)
LOGGER.debug(item.msg('done'))
LOGGER.debug(item._msg('done'))
item.delete()
if not self.task_items.exists():
......@@ -666,7 +657,7 @@ class DeploymentTask(models.Model):
# maintenance after all task items are done
def finished(self):
LOGGER.info(self.msg('done'))
LOGGER.info(self._msg('done'))
self.delete()
# check if this was the final withdraw in a key deletion
......@@ -692,7 +683,7 @@ class DeploymentTaskItem(models.Model):
self.site,
)
def msg(self, msg):
def _msg(self, msg):
return '[DeploymentTaskItem:{}] {}'.format(self, msg)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment