Revision 74d988b0 snf-common/synnefo/lib/amqp_puka.py
b/snf-common/synnefo/lib/amqp_puka.py | ||
---|---|---|
63 | 63 |
try: |
64 | 64 |
return func(self, *args, **kwargs) |
65 | 65 |
except (socket_error, spec_exceptions.ConnectionForced) as e: |
66 |
self.log.error('Connection Closed while in %s: %s', func.__name__, e) |
|
66 |
self.log.error('Connection Closed while in %s: %s', func.__name__, |
|
67 |
e) |
|
67 | 68 |
self.connect() |
68 | 69 |
|
69 | 70 |
return wrapper |
... | ... | |
106 | 107 |
def connect(self, retries=0): |
107 | 108 |
if self.max_retries and retries >= self.max_retries: |
108 | 109 |
self.log.error("Aborting after %d retries", retries) |
109 |
raise AMQPConnectionError('Aborting after %d connection failures.'\
|
|
110 |
raise AMQPConnectionError('Aborting after %d connection failures.' |
|
110 | 111 |
% retries) |
111 | 112 |
return |
112 | 113 |
|
... | ... | |
211 | 212 |
arguments = {'x-ha-policy': 'all'} |
212 | 213 |
elif isinstance(mirrored_nodes, list): |
213 | 214 |
arguments = {'x-ha-policy': 'nodes', |
214 |
'x-ha-policy-params': mirrored_nodes} |
|
215 |
'x-ha-policy-params': mirrored_nodes}
|
|
215 | 216 |
else: |
216 | 217 |
raise AttributeError |
217 | 218 |
else: |
... | ... | |
319 | 320 |
raise socket_error |
320 | 321 |
|
321 | 322 |
consume_promise = \ |
322 |
self.client.basic_consume(queue=queue,
|
|
323 |
prefetch_count=prefetch_count,
|
|
324 |
callback=handle_delivery)
|
|
323 |
self.client.basic_consume(queue=queue, |
|
324 |
prefetch_count=prefetch_count, |
|
325 |
callback=handle_delivery) |
|
325 | 326 |
|
326 | 327 |
self.consume_promises.append(consume_promise) |
327 | 328 |
return consume_promise |
Also available in: Unified diff