Commit d5e085c1 authored by Lukas Burgey's avatar Lukas Burgey

Simplify the broker code

parent a107d1dd
......@@ -56,7 +56,6 @@ class VOParentAdmin(PolymorphicParentModelAdmin):
admin.site.register(EntitlementNameSpace)
admin.site.register(OIDCConfig)
admin.site.register(models.RabbitMQInstance)
admin.site.register(models.Site)
admin.site.register(models.Service)
......
# Generated by Django 2.2.7 on 2019-11-13 15:27
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('backend', '0008_auto_20190220_1649'),
]
operations = [
migrations.RemoveField(
model_name='rabbitmqinstance',
name='exchanges',
),
migrations.RemoveField(
model_name='rabbitmqinstance',
name='host',
),
migrations.RemoveField(
model_name='rabbitmqinstance',
name='password',
),
migrations.RemoveField(
model_name='rabbitmqinstance',
name='port',
),
migrations.RemoveField(
model_name='rabbitmqinstance',
name='username',
),
migrations.RemoveField(
model_name='rabbitmqinstance',
name='vhost',
),
]
# pylint: disable=global-statement
from logging import getLogger
from json import dumps
from requests.auth import HTTPBasicAuth
from django.db import models
from django.core.cache import cache
from django_mysql.models import JSONField
import pika
from pika.exceptions import ConnectionClosed
from pika.credentials import PlainCredentials
LOGGER = getLogger(__name__)
RABBITMQ_CONNECTION = None
def exchanges_default():
return [
'entitlements',
'groups',
'users',
'services',
]
def publish_to_user(user, obj):
......@@ -56,97 +43,71 @@ class SingletonModel(models.Model):
return cache.get(cls.__name__)
# clients are registerred at rabbitmq, when they are assigned to a site
# (because we only then know what services they provide)
# The rabbitmq instance located on this host (localhost)
class RabbitMQInstance(SingletonModel):
host = models.CharField(
max_length=150,
default='localhost',
)
vhost = models.CharField(
max_length=150,
default='/',
)
exchanges = JSONField(
default=exchanges_default,
null=True,
blank=True,
)
port = models.IntegerField(
default=15672,
)
username = models.CharField(
max_length=150,
default='guest',
)
password = models.CharField(
max_length=150,
default='guest',
)
def __str__(self):
return self.host
@property
def host(self):
return 'localhost'
def msg(self, msg):
return '[RabbitMQ:{}] {}'.format(self.host, msg)
@property
def vhost(self):
return '/'
@property
def auth(self):
return HTTPBasicAuth(
self.username,
self.password,
)
def exchanges(self):
return [
'entitlements',
'groups',
'users',
'services',
]
def _init_exchanges(self, channel):
# this is no error
for exchange in self.exchanges:
channel.exchange_declare(
exchange=exchange,
durable=True,
durable=True, # so exchange survive a broker restart
auto_delete=False,
exchange_type='topic',
)
def _init_connection(self):
global RABBITMQ_CONNECTION
RABBITMQ_CONNECTION = pika.BlockingConnection(
pika.ConnectionParameters(
host=self.host,
heartbeat=60,
)
@property
def _params(self):
# we set NO port here, we use the default (probably 5672)
# this requires the
return pika.ConnectionParameters(
host=self.host,
virtual_host=self.vhost,
credentials=PlainCredentials(
'guest',
'guest',
),
)
return RABBITMQ_CONNECTION
@property
def _connection(self):
global RABBITMQ_CONNECTION
if RABBITMQ_CONNECTION is not None:
if RABBITMQ_CONNECTION.is_open:
return RABBITMQ_CONNECTION
def __str__(self):
return self.host
if RABBITMQ_CONNECTION.is_closing:
RABBITMQ_CONNECTION.close()
def msg(self, msg):
return '[RabbitMQ:{}] {}'.format(self.host, msg)
connection = self._init_connection()
channel = connection.channel()
self._init_exchanges(connection.channel())
channel.close()
RABBITMQ_CONNECTION = connection
return connection
@property
def _channel(self):
def _open_connection(self):
try:
channel = self._connection.channel()
channel.confirm_delivery()
return channel
except ConnectionClosed:
# reinitialize the connection
self._init_connection()
return self._channel
# start a new connection
return pika.BlockingConnection(self._params)
except Exception as e:
LOGGER.exception('RabbitMQ connection error: {}'.format(e))
raise e
def _publish(self, exchange, routing_key, body):
channel = self._channel
connection = self._open_connection()
channel = connection.channel()
# put in confirm mode
channel.confirm_delivery()
channel.basic_publish(
exchange=exchange,
routing_key=routing_key,
......@@ -155,11 +116,18 @@ class RabbitMQInstance(SingletonModel):
delivery_mode=1,
),
)
channel.close()
# close channel and connection
connection.close()
# PUBLIC API
# called on client registration to make sure the exchanges exists
def initialize(self):
self._connection
connection = self._open_connection()
channel = connection.channel()
self._init_exchanges(channel)
connection.close()
def publish_deployment_state(self, deployment_state):
from .serializers.clients import DeploymentStateSerializer
......@@ -175,3 +143,7 @@ class RabbitMQInstance(SingletonModel):
str(user.id),
msg,
)
# we keep this, as removing this breaks migrations
def exchanges_default():
return []
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment