Statistics
| Branch: | Tag: | Revision:

root / snf-common / synnefo / lib / amqp_puka.py @ e0b68525

History | View | Annotate | Download (13.4 kB)

1
# Copyright 2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
""" Module implementing connection and communication with an AMQP broker.
35

36
AMQP Client's implemented by this module silenty detect connection failures and
37
try to reconnect to any available broker. Also publishing takes advantage of
38
publisher-confirms in order to guarantee that messages are properly delivered
39
to the broker.
40

41
"""
42

    
43
import logging
44

    
45
from puka import Client
46
from puka import spec_exceptions
47
from socket import error as socket_error
48
from time import sleep
49
from random import shuffle
50
from functools import wraps
51
from ordereddict import OrderedDict
52
from synnefo import settings
53

    
54
logging.basicConfig(level=logging.DEBUG,
55
                    format="[%(levelname)s %(asctime)s] %(message)s")
56
logger = logging.getLogger()
57

    
58

    
59
def reconnect_decorator(func):
60
    """
61
    Decorator for persistent connection with one or more AMQP brokers.
62

63
    """
64
    @wraps(func)
65
    def wrapper(self, *args, **kwargs):
66
        try:
67
            return func(self, *args, **kwargs)
68
        except (socket_error, spec_exceptions.ConnectionForced) as e:
69
            logger.error('Connection Closed while in %s: %s', func.__name__, e)
70
            self.consume_promises = []
71
            self.connect()
72

    
73
    return wrapper
74

    
75

    
76
class AMQPPukaClient(object):
77
    """
78
    AMQP generic client implementing most of the basic AMQP operations.
79

80
    """
81
    def __init__(self, hosts=settings.AMQP_HOSTS, max_retries=30,
82
                 confirms=True, confirm_buffer=100):
83
        """Format hosts as "amqp://username:pass@host:port" """
84

    
85
        self.hosts = hosts
86
        shuffle(self.hosts)
87

    
88
        self.max_retries = max_retries
89
        self.confirms = confirms
90
        self.confirm_buffer = confirm_buffer
91

    
92
        self.connection = None
93
        self.channel = None
94
        self.consumers = {}
95
        self.unacked = OrderedDict()
96
        self.unsend = OrderedDict()
97
        self.consume_promises = []
98
        self.exchanges = []
99

    
100
    def connect(self, retries=0):
101
        if retries > self.max_retries:
102
            logger.error("Aborting after %s retries", retries - 1)
103
            raise AMQPConnectionError('Aborting after %d connection failures.'\
104
                                      % (retries - 1))
105
            return
106

    
107
        # Pick up a host
108
        host = self.hosts.pop()
109
        self.hosts.insert(0, host)
110

    
111
        self.client = Client(host, pubacks=self.confirms)
112

    
113
        host = host.split('@')[-1]
114
        logger.debug('Connecting to node %s' % host)
115

    
116
        try:
117
            promise = self.client.connect()
118
            self.client.wait(promise)
119
        except socket_error as e:
120
            logger.error('Cannot connect to host %s: %s', host, e)
121
            if retries > 2 * len(self.hosts):
122
                sleep(1)
123
            return self.connect(retries + 1)
124

    
125
        logger.info('Successfully connected to host: %s', host)
126

    
127
        logger.info('Creating channel')
128

    
129
        if self.unacked:
130
            self._resend_unacked_messages()
131

    
132
        if self.unsend:
133
            self._resend_unsend_messages()
134

    
135
        if self.consumers:
136
            for queue, callback in self.consumers.items():
137
                self.basic_consume(queue, callback)
138

    
139
        if self.exchanges:
140
            exchanges = self.exchanges
141
            self.exchanges = []
142
            for exchange, type in exchanges:
143
                self.exchange_declare(exchange, type)
144

    
145
    def exchange_declare(self, exchange, type='direct'):
146
        """Declare an exchange
147
        @type exchange_name: string
148
        @param exchange_name: name of the exchange
149
        @type exchange_type: string
150
        @param exhange_type: one of 'direct', 'topic', 'fanout'
151

152
        """
153
        logger.info('Declaring %s exchange: %s', type, exchange)
154
        promise = self.client.exchange_declare(exchange=exchange,
155
                                               type=type,
156
                                               durable=True,
157
                                               auto_delete=False)
158
        self.client.wait(promise)
159
        self.exchanges.append((exchange, type))
160

    
161
    @reconnect_decorator
162
    def queue_declare(self, queue, exclusive=False,
163
                      mirrored=True, mirrored_nodes='all'):
164
        """Declare a queue
165

166
        @type queue: string
167
        @param queue: name of the queue
168
        @param mirrored: whether the queue will be mirrored to other brokers
169
        @param mirrored_nodes: the policy for the mirrored queue.
170
            Available policies:
171
                - 'all': The queue is mirrored to all nodes and the
172
                  master node is the one to which the client is
173
                  connected
174
                - a list of nodes. The queue will be mirrored only to
175
                  the specified nodes, and the master will be the
176
                  first node in the list. Node names must be provided
177
                  and not host IP. example: [node1@rabbit,node2@rabbit]
178

179
        """
180
        logger.info('Declaring queue: %s', queue)
181

    
182
        if mirrored:
183
            if mirrored_nodes == 'all':
184
                arguments = {'x-ha-policy': 'all'}
185
            elif isinstance(mirrored_nodes, list):
186
                arguments = {'x-ha-policy': 'nodes',
187
                           'x-ha-policy-params': mirrored_nodes}
188
            else:
189
                raise AttributeError
190
        else:
191
            arguments = {}
192

    
193
        promise = self.client.queue_declare(queue=queue, durable=True,
194
                                            exclusive=exclusive,
195
                                            auto_delete=False,
196
                                            arguments=arguments)
197
        self.client.wait(promise)
198

    
199
    def queue_bind(self, queue, exchange, routing_key):
200
        logger.debug('Binding queue %s to exchange %s with key %s'
201
                 % (queue, exchange, routing_key))
202
        promise = self.client.queue_bind(exchange=exchange, queue=queue,
203
                                         routing_key=routing_key)
204
        self.client.wait(promise)
205

    
206
    @reconnect_decorator
207
    def basic_publish(self, exchange, routing_key, body):
208
        """Publish a message with a specific routing key """
209
        self._publish(exchange, routing_key, body)
210

    
211
        self.flush_buffer()
212

    
213
        if self.confirms and len(self.unacked) >= self.confirm_buffer:
214
            self.get_confirms()
215

    
216
    @reconnect_decorator
217
    def basic_publish_multi(self, exchange, routing_key, bodies):
218
        for body in bodies:
219
            self.unsend[body] = (exchange, routing_key)
220

    
221
        for body in bodies:
222
            self._publish(exchange, routing_key, body)
223
            self.unsend.pop(body)
224

    
225
        self.flush_buffer()
226

    
227
        if self.confirms:
228
            self.get_confirms()
229

    
230
    def _publish(self, exchange, routing_key, body):
231
        # Persisent messages by default!
232
        headers = {}
233
        headers['delivery_mode'] = 2
234
        promise = self.client.basic_publish(exchange=exchange,
235
                                            routing_key=routing_key,
236
                                            body=body, headers=headers)
237

    
238
        if self.confirms:
239
            self.unacked[promise] = (exchange, routing_key, body)
240

    
241
        return promise
242

    
243
    @reconnect_decorator
244
    def flush_buffer(self):
245
        while self.client.needs_write():
246
            self.client.on_write()
247

    
248
    @reconnect_decorator
249
    def get_confirms(self):
250
        for promise in self.unacked.keys():
251
            self.client.wait(promise)
252
            self.unacked.pop(promise)
253

    
254
    @reconnect_decorator
255
    def _resend_unacked_messages(self):
256
        """Resend unacked messages in case of a connection failure."""
257
        msgs = self.unacked.values()
258
        self.unacked.clear()
259
        for exchange, routing_key, body in msgs:
260
            logger.debug('Resending message %s' % body)
261
            self.basic_publish(exchange, routing_key, body)
262

    
263
    @reconnect_decorator
264
    def _resend_unsend_messages(self):
265
        """Resend unsend messages in case of a connection failure."""
266
        for body in self.unsend.keys():
267
            (exchange, routing_key) = self.unsend[body]
268
            self.basic_publish(exchange, routing_key, body)
269
            self.unsend.pop(body)
270

    
271
    @reconnect_decorator
272
    def basic_consume(self, queue, callback, prefetch_count=0):
273
        """Consume from a queue.
274

275
        @type queue: string or list of strings
276
        @param queue: the name or list of names from the queues to consume
277
        @type callback: function
278
        @param callback: the callback function to run when a message arrives
279

280
        """
281
        # Store the queues and the callback
282
        self.consumers[queue] = callback
283

    
284
        def handle_delivery(promise, msg):
285
            """Hide promises and messages without body"""
286
            if 'body' in msg:
287
                callback(self, msg)
288
            else:
289
                logger.debug("Message without body %s" % msg)
290
                raise socket_error
291

    
292
        consume_promise = \
293
                self.client.basic_consume(queue=queue,
294
                                          prefetch_count=prefetch_count,
295
                                          callback=handle_delivery)
296

    
297
        self.consume_promises.append(consume_promise)
298
        return consume_promise
299

    
300
    @reconnect_decorator
301
    def basic_wait(self, promise=None, timeout=0):
302
        """Wait for messages from the queues declared by basic_consume.
303

304
        This function will block until a message arrives from the queues that
305
        have been declared with basic_consume. If the optional arguments
306
        'promise' is given, only messages for this promise will be delivered.
307

308
        """
309
        if promise is not None:
310
            self.client.wait(promise, timeout)
311
        else:
312
            self.client.wait(self.consume_promises)
313

    
314
    @reconnect_decorator
315
    def basic_get(self, queue):
316
        """Get a single message from a queue.
317

318
        This is a non-blocking method for getting messages from a queue.
319
        It will return None if the queue is empty.
320

321
        """
322
        get_promise = self.client.basic_get(queue=queue)
323
        result = self.client.wait(get_promise)
324
        if 'empty' in result:
325
            # The queue is empty
326
            return None
327
        else:
328
            return result
329

    
330
    @reconnect_decorator
331
    def basic_ack(self, message):
332
        self.client.basic_ack(message)
333

    
334
    @reconnect_decorator
335
    def basic_nack(self, message):
336
        #TODO:
337
        pass
338

    
339
    def close(self):
340
        """Check that messages have been send and close the connection."""
341
        try:
342
            if self.confirms:
343
                self.get_confirms()
344
            close_promise = self.client.close()
345
            self.client.wait(close_promise)
346
        except (socket_error, spec_exceptions.ConnectionForced) as e:
347
            logger.error('Connection closed while closing connection:%s',
348
                          e)
349

    
350
    def queue_delete(self, queue, if_unused=True, if_empty=True):
351
        """Delete a queue.
352

353
        Returns False if the queue does not exist
354
        """
355
        try:
356
            promise = self.client.queue_delete(queue=queue,
357
                                               if_unused=if_unused,
358
                                               if_empty=if_empty)
359
            self.client.wait(promise)
360
            return True
361
        except spec_exceptions.NotFound:
362
            logger.info("Queue %s does not exist", queue)
363
            return False
364

    
365
    def exchange_delete(self, exchange, if_unused=True):
366
        """Delete an exchange."""
367
        try:
368

    
369
            promise = self.client.exchange_delete(exchange=exchange,
370
                                                  if_unused=if_unused)
371
            self.client.wait(promise)
372
            return True
373
        except spec_exceptions.NotFound:
374
            logger.info("Exchange %s does not exist", exchange)
375
            return False
376

    
377
    @reconnect_decorator
378
    def basic_cancel(self, promise=None):
379
        """Cancel consuming from a queue. """
380
        if promise is not None:
381
            self.client.basic_cancel(promise)
382
        else:
383
            for promise in self.consume_promises:
384
                self.client.basic_cancel(promise)
385

    
386

    
387
class AMQPConnectionError():
388
    pass