Commit 8e5e6d08 authored by Lukas Burgey's avatar Lukas Burgey
Browse files

Move RabbitMQInstance to a separate package

parent f270a857
-group description
-deployment for all keys
-
format
====
SCIM
key -> credentials
"credentials":{
"type": [
{
"name": "<name>",
"value": "<value>"
}
]
}
...@@ -2,7 +2,7 @@ from django.contrib import admin ...@@ -2,7 +2,7 @@ from django.contrib import admin
from django.contrib.auth.admin import UserAdmin from django.contrib.auth.admin import UserAdmin
from django.contrib.auth.models import Group from django.contrib.auth.models import Group
from . import models from . import models, broker_models
from .auth.v1.models import OIDCConfig from .auth.v1.models import OIDCConfig
...@@ -23,7 +23,7 @@ class ClientAdmin(UserAdmin): ...@@ -23,7 +23,7 @@ class ClientAdmin(UserAdmin):
list_filter = (TypeFilter,) list_filter = (TypeFilter,)
admin.site.register(OIDCConfig) admin.site.register(OIDCConfig)
admin.site.register(models.RabbitMQInstance) admin.site.register(broker_models.RabbitMQInstance)
admin.site.register(models.User, ClientAdmin) admin.site.register(models.User, ClientAdmin)
admin.site.unregister(Group) admin.site.unregister(Group)
......
...@@ -5,7 +5,8 @@ from django.contrib.auth.models import AbstractUser, Group ...@@ -5,7 +5,8 @@ from django.contrib.auth.models import AbstractUser, Group
from django.http import HttpResponse from django.http import HttpResponse
from django.contrib.auth import authenticate from django.contrib.auth import authenticate
from django.contrib.sessions.models import Session from django.contrib.sessions.models import Session
from ...models import Site, User, RabbitMQInstance from ...models import Site, User
from ...broker_models import RabbitMQInstance
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
CLIENT_DEBUGGING = False CLIENT_DEBUGGING = False
......
# pylint: disable=global-statement
import logging
from requests.auth import HTTPBasicAuth
from django.db import models
from django.core.cache import cache
from django_mysql.models import JSONField
import pika
from pika.exceptions import ConnectionClosed
LOGGER = logging.getLogger(__name__)
RABBITMQ_CONNECTION = None
def exchanges_default():
return []
# singleton for simple configs
# https://steelkiwi.com/blog/practical-application-singleton-design-pattern/
class SingletonModel(models.Model):
class Meta:
abstract = True
def set_cache(self):
cache.set(self.__class__.__name__, self)
# pylint: disable=invalid-name, arguments-differ
def save(self, *args, **kwargs):
self.pk = 1
super(SingletonModel, self).save(*args, **kwargs)
self.set_cache()
@classmethod
def load(cls):
if cache.get(cls.__name__) is None:
obj, created = cls.objects.get_or_create(pk=1)
if not created:
obj.set_cache()
return cache.get(cls.__name__)
# clients are registerred at rabbitmq, when they are assigned to a site
# (because we only then know what services they provide)
class RabbitMQInstance(SingletonModel):
host = models.CharField(
max_length=150,
default='localhost',
)
vhost = models.CharField(
max_length=150,
default='%2f',
)
exchanges = JSONField(
default=exchanges_default,
null=True,
blank=True,
)
port = models.IntegerField(
default=15672,
)
username = models.CharField(
max_length=150,
default='guest',
)
password = models.CharField(
max_length=150,
default='guest',
)
def __str__(self):
return self.host
def msg(self, msg):
return '[RabbitMQ:{}] {}'.format(self.host, msg)
@property
def auth(self):
return HTTPBasicAuth(
self.username,
self.password,
)
def _init_exchanges(self, channel):
channel.exchange_declare(
exchange='services',
durable=True,
auto_delete=False,
exchange_type='topic',
)
channel.exchange_declare(
exchange='groups',
durable=True,
auto_delete=False,
exchange_type='topic',
)
channel.exchange_declare(
exchange='sites',
durable=True,
auto_delete=False,
exchange_type='topic',
)
channel.exchange_declare(
exchange='users',
durable=True,
auto_delete=False,
exchange_type='topic',
)
def _init_connection(self):
global RABBITMQ_CONNECTION
#LOGGER.debug('Opening new BlockingConnection')
RABBITMQ_CONNECTION = pika.BlockingConnection(
pika.ConnectionParameters(
host=self.host,
ssl=True,
heartbeat_interval=60,
)
)
return RABBITMQ_CONNECTION
@property
def _connection(self):
global RABBITMQ_CONNECTION
if RABBITMQ_CONNECTION is not None:
if RABBITMQ_CONNECTION.is_open:
return RABBITMQ_CONNECTION
elif RABBITMQ_CONNECTION.is_closing:
RABBITMQ_CONNECTION.close()
connection = self._init_connection()
channel = connection.channel()
self._init_exchanges(connection.channel())
channel.close()
RABBITMQ_CONNECTION = connection
return connection
@property
def _channel(self):
try:
channel = self._connection.channel()
channel.confirm_delivery()
return channel
except ConnectionClosed:
# reinitialize the connection
self._init_connection()
return self._channel
def _publish(self, exchange, routing_key, body):
channel = self._channel
channel.basic_publish(
exchange=exchange,
routing_key=routing_key,
body=body,
properties=pika.BasicProperties(
delivery_mode=1,
),
)
channel.close()
# PUBLIC API
def publish_by_service(self, service, msg):
self._publish(
'services',
service.name,
msg,
)
def publish_by_group(self, group, msg):
self._publish(
'groups',
group.name,
msg,
)
def publish_by_site(self, site, msg):
self._publish(
'sites',
site.name,
msg,
)
def publish_to_user(self, user, msg):
self._publish(
'users',
str(user.id),
msg,
)
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
from django_mysql.models import JSONField from django_mysql.models import JSONField
from rest_framework import serializers from rest_framework import serializers
from .. import models, serializers as backend_serializers from .. import models, broker_models, serializers as backend_serializers
class ServiceSerializer(serializers.ModelSerializer): class ServiceSerializer(serializers.ModelSerializer):
...@@ -72,5 +72,5 @@ class SiteSerializer(serializers.Serializer): ...@@ -72,5 +72,5 @@ class SiteSerializer(serializers.Serializer):
class RabbitMQInstanceSerializer(serializers.ModelSerializer): class RabbitMQInstanceSerializer(serializers.ModelSerializer):
class Meta: class Meta:
model = models.RabbitMQInstance model = broker_models.RabbitMQInstance
fields = ['vhost'] fields = ['vhost']
...@@ -6,7 +6,7 @@ from rest_framework import generics, views ...@@ -6,7 +6,7 @@ from rest_framework import generics, views
from rest_framework.authentication import BasicAuthentication from rest_framework.authentication import BasicAuthentication
from rest_framework.response import Response from rest_framework.response import Response
from .serializers import DeploymentStateSerializer, ServiceSerializer, RabbitMQInstanceSerializer from .serializers import DeploymentStateSerializer, ServiceSerializer, RabbitMQInstanceSerializer
from ..models import RabbitMQInstance from ..broker_models import RabbitMQInstance
from .. import models from .. import models
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
......
...@@ -5,20 +5,16 @@ import json ...@@ -5,20 +5,16 @@ import json
import logging import logging
from django.contrib.auth.models import AbstractUser, Group from django.contrib.auth.models import AbstractUser, Group
from django.core.cache import cache
from django.db import models from django.db import models
from django.db.models.signals import post_save from django.db.models.signals import post_save
from django.dispatch import receiver from django.dispatch import receiver
from django_mysql.models import JSONField from django_mysql.models import JSONField
from pika.exceptions import ConnectionClosed
import pika
from requests.auth import HTTPBasicAuth
from .broker_models import RabbitMQInstance
from .auth.v1.models import OIDCConfig from .auth.v1.models import OIDCConfig
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
RABBITMQ_CONNECTION = None
STATE_CHOICES = ( STATE_CHOICES = (
('deployment_pending', 'Deployment Pending'), ('deployment_pending', 'Deployment Pending'),
...@@ -30,8 +26,6 @@ STATE_CHOICES = ( ...@@ -30,8 +26,6 @@ STATE_CHOICES = (
('rejected', 'Rejected'), ('rejected', 'Rejected'),
) )
def exchanges_default():
return []
def user_info_default(): def user_info_default():
return {} return {}
def questionnaire_default(): def questionnaire_default():
...@@ -40,178 +34,6 @@ def credential_default(): ...@@ -40,178 +34,6 @@ def credential_default():
return {} return {}
# singleton for simple configs
# https://steelkiwi.com/blog/practical-application-singleton-design-pattern/
class SingletonModel(models.Model):
class Meta:
abstract = True
def set_cache(self):
cache.set(self.__class__.__name__, self)
# pylint: disable=invalid-name, arguments-differ
def save(self, *args, **kwargs):
self.pk = 1
super(SingletonModel, self).save(*args, **kwargs)
self.set_cache()
@classmethod
def load(cls):
if cache.get(cls.__name__) is None:
obj, created = cls.objects.get_or_create(pk=1)
if not created:
obj.set_cache()
return cache.get(cls.__name__)
# clients are registerred at rabbitmq, when they are assigned to a site
# (because we only then know what services they provide)
class RabbitMQInstance(SingletonModel):
host = models.CharField(
max_length=150,
default='localhost',
)
vhost = models.CharField(
max_length=150,
default='%2f',
)
exchanges = JSONField(
default=exchanges_default,
null=True,
blank=True,
)
port = models.IntegerField(
default=15672,
)
username = models.CharField(
max_length=150,
default='guest',
)
password = models.CharField(
max_length=150,
default='guest',
)
def __str__(self):
return self.host
def msg(self, msg):
return '[RabbitMQ:{}] {}'.format(self.host, msg)
@property
def auth(self):
return HTTPBasicAuth(
self.username,
self.password,
)
def _init_exchanges(self, channel):
channel.exchange_declare(
exchange='services',
durable=True,
auto_delete=False,
exchange_type='topic',
)
channel.exchange_declare(
exchange='groups',
durable=True,
auto_delete=False,
exchange_type='topic',
)
channel.exchange_declare(
exchange='sites',
durable=True,
auto_delete=False,
exchange_type='topic',
)
channel.exchange_declare(
exchange='users',
durable=True,
auto_delete=False,
exchange_type='topic',
)
def _init_connection(self):
global RABBITMQ_CONNECTION
#LOGGER.debug('Opening new BlockingConnection')
RABBITMQ_CONNECTION = pika.BlockingConnection(
pika.ConnectionParameters(
host=self.host,
ssl=True,
heartbeat_interval=60,
)
)
return RABBITMQ_CONNECTION
@property
def _connection(self):
global RABBITMQ_CONNECTION
if RABBITMQ_CONNECTION is not None:
if RABBITMQ_CONNECTION.is_open:
return RABBITMQ_CONNECTION
elif RABBITMQ_CONNECTION.is_closing:
RABBITMQ_CONNECTION.close()
connection = self._init_connection()
channel = connection.channel()
self._init_exchanges(connection.channel())
channel.close()
RABBITMQ_CONNECTION = connection
return connection
@property
def _channel(self):
try:
channel = self._connection.channel()
channel.confirm_delivery()
return channel
except ConnectionClosed:
# reinitialize the connection
self._init_connection()
return self._channel
def _publish(self, exchange, routing_key, body):
channel = self._channel
channel.basic_publish(
exchange=exchange,
routing_key=routing_key,
body=body,
properties=pika.BasicProperties(
delivery_mode=1,
),
)
channel.close()
# PUBLIC API
def publish_by_service(self, service, msg):
self._publish(
'services',
service.name,
msg,
)
def publish_by_group(self, group, msg):
self._publish(
'groups',
group.name,
msg,
)
def publish_by_site(self, site, msg):
self._publish(
'sites',
site.name,
msg,
)
def publish_to_user(self, user, msg):
self._publish(
'users',
str(user.id),
msg,
)
class User(AbstractUser): class User(AbstractUser):
TYPE_CHOICES = ( TYPE_CHOICES = (
('apiclient', 'API-Client'), ('apiclient', 'API-Client'),
...@@ -520,7 +342,7 @@ class Service(models.Model): ...@@ -520,7 +342,7 @@ class Service(models.Model):
deployment = user.deployments.get(group=group) deployment = user.deployments.get(group=group)
deployment.service_added(self) deployment.service_added(self)
except Deployment.DoesNotExist: except Deployment.DoesNotExist:
LOGGER.error('Inconsistency in group deployment') LOGGER.error('Inconsistency of group deployment')
raise raise
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment