Statistics
| Branch: | Tag: | Revision:

root / snf-common / synnefo / lib / amqp_puka.py @ 1c65202f

History | View | Annotate | Download (14.8 kB)

1
# Copyright 2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
""" Module implementing connection and communication with an AMQP broker.
35

36
AMQP Client's implemented by this module silenty detect connection failures and
37
try to reconnect to any available broker. Also publishing takes advantage of
38
publisher-confirms in order to guarantee that messages are properly delivered
39
to the broker.
40

41
"""
42

    
43
import logging
44

    
45
from puka import Client
46
from puka import spec_exceptions
47
import socket
48
from socket import error as socket_error
49
from time import sleep
50
from random import shuffle
51
from functools import wraps
52
from ordereddict import OrderedDict
53
from synnefo import settings
54

    
55

    
56
def reconnect_decorator(func):
57
    """
58
    Decorator for persistent connection with one or more AMQP brokers.
59

60
    """
61
    @wraps(func)
62
    def wrapper(self, *args, **kwargs):
63
        try:
64
            return func(self, *args, **kwargs)
65
        except (socket_error, spec_exceptions.ConnectionForced) as e:
66
            self.log.error('Connection Closed while in %s: %s', func.__name__, e)
67
            self.connect()
68

    
69
    return wrapper
70

    
71

    
72
class AMQPPukaClient(object):
73
    """
74
    AMQP generic client implementing most of the basic AMQP operations.
75

76
    """
77
    def __init__(self, hosts=settings.AMQP_HOSTS, max_retries=30,
78
                 confirms=True, confirm_buffer=100, logger=None):
79
        """
80
        Format hosts as "amqp://username:pass@host:port"
81
        max_retries=0 defaults to unlimited retries
82

83
        """
84

    
85
        self.hosts = hosts
86
        shuffle(self.hosts)
87

    
88
        self.max_retries = max_retries
89
        self.confirms = confirms
90
        self.confirm_buffer = confirm_buffer
91

    
92
        self.connection = None
93
        self.channel = None
94
        self.consumers = {}
95
        self.unacked = OrderedDict()
96
        self.unsend = OrderedDict()
97
        self.consume_promises = []
98
        self.exchanges = []
99
        if logger:
100
            self.log = logger
101
        else:
102
            logger = logging.getLogger("amqp")
103
            logging.basicConfig()
104
            self.log = logger
105

    
106
    def connect(self, retries=0):
107
        if self.max_retries and retries >= self.max_retries:
108
            self.log.error("Aborting after %d retries", retries)
109
            raise AMQPConnectionError('Aborting after %d connection failures.'\
110
                                      % retries)
111
            return
112

    
113
        # Pick up a host
114
        host = self.hosts.pop()
115
        self.hosts.insert(0, host)
116

    
117
        self.client = Client(host, pubacks=self.confirms)
118

    
119
        host = host.split('@')[-1]
120
        self.log.debug('Connecting to node %s' % host)
121

    
122
        try:
123
            promise = self.client.connect()
124
            self.client.wait(promise)
125
        except socket_error as e:
126
            if retries < len(self.hosts):
127
                self.log.warning('Cannot connect to host %s: %s', host, e)
128
            else:
129
                self.log.error('Cannot connect to host %s: %s', host, e)
130
                sleep(1)
131
            return self.connect(retries + 1)
132

    
133
        self.log.info('Successfully connected to host: %s', host)
134

    
135
        # Setup TCP keepalive option
136
        self.client.sd.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
137
        # Keepalive time
138
        self.client.sd.setsockopt(socket.SOL_TCP, socket.TCP_KEEPIDLE, 20)
139
        # Keepalive interval
140
        self.client.sd.setsockopt(socket.SOL_TCP, socket.TCP_KEEPINTVL, 2)
141
        # Keepalive retry
142
        self.client.sd.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT, 10)
143

    
144
        self.log.info('Creating channel')
145

    
146
        # Clear consume_promises each time connecting, since they are related
147
        # to the connection object
148
        self.consume_promises = []
149

    
150
        if self.unacked:
151
            self._resend_unacked_messages()
152

    
153
        if self.unsend:
154
            self._resend_unsend_messages()
155

    
156
        if self.consumers:
157
            for queue, callback in self.consumers.items():
158
                self.basic_consume(queue, callback)
159

    
160
        if self.exchanges:
161
            exchanges = self.exchanges
162
            self.exchanges = []
163
            for exchange, type in exchanges:
164
                self.exchange_declare(exchange, type)
165

    
166
    @reconnect_decorator
167
    def reconnect(self):
168
        self.close()
169
        self.connect()
170

    
171
    def exchange_declare(self, exchange, type='direct'):
172
        """Declare an exchange
173
        @type exchange_name: string
174
        @param exchange_name: name of the exchange
175
        @type exchange_type: string
176
        @param exhange_type: one of 'direct', 'topic', 'fanout'
177

178
        """
179
        self.log.info('Declaring %s exchange: %s', type, exchange)
180
        promise = self.client.exchange_declare(exchange=exchange,
181
                                               type=type,
182
                                               durable=True,
183
                                               auto_delete=False)
184
        self.client.wait(promise)
185
        self.exchanges.append((exchange, type))
186

    
187
    @reconnect_decorator
188
    def queue_declare(self, queue, exclusive=False,
189
                      mirrored=True, mirrored_nodes='all',
190
                      dead_letter_exchange=None):
191
        """Declare a queue
192

193
        @type queue: string
194
        @param queue: name of the queue
195
        @param mirrored: whether the queue will be mirrored to other brokers
196
        @param mirrored_nodes: the policy for the mirrored queue.
197
            Available policies:
198
                - 'all': The queue is mirrored to all nodes and the
199
                  master node is the one to which the client is
200
                  connected
201
                - a list of nodes. The queue will be mirrored only to
202
                  the specified nodes, and the master will be the
203
                  first node in the list. Node names must be provided
204
                  and not host IP. example: [node1@rabbit,node2@rabbit]
205

206
        """
207
        self.log.info('Declaring queue: %s', queue)
208

    
209
        if mirrored:
210
            if mirrored_nodes == 'all':
211
                arguments = {'x-ha-policy': 'all'}
212
            elif isinstance(mirrored_nodes, list):
213
                arguments = {'x-ha-policy': 'nodes',
214
                           'x-ha-policy-params': mirrored_nodes}
215
            else:
216
                raise AttributeError
217
        else:
218
            arguments = {}
219

    
220
        if dead_letter_exchange:
221
            arguments['x-dead-letter-exchange'] = dead_letter_exchange
222

    
223
        promise = self.client.queue_declare(queue=queue, durable=True,
224
                                            exclusive=exclusive,
225
                                            auto_delete=False,
226
                                            arguments=arguments)
227
        self.client.wait(promise)
228

    
229
    def queue_bind(self, queue, exchange, routing_key):
230
        self.log.debug('Binding queue %s to exchange %s with key %s'
231
                       % (queue, exchange, routing_key))
232
        promise = self.client.queue_bind(exchange=exchange, queue=queue,
233
                                         routing_key=routing_key)
234
        self.client.wait(promise)
235

    
236
    @reconnect_decorator
237
    def basic_publish(self, exchange, routing_key, body, headers={}):
238
        """Publish a message with a specific routing key """
239
        self._publish(exchange, routing_key, body, headers)
240

    
241
        self.flush_buffer()
242

    
243
        if self.confirms and len(self.unacked) >= self.confirm_buffer:
244
            self.get_confirms()
245

    
246
    @reconnect_decorator
247
    def basic_publish_multi(self, exchange, routing_key, bodies):
248
        for body in bodies:
249
            self.unsend[body] = (exchange, routing_key)
250

    
251
        for body in bodies:
252
            self._publish(exchange, routing_key, body)
253
            self.unsend.pop(body)
254

    
255
        self.flush_buffer()
256

    
257
        if self.confirms:
258
            self.get_confirms()
259

    
260
    def _publish(self, exchange, routing_key, body, headers={}):
261
        # Persisent messages by default!
262
        headers['delivery_mode'] = 2
263
        promise = self.client.basic_publish(exchange=exchange,
264
                                            routing_key=routing_key,
265
                                            body=body, headers=headers)
266

    
267
        if self.confirms:
268
            self.unacked[promise] = (exchange, routing_key, body)
269

    
270
        return promise
271

    
272
    @reconnect_decorator
273
    def flush_buffer(self):
274
        while self.client.needs_write():
275
            self.client.on_write()
276

    
277
    @reconnect_decorator
278
    def get_confirms(self):
279
        for promise in self.unacked.keys():
280
            self.client.wait(promise)
281
            self.unacked.pop(promise)
282

    
283
    @reconnect_decorator
284
    def _resend_unacked_messages(self):
285
        """Resend unacked messages in case of a connection failure."""
286
        msgs = self.unacked.values()
287
        self.unacked.clear()
288
        for exchange, routing_key, body in msgs:
289
            self.log.debug('Resending message %s' % body)
290
            self.basic_publish(exchange, routing_key, body)
291

    
292
    @reconnect_decorator
293
    def _resend_unsend_messages(self):
294
        """Resend unsend messages in case of a connection failure."""
295
        for body in self.unsend.keys():
296
            (exchange, routing_key) = self.unsend[body]
297
            self.basic_publish(exchange, routing_key, body)
298
            self.unsend.pop(body)
299

    
300
    @reconnect_decorator
301
    def basic_consume(self, queue, callback, prefetch_count=0):
302
        """Consume from a queue.
303

304
        @type queue: string or list of strings
305
        @param queue: the name or list of names from the queues to consume
306
        @type callback: function
307
        @param callback: the callback function to run when a message arrives
308

309
        """
310
        # Store the queues and the callback
311
        self.consumers[queue] = callback
312

    
313
        def handle_delivery(promise, msg):
314
            """Hide promises and messages without body"""
315
            if 'body' in msg:
316
                callback(self, msg)
317
            else:
318
                self.log.debug("Message without body %s" % msg)
319
                raise socket_error
320

    
321
        consume_promise = \
322
                self.client.basic_consume(queue=queue,
323
                                          prefetch_count=prefetch_count,
324
                                          callback=handle_delivery)
325

    
326
        self.consume_promises.append(consume_promise)
327
        return consume_promise
328

    
329
    @reconnect_decorator
330
    def basic_wait(self, promise=None, timeout=0):
331
        """Wait for messages from the queues declared by basic_consume.
332

333
        This function will block until a message arrives from the queues that
334
        have been declared with basic_consume. If the optional arguments
335
        'promise' is given, only messages for this promise will be delivered.
336

337
        """
338
        if promise is not None:
339
            return self.client.wait(promise, timeout)
340
        else:
341
            return self.client.wait(self.consume_promises, timeout)
342

    
343
    @reconnect_decorator
344
    def basic_get(self, queue):
345
        """Get a single message from a queue.
346

347
        This is a non-blocking method for getting messages from a queue.
348
        It will return None if the queue is empty.
349

350
        """
351
        get_promise = self.client.basic_get(queue=queue)
352
        result = self.client.wait(get_promise)
353
        if 'empty' in result:
354
            # The queue is empty
355
            return None
356
        else:
357
            return result
358

    
359
    @reconnect_decorator
360
    def basic_ack(self, message):
361
        self.client.basic_ack(message)
362

    
363
    @reconnect_decorator
364
    def basic_nack(self, message):
365
        self.client.basic_ack(message)
366

    
367
    @reconnect_decorator
368
    def basic_reject(self, message, requeue=False):
369
        """Reject a message.
370

371
        If requeue option is False and a dead letter exchange is associated
372
        with the queue, the message will be routed to the dead letter exchange.
373

374
        """
375
        self.client.basic_reject(message, requeue=requeue)
376

    
377
    def close(self):
378
        """Check that messages have been send and close the connection."""
379
        self.log.debug("Closing connection to %s", self.client.host)
380
        try:
381
            if self.confirms:
382
                self.get_confirms()
383
            close_promise = self.client.close()
384
            self.client.wait(close_promise)
385
        except (socket_error, spec_exceptions.ConnectionForced) as e:
386
            self.log.error('Connection closed while closing connection:%s', e)
387

    
388
    def queue_delete(self, queue, if_unused=True, if_empty=True):
389
        """Delete a queue.
390

391
        Returns False if the queue does not exist
392
        """
393
        try:
394
            promise = self.client.queue_delete(queue=queue,
395
                                               if_unused=if_unused,
396
                                               if_empty=if_empty)
397
            self.client.wait(promise)
398
            return True
399
        except spec_exceptions.NotFound:
400
            self.log.info("Queue %s does not exist", queue)
401
            return False
402

    
403
    def exchange_delete(self, exchange, if_unused=True):
404
        """Delete an exchange."""
405
        try:
406

    
407
            promise = self.client.exchange_delete(exchange=exchange,
408
                                                  if_unused=if_unused)
409
            self.client.wait(promise)
410
            return True
411
        except spec_exceptions.NotFound:
412
            self.log.info("Exchange %s does not exist", exchange)
413
            return False
414

    
415
    @reconnect_decorator
416
    def basic_cancel(self, promise=None):
417
        """Cancel consuming from a queue. """
418
        if promise is not None:
419
            self.client.basic_cancel(promise)
420
        else:
421
            for promise in self.consume_promises:
422
                self.client.basic_cancel(promise)
423

    
424

    
425
class AMQPConnectionError(Exception):
426
    pass