Commit f123a8cf authored by Lukas Burgey's avatar Lukas Burgey
Browse files

Fix ACKs when no client is connected

parent 00a1f04b
...@@ -21,6 +21,7 @@ def publish_deployment(sender, instance=None, created=False, **kwargs): ...@@ -21,6 +21,7 @@ def publish_deployment(sender, instance=None, created=False, **kwargs):
instance.service, message): instance.service, message):
# delivery successful # delivery successful
instance.service.site.client_updated() instance.service.site.client_updated()
print('client updated') print('Client got update for service {}'.format(instance.service))
else: else:
print('client update failed') print('Client failed to receive update for service {}'.format(
instance.service))
import pika import pika
from ..rabbitmq import RabbitMQInstance
class PubSubConnection: class PubSubConnection:
...@@ -33,10 +34,18 @@ class PubSubConnection: ...@@ -33,10 +34,18 @@ class PubSubConnection:
def service_routing_key(self, service): def service_routing_key(self, service):
return 'service.' + service.name return 'service.' + service.name
def is_client_connected(self, client):
return False
def publish_by_service(self, service, message): def publish_by_service(self, service, message):
if not RabbitMQInstance().is_client_connected(service.site):
print('Client for service {} is not online'.format(service))
return False
print('Client for service {} is online'.format(service))
self.connect() self.connect()
print(' [x] Sent {}:{}'.format( print('Sent deployment update for service {}'.format(service))
self.service_routing_key(service), message))
# True if the client acked the message # True if the client acked the message
delivery_confirmed = self.channel.basic_publish( delivery_confirmed = self.channel.basic_publish(
......
...@@ -66,35 +66,41 @@ class Site(models.Model): ...@@ -66,35 +66,41 @@ class Site(models.Model):
return self.name return self.name
def client_updated(self): def client_updated(self):
print("Client of site {} got updated".format(self.name))
self.last_fetch = make_aware(datetime.now()) self.last_fetch = make_aware(datetime.now())
self.save() self.save()
def clientapi_get_deployments(self, all=False): def clientapi_get_deployments(self, all=False):
services = {} def service_deployments(service):
if all:
for service in self.services.all(): ds = (
ds = ( service.deployments
service.deployments # only oidcuser are supposed to have deployments
# only oidcuser are supposed to have deployments .filter(user__user_type='oidcuser')
.filter(user__user_type='oidcuser') )
# we do not exclude deployments without ssh_keys, as the else:
# ssh_keys_to_withdraw still need to be propagated ds = (
# .exclude(ssh_keys=None) service.deployments
) # only oidcuser are supposed to have deployments
print(ds) .filter(user__user_type='oidcuser')
.filter(last_change__gte=self.last_fetch)
if not all: )
ds = ds.filter(
last_change__gt=self.last_fetch)
for deployment in ds: for deployment in ds:
# TODO replace this optimism with an acknowledgement # TODO replace this optimism with an acknowledgement
deployment.client_updated() deployment.client_updated()
# deployments for this site return ds
services[service.name] = ds
return services deployments = {service.name: service_deployments(service)
for service
in self.services.all()}
# TODO we expect the client to get the update here
self.client_updated()
# deployments of the services of this site
return deployments
@receiver(post_save, sender=Site) @receiver(post_save, sender=Site)
...@@ -198,12 +204,18 @@ class Deployment(models.Model): ...@@ -198,12 +204,18 @@ class Deployment(models.Model):
) )
last_change = models.DateTimeField( last_change = models.DateTimeField(
auto_now=True editable=False,
default=make_aware(datetime.utcfromtimestamp(0)),
) )
def __str__(self): def __str__(self):
return '{}@{}'.format(self.user, self.service) return '{}@{}'.format(self.user, self.service)
def changed(self):
print("Deployment {} changed".format(self))
self.last_change = make_aware(datetime.now())
self.save()
def deploy_key(self, key): def deploy_key(self, key):
# key state: -> (2.5) # key state: -> (2.5)
self.ssh_keys.add(key) self.ssh_keys.add(key)
...@@ -211,6 +223,7 @@ class Deployment(models.Model): ...@@ -211,6 +223,7 @@ class Deployment(models.Model):
if key in self.ssh_keys_to_withdraw.all(): if key in self.ssh_keys_to_withdraw.all():
self.ssh_keys_to_withdraw.remove(key) self.ssh_keys_to_withdraw.remove(key)
self.save() self.save()
self.changed()
self.send_change() self.send_change()
def withdraw_key(self, key): def withdraw_key(self, key):
...@@ -220,6 +233,7 @@ class Deployment(models.Model): ...@@ -220,6 +233,7 @@ class Deployment(models.Model):
# keys which are to be withdrawn by the clients # keys which are to be withdrawn by the clients
self.ssh_keys_to_withdraw.add(key) self.ssh_keys_to_withdraw.add(key)
self.save() self.save()
self.changed()
self.send_change() self.send_change()
def client_updated(self): def client_updated(self):
...@@ -235,5 +249,3 @@ class Deployment(models.Model): ...@@ -235,5 +249,3 @@ class Deployment(models.Model):
def send_change(self): def send_change(self):
deployment_change.send(sender=self.__class__, instance=self) deployment_change.send(sender=self.__class__, instance=self)
...@@ -17,10 +17,33 @@ class RabbitMQInstance: ...@@ -17,10 +17,33 @@ class RabbitMQInstance:
def get_uri(self, path): def get_uri(self, path):
return '{}/{}'.format(self.api, path) return '{}/{}'.format(self.api, path)
def rest_get(self, api_path):
r = requests.get(
self.get_uri(api_path),
auth=self.auth)
r.raise_for_status()
return r.json()
# send a rest call with path and data to the rest interface of
# the rabbitmq instance
def rest_put(self, api_path, data):
r = requests.put(
self.get_uri(api_path),
json=data,
auth=self.auth)
r.raise_for_status()
return r
def rest_del(self, api_path):
r = requests.delete(
self.get_uri(api_path),
auth=self.auth)
r.raise_for_status()
return r
def set_topic_permissions(self, site): def set_topic_permissions(self, site):
username = site.client.username username = site.client.username
set_topic_permission_uri = '{}/topic-permissions/{}/{}/'.format( path = 'topic-permissions/{}/{}/'.format(
self.api,
self.vhost, self.vhost,
username, username,
) )
...@@ -43,16 +66,12 @@ class RabbitMQInstance: ...@@ -43,16 +66,12 @@ class RabbitMQInstance:
'read': '^service\.({})$'.format(services), 'read': '^service\.({})$'.format(services),
} }
r = requests.put( return self.rest_put(path, set_topic_permission_data)
set_topic_permission_uri,
json=set_topic_permission_data, auth=self.auth)
r.raise_for_status()
# set permissions for the user # set permissions for the user
def set_permissions(self, site): def set_permissions(self, site):
username = site.client.username username = site.client.username
set_permission_uri = '{}/permissions/{}/{}/'.format( path = 'permissions/{}/{}/'.format(
self.api,
self.vhost, self.vhost,
username, username,
) )
...@@ -63,40 +82,26 @@ class RabbitMQInstance: ...@@ -63,40 +82,26 @@ class RabbitMQInstance:
'read': permission, 'read': permission,
} }
r = requests.put( return self.rest_put(path, set_permission_data)
set_permission_uri, json=set_permission_data, auth=self.auth)
r.raise_for_status()
# create user at the rabbitmq instance # create user at the rabbitmq instance
def create_user(self, site): def create_user(self, site):
username = site.client.username username = site.client.username
user_creation_uri = '{}/users/{}/'.format( path = 'users/{}/'.format(username)
self.api,
username
)
user_creation_data = { user_creation_data = {
'password': str(site.client.auth_token.key), 'password': str(site.client.auth_token.key),
'tags': '', 'tags': '',
} }
r = requests.put( return self.rest_put(path, user_creation_data)
user_creation_uri, json=user_creation_data, auth=self.auth)
r.raise_for_status()
# delete user at the rabbitmq instance # delete user at the rabbitmq instance
def delete_user(self, site): def delete_user(self, site):
username = site.client.username username = site.client.username
user_creation_uri = self.get_uri('users/{}/'.format(username)) path = 'users/{}/'.format(username)
user_creation_data = { return self.rest_del(path)
'password': str(site.client.auth_token.key),
'tags': '',
}
r = requests.delete(
user_creation_uri, json=user_creation_data, auth=self.auth)
r.raise_for_status()
# PUBLIC API # PUBLIC API
...@@ -116,3 +121,10 @@ class RabbitMQInstance: ...@@ -116,3 +121,10 @@ class RabbitMQInstance:
print('RabbitMQ: deregister: {} / {}'.format( print('RabbitMQ: deregister: {} / {}'.format(
site.name, site.client.username)) site.name, site.client.username))
self.delete_user(site) self.delete_user(site)
def is_client_connected(self, site):
connections = self.rest_get("connections/")
client_connections = [c
for c in connections
if c['user'] == site.client.username]
return len(client_connections) > 0
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment