Revision 30333691

/dev/null
1
# Copyright 2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

  
34
""" Module implementing connection and communication with an AMQP broker.
35

  
36
AMQP Client's instatiated by this module silenty detect connection failures and
37
try to reconnect to any available broker. Also publishing takes advantage of
38
publisher-confirms in order to guarantee that messages are properly delivered
39
to the broker.
40

  
41
"""
42

  
43
from synnefo import settings
44

  
45
if settings.AMQP_BACKEND == 'puka':
46
    from amqp_puka import AMQPPukaClient as Client
47
elif settings.AMQP_BACKEND == 'haigha':
48
    from amqp_haigha import AMQPHaighaClient as Client
49
else:
50
    raise Exception('Unknown Backend %s' % settings.AMQP_BACKEND)
51

  
52

  
53
class AMQPClient(object):
54
    """
55
    AMQP generic client implementing most of the basic AMQP operations.
56

  
57
    This class will create an object of AMQPPukaClient or AMQPHaigha client
58
    depending on AMQP_BACKEND setting
59
    """
60
    def __new__(cls, *args, **kwargs):
61
        return Client(*args, **kwargs)
b/snf-common/synnefo/lib/amqp/__init__.py
1
# Copyright 2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

  
34
""" Module implementing connection and communication with an AMQP broker.
35

  
36
AMQP Client's instatiated by this module silenty detect connection failures and
37
try to reconnect to any available broker. Also publishing takes advantage of
38
publisher-confirms in order to guarantee that messages are properly delivered
39
to the broker.
40

  
41
"""
42

  
43
from synnefo import settings
44

  
45
if settings.AMQP_BACKEND == 'puka':
46
    from amqp_puka import AMQPPukaClient as Client
47
elif settings.AMQP_BACKEND == 'haigha':
48
    from amqp_haigha import AMQPHaighaClient as Client
49
else:
50
    raise Exception('Unknown Backend %s' % settings.AMQP_BACKEND)
51

  
52

  
53
class AMQPClient(object):
54
    """
55
    AMQP generic client implementing most of the basic AMQP operations.
56

  
57
    This class will create an object of AMQPPukaClient or AMQPHaigha client
58
    depending on AMQP_BACKEND setting
59
    """
60
    def __new__(cls, *args, **kwargs):
61
        return Client(*args, **kwargs)
b/snf-common/synnefo/lib/amqp/amqp_haigha.py
1
# Copyright 2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

  
34
from haigha.connections import RabbitConnection
35
from haigha.message import Message
36
from haigha import exceptions
37
from random import shuffle
38
from time import sleep
39
import logging
40
import socket
41
from synnefo import settings
42
from ordereddict import OrderedDict
43
import gevent
44
from gevent import monkey
45
from functools import wraps
46

  
47

  
48
logging.basicConfig(level=logging.INFO,
49
                    format="[%(levelname)s %(asctime)s] %(message)s")
50
logger = logging.getLogger('haigha')
51

  
52
sock_opts = {
53
    (socket.IPPROTO_TCP, socket.TCP_NODELAY): 1,
54
}
55

  
56

  
57
def reconnect_decorator(func):
58
    """
59
    Decorator for persistent connection with one or more AMQP brokers.
60

  
61
    """
62
    @wraps(func)
63
    def wrapper(self, *args, **kwargs):
64
        try:
65
            func(self, *args, **kwargs)
66
        except (socket.error, exceptions.ConnectionError) as e:
67
            logger.error('Connection Closed while in %s: %s', func.__name__, e)
68
            self.connect()
69

  
70
    return wrapper
71

  
72

  
73
class AMQPHaighaClient():
74
    def __init__(self, hosts=settings.AMQP_HOSTS, max_retries=30,
75
                 confirms=True, confirm_buffer=200):
76
        self.hosts = hosts
77
        shuffle(self.hosts)
78

  
79
        self.max_retries = max_retries
80
        self.confirms = confirms
81
        self.confirm_buffer = confirm_buffer
82

  
83
        self.connection = None
84
        self.channel = None
85
        self.consumers = {}
86
        self.unacked = OrderedDict()
87

  
88
    def connect(self, retries=0):
89
        if retries > self.max_retries:
90
            logger.error("Aborting after %s retries", retries - 1)
91
            raise AMQPConnectionError('Aborting after %d connection failures.'
92
                                      % (retries - 1))
93
            return
94

  
95
        # Pick up a host
96
        host = self.hosts.pop()
97
        self.hosts.insert(0, host)
98

  
99
        #Patch gevent
100
        monkey.patch_all()
101

  
102
        try:
103
            self.connection = \
104
                RabbitConnection(logger=logger, debug=True,
105
                                 user='rabbit', password='r@bb1t',
106
                                 vhost='/', host=host,
107
                                 heartbeat=None,
108
                                 sock_opts=sock_opts,
109
                                 transport='gevent')
110
        except socket.error as e:
111
            logger.error('Cannot connect to host %s: %s', host, e)
112
            if retries > 2 * len(self.hosts):
113
                sleep(1)
114
            return self.connect(retries + 1)
115

  
116
        logger.info('Successfully connected to host: %s', host)
117

  
118
        logger.info('Creating channel')
119
        self.channel = self.connection.channel()
120

  
121
        if self.confirms:
122
            self._confirm_select()
123

  
124
        if self.unacked:
125
            self._resend_unacked_messages()
126

  
127
        if self.consumers:
128
            for queue, callback in self.consumers.items():
129
                self.basic_consume(queue, callback)
130

  
131
    def exchange_declare(self, exchange, type='direct'):
132
        """Declare an exchange
133
        @type exchange_name: string
134
        @param exchange_name: name of the exchange
135
        @type exchange_type: string
136
        @param exhange_type: one of 'direct', 'topic', 'fanout'
137

  
138
        """
139

  
140
        logger.info('Declaring %s exchange: %s', type, exchange)
141
        self.channel.exchange.declare(exchange, type,
142
                                      auto_delete=False, durable=True)
143

  
144
    def queue_declare(self, queue, exclusive=False, mirrored=True,
145
                      mirrored_nodes='all'):
146
        """Declare a queue
147

  
148
        @type queue: string
149
        @param queue: name of the queue
150
        @param mirrored: whether the queue will be mirrored to other brokers
151
        @param mirrored_nodes: the policy for the mirrored queue.
152
            Available policies:
153
                - 'all': The queue is mirrored to all nodes and the
154
                  master node is the one to which the client is
155
                  connected
156
                - a list of nodes. The queue will be mirrored only to
157
                  the specified nodes, and the master will be the
158
                  first node in the list. Node names must be provided
159
                  and not host IP. example: [node1@rabbit,node2@rabbit]
160

  
161
        """
162

  
163
        logger.info('Declaring queue: %s', queue)
164
        if mirrored:
165
            if mirrored_nodes == 'all':
166
                arguments = {'x-ha-policy': 'all'}
167
            elif isinstance(mirrored_nodes, list):
168
                arguments = {'x-ha-policy': 'nodes',
169
                             'x-ha-policy-params': mirrored_nodes}
170
            else:
171
                raise AttributeError
172
        else:
173
            arguments = {}
174

  
175
        self.channel.queue.declare(queue, durable=True, exclusive=exclusive,
176
                                   auto_delete=False, arguments=arguments)
177

  
178
    def queue_bind(self, queue, exchange, routing_key):
179
        logger.info('Binding queue %s to exchange %s with key %s', queue,
180
                    exchange, routing_key)
181
        self.channel.queue.bind(queue=queue, exchange=exchange,
182
                                routing_key=routing_key)
183

  
184
    def _confirm_select(self):
185
        logger.info('Setting channel to confirm mode')
186
        self.channel.confirm.select()
187
        self.channel.basic.set_ack_listener(self._ack_received)
188
        self.channel.basic.set_nack_listener(self._nack_received)
189

  
190
    @reconnect_decorator
191
    def basic_publish(self, exchange, routing_key, body):
192
        msg = Message(body, delivery_mode=2)
193
        mid = self.channel.basic.publish(msg, exchange, routing_key)
194
        if self.confirms:
195
            self.unacked[mid] = (exchange, routing_key, body)
196
            if len(self.unacked) > self.confirm_buffer:
197
                self.get_confirms()
198

  
199
        logger.debug('Published message %s with id %s', body, mid)
200

  
201
    @reconnect_decorator
202
    def get_confirms(self):
203
        self.connection.read_frames()
204

  
205
    @reconnect_decorator
206
    def _resend_unacked_messages(self):
207
        msgs = self.unacked.values()
208
        self.unacked.clear()
209
        for exchange, routing_key, body in msgs:
210
            logger.debug('Resending message %s', body)
211
            self.basic_publish(exchange, routing_key, body)
212

  
213
    @reconnect_decorator
214
    def _ack_received(self, mid):
215
        print mid
216
        logger.debug('Received ACK for message with id %s', mid)
217
        self.unacked.pop(mid)
218

  
219
    @reconnect_decorator
220
    def _nack_received(self, mid):
221
        logger.error('Received NACK for message with id %s. Retrying.', mid)
222
        (exchange, routing_key, body) = self.unacked[mid]
223
        self.basic_publish(exchange, routing_key, body)
224

  
225
    def basic_consume(self, queue, callback):
226
        """Consume from a queue.
227

  
228
        @type queue: string or list of strings
229
        @param queue: the name or list of names from the queues to consume
230
        @type callback: function
231
        @param callback: the callback function to run when a message arrives
232

  
233
        """
234

  
235
        self.consumers[queue] = callback
236
        self.channel.basic.consume(queue, consumer=callback, no_ack=False)
237

  
238
    @reconnect_decorator
239
    def basic_wait(self):
240
        """Wait for messages from the queues declared by basic_consume.
241

  
242
        This function will block until a message arrives from the queues that
243
        have been declared with basic_consume. If the optional arguments
244
        'promise' is given, only messages for this promise will be delivered.
245

  
246
        """
247

  
248
        self.connection.read_frames()
249
        gevent.sleep(0)
250

  
251
    @reconnect_decorator
252
    def basic_get(self, queue):
253
        self.channel.basic.get(queue, no_ack=False)
254

  
255
    @reconnect_decorator
256
    def basic_ack(self, message):
257
        delivery_tag = message.delivery_info['delivery_tag']
258
        self.channel.basic.ack(delivery_tag)
259

  
260
    @reconnect_decorator
261
    def basic_nack(self, message):
262
        delivery_tag = message.delivery_info['delivery_tag']
263
        self.channel.basic.ack(delivery_tag)
264

  
265
    def close(self):
266
        try:
267
            if self.confirms:
268
                while self.unacked:
269
                    print self.unacked
270
                    self.get_confirms()
271
            self.channel.close()
272
            close_info = self.channel.close_info
273
            logger.info('Successfully closed channel. Info: %s', close_info)
274
            self.connection.close()
275
        except socket.error as e:
276
            logger.error('Connection closed while closing connection:%s', e)
277

  
278
    def queue_delete(self, queue, if_unused=True, if_empty=True):
279
        self.channel.queue.delete(queue, if_unused, if_empty)
280

  
281
    def exchange_delete(self, exchange, if_unused=True):
282
        self.channel.exchange.delete(exchange, if_unused)
283

  
284
    def basic_class(self):
285
        pass
286

  
287

  
288
class AMQPConnectionError():
289
    pass
b/snf-common/synnefo/lib/amqp/amqp_puka.py
1
# Copyright 2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

  
34
""" Module implementing connection and communication with an AMQP broker.
35

  
36
AMQP Client's implemented by this module silenty detect connection failures and
37
try to reconnect to any available broker. Also publishing takes advantage of
38
publisher-confirms in order to guarantee that messages are properly delivered
39
to the broker.
40

  
41
"""
42

  
43
import logging
44

  
45
from puka import Client
46
from puka import spec_exceptions
47
import socket
48
from socket import error as socket_error
49
from time import sleep
50
from random import shuffle
51
from functools import wraps
52
from ordereddict import OrderedDict
53
from synnefo import settings
54

  
55

  
56
def reconnect_decorator(func):
57
    """
58
    Decorator for persistent connection with one or more AMQP brokers.
59

  
60
    """
61
    @wraps(func)
62
    def wrapper(self, *args, **kwargs):
63
        try:
64
            return func(self, *args, **kwargs)
65
        except (socket_error, spec_exceptions.ConnectionForced) as e:
66
            self.log.error('Connection Closed while in %s: %s', func.__name__,
67
                           e)
68
            self.connect()
69

  
70
    return wrapper
71

  
72

  
73
class AMQPPukaClient(object):
74
    """
75
    AMQP generic client implementing most of the basic AMQP operations.
76

  
77
    """
78
    def __init__(self, hosts=settings.AMQP_HOSTS, max_retries=30,
79
                 confirms=True, confirm_buffer=100, logger=None):
80
        """
81
        Format hosts as "amqp://username:pass@host:port"
82
        max_retries=0 defaults to unlimited retries
83

  
84
        """
85

  
86
        self.hosts = hosts
87
        shuffle(self.hosts)
88

  
89
        self.max_retries = max_retries
90
        self.confirms = confirms
91
        self.confirm_buffer = confirm_buffer
92

  
93
        self.connection = None
94
        self.channel = None
95
        self.consumers = {}
96
        self.unacked = OrderedDict()
97
        self.unsend = OrderedDict()
98
        self.consume_promises = []
99
        self.exchanges = []
100
        if logger:
101
            self.log = logger
102
        else:
103
            logger = logging.getLogger("amqp")
104
            logging.basicConfig()
105
            self.log = logger
106

  
107
    def connect(self, retries=0):
108
        if self.max_retries and retries >= self.max_retries:
109
            self.log.error("Aborting after %d retries", retries)
110
            raise AMQPConnectionError('Aborting after %d connection failures.'
111
                                      % retries)
112
            return
113

  
114
        # Pick up a host
115
        host = self.hosts.pop()
116
        self.hosts.insert(0, host)
117

  
118
        self.client = Client(host, pubacks=self.confirms)
119

  
120
        host = host.split('@')[-1]
121
        self.log.debug('Connecting to node %s' % host)
122

  
123
        try:
124
            promise = self.client.connect()
125
            self.client.wait(promise)
126
        except socket_error as e:
127
            if retries < len(self.hosts):
128
                self.log.warning('Cannot connect to host %s: %s', host, e)
129
            else:
130
                self.log.error('Cannot connect to host %s: %s', host, e)
131
                sleep(1)
132
            return self.connect(retries + 1)
133

  
134
        self.log.info('Successfully connected to host: %s', host)
135

  
136
        # Setup TCP keepalive option
137
        self.client.sd.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
138
        # Keepalive time
139
        self.client.sd.setsockopt(socket.SOL_TCP, socket.TCP_KEEPIDLE, 20)
140
        # Keepalive interval
141
        self.client.sd.setsockopt(socket.SOL_TCP, socket.TCP_KEEPINTVL, 2)
142
        # Keepalive retry
143
        self.client.sd.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT, 10)
144

  
145
        self.log.info('Creating channel')
146

  
147
        # Clear consume_promises each time connecting, since they are related
148
        # to the connection object
149
        self.consume_promises = []
150

  
151
        if self.unacked:
152
            self._resend_unacked_messages()
153

  
154
        if self.unsend:
155
            self._resend_unsend_messages()
156

  
157
        if self.consumers:
158
            for queue, callback in self.consumers.items():
159
                self.basic_consume(queue, callback)
160

  
161
        if self.exchanges:
162
            exchanges = self.exchanges
163
            self.exchanges = []
164
            for exchange, type in exchanges:
165
                self.exchange_declare(exchange, type)
166

  
167
    @reconnect_decorator
168
    def reconnect(self):
169
        self.close()
170
        self.connect()
171

  
172
    def exchange_declare(self, exchange, type='direct'):
173
        """Declare an exchange
174
        @type exchange_name: string
175
        @param exchange_name: name of the exchange
176
        @type exchange_type: string
177
        @param exhange_type: one of 'direct', 'topic', 'fanout'
178

  
179
        """
180
        self.log.info('Declaring %s exchange: %s', type, exchange)
181
        promise = self.client.exchange_declare(exchange=exchange,
182
                                               type=type,
183
                                               durable=True,
184
                                               auto_delete=False)
185
        self.client.wait(promise)
186
        self.exchanges.append((exchange, type))
187

  
188
    @reconnect_decorator
189
    def queue_declare(self, queue, exclusive=False,
190
                      mirrored=True, mirrored_nodes='all',
191
                      dead_letter_exchange=None):
192
        """Declare a queue
193

  
194
        @type queue: string
195
        @param queue: name of the queue
196
        @param mirrored: whether the queue will be mirrored to other brokers
197
        @param mirrored_nodes: the policy for the mirrored queue.
198
            Available policies:
199
                - 'all': The queue is mirrored to all nodes and the
200
                  master node is the one to which the client is
201
                  connected
202
                - a list of nodes. The queue will be mirrored only to
203
                  the specified nodes, and the master will be the
204
                  first node in the list. Node names must be provided
205
                  and not host IP. example: [node1@rabbit,node2@rabbit]
206

  
207
        """
208
        self.log.info('Declaring queue: %s', queue)
209

  
210
        if mirrored:
211
            if mirrored_nodes == 'all':
212
                arguments = {'x-ha-policy': 'all'}
213
            elif isinstance(mirrored_nodes, list):
214
                arguments = {'x-ha-policy': 'nodes',
215
                             'x-ha-policy-params': mirrored_nodes}
216
            else:
217
                raise AttributeError
218
        else:
219
            arguments = {}
220

  
221
        if dead_letter_exchange:
222
            arguments['x-dead-letter-exchange'] = dead_letter_exchange
223

  
224
        promise = self.client.queue_declare(queue=queue, durable=True,
225
                                            exclusive=exclusive,
226
                                            auto_delete=False,
227
                                            arguments=arguments)
228
        self.client.wait(promise)
229

  
230
    def queue_bind(self, queue, exchange, routing_key):
231
        self.log.debug('Binding queue %s to exchange %s with key %s'
232
                       % (queue, exchange, routing_key))
233
        promise = self.client.queue_bind(exchange=exchange, queue=queue,
234
                                         routing_key=routing_key)
235
        self.client.wait(promise)
236

  
237
    @reconnect_decorator
238
    def basic_publish(self, exchange, routing_key, body, headers={}):
239
        """Publish a message with a specific routing key """
240
        self._publish(exchange, routing_key, body, headers)
241

  
242
        self.flush_buffer()
243

  
244
        if self.confirms and len(self.unacked) >= self.confirm_buffer:
245
            self.get_confirms()
246

  
247
    @reconnect_decorator
248
    def basic_publish_multi(self, exchange, routing_key, bodies):
249
        for body in bodies:
250
            self.unsend[body] = (exchange, routing_key)
251

  
252
        for body in bodies:
253
            self._publish(exchange, routing_key, body)
254
            self.unsend.pop(body)
255

  
256
        self.flush_buffer()
257

  
258
        if self.confirms:
259
            self.get_confirms()
260

  
261
    def _publish(self, exchange, routing_key, body, headers={}):
262
        # Persisent messages by default!
263
        headers['delivery_mode'] = 2
264
        promise = self.client.basic_publish(exchange=exchange,
265
                                            routing_key=routing_key,
266
                                            body=body, headers=headers)
267

  
268
        if self.confirms:
269
            self.unacked[promise] = (exchange, routing_key, body)
270

  
271
        return promise
272

  
273
    @reconnect_decorator
274
    def flush_buffer(self):
275
        while self.client.needs_write():
276
            self.client.on_write()
277

  
278
    @reconnect_decorator
279
    def get_confirms(self):
280
        for promise in self.unacked.keys():
281
            self.client.wait(promise)
282
            self.unacked.pop(promise)
283

  
284
    @reconnect_decorator
285
    def _resend_unacked_messages(self):
286
        """Resend unacked messages in case of a connection failure."""
287
        msgs = self.unacked.values()
288
        self.unacked.clear()
289
        for exchange, routing_key, body in msgs:
290
            self.log.debug('Resending message %s' % body)
291
            self.basic_publish(exchange, routing_key, body)
292

  
293
    @reconnect_decorator
294
    def _resend_unsend_messages(self):
295
        """Resend unsend messages in case of a connection failure."""
296
        for body in self.unsend.keys():
297
            (exchange, routing_key) = self.unsend[body]
298
            self.basic_publish(exchange, routing_key, body)
299
            self.unsend.pop(body)
300

  
301
    @reconnect_decorator
302
    def basic_consume(self, queue, callback, prefetch_count=0):
303
        """Consume from a queue.
304

  
305
        @type queue: string or list of strings
306
        @param queue: the name or list of names from the queues to consume
307
        @type callback: function
308
        @param callback: the callback function to run when a message arrives
309

  
310
        """
311
        # Store the queues and the callback
312
        self.consumers[queue] = callback
313

  
314
        def handle_delivery(promise, msg):
315
            """Hide promises and messages without body"""
316
            if 'body' in msg:
317
                callback(self, msg)
318
            else:
319
                self.log.debug("Message without body %s" % msg)
320
                raise socket_error
321

  
322
        consume_promise = \
323
            self.client.basic_consume(queue=queue,
324
                                      prefetch_count=prefetch_count,
325
                                      callback=handle_delivery)
326

  
327
        self.consume_promises.append(consume_promise)
328
        return consume_promise
329

  
330
    @reconnect_decorator
331
    def basic_wait(self, promise=None, timeout=0):
332
        """Wait for messages from the queues declared by basic_consume.
333

  
334
        This function will block until a message arrives from the queues that
335
        have been declared with basic_consume. If the optional arguments
336
        'promise' is given, only messages for this promise will be delivered.
337

  
338
        """
339
        if promise is not None:
340
            return self.client.wait(promise, timeout)
341
        else:
342
            return self.client.wait(self.consume_promises, timeout)
343

  
344
    @reconnect_decorator
345
    def basic_get(self, queue):
346
        """Get a single message from a queue.
347

  
348
        This is a non-blocking method for getting messages from a queue.
349
        It will return None if the queue is empty.
350

  
351
        """
352
        get_promise = self.client.basic_get(queue=queue)
353
        result = self.client.wait(get_promise)
354
        if 'empty' in result:
355
            # The queue is empty
356
            return None
357
        else:
358
            return result
359

  
360
    @reconnect_decorator
361
    def basic_ack(self, message):
362
        self.client.basic_ack(message)
363

  
364
    @reconnect_decorator
365
    def basic_nack(self, message):
366
        self.client.basic_ack(message)
367

  
368
    @reconnect_decorator
369
    def basic_reject(self, message, requeue=False):
370
        """Reject a message.
371

  
372
        If requeue option is False and a dead letter exchange is associated
373
        with the queue, the message will be routed to the dead letter exchange.
374

  
375
        """
376
        self.client.basic_reject(message, requeue=requeue)
377

  
378
    def close(self):
379
        """Check that messages have been send and close the connection."""
380
        self.log.debug("Closing connection to %s", self.client.host)
381
        try:
382
            if self.confirms:
383
                self.get_confirms()
384
            close_promise = self.client.close()
385
            self.client.wait(close_promise)
386
        except (socket_error, spec_exceptions.ConnectionForced) as e:
387
            self.log.error('Connection closed while closing connection:%s', e)
388

  
389
    def queue_delete(self, queue, if_unused=True, if_empty=True):
390
        """Delete a queue.
391

  
392
        Returns False if the queue does not exist
393
        """
394
        try:
395
            promise = self.client.queue_delete(queue=queue,
396
                                               if_unused=if_unused,
397
                                               if_empty=if_empty)
398
            self.client.wait(promise)
399
            return True
400
        except spec_exceptions.NotFound:
401
            self.log.info("Queue %s does not exist", queue)
402
            return False
403

  
404
    def exchange_delete(self, exchange, if_unused=True):
405
        """Delete an exchange."""
406
        try:
407

  
408
            promise = self.client.exchange_delete(exchange=exchange,
409
                                                  if_unused=if_unused)
410
            self.client.wait(promise)
411
            return True
412
        except spec_exceptions.NotFound:
413
            self.log.info("Exchange %s does not exist", exchange)
414
            return False
415

  
416
    @reconnect_decorator
417
    def basic_cancel(self, promise=None):
418
        """Cancel consuming from a queue. """
419
        if promise is not None:
420
            self.client.basic_cancel(promise)
421
        else:
422
            for promise in self.consume_promises:
423
                self.client.basic_cancel(promise)
424

  
425

  
426
class AMQPConnectionError(Exception):
427
    pass
/dev/null
1
# Copyright 2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

  
34
from haigha.connections import RabbitConnection
35
from haigha.message import Message
36
from haigha import exceptions
37
from random import shuffle
38
from time import sleep
39
import logging
40
import socket
41
from synnefo import settings
42
from ordereddict import OrderedDict
43
import gevent
44
from gevent import monkey
45
from functools import wraps
46

  
47

  
48
logging.basicConfig(level=logging.INFO, format="[%(levelname)s %(asctime)s] %(message)s" )
49
logger = logging.getLogger('haigha')
50

  
51
sock_opts = {
52
  (socket.IPPROTO_TCP, socket.TCP_NODELAY): 1,
53
}
54

  
55

  
56
def reconnect_decorator(func):
57
    """
58
    Decorator for persistent connection with one or more AMQP brokers.
59

  
60
    """
61
    @wraps(func)
62
    def wrapper(self, *args, **kwargs):
63
        try:
64
            func(self, *args, **kwargs)
65
        except (socket.error, exceptions.ConnectionError) as e:
66
            logger.error('Connection Closed while in %s: %s', func.__name__, e)
67
            self.connect()
68

  
69
    return wrapper
70

  
71

  
72
class AMQPHaighaClient():
73
    def __init__(self, hosts=settings.AMQP_HOSTS, max_retries=30,
74
                 confirms=True, confirm_buffer=200):
75
        self.hosts = hosts
76
        shuffle(self.hosts)
77

  
78
        self.max_retries = max_retries
79
        self.confirms = confirms
80
        self.confirm_buffer = confirm_buffer
81

  
82
        self.connection = None
83
        self.channel = None
84
        self.consumers = {}
85
        self.unacked = OrderedDict()
86

  
87
    def connect(self, retries=0):
88
        if retries > self.max_retries:
89
            logger.error("Aborting after %s retries", retries - 1)
90
            raise AMQPConnectionError('Aborting after %d connection failures.'\
91
                                      % (retries - 1))
92
            return
93

  
94
        # Pick up a host
95
        host = self.hosts.pop()
96
        self.hosts.insert(0, host)
97

  
98
        #Patch gevent
99
        monkey.patch_all()
100

  
101
        try:
102
            self.connection = \
103
                 RabbitConnection(logger=logger, debug=True,
104
                      user='rabbit', password='r@bb1t',
105
                      vhost='/', host=host,
106
                      heartbeat=None,
107
                      sock_opts=sock_opts,
108
                      transport='gevent')
109
        except socket.error as e:
110
            logger.error('Cannot connect to host %s: %s', host, e)
111
            if retries > 2 * len(self.hosts):
112
                sleep(1)
113
            return self.connect(retries + 1)
114

  
115
        logger.info('Successfully connected to host: %s', host)
116

  
117
        logger.info('Creating channel')
118
        self.channel = self.connection.channel()
119

  
120
        if self.confirms:
121
            self._confirm_select()
122

  
123
        if self.unacked:
124
            self._resend_unacked_messages()
125

  
126
        if self.consumers:
127
            for queue, callback in self.consumers.items():
128
                self.basic_consume(queue, callback)
129

  
130
    def exchange_declare(self, exchange, type='direct'):
131
        """Declare an exchange
132
        @type exchange_name: string
133
        @param exchange_name: name of the exchange
134
        @type exchange_type: string
135
        @param exhange_type: one of 'direct', 'topic', 'fanout'
136

  
137
        """
138

  
139
        logger.info('Declaring %s exchange: %s', type, exchange)
140
        self.channel.exchange.declare(exchange, type,
141
                                      auto_delete=False, durable=True)
142

  
143
    def queue_declare(self, queue, exclusive=False, mirrored=True,
144
                      mirrored_nodes='all'):
145
        """Declare a queue
146

  
147
        @type queue: string
148
        @param queue: name of the queue
149
        @param mirrored: whether the queue will be mirrored to other brokers
150
        @param mirrored_nodes: the policy for the mirrored queue.
151
            Available policies:
152
                - 'all': The queue is mirrored to all nodes and the
153
                  master node is the one to which the client is
154
                  connected
155
                - a list of nodes. The queue will be mirrored only to
156
                  the specified nodes, and the master will be the
157
                  first node in the list. Node names must be provided
158
                  and not host IP. example: [node1@rabbit,node2@rabbit]
159

  
160
        """
161

  
162
        logger.info('Declaring queue: %s', queue)
163
        if mirrored:
164
            if mirrored_nodes == 'all':
165
                arguments = {'x-ha-policy': 'all'}
166
            elif isinstance(mirrored_nodes, list):
167
                arguments = {'x-ha-policy': 'nodes',
168
                             'x-ha-policy-params': mirrored_nodes}
169
            else:
170
                raise AttributeError
171
        else:
172
            arguments = {}
173

  
174
        self.channel.queue.declare(queue, durable=True, exclusive=exclusive,
175
                                   auto_delete=False, arguments=arguments)
176

  
177
    def queue_bind(self, queue, exchange, routing_key):
178
        logger.info('Binding queue %s to exchange %s with key %s', queue,
179
                    exchange, routing_key)
180
        self.channel.queue.bind(queue=queue, exchange=exchange,
181
                                routing_key=routing_key)
182

  
183
    def _confirm_select(self):
184
        logger.info('Setting channel to confirm mode')
185
        self.channel.confirm.select()
186
        self.channel.basic.set_ack_listener(self._ack_received)
187
        self.channel.basic.set_nack_listener(self._nack_received)
188

  
189
    @reconnect_decorator
190
    def basic_publish(self, exchange, routing_key, body):
191
        msg = Message(body, delivery_mode=2)
192
        mid = self.channel.basic.publish(msg, exchange, routing_key)
193
        if self.confirms:
194
            self.unacked[mid] = (exchange, routing_key, body)
195
            if len(self.unacked) > self.confirm_buffer:
196
                self.get_confirms()
197

  
198
        logger.debug('Published message %s with id %s', body, mid)
199

  
200
    @reconnect_decorator
201
    def get_confirms(self):
202
        self.connection.read_frames()
203

  
204
    @reconnect_decorator
205
    def _resend_unacked_messages(self):
206
        msgs = self.unacked.values()
207
        self.unacked.clear()
208
        for exchange, routing_key, body in msgs:
209
            logger.debug('Resending message %s', body)
210
            self.basic_publish(exchange, routing_key, body)
211

  
212
    @reconnect_decorator
213
    def _ack_received(self, mid):
214
        print mid
215
        logger.debug('Received ACK for message with id %s', mid)
216
        self.unacked.pop(mid)
217

  
218
    @reconnect_decorator
219
    def _nack_received(self, mid):
220
        logger.error('Received NACK for message with id %s. Retrying.', mid)
221
        (exchange, routing_key, body) = self.unacked[mid]
222
        self.basic_publish(exchange, routing_key, body)
223

  
224
    def basic_consume(self, queue, callback):
225
        """Consume from a queue.
226

  
227
        @type queue: string or list of strings
228
        @param queue: the name or list of names from the queues to consume
229
        @type callback: function
230
        @param callback: the callback function to run when a message arrives
231

  
232
        """
233

  
234
        self.consumers[queue] = callback
235
        self.channel.basic.consume(queue, consumer=callback, no_ack=False)
236

  
237
    @reconnect_decorator
238
    def basic_wait(self):
239
        """Wait for messages from the queues declared by basic_consume.
240

  
241
        This function will block until a message arrives from the queues that
242
        have been declared with basic_consume. If the optional arguments
243
        'promise' is given, only messages for this promise will be delivered.
244

  
245
        """
246

  
247
        self.connection.read_frames()
248
        gevent.sleep(0)
249

  
250
    @reconnect_decorator
251
    def basic_get(self, queue):
252
        self.channel.basic.get(queue, no_ack=False)
253

  
254
    @reconnect_decorator
255
    def basic_ack(self, message):
256
        delivery_tag = message.delivery_info['delivery_tag']
257
        self.channel.basic.ack(delivery_tag)
258

  
259
    @reconnect_decorator
260
    def basic_nack(self, message):
261
        delivery_tag = message.delivery_info['delivery_tag']
262
        self.channel.basic.ack(delivery_tag)
263

  
264
    def close(self):
265
        try:
266
            if self.confirms:
267
                while self.unacked:
268
                    print self.unacked
269
                    self.get_confirms()
270
            self.channel.close()
271
            close_info = self.channel.close_info
272
            logger.info('Successfully closed channel. Info: %s', close_info)
273
            self.connection.close()
274
        except socket.error as e:
275
            logger.error('Connection closed while closing connection:%s',
276
                          e)
277

  
278
    def queue_delete(self, queue, if_unused=True, if_empty=True):
279
        self.channel.queue.delete(queue, if_unused, if_empty)
280

  
281
    def exchange_delete(self, exchange, if_unused=True):
282
        self.channel.exchange.delete(exchange, if_unused)
283

  
284
    def basic_class(self):
285
        pass
286

  
287

  
288
class  AMQPConnectionError():
289
    pass
/dev/null
1
# Copyright 2012 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

  
34
""" Module implementing connection and communication with an AMQP broker.
35

  
36
AMQP Client's implemented by this module silenty detect connection failures and
37
try to reconnect to any available broker. Also publishing takes advantage of
38
publisher-confirms in order to guarantee that messages are properly delivered
39
to the broker.
40

  
41
"""
42

  
43
import logging
44

  
45
from puka import Client
46
from puka import spec_exceptions
47
import socket
48
from socket import error as socket_error
49
from time import sleep
50
from random import shuffle
51
from functools import wraps
52
from ordereddict import OrderedDict
53
from synnefo import settings
54

  
55

  
56
def reconnect_decorator(func):
57
    """
58
    Decorator for persistent connection with one or more AMQP brokers.
59

  
60
    """
61
    @wraps(func)
62
    def wrapper(self, *args, **kwargs):
63
        try:
64
            return func(self, *args, **kwargs)
65
        except (socket_error, spec_exceptions.ConnectionForced) as e:
66
            self.log.error('Connection Closed while in %s: %s', func.__name__,
67
                           e)
68
            self.connect()
69

  
70
    return wrapper
71

  
72

  
73
class AMQPPukaClient(object):
74
    """
75
    AMQP generic client implementing most of the basic AMQP operations.
76

  
77
    """
78
    def __init__(self, hosts=settings.AMQP_HOSTS, max_retries=30,
79
                 confirms=True, confirm_buffer=100, logger=None):
80
        """
81
        Format hosts as "amqp://username:pass@host:port"
82
        max_retries=0 defaults to unlimited retries
83

  
84
        """
85

  
86
        self.hosts = hosts
87
        shuffle(self.hosts)
88

  
89
        self.max_retries = max_retries
90
        self.confirms = confirms
91
        self.confirm_buffer = confirm_buffer
92

  
93
        self.connection = None
94
        self.channel = None
95
        self.consumers = {}
96
        self.unacked = OrderedDict()
97
        self.unsend = OrderedDict()
98
        self.consume_promises = []
99
        self.exchanges = []
100
        if logger:
101
            self.log = logger
102
        else:
103
            logger = logging.getLogger("amqp")
104
            logging.basicConfig()
105
            self.log = logger
106

  
107
    def connect(self, retries=0):
108
        if self.max_retries and retries >= self.max_retries:
109
            self.log.error("Aborting after %d retries", retries)
110
            raise AMQPConnectionError('Aborting after %d connection failures.'
111
                                      % retries)
112
            return
113

  
114
        # Pick up a host
115
        host = self.hosts.pop()
116
        self.hosts.insert(0, host)
117

  
118
        self.client = Client(host, pubacks=self.confirms)
119

  
120
        host = host.split('@')[-1]
121
        self.log.debug('Connecting to node %s' % host)
122

  
123
        try:
124
            promise = self.client.connect()
125
            self.client.wait(promise)
126
        except socket_error as e:
127
            if retries < len(self.hosts):
128
                self.log.warning('Cannot connect to host %s: %s', host, e)
129
            else:
130
                self.log.error('Cannot connect to host %s: %s', host, e)
131
                sleep(1)
132
            return self.connect(retries + 1)
133

  
134
        self.log.info('Successfully connected to host: %s', host)
135

  
136
        # Setup TCP keepalive option
137
        self.client.sd.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
138
        # Keepalive time
139
        self.client.sd.setsockopt(socket.SOL_TCP, socket.TCP_KEEPIDLE, 20)
140
        # Keepalive interval
141
        self.client.sd.setsockopt(socket.SOL_TCP, socket.TCP_KEEPINTVL, 2)
142
        # Keepalive retry
143
        self.client.sd.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT, 10)
144

  
145
        self.log.info('Creating channel')
146

  
147
        # Clear consume_promises each time connecting, since they are related
148
        # to the connection object
149
        self.consume_promises = []
150

  
151
        if self.unacked:
152
            self._resend_unacked_messages()
153

  
154
        if self.unsend:
155
            self._resend_unsend_messages()
156

  
157
        if self.consumers:
158
            for queue, callback in self.consumers.items():
159
                self.basic_consume(queue, callback)
160

  
161
        if self.exchanges:
162
            exchanges = self.exchanges
163
            self.exchanges = []
164
            for exchange, type in exchanges:
165
                self.exchange_declare(exchange, type)
166

  
167
    @reconnect_decorator
168
    def reconnect(self):
169
        self.close()
170
        self.connect()
171

  
172
    def exchange_declare(self, exchange, type='direct'):
173
        """Declare an exchange
174
        @type exchange_name: string
175
        @param exchange_name: name of the exchange
176
        @type exchange_type: string
177
        @param exhange_type: one of 'direct', 'topic', 'fanout'
178

  
179
        """
180
        self.log.info('Declaring %s exchange: %s', type, exchange)
181
        promise = self.client.exchange_declare(exchange=exchange,
182
                                               type=type,
183
                                               durable=True,
184
                                               auto_delete=False)
185
        self.client.wait(promise)
186
        self.exchanges.append((exchange, type))
187

  
188
    @reconnect_decorator
189
    def queue_declare(self, queue, exclusive=False,
190
                      mirrored=True, mirrored_nodes='all',
191
                      dead_letter_exchange=None):
192
        """Declare a queue
193

  
194
        @type queue: string
195
        @param queue: name of the queue
196
        @param mirrored: whether the queue will be mirrored to other brokers
197
        @param mirrored_nodes: the policy for the mirrored queue.
198
            Available policies:
199
                - 'all': The queue is mirrored to all nodes and the
200
                  master node is the one to which the client is
201
                  connected
202
                - a list of nodes. The queue will be mirrored only to
203
                  the specified nodes, and the master will be the
204
                  first node in the list. Node names must be provided
205
                  and not host IP. example: [node1@rabbit,node2@rabbit]
206

  
207
        """
208
        self.log.info('Declaring queue: %s', queue)
209

  
210
        if mirrored:
211
            if mirrored_nodes == 'all':
212
                arguments = {'x-ha-policy': 'all'}
213
            elif isinstance(mirrored_nodes, list):
214
                arguments = {'x-ha-policy': 'nodes',
215
                             'x-ha-policy-params': mirrored_nodes}
216
            else:
217
                raise AttributeError
218
        else:
219
            arguments = {}
220

  
221
        if dead_letter_exchange:
222
            arguments['x-dead-letter-exchange'] = dead_letter_exchange
223

  
224
        promise = self.client.queue_declare(queue=queue, durable=True,
225
                                            exclusive=exclusive,
226
                                            auto_delete=False,
227
                                            arguments=arguments)
228
        self.client.wait(promise)
229

  
230
    def queue_bind(self, queue, exchange, routing_key):
231
        self.log.debug('Binding queue %s to exchange %s with key %s'
232
                       % (queue, exchange, routing_key))
233
        promise = self.client.queue_bind(exchange=exchange, queue=queue,
234
                                         routing_key=routing_key)
235
        self.client.wait(promise)
236

  
237
    @reconnect_decorator
238
    def basic_publish(self, exchange, routing_key, body, headers={}):
239
        """Publish a message with a specific routing key """
240
        self._publish(exchange, routing_key, body, headers)
241

  
242
        self.flush_buffer()
243

  
244
        if self.confirms and len(self.unacked) >= self.confirm_buffer:
245
            self.get_confirms()
246

  
247
    @reconnect_decorator
248
    def basic_publish_multi(self, exchange, routing_key, bodies):
249
        for body in bodies:
250
            self.unsend[body] = (exchange, routing_key)
251

  
252
        for body in bodies:
253
            self._publish(exchange, routing_key, body)
254
            self.unsend.pop(body)
255

  
256
        self.flush_buffer()
257

  
258
        if self.confirms:
259
            self.get_confirms()
260

  
261
    def _publish(self, exchange, routing_key, body, headers={}):
262
        # Persisent messages by default!
263
        headers['delivery_mode'] = 2
264
        promise = self.client.basic_publish(exchange=exchange,
265
                                            routing_key=routing_key,
266
                                            body=body, headers=headers)
267

  
268
        if self.confirms:
269
            self.unacked[promise] = (exchange, routing_key, body)
270

  
271
        return promise
272

  
273
    @reconnect_decorator
274
    def flush_buffer(self):
275
        while self.client.needs_write():
276
            self.client.on_write()
277

  
278
    @reconnect_decorator
279
    def get_confirms(self):
280
        for promise in self.unacked.keys():
281
            self.client.wait(promise)
282
            self.unacked.pop(promise)
283

  
284
    @reconnect_decorator
285
    def _resend_unacked_messages(self):
286
        """Resend unacked messages in case of a connection failure."""
287
        msgs = self.unacked.values()
288
        self.unacked.clear()
289
        for exchange, routing_key, body in msgs:
290
            self.log.debug('Resending message %s' % body)
291
            self.basic_publish(exchange, routing_key, body)
292

  
293
    @reconnect_decorator
294
    def _resend_unsend_messages(self):
295
        """Resend unsend messages in case of a connection failure."""
296
        for body in self.unsend.keys():
297
            (exchange, routing_key) = self.unsend[body]
298
            self.basic_publish(exchange, routing_key, body)
299
            self.unsend.pop(body)
300

  
301
    @reconnect_decorator
302
    def basic_consume(self, queue, callback, prefetch_count=0):
303
        """Consume from a queue.
304

  
305
        @type queue: string or list of strings
306
        @param queue: the name or list of names from the queues to consume
307
        @type callback: function
308
        @param callback: the callback function to run when a message arrives
309

  
310
        """
311
        # Store the queues and the callback
312
        self.consumers[queue] = callback
313

  
314
        def handle_delivery(promise, msg):
315
            """Hide promises and messages without body"""
316
            if 'body' in msg:
317
                callback(self, msg)
318
            else:
319
                self.log.debug("Message without body %s" % msg)
320
                raise socket_error
321

  
322
        consume_promise = \
323
            self.client.basic_consume(queue=queue,
324
                                      prefetch_count=prefetch_count,
325
                                      callback=handle_delivery)
326

  
327
        self.consume_promises.append(consume_promise)
328
        return consume_promise
329

  
330
    @reconnect_decorator
331
    def basic_wait(self, promise=None, timeout=0):
332
        """Wait for messages from the queues declared by basic_consume.
... This diff was truncated because it exceeds the maximum size that can be displayed.

Also available in: Unified diff