pubsub.py 2.04 KB
Newer Older
Lukas Burgey's avatar
Lukas Burgey committed
1
2

import pika
3
from ..rabbitmq import RabbitMQInstance
Lukas Burgey's avatar
Lukas Burgey committed
4
5
6
7
8
9


class PubSubConnection:
    """Publish deployment updates for a service to a RabbitMQ topic exchange.

    A new blocking connection is opened for every call to
    ``publish_by_service`` and closed again afterwards.

    Args:
        host: Hostname of the RabbitMQ broker (default ``'localhost'``).
        exchange_name: Name of the topic exchange that messages are
            published to (default ``'deployments'``).
    """

    def __init__(self, host='localhost', exchange_name='deployments'):
        self.host = host
        self.exchange_name = exchange_name
        # AMQP delivery mode 1 marks messages as non-persistent (2 would be
        # persistent), even though the exchange itself is declared durable.
        self.properties = pika.BasicProperties(
            delivery_mode=1,
        )

    @staticmethod
    def delivery_callback(method):
        """Print a delivery notification frame.

        Currently not registered anywhere in this class; it is kept as a
        debugging hook. Declared static so it can be called (or passed as a
        callback) both through the class and through an instance.
        """
        print(str(method))

    def connect(self):
        """Open the connection and channel and declare the exchange.

        Sets ``self.connection`` and ``self.channel``, declares the durable
        topic exchange and turns on publisher confirms for the channel.

        Raises:
            Whatever pika raises while connecting or setting up the channel.
            If the channel setup fails, the connection opened here is closed
            before the exception propagates.
        """
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=self.host))

        try:
            self.channel = self.connection.channel()
            self.channel.exchange_declare(
                exchange=self.exchange_name,
                durable=True,
                exchange_type='topic')
            self.channel.confirm_delivery()
        except Exception:
            # Do not leak a half-initialised connection.
            if self.connection.is_open:
                self.connection.close()
            raise

    def disconnect(self):
        """Close the connection opened by ``connect`` if it is still open."""
        # Skipping an already-closed connection keeps cleanup in a
        # ``finally`` from masking the error that closed it.
        if self.connection.is_open:
            self.connection.close()

    def service_routing_key(self, service):
        """Return the routing key for ``service``: ``'service.<name>'``."""
        return 'service.' + service.name

    def online_clients(self, service):
        """Return the entries of ``service.site.all()`` that are connected.

        An entry counts as connected when
        ``RabbitMQInstance().is_client_connected(entry)`` is truthy.
        """
        rabbitmq = RabbitMQInstance()
        return [s
                for s in service.site.all()
                if rabbitmq.is_client_connected(s)]

    def publish_by_service(self, service, message):
        """Publish ``message`` for ``service`` if any of its clients is online.

        Args:
            service: Object with a ``name`` attribute (used for the routing
                key) and a ``site`` relation (used to find online clients).
            message: Message body handed to ``basic_publish``.

        Returns:
            ``None`` when no client is online (nothing is published).
            Otherwise a tuple ``(delivery_confirmed, online_clients)`` where
            ``delivery_confirmed`` is the return value of ``basic_publish``.

        Raises:
            Whatever pika raises while connecting or publishing; the
            connection is closed in either case.
        """
        online_clients = self.online_clients(service)
        if not online_clients:
            print("No clients online for service {}".format(
                service
            ))
            return

        print("Online clients for service {}: {}".format(
            service,
            online_clients,
            ))

        self.connect()
        try:
            # With publisher confirms enabled this reflects the *broker's*
            # confirmation, not an acknowledgement by the consuming clients.
            # NOTE(review): older pika releases return a bool here, pika >= 1.0
            # returns None and raises on nack/unroutable -- confirm against
            # the pinned pika version.
            delivery_confirmed = self.channel.basic_publish(
                exchange=self.exchange_name,
                routing_key=self.service_routing_key(service),
                body=message,
                properties=self.properties,
                )
            # Only report the update as sent once the publish call returned.
            print('Sent deployment update for service {}'.format(
                service
                ))
        finally:
            # Always release the connection, even if publishing failed.
            self.disconnect()

        return delivery_confirmed, online_clients