Statistics
| Branch: | Tag: | Revision:

root / snf-common / synnefo / lib / amqp.py @ 597e7eba

History | View | Annotate | Download (21 kB)

1
# Copyright 2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
""" Module implementing connection and communication with an AMQP broker.
35

36
AMQP Client's implemented by this module silenty detect connection failures and
37
try to reconnect to any available broker. Also publishing takes advantage of
38
publisher-confirms in order to guarantee that messages are properly delivered
39
to the broker.
40

41
"""
42

    
43
import puka
44
import logging
45
import socket
46

    
47
from time import sleep
48
from random import shuffle
49
from functools import wraps
50

    
51
from ordereddict import OrderedDict
52
from synnefo import settings
53

    
54
AMQP_HOSTS = settings.AMQP_HOSTS
55

    
56
MAX_RETRIES = 20
57

    
58
log = logging.getLogger()
59

    
60

    
61
def reconnect_decorator(func):
62
    """
63
    Decorator for persistent connection with one or more AMQP brokers.
64

65
    """
66
    @wraps(func)
67
    def wrapper(self, *args, **kwargs):
68
        try:
69
            if self.client.sd is None:
70
                self.connect()
71
            return func(self, *args, **kwargs)
72
        except (socket.error, puka.spec_exceptions.ConnectionForced):
73
            log.debug("Connection closed. Reconnecting")
74
            self.connect()
75
            return wrapper(self, *args, **kwargs)
76
        except Exception:
77
            if self.client.sd is None:
78
                log.debug("Connection closed. Reconnecting")
79
                self.connect()
80
                return wrapper(self, *args, **kwargs)
81
            else:
82
                raise
83
    return wrapper
84

    
85

    
86
class AMQPClient():
87
    """
88
    AMQP generic client implementing most of the basic AMQP operations.
89

90
    This client confirms delivery of each published message before publishing
91
    the next one, which results in low performance. Better performance can be
92
    achieved by using AMQPAsyncClient.
93

94
    """
95
    def __init__(self, hosts=AMQP_HOSTS, max_retries=MAX_RETRIES):
96
        """Format hosts as "amqp://username:pass@host:port" """
97
        # Shuffle the elements of the host list for better load balancing
98
        self.hosts = hosts
99
        shuffle(self.hosts)
100
        self.max_retries = max_retries
101

    
102
        self.promises = []
103
        self.consume_promises = []
104
        self.consume_info = {}
105

    
106
    def connect(self, retries=0):
107
        # Pick up a host
108
        url = self.hosts.pop()
109
        self.hosts.insert(0, url)
110

    
111
        if retries > self.max_retries:
112
            raise AMQPError("Cannot connect to any node after %s attemps" % retries)
113
        if retries > 2 * len(self.hosts):
114
            sleep(1)
115

    
116
        self.client = puka.Client(url, pubacks=True)
117

    
118
        host = url.split('@')[-1]
119
        log.debug('Connecting to node %s' % host)
120

    
121
        try:
122
            promise = self.client.connect()
123
            self.client.wait(promise)
124
        except socket.error as e:
125
            log.debug('Cannot connect to node %s.' % host)
126
            return self.connect(retries+1)
127

    
128
    @reconnect_decorator
129
    def exchange_declare(self, exchange_name, exchange_type='direct',
130
                         durable=True, auto_delete=False):
131
        """Declare an exchange
132
        @type exchange_name: string
133
        @param exchange_name: name of the exchange
134
        @type exchange_type: string
135
        @param exhange_type: one of 'direct', 'topic', 'fanout'
136

137
        """
138
        log.debug('Declaring exchange %s of %s type.'
139
                  %(exchange_name, exchange_type))
140
        promise = self.client.exchange_declare(exchange=exchange_name,
141
                                               type=exchange_type,
142
                                               durable=durable,
143
                                               auto_delete=auto_delete)
144
        self.client.wait(promise)
145
        log.debug('Exchange %s declared succesfully ' % exchange_name)
146

    
147
    @reconnect_decorator
148
    def queue_declare(self, queue, durable=True, exclusive=False,
149
                      auto_delete=False, mirrored=True, mirrored_nodes='all'):
150
        """Declare a queue
151

152
        @type queue: string
153
        @param queue: name of the queue
154
        @param mirrored: whether the queue will be mirrored to other brokers
155
        @param mirrored_nodes: the policy for the mirrored queue.
156
            Available policies:
157
                - 'all': The queue is mirrored to all nodes and the
158
                  master node is the one to which the client is
159
                  connected
160
                - a list of nodes. The queue will be mirrored only to
161
                  the specified nodes, and the master will be the
162
                  first node in the list. Node names must be provided
163
                  and not host IP. example: [node1@rabbit,node2@rabbit]
164

165
        """
166

    
167
        log.debug('Declaring queue %s' % queue)
168
        if mirrored:
169
            if mirrored_nodes == 'all':
170
                arguments={'x-ha-policy':'all'}
171
            elif isinstance(mirrored_nodes, list):
172
                arguments={'x-ha-policy':'nodes', 'x-ha-policy-params':mirrored_nodes}
173
            else:
174
                raise AttributeError
175
        else:
176
            arguments = {}
177

    
178
        promise = self.client.queue_declare(queue=queue, durable=durable,
179
                                            exclusive=exclusive,
180
                                            auto_delete=auto_delete,
181
                                            arguments=arguments)
182
        self.client.wait(promise)
183
        log.debug('Queue %s declared successfully.' % queue)
184

    
185
    @reconnect_decorator
186
    def queue_bind(self, queue, exchange, routing_key):
187
        log.debug('Binding queue %s to exchange %s with key %s'
188
                 % (queue, exchange, routing_key))
189
        promise = self.client.queue_bind(exchange=exchange, queue=queue,
190
                                         routing_key=routing_key)
191
        self.client.wait(promise)
192
        log.debug('Binding completed successfully')
193

    
194
    def basic_publish_multi(self, exhange, routing_key, msgs, headers={}):
195
        """Send many messages to the  same exchange and with the same key """
196
        for msg in msgs:
197
            self.basic_publish(exchange, routing_key, headers, msg)
198

    
199
    @reconnect_decorator
200
    def basic_publish(self, exchange, routing_key, body, headers={}):
201
        """Publish a message with a specific routing key """
202

    
203
        # Persisent messages by default!
204
        if not 'delivery_mode' in headers:
205
            headers['delivery_mode'] = 2
206

    
207
        promise = self.client.basic_publish(exchange=exchange,
208
                                            routing_key=routing_key,
209
                                            body=body, headers=headers)
210
        self.client.wait(promise)
211

    
212
    @reconnect_decorator
213
    def basic_consume(self, queue, callback, prefetch_count=0):
214
        """Consume from a queue.
215

216
        @type queue: string or list of strings
217
        @param queue: the name or list of names from the queues to consume
218
        @type callback: function
219
        @param callback: the callback function to run when a message arrives
220

221
        """
222
        if isinstance(queue, str):
223
            queue = [queue]
224
        elif isinstance(queue, list):
225
            pass
226
        else:
227
            raise AttributeError
228

    
229
        # Store the queues and the callback
230
        for q in queue:
231
            self.consume_info[q] = callback
232

    
233
        def handle_delivery(promise, result):
234
            """Hide promises and messages without body"""
235
            if 'body' in result:
236
                callback(self, result)
237
            else:
238
                log.debug("Message without body %s" % result)
239
                return
240

    
241
        consume_promise = \
242
                self.client.basic_consume_multi(queues=queue,
243
                                                prefetch_count=prefetch_count,
244
                                                callback=handle_delivery)
245
        self.consume_promises.append(consume_promise)
246
        return consume_promise
247

    
248
    def basic_wait(self, promise=None, timeout=0):
249
        """Wait for messages from the queues declared by basic_consume.
250

251
        This function will block until a message arrives from the queues that
252
        have been declared with basic_consume. If the optional arguments
253
        'promise' is given, only messages for this promise will be delivered.
254

255
        """
256
        try:
257
            if promise is not None:
258
               self.client.wait(promise, timeout)
259
            else:
260
               self.client.wait(self.consume_promises)
261
        except (socket.error, puka.spec_exceptions.ConnectionForced):
262
            log.debug('Connection closed while receiving messages.')
263
            self.consume_promises = []
264
            self.connect()
265
            for queues, callback in self.consume_info.items():
266
                self.basic_consume(queues, callback)
267
            self.basic_wait(timeout)
268
        except Exception as e:
269
            if self.client.sd is None:
270
                log.debug('Connection closed while receiving messages.')
271
                self.consume_promises = []
272
                self.connect()
273
                for queues, callback in self.consume_info.items():
274
                    self.basic_consume(queues, callback)
275
                self.basic_wait(timeout)
276
            else:
277
                log.error("Exception while waiting for messages ",e)
278
                raise
279

    
280
    def basic_cancel(self, promise=None):
281
        """Cancel consuming from a queue. """
282
        try:
283
            if promise is not None:
284
                self.client.basic_cancel(promise)
285
            else:
286
                for promise in self.consume_promises:
287
                    self.client.basic_cancel(promise)
288
        except (socket.error, puka.spec_exceptions.ConnectionForced):
289
            pass
290
        except Exception as e:
291
            if self.client.sd is None:
292
                pass;
293
            else:
294
                log.error("Exception while canceling client ",e)
295
                raise
296

    
297

    
298
    @reconnect_decorator
299
    def basic_get(self, queue):
300
        """Get a single message from a queue.
301

302
        This is a non-blocking method for getting messages from a queue.
303
        It will return None if the queue is empty.
304

305
        """
306
        get_promise = self.client.basic_get(queue=queue)
307
        result = self.client.wait(get_promise)
308
        if 'empty' in result:
309
            # The queue is empty
310
            return None
311
        else:
312
            return result
313

    
314
    def basic_ack(self, message):
315
        """Acknowledge a message. """
316
        try:
317
            self.client.basic_ack(message)
318
            return True
319
        except (socket.error, puka.spec_exceptions.ConnectionForced):
320
            return False
321
        except Exception as e:
322
            if self.client.sd is None:
323
                return False
324
            else:
325
                log.error("Exception while acknowleding message ",e)
326
                raise
327

    
328
    def close(self):
329
        """Close the connection with the AMQP broker. """
330
        try:
331
            close_promise = self.client.close()
332
            self.client.wait(close_promise)
333
        except (socket.error, puka.spec_exceptions.ConnectionForced):
334
            pass
335

    
336
    def queue_delete(self, queue, if_unused=False, if_empty=False):
337
        """Delete a queue.
338

339
        Returns False if the queue does not exist
340
        """
341
        try:
342
            promise = self.client.queue_delete(queue=queue, if_unused=if_unused,
343
                                               if_empty=if_empty)
344
            self.client.wait(promise)
345
            return True
346
        except puka.spec_exceptions.NotFound:
347
            log.debug("Queue %s does not exist", queue)
348
            return False
349

    
350
    def exchange_delete(self, exchange, if_unused=False):
351
        """Delete an exchange."""
352
        try:
353

    
354
            promise = self.client.exchange_delete(exchange=exchange,
355
                                                  if_unused=if_unused)
356
            self.client.wait(promise)
357
            return True
358
        except puka.spec_exceptions.NotFound:
359
            log.debug("Exchange %s does not exist", exchange)
360
            return False
361

    
362

    
363

    
364
class AMQPAsyncClient(AMQPClient):
365
    """AMQP client implementing asynchronous sending of messages.
366

367
    This client is more efficient that AMQPClient in sending large number
368
    of messages. Messages are confirmed to be sent to the broker in batches
369
    of a size specified by the 'max_buffer' argument.
370

371
    Messages are kept to an internal buffer until the max_buffer messages are
372
    sent or until the connection closes. Explicit delivering of messages can be
373
    achieved by calling 'wait_for_promises' method.
374

375
    Always remember to close the connection, or messages may be lost.
376

377
    """
378
    def __init__(self, hosts=AMQP_HOSTS, max_buffer=5000,
379
                 max_retries=MAX_RETRIES):
380
        AMQPClient.__init__(self, hosts, max_retries)
381
        self.published_msgs = OrderedDict()
382
        self._promise_counter = 0
383
        self.max_buffer=max_buffer
384

    
385
    def basic_publish_multi(self, exhange, routing_key, msgs, headers={}):
386
        while msgs:
387
            msg = msgs.pop[0]
388
            self.basic_publish(exchange, routing_key, msg, headers)
389

    
390
    def basic_publish(self, exchange, routing_key, body, headers={}):
391
        """Publish a message.
392

393
        The message will not be actually published to the broker until
394
        'max_buffer' messages are published or wait_for_promises is called.
395

396
        """
397
        try:
398
            if not 'delivery_mode' in headers:
399
                headers['delivery_mode'] = 2
400

    
401
            promise = self.client.basic_publish(exchange=exchange,
402
                                                routing_key=routing_key,
403
                                                body=body,
404
                                                headers=headers)
405

    
406
            self._promise_counter += 1
407
            self.published_msgs[promise] = {'exchange':exchange,
408
                                            'routing_key':routing_key,
409
                                            'body':body,
410
                                            'headers':headers}
411

    
412
            if self._promise_counter > self.max_buffer:
413
                self.wait_for_promises()
414

    
415
        except (socket.error, puka.spec_exceptions.ConnectionForced):
416
            log.debug('Connection closed while sending message %s.\
417
                      Reconnecting and retrying' % body)
418
            self.connect()
419
            self.basic_publish(exchange, routing_key, body, headers)
420
            return self._retry_publish_msgs()
421
        except Exception as e:
422
            if self.client.sd is None:
423
                log.debug('Connection closed while sending message %s.\
424
                             Reconnecting and retrying' % body)
425
                self.connect()
426
                self.basic_publish(exchange, routing_key, body, headers)
427
                return self._retry_publish_msgs()
428
            else:
429
                log.error("Exception while publishing message ",e)
430
                raise
431

    
432
    def wait_for_promises(self):
433
        """Wait for confirm that all messages are sent."""
434
        try:
435
            promises = self.published_msgs.keys()
436
            for promise in promises:
437
                self.client.wait(promise)
438
                self.published_msgs.pop(promise)
439
            self._promise_counter = 0
440
        except (socket.error, puka.spec_exceptions.ConnectionForced):
441
            log.debug('Connection closed while waiting from promises')
442
            self.connect()
443
            self._retry_publish_msgs()
444
        except Exception as e:
445
            if self.client.sd is None:
446
                log.debug('Connection closed while waiting from promises')
447
                self.connect()
448
                self._retry_publish_msgs()
449
            else:
450
                log.error("Exception while waiting for promises ",e)
451
                raise
452

    
453
    def _retry_publish_msgs(self):
454
        """Resend messages in case of a connection failure."""
455
        values = self.published_msgs.values()
456
        self.published_msgs = OrderedDict()
457
        for message in values:
458
            exchange = message['exchange']
459
            key = message['routing_key']
460
            body = message['body']
461
            headers = message['headers']
462
            log.debug('Resending message %s' % body)
463
            self.basic_publish(exchange, key, body, headers)
464

    
465
    def close(self):
466
        """Check that messages have been send and close the connection."""
467
        try:
468
            self.wait_for_promises()
469
            close_promise = self.client.close()
470
            self.client.wait(close_promise)
471
        except (socket.error, puka.spec_exceptions.ConnectionForced):
472
            pass
473
        except Exception as e:
474
            if self.client.sd is None:
475
                pass
476
            else:
477
                log.error("Exception while closing the connection ",e)
478
                raise
479

    
480
    def flush_buffer(self):
481
        try:
482
            self.client.on_write()
483
        except (socket.error, puka.spec_exceptions.ConnectionForced):
484
            log.debug('Connection closed while clearing buffer')
485
            self.connect()
486
            return self._retry_publish_msgs()
487
        except Exception as e:
488
            if self.client.sd is None:
489
                log.debug('Connection closed while clearing buffer')
490
                self.connect()
491
                return self._retry_publish_msgs()
492
            else:
493
                log.error("Exception while clearing buffer ",e)
494
                raise
495

    
496
class AMQPConsumer(AMQPClient):
497
    """AMQP client implementing a consumer without callbacks.
498

499
    """
500
    def __init__(self, hosts=AMQP_HOSTS, max_buffer=5000,
501
                 max_retries=MAX_RETRIES):
502
        AMQPClient.__init__(self, hosts, max_retries)
503
        self.consume_queues = []
504
        self.consume_promises = []
505

    
506
    @reconnect_decorator
507
    def basic_consume(self, queue, prefetch_count=0):
508
        """Consume from a queue.
509

510
        @type queue: string or list of strings
511
        @param queue: the name or list of names from the queues to consume
512
        @type callback: function
513
        @param callback: the callback function to run when a message arrives
514

515
        """
516
        if isinstance(queue, str):
517
            queue = [queue]
518
        elif isinstance(queue, list):
519
            pass
520
        else:
521
            raise AttributeError
522

    
523
        # Store the queues and the callback
524
        for q in queue:
525
            self.consume_queues.append(q)
526

    
527
        consume_promise = \
528
                self.client.basic_consume_multi(queues=queue,
529
                                                prefetch_count=prefetch_count)
530
        self.consume_promises.append(consume_promise)
531
        return consume_promise
532

    
533
    def basic_wait(self, promise=None, timeout=0):
534
        """Wait for messages from the queues declared by basic_consume.
535

536
        This function will block until a message arrives from the queues that
537
        have been declared with basic_consume. If the optional arguments
538
        'promise' is given, only messages for this promise will be delivered.
539

540
        """
541
        try:
542
            if promise is not None:
543
               return self.client.wait(promise, timeout)
544
            else:
545
               return self.client.wait(self.consume_promises)
546
        except (socket.error, puka.spec_exceptions.ConnectionForced):
547
            log.debug('Connection closed while receiving messages.')
548
            self.consume_promises = []
549
            self.connect()
550
            for queues in self.consume_queues:
551
                self.basic_consume(queues)
552
            self.basic_wait(timeout)
553
        except Exception as e:
554
            if self.client.sd is None:
555
                log.debug('Connection closed while receiving messages.')
556
                self.consume_promises = []
557
                self.connect()
558
                for queues in self.consume_queues:
559
                    self.basic_consume(queues)
560
                self.basic_wait(timeout)
561
            else:
562
                log.error("Exception while waiting for messages ",e)
563
                raise
564

    
565

    
566
class AMQPError(Exception):
567
    def __init__(self, msg):
568
        self.msg = msg
569
    def __str__(self):
570
        return repr(self.msg)