#
#
-# Copyright (C) 2006, 2007, 2008, 2010, 2011 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
"""
def __init__(self, timefunc):
- sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
+ """Initializes this class.
+
+ """
+ sched.scheduler.__init__(self, timefunc, self._LimitedDelay)
+ self._max_delay = None
+
+ def run(self, max_delay=None): # pylint: disable=W0221
+ """Run any pending events.
+
+ @type max_delay: None or number
+ @param max_delay: Maximum delay (useful if caller has timeouts running)
+
+ """
+ assert self._max_delay is None
+
+ # The delay function used by the scheduler can't be different on each run,
+ # hence an instance variable must be used.
+ if max_delay is None:
+ self._max_delay = None
+ else:
+ self._max_delay = utils.RunningTimeout(max_delay, False)
+
+ try:
+ return sched.scheduler.run(self)
+ finally:
+ self._max_delay = None
+
+ def _LimitedDelay(self, duration):
+ """Custom delay function for C{sched.scheduler}.
+
+ """
+ if self._max_delay is None:
+ timeout = duration
+ else:
+ timeout = min(duration, self._max_delay.Remaining())
+
+ return AsyncoreDelayFunction(timeout)
class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
"""
if len(payload) > constants.MAX_UDP_DATA_SIZE:
- raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
+ raise errors.UdpDataSizeError("Packet too big: %s > %s" % (len(payload),
constants.MAX_UDP_DATA_SIZE))
self._out_queue.append((ip, port, payload))
"""
GanetiBaseAsyncoreDispatcher.__init__(self)
- assert signal_fn == None or callable(signal_fn)
+ assert signal_fn is None or callable(signal_fn)
(self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
socket.SOCK_STREAM)
self.in_socket.setblocking(0)
self.out_socket.send("\0")
+class _ShutdownCheck:
+ """Logic for L{Mainloop} shutdown.
+
+ """
+ def __init__(self, fn):
+ """Initializes this class.
+
+ @type fn: callable
+ @param fn: Function returning C{None} if mainloop can be stopped or a
+ duration in seconds after which the function should be called again
+ @see: L{Mainloop.Run}
+
+ """
+ assert callable(fn)
+
+ self._fn = fn
+ self._defer = None
+
+ def CanShutdown(self):
+ """Checks whether mainloop can be stopped.
+
+ @rtype: bool
+
+ """
+ if self._defer and self._defer.Remaining() > 0:
+ # A deferred check has already been scheduled
+ return False
+
+ # Ask mainloop driver whether we can stop or should check again
+ timeout = self._fn()
+
+ if timeout is None:
+ # Yes, can stop mainloop
+ return True
+
+ # Schedule another check in the future
+ self._defer = utils.RunningTimeout(timeout, True)
+
+ return False
+
+
class Mainloop(object):
"""Generic mainloop for daemons
timed events
"""
+ _SHUTDOWN_TIMEOUT_PRIORITY = -(sys.maxint - 1)
+
def __init__(self):
"""Constructs a new Mainloop instance.
@utils.SignalHandled([signal.SIGCHLD])
@utils.SignalHandled([signal.SIGTERM])
@utils.SignalHandled([signal.SIGINT])
- def Run(self, signal_handlers=None):
+ def Run(self, shutdown_wait_fn=None, signal_handlers=None):
"""Runs the mainloop.
+ @type shutdown_wait_fn: callable
+ @param shutdown_wait_fn: Function to check whether loop can be terminated;
+ B{important}: function must be idempotent and must return either None
+ for shutting down or a timeout for another call
@type signal_handlers: dict
@param signal_handlers: signal->L{utils.SignalHandler} passed by decorator
assert isinstance(signal_handlers, dict) and \
len(signal_handlers) > 0, \
"Broken SignalHandled decorator"
- running = True
+
+ # Counter for received signals
+ shutdown_signals = 0
+
+ # Logic to wait for shutdown
+ shutdown_waiter = None
# Start actual main loop
- while running:
- if not self.scheduler.empty():
+ while True:
+ if shutdown_signals == 1 and shutdown_wait_fn is not None:
+ if shutdown_waiter is None:
+ shutdown_waiter = _ShutdownCheck(shutdown_wait_fn)
+
+ # Let mainloop driver decide if we can already abort
+ if shutdown_waiter.CanShutdown():
+ break
+
+ # Re-evaluate in a second
+ timeout = 1.0
+
+ elif shutdown_signals >= 1:
+ # Abort loop if more than one signal has been sent or no callback has
+ # been given
+ break
+
+ else:
+ # Wait forever on I/O events
+ timeout = None
+
+ if self.scheduler.empty():
+ asyncore.loop(count=1, timeout=timeout, use_poll=True)
+ else:
try:
- self.scheduler.run()
+ self.scheduler.run(max_delay=timeout)
except SchedulerBreakout:
pass
- else:
- asyncore.loop(count=1, use_poll=True)
# Check whether a signal was raised
- for sig in signal_handlers:
- handler = signal_handlers[sig]
+ for (sig, handler) in signal_handlers.items():
if handler.called:
self._CallSignalWaiters(sig)
- running = sig not in (signal.SIGTERM, signal.SIGINT)
+ if sig in (signal.SIGTERM, signal.SIGINT):
+ logging.info("Received signal %s asking for shutdown", sig)
+ shutdown_signals += 1
handler.Clear()
def _CallSignalWaiters(self, signum):
constants.NODED: getents.noded_uid,
constants.CONFD: getents.confd_uid,
}
+ assert daemon_name in daemon_uids, "Invalid daemon %s" % daemon_name
return (daemon_uids[daemon_name] == running_uid, running_uid,
daemon_uids[daemon_name])
if check_fn is not None:
check_fn(options, args)
+ log_filename = constants.DAEMONS_LOGFILES[daemon_name]
+
if options.fork:
utils.CloseFDs()
- (wpipe, stdio_reopen_fn) = \
- utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name])
+ (wpipe, stdio_reopen_fn) = utils.Daemonize(logfile=log_filename)
else:
(wpipe, stdio_reopen_fn) = (None, None)
log_reopen_fn = \
- utils.SetupLogging(constants.DAEMONS_LOGFILES[daemon_name], daemon_name,
+ utils.SetupLogging(log_filename, daemon_name,
debug=options.debug,
stderr_logging=not options.fork,
multithreaded=multithreaded,
signal.signal(signal.SIGHUP,
compat.partial(_HandleSigHup, [log_reopen_fn, stdio_reopen_fn]))
- utils.WritePidFile(utils.DaemonPidFileName(daemon_name))
+ try:
+ utils.WritePidFile(utils.DaemonPidFileName(daemon_name))
+ except errors.PidFileLockError, err:
+ print >> sys.stderr, "Error while locking PID file:\n%s" % err
+ sys.exit(constants.EXIT_FAILURE)
+
try:
try:
logging.info("%s daemon startup", daemon_name)