#!/usr/bin/python # # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA # 02110-1301, USA. """Tool to restart erroneously downed virtual machines. This program and set of classes implement a watchdog to restart virtual machines in a Ganeti cluster that have crashed or been killed by a node reboot. Run from cron or similar. """ # pylint: disable-msg=C0103,W0142 # C0103: Invalid name ganeti-watcher import os import sys import time import logging from optparse import OptionParser from ganeti import utils from ganeti import constants from ganeti import serializer from ganeti import errors from ganeti import opcodes from ganeti import cli from ganeti import luxi from ganeti import ssconf from ganeti import bdev from ganeti import hypervisor from ganeti import rapi from ganeti.confd import client as confd_client from ganeti import netutils import ganeti.rapi.client # pylint: disable-msg=W0611 MAXTRIES = 5 # Delete any record that is older than 8 hours; this value is based on # the fact that the current retry counter is 5, and watcher runs every # 5 minutes, so it takes around half an hour to exceed the retry # counter, so 8 hours (16*1/2h) seems like a reasonable reset time RETRY_EXPIRATION = 8 * 3600 BAD_STATES = ['ERROR_down'] HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline'] NOTICE = 'NOTICE' ERROR = 'ERROR' KEY_RESTART_COUNT = "restart_count" KEY_RESTART_WHEN = "restart_when" KEY_BOOT_ID = "bootid" # Global client object client = None class NotMasterError(errors.GenericError): """Exception raised when this host is not the master.""" def ShouldPause(): """Check whether we should pause. """ return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)) def StartNodeDaemons(): """Start all the daemons that should be running on all nodes. """ # on master or not, try to start the node daemon utils.EnsureDaemon(constants.NODED) # start confd as well. On non candidates it will be in disabled mode. utils.EnsureDaemon(constants.CONFD) def RunWatcherHooks(): """Run the watcher hooks. """ hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR, constants.HOOKS_NAME_WATCHER) if not os.path.isdir(hooks_dir): return try: results = utils.RunParts(hooks_dir) except Exception, msg: # pylint: disable-msg=W0703 logging.critical("RunParts %s failed: %s", hooks_dir, msg) for (relname, status, runresult) in results: if status == constants.RUNPARTS_SKIP: logging.debug("Watcher hook %s: skipped", relname) elif status == constants.RUNPARTS_ERR: logging.warning("Watcher hook %s: error (%s)", relname, runresult) elif status == constants.RUNPARTS_RUN: if runresult.failed: logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)", relname, runresult.exit_code, runresult.output) else: logging.debug("Watcher hook %s: success (output: %s)", relname, runresult.output) class NodeMaintenance(object): """Talks to confd daemons and possible shutdown instances/drbd devices. """ def __init__(self): self.store_cb = confd_client.StoreResultCallback() self.filter_cb = confd_client.ConfdFilterCallback(self.store_cb) self.confd_client = confd_client.GetConfdClient(self.filter_cb) @staticmethod def ShouldRun(): """Checks whether node maintenance should run. """ try: return ssconf.SimpleStore().GetMaintainNodeHealth() except errors.ConfigurationError, err: logging.error("Configuration error, not activating node maintenance: %s", err) return False @staticmethod def GetRunningInstances(): """Compute list of hypervisor/running instances. """ hyp_list = ssconf.SimpleStore().GetHypervisorList() results = [] for hv_name in hyp_list: try: hv = hypervisor.GetHypervisor(hv_name) ilist = hv.ListInstances() results.extend([(iname, hv_name) for iname in ilist]) except: # pylint: disable-msg=W0702 logging.error("Error while listing instances for hypervisor %s", hv_name, exc_info=True) return results @staticmethod def GetUsedDRBDs(): """Get list of used DRBD minors. """ return bdev.DRBD8.GetUsedDevs().keys() @classmethod def DoMaintenance(cls, role): """Maintain the instance list. """ if role == constants.CONFD_NODE_ROLE_OFFLINE: inst_running = cls.GetRunningInstances() cls.ShutdownInstances(inst_running) drbd_running = cls.GetUsedDRBDs() cls.ShutdownDRBD(drbd_running) else: logging.debug("Not doing anything for role %s", role) @staticmethod def ShutdownInstances(inst_running): """Shutdown running instances. """ names_running = set([i[0] for i in inst_running]) if names_running: logging.info("Following instances should not be running," " shutting them down: %s", utils.CommaJoin(names_running)) # this dictionary will collapse duplicate instance names (only # xen pvm/vhm) into a single key, which is fine i2h = dict(inst_running) for name in names_running: hv_name = i2h[name] hv = hypervisor.GetHypervisor(hv_name) hv.StopInstance(None, force=True, name=name) @staticmethod def ShutdownDRBD(drbd_running): """Shutdown active DRBD devices. """ if drbd_running: logging.info("Following DRBD minors should not be active," " shutting them down: %s", utils.CommaJoin(drbd_running)) for minor in drbd_running: # pylint: disable-msg=W0212 # using the private method as is, pending enhancements to the DRBD # interface bdev.DRBD8._ShutdownAll(minor) def Exec(self): """Check node status versus cluster desired state. """ my_name = netutils.Hostname.GetSysName() req = confd_client.ConfdClientRequest(type= constants.CONFD_REQ_NODE_ROLE_BYNAME, query=my_name) self.confd_client.SendRequest(req, async=False, coverage=-1) timed_out, _, _ = self.confd_client.WaitForReply(req.rsalt) if not timed_out: # should have a valid response status, result = self.store_cb.GetResponse(req.rsalt) assert status, "Missing result but received replies" if not self.filter_cb.consistent[req.rsalt]: logging.warning("Inconsistent replies, not doing anything") return self.DoMaintenance(result.server_reply.answer) else: logging.warning("Confd query timed out, cannot do maintenance actions") class WatcherState(object): """Interface to a state file recording restart attempts. """ def __init__(self, statefile): """Open, lock, read and parse the file. @type statefile: file @param statefile: State file object """ self.statefile = statefile try: state_data = self.statefile.read() if not state_data: self._data = {} else: self._data = serializer.Load(state_data) except Exception, msg: # pylint: disable-msg=W0703 # Ignore errors while loading the file and treat it as empty self._data = {} logging.warning(("Invalid state file. Using defaults." " Error message: %s"), msg) if "instance" not in self._data: self._data["instance"] = {} if "node" not in self._data: self._data["node"] = {} self._orig_data = serializer.Dump(self._data) def Save(self): """Save state to file, then unlock and close it. """ assert self.statefile serialized_form = serializer.Dump(self._data) if self._orig_data == serialized_form: logging.debug("Data didn't change, just touching status file") os.utime(constants.WATCHER_STATEFILE, None) return # We need to make sure the file is locked before renaming it, otherwise # starting ganeti-watcher again at the same time will create a conflict. fd = utils.WriteFile(constants.WATCHER_STATEFILE, data=serialized_form, prewrite=utils.LockFile, close=False) self.statefile = os.fdopen(fd, 'w+') def Close(self): """Unlock configuration file and close it. """ assert self.statefile # Files are automatically unlocked when closing them self.statefile.close() self.statefile = None def GetNodeBootID(self, name): """Returns the last boot ID of a node or None. """ ndata = self._data["node"] if name in ndata and KEY_BOOT_ID in ndata[name]: return ndata[name][KEY_BOOT_ID] return None def SetNodeBootID(self, name, bootid): """Sets the boot ID of a node. """ assert bootid ndata = self._data["node"] if name not in ndata: ndata[name] = {} ndata[name][KEY_BOOT_ID] = bootid def NumberOfRestartAttempts(self, instance): """Returns number of previous restart attempts. @type instance: L{Instance} @param instance: the instance to look up """ idata = self._data["instance"] if instance.name in idata: return idata[instance.name][KEY_RESTART_COUNT] return 0 def MaintainInstanceList(self, instances): """Perform maintenance on the recorded instances. @type instances: list of string @param instances: the list of currently existing instances """ idict = self._data["instance"] # First, delete obsolete instances obsolete_instances = set(idict).difference(instances) for inst in obsolete_instances: logging.debug("Forgetting obsolete instance %s", inst) del idict[inst] # Second, delete expired records earliest = time.time() - RETRY_EXPIRATION expired_instances = [i for i in idict if idict[i][KEY_RESTART_WHEN] < earliest] for inst in expired_instances: logging.debug("Expiring record for instance %s", inst) del idict[inst] def RecordRestartAttempt(self, instance): """Record a restart attempt. @type instance: L{Instance} @param instance: the instance being restarted """ idata = self._data["instance"] if instance.name not in idata: inst = idata[instance.name] = {} else: inst = idata[instance.name] inst[KEY_RESTART_WHEN] = time.time() inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1 def RemoveInstance(self, instance): """Update state to reflect that a machine is running. This method removes the record for a named instance (as we only track down instances). @type instance: L{Instance} @param instance: the instance to remove from books """ idata = self._data["instance"] if instance.name in idata: del idata[instance.name] class Instance(object): """Abstraction for a Virtual Machine instance. """ def __init__(self, name, state, autostart): self.name = name self.state = state self.autostart = autostart def Restart(self): """Encapsulates the start of an instance. """ op = opcodes.OpStartupInstance(instance_name=self.name, force=False) cli.SubmitOpCode(op, cl=client) def ActivateDisks(self): """Encapsulates the activation of all disks of an instance. """ op = opcodes.OpActivateInstanceDisks(instance_name=self.name) cli.SubmitOpCode(op, cl=client) def GetClusterData(): """Get a list of instances on this cluster. """ op1_fields = ["name", "status", "admin_state", "snodes"] op1 = opcodes.OpQueryInstances(output_fields=op1_fields, names=[], use_locking=True) op2_fields = ["name", "bootid", "offline"] op2 = opcodes.OpQueryNodes(output_fields=op2_fields, names=[], use_locking=True) job_id = client.SubmitJob([op1, op2]) all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug) logging.debug("Got data from cluster, writing instance status file") result = all_results[0] smap = {} instances = {} # write the upfile up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result]) utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data) for fields in result: (name, status, autostart, snodes) = fields # update the secondary node map for node in snodes: if node not in smap: smap[node] = [] smap[node].append(name) instances[name] = Instance(name, status, autostart) nodes = dict([(name, (bootid, offline)) for name, bootid, offline in all_results[1]]) client.ArchiveJob(job_id) return instances, nodes, smap class Watcher(object): """Encapsulate the logic for restarting erroneously halted virtual machines. The calling program should periodically instantiate me and call Run(). This will traverse the list of instances, and make up to MAXTRIES attempts to restart machines that are down. """ def __init__(self, opts, notepad): self.notepad = notepad master = client.QueryConfigValues(["master_node"])[0] if master != netutils.Hostname.GetSysName(): raise NotMasterError("This is not the master node") # first archive old jobs self.ArchiveJobs(opts.job_age) # and only then submit new ones self.instances, self.bootids, self.smap = GetClusterData() self.started_instances = set() self.opts = opts def Run(self): """Watcher run sequence. """ notepad = self.notepad self.CheckInstances(notepad) self.CheckDisks(notepad) self.VerifyDisks() @staticmethod def ArchiveJobs(age): """Archive old jobs. """ arch_count, left_count = client.AutoArchiveJobs(age) logging.debug("Archived %s jobs, left %s", arch_count, left_count) def CheckDisks(self, notepad): """Check all nodes for restarted ones. """ check_nodes = [] for name, (new_id, offline) in self.bootids.iteritems(): old = notepad.GetNodeBootID(name) if new_id is None: # Bad node, not returning a boot id if not offline: logging.debug("Node %s missing boot id, skipping secondary checks", name) continue if old != new_id: # Node's boot ID has changed, proably through a reboot. check_nodes.append(name) if check_nodes: # Activate disks for all instances with any of the checked nodes as a # secondary node. for node in check_nodes: if node not in self.smap: continue for instance_name in self.smap[node]: instance = self.instances[instance_name] if not instance.autostart: logging.info(("Skipping disk activation for non-autostart" " instance %s"), instance.name) continue if instance.name in self.started_instances: # we already tried to start the instance, which should have # activated its drives (if they can be at all) continue try: logging.info("Activating disks for instance %s", instance.name) instance.ActivateDisks() except Exception: # pylint: disable-msg=W0703 logging.exception("Error while activating disks for instance %s", instance.name) # Keep changed boot IDs for name in check_nodes: notepad.SetNodeBootID(name, self.bootids[name][0]) def CheckInstances(self, notepad): """Make a pass over the list of instances, restarting downed ones. """ notepad.MaintainInstanceList(self.instances.keys()) for instance in self.instances.values(): if instance.state in BAD_STATES: n = notepad.NumberOfRestartAttempts(instance) if n > MAXTRIES: logging.warning("Not restarting instance %s, retries exhausted", instance.name) continue elif n < MAXTRIES: last = " (Attempt #%d)" % (n + 1) else: notepad.RecordRestartAttempt(instance) logging.error("Could not restart %s after %d attempts, giving up", instance.name, MAXTRIES) continue try: logging.info("Restarting %s%s", instance.name, last) instance.Restart() self.started_instances.add(instance.name) except Exception: # pylint: disable-msg=W0703 logging.exception("Error while restarting instance %s", instance.name) notepad.RecordRestartAttempt(instance) elif instance.state in HELPLESS_STATES: if notepad.NumberOfRestartAttempts(instance): notepad.RemoveInstance(instance) else: if notepad.NumberOfRestartAttempts(instance): notepad.RemoveInstance(instance) logging.info("Restart of %s succeeded", instance.name) @staticmethod def VerifyDisks(): """Run gnt-cluster verify-disks. """ op = opcodes.OpVerifyDisks() job_id = client.SubmitJob([op]) result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0] client.ArchiveJob(job_id) if not isinstance(result, (tuple, list)): logging.error("Can't get a valid result from verify-disks") return offline_disk_instances = result[2] if not offline_disk_instances: # nothing to do return logging.debug("Will activate disks for instances %s", utils.CommaJoin(offline_disk_instances)) # we submit only one job, and wait for it. not optimal, but spams # less the job queue job = [opcodes.OpActivateInstanceDisks(instance_name=name) for name in offline_disk_instances] job_id = cli.SendJob(job, cl=client) try: cli.PollJob(job_id, cl=client, feedback_fn=logging.debug) except Exception: # pylint: disable-msg=W0703 logging.exception("Error while activating disks") def OpenStateFile(path): """Opens the state file and acquires a lock on it. @type path: string @param path: Path to state file """ # The two-step dance below is necessary to allow both opening existing # file read/write and creating if not existing. Vanilla open will truncate # an existing file -or- allow creating if not existing. statefile_fd = os.open(path, os.O_RDWR | os.O_CREAT) # Try to acquire lock on state file. If this fails, another watcher instance # might already be running or another program is temporarily blocking the # watcher from running. try: utils.LockFile(statefile_fd) except errors.LockError, err: logging.error("Can't acquire lock on state file %s: %s", path, err) return None return os.fdopen(statefile_fd, "w+") def IsRapiResponding(hostname): """Connects to RAPI port and does a simple test. Connects to RAPI port of hostname and does a simple test. At this time, the test is GetVersion. @type hostname: string @param hostname: hostname of the node to connect to. @rtype: bool @return: Whether RAPI is working properly """ curl_config = rapi.client.GenericCurlConfig() rapi_client = rapi.client.GanetiRapiClient(hostname, curl_config_fn=curl_config) try: master_version = rapi_client.GetVersion() except rapi.client.CertificateError, err: logging.warning("RAPI Error: CertificateError (%s)", err) return False except rapi.client.GanetiApiError, err: logging.warning("RAPI Error: GanetiApiError (%s)", err) return False logging.debug("RAPI Result: master_version is %s", master_version) return master_version == constants.RAPI_VERSION def ParseOptions(): """Parse the command line options. @return: (options, args) as from OptionParser.parse_args() """ parser = OptionParser(description="Ganeti cluster watcher", usage="%prog [-d]", version="%%prog (ganeti) %s" % constants.RELEASE_VERSION) parser.add_option(cli.DEBUG_OPT) parser.add_option("-A", "--job-age", dest="job_age", help="Autoarchive jobs older than this age (default" " 6 hours)", default=6*3600) parser.add_option("--ignore-pause", dest="ignore_pause", default=False, action="store_true", help="Ignore cluster pause setting") options, args = parser.parse_args() options.job_age = cli.ParseTimespec(options.job_age) return options, args @rapi.client.UsesRapiClient def main(): """Main function. """ global client # pylint: disable-msg=W0603 options, args = ParseOptions() if args: # watcher doesn't take any arguments print >> sys.stderr, ("Usage: %s [-f] " % sys.argv[0]) sys.exit(constants.EXIT_FAILURE) utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug, stderr_logging=options.debug) if ShouldPause() and not options.ignore_pause: logging.debug("Pause has been set, exiting") sys.exit(constants.EXIT_SUCCESS) statefile = OpenStateFile(constants.WATCHER_STATEFILE) if not statefile: sys.exit(constants.EXIT_FAILURE) update_file = False try: StartNodeDaemons() RunWatcherHooks() # run node maintenance in all cases, even if master, so that old # masters can be properly cleaned up too if NodeMaintenance.ShouldRun(): NodeMaintenance().Exec() notepad = WatcherState(statefile) try: try: client = cli.GetClient() except errors.OpPrereqError: # this is, from cli.GetClient, a not-master case logging.debug("Not on master, exiting") update_file = True sys.exit(constants.EXIT_SUCCESS) except luxi.NoMasterError, err: logging.warning("Master seems to be down (%s), trying to restart", str(err)) if not utils.EnsureDaemon(constants.MASTERD): logging.critical("Can't start the master, exiting") sys.exit(constants.EXIT_FAILURE) # else retry the connection client = cli.GetClient() # we are on master now utils.EnsureDaemon(constants.RAPI) # If RAPI isn't responding to queries, try one restart. logging.debug("Attempting to talk with RAPI.") if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST): logging.warning("Couldn't get answer from Ganeti RAPI daemon." " Restarting Ganeti RAPI.") utils.StopDaemon(constants.RAPI) utils.EnsureDaemon(constants.RAPI) logging.debug("Second attempt to talk with RAPI") if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST): logging.fatal("RAPI is not responding. Please investigate.") logging.debug("Successfully talked to RAPI.") try: watcher = Watcher(options, notepad) except errors.ConfigurationError: # Just exit if there's no configuration update_file = True sys.exit(constants.EXIT_SUCCESS) watcher.Run() update_file = True finally: if update_file: notepad.Save() else: logging.debug("Not updating status file due to failure") except SystemExit: raise except NotMasterError: logging.debug("Not master, exiting") sys.exit(constants.EXIT_NOTMASTER) except errors.ResolverError, err: logging.error("Cannot resolve hostname '%s', exiting.", err.args[0]) sys.exit(constants.EXIT_NODESETUP_ERROR) except errors.JobQueueFull: logging.error("Job queue is full, can't query cluster state") except errors.JobQueueDrainError: logging.error("Job queue is drained, can't maintain cluster state") except Exception, err: logging.exception(str(err)) sys.exit(constants.EXIT_FAILURE) if __name__ == '__main__': main()