Revision 9e98ba3c logic/dispatcher.py

b/logic/dispatcher.py
43 43
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
44 44
sys.path.append(path)
45 45
import synnefo.settings as settings
46
from synnefo.logic import log
47 46

  
48 47
setup_environ(settings)
49 48

  
50 49
from amqplib import client_0_8 as amqp
51 50
from signal import signal, SIGINT, SIGTERM
52 51

  
52
import logging
53 53
import time
54 54
import socket
55 55
from daemon import daemon
56 56

  
57
import traceback
58

  
59 57
# Take care of differences between python-daemon versions.
60 58
try:
61 59
    from daemon import pidfile
......
63 61
    from daemon import pidlockfile
64 62

  
65 63
from synnefo.logic import callbacks
64
from synnefo.util.dictconfig import dictConfig
65

  
66

  
67
log = logging.getLogger()
68

  
66 69

  
67 70
# Queue names
68 71
QUEUES = []
......
72 75

  
73 76

  
74 77
class Dispatcher:
75

  
76
    logger = None
77 78
    chan = None
78 79
    debug = False
79 80
    clienttags = []
80 81

  
81 82
    def __init__(self, debug=False):
82

  
83
        # Initialize logger
84
        self.logger = log.get_logger('synnefo.dispatcher')
85

  
86 83
        self.debug = debug
87 84
        self._init()
88 85

  
......
93 90
            except SystemExit:
94 91
                break
95 92
            except amqp.exceptions.AMQPConnectionException:
96
                self.logger.error("Server went away, reconnecting...")
93
                log.error("Server went away, reconnecting...")
97 94
                self._init()
98 95
            except socket.error:
99
                self.logger.error("Server went away, reconnecting...")
96
                log.error("Server went away, reconnecting...")
100 97
                self._init()
101 98
            except Exception, e:
102
                self.logger.exception("Caught unexpected exception")
99
                log.exception("Caught unexpected exception")
103 100

  
104 101
        [self.chan.basic_cancel(clienttag) for clienttag in self.clienttags]
105 102
        self.chan.connection.close()
......
107 104

  
108 105
    def _init(self):
109 106
        global QUEUES, BINDINGS
110
        self.logger.info("Initializing")
107
        log.info("Initializing")
111 108

  
112 109
        # Connect to RabbitMQ
113 110
        conn = None
114 111
        while conn == None:
115
            self.logger.info("Attempting to connect to %s",
116
                             settings.RABBIT_HOST)
112
            log.info("Attempting to connect to %s", settings.RABBIT_HOST)
117 113
            try:
118 114
                conn = amqp.Connection(host=settings.RABBIT_HOST,
119 115
                                       userid=settings.RABBIT_USERNAME,
120 116
                                       password=settings.RABBIT_PASSWORD,
121 117
                                       virtual_host=settings.RABBIT_VHOST)
122 118
            except socket.error:
123
                self.logger.error("Failed to connect to %s, retrying in 10s",
119
                log.error("Failed to connect to %s, retrying in 10s",
124 120
                                  settings.RABBIT_HOST)
125 121
                time.sleep(10)
126 122

  
127
        self.logger.info("Connection succesful, opening channel")
123
        log.info("Connection succesful, opening channel")
128 124
        self.chan = conn.channel()
129 125

  
130 126
        # Declare queues and exchanges
......
143 139
            try:
144 140
                callback = getattr(callbacks, binding[3])
145 141
            except AttributeError:
146
                self.logger.error("Cannot find callback %s" % binding[3])
142
                log.error("Cannot find callback %s", binding[3])
147 143
                raise SystemExit(1)
148 144

  
149 145
            self.chan.queue_bind(queue=binding[0], exchange=binding[1],
150 146
                                 routing_key=binding[2])
151 147
            tag = self.chan.basic_consume(queue=binding[0], callback=callback)
152
            self.logger.debug("Binding %s(%s) to queue %s with handler %s" %
153
                              (binding[1], binding[2], binding[0], binding[3]))
148
            log.debug("Binding %s(%s) to queue %s with handler %s",
149
                              binding[1], binding[2], binding[0], binding[3])
154 150
            self.clienttags.append(tag)
155 151

  
156 152

  
......
207 203

  
208 204
def _exit_handler(signum, frame):
209 205
    """"Catch exit signal in children processes"""
210
    global logger
211
    logger.info("Caught signal %d, will raise SystemExit", signum)
206
    log.info("Caught signal %d, will raise SystemExit", signum)
212 207
    raise SystemExit
213 208

  
214 209

  
215 210
def _parent_handler(signum, frame):
216 211
    """"Catch exit signal in parent process and forward it to children."""
217
    global children, logger
218
    logger.info("Caught signal %d, sending SIGTERM to children %s",
212
    global children
213
    log.info("Caught signal %d, sending SIGTERM to children %s",
219 214
                signum, children)
220 215
    [os.kill(pid, SIGTERM) for pid in children]
221 216

  
......
375 370

  
376 371

  
377 372
def daemon_mode(opts):
378
    global children, logger
373
    global children
379 374

  
380 375
    # Create pidfile,
381 376
    # take care of differences between python-daemon versions
......
386 381

  
387 382
    pidf.acquire()
388 383

  
389
    logger.info("Became a daemon")
384
    log.info("Became a daemon")
390 385

  
391 386
    # Fork workers
392 387
    children = []
......
401 396
            child(sys.argv[1:])
402 397
            sys.exit(1)
403 398
        else:
404
            pids = (os.getpid(), newpid)
405
            logger.debug("%d, forked child: %d" % pids)
406
            children.append(pids[1])
399
            log.debug("%d, forked child: %d", os.getpid(), newpid)
400
            children.append(newpid)
407 401
        i += 1
408 402

  
409 403
    # Catch signals to ensure graceful shutdown
......
422 416

  
423 417

  
424 418
def main():
425
    global logger
419
    global log
426 420
    (opts, args) = parse_arguments(sys.argv[1:])
427 421

  
428
    logger = log.get_logger("synnefo.dispatcher")
429

  
430 422
    # Init the global variables containing the queues
431 423
    _init_queues()
432 424

  
......
446 438

  
447 439
    # Debug mode, process messages without spawning workers
448 440
    if opts.debug:
449
        log.console_output(logger)
450 441
        debug_mode()
451 442
        return
452

  
453
    # Redirect stdout and stderr to the fileno of the first
454
    # file-based handler for this logger
455
    stdout_stderr_handler = None
456
    files_preserve = None
457
    for handler in logger.handlers:
458
        if hasattr(handler, 'stream') and hasattr(handler.stream, 'fileno'):
459
            stdout_stderr_handler = handler.stream
460
            files_preserve = [handler.stream]
461
            break
462

  
443
    
444
    files_preserve = []
445
    for handler in log.handlers:
446
        stream = getattr(handler, 'stream')
447
        if stream and hasattr(stream, 'fileno'):
448
            files_preserve.append(handler.stream)
449
    
463 450
    daemon_context = daemon.DaemonContext(
464
        stdout=stdout_stderr_handler,
465
        stderr=stdout_stderr_handler,
466 451
        files_preserve=files_preserve,
467 452
        umask=022)
468

  
453
    
469 454
    daemon_context.open()
470 455

  
471 456
    # Catch every exception, make sure it gets logged properly
472 457
    try:
473 458
        daemon_mode(opts)
474 459
    except Exception:
475
        exc = "".join(traceback.format_exception(*sys.exc_info()))
476
        logger.critical(exc)
460
        log.exception("Unknown error")
477 461
        raise
478 462

  
479 463

  
480 464
if __name__ == "__main__":
465
    dictConfig(settings.DISPATCHER_LOGGING)
481 466
    sys.exit(main())
482 467

  
483 468
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :

Also available in: Unified diff