Revision 9e98ba3c logic/dispatcher.py
b/logic/dispatcher.py | ||
---|---|---|
43 | 43 |
path = os.path.normpath(os.path.join(os.getcwd(), '..')) |
44 | 44 |
sys.path.append(path) |
45 | 45 |
import synnefo.settings as settings |
46 |
from synnefo.logic import log |
|
47 | 46 |
|
48 | 47 |
setup_environ(settings) |
49 | 48 |
|
50 | 49 |
from amqplib import client_0_8 as amqp |
51 | 50 |
from signal import signal, SIGINT, SIGTERM |
52 | 51 |
|
52 |
import logging |
|
53 | 53 |
import time |
54 | 54 |
import socket |
55 | 55 |
from daemon import daemon |
56 | 56 |
|
57 |
import traceback |
|
58 |
|
|
59 | 57 |
# Take care of differences between python-daemon versions. |
60 | 58 |
try: |
61 | 59 |
from daemon import pidfile |
... | ... | |
63 | 61 |
from daemon import pidlockfile |
64 | 62 |
|
65 | 63 |
from synnefo.logic import callbacks |
64 |
from synnefo.util.dictconfig import dictConfig |
|
65 |
|
|
66 |
|
|
67 |
log = logging.getLogger() |
|
68 |
|
|
66 | 69 |
|
67 | 70 |
# Queue names |
68 | 71 |
QUEUES = [] |
... | ... | |
72 | 75 |
|
73 | 76 |
|
74 | 77 |
class Dispatcher: |
75 |
|
|
76 |
logger = None |
|
77 | 78 |
chan = None |
78 | 79 |
debug = False |
79 | 80 |
clienttags = [] |
80 | 81 |
|
81 | 82 |
def __init__(self, debug=False): |
82 |
|
|
83 |
# Initialize logger |
|
84 |
self.logger = log.get_logger('synnefo.dispatcher') |
|
85 |
|
|
86 | 83 |
self.debug = debug |
87 | 84 |
self._init() |
88 | 85 |
|
... | ... | |
93 | 90 |
except SystemExit: |
94 | 91 |
break |
95 | 92 |
except amqp.exceptions.AMQPConnectionException: |
96 |
self.logger.error("Server went away, reconnecting...")
|
|
93 |
log.error("Server went away, reconnecting...")
|
|
97 | 94 |
self._init() |
98 | 95 |
except socket.error: |
99 |
self.logger.error("Server went away, reconnecting...")
|
|
96 |
log.error("Server went away, reconnecting...")
|
|
100 | 97 |
self._init() |
101 | 98 |
except Exception, e: |
102 |
self.logger.exception("Caught unexpected exception")
|
|
99 |
log.exception("Caught unexpected exception")
|
|
103 | 100 |
|
104 | 101 |
[self.chan.basic_cancel(clienttag) for clienttag in self.clienttags] |
105 | 102 |
self.chan.connection.close() |
... | ... | |
107 | 104 |
|
108 | 105 |
def _init(self): |
109 | 106 |
global QUEUES, BINDINGS |
110 |
self.logger.info("Initializing")
|
|
107 |
log.info("Initializing")
|
|
111 | 108 |
|
112 | 109 |
# Connect to RabbitMQ |
113 | 110 |
conn = None |
114 | 111 |
while conn == None: |
115 |
self.logger.info("Attempting to connect to %s", |
|
116 |
settings.RABBIT_HOST) |
|
112 |
log.info("Attempting to connect to %s", settings.RABBIT_HOST) |
|
117 | 113 |
try: |
118 | 114 |
conn = amqp.Connection(host=settings.RABBIT_HOST, |
119 | 115 |
userid=settings.RABBIT_USERNAME, |
120 | 116 |
password=settings.RABBIT_PASSWORD, |
121 | 117 |
virtual_host=settings.RABBIT_VHOST) |
122 | 118 |
except socket.error: |
123 |
self.logger.error("Failed to connect to %s, retrying in 10s",
|
|
119 |
log.error("Failed to connect to %s, retrying in 10s",
|
|
124 | 120 |
settings.RABBIT_HOST) |
125 | 121 |
time.sleep(10) |
126 | 122 |
|
127 |
self.logger.info("Connection succesful, opening channel")
|
|
123 |
log.info("Connection succesful, opening channel")
|
|
128 | 124 |
self.chan = conn.channel() |
129 | 125 |
|
130 | 126 |
# Declare queues and exchanges |
... | ... | |
143 | 139 |
try: |
144 | 140 |
callback = getattr(callbacks, binding[3]) |
145 | 141 |
except AttributeError: |
146 |
self.logger.error("Cannot find callback %s" % binding[3])
|
|
142 |
log.error("Cannot find callback %s", binding[3])
|
|
147 | 143 |
raise SystemExit(1) |
148 | 144 |
|
149 | 145 |
self.chan.queue_bind(queue=binding[0], exchange=binding[1], |
150 | 146 |
routing_key=binding[2]) |
151 | 147 |
tag = self.chan.basic_consume(queue=binding[0], callback=callback) |
152 |
self.logger.debug("Binding %s(%s) to queue %s with handler %s" %
|
|
153 |
(binding[1], binding[2], binding[0], binding[3]))
|
|
148 |
log.debug("Binding %s(%s) to queue %s with handler %s",
|
|
149 |
binding[1], binding[2], binding[0], binding[3])
|
|
154 | 150 |
self.clienttags.append(tag) |
155 | 151 |
|
156 | 152 |
|
... | ... | |
207 | 203 |
|
208 | 204 |
def _exit_handler(signum, frame): |
209 | 205 |
""""Catch exit signal in children processes""" |
210 |
global logger |
|
211 |
logger.info("Caught signal %d, will raise SystemExit", signum) |
|
206 |
log.info("Caught signal %d, will raise SystemExit", signum) |
|
212 | 207 |
raise SystemExit |
213 | 208 |
|
214 | 209 |
|
215 | 210 |
def _parent_handler(signum, frame): |
216 | 211 |
""""Catch exit signal in parent process and forward it to children.""" |
217 |
global children, logger
|
|
218 |
logger.info("Caught signal %d, sending SIGTERM to children %s",
|
|
212 |
global children |
|
213 |
log.info("Caught signal %d, sending SIGTERM to children %s", |
|
219 | 214 |
signum, children) |
220 | 215 |
[os.kill(pid, SIGTERM) for pid in children] |
221 | 216 |
|
... | ... | |
375 | 370 |
|
376 | 371 |
|
377 | 372 |
def daemon_mode(opts): |
378 |
global children, logger
|
|
373 |
global children |
|
379 | 374 |
|
380 | 375 |
# Create pidfile, |
381 | 376 |
# take care of differences between python-daemon versions |
... | ... | |
386 | 381 |
|
387 | 382 |
pidf.acquire() |
388 | 383 |
|
389 |
logger.info("Became a daemon")
|
|
384 |
log.info("Became a daemon") |
|
390 | 385 |
|
391 | 386 |
# Fork workers |
392 | 387 |
children = [] |
... | ... | |
401 | 396 |
child(sys.argv[1:]) |
402 | 397 |
sys.exit(1) |
403 | 398 |
else: |
404 |
pids = (os.getpid(), newpid) |
|
405 |
logger.debug("%d, forked child: %d" % pids) |
|
406 |
children.append(pids[1]) |
|
399 |
log.debug("%d, forked child: %d", os.getpid(), newpid) |
|
400 |
children.append(newpid) |
|
407 | 401 |
i += 1 |
408 | 402 |
|
409 | 403 |
# Catch signals to ensure graceful shutdown |
... | ... | |
422 | 416 |
|
423 | 417 |
|
424 | 418 |
def main(): |
425 |
global logger
|
|
419 |
global log |
|
426 | 420 |
(opts, args) = parse_arguments(sys.argv[1:]) |
427 | 421 |
|
428 |
logger = log.get_logger("synnefo.dispatcher") |
|
429 |
|
|
430 | 422 |
# Init the global variables containing the queues |
431 | 423 |
_init_queues() |
432 | 424 |
|
... | ... | |
446 | 438 |
|
447 | 439 |
# Debug mode, process messages without spawning workers |
448 | 440 |
if opts.debug: |
449 |
log.console_output(logger) |
|
450 | 441 |
debug_mode() |
451 | 442 |
return |
452 |
|
|
453 |
# Redirect stdout and stderr to the fileno of the first |
|
454 |
# file-based handler for this logger |
|
455 |
stdout_stderr_handler = None |
|
456 |
files_preserve = None |
|
457 |
for handler in logger.handlers: |
|
458 |
if hasattr(handler, 'stream') and hasattr(handler.stream, 'fileno'): |
|
459 |
stdout_stderr_handler = handler.stream |
|
460 |
files_preserve = [handler.stream] |
|
461 |
break |
|
462 |
|
|
443 |
|
|
444 |
files_preserve = [] |
|
445 |
for handler in log.handlers: |
|
446 |
stream = getattr(handler, 'stream') |
|
447 |
if stream and hasattr(stream, 'fileno'): |
|
448 |
files_preserve.append(handler.stream) |
|
449 |
|
|
463 | 450 |
daemon_context = daemon.DaemonContext( |
464 |
stdout=stdout_stderr_handler, |
|
465 |
stderr=stdout_stderr_handler, |
|
466 | 451 |
files_preserve=files_preserve, |
467 | 452 |
umask=022) |
468 |
|
|
453 |
|
|
469 | 454 |
daemon_context.open() |
470 | 455 |
|
471 | 456 |
# Catch every exception, make sure it gets logged properly |
472 | 457 |
try: |
473 | 458 |
daemon_mode(opts) |
474 | 459 |
except Exception: |
475 |
exc = "".join(traceback.format_exception(*sys.exc_info())) |
|
476 |
logger.critical(exc) |
|
460 |
log.exception("Unknown error") |
|
477 | 461 |
raise |
478 | 462 |
|
479 | 463 |
|
480 | 464 |
if __name__ == "__main__": |
465 |
dictConfig(settings.DISPATCHER_LOGGING) |
|
481 | 466 |
sys.exit(main()) |
482 | 467 |
|
483 | 468 |
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai : |
Also available in: Unified diff