Revision db400d82
b/snf-common/synnefo/lib/amqp.py | ||
---|---|---|
33 | 33 |
|
34 | 34 |
""" Module implementing connection and communication with an AMQP broker. |
35 | 35 |
|
36 |
AMQP Client's implemented by this module silenty detect connection failures and
|
|
36 |
AMQP Client's instatiated by this module silenty detect connection failures and
|
|
37 | 37 |
try to reconnect to any available broker. Also publishing takes advantage of |
38 | 38 |
publisher-confirms in order to guarantee that messages are properly delivered |
39 | 39 |
to the broker. |
40 | 40 |
|
41 | 41 |
""" |
42 | 42 |
|
43 |
import puka |
|
44 |
import logging |
|
45 |
import socket |
|
46 |
|
|
47 |
from time import sleep |
|
48 |
from random import shuffle |
|
49 |
from functools import wraps |
|
50 |
|
|
51 |
from ordereddict import OrderedDict |
|
52 | 43 |
from synnefo import settings |
53 | 44 |
|
54 |
AMQP_HOSTS = settings.AMQP_HOSTS |
|
55 |
|
|
56 |
MAX_RETRIES = 20 |
|
45 |
if settings.AMQP_BACKEND == 'puka': |
|
46 |
from amqp_puka import AMQPPukaClient as Client |
|
47 |
elif settings.AMQP_BACKEND == 'haigha': |
|
48 |
from amqp_haigha import AMQPHaighaClient as Client |
|
49 |
else: |
|
50 |
raise Exception('Unknown Backend %s' % settings.AMQP_BACKEND) |
|
57 | 51 |
|
58 |
log = logging.getLogger() |
|
59 |
|
|
60 |
|
|
61 |
def reconnect_decorator(func): |
|
62 |
""" |
|
63 |
Decorator for persistent connection with one or more AMQP brokers. |
|
64 | 52 |
|
65 |
""" |
|
66 |
@wraps(func) |
|
67 |
def wrapper(self, *args, **kwargs): |
|
68 |
try: |
|
69 |
if self.client.sd is None: |
|
70 |
self.connect() |
|
71 |
return func(self, *args, **kwargs) |
|
72 |
except (socket.error, puka.spec_exceptions.ConnectionForced): |
|
73 |
log.debug("Connection closed. Reconnecting") |
|
74 |
self.connect() |
|
75 |
return wrapper(self, *args, **kwargs) |
|
76 |
except Exception: |
|
77 |
if self.client.sd is None: |
|
78 |
log.debug("Connection closed. Reconnecting") |
|
79 |
self.connect() |
|
80 |
return wrapper(self, *args, **kwargs) |
|
81 |
else: |
|
82 |
raise |
|
83 |
return wrapper |
|
84 |
|
|
85 |
|
|
86 |
class AMQPClient(): |
|
53 |
class AMQPClient(object): |
|
87 | 54 |
""" |
88 | 55 |
AMQP generic client implementing most of the basic AMQP operations. |
89 | 56 |
|
90 |
This client confirms delivery of each published message before publishing |
|
91 |
the next one, which results in low performance. Better performance can be |
|
92 |
achieved by using AMQPAsyncClient. |
|
93 |
|
|
57 |
This class will create an object of AMQPPukaClient or AMQPHaigha client |
|
58 |
depending on AMQP_BACKEND setting |
|
94 | 59 |
""" |
95 |
def __init__(self, hosts=AMQP_HOSTS, max_retries=MAX_RETRIES): |
|
96 |
"""Format hosts as "amqp://username:pass@host:port" """ |
|
97 |
# Shuffle the elements of the host list for better load balancing |
|
98 |
self.hosts = hosts |
|
99 |
shuffle(self.hosts) |
|
100 |
self.max_retries = max_retries |
|
101 |
|
|
102 |
self.promises = [] |
|
103 |
self.consume_promises = [] |
|
104 |
self.consume_info = {} |
|
105 |
|
|
106 |
def connect(self, retries=0): |
|
107 |
# Pick up a host |
|
108 |
url = self.hosts.pop() |
|
109 |
self.hosts.insert(0, url) |
|
110 |
|
|
111 |
if retries > self.max_retries: |
|
112 |
raise AMQPError("Cannot connect to any node after %s attemps" % retries) |
|
113 |
if retries > 2 * len(self.hosts): |
|
114 |
sleep(1) |
|
115 |
|
|
116 |
self.client = puka.Client(url, pubacks=True) |
|
117 |
|
|
118 |
host = url.split('@')[-1] |
|
119 |
log.debug('Connecting to node %s' % host) |
|
120 |
|
|
121 |
try: |
|
122 |
promise = self.client.connect() |
|
123 |
self.client.wait(promise) |
|
124 |
except socket.error as e: |
|
125 |
log.debug('Cannot connect to node %s.' % host) |
|
126 |
return self.connect(retries+1) |
|
127 |
|
|
128 |
@reconnect_decorator |
|
129 |
def exchange_declare(self, exchange_name, exchange_type='direct', |
|
130 |
durable=True, auto_delete=False): |
|
131 |
"""Declare an exchange |
|
132 |
@type exchange_name: string |
|
133 |
@param exchange_name: name of the exchange |
|
134 |
@type exchange_type: string |
|
135 |
@param exhange_type: one of 'direct', 'topic', 'fanout' |
|
136 |
|
|
137 |
""" |
|
138 |
log.debug('Declaring exchange %s of %s type.' |
|
139 |
%(exchange_name, exchange_type)) |
|
140 |
promise = self.client.exchange_declare(exchange=exchange_name, |
|
141 |
type=exchange_type, |
|
142 |
durable=durable, |
|
143 |
auto_delete=auto_delete) |
|
144 |
self.client.wait(promise) |
|
145 |
log.debug('Exchange %s declared succesfully ' % exchange_name) |
|
146 |
|
|
147 |
@reconnect_decorator |
|
148 |
def queue_declare(self, queue, durable=True, exclusive=False, |
|
149 |
auto_delete=False, mirrored=True, mirrored_nodes='all'): |
|
150 |
"""Declare a queue |
|
151 |
|
|
152 |
@type queue: string |
|
153 |
@param queue: name of the queue |
|
154 |
@param mirrored: whether the queue will be mirrored to other brokers |
|
155 |
@param mirrored_nodes: the policy for the mirrored queue. |
|
156 |
Available policies: |
|
157 |
- 'all': The queue is mirrored to all nodes and the |
|
158 |
master node is the one to which the client is |
|
159 |
connected |
|
160 |
- a list of nodes. The queue will be mirrored only to |
|
161 |
the specified nodes, and the master will be the |
|
162 |
first node in the list. Node names must be provided |
|
163 |
and not host IP. example: [node1@rabbit,node2@rabbit] |
|
164 |
|
|
165 |
""" |
|
166 |
|
|
167 |
log.debug('Declaring queue %s' % queue) |
|
168 |
if mirrored: |
|
169 |
if mirrored_nodes == 'all': |
|
170 |
arguments={'x-ha-policy':'all'} |
|
171 |
elif isinstance(mirrored_nodes, list): |
|
172 |
arguments={'x-ha-policy':'nodes', 'x-ha-policy-params':mirrored_nodes} |
|
173 |
else: |
|
174 |
raise AttributeError |
|
175 |
else: |
|
176 |
arguments = {} |
|
177 |
|
|
178 |
promise = self.client.queue_declare(queue=queue, durable=durable, |
|
179 |
exclusive=exclusive, |
|
180 |
auto_delete=auto_delete, |
|
181 |
arguments=arguments) |
|
182 |
self.client.wait(promise) |
|
183 |
log.debug('Queue %s declared successfully.' % queue) |
|
184 |
|
|
185 |
@reconnect_decorator |
|
186 |
def queue_bind(self, queue, exchange, routing_key): |
|
187 |
log.debug('Binding queue %s to exchange %s with key %s' |
|
188 |
% (queue, exchange, routing_key)) |
|
189 |
promise = self.client.queue_bind(exchange=exchange, queue=queue, |
|
190 |
routing_key=routing_key) |
|
191 |
self.client.wait(promise) |
|
192 |
log.debug('Binding completed successfully') |
|
193 |
|
|
194 |
def basic_publish_multi(self, exhange, routing_key, msgs, headers={}): |
|
195 |
"""Send many messages to the same exchange and with the same key """ |
|
196 |
for msg in msgs: |
|
197 |
self.basic_publish(exchange, routing_key, headers, msg) |
|
198 |
|
|
199 |
@reconnect_decorator |
|
200 |
def basic_publish(self, exchange, routing_key, body, headers={}): |
|
201 |
"""Publish a message with a specific routing key """ |
|
202 |
|
|
203 |
# Persisent messages by default! |
|
204 |
if not 'delivery_mode' in headers: |
|
205 |
headers['delivery_mode'] = 2 |
|
206 |
|
|
207 |
promise = self.client.basic_publish(exchange=exchange, |
|
208 |
routing_key=routing_key, |
|
209 |
body=body, headers=headers) |
|
210 |
self.client.wait(promise) |
|
211 |
|
|
212 |
@reconnect_decorator |
|
213 |
def basic_consume(self, queue, callback, prefetch_count=0): |
|
214 |
"""Consume from a queue. |
|
215 |
|
|
216 |
@type queue: string or list of strings |
|
217 |
@param queue: the name or list of names from the queues to consume |
|
218 |
@type callback: function |
|
219 |
@param callback: the callback function to run when a message arrives |
|
220 |
|
|
221 |
""" |
|
222 |
if isinstance(queue, str): |
|
223 |
queue = [queue] |
|
224 |
elif isinstance(queue, list): |
|
225 |
pass |
|
226 |
else: |
|
227 |
raise AttributeError |
|
228 |
|
|
229 |
# Store the queues and the callback |
|
230 |
for q in queue: |
|
231 |
self.consume_info[q] = callback |
|
232 |
|
|
233 |
def handle_delivery(promise, result): |
|
234 |
"""Hide promises and messages without body""" |
|
235 |
if 'body' in result: |
|
236 |
callback(self, result) |
|
237 |
else: |
|
238 |
log.debug("Message without body %s" % result) |
|
239 |
raise socket.error |
|
240 |
|
|
241 |
consume_promise = \ |
|
242 |
self.client.basic_consume_multi(queues=queue, |
|
243 |
prefetch_count=prefetch_count, |
|
244 |
callback=handle_delivery) |
|
245 |
self.consume_promises.append(consume_promise) |
|
246 |
return consume_promise |
|
247 |
|
|
248 |
def basic_wait(self, promise=None, timeout=0): |
|
249 |
"""Wait for messages from the queues declared by basic_consume. |
|
250 |
|
|
251 |
This function will block until a message arrives from the queues that |
|
252 |
have been declared with basic_consume. If the optional arguments |
|
253 |
'promise' is given, only messages for this promise will be delivered. |
|
254 |
|
|
255 |
""" |
|
256 |
try: |
|
257 |
if promise is not None: |
|
258 |
self.client.wait(promise, timeout) |
|
259 |
else: |
|
260 |
self.client.wait(self.consume_promises) |
|
261 |
except (socket.error, puka.spec_exceptions.ConnectionForced): |
|
262 |
log.debug('Connection closed while receiving messages.') |
|
263 |
self.consume_promises = [] |
|
264 |
try: |
|
265 |
self.client.close() |
|
266 |
except: |
|
267 |
pass |
|
268 |
self.connect() |
|
269 |
for queues, callback in self.consume_info.items(): |
|
270 |
self.basic_consume(queues, callback) |
|
271 |
self.basic_wait(timeout) |
|
272 |
except Exception as e: |
|
273 |
if self.client.sd is None: |
|
274 |
log.debug('Connection closed while receiving messages.') |
|
275 |
self.consume_promises = [] |
|
276 |
self.connect() |
|
277 |
for queues, callback in self.consume_info.items(): |
|
278 |
self.basic_consume(queues, callback) |
|
279 |
self.basic_wait(timeout) |
|
280 |
else: |
|
281 |
log.error("Exception while waiting for messages ",e) |
|
282 |
raise |
|
283 |
|
|
284 |
def basic_cancel(self, promise=None): |
|
285 |
"""Cancel consuming from a queue. """ |
|
286 |
try: |
|
287 |
if promise is not None: |
|
288 |
self.client.basic_cancel(promise) |
|
289 |
else: |
|
290 |
for promise in self.consume_promises: |
|
291 |
self.client.basic_cancel(promise) |
|
292 |
except (socket.error, puka.spec_exceptions.ConnectionForced): |
|
293 |
pass |
|
294 |
except Exception as e: |
|
295 |
if self.client.sd is None: |
|
296 |
pass; |
|
297 |
else: |
|
298 |
log.error("Exception while canceling client ",e) |
|
299 |
raise |
|
300 |
|
|
301 |
|
|
302 |
@reconnect_decorator |
|
303 |
def basic_get(self, queue): |
|
304 |
"""Get a single message from a queue. |
|
305 |
|
|
306 |
This is a non-blocking method for getting messages from a queue. |
|
307 |
It will return None if the queue is empty. |
|
308 |
|
|
309 |
""" |
|
310 |
get_promise = self.client.basic_get(queue=queue) |
|
311 |
result = self.client.wait(get_promise) |
|
312 |
if 'empty' in result: |
|
313 |
# The queue is empty |
|
314 |
return None |
|
315 |
else: |
|
316 |
return result |
|
317 |
|
|
318 |
def basic_ack(self, message): |
|
319 |
"""Acknowledge a message. """ |
|
320 |
try: |
|
321 |
self.client.basic_ack(message) |
|
322 |
return True |
|
323 |
except (socket.error, puka.spec_exceptions.ConnectionForced): |
|
324 |
return False |
|
325 |
except Exception as e: |
|
326 |
if self.client.sd is None: |
|
327 |
return False |
|
328 |
else: |
|
329 |
log.error("Exception while acknowleding message ",e) |
|
330 |
raise |
|
331 |
|
|
332 |
def close(self): |
|
333 |
"""Close the connection with the AMQP broker. """ |
|
334 |
try: |
|
335 |
close_promise = self.client.close() |
|
336 |
self.client.wait(close_promise) |
|
337 |
except (socket.error, puka.spec_exceptions.ConnectionForced): |
|
338 |
pass |
|
339 |
|
|
340 |
def queue_delete(self, queue, if_unused=False, if_empty=False): |
|
341 |
"""Delete a queue. |
|
342 |
|
|
343 |
Returns False if the queue does not exist |
|
344 |
""" |
|
345 |
try: |
|
346 |
promise = self.client.queue_delete(queue=queue, if_unused=if_unused, |
|
347 |
if_empty=if_empty) |
|
348 |
self.client.wait(promise) |
|
349 |
return True |
|
350 |
except puka.spec_exceptions.NotFound: |
|
351 |
log.debug("Queue %s does not exist", queue) |
|
352 |
return False |
|
353 |
|
|
354 |
def exchange_delete(self, exchange, if_unused=False): |
|
355 |
"""Delete an exchange.""" |
|
356 |
try: |
|
357 |
|
|
358 |
promise = self.client.exchange_delete(exchange=exchange, |
|
359 |
if_unused=if_unused) |
|
360 |
self.client.wait(promise) |
|
361 |
return True |
|
362 |
except puka.spec_exceptions.NotFound: |
|
363 |
log.debug("Exchange %s does not exist", exchange) |
|
364 |
return False |
|
365 |
|
|
366 |
|
|
367 |
|
|
368 |
class AMQPAsyncClient(AMQPClient): |
|
369 |
"""AMQP client implementing asynchronous sending of messages. |
|
370 |
|
|
371 |
This client is more efficient that AMQPClient in sending large number |
|
372 |
of messages. Messages are confirmed to be sent to the broker in batches |
|
373 |
of a size specified by the 'max_buffer' argument. |
|
374 |
|
|
375 |
Messages are kept to an internal buffer until the max_buffer messages are |
|
376 |
sent or until the connection closes. Explicit delivering of messages can be |
|
377 |
achieved by calling 'wait_for_promises' method. |
|
378 |
|
|
379 |
Always remember to close the connection, or messages may be lost. |
|
380 |
|
|
381 |
""" |
|
382 |
def __init__(self, hosts=AMQP_HOSTS, max_buffer=5000, |
|
383 |
max_retries=MAX_RETRIES): |
|
384 |
AMQPClient.__init__(self, hosts, max_retries) |
|
385 |
self.published_msgs = OrderedDict() |
|
386 |
self._promise_counter = 0 |
|
387 |
self.max_buffer=max_buffer |
|
388 |
|
|
389 |
def basic_publish_multi(self, exhange, routing_key, msgs, headers={}): |
|
390 |
while msgs: |
|
391 |
msg = msgs.pop[0] |
|
392 |
self.basic_publish(exchange, routing_key, msg, headers) |
|
393 |
|
|
394 |
def basic_publish(self, exchange, routing_key, body, headers={}): |
|
395 |
"""Publish a message. |
|
396 |
|
|
397 |
The message will not be actually published to the broker until |
|
398 |
'max_buffer' messages are published or wait_for_promises is called. |
|
399 |
|
|
400 |
""" |
|
401 |
try: |
|
402 |
if not 'delivery_mode' in headers: |
|
403 |
headers['delivery_mode'] = 2 |
|
404 |
|
|
405 |
promise = self.client.basic_publish(exchange=exchange, |
|
406 |
routing_key=routing_key, |
|
407 |
body=body, |
|
408 |
headers=headers) |
|
409 |
|
|
410 |
self._promise_counter += 1 |
|
411 |
self.published_msgs[promise] = {'exchange':exchange, |
|
412 |
'routing_key':routing_key, |
|
413 |
'body':body, |
|
414 |
'headers':headers} |
|
415 |
|
|
416 |
if self._promise_counter > self.max_buffer: |
|
417 |
self.wait_for_promises() |
|
418 |
|
|
419 |
except (socket.error, puka.spec_exceptions.ConnectionForced): |
|
420 |
log.debug('Connection closed while sending message %s.\ |
|
421 |
Reconnecting and retrying' % body) |
|
422 |
self.connect() |
|
423 |
self.basic_publish(exchange, routing_key, body, headers) |
|
424 |
return self._retry_publish_msgs() |
|
425 |
except Exception as e: |
|
426 |
if self.client.sd is None: |
|
427 |
log.debug('Connection closed while sending message %s.\ |
|
428 |
Reconnecting and retrying' % body) |
|
429 |
self.connect() |
|
430 |
self.basic_publish(exchange, routing_key, body, headers) |
|
431 |
return self._retry_publish_msgs() |
|
432 |
else: |
|
433 |
log.error("Exception while publishing message ",e) |
|
434 |
raise |
|
435 |
|
|
436 |
def wait_for_promises(self): |
|
437 |
"""Wait for confirm that all messages are sent.""" |
|
438 |
try: |
|
439 |
promises = self.published_msgs.keys() |
|
440 |
for promise in promises: |
|
441 |
self.client.wait(promise) |
|
442 |
self.published_msgs.pop(promise) |
|
443 |
self._promise_counter = 0 |
|
444 |
except (socket.error, puka.spec_exceptions.ConnectionForced): |
|
445 |
log.debug('Connection closed while waiting from promises') |
|
446 |
self.connect() |
|
447 |
self._retry_publish_msgs() |
|
448 |
except Exception as e: |
|
449 |
if self.client.sd is None: |
|
450 |
log.debug('Connection closed while waiting from promises') |
|
451 |
self.connect() |
|
452 |
self._retry_publish_msgs() |
|
453 |
else: |
|
454 |
log.error("Exception while waiting for promises ",e) |
|
455 |
raise |
|
456 |
|
|
457 |
def _retry_publish_msgs(self): |
|
458 |
"""Resend messages in case of a connection failure.""" |
|
459 |
values = self.published_msgs.values() |
|
460 |
self.published_msgs = OrderedDict() |
|
461 |
for message in values: |
|
462 |
exchange = message['exchange'] |
|
463 |
key = message['routing_key'] |
|
464 |
body = message['body'] |
|
465 |
headers = message['headers'] |
|
466 |
log.debug('Resending message %s' % body) |
|
467 |
self.basic_publish(exchange, key, body, headers) |
|
468 |
|
|
469 |
def close(self): |
|
470 |
"""Check that messages have been send and close the connection.""" |
|
471 |
try: |
|
472 |
self.wait_for_promises() |
|
473 |
close_promise = self.client.close() |
|
474 |
self.client.wait(close_promise) |
|
475 |
except (socket.error, puka.spec_exceptions.ConnectionForced): |
|
476 |
pass |
|
477 |
except Exception as e: |
|
478 |
if self.client.sd is None: |
|
479 |
pass |
|
480 |
else: |
|
481 |
log.error("Exception while closing the connection ",e) |
|
482 |
raise |
|
483 |
|
|
484 |
def flush_buffer(self): |
|
485 |
try: |
|
486 |
self.client.on_write() |
|
487 |
except (socket.error, puka.spec_exceptions.ConnectionForced): |
|
488 |
log.debug('Connection closed while clearing buffer') |
|
489 |
self.connect() |
|
490 |
return self._retry_publish_msgs() |
|
491 |
except Exception as e: |
|
492 |
if self.client.sd is None: |
|
493 |
log.debug('Connection closed while clearing buffer') |
|
494 |
self.connect() |
|
495 |
return self._retry_publish_msgs() |
|
496 |
else: |
|
497 |
log.error("Exception while clearing buffer ",e) |
|
498 |
raise |
|
499 |
|
|
500 |
class AMQPConsumer(AMQPClient): |
|
501 |
"""AMQP client implementing a consumer without callbacks. |
|
502 |
|
|
503 |
""" |
|
504 |
def __init__(self, hosts=AMQP_HOSTS, max_buffer=5000, |
|
505 |
max_retries=MAX_RETRIES): |
|
506 |
AMQPClient.__init__(self, hosts, max_retries) |
|
507 |
self.consume_queues = [] |
|
508 |
self.consume_promises = [] |
|
509 |
|
|
510 |
@reconnect_decorator |
|
511 |
def basic_consume(self, queue, prefetch_count=0): |
|
512 |
"""Consume from a queue. |
|
513 |
|
|
514 |
@type queue: string or list of strings |
|
515 |
@param queue: the name or list of names from the queues to consume |
|
516 |
@type callback: function |
|
517 |
@param callback: the callback function to run when a message arrives |
|
518 |
|
|
519 |
""" |
|
520 |
if isinstance(queue, str): |
|
521 |
queue = [queue] |
|
522 |
elif isinstance(queue, list): |
|
523 |
pass |
|
524 |
else: |
|
525 |
raise AttributeError |
|
526 |
|
|
527 |
# Store the queues and the callback |
|
528 |
for q in queue: |
|
529 |
self.consume_queues.append(q) |
|
530 |
|
|
531 |
consume_promise = \ |
|
532 |
self.client.basic_consume_multi(queues=queue, |
|
533 |
prefetch_count=prefetch_count) |
|
534 |
self.consume_promises.append(consume_promise) |
|
535 |
return consume_promise |
|
536 |
|
|
537 |
def basic_wait(self, promise=None, timeout=0): |
|
538 |
"""Wait for messages from the queues declared by basic_consume. |
|
539 |
|
|
540 |
This function will block until a message arrives from the queues that |
|
541 |
have been declared with basic_consume. If the optional arguments |
|
542 |
'promise' is given, only messages for this promise will be delivered. |
|
543 |
|
|
544 |
""" |
|
545 |
try: |
|
546 |
if promise is not None: |
|
547 |
return self.client.wait(promise, timeout) |
|
548 |
else: |
|
549 |
return self.client.wait(self.consume_promises) |
|
550 |
except (socket.error, puka.spec_exceptions.ConnectionForced): |
|
551 |
log.debug('Connection closed while receiving messages.') |
|
552 |
self.consume_promises = [] |
|
553 |
try: |
|
554 |
self.client.close() |
|
555 |
except: |
|
556 |
pass |
|
557 |
self.connect() |
|
558 |
for queues in self.consume_queues: |
|
559 |
self.basic_consume(queues) |
|
560 |
self.basic_wait(timeout) |
|
561 |
except Exception as e: |
|
562 |
if self.client.sd is None: |
|
563 |
log.debug('Connection closed while receiving messages.') |
|
564 |
self.consume_promises = [] |
|
565 |
self.connect() |
|
566 |
for queues in self.consume_queues: |
|
567 |
self.basic_consume(queues) |
|
568 |
self.basic_wait(timeout) |
|
569 |
else: |
|
570 |
log.error("Exception while waiting for messages ",e) |
|
571 |
raise |
|
572 |
|
|
573 |
|
|
574 |
class AMQPError(Exception): |
|
575 |
def __init__(self, msg): |
|
576 |
self.msg = msg |
|
577 |
def __str__(self): |
|
578 |
return repr(self.msg) |
|
60 |
def __new__(cls, *args, **kwargs): |
|
61 |
return Client(*args, **kwargs) |
b/snf-common/synnefo/lib/amqp_haigha.py | ||
---|---|---|
1 |
# Copyright 2012 GRNET S.A. All rights reserved. |
|
2 |
# |
|
3 |
# Redistribution and use in source and binary forms, with or |
|
4 |
# without modification, are permitted provided that the following |
|
5 |
# conditions are met: |
|
6 |
# |
|
7 |
# 1. Redistributions of source code must retain the above |
|
8 |
# copyright notice, this list of conditions and the following |
|
9 |
# disclaimer. |
|
10 |
# |
|
11 |
# 2. Redistributions in binary form must reproduce the above |
|
12 |
# copyright notice, this list of conditions and the following |
|
13 |
# disclaimer in the documentation and/or other materials |
|
14 |
# provided with the distribution. |
|
15 |
# |
|
16 |
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS |
|
17 |
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
|
18 |
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
|
19 |
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR |
|
20 |
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
|
21 |
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
|
22 |
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF |
|
23 |
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED |
|
24 |
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
|
25 |
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN |
|
26 |
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
|
27 |
# POSSIBILITY OF SUCH DAMAGE. |
|
28 |
# |
|
29 |
# The views and conclusions contained in the software and |
|
30 |
# documentation are those of the authors and should not be |
|
31 |
# interpreted as representing official policies, either expressed |
|
32 |
# or implied, of GRNET S.A. |
|
33 |
|
|
34 |
from haigha.connections import RabbitConnection |
|
35 |
from haigha.message import Message |
|
36 |
from haigha import exceptions |
|
37 |
from random import shuffle |
|
38 |
from time import sleep |
|
39 |
import logging |
|
40 |
import socket |
|
41 |
from synnefo import settings |
|
42 |
from ordereddict import OrderedDict |
|
43 |
import gevent |
|
44 |
from gevent import monkey |
|
45 |
from functools import wraps |
|
46 |
|
|
47 |
|
|
48 |
logging.basicConfig(level=logging.INFO, format="[%(levelname)s %(asctime)s] %(message)s" ) |
|
49 |
logger = logging.getLogger('haigha') |
|
50 |
|
|
51 |
sock_opts = { |
|
52 |
(socket.IPPROTO_TCP, socket.TCP_NODELAY): 1, |
|
53 |
} |
|
54 |
|
|
55 |
|
|
56 |
def reconnect_decorator(func): |
|
57 |
""" |
|
58 |
Decorator for persistent connection with one or more AMQP brokers. |
|
59 |
|
|
60 |
""" |
|
61 |
@wraps(func) |
|
62 |
def wrapper(self, *args, **kwargs): |
|
63 |
try: |
|
64 |
func(self, *args, **kwargs) |
|
65 |
except (socket.error, exceptions.ConnectionError) as e: |
|
66 |
logger.error('Connection Closed while in %s: %s', func.__name__, e) |
|
67 |
self.connect() |
|
68 |
|
|
69 |
return wrapper |
|
70 |
|
|
71 |
|
|
72 |
class AMQPHaighaClient(): |
|
73 |
def __init__(self, hosts=settings.AMQP_HOSTS, max_retries=30, |
|
74 |
confirms=True, confirm_buffer=200): |
|
75 |
self.hosts = hosts |
|
76 |
shuffle(self.hosts) |
|
77 |
|
|
78 |
self.max_retries = max_retries |
|
79 |
self.confirms = confirms |
|
80 |
self.confirm_buffer = confirm_buffer |
|
81 |
|
|
82 |
self.connection = None |
|
83 |
self.channel = None |
|
84 |
self.consumers = {} |
|
85 |
self.unacked = OrderedDict() |
|
86 |
|
|
87 |
def connect(self, retries=0): |
|
88 |
if retries > self.max_retries: |
|
89 |
logger.error("Aborting after %s retries", retries - 1) |
|
90 |
raise AMQPConnectionError('Aborting after %d connection failures.'\ |
|
91 |
% (retries - 1)) |
|
92 |
return |
|
93 |
|
|
94 |
# Pick up a host |
|
95 |
host = self.hosts.pop() |
|
96 |
self.hosts.insert(0, host) |
|
97 |
|
|
98 |
#Patch gevent |
|
99 |
monkey.patch_all() |
|
100 |
|
|
101 |
try: |
|
102 |
self.connection = \ |
|
103 |
RabbitConnection(logger=logger, debug=True, |
|
104 |
user='rabbit', password='r@bb1t', |
|
105 |
vhost='/', host=host, |
|
106 |
heartbeat=None, |
|
107 |
sock_opts=sock_opts, |
|
108 |
transport='gevent') |
|
109 |
except socket.error as e: |
|
110 |
logger.error('Cannot connect to host %s: %s', host, e) |
|
111 |
if retries > 2 * len(self.hosts): |
|
112 |
sleep(1) |
|
113 |
return self.connect(retries + 1) |
|
114 |
|
|
115 |
logger.info('Successfully connected to host: %s', host) |
|
116 |
|
|
117 |
logger.info('Creating channel') |
|
118 |
self.channel = self.connection.channel() |
|
119 |
|
|
120 |
if self.confirms: |
|
121 |
self._confirm_select() |
|
122 |
|
|
123 |
if self.unacked: |
|
124 |
self._resend_unacked_messages() |
|
125 |
|
|
126 |
if self.consumers: |
|
127 |
for queue, callback in self.consumers.items(): |
|
128 |
self.basic_consume(queue, callback) |
|
129 |
|
|
130 |
def exchange_declare(self, exchange, type='direct'): |
|
131 |
"""Declare an exchange |
|
132 |
@type exchange_name: string |
|
133 |
@param exchange_name: name of the exchange |
|
134 |
@type exchange_type: string |
|
135 |
@param exhange_type: one of 'direct', 'topic', 'fanout' |
|
136 |
|
|
137 |
""" |
|
138 |
|
|
139 |
logger.info('Declaring %s exchange: %s', type, exchange) |
|
140 |
self.channel.exchange.declare(exchange, type, |
|
141 |
auto_delete=False, durable=True) |
|
142 |
|
|
143 |
def queue_declare(self, queue, exclusive=False, mirrored=True, |
|
144 |
mirrored_nodes='all'): |
|
145 |
"""Declare a queue |
|
146 |
|
|
147 |
@type queue: string |
|
148 |
@param queue: name of the queue |
|
149 |
@param mirrored: whether the queue will be mirrored to other brokers |
|
150 |
@param mirrored_nodes: the policy for the mirrored queue. |
|
151 |
Available policies: |
|
152 |
- 'all': The queue is mirrored to all nodes and the |
|
153 |
master node is the one to which the client is |
|
154 |
connected |
|
155 |
- a list of nodes. The queue will be mirrored only to |
|
156 |
the specified nodes, and the master will be the |
|
157 |
first node in the list. Node names must be provided |
|
158 |
and not host IP. example: [node1@rabbit,node2@rabbit] |
|
159 |
|
|
160 |
""" |
|
161 |
|
|
162 |
logger.info('Declaring queue: %s', queue) |
|
163 |
if mirrored: |
|
164 |
if mirrored_nodes == 'all': |
|
165 |
arguments = {'x-ha-policy': 'all'} |
|
166 |
elif isinstance(mirrored_nodes, list): |
|
167 |
arguments = {'x-ha-policy': 'nodes', |
|
168 |
'x-ha-policy-params': mirrored_nodes} |
|
169 |
else: |
|
170 |
raise AttributeError |
|
171 |
else: |
|
172 |
arguments = {} |
|
173 |
|
|
174 |
self.channel.queue.declare(queue, durable=True, exclusive=exclusive, |
|
175 |
auto_delete=False, arguments=arguments) |
|
176 |
|
|
177 |
def queue_bind(self, queue, exchange, routing_key): |
|
178 |
logger.info('Binding queue %s to exchange %s with key %s', queue, |
|
179 |
exchange, routing_key) |
|
180 |
self.channel.queue.bind(queue=queue, exchange=exchange, |
|
181 |
routing_key=routing_key) |
|
182 |
|
|
183 |
def _confirm_select(self): |
|
184 |
logger.info('Setting channel to confirm mode') |
|
185 |
self.channel.confirm.select() |
|
186 |
self.channel.basic.set_ack_listener(self._ack_received) |
|
187 |
self.channel.basic.set_nack_listener(self._nack_received) |
|
188 |
|
|
189 |
@reconnect_decorator |
|
190 |
def basic_publish(self, exchange, routing_key, body): |
|
191 |
msg = Message(body, delivery_mode=2) |
|
192 |
mid = self.channel.basic.publish(msg, exchange, routing_key) |
|
193 |
if self.confirms: |
|
194 |
self.unacked[mid] = (exchange, routing_key, body) |
|
195 |
if len(self.unacked) > self.confirm_buffer: |
|
196 |
self.get_confirms() |
|
197 |
|
|
198 |
logger.debug('Published message %s with id %s', body, mid) |
|
199 |
|
|
200 |
@reconnect_decorator |
|
201 |
def get_confirms(self): |
|
202 |
self.connection.read_frames() |
|
203 |
|
|
204 |
@reconnect_decorator |
|
205 |
def _resend_unacked_messages(self): |
|
206 |
msgs = self.unacked.values() |
|
207 |
self.unacked.clear() |
|
208 |
for exchange, routing_key, body in msgs: |
|
209 |
logger.debug('Resending message %s', body) |
|
210 |
self.basic_publish(exchange, routing_key, body) |
|
211 |
|
|
212 |
@reconnect_decorator |
|
213 |
def _ack_received(self, mid): |
|
214 |
print mid |
|
215 |
logger.debug('Received ACK for message with id %s', mid) |
|
216 |
self.unacked.pop(mid) |
|
217 |
|
|
218 |
@reconnect_decorator |
|
219 |
def _nack_received(self, mid): |
|
220 |
logger.error('Received NACK for message with id %s. Retrying.', mid) |
|
221 |
(exchange, routing_key, body) = self.unacked[mid] |
|
222 |
self.basic_publish(exchange, routing_key, body) |
|
223 |
|
|
224 |
def basic_consume(self, queue, callback): |
|
225 |
"""Consume from a queue. |
|
226 |
|
|
227 |
@type queue: string or list of strings |
|
228 |
@param queue: the name or list of names from the queues to consume |
|
229 |
@type callback: function |
|
230 |
@param callback: the callback function to run when a message arrives |
|
231 |
|
|
232 |
""" |
|
233 |
|
|
234 |
self.consumers[queue] = callback |
|
235 |
self.channel.basic.consume(queue, consumer=callback, no_ack=False) |
|
236 |
|
|
237 |
@reconnect_decorator |
|
238 |
def basic_wait(self): |
|
239 |
"""Wait for messages from the queues declared by basic_consume. |
|
240 |
|
|
241 |
This function will block until a message arrives from the queues that |
|
242 |
have been declared with basic_consume. If the optional arguments |
|
243 |
'promise' is given, only messages for this promise will be delivered. |
|
244 |
|
|
245 |
""" |
|
246 |
|
|
247 |
self.connection.read_frames() |
|
248 |
gevent.sleep(0) |
|
249 |
|
|
250 |
@reconnect_decorator |
|
251 |
def basic_get(self, queue): |
|
252 |
self.channel.basic.get(queue, no_ack=False) |
|
253 |
|
|
254 |
@reconnect_decorator |
|
255 |
def basic_ack(self, message): |
|
256 |
delivery_tag = message.delivery_info['delivery_tag'] |
|
257 |
self.channel.basic.ack(delivery_tag) |
|
258 |
|
|
259 |
@reconnect_decorator |
|
260 |
def basic_nack(self, message): |
|
261 |
delivery_tag = message.delivery_info['delivery_tag'] |
|
262 |
self.channel.basic.ack(delivery_tag) |
|
263 |
|
|
264 |
def close(self): |
|
265 |
try: |
|
266 |
if self.confirms: |
|
267 |
while self.unacked: |
|
268 |
print self.unacked |
|
269 |
self.get_confirms() |
|
270 |
self.channel.close() |
|
271 |
close_info = self.channel.close_info |
|
272 |
logger.info('Successfully closed channel. Info: %s', close_info) |
|
273 |
self.connection.close() |
|
274 |
except socket.error as e: |
|
275 |
logger.error('Connection closed while closing connection:%s', |
|
276 |
e) |
|
277 |
|
|
278 |
def queue_delete(self, queue, if_unused=True, if_empty=True): |
|
279 |
self.channel.queue.delete(queue, if_unused, if_empty) |
|
280 |
|
|
281 |
def exchange_delete(self, exchange, if_unused=True): |
|
282 |
self.channel.exchange.delete(exchange, if_unused) |
|
283 |
|
|
284 |
def basic_class(self): |
|
285 |
pass |
|
286 |
|
|
287 |
|
|
288 |
class AMQPConnectionError(): |
|
289 |
pass |
b/snf-common/synnefo/lib/amqp_puka.py | ||
---|---|---|
1 |
# Copyright 2012 GRNET S.A. All rights reserved. |
|
2 |
# |
|
3 |
# Redistribution and use in source and binary forms, with or |
|
4 |
# without modification, are permitted provided that the following |
|
5 |
# conditions are met: |
|
6 |
# |
|
7 |
# 1. Redistributions of source code must retain the above |
|
8 |
# copyright notice, this list of conditions and the following |
|
9 |
# disclaimer. |
|
10 |
# |
|
11 |
# 2. Redistributions in binary form must reproduce the above |
|
12 |
# copyright notice, this list of conditions and the following |
|
13 |
# disclaimer in the documentation and/or other materials |
|
14 |
# provided with the distribution. |
|
15 |
# |
|
16 |
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS |
|
17 |
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
|
18 |
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
|
19 |
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR |
|
20 |
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
|
21 |
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
|
22 |
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF |
|
23 |
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED |
|
24 |
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
|
25 |
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN |
|
26 |
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
|
27 |
# POSSIBILITY OF SUCH DAMAGE. |
|
28 |
# |
|
29 |
# The views and conclusions contained in the software and |
|
30 |
# documentation are those of the authors and should not be |
|
31 |
# interpreted as representing official policies, either expressed |
|
32 |
# or implied, of GRNET S.A. |
|
33 |
|
|
34 |
""" Module implementing connection and communication with an AMQP broker. |
|
35 |
|
|
36 |
AMQP Client's implemented by this module silenty detect connection failures and |
|
37 |
try to reconnect to any available broker. Also publishing takes advantage of |
|
38 |
publisher-confirms in order to guarantee that messages are properly delivered |
|
39 |
to the broker. |
|
40 |
|
|
41 |
""" |
|
42 |
|
|
43 |
import logging |
|
44 |
|
|
45 |
from puka import Client |
|
46 |
from puka import spec_exceptions |
|
47 |
from socket import error as socket_error |
|
48 |
from time import sleep |
|
49 |
from random import shuffle |
|
50 |
from functools import wraps |
|
51 |
from ordereddict import OrderedDict |
|
52 |
from synnefo import settings |
|
53 |
|
|
54 |
logging.basicConfig(level=logging.DEBUG, |
|
55 |
format="[%(levelname)s %(asctime)s] %(message)s") |
|
56 |
logger = logging.getLogger() |
|
57 |
|
|
58 |
|
|
59 |
def reconnect_decorator(func): |
|
60 |
""" |
|
61 |
Decorator for persistent connection with one or more AMQP brokers. |
|
62 |
|
|
63 |
""" |
|
64 |
@wraps(func) |
|
65 |
def wrapper(self, *args, **kwargs): |
|
66 |
try: |
|
67 |
func(self, *args, **kwargs) |
|
68 |
except (socket_error, spec_exceptions.ConnectionForced) as e: |
|
69 |
logger.error('Connection Closed while in %s: %s', func.__name__, e) |
|
70 |
self.consume_promises = [] |
|
71 |
self.connect() |
|
72 |
|
|
73 |
return wrapper |
|
74 |
|
|
75 |
|
|
76 |
class AMQPPukaClient(object): |
|
77 |
""" |
|
78 |
AMQP generic client implementing most of the basic AMQP operations. |
|
79 |
|
|
80 |
""" |
|
81 |
def __init__(self, hosts=settings.AMQP_HOSTS, max_retries=30, |
|
82 |
confirms=True, confirm_buffer=100): |
|
83 |
"""Format hosts as "amqp://username:pass@host:port" """ |
|
84 |
|
|
85 |
self.hosts = hosts |
|
86 |
shuffle(self.hosts) |
|
87 |
|
|
88 |
self.max_retries = max_retries |
|
89 |
self.confirms = confirms |
|
90 |
self.confirm_buffer = confirm_buffer |
|
91 |
|
|
92 |
self.connection = None |
|
93 |
self.channel = None |
|
94 |
self.consumers = {} |
|
95 |
self.unacked = OrderedDict() |
|
96 |
self.unsend = OrderedDict() |
|
97 |
self.consume_promises = [] |
|
98 |
|
|
99 |
def connect(self, retries=0): |
|
100 |
if retries > self.max_retries: |
|
101 |
logger.error("Aborting after %s retries", retries - 1) |
|
102 |
raise AMQPConnectionError('Aborting after %d connection failures.'\ |
|
103 |
% (retries - 1)) |
|
104 |
return |
|
105 |
|
|
106 |
# Pick up a host |
|
107 |
host = self.hosts.pop() |
|
108 |
self.hosts.insert(0, host) |
|
109 |
|
|
110 |
self.client = Client(host, pubacks=self.confirms) |
|
111 |
|
|
112 |
host = host.split('@')[-1] |
|
113 |
logger.debug('Connecting to node %s' % host) |
|
114 |
|
|
115 |
try: |
|
116 |
promise = self.client.connect() |
|
117 |
self.client.wait(promise) |
|
118 |
except socket_error as e: |
|
119 |
logger.error('Cannot connect to host %s: %s', host, e) |
|
120 |
if retries > 2 * len(self.hosts): |
|
121 |
sleep(1) |
|
122 |
return self.connect(retries + 1) |
|
123 |
|
|
124 |
logger.info('Successfully connected to host: %s', host) |
|
125 |
|
|
126 |
logger.info('Creating channel') |
|
127 |
|
|
128 |
if self.unacked: |
|
129 |
self._resend_unacked_messages() |
|
130 |
|
|
131 |
if self.unsend: |
|
132 |
self._resend_unsend_messages() |
|
133 |
|
|
134 |
if self.consumers: |
|
135 |
for queue, callback in self.consumers.items(): |
|
136 |
self.basic_consume(queue, callback) |
|
137 |
|
|
138 |
def exchange_declare(self, exchange, type='direct'): |
|
139 |
"""Declare an exchange |
|
140 |
@type exchange_name: string |
|
141 |
@param exchange_name: name of the exchange |
|
142 |
@type exchange_type: string |
|
143 |
@param exhange_type: one of 'direct', 'topic', 'fanout' |
|
144 |
|
|
145 |
""" |
|
146 |
logger.info('Declaring %s exchange: %s', type, exchange) |
|
147 |
promise = self.client.exchange_declare(exchange=exchange, |
|
148 |
type=type, |
|
149 |
durable=True, |
|
150 |
auto_delete=False) |
|
151 |
self.client.wait(promise) |
|
152 |
|
|
153 |
@reconnect_decorator |
|
154 |
def queue_declare(self, queue, exclusive=False, |
|
155 |
mirrored=True, mirrored_nodes='all'): |
|
156 |
"""Declare a queue |
|
157 |
|
|
158 |
@type queue: string |
|
159 |
@param queue: name of the queue |
|
160 |
@param mirrored: whether the queue will be mirrored to other brokers |
|
161 |
@param mirrored_nodes: the policy for the mirrored queue. |
|
162 |
Available policies: |
|
163 |
- 'all': The queue is mirrored to all nodes and the |
|
164 |
master node is the one to which the client is |
|
165 |
connected |
|
166 |
- a list of nodes. The queue will be mirrored only to |
|
167 |
the specified nodes, and the master will be the |
|
168 |
first node in the list. Node names must be provided |
|
169 |
and not host IP. example: [node1@rabbit,node2@rabbit] |
|
170 |
|
|
171 |
""" |
|
172 |
logger.info('Declaring queue: %s', queue) |
|
173 |
|
|
174 |
if mirrored: |
|
175 |
if mirrored_nodes == 'all': |
|
176 |
arguments = {'x-ha-policy': 'all'} |
|
177 |
elif isinstance(mirrored_nodes, list): |
|
178 |
arguments = {'x-ha-policy': 'nodes', |
|
179 |
'x-ha-policy-params': mirrored_nodes} |
|
180 |
else: |
|
181 |
raise AttributeError |
|
182 |
else: |
|
183 |
arguments = {} |
|
184 |
|
|
185 |
promise = self.client.queue_declare(queue=queue, durable=True, |
|
186 |
exclusive=exclusive, |
|
187 |
auto_delete=False, |
|
188 |
arguments=arguments) |
|
189 |
self.client.wait(promise) |
|
190 |
|
|
191 |
def queue_bind(self, queue, exchange, routing_key): |
|
192 |
logger.debug('Binding queue %s to exchange %s with key %s' |
|
193 |
% (queue, exchange, routing_key)) |
|
194 |
promise = self.client.queue_bind(exchange=exchange, queue=queue, |
|
195 |
routing_key=routing_key) |
|
196 |
self.client.wait(promise) |
|
197 |
|
|
198 |
@reconnect_decorator |
|
199 |
def basic_publish(self, exchange, routing_key, body): |
|
200 |
"""Publish a message with a specific routing key """ |
|
201 |
self._publish(exchange, routing_key, body) |
|
202 |
|
|
203 |
self.flush_buffer() |
|
204 |
|
|
205 |
if self.confirms and len(self.unacked) >= self.confirm_buffer: |
|
206 |
self.get_confirms() |
|
207 |
|
|
208 |
@reconnect_decorator |
|
209 |
def basic_publish_multi(self, exchange, routing_key, bodies): |
|
210 |
for body in bodies: |
|
211 |
self.unsend[body] = (exchange, routing_key) |
|
212 |
|
|
213 |
for body in bodies: |
|
214 |
self._publish(exchange, routing_key, body) |
|
215 |
self.unsend.pop(body) |
|
216 |
|
|
217 |
self.flush_buffer() |
|
218 |
|
|
219 |
if self.confirms: |
|
220 |
self.get_confirms() |
|
221 |
|
|
222 |
def _publish(self, exchange, routing_key, body): |
|
223 |
# Persisent messages by default! |
|
224 |
headers = {} |
|
225 |
headers['delivery_mode'] = 2 |
|
226 |
promise = self.client.basic_publish(exchange=exchange, |
|
227 |
routing_key=routing_key, |
|
228 |
body=body, headers=headers) |
|
229 |
|
|
230 |
if self.confirms: |
|
231 |
self.unacked[promise] = (exchange, routing_key, body) |
|
232 |
|
|
233 |
return promise |
|
234 |
|
|
235 |
@reconnect_decorator |
|
236 |
def flush_buffer(self): |
|
237 |
while self.client.needs_write(): |
|
238 |
self.client.on_write() |
|
239 |
|
|
240 |
@reconnect_decorator |
|
241 |
def get_confirms(self): |
|
242 |
for promise in self.unacked.keys(): |
|
243 |
self.client.wait(promise) |
|
244 |
self.unacked.pop(promise) |
|
245 |
|
|
246 |
@reconnect_decorator |
|
247 |
def _resend_unacked_messages(self): |
|
248 |
"""Resend unacked messages in case of a connection failure.""" |
|
249 |
msgs = self.unacked.values() |
|
250 |
self.unacked.clear() |
|
251 |
for exchange, routing_key, body in msgs: |
|
252 |
logger.debug('Resending message %s' % body) |
|
253 |
self.basic_publish(exchange, routing_key, body) |
|
254 |
|
|
255 |
@reconnect_decorator |
|
256 |
def _resend_unsend_messages(self): |
|
257 |
"""Resend unsend messages in case of a connection failure.""" |
|
258 |
for body in self.unsend.keys(): |
|
259 |
(exchange, routing_key) = self.unsend[body] |
|
260 |
self.basic_publish(exchange, routing_key, body) |
|
261 |
self.unsend.pop(body) |
|
262 |
|
|
263 |
@reconnect_decorator |
|
264 |
def basic_consume(self, queue, callback, prefetch_count=0): |
|
265 |
"""Consume from a queue. |
|
266 |
|
|
267 |
@type queue: string or list of strings |
|
268 |
@param queue: the name or list of names from the queues to consume |
|
269 |
@type callback: function |
|
270 |
@param callback: the callback function to run when a message arrives |
|
271 |
|
|
272 |
""" |
|
273 |
# Store the queues and the callback |
|
274 |
self.consumers[queue] = callback |
|
275 |
|
|
276 |
def handle_delivery(promise, msg): |
|
277 |
"""Hide promises and messages without body""" |
|
278 |
if 'body' in msg: |
|
279 |
callback(self, msg) |
|
280 |
else: |
|
281 |
logger.debug("Message without body %s" % msg) |
|
282 |
raise socket_error |
|
283 |
|
|
284 |
consume_promise = \ |
|
285 |
self.client.basic_consume(queue=queue, |
|
286 |
prefetch_count=prefetch_count, |
|
287 |
callback=handle_delivery) |
|
288 |
|
|
289 |
self.consume_promises.append(consume_promise) |
|
290 |
return consume_promise |
|
291 |
|
|
292 |
@reconnect_decorator |
|
293 |
def basic_wait(self, promise=None, timeout=0): |
|
294 |
"""Wait for messages from the queues declared by basic_consume. |
|
295 |
|
|
296 |
This function will block until a message arrives from the queues that |
|
297 |
have been declared with basic_consume. If the optional arguments |
|
298 |
'promise' is given, only messages for this promise will be delivered. |
|
299 |
|
|
300 |
""" |
|
301 |
if promise is not None: |
|
302 |
self.client.wait(promise, timeout) |
|
303 |
else: |
|
304 |
self.client.wait(self.consume_promises) |
|
305 |
|
|
306 |
@reconnect_decorator |
|
307 |
def basic_get(self, queue): |
|
308 |
"""Get a single message from a queue. |
|
309 |
|
|
310 |
This is a non-blocking method for getting messages from a queue. |
|
311 |
It will return None if the queue is empty. |
|
312 |
|
|
313 |
""" |
|
314 |
get_promise = self.client.basic_get(queue=queue) |
|
315 |
result = self.client.wait(get_promise) |
|
316 |
if 'empty' in result: |
|
317 |
# The queue is empty |
|
318 |
return None |
|
319 |
else: |
|
320 |
return result |
|
321 |
|
|
322 |
@reconnect_decorator |
|
323 |
def basic_ack(self, message): |
|
324 |
self.client.basic_ack(message) |
|
325 |
|
|
326 |
@reconnect_decorator |
|
327 |
def basic_nack(self, message): |
|
328 |
#TODO: |
|
329 |
pass |
|
330 |
|
|
331 |
def close(self): |
|
332 |
"""Check that messages have been send and close the connection.""" |
|
333 |
try: |
|
334 |
if self.confirms: |
|
335 |
self.get_confirms() |
|
336 |
close_promise = self.client.close() |
|
337 |
self.client.wait(close_promise) |
|
338 |
except (socket_error, spec_exceptions.ConnectionForced) as e: |
|
339 |
logger.error('Connection closed while closing connection:%s', |
|
340 |
e) |
|
341 |
|
|
342 |
def queue_delete(self, queue, if_unused=True, if_empty=True): |
|
343 |
"""Delete a queue. |
|
344 |
|
|
345 |
Returns False if the queue does not exist |
|
346 |
""" |
|
347 |
try: |
|
348 |
promise = self.client.queue_delete(queue=queue, |
|
349 |
if_unused=if_unused, |
|
350 |
if_empty=if_empty) |
|
351 |
self.client.wait(promise) |
|
352 |
return True |
|
353 |
except spec_exceptions.NotFound: |
|
354 |
logger.info("Queue %s does not exist", queue) |
|
355 |
return False |
|
356 |
|
|
357 |
def exchange_delete(self, exchange, if_unused=True): |
|
358 |
"""Delete an exchange.""" |
|
359 |
try: |
|
360 |
|
|
361 |
promise = self.client.exchange_delete(exchange=exchange, |
|
362 |
if_unused=if_unused) |
|
363 |
self.client.wait(promise) |
|
364 |
return True |
|
365 |
except spec_exceptions.NotFound: |
|
366 |
logger.info("Exchange %s does not exist", exchange) |
|
367 |
return False |
|
368 |
|
|
369 |
@reconnect_decorator |
|
370 |
def basic_cancel(self, promise=None): |
|
371 |
"""Cancel consuming from a queue. """ |
|
372 |
if promise is not None: |
|
373 |
self.client.basic_cancel(promise) |
|
374 |
else: |
|
375 |
for promise in self.consume_promises: |
|
376 |
self.client.basic_cancel(promise) |
|
377 |
|
|
378 |
|
|
379 |
class AMQPConnectionError(): |
|
380 |
pass |
b/snf-cyclades-app/synnefo/app_settings/default/queues.py | ||
---|---|---|
9 | 9 |
RABBIT_PASSWORD = "rabbit-password" |
10 | 10 |
RABBIT_VHOST = "/" |
11 | 11 |
AMQP_HOSTS=["amqp://username:password@host:port"] |
12 |
# AMQP Backend Client. One of: 'puka', 'haigha' |
|
13 |
AMQP_BACKEND='puka' |
|
12 | 14 |
|
13 | 15 |
EXCHANGE_GANETI = "ganeti" # Messages from Ganeti |
14 | 16 |
EXCHANGE_CRON = "cron" # Messages from periodically triggered tasks |
b/snf-cyclades-app/synnefo/logic/dispatcher.py | ||
---|---|---|
107 | 107 |
|
108 | 108 |
# Declare queues and exchanges |
109 | 109 |
for exchange in settings.EXCHANGES: |
110 |
self.client.exchange_declare(exchange_name=exchange,
|
|
111 |
exchange_type="topic")
|
|
110 |
self.client.exchange_declare(exchange=exchange, |
|
111 |
type="topic") |
|
112 | 112 |
|
113 | 113 |
for queue in QUEUES: |
114 | 114 |
# Queues are mirrored to all RabbitMQ brokers |
115 |
self.client.queue_declare(queue=queue,mirrored=True) |
|
115 |
self.client.queue_declare(queue=queue, mirrored=True)
|
|
116 | 116 |
|
117 | 117 |
bindings = BINDINGS |
118 | 118 |
|
b/snf-cyclades-gtools/synnefo/ganeti/eventd.py | ||
---|---|---|
114 | 114 |
def __init__(self, logger): |
115 | 115 |
pyinotify.ProcessEvent.__init__(self) |
116 | 116 |
self.logger = logger |
117 |
self.client = AMQPClient() |
|
117 |
self.client = AMQPClient(confirm_buffer=100)
|
|
118 | 118 |
handler_logger.info("Attempting to connect to RabbitMQ hosts") |
119 | 119 |
self.client.connect() |
120 |
self.client.exchange_declare(settings.EXCHANGE_GANETI, type='topic') |
|
120 | 121 |
handler_logger.info("Connected succesfully") |
121 | 122 |
|
122 | 123 |
self.op_handlers = {"INSTANCE": self.process_instance_op, |
... | ... | |
179 | 180 |
self.logger.debug("Delivering msg: %s (key=%s)", msg, routekey) |
180 | 181 |
|
181 | 182 |
# Send the message to RabbitMQ |
182 |
self.client.basic_publish(exchange=settings.EXCHANGE_GANETI, |
|
183 |
routing_key=routekey, |
|
184 |
body=msg) |
|
185 |
|
|
183 |
self.client.basic_publish(settings.EXCHANGE_GANETI, |
|
184 |
routekey, |
|
185 |
msg) |
|
186 | 186 |
|
187 | 187 |
def process_instance_op(self, op, job_id): |
188 | 188 |
""" Process OP_INSTANCE_* opcodes. |
Also available in: Unified diff