brokers.py 3.23 KB
Newer Older
1

2
3
import sys
import os
4
from logging import getLogger
5

lukas.burgey's avatar
lukas.burgey committed
6
from pika import BlockingConnection, ConnectionParameters, BasicProperties
Lukas Burgey's avatar
Lukas Burgey committed
7
from pika.credentials import PlainCredentials
lukas.burgey's avatar
lukas.burgey committed
8
from django.conf import settings
9

10
# Module-level logger, named after this module's import path.
LOGGER = getLogger(__name__)
11

12

lukas.burgey's avatar
lukas.burgey committed
13
class RabbitMQInstance:
    """Publisher for the RabbitMQ broker running on ``localhost``.

    Every public operation opens a fresh blocking connection, does its work
    and closes the connection again.  When ``_no_broker`` is true (test runs
    or ``settings.NO_BROKER``) all public operations are silent no-ops.
    """

    @property
    def host(self):
        """Host name of the broker."""
        return 'localhost'

    @property
    def vhost(self):
        """Virtual host used on the broker."""
        return '/'

    @property
    def exchanges(self):
        """Names of the topic exchanges declared by ``initialize``."""
        return [
            'entitlements',
            'groups',
            'users',
            'services',
        ]

    @property
    def _no_broker(self):
        """Indicate whether this module should skip all broker traffic.

        This is useful during testing, when there may not be a running
        RabbitMQ server.  True when pytest is loaded, when the Django setting
        ``NO_BROKER`` is truthy, or when the process was started through a
        ``pytest`` / ``py.test`` executable.
        """
        return (
            'pytest' in sys.modules
            or getattr(settings, 'NO_BROKER', False)
            or os.path.basename(sys.argv[0]) in ('pytest', 'py.test')
        )

    def _init_exchanges(self, channel):
        """Declare every exchange listed in ``exchanges`` on ``channel``.

        Declaring an exchange that already exists is not an error, so this
        can be called repeatedly.
        """
        for exchange in self.exchanges:
            channel.exchange_declare(
                exchange=exchange,
                durable=True,  # so exchanges survive a broker restart
                auto_delete=False,
                exchange_type='topic',
            )

    @property
    def _params(self):
        """Connection parameters for the broker.

        No port is set here, so the library default is used (probably 5672).
        """
        # NOTE(review): the original comment at this point was cut off
        # mid-sentence ("this requires the ..."); the missing requirement
        # should be restored by someone who knows the deployment.
        return ConnectionParameters(
            host=self.host,
            virtual_host=self.vhost,
            credentials=PlainCredentials(
                'guest',
                'guest',
            ),
        )

    def __str__(self):
        return self.host

    def msg(self, msg):
        """Return ``msg`` prefixed with a tag naming this broker's host."""
        return '[RabbitMQ:{}] {}'.format(self.host, msg)

    def _open_connection(self):
        """Open and return a new blocking connection to the broker.

        Any failure is logged with its traceback and then re-raised unchanged.
        """
        try:
            # start a new connection
            return BlockingConnection(self._params)
        except Exception as e:
            LOGGER.exception('RabbitMQ connection error: %s', e)
            # bare raise keeps the original traceback untouched
            raise

    def _publish(self, exchange, routing_key, body):
        """Publish ``body`` to ``exchange`` under ``routing_key``.

        Does nothing when ``_no_broker`` is true.  The connection is closed
        even when publishing fails, so errors do not leak connections.
        """
        # when running CI tests we have no broker, so just do nothing
        if self._no_broker:
            return

        connection = self._open_connection()
        try:
            channel = connection.channel()
            # put the channel in confirm mode
            channel.confirm_delivery()

            channel.basic_publish(
                exchange=exchange,
                routing_key=routing_key,
                body=body,
                properties=BasicProperties(
                    # NOTE(review): 1 is AMQP's non-persistent delivery mode,
                    # while the exchanges are durable -- confirm this is
                    # intended.
                    delivery_mode=1,
                ),
            )
        finally:
            # Closing also closes the channel.  Skip it if the connection is
            # already gone so a close error cannot mask the original one.
            if connection.is_open:
                connection.close()

    # PUBLIC API

    def initialize(self):
        """Make sure all exchanges exist on the broker.

        Called on client registration.  Does nothing when ``_no_broker`` is
        true.  The connection is closed even when declaring an exchange fails.
        """
        # when running CI tests we have no broker, so just do nothing
        if self._no_broker:
            return

        connection = self._open_connection()
        try:
            channel = connection.channel()
            self._init_exchanges(channel)
        finally:
            # Skip closing a connection that is already gone so a close error
            # cannot mask the original one.
            if connection.is_open:
                connection.close()

    def publish_deployment_state(self, deployment_state, msg):
        """Publish ``msg`` to the ``services`` exchange.

        The routing key is the name of the deployment state's service.
        """
        self._publish(
            'services',
            deployment_state.service.name,
            msg,
        )

    def publish_to_user(self, user, msg):
        """Publish ``msg`` to the ``users`` exchange.

        The routing key is the user's id rendered as a string.
        """
        self._publish(
            'users',
            str(user.id),
            msg,
        )