54 |
54 |
import socket
|
55 |
55 |
from daemon import daemon
|
56 |
56 |
|
|
57 |
import traceback
|
|
58 |
|
57 |
59 |
# Take care of differences between python-daemon versions.
|
58 |
60 |
try:
|
59 |
61 |
from daemon import pidfile
|
... | ... | |
76 |
78 |
clienttags = []
|
77 |
79 |
|
78 |
80 |
def __init__(self, debug = False):
|
79 |
|
|
|
81 |
|
80 |
82 |
# Initialize logger
|
81 |
83 |
self.logger = log.get_logger('synnefo.dispatcher')
|
82 |
84 |
|
... | ... | |
117 |
119 |
password=settings.RABBIT_PASSWORD,
|
118 |
120 |
virtual_host=settings.RABBIT_VHOST)
|
119 |
121 |
except socket.error:
|
120 |
|
time.sleep(1)
|
|
122 |
self.logger.error("Failed to connect to %s, retrying in 10s...",
|
|
123 |
settings.RABBIT_HOST)
|
|
124 |
time.sleep(10)
|
121 |
125 |
|
122 |
126 |
self.logger.info("Connection succesful, opening channel")
|
123 |
127 |
self.chan = conn.channel()
|
... | ... | |
197 |
201 |
|
198 |
202 |
|
199 |
203 |
def _exit_handler(signum, frame):
|
200 |
|
""""Catch exit signal in children processes."""
|
201 |
|
print "%d: Caught signal %d, will raise SystemExit" % (os.getpid(), signum)
|
|
204 |
""""Catch exit signal in children processes"""
|
|
205 |
global logger
|
|
206 |
logger.info("Caught signal %d, will raise SystemExit", signum)
|
202 |
207 |
raise SystemExit
|
203 |
208 |
|
204 |
209 |
|
205 |
210 |
def _parent_handler(signum, frame):
|
206 |
211 |
""""Catch exit signal in parent process and forward it to children."""
|
207 |
|
global children
|
208 |
|
print "Caught signal %d, sending kill signal to children" % signum
|
|
212 |
global children, logger
|
|
213 |
logger.info("Caught signal %d, sending SIGTERM to children %s",
|
|
214 |
signum, children)
|
209 |
215 |
[os.kill(pid, SIGTERM) for pid in children]
|
210 |
216 |
|
211 |
217 |
|
... | ... | |
214 |
220 |
|
215 |
221 |
# Cmd line argument parsing
|
216 |
222 |
(opts, args) = parse_arguments(cmdline)
|
217 |
|
disp = Dispatcher(debug = opts.debug)
|
|
223 |
disp = Dispatcher(debug=opts.debug)
|
218 |
224 |
|
219 |
225 |
# Start the event loop
|
220 |
226 |
disp.wait()
|
... | ... | |
245 |
251 |
return parser.parse_args(args)
|
246 |
252 |
|
247 |
253 |
|
248 |
|
def purge_queues() :
|
|
254 |
def purge_queues():
|
249 |
255 |
"""
|
250 |
256 |
Delete declared queues from RabbitMQ. Use with care!
|
251 |
257 |
"""
|
... | ... | |
363 |
369 |
disp.wait()
|
364 |
370 |
|
365 |
371 |
|
366 |
|
def main():
|
|
372 |
def daemon_mode(opts):
|
367 |
373 |
global children, logger
|
368 |
|
(opts, args) = parse_arguments(sys.argv[1:])
|
369 |
|
|
370 |
|
logger = log.get_logger("synnefo.dispatcher")
|
371 |
|
|
372 |
|
# Init the global variables containing the queues
|
373 |
|
_init_queues()
|
374 |
374 |
|
375 |
|
# Special case for the clean up queues action
|
376 |
|
if opts.purge_queues:
|
377 |
|
purge_queues()
|
378 |
|
return
|
379 |
|
|
380 |
|
# Special case for the clean up exch action
|
381 |
|
if opts.purge_exchanges:
|
382 |
|
purge_exchanges()
|
383 |
|
return
|
384 |
|
|
385 |
|
if opts.drain_queue:
|
386 |
|
drain_queue(opts.drain_queue)
|
387 |
|
return
|
388 |
|
|
389 |
|
# Debug mode, process messages without spawning workers
|
390 |
|
if opts.debug:
|
391 |
|
log.console_output(logger)
|
392 |
|
debug_mode()
|
393 |
|
return
|
394 |
|
|
395 |
|
# Become a daemon
|
396 |
|
daemon_context = daemon.DaemonContext(
|
397 |
|
stdout=sys.stdout,
|
398 |
|
stderr=sys.stderr,
|
399 |
|
umask=022)
|
400 |
|
|
401 |
|
daemon_context.open()
|
402 |
|
|
403 |
|
# Create pidfile. Take care of differences between python-daemon versions.
|
|
375 |
# Create pidfile,
|
|
376 |
# take care of differences between python-daemon versions
|
404 |
377 |
try:
|
405 |
378 |
pidf = pidfile.TimeoutPIDLockFile(opts.pid_file, 10)
|
406 |
379 |
except:
|
... | ... | |
418 |
391 |
newpid = os.fork()
|
419 |
392 |
|
420 |
393 |
if newpid == 0:
|
421 |
|
signal(SIGINT, _exit_handler)
|
|
394 |
signal(SIGINT, _exit_handler)
|
422 |
395 |
signal(SIGTERM, _exit_handler)
|
423 |
396 |
child(sys.argv[1:])
|
424 |
397 |
sys.exit(1)
|
... | ... | |
429 |
402 |
i += 1
|
430 |
403 |
|
431 |
404 |
# Catch signals to ensure graceful shutdown
|
432 |
|
signal(SIGINT, _parent_handler)
|
|
405 |
signal(SIGINT, _parent_handler)
|
433 |
406 |
signal(SIGTERM, _parent_handler)
|
434 |
407 |
|
435 |
408 |
# Wait for all children processes to die, one by one
|
436 |
|
try :
|
|
409 |
try:
|
437 |
410 |
for pid in children:
|
438 |
411 |
try:
|
439 |
412 |
os.waitpid(pid, 0)
|
... | ... | |
442 |
415 |
finally:
|
443 |
416 |
pidf.release()
|
444 |
417 |
|
|
418 |
|
|
419 |
def main():
|
|
420 |
global logger
|
|
421 |
(opts, args) = parse_arguments(sys.argv[1:])
|
|
422 |
|
|
423 |
logger = log.get_logger("synnefo.dispatcher")
|
|
424 |
|
|
425 |
# Init the global variables containing the queues
|
|
426 |
_init_queues()
|
|
427 |
|
|
428 |
# Special case for the clean up queues action
|
|
429 |
if opts.purge_queues:
|
|
430 |
purge_queues()
|
|
431 |
return
|
|
432 |
|
|
433 |
# Special case for the clean up exch action
|
|
434 |
if opts.purge_exchanges:
|
|
435 |
purge_exchanges()
|
|
436 |
return
|
|
437 |
|
|
438 |
if opts.drain_queue:
|
|
439 |
drain_queue(opts.drain_queue)
|
|
440 |
return
|
|
441 |
|
|
442 |
# Debug mode, process messages without spawning workers
|
|
443 |
if opts.debug:
|
|
444 |
log.console_output(logger)
|
|
445 |
debug_mode()
|
|
446 |
return
|
|
447 |
|
|
448 |
# Redirect stdout and stderr to the fileno of the first
|
|
449 |
# file-based handler for this logger
|
|
450 |
stdout_stderr_handler = None
|
|
451 |
files_preserve = None
|
|
452 |
for handler in logger.handlers:
|
|
453 |
if hasattr(handler, 'stream') and hasattr(handler.stream, 'fileno'):
|
|
454 |
stdout_stderr_handler = handler.stream
|
|
455 |
files_preserve = [handler.stream]
|
|
456 |
break
|
|
457 |
|
|
458 |
daemon_context = daemon.DaemonContext(
|
|
459 |
stdout=stdout_stderr_handler,
|
|
460 |
stderr=stdout_stderr_handler,
|
|
461 |
files_preserve=files_preserve,
|
|
462 |
umask=022)
|
|
463 |
|
|
464 |
daemon_context.open()
|
|
465 |
|
|
466 |
# Catch every exception, make sure it gets logged properly
|
|
467 |
try:
|
|
468 |
daemon_mode(opts)
|
|
469 |
except Exception:
|
|
470 |
exc = "".join(traceback.format_exception(*sys.exc_info()))
|
|
471 |
logger.critical(exc)
|
|
472 |
raise
|
|
473 |
|
|
474 |
|
445 |
475 |
if __name__ == "__main__":
|
446 |
476 |
sys.exit(main())
|
447 |
477 |
|