Commit 07637b37 authored by Lukas Burgey's avatar Lukas Burgey

Rework RabbitMQInstance drastically

parent 7e4f6229
......@@ -19,7 +19,8 @@ from rest_framework.authtoken.models import Token
from .auth.v1.models import OIDCConfig
LOGGER = logging.getLogger(__name__)
RECONNECT_TIMEOUT = 3
RECONNECT_TIMEOUT = 5
RECONNECT_RETRIES = 3
# singleton for simple configs
......@@ -100,157 +101,29 @@ class RabbitMQInstance(SingletonModel):
ssl=True,
)
@property
def connection(self):
LOGGER.debug(self.msg('connection opened'))
return pika.BlockingConnection(
self._connection_parameters,
)
@property
def channel(self):
rabbitmq_channel = self.connection.channel()
rabbitmq_channel.exchange_declare(
exchange=self.exchange,
durable=True,
auto_delete=False,
exchange_type='topic',
)
rabbitmq_channel.confirm_delivery()
LOGGER.debug(self.msg('channel opened'))
return rabbitmq_channel
def _get_api_uri(self, path):
return 'http://{}:{}/{}/{}'.format(
self.host,
self.port,
self.path,
path,
)
def _rest_get(self, api_path):
req = requests.get(
self._get_api_uri(api_path),
auth=self.auth)
req.raise_for_status()
return req.json()
# send a rest call with path and data to the rest interface of
# the rabbitmq instance
def _rest_put(self, api_path, data):
req = requests.put(
self._get_api_uri(api_path),
json=data,
auth=self.auth)
req.raise_for_status()
return req
def _rest_del(self, api_path):
req = requests.delete(
self._get_api_uri(api_path),
auth=self.auth)
req.raise_for_status()
return req
def _set_topic_permissions(self, site):
username = site.client.username
path = 'topic-permissions/{}/{}/'.format(
self.vhost,
username,
)
# set permissions for the correct topics
# we construct a regex to match the services of the site
services = ''
omit_bar = True
for service in site.services.all():
prefix = '|'
if omit_bar:
prefix = ''
omit_bar = False
services = services + prefix + service.name
set_topic_permission_data = {
'exchange': self.exchange,
'write': '^$',
'read': r'^service\.({})$'.format(services),
}
return self._rest_put(path, set_topic_permission_data)
# set permissions for the user
def _set_permissions(self, site):
username = site.client.username
path = 'permissions/{}/{}/'.format(
self.vhost,
username,
)
permission = r'^(amq\.gen.*|{})'.format(self.exchange)
set_permission_data = {
'configure': permission,
'write': permission,
'read': permission,
}
return self._rest_put(path, set_permission_data)
# create user at the rabbitmq instance
def _create_user(self, site):
username = site.client.username
path = 'users/{}/'.format(username)
user_creation_data = {
'password': str(site.client.auth_token.key),
'tags': '',
}
return self._rest_put(path, user_creation_data)
# delete user at the rabbitmq instance
def _delete_user(self, site):
username = site.client.username
path = 'users/{}/'.format(username)
return self._rest_del(path)
def _disconnect(self):
LOGGER.debug(self.msg('closing connection'))
self.connection.close()
# PUBLIC API
def register_site(self, site):
self._create_user(site)
self._set_permissions(site)
self._set_topic_permissions(site)
LOGGER.info(self.msg('registered {}'.format(site.client)))
def update_site(self, site):
self._set_topic_permissions(site)
LOGGER.info(self.msg('updated permissions for {}'.format(site.client)))
def deregister_site(self, site):
self._delete_user(site)
LOGGER.info(self.msg('deregistered {}'.format(site.client)))
def is_client_connected(self, site):
connections = self._rest_get("connections/")
clients_for_site = [c
for c in connections
if c['user'] == site.client.username]
return len(clients_for_site) > 0
def online_clients(self, service):
return [site
for site in service.site.all()
if self.is_client_connected(site)]
def publish_by_service(self, service, msg):
while True:
# FIXME dirty
tries = 0
while tries < RECONNECT_RETRIES:
try:
return self.channel.basic_publish(
# open connection
connection = pika.BlockingConnection(
self._connection_parameters,
)
# open channel
channel = connection.channel()
channel.exchange_declare(
exchange=self.exchange,
durable=True,
auto_delete=False,
exchange_type='topic',
)
channel.confirm_delivery()
channel.basic_publish(
exchange=self.exchange,
routing_key=service.routing_key,
body=msg,
......@@ -258,10 +131,14 @@ class RabbitMQInstance(SingletonModel):
delivery_mode=1,
),
)
except ConnectionClosed as exception:
LOGGER.info(self.msg('ConnectionClosed: {}'.format(exception)))
channel.close()
connection.close()
return
except:
time.sleep(RECONNECT_TIMEOUT)
tries += 1
def user_info_default():
return {}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment