Revision a8858945 snf-common/synnefo/lib/amqp_puka.py

b/snf-common/synnefo/lib/amqp_puka.py
52 52
from ordereddict import OrderedDict
53 53
from synnefo import settings
54 54

  
55
logger = logging.getLogger("amqp")
56

  
57 55

  
58 56
def reconnect_decorator(func):
59 57
    """
......
65 63
        try:
66 64
            return func(self, *args, **kwargs)
67 65
        except (socket_error, spec_exceptions.ConnectionForced) as e:
68
            logger.error('Connection Closed while in %s: %s', func.__name__, e)
66
            self.log.error('Connection Closed while in %s: %s', func.__name__, e)
69 67
            self.connect()
70 68

  
71 69
    return wrapper
......
77 75

  
78 76
    """
79 77
    def __init__(self, hosts=settings.AMQP_HOSTS, max_retries=30,
80
                 confirms=True, confirm_buffer=100):
78
                 confirms=True, confirm_buffer=100, logger=None):
81 79
        """
82 80
        Format hosts as "amqp://username:pass@host:port"
83 81
        max_retries=0 defaults to unlimited retries
......
98 96
        self.unsend = OrderedDict()
99 97
        self.consume_promises = []
100 98
        self.exchanges = []
99
        if logger:
100
            self.log = logger
101
        else:
102
            logger = logging.getLogger("amqp")
103
            logging.basicConfig()
104
            self.log = logger
101 105

  
102 106
    def connect(self, retries=0):
103 107
        if self.max_retries and retries >= self.max_retries:
104
            logger.error("Aborting after %d retries", retries)
108
            self.log.error("Aborting after %d retries", retries)
105 109
            raise AMQPConnectionError('Aborting after %d connection failures.'\
106 110
                                      % retries)
107 111
            return
......
113 117
        self.client = Client(host, pubacks=self.confirms)
114 118

  
115 119
        host = host.split('@')[-1]
116
        logger.debug('Connecting to node %s' % host)
120
        self.log.debug('Connecting to node %s' % host)
117 121

  
118 122
        try:
119 123
            promise = self.client.connect()
120 124
            self.client.wait(promise)
121 125
        except socket_error as e:
122 126
            if retries < len(self.hosts):
123
                logger.warning('Cannot connect to host %s: %s', host, e)
127
                self.log.warning('Cannot connect to host %s: %s', host, e)
124 128
            else:
125
                logger.error('Cannot connect to host %s: %s', host, e)
129
                self.log.error('Cannot connect to host %s: %s', host, e)
126 130
                sleep(1)
127 131
            return self.connect(retries + 1)
128 132

  
129
        logger.info('Successfully connected to host: %s', host)
133
        self.log.info('Successfully connected to host: %s', host)
130 134

  
131 135
        # Setup TCP keepalive option
132 136
        self.client.sd.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
......
137 141
        # Keepalive retry
138 142
        self.client.sd.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT, 10)
139 143

  
140
        logger.info('Creating channel')
144
        self.log.info('Creating channel')
141 145

  
142 146
        # Clear consume_promises each time connecting, since they are related
143 147
        # to the connection object
......
172 176
        @param exhange_type: one of 'direct', 'topic', 'fanout'
173 177

  
174 178
        """
175
        logger.info('Declaring %s exchange: %s', type, exchange)
179
        self.log.info('Declaring %s exchange: %s', type, exchange)
176 180
        promise = self.client.exchange_declare(exchange=exchange,
177 181
                                               type=type,
178 182
                                               durable=True,
......
200 204
                  and not host IP. example: [node1@rabbit,node2@rabbit]
201 205

  
202 206
        """
203
        logger.info('Declaring queue: %s', queue)
207
        self.log.info('Declaring queue: %s', queue)
204 208

  
205 209
        if mirrored:
206 210
            if mirrored_nodes == 'all':
......
223 227
        self.client.wait(promise)
224 228

  
225 229
    def queue_bind(self, queue, exchange, routing_key):
226
        logger.debug('Binding queue %s to exchange %s with key %s'
227
                 % (queue, exchange, routing_key))
230
        self.log.debug('Binding queue %s to exchange %s with key %s'
231
                       % (queue, exchange, routing_key))
228 232
        promise = self.client.queue_bind(exchange=exchange, queue=queue,
229 233
                                         routing_key=routing_key)
230 234
        self.client.wait(promise)
......
282 286
        msgs = self.unacked.values()
283 287
        self.unacked.clear()
284 288
        for exchange, routing_key, body in msgs:
285
            logger.debug('Resending message %s' % body)
289
            self.log.debug('Resending message %s' % body)
286 290
            self.basic_publish(exchange, routing_key, body)
287 291

  
288 292
    @reconnect_decorator
......
311 315
            if 'body' in msg:
312 316
                callback(self, msg)
313 317
            else:
314
                logger.debug("Message without body %s" % msg)
318
                self.log.debug("Message without body %s" % msg)
315 319
                raise socket_error
316 320

  
317 321
        consume_promise = \
......
372 376

  
373 377
    def close(self):
374 378
        """Check that messages have been send and close the connection."""
375
        logger.debug("Closing connection to %s", self.client.host)
379
        self.log.debug("Closing connection to %s", self.client.host)
376 380
        try:
377 381
            if self.confirms:
378 382
                self.get_confirms()
379 383
            close_promise = self.client.close()
380 384
            self.client.wait(close_promise)
381 385
        except (socket_error, spec_exceptions.ConnectionForced) as e:
382
            logger.error('Connection closed while closing connection:%s',
383
                          e)
386
            self.log.error('Connection closed while closing connection:%s', e)
384 387

  
385 388
    def queue_delete(self, queue, if_unused=True, if_empty=True):
386 389
        """Delete a queue.
......
394 397
            self.client.wait(promise)
395 398
            return True
396 399
        except spec_exceptions.NotFound:
397
            logger.info("Queue %s does not exist", queue)
400
            self.log.info("Queue %s does not exist", queue)
398 401
            return False
399 402

  
400 403
    def exchange_delete(self, exchange, if_unused=True):
......
406 409
            self.client.wait(promise)
407 410
            return True
408 411
        except spec_exceptions.NotFound:
409
            logger.info("Exchange %s does not exist", exchange)
412
            self.log.info("Exchange %s does not exist", exchange)
410 413
            return False
411 414

  
412 415
    @reconnect_decorator

Also available in: Unified diff