Statistics
| Branch: | Tag: | Revision:

root / snf-common / synnefo / lib / amqp_puka.py @ d01cd522

History | View | Annotate | Download (13.4 kB)

1
# Copyright 2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
""" Module implementing connection and communication with an AMQP broker.
35

36
AMQP Client's implemented by this module silenty detect connection failures and
37
try to reconnect to any available broker. Also publishing takes advantage of
38
publisher-confirms in order to guarantee that messages are properly delivered
39
to the broker.
40

41
"""
42

    
43
import logging
44

    
45
from puka import Client
46
from puka import spec_exceptions
47
from socket import error as socket_error
48
from time import sleep
49
from random import shuffle
50
from functools import wraps
51
from ordereddict import OrderedDict
52
from synnefo import settings
53

    
54
logger = logging.getLogger()
55

    
56

    
57
def reconnect_decorator(func):
58
    """
59
    Decorator for persistent connection with one or more AMQP brokers.
60

61
    """
62
    @wraps(func)
63
    def wrapper(self, *args, **kwargs):
64
        try:
65
            return func(self, *args, **kwargs)
66
        except (socket_error, spec_exceptions.ConnectionForced) as e:
67
            logger.error('Connection Closed while in %s: %s', func.__name__, e)
68
            self.consume_promises = []
69
            self.connect()
70

    
71
    return wrapper
72

    
73

    
74
class AMQPPukaClient(object):
75
    """
76
    AMQP generic client implementing most of the basic AMQP operations.
77

78
    """
79
    def __init__(self, hosts=settings.AMQP_HOSTS, max_retries=30,
80
                 confirms=True, confirm_buffer=100):
81
        """
82
        Format hosts as "amqp://username:pass@host:port"
83
        max_retries=0 defaults to unlimited retries
84

85
        """
86

    
87
        self.hosts = hosts
88
        shuffle(self.hosts)
89

    
90
        self.max_retries = max_retries
91
        self.confirms = confirms
92
        self.confirm_buffer = confirm_buffer
93

    
94
        self.connection = None
95
        self.channel = None
96
        self.consumers = {}
97
        self.unacked = OrderedDict()
98
        self.unsend = OrderedDict()
99
        self.consume_promises = []
100
        self.exchanges = []
101

    
102
    def connect(self, retries=0):
103
        if self.max_retries and retries >= self.max_retries:
104
            logger.error("Aborting after %d retries", retries)
105
            raise AMQPConnectionError('Aborting after %d connection failures.'\
106
                                      % retries)
107
            return
108

    
109
        # Pick up a host
110
        host = self.hosts.pop()
111
        self.hosts.insert(0, host)
112

    
113
        self.client = Client(host, pubacks=self.confirms)
114

    
115
        host = host.split('@')[-1]
116
        logger.debug('Connecting to node %s' % host)
117

    
118
        try:
119
            promise = self.client.connect()
120
            self.client.wait(promise)
121
        except socket_error as e:
122
            if retries < len(self.hosts):
123
                logger.warning('Cannot connect to host %s: %s', host, e)
124
            else:
125
                logger.error('Cannot connect to host %s: %s', host, e)
126
                sleep(1)
127
            return self.connect(retries + 1)
128

    
129
        logger.info('Successfully connected to host: %s', host)
130

    
131
        logger.info('Creating channel')
132

    
133
        if self.unacked:
134
            self._resend_unacked_messages()
135

    
136
        if self.unsend:
137
            self._resend_unsend_messages()
138

    
139
        if self.consumers:
140
            for queue, callback in self.consumers.items():
141
                self.basic_consume(queue, callback)
142

    
143
        if self.exchanges:
144
            exchanges = self.exchanges
145
            self.exchanges = []
146
            for exchange, type in exchanges:
147
                self.exchange_declare(exchange, type)
148

    
149
    def exchange_declare(self, exchange, type='direct'):
150
        """Declare an exchange
151
        @type exchange_name: string
152
        @param exchange_name: name of the exchange
153
        @type exchange_type: string
154
        @param exhange_type: one of 'direct', 'topic', 'fanout'
155

156
        """
157
        logger.info('Declaring %s exchange: %s', type, exchange)
158
        promise = self.client.exchange_declare(exchange=exchange,
159
                                               type=type,
160
                                               durable=True,
161
                                               auto_delete=False)
162
        self.client.wait(promise)
163
        self.exchanges.append((exchange, type))
164

    
165
    @reconnect_decorator
166
    def queue_declare(self, queue, exclusive=False,
167
                      mirrored=True, mirrored_nodes='all'):
168
        """Declare a queue
169

170
        @type queue: string
171
        @param queue: name of the queue
172
        @param mirrored: whether the queue will be mirrored to other brokers
173
        @param mirrored_nodes: the policy for the mirrored queue.
174
            Available policies:
175
                - 'all': The queue is mirrored to all nodes and the
176
                  master node is the one to which the client is
177
                  connected
178
                - a list of nodes. The queue will be mirrored only to
179
                  the specified nodes, and the master will be the
180
                  first node in the list. Node names must be provided
181
                  and not host IP. example: [node1@rabbit,node2@rabbit]
182

183
        """
184
        logger.info('Declaring queue: %s', queue)
185

    
186
        if mirrored:
187
            if mirrored_nodes == 'all':
188
                arguments = {'x-ha-policy': 'all'}
189
            elif isinstance(mirrored_nodes, list):
190
                arguments = {'x-ha-policy': 'nodes',
191
                           'x-ha-policy-params': mirrored_nodes}
192
            else:
193
                raise AttributeError
194
        else:
195
            arguments = {}
196

    
197
        promise = self.client.queue_declare(queue=queue, durable=True,
198
                                            exclusive=exclusive,
199
                                            auto_delete=False,
200
                                            arguments=arguments)
201
        self.client.wait(promise)
202

    
203
    def queue_bind(self, queue, exchange, routing_key):
204
        logger.debug('Binding queue %s to exchange %s with key %s'
205
                 % (queue, exchange, routing_key))
206
        promise = self.client.queue_bind(exchange=exchange, queue=queue,
207
                                         routing_key=routing_key)
208
        self.client.wait(promise)
209

    
210
    @reconnect_decorator
211
    def basic_publish(self, exchange, routing_key, body):
212
        """Publish a message with a specific routing key """
213
        self._publish(exchange, routing_key, body)
214

    
215
        self.flush_buffer()
216

    
217
        if self.confirms and len(self.unacked) >= self.confirm_buffer:
218
            self.get_confirms()
219

    
220
    @reconnect_decorator
221
    def basic_publish_multi(self, exchange, routing_key, bodies):
222
        for body in bodies:
223
            self.unsend[body] = (exchange, routing_key)
224

    
225
        for body in bodies:
226
            self._publish(exchange, routing_key, body)
227
            self.unsend.pop(body)
228

    
229
        self.flush_buffer()
230

    
231
        if self.confirms:
232
            self.get_confirms()
233

    
234
    def _publish(self, exchange, routing_key, body):
235
        # Persisent messages by default!
236
        headers = {}
237
        headers['delivery_mode'] = 2
238
        promise = self.client.basic_publish(exchange=exchange,
239
                                            routing_key=routing_key,
240
                                            body=body, headers=headers)
241

    
242
        if self.confirms:
243
            self.unacked[promise] = (exchange, routing_key, body)
244

    
245
        return promise
246

    
247
    @reconnect_decorator
248
    def flush_buffer(self):
249
        while self.client.needs_write():
250
            self.client.on_write()
251

    
252
    @reconnect_decorator
253
    def get_confirms(self):
254
        for promise in self.unacked.keys():
255
            self.client.wait(promise)
256
            self.unacked.pop(promise)
257

    
258
    @reconnect_decorator
259
    def _resend_unacked_messages(self):
260
        """Resend unacked messages in case of a connection failure."""
261
        msgs = self.unacked.values()
262
        self.unacked.clear()
263
        for exchange, routing_key, body in msgs:
264
            logger.debug('Resending message %s' % body)
265
            self.basic_publish(exchange, routing_key, body)
266

    
267
    @reconnect_decorator
268
    def _resend_unsend_messages(self):
269
        """Resend unsend messages in case of a connection failure."""
270
        for body in self.unsend.keys():
271
            (exchange, routing_key) = self.unsend[body]
272
            self.basic_publish(exchange, routing_key, body)
273
            self.unsend.pop(body)
274

    
275
    @reconnect_decorator
276
    def basic_consume(self, queue, callback, prefetch_count=0):
277
        """Consume from a queue.
278

279
        @type queue: string or list of strings
280
        @param queue: the name or list of names from the queues to consume
281
        @type callback: function
282
        @param callback: the callback function to run when a message arrives
283

284
        """
285
        # Store the queues and the callback
286
        self.consumers[queue] = callback
287

    
288
        def handle_delivery(promise, msg):
289
            """Hide promises and messages without body"""
290
            if 'body' in msg:
291
                callback(self, msg)
292
            else:
293
                logger.debug("Message without body %s" % msg)
294
                raise socket_error
295

    
296
        consume_promise = \
297
                self.client.basic_consume(queue=queue,
298
                                          prefetch_count=prefetch_count,
299
                                          callback=handle_delivery)
300

    
301
        self.consume_promises.append(consume_promise)
302
        return consume_promise
303

    
304
    @reconnect_decorator
305
    def basic_wait(self, promise=None, timeout=0):
306
        """Wait for messages from the queues declared by basic_consume.
307

308
        This function will block until a message arrives from the queues that
309
        have been declared with basic_consume. If the optional arguments
310
        'promise' is given, only messages for this promise will be delivered.
311

312
        """
313
        if promise is not None:
314
            self.client.wait(promise, timeout)
315
        else:
316
            self.client.wait(self.consume_promises)
317

    
318
    @reconnect_decorator
319
    def basic_get(self, queue):
320
        """Get a single message from a queue.
321

322
        This is a non-blocking method for getting messages from a queue.
323
        It will return None if the queue is empty.
324

325
        """
326
        get_promise = self.client.basic_get(queue=queue)
327
        result = self.client.wait(get_promise)
328
        if 'empty' in result:
329
            # The queue is empty
330
            return None
331
        else:
332
            return result
333

    
334
    @reconnect_decorator
335
    def basic_ack(self, message):
336
        self.client.basic_ack(message)
337

    
338
    @reconnect_decorator
339
    def basic_nack(self, message):
340
        #TODO:
341
        pass
342

    
343
    def close(self):
344
        """Check that messages have been send and close the connection."""
345
        try:
346
            if self.confirms:
347
                self.get_confirms()
348
            close_promise = self.client.close()
349
            self.client.wait(close_promise)
350
        except (socket_error, spec_exceptions.ConnectionForced) as e:
351
            logger.error('Connection closed while closing connection:%s',
352
                          e)
353

    
354
    def queue_delete(self, queue, if_unused=True, if_empty=True):
355
        """Delete a queue.
356

357
        Returns False if the queue does not exist
358
        """
359
        try:
360
            promise = self.client.queue_delete(queue=queue,
361
                                               if_unused=if_unused,
362
                                               if_empty=if_empty)
363
            self.client.wait(promise)
364
            return True
365
        except spec_exceptions.NotFound:
366
            logger.info("Queue %s does not exist", queue)
367
            return False
368

    
369
    def exchange_delete(self, exchange, if_unused=True):
370
        """Delete an exchange."""
371
        try:
372

    
373
            promise = self.client.exchange_delete(exchange=exchange,
374
                                                  if_unused=if_unused)
375
            self.client.wait(promise)
376
            return True
377
        except spec_exceptions.NotFound:
378
            logger.info("Exchange %s does not exist", exchange)
379
            return False
380

    
381
    @reconnect_decorator
382
    def basic_cancel(self, promise=None):
383
        """Cancel consuming from a queue. """
384
        if promise is not None:
385
            self.client.basic_cancel(promise)
386
        else:
387
            for promise in self.consume_promises:
388
                self.client.basic_cancel(promise)
389

    
390

    
391
class AMQPConnectionError(Exception):
392
    pass