from django.contrib.auth.models import AbstractUser, Group
from django.db import models
from django.conf import settings
from django.dispatch import receiver, Signal
from django.utils.timezone import make_aware
from rest_framework.authtoken.models import Token
from django.db.models.signals import post_save, pre_delete
# from django.db.models.signals import m2m_changed
from datetime import datetime
import requests
from requests.auth import HTTPBasicAuth
import pika
import json


# Sent by Deployment.send_change(); handled by publish_deployment below.
deployment_change = Signal(providing_args=['instance'])


# clients are registered at rabbitmq, when they are assigned to a site
# (because we only then know what services they provide)
class RabbitMQInstance(models.Model):
    """Connection settings and helper operations for one RabbitMQ broker.

    The ``rest_*`` helpers talk to the broker's HTTP API located at
    ``http://<host>:<port>/<path>``; ``connect``/``publish_by_service`` use
    an AMQP connection opened through pika.
    """

    host = models.CharField(
        max_length=150,
        default='localhost',
    )
    exchange = models.CharField(
        max_length=150,
        default='deployments',
    )
    # port of the HTTP API (used by `api`), not passed to the pika connection
    port = models.IntegerField(
        default=15672
    )
    path = models.CharField(
        max_length=150,
        default='api',
    )
    username = models.CharField(
        max_length=150,
        default='guest',
    )
    password = models.CharField(
        max_length=150,
        default='guest',
    )
    enabled = models.BooleanField(
        default=False,
    )

    def __str__(self):
        return self.host

    def print(self, msg):
        """Print ``msg`` to stdout, prefixed with this instance's host."""
        print('[RabbitMQ:{}] {}'.format(self.host, msg))

    @property
    def api(self):
        """Base URL of the HTTP API (no trailing slash)."""
        return 'http://{}:{}/{}'.format(
            self.host,
            self.port,
            self.path,
        )

    @property
    def auth(self):
        """HTTP basic auth object built from the stored credentials."""
        return HTTPBasicAuth(
            self.username,
            self.password
        )

    @property
    def vhost(self):
        """Virtual host used in API paths: the URL-encoded form of '/'."""
        return '%2f'

    def get_uri(self, path):
        """Return the full API URL for the relative ``path``."""
        return '{}/{}'.format(self.api, path)

    def rest_get(self, api_path):
        """GET ``api_path`` and return the decoded JSON body.

        Raises ``requests.HTTPError`` on a 4xx/5xx response.
        """
        r = requests.get(
            self.get_uri(api_path),
            auth=self.auth)
        r.raise_for_status()
        return r.json()

    # send a rest call with path and data to the rest interface of
    # the rabbitmq instance
    def rest_put(self, api_path, data):
        """PUT ``data`` as JSON to ``api_path`` and return the response.

        Raises ``requests.HTTPError`` on a 4xx/5xx response.
        """
        r = requests.put(
            self.get_uri(api_path),
            json=data,
            auth=self.auth)
        r.raise_for_status()
        return r

    def rest_del(self, api_path):
        """DELETE ``api_path`` and return the response.

        Raises ``requests.HTTPError`` on a 4xx/5xx response.
        """
        r = requests.delete(
            self.get_uri(api_path),
            auth=self.auth)
        r.raise_for_status()
        return r

    def set_topic_permissions(self, site):
        """Restrict the site's client to the topics of the site's services.

        The client gets no write access (``^$``) and read access to the
        routing keys ``service.<name>`` for every service of ``site``.
        """
        username = site.client.username
        path = 'topic-permissions/{}/{}/'.format(
            self.vhost,
            username,
        )

        # set permissions for the correct topics
        # we construct a regex alternation matching the services of the site
        # NOTE(review): service names are inserted into the regex unescaped;
        # names containing regex metacharacters would widen the match.
        services = '|'.join(
            service.name for service in site.services.all())

        set_topic_permission_data = {
            'exchange': self.exchange,
            'write': '^$',
            'read': r'^service\.({})$'.format(services),
        }
        return self.rest_put(path, set_topic_permission_data)

    # set permissions for the user
    def set_permissions(self, site):
        """Grant the site's client configure/write/read permissions.

        The same pattern is used for all three: names starting with
        ``amq.gen`` or with the configured exchange name.
        """
        username = site.client.username
        path = 'permissions/{}/{}/'.format(
            self.vhost,
            username,
        )

        permission = r'^(amq\.gen.*|{})'.format(self.exchange)
        set_permission_data = {
            'configure': permission,
            'write': permission,
            'read': permission,
        }
        return self.rest_put(path, set_permission_data)

    # create user at the rabbitmq instance
    def create_user(self, site):
        """Create a broker user named after the site's client.

        The client's auth token key is used as the broker password.
        """
        username = site.client.username
        path = 'users/{}/'.format(username)

        user_creation_data = {
            'password': str(site.client.auth_token.key),
            'tags': '',
        }
        return self.rest_put(path, user_creation_data)

    # delete user at the rabbitmq instance
    def delete_user(self, site):
        """Delete the broker user named after the site's client."""
        username = site.client.username
        path = 'users/{}/'.format(username)
        return self.rest_del(path)

    # PUBLIC API
    def register_site(self, site):
        """Create the broker user for ``site`` and set all its permissions."""
        self.print('register: {} / {}'.format(
            site.name, site.client.username))
        self.create_user(site)
        self.set_permissions(site)
        self.set_topic_permissions(site)

    def update_site(self, site):
        """Refresh the topic permissions of ``site`` (e.g. services changed)."""
        self.print('update: {} / {}'.format(
            site.name, site.client.username))
        self.set_topic_permissions(site)

    def deregister_site(self, site):
        """Remove the broker user belonging to ``site``."""
        self.print('deregister: {} / {}'.format(
            site.name, site.client.username))
        self.delete_user(site)

    def is_client_connected(self, site):
        """Return True if the broker lists a connection of the site's client.

        Performs one ``connections/`` API request per call.
        """
        connections = self.rest_get("connections/")
        clients_for_site = [c for c in connections
                            if c['user'] == site.client.username]
        return len(clients_for_site) > 0

    def connect(self):
        """Open a blocking AMQP connection and declare the topic exchange.

        Stores ``connection_properties``, ``connection`` and ``channel`` on
        the instance and enables delivery confirmation on the channel.
        """
        self.connection_properties = pika.ConnectionParameters(
            host=self.host,
            ssl=True,
        )
        self.connection = pika.BlockingConnection(self.connection_properties)
        self.channel = self.connection.channel()
        self.channel.exchange_declare(
            exchange=self.exchange,
            durable=True,
            exchange_type='topic')
        self.channel.confirm_delivery()

    def disconnect(self):
        """Close the connection opened by ``connect``."""
        self.connection.close()

    def service_routing_key(self, service):
        """Return the routing key used for messages about ``service``."""
        return 'service.' + service.name

    def online_clients(self, service):
        """Return the sites of ``service`` whose client is connected."""
        return [site for site in service.site.all()
                if self.is_client_connected(site)]

    def publish_by_service(self, service, message):
        """Publish ``message`` for ``service`` if any of its clients is online.

        Returns a tuple ``(delivery_confirmed, online_clients)``. Nothing is
        published (and ``delivery_confirmed`` is False) when no client of the
        service is connected.
        """
        online_clients = self.online_clients(service)

        if not online_clients:
            self.print("No clients online for service {}".format(
                service
            ))
            return False, online_clients

        self.print("Online clients for service {}: {}".format(
            service,
            online_clients,
        ))
        self.connect()
        # always close the connection again, even if publishing raises
        try:
            self.print('Sent deployment update for service {}'.format(
                service
            ))
            # result of basic_publish on a channel with confirm_delivery()
            # NOTE(review): the original comment said "True if all the
            # clients acked the message"; presumably this is the broker's
            # confirmation rather than a per-client ack -- confirm.
            delivery_confirmed = self.channel.basic_publish(
                exchange=self.exchange,
                routing_key=self.service_routing_key(service),
                body=message,
                properties=pika.BasicProperties(
                    delivery_mode=1,
                ),
            )
            if delivery_confirmed:
                self.print('Delivery confirmed')
            else:
                self.print('Delivery unconfirmed')
        finally:
            self.disconnect()

        return delivery_confirmed, online_clients


def rabbitmq_instance():
    """Return the single enabled RabbitMQInstance.

    Raises ``DoesNotExist``/``MultipleObjectsReturned`` if there is not
    exactly one enabled instance.
    """
    return RabbitMQInstance.objects.get(enabled=True)


class User(AbstractUser):
    """User of the portal: an API client, an OIDC user or an admin."""

    TYPE_CHOICES = (
        ('apiclient', 'API-Client'),
        ('oidcuser', 'OIDC User'),
        ('admin', 'Admin'),
    )
    user_type = models.CharField(
        max_length=20,
        choices=TYPE_CHOICES,
        default='oidcuser',
    )
    sub = models.CharField(max_length=150, blank=True, null=True)
    password = models.CharField(max_length=150, blank=True, null=True)

    # we hide deleted keys here
    # the full list of ssh keys is at self._ssh_keys
    @property
    def ssh_keys(self):
        """SSH keys of this user that are not marked as deleted."""
        return self._ssh_keys.filter(deleted=False)


def construct_user(user_info):
    """Build (without saving) a User from a dict of user info claims.

    Reads the keys ``sub``, ``name``, ``given_name``, ``family_name`` and
    ``email``; the email doubles as the username.
    """
    # NOTE(review): no `name` field is declared on User in this file;
    # verify that passing `name=` is accepted by the model constructor.
    return User(
        sub=user_info['sub'],
        name=user_info['name'],
        first_name=user_info['given_name'],
        last_name=user_info['family_name'],
        email=user_info['email'],
        username=user_info['email'],
    )


class Site(models.Model):
    """A site, represented by exactly one API client user."""

    client = models.OneToOneField(
        User,
        related_name='site',
    )
    name = models.CharField(max_length=150, unique=True)
    description = models.TextField(max_length=300, blank=True)

    def __str__(self):
        return self.name

    def ack_update(self, service=None):
        """Delete the pending deployment updates of this site.

        With ``service`` given, only updates whose deployment belongs to a
        service of the same name are deleted; otherwise all are.
        """
        for deployment_update in self.deployment_updates.all():
            if (service is None
                    or deployment_update.service.name == service.name):
                deployment_update.delete()

    def clientapi_get_deployments(self, all=False, filter=None):
        """Return ``{service name: deployments}`` for this site's services.

        ``all``: if true, every deployment of each service is returned;
        otherwise only the deployments with a pending update for this site,
        and those updates are acknowledged (deleted) as a side effect.
        ``filter``: optional container of service names to restrict to.
        """
        def service_deployments(service):
            if all:
                return service.deployments.all()

            ds = [deployment_update.deployment
                  for deployment_update in self.deployment_updates.all()
                  if deployment_update.service.name == service.name]
            # TODO is this appropriate?
            self.ack_update(service=service)
            return ds

        if filter is not None:
            services = [s for s in self.services.all() if s.name in filter]
        else:
            services = self.services.all()

        # changed deployments for this site
        # filtered by service name using filter parameter
        deployments = {service.name: service_deployments(service)
                       for service in services}
        return deployments


class Service(models.Model):
    """A service offered by one or more sites."""

    name = models.CharField(max_length=150, unique=True)
    description = models.TextField(max_length=300, blank=True)
    site = models.ManyToManyField(
        Site,
        related_name='services')
    groups = models.ManyToManyField(
        Group,
        related_name='services',
        blank=True)

    def __str__(self):
        return self.name


class SSHPublicKey(models.Model):
    """An SSH public key uploaded by a user."""

    name = models.CharField(max_length=150, unique=True)
    key = models.TextField(max_length=1000)
    # hidden field at the user
    user = models.ForeignKey(
        User,
        related_name='_ssh_keys')

    # has the user triggered the deletion of this key
    deleted = models.BooleanField(
        default=False,
        editable=False,
    )

    # does not directly delete the key if the key is deployed or withdrawn
    # somewhere; in that case it is only marked as deleted and withdrawn,
    # and try_final_deletion removes it later
    # NOTE(review): the original comment named a receiver
    # 'delete_withdrawn_ssh_key' which is not defined in this file.
    def delete_key(self):
        """Delete the key, or mark it deleted and withdraw it if in use."""
        if (not self.deployments.exists()
                and not self.withdrawn_deployments.exists()):
            self.delete()
            return

        self.deleted = True
        self.save()

        # delete implies withdrawing the key from all clients
        for deployment in self.deployments.all():
            deployment.withdraw_key(self)

    # when a key is withdrawn by a client we try to finally delete it
    def try_final_deletion(self):
        """Delete the key if it is marked deleted and no longer referenced."""
        if (self.deleted
                and not self.deployments.exists()
                and not self.withdrawn_deployments.exists()):
            self.delete()
        return

    def __str__(self):
        if self.deleted:
            return "DELETED: {}".format(self.name)
        return self.name


class Deployment(models.Model):
    """The SSH keys one user has deployed to (or withdrawn from) a service."""

    user = models.ForeignKey(
        User,
        related_name='deployments',
        on_delete=models.CASCADE,
    )
    service = models.ForeignKey(
        Service,
        related_name='deployments',
        on_delete=models.CASCADE,
    )
    # NOTE(review): the original comment here read "SET_NULL: we allow
    # credentials to be deleted after deployment", but this is a
    # many-to-many field without an on_delete option.
    ssh_keys = models.ManyToManyField(
        SSHPublicKey,
        related_name='deployments',
        blank=True,
    )
    # these ssh keys are to be withdrawn by the clients
    ssh_keys_to_withdraw = models.ManyToManyField(
        SSHPublicKey,
        related_name='withdrawn_deployments',
        blank=True,
    )
    # defaults to the epoch (evaluated once, at import time)
    last_change = models.DateTimeField(
        editable=False,
        default=make_aware(datetime.utcfromtimestamp(0)),
    )

    def __str__(self):
        return '{}@{}'.format(self.user, self.service)

    def changed(self):
        """Record a change: regenerate the updates and bump ``last_change``."""
        print("Deployment {} changed".format(self))

        # delete old deployment_updates and generate new ones
        for du in self.deployment_updates.all():
            du.delete()

        for site in self.service.site.all():
            du = DeploymentUpdate(
                site=site,
                deployment=self,
            )
            du.save()

        self.last_change = make_aware(datetime.now())
        self.save()

    def deploy_key(self, key):
        """Add ``key`` to the deployed keys and notify the clients."""
        # key state: -> (2.5)
        self.ssh_keys.add(key)

        # a re-deployed key must no longer be scheduled for withdrawal
        if key in self.ssh_keys_to_withdraw.all():
            self.ssh_keys_to_withdraw.remove(key)

        self.save()
        self.changed()
        self.send_change()

    def withdraw_key(self, key):
        """Move ``key`` from deployed to to-be-withdrawn and notify clients."""
        # key state: -> (4)
        self.ssh_keys.remove(key)

        # keys which are to be withdrawn by the clients
        self.ssh_keys_to_withdraw.add(key)

        self.save()
        self.changed()
        self.send_change()

    def update(self):
        """Finish withdrawals once no deployment update is pending anymore."""
        # if there are still remaining updates we do not change anything
        if self.deployment_updates.exists():
            return

        # snapshot before clearing, so the keys can still be visited below
        withdrawn_keys = list(self.ssh_keys_to_withdraw.all())

        # the client has withdrawn the keys so we can empty the list
        self.ssh_keys_to_withdraw.clear()

        for key in withdrawn_keys:
            key.try_final_deletion()

        self.save()

    def send_change(self):
        """Emit the ``deployment_change`` signal for this deployment."""
        deployment_change.send(sender=self.__class__, instance=self)


class DeploymentUpdate(models.Model):
    """Marks a deployment as changed and not yet acknowledged for a site."""

    deployment = models.ForeignKey(
        Deployment,
        related_name='deployment_updates',
    )
    site = models.ForeignKey(
        Site,
        related_name='deployment_updates',
    )

    @property
    def user(self):
        """User of the underlying deployment."""
        return self.deployment.user

    @property
    def service(self):
        """Service of the underlying deployment."""
        return self.deployment.service

    def __str__(self):
        return str(self.user) + ':' + str(self.service) + '@' + str(self.site)


@receiver(post_save, sender=settings.AUTH_USER_MODEL)
def create_auth_token(sender, instance=None, created=False, **kwargs):
    """Create an API token for every newly created 'apiclient' user."""
    if instance.user_type == 'apiclient' and created:
        Token.objects.create(user=instance)


@receiver(post_save, sender=Site)
def register_at_rabbitmq(
        sender, instance=None, created=False, **kwargs):
    """Register a newly created site at the broker."""
    if not created:
        return

    # NOTE(review): this uses a fresh RabbitMQInstance with field defaults,
    # not the enabled instance returned by rabbitmq_instance() -- confirm
    # that this is intended.
    RabbitMQInstance().register_site(instance)


@receiver(pre_delete, sender=Site)
def deregister_at_rabbitmq(
        sender, instance=None, **kwargs):
    """Remove a site from the broker before it is deleted."""
    # NOTE(review): uses a default RabbitMQInstance, see register_at_rabbitmq
    RabbitMQInstance().deregister_site(instance)


@receiver(deployment_change, sender=Deployment)
def publish_deployment(sender, instance=None, created=False, **kwargs):
    """Publish a changed deployment and acknowledge it for online sites."""
    if instance is None:
        return

    # imported here rather than at module level
    # (presumably to avoid a circular import -- TODO confirm)
    from .clientapi.serializers import DeploymentSerializer

    message = json.dumps(DeploymentSerializer(instance).data)
    confirmed, online_sites = rabbitmq_instance().publish_by_service(
        instance.service,
        message)

    if confirmed:
        # delivery successful
        for site in online_sites:
            print('Client for site {} got update for service {}'.format(
                site,
                instance.service,
            ))
            site.ack_update(service=instance.service)