from django.conf import settings from django.contrib.auth.models import AbstractUser, Group from django.db import models from django.db.models.signals import post_save, pre_delete from django.dispatch import receiver from requests.auth import HTTPBasicAuth from rest_framework.authtoken.models import Token import json import pika import requests from .logging import get_logger logger = get_logger(__name__) # clients are registerred at rabbitmq, when they are assigned to a site # (because we only then know what services they provide) class RabbitMQInstance(models.Model): host = models.CharField( max_length=150, default='localhost', ) exchange = models.CharField( max_length=150, default='deployments', ) port = models.IntegerField( default=15672 ) path = models.CharField( max_length=150, default='api', ) username = models.CharField( max_length=150, default='guest', ) password = models.CharField( max_length=150, default='guest', ) is_active = models.BooleanField( default=True, ) def __str__(self): return self.host def msg(self, msg): return '[RabbitMQ:{}] {}'.format(self.host, msg) @property def api(self): return 'http://{}:{}/{}'.format( self.host, self.port, self.path, ) @property def auth(self): return HTTPBasicAuth( self.username, self.password ) @property def vhost(self): return '%2f' # singletons rabbitmq_connection = None rabbitmq_channel = None @property def connection(self): if self.rabbitmq_connection is None: rabbitmqconnection_properties = pika.ConnectionParameters( host=self.host, ssl=True, ) self.rabbitmq_connection = pika.BlockingConnection( rabbitmqconnection_properties ) logger.debug(self.msg('opened connection')) return self.rabbitmq_connection @property def channel(self): if self.rabbitmq_channel is None: self.rabbitmq_channel = self.connection.channel() self.rabbitmq_channel.exchange_declare( exchange=self.exchange, durable=True, exchange_type='topic') self.rabbitmq_channel.confirm_delivery() logger.debug(self.msg('opened channel')) return self.rabbitmq_channel def get_uri(self, path): return '{}/{}'.format(self.api, path) def rest_get(self, api_path): r = requests.get( self.get_uri(api_path), auth=self.auth) r.raise_for_status() return r.json() # send a rest call with path and data to the rest interface of # the rabbitmq instance def rest_put(self, api_path, data): r = requests.put( self.get_uri(api_path), json=data, auth=self.auth) r.raise_for_status() return r def rest_del(self, api_path): r = requests.delete( self.get_uri(api_path), auth=self.auth) r.raise_for_status() return r def set_topic_permissions(self, site): username = site.client.username path = 'topic-permissions/{}/{}/'.format( self.vhost, username, ) # set permissions for the correct topics # we construct a regex to match the services of the site services = '' omitBar = True for service in site.services.all(): prefix = '|' if omitBar: prefix = '' omitBar = False services = services + prefix + service.name set_topic_permission_data = { 'exchange': self.exchange, 'write': '^$', 'read': '^service\.({})$'.format(services), } return self.rest_put(path, set_topic_permission_data) # set permissions for the user def set_permissions(self, site): username = site.client.username path = 'permissions/{}/{}/'.format( self.vhost, username, ) permission = '^(amq\.gen.*|{})'.format(self.exchange) set_permission_data = { 'configure': permission, 'write': permission, 'read': permission, } return self.rest_put(path, set_permission_data) # create user at the rabbitmq instance def create_user(self, site): username = site.client.username path = 'users/{}/'.format(username) user_creation_data = { 'password': str(site.client.auth_token.key), 'tags': '', } return self.rest_put(path, user_creation_data) # delete user at the rabbitmq instance def delete_user(self, site): username = site.client.username path = 'users/{}/'.format(username) return self.rest_del(path) # PUBLIC API def register_site(self, site): self.create_user(site) self.set_permissions(site) self.set_topic_permissions(site) logger.info(self.msg('registered {}'.format(site.client))) def update_site(self, site): self.set_topic_permissions(site) logger.info(self.msg('updated permissions for {}'.format(site.client))) def deregister_site(self, site): logger.info(self.msg('deregistered {}'.format(site.client))) def is_client_connected(self, site): connections = self.rest_get("connections/") clients_for_site = [c for c in connections if c['user'] == site.client.username] return len(clients_for_site) > 0 def disconnect(self): logger.debug(self.msg('closing connection')) self.connection.close() def service_routing_key(self, service): return 'service.' + service.name def online_clients(self, service): return [site for site in service.site.all() if self.is_client_connected(site)] def publish_by_service(self, service, msg): return self.channel.basic_publish( exchange=self.exchange, routing_key=self.service_routing_key(service), body=msg, properties=pika.BasicProperties( delivery_mode=1, ), ) def rabbitmq_instance(): return RabbitMQInstance.objects.get(is_active=True) class User(AbstractUser): TYPE_CHOICES = ( ('apiclient', 'API-Client'), ('oidcuser', 'OIDC User'), ('admin', 'Admin'), ) user_type = models.CharField( max_length=20, choices=TYPE_CHOICES, default='oidcuser', ) sub = models.CharField( max_length=150, blank=True, null=True, ) password = models.CharField( max_length=150, blank=True, null=True, ) # the real state of the user # (self.is_active is the supposed state of the user) _is_active = models.BooleanField( default=True, editable=False, ) # we hide deleted keys here # the full list of ssh keys is self._ssh_keys @property def ssh_keys(self): return self._ssh_keys.filter(deleted=False) def __str__(self): if self.user_type == 'admin': return 'ADMIN {}'.format(self.username) if self.user_type == 'oidcuser': if not self.is_active: return 'DEACTIVATED USER {}'.format(self.username) return 'USER {}'.format(self.username) if self.user_type == 'apiclient': return 'APICLIENT {}@{}'.format(self.username, self.site) def msg(self, msg): return '[{}] {}'.format(self, msg) # oidcuser: withdraw and delete all credentials and delete the user def remove(self): if self.user_type == 'oidcuser': self.deactivate() logger.info(self.msg('Deleting')) # TODO: deleting the user brings problems: # the deletion cascades down to DeploymentTask and DeploymentTaskItem # but these need to be conserved so all clients withdrawals can be tracked self.delete() def activate(self): if self._is_active: logger.error(self.msg('already activated')) return if self.user_type == 'oidcuser': self.is_active = True self._is_active = True self.save() for dep in self.deployments.all(): dep.activate() logger.info(self.msg('activated')) # oidcuser: withdraw all credentials def deactivate(self): if not self._is_active: logger.error(self.msg('already deactivated')) return if self.user_type == 'oidcuser': self.is_active = False self._is_active = False self.save() for dep in self.deployments.all(): dep.deactivate() logger.info(self.msg('deactivated')) def construct_user(user_info): return User( sub=user_info['sub'], name=user_info['name'], first_name=user_info['given_name'], last_name=user_info['family_name'], email=user_info['email'], username=user_info['email'], ) class Site(models.Model): client = models.OneToOneField( User, related_name='site', ) name = models.CharField(max_length=150, unique=True) description = models.TextField(max_length=300, blank=True) def __str__(self): return self.name # tasks which are still to be executed on this site @property def tasks(self): return [item.task for item in self.task_items.all()] class Service(models.Model): name = models.CharField(max_length=150, unique=True) description = models.TextField(max_length=300, blank=True) site = models.ManyToManyField( Site, related_name='services') groups = models.ManyToManyField( Group, related_name='services', blank=True) def __str__(self): return self.name class SSHPublicKey(models.Model): name = models.CharField( max_length=150, unique=True, ) key = models.TextField( max_length=1000 ) # hidden field at the user user = models.ForeignKey( User, related_name='_ssh_keys', ) # has the user triggered the deletion of this key deleted = models.BooleanField( default=False, editable=False, ) def msg(self, msg): return '[SSHPublicKey:{}] {}'.format(self, msg) # does not directly delete the key if the key is deployed or withdrawn # somewhere # the receiver 'delete_withdrawn_ssh_key' does the actual deletion def delete_key(self): if (not self.tasks.exists() and not self.deployments.exists()): logger.info(self.msg('Direct deletion of key')) self.delete() return logger.info(self.msg('Deletion of key started')) self.deleted = True self.save() # delete implies withdrawing the key from all clients for deployment in self.deployments.all(): deployment.withdraw_key(self) # when a key is withdrawn by a client we try to finally delete it def try_final_deletion(self): if (self.deleted and not self.tasks.exists()): logger.info(self.msg( 'All clients have withdrawn this key. Final deletion')) self.delete() return def __str__(self): if self.deleted: return "DELETED: {}".format(self.name) return self.name # Deployment describes the credential state per user as it is supposed to be # # (exception: if is_active=False the ssh_keys contain the keys to be deployed # if the deployment is reactivated) # # DeploymentTask is what is sent to the clients via rabbitmq # The DeploymentTaskItem track the acknowledgements from the clients class Deployment(models.Model): user = models.ForeignKey( User, related_name='deployments', on_delete=models.CASCADE, ) service = models.ForeignKey( Service, related_name='deployments', on_delete=models.CASCADE, ) ssh_keys = models.ManyToManyField( SSHPublicKey, related_name='deployments', blank=True, ) ssh_keys_to_withdraw = models.ManyToManyField( SSHPublicKey, related_name='withdrawn_deployments', blank=True, ) is_active = models.BooleanField( default=True, ) @property def withdrawals(self): return self.tasks.filter(action='withdraw') @property def deploys(self): return self.tasks.filter(action='deploy') def __str__(self): return '{}:{}'.format(self.service, self.user) def msg(self, msg): return '[Deployment:{}] {}'.format(self, msg) # deploy credentials which were deployed prior to deactivation def activate(self): if self.is_active: logger.error(self.msg('already active')) return logger.debug(self.msg(str(self.ssh_keys.all()))) for key in self.ssh_keys.all(): self._deploy_key(key) self.is_active = True self.save() logger.info(self.msg('activated')) # withdraw all credentials def deactivate(self): if not self.is_active: logger.error(self.msg('already deactivated')) return self.is_active = False self.save() for key in self.ssh_keys.all(): self._withdraw_key(key) logger.info(self.msg('deactivated')) # only deploy the key def _deploy_key(self, key): # delete outstanding tasks which are made obsolete by this task for withdrawal in self.withdrawals.filter(key=key): logger.debug(withdrawal.msg('now obsolete')) withdrawal.delete() # generate task task = DeploymentTask( action='deploy', deployment=self, key=key, ) task.save() logger.debug(task.msg('generated')) # generate task items for site in self.service.site.all(): deploy = DeploymentTaskItem( task=task, site=site, ) deploy.save() logger.debug(deploy.msg('generated')) # publish the task task.publish() def _withdraw_key(self, key): # delete outstanding tasks which are made obsolete by this task for deploy in self.deploys.filter(key=key): logger.debug(deploy.msg("now obsolete")) deploy.delete() # generate task task = DeploymentTask( action='withdraw', deployment=self, key=key, ) task.save() logger.debug(task.msg('generated')) # generate task items for site in self.service.site.all(): withdrawal = DeploymentTaskItem( task=task, site=site, ) withdrawal.save() logger.debug(withdrawal.msg('generated')) # publish the task task.publish() # deploy key and track changes in the key lists def deploy_key(self, key): if not self.is_active: logger.error(self.msg('cannot deploy while deactivated')) raise Exception('deployment deactivated') self.ssh_keys.add(key) if key in self.ssh_keys_to_withdraw.all(): self.ssh_keys_to_withdraw.remove(key) self.save() self._deploy_key(key) # withdraw key and track changes in the key lists def withdraw_key(self, key): if not self.is_active: logger.error(self.msg('cannot withdraw while deactivated')) raise Exception('deployment deactivated') self.ssh_keys.remove(key) # keys which are to be withdrawn by the clients self.ssh_keys_to_withdraw.add(key) self.save() self._withdraw_key(key) class DeploymentTask(models.Model): ACTION_CHOICES = ( ('deploy', 'deploy'), ('withdraw', 'withdraw'), ) action = models.CharField( max_length=10, choices=ACTION_CHOICES, ) key = models.ForeignKey( SSHPublicKey, related_name='tasks', on_delete=models.CASCADE, ) deployment = models.ForeignKey( Deployment, related_name='tasks', on_delete=models.CASCADE, ) @property def user(self): return self.deployment.user @property def service(self): return self.deployment.service def __str__(self): return "{}:{}:{} - {}".format( self.deployment.service, self.deployment.user, self.key, self.action, ) def msg(self, msg): return '[DeploymentTask:{}] {}'.format(self, msg) def publish(self): from .clientapi.serializers import DeploymentTaskSerializer msg = json.dumps(DeploymentTaskSerializer(self).data) rabbitmq_instance().publish_by_service( self.service, msg, ) # the client acked the receipt and execution of the task for his site def item_finished(self, site): item = self.task_items.get(site=site) logger.debug(item.msg('done')) item.delete() if not self.task_items.exists(): self.finished() # maintenance after all task items are done def finished(self): logger.info(self.msg('done')) self.delete() # check if this was the final withdraw in a key deletion if self.action == 'withdraw': self.key.try_final_deletion() class DeploymentTaskItem(models.Model): task = models.ForeignKey( DeploymentTask, related_name='task_items', on_delete=models.CASCADE, ) site = models.ForeignKey( Site, related_name='task_items', on_delete=models.CASCADE, ) def __str__(self): return "{}@{}".format( self.task, self.site, ) def msg(self, msg): return '[DeploymentTaskItem:{}] {}'.format(self, msg) # # RECEIVERS # @receiver(post_save, sender=settings.AUTH_USER_MODEL) def create_auth_token(sender, instance=None, created=False, **kwargs): if instance.user_type == 'apiclient' and created: Token.objects.create(user=instance) @receiver(post_save, sender=Site) def register_at_rabbitmq(sender, instance=None, created=False, **kwargs): if not created: return RabbitMQInstance().register_site(instance) @receiver(pre_delete, sender=Site) def deregister_at_rabbitmq( sender, instance=None, **kwargs): RabbitMQInstance().deregister_site(instance) @receiver(post_save, sender=User) def deactivate_user(sender, instance=None, created=False, **kwargs): if created: return if not instance.is_active and instance._is_active: instance.deactivate() @receiver(post_save, sender=User) def activate_user(sender, instance=None, created=False, **kwargs): if created: return if instance.is_active and not instance._is_active: instance.activate()