Revision 4936f2e2 snf-cyclades-app/synnefo/logic/dispatcher.py

--- a/snf-cyclades-app/synnefo/logic/dispatcher.py
+++ b/snf-cyclades-app/synnefo/logic/dispatcher.py
@@ -43,18 +43,15 @@
 import os
 path = os.path.normpath(os.path.join(os.getcwd(), '..'))
 sys.path.append(path)
-
 from synnefo import settings
 setup_environ(settings)
 
 import logging
 import time
 
+import daemon
 import daemon.runner
-import daemon.daemon
 from lockfile import LockTimeout
-from signal import signal, SIGINT, SIGTERM
-
 # Take care of differences between python-daemon versions.
 try:
     from daemon import pidfile as pidlockfile
@@ -63,12 +60,11 @@
 
 from synnefo.lib.amqp import AMQPClient
 from synnefo.logic import callbacks
-from synnefo.util.dictconfig import dictConfig
-
 
+from synnefo.util.dictconfig import dictConfig
+dictConfig(settings.DISPATCHER_LOGGING)
 log = logging.getLogger()
 
-
 # Queue names
 QUEUES = []
 
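
Note on the hunk above: dictConfig(settings.DISPATCHER_LOGGING) now runs at import time, so logging is configured before the module-level logger is fetched. For orientation only, here is a minimal sketch of the kind of mapping a dictConfig-style call expects; the handler and formatter values are illustrative assumptions, not Synnefo's actual DISPATCHER_LOGGING setting, and the stdlib logging.config.dictConfig stands in for the synnefo.util.dictconfig backport.

    # Illustrative sketch only: EXAMPLE_DISPATCHER_LOGGING is a made-up mapping,
    # not the real settings.DISPATCHER_LOGGING.
    import logging
    import logging.config

    EXAMPLE_DISPATCHER_LOGGING = {
        'version': 1,
        'formatters': {
            'simple': {'format': '%(asctime)s %(levelname)s %(message)s'},
        },
        'handlers': {
            'console': {'class': 'logging.StreamHandler',
                        'formatter': 'simple'},
        },
        'root': {'handlers': ['console'], 'level': 'INFO'},
    }

    logging.config.dictConfig(EXAMPLE_DISPATCHER_LOGGING)
    log = logging.getLogger()          # root logger, as in the module above
    log.info("dispatcher logging configured")
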
@@ -181,35 +177,11 @@
         QUEUES += (QUEUE_DEBUG,)
 
 
-def _exit_handler(signum, frame):
-    """"Catch exit signal in children processes"""
-    log.info("Caught signal %d, will raise SystemExit", signum)
-    raise SystemExit
-
-
-def _parent_handler(signum, frame):
-    """"Catch exit signal in parent process and forward it to children."""
-    global children
-    log.info("Caught signal %d, sending SIGTERM to children %s",
-             signum, children)
-    [os.kill(pid, SIGTERM) for pid in children]
-
-
-def child(cmdline):
-    """The context of the child process"""
-
-    # Cmd line argument parsing
-    (opts, args) = parse_arguments(cmdline)
-    disp = Dispatcher(debug=opts.debug)
-
-    # Start the event loop
-    disp.wait()
-
-
 def parse_arguments(args):
     from optparse import OptionParser
 
-    default_pid_file = os.path.join("var", "run", "synnefo", "dispatcher.pid")
+    default_pid_file = \
+        os.path.join(".", "var", "run", "synnefo", "dispatcher.pid")[1:]
     parser = OptionParser()
     parser.add_option("-d", "--debug", action="store_true", default=False,
                       dest="debug", help="Enable debug mode")
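
The new default_pid_file expression in the hunk above reads oddly at first: joining with "." and then slicing off the first character turns the result into an absolute path while still letting os.path.join handle the separators. A quick check of what it evaluates to on a POSIX system:

    import os

    default_pid_file = \
        os.path.join(".", "var", "run", "synnefo", "dispatcher.pid")[1:]
    # os.path.join(...) -> "./var/run/synnefo/dispatcher.pid"
    # [1:]              -> "/var/run/synnefo/dispatcher.pid"
    print(default_pid_file)
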
@@ -289,20 +261,12 @@
     client = AMQPClient()
     client.connect()
 
-    # Register a temporary queue binding
-    for binding in BINDINGS:
-        if binding[0] == queue:
-            exch = binding[1]
-
     tag = client.basic_consume(queue=queue, callback=callbacks.dummy_proc)
 
     print "Queue draining about to start, hit Ctrl+c when done"
     time.sleep(2)
     print "Queue draining starting"
 
-    signal(SIGTERM, _exit_handler)
-    signal(SIGINT, _exit_handler)
-
     num_processed = 0
     while True:
         client.basic_wait()
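
With the SIGINT/SIGTERM handlers removed from drain_queue above, stopping the drain presumably falls back to Python's default Ctrl+C behaviour, i.e. a KeyboardInterrupt raised out of the wait loop. A standalone sketch of that pattern; FakeClient is a made-up stand-in whose method names merely mirror the AMQPClient calls visible in the hunk:

    # Sketch only: a drain loop that relies on KeyboardInterrupt instead of
    # custom signal handlers. FakeClient is hypothetical, not synnefo's AMQPClient.
    import time

    class FakeClient(object):
        def basic_wait(self):
            time.sleep(0.1)            # pretend to block on the broker

        def close(self):
            print("connection closed")

    def drain(client):
        num_processed = 0
        try:
            while True:
                client.basic_wait()    # one delivery per iteration
                num_processed += 1
        except KeyboardInterrupt:      # Ctrl+C ends the drain
            print("drained %d messages" % num_processed)
        finally:
            client.close()

    drain(FakeClient())
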
@@ -313,7 +277,6 @@
     client.close()
 
 
-
 def get_user_confirmation():
     ans = raw_input("Are you sure (N/y):")
 
@@ -326,70 +289,17 @@
 
 def debug_mode():
     disp = Dispatcher(debug=True)
-    signal(SIGINT, _exit_handler)
-    signal(SIGTERM, _exit_handler)
-
     disp.wait()
 
 
 def daemon_mode(opts):
-    global children
-
-    # Create pidfile,
-    pidf = pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
-
-    if daemon.runner.is_pidfile_stale(pidf):
-        log.warning("Removing stale PID lock file %s", pidf.path)
-        pidf.break_lock()
-
-    try:
-        pidf.acquire()
-    except (pidlockfile.AlreadyLocked, LockTimeout):
-        log.critical("Failed to lock pidfile %s, another instance running?",
-                     pidf.path)
-        sys.exit(1)
-
-    log.info("Became a daemon")
-
-    # Fork workers
-    children = []
-
-    i = 0
-    while i < opts.workers:
-        newpid = os.fork()
-
-        if newpid == 0:
-            signal(SIGINT, _exit_handler)
-            signal(SIGTERM, _exit_handler)
-            child(sys.argv[1:])
-            sys.exit(1)
-        else:
-            log.debug("%d, forked child: %d", os.getpid(), newpid)
-            children.append(newpid)
-        i += 1
-
-    # Catch signals to ensure graceful shutdown
-    signal(SIGINT, _parent_handler)
-    signal(SIGTERM, _parent_handler)
-
-    # Wait for all children processes to die, one by one
-    try:
-        for pid in children:
-            try:
-                os.waitpid(pid, 0)
-            except Exception:
-                pass
-    finally:
-        pidf.release()
+    disp = Dispatcher(debug=False)
+    disp.wait()
 
 
 def main():
     (opts, args) = parse_arguments(sys.argv[1:])
 
-    dictConfig(settings.DISPATCHER_LOGGING)
-
-    global log
-
     # Init the global variables containing the queues
     _init_queues()
 
@@ -407,22 +317,53 @@
         drain_queue(opts.drain_queue)
         return
 
-    # Debug mode, process messages without spawning workers
+    # Debug mode, process messages without daemonizing
     if opts.debug:
         debug_mode()
         return
 
+    # Create pidfile,
+    pidf = pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
+
+    if daemon.runner.is_pidfile_stale(pidf):
+        log.warning("Removing stale PID lock file %s", pidf.path)
+        pidf.break_lock()
+
     files_preserve = []
     for handler in log.handlers:
         stream = getattr(handler, 'stream')
         if stream and hasattr(stream, 'fileno'):
             files_preserve.append(handler.stream)
 
-    daemon_context = daemon.daemon.DaemonContext(
-        files_preserve=files_preserve,
-        umask=022)
+    stderr_stream = None
+    for handler in log.handlers:
+        stream = getattr(handler, 'stream')
+        if stream and hasattr(handler, 'baseFilename'):
+            stderr_stream = stream
+            break
 
-    daemon_context.open()
+    daemon_context = daemon.DaemonContext(
+        pidfile=pidf,
+        umask=0022,
+        stdout=stderr_stream,
+        stderr=stderr_stream,
+        files_preserve=files_preserve)
+
+    try:
+        daemon_context.open()
+    except (pidlockfile.AlreadyLocked, LockTimeout):
+        log.critical("Failed to lock pidfile %s, another instance running?",
+                     pidf.path)
+        sys.exit(1)
+
+    log.info("Became a daemon")
+
+    if 'gevent' in sys.modules:
+        # A fork() has occured while daemonizing. If running in
+        # gevent context we *must* reinit gevent
+        log.debug("gevent imported. Reinitializing gevent")
+        import gevent
+        gevent.reinit()
 
     # Catch every exception, make sure it gets logged properly
     try:
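
The main() hunk above replaces the bare daemon.daemon.DaemonContext with one that owns the pidfile, keeps open log file descriptors alive through files_preserve, and points stdout/stderr at the first file-based log handler's stream. Below is a minimal standalone sketch of the same python-daemon pattern, with a placeholder pidfile path and the lock exceptions taken from the lockfile package (the exact exception layout differs across python-daemon versions, hence the hedged import):

    # Sketch of the daemonization pattern, not the dispatcher itself.
    # Requires python-daemon and lockfile; "/tmp/example-dispatcher.pid" is a
    # placeholder path.
    import logging
    import sys

    import daemon
    import daemon.runner
    from lockfile import AlreadyLocked, LockTimeout

    try:
        from daemon import pidfile as pidlockfile   # newer python-daemon
    except ImportError:
        from daemon import pidlockfile              # older releases

    log = logging.getLogger()

    pidf = pidlockfile.TimeoutPIDLockFile("/tmp/example-dispatcher.pid", 10)
    if daemon.runner.is_pidfile_stale(pidf):
        log.warning("Removing stale PID lock file %s", pidf.path)
        pidf.break_lock()

    # Keep already-open log streams alive across the fork.
    files_preserve = [h.stream for h in log.handlers
                      if getattr(h, 'stream', None) and hasattr(h.stream, 'fileno')]

    context = daemon.DaemonContext(pidfile=pidf,
                                   umask=0o022,
                                   files_preserve=files_preserve)
    try:
        context.open()                 # detaches and acquires the pidfile lock
    except (AlreadyLocked, LockTimeout):
        log.critical("Failed to lock pidfile %s, another instance running?",
                     pidf.path)
        sys.exit(1)

    log.info("Became a daemon")
    # ... daemonized work would go here ...
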
@@ -431,7 +372,6 @@
         log.exception("Unknown error")
         raise
 
-
 if __name__ == "__main__":
     sys.exit(main())
 
