brokers.py 3.75 KB
Newer Older
1

2
3
from logging import getLogger
from json import dumps
4
5
6

from django.db import models
from django.core.cache import cache
Lukas Burgey's avatar
Lukas Burgey committed
7

8
import pika
Lukas Burgey's avatar
Lukas Burgey committed
9
from pika.credentials import PlainCredentials
10

11
LOGGER = getLogger(__name__)
12

13

14
15
16
17
18
19
20
21
def publish_to_user(user, obj):
    """Publish the serialized update for ``obj`` to ``user``'s routing key.

    ``obj`` is run through ``serializers.UpdateSerializer`` and JSON-encoded;
    the resulting string is handed to the ``RabbitMQInstance`` singleton.
    """
    # Function-scope import, as in the original (presumably to avoid a
    # circular import between this module and the serializers -- TODO confirm).
    from . import serializers

    # Resolve the broker first, then build the payload.
    broker = RabbitMQInstance.load()
    publish = broker.publish_to_user
    payload = dumps(serializers.UpdateSerializer(obj).data)
    publish(user, payload)


22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# singleton for simple configs
# https://steelkiwi.com/blog/practical-application-singleton-design-pattern/
class SingletonModel(models.Model):
    """Abstract base for models that have exactly one row (pk=1).

    The single instance is mirrored in the Django cache under the concrete
    class name so that ``load()`` can usually skip the database.
    """

    class Meta:
        abstract = True

    def set_cache(self):
        """Store this instance in the cache, keyed by the class name."""
        cache.set(self.__class__.__name__, self)

    # pylint: disable=invalid-name, arguments-differ
    def save(self, *args, **kwargs):
        """Persist the instance as the one and only row, then refresh the cache."""
        # Forcing the primary key makes every save hit the same row.
        self.pk = 1
        super(SingletonModel, self).save(*args, **kwargs)
        self.set_cache()

    @classmethod
    def load(cls):
        """Return the singleton instance, creating the row if necessary.

        The cached copy is returned when present. On a cache miss the row is
        fetched (or created) and returned directly, instead of being read back
        from the cache: a backend that does not retain the entry would
        otherwise make this method return ``None``.
        """
        cached = cache.get(cls.__name__)
        if cached is not None:
            return cached

        obj, created = cls.objects.get_or_create(pk=1)
        if not created:
            # A freshly created row went through save(), which already
            # populated the cache; only an existing row needs caching here.
            obj.set_cache()
        return obj


Lukas Burgey's avatar
Lukas Burgey committed
46
# The rabbitmq instance located on this host (localhost)
47
48
class RabbitMQInstance(SingletonModel):
    """Singleton for the RabbitMQ broker on this host.

    Host, vhost and credentials are hard-coded. Every publish (and every
    ``initialize`` call) opens its own blocking connection and closes it
    again afterwards, also when an error occurs in between.
    """

    @property
    def host(self):
        """Hostname the broker is reached at."""
        return 'localhost'

    @property
    def vhost(self):
        """Virtual host used for every connection."""
        return '/'

    @property
    def exchanges(self):
        """Names of the exchanges declared by ``initialize``."""
        return [
            'entitlements',
            'groups',
            'users',
            'services',
        ]

    def _init_exchanges(self, channel):
        """Declare every exchange from ``self.exchanges`` on ``channel``."""
        # Iterating the property is intentional; the original author flagged
        # this as "no error" (presumably for a linter warning -- TODO confirm).
        for exchange in self.exchanges:
            channel.exchange_declare(
                exchange=exchange,
                durable=True,  # so exchanges survive a broker restart
                auto_delete=False,
                exchange_type='topic',
            )

    @property
    def _params(self):
        """Connection parameters for the local broker."""
        # We set NO port here, so pika falls back to its default
        # (5672 for plain AMQP).
        return pika.ConnectionParameters(
            host=self.host,
            virtual_host=self.vhost,
            credentials=PlainCredentials(
                'guest',
                'guest',
            ),
        )

    def __str__(self):
        return self.host

    def msg(self, msg):
        """Prefix ``msg`` with a tag naming this broker."""
        return '[RabbitMQ:{}] {}'.format(self.host, msg)

    def _open_connection(self):
        """Open and return a new blocking connection.

        Any error raised while connecting is logged and then re-raised
        unchanged.
        """
        try:
            # start a new connection
            return pika.BlockingConnection(self._params)
        except Exception as e:
            LOGGER.exception('RabbitMQ connection error: %s', e)
            # Bare raise keeps the original traceback intact.
            raise

    @staticmethod
    def _close_connection(connection):
        """Close ``connection`` unless it is already closed."""
        # Closing an already-closed connection can itself raise and would
        # then mask the error that caused the closure.
        if connection.is_open:
            connection.close()

    def _publish(self, exchange, routing_key, body):
        """Publish ``body`` to ``exchange`` under ``routing_key``.

        A fresh connection is opened for the message and is always closed
        again, even when publishing fails.
        """
        connection = self._open_connection()
        try:
            channel = connection.channel()
            # put in confirm mode
            channel.confirm_delivery()

            channel.basic_publish(
                exchange=exchange,
                routing_key=routing_key,
                body=body,
                properties=pika.BasicProperties(
                    delivery_mode=1,  # AMQP: 1 = transient (non-persistent)
                ),
            )
        finally:
            # close channel and connection
            self._close_connection(connection)

    # PUBLIC API

    # called on client registration to make sure the exchanges exist
    def initialize(self):
        """Declare all exchanges on the broker."""
        connection = self._open_connection()
        try:
            channel = connection.channel()
            self._init_exchanges(channel)
        finally:
            self._close_connection(connection)

    def publish_deployment_state(self, deployment_state):
        """Publish ``deployment_state`` to the 'services' exchange.

        The routing key is the name of the state's service; the body is the
        JSON-encoded ``DeploymentStateSerializer`` data.
        """
        from .serializers.clients import DeploymentStateSerializer
        self._publish(
            'services',
            deployment_state.service.name,
            dumps(DeploymentStateSerializer(deployment_state).data),
        )

    def publish_to_user(self, user, msg):
        """Publish ``msg`` to the 'users' exchange, routed by ``user.id``."""
        self._publish(
            'users',
            str(user.id),
            msg,
        )
Lukas Burgey's avatar
Lukas Burgey committed
145

Lukas Burgey's avatar
Lukas Burgey committed
146

Lukas Burgey's avatar
Lukas Burgey committed
147
148
149
# we keep this, as removing this breaks migrations
def exchanges_default():
    """Return a new empty list.

    Retained only because removing it breaks existing migrations.
    """
    return list()