Revision db400d82 snf-common/synnefo/lib/amqp.py

b/snf-common/synnefo/lib/amqp.py
33 33

  
34 34
""" Module implementing connection and communication with an AMQP broker.
35 35

  
36
AMQP Client's implemented by this module silenty detect connection failures and
36
AMQP Client's instatiated by this module silenty detect connection failures and
37 37
try to reconnect to any available broker. Also publishing takes advantage of
38 38
publisher-confirms in order to guarantee that messages are properly delivered
39 39
to the broker.
40 40

  
41 41
"""
42 42

  
43
import puka
44
import logging
45
import socket
46

  
47
from time import sleep
48
from random import shuffle
49
from functools import wraps
50

  
51
from ordereddict import OrderedDict
52 43
from synnefo import settings
53 44

  
54
AMQP_HOSTS = settings.AMQP_HOSTS
55

  
56
MAX_RETRIES = 20
45
if settings.AMQP_BACKEND == 'puka':
46
    from amqp_puka import AMQPPukaClient as Client
47
elif settings.AMQP_BACKEND == 'haigha':
48
    from amqp_haigha import AMQPHaighaClient as Client
49
else:
50
    raise Exception('Unknown Backend %s' % settings.AMQP_BACKEND)
57 51

  
58
log = logging.getLogger()
59

  
60

  
61
def reconnect_decorator(func):
62
    """
63
    Decorator for persistent connection with one or more AMQP brokers.
64 52

  
65
    """
66
    @wraps(func)
67
    def wrapper(self, *args, **kwargs):
68
        try:
69
            if self.client.sd is None:
70
                self.connect()
71
            return func(self, *args, **kwargs)
72
        except (socket.error, puka.spec_exceptions.ConnectionForced):
73
            log.debug("Connection closed. Reconnecting")
74
            self.connect()
75
            return wrapper(self, *args, **kwargs)
76
        except Exception:
77
            if self.client.sd is None:
78
                log.debug("Connection closed. Reconnecting")
79
                self.connect()
80
                return wrapper(self, *args, **kwargs)
81
            else:
82
                raise
83
    return wrapper
84

  
85

  
86
class AMQPClient():
53
class AMQPClient(object):
87 54
    """
88 55
    AMQP generic client implementing most of the basic AMQP operations.
89 56

  
90
    This client confirms delivery of each published message before publishing
91
    the next one, which results in low performance. Better performance can be
92
    achieved by using AMQPAsyncClient.
93

  
57
    This class will create an object of AMQPPukaClient or AMQPHaigha client
58
    depending on AMQP_BACKEND setting
94 59
    """
95
    def __init__(self, hosts=AMQP_HOSTS, max_retries=MAX_RETRIES):
96
        """Format hosts as "amqp://username:pass@host:port" """
97
        # Shuffle the elements of the host list for better load balancing
98
        self.hosts = hosts
99
        shuffle(self.hosts)
100
        self.max_retries = max_retries
101

  
102
        self.promises = []
103
        self.consume_promises = []
104
        self.consume_info = {}
105

  
106
    def connect(self, retries=0):
107
        # Pick up a host
108
        url = self.hosts.pop()
109
        self.hosts.insert(0, url)
110

  
111
        if retries > self.max_retries:
112
            raise AMQPError("Cannot connect to any node after %s attemps" % retries)
113
        if retries > 2 * len(self.hosts):
114
            sleep(1)
115

  
116
        self.client = puka.Client(url, pubacks=True)
117

  
118
        host = url.split('@')[-1]
119
        log.debug('Connecting to node %s' % host)
120

  
121
        try:
122
            promise = self.client.connect()
123
            self.client.wait(promise)
124
        except socket.error as e:
125
            log.debug('Cannot connect to node %s.' % host)
126
            return self.connect(retries+1)
127

  
128
    @reconnect_decorator
129
    def exchange_declare(self, exchange_name, exchange_type='direct',
130
                         durable=True, auto_delete=False):
131
        """Declare an exchange
132
        @type exchange_name: string
133
        @param exchange_name: name of the exchange
134
        @type exchange_type: string
135
        @param exhange_type: one of 'direct', 'topic', 'fanout'
136

  
137
        """
138
        log.debug('Declaring exchange %s of %s type.'
139
                  %(exchange_name, exchange_type))
140
        promise = self.client.exchange_declare(exchange=exchange_name,
141
                                               type=exchange_type,
142
                                               durable=durable,
143
                                               auto_delete=auto_delete)
144
        self.client.wait(promise)
145
        log.debug('Exchange %s declared succesfully ' % exchange_name)
146

  
147
    @reconnect_decorator
148
    def queue_declare(self, queue, durable=True, exclusive=False,
149
                      auto_delete=False, mirrored=True, mirrored_nodes='all'):
150
        """Declare a queue
151

  
152
        @type queue: string
153
        @param queue: name of the queue
154
        @param mirrored: whether the queue will be mirrored to other brokers
155
        @param mirrored_nodes: the policy for the mirrored queue.
156
            Available policies:
157
                - 'all': The queue is mirrored to all nodes and the
158
                  master node is the one to which the client is
159
                  connected
160
                - a list of nodes. The queue will be mirrored only to
161
                  the specified nodes, and the master will be the
162
                  first node in the list. Node names must be provided
163
                  and not host IP. example: [node1@rabbit,node2@rabbit]
164

  
165
        """
166

  
167
        log.debug('Declaring queue %s' % queue)
168
        if mirrored:
169
            if mirrored_nodes == 'all':
170
                arguments={'x-ha-policy':'all'}
171
            elif isinstance(mirrored_nodes, list):
172
                arguments={'x-ha-policy':'nodes', 'x-ha-policy-params':mirrored_nodes}
173
            else:
174
                raise AttributeError
175
        else:
176
            arguments = {}
177

  
178
        promise = self.client.queue_declare(queue=queue, durable=durable,
179
                                            exclusive=exclusive,
180
                                            auto_delete=auto_delete,
181
                                            arguments=arguments)
182
        self.client.wait(promise)
183
        log.debug('Queue %s declared successfully.' % queue)
184

  
185
    @reconnect_decorator
186
    def queue_bind(self, queue, exchange, routing_key):
187
        log.debug('Binding queue %s to exchange %s with key %s'
188
                 % (queue, exchange, routing_key))
189
        promise = self.client.queue_bind(exchange=exchange, queue=queue,
190
                                         routing_key=routing_key)
191
        self.client.wait(promise)
192
        log.debug('Binding completed successfully')
193

  
194
    def basic_publish_multi(self, exhange, routing_key, msgs, headers={}):
195
        """Send many messages to the  same exchange and with the same key """
196
        for msg in msgs:
197
            self.basic_publish(exchange, routing_key, headers, msg)
198

  
199
    @reconnect_decorator
200
    def basic_publish(self, exchange, routing_key, body, headers={}):
201
        """Publish a message with a specific routing key """
202

  
203
        # Persisent messages by default!
204
        if not 'delivery_mode' in headers:
205
            headers['delivery_mode'] = 2
206

  
207
        promise = self.client.basic_publish(exchange=exchange,
208
                                            routing_key=routing_key,
209
                                            body=body, headers=headers)
210
        self.client.wait(promise)
211

  
212
    @reconnect_decorator
213
    def basic_consume(self, queue, callback, prefetch_count=0):
214
        """Consume from a queue.
215

  
216
        @type queue: string or list of strings
217
        @param queue: the name or list of names from the queues to consume
218
        @type callback: function
219
        @param callback: the callback function to run when a message arrives
220

  
221
        """
222
        if isinstance(queue, str):
223
            queue = [queue]
224
        elif isinstance(queue, list):
225
            pass
226
        else:
227
            raise AttributeError
228

  
229
        # Store the queues and the callback
230
        for q in queue:
231
            self.consume_info[q] = callback
232

  
233
        def handle_delivery(promise, result):
234
            """Hide promises and messages without body"""
235
            if 'body' in result:
236
                callback(self, result)
237
            else:
238
                log.debug("Message without body %s" % result)
239
                raise socket.error
240

  
241
        consume_promise = \
242
                self.client.basic_consume_multi(queues=queue,
243
                                                prefetch_count=prefetch_count,
244
                                                callback=handle_delivery)
245
        self.consume_promises.append(consume_promise)
246
        return consume_promise
247

  
248
    def basic_wait(self, promise=None, timeout=0):
249
        """Wait for messages from the queues declared by basic_consume.
250

  
251
        This function will block until a message arrives from the queues that
252
        have been declared with basic_consume. If the optional arguments
253
        'promise' is given, only messages for this promise will be delivered.
254

  
255
        """
256
        try:
257
            if promise is not None:
258
               self.client.wait(promise, timeout)
259
            else:
260
               self.client.wait(self.consume_promises)
261
        except (socket.error, puka.spec_exceptions.ConnectionForced):
262
            log.debug('Connection closed while receiving messages.')
263
            self.consume_promises = []
264
            try:
265
                self.client.close()
266
            except:
267
                pass
268
            self.connect()
269
            for queues, callback in self.consume_info.items():
270
                self.basic_consume(queues, callback)
271
            self.basic_wait(timeout)
272
        except Exception as e:
273
            if self.client.sd is None:
274
                log.debug('Connection closed while receiving messages.')
275
                self.consume_promises = []
276
                self.connect()
277
                for queues, callback in self.consume_info.items():
278
                    self.basic_consume(queues, callback)
279
                self.basic_wait(timeout)
280
            else:
281
                log.error("Exception while waiting for messages ",e)
282
                raise
283

  
284
    def basic_cancel(self, promise=None):
285
        """Cancel consuming from a queue. """
286
        try:
287
            if promise is not None:
288
                self.client.basic_cancel(promise)
289
            else:
290
                for promise in self.consume_promises:
291
                    self.client.basic_cancel(promise)
292
        except (socket.error, puka.spec_exceptions.ConnectionForced):
293
            pass
294
        except Exception as e:
295
            if self.client.sd is None:
296
                pass;
297
            else:
298
                log.error("Exception while canceling client ",e)
299
                raise
300

  
301

  
302
    @reconnect_decorator
303
    def basic_get(self, queue):
304
        """Get a single message from a queue.
305

  
306
        This is a non-blocking method for getting messages from a queue.
307
        It will return None if the queue is empty.
308

  
309
        """
310
        get_promise = self.client.basic_get(queue=queue)
311
        result = self.client.wait(get_promise)
312
        if 'empty' in result:
313
            # The queue is empty
314
            return None
315
        else:
316
            return result
317

  
318
    def basic_ack(self, message):
319
        """Acknowledge a message. """
320
        try:
321
            self.client.basic_ack(message)
322
            return True
323
        except (socket.error, puka.spec_exceptions.ConnectionForced):
324
            return False
325
        except Exception as e:
326
            if self.client.sd is None:
327
                return False
328
            else:
329
                log.error("Exception while acknowleding message ",e)
330
                raise
331

  
332
    def close(self):
333
        """Close the connection with the AMQP broker. """
334
        try:
335
            close_promise = self.client.close()
336
            self.client.wait(close_promise)
337
        except (socket.error, puka.spec_exceptions.ConnectionForced):
338
            pass
339

  
340
    def queue_delete(self, queue, if_unused=False, if_empty=False):
341
        """Delete a queue.
342

  
343
        Returns False if the queue does not exist
344
        """
345
        try:
346
            promise = self.client.queue_delete(queue=queue, if_unused=if_unused,
347
                                               if_empty=if_empty)
348
            self.client.wait(promise)
349
            return True
350
        except puka.spec_exceptions.NotFound:
351
            log.debug("Queue %s does not exist", queue)
352
            return False
353

  
354
    def exchange_delete(self, exchange, if_unused=False):
355
        """Delete an exchange."""
356
        try:
357

  
358
            promise = self.client.exchange_delete(exchange=exchange,
359
                                                  if_unused=if_unused)
360
            self.client.wait(promise)
361
            return True
362
        except puka.spec_exceptions.NotFound:
363
            log.debug("Exchange %s does not exist", exchange)
364
            return False
365

  
366

  
367

  
368
class AMQPAsyncClient(AMQPClient):
369
    """AMQP client implementing asynchronous sending of messages.
370

  
371
    This client is more efficient that AMQPClient in sending large number
372
    of messages. Messages are confirmed to be sent to the broker in batches
373
    of a size specified by the 'max_buffer' argument.
374

  
375
    Messages are kept to an internal buffer until the max_buffer messages are
376
    sent or until the connection closes. Explicit delivering of messages can be
377
    achieved by calling 'wait_for_promises' method.
378

  
379
    Always remember to close the connection, or messages may be lost.
380

  
381
    """
382
    def __init__(self, hosts=AMQP_HOSTS, max_buffer=5000,
383
                 max_retries=MAX_RETRIES):
384
        AMQPClient.__init__(self, hosts, max_retries)
385
        self.published_msgs = OrderedDict()
386
        self._promise_counter = 0
387
        self.max_buffer=max_buffer
388

  
389
    def basic_publish_multi(self, exhange, routing_key, msgs, headers={}):
390
        while msgs:
391
            msg = msgs.pop[0]
392
            self.basic_publish(exchange, routing_key, msg, headers)
393

  
394
    def basic_publish(self, exchange, routing_key, body, headers={}):
395
        """Publish a message.
396

  
397
        The message will not be actually published to the broker until
398
        'max_buffer' messages are published or wait_for_promises is called.
399

  
400
        """
401
        try:
402
            if not 'delivery_mode' in headers:
403
                headers['delivery_mode'] = 2
404

  
405
            promise = self.client.basic_publish(exchange=exchange,
406
                                                routing_key=routing_key,
407
                                                body=body,
408
                                                headers=headers)
409

  
410
            self._promise_counter += 1
411
            self.published_msgs[promise] = {'exchange':exchange,
412
                                            'routing_key':routing_key,
413
                                            'body':body,
414
                                            'headers':headers}
415

  
416
            if self._promise_counter > self.max_buffer:
417
                self.wait_for_promises()
418

  
419
        except (socket.error, puka.spec_exceptions.ConnectionForced):
420
            log.debug('Connection closed while sending message %s.\
421
                      Reconnecting and retrying' % body)
422
            self.connect()
423
            self.basic_publish(exchange, routing_key, body, headers)
424
            return self._retry_publish_msgs()
425
        except Exception as e:
426
            if self.client.sd is None:
427
                log.debug('Connection closed while sending message %s.\
428
                             Reconnecting and retrying' % body)
429
                self.connect()
430
                self.basic_publish(exchange, routing_key, body, headers)
431
                return self._retry_publish_msgs()
432
            else:
433
                log.error("Exception while publishing message ",e)
434
                raise
435

  
436
    def wait_for_promises(self):
437
        """Wait for confirm that all messages are sent."""
438
        try:
439
            promises = self.published_msgs.keys()
440
            for promise in promises:
441
                self.client.wait(promise)
442
                self.published_msgs.pop(promise)
443
            self._promise_counter = 0
444
        except (socket.error, puka.spec_exceptions.ConnectionForced):
445
            log.debug('Connection closed while waiting from promises')
446
            self.connect()
447
            self._retry_publish_msgs()
448
        except Exception as e:
449
            if self.client.sd is None:
450
                log.debug('Connection closed while waiting from promises')
451
                self.connect()
452
                self._retry_publish_msgs()
453
            else:
454
                log.error("Exception while waiting for promises ",e)
455
                raise
456

  
457
    def _retry_publish_msgs(self):
458
        """Resend messages in case of a connection failure."""
459
        values = self.published_msgs.values()
460
        self.published_msgs = OrderedDict()
461
        for message in values:
462
            exchange = message['exchange']
463
            key = message['routing_key']
464
            body = message['body']
465
            headers = message['headers']
466
            log.debug('Resending message %s' % body)
467
            self.basic_publish(exchange, key, body, headers)
468

  
469
    def close(self):
470
        """Check that messages have been send and close the connection."""
471
        try:
472
            self.wait_for_promises()
473
            close_promise = self.client.close()
474
            self.client.wait(close_promise)
475
        except (socket.error, puka.spec_exceptions.ConnectionForced):
476
            pass
477
        except Exception as e:
478
            if self.client.sd is None:
479
                pass
480
            else:
481
                log.error("Exception while closing the connection ",e)
482
                raise
483

  
484
    def flush_buffer(self):
485
        try:
486
            self.client.on_write()
487
        except (socket.error, puka.spec_exceptions.ConnectionForced):
488
            log.debug('Connection closed while clearing buffer')
489
            self.connect()
490
            return self._retry_publish_msgs()
491
        except Exception as e:
492
            if self.client.sd is None:
493
                log.debug('Connection closed while clearing buffer')
494
                self.connect()
495
                return self._retry_publish_msgs()
496
            else:
497
                log.error("Exception while clearing buffer ",e)
498
                raise
499

  
500
class AMQPConsumer(AMQPClient):
501
    """AMQP client implementing a consumer without callbacks.
502

  
503
    """
504
    def __init__(self, hosts=AMQP_HOSTS, max_buffer=5000,
505
                 max_retries=MAX_RETRIES):
506
        AMQPClient.__init__(self, hosts, max_retries)
507
        self.consume_queues = []
508
        self.consume_promises = []
509

  
510
    @reconnect_decorator
511
    def basic_consume(self, queue, prefetch_count=0):
512
        """Consume from a queue.
513

  
514
        @type queue: string or list of strings
515
        @param queue: the name or list of names from the queues to consume
516
        @type callback: function
517
        @param callback: the callback function to run when a message arrives
518

  
519
        """
520
        if isinstance(queue, str):
521
            queue = [queue]
522
        elif isinstance(queue, list):
523
            pass
524
        else:
525
            raise AttributeError
526

  
527
        # Store the queues and the callback
528
        for q in queue:
529
            self.consume_queues.append(q)
530

  
531
        consume_promise = \
532
                self.client.basic_consume_multi(queues=queue,
533
                                                prefetch_count=prefetch_count)
534
        self.consume_promises.append(consume_promise)
535
        return consume_promise
536

  
537
    def basic_wait(self, promise=None, timeout=0):
538
        """Wait for messages from the queues declared by basic_consume.
539

  
540
        This function will block until a message arrives from the queues that
541
        have been declared with basic_consume. If the optional arguments
542
        'promise' is given, only messages for this promise will be delivered.
543

  
544
        """
545
        try:
546
            if promise is not None:
547
               return self.client.wait(promise, timeout)
548
            else:
549
               return self.client.wait(self.consume_promises)
550
        except (socket.error, puka.spec_exceptions.ConnectionForced):
551
            log.debug('Connection closed while receiving messages.')
552
            self.consume_promises = []
553
            try:
554
                self.client.close()
555
            except:
556
                pass
557
            self.connect()
558
            for queues in self.consume_queues:
559
                self.basic_consume(queues)
560
            self.basic_wait(timeout)
561
        except Exception as e:
562
            if self.client.sd is None:
563
                log.debug('Connection closed while receiving messages.')
564
                self.consume_promises = []
565
                self.connect()
566
                for queues in self.consume_queues:
567
                    self.basic_consume(queues)
568
                self.basic_wait(timeout)
569
            else:
570
                log.error("Exception while waiting for messages ",e)
571
                raise
572

  
573

  
574
class AMQPError(Exception):
575
    def __init__(self, msg):
576
        self.msg = msg
577
    def __str__(self):
578
        return repr(self.msg)
60
    def __new__(cls, *args, **kwargs):
61
        return Client(*args, **kwargs)

Also available in: Unified diff