Commit b67ffe3b authored by Lukas Burgey's avatar Lukas Burgey
Browse files

Restructure and add ack semantics

parent e43ab85a
......@@ -40,3 +40,15 @@ class DeploymentsSerializer(serializers.Serializer):
child=DeploymentSerializer()
)
)
class DeploymentTaskSerializer(serializers.Serializer):
id = serializers.IntegerField()
action = serializers.CharField()
user = UserSerializer()
service = ServiceSerializer()
key = frontend_serializers.SSHPublicKeySerializer()
class SiteSerializer(serializers.Serializer):
tasks = DeploymentTaskSerializer(many=True)
......@@ -5,7 +5,7 @@ from django.conf.urls import url
from . import views
urlpatterns = [
# url(r'^', include(router.urls)),
url(r'^deployments', views.DeploymentsView.as_view()),
url(r'^config', views.ConfigurationView.as_view()),
url(r'^ack/(?P<id>\d+)/', views.AckView.as_view()),
]
from rest_framework import generics
from rest_framework import generics, views, status
from rest_framework.authentication import TokenAuthentication
from rest_framework.response import Response
from ..models import rabbitmq_instance
from . import serializers, models
from . import serializers
# authentication class for the client api
authentication_classes = (TokenAuthentication, )
# has an optional url parameter called 'all' which can be set to 'true'
class DeploymentsView(generics.RetrieveAPIView):
authentication_classes = authentication_classes
serializer_class = serializers.DeploymentsSerializer
serializer_class = serializers.SiteSerializer
def get_object(self):
all_services = False
if ('all' in self.request.query_params and
self.request.query_params['all'] == 'true'):
all_services = True
# the client may want only a special selection of services
if 's' in self.request.query_params:
filter_services = self.request.query_params.getlist('s')
else:
filter_services = None
d = models.Deployments()
d.services = self.request.user.site.clientapi_get_deployments(
all=all_services,
filter=filter_services,
)
return d
return self.request.user.site
# the client has to fetch the configuration (like services etc.) here
......@@ -46,3 +29,16 @@ class ConfigurationView(generics.ListAPIView):
# client can access all of his services, even new ones
rabbitmq_instance().update_site(site)
return site.services.all()
class AckView(views.APIView):
authentication_classes = authentication_classes
def delete(self, request, id=None, format=None):
# find the corresponding task for this item
for item in request.user.site.task_items.all():
if item.task.id == int(id):
item.task.item_finished(request.user.site)
return Response({'ok': True})
return Response({'ok': False}, status=status.HTTP_400_BAD_REQUEST)
......@@ -18,6 +18,7 @@ deployment_change = Signal(providing_args=['instance'])
# clients are registerred at rabbitmq, when they are assigned to a site
# (because we only then know what services they provide)
class RabbitMQInstance(models.Model):
host = models.CharField(
max_length=150,
default='localhost',
......@@ -70,6 +71,35 @@ class RabbitMQInstance(models.Model):
def vhost(self):
return '%2f'
# singletons
rabbitmq_connection = None
rabbitmq_channel = None
@property
def connection(self):
if self.rabbitmq_connection is None:
rabbitmqconnection_properties = pika.ConnectionParameters(
host=self.host,
ssl=True,
)
self.rabbitmq_connection = pika.BlockingConnection(
rabbitmqconnection_properties)
# self.print("Opened connection")
return self.rabbitmq_connection
@property
def channel(self):
if self.rabbitmq_channel is None:
self.rabbitmq_channel = self.connection.channel()
self.rabbitmq_channel.exchange_declare(
exchange=self.exchange,
durable=True,
exchange_type='topic')
self.rabbitmq_channel.confirm_delivery()
# self.print("Opened channel")
return self.rabbitmq_channel
def get_uri(self, path):
return '{}/{}'.format(self.api, path)
......@@ -169,14 +199,13 @@ class RabbitMQInstance(models.Model):
self.set_topic_permissions(site)
def update_site(self, site):
self.print('update: {} / {}'.format(
site.name, site.client.username))
self.set_topic_permissions(site)
self.print('updated permissions for {}'.format(
print_client(site.client)))
def deregister_site(self, site):
self.print('deregister: {} / {}'.format(
site.name, site.client.username))
self.delete_user(site)
self.print('deregistered {}'.format(
print_client(site.client)))
def is_client_connected(self, site):
connections = self.rest_get("connections/")
......@@ -185,21 +214,8 @@ class RabbitMQInstance(models.Model):
if c['user'] == site.client.username]
return len(clients_for_site) > 0
def connect(self):
self.connection_properties = pika.ConnectionParameters(
host=self.host,
ssl=True,
)
self.connection = pika.BlockingConnection(self.connection_properties)
self.channel = self.connection.channel()
self.channel.exchange_declare(
exchange=self.exchange,
durable=True,
exchange_type='topic')
self.channel.confirm_delivery()
def disconnect(self):
self.print("closing connection")
self.connection.close()
def service_routing_key(self, service):
......@@ -210,44 +226,18 @@ class RabbitMQInstance(models.Model):
for site in service.site.all()
if self.is_client_connected(site)]
def publish_by_service(self, service, message):
online_clients = self.online_clients(service)
if online_clients:
self.print("Online clients for service {}: {}".format(
service,
online_clients,
))
self.connect()
self.print('Sent deployment update for service {}'.format(
service
))
# True if all the clients acked the message
delivery_confirmed = self.channel.basic_publish(
exchange=self.exchange,
routing_key=self.service_routing_key(service),
body=message,
properties=pika.BasicProperties(
delivery_mode=1,
),
)
if delivery_confirmed:
self.print('Delivery confirmed')
else:
self.print('Delivery unconfirmed')
self.disconnect()
else:
self.print("No clients online for service {}".format(
service
))
delivery_confirmed = False
return delivery_confirmed, online_clients
def publish_by_service(self, service, msg):
return self.channel.basic_publish(
exchange=self.exchange,
routing_key=self.service_routing_key(service),
body=msg,
properties=pika.BasicProperties(
delivery_mode=1,
),
)
# TODO dirty
def rabbitmq_instance():
return RabbitMQInstance.objects.get(enabled=True)
......@@ -273,6 +263,13 @@ class User(AbstractUser):
return self._ssh_keys.filter(deleted=False)
def print_client(user):
return '{}@{}'.format(
user.username,
user.site,
)
def construct_user(user_info):
return User(
sub=user_info['sub'],
......@@ -295,44 +292,12 @@ class Site(models.Model):
def __str__(self):
return self.name
def ack_update(self, service=None):
if service is None:
for du in self.deployment_updates.all():
du.done()
else:
for du in self.deployment_updates.all():
if du.service.name == service.name:
du.done()
def clientapi_get_deployments(self, all=False, filter=None):
def service_deployments(service):
if all:
return service.deployments.all()
else:
ds = [deployment_update.deployment
for deployment_update
in self.deployment_updates.all()
if deployment_update.service.name == service.name]
# TODO inappropriate
self.ack_update(service=service)
return ds
if filter is not None:
services = [s
for s
in self.services.all()
if s.name in filter]
else:
services = self.services.all()
deployments = {service.name: service_deployments(service)
for service
in services}
# changed deployments for this site
# filtered by service name using filter parameter
return deployments
# tasks which are still to be executed on this site
@property
def tasks(self):
return [item.task
for item
in self.task_items.all()]
class Service(models.Model):
......@@ -364,15 +329,21 @@ class SSHPublicKey(models.Model):
editable=False,
)
def print(self, msg):
print('[SSHPublicKey:{}] {}'.format(self, msg))
# does not directly delete the key if the key is deployed or withdrawn
# somewhere
# the receiver 'delete_withdrawn_ssh_key' does the actual deletion
def delete_key(self):
if (not self.deployments.exists()
if (not self.task_items.exists()
and not self.deployments.exists()
and not self.withdrawn_deployments.exists()):
self.print('Direct deletion of key')
self.delete()
return
self.print('Deletion of key started')
self.deleted = True
self.save()
......@@ -383,8 +354,8 @@ class SSHPublicKey(models.Model):
# when a key is withdrawn by a client we try to finally delete it
def try_final_deletion(self):
if (self.deleted
and not self.deployments.exists()
and not self.withdrawn_deployments.exists()):
and not self.task_items.exists()):
self.print('All clients have withdrawn this key. Final deletion')
self.delete()
return
......@@ -394,6 +365,9 @@ class SSHPublicKey(models.Model):
return self.name
# Deployment describes the credential state per user as it is supposed to be
# DeploymentTask is what is sent to the clients via rabbitmq
# The DeploymentTaskItem track the acknowledgements from the clients
class Deployment(models.Model):
user = models.ForeignKey(
User,
......@@ -419,78 +393,100 @@ class Deployment(models.Model):
blank=True,
)
last_change = models.DateTimeField(
editable=False,
default=make_aware(datetime.utcfromtimestamp(0)),
)
def __str__(self):
return '{}@{}'.format(self.user, self.service)
def changed(self):
print("Deployment {} changed".format(self))
# delete old deployment_updates and generate new ones
for du in self.deployment_updates.all():
du.delete()
@property
def withdrawals(self):
return self.tasks.filter(action='withdraw')
for site in self.service.site.all():
du = DeploymentUpdate(
site=site,
deployment=self,
)
du.save()
@property
def deploys(self):
return self.tasks.filter(action='deploy')
self.last_change = make_aware(datetime.now())
self.save()
def __str__(self):
return '{}:{}'.format(self.service, self.user)
def deploy_key(self, key):
# key state: -> (2.5)
self.ssh_keys.add(key)
if key in self.ssh_keys_to_withdraw.all():
self.ssh_keys_to_withdraw.remove(key)
self.save()
self.changed()
self.send_change()
# and delete outstanding tasks which are made obsolete by this task
for withdrawal in self.withdrawals.filter(key=key):
withdrawal.delete_msg("now obsolete")
# generate task
task = DeploymentTask(
action='deploy',
deployment=self,
key=key,
)
task.save()
task.print("generated")
# generate task items
for site in self.service.site.all():
deploy = DeploymentTaskItem(
task=task,
site=site,
)
deploy.save()
deploy.print("generated")
# publish the task
task.publish()
def withdraw_key(self, key):
# key state: -> (4)
self.ssh_keys.remove(key)
# keys which are to be withdrawn by the clients
self.ssh_keys_to_withdraw.add(key)
self.save()
self.changed()
self.send_change()
def update(self):
# if there are still remaining updates we do not change anything
if self.deployment_updates.exists():
return
# and delete outstanding tasks which are made obsolete by this task
for deploy in self.deploys.filter(key=key):
deploy.delete_msg("now obsolete")
withdrawn_keys = list(self.ssh_keys_to_withdraw.all())
# the client has withdrawn the keys so we can empty the list
self.ssh_keys_to_withdraw.clear()
for key in withdrawn_keys:
key.try_final_deletion()
# generate task
task = DeploymentTask(
action='withdraw',
deployment=self,
key=key,
)
task.save()
task.print("generated")
self.save()
# generate task items
for site in self.service.site.all():
withdrawal = DeploymentTaskItem(
task=task,
site=site,
)
withdrawal.save()
withdrawal.print("generated")
def send_change(self):
deployment_change.send(sender=self.__class__, instance=self)
# publish the task
task.publish()
class DeploymentUpdate(models.Model):
class DeploymentTask(models.Model):
ACTION_CHOICES = (
('deploy', 'deploy'),
('withdraw', 'withdraw'),
)
action = models.CharField(
max_length=10,
choices=ACTION_CHOICES,
)
key = models.ForeignKey(
SSHPublicKey,
related_name='tasks',
on_delete=models.CASCADE,
)
deployment = models.ForeignKey(
Deployment,
related_name='deployment_updates',
)
site = models.ForeignKey(
Site,
related_name='deployment_updates',
related_name='tasks',
on_delete=models.CASCADE,
)
@property
......@@ -501,18 +497,83 @@ class DeploymentUpdate(models.Model):
def service(self):
return self.deployment.service
# to be called if all clients have received the update
def done(self):
print("[DeploymentUpdate:{}:{}:{}] done".format(
self.service,
self.site,
self.user,
def __str__(self):
return "{}:{}:{}:{} - {}".format(
self.id,
self.deployment.service,
self.deployment.user,
self.key,
self.action,
)
def print(self, msg):
print("[DeploymentTask:{}] {}".format(
self,
msg,
))
def publish(self):
from .clientapi.serializers import DeploymentTaskSerializer
msg = json.dumps(DeploymentTaskSerializer(self).data)
rabbitmq_instance().publish_by_service(
self.service,
msg)
# used for e.g. "done", "now obsolete"
def delete_msg(self, msg):
self.print(msg)
self.delete()
# the client acked the receipt and execution of the task for his site
def item_finished(self, site):
self.task_items.get(site=site).delete_msg("done")
if not self.task_items.exists():
self.finished()
# maintenance after all task items are done
def finished(self):
self.print("done")
# check if this was the final withdraw in a key deletion
if self.action == 'withdraw':
self.key.try_final_deletion()
class DeploymentTaskItem(models.Model):
task = models.ForeignKey(
DeploymentTask,
related_name='task_items',
on_delete=models.CASCADE,
)
site = models.ForeignKey(
Site,
related_name='task_items',
on_delete=models.CASCADE,
)
def __str__(self):
return str(self.user) + ':' + str(self.service) + '@' + str(self.site)
return "{}@{}".format(
self.task,
self.site,
)
def print(self, msg):
print("[DeploymentTaskItem:{}] {}".format(
self,
msg,
))
# used for e.g. "done", "now obsolete"
def delete_msg(self, msg):
self.print(msg)
self.delete()
#
# RECEIVERS
#
@receiver(post_save, sender=settings.AUTH_USER_MODEL)
def create_auth_token(sender, instance=None, created=False, **kwargs):
......@@ -534,23 +595,3 @@ def deregister_at_rabbitmq(
sender, instance=None, **kwargs):
RabbitMQInstance().deregister_site(instance)
@receiver(deployment_change, sender=Deployment)
def publish_deployment(sender, instance=None, created=False, **kwargs):
if instance is None:
return
from .clientapi.serializers import DeploymentSerializer
message = json.dumps(DeploymentSerializer(instance).data)
confirmed, online_sites = rabbitmq_instance().publish_by_service(
instance.service, message)
if confirmed:
# delivery successful
for site in online_sites:
print('Client for site {} got update for service {}'.format(
site,
instance.service,
))
site.ack_update(service=instance.service)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment