Revision 30333691
/dev/null | ||
---|---|---|
1 |
# Copyright 2012 GRNET S.A. All rights reserved. |
|
2 |
# |
|
3 |
# Redistribution and use in source and binary forms, with or |
|
4 |
# without modification, are permitted provided that the following |
|
5 |
# conditions are met: |
|
6 |
# |
|
7 |
# 1. Redistributions of source code must retain the above |
|
8 |
# copyright notice, this list of conditions and the following |
|
9 |
# disclaimer. |
|
10 |
# |
|
11 |
# 2. Redistributions in binary form must reproduce the above |
|
12 |
# copyright notice, this list of conditions and the following |
|
13 |
# disclaimer in the documentation and/or other materials |
|
14 |
# provided with the distribution. |
|
15 |
# |
|
16 |
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS |
|
17 |
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
|
18 |
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
|
19 |
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR |
|
20 |
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
|
21 |
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
|
22 |
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF |
|
23 |
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED |
|
24 |
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
|
25 |
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN |
|
26 |
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
|
27 |
# POSSIBILITY OF SUCH DAMAGE. |
|
28 |
# |
|
29 |
# The views and conclusions contained in the software and |
|
30 |
# documentation are those of the authors and should not be |
|
31 |
# interpreted as representing official policies, either expressed |
|
32 |
# or implied, of GRNET S.A. |
|
33 |
|
|
34 |
""" Module implementing connection and communication with an AMQP broker. |
|
35 |
|
|
36 |
AMQP Client's instatiated by this module silenty detect connection failures and |
|
37 |
try to reconnect to any available broker. Also publishing takes advantage of |
|
38 |
publisher-confirms in order to guarantee that messages are properly delivered |
|
39 |
to the broker. |
|
40 |
|
|
41 |
""" |
|
42 |
|
|
43 |
from synnefo import settings |
|
44 |
|
|
45 |
if settings.AMQP_BACKEND == 'puka': |
|
46 |
from amqp_puka import AMQPPukaClient as Client |
|
47 |
elif settings.AMQP_BACKEND == 'haigha': |
|
48 |
from amqp_haigha import AMQPHaighaClient as Client |
|
49 |
else: |
|
50 |
raise Exception('Unknown Backend %s' % settings.AMQP_BACKEND) |
|
51 |
|
|
52 |
|
|
53 |
class AMQPClient(object): |
|
54 |
""" |
|
55 |
AMQP generic client implementing most of the basic AMQP operations. |
|
56 |
|
|
57 |
This class will create an object of AMQPPukaClient or AMQPHaigha client |
|
58 |
depending on AMQP_BACKEND setting |
|
59 |
""" |
|
60 |
def __new__(cls, *args, **kwargs): |
|
61 |
return Client(*args, **kwargs) |
b/snf-common/synnefo/lib/amqp/__init__.py | ||
---|---|---|
1 |
# Copyright 2012 GRNET S.A. All rights reserved. |
|
2 |
# |
|
3 |
# Redistribution and use in source and binary forms, with or |
|
4 |
# without modification, are permitted provided that the following |
|
5 |
# conditions are met: |
|
6 |
# |
|
7 |
# 1. Redistributions of source code must retain the above |
|
8 |
# copyright notice, this list of conditions and the following |
|
9 |
# disclaimer. |
|
10 |
# |
|
11 |
# 2. Redistributions in binary form must reproduce the above |
|
12 |
# copyright notice, this list of conditions and the following |
|
13 |
# disclaimer in the documentation and/or other materials |
|
14 |
# provided with the distribution. |
|
15 |
# |
|
16 |
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS |
|
17 |
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
|
18 |
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
|
19 |
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR |
|
20 |
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
|
21 |
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
|
22 |
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF |
|
23 |
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED |
|
24 |
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
|
25 |
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN |
|
26 |
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
|
27 |
# POSSIBILITY OF SUCH DAMAGE. |
|
28 |
# |
|
29 |
# The views and conclusions contained in the software and |
|
30 |
# documentation are those of the authors and should not be |
|
31 |
# interpreted as representing official policies, either expressed |
|
32 |
# or implied, of GRNET S.A. |
|
33 |
|
|
34 |
""" Module implementing connection and communication with an AMQP broker. |
|
35 |
|
|
36 |
AMQP Client's instatiated by this module silenty detect connection failures and |
|
37 |
try to reconnect to any available broker. Also publishing takes advantage of |
|
38 |
publisher-confirms in order to guarantee that messages are properly delivered |
|
39 |
to the broker. |
|
40 |
|
|
41 |
""" |
|
42 |
|
|
43 |
from synnefo import settings |
|
44 |
|
|
45 |
if settings.AMQP_BACKEND == 'puka': |
|
46 |
from amqp_puka import AMQPPukaClient as Client |
|
47 |
elif settings.AMQP_BACKEND == 'haigha': |
|
48 |
from amqp_haigha import AMQPHaighaClient as Client |
|
49 |
else: |
|
50 |
raise Exception('Unknown Backend %s' % settings.AMQP_BACKEND) |
|
51 |
|
|
52 |
|
|
53 |
class AMQPClient(object): |
|
54 |
""" |
|
55 |
AMQP generic client implementing most of the basic AMQP operations. |
|
56 |
|
|
57 |
This class will create an object of AMQPPukaClient or AMQPHaigha client |
|
58 |
depending on AMQP_BACKEND setting |
|
59 |
""" |
|
60 |
def __new__(cls, *args, **kwargs): |
|
61 |
return Client(*args, **kwargs) |
b/snf-common/synnefo/lib/amqp/amqp_haigha.py | ||
---|---|---|
1 |
# Copyright 2012 GRNET S.A. All rights reserved. |
|
2 |
# |
|
3 |
# Redistribution and use in source and binary forms, with or |
|
4 |
# without modification, are permitted provided that the following |
|
5 |
# conditions are met: |
|
6 |
# |
|
7 |
# 1. Redistributions of source code must retain the above |
|
8 |
# copyright notice, this list of conditions and the following |
|
9 |
# disclaimer. |
|
10 |
# |
|
11 |
# 2. Redistributions in binary form must reproduce the above |
|
12 |
# copyright notice, this list of conditions and the following |
|
13 |
# disclaimer in the documentation and/or other materials |
|
14 |
# provided with the distribution. |
|
15 |
# |
|
16 |
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS |
|
17 |
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
|
18 |
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
|
19 |
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR |
|
20 |
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
|
21 |
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
|
22 |
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF |
|
23 |
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED |
|
24 |
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
|
25 |
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN |
|
26 |
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
|
27 |
# POSSIBILITY OF SUCH DAMAGE. |
|
28 |
# |
|
29 |
# The views and conclusions contained in the software and |
|
30 |
# documentation are those of the authors and should not be |
|
31 |
# interpreted as representing official policies, either expressed |
|
32 |
# or implied, of GRNET S.A. |
|
33 |
|
|
34 |
from haigha.connections import RabbitConnection |
|
35 |
from haigha.message import Message |
|
36 |
from haigha import exceptions |
|
37 |
from random import shuffle |
|
38 |
from time import sleep |
|
39 |
import logging |
|
40 |
import socket |
|
41 |
from synnefo import settings |
|
42 |
from ordereddict import OrderedDict |
|
43 |
import gevent |
|
44 |
from gevent import monkey |
|
45 |
from functools import wraps |
|
46 |
|
|
47 |
|
|
48 |
logging.basicConfig(level=logging.INFO, |
|
49 |
format="[%(levelname)s %(asctime)s] %(message)s") |
|
50 |
logger = logging.getLogger('haigha') |
|
51 |
|
|
52 |
sock_opts = { |
|
53 |
(socket.IPPROTO_TCP, socket.TCP_NODELAY): 1, |
|
54 |
} |
|
55 |
|
|
56 |
|
|
57 |
def reconnect_decorator(func): |
|
58 |
""" |
|
59 |
Decorator for persistent connection with one or more AMQP brokers. |
|
60 |
|
|
61 |
""" |
|
62 |
@wraps(func) |
|
63 |
def wrapper(self, *args, **kwargs): |
|
64 |
try: |
|
65 |
func(self, *args, **kwargs) |
|
66 |
except (socket.error, exceptions.ConnectionError) as e: |
|
67 |
logger.error('Connection Closed while in %s: %s', func.__name__, e) |
|
68 |
self.connect() |
|
69 |
|
|
70 |
return wrapper |
|
71 |
|
|
72 |
|
|
73 |
class AMQPHaighaClient(): |
|
74 |
def __init__(self, hosts=settings.AMQP_HOSTS, max_retries=30, |
|
75 |
confirms=True, confirm_buffer=200): |
|
76 |
self.hosts = hosts |
|
77 |
shuffle(self.hosts) |
|
78 |
|
|
79 |
self.max_retries = max_retries |
|
80 |
self.confirms = confirms |
|
81 |
self.confirm_buffer = confirm_buffer |
|
82 |
|
|
83 |
self.connection = None |
|
84 |
self.channel = None |
|
85 |
self.consumers = {} |
|
86 |
self.unacked = OrderedDict() |
|
87 |
|
|
88 |
def connect(self, retries=0): |
|
89 |
if retries > self.max_retries: |
|
90 |
logger.error("Aborting after %s retries", retries - 1) |
|
91 |
raise AMQPConnectionError('Aborting after %d connection failures.' |
|
92 |
% (retries - 1)) |
|
93 |
return |
|
94 |
|
|
95 |
# Pick up a host |
|
96 |
host = self.hosts.pop() |
|
97 |
self.hosts.insert(0, host) |
|
98 |
|
|
99 |
#Patch gevent |
|
100 |
monkey.patch_all() |
|
101 |
|
|
102 |
try: |
|
103 |
self.connection = \ |
|
104 |
RabbitConnection(logger=logger, debug=True, |
|
105 |
user='rabbit', password='r@bb1t', |
|
106 |
vhost='/', host=host, |
|
107 |
heartbeat=None, |
|
108 |
sock_opts=sock_opts, |
|
109 |
transport='gevent') |
|
110 |
except socket.error as e: |
|
111 |
logger.error('Cannot connect to host %s: %s', host, e) |
|
112 |
if retries > 2 * len(self.hosts): |
|
113 |
sleep(1) |
|
114 |
return self.connect(retries + 1) |
|
115 |
|
|
116 |
logger.info('Successfully connected to host: %s', host) |
|
117 |
|
|
118 |
logger.info('Creating channel') |
|
119 |
self.channel = self.connection.channel() |
|
120 |
|
|
121 |
if self.confirms: |
|
122 |
self._confirm_select() |
|
123 |
|
|
124 |
if self.unacked: |
|
125 |
self._resend_unacked_messages() |
|
126 |
|
|
127 |
if self.consumers: |
|
128 |
for queue, callback in self.consumers.items(): |
|
129 |
self.basic_consume(queue, callback) |
|
130 |
|
|
131 |
def exchange_declare(self, exchange, type='direct'): |
|
132 |
"""Declare an exchange |
|
133 |
@type exchange_name: string |
|
134 |
@param exchange_name: name of the exchange |
|
135 |
@type exchange_type: string |
|
136 |
@param exhange_type: one of 'direct', 'topic', 'fanout' |
|
137 |
|
|
138 |
""" |
|
139 |
|
|
140 |
logger.info('Declaring %s exchange: %s', type, exchange) |
|
141 |
self.channel.exchange.declare(exchange, type, |
|
142 |
auto_delete=False, durable=True) |
|
143 |
|
|
144 |
def queue_declare(self, queue, exclusive=False, mirrored=True, |
|
145 |
mirrored_nodes='all'): |
|
146 |
"""Declare a queue |
|
147 |
|
|
148 |
@type queue: string |
|
149 |
@param queue: name of the queue |
|
150 |
@param mirrored: whether the queue will be mirrored to other brokers |
|
151 |
@param mirrored_nodes: the policy for the mirrored queue. |
|
152 |
Available policies: |
|
153 |
- 'all': The queue is mirrored to all nodes and the |
|
154 |
master node is the one to which the client is |
|
155 |
connected |
|
156 |
- a list of nodes. The queue will be mirrored only to |
|
157 |
the specified nodes, and the master will be the |
|
158 |
first node in the list. Node names must be provided |
|
159 |
and not host IP. example: [node1@rabbit,node2@rabbit] |
|
160 |
|
|
161 |
""" |
|
162 |
|
|
163 |
logger.info('Declaring queue: %s', queue) |
|
164 |
if mirrored: |
|
165 |
if mirrored_nodes == 'all': |
|
166 |
arguments = {'x-ha-policy': 'all'} |
|
167 |
elif isinstance(mirrored_nodes, list): |
|
168 |
arguments = {'x-ha-policy': 'nodes', |
|
169 |
'x-ha-policy-params': mirrored_nodes} |
|
170 |
else: |
|
171 |
raise AttributeError |
|
172 |
else: |
|
173 |
arguments = {} |
|
174 |
|
|
175 |
self.channel.queue.declare(queue, durable=True, exclusive=exclusive, |
|
176 |
auto_delete=False, arguments=arguments) |
|
177 |
|
|
178 |
def queue_bind(self, queue, exchange, routing_key): |
|
179 |
logger.info('Binding queue %s to exchange %s with key %s', queue, |
|
180 |
exchange, routing_key) |
|
181 |
self.channel.queue.bind(queue=queue, exchange=exchange, |
|
182 |
routing_key=routing_key) |
|
183 |
|
|
184 |
def _confirm_select(self): |
|
185 |
logger.info('Setting channel to confirm mode') |
|
186 |
self.channel.confirm.select() |
|
187 |
self.channel.basic.set_ack_listener(self._ack_received) |
|
188 |
self.channel.basic.set_nack_listener(self._nack_received) |
|
189 |
|
|
190 |
@reconnect_decorator |
|
191 |
def basic_publish(self, exchange, routing_key, body): |
|
192 |
msg = Message(body, delivery_mode=2) |
|
193 |
mid = self.channel.basic.publish(msg, exchange, routing_key) |
|
194 |
if self.confirms: |
|
195 |
self.unacked[mid] = (exchange, routing_key, body) |
|
196 |
if len(self.unacked) > self.confirm_buffer: |
|
197 |
self.get_confirms() |
|
198 |
|
|
199 |
logger.debug('Published message %s with id %s', body, mid) |
|
200 |
|
|
201 |
@reconnect_decorator |
|
202 |
def get_confirms(self): |
|
203 |
self.connection.read_frames() |
|
204 |
|
|
205 |
@reconnect_decorator |
|
206 |
def _resend_unacked_messages(self): |
|
207 |
msgs = self.unacked.values() |
|
208 |
self.unacked.clear() |
|
209 |
for exchange, routing_key, body in msgs: |
|
210 |
logger.debug('Resending message %s', body) |
|
211 |
self.basic_publish(exchange, routing_key, body) |
|
212 |
|
|
213 |
@reconnect_decorator |
|
214 |
def _ack_received(self, mid): |
|
215 |
print mid |
|
216 |
logger.debug('Received ACK for message with id %s', mid) |
|
217 |
self.unacked.pop(mid) |
|
218 |
|
|
219 |
@reconnect_decorator |
|
220 |
def _nack_received(self, mid): |
|
221 |
logger.error('Received NACK for message with id %s. Retrying.', mid) |
|
222 |
(exchange, routing_key, body) = self.unacked[mid] |
|
223 |
self.basic_publish(exchange, routing_key, body) |
|
224 |
|
|
225 |
def basic_consume(self, queue, callback): |
|
226 |
"""Consume from a queue. |
|
227 |
|
|
228 |
@type queue: string or list of strings |
|
229 |
@param queue: the name or list of names from the queues to consume |
|
230 |
@type callback: function |
|
231 |
@param callback: the callback function to run when a message arrives |
|
232 |
|
|
233 |
""" |
|
234 |
|
|
235 |
self.consumers[queue] = callback |
|
236 |
self.channel.basic.consume(queue, consumer=callback, no_ack=False) |
|
237 |
|
|
238 |
@reconnect_decorator |
|
239 |
def basic_wait(self): |
|
240 |
"""Wait for messages from the queues declared by basic_consume. |
|
241 |
|
|
242 |
This function will block until a message arrives from the queues that |
|
243 |
have been declared with basic_consume. If the optional arguments |
|
244 |
'promise' is given, only messages for this promise will be delivered. |
|
245 |
|
|
246 |
""" |
|
247 |
|
|
248 |
self.connection.read_frames() |
|
249 |
gevent.sleep(0) |
|
250 |
|
|
251 |
@reconnect_decorator |
|
252 |
def basic_get(self, queue): |
|
253 |
self.channel.basic.get(queue, no_ack=False) |
|
254 |
|
|
255 |
@reconnect_decorator |
|
256 |
def basic_ack(self, message): |
|
257 |
delivery_tag = message.delivery_info['delivery_tag'] |
|
258 |
self.channel.basic.ack(delivery_tag) |
|
259 |
|
|
260 |
@reconnect_decorator |
|
261 |
def basic_nack(self, message): |
|
262 |
delivery_tag = message.delivery_info['delivery_tag'] |
|
263 |
self.channel.basic.ack(delivery_tag) |
|
264 |
|
|
265 |
def close(self): |
|
266 |
try: |
|
267 |
if self.confirms: |
|
268 |
while self.unacked: |
|
269 |
print self.unacked |
|
270 |
self.get_confirms() |
|
271 |
self.channel.close() |
|
272 |
close_info = self.channel.close_info |
|
273 |
logger.info('Successfully closed channel. Info: %s', close_info) |
|
274 |
self.connection.close() |
|
275 |
except socket.error as e: |
|
276 |
logger.error('Connection closed while closing connection:%s', e) |
|
277 |
|
|
278 |
def queue_delete(self, queue, if_unused=True, if_empty=True): |
|
279 |
self.channel.queue.delete(queue, if_unused, if_empty) |
|
280 |
|
|
281 |
def exchange_delete(self, exchange, if_unused=True): |
|
282 |
self.channel.exchange.delete(exchange, if_unused) |
|
283 |
|
|
284 |
def basic_class(self): |
|
285 |
pass |
|
286 |
|
|
287 |
|
|
288 |
class AMQPConnectionError(): |
|
289 |
pass |
b/snf-common/synnefo/lib/amqp/amqp_puka.py | ||
---|---|---|
1 |
# Copyright 2012 GRNET S.A. All rights reserved. |
|
2 |
# |
|
3 |
# Redistribution and use in source and binary forms, with or |
|
4 |
# without modification, are permitted provided that the following |
|
5 |
# conditions are met: |
|
6 |
# |
|
7 |
# 1. Redistributions of source code must retain the above |
|
8 |
# copyright notice, this list of conditions and the following |
|
9 |
# disclaimer. |
|
10 |
# |
|
11 |
# 2. Redistributions in binary form must reproduce the above |
|
12 |
# copyright notice, this list of conditions and the following |
|
13 |
# disclaimer in the documentation and/or other materials |
|
14 |
# provided with the distribution. |
|
15 |
# |
|
16 |
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS |
|
17 |
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
|
18 |
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
|
19 |
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR |
|
20 |
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
|
21 |
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
|
22 |
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF |
|
23 |
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED |
|
24 |
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
|
25 |
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN |
|
26 |
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
|
27 |
# POSSIBILITY OF SUCH DAMAGE. |
|
28 |
# |
|
29 |
# The views and conclusions contained in the software and |
|
30 |
# documentation are those of the authors and should not be |
|
31 |
# interpreted as representing official policies, either expressed |
|
32 |
# or implied, of GRNET S.A. |
|
33 |
|
|
34 |
""" Module implementing connection and communication with an AMQP broker. |
|
35 |
|
|
36 |
AMQP Client's implemented by this module silenty detect connection failures and |
|
37 |
try to reconnect to any available broker. Also publishing takes advantage of |
|
38 |
publisher-confirms in order to guarantee that messages are properly delivered |
|
39 |
to the broker. |
|
40 |
|
|
41 |
""" |
|
42 |
|
|
43 |
import logging |
|
44 |
|
|
45 |
from puka import Client |
|
46 |
from puka import spec_exceptions |
|
47 |
import socket |
|
48 |
from socket import error as socket_error |
|
49 |
from time import sleep |
|
50 |
from random import shuffle |
|
51 |
from functools import wraps |
|
52 |
from ordereddict import OrderedDict |
|
53 |
from synnefo import settings |
|
54 |
|
|
55 |
|
|
56 |
def reconnect_decorator(func): |
|
57 |
""" |
|
58 |
Decorator for persistent connection with one or more AMQP brokers. |
|
59 |
|
|
60 |
""" |
|
61 |
@wraps(func) |
|
62 |
def wrapper(self, *args, **kwargs): |
|
63 |
try: |
|
64 |
return func(self, *args, **kwargs) |
|
65 |
except (socket_error, spec_exceptions.ConnectionForced) as e: |
|
66 |
self.log.error('Connection Closed while in %s: %s', func.__name__, |
|
67 |
e) |
|
68 |
self.connect() |
|
69 |
|
|
70 |
return wrapper |
|
71 |
|
|
72 |
|
|
73 |
class AMQPPukaClient(object): |
|
74 |
""" |
|
75 |
AMQP generic client implementing most of the basic AMQP operations. |
|
76 |
|
|
77 |
""" |
|
78 |
def __init__(self, hosts=settings.AMQP_HOSTS, max_retries=30, |
|
79 |
confirms=True, confirm_buffer=100, logger=None): |
|
80 |
""" |
|
81 |
Format hosts as "amqp://username:pass@host:port" |
|
82 |
max_retries=0 defaults to unlimited retries |
|
83 |
|
|
84 |
""" |
|
85 |
|
|
86 |
self.hosts = hosts |
|
87 |
shuffle(self.hosts) |
|
88 |
|
|
89 |
self.max_retries = max_retries |
|
90 |
self.confirms = confirms |
|
91 |
self.confirm_buffer = confirm_buffer |
|
92 |
|
|
93 |
self.connection = None |
|
94 |
self.channel = None |
|
95 |
self.consumers = {} |
|
96 |
self.unacked = OrderedDict() |
|
97 |
self.unsend = OrderedDict() |
|
98 |
self.consume_promises = [] |
|
99 |
self.exchanges = [] |
|
100 |
if logger: |
|
101 |
self.log = logger |
|
102 |
else: |
|
103 |
logger = logging.getLogger("amqp") |
|
104 |
logging.basicConfig() |
|
105 |
self.log = logger |
|
106 |
|
|
107 |
def connect(self, retries=0): |
|
108 |
if self.max_retries and retries >= self.max_retries: |
|
109 |
self.log.error("Aborting after %d retries", retries) |
|
110 |
raise AMQPConnectionError('Aborting after %d connection failures.' |
|
111 |
% retries) |
|
112 |
return |
|
113 |
|
|
114 |
# Pick up a host |
|
115 |
host = self.hosts.pop() |
|
116 |
self.hosts.insert(0, host) |
|
117 |
|
|
118 |
self.client = Client(host, pubacks=self.confirms) |
|
119 |
|
|
120 |
host = host.split('@')[-1] |
|
121 |
self.log.debug('Connecting to node %s' % host) |
|
122 |
|
|
123 |
try: |
|
124 |
promise = self.client.connect() |
|
125 |
self.client.wait(promise) |
|
126 |
except socket_error as e: |
|
127 |
if retries < len(self.hosts): |
|
128 |
self.log.warning('Cannot connect to host %s: %s', host, e) |
|
129 |
else: |
|
130 |
self.log.error('Cannot connect to host %s: %s', host, e) |
|
131 |
sleep(1) |
|
132 |
return self.connect(retries + 1) |
|
133 |
|
|
134 |
self.log.info('Successfully connected to host: %s', host) |
|
135 |
|
|
136 |
# Setup TCP keepalive option |
|
137 |
self.client.sd.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) |
|
138 |
# Keepalive time |
|
139 |
self.client.sd.setsockopt(socket.SOL_TCP, socket.TCP_KEEPIDLE, 20) |
|
140 |
# Keepalive interval |
|
141 |
self.client.sd.setsockopt(socket.SOL_TCP, socket.TCP_KEEPINTVL, 2) |
|
142 |
# Keepalive retry |
|
143 |
self.client.sd.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT, 10) |
|
144 |
|
|
145 |
self.log.info('Creating channel') |
|
146 |
|
|
147 |
# Clear consume_promises each time connecting, since they are related |
|
148 |
# to the connection object |
|
149 |
self.consume_promises = [] |
|
150 |
|
|
151 |
if self.unacked: |
|
152 |
self._resend_unacked_messages() |
|
153 |
|
|
154 |
if self.unsend: |
|
155 |
self._resend_unsend_messages() |
|
156 |
|
|
157 |
if self.consumers: |
|
158 |
for queue, callback in self.consumers.items(): |
|
159 |
self.basic_consume(queue, callback) |
|
160 |
|
|
161 |
if self.exchanges: |
|
162 |
exchanges = self.exchanges |
|
163 |
self.exchanges = [] |
|
164 |
for exchange, type in exchanges: |
|
165 |
self.exchange_declare(exchange, type) |
|
166 |
|
|
167 |
@reconnect_decorator |
|
168 |
def reconnect(self): |
|
169 |
self.close() |
|
170 |
self.connect() |
|
171 |
|
|
172 |
def exchange_declare(self, exchange, type='direct'): |
|
173 |
"""Declare an exchange |
|
174 |
@type exchange_name: string |
|
175 |
@param exchange_name: name of the exchange |
|
176 |
@type exchange_type: string |
|
177 |
@param exhange_type: one of 'direct', 'topic', 'fanout' |
|
178 |
|
|
179 |
""" |
|
180 |
self.log.info('Declaring %s exchange: %s', type, exchange) |
|
181 |
promise = self.client.exchange_declare(exchange=exchange, |
|
182 |
type=type, |
|
183 |
durable=True, |
|
184 |
auto_delete=False) |
|
185 |
self.client.wait(promise) |
|
186 |
self.exchanges.append((exchange, type)) |
|
187 |
|
|
188 |
@reconnect_decorator |
|
189 |
def queue_declare(self, queue, exclusive=False, |
|
190 |
mirrored=True, mirrored_nodes='all', |
|
191 |
dead_letter_exchange=None): |
|
192 |
"""Declare a queue |
|
193 |
|
|
194 |
@type queue: string |
|
195 |
@param queue: name of the queue |
|
196 |
@param mirrored: whether the queue will be mirrored to other brokers |
|
197 |
@param mirrored_nodes: the policy for the mirrored queue. |
|
198 |
Available policies: |
|
199 |
- 'all': The queue is mirrored to all nodes and the |
|
200 |
master node is the one to which the client is |
|
201 |
connected |
|
202 |
- a list of nodes. The queue will be mirrored only to |
|
203 |
the specified nodes, and the master will be the |
|
204 |
first node in the list. Node names must be provided |
|
205 |
and not host IP. example: [node1@rabbit,node2@rabbit] |
|
206 |
|
|
207 |
""" |
|
208 |
self.log.info('Declaring queue: %s', queue) |
|
209 |
|
|
210 |
if mirrored: |
|
211 |
if mirrored_nodes == 'all': |
|
212 |
arguments = {'x-ha-policy': 'all'} |
|
213 |
elif isinstance(mirrored_nodes, list): |
|
214 |
arguments = {'x-ha-policy': 'nodes', |
|
215 |
'x-ha-policy-params': mirrored_nodes} |
|
216 |
else: |
|
217 |
raise AttributeError |
|
218 |
else: |
|
219 |
arguments = {} |
|
220 |
|
|
221 |
if dead_letter_exchange: |
|
222 |
arguments['x-dead-letter-exchange'] = dead_letter_exchange |
|
223 |
|
|
224 |
promise = self.client.queue_declare(queue=queue, durable=True, |
|
225 |
exclusive=exclusive, |
|
226 |
auto_delete=False, |
|
227 |
arguments=arguments) |
|
228 |
self.client.wait(promise) |
|
229 |
|
|
230 |
def queue_bind(self, queue, exchange, routing_key): |
|
231 |
self.log.debug('Binding queue %s to exchange %s with key %s' |
|
232 |
% (queue, exchange, routing_key)) |
|
233 |
promise = self.client.queue_bind(exchange=exchange, queue=queue, |
|
234 |
routing_key=routing_key) |
|
235 |
self.client.wait(promise) |
|
236 |
|
|
237 |
@reconnect_decorator |
|
238 |
def basic_publish(self, exchange, routing_key, body, headers={}): |
|
239 |
"""Publish a message with a specific routing key """ |
|
240 |
self._publish(exchange, routing_key, body, headers) |
|
241 |
|
|
242 |
self.flush_buffer() |
|
243 |
|
|
244 |
if self.confirms and len(self.unacked) >= self.confirm_buffer: |
|
245 |
self.get_confirms() |
|
246 |
|
|
247 |
@reconnect_decorator |
|
248 |
def basic_publish_multi(self, exchange, routing_key, bodies): |
|
249 |
for body in bodies: |
|
250 |
self.unsend[body] = (exchange, routing_key) |
|
251 |
|
|
252 |
for body in bodies: |
|
253 |
self._publish(exchange, routing_key, body) |
|
254 |
self.unsend.pop(body) |
|
255 |
|
|
256 |
self.flush_buffer() |
|
257 |
|
|
258 |
if self.confirms: |
|
259 |
self.get_confirms() |
|
260 |
|
|
261 |
def _publish(self, exchange, routing_key, body, headers={}): |
|
262 |
# Persisent messages by default! |
|
263 |
headers['delivery_mode'] = 2 |
|
264 |
promise = self.client.basic_publish(exchange=exchange, |
|
265 |
routing_key=routing_key, |
|
266 |
body=body, headers=headers) |
|
267 |
|
|
268 |
if self.confirms: |
|
269 |
self.unacked[promise] = (exchange, routing_key, body) |
|
270 |
|
|
271 |
return promise |
|
272 |
|
|
273 |
@reconnect_decorator |
|
274 |
def flush_buffer(self): |
|
275 |
while self.client.needs_write(): |
|
276 |
self.client.on_write() |
|
277 |
|
|
278 |
@reconnect_decorator |
|
279 |
def get_confirms(self): |
|
280 |
for promise in self.unacked.keys(): |
|
281 |
self.client.wait(promise) |
|
282 |
self.unacked.pop(promise) |
|
283 |
|
|
284 |
@reconnect_decorator |
|
285 |
def _resend_unacked_messages(self): |
|
286 |
"""Resend unacked messages in case of a connection failure.""" |
|
287 |
msgs = self.unacked.values() |
|
288 |
self.unacked.clear() |
|
289 |
for exchange, routing_key, body in msgs: |
|
290 |
self.log.debug('Resending message %s' % body) |
|
291 |
self.basic_publish(exchange, routing_key, body) |
|
292 |
|
|
293 |
@reconnect_decorator |
|
294 |
def _resend_unsend_messages(self): |
|
295 |
"""Resend unsend messages in case of a connection failure.""" |
|
296 |
for body in self.unsend.keys(): |
|
297 |
(exchange, routing_key) = self.unsend[body] |
|
298 |
self.basic_publish(exchange, routing_key, body) |
|
299 |
self.unsend.pop(body) |
|
300 |
|
|
301 |
@reconnect_decorator |
|
302 |
def basic_consume(self, queue, callback, prefetch_count=0): |
|
303 |
"""Consume from a queue. |
|
304 |
|
|
305 |
@type queue: string or list of strings |
|
306 |
@param queue: the name or list of names from the queues to consume |
|
307 |
@type callback: function |
|
308 |
@param callback: the callback function to run when a message arrives |
|
309 |
|
|
310 |
""" |
|
311 |
# Store the queues and the callback |
|
312 |
self.consumers[queue] = callback |
|
313 |
|
|
314 |
def handle_delivery(promise, msg): |
|
315 |
"""Hide promises and messages without body""" |
|
316 |
if 'body' in msg: |
|
317 |
callback(self, msg) |
|
318 |
else: |
|
319 |
self.log.debug("Message without body %s" % msg) |
|
320 |
raise socket_error |
|
321 |
|
|
322 |
consume_promise = \ |
|
323 |
self.client.basic_consume(queue=queue, |
|
324 |
prefetch_count=prefetch_count, |
|
325 |
callback=handle_delivery) |
|
326 |
|
|
327 |
self.consume_promises.append(consume_promise) |
|
328 |
return consume_promise |
|
329 |
|
|
330 |
@reconnect_decorator |
|
331 |
def basic_wait(self, promise=None, timeout=0): |
|
332 |
"""Wait for messages from the queues declared by basic_consume. |
|
333 |
|
|
334 |
This function will block until a message arrives from the queues that |
|
335 |
have been declared with basic_consume. If the optional arguments |
|
336 |
'promise' is given, only messages for this promise will be delivered. |
|
337 |
|
|
338 |
""" |
|
339 |
if promise is not None: |
|
340 |
return self.client.wait(promise, timeout) |
|
341 |
else: |
|
342 |
return self.client.wait(self.consume_promises, timeout) |
|
343 |
|
|
344 |
@reconnect_decorator |
|
345 |
def basic_get(self, queue): |
|
346 |
"""Get a single message from a queue. |
|
347 |
|
|
348 |
This is a non-blocking method for getting messages from a queue. |
|
349 |
It will return None if the queue is empty. |
|
350 |
|
|
351 |
""" |
|
352 |
get_promise = self.client.basic_get(queue=queue) |
|
353 |
result = self.client.wait(get_promise) |
|
354 |
if 'empty' in result: |
|
355 |
# The queue is empty |
|
356 |
return None |
|
357 |
else: |
|
358 |
return result |
|
359 |
|
|
360 |
@reconnect_decorator |
|
361 |
def basic_ack(self, message): |
|
362 |
self.client.basic_ack(message) |
|
363 |
|
|
364 |
@reconnect_decorator |
|
365 |
def basic_nack(self, message): |
|
366 |
self.client.basic_ack(message) |
|
367 |
|
|
368 |
@reconnect_decorator |
|
369 |
def basic_reject(self, message, requeue=False): |
|
370 |
"""Reject a message. |
|
371 |
|
|
372 |
If requeue option is False and a dead letter exchange is associated |
|
373 |
with the queue, the message will be routed to the dead letter exchange. |
|
374 |
|
|
375 |
""" |
|
376 |
self.client.basic_reject(message, requeue=requeue) |
|
377 |
|
|
378 |
def close(self): |
|
379 |
"""Check that messages have been send and close the connection.""" |
|
380 |
self.log.debug("Closing connection to %s", self.client.host) |
|
381 |
try: |
|
382 |
if self.confirms: |
|
383 |
self.get_confirms() |
|
384 |
close_promise = self.client.close() |
|
385 |
self.client.wait(close_promise) |
|
386 |
except (socket_error, spec_exceptions.ConnectionForced) as e: |
|
387 |
self.log.error('Connection closed while closing connection:%s', e) |
|
388 |
|
|
389 |
def queue_delete(self, queue, if_unused=True, if_empty=True): |
|
390 |
"""Delete a queue. |
|
391 |
|
|
392 |
Returns False if the queue does not exist |
|
393 |
""" |
|
394 |
try: |
|
395 |
promise = self.client.queue_delete(queue=queue, |
|
396 |
if_unused=if_unused, |
|
397 |
if_empty=if_empty) |
|
398 |
self.client.wait(promise) |
|
399 |
return True |
|
400 |
except spec_exceptions.NotFound: |
|
401 |
self.log.info("Queue %s does not exist", queue) |
|
402 |
return False |
|
403 |
|
|
404 |
def exchange_delete(self, exchange, if_unused=True): |
|
405 |
"""Delete an exchange.""" |
|
406 |
try: |
|
407 |
|
|
408 |
promise = self.client.exchange_delete(exchange=exchange, |
|
409 |
if_unused=if_unused) |
|
410 |
self.client.wait(promise) |
|
411 |
return True |
|
412 |
except spec_exceptions.NotFound: |
|
413 |
self.log.info("Exchange %s does not exist", exchange) |
|
414 |
return False |
|
415 |
|
|
416 |
@reconnect_decorator |
|
417 |
def basic_cancel(self, promise=None): |
|
418 |
"""Cancel consuming from a queue. """ |
|
419 |
if promise is not None: |
|
420 |
self.client.basic_cancel(promise) |
|
421 |
else: |
|
422 |
for promise in self.consume_promises: |
|
423 |
self.client.basic_cancel(promise) |
|
424 |
|
|
425 |
|
|
426 |
class AMQPConnectionError(Exception): |
|
427 |
pass |
/dev/null | ||
---|---|---|
1 |
# Copyright 2012 GRNET S.A. All rights reserved. |
|
2 |
# |
|
3 |
# Redistribution and use in source and binary forms, with or |
|
4 |
# without modification, are permitted provided that the following |
|
5 |
# conditions are met: |
|
6 |
# |
|
7 |
# 1. Redistributions of source code must retain the above |
|
8 |
# copyright notice, this list of conditions and the following |
|
9 |
# disclaimer. |
|
10 |
# |
|
11 |
# 2. Redistributions in binary form must reproduce the above |
|
12 |
# copyright notice, this list of conditions and the following |
|
13 |
# disclaimer in the documentation and/or other materials |
|
14 |
# provided with the distribution. |
|
15 |
# |
|
16 |
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS |
|
17 |
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
|
18 |
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
|
19 |
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR |
|
20 |
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
|
21 |
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
|
22 |
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF |
|
23 |
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED |
|
24 |
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
|
25 |
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN |
|
26 |
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
|
27 |
# POSSIBILITY OF SUCH DAMAGE. |
|
28 |
# |
|
29 |
# The views and conclusions contained in the software and |
|
30 |
# documentation are those of the authors and should not be |
|
31 |
# interpreted as representing official policies, either expressed |
|
32 |
# or implied, of GRNET S.A. |
|
33 |
|
|
34 |
from haigha.connections import RabbitConnection |
|
35 |
from haigha.message import Message |
|
36 |
from haigha import exceptions |
|
37 |
from random import shuffle |
|
38 |
from time import sleep |
|
39 |
import logging |
|
40 |
import socket |
|
41 |
from synnefo import settings |
|
42 |
from ordereddict import OrderedDict |
|
43 |
import gevent |
|
44 |
from gevent import monkey |
|
45 |
from functools import wraps |
|
46 |
|
|
47 |
|
|
48 |
logging.basicConfig(level=logging.INFO, format="[%(levelname)s %(asctime)s] %(message)s" ) |
|
49 |
logger = logging.getLogger('haigha') |
|
50 |
|
|
51 |
sock_opts = { |
|
52 |
(socket.IPPROTO_TCP, socket.TCP_NODELAY): 1, |
|
53 |
} |
|
54 |
|
|
55 |
|
|
56 |
def reconnect_decorator(func): |
|
57 |
""" |
|
58 |
Decorator for persistent connection with one or more AMQP brokers. |
|
59 |
|
|
60 |
""" |
|
61 |
@wraps(func) |
|
62 |
def wrapper(self, *args, **kwargs): |
|
63 |
try: |
|
64 |
func(self, *args, **kwargs) |
|
65 |
except (socket.error, exceptions.ConnectionError) as e: |
|
66 |
logger.error('Connection Closed while in %s: %s', func.__name__, e) |
|
67 |
self.connect() |
|
68 |
|
|
69 |
return wrapper |
|
70 |
|
|
71 |
|
|
72 |
class AMQPHaighaClient(): |
|
73 |
def __init__(self, hosts=settings.AMQP_HOSTS, max_retries=30, |
|
74 |
confirms=True, confirm_buffer=200): |
|
75 |
self.hosts = hosts |
|
76 |
shuffle(self.hosts) |
|
77 |
|
|
78 |
self.max_retries = max_retries |
|
79 |
self.confirms = confirms |
|
80 |
self.confirm_buffer = confirm_buffer |
|
81 |
|
|
82 |
self.connection = None |
|
83 |
self.channel = None |
|
84 |
self.consumers = {} |
|
85 |
self.unacked = OrderedDict() |
|
86 |
|
|
87 |
def connect(self, retries=0): |
|
88 |
if retries > self.max_retries: |
|
89 |
logger.error("Aborting after %s retries", retries - 1) |
|
90 |
raise AMQPConnectionError('Aborting after %d connection failures.'\ |
|
91 |
% (retries - 1)) |
|
92 |
return |
|
93 |
|
|
94 |
# Pick up a host |
|
95 |
host = self.hosts.pop() |
|
96 |
self.hosts.insert(0, host) |
|
97 |
|
|
98 |
#Patch gevent |
|
99 |
monkey.patch_all() |
|
100 |
|
|
101 |
try: |
|
102 |
self.connection = \ |
|
103 |
RabbitConnection(logger=logger, debug=True, |
|
104 |
user='rabbit', password='r@bb1t', |
|
105 |
vhost='/', host=host, |
|
106 |
heartbeat=None, |
|
107 |
sock_opts=sock_opts, |
|
108 |
transport='gevent') |
|
109 |
except socket.error as e: |
|
110 |
logger.error('Cannot connect to host %s: %s', host, e) |
|
111 |
if retries > 2 * len(self.hosts): |
|
112 |
sleep(1) |
|
113 |
return self.connect(retries + 1) |
|
114 |
|
|
115 |
logger.info('Successfully connected to host: %s', host) |
|
116 |
|
|
117 |
logger.info('Creating channel') |
|
118 |
self.channel = self.connection.channel() |
|
119 |
|
|
120 |
if self.confirms: |
|
121 |
self._confirm_select() |
|
122 |
|
|
123 |
if self.unacked: |
|
124 |
self._resend_unacked_messages() |
|
125 |
|
|
126 |
if self.consumers: |
|
127 |
for queue, callback in self.consumers.items(): |
|
128 |
self.basic_consume(queue, callback) |
|
129 |
|
|
130 |
def exchange_declare(self, exchange, type='direct'): |
|
131 |
"""Declare an exchange |
|
132 |
@type exchange_name: string |
|
133 |
@param exchange_name: name of the exchange |
|
134 |
@type exchange_type: string |
|
135 |
@param exhange_type: one of 'direct', 'topic', 'fanout' |
|
136 |
|
|
137 |
""" |
|
138 |
|
|
139 |
logger.info('Declaring %s exchange: %s', type, exchange) |
|
140 |
self.channel.exchange.declare(exchange, type, |
|
141 |
auto_delete=False, durable=True) |
|
142 |
|
|
143 |
def queue_declare(self, queue, exclusive=False, mirrored=True, |
|
144 |
mirrored_nodes='all'): |
|
145 |
"""Declare a queue |
|
146 |
|
|
147 |
@type queue: string |
|
148 |
@param queue: name of the queue |
|
149 |
@param mirrored: whether the queue will be mirrored to other brokers |
|
150 |
@param mirrored_nodes: the policy for the mirrored queue. |
|
151 |
Available policies: |
|
152 |
- 'all': The queue is mirrored to all nodes and the |
|
153 |
master node is the one to which the client is |
|
154 |
connected |
|
155 |
- a list of nodes. The queue will be mirrored only to |
|
156 |
the specified nodes, and the master will be the |
|
157 |
first node in the list. Node names must be provided |
|
158 |
and not host IP. example: [node1@rabbit,node2@rabbit] |
|
159 |
|
|
160 |
""" |
|
161 |
|
|
162 |
logger.info('Declaring queue: %s', queue) |
|
163 |
if mirrored: |
|
164 |
if mirrored_nodes == 'all': |
|
165 |
arguments = {'x-ha-policy': 'all'} |
|
166 |
elif isinstance(mirrored_nodes, list): |
|
167 |
arguments = {'x-ha-policy': 'nodes', |
|
168 |
'x-ha-policy-params': mirrored_nodes} |
|
169 |
else: |
|
170 |
raise AttributeError |
|
171 |
else: |
|
172 |
arguments = {} |
|
173 |
|
|
174 |
self.channel.queue.declare(queue, durable=True, exclusive=exclusive, |
|
175 |
auto_delete=False, arguments=arguments) |
|
176 |
|
|
177 |
def queue_bind(self, queue, exchange, routing_key): |
|
178 |
logger.info('Binding queue %s to exchange %s with key %s', queue, |
|
179 |
exchange, routing_key) |
|
180 |
self.channel.queue.bind(queue=queue, exchange=exchange, |
|
181 |
routing_key=routing_key) |
|
182 |
|
|
183 |
def _confirm_select(self): |
|
184 |
logger.info('Setting channel to confirm mode') |
|
185 |
self.channel.confirm.select() |
|
186 |
self.channel.basic.set_ack_listener(self._ack_received) |
|
187 |
self.channel.basic.set_nack_listener(self._nack_received) |
|
188 |
|
|
189 |
@reconnect_decorator |
|
190 |
def basic_publish(self, exchange, routing_key, body): |
|
191 |
msg = Message(body, delivery_mode=2) |
|
192 |
mid = self.channel.basic.publish(msg, exchange, routing_key) |
|
193 |
if self.confirms: |
|
194 |
self.unacked[mid] = (exchange, routing_key, body) |
|
195 |
if len(self.unacked) > self.confirm_buffer: |
|
196 |
self.get_confirms() |
|
197 |
|
|
198 |
logger.debug('Published message %s with id %s', body, mid) |
|
199 |
|
|
200 |
@reconnect_decorator |
|
201 |
def get_confirms(self): |
|
202 |
self.connection.read_frames() |
|
203 |
|
|
204 |
@reconnect_decorator |
|
205 |
def _resend_unacked_messages(self): |
|
206 |
msgs = self.unacked.values() |
|
207 |
self.unacked.clear() |
|
208 |
for exchange, routing_key, body in msgs: |
|
209 |
logger.debug('Resending message %s', body) |
|
210 |
self.basic_publish(exchange, routing_key, body) |
|
211 |
|
|
212 |
@reconnect_decorator |
|
213 |
def _ack_received(self, mid): |
|
214 |
print mid |
|
215 |
logger.debug('Received ACK for message with id %s', mid) |
|
216 |
self.unacked.pop(mid) |
|
217 |
|
|
218 |
@reconnect_decorator |
|
219 |
def _nack_received(self, mid): |
|
220 |
logger.error('Received NACK for message with id %s. Retrying.', mid) |
|
221 |
(exchange, routing_key, body) = self.unacked[mid] |
|
222 |
self.basic_publish(exchange, routing_key, body) |
|
223 |
|
|
224 |
def basic_consume(self, queue, callback): |
|
225 |
"""Consume from a queue. |
|
226 |
|
|
227 |
@type queue: string or list of strings |
|
228 |
@param queue: the name or list of names from the queues to consume |
|
229 |
@type callback: function |
|
230 |
@param callback: the callback function to run when a message arrives |
|
231 |
|
|
232 |
""" |
|
233 |
|
|
234 |
self.consumers[queue] = callback |
|
235 |
self.channel.basic.consume(queue, consumer=callback, no_ack=False) |
|
236 |
|
|
237 |
@reconnect_decorator |
|
238 |
def basic_wait(self): |
|
239 |
"""Wait for messages from the queues declared by basic_consume. |
|
240 |
|
|
241 |
This function will block until a message arrives from the queues that |
|
242 |
have been declared with basic_consume. If the optional arguments |
|
243 |
'promise' is given, only messages for this promise will be delivered. |
|
244 |
|
|
245 |
""" |
|
246 |
|
|
247 |
self.connection.read_frames() |
|
248 |
gevent.sleep(0) |
|
249 |
|
|
250 |
@reconnect_decorator |
|
251 |
def basic_get(self, queue): |
|
252 |
self.channel.basic.get(queue, no_ack=False) |
|
253 |
|
|
254 |
@reconnect_decorator |
|
255 |
def basic_ack(self, message): |
|
256 |
delivery_tag = message.delivery_info['delivery_tag'] |
|
257 |
self.channel.basic.ack(delivery_tag) |
|
258 |
|
|
259 |
@reconnect_decorator |
|
260 |
def basic_nack(self, message): |
|
261 |
delivery_tag = message.delivery_info['delivery_tag'] |
|
262 |
self.channel.basic.ack(delivery_tag) |
|
263 |
|
|
264 |
def close(self): |
|
265 |
try: |
|
266 |
if self.confirms: |
|
267 |
while self.unacked: |
|
268 |
print self.unacked |
|
269 |
self.get_confirms() |
|
270 |
self.channel.close() |
|
271 |
close_info = self.channel.close_info |
|
272 |
logger.info('Successfully closed channel. Info: %s', close_info) |
|
273 |
self.connection.close() |
|
274 |
except socket.error as e: |
|
275 |
logger.error('Connection closed while closing connection:%s', |
|
276 |
e) |
|
277 |
|
|
278 |
def queue_delete(self, queue, if_unused=True, if_empty=True): |
|
279 |
self.channel.queue.delete(queue, if_unused, if_empty) |
|
280 |
|
|
281 |
def exchange_delete(self, exchange, if_unused=True): |
|
282 |
self.channel.exchange.delete(exchange, if_unused) |
|
283 |
|
|
284 |
def basic_class(self): |
|
285 |
pass |
|
286 |
|
|
287 |
|
|
288 |
class AMQPConnectionError(): |
|
289 |
pass |
/dev/null | ||
---|---|---|
1 |
# Copyright 2012 GRNET S.A. All rights reserved. |
|
2 |
# |
|
3 |
# Redistribution and use in source and binary forms, with or |
|
4 |
# without modification, are permitted provided that the following |
|
5 |
# conditions are met: |
|
6 |
# |
|
7 |
# 1. Redistributions of source code must retain the above |
|
8 |
# copyright notice, this list of conditions and the following |
|
9 |
# disclaimer. |
|
10 |
# |
|
11 |
# 2. Redistributions in binary form must reproduce the above |
|
12 |
# copyright notice, this list of conditions and the following |
|
13 |
# disclaimer in the documentation and/or other materials |
|
14 |
# provided with the distribution. |
|
15 |
# |
|
16 |
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS |
|
17 |
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
|
18 |
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
|
19 |
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR |
|
20 |
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
|
21 |
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
|
22 |
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF |
|
23 |
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED |
|
24 |
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
|
25 |
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN |
|
26 |
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
|
27 |
# POSSIBILITY OF SUCH DAMAGE. |
|
28 |
# |
|
29 |
# The views and conclusions contained in the software and |
|
30 |
# documentation are those of the authors and should not be |
|
31 |
# interpreted as representing official policies, either expressed |
|
32 |
# or implied, of GRNET S.A. |
|
33 |
|
|
34 |
""" Module implementing connection and communication with an AMQP broker. |
|
35 |
|
|
36 |
AMQP Client's implemented by this module silenty detect connection failures and |
|
37 |
try to reconnect to any available broker. Also publishing takes advantage of |
|
38 |
publisher-confirms in order to guarantee that messages are properly delivered |
|
39 |
to the broker. |
|
40 |
|
|
41 |
""" |
|
42 |
|
|
43 |
import logging |
|
44 |
|
|
45 |
from puka import Client |
|
46 |
from puka import spec_exceptions |
|
47 |
import socket |
|
48 |
from socket import error as socket_error |
|
49 |
from time import sleep |
|
50 |
from random import shuffle |
|
51 |
from functools import wraps |
|
52 |
from ordereddict import OrderedDict |
|
53 |
from synnefo import settings |
|
54 |
|
|
55 |
|
|
56 |
def reconnect_decorator(func): |
|
57 |
""" |
|
58 |
Decorator for persistent connection with one or more AMQP brokers. |
|
59 |
|
|
60 |
""" |
|
61 |
@wraps(func) |
|
62 |
def wrapper(self, *args, **kwargs): |
|
63 |
try: |
|
64 |
return func(self, *args, **kwargs) |
|
65 |
except (socket_error, spec_exceptions.ConnectionForced) as e: |
|
66 |
self.log.error('Connection Closed while in %s: %s', func.__name__, |
|
67 |
e) |
|
68 |
self.connect() |
|
69 |
|
|
70 |
return wrapper |
|
71 |
|
|
72 |
|
|
73 |
class AMQPPukaClient(object): |
|
74 |
""" |
|
75 |
AMQP generic client implementing most of the basic AMQP operations. |
|
76 |
|
|
77 |
""" |
|
78 |
def __init__(self, hosts=settings.AMQP_HOSTS, max_retries=30, |
|
79 |
confirms=True, confirm_buffer=100, logger=None): |
|
80 |
""" |
|
81 |
Format hosts as "amqp://username:pass@host:port" |
|
82 |
max_retries=0 defaults to unlimited retries |
|
83 |
|
|
84 |
""" |
|
85 |
|
|
86 |
self.hosts = hosts |
|
87 |
shuffle(self.hosts) |
|
88 |
|
|
89 |
self.max_retries = max_retries |
|
90 |
self.confirms = confirms |
|
91 |
self.confirm_buffer = confirm_buffer |
|
92 |
|
|
93 |
self.connection = None |
|
94 |
self.channel = None |
|
95 |
self.consumers = {} |
|
96 |
self.unacked = OrderedDict() |
|
97 |
self.unsend = OrderedDict() |
|
98 |
self.consume_promises = [] |
|
99 |
self.exchanges = [] |
|
100 |
if logger: |
|
101 |
self.log = logger |
|
102 |
else: |
|
103 |
logger = logging.getLogger("amqp") |
|
104 |
logging.basicConfig() |
|
105 |
self.log = logger |
|
106 |
|
|
107 |
def connect(self, retries=0): |
|
108 |
if self.max_retries and retries >= self.max_retries: |
|
109 |
self.log.error("Aborting after %d retries", retries) |
|
110 |
raise AMQPConnectionError('Aborting after %d connection failures.' |
|
111 |
% retries) |
|
112 |
return |
|
113 |
|
|
114 |
# Pick up a host |
|
115 |
host = self.hosts.pop() |
|
116 |
self.hosts.insert(0, host) |
|
117 |
|
|
118 |
self.client = Client(host, pubacks=self.confirms) |
|
119 |
|
|
120 |
host = host.split('@')[-1] |
|
121 |
self.log.debug('Connecting to node %s' % host) |
|
122 |
|
|
123 |
try: |
|
124 |
promise = self.client.connect() |
|
125 |
self.client.wait(promise) |
|
126 |
except socket_error as e: |
|
127 |
if retries < len(self.hosts): |
|
128 |
self.log.warning('Cannot connect to host %s: %s', host, e) |
|
129 |
else: |
|
130 |
self.log.error('Cannot connect to host %s: %s', host, e) |
|
131 |
sleep(1) |
|
132 |
return self.connect(retries + 1) |
|
133 |
|
|
134 |
self.log.info('Successfully connected to host: %s', host) |
|
135 |
|
|
136 |
# Setup TCP keepalive option |
|
137 |
self.client.sd.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) |
|
138 |
# Keepalive time |
|
139 |
self.client.sd.setsockopt(socket.SOL_TCP, socket.TCP_KEEPIDLE, 20) |
|
140 |
# Keepalive interval |
|
141 |
self.client.sd.setsockopt(socket.SOL_TCP, socket.TCP_KEEPINTVL, 2) |
|
142 |
# Keepalive retry |
|
143 |
self.client.sd.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT, 10) |
|
144 |
|
|
145 |
self.log.info('Creating channel') |
|
146 |
|
|
147 |
# Clear consume_promises each time connecting, since they are related |
|
148 |
# to the connection object |
|
149 |
self.consume_promises = [] |
|
150 |
|
|
151 |
if self.unacked: |
|
152 |
self._resend_unacked_messages() |
|
153 |
|
|
154 |
if self.unsend: |
|
155 |
self._resend_unsend_messages() |
|
156 |
|
|
157 |
if self.consumers: |
|
158 |
for queue, callback in self.consumers.items(): |
|
159 |
self.basic_consume(queue, callback) |
|
160 |
|
|
161 |
if self.exchanges: |
|
162 |
exchanges = self.exchanges |
|
163 |
self.exchanges = [] |
|
164 |
for exchange, type in exchanges: |
|
165 |
self.exchange_declare(exchange, type) |
|
166 |
|
|
167 |
@reconnect_decorator |
|
168 |
def reconnect(self): |
|
169 |
self.close() |
|
170 |
self.connect() |
|
171 |
|
|
172 |
def exchange_declare(self, exchange, type='direct'): |
|
173 |
"""Declare an exchange |
|
174 |
@type exchange_name: string |
|
175 |
@param exchange_name: name of the exchange |
|
176 |
@type exchange_type: string |
|
177 |
@param exhange_type: one of 'direct', 'topic', 'fanout' |
|
178 |
|
|
179 |
""" |
|
180 |
self.log.info('Declaring %s exchange: %s', type, exchange) |
|
181 |
promise = self.client.exchange_declare(exchange=exchange, |
|
182 |
type=type, |
|
183 |
durable=True, |
|
184 |
auto_delete=False) |
|
185 |
self.client.wait(promise) |
|
186 |
self.exchanges.append((exchange, type)) |
|
187 |
|
|
188 |
@reconnect_decorator |
|
189 |
def queue_declare(self, queue, exclusive=False, |
|
190 |
mirrored=True, mirrored_nodes='all', |
|
191 |
dead_letter_exchange=None): |
|
192 |
"""Declare a queue |
|
193 |
|
|
194 |
@type queue: string |
|
195 |
@param queue: name of the queue |
|
196 |
@param mirrored: whether the queue will be mirrored to other brokers |
|
197 |
@param mirrored_nodes: the policy for the mirrored queue. |
|
198 |
Available policies: |
|
199 |
- 'all': The queue is mirrored to all nodes and the |
|
200 |
master node is the one to which the client is |
|
201 |
connected |
|
202 |
- a list of nodes. The queue will be mirrored only to |
|
203 |
the specified nodes, and the master will be the |
|
204 |
first node in the list. Node names must be provided |
|
205 |
and not host IP. example: [node1@rabbit,node2@rabbit] |
|
206 |
|
|
207 |
""" |
|
208 |
self.log.info('Declaring queue: %s', queue) |
|
209 |
|
|
210 |
if mirrored: |
|
211 |
if mirrored_nodes == 'all': |
|
212 |
arguments = {'x-ha-policy': 'all'} |
|
213 |
elif isinstance(mirrored_nodes, list): |
|
214 |
arguments = {'x-ha-policy': 'nodes', |
|
215 |
'x-ha-policy-params': mirrored_nodes} |
|
216 |
else: |
|
217 |
raise AttributeError |
|
218 |
else: |
|
219 |
arguments = {} |
|
220 |
|
|
221 |
if dead_letter_exchange: |
|
222 |
arguments['x-dead-letter-exchange'] = dead_letter_exchange |
|
223 |
|
|
224 |
promise = self.client.queue_declare(queue=queue, durable=True, |
|
225 |
exclusive=exclusive, |
|
226 |
auto_delete=False, |
|
227 |
arguments=arguments) |
|
228 |
self.client.wait(promise) |
|
229 |
|
|
230 |
def queue_bind(self, queue, exchange, routing_key): |
|
231 |
self.log.debug('Binding queue %s to exchange %s with key %s' |
|
232 |
% (queue, exchange, routing_key)) |
|
233 |
promise = self.client.queue_bind(exchange=exchange, queue=queue, |
|
234 |
routing_key=routing_key) |
|
235 |
self.client.wait(promise) |
|
236 |
|
|
237 |
@reconnect_decorator |
|
238 |
def basic_publish(self, exchange, routing_key, body, headers={}): |
|
239 |
"""Publish a message with a specific routing key """ |
|
240 |
self._publish(exchange, routing_key, body, headers) |
|
241 |
|
|
242 |
self.flush_buffer() |
|
243 |
|
|
244 |
if self.confirms and len(self.unacked) >= self.confirm_buffer: |
|
245 |
self.get_confirms() |
|
246 |
|
|
247 |
@reconnect_decorator |
|
248 |
def basic_publish_multi(self, exchange, routing_key, bodies): |
|
249 |
for body in bodies: |
|
250 |
self.unsend[body] = (exchange, routing_key) |
|
251 |
|
|
252 |
for body in bodies: |
|
253 |
self._publish(exchange, routing_key, body) |
|
254 |
self.unsend.pop(body) |
|
255 |
|
|
256 |
self.flush_buffer() |
|
257 |
|
|
258 |
if self.confirms: |
|
259 |
self.get_confirms() |
|
260 |
|
|
261 |
def _publish(self, exchange, routing_key, body, headers={}): |
|
262 |
# Persisent messages by default! |
|
263 |
headers['delivery_mode'] = 2 |
|
264 |
promise = self.client.basic_publish(exchange=exchange, |
|
265 |
routing_key=routing_key, |
|
266 |
body=body, headers=headers) |
|
267 |
|
|
268 |
if self.confirms: |
|
269 |
self.unacked[promise] = (exchange, routing_key, body) |
|
270 |
|
|
271 |
return promise |
|
272 |
|
|
273 |
@reconnect_decorator |
|
274 |
def flush_buffer(self): |
|
275 |
while self.client.needs_write(): |
|
276 |
self.client.on_write() |
|
277 |
|
|
278 |
@reconnect_decorator |
|
279 |
def get_confirms(self): |
|
280 |
for promise in self.unacked.keys(): |
|
281 |
self.client.wait(promise) |
|
282 |
self.unacked.pop(promise) |
|
283 |
|
|
284 |
@reconnect_decorator |
|
285 |
def _resend_unacked_messages(self): |
|
286 |
"""Resend unacked messages in case of a connection failure.""" |
|
287 |
msgs = self.unacked.values() |
|
288 |
self.unacked.clear() |
|
289 |
for exchange, routing_key, body in msgs: |
|
290 |
self.log.debug('Resending message %s' % body) |
|
291 |
self.basic_publish(exchange, routing_key, body) |
|
292 |
|
|
293 |
@reconnect_decorator |
|
294 |
def _resend_unsend_messages(self): |
|
295 |
"""Resend unsend messages in case of a connection failure.""" |
|
296 |
for body in self.unsend.keys(): |
|
297 |
(exchange, routing_key) = self.unsend[body] |
|
298 |
self.basic_publish(exchange, routing_key, body) |
|
299 |
self.unsend.pop(body) |
|
300 |
|
|
301 |
@reconnect_decorator |
|
302 |
def basic_consume(self, queue, callback, prefetch_count=0): |
|
303 |
"""Consume from a queue. |
|
304 |
|
|
305 |
@type queue: string or list of strings |
|
306 |
@param queue: the name or list of names from the queues to consume |
|
307 |
@type callback: function |
|
308 |
@param callback: the callback function to run when a message arrives |
|
309 |
|
|
310 |
""" |
|
311 |
# Store the queues and the callback |
|
312 |
self.consumers[queue] = callback |
|
313 |
|
|
314 |
def handle_delivery(promise, msg): |
|
315 |
"""Hide promises and messages without body""" |
|
316 |
if 'body' in msg: |
|
317 |
callback(self, msg) |
|
318 |
else: |
|
319 |
self.log.debug("Message without body %s" % msg) |
|
320 |
raise socket_error |
|
321 |
|
|
322 |
consume_promise = \ |
|
323 |
self.client.basic_consume(queue=queue, |
|
324 |
prefetch_count=prefetch_count, |
|
325 |
callback=handle_delivery) |
|
326 |
|
|
327 |
self.consume_promises.append(consume_promise) |
|
328 |
return consume_promise |
|
329 |
|
|
330 |
@reconnect_decorator |
|
331 |
def basic_wait(self, promise=None, timeout=0): |
|
332 |
"""Wait for messages from the queues declared by basic_consume. |
Also available in: Unified diff