Revision a8858945 snf-common/synnefo/lib/amqp_puka.py
b/snf-common/synnefo/lib/amqp_puka.py | ||
---|---|---|
52 | 52 |
from ordereddict import OrderedDict |
53 | 53 |
from synnefo import settings |
54 | 54 |
|
55 |
logger = logging.getLogger("amqp") |
|
56 |
|
|
57 | 55 |
|
58 | 56 |
def reconnect_decorator(func): |
59 | 57 |
""" |
... | ... | |
65 | 63 |
try: |
66 | 64 |
return func(self, *args, **kwargs) |
67 | 65 |
except (socket_error, spec_exceptions.ConnectionForced) as e: |
68 |
logger.error('Connection Closed while in %s: %s', func.__name__, e)
|
|
66 |
self.log.error('Connection Closed while in %s: %s', func.__name__, e)
|
|
69 | 67 |
self.connect() |
70 | 68 |
|
71 | 69 |
return wrapper |
... | ... | |
77 | 75 |
|
78 | 76 |
""" |
79 | 77 |
def __init__(self, hosts=settings.AMQP_HOSTS, max_retries=30, |
80 |
confirms=True, confirm_buffer=100): |
|
78 |
confirms=True, confirm_buffer=100, logger=None):
|
|
81 | 79 |
""" |
82 | 80 |
Format hosts as "amqp://username:pass@host:port" |
83 | 81 |
max_retries=0 defaults to unlimited retries |
... | ... | |
98 | 96 |
self.unsend = OrderedDict() |
99 | 97 |
self.consume_promises = [] |
100 | 98 |
self.exchanges = [] |
99 |
if logger: |
|
100 |
self.log = logger |
|
101 |
else: |
|
102 |
logger = logging.getLogger("amqp") |
|
103 |
logging.basicConfig() |
|
104 |
self.log = logger |
|
101 | 105 |
|
102 | 106 |
def connect(self, retries=0): |
103 | 107 |
if self.max_retries and retries >= self.max_retries: |
104 |
logger.error("Aborting after %d retries", retries)
|
|
108 |
self.log.error("Aborting after %d retries", retries)
|
|
105 | 109 |
raise AMQPConnectionError('Aborting after %d connection failures.'\ |
106 | 110 |
% retries) |
107 | 111 |
return |
... | ... | |
113 | 117 |
self.client = Client(host, pubacks=self.confirms) |
114 | 118 |
|
115 | 119 |
host = host.split('@')[-1] |
116 |
logger.debug('Connecting to node %s' % host)
|
|
120 |
self.log.debug('Connecting to node %s' % host)
|
|
117 | 121 |
|
118 | 122 |
try: |
119 | 123 |
promise = self.client.connect() |
120 | 124 |
self.client.wait(promise) |
121 | 125 |
except socket_error as e: |
122 | 126 |
if retries < len(self.hosts): |
123 |
logger.warning('Cannot connect to host %s: %s', host, e)
|
|
127 |
self.log.warning('Cannot connect to host %s: %s', host, e)
|
|
124 | 128 |
else: |
125 |
logger.error('Cannot connect to host %s: %s', host, e)
|
|
129 |
self.log.error('Cannot connect to host %s: %s', host, e)
|
|
126 | 130 |
sleep(1) |
127 | 131 |
return self.connect(retries + 1) |
128 | 132 |
|
129 |
logger.info('Successfully connected to host: %s', host)
|
|
133 |
self.log.info('Successfully connected to host: %s', host)
|
|
130 | 134 |
|
131 | 135 |
# Setup TCP keepalive option |
132 | 136 |
self.client.sd.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) |
... | ... | |
137 | 141 |
# Keepalive retry |
138 | 142 |
self.client.sd.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT, 10) |
139 | 143 |
|
140 |
logger.info('Creating channel')
|
|
144 |
self.log.info('Creating channel')
|
|
141 | 145 |
|
142 | 146 |
# Clear consume_promises each time connecting, since they are related |
143 | 147 |
# to the connection object |
... | ... | |
172 | 176 |
@param exhange_type: one of 'direct', 'topic', 'fanout' |
173 | 177 |
|
174 | 178 |
""" |
175 |
logger.info('Declaring %s exchange: %s', type, exchange)
|
|
179 |
self.log.info('Declaring %s exchange: %s', type, exchange)
|
|
176 | 180 |
promise = self.client.exchange_declare(exchange=exchange, |
177 | 181 |
type=type, |
178 | 182 |
durable=True, |
... | ... | |
200 | 204 |
and not host IP. example: [node1@rabbit,node2@rabbit] |
201 | 205 |
|
202 | 206 |
""" |
203 |
logger.info('Declaring queue: %s', queue)
|
|
207 |
self.log.info('Declaring queue: %s', queue)
|
|
204 | 208 |
|
205 | 209 |
if mirrored: |
206 | 210 |
if mirrored_nodes == 'all': |
... | ... | |
223 | 227 |
self.client.wait(promise) |
224 | 228 |
|
225 | 229 |
def queue_bind(self, queue, exchange, routing_key): |
226 |
logger.debug('Binding queue %s to exchange %s with key %s'
|
|
227 |
% (queue, exchange, routing_key)) |
|
230 |
self.log.debug('Binding queue %s to exchange %s with key %s'
|
|
231 |
% (queue, exchange, routing_key))
|
|
228 | 232 |
promise = self.client.queue_bind(exchange=exchange, queue=queue, |
229 | 233 |
routing_key=routing_key) |
230 | 234 |
self.client.wait(promise) |
... | ... | |
282 | 286 |
msgs = self.unacked.values() |
283 | 287 |
self.unacked.clear() |
284 | 288 |
for exchange, routing_key, body in msgs: |
285 |
logger.debug('Resending message %s' % body)
|
|
289 |
self.log.debug('Resending message %s' % body)
|
|
286 | 290 |
self.basic_publish(exchange, routing_key, body) |
287 | 291 |
|
288 | 292 |
@reconnect_decorator |
... | ... | |
311 | 315 |
if 'body' in msg: |
312 | 316 |
callback(self, msg) |
313 | 317 |
else: |
314 |
logger.debug("Message without body %s" % msg)
|
|
318 |
self.log.debug("Message without body %s" % msg)
|
|
315 | 319 |
raise socket_error |
316 | 320 |
|
317 | 321 |
consume_promise = \ |
... | ... | |
372 | 376 |
|
373 | 377 |
def close(self): |
374 | 378 |
"""Check that messages have been send and close the connection.""" |
375 |
logger.debug("Closing connection to %s", self.client.host)
|
|
379 |
self.log.debug("Closing connection to %s", self.client.host)
|
|
376 | 380 |
try: |
377 | 381 |
if self.confirms: |
378 | 382 |
self.get_confirms() |
379 | 383 |
close_promise = self.client.close() |
380 | 384 |
self.client.wait(close_promise) |
381 | 385 |
except (socket_error, spec_exceptions.ConnectionForced) as e: |
382 |
logger.error('Connection closed while closing connection:%s', |
|
383 |
e) |
|
386 |
self.log.error('Connection closed while closing connection:%s', e) |
|
384 | 387 |
|
385 | 388 |
def queue_delete(self, queue, if_unused=True, if_empty=True): |
386 | 389 |
"""Delete a queue. |
... | ... | |
394 | 397 |
self.client.wait(promise) |
395 | 398 |
return True |
396 | 399 |
except spec_exceptions.NotFound: |
397 |
logger.info("Queue %s does not exist", queue)
|
|
400 |
self.log.info("Queue %s does not exist", queue)
|
|
398 | 401 |
return False |
399 | 402 |
|
400 | 403 |
def exchange_delete(self, exchange, if_unused=True): |
... | ... | |
406 | 409 |
self.client.wait(promise) |
407 | 410 |
return True |
408 | 411 |
except spec_exceptions.NotFound: |
409 |
logger.info("Exchange %s does not exist", exchange)
|
|
412 |
self.log.info("Exchange %s does not exist", exchange)
|
|
410 | 413 |
return False |
411 | 414 |
|
412 | 415 |
@reconnect_decorator |
Also available in: Unified diff