# django senders need their arguments # pylint: disable=unused-argument import json import logging import requests from requests.auth import HTTPBasicAuth import pika from django.conf import settings from django.contrib.auth.models import AbstractUser, Group from django.core.cache import cache from django.db import models from django.db.models.signals import post_save, pre_delete from django.dispatch import receiver from rest_framework.authtoken.models import Token from .auth.v1.models import OIDCConfig LOGGER = logging.getLogger(__name__) # singleton for simple configs # https://steelkiwi.com/blog/practical-application-singleton-design-pattern/ class SingletonModel(models.Model): class Meta: abstract = True def set_cache(self): cache.set(self.__class__.__name__, self) # pylint: disable=invalid-name, arguments-differ def save(self, *args, **kwargs): self.pk = 1 super(SingletonModel, self).save(*args, **kwargs) self.set_cache() @classmethod def load(cls): if cache.get(cls.__name__) is None: obj, created = cls.objects.get_or_create(pk=1) if not created: obj.set_cache() return cache.get(cls.__name__) # clients are registerred at rabbitmq, when they are assigned to a site # (because we only then know what services they provide) class RabbitMQInstance(SingletonModel): host = models.CharField( max_length=150, default='localhost', ) vhost = models.CharField( max_length=150, default='%2f', ) exchange = models.CharField( max_length=150, default='deployments', ) port = models.IntegerField( default=15672, ) path = models.CharField( max_length=150, default='api', ) username = models.CharField( max_length=150, default='guest', ) password = models.CharField( max_length=150, default='guest', ) is_active = models.BooleanField( default=True, ) def __str__(self): return self.host def _msg(self, msg): return '[RabbitMQ:{}] {}'.format(self.host, msg) @property def auth(self): return HTTPBasicAuth( self.username, self.password, ) @property def _connection_parameters(self): return pika.ConnectionParameters( host=self.host, ssl=True, ) @property def connection(self): LOGGER.debug(self._msg('opened connection')) return pika.BlockingConnection( self._connection_parameters, ) @property def channel(self): rabbitmq_channel = self.connection.channel() rabbitmq_channel.exchange_declare( exchange=self.exchange, durable=True, auto_delete=False, exchange_type='topic', ) rabbitmq_channel.confirm_delivery() LOGGER.debug(self._msg('opened channel')) return rabbitmq_channel def _get_api_uri(self, path): return 'http://{}:{}/{}/{}'.format( self.host, self.port, self.path, path, ) def _rest_get(self, api_path): req = requests.get( self._get_api_uri(api_path), auth=self.auth) req.raise_for_status() return req.json() # send a rest call with path and data to the rest interface of # the rabbitmq instance def _rest_put(self, api_path, data): req = requests.put( self._get_api_uri(api_path), json=data, auth=self.auth) req.raise_for_status() return req def _rest_del(self, api_path): req = requests.delete( self._get_api_uri(api_path), auth=self.auth) req.raise_for_status() return req def _set_topic_permissions(self, site): username = site.client.username path = 'topic-permissions/{}/{}/'.format( self.vhost, username, ) # set permissions for the correct topics # we construct a regex to match the services of the site services = '' omit_bar = True for service in site.services.all(): prefix = '|' if omit_bar: prefix = '' omit_bar = False services = services + prefix + service.name set_topic_permission_data = { 'exchange': self.exchange, 'write': '^$', 'read': r'^service\.({})$'.format(services), } return self._rest_put(path, set_topic_permission_data) # set permissions for the user def _set_permissions(self, site): username = site.client.username path = 'permissions/{}/{}/'.format( self.vhost, username, ) permission = r'^(amq\.gen.*|{})'.format(self.exchange) set_permission_data = { 'configure': permission, 'write': permission, 'read': permission, } return self._rest_put(path, set_permission_data) # create user at the rabbitmq instance def _create_user(self, site): username = site.client.username path = 'users/{}/'.format(username) user_creation_data = { 'password': str(site.client.auth_token.key), 'tags': '', } return self._rest_put(path, user_creation_data) # delete user at the rabbitmq instance def _delete_user(self, site): username = site.client.username path = 'users/{}/'.format(username) return self._rest_del(path) def _disconnect(self): LOGGER.debug(self._msg('closing connection')) self.connection.close() # PUBLIC API def register_site(self, site): self._create_user(site) self._set_permissions(site) self._set_topic_permissions(site) LOGGER.info(self._msg('registered {}'.format(site.client))) def update_site(self, site): self._set_topic_permissions(site) LOGGER.info(self._msg('updated permissions for {}'.format(site.client))) def deregister_site(self, site): # TODO implement LOGGER.info(self._msg('deregistered {}'.format(site.client))) def is_client_connected(self, site): connections = self._rest_get("connections/") clients_for_site = [c for c in connections if c['user'] == site.client.username] return len(clients_for_site) > 0 def online_clients(self, service): return [site for site in service.site.all() if self.is_client_connected(site)] def publish_by_service(self, service, msg): # FIXME publish can fail -> catch error return self.channel.basic_publish( exchange=self.exchange, routing_key=service.routing_key, body=msg, properties=pika.BasicProperties( delivery_mode=1, ), ) class User(AbstractUser): TYPE_CHOICES = ( ('apiclient', 'API-Client'), ('oidcuser', 'OIDC User'), ('admin', 'Admin'), ) user_type = models.CharField( max_length=20, choices=TYPE_CHOICES, default='oidcuser', ) sub = models.CharField( max_length=150, blank=True, null=True, ) password = models.CharField( max_length=150, blank=True, null=True, ) # the real state of the user # (self.is_active is the supposed state of the user) _is_active = models.BooleanField( default=True, editable=False, ) # the idp which authenticated the user idp = models.ForeignKey( OIDCConfig, related_name='users', on_delete=models.CASCADE, ) # we hide deleted keys here # the full list of ssh keys is self._ssh_keys @property def ssh_keys(self): return self._ssh_keys.filter(deleted=False) @property def is_active_at_clients(self): return self._is_active def __str__(self): if self.user_type == 'admin': return 'ADMIN {}'.format(self.username) elif self.user_type == 'oidcuser': if not self.is_active: return 'DEACTIVATED USER {}'.format(self.username) return 'USER {}'.format(self.username) elif self.user_type == 'apiclient': return 'APICLIENT {}@{}'.format(self.username, self.site) else: raise Exception() def _msg(self, msg): return '[{}] {}'.format(self, msg) # oidcuser: withdraw and delete all credentials and delete the user def remove(self): if self.user_type == 'oidcuser': self.deactivate() LOGGER.info(self._msg('Deleting')) # TODO: deleting the user brings problems: # the deletion cascades down to DeploymentTask and DeploymentTaskItem # but these need to be conserved so all clients withdrawals can be tracked self.delete() def activate(self): if self._is_active: LOGGER.error(self._msg('already activated')) return if self.user_type == 'oidcuser': self.is_active = True self._is_active = True self.save() for dep in self.deployments.all(): dep.activate() LOGGER.info(self._msg('activated')) # oidcuser: withdraw all credentials def deactivate(self): if not self._is_active: LOGGER.error(self._msg('already deactivated')) return if self.user_type == 'oidcuser': self.is_active = False self._is_active = False self.save() for dep in self.deployments.all(): dep.deactivate() LOGGER.info(self._msg('deactivated')) def construct_user(user_info): return User( sub=user_info['sub'], name=user_info['name'], first_name=user_info['given_name'], last_name=user_info['family_name'], email=user_info['email'], username=user_info['email'], ) class Site(models.Model): client = models.OneToOneField( User, related_name='site', ) name = models.CharField(max_length=150, unique=True) description = models.TextField(max_length=300, blank=True) def __str__(self): return self.name # tasks which are still to be executed on this site @property def tasks(self): return [item.task for item in self.task_items.all()] class Service(models.Model): name = models.CharField(max_length=150, unique=True) description = models.TextField(max_length=300, blank=True) site = models.ManyToManyField( Site, related_name='services') groups = models.ManyToManyField( Group, related_name='services', blank=True) @property def routing_key(self): return 'service.{}'.format(self.name) def __str__(self): return self.name class SSHPublicKey(models.Model): name = models.CharField( max_length=150, unique=True, ) key = models.TextField( max_length=1000 ) # hidden field at the user user = models.ForeignKey( User, related_name='_ssh_keys', ) # has the user triggered the deletion of this key deleted = models.BooleanField( default=False, editable=False, ) def _msg(self, msg): return '[SSHPublicKey:{}] {}'.format(self, msg) # does not directly delete the key if the key is deployed or withdrawn # somewhere # the receiver 'delete_withdrawn_ssh_key' does the actual deletion def delete_key(self): if (not self.tasks.exists() and not self.deployments.exists()): LOGGER.info(self._msg('Direct deletion of key')) self.delete() return LOGGER.info(self._msg('Deletion of key started')) self.deleted = True self.save() # delete implies withdrawing the key from all clients for deployment in self.deployments.all(): deployment.withdraw_key(self) # when a key is withdrawn by a client we try to finally delete it def try_final_deletion(self): if (self.deleted and not self.tasks.exists()): LOGGER.info(self._msg( 'All clients have withdrawn this key. Final deletion')) self.delete() return def __str__(self): if self.deleted: return "DELETED: {}".format(self.name) return self.name # Deployment describes the credential state per user as it is supposed to be # # (exception: if is_active=False the ssh_keys contain the keys to be deployed # if the deployment is reactivated) # # DeploymentTask is what is sent to the clients via rabbitmq # The DeploymentTaskItem track the acknowledgements from the clients class Deployment(models.Model): user = models.ForeignKey( User, related_name='deployments', on_delete=models.CASCADE, ) service = models.ForeignKey( Service, related_name='deployments', on_delete=models.CASCADE, ) ssh_keys = models.ManyToManyField( SSHPublicKey, related_name='deployments', blank=True, ) ssh_keys_to_withdraw = models.ManyToManyField( SSHPublicKey, related_name='withdrawn_deployments', blank=True, ) is_active = models.BooleanField( default=True, ) @property def withdrawals(self): return self.tasks.filter(action='withdraw') @property def deploys(self): return self.tasks.filter(action='deploy') def __str__(self): return '{}:{}'.format(self.service, self.user) def _msg(self, msg): return '[Deployment:{}] {}'.format(self, msg) # deploy credentials which were deployed prior to deactivation def activate(self): if self.is_active: LOGGER.error(self._msg('already active')) return LOGGER.debug(self._msg(str(self.ssh_keys.all()))) for key in self.ssh_keys.all(): self._deploy_key(key) self.is_active = True self.save() LOGGER.info(self._msg('activated')) # withdraw all credentials def deactivate(self): if not self.is_active: LOGGER.error(self._msg('already deactivated')) return self.is_active = False self.save() for key in self.ssh_keys.all(): self._withdraw_key(key) LOGGER.info(self._msg('deactivated')) # only deploy the key def _deploy_key(self, key): # delete outstanding tasks which are made obsolete by this task for withdrawal in self.withdrawals.filter(key=key): LOGGER.debug(withdrawal._msg('now obsolete')) withdrawal.delete() # generate task task = DeploymentTask( action='deploy', deployment=self, key=key, ) task.save() LOGGER.debug(task._msg('generated')) # generate task items for site in self.service.site.all(): deploy = DeploymentTaskItem( task=task, site=site, ) deploy.save() LOGGER.debug(deploy._msg('generated')) # publish the task task.publish() def _withdraw_key(self, key): # delete outstanding tasks which are made obsolete by this task for deploy in self.deploys.filter(key=key): LOGGER.debug(deploy._msg("now obsolete")) deploy.delete() # generate task task = DeploymentTask( action='withdraw', deployment=self, key=key, ) task.save() LOGGER.debug(task._msg('generated')) # generate task items for site in self.service.site.all(): withdrawal = DeploymentTaskItem( task=task, site=site, ) withdrawal.save() LOGGER.debug(withdrawal._msg('generated')) # publish the task task.publish() # deploy key and track changes in the key lists def deploy_key(self, key): if not self.is_active: LOGGER.error(self._msg('cannot deploy while deactivated')) raise Exception('deployment deactivated') self.ssh_keys.add(key) if key in self.ssh_keys_to_withdraw.all(): self.ssh_keys_to_withdraw.remove(key) self.save() self._deploy_key(key) # withdraw key and track changes in the key lists def withdraw_key(self, key): if not self.is_active: LOGGER.error(self._msg('cannot withdraw while deactivated')) raise Exception('deployment deactivated') self.ssh_keys.remove(key) # keys which are to be withdrawn by the clients self.ssh_keys_to_withdraw.add(key) self.save() self._withdraw_key(key) class DeploymentTask(models.Model): ACTION_CHOICES = ( ('deploy', 'deploy'), ('withdraw', 'withdraw'), ) action = models.CharField( max_length=10, choices=ACTION_CHOICES, ) key = models.ForeignKey( SSHPublicKey, related_name='tasks', on_delete=models.CASCADE, ) deployment = models.ForeignKey( Deployment, related_name='tasks', on_delete=models.CASCADE, ) @property def user(self): return self.deployment.user @property def service(self): return self.deployment.service def __str__(self): return "{}:{}:{} - {}".format( self.deployment.service, self.deployment.user, self.key, self.action, ) def _msg(self, msg): return '[DeploymentTask:{}] {}'.format(self, msg) def publish(self): # FIXME mitigating circular dependencies here from .clientapi.serializers import DeploymentTaskSerializer msg = json.dumps(DeploymentTaskSerializer(self).data) # FIXME select the rabbitmq instance more meaningful RabbitMQInstance.load().publish_by_service( self.service, msg, ) # the client acked the receipt and execution of the task for his site def item_finished(self, site): item = self.task_items.get(site=site) LOGGER.debug(item._msg('done')) item.delete() if not self.task_items.exists(): self.finished() # maintenance after all task items are done def finished(self): LOGGER.info(self._msg('done')) self.delete() # check if this was the final withdraw in a key deletion if self.action == 'withdraw': self.key.try_final_deletion() class DeploymentTaskItem(models.Model): task = models.ForeignKey( DeploymentTask, related_name='task_items', on_delete=models.CASCADE, ) site = models.ForeignKey( Site, related_name='task_items', on_delete=models.CASCADE, ) def __str__(self): return "{}@{}".format( self.task, self.site, ) def _msg(self, msg): return '[DeploymentTaskItem:{}] {}'.format(self, msg) # # RECEIVERS # @receiver(post_save, sender=settings.AUTH_USER_MODEL) def create_auth_token(sender, instance=None, created=False, **kwargs): if instance.user_type == 'apiclient' and created: Token.objects.create(user=instance) @receiver(post_save, sender=Site) def register_at_rabbitmq(sender, instance=None, created=False, **kwargs): if not created: return RabbitMQInstance().register_site(instance) @receiver(pre_delete, sender=Site) def deregister_at_rabbitmq(sender, instance=None, **kwargs): RabbitMQInstance().deregister_site(instance) @receiver(post_save, sender=User) def deactivate_user(sender, instance=None, created=False, **kwargs): if created: return if not instance.is_active and instance.is_active_at_clients: instance.deactivate() @receiver(post_save, sender=User) def activate_user(sender, instance=None, created=False, **kwargs): if created: return if instance.is_active and not instance.is_active_at_clients: instance.activate()