Revision db400d82

b/snf-common/synnefo/lib/amqp.py
33 33

  
34 34
""" Module implementing connection and communication with an AMQP broker.
35 35

  
36
AMQP clients implemented by this module silently detect connection failures and
36
AMQP clients instantiated by this module silently detect connection failures and
37 37
try to reconnect to any available broker. Publishing also takes advantage of
38 38
publisher-confirms in order to guarantee that messages are properly delivered
39 39
to the broker.
40 40

  
41 41
"""
42 42

  
43
import puka
44
import logging
45
import socket
46

  
47
from time import sleep
48
from random import shuffle
49
from functools import wraps
50

  
51
from ordereddict import OrderedDict
52 43
from synnefo import settings
53 44

  
54
AMQP_HOSTS = settings.AMQP_HOSTS
55

  
56
MAX_RETRIES = 20
45
if settings.AMQP_BACKEND == 'puka':
46
    from amqp_puka import AMQPPukaClient as Client
47
elif settings.AMQP_BACKEND == 'haigha':
48
    from amqp_haigha import AMQPHaighaClient as Client
49
else:
50
    raise Exception('Unknown Backend %s' % settings.AMQP_BACKEND)
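
With this change amqp.py becomes a thin wrapper: instantiating AMQPClient returns an object of the backend selected by AMQP_BACKEND (see the __new__ override further down in this diff). A minimal usage sketch, assuming AMQP_HOSTS is configured; the exchange, queue and routing-key names are placeholders:

from synnefo.lib.amqp import AMQPClient

client = AMQPClient()          # returns AMQPPukaClient or AMQPHaighaClient
client.connect()
client.exchange_declare('ganeti', type='topic')
client.queue_declare('events', mirrored=True)
client.queue_bind('events', 'ganeti', routing_key='ganeti.event.#')
client.basic_publish('ganeti', 'ganeti.event.net', 'hello')
client.close()
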
57 51

  
58
log = logging.getLogger()
59

  
60

  
61
def reconnect_decorator(func):
62
    """
63
    Decorator for persistent connection with one or more AMQP brokers.
64 52

  
65
    """
66
    @wraps(func)
67
    def wrapper(self, *args, **kwargs):
68
        try:
69
            if self.client.sd is None:
70
                self.connect()
71
            return func(self, *args, **kwargs)
72
        except (socket.error, puka.spec_exceptions.ConnectionForced):
73
            log.debug("Connection closed. Reconnecting")
74
            self.connect()
75
            return wrapper(self, *args, **kwargs)
76
        except Exception:
77
            if self.client.sd is None:
78
                log.debug("Connection closed. Reconnecting")
79
                self.connect()
80
                return wrapper(self, *args, **kwargs)
81
            else:
82
                raise
83
    return wrapper
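
The decorator assumes the wrapped method lives on an object that exposes the puka client as self.client (whose sd attribute becomes None once the socket is gone) and a connect() method; on a connection error the call is retried through wrapper() after reconnecting. A hedged sketch with a hypothetical method (queue_purge is assumed to be available on the puka client and is not part of this module):

class PurgingClient(AMQPClient):
    @reconnect_decorator
    def queue_purge(self, queue):
        # A dropped connection here triggers connect() in the decorator,
        # followed by a transparent retry of this body.
        promise = self.client.queue_purge(queue=queue)
        return self.client.wait(promise)
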
84

  
85

  
86
class AMQPClient():
53
class AMQPClient(object):
87 54
    """
88 55
    AMQP generic client implementing most of the basic AMQP operations.
89 56

  
90
    This client confirms delivery of each published message before publishing
91
    the next one, which results in low performance. Better performance can be
92
    achieved by using AMQPAsyncClient.
93

  
57
    This class will create an instance of AMQPPukaClient or AMQPHaighaClient,
58
    depending on the AMQP_BACKEND setting.
94 59
    """
95
    def __init__(self, hosts=AMQP_HOSTS, max_retries=MAX_RETRIES):
96
        """Format hosts as "amqp://username:pass@host:port" """
97
        # Shuffle the elements of the host list for better load balancing
98
        self.hosts = hosts
99
        shuffle(self.hosts)
100
        self.max_retries = max_retries
101

  
102
        self.promises = []
103
        self.consume_promises = []
104
        self.consume_info = {}
105

  
106
    def connect(self, retries=0):
107
        # Pick up a host
108
        url = self.hosts.pop()
109
        self.hosts.insert(0, url)
110

  
111
        if retries > self.max_retries:
112
            raise AMQPError("Cannot connect to any node after %s attempts" % retries)
113
        if retries > 2 * len(self.hosts):
114
            sleep(1)
115

  
116
        self.client = puka.Client(url, pubacks=True)
117

  
118
        host = url.split('@')[-1]
119
        log.debug('Connecting to node %s' % host)
120

  
121
        try:
122
            promise = self.client.connect()
123
            self.client.wait(promise)
124
        except socket.error as e:
125
            log.debug('Cannot connect to node %s.' % host)
126
            return self.connect(retries+1)
127

  
128
    @reconnect_decorator
129
    def exchange_declare(self, exchange_name, exchange_type='direct',
130
                         durable=True, auto_delete=False):
131
        """Declare an exchange
132
        @type exchange_name: string
133
        @param exchange_name: name of the exchange
134
        @type exchange_type: string
135
        @param exchange_type: one of 'direct', 'topic', 'fanout'
136

  
137
        """
138
        log.debug('Declaring exchange %s of %s type.'
139
                  %(exchange_name, exchange_type))
140
        promise = self.client.exchange_declare(exchange=exchange_name,
141
                                               type=exchange_type,
142
                                               durable=durable,
143
                                               auto_delete=auto_delete)
144
        self.client.wait(promise)
145
        log.debug('Exchange %s declared successfully' % exchange_name)
146

  
147
    @reconnect_decorator
148
    def queue_declare(self, queue, durable=True, exclusive=False,
149
                      auto_delete=False, mirrored=True, mirrored_nodes='all'):
150
        """Declare a queue
151

  
152
        @type queue: string
153
        @param queue: name of the queue
154
        @param mirrored: whether the queue will be mirrored to other brokers
155
        @param mirrored_nodes: the policy for the mirrored queue.
156
            Available policies:
157
                - 'all': The queue is mirrored to all nodes and the
158
                  master node is the one to which the client is
159
                  connected
160
                - a list of nodes. The queue will be mirrored only to
161
                  the specified nodes, and the master will be the
162
                  first node in the list. Node names must be provided
163
                  and not host IPs, e.g. [node1@rabbit, node2@rabbit]
164

  
165
        """
166

  
167
        log.debug('Declaring queue %s' % queue)
168
        if mirrored:
169
            if mirrored_nodes == 'all':
170
                arguments={'x-ha-policy':'all'}
171
            elif isinstance(mirrored_nodes, list):
172
                arguments={'x-ha-policy':'nodes', 'x-ha-policy-params':mirrored_nodes}
173
            else:
174
                raise AttributeError
175
        else:
176
            arguments = {}
177

  
178
        promise = self.client.queue_declare(queue=queue, durable=durable,
179
                                            exclusive=exclusive,
180
                                            auto_delete=auto_delete,
181
                                            arguments=arguments)
182
        self.client.wait(promise)
183
        log.debug('Queue %s declared successfully.' % queue)
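
For reference, a short sketch of how the two mirroring policies described in the docstring above translate into calls (the queue name is a placeholder):

client = AMQPClient()
client.connect()
# Mirror to every broker in the cluster ({'x-ha-policy': 'all'}):
client.queue_declare('notifications', mirrored=True, mirrored_nodes='all')
# Mirror only to the listed nodes; the first one becomes the master
# ({'x-ha-policy': 'nodes', 'x-ha-policy-params': [...]}):
client.queue_declare('notifications', mirrored=True,
                     mirrored_nodes=['node1@rabbit', 'node2@rabbit'])
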
184

  
185
    @reconnect_decorator
186
    def queue_bind(self, queue, exchange, routing_key):
187
        log.debug('Binding queue %s to exchange %s with key %s'
188
                 % (queue, exchange, routing_key))
189
        promise = self.client.queue_bind(exchange=exchange, queue=queue,
190
                                         routing_key=routing_key)
191
        self.client.wait(promise)
192
        log.debug('Binding completed successfully')
193

  
194
    def basic_publish_multi(self, exchange, routing_key, msgs, headers={}):
195
        """Send many messages to the  same exchange and with the same key """
196
        for msg in msgs:
197
            self.basic_publish(exchange, routing_key, msg, headers)
198

  
199
    @reconnect_decorator
200
    def basic_publish(self, exchange, routing_key, body, headers={}):
201
        """Publish a message with a specific routing key """
202

  
203
        # Persistent messages by default!
204
        if 'delivery_mode' not in headers:
205
            headers['delivery_mode'] = 2
206

  
207
        promise = self.client.basic_publish(exchange=exchange,
208
                                            routing_key=routing_key,
209
                                            body=body, headers=headers)
210
        self.client.wait(promise)
211

  
212
    @reconnect_decorator
213
    def basic_consume(self, queue, callback, prefetch_count=0):
214
        """Consume from a queue.
215

  
216
        @type queue: string or list of strings
217
        @param queue: the name or list of names from the queues to consume
218
        @type callback: function
219
        @param callback: the callback function to run when a message arrives
220

  
221
        """
222
        if isinstance(queue, str):
223
            queue = [queue]
224
        elif isinstance(queue, list):
225
            pass
226
        else:
227
            raise AttributeError
228

  
229
        # Store the queues and the callback
230
        for q in queue:
231
            self.consume_info[q] = callback
232

  
233
        def handle_delivery(promise, result):
234
            """Hide promises and messages without body"""
235
            if 'body' in result:
236
                callback(self, result)
237
            else:
238
                log.debug("Message without body %s" % result)
239
                raise socket.error
240

  
241
        consume_promise = \
242
                self.client.basic_consume_multi(queues=queue,
243
                                                prefetch_count=prefetch_count,
244
                                                callback=handle_delivery)
245
        self.consume_promises.append(consume_promise)
246
        return consume_promise
247

  
248
    def basic_wait(self, promise=None, timeout=0):
249
        """Wait for messages from the queues declared by basic_consume.
250

  
251
        This function will block until a message arrives from the queues that
252
        have been declared with basic_consume. If the optional arguments
253
        'promise' is given, only messages for this promise will be delivered.
254

  
255
        """
256
        try:
257
            if promise is not None:
258
               self.client.wait(promise, timeout)
259
            else:
260
               self.client.wait(self.consume_promises)
261
        except (socket.error, puka.spec_exceptions.ConnectionForced):
262
            log.debug('Connection closed while receiving messages.')
263
            self.consume_promises = []
264
            try:
265
                self.client.close()
266
            except:
267
                pass
268
            self.connect()
269
            for queues, callback in self.consume_info.items():
270
                self.basic_consume(queues, callback)
271
            self.basic_wait(timeout=timeout)
272
        except Exception as e:
273
            if self.client.sd is None:
274
                log.debug('Connection closed while receiving messages.')
275
                self.consume_promises = []
276
                self.connect()
277
                for queues, callback in self.consume_info.items():
278
                    self.basic_consume(queues, callback)
279
                self.basic_wait(timeout=timeout)
280
            else:
281
                log.error("Exception while waiting for messages ",e)
282
                raise
283

  
284
    def basic_cancel(self, promise=None):
285
        """Cancel consuming from a queue. """
286
        try:
287
            if promise is not None:
288
                self.client.basic_cancel(promise)
289
            else:
290
                for promise in self.consume_promises:
291
                    self.client.basic_cancel(promise)
292
        except (socket.error, puka.spec_exceptions.ConnectionForced):
293
            pass
294
        except Exception as e:
295
            if self.client.sd is None:
296
                pass
297
            else:
298
                log.error("Exception while canceling client ",e)
299
                raise
300

  
301

  
302
    @reconnect_decorator
303
    def basic_get(self, queue):
304
        """Get a single message from a queue.
305

  
306
        This is a non-blocking method for getting messages from a queue.
307
        It will return None if the queue is empty.
308

  
309
        """
310
        get_promise = self.client.basic_get(queue=queue)
311
        result = self.client.wait(get_promise)
312
        if 'empty' in result:
313
            # The queue is empty
314
            return None
315
        else:
316
            return result
317

  
318
    def basic_ack(self, message):
319
        """Acknowledge a message. """
320
        try:
321
            self.client.basic_ack(message)
322
            return True
323
        except (socket.error, puka.spec_exceptions.ConnectionForced):
324
            return False
325
        except Exception as e:
326
            if self.client.sd is None:
327
                return False
328
            else:
329
                log.error("Exception while acknowleding message ",e)
330
                raise
331

  
332
    def close(self):
333
        """Close the connection with the AMQP broker. """
334
        try:
335
            close_promise = self.client.close()
336
            self.client.wait(close_promise)
337
        except (socket.error, puka.spec_exceptions.ConnectionForced):
338
            pass
339

  
340
    def queue_delete(self, queue, if_unused=False, if_empty=False):
341
        """Delete a queue.
342

  
343
        Returns False if the queue does not exist
344
        """
345
        try:
346
            promise = self.client.queue_delete(queue=queue, if_unused=if_unused,
347
                                               if_empty=if_empty)
348
            self.client.wait(promise)
349
            return True
350
        except puka.spec_exceptions.NotFound:
351
            log.debug("Queue %s does not exist", queue)
352
            return False
353

  
354
    def exchange_delete(self, exchange, if_unused=False):
355
        """Delete an exchange."""
356
        try:
357

  
358
            promise = self.client.exchange_delete(exchange=exchange,
359
                                                  if_unused=if_unused)
360
            self.client.wait(promise)
361
            return True
362
        except puka.spec_exceptions.NotFound:
363
            log.debug("Exchange %s does not exist", exchange)
364
            return False
365

  
366

  
367

  
368
class AMQPAsyncClient(AMQPClient):
369
    """AMQP client implementing asynchronous sending of messages.
370

  
371
    This client is more efficient than AMQPClient in sending a large number
372
    of messages. Messages are confirmed to be sent to the broker in batches
373
    of a size specified by the 'max_buffer' argument.
374

  
375
    Messages are kept in an internal buffer until max_buffer messages have been
376
    sent or until the connection closes. Explicit delivery of messages can be
377
    achieved by calling the 'wait_for_promises' method.
378

  
379
    Always remember to close the connection, or messages may be lost.
380

  
381
    """
382
    def __init__(self, hosts=AMQP_HOSTS, max_buffer=5000,
383
                 max_retries=MAX_RETRIES):
384
        AMQPClient.__init__(self, hosts, max_retries)
385
        self.published_msgs = OrderedDict()
386
        self._promise_counter = 0
387
        self.max_buffer=max_buffer
388

  
389
    def basic_publish_multi(self, exchange, routing_key, msgs, headers={}):
390
        while msgs:
391
            msg = msgs.pop(0)
392
            self.basic_publish(exchange, routing_key, msg, headers)
393

  
394
    def basic_publish(self, exchange, routing_key, body, headers={}):
395
        """Publish a message.
396

  
397
        The message will not be actually published to the broker until
398
        'max_buffer' messages are published or wait_for_promises is called.
399

  
400
        """
401
        try:
402
            if 'delivery_mode' not in headers:
403
                headers['delivery_mode'] = 2
404

  
405
            promise = self.client.basic_publish(exchange=exchange,
406
                                                routing_key=routing_key,
407
                                                body=body,
408
                                                headers=headers)
409

  
410
            self._promise_counter += 1
411
            self.published_msgs[promise] = {'exchange':exchange,
412
                                            'routing_key':routing_key,
413
                                            'body':body,
414
                                            'headers':headers}
415

  
416
            if self._promise_counter > self.max_buffer:
417
                self.wait_for_promises()
418

  
419
        except (socket.error, puka.spec_exceptions.ConnectionForced):
420
            log.debug('Connection closed while sending message %s.\
421
                      Reconnecting and retrying' % body)
422
            self.connect()
423
            self.basic_publish(exchange, routing_key, body, headers)
424
            return self._retry_publish_msgs()
425
        except Exception as e:
426
            if self.client.sd is None:
427
                log.debug('Connection closed while sending message %s.\
428
                             Reconnecting and retrying' % body)
429
                self.connect()
430
                self.basic_publish(exchange, routing_key, body, headers)
431
                return self._retry_publish_msgs()
432
            else:
433
                log.error("Exception while publishing message ",e)
434
                raise
435

  
436
    def wait_for_promises(self):
437
        """Wait for confirm that all messages are sent."""
438
        try:
439
            promises = self.published_msgs.keys()
440
            for promise in promises:
441
                self.client.wait(promise)
442
                self.published_msgs.pop(promise)
443
            self._promise_counter = 0
444
        except (socket.error, puka.spec_exceptions.ConnectionForced):
445
            log.debug('Connection closed while waiting for promises')
446
            self.connect()
447
            self._retry_publish_msgs()
448
        except Exception as e:
449
            if self.client.sd is None:
450
                log.debug('Connection closed while waiting for promises')
451
                self.connect()
452
                self._retry_publish_msgs()
453
            else:
454
                log.error("Exception while waiting for promises ",e)
455
                raise
456

  
457
    def _retry_publish_msgs(self):
458
        """Resend messages in case of a connection failure."""
459
        values = self.published_msgs.values()
460
        self.published_msgs = OrderedDict()
461
        for message in values:
462
            exchange = message['exchange']
463
            key = message['routing_key']
464
            body = message['body']
465
            headers = message['headers']
466
            log.debug('Resending message %s' % body)
467
            self.basic_publish(exchange, key, body, headers)
468

  
469
    def close(self):
470
        """Check that messages have been send and close the connection."""
471
        try:
472
            self.wait_for_promises()
473
            close_promise = self.client.close()
474
            self.client.wait(close_promise)
475
        except (socket.error, puka.spec_exceptions.ConnectionForced):
476
            pass
477
        except Exception as e:
478
            if self.client.sd is None:
479
                pass
480
            else:
481
                log.error("Exception while closing the connection ",e)
482
                raise
483

  
484
    def flush_buffer(self):
485
        try:
486
            self.client.on_write()
487
        except (socket.error, puka.spec_exceptions.ConnectionForced):
488
            log.debug('Connection closed while clearing buffer')
489
            self.connect()
490
            return self._retry_publish_msgs()
491
        except Exception as e:
492
            if self.client.sd is None:
493
                log.debug('Connection closed while clearing buffer')
494
                self.connect()
495
                return self._retry_publish_msgs()
496
            else:
497
                log.error("Exception while clearing buffer ",e)
498
                raise
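
A usage sketch for the buffered publisher above; the exchange and routing-key names are placeholders. Messages accumulate until max_buffer promises are pending or wait_for_promises()/close() is called:

client = AMQPAsyncClient(max_buffer=1000)
client.connect()
client.exchange_declare('ganeti', exchange_type='topic')
for i in range(10000):
    client.basic_publish('ganeti', 'ganeti.event', 'msg-%d' % i)
client.wait_for_promises()   # confirm anything still buffered
client.close()               # also waits for outstanding promises
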
499

  
500
class AMQPConsumer(AMQPClient):
501
    """AMQP client implementing a consumer without callbacks.
502

  
503
    """
504
    def __init__(self, hosts=AMQP_HOSTS, max_buffer=5000,
505
                 max_retries=MAX_RETRIES):
506
        AMQPClient.__init__(self, hosts, max_retries)
507
        self.consume_queues = []
508
        self.consume_promises = []
509

  
510
    @reconnect_decorator
511
    def basic_consume(self, queue, prefetch_count=0):
512
        """Consume from a queue.
513

  
514
        @type queue: string or list of strings
515
        @param queue: the name or list of names from the queues to consume
516
        @type prefetch_count: int
517
        @param prefetch_count: maximum number of unacknowledged messages to prefetch
518

  
519
        """
520
        if isinstance(queue, str):
521
            queue = [queue]
522
        elif isinstance(queue, list):
523
            pass
524
        else:
525
            raise AttributeError
526

  
527
        # Store the queues
528
        for q in queue:
529
            self.consume_queues.append(q)
530

  
531
        consume_promise = \
532
                self.client.basic_consume_multi(queues=queue,
533
                                                prefetch_count=prefetch_count)
534
        self.consume_promises.append(consume_promise)
535
        return consume_promise
536

  
537
    def basic_wait(self, promise=None, timeout=0):
538
        """Wait for messages from the queues declared by basic_consume.
539

  
540
        This function will block until a message arrives from the queues that
541
        have been declared with basic_consume. If the optional arguments
542
        'promise' is given, only messages for this promise will be delivered.
543

  
544
        """
545
        try:
546
            if promise is not None:
547
               return self.client.wait(promise, timeout)
548
            else:
549
               return self.client.wait(self.consume_promises)
550
        except (socket.error, puka.spec_exceptions.ConnectionForced):
551
            log.debug('Connection closed while receiving messages.')
552
            self.consume_promises = []
553
            try:
554
                self.client.close()
555
            except:
556
                pass
557
            self.connect()
558
            for queues in self.consume_queues:
559
                self.basic_consume(queues)
560
            self.basic_wait(timeout=timeout)
561
        except Exception as e:
562
            if self.client.sd is None:
563
                log.debug('Connection closed while receiving messages.')
564
                self.consume_promises = []
565
                self.connect()
566
                for queues in self.consume_queues:
567
                    self.basic_consume(queues)
568
                self.basic_wait(timeout=timeout)
569
            else:
570
                log.error("Exception while waiting for messages ",e)
571
                raise
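
A consumption sketch for AMQPConsumer, which returns messages from basic_wait() instead of dispatching them to a callback (the queue name is a placeholder):

consumer = AMQPConsumer()
consumer.connect()
consumer.queue_declare('events', mirrored=True)
consumer.basic_consume('events')
while True:
    msg = consumer.basic_wait()
    if msg and 'body' in msg:
        log.info('Received %s', msg['body'])
        consumer.basic_ack(msg)
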
572

  
573

  
574
class AMQPError(Exception):
575
    def __init__(self, msg):
576
        self.msg = msg
577
    def __str__(self):
578
        return repr(self.msg)
60
    def __new__(cls, *args, **kwargs):
61
        return Client(*args, **kwargs)
b/snf-common/synnefo/lib/amqp_haigha.py
1
# Copyright 2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

  
34
from haigha.connections import RabbitConnection
35
from haigha.message import Message
36
from haigha import exceptions
37
from random import shuffle
38
from time import sleep
39
import logging
40
import socket
41
from synnefo import settings
42
from ordereddict import OrderedDict
43
import gevent
44
from gevent import monkey
45
from functools import wraps
46

  
47

  
48
logging.basicConfig(level=logging.INFO, format="[%(levelname)s %(asctime)s] %(message)s" )
49
logger = logging.getLogger('haigha')
50

  
51
sock_opts = {
52
  (socket.IPPROTO_TCP, socket.TCP_NODELAY): 1,
53
}
54

  
55

  
56
def reconnect_decorator(func):
57
    """
58
    Decorator for persistent connection with one or more AMQP brokers.
59

  
60
    """
61
    @wraps(func)
62
    def wrapper(self, *args, **kwargs):
63
        try:
64
            func(self, *args, **kwargs)
65
        except (socket.error, exceptions.ConnectionError) as e:
66
            logger.error('Connection Closed while in %s: %s', func.__name__, e)
67
            self.connect()
68

  
69
    return wrapper
70

  
71

  
72
class AMQPHaighaClient():
73
    def __init__(self, hosts=settings.AMQP_HOSTS, max_retries=30,
74
                 confirms=True, confirm_buffer=200):
75
        self.hosts = hosts
76
        shuffle(self.hosts)
77

  
78
        self.max_retries = max_retries
79
        self.confirms = confirms
80
        self.confirm_buffer = confirm_buffer
81

  
82
        self.connection = None
83
        self.channel = None
84
        self.consumers = {}
85
        self.unacked = OrderedDict()
86

  
87
    def connect(self, retries=0):
88
        if retries > self.max_retries:
89
            logger.error("Aborting after %s retries", retries - 1)
90
            raise AMQPConnectionError('Aborting after %d connection failures.'\
91
                                      % (retries - 1))
92
            return
93

  
94
        # Pick up a host
95
        host = self.hosts.pop()
96
        self.hosts.insert(0, host)
97

  
98
        #Patch gevent
99
        monkey.patch_all()
100

  
101
        try:
102
            self.connection = \
103
                 RabbitConnection(logger=logger, debug=True,
104
                      user='rabbit', password='r@bb1t',
105
                      vhost='/', host=host,
106
                      heartbeat=None,
107
                      sock_opts=sock_opts,
108
                      transport='gevent')
109
        except socket.error as e:
110
            logger.error('Cannot connect to host %s: %s', host, e)
111
            if retries > 2 * len(self.hosts):
112
                sleep(1)
113
            return self.connect(retries + 1)
114

  
115
        logger.info('Successfully connected to host: %s', host)
116

  
117
        logger.info('Creating channel')
118
        self.channel = self.connection.channel()
119

  
120
        if self.confirms:
121
            self._confirm_select()
122

  
123
        if self.unacked:
124
            self._resend_unacked_messages()
125

  
126
        if self.consumers:
127
            for queue, callback in self.consumers.items():
128
                self.basic_consume(queue, callback)
129

  
130
    def exchange_declare(self, exchange, type='direct'):
131
        """Declare an exchange
132
        @type exchange_name: string
133
        @param exchange_name: name of the exchange
134
        @type exchange_type: string
135
        @param exchange_type: one of 'direct', 'topic', 'fanout'
136

  
137
        """
138

  
139
        logger.info('Declaring %s exchange: %s', type, exchange)
140
        self.channel.exchange.declare(exchange, type,
141
                                      auto_delete=False, durable=True)
142

  
143
    def queue_declare(self, queue, exclusive=False, mirrored=True,
144
                      mirrored_nodes='all'):
145
        """Declare a queue
146

  
147
        @type queue: string
148
        @param queue: name of the queue
149
        @param mirrored: whether the queue will be mirrored to other brokers
150
        @param mirrored_nodes: the policy for the mirrored queue.
151
            Available policies:
152
                - 'all': The queue is mirrored to all nodes and the
153
                  master node is the one to which the client is
154
                  connected
155
                - a list of nodes. The queue will be mirrored only to
156
                  the specified nodes, and the master will be the
157
                  first node in the list. Node names must be provided
158
                  and not host IPs, e.g. [node1@rabbit, node2@rabbit]
159

  
160
        """
161

  
162
        logger.info('Declaring queue: %s', queue)
163
        if mirrored:
164
            if mirrored_nodes == 'all':
165
                arguments = {'x-ha-policy': 'all'}
166
            elif isinstance(mirrored_nodes, list):
167
                arguments = {'x-ha-policy': 'nodes',
168
                             'x-ha-policy-params': mirrored_nodes}
169
            else:
170
                raise AttributeError
171
        else:
172
            arguments = {}
173

  
174
        self.channel.queue.declare(queue, durable=True, exclusive=exclusive,
175
                                   auto_delete=False, arguments=arguments)
176

  
177
    def queue_bind(self, queue, exchange, routing_key):
178
        logger.info('Binding queue %s to exchange %s with key %s', queue,
179
                    exchange, routing_key)
180
        self.channel.queue.bind(queue=queue, exchange=exchange,
181
                                routing_key=routing_key)
182

  
183
    def _confirm_select(self):
184
        logger.info('Setting channel to confirm mode')
185
        self.channel.confirm.select()
186
        self.channel.basic.set_ack_listener(self._ack_received)
187
        self.channel.basic.set_nack_listener(self._nack_received)
188

  
189
    @reconnect_decorator
190
    def basic_publish(self, exchange, routing_key, body):
191
        msg = Message(body, delivery_mode=2)
192
        mid = self.channel.basic.publish(msg, exchange, routing_key)
193
        if self.confirms:
194
            self.unacked[mid] = (exchange, routing_key, body)
195
            if len(self.unacked) > self.confirm_buffer:
196
                self.get_confirms()
197

  
198
        logger.debug('Published message %s with id %s', body, mid)
199

  
200
    @reconnect_decorator
201
    def get_confirms(self):
202
        self.connection.read_frames()
203

  
204
    @reconnect_decorator
205
    def _resend_unacked_messages(self):
206
        msgs = self.unacked.values()
207
        self.unacked.clear()
208
        for exchange, routing_key, body in msgs:
209
            logger.debug('Resending message %s', body)
210
            self.basic_publish(exchange, routing_key, body)
211

  
212
    @reconnect_decorator
213
    def _ack_received(self, mid):
214
        print mid
215
        logger.debug('Received ACK for message with id %s', mid)
216
        self.unacked.pop(mid)
217

  
218
    @reconnect_decorator
219
    def _nack_received(self, mid):
220
        logger.error('Received NACK for message with id %s. Retrying.', mid)
221
        (exchange, routing_key, body) = self.unacked[mid]
222
        self.basic_publish(exchange, routing_key, body)
223

  
224
    def basic_consume(self, queue, callback):
225
        """Consume from a queue.
226

  
227
        @type queue: string or list of strings
228
        @param queue: the name or list of names from the queues to consume
229
        @type callback: function
230
        @param callback: the callback function to run when a message arrives
231

  
232
        """
233

  
234
        self.consumers[queue] = callback
235
        self.channel.basic.consume(queue, consumer=callback, no_ack=False)
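
With the haigha backend the consumer callback receives the haigha Message object directly; acknowledgement goes through basic_ack(), which reads the delivery tag from the message. A hedged sketch (the queue name is a placeholder):

client = AMQPHaighaClient()
client.connect()

def on_message(msg):
    logger.info('Received: %s', msg.body)
    client.basic_ack(msg)

client.basic_consume('events', on_message)
while True:
    client.basic_wait()    # read_frames() plus a gevent yield
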
236

  
237
    @reconnect_decorator
238
    def basic_wait(self):
239
        """Wait for messages from the queues declared by basic_consume.
240

  
241
        This function will block until a message arrives from the queues that
242
        have been declared with basic_consume. If the optional arguments
243
        'promise' is given, only messages for this promise will be delivered.
244

  
245
        """
246

  
247
        self.connection.read_frames()
248
        gevent.sleep(0)
249

  
250
    @reconnect_decorator
251
    def basic_get(self, queue):
252
        self.channel.basic.get(queue, no_ack=False)
253

  
254
    @reconnect_decorator
255
    def basic_ack(self, message):
256
        delivery_tag = message.delivery_info['delivery_tag']
257
        self.channel.basic.ack(delivery_tag)
258

  
259
    @reconnect_decorator
260
    def basic_nack(self, message):
261
        delivery_tag = message.delivery_info['delivery_tag']
262
        self.channel.basic.ack(delivery_tag)
263

  
264
    def close(self):
265
        try:
266
            if self.confirms:
267
                while self.unacked:
268
                    print self.unacked
269
                    self.get_confirms()
270
            self.channel.close()
271
            close_info = self.channel.close_info
272
            logger.info('Successfully closed channel. Info: %s', close_info)
273
            self.connection.close()
274
        except socket.error as e:
275
            logger.error('Connection closed while closing connection:%s',
276
                          e)
277

  
278
    def queue_delete(self, queue, if_unused=True, if_empty=True):
279
        self.channel.queue.delete(queue, if_unused, if_empty)
280

  
281
    def exchange_delete(self, exchange, if_unused=True):
282
        self.channel.exchange.delete(exchange, if_unused)
283

  
284
    def basic_class(self):
285
        pass
286

  
287

  
288
class AMQPConnectionError(Exception):
289
    pass
b/snf-common/synnefo/lib/amqp_puka.py
1
# Copyright 2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

  
34
""" Module implementing connection and communication with an AMQP broker.
35

  
36
AMQP clients implemented by this module silently detect connection failures and
37
try to reconnect to any available broker. Also publishing takes advantage of
38
publisher-confirms in order to guarantee that messages are properly delivered
39
to the broker.
40

  
41
"""
42

  
43
import logging
44

  
45
from puka import Client
46
from puka import spec_exceptions
47
from socket import error as socket_error
48
from time import sleep
49
from random import shuffle
50
from functools import wraps
51
from ordereddict import OrderedDict
52
from synnefo import settings
53

  
54
logging.basicConfig(level=logging.DEBUG,
55
                    format="[%(levelname)s %(asctime)s] %(message)s")
56
logger = logging.getLogger()
57

  
58

  
59
def reconnect_decorator(func):
60
    """
61
    Decorator for persistent connection with one or more AMQP brokers.
62

  
63
    """
64
    @wraps(func)
65
    def wrapper(self, *args, **kwargs):
66
        try:
67
            func(self, *args, **kwargs)
68
        except (socket_error, spec_exceptions.ConnectionForced) as e:
69
            logger.error('Connection Closed while in %s: %s', func.__name__, e)
70
            self.consume_promises = []
71
            self.connect()
72

  
73
    return wrapper
74

  
75

  
76
class AMQPPukaClient(object):
77
    """
78
    AMQP generic client implementing most of the basic AMQP operations.
79

  
80
    """
81
    def __init__(self, hosts=settings.AMQP_HOSTS, max_retries=30,
82
                 confirms=True, confirm_buffer=100):
83
        """Format hosts as "amqp://username:pass@host:port" """
84

  
85
        self.hosts = hosts
86
        shuffle(self.hosts)
87

  
88
        self.max_retries = max_retries
89
        self.confirms = confirms
90
        self.confirm_buffer = confirm_buffer
91

  
92
        self.connection = None
93
        self.channel = None
94
        self.consumers = {}
95
        self.unacked = OrderedDict()
96
        self.unsend = OrderedDict()
97
        self.consume_promises = []
98

  
99
    def connect(self, retries=0):
100
        if retries > self.max_retries:
101
            logger.error("Aborting after %s retries", retries - 1)
102
            raise AMQPConnectionError('Aborting after %d connection failures.'\
103
                                      % (retries - 1))
104
            return
105

  
106
        # Pick up a host
107
        host = self.hosts.pop()
108
        self.hosts.insert(0, host)
109

  
110
        self.client = Client(host, pubacks=self.confirms)
111

  
112
        host = host.split('@')[-1]
113
        logger.debug('Connecting to node %s' % host)
114

  
115
        try:
116
            promise = self.client.connect()
117
            self.client.wait(promise)
118
        except socket_error as e:
119
            logger.error('Cannot connect to host %s: %s', host, e)
120
            if retries > 2 * len(self.hosts):
121
                sleep(1)
122
            return self.connect(retries + 1)
123

  
124
        logger.info('Successfully connected to host: %s', host)
125

  
126
        logger.info('Creating channel')
127

  
128
        if self.unacked:
129
            self._resend_unacked_messages()
130

  
131
        if self.unsend:
132
            self._resend_unsend_messages()
133

  
134
        if self.consumers:
135
            for queue, callback in self.consumers.items():
136
                self.basic_consume(queue, callback)
137

  
138
    def exchange_declare(self, exchange, type='direct'):
139
        """Declare an exchange
140
        @type exchange_name: string
141
        @param exchange_name: name of the exchange
142
        @type exchange_type: string
143
        @param exchange_type: one of 'direct', 'topic', 'fanout'
144

  
145
        """
146
        logger.info('Declaring %s exchange: %s', type, exchange)
147
        promise = self.client.exchange_declare(exchange=exchange,
148
                                               type=type,
149
                                               durable=True,
150
                                               auto_delete=False)
151
        self.client.wait(promise)
152

  
153
    @reconnect_decorator
154
    def queue_declare(self, queue, exclusive=False,
155
                      mirrored=True, mirrored_nodes='all'):
156
        """Declare a queue
157

  
158
        @type queue: string
159
        @param queue: name of the queue
160
        @param mirrored: whether the queue will be mirrored to other brokers
161
        @param mirrored_nodes: the policy for the mirrored queue.
162
            Available policies:
163
                - 'all': The queue is mirrored to all nodes and the
164
                  master node is the one to which the client is
165
                  connected
166
                - a list of nodes. The queue will be mirrored only to
167
                  the specified nodes, and the master will be the
168
                  first node in the list. Node names must be provided
169
                  and not host IPs, e.g. [node1@rabbit, node2@rabbit]
170

  
171
        """
172
        logger.info('Declaring queue: %s', queue)
173

  
174
        if mirrored:
175
            if mirrored_nodes == 'all':
176
                arguments = {'x-ha-policy': 'all'}
177
            elif isinstance(mirrored_nodes, list):
178
                arguments = {'x-ha-policy': 'nodes',
179
                           'x-ha-policy-params': mirrored_nodes}
180
            else:
181
                raise AttributeError
182
        else:
183
            arguments = {}
184

  
185
        promise = self.client.queue_declare(queue=queue, durable=True,
186
                                            exclusive=exclusive,
187
                                            auto_delete=False,
188
                                            arguments=arguments)
189
        self.client.wait(promise)
190

  
191
    def queue_bind(self, queue, exchange, routing_key):
192
        logger.debug('Binding queue %s to exchange %s with key %s'
193
                 % (queue, exchange, routing_key))
194
        promise = self.client.queue_bind(exchange=exchange, queue=queue,
195
                                         routing_key=routing_key)
196
        self.client.wait(promise)
197

  
198
    @reconnect_decorator
199
    def basic_publish(self, exchange, routing_key, body):
200
        """Publish a message with a specific routing key """
201
        self._publish(exchange, routing_key, body)
202

  
203
        self.flush_buffer()
204

  
205
        if self.confirms and len(self.unacked) >= self.confirm_buffer:
206
            self.get_confirms()
207

  
208
    @reconnect_decorator
209
    def basic_publish_multi(self, exchange, routing_key, bodies):
210
        for body in bodies:
211
            self.unsend[body] = (exchange, routing_key)
212

  
213
        for body in bodies:
214
            self._publish(exchange, routing_key, body)
215
            self.unsend.pop(body)
216

  
217
        self.flush_buffer()
218

  
219
        if self.confirms:
220
            self.get_confirms()
221

  
222
    def _publish(self, exchange, routing_key, body):
223
        # Persistent messages by default!
224
        headers = {}
225
        headers['delivery_mode'] = 2
226
        promise = self.client.basic_publish(exchange=exchange,
227
                                            routing_key=routing_key,
228
                                            body=body, headers=headers)
229

  
230
        if self.confirms:
231
            self.unacked[promise] = (exchange, routing_key, body)
232

  
233
        return promise
234

  
235
    @reconnect_decorator
236
    def flush_buffer(self):
237
        while self.client.needs_write():
238
            self.client.on_write()
239

  
240
    @reconnect_decorator
241
    def get_confirms(self):
242
        for promise in self.unacked.keys():
243
            self.client.wait(promise)
244
            self.unacked.pop(promise)
245

  
246
    @reconnect_decorator
247
    def _resend_unacked_messages(self):
248
        """Resend unacked messages in case of a connection failure."""
249
        msgs = self.unacked.values()
250
        self.unacked.clear()
251
        for exchange, routing_key, body in msgs:
252
            logger.debug('Resending message %s' % body)
253
            self.basic_publish(exchange, routing_key, body)
254

  
255
    @reconnect_decorator
256
    def _resend_unsend_messages(self):
257
        """Resend unsend messages in case of a connection failure."""
258
        for body in self.unsend.keys():
259
            (exchange, routing_key) = self.unsend[body]
260
            self.basic_publish(exchange, routing_key, body)
261
            self.unsend.pop(body)
262

  
263
    @reconnect_decorator
264
    def basic_consume(self, queue, callback, prefetch_count=0):
265
        """Consume from a queue.
266

  
267
        @type queue: string or list of strings
268
        @param queue: the name or list of names from the queues to consume
269
        @type callback: function
270
        @param callback: the callback function to run when a message arrives
271

  
272
        """
273
        # Store the queues and the callback
274
        self.consumers[queue] = callback
275

  
276
        def handle_delivery(promise, msg):
277
            """Hide promises and messages without body"""
278
            if 'body' in msg:
279
                callback(self, msg)
280
            else:
281
                logger.debug("Message without body %s" % msg)
282
                raise socket_error
283

  
284
        consume_promise = \
285
                self.client.basic_consume(queue=queue,
286
                                          prefetch_count=prefetch_count,
287
                                          callback=handle_delivery)
288

  
289
        self.consume_promises.append(consume_promise)
290
        return consume_promise
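
The puka backend wraps the user callback in handle_delivery, so the callback is invoked as callback(client, message) with a puka result dict carrying a 'body' key. A consumption sketch (the queue name is a placeholder):

def on_message(client, msg):
    logger.info('Received: %s', msg['body'])
    client.basic_ack(msg)

client = AMQPPukaClient()
client.connect()
client.basic_consume('events', on_message)
while True:
    client.basic_wait()
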
291

  
292
    @reconnect_decorator
293
    def basic_wait(self, promise=None, timeout=0):
294
        """Wait for messages from the queues declared by basic_consume.
295

  
296
        This function will block until a message arrives from the queues that
297
        have been declared with basic_consume. If the optional arguments
298
        'promise' is given, only messages for this promise will be delivered.
299

  
300
        """
301
        if promise is not None:
302
            self.client.wait(promise, timeout)
303
        else:
304
            self.client.wait(self.consume_promises)
305

  
306
    @reconnect_decorator
307
    def basic_get(self, queue):
308
        """Get a single message from a queue.
309

  
310
        This is a non-blocking method for getting messages from a queue.
311
        It will return None if the queue is empty.
312

  
313
        """
314
        get_promise = self.client.basic_get(queue=queue)
315
        result = self.client.wait(get_promise)
316
        if 'empty' in result:
317
            # The queue is empty
318
            return None
319
        else:
320
            return result
321

  
322
    @reconnect_decorator
323
    def basic_ack(self, message):
324
        self.client.basic_ack(message)
325

  
326
    @reconnect_decorator
327
    def basic_nack(self, message):
328
        #TODO:
329
        pass
330

  
331
    def close(self):
332
        """Check that messages have been send and close the connection."""
333
        try:
334
            if self.confirms:
335
                self.get_confirms()
336
            close_promise = self.client.close()
337
            self.client.wait(close_promise)
338
        except (socket_error, spec_exceptions.ConnectionForced) as e:
339
            logger.error('Connection closed while closing connection:%s',
340
                          e)
341

  
342
    def queue_delete(self, queue, if_unused=True, if_empty=True):
343
        """Delete a queue.
344

  
345
        Returns False if the queue does not exist
346
        """
347
        try:
348
            promise = self.client.queue_delete(queue=queue,
349
                                               if_unused=if_unused,
350
                                               if_empty=if_empty)
351
            self.client.wait(promise)
352
            return True
353
        except spec_exceptions.NotFound:
354
            logger.info("Queue %s does not exist", queue)
355
            return False
356

  
357
    def exchange_delete(self, exchange, if_unused=True):
358
        """Delete an exchange."""
359
        try:
360

  
361
            promise = self.client.exchange_delete(exchange=exchange,
362
                                                  if_unused=if_unused)
363
            self.client.wait(promise)
364
            return True
365
        except spec_exceptions.NotFound:
366
            logger.info("Exchange %s does not exist", exchange)
367
            return False
368

  
369
    @reconnect_decorator
370
    def basic_cancel(self, promise=None):
371
        """Cancel consuming from a queue. """
372
        if promise is not None:
373
            self.client.basic_cancel(promise)
374
        else:
375
            for promise in self.consume_promises:
376
                self.client.basic_cancel(promise)
377

  
378

  
379
class AMQPConnectionError(Exception):
380
    pass
b/snf-cyclades-app/synnefo/app_settings/default/queues.py
9 9
RABBIT_PASSWORD = "rabbit-password"
10 10
RABBIT_VHOST = "/"
11 11
AMQP_HOSTS=["amqp://username:password@host:port"]
12
# AMQP Backend Client. One of: 'puka', 'haigha'
13
AMQP_BACKEND='puka'
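
For example, a deployment's queue settings might look like the following sketch; host names and credentials are placeholders, and each entry follows the "amqp://username:pass@host:port" format documented for the puka backend:

AMQP_HOSTS = ["amqp://synnefo:example-pass@rabbit1.example.com:5672",
              "amqp://synnefo:example-pass@rabbit2.example.com:5672"]
AMQP_BACKEND = 'puka'   # or 'haigha'
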
12 14

  
13 15
EXCHANGE_GANETI = "ganeti"  # Messages from Ganeti
14 16
EXCHANGE_CRON = "cron"      # Messages from periodically triggered tasks
b/snf-cyclades-app/synnefo/logic/dispatcher.py
107 107

  
108 108
        # Declare queues and exchanges
109 109
        for exchange in settings.EXCHANGES:
110
            self.client.exchange_declare(exchange_name=exchange,
111
                                         exchange_type="topic")
110
            self.client.exchange_declare(exchange=exchange,
111
                                         type="topic")
112 112

  
113 113
        for queue in QUEUES:
114 114
            # Queues are mirrored to all RabbitMQ brokers
115
            self.client.queue_declare(queue=queue,mirrored=True)
115
            self.client.queue_declare(queue=queue, mirrored=True)
116 116

  
117 117
        bindings = BINDINGS
118 118

  
b/snf-cyclades-gtools/synnefo/ganeti/eventd.py
114 114
    def __init__(self, logger):
115 115
        pyinotify.ProcessEvent.__init__(self)
116 116
        self.logger = logger
117
        self.client = AMQPClient()
117
        self.client = AMQPClient(confirm_buffer=100)
118 118
        handler_logger.info("Attempting to connect to RabbitMQ hosts")
119 119
        self.client.connect()
120
        self.client.exchange_declare(settings.EXCHANGE_GANETI, type='topic')
120 121
        handler_logger.info("Connected succesfully")
121 122

  
122 123
        self.op_handlers = {"INSTANCE": self.process_instance_op,
......
179 180
            self.logger.debug("Delivering msg: %s (key=%s)", msg, routekey)
180 181

  
181 182
            # Send the message to RabbitMQ
182
            self.client.basic_publish(exchange=settings.EXCHANGE_GANETI,
183
                                      routing_key=routekey,
184
                                      body=msg)
185

  
183
            self.client.basic_publish(settings.EXCHANGE_GANETI,
184
                                      routekey,
185
                                      msg)
186 186

  
187 187
    def process_instance_op(self, op, job_id):
188 188
        """ Process OP_INSTANCE_* opcodes.
