Statistics
| Branch: | Tag: | Revision:

root / snf-common / synnefo / lib / amqp_puka.py @ 2ef10562

History | View | Annotate | Download (13.6 kB)

1
# Copyright 2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
""" Module implementing connection and communication with an AMQP broker.
35

36
AMQP Client's implemented by this module silenty detect connection failures and
37
try to reconnect to any available broker. Also publishing takes advantage of
38
publisher-confirms in order to guarantee that messages are properly delivered
39
to the broker.
40

41
"""
42

    
43
import logging
44

    
45
from puka import Client
46
from puka import spec_exceptions
47
from socket import error as socket_error
48
from time import sleep
49
from random import shuffle
50
from functools import wraps
51
from ordereddict import OrderedDict
52
from synnefo import settings
53

    
54
logging.basicConfig(level=logging.DEBUG,
55
                    format="[%(levelname)s %(asctime)s] %(message)s")
56
logger = logging.getLogger()
57

    
58

    
59
def reconnect_decorator(func):
60
    """
61
    Decorator for persistent connection with one or more AMQP brokers.
62

63
    """
64
    @wraps(func)
65
    def wrapper(self, *args, **kwargs):
66
        try:
67
            return func(self, *args, **kwargs)
68
        except (socket_error, spec_exceptions.ConnectionForced) as e:
69
            logger.error('Connection Closed while in %s: %s', func.__name__, e)
70
            self.consume_promises = []
71
            self.connect()
72

    
73
    return wrapper
74

    
75

    
76
class AMQPPukaClient(object):
77
    """
78
    AMQP generic client implementing most of the basic AMQP operations.
79

80
    """
81
    def __init__(self, hosts=settings.AMQP_HOSTS, max_retries=30,
82
                 confirms=True, confirm_buffer=100):
83
        """
84
        Format hosts as "amqp://username:pass@host:port"
85
        max_retries=0 defaults to unlimited retries
86

87
        """
88

    
89
        self.hosts = hosts
90
        shuffle(self.hosts)
91

    
92
        self.max_retries = max_retries
93
        self.confirms = confirms
94
        self.confirm_buffer = confirm_buffer
95

    
96
        self.connection = None
97
        self.channel = None
98
        self.consumers = {}
99
        self.unacked = OrderedDict()
100
        self.unsend = OrderedDict()
101
        self.consume_promises = []
102
        self.exchanges = []
103

    
104
    def connect(self, retries=0):
105
        if self.max_retries and retries >= self.max_retries:
106
            logger.error("Aborting after %d retries", retries)
107
            raise AMQPConnectionError('Aborting after %d connection failures.'\
108
                                      % retries)
109
            return
110

    
111
        # Pick up a host
112
        host = self.hosts.pop()
113
        self.hosts.insert(0, host)
114

    
115
        self.client = Client(host, pubacks=self.confirms)
116

    
117
        host = host.split('@')[-1]
118
        logger.debug('Connecting to node %s' % host)
119

    
120
        try:
121
            promise = self.client.connect()
122
            self.client.wait(promise)
123
        except socket_error as e:
124
            if retries < len(self.hosts):
125
                logger.warning('Cannot connect to host %s: %s', host, e)
126
            else:
127
                logger.error('Cannot connect to host %s: %s', host, e)
128
                sleep(1)
129
            return self.connect(retries + 1)
130

    
131
        logger.info('Successfully connected to host: %s', host)
132

    
133
        logger.info('Creating channel')
134

    
135
        if self.unacked:
136
            self._resend_unacked_messages()
137

    
138
        if self.unsend:
139
            self._resend_unsend_messages()
140

    
141
        if self.consumers:
142
            for queue, callback in self.consumers.items():
143
                self.basic_consume(queue, callback)
144

    
145
        if self.exchanges:
146
            exchanges = self.exchanges
147
            self.exchanges = []
148
            for exchange, type in exchanges:
149
                self.exchange_declare(exchange, type)
150

    
151
    def exchange_declare(self, exchange, type='direct'):
152
        """Declare an exchange
153
        @type exchange_name: string
154
        @param exchange_name: name of the exchange
155
        @type exchange_type: string
156
        @param exhange_type: one of 'direct', 'topic', 'fanout'
157

158
        """
159
        logger.info('Declaring %s exchange: %s', type, exchange)
160
        promise = self.client.exchange_declare(exchange=exchange,
161
                                               type=type,
162
                                               durable=True,
163
                                               auto_delete=False)
164
        self.client.wait(promise)
165
        self.exchanges.append((exchange, type))
166

    
167
    @reconnect_decorator
168
    def queue_declare(self, queue, exclusive=False,
169
                      mirrored=True, mirrored_nodes='all'):
170
        """Declare a queue
171

172
        @type queue: string
173
        @param queue: name of the queue
174
        @param mirrored: whether the queue will be mirrored to other brokers
175
        @param mirrored_nodes: the policy for the mirrored queue.
176
            Available policies:
177
                - 'all': The queue is mirrored to all nodes and the
178
                  master node is the one to which the client is
179
                  connected
180
                - a list of nodes. The queue will be mirrored only to
181
                  the specified nodes, and the master will be the
182
                  first node in the list. Node names must be provided
183
                  and not host IP. example: [node1@rabbit,node2@rabbit]
184

185
        """
186
        logger.info('Declaring queue: %s', queue)
187

    
188
        if mirrored:
189
            if mirrored_nodes == 'all':
190
                arguments = {'x-ha-policy': 'all'}
191
            elif isinstance(mirrored_nodes, list):
192
                arguments = {'x-ha-policy': 'nodes',
193
                           'x-ha-policy-params': mirrored_nodes}
194
            else:
195
                raise AttributeError
196
        else:
197
            arguments = {}
198

    
199
        promise = self.client.queue_declare(queue=queue, durable=True,
200
                                            exclusive=exclusive,
201
                                            auto_delete=False,
202
                                            arguments=arguments)
203
        self.client.wait(promise)
204

    
205
    def queue_bind(self, queue, exchange, routing_key):
206
        logger.debug('Binding queue %s to exchange %s with key %s'
207
                 % (queue, exchange, routing_key))
208
        promise = self.client.queue_bind(exchange=exchange, queue=queue,
209
                                         routing_key=routing_key)
210
        self.client.wait(promise)
211

    
212
    @reconnect_decorator
213
    def basic_publish(self, exchange, routing_key, body):
214
        """Publish a message with a specific routing key """
215
        self._publish(exchange, routing_key, body)
216

    
217
        self.flush_buffer()
218

    
219
        if self.confirms and len(self.unacked) >= self.confirm_buffer:
220
            self.get_confirms()
221

    
222
    @reconnect_decorator
223
    def basic_publish_multi(self, exchange, routing_key, bodies):
224
        for body in bodies:
225
            self.unsend[body] = (exchange, routing_key)
226

    
227
        for body in bodies:
228
            self._publish(exchange, routing_key, body)
229
            self.unsend.pop(body)
230

    
231
        self.flush_buffer()
232

    
233
        if self.confirms:
234
            self.get_confirms()
235

    
236
    def _publish(self, exchange, routing_key, body):
237
        # Persisent messages by default!
238
        headers = {}
239
        headers['delivery_mode'] = 2
240
        promise = self.client.basic_publish(exchange=exchange,
241
                                            routing_key=routing_key,
242
                                            body=body, headers=headers)
243

    
244
        if self.confirms:
245
            self.unacked[promise] = (exchange, routing_key, body)
246

    
247
        return promise
248

    
249
    @reconnect_decorator
250
    def flush_buffer(self):
251
        while self.client.needs_write():
252
            self.client.on_write()
253

    
254
    @reconnect_decorator
255
    def get_confirms(self):
256
        for promise in self.unacked.keys():
257
            self.client.wait(promise)
258
            self.unacked.pop(promise)
259

    
260
    @reconnect_decorator
261
    def _resend_unacked_messages(self):
262
        """Resend unacked messages in case of a connection failure."""
263
        msgs = self.unacked.values()
264
        self.unacked.clear()
265
        for exchange, routing_key, body in msgs:
266
            logger.debug('Resending message %s' % body)
267
            self.basic_publish(exchange, routing_key, body)
268

    
269
    @reconnect_decorator
270
    def _resend_unsend_messages(self):
271
        """Resend unsend messages in case of a connection failure."""
272
        for body in self.unsend.keys():
273
            (exchange, routing_key) = self.unsend[body]
274
            self.basic_publish(exchange, routing_key, body)
275
            self.unsend.pop(body)
276

    
277
    @reconnect_decorator
278
    def basic_consume(self, queue, callback, prefetch_count=0):
279
        """Consume from a queue.
280

281
        @type queue: string or list of strings
282
        @param queue: the name or list of names from the queues to consume
283
        @type callback: function
284
        @param callback: the callback function to run when a message arrives
285

286
        """
287
        # Store the queues and the callback
288
        self.consumers[queue] = callback
289

    
290
        def handle_delivery(promise, msg):
291
            """Hide promises and messages without body"""
292
            if 'body' in msg:
293
                callback(self, msg)
294
            else:
295
                logger.debug("Message without body %s" % msg)
296
                raise socket_error
297

    
298
        consume_promise = \
299
                self.client.basic_consume(queue=queue,
300
                                          prefetch_count=prefetch_count,
301
                                          callback=handle_delivery)
302

    
303
        self.consume_promises.append(consume_promise)
304
        return consume_promise
305

    
306
    @reconnect_decorator
307
    def basic_wait(self, promise=None, timeout=0):
308
        """Wait for messages from the queues declared by basic_consume.
309

310
        This function will block until a message arrives from the queues that
311
        have been declared with basic_consume. If the optional arguments
312
        'promise' is given, only messages for this promise will be delivered.
313

314
        """
315
        if promise is not None:
316
            self.client.wait(promise, timeout)
317
        else:
318
            self.client.wait(self.consume_promises)
319

    
320
    @reconnect_decorator
321
    def basic_get(self, queue):
322
        """Get a single message from a queue.
323

324
        This is a non-blocking method for getting messages from a queue.
325
        It will return None if the queue is empty.
326

327
        """
328
        get_promise = self.client.basic_get(queue=queue)
329
        result = self.client.wait(get_promise)
330
        if 'empty' in result:
331
            # The queue is empty
332
            return None
333
        else:
334
            return result
335

    
336
    @reconnect_decorator
337
    def basic_ack(self, message):
338
        self.client.basic_ack(message)
339

    
340
    @reconnect_decorator
341
    def basic_nack(self, message):
342
        #TODO:
343
        pass
344

    
345
    def close(self):
346
        """Check that messages have been send and close the connection."""
347
        try:
348
            if self.confirms:
349
                self.get_confirms()
350
            close_promise = self.client.close()
351
            self.client.wait(close_promise)
352
        except (socket_error, spec_exceptions.ConnectionForced) as e:
353
            logger.error('Connection closed while closing connection:%s',
354
                          e)
355

    
356
    def queue_delete(self, queue, if_unused=True, if_empty=True):
357
        """Delete a queue.
358

359
        Returns False if the queue does not exist
360
        """
361
        try:
362
            promise = self.client.queue_delete(queue=queue,
363
                                               if_unused=if_unused,
364
                                               if_empty=if_empty)
365
            self.client.wait(promise)
366
            return True
367
        except spec_exceptions.NotFound:
368
            logger.info("Queue %s does not exist", queue)
369
            return False
370

    
371
    def exchange_delete(self, exchange, if_unused=True):
372
        """Delete an exchange."""
373
        try:
374

    
375
            promise = self.client.exchange_delete(exchange=exchange,
376
                                                  if_unused=if_unused)
377
            self.client.wait(promise)
378
            return True
379
        except spec_exceptions.NotFound:
380
            logger.info("Exchange %s does not exist", exchange)
381
            return False
382

    
383
    @reconnect_decorator
384
    def basic_cancel(self, promise=None):
385
        """Cancel consuming from a queue. """
386
        if promise is not None:
387
            self.client.basic_cancel(promise)
388
        else:
389
            for promise in self.consume_promises:
390
                self.client.basic_cancel(promise)
391

    
392

    
393
class AMQPConnectionError(Exception):
394
    pass