Revision 3f018af1

b/snf-cyclades-app/synnefo/logic/dispatcher.py
43 43
import os
44 44
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
45 45
sys.path.append(path)
46

  
47 46
from synnefo import settings
48 47
setup_environ(settings)
49 48

  
50 49
import logging
51 50
import time
52 51

  
52
import daemon
53 53
import daemon.runner
54
import daemon.daemon
55 54
from lockfile import LockTimeout
56
from signal import signal, SIGINT, SIGTERM
57

  
58 55
# Take care of differences between python-daemon versions.
59 56
try:
60 57
    from daemon import pidfile as pidlockfile
......
63 60

  
64 61
from synnefo.lib.amqp import AMQPClient
65 62
from synnefo.logic import callbacks
66
from synnefo.util.dictconfig import dictConfig
67

  
68 63

  
64
from synnefo.util.dictconfig import dictConfig
65
dictConfig(settings.DISPATCHER_LOGGING)
69 66
log = logging.getLogger()
70 67

  
71

  
72 68
# Queue names
73 69
QUEUES = []
74 70

  
......
179 175
        QUEUES += (QUEUE_DEBUG,)
180 176

  
181 177

  
182
def _exit_handler(signum, frame):
183
    """"Catch exit signal in children processes"""
184
    log.info("Caught signal %d, will raise SystemExit", signum)
185
    raise SystemExit
186

  
187

  
188
def _parent_handler(signum, frame):
189
    """"Catch exit signal in parent process and forward it to children."""
190
    global children
191
    log.info("Caught signal %d, sending SIGTERM to children %s",
192
             signum, children)
193
    [os.kill(pid, SIGTERM) for pid in children]
194

  
195

  
196
def child(cmdline):
197
    """The context of the child process"""
198

  
199
    # Cmd line argument parsing
200
    (opts, args) = parse_arguments(cmdline)
201
    disp = Dispatcher(debug=opts.debug)
202

  
203
    # Start the event loop
204
    disp.wait()
205

  
206

  
207 178
def parse_arguments(args):
208 179
    from optparse import OptionParser
209 180

  
210
    default_pid_file = os.path.join("var", "run", "synnefo", "dispatcher.pid")
181
    default_pid_file = \
182
        os.path.join(".", "var", "run", "synnefo", "dispatcher.pid")[1:]
211 183
    parser = OptionParser()
212 184
    parser.add_option("-d", "--debug", action="store_true", default=False,
213 185
                      dest="debug", help="Enable debug mode")
......
287 259
    client = AMQPClient()
288 260
    client.connect()
289 261

  
290
    # Register a temporary queue binding
291
    for binding in BINDINGS:
292
        if binding[0] == queue:
293
            exch = binding[1]
294

  
295 262
    tag = client.basic_consume(queue=queue, callback=callbacks.dummy_proc)
296 263

  
297 264
    print "Queue draining about to start, hit Ctrl+c when done"
298 265
    time.sleep(2)
299 266
    print "Queue draining starting"
300 267

  
301
    signal(SIGTERM, _exit_handler)
302
    signal(SIGINT, _exit_handler)
303

  
304 268
    num_processed = 0
305 269
    while True:
306 270
        client.basic_wait()
......
311 275
    client.close()
312 276

  
313 277

  
314

  
315 278
def get_user_confirmation():
316 279
    ans = raw_input("Are you sure (N/y):")
317 280

  
......
324 287

  
325 288
def debug_mode():
326 289
    disp = Dispatcher(debug=True)
327
    signal(SIGINT, _exit_handler)
328
    signal(SIGTERM, _exit_handler)
329

  
330 290
    disp.wait()
331 291

  
332 292

  
333 293
def daemon_mode(opts):
334
    global children
335

  
336
    # Create pidfile,
337
    pidf = pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
338

  
339
    if daemon.runner.is_pidfile_stale(pidf):
340
        log.warning("Removing stale PID lock file %s", pidf.path)
341
        pidf.break_lock()
342

  
343
    try:
344
        pidf.acquire()
345
    except (pidlockfile.AlreadyLocked, LockTimeout):
346
        log.critical("Failed to lock pidfile %s, another instance running?",
347
                     pidf.path)
348
        sys.exit(1)
349

  
350
    log.info("Became a daemon")
351

  
352
    # Fork workers
353
    children = []
354

  
355
    i = 0
356
    while i < opts.workers:
357
        newpid = os.fork()
358

  
359
        if newpid == 0:
360
            signal(SIGINT, _exit_handler)
361
            signal(SIGTERM, _exit_handler)
362
            child(sys.argv[1:])
363
            sys.exit(1)
364
        else:
365
            log.debug("%d, forked child: %d", os.getpid(), newpid)
366
            children.append(newpid)
367
        i += 1
368

  
369
    # Catch signals to ensure graceful shutdown
370
    signal(SIGINT, _parent_handler)
371
    signal(SIGTERM, _parent_handler)
372

  
373
    # Wait for all children processes to die, one by one
374
    try:
375
        for pid in children:
376
            try:
377
                os.waitpid(pid, 0)
378
            except Exception:
379
                pass
380
    finally:
381
        pidf.release()
294
    disp = Dispatcher(debug=False)
295
    disp.wait()
382 296

  
383 297

  
384 298
def main():
385 299
    (opts, args) = parse_arguments(sys.argv[1:])
386 300

  
387
    dictConfig(settings.DISPATCHER_LOGGING)
388

  
389
    global log
390

  
391 301
    # Init the global variables containing the queues
392 302
    _init_queues()
393 303

  
......
405 315
        drain_queue(opts.drain_queue)
406 316
        return
407 317

  
408
    # Debug mode, process messages without spawning workers
318
    # Debug mode, process messages without daemonizing
409 319
    if opts.debug:
410 320
        debug_mode()
411 321
        return
412 322

  
323
    # Create pidfile,
324
    pidf = pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
325

  
326
    if daemon.runner.is_pidfile_stale(pidf):
327
        log.warning("Removing stale PID lock file %s", pidf.path)
328
        pidf.break_lock()
329

  
413 330
    files_preserve = []
414 331
    for handler in log.handlers:
415 332
        stream = getattr(handler, 'stream')
416 333
        if stream and hasattr(stream, 'fileno'):
417 334
            files_preserve.append(handler.stream)
418 335

  
419
    daemon_context = daemon.daemon.DaemonContext(
420
        files_preserve=files_preserve,
421
        umask=022)
336
    stderr_stream = None
337
    for handler in log.handlers:
338
        stream = getattr(handler, 'stream')
339
        if stream and hasattr(handler, 'baseFilename'):
340
            stderr_stream = stream
341
            break
422 342

  
423
    daemon_context.open()
343
    daemon_context = daemon.DaemonContext(
344
        pidfile=pidf,
345
        umask=0022,
346
        stdout=stderr_stream,
347
        stderr=stderr_stream,
348
        files_preserve=files_preserve)
349

  
350
    try:
351
        daemon_context.open()
352
    except (pidlockfile.AlreadyLocked, LockTimeout):
353
        log.critical("Failed to lock pidfile %s, another instance running?",
354
                     pidf.path)
355
        sys.exit(1)
356

  
357
    log.info("Became a daemon")
358

  
359
    if 'gevent' in sys.modules:
360
        # A fork() has occured while daemonizing. If running in
361
        # gevent context we *must* reinit gevent
362
        log.debug("gevent imported. Reinitializing gevent")
363
        import gevent
364
        gevent.reinit()
424 365

  
425 366
    # Catch every exception, make sure it gets logged properly
426 367
    try:
......
429 370
        log.exception("Unknown error")
430 371
        raise
431 372

  
432

  
433 373
if __name__ == "__main__":
434 374
    sys.exit(main())
435 375

  

Also available in: Unified diff