#
#
-# Copyright (C) 2006, 2007, 2008, 2010 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
import asyncore
import asynchat
import collections
-import grp
import os
-import pwd
import signal
import logging
import sched
from ganeti import errors
from ganeti import netutils
from ganeti import ssconf
-
-
-_DEFAULT_RUN_USER = "root"
-_DEFAULT_RUN_GROUP = "root"
+from ganeti import runtime
+from ganeti import compat
class SchedulerBreakout(Exception):
"""
def __init__(self, timefunc):
- sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
+ """Initializes this class.
+
+ """
+ sched.scheduler.__init__(self, timefunc, self._LimitedDelay)
+ self._max_delay = None
+
+  def run(self, max_delay=None): # pylint: disable=W0221
+    """Run any pending events.
+
+    @type max_delay: None or number
+    @param max_delay: Maximum delay (useful if caller has timeouts running)
+
+    """
+    # A previous run must have cleaned up after itself
+    assert self._max_delay is None
+
+    # The delay function used by the scheduler can't be different on each run,
+    # hence an instance variable must be used.
+    if max_delay is None:
+      self._max_delay = None
+    else:
+      self._max_delay = utils.RunningTimeout(max_delay, False)
+
+    try:
+      return sched.scheduler.run(self)
+    finally:
+      # Always reset, even if the scheduler run raises (e.g. a breakout
+      # from the delay function)
+      self._max_delay = None
+
+  def _LimitedDelay(self, duration):
+    """Custom delay function for C{sched.scheduler}.
+
+    @type duration: number
+    @param duration: Requested delay in seconds; capped to the time remaining
+      on the C{max_delay} given to L{run}, if any
+
+    """
+    if self._max_delay is None:
+      timeout = duration
+    else:
+      # Never sleep past the caller's overall deadline
+      timeout = min(duration, self._max_delay.Remaining())
+
+    return AsyncoreDelayFunction(timeout)
class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
"""
if len(payload) > constants.MAX_UDP_DATA_SIZE:
- raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
+ raise errors.UdpDataSizeError("Packet too big: %s > %s" % (len(payload),
constants.MAX_UDP_DATA_SIZE))
self._out_queue.append((ip, port, payload))
"""
GanetiBaseAsyncoreDispatcher.__init__(self)
- assert signal_fn == None or callable(signal_fn)
+ assert signal_fn is None or callable(signal_fn)
(self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
socket.SOCK_STREAM)
self.in_socket.setblocking(0)
# sending more than one wakeup token, which doesn't harm at all.
if self.need_signal:
self.need_signal = False
- self.out_socket.send("\0")
+ self.out_socket.send(chr(0))
+
+
+class _ShutdownCheck(object):
+  """Logic for L{Mainloop} shutdown.
+
+  """
+  def __init__(self, fn):
+    """Initializes this class.
+
+    @type fn: callable
+    @param fn: Function returning C{None} if mainloop can be stopped or a
+      duration in seconds after which the function should be called again
+    @see: L{Mainloop.Run}
+
+    """
+    assert callable(fn)
+
+    self._fn = fn
+    # utils.RunningTimeout for a deferred re-check, or None if none is pending
+    self._defer = None
+
+  def CanShutdown(self):
+    """Checks whether mainloop can be stopped.
+
+    @rtype: bool
+
+    """
+    if self._defer and self._defer.Remaining() > 0:
+      # A deferred check has already been scheduled
+      return False
+
+    # Ask mainloop driver whether we can stop or should check again
+    timeout = self._fn()
+
+    if timeout is None:
+      # Yes, can stop mainloop
+      return True
+
+    # Schedule another check in the future
+    self._defer = utils.RunningTimeout(timeout, True)
+
+    return False
class Mainloop(object):
timed events
"""
+ _SHUTDOWN_TIMEOUT_PRIORITY = -(sys.maxint - 1)
+
def __init__(self):
"""Constructs a new Mainloop instance.
self._signal_wait = []
self.scheduler = AsyncoreScheduler(time.time)
+ # Resolve uid/gids used
+ runtime.GetEnts()
+
@utils.SignalHandled([signal.SIGCHLD])
@utils.SignalHandled([signal.SIGTERM])
@utils.SignalHandled([signal.SIGINT])
- def Run(self, signal_handlers=None):
+ def Run(self, shutdown_wait_fn=None, signal_handlers=None):
"""Runs the mainloop.
+ @type shutdown_wait_fn: callable
+ @param shutdown_wait_fn: Function to check whether loop can be terminated;
+ B{important}: function must be idempotent and must return either None
+ for shutting down or a timeout for another call
@type signal_handlers: dict
@param signal_handlers: signal->L{utils.SignalHandler} passed by decorator
assert isinstance(signal_handlers, dict) and \
len(signal_handlers) > 0, \
"Broken SignalHandled decorator"
- running = True
+
+ # Counter for received signals
+ shutdown_signals = 0
+
+ # Logic to wait for shutdown
+ shutdown_waiter = None
+
# Start actual main loop
- while running:
- if not self.scheduler.empty():
+ while True:
+ if shutdown_signals == 1 and shutdown_wait_fn is not None:
+ if shutdown_waiter is None:
+ shutdown_waiter = _ShutdownCheck(shutdown_wait_fn)
+
+ # Let mainloop driver decide if we can already abort
+ if shutdown_waiter.CanShutdown():
+ break
+
+ # Re-evaluate in a second
+ timeout = 1.0
+
+ elif shutdown_signals >= 1:
+ # Abort loop if more than one signal has been sent or no callback has
+ # been given
+ break
+
+ else:
+ # Wait forever on I/O events
+ timeout = None
+
+ if self.scheduler.empty():
+ asyncore.loop(count=1, timeout=timeout, use_poll=True)
+ else:
try:
- self.scheduler.run()
+ self.scheduler.run(max_delay=timeout)
except SchedulerBreakout:
pass
- else:
- asyncore.loop(count=1, use_poll=True)
# Check whether a signal was raised
- for sig in signal_handlers:
- handler = signal_handlers[sig]
+ for (sig, handler) in signal_handlers.items():
if handler.called:
self._CallSignalWaiters(sig)
- running = sig not in (signal.SIGTERM, signal.SIGINT)
+ if sig in (signal.SIGTERM, signal.SIGINT):
+ logging.info("Received signal %s asking for shutdown", sig)
+ shutdown_signals += 1
handler.Clear()
def _CallSignalWaiters(self, signum):
self._signal_wait.append(owner)
-def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn,
+def _VerifyDaemonUser(daemon_name):
+  """Verifies the process uid matches the configured uid.
+
+  This method verifies that a daemon is started as the user it is
+  intended to be run
+
+  @type daemon_name: string
+  @param daemon_name: The name of daemon to be started
+  @rtype: tuple
+  @return: A tuple with the first item indicating success or not,
+    the second item current uid and third with expected uid
+
+  """
+  getents = runtime.GetEnts()
+  running_uid = os.getuid()
+  # Map each known daemon to the uid it is expected to run under
+  daemon_uids = {
+    constants.MASTERD: getents.masterd_uid,
+    constants.RAPI: getents.rapi_uid,
+    constants.NODED: getents.noded_uid,
+    constants.CONFD: getents.confd_uid,
+    }
+  assert daemon_name in daemon_uids, "Invalid daemon %s" % daemon_name
+
+  return (daemon_uids[daemon_name] == running_uid, running_uid,
+          daemon_uids[daemon_name])
+
+
+def _BeautifyError(err):
+  """Try to format an error better.
+
+  Since we're dealing with daemon startup errors, in many cases this
+  will be due to socket error and such, so we try to format these cases better.
+
+  @param err: an exception object
+  @rtype: string
+  @return: the formatted error description
+
+  """
+  try:
+    if isinstance(err, socket.error):
+      # socket.error is conventionally raised as (errno, strerror); if the
+      # args tuple has a different shape, the fallback handler below applies
+      return "Socket-related error: %s (errno=%s)" % (err.args[1], err.args[0])
+    elif isinstance(err, EnvironmentError):
+      if err.filename is None:
+        return "%s (errno=%s)" % (err.strerror, err.errno)
+      else:
+        return "%s (file %s) (errno=%s)" % (err.strerror, err.filename,
+                                            err.errno)
+    else:
+      return str(err)
+  except Exception: # pylint: disable=W0703
+    # Formatting itself must never raise; fall back to the plain message
+    logging.exception("Error while handling existing error %s", err)
+    return "%s" % str(err)
+
+
+def _HandleSigHup(reopen_fn, signum, frame): # pylint: disable=W0613
+  """Handler for SIGHUP.
+
+  @param reopen_fn: List of callback functions for reopening log files
+
+  """
+  logging.info("Reopening log files after receiving SIGHUP")
+
+  for fn in reopen_fn:
+    # Entries may be None (e.g. the stdio reopen function when the daemon
+    # was not daemonized), hence the check
+    if fn:
+      fn()
+
+
+def GenericMain(daemon_name, optionparser,
+ check_fn, prepare_fn, exec_fn,
multithreaded=False, console_logging=False,
- default_ssl_cert=None, default_ssl_key=None,
- user=_DEFAULT_RUN_USER, group=_DEFAULT_RUN_GROUP):
+ default_ssl_cert=None, default_ssl_key=None):
"""Shared main function for daemons.
@type daemon_name: string
@type optionparser: optparse.OptionParser
@param optionparser: initialized optionparser with daemon-specific options
(common -f -d options will be handled by this module)
- @type dirs: list of (string, integer)
- @param dirs: list of directories that must be created if they don't exist,
- and the permissions to be used to create them
@type check_fn: function which accepts (options, args)
@param check_fn: function that checks start conditions and exits if they're
not met
- @type exec_fn: function which accepts (options, args)
+ @type prepare_fn: function which accepts (options, args)
+ @param prepare_fn: function that is run before forking, or None;
+ it's result will be passed as the third parameter to exec_fn, or
+ if None was passed in, we will just pass None to exec_fn
+ @type exec_fn: function which accepts (options, args, prepare_results)
@param exec_fn: function that's executed with the daemon's pid file held, and
runs the daemon itself.
@type multithreaded: bool
@param default_ssl_cert: Default SSL certificate path
@type default_ssl_key: string
@param default_ssl_key: Default SSL key path
- @param user: Default user to run as
- @type user: string
- @param group: Default group to run as
- @type group: string
"""
optionparser.add_option("-f", "--foreground", dest="fork",
default=constants.SYSLOG_USAGE,
choices=["no", "yes", "only"])
+ family = ssconf.SimpleStore().GetPrimaryIPFamily()
+ # family will default to AF_INET if there is no ssconf file (e.g. when
+ # upgrading a cluster from 2.2 -> 2.3. This is intended, as Ganeti clusters
+ # <= 2.2 can not be AF_INET6
if daemon_name in constants.DAEMONS_PORTS:
default_bind_address = constants.IP4_ADDRESS_ANY
- family = ssconf.SimpleStore().GetPrimaryIPFamily()
- # family will default to AF_INET if there is no ssconf file (e.g. when
- # upgrading a cluster from 2.2 -> 2.3. This is intended, as Ganeti clusters
- # <= 2.2 can not be AF_INET6
if family == netutils.IP6Address.family:
default_bind_address = constants.IP6_ADDRESS_ANY
help=("Bind address (default: '%s')" %
default_bind_address),
default=default_bind_address, metavar="ADDRESS")
+ optionparser.add_option("-i", "--interface", dest="bind_interface",
+ help=("Bind interface"), metavar="INTERFACE")
if default_ssl_key is not None and default_ssl_cert is not None:
optionparser.add_option("--no-ssl", dest="ssl",
metavar="SSL_CERT_PATH")
# Disable the use of fork(2) if the daemon uses threads
- utils.no_fork = multithreaded
+ if multithreaded:
+ utils.DisableFork()
options, args = optionparser.parse_args()
+  # Resolve --interface into a concrete bind address; mutually exclusive
+  # with an explicit --bind address
+  if getattr(options, "bind_interface", None) is not None:
+    if options.bind_address != default_bind_address:
+      msg = ("Can't specify both, bind address (%s) and bind interface (%s)" %
+             (options.bind_address, options.bind_interface))
+      print >> sys.stderr, msg
+      sys.exit(constants.EXIT_FAILURE)
+    interface_ip_addresses = \
+      netutils.GetInterfaceIpAddresses(options.bind_interface)
+    if family == netutils.IP6Address.family:
+      if_addresses = interface_ip_addresses[constants.IP6_VERSION]
+    else:
+      if_addresses = interface_ip_addresses[constants.IP4_VERSION]
+    if len(if_addresses) < 1:
+      # Fixed typo: "options.bind_interace" would raise AttributeError here,
+      # masking the real error message
+      msg = "Failed to find IP for interface %s" % options.bind_interface
+      print >> sys.stderr, msg
+      sys.exit(constants.EXIT_FAILURE)
+    # Use the first address found on the interface
+    options.bind_address = if_addresses[0]
+
if getattr(options, "ssl", False):
ssl_paths = {
"certificate": options.ssl_cert,
# once and have a proper validation (isfile returns False on directories)
# at the same time.
+ result, running_uid, expected_uid = _VerifyDaemonUser(daemon_name)
+ if not result:
+ msg = ("%s started using wrong user ID (%d), expected %d" %
+ (daemon_name, running_uid, expected_uid))
+ print >> sys.stderr, msg
+ sys.exit(constants.EXIT_FAILURE)
+
if check_fn is not None:
check_fn(options, args)
- utils.EnsureDirs(dirs)
+ log_filename = constants.DAEMONS_LOGFILES[daemon_name]
if options.fork:
- try:
- uid = pwd.getpwnam(user).pw_uid
- gid = grp.getgrnam(group).gr_gid
- except KeyError:
- raise errors.ConfigurationError("User or group not existing on system:"
- " %s:%s" % (user, group))
utils.CloseFDs()
- utils.Daemonize(constants.DAEMONS_LOGFILES[daemon_name], uid, gid)
+ (wpipe, stdio_reopen_fn) = utils.Daemonize(logfile=log_filename)
+ else:
+ (wpipe, stdio_reopen_fn) = (None, None)
- utils.WritePidFile(daemon_name)
- try:
- utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
+ log_reopen_fn = \
+ utils.SetupLogging(log_filename, daemon_name,
debug=options.debug,
stderr_logging=not options.fork,
multithreaded=multithreaded,
- program=daemon_name,
syslog=options.syslog,
console_logging=console_logging)
- logging.info("%s daemon startup", daemon_name)
- exec_fn(options, args)
+
+ # Reopen log file(s) on SIGHUP
+ signal.signal(signal.SIGHUP,
+ compat.partial(_HandleSigHup, [log_reopen_fn, stdio_reopen_fn]))
+
+ try:
+ utils.WritePidFile(utils.DaemonPidFileName(daemon_name))
+ except errors.PidFileLockError, err:
+ print >> sys.stderr, "Error while locking PID file:\n%s" % err
+ sys.exit(constants.EXIT_FAILURE)
+
+ try:
+ try:
+ logging.info("%s daemon startup", daemon_name)
+ if callable(prepare_fn):
+ prep_results = prepare_fn(options, args)
+ else:
+ prep_results = None
+ except Exception, err:
+ utils.WriteErrorToFD(wpipe, _BeautifyError(err))
+ raise
+
+ if wpipe is not None:
+ # we're done with the preparation phase, we close the pipe to
+ # let the parent know it's safe to exit
+ os.close(wpipe)
+
+ exec_fn(options, args, prep_results)
finally:
- utils.RemovePidFile(daemon_name)
+ utils.RemoveFile(utils.DaemonPidFileName(daemon_name))