Revision d28244af logic/dispatcher.py
b/logic/dispatcher.py | ||
---|---|---|
54 | 54 |
import socket |
55 | 55 |
from daemon import daemon |
56 | 56 |
|
57 |
import traceback |
|
58 |
|
|
57 | 59 |
# Take care of differences between python-daemon versions. |
58 | 60 |
try: |
59 | 61 |
from daemon import pidfile |
... | ... | |
76 | 78 |
clienttags = [] |
77 | 79 |
|
78 | 80 |
def __init__(self, debug = False): |
79 |
|
|
81 |
|
|
80 | 82 |
# Initialize logger |
81 | 83 |
self.logger = log.get_logger('synnefo.dispatcher') |
82 | 84 |
|
... | ... | |
117 | 119 |
password=settings.RABBIT_PASSWORD, |
118 | 120 |
virtual_host=settings.RABBIT_VHOST) |
119 | 121 |
except socket.error: |
120 |
time.sleep(1) |
|
122 |
self.logger.error("Failed to connect to %s, retrying in 10s...", |
|
123 |
settings.RABBIT_HOST) |
|
124 |
time.sleep(10) |
|
121 | 125 |
|
122 | 126 |
self.logger.info("Connection succesful, opening channel") |
123 | 127 |
self.chan = conn.channel() |
... | ... | |
197 | 201 |
|
198 | 202 |
|
199 | 203 |
def _exit_handler(signum, frame): |
200 |
""""Catch exit signal in children processes.""" |
|
201 |
print "%d: Caught signal %d, will raise SystemExit" % (os.getpid(), signum) |
|
204 |
""""Catch exit signal in children processes""" |
|
205 |
global logger |
|
206 |
logger.info("Caught signal %d, will raise SystemExit", signum) |
|
202 | 207 |
raise SystemExit |
203 | 208 |
|
204 | 209 |
|
205 | 210 |
def _parent_handler(signum, frame): |
206 | 211 |
""""Catch exit signal in parent process and forward it to children.""" |
207 |
global children |
|
208 |
print "Caught signal %d, sending kill signal to children" % signum |
|
212 |
global children, logger |
|
213 |
logger.info("Caught signal %d, sending SIGTERM to children %s", |
|
214 |
signum, children) |
|
209 | 215 |
[os.kill(pid, SIGTERM) for pid in children] |
210 | 216 |
|
211 | 217 |
|
... | ... | |
214 | 220 |
|
215 | 221 |
# Cmd line argument parsing |
216 | 222 |
(opts, args) = parse_arguments(cmdline) |
217 |
disp = Dispatcher(debug = opts.debug)
|
|
223 |
disp = Dispatcher(debug=opts.debug)
|
|
218 | 224 |
|
219 | 225 |
# Start the event loop |
220 | 226 |
disp.wait() |
... | ... | |
245 | 251 |
return parser.parse_args(args) |
246 | 252 |
|
247 | 253 |
|
248 |
def purge_queues() :
|
|
254 |
def purge_queues(): |
|
249 | 255 |
""" |
250 | 256 |
Delete declared queues from RabbitMQ. Use with care! |
251 | 257 |
""" |
... | ... | |
363 | 369 |
disp.wait() |
364 | 370 |
|
365 | 371 |
|
366 |
def main():
|
|
372 |
def daemon_mode(opts):
|
|
367 | 373 |
global children, logger |
368 |
(opts, args) = parse_arguments(sys.argv[1:]) |
|
369 |
|
|
370 |
logger = log.get_logger("synnefo.dispatcher") |
|
371 |
|
|
372 |
# Init the global variables containing the queues |
|
373 |
_init_queues() |
|
374 | 374 |
|
375 |
# Special case for the clean up queues action |
|
376 |
if opts.purge_queues: |
|
377 |
purge_queues() |
|
378 |
return |
|
379 |
|
|
380 |
# Special case for the clean up exch action |
|
381 |
if opts.purge_exchanges: |
|
382 |
purge_exchanges() |
|
383 |
return |
|
384 |
|
|
385 |
if opts.drain_queue: |
|
386 |
drain_queue(opts.drain_queue) |
|
387 |
return |
|
388 |
|
|
389 |
# Debug mode, process messages without spawning workers |
|
390 |
if opts.debug: |
|
391 |
log.console_output(logger) |
|
392 |
debug_mode() |
|
393 |
return |
|
394 |
|
|
395 |
# Become a daemon |
|
396 |
daemon_context = daemon.DaemonContext( |
|
397 |
stdout=sys.stdout, |
|
398 |
stderr=sys.stderr, |
|
399 |
umask=022) |
|
400 |
|
|
401 |
daemon_context.open() |
|
402 |
|
|
403 |
# Create pidfile. Take care of differences between python-daemon versions. |
|
375 |
# Create pidfile, |
|
376 |
# take care of differences between python-daemon versions |
|
404 | 377 |
try: |
405 | 378 |
pidf = pidfile.TimeoutPIDLockFile(opts.pid_file, 10) |
406 | 379 |
except: |
... | ... | |
418 | 391 |
newpid = os.fork() |
419 | 392 |
|
420 | 393 |
if newpid == 0: |
421 |
signal(SIGINT, _exit_handler)
|
|
394 |
signal(SIGINT, _exit_handler) |
|
422 | 395 |
signal(SIGTERM, _exit_handler) |
423 | 396 |
child(sys.argv[1:]) |
424 | 397 |
sys.exit(1) |
... | ... | |
429 | 402 |
i += 1 |
430 | 403 |
|
431 | 404 |
# Catch signals to ensure graceful shutdown |
432 |
signal(SIGINT, _parent_handler)
|
|
405 |
signal(SIGINT, _parent_handler) |
|
433 | 406 |
signal(SIGTERM, _parent_handler) |
434 | 407 |
|
435 | 408 |
# Wait for all children processes to die, one by one |
436 |
try :
|
|
409 |
try: |
|
437 | 410 |
for pid in children: |
438 | 411 |
try: |
439 | 412 |
os.waitpid(pid, 0) |
... | ... | |
442 | 415 |
finally: |
443 | 416 |
pidf.release() |
444 | 417 |
|
418 |
|
|
419 |
def main(): |
|
420 |
global logger |
|
421 |
(opts, args) = parse_arguments(sys.argv[1:]) |
|
422 |
|
|
423 |
logger = log.get_logger("synnefo.dispatcher") |
|
424 |
|
|
425 |
# Init the global variables containing the queues |
|
426 |
_init_queues() |
|
427 |
|
|
428 |
# Special case for the clean up queues action |
|
429 |
if opts.purge_queues: |
|
430 |
purge_queues() |
|
431 |
return |
|
432 |
|
|
433 |
# Special case for the clean up exch action |
|
434 |
if opts.purge_exchanges: |
|
435 |
purge_exchanges() |
|
436 |
return |
|
437 |
|
|
438 |
if opts.drain_queue: |
|
439 |
drain_queue(opts.drain_queue) |
|
440 |
return |
|
441 |
|
|
442 |
# Debug mode, process messages without spawning workers |
|
443 |
if opts.debug: |
|
444 |
log.console_output(logger) |
|
445 |
debug_mode() |
|
446 |
return |
|
447 |
|
|
448 |
# Redirect stdout and stderr to the fileno of the first |
|
449 |
# file-based handler for this logger |
|
450 |
stdout_stderr_handler = None |
|
451 |
files_preserve = None |
|
452 |
for handler in logger.handlers: |
|
453 |
if hasattr(handler, 'stream') and hasattr(handler.stream, 'fileno'): |
|
454 |
stdout_stderr_handler = handler.stream |
|
455 |
files_preserve = [handler.stream] |
|
456 |
break |
|
457 |
|
|
458 |
daemon_context = daemon.DaemonContext( |
|
459 |
stdout=stdout_stderr_handler, |
|
460 |
stderr=stdout_stderr_handler, |
|
461 |
files_preserve=files_preserve, |
|
462 |
umask=022) |
|
463 |
|
|
464 |
daemon_context.open() |
|
465 |
|
|
466 |
# Catch every exception, make sure it gets logged properly |
|
467 |
try: |
|
468 |
daemon_mode(opts) |
|
469 |
except Exception: |
|
470 |
exc = "".join(traceback.format_exception(*sys.exc_info())) |
|
471 |
logger.critical(exc) |
|
472 |
raise |
|
473 |
|
|
474 |
|
|
445 | 475 |
if __name__ == "__main__": |
446 | 476 |
sys.exit(main()) |
447 | 477 |
|
Also available in: Unified diff