Revision 597e7eba

b/snf-common/synnefo/lib/amqp.py
1
# Copyright 2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

  
34
""" Module implementing connection and communication with an AMQP broker.
35

  
36
AMQP Client's implemented by this module silenty detect connection failures and
37
try to reconnect to any available broker. Also publishing takes advantage of
38
publisher-confirms in order to guarantee that messages are properly delivered
39
to the broker.
40

  
41
"""
42

  
43
import puka
44
import logging
45
import socket
46

  
47
from time import sleep
48
from random import shuffle
49
from functools import wraps
50

  
51
from ordereddict import OrderedDict
52
from synnefo import settings
53

  
54
AMQP_HOSTS = settings.AMQP_HOSTS
55

  
56
MAX_RETRIES = 20
57

  
58
log = logging.getLogger()
59

  
60

  
61
def reconnect_decorator(func):
62
    """
63
    Decorator for persistent connection with one or more AMQP brokers.
64

  
65
    """
66
    @wraps(func)
67
    def wrapper(self, *args, **kwargs):
68
        try:
69
            if self.client.sd is None:
70
                self.connect()
71
            return func(self, *args, **kwargs)
72
        except (socket.error, puka.spec_exceptions.ConnectionForced):
73
            log.debug("Connection closed. Reconnecting")
74
            self.connect()
75
            return wrapper(self, *args, **kwargs)
76
        except Exception:
77
            if self.client.sd is None:
78
                log.debug("Connection closed. Reconnecting")
79
                self.connect()
80
                return wrapper(self, *args, **kwargs)
81
            else:
82
                raise
83
    return wrapper
84

  
85

  
86
class AMQPClient():
87
    """
88
    AMQP generic client implementing most of the basic AMQP operations.
89

  
90
    This client confirms delivery of each published message before publishing
91
    the next one, which results in low performance. Better performance can be
92
    achieved by using AMQPAsyncClient.
93

  
94
    """
95
    def __init__(self, hosts=AMQP_HOSTS, max_retries=MAX_RETRIES):
96
        """Format hosts as "amqp://username:pass@host:port" """
97
        # Shuffle the elements of the host list for better load balancing
98
        self.hosts = hosts
99
        shuffle(self.hosts)
100
        self.max_retries = max_retries
101

  
102
        self.promises = []
103
        self.consume_promises = []
104
        self.consume_info = {}
105

  
106
    def connect(self, retries=0):
107
        # Pick up a host
108
        url = self.hosts.pop()
109
        self.hosts.insert(0, url)
110

  
111
        if retries > self.max_retries:
112
            raise AMQPError("Cannot connect to any node after %s attemps" % retries)
113
        if retries > 2 * len(self.hosts):
114
            sleep(1)
115

  
116
        self.client = puka.Client(url, pubacks=True)
117

  
118
        host = url.split('@')[-1]
119
        log.debug('Connecting to node %s' % host)
120

  
121
        try:
122
            promise = self.client.connect()
123
            self.client.wait(promise)
124
        except socket.error as e:
125
            log.debug('Cannot connect to node %s.' % host)
126
            return self.connect(retries+1)
127

  
128
    @reconnect_decorator
129
    def exchange_declare(self, exchange_name, exchange_type='direct',
130
                         durable=True, auto_delete=False):
131
        """Declare an exchange
132
        @type exchange_name: string
133
        @param exchange_name: name of the exchange
134
        @type exchange_type: string
135
        @param exhange_type: one of 'direct', 'topic', 'fanout'
136

  
137
        """
138
        log.debug('Declaring exchange %s of %s type.'
139
                  %(exchange_name, exchange_type))
140
        promise = self.client.exchange_declare(exchange=exchange_name,
141
                                               type=exchange_type,
142
                                               durable=durable,
143
                                               auto_delete=auto_delete)
144
        self.client.wait(promise)
145
        log.debug('Exchange %s declared succesfully ' % exchange_name)
146

  
147
    @reconnect_decorator
148
    def queue_declare(self, queue, durable=True, exclusive=False,
149
                      auto_delete=False, mirrored=True, mirrored_nodes='all'):
150
        """Declare a queue
151

  
152
        @type queue: string
153
        @param queue: name of the queue
154
        @param mirrored: whether the queue will be mirrored to other brokers
155
        @param mirrored_nodes: the policy for the mirrored queue.
156
            Available policies:
157
                - 'all': The queue is mirrored to all nodes and the
158
                  master node is the one to which the client is
159
                  connected
160
                - a list of nodes. The queue will be mirrored only to
161
                  the specified nodes, and the master will be the
162
                  first node in the list. Node names must be provided
163
                  and not host IP. example: [node1@rabbit,node2@rabbit]
164

  
165
        """
166

  
167
        log.debug('Declaring queue %s' % queue)
168
        if mirrored:
169
            if mirrored_nodes == 'all':
170
                arguments={'x-ha-policy':'all'}
171
            elif isinstance(mirrored_nodes, list):
172
                arguments={'x-ha-policy':'nodes', 'x-ha-policy-params':mirrored_nodes}
173
            else:
174
                raise AttributeError
175
        else:
176
            arguments = {}
177

  
178
        promise = self.client.queue_declare(queue=queue, durable=durable,
179
                                            exclusive=exclusive,
180
                                            auto_delete=auto_delete,
181
                                            arguments=arguments)
182
        self.client.wait(promise)
183
        log.debug('Queue %s declared successfully.' % queue)
184

  
185
    @reconnect_decorator
186
    def queue_bind(self, queue, exchange, routing_key):
187
        log.debug('Binding queue %s to exchange %s with key %s'
188
                 % (queue, exchange, routing_key))
189
        promise = self.client.queue_bind(exchange=exchange, queue=queue,
190
                                         routing_key=routing_key)
191
        self.client.wait(promise)
192
        log.debug('Binding completed successfully')
193

  
194
    def basic_publish_multi(self, exhange, routing_key, msgs, headers={}):
195
        """Send many messages to the  same exchange and with the same key """
196
        for msg in msgs:
197
            self.basic_publish(exchange, routing_key, headers, msg)
198

  
199
    @reconnect_decorator
200
    def basic_publish(self, exchange, routing_key, body, headers={}):
201
        """Publish a message with a specific routing key """
202

  
203
        # Persisent messages by default!
204
        if not 'delivery_mode' in headers:
205
            headers['delivery_mode'] = 2
206

  
207
        promise = self.client.basic_publish(exchange=exchange,
208
                                            routing_key=routing_key,
209
                                            body=body, headers=headers)
210
        self.client.wait(promise)
211

  
212
    @reconnect_decorator
213
    def basic_consume(self, queue, callback, prefetch_count=0):
214
        """Consume from a queue.
215

  
216
        @type queue: string or list of strings
217
        @param queue: the name or list of names from the queues to consume
218
        @type callback: function
219
        @param callback: the callback function to run when a message arrives
220

  
221
        """
222
        if isinstance(queue, str):
223
            queue = [queue]
224
        elif isinstance(queue, list):
225
            pass
226
        else:
227
            raise AttributeError
228

  
229
        # Store the queues and the callback
230
        for q in queue:
231
            self.consume_info[q] = callback
232

  
233
        def handle_delivery(promise, result):
234
            """Hide promises and messages without body"""
235
            if 'body' in result:
236
                callback(self, result)
237
            else:
238
                log.debug("Message without body %s" % result)
239
                return
240

  
241
        consume_promise = \
242
                self.client.basic_consume_multi(queues=queue,
243
                                                prefetch_count=prefetch_count,
244
                                                callback=handle_delivery)
245
        self.consume_promises.append(consume_promise)
246
        return consume_promise
247

  
248
    def basic_wait(self, promise=None, timeout=0):
249
        """Wait for messages from the queues declared by basic_consume.
250

  
251
        This function will block until a message arrives from the queues that
252
        have been declared with basic_consume. If the optional arguments
253
        'promise' is given, only messages for this promise will be delivered.
254

  
255
        """
256
        try:
257
            if promise is not None:
258
               self.client.wait(promise, timeout)
259
            else:
260
               self.client.wait(self.consume_promises)
261
        except (socket.error, puka.spec_exceptions.ConnectionForced):
262
            log.debug('Connection closed while receiving messages.')
263
            self.consume_promises = []
264
            self.connect()
265
            for queues, callback in self.consume_info.items():
266
                self.basic_consume(queues, callback)
267
            self.basic_wait(timeout)
268
        except Exception as e:
269
            if self.client.sd is None:
270
                log.debug('Connection closed while receiving messages.')
271
                self.consume_promises = []
272
                self.connect()
273
                for queues, callback in self.consume_info.items():
274
                    self.basic_consume(queues, callback)
275
                self.basic_wait(timeout)
276
            else:
277
                log.error("Exception while waiting for messages ",e)
278
                raise
279

  
280
    def basic_cancel(self, promise=None):
281
        """Cancel consuming from a queue. """
282
        try:
283
            if promise is not None:
284
                self.client.basic_cancel(promise)
285
            else:
286
                for promise in self.consume_promises:
287
                    self.client.basic_cancel(promise)
288
        except (socket.error, puka.spec_exceptions.ConnectionForced):
289
            pass
290
        except Exception as e:
291
            if self.client.sd is None:
292
                pass;
293
            else:
294
                log.error("Exception while canceling client ",e)
295
                raise
296

  
297

  
298
    @reconnect_decorator
299
    def basic_get(self, queue):
300
        """Get a single message from a queue.
301

  
302
        This is a non-blocking method for getting messages from a queue.
303
        It will return None if the queue is empty.
304

  
305
        """
306
        get_promise = self.client.basic_get(queue=queue)
307
        result = self.client.wait(get_promise)
308
        if 'empty' in result:
309
            # The queue is empty
310
            return None
311
        else:
312
            return result
313

  
314
    def basic_ack(self, message):
315
        """Acknowledge a message. """
316
        try:
317
            self.client.basic_ack(message)
318
            return True
319
        except (socket.error, puka.spec_exceptions.ConnectionForced):
320
            return False
321
        except Exception as e:
322
            if self.client.sd is None:
323
                return False
324
            else:
325
                log.error("Exception while acknowleding message ",e)
326
                raise
327

  
328
    def close(self):
329
        """Close the connection with the AMQP broker. """
330
        try:
331
            close_promise = self.client.close()
332
            self.client.wait(close_promise)
333
        except (socket.error, puka.spec_exceptions.ConnectionForced):
334
            pass
335

  
336
    def queue_delete(self, queue, if_unused=False, if_empty=False):
337
        """Delete a queue.
338

  
339
        Returns False if the queue does not exist
340
        """
341
        try:
342
            promise = self.client.queue_delete(queue=queue, if_unused=if_unused,
343
                                               if_empty=if_empty)
344
            self.client.wait(promise)
345
            return True
346
        except puka.spec_exceptions.NotFound:
347
            log.debug("Queue %s does not exist", queue)
348
            return False
349

  
350
    def exchange_delete(self, exchange, if_unused=False):
351
        """Delete an exchange."""
352
        try:
353

  
354
            promise = self.client.exchange_delete(exchange=exchange,
355
                                                  if_unused=if_unused)
356
            self.client.wait(promise)
357
            return True
358
        except puka.spec_exceptions.NotFound:
359
            log.debug("Exchange %s does not exist", exchange)
360
            return False
361

  
362

  
363

  
364
class AMQPAsyncClient(AMQPClient):
365
    """AMQP client implementing asynchronous sending of messages.
366

  
367
    This client is more efficient that AMQPClient in sending large number
368
    of messages. Messages are confirmed to be sent to the broker in batches
369
    of a size specified by the 'max_buffer' argument.
370

  
371
    Messages are kept to an internal buffer until the max_buffer messages are
372
    sent or until the connection closes. Explicit delivering of messages can be
373
    achieved by calling 'wait_for_promises' method.
374

  
375
    Always remember to close the connection, or messages may be lost.
376

  
377
    """
378
    def __init__(self, hosts=AMQP_HOSTS, max_buffer=5000,
379
                 max_retries=MAX_RETRIES):
380
        AMQPClient.__init__(self, hosts, max_retries)
381
        self.published_msgs = OrderedDict()
382
        self._promise_counter = 0
383
        self.max_buffer=max_buffer
384

  
385
    def basic_publish_multi(self, exhange, routing_key, msgs, headers={}):
386
        while msgs:
387
            msg = msgs.pop[0]
388
            self.basic_publish(exchange, routing_key, msg, headers)
389

  
390
    def basic_publish(self, exchange, routing_key, body, headers={}):
391
        """Publish a message.
392

  
393
        The message will not be actually published to the broker until
394
        'max_buffer' messages are published or wait_for_promises is called.
395

  
396
        """
397
        try:
398
            if not 'delivery_mode' in headers:
399
                headers['delivery_mode'] = 2
400

  
401
            promise = self.client.basic_publish(exchange=exchange,
402
                                                routing_key=routing_key,
403
                                                body=body,
404
                                                headers=headers)
405

  
406
            self._promise_counter += 1
407
            self.published_msgs[promise] = {'exchange':exchange,
408
                                            'routing_key':routing_key,
409
                                            'body':body,
410
                                            'headers':headers}
411

  
412
            if self._promise_counter > self.max_buffer:
413
                self.wait_for_promises()
414

  
415
        except (socket.error, puka.spec_exceptions.ConnectionForced):
416
            log.debug('Connection closed while sending message %s.\
417
                      Reconnecting and retrying' % body)
418
            self.connect()
419
            self.basic_publish(exchange, routing_key, body, headers)
420
            return self._retry_publish_msgs()
421
        except Exception as e:
422
            if self.client.sd is None:
423
                log.debug('Connection closed while sending message %s.\
424
                             Reconnecting and retrying' % body)
425
                self.connect()
426
                self.basic_publish(exchange, routing_key, body, headers)
427
                return self._retry_publish_msgs()
428
            else:
429
                log.error("Exception while publishing message ",e)
430
                raise
431

  
432
    def wait_for_promises(self):
433
        """Wait for confirm that all messages are sent."""
434
        try:
435
            promises = self.published_msgs.keys()
436
            for promise in promises:
437
                self.client.wait(promise)
438
                self.published_msgs.pop(promise)
439
            self._promise_counter = 0
440
        except (socket.error, puka.spec_exceptions.ConnectionForced):
441
            log.debug('Connection closed while waiting from promises')
442
            self.connect()
443
            self._retry_publish_msgs()
444
        except Exception as e:
445
            if self.client.sd is None:
446
                log.debug('Connection closed while waiting from promises')
447
                self.connect()
448
                self._retry_publish_msgs()
449
            else:
450
                log.error("Exception while waiting for promises ",e)
451
                raise
452

  
453
    def _retry_publish_msgs(self):
454
        """Resend messages in case of a connection failure."""
455
        values = self.published_msgs.values()
456
        self.published_msgs = OrderedDict()
457
        for message in values:
458
            exchange = message['exchange']
459
            key = message['routing_key']
460
            body = message['body']
461
            headers = message['headers']
462
            log.debug('Resending message %s' % body)
463
            self.basic_publish(exchange, key, body, headers)
464

  
465
    def close(self):
466
        """Check that messages have been send and close the connection."""
467
        try:
468
            self.wait_for_promises()
469
            close_promise = self.client.close()
470
            self.client.wait(close_promise)
471
        except (socket.error, puka.spec_exceptions.ConnectionForced):
472
            pass
473
        except Exception as e:
474
            if self.client.sd is None:
475
                pass
476
            else:
477
                log.error("Exception while closing the connection ",e)
478
                raise
479

  
480
    def flush_buffer(self):
481
        try:
482
            self.client.on_write()
483
        except (socket.error, puka.spec_exceptions.ConnectionForced):
484
            log.debug('Connection closed while clearing buffer')
485
            self.connect()
486
            return self._retry_publish_msgs()
487
        except Exception as e:
488
            if self.client.sd is None:
489
                log.debug('Connection closed while clearing buffer')
490
                self.connect()
491
                return self._retry_publish_msgs()
492
            else:
493
                log.error("Exception while clearing buffer ",e)
494
                raise
495

  
496
class AMQPConsumer(AMQPClient):
497
    """AMQP client implementing a consumer without callbacks.
498

  
499
    """
500
    def __init__(self, hosts=AMQP_HOSTS, max_buffer=5000,
501
                 max_retries=MAX_RETRIES):
502
        AMQPClient.__init__(self, hosts, max_retries)
503
        self.consume_queues = []
504
        self.consume_promises = []
505

  
506
    @reconnect_decorator
507
    def basic_consume(self, queue, prefetch_count=0):
508
        """Consume from a queue.
509

  
510
        @type queue: string or list of strings
511
        @param queue: the name or list of names from the queues to consume
512
        @type callback: function
513
        @param callback: the callback function to run when a message arrives
514

  
515
        """
516
        if isinstance(queue, str):
517
            queue = [queue]
518
        elif isinstance(queue, list):
519
            pass
520
        else:
521
            raise AttributeError
522

  
523
        # Store the queues and the callback
524
        for q in queue:
525
            self.consume_queues.append(q)
526

  
527
        consume_promise = \
528
                self.client.basic_consume_multi(queues=queue,
529
                                                prefetch_count=prefetch_count)
530
        self.consume_promises.append(consume_promise)
531
        return consume_promise
532

  
533
    def basic_wait(self, promise=None, timeout=0):
534
        """Wait for messages from the queues declared by basic_consume.
535

  
536
        This function will block until a message arrives from the queues that
537
        have been declared with basic_consume. If the optional arguments
538
        'promise' is given, only messages for this promise will be delivered.
539

  
540
        """
541
        try:
542
            if promise is not None:
543
               return self.client.wait(promise, timeout)
544
            else:
545
               return self.client.wait(self.consume_promises)
546
        except (socket.error, puka.spec_exceptions.ConnectionForced):
547
            log.debug('Connection closed while receiving messages.')
548
            self.consume_promises = []
549
            self.connect()
550
            for queues in self.consume_queues:
551
                self.basic_consume(queues)
552
            self.basic_wait(timeout)
553
        except Exception as e:
554
            if self.client.sd is None:
555
                log.debug('Connection closed while receiving messages.')
556
                self.consume_promises = []
557
                self.connect()
558
                for queues in self.consume_queues:
559
                    self.basic_consume(queues)
560
                self.basic_wait(timeout)
561
            else:
562
                log.error("Exception while waiting for messages ",e)
563
                raise
564

  
565

  
566
class AMQPError(Exception):
567
    def __init__(self, msg):
568
        self.msg = msg
569
    def __str__(self):
570
        return repr(self.msg)
b/snf-common/synnefo/lib/ordereddict.py
1
# Copyright (c) 2009 Raymond Hettinger
2
#
3
# Permission is hereby granted, free of charge, to any person
4
# obtaining a copy of this software and associated documentation files
5
# (the "Software"), to deal in the Software without restriction,
6
# including without limitation the rights to use, copy, modify, merge,
7
# publish, distribute, sublicense, and/or sell copies of the Software,
8
# and to permit persons to whom the Software is furnished to do so,
9
# subject to the following conditions:
10
#
11
#     The above copyright notice and this permission notice shall be
12
#     included in all copies or substantial portions of the Software.
13
#
14
#     THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
15
#     EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
16
#     OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
17
#     NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
18
#     HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
19
#     WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20
#     FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
21
#     OTHER DEALINGS IN THE SOFTWARE.
22

  
23
from UserDict import DictMixin
24

  
25
class OrderedDict(dict, DictMixin):
26

  
27
    def __init__(self, *args, **kwds):
28
        if len(args) > 1:
29
            raise TypeError('expected at most 1 arguments, got %d' % len(args))
30
        try:
31
            self.__end
32
        except AttributeError:
33
            self.clear()
34
        self.update(*args, **kwds)
35

  
36
    def clear(self):
37
        self.__end = end = []
38
        end += [None, end, end]         # sentinel node for doubly linked list
39
        self.__map = {}                 # key --> [key, prev, next]
40
        dict.clear(self)
41

  
42
    def __setitem__(self, key, value):
43
        if key not in self:
44
            end = self.__end
45
            curr = end[1]
46
            curr[2] = end[1] = self.__map[key] = [key, curr, end]
47
        dict.__setitem__(self, key, value)
48

  
49
    def __delitem__(self, key):
50
        dict.__delitem__(self, key)
51
        key, prev, next = self.__map.pop(key)
52
        prev[2] = next
53
        next[1] = prev
54

  
55
    def __iter__(self):
56
        end = self.__end
57
        curr = end[2]
58
        while curr is not end:
59
            yield curr[0]
60
            curr = curr[2]
61

  
62
    def __reversed__(self):
63
        end = self.__end
64
        curr = end[1]
65
        while curr is not end:
66
            yield curr[0]
67
            curr = curr[1]
68

  
69
    def popitem(self, last=True):
70
        if not self:
71
            raise KeyError('dictionary is empty')
72
        if last:
73
            key = reversed(self).next()
74
        else:
75
            key = iter(self).next()
76
        value = self.pop(key)
77
        return key, value
78

  
79
    def __reduce__(self):
80
        items = [[k, self[k]] for k in self]
81
        tmp = self.__map, self.__end
82
        del self.__map, self.__end
83
        inst_dict = vars(self).copy()
84
        self.__map, self.__end = tmp
85
        if inst_dict:
86
            return (self.__class__, (items,), inst_dict)
87
        return self.__class__, (items,)
88

  
89
    def keys(self):
90
        return list(self)
91

  
92
    setdefault = DictMixin.setdefault
93
    update = DictMixin.update
94
    pop = DictMixin.pop
95
    values = DictMixin.values
96
    items = DictMixin.items
97
    iterkeys = DictMixin.iterkeys
98
    itervalues = DictMixin.itervalues
99
    iteritems = DictMixin.iteritems
100

  
101
    def __repr__(self):
102
        if not self:
103
            return '%s()' % (self.__class__.__name__,)
104
        return '%s(%r)' % (self.__class__.__name__, self.items())
105

  
106
    def copy(self):
107
        return self.__class__(self)
108

  
109
    @classmethod
110
    def fromkeys(cls, iterable, value=None):
111
        d = cls()
112
        for key in iterable:
113
            d[key] = value
114
        return d
115

  
116
    def __eq__(self, other):
117
        if isinstance(other, OrderedDict):
118
            if len(self) != len(other):
119
                return False
120
            for p, q in  zip(self.items(), other.items()):
121
                if p != q:
122
                    return False
123
            return True
124
        return dict.__eq__(self, other)
125

  
126
    def __ne__(self, other):
127
        return not self == other
b/snf-cyclades-app/synnefo/app_settings/default/queues.py
8 8
RABBIT_USERNAME = "username"
9 9
RABBIT_PASSWORD = "password"
10 10
RABBIT_VHOST = "/"
11
AMQP_HOSTS=["amqp://username:password@host:port"]
11 12

  
12 13
EXCHANGE_GANETI = "ganeti"  # Messages from Ganeti
13 14
EXCHANGE_CRON = "cron"      # Messages from periodically triggered tasks
b/snf-cyclades-gtools/synnefo/ganeti/settings.py
4 4
RABBIT_USERNAME = "rabbit-username"
5 5
RABBIT_PASSWORD = "rabbit-password"
6 6
RABBIT_VHOST = "/"
7
AMQP_HOSTS=["amqp://username:password@host:port"]
7 8
BACKEND_PREFIX_ID = "snf-"
8 9
EXCHANGE_GANETI = "ganeti"
9 10
PUBLIC_IPV6_PREFIX = "2001:db8::/64"

Also available in: Unified diff