Revision 4936f2e2 snf-cyclades-app/synnefo/logic/dispatcher.py
b/snf-cyclades-app/synnefo/logic/dispatcher.py | ||
---|---|---|
43 | 43 |
import os |
44 | 44 |
path = os.path.normpath(os.path.join(os.getcwd(), '..')) |
45 | 45 |
sys.path.append(path) |
46 |
|
|
47 | 46 |
from synnefo import settings |
48 | 47 |
setup_environ(settings) |
49 | 48 |
|
50 | 49 |
import logging |
51 | 50 |
import time |
52 | 51 |
|
52 |
import daemon |
|
53 | 53 |
import daemon.runner |
54 |
import daemon.daemon |
|
55 | 54 |
from lockfile import LockTimeout |
56 |
from signal import signal, SIGINT, SIGTERM |
|
57 |
|
|
58 | 55 |
# Take care of differences between python-daemon versions. |
59 | 56 |
try: |
60 | 57 |
from daemon import pidfile as pidlockfile |
... | ... | |
63 | 60 |
|
64 | 61 |
from synnefo.lib.amqp import AMQPClient |
65 | 62 |
from synnefo.logic import callbacks |
66 |
from synnefo.util.dictconfig import dictConfig |
|
67 |
|
|
68 | 63 |
|
64 |
from synnefo.util.dictconfig import dictConfig |
|
65 |
dictConfig(settings.DISPATCHER_LOGGING) |
|
69 | 66 |
log = logging.getLogger() |
70 | 67 |
|
71 |
|
|
72 | 68 |
# Queue names |
73 | 69 |
QUEUES = [] |
74 | 70 |
|
... | ... | |
181 | 177 |
QUEUES += (QUEUE_DEBUG,) |
182 | 178 |
|
183 | 179 |
|
184 |
def _exit_handler(signum, frame): |
|
185 |
""""Catch exit signal in children processes""" |
|
186 |
log.info("Caught signal %d, will raise SystemExit", signum) |
|
187 |
raise SystemExit |
|
188 |
|
|
189 |
|
|
190 |
def _parent_handler(signum, frame): |
|
191 |
""""Catch exit signal in parent process and forward it to children.""" |
|
192 |
global children |
|
193 |
log.info("Caught signal %d, sending SIGTERM to children %s", |
|
194 |
signum, children) |
|
195 |
[os.kill(pid, SIGTERM) for pid in children] |
|
196 |
|
|
197 |
|
|
198 |
def child(cmdline): |
|
199 |
"""The context of the child process""" |
|
200 |
|
|
201 |
# Cmd line argument parsing |
|
202 |
(opts, args) = parse_arguments(cmdline) |
|
203 |
disp = Dispatcher(debug=opts.debug) |
|
204 |
|
|
205 |
# Start the event loop |
|
206 |
disp.wait() |
|
207 |
|
|
208 |
|
|
209 | 180 |
def parse_arguments(args): |
210 | 181 |
from optparse import OptionParser |
211 | 182 |
|
212 |
default_pid_file = os.path.join("var", "run", "synnefo", "dispatcher.pid") |
|
183 |
default_pid_file = \ |
|
184 |
os.path.join(".", "var", "run", "synnefo", "dispatcher.pid")[1:] |
|
213 | 185 |
parser = OptionParser() |
214 | 186 |
parser.add_option("-d", "--debug", action="store_true", default=False, |
215 | 187 |
dest="debug", help="Enable debug mode") |
... | ... | |
289 | 261 |
client = AMQPClient() |
290 | 262 |
client.connect() |
291 | 263 |
|
292 |
# Register a temporary queue binding |
|
293 |
for binding in BINDINGS: |
|
294 |
if binding[0] == queue: |
|
295 |
exch = binding[1] |
|
296 |
|
|
297 | 264 |
tag = client.basic_consume(queue=queue, callback=callbacks.dummy_proc) |
298 | 265 |
|
299 | 266 |
print "Queue draining about to start, hit Ctrl+c when done" |
300 | 267 |
time.sleep(2) |
301 | 268 |
print "Queue draining starting" |
302 | 269 |
|
303 |
signal(SIGTERM, _exit_handler) |
|
304 |
signal(SIGINT, _exit_handler) |
|
305 |
|
|
306 | 270 |
num_processed = 0 |
307 | 271 |
while True: |
308 | 272 |
client.basic_wait() |
... | ... | |
313 | 277 |
client.close() |
314 | 278 |
|
315 | 279 |
|
316 |
|
|
317 | 280 |
def get_user_confirmation(): |
318 | 281 |
ans = raw_input("Are you sure (N/y):") |
319 | 282 |
|
... | ... | |
326 | 289 |
|
327 | 290 |
def debug_mode(): |
328 | 291 |
disp = Dispatcher(debug=True) |
329 |
signal(SIGINT, _exit_handler) |
|
330 |
signal(SIGTERM, _exit_handler) |
|
331 |
|
|
332 | 292 |
disp.wait() |
333 | 293 |
|
334 | 294 |
|
335 | 295 |
def daemon_mode(opts): |
336 |
global children |
|
337 |
|
|
338 |
# Create pidfile, |
|
339 |
pidf = pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10) |
|
340 |
|
|
341 |
if daemon.runner.is_pidfile_stale(pidf): |
|
342 |
log.warning("Removing stale PID lock file %s", pidf.path) |
|
343 |
pidf.break_lock() |
|
344 |
|
|
345 |
try: |
|
346 |
pidf.acquire() |
|
347 |
except (pidlockfile.AlreadyLocked, LockTimeout): |
|
348 |
log.critical("Failed to lock pidfile %s, another instance running?", |
|
349 |
pidf.path) |
|
350 |
sys.exit(1) |
|
351 |
|
|
352 |
log.info("Became a daemon") |
|
353 |
|
|
354 |
# Fork workers |
|
355 |
children = [] |
|
356 |
|
|
357 |
i = 0 |
|
358 |
while i < opts.workers: |
|
359 |
newpid = os.fork() |
|
360 |
|
|
361 |
if newpid == 0: |
|
362 |
signal(SIGINT, _exit_handler) |
|
363 |
signal(SIGTERM, _exit_handler) |
|
364 |
child(sys.argv[1:]) |
|
365 |
sys.exit(1) |
|
366 |
else: |
|
367 |
log.debug("%d, forked child: %d", os.getpid(), newpid) |
|
368 |
children.append(newpid) |
|
369 |
i += 1 |
|
370 |
|
|
371 |
# Catch signals to ensure graceful shutdown |
|
372 |
signal(SIGINT, _parent_handler) |
|
373 |
signal(SIGTERM, _parent_handler) |
|
374 |
|
|
375 |
# Wait for all children processes to die, one by one |
|
376 |
try: |
|
377 |
for pid in children: |
|
378 |
try: |
|
379 |
os.waitpid(pid, 0) |
|
380 |
except Exception: |
|
381 |
pass |
|
382 |
finally: |
|
383 |
pidf.release() |
|
296 |
disp = Dispatcher(debug=False) |
|
297 |
disp.wait() |
|
384 | 298 |
|
385 | 299 |
|
386 | 300 |
def main(): |
387 | 301 |
(opts, args) = parse_arguments(sys.argv[1:]) |
388 | 302 |
|
389 |
dictConfig(settings.DISPATCHER_LOGGING) |
|
390 |
|
|
391 |
global log |
|
392 |
|
|
393 | 303 |
# Init the global variables containing the queues |
394 | 304 |
_init_queues() |
395 | 305 |
|
... | ... | |
407 | 317 |
drain_queue(opts.drain_queue) |
408 | 318 |
return |
409 | 319 |
|
410 |
# Debug mode, process messages without spawning workers
|
|
320 |
# Debug mode, process messages without daemonizing
|
|
411 | 321 |
if opts.debug: |
412 | 322 |
debug_mode() |
413 | 323 |
return |
414 | 324 |
|
325 |
# Create pidfile, |
|
326 |
pidf = pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10) |
|
327 |
|
|
328 |
if daemon.runner.is_pidfile_stale(pidf): |
|
329 |
log.warning("Removing stale PID lock file %s", pidf.path) |
|
330 |
pidf.break_lock() |
|
331 |
|
|
415 | 332 |
files_preserve = [] |
416 | 333 |
for handler in log.handlers: |
417 | 334 |
stream = getattr(handler, 'stream') |
418 | 335 |
if stream and hasattr(stream, 'fileno'): |
419 | 336 |
files_preserve.append(handler.stream) |
420 | 337 |
|
421 |
daemon_context = daemon.daemon.DaemonContext( |
|
422 |
files_preserve=files_preserve, |
|
423 |
umask=022) |
|
338 |
stderr_stream = None |
|
339 |
for handler in log.handlers: |
|
340 |
stream = getattr(handler, 'stream') |
|
341 |
if stream and hasattr(handler, 'baseFilename'): |
|
342 |
stderr_stream = stream |
|
343 |
break |
|
424 | 344 |
|
425 |
daemon_context.open() |
|
345 |
daemon_context = daemon.DaemonContext( |
|
346 |
pidfile=pidf, |
|
347 |
umask=0022, |
|
348 |
stdout=stderr_stream, |
|
349 |
stderr=stderr_stream, |
|
350 |
files_preserve=files_preserve) |
|
351 |
|
|
352 |
try: |
|
353 |
daemon_context.open() |
|
354 |
except (pidlockfile.AlreadyLocked, LockTimeout): |
|
355 |
log.critical("Failed to lock pidfile %s, another instance running?", |
|
356 |
pidf.path) |
|
357 |
sys.exit(1) |
|
358 |
|
|
359 |
log.info("Became a daemon") |
|
360 |
|
|
361 |
if 'gevent' in sys.modules: |
|
362 |
# A fork() has occured while daemonizing. If running in |
|
363 |
# gevent context we *must* reinit gevent |
|
364 |
log.debug("gevent imported. Reinitializing gevent") |
|
365 |
import gevent |
|
366 |
gevent.reinit() |
|
426 | 367 |
|
427 | 368 |
# Catch every exception, make sure it gets logged properly |
428 | 369 |
try: |
... | ... | |
431 | 372 |
log.exception("Unknown error") |
432 | 373 |
raise |
433 | 374 |
|
434 |
|
|
435 | 375 |
if __name__ == "__main__": |
436 | 376 |
sys.exit(main()) |
437 | 377 |
|
Also available in: Unified diff