Statistics
| Branch: | Tag: | Revision:

root / snf-common / synnefo / lib / amqp_puka.py @ fdfd8c6d

History | View | Annotate | Download (14.7 kB)

1
# Copyright 2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
""" Module implementing connection and communication with an AMQP broker.
35

36
AMQP Client's implemented by this module silenty detect connection failures and
37
try to reconnect to any available broker. Also publishing takes advantage of
38
publisher-confirms in order to guarantee that messages are properly delivered
39
to the broker.
40

41
"""
42

    
43
import logging
44

    
45
from puka import Client
46
from puka import spec_exceptions
47
import socket
48
from socket import error as socket_error
49
from time import sleep
50
from random import shuffle
51
from functools import wraps
52
from ordereddict import OrderedDict
53
from synnefo import settings
54

    
55
logger = logging.getLogger("amqp")
56

    
57

    
58
def reconnect_decorator(func):
59
    """
60
    Decorator for persistent connection with one or more AMQP brokers.
61

62
    """
63
    @wraps(func)
64
    def wrapper(self, *args, **kwargs):
65
        try:
66
            return func(self, *args, **kwargs)
67
        except (socket_error, spec_exceptions.ConnectionForced) as e:
68
            logger.error('Connection Closed while in %s: %s', func.__name__, e)
69
            self.connect()
70

    
71
    return wrapper
72

    
73

    
74
class AMQPPukaClient(object):
75
    """
76
    AMQP generic client implementing most of the basic AMQP operations.
77

78
    """
79
    def __init__(self, hosts=settings.AMQP_HOSTS, max_retries=30,
80
                 confirms=True, confirm_buffer=100):
81
        """
82
        Format hosts as "amqp://username:pass@host:port"
83
        max_retries=0 defaults to unlimited retries
84

85
        """
86

    
87
        self.hosts = hosts
88
        shuffle(self.hosts)
89

    
90
        self.max_retries = max_retries
91
        self.confirms = confirms
92
        self.confirm_buffer = confirm_buffer
93

    
94
        self.connection = None
95
        self.channel = None
96
        self.consumers = {}
97
        self.unacked = OrderedDict()
98
        self.unsend = OrderedDict()
99
        self.consume_promises = []
100
        self.exchanges = []
101

    
102
    def connect(self, retries=0):
103
        if self.max_retries and retries >= self.max_retries:
104
            logger.error("Aborting after %d retries", retries)
105
            raise AMQPConnectionError('Aborting after %d connection failures.'\
106
                                      % retries)
107
            return
108

    
109
        # Pick up a host
110
        host = self.hosts.pop()
111
        self.hosts.insert(0, host)
112

    
113
        self.client = Client(host, pubacks=self.confirms)
114

    
115
        host = host.split('@')[-1]
116
        logger.debug('Connecting to node %s' % host)
117

    
118
        try:
119
            promise = self.client.connect()
120
            self.client.wait(promise)
121
        except socket_error as e:
122
            if retries < len(self.hosts):
123
                logger.warning('Cannot connect to host %s: %s', host, e)
124
            else:
125
                logger.error('Cannot connect to host %s: %s', host, e)
126
                sleep(1)
127
            return self.connect(retries + 1)
128

    
129
        logger.info('Successfully connected to host: %s', host)
130

    
131
        # Setup TCP keepalive option
132
        self.client.sd.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
133
        # Keepalive time
134
        self.client.sd.setsockopt(socket.SOL_TCP, socket.TCP_KEEPIDLE, 20)
135
        # Keepalive interval
136
        self.client.sd.setsockopt(socket.SOL_TCP, socket.TCP_KEEPINTVL, 2)
137
        # Keepalive retry
138
        self.client.sd.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT, 10)
139

    
140
        logger.info('Creating channel')
141

    
142
        # Clear consume_promises each time connecting, since they are related
143
        # to the connection object
144
        self.consume_promises = []
145

    
146
        if self.unacked:
147
            self._resend_unacked_messages()
148

    
149
        if self.unsend:
150
            self._resend_unsend_messages()
151

    
152
        if self.consumers:
153
            for queue, callback in self.consumers.items():
154
                self.basic_consume(queue, callback)
155

    
156
        if self.exchanges:
157
            exchanges = self.exchanges
158
            self.exchanges = []
159
            for exchange, type in exchanges:
160
                self.exchange_declare(exchange, type)
161

    
162
    @reconnect_decorator
163
    def reconnect(self):
164
        self.close()
165
        self.connect()
166

    
167
    def exchange_declare(self, exchange, type='direct'):
168
        """Declare an exchange
169
        @type exchange_name: string
170
        @param exchange_name: name of the exchange
171
        @type exchange_type: string
172
        @param exhange_type: one of 'direct', 'topic', 'fanout'
173

174
        """
175
        logger.info('Declaring %s exchange: %s', type, exchange)
176
        promise = self.client.exchange_declare(exchange=exchange,
177
                                               type=type,
178
                                               durable=True,
179
                                               auto_delete=False)
180
        self.client.wait(promise)
181
        self.exchanges.append((exchange, type))
182

    
183
    @reconnect_decorator
184
    def queue_declare(self, queue, exclusive=False,
185
                      mirrored=True, mirrored_nodes='all',
186
                      dead_letter_exchange=None):
187
        """Declare a queue
188

189
        @type queue: string
190
        @param queue: name of the queue
191
        @param mirrored: whether the queue will be mirrored to other brokers
192
        @param mirrored_nodes: the policy for the mirrored queue.
193
            Available policies:
194
                - 'all': The queue is mirrored to all nodes and the
195
                  master node is the one to which the client is
196
                  connected
197
                - a list of nodes. The queue will be mirrored only to
198
                  the specified nodes, and the master will be the
199
                  first node in the list. Node names must be provided
200
                  and not host IP. example: [node1@rabbit,node2@rabbit]
201

202
        """
203
        logger.info('Declaring queue: %s', queue)
204

    
205
        if mirrored:
206
            if mirrored_nodes == 'all':
207
                arguments = {'x-ha-policy': 'all'}
208
            elif isinstance(mirrored_nodes, list):
209
                arguments = {'x-ha-policy': 'nodes',
210
                           'x-ha-policy-params': mirrored_nodes}
211
            else:
212
                raise AttributeError
213
        else:
214
            arguments = {}
215

    
216
        if dead_letter_exchange:
217
            arguments['x-dead-letter-exchange'] = dead_letter_exchange
218

    
219
        promise = self.client.queue_declare(queue=queue, durable=True,
220
                                            exclusive=exclusive,
221
                                            auto_delete=False,
222
                                            arguments=arguments)
223
        self.client.wait(promise)
224

    
225
    def queue_bind(self, queue, exchange, routing_key):
226
        logger.debug('Binding queue %s to exchange %s with key %s'
227
                 % (queue, exchange, routing_key))
228
        promise = self.client.queue_bind(exchange=exchange, queue=queue,
229
                                         routing_key=routing_key)
230
        self.client.wait(promise)
231

    
232
    @reconnect_decorator
233
    def basic_publish(self, exchange, routing_key, body, headers={}):
234
        """Publish a message with a specific routing key """
235
        self._publish(exchange, routing_key, body, headers)
236

    
237
        self.flush_buffer()
238

    
239
        if self.confirms and len(self.unacked) >= self.confirm_buffer:
240
            self.get_confirms()
241

    
242
    @reconnect_decorator
243
    def basic_publish_multi(self, exchange, routing_key, bodies):
244
        for body in bodies:
245
            self.unsend[body] = (exchange, routing_key)
246

    
247
        for body in bodies:
248
            self._publish(exchange, routing_key, body)
249
            self.unsend.pop(body)
250

    
251
        self.flush_buffer()
252

    
253
        if self.confirms:
254
            self.get_confirms()
255

    
256
    def _publish(self, exchange, routing_key, body, headers={}):
257
        # Persisent messages by default!
258
        headers['delivery_mode'] = 2
259
        promise = self.client.basic_publish(exchange=exchange,
260
                                            routing_key=routing_key,
261
                                            body=body, headers=headers)
262

    
263
        if self.confirms:
264
            self.unacked[promise] = (exchange, routing_key, body)
265

    
266
        return promise
267

    
268
    @reconnect_decorator
269
    def flush_buffer(self):
270
        while self.client.needs_write():
271
            self.client.on_write()
272

    
273
    @reconnect_decorator
274
    def get_confirms(self):
275
        for promise in self.unacked.keys():
276
            self.client.wait(promise)
277
            self.unacked.pop(promise)
278

    
279
    @reconnect_decorator
280
    def _resend_unacked_messages(self):
281
        """Resend unacked messages in case of a connection failure."""
282
        msgs = self.unacked.values()
283
        self.unacked.clear()
284
        for exchange, routing_key, body in msgs:
285
            logger.debug('Resending message %s' % body)
286
            self.basic_publish(exchange, routing_key, body)
287

    
288
    @reconnect_decorator
289
    def _resend_unsend_messages(self):
290
        """Resend unsend messages in case of a connection failure."""
291
        for body in self.unsend.keys():
292
            (exchange, routing_key) = self.unsend[body]
293
            self.basic_publish(exchange, routing_key, body)
294
            self.unsend.pop(body)
295

    
296
    @reconnect_decorator
297
    def basic_consume(self, queue, callback, prefetch_count=0):
298
        """Consume from a queue.
299

300
        @type queue: string or list of strings
301
        @param queue: the name or list of names from the queues to consume
302
        @type callback: function
303
        @param callback: the callback function to run when a message arrives
304

305
        """
306
        # Store the queues and the callback
307
        self.consumers[queue] = callback
308

    
309
        def handle_delivery(promise, msg):
310
            """Hide promises and messages without body"""
311
            if 'body' in msg:
312
                callback(self, msg)
313
            else:
314
                logger.debug("Message without body %s" % msg)
315
                raise socket_error
316

    
317
        consume_promise = \
318
                self.client.basic_consume(queue=queue,
319
                                          prefetch_count=prefetch_count,
320
                                          callback=handle_delivery)
321

    
322
        self.consume_promises.append(consume_promise)
323
        return consume_promise
324

    
325
    @reconnect_decorator
326
    def basic_wait(self, promise=None, timeout=0):
327
        """Wait for messages from the queues declared by basic_consume.
328

329
        This function will block until a message arrives from the queues that
330
        have been declared with basic_consume. If the optional arguments
331
        'promise' is given, only messages for this promise will be delivered.
332

333
        """
334
        if promise is not None:
335
            return self.client.wait(promise, timeout)
336
        else:
337
            return self.client.wait(self.consume_promises, timeout)
338

    
339
    @reconnect_decorator
340
    def basic_get(self, queue):
341
        """Get a single message from a queue.
342

343
        This is a non-blocking method for getting messages from a queue.
344
        It will return None if the queue is empty.
345

346
        """
347
        get_promise = self.client.basic_get(queue=queue)
348
        result = self.client.wait(get_promise)
349
        if 'empty' in result:
350
            # The queue is empty
351
            return None
352
        else:
353
            return result
354

    
355
    @reconnect_decorator
356
    def basic_ack(self, message):
357
        self.client.basic_ack(message)
358

    
359
    @reconnect_decorator
360
    def basic_nack(self, message):
361
        self.client.basic_ack(message)
362

    
363
    @reconnect_decorator
364
    def basic_reject(self, message, requeue=False):
365
        """Reject a message.
366

367
        If requeue option is False and a dead letter exchange is associated
368
        with the queue, the message will be routed to the dead letter exchange.
369

370
        """
371
        self.client.basic_reject(message, requeue=requeue)
372

    
373
    def close(self):
374
        """Check that messages have been send and close the connection."""
375
        logger.debug("Closing connection to %s", self.client.host)
376
        try:
377
            if self.confirms:
378
                self.get_confirms()
379
            close_promise = self.client.close()
380
            self.client.wait(close_promise)
381
        except (socket_error, spec_exceptions.ConnectionForced) as e:
382
            logger.error('Connection closed while closing connection:%s',
383
                          e)
384

    
385
    def queue_delete(self, queue, if_unused=True, if_empty=True):
386
        """Delete a queue.
387

388
        Returns False if the queue does not exist
389
        """
390
        try:
391
            promise = self.client.queue_delete(queue=queue,
392
                                               if_unused=if_unused,
393
                                               if_empty=if_empty)
394
            self.client.wait(promise)
395
            return True
396
        except spec_exceptions.NotFound:
397
            logger.info("Queue %s does not exist", queue)
398
            return False
399

    
400
    def exchange_delete(self, exchange, if_unused=True):
401
        """Delete an exchange."""
402
        try:
403

    
404
            promise = self.client.exchange_delete(exchange=exchange,
405
                                                  if_unused=if_unused)
406
            self.client.wait(promise)
407
            return True
408
        except spec_exceptions.NotFound:
409
            logger.info("Exchange %s does not exist", exchange)
410
            return False
411

    
412
    @reconnect_decorator
413
    def basic_cancel(self, promise=None):
414
        """Cancel consuming from a queue. """
415
        if promise is not None:
416
            self.client.basic_cancel(promise)
417
        else:
418
            for promise in self.consume_promises:
419
                self.client.basic_cancel(promise)
420

    
421

    
422
class AMQPConnectionError(Exception):
423
    pass