Statistics
| Branch: | Tag: | Revision:

root / snf-common / synnefo / lib / amqp_puka.py @ b1bb9251

History | View | Annotate | Download (14.2 kB)

1
# Copyright 2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
""" Module implementing connection and communication with an AMQP broker.
35

36
AMQP Client's implemented by this module silenty detect connection failures and
37
try to reconnect to any available broker. Also publishing takes advantage of
38
publisher-confirms in order to guarantee that messages are properly delivered
39
to the broker.
40

41
"""
42

    
43
import logging
44

    
45
from puka import Client
46
from puka import spec_exceptions
47
import socket
48
from socket import error as socket_error
49
from time import sleep
50
from random import shuffle
51
from functools import wraps
52
from ordereddict import OrderedDict
53
from synnefo import settings
54

    
55
logger = logging.getLogger()
56

    
57

    
58
def reconnect_decorator(func):
59
    """
60
    Decorator for persistent connection with one or more AMQP brokers.
61

62
    """
63
    @wraps(func)
64
    def wrapper(self, *args, **kwargs):
65
        try:
66
            return func(self, *args, **kwargs)
67
        except (socket_error, spec_exceptions.ConnectionForced) as e:
68
            logger.error('Connection Closed while in %s: %s', func.__name__, e)
69
            self.connect()
70

    
71
    return wrapper
72

    
73

    
74
class AMQPPukaClient(object):
75
    """
76
    AMQP generic client implementing most of the basic AMQP operations.
77

78
    """
79
    def __init__(self, hosts=settings.AMQP_HOSTS, max_retries=30,
80
                 confirms=True, confirm_buffer=100):
81
        """
82
        Format hosts as "amqp://username:pass@host:port"
83
        max_retries=0 defaults to unlimited retries
84

85
        """
86

    
87
        self.hosts = hosts
88
        shuffle(self.hosts)
89

    
90
        self.max_retries = max_retries
91
        self.confirms = confirms
92
        self.confirm_buffer = confirm_buffer
93

    
94
        self.connection = None
95
        self.channel = None
96
        self.consumers = {}
97
        self.unacked = OrderedDict()
98
        self.unsend = OrderedDict()
99
        self.consume_promises = []
100
        self.exchanges = []
101

    
102
    def connect(self, retries=0):
103
        if self.max_retries and retries >= self.max_retries:
104
            logger.error("Aborting after %d retries", retries)
105
            raise AMQPConnectionError('Aborting after %d connection failures.'\
106
                                      % retries)
107
            return
108

    
109
        # Pick up a host
110
        host = self.hosts.pop()
111
        self.hosts.insert(0, host)
112

    
113
        self.client = Client(host, pubacks=self.confirms)
114

    
115
        host = host.split('@')[-1]
116
        logger.debug('Connecting to node %s' % host)
117

    
118
        try:
119
            promise = self.client.connect()
120
            self.client.wait(promise)
121
        except socket_error as e:
122
            if retries < len(self.hosts):
123
                logger.warning('Cannot connect to host %s: %s', host, e)
124
            else:
125
                logger.error('Cannot connect to host %s: %s', host, e)
126
                sleep(1)
127
            return self.connect(retries + 1)
128

    
129
        logger.info('Successfully connected to host: %s', host)
130

    
131
        # Setup TCP keepalive option
132
        self.client.sd.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
133
        # Keepalive time
134
        self.client.sd.setsockopt(socket.SOL_TCP, socket.TCP_KEEPIDLE, 20)
135
        # Keepalive interval
136
        self.client.sd.setsockopt(socket.SOL_TCP, socket.TCP_KEEPINTVL, 2)
137
        # Keepalive retry
138
        self.client.sd.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT, 10)
139

    
140
        logger.info('Creating channel')
141

    
142
        # Clear consume_promises each time connecting, since they are related
143
        # to the connection object
144
        self.consume_promises = []
145

    
146
        if self.unacked:
147
            self._resend_unacked_messages()
148

    
149
        if self.unsend:
150
            self._resend_unsend_messages()
151

    
152
        if self.consumers:
153
            for queue, callback in self.consumers.items():
154
                self.basic_consume(queue, callback)
155

    
156
        if self.exchanges:
157
            exchanges = self.exchanges
158
            self.exchanges = []
159
            for exchange, type in exchanges:
160
                self.exchange_declare(exchange, type)
161

    
162
    @reconnect_decorator
163
    def reconnect(self):
164
        self.close()
165
        self.connect()
166

    
167
    def exchange_declare(self, exchange, type='direct'):
168
        """Declare an exchange
169
        @type exchange_name: string
170
        @param exchange_name: name of the exchange
171
        @type exchange_type: string
172
        @param exhange_type: one of 'direct', 'topic', 'fanout'
173

174
        """
175
        logger.info('Declaring %s exchange: %s', type, exchange)
176
        promise = self.client.exchange_declare(exchange=exchange,
177
                                               type=type,
178
                                               durable=True,
179
                                               auto_delete=False)
180
        self.client.wait(promise)
181
        self.exchanges.append((exchange, type))
182

    
183
    @reconnect_decorator
184
    def queue_declare(self, queue, exclusive=False,
185
                      mirrored=True, mirrored_nodes='all'):
186
        """Declare a queue
187

188
        @type queue: string
189
        @param queue: name of the queue
190
        @param mirrored: whether the queue will be mirrored to other brokers
191
        @param mirrored_nodes: the policy for the mirrored queue.
192
            Available policies:
193
                - 'all': The queue is mirrored to all nodes and the
194
                  master node is the one to which the client is
195
                  connected
196
                - a list of nodes. The queue will be mirrored only to
197
                  the specified nodes, and the master will be the
198
                  first node in the list. Node names must be provided
199
                  and not host IP. example: [node1@rabbit,node2@rabbit]
200

201
        """
202
        logger.info('Declaring queue: %s', queue)
203

    
204
        if mirrored:
205
            if mirrored_nodes == 'all':
206
                arguments = {'x-ha-policy': 'all'}
207
            elif isinstance(mirrored_nodes, list):
208
                arguments = {'x-ha-policy': 'nodes',
209
                           'x-ha-policy-params': mirrored_nodes}
210
            else:
211
                raise AttributeError
212
        else:
213
            arguments = {}
214

    
215
        promise = self.client.queue_declare(queue=queue, durable=True,
216
                                            exclusive=exclusive,
217
                                            auto_delete=False,
218
                                            arguments=arguments)
219
        self.client.wait(promise)
220

    
221
    def queue_bind(self, queue, exchange, routing_key):
222
        logger.debug('Binding queue %s to exchange %s with key %s'
223
                 % (queue, exchange, routing_key))
224
        promise = self.client.queue_bind(exchange=exchange, queue=queue,
225
                                         routing_key=routing_key)
226
        self.client.wait(promise)
227

    
228
    @reconnect_decorator
229
    def basic_publish(self, exchange, routing_key, body):
230
        """Publish a message with a specific routing key """
231
        self._publish(exchange, routing_key, body)
232

    
233
        self.flush_buffer()
234

    
235
        if self.confirms and len(self.unacked) >= self.confirm_buffer:
236
            self.get_confirms()
237

    
238
    @reconnect_decorator
239
    def basic_publish_multi(self, exchange, routing_key, bodies):
240
        for body in bodies:
241
            self.unsend[body] = (exchange, routing_key)
242

    
243
        for body in bodies:
244
            self._publish(exchange, routing_key, body)
245
            self.unsend.pop(body)
246

    
247
        self.flush_buffer()
248

    
249
        if self.confirms:
250
            self.get_confirms()
251

    
252
    def _publish(self, exchange, routing_key, body):
253
        # Persisent messages by default!
254
        headers = {}
255
        headers['delivery_mode'] = 2
256
        promise = self.client.basic_publish(exchange=exchange,
257
                                            routing_key=routing_key,
258
                                            body=body, headers=headers)
259

    
260
        if self.confirms:
261
            self.unacked[promise] = (exchange, routing_key, body)
262

    
263
        return promise
264

    
265
    @reconnect_decorator
266
    def flush_buffer(self):
267
        while self.client.needs_write():
268
            self.client.on_write()
269

    
270
    @reconnect_decorator
271
    def get_confirms(self):
272
        for promise in self.unacked.keys():
273
            self.client.wait(promise)
274
            self.unacked.pop(promise)
275

    
276
    @reconnect_decorator
277
    def _resend_unacked_messages(self):
278
        """Resend unacked messages in case of a connection failure."""
279
        msgs = self.unacked.values()
280
        self.unacked.clear()
281
        for exchange, routing_key, body in msgs:
282
            logger.debug('Resending message %s' % body)
283
            self.basic_publish(exchange, routing_key, body)
284

    
285
    @reconnect_decorator
286
    def _resend_unsend_messages(self):
287
        """Resend unsend messages in case of a connection failure."""
288
        for body in self.unsend.keys():
289
            (exchange, routing_key) = self.unsend[body]
290
            self.basic_publish(exchange, routing_key, body)
291
            self.unsend.pop(body)
292

    
293
    @reconnect_decorator
294
    def basic_consume(self, queue, callback, prefetch_count=0):
295
        """Consume from a queue.
296

297
        @type queue: string or list of strings
298
        @param queue: the name or list of names from the queues to consume
299
        @type callback: function
300
        @param callback: the callback function to run when a message arrives
301

302
        """
303
        # Store the queues and the callback
304
        self.consumers[queue] = callback
305

    
306
        def handle_delivery(promise, msg):
307
            """Hide promises and messages without body"""
308
            if 'body' in msg:
309
                callback(self, msg)
310
            else:
311
                logger.debug("Message without body %s" % msg)
312
                raise socket_error
313

    
314
        consume_promise = \
315
                self.client.basic_consume(queue=queue,
316
                                          prefetch_count=prefetch_count,
317
                                          callback=handle_delivery)
318

    
319
        self.consume_promises.append(consume_promise)
320
        return consume_promise
321

    
322
    @reconnect_decorator
323
    def basic_wait(self, promise=None, timeout=0):
324
        """Wait for messages from the queues declared by basic_consume.
325

326
        This function will block until a message arrives from the queues that
327
        have been declared with basic_consume. If the optional arguments
328
        'promise' is given, only messages for this promise will be delivered.
329

330
        """
331
        if promise is not None:
332
            return self.client.wait(promise, timeout)
333
        else:
334
            return self.client.wait(self.consume_promises, timeout)
335

    
336
    @reconnect_decorator
337
    def basic_get(self, queue):
338
        """Get a single message from a queue.
339

340
        This is a non-blocking method for getting messages from a queue.
341
        It will return None if the queue is empty.
342

343
        """
344
        get_promise = self.client.basic_get(queue=queue)
345
        result = self.client.wait(get_promise)
346
        if 'empty' in result:
347
            # The queue is empty
348
            return None
349
        else:
350
            return result
351

    
352
    @reconnect_decorator
353
    def basic_ack(self, message):
354
        self.client.basic_ack(message)
355

    
356
    @reconnect_decorator
357
    def basic_nack(self, message):
358
        #TODO:
359
        pass
360

    
361
    def close(self):
362
        """Check that messages have been send and close the connection."""
363
        logger.debug("Closing connection to %s", self.client.host)
364
        try:
365
            if self.confirms:
366
                self.get_confirms()
367
            close_promise = self.client.close()
368
            self.client.wait(close_promise)
369
        except (socket_error, spec_exceptions.ConnectionForced) as e:
370
            logger.error('Connection closed while closing connection:%s',
371
                          e)
372

    
373
    def queue_delete(self, queue, if_unused=True, if_empty=True):
374
        """Delete a queue.
375

376
        Returns False if the queue does not exist
377
        """
378
        try:
379
            promise = self.client.queue_delete(queue=queue,
380
                                               if_unused=if_unused,
381
                                               if_empty=if_empty)
382
            self.client.wait(promise)
383
            return True
384
        except spec_exceptions.NotFound:
385
            logger.info("Queue %s does not exist", queue)
386
            return False
387

    
388
    def exchange_delete(self, exchange, if_unused=True):
389
        """Delete an exchange."""
390
        try:
391

    
392
            promise = self.client.exchange_delete(exchange=exchange,
393
                                                  if_unused=if_unused)
394
            self.client.wait(promise)
395
            return True
396
        except spec_exceptions.NotFound:
397
            logger.info("Exchange %s does not exist", exchange)
398
            return False
399

    
400
    @reconnect_decorator
401
    def basic_cancel(self, promise=None):
402
        """Cancel consuming from a queue. """
403
        if promise is not None:
404
            self.client.basic_cancel(promise)
405
        else:
406
            for promise in self.consume_promises:
407
                self.client.basic_cancel(promise)
408

    
409

    
410
class AMQPConnectionError(Exception):
411
    pass