Revision 3f018af1 snf-cyclades-app/synnefo/logic/dispatcher.py
b/snf-cyclades-app/synnefo/logic/dispatcher.py | ||
---|---|---|
43 | 43 |
import os |
44 | 44 |
path = os.path.normpath(os.path.join(os.getcwd(), '..')) |
45 | 45 |
sys.path.append(path) |
46 |
|
|
47 | 46 |
from synnefo import settings |
48 | 47 |
setup_environ(settings) |
49 | 48 |
|
50 | 49 |
import logging |
51 | 50 |
import time |
52 | 51 |
|
52 |
import daemon |
|
53 | 53 |
import daemon.runner |
54 |
import daemon.daemon |
|
55 | 54 |
from lockfile import LockTimeout |
56 |
from signal import signal, SIGINT, SIGTERM |
|
57 |
|
|
58 | 55 |
# Take care of differences between python-daemon versions. |
59 | 56 |
try: |
60 | 57 |
from daemon import pidfile as pidlockfile |
... | ... | |
63 | 60 |
|
64 | 61 |
from synnefo.lib.amqp import AMQPClient |
65 | 62 |
from synnefo.logic import callbacks |
66 |
from synnefo.util.dictconfig import dictConfig |
|
67 |
|
|
68 | 63 |
|
64 |
from synnefo.util.dictconfig import dictConfig |
|
65 |
dictConfig(settings.DISPATCHER_LOGGING) |
|
69 | 66 |
log = logging.getLogger() |
70 | 67 |
|
71 |
|
|
72 | 68 |
# Queue names |
73 | 69 |
QUEUES = [] |
74 | 70 |
|
... | ... | |
179 | 175 |
QUEUES += (QUEUE_DEBUG,) |
180 | 176 |
|
181 | 177 |
|
182 |
def _exit_handler(signum, frame):
    """Catch exit signal in children processes."""
    # Fix: the original docstring opened with four quotes (""""Catch...),
    # which left a stray '"' at the start of the docstring text.
    log.info("Caught signal %d, will raise SystemExit", signum)
    raise SystemExit
|
186 |
|
|
187 |
|
|
188 |
def _parent_handler(signum, frame):
    """Catch exit signal in parent process and forward it to children."""
    # Fixes: four-quote docstring typo, and a list comprehension that was
    # used purely for its side effects -- a plain loop states the intent.
    global children
    log.info("Caught signal %d, sending SIGTERM to children %s",
             signum, children)
    for pid in children:
        os.kill(pid, SIGTERM)
|
194 |
|
|
195 |
|
|
196 |
def child(cmdline):
    """Run the event loop of a single worker (child) process.

    Parses *cmdline* into options, builds a Dispatcher honouring the
    debug flag, and blocks in its message-wait loop.
    """
    opts, _args = parse_arguments(cmdline)
    dispatcher = Dispatcher(debug=opts.debug)
    dispatcher.wait()
|
205 |
|
|
206 |
|
|
207 | 178 |
def parse_arguments(args): |
208 | 179 |
from optparse import OptionParser |
209 | 180 |
|
210 |
default_pid_file = os.path.join("var", "run", "synnefo", "dispatcher.pid") |
|
181 |
default_pid_file = \ |
|
182 |
os.path.join(".", "var", "run", "synnefo", "dispatcher.pid")[1:] |
|
211 | 183 |
parser = OptionParser() |
212 | 184 |
parser.add_option("-d", "--debug", action="store_true", default=False, |
213 | 185 |
dest="debug", help="Enable debug mode") |
... | ... | |
287 | 259 |
client = AMQPClient() |
288 | 260 |
client.connect() |
289 | 261 |
|
290 |
# Register a temporary queue binding |
|
291 |
for binding in BINDINGS: |
|
292 |
if binding[0] == queue: |
|
293 |
exch = binding[1] |
|
294 |
|
|
295 | 262 |
tag = client.basic_consume(queue=queue, callback=callbacks.dummy_proc) |
296 | 263 |
|
297 | 264 |
print "Queue draining about to start, hit Ctrl+c when done" |
298 | 265 |
time.sleep(2) |
299 | 266 |
print "Queue draining starting" |
300 | 267 |
|
301 |
signal(SIGTERM, _exit_handler) |
|
302 |
signal(SIGINT, _exit_handler) |
|
303 |
|
|
304 | 268 |
num_processed = 0 |
305 | 269 |
while True: |
306 | 270 |
client.basic_wait() |
... | ... | |
311 | 275 |
client.close() |
312 | 276 |
|
313 | 277 |
|
314 |
|
|
315 | 278 |
def get_user_confirmation(): |
316 | 279 |
ans = raw_input("Are you sure (N/y):") |
317 | 280 |
|
... | ... | |
324 | 287 |
|
325 | 288 |
def debug_mode(): |
326 | 289 |
disp = Dispatcher(debug=True) |
327 |
signal(SIGINT, _exit_handler) |
|
328 |
signal(SIGTERM, _exit_handler) |
|
329 |
|
|
330 | 290 |
disp.wait() |
331 | 291 |
|
332 | 292 |
|
333 | 293 |
def daemon_mode(opts):
    """Process messages while running as a daemon.

    Daemonization (pidfile locking, DaemonContext setup, worker
    handling) is performed by the caller; here we only build a
    non-debug Dispatcher and block in its wait loop.
    """
    dispatcher = Dispatcher(debug=False)
    dispatcher.wait()
|
382 | 296 |
|
383 | 297 |
|
384 | 298 |
def main(): |
385 | 299 |
(opts, args) = parse_arguments(sys.argv[1:]) |
386 | 300 |
|
387 |
dictConfig(settings.DISPATCHER_LOGGING) |
|
388 |
|
|
389 |
global log |
|
390 |
|
|
391 | 301 |
# Init the global variables containing the queues |
392 | 302 |
_init_queues() |
393 | 303 |
|
... | ... | |
405 | 315 |
drain_queue(opts.drain_queue) |
406 | 316 |
return |
407 | 317 |
|
408 |
# Debug mode, process messages without spawning workers
|
|
318 |
# Debug mode, process messages without daemonizing
|
|
409 | 319 |
if opts.debug: |
410 | 320 |
debug_mode() |
411 | 321 |
return |
412 | 322 |
|
323 |
# Create pidfile, |
|
324 |
pidf = pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10) |
|
325 |
|
|
326 |
if daemon.runner.is_pidfile_stale(pidf): |
|
327 |
log.warning("Removing stale PID lock file %s", pidf.path) |
|
328 |
pidf.break_lock() |
|
329 |
|
|
413 | 330 |
files_preserve = [] |
414 | 331 |
for handler in log.handlers: |
415 | 332 |
stream = getattr(handler, 'stream') |
416 | 333 |
if stream and hasattr(stream, 'fileno'): |
417 | 334 |
files_preserve.append(handler.stream) |
418 | 335 |
|
419 |
daemon_context = daemon.daemon.DaemonContext( |
|
420 |
files_preserve=files_preserve, |
|
421 |
umask=022) |
|
336 |
stderr_stream = None |
|
337 |
for handler in log.handlers: |
|
338 |
stream = getattr(handler, 'stream') |
|
339 |
if stream and hasattr(handler, 'baseFilename'): |
|
340 |
stderr_stream = stream |
|
341 |
break |
|
422 | 342 |
|
423 |
daemon_context.open() |
|
343 |
daemon_context = daemon.DaemonContext( |
|
344 |
pidfile=pidf, |
|
345 |
umask=0022, |
|
346 |
stdout=stderr_stream, |
|
347 |
stderr=stderr_stream, |
|
348 |
files_preserve=files_preserve) |
|
349 |
|
|
350 |
try: |
|
351 |
daemon_context.open() |
|
352 |
except (pidlockfile.AlreadyLocked, LockTimeout): |
|
353 |
log.critical("Failed to lock pidfile %s, another instance running?", |
|
354 |
pidf.path) |
|
355 |
sys.exit(1) |
|
356 |
|
|
357 |
log.info("Became a daemon") |
|
358 |
|
|
359 |
if 'gevent' in sys.modules: |
|
360 |
# A fork() has occured while daemonizing. If running in |
|
361 |
# gevent context we *must* reinit gevent |
|
362 |
log.debug("gevent imported. Reinitializing gevent") |
|
363 |
import gevent |
|
364 |
gevent.reinit() |
|
424 | 365 |
|
425 | 366 |
# Catch every exception, make sure it gets logged properly |
426 | 367 |
try: |
... | ... | |
429 | 370 |
log.exception("Unknown error") |
430 | 371 |
raise |
431 | 372 |
|
432 |
|
|
433 | 373 |
if __name__ == "__main__": |
434 | 374 |
sys.exit(main()) |
435 | 375 |
|
Also available in: Unified diff