#
#
-# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
"""
-# pylint: disable-msg=C0103,W0142
-
-# C0103: Invalid name ganeti-watcher
-
import os
import os.path
import sys
import time
import logging
+import operator
+import errno
from optparse import OptionParser
from ganeti import utils
from ganeti import constants
from ganeti import compat
-from ganeti import serializer
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
-from ganeti import ssconf
-from ganeti import bdev
-from ganeti import hypervisor
from ganeti import rapi
-from ganeti.confd import client as confd_client
from ganeti import netutils
+from ganeti import qlang
+from ganeti import objects
+from ganeti import ssconf
+from ganeti import ht
+from ganeti import pathutils
-import ganeti.rapi.client # pylint: disable-msg=W0611
+import ganeti.rapi.client # pylint: disable=W0611
+from ganeti.rapi.client import UsesRapiClient
+
+from ganeti.watcher import nodemaint
+from ganeti.watcher import state
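+#: Maximum number of restart attempts for a down instance before giving up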
MAXTRIES = 5
-# Delete any record that is older than 8 hours; this value is based on
-# the fact that the current retry counter is 5, and watcher runs every
-# 5 minutes, so it takes around half an hour to exceed the retry
-# counter, so 8 hours (16*1/2h) seems like a reasonable reset time
-RETRY_EXPIRATION = 8 * 3600
-BAD_STATES = ['ERROR_down']
-HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
-NOTICE = 'NOTICE'
-ERROR = 'ERROR'
-KEY_RESTART_COUNT = "restart_count"
-KEY_RESTART_WHEN = "restart_when"
-KEY_BOOT_ID = "bootid"
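+#: Instance states that trigger a restart attempt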
+BAD_STATES = compat.UniqueFrozenset([
+ constants.INSTST_ERRORDOWN,
+ ])
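+#: Instance states in which the watcher can't help (the instance's node is
+#: down or offline)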
+HELPLESS_STATES = compat.UniqueFrozenset([
+ constants.INSTST_NODEDOWN,
+ constants.INSTST_NODEOFFLINE,
+ ])
+NOTICE = "NOTICE"
+ERROR = "ERROR"
+#: Number of seconds to wait between starting child processes for node groups
+CHILD_PROCESS_DELAY = 1.0
-# Global LUXI client object
-client = None
+#: How many seconds to wait for instance status file lock
+INSTANCE_STATUS_LOCK_TIMEOUT = 10.0
class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def ShouldPause():
  """Check whether we should pause.

  """
- return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))
+ return bool(utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE))
def StartNodeDaemons():
# on master or not, try to start the node daemon
utils.EnsureDaemon(constants.NODED)
# start confd as well. On non candidates it will be in disabled mode.
- utils.EnsureDaemon(constants.CONFD)
+ if constants.ENABLE_CONFD:
+ utils.EnsureDaemon(constants.CONFD)
+ # start mond as well: all nodes need monitoring
+ if constants.ENABLE_MOND:
+ utils.EnsureDaemon(constants.MOND)
def RunWatcherHooks():
"""Run the watcher hooks.
"""
- hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
+ hooks_dir = utils.PathJoin(pathutils.HOOKS_BASE_DIR,
constants.HOOKS_NAME_WATCHER)
if not os.path.isdir(hooks_dir):
return
try:
results = utils.RunParts(hooks_dir)
- except Exception: # pylint: disable-msg=W0703
- logging.exception("RunParts %s failed: %s", hooks_dir)
+ except Exception, err: # pylint: disable=W0703
+ logging.exception("RunParts %s failed: %s", hooks_dir, err)
return
for (relname, status, runresult) in results:
else:
logging.debug("Watcher hook %s: success (output: %s)", relname,
runresult.output)
+ else:
+ raise errors.ProgrammerError("Unknown status %s returned by RunParts",
+ status)
-class NodeMaintenance(object):
- """Talks to confd daemons and possible shutdown instances/drbd devices.
+class Instance(object):
+ """Abstraction for a Virtual Machine instance.
"""
- def __init__(self):
- self.store_cb = confd_client.StoreResultCallback()
- self.filter_cb = confd_client.ConfdFilterCallback(self.store_cb)
- self.confd_client = confd_client.GetConfdClient(self.filter_cb)
+ def __init__(self, name, status, disks_active, snodes):
+ self.name = name
+ self.status = status
+ self.disks_active = disks_active
+ self.snodes = snodes
- @staticmethod
- def ShouldRun():
- """Checks whether node maintenance should run.
+ def Restart(self, cl):
+ """Encapsulates the start of an instance.
"""
- try:
- return ssconf.SimpleStore().GetMaintainNodeHealth()
- except errors.ConfigurationError, err:
- logging.error("Configuration error, not activating node maintenance: %s",
- err)
- return False
+ op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
+ cli.SubmitOpCode(op, cl=cl)
- @staticmethod
- def GetRunningInstances():
- """Compute list of hypervisor/running instances.
+ def ActivateDisks(self, cl):
+ """Encapsulates the activation of all disks of an instance.
"""
- hyp_list = ssconf.SimpleStore().GetHypervisorList()
- results = []
- for hv_name in hyp_list:
- try:
- hv = hypervisor.GetHypervisor(hv_name)
- ilist = hv.ListInstances()
- results.extend([(iname, hv_name) for iname in ilist])
- except: # pylint: disable-msg=W0702
- logging.error("Error while listing instances for hypervisor %s",
- hv_name, exc_info=True)
- return results
-
- @staticmethod
- def GetUsedDRBDs():
- """Get list of used DRBD minors.
+ op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
+ cli.SubmitOpCode(op, cl=cl)
- """
- return bdev.DRBD8.GetUsedDevs().keys()
- @classmethod
- def DoMaintenance(cls, role):
- """Maintain the instance list.
+class Node(object):
+ """Data container representing cluster node.
+
+ """
+ def __init__(self, name, bootid, offline, secondaries):
+ """Initializes this class.
"""
- if role == constants.CONFD_NODE_ROLE_OFFLINE:
- inst_running = cls.GetRunningInstances()
- cls.ShutdownInstances(inst_running)
- drbd_running = cls.GetUsedDRBDs()
- cls.ShutdownDRBD(drbd_running)
- else:
- logging.debug("Not doing anything for role %s", role)
+ self.name = name
+ self.bootid = bootid
+ self.offline = offline
+ self.secondaries = secondaries
- @staticmethod
- def ShutdownInstances(inst_running):
- """Shutdown running instances.
- """
- names_running = set([i[0] for i in inst_running])
- if names_running:
- logging.info("Following instances should not be running,"
- " shutting them down: %s", utils.CommaJoin(names_running))
- # this dictionary will collapse duplicate instance names (only
- # xen pvm/vhm) into a single key, which is fine
- i2h = dict(inst_running)
- for name in names_running:
- hv_name = i2h[name]
- hv = hypervisor.GetHypervisor(hv_name)
- hv.StopInstance(None, force=True, name=name)
-
- @staticmethod
- def ShutdownDRBD(drbd_running):
- """Shutdown active DRBD devices.
+def _CheckInstances(cl, notepad, instances):
+ """Make a pass over the list of instances, restarting downed ones.
- """
- if drbd_running:
- logging.info("Following DRBD minors should not be active,"
- " shutting them down: %s", utils.CommaJoin(drbd_running))
- for minor in drbd_running:
- # pylint: disable-msg=W0212
- # using the private method as is, pending enhancements to the DRBD
- # interface
- bdev.DRBD8._ShutdownAll(minor)
-
- def Exec(self):
- """Check node status versus cluster desired state.
+ """
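+  # Drop records for instances that no longer exist or whose entries expired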
+ notepad.MaintainInstanceList(instances.keys())
+
+ started = set()
+
+ for inst in instances.values():
+ if inst.status in BAD_STATES:
+ n = notepad.NumberOfRestartAttempts(inst.name)
+
+ if n > MAXTRIES:
+ logging.warning("Not restarting instance '%s', retries exhausted",
+ inst.name)
+ continue
+
+ if n == MAXTRIES:
+ notepad.RecordRestartAttempt(inst.name)
+ logging.error("Could not restart instance '%s' after %s attempts,"
+ " giving up", inst.name, MAXTRIES)
+ continue
+
+ try:
+ logging.info("Restarting instance '%s' (attempt #%s)",
+ inst.name, n + 1)
+ inst.Restart(cl)
+ except Exception: # pylint: disable=W0703
+ logging.exception("Error while restarting instance '%s'", inst.name)
+ else:
+ started.add(inst.name)
+
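+      # Record the attempt whether or not it succeeded; repeatedly failing
+      # instances will eventually exhaust MAXTRIES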
+ notepad.RecordRestartAttempt(inst.name)
- """
- my_name = netutils.Hostname.GetSysName()
- req = confd_client.ConfdClientRequest(type=
- constants.CONFD_REQ_NODE_ROLE_BYNAME,
- query=my_name)
- self.confd_client.SendRequest(req, async=False, coverage=-1)
- timed_out, _, _ = self.confd_client.WaitForReply(req.rsalt)
- if not timed_out:
- # should have a valid response
- status, result = self.store_cb.GetResponse(req.rsalt)
- assert status, "Missing result but received replies"
- if not self.filter_cb.consistent[req.rsalt]:
- logging.warning("Inconsistent replies, not doing anything")
- return
- self.DoMaintenance(result.server_reply.answer)
else:
- logging.warning("Confd query timed out, cannot do maintenance actions")
+ if notepad.NumberOfRestartAttempts(inst.name):
+ notepad.RemoveInstance(inst.name)
+ if inst.status not in HELPLESS_STATES:
+ logging.info("Restart of instance '%s' succeeded", inst.name)
+
+ return started
-class WatcherState(object):
- """Interface to a state file recording restart attempts.
+def _CheckDisks(cl, notepad, nodes, instances, started):
+ """Check all nodes for restarted ones.
"""
- def __init__(self, statefile):
- """Open, lock, read and parse the file.
+ check_nodes = []
+
+ for node in nodes.values():
+ old = notepad.GetNodeBootID(node.name)
+ if not node.bootid:
+ # Bad node, not returning a boot id
+ if not node.offline:
+ logging.debug("Node '%s' missing boot ID, skipping secondary checks",
+ node.name)
+ continue
+
+ if old != node.bootid:
+ # Node's boot ID has changed, probably through a reboot
+ check_nodes.append(node)
+
+ if check_nodes:
+ # Activate disks for all instances with any of the checked nodes as a
+ # secondary node.
+ for node in check_nodes:
+ for instance_name in node.secondaries:
+ try:
+ inst = instances[instance_name]
+ except KeyError:
+ logging.info("Can't find instance '%s', maybe it was ignored",
+ instance_name)
+ continue
- @type statefile: file
- @param statefile: State file object
+ if not inst.disks_active:
+          logging.info("Skipping disk activation for instance '%s': its"
+                       " disks are not activated", inst.name)
+ continue
- """
- self.statefile = statefile
+ if inst.name in started:
+        # we already tried to start the instance, which should have
+        # activated its drives (if they can be activated at all)
+ logging.debug("Skipping disk activation for instance '%s' as"
+ " it was already started", inst.name)
+ continue
- try:
- state_data = self.statefile.read()
- if not state_data:
- self._data = {}
- else:
- self._data = serializer.Load(state_data)
- except Exception, msg: # pylint: disable-msg=W0703
- # Ignore errors while loading the file and treat it as empty
- self._data = {}
- logging.warning(("Invalid state file. Using defaults."
- " Error message: %s"), msg)
+ try:
+ logging.info("Activating disks for instance '%s'", inst.name)
+ inst.ActivateDisks(cl)
+ except Exception: # pylint: disable=W0703
+ logging.exception("Error while activating disks for instance '%s'",
+ inst.name)
- if "instance" not in self._data:
- self._data["instance"] = {}
- if "node" not in self._data:
- self._data["node"] = {}
+ # Keep changed boot IDs
+ for node in check_nodes:
+ notepad.SetNodeBootID(node.name, node.bootid)
- self._orig_data = serializer.Dump(self._data)
- def Save(self):
- """Save state to file, then unlock and close it.
+def _CheckForOfflineNodes(nodes, instance):
+  """Checks if the given instance has any secondary node in offline status.
- """
- assert self.statefile
+  @param nodes: Dictionary of L{Node} objects, indexed by node name
+  @param instance: The L{Instance} object to check
+  @return: True if any secondary node is offline, False otherwise
- serialized_form = serializer.Dump(self._data)
- if self._orig_data == serialized_form:
- logging.debug("Data didn't change, just touching status file")
- os.utime(constants.WATCHER_STATEFILE, None)
- return
+ """
+ return compat.any(nodes[node_name].offline for node_name in instance.snodes)
- # We need to make sure the file is locked before renaming it, otherwise
- # starting ganeti-watcher again at the same time will create a conflict.
- fd = utils.WriteFile(constants.WATCHER_STATEFILE,
- data=serialized_form,
- prewrite=utils.LockFile, close=False)
- self.statefile = os.fdopen(fd, 'w+')
- def Close(self):
- """Unlock configuration file and close it.
+def _VerifyDisks(cl, uuid, nodes, instances):
+ """Run a per-group "gnt-cluster verify-disks".
- """
- assert self.statefile
+ """
+ job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(group_name=uuid)])
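+  # The single opcode in this job returns a 3-tuple; its second element
+  # names the instances with offline disks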
+ ((_, offline_disk_instances, _), ) = \
+ cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
+ cl.ArchiveJob(job_id)
+
+ if not offline_disk_instances:
+ # nothing to do
+ logging.debug("Verify-disks reported no offline disks, nothing to do")
+ return
- # Files are automatically unlocked when closing them
- self.statefile.close()
- self.statefile = None
+ logging.debug("Will activate disks for instance(s) %s",
+ utils.CommaJoin(offline_disk_instances))
- def GetNodeBootID(self, name):
- """Returns the last boot ID of a node or None.
+ # We submit only one job, and wait for it. Not optimal, but this puts less
+ # load on the job queue.
+ job = []
+ for name in offline_disk_instances:
+ try:
+ inst = instances[name]
+ except KeyError:
+ logging.info("Can't find instance '%s', maybe it was ignored", name)
+ continue
- """
- ndata = self._data["node"]
+ if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
+ logging.info("Skipping instance '%s' because it is in a helpless state"
+ " or has offline secondaries", name)
+ continue
- if name in ndata and KEY_BOOT_ID in ndata[name]:
- return ndata[name][KEY_BOOT_ID]
- return None
+ job.append(opcodes.OpInstanceActivateDisks(instance_name=name))
- def SetNodeBootID(self, name, bootid):
- """Sets the boot ID of a node.
+ if job:
+ job_id = cli.SendJob(job, cl=cl)
- """
- assert bootid
+ try:
+ cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
+ except Exception: # pylint: disable=W0703
+ logging.exception("Error while activating disks")
- ndata = self._data["node"]
- if name not in ndata:
- ndata[name] = {}
+def IsRapiResponding(hostname):
+ """Connects to RAPI port and does a simple test.
- ndata[name][KEY_BOOT_ID] = bootid
+ Connects to RAPI port of hostname and does a simple test. At this time, the
+ test is GetVersion.
- def NumberOfRestartAttempts(self, instance):
- """Returns number of previous restart attempts.
+ @type hostname: string
+ @param hostname: hostname of the node to connect to.
+ @rtype: bool
+ @return: Whether RAPI is working properly
- @type instance: L{Instance}
- @param instance: the instance to look up
+ """
+ curl_config = rapi.client.GenericCurlConfig()
+ rapi_client = rapi.client.GanetiRapiClient(hostname,
+ curl_config_fn=curl_config)
+ try:
+ master_version = rapi_client.GetVersion()
+ except rapi.client.CertificateError, err:
+ logging.warning("RAPI certificate error: %s", err)
+ return False
+ except rapi.client.GanetiApiError, err:
+ logging.warning("RAPI error: %s", err)
+ return False
+ else:
+ logging.debug("Reported RAPI version %s", master_version)
+ return master_version == constants.RAPI_VERSION
- """
- idata = self._data["instance"]
- if instance.name in idata:
- return idata[instance.name][KEY_RESTART_COUNT]
+def ParseOptions():
+ """Parse the command line options.
- return 0
+ @return: (options, args) as from OptionParser.parse_args()
- def MaintainInstanceList(self, instances):
- """Perform maintenance on the recorded instances.
+ """
+ parser = OptionParser(description="Ganeti cluster watcher",
+ usage="%prog [-d]",
+ version="%%prog (ganeti) %s" %
+ constants.RELEASE_VERSION)
- @type instances: list of string
- @param instances: the list of currently existing instances
+ parser.add_option(cli.DEBUG_OPT)
+ parser.add_option(cli.NODEGROUP_OPT)
+ parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
+ help="Autoarchive jobs older than this age (default"
+ " 6 hours)")
+ parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
+ action="store_true", help="Ignore cluster pause setting")
+ parser.add_option("--wait-children", dest="wait_children",
+ action="store_true", help="Wait for child processes")
+ parser.add_option("--no-wait-children", dest="wait_children",
+ action="store_false",
+ help="Don't wait for child processes")
+ # See optparse documentation for why default values are not set by options
+ parser.set_defaults(wait_children=True)
+ options, args = parser.parse_args()
+ options.job_age = cli.ParseTimespec(options.job_age)
- """
- idict = self._data["instance"]
- # First, delete obsolete instances
- obsolete_instances = set(idict).difference(instances)
- for inst in obsolete_instances:
- logging.debug("Forgetting obsolete instance %s", inst)
- del idict[inst]
-
- # Second, delete expired records
- earliest = time.time() - RETRY_EXPIRATION
- expired_instances = [i for i in idict
- if idict[i][KEY_RESTART_WHEN] < earliest]
- for inst in expired_instances:
- logging.debug("Expiring record for instance %s", inst)
- del idict[inst]
-
- def RecordRestartAttempt(self, instance):
- """Record a restart attempt.
-
- @type instance: L{Instance}
- @param instance: the instance being restarted
+ if args:
+ parser.error("No arguments expected")
- """
- idata = self._data["instance"]
+ return (options, args)
- if instance.name not in idata:
- inst = idata[instance.name] = {}
- else:
- inst = idata[instance.name]
- inst[KEY_RESTART_WHEN] = time.time()
- inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1
+def _WriteInstanceStatus(filename, data):
+ """Writes the per-group instance status file.
- def RemoveInstance(self, instance):
- """Update state to reflect that a machine is running.
+ The entries are sorted.
- This method removes the record for a named instance (as we only
- track down instances).
+ @type filename: string
+ @param filename: Path to instance status file
+ @type data: list of tuple; (instance name as string, status as string)
+ @param data: Instance name and status
- @type instance: L{Instance}
- @param instance: the instance to remove from books
+ """
+ logging.debug("Updating instance status file '%s' with %s instances",
+ filename, len(data))
- """
- idata = self._data["instance"]
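+  # Format every (name, status) pair as a "<name> <status>" line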
+ utils.WriteFile(filename,
+ data="".join(map(compat.partial(operator.mod, "%s %s\n"),
+ sorted(data))))
- if instance.name in idata:
- del idata[instance.name]
+def _UpdateInstanceStatus(filename, instances):
+ """Writes an instance status file from L{Instance} objects.
-class Instance(object):
- """Abstraction for a Virtual Machine instance.
+ @type filename: string
+ @param filename: Path to status file
+ @type instances: list of L{Instance}
"""
- def __init__(self, name, state, autostart, snodes):
- self.name = name
- self.state = state
- self.autostart = autostart
- self.snodes = snodes
+ _WriteInstanceStatus(filename, [(inst.name, inst.status)
+ for inst in instances])
- def Restart(self):
- """Encapsulates the start of an instance.
- """
- op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
- cli.SubmitOpCode(op, cl=client)
+def _ReadInstanceStatus(filename):
+ """Reads an instance status file.
- def ActivateDisks(self):
- """Encapsulates the activation of all disks of an instance.
+ @type filename: string
+ @param filename: Path to status file
+  @rtype: tuple; (None or number, None or list of lists containing instance
+    name and status)
+  @return: File's mtime and instance status contained in the file; both
+    values are C{None} if the file can't be read
- """
- op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
- cli.SubmitOpCode(op, cl=client)
+ """
+ logging.debug("Reading per-group instance status from '%s'", filename)
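+  # FileStatHelper captures the file's stat() result while it is being read;
+  # its mtime is returned alongside the parsed contents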
+ statcb = utils.FileStatHelper()
+ try:
+ content = utils.ReadFile(filename, preread=statcb)
+ except EnvironmentError, err:
+ if err.errno == errno.ENOENT:
+ logging.error("Can't read '%s', does not exist (yet)", filename)
+ else:
+ logging.exception("Unable to read '%s', ignoring", filename)
+ return (None, None)
+ else:
+ return (statcb.st.st_mtime, [line.split(None, 1)
+ for line in content.splitlines()])
-def GetClusterData():
- """Get a list of instances on this cluster.
+
+def _MergeInstanceStatus(filename, pergroup_filename, groups):
+ """Merges all per-group instance status files into a global one.
+
+ @type filename: string
+ @param filename: Path to global instance status file
+ @type pergroup_filename: string
+ @param pergroup_filename: Path to per-group status files, must contain "%s"
+ to be replaced with group UUID
+ @type groups: sequence
+ @param groups: UUIDs of known groups
"""
- op1_fields = ["name", "status", "admin_state", "snodes"]
- op1 = opcodes.OpInstanceQuery(output_fields=op1_fields, names=[],
- use_locking=True)
- op2_fields = ["name", "bootid", "offline"]
- op2 = opcodes.OpNodeQuery(output_fields=op2_fields, names=[],
- use_locking=True)
+ # Lock global status file in exclusive mode
+ lock = utils.FileLock.Open(filename)
+ try:
+ lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
+ except errors.LockError, err:
+ # All per-group processes will lock and update the file. None of them
+ # should take longer than 10 seconds (the value of
+ # INSTANCE_STATUS_LOCK_TIMEOUT).
+ logging.error("Can't acquire lock on instance status file '%s', not"
+ " updating: %s", filename, err)
+ return
+
+ logging.debug("Acquired exclusive lock on '%s'", filename)
- job_id = client.SubmitJob([op1, op2])
+ data = {}
- all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
+ # Load instance status from all groups
+ for group_uuid in groups:
+ (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)
- logging.debug("Got data from cluster, writing instance status file")
+ if mtime is not None:
+ for (instance_name, status) in instdata:
+ data.setdefault(instance_name, []).append((mtime, status))
- result = all_results[0]
- smap = {}
+ # Select last update based on file mtime
+ inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
+ for (instance_name, status) in data.items()]
- instances = {}
+ # Write the global status file. Don't touch file after it's been
+ # updated--there is no lock anymore.
+ _WriteInstanceStatus(filename, inststatus)
- # write the upfile
- up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
- utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)
- for fields in result:
- (name, status, autostart, snodes) = fields
+def GetLuxiClient(try_restart):
+ """Tries to connect to the master daemon.
- # update the secondary node map
- for node in snodes:
- if node not in smap:
- smap[node] = []
- smap[node].append(name)
+ @type try_restart: bool
+ @param try_restart: Whether to attempt to restart the master daemon
- instances[name] = Instance(name, status, autostart, snodes)
+ """
+ try:
+ return cli.GetClient()
+ except errors.OpPrereqError, err:
+ # this is, from cli.GetClient, a not-master case
+ raise NotMasterError("Not on master node (%s)" % err)
- nodes = dict([(name, (bootid, offline))
- for name, bootid, offline in all_results[1]])
+ except luxi.NoMasterError, err:
+ if not try_restart:
+ raise
- client.ArchiveJob(job_id)
+ logging.warning("Master daemon seems to be down (%s), trying to restart",
+ err)
- return instances, nodes, smap
+ if not utils.EnsureDaemon(constants.MASTERD):
+ raise errors.GenericError("Can't start the master daemon")
+ # Retry the connection
+ return cli.GetClient()
-class Watcher(object):
- """Encapsulate the logic for restarting erroneously halted virtual machines.
- The calling program should periodically instantiate me and call Run().
- This will traverse the list of instances, and make up to MAXTRIES attempts
- to restart machines that are down.
+def _StartGroupChildren(cl, wait):
+ """Starts a new instance of the watcher for every node group.
"""
- def __init__(self, opts, notepad):
- self.notepad = notepad
- master = client.QueryConfigValues(["master_node"])[0]
- if master != netutils.Hostname.GetSysName():
- raise NotMasterError("This is not the master node")
- # first archive old jobs
- self.ArchiveJobs(opts.job_age)
- # and only then submit new ones
- self.instances, self.bootids, self.smap = GetClusterData()
- self.started_instances = set()
- self.opts = opts
-
- def Run(self):
- """Watcher run sequence.
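+  # A watcher process given the node group option is itself a per-group
+  # child and must never spawn children of its own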
+ assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
+ for arg in sys.argv)
- """
- notepad = self.notepad
- self.CheckInstances(notepad)
- self.CheckDisks(notepad)
- self.VerifyDisks()
+ result = cl.QueryGroups([], ["name", "uuid"], False)
- @staticmethod
- def ArchiveJobs(age):
- """Archive old jobs.
+ children = []
- """
- arch_count, left_count = client.AutoArchiveJobs(age)
- logging.debug("Archived %s jobs, left %s", arch_count, left_count)
+ for (idx, (name, uuid)) in enumerate(result):
+ args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]
- def CheckDisks(self, notepad):
- """Check all nodes for restarted ones.
+ if idx > 0:
+ # Let's not kill the system
+ time.sleep(CHILD_PROCESS_DELAY)
- """
- check_nodes = []
- for name, (new_id, offline) in self.bootids.iteritems():
- old = notepad.GetNodeBootID(name)
- if new_id is None:
- # Bad node, not returning a boot id
- if not offline:
- logging.debug("Node %s missing boot id, skipping secondary checks",
- name)
- continue
- if old != new_id:
- # Node's boot ID has changed, proably through a reboot.
- check_nodes.append(name)
-
- if check_nodes:
- # Activate disks for all instances with any of the checked nodes as a
- # secondary node.
- for node in check_nodes:
- if node not in self.smap:
- continue
- for instance_name in self.smap[node]:
- instance = self.instances[instance_name]
- if not instance.autostart:
- logging.info(("Skipping disk activation for non-autostart"
- " instance %s"), instance.name)
- continue
- if instance.name in self.started_instances:
- # we already tried to start the instance, which should have
- # activated its drives (if they can be at all)
- continue
- try:
- logging.info("Activating disks for instance %s", instance.name)
- instance.ActivateDisks()
- except Exception: # pylint: disable-msg=W0703
- logging.exception("Error while activating disks for instance %s",
- instance.name)
-
- # Keep changed boot IDs
- for name in check_nodes:
- notepad.SetNodeBootID(name, self.bootids[name][0])
-
- def CheckInstances(self, notepad):
- """Make a pass over the list of instances, restarting downed ones.
+ logging.debug("Spawning child for group '%s' (%s), arguments %s",
+ name, uuid, args)
- """
- notepad.MaintainInstanceList(self.instances.keys())
+ try:
+ # TODO: Should utils.StartDaemon be used instead?
+ pid = os.spawnv(os.P_NOWAIT, args[0], args)
+ except Exception: # pylint: disable=W0703
+ logging.exception("Failed to start child for group '%s' (%s)",
+ name, uuid)
+ else:
+ logging.debug("Started with PID %s", pid)
+ children.append(pid)
- for instance in self.instances.values():
- if instance.state in BAD_STATES:
- n = notepad.NumberOfRestartAttempts(instance)
+ if wait:
+ for pid in children:
+ logging.debug("Waiting for child PID %s", pid)
+ try:
+ result = utils.RetryOnSignal(os.waitpid, pid, 0)
+ except EnvironmentError, err:
+ result = str(err)
- if n > MAXTRIES:
- logging.warning("Not restarting instance %s, retries exhausted",
- instance.name)
- continue
- elif n < MAXTRIES:
- last = " (Attempt #%d)" % (n + 1)
- else:
- notepad.RecordRestartAttempt(instance)
- logging.error("Could not restart %s after %d attempts, giving up",
- instance.name, MAXTRIES)
- continue
- try:
- logging.info("Restarting %s%s",
- instance.name, last)
- instance.Restart()
- self.started_instances.add(instance.name)
- except Exception: # pylint: disable-msg=W0703
- logging.exception("Error while restarting instance %s",
- instance.name)
-
- notepad.RecordRestartAttempt(instance)
- elif instance.state in HELPLESS_STATES:
- if notepad.NumberOfRestartAttempts(instance):
- notepad.RemoveInstance(instance)
- else:
- if notepad.NumberOfRestartAttempts(instance):
- notepad.RemoveInstance(instance)
- logging.info("Restart of %s succeeded", instance.name)
+ logging.debug("Child PID %s exited with status %s", pid, result)
- def _CheckForOfflineNodes(self, instance):
- """Checks if given instances has any secondary in offline status.
- @param instance: The instance object
- @return: True if any of the secondary is offline, False otherwise
+def _ArchiveJobs(cl, age):
+ """Archives old jobs.
- """
- bootids = []
- for node in instance.snodes:
- bootids.append(self.bootids[node])
+ """
+ (arch_count, left_count) = cl.AutoArchiveJobs(age)
+ logging.debug("Archived %s jobs, left %s", arch_count, left_count)
- return compat.any(offline for (_, offline) in bootids)
- def VerifyDisks(self):
- """Run gnt-cluster verify-disks.
+def _CheckMaster(cl):
+  """Ensures the current host is the master node.
- """
- op = opcodes.OpClusterVerifyDisks()
- job_id = client.SubmitJob([op])
- result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
- client.ArchiveJob(job_id)
- if not isinstance(result, (tuple, list)):
- logging.error("Can't get a valid result from verify-disks")
- return
- offline_disk_instances = result[1]
- if not offline_disk_instances:
- # nothing to do
- return
- logging.debug("Will activate disks for instances %s",
- utils.CommaJoin(offline_disk_instances))
- # we submit only one job, and wait for it. not optimal, but spams
- # less the job queue
- job = []
- for name in offline_disk_instances:
- instance = self.instances[name]
- if (instance.state in HELPLESS_STATES or
- self._CheckForOfflineNodes(instance)):
- logging.info("Skip instance %s because it is in helpless state or has"
- " one offline secondary", name)
- continue
- job.append(opcodes.OpInstanceActivateDisks(instance_name=name))
+ """
+ (master, ) = cl.QueryConfigValues(["master_node"])
+ if master != netutils.Hostname.GetSysName():
+ raise NotMasterError("This is not the master node")
- if job:
- job_id = cli.SendJob(job, cl=client)
- try:
- cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
- except Exception: # pylint: disable-msg=W0703
- logging.exception("Error while activating disks")
+@UsesRapiClient
+def _GlobalWatcher(opts):
+ """Main function for global watcher.
+  At the end, child processes are spawned for every node group.
-def OpenStateFile(path):
- """Opens the state file and acquires a lock on it.
+ """
+ StartNodeDaemons()
+ RunWatcherHooks()
- @type path: string
- @param path: Path to state file
+ # Run node maintenance in all cases, even if master, so that old masters can
+ # be properly cleaned up
+ if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable=E0602
+ nodemaint.NodeMaintenance().Exec() # pylint: disable=E0602
- """
- # The two-step dance below is necessary to allow both opening existing
- # file read/write and creating if not existing. Vanilla open will truncate
- # an existing file -or- allow creating if not existing.
- statefile_fd = os.open(path, os.O_RDWR | os.O_CREAT)
-
- # Try to acquire lock on state file. If this fails, another watcher instance
- # might already be running or another program is temporarily blocking the
- # watcher from running.
try:
- utils.LockFile(statefile_fd)
- except errors.LockError, err:
- logging.error("Can't acquire lock on state file %s: %s", path, err)
- return None
+ client = GetLuxiClient(True)
+ except NotMasterError:
+ # Don't proceed on non-master nodes
+ return constants.EXIT_SUCCESS
- return os.fdopen(statefile_fd, "w+")
+ # we are on master now
+ utils.EnsureDaemon(constants.RAPI)
+ # If RAPI isn't responding to queries, try one restart
+ logging.debug("Attempting to talk to remote API on %s",
+ constants.IP4_ADDRESS_LOCALHOST)
+ if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
+    logging.warning("Couldn't get answer from remote API, restarting daemon")
+ utils.StopDaemon(constants.RAPI)
+ utils.EnsureDaemon(constants.RAPI)
+ logging.debug("Second attempt to talk to remote API")
+ if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
+ logging.fatal("RAPI is not responding")
+ logging.debug("Successfully talked to remote API")
-def IsRapiResponding(hostname):
- """Connects to RAPI port and does a simple test.
+ _CheckMaster(client)
+ _ArchiveJobs(client, opts.job_age)
- Connects to RAPI port of hostname and does a simple test. At this time, the
- test is GetVersion.
+ # Spawn child processes for all node groups
+ _StartGroupChildren(client, opts.wait_children)
- @type hostname: string
- @param hostname: hostname of the node to connect to.
- @rtype: bool
- @return: Whether RAPI is working properly
+ return constants.EXIT_SUCCESS
+
+
+def _GetGroupData(cl, uuid):
+ """Retrieves instances and nodes per node group.
"""
- curl_config = rapi.client.GenericCurlConfig()
- rapi_client = rapi.client.GanetiRapiClient(hostname,
- curl_config_fn=curl_config)
- try:
- master_version = rapi_client.GetVersion()
- except rapi.client.CertificateError, err:
- logging.warning("RAPI Error: CertificateError (%s)", err)
- return False
- except rapi.client.GanetiApiError, err:
- logging.warning("RAPI Error: GanetiApiError (%s)", err)
- return False
- logging.debug("RAPI Result: master_version is %s", master_version)
- return master_version == constants.RAPI_VERSION
+ job = [
+ # Get all primary instances in group
+ opcodes.OpQuery(what=constants.QR_INSTANCE,
+ fields=["name", "status", "disks_active", "snodes",
+ "pnode.group.uuid", "snodes.group.uuid"],
+ qfilter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid],
+ use_locking=True),
+
+ # Get all nodes in group
+ opcodes.OpQuery(what=constants.QR_NODE,
+ fields=["name", "bootid", "offline"],
+ qfilter=[qlang.OP_EQUAL, "group.uuid", uuid],
+ use_locking=True),
+ ]
+
+ job_id = cl.SubmitJob(job)
+ results = map(objects.QueryResponse.FromDict,
+ cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
+ cl.ArchiveJob(job_id)
+
+ results_data = map(operator.attrgetter("data"), results)
+
+ # Ensure results are tuples with two values
+ assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))
+
+ # Extract values ignoring result status
+ (raw_instances, raw_nodes) = [[map(compat.snd, values)
+ for values in res]
+ for res in results_data]
+
+ secondaries = {}
+ instances = []
+
+ # Load all instances
+ for (name, status, disks_active, snodes, pnode_group_uuid,
+ snodes_group_uuid) in raw_instances:
+ if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
+ logging.error("Ignoring split instance '%s', primary group %s, secondary"
+ " groups %s", name, pnode_group_uuid,
+ utils.CommaJoin(snodes_group_uuid))
+ else:
+ instances.append(Instance(name, status, disks_active, snodes))
+ for node in snodes:
+ secondaries.setdefault(node, set()).add(name)
-def ParseOptions():
- """Parse the command line options.
+ # Load all nodes
+ nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
+ for (name, bootid, offline) in raw_nodes]
- @return: (options, args) as from OptionParser.parse_args()
+ return (dict((node.name, node) for node in nodes),
+ dict((inst.name, inst) for inst in instances))
+
+
+def _LoadKnownGroups():
+ """Returns a list of all node groups known by L{ssconf}.
"""
- parser = OptionParser(description="Ganeti cluster watcher",
- usage="%prog [-d]",
- version="%%prog (ganeti) %s" %
- constants.RELEASE_VERSION)
+ groups = ssconf.SimpleStore().GetNodegroupList()
- parser.add_option(cli.DEBUG_OPT)
- parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
- help="Autoarchive jobs older than this age (default"
- " 6 hours)")
- parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
- action="store_true", help="Ignore cluster pause setting")
- options, args = parser.parse_args()
- options.job_age = cli.ParseTimespec(options.job_age)
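+  # Each line in the node group list is "<uuid> <name>"; keep only the UUID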
+ result = list(line.split(None, 1)[0] for line in groups
+ if line.strip())
- if args:
- parser.error("No arguments expected")
+ if not compat.all(map(utils.UUID_RE.match, result)):
+ raise errors.GenericError("Ssconf contains invalid group UUID")
- return (options, args)
+ return result
+
+
+def _GroupWatcher(opts):
+ """Main function for per-group watcher process.
+
+ """
+ group_uuid = opts.nodegroup.lower()
+
+ if not utils.UUID_RE.match(group_uuid):
+ raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
+ " got '%s'" %
+ (cli.NODEGROUP_OPT_NAME, group_uuid))
+
+ logging.info("Watcher for node group '%s'", group_uuid)
+
+ known_groups = _LoadKnownGroups()
+
+ # Check if node group is known
+ if group_uuid not in known_groups:
+ raise errors.GenericError("Node group '%s' is not known by ssconf" %
+ group_uuid)
+
+ # Group UUID has been verified and should not contain any dangerous
+ # characters
+ state_path = pathutils.WATCHER_GROUP_STATE_FILE % group_uuid
+ inst_status_path = pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid
+
+ logging.debug("Using state file %s", state_path)
+
+  # Open the per-group state file and acquire a lock on it
+ statefile = state.OpenStateFile(state_path) # pylint: disable=E0602
+ if not statefile:
+ return constants.EXIT_FAILURE
+
+ notepad = state.WatcherState(statefile) # pylint: disable=E0602
+ try:
+ # Connect to master daemon
+ client = GetLuxiClient(False)
+
+ _CheckMaster(client)
+
+ (nodes, instances) = _GetGroupData(client, group_uuid)
+
+ # Update per-group instance status file
+ _UpdateInstanceStatus(inst_status_path, instances.values())
+
+ _MergeInstanceStatus(pathutils.INSTANCE_STATUS_FILE,
+ pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE,
+ known_groups)
+
+ started = _CheckInstances(client, notepad, instances)
+ _CheckDisks(client, notepad, nodes, instances, started)
+ _VerifyDisks(client, group_uuid, nodes, instances)
+ except Exception, err:
+ logging.info("Not updating status file due to failure: %s", err)
+ raise
+ else:
+ # Save changes for next run
+ notepad.Save(state_path)
+
+ return constants.EXIT_SUCCESS
-@rapi.client.UsesRapiClient
def Main():
"""Main function.
"""
- global client # pylint: disable-msg=W0603
-
(options, _) = ParseOptions()
- utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0],
+ utils.SetupLogging(pathutils.LOG_WATCHER, sys.argv[0],
debug=options.debug, stderr_logging=options.debug)
if ShouldPause() and not options.ignore_pause:
logging.debug("Pause has been set, exiting")
return constants.EXIT_SUCCESS
- statefile = OpenStateFile(constants.WATCHER_STATEFILE)
- if not statefile:
- return constants.EXIT_FAILURE
-
- update_file = False
+ # Try to acquire global watcher lock in shared mode
+ lock = utils.FileLock.Open(pathutils.WATCHER_LOCK_FILE)
try:
- StartNodeDaemons()
- RunWatcherHooks()
- # run node maintenance in all cases, even if master, so that old
- # masters can be properly cleaned up too
- if NodeMaintenance.ShouldRun():
- NodeMaintenance().Exec()
-
- notepad = WatcherState(statefile)
- try:
- try:
- client = cli.GetClient()
- except errors.OpPrereqError:
- # this is, from cli.GetClient, a not-master case
- logging.debug("Not on master, exiting")
- update_file = True
- return constants.EXIT_SUCCESS
- except luxi.NoMasterError, err:
- logging.warning("Master seems to be down (%s), trying to restart",
- str(err))
- if not utils.EnsureDaemon(constants.MASTERD):
- logging.critical("Can't start the master, exiting")
- return constants.EXIT_FAILURE
- # else retry the connection
- client = cli.GetClient()
-
- # we are on master now
- utils.EnsureDaemon(constants.RAPI)
-
- # If RAPI isn't responding to queries, try one restart.
- logging.debug("Attempting to talk with RAPI.")
- if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
- logging.warning("Couldn't get answer from Ganeti RAPI daemon."
- " Restarting Ganeti RAPI.")
- utils.StopDaemon(constants.RAPI)
- utils.EnsureDaemon(constants.RAPI)
- logging.debug("Second attempt to talk with RAPI")
- if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
- logging.fatal("RAPI is not responding. Please investigate.")
- logging.debug("Successfully talked to RAPI.")
+ lock.Shared(blocking=False)
+ except (EnvironmentError, errors.LockError), err:
+ logging.error("Can't acquire lock on %s: %s",
+ pathutils.WATCHER_LOCK_FILE, err)
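+    # Another process holds the lock; this is not an error condition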
+ return constants.EXIT_SUCCESS
- try:
- watcher = Watcher(options, notepad)
- except errors.ConfigurationError:
- # Just exit if there's no configuration
- update_file = True
- return constants.EXIT_SUCCESS
-
- watcher.Run()
- update_file = True
-
- finally:
- if update_file:
- notepad.Save()
- else:
- logging.debug("Not updating status file due to failure")
- except SystemExit:
+ if options.nodegroup is None:
+ fn = _GlobalWatcher
+ else:
+ # Per-nodegroup watcher
+ fn = _GroupWatcher
+
+ try:
+ return fn(options)
+ except (SystemExit, KeyboardInterrupt):
raise
except NotMasterError:
logging.debug("Not master, exiting")
return constants.EXIT_NOTMASTER
except errors.ResolverError, err:
- logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
+ logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
return constants.EXIT_NODESETUP_ERROR
except errors.JobQueueFull:
logging.error("Job queue is full, can't query cluster state")