#
#
-# Copyright (C) 2006, 2007, 2008, 2010, 2011 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
"""
def __init__(self, timefunc):
- sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
+ """Initializes this class.
+
+ """
+ sched.scheduler.__init__(self, timefunc, self._LimitedDelay)
+ self._max_delay = None
+
+ def run(self, max_delay=None): # pylint: disable=W0221
+ """Run any pending events.
+
+ @type max_delay: None or number
+ @param max_delay: Maximum delay (useful if caller has timeouts running)
+
+ """
+ assert self._max_delay is None
+
+ # The delay function used by the scheduler can't be different on each run,
+ # hence an instance variable must be used.
+ if max_delay is None:
+ self._max_delay = None
+ else:
+ self._max_delay = utils.RunningTimeout(max_delay, False)
+
+ try:
+ return sched.scheduler.run(self)
+ finally:
+ self._max_delay = None
+
+ def _LimitedDelay(self, duration):
+ """Custom delay function for C{sched.scheduler}.
+
+ """
+ if self._max_delay is None:
+ timeout = duration
+ else:
+ timeout = min(duration, self._max_delay.Remaining())
+
+ return AsyncoreDelayFunction(timeout)
class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
"""
if len(payload) > constants.MAX_UDP_DATA_SIZE:
- raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
+ raise errors.UdpDataSizeError("Packet too big: %s > %s" % (len(payload),
constants.MAX_UDP_DATA_SIZE))
self._out_queue.append((ip, port, payload))
"""
GanetiBaseAsyncoreDispatcher.__init__(self)
- assert signal_fn == None or callable(signal_fn)
+ assert signal_fn is None or callable(signal_fn)
(self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
socket.SOCK_STREAM)
self.in_socket.setblocking(0)
# sending more than one wakeup token, which doesn't harm at all.
if self.need_signal:
self.need_signal = False
- self.out_socket.send("\0")
+ self.out_socket.send(chr(0))
+
+
+class _ShutdownCheck(object):
+ """Logic for L{Mainloop} shutdown.
+
+ """
+ def __init__(self, fn):
+ """Initializes this class.
+
+ @type fn: callable
+ @param fn: Function returning C{None} if mainloop can be stopped or a
+ duration in seconds after which the function should be called again
+ @see: L{Mainloop.Run}
+
+ """
+ assert callable(fn)
+
+ self._fn = fn
+ self._defer = None
+
+ def CanShutdown(self):
+ """Checks whether mainloop can be stopped.
+
+ @rtype: bool
+
+ """
+ if self._defer and self._defer.Remaining() > 0:
+ # A deferred check has already been scheduled
+ return False
+
+ # Ask mainloop driver whether we can stop or should check again
+ timeout = self._fn()
+
+ if timeout is None:
+ # Yes, can stop mainloop
+ return True
+
+ # Schedule another check in the future
+ self._defer = utils.RunningTimeout(timeout, True)
+
+ return False
class Mainloop(object):
timed events
"""
+ _SHUTDOWN_TIMEOUT_PRIORITY = -(sys.maxint - 1)
+
def __init__(self):
"""Constructs a new Mainloop instance.
@utils.SignalHandled([signal.SIGCHLD])
@utils.SignalHandled([signal.SIGTERM])
@utils.SignalHandled([signal.SIGINT])
- def Run(self, signal_handlers=None):
+ def Run(self, shutdown_wait_fn=None, signal_handlers=None):
"""Runs the mainloop.
+ @type shutdown_wait_fn: callable
+ @param shutdown_wait_fn: Function to check whether loop can be terminated;
+ B{important}: function must be idempotent and must return either None
+ for shutting down or a timeout for another call
@type signal_handlers: dict
@param signal_handlers: signal->L{utils.SignalHandler} passed by decorator
assert isinstance(signal_handlers, dict) and \
len(signal_handlers) > 0, \
"Broken SignalHandled decorator"
- running = True
+
+ # Counter for received signals
+ shutdown_signals = 0
+
+ # Logic to wait for shutdown
+ shutdown_waiter = None
# Start actual main loop
- while running:
- if not self.scheduler.empty():
+ while True:
+ if shutdown_signals == 1 and shutdown_wait_fn is not None:
+ if shutdown_waiter is None:
+ shutdown_waiter = _ShutdownCheck(shutdown_wait_fn)
+
+ # Let mainloop driver decide if we can already abort
+ if shutdown_waiter.CanShutdown():
+ break
+
+ # Re-evaluate in a second
+ timeout = 1.0
+
+ elif shutdown_signals >= 1:
+ # Abort loop if more than one signal has been sent or no callback has
+ # been given
+ break
+
+ else:
+ # Wait forever on I/O events
+ timeout = None
+
+ if self.scheduler.empty():
+ asyncore.loop(count=1, timeout=timeout, use_poll=True)
+ else:
try:
- self.scheduler.run()
+ self.scheduler.run(max_delay=timeout)
except SchedulerBreakout:
pass
- else:
- asyncore.loop(count=1, use_poll=True)
# Check whether a signal was raised
- for sig in signal_handlers:
- handler = signal_handlers[sig]
+ for (sig, handler) in signal_handlers.items():
if handler.called:
self._CallSignalWaiters(sig)
- running = sig not in (signal.SIGTERM, signal.SIGINT)
+ if sig in (signal.SIGTERM, signal.SIGINT):
+ logging.info("Received signal %s asking for shutdown", sig)
+ shutdown_signals += 1
handler.Clear()
def _CallSignalWaiters(self, signum):
constants.NODED: getents.noded_uid,
constants.CONFD: getents.confd_uid,
}
+ assert daemon_name in daemon_uids, "Invalid daemon %s" % daemon_name
return (daemon_uids[daemon_name] == running_uid, running_uid,
daemon_uids[daemon_name])
err.errno)
else:
return str(err)
- except Exception: # pylint: disable-msg=W0703
+ except Exception: # pylint: disable=W0703
logging.exception("Error while handling existing error %s", err)
return "%s" % str(err)
-def _HandleSigHup(reopen_fn, signum, frame): # pylint: disable-msg=W0613
+def _HandleSigHup(reopen_fn, signum, frame): # pylint: disable=W0613
"""Handler for SIGHUP.
@param reopen_fn: List of callback functions for reopening log files
default=constants.SYSLOG_USAGE,
choices=["no", "yes", "only"])
+ family = ssconf.SimpleStore().GetPrimaryIPFamily()
+ # family will default to AF_INET if there is no ssconf file (e.g. when
+ # upgrading a cluster from 2.2 -> 2.3. This is intended, as Ganeti clusters
+ # <= 2.2 can not be AF_INET6
if daemon_name in constants.DAEMONS_PORTS:
default_bind_address = constants.IP4_ADDRESS_ANY
- family = ssconf.SimpleStore().GetPrimaryIPFamily()
- # family will default to AF_INET if there is no ssconf file (e.g. when
- # upgrading a cluster from 2.2 -> 2.3. This is intended, as Ganeti clusters
- # <= 2.2 can not be AF_INET6
if family == netutils.IP6Address.family:
default_bind_address = constants.IP6_ADDRESS_ANY
help=("Bind address (default: '%s')" %
default_bind_address),
default=default_bind_address, metavar="ADDRESS")
+ optionparser.add_option("-i", "--interface", dest="bind_interface",
+ help=("Bind interface"), metavar="INTERFACE")
if default_ssl_key is not None and default_ssl_cert is not None:
optionparser.add_option("--no-ssl", dest="ssl",
options, args = optionparser.parse_args()
+ if getattr(options, "bind_interface", None) is not None:
+ if options.bind_address != default_bind_address:
+        msg = ("Can't specify both bind address (%s) and bind interface (%s)" %
+ (options.bind_address, options.bind_interface))
+ print >> sys.stderr, msg
+ sys.exit(constants.EXIT_FAILURE)
+ interface_ip_addresses = \
+ netutils.GetInterfaceIpAddresses(options.bind_interface)
+ if family == netutils.IP6Address.family:
+ if_addresses = interface_ip_addresses[constants.IP6_VERSION]
+ else:
+ if_addresses = interface_ip_addresses[constants.IP4_VERSION]
+ if len(if_addresses) < 1:
+      msg = "Failed to find IP for interface %s" % options.bind_interface
+ print >> sys.stderr, msg
+ sys.exit(constants.EXIT_FAILURE)
+ options.bind_address = if_addresses[0]
+
if getattr(options, "ssl", False):
ssl_paths = {
"certificate": options.ssl_cert,
if check_fn is not None:
check_fn(options, args)
+ log_filename = constants.DAEMONS_LOGFILES[daemon_name]
+
if options.fork:
utils.CloseFDs()
- (wpipe, stdio_reopen_fn) = \
- utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name])
+ (wpipe, stdio_reopen_fn) = utils.Daemonize(logfile=log_filename)
else:
(wpipe, stdio_reopen_fn) = (None, None)
log_reopen_fn = \
- utils.SetupLogging(constants.DAEMONS_LOGFILES[daemon_name], daemon_name,
+ utils.SetupLogging(log_filename, daemon_name,
debug=options.debug,
stderr_logging=not options.fork,
multithreaded=multithreaded,
signal.signal(signal.SIGHUP,
compat.partial(_HandleSigHup, [log_reopen_fn, stdio_reopen_fn]))
- utils.WritePidFile(utils.DaemonPidFileName(daemon_name))
+ try:
+ utils.WritePidFile(utils.DaemonPidFileName(daemon_name))
+ except errors.PidFileLockError, err:
+ print >> sys.stderr, "Error while locking PID file:\n%s" % err
+ sys.exit(constants.EXIT_FAILURE)
+
try:
try:
logging.info("%s daemon startup", daemon_name)