X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/37b77b18ab0f35a17a6c58dc99163b59ffc27b41..f93427cdde4b201478cff147b197bb12f8221362:/daemons/ganeti-watcher

diff --git a/daemons/ganeti-watcher b/daemons/ganeti-watcher
index fc7fd1b..6c97d40 100755
--- a/daemons/ganeti-watcher
+++ b/daemons/ganeti-watcher
@@ -27,25 +27,29 @@ by a node reboot.
 Run from cron or similar.
 
 """
 
+# pylint: disable-msg=C0103,W0142
+
+# C0103: Invalid name ganeti-watcher
+
 import os
 import sys
 import time
 import logging
+import errno
 from optparse import OptionParser
 
 from ganeti import utils
 from ganeti import constants
 from ganeti import serializer
-from ganeti import ssconf
 from ganeti import errors
 from ganeti import opcodes
-from ganeti import logger
 from ganeti import cli
+from ganeti import luxi
 
 MAXTRIES = 5
 BAD_STATES = ['ERROR_down']
-HELPLESS_STATES = ['ERROR_nodedown']
+HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
 NOTICE = 'NOTICE'
 ERROR = 'ERROR'
 KEY_RESTART_COUNT = "restart_count"
@@ -64,14 +68,33 @@ class NotMasterError(errors.GenericError):
 def Indent(s, prefix='| '):
   """Indent a piece of text with a given prefix before each line.
 
-  Args:
-    s: The string to indent
-    prefix: The string to prepend each line.
+  @param s: the string to indent
+  @param prefix: the string to prepend each line
 
   """
   return "%s%s\n" % (prefix, ('\n' + prefix).join(s.splitlines()))
 
 
+def ShouldPause():
+  """Check whether we should pause.
+
+  """
+  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))
+
+
+def EnsureDaemon(name):
+  """Check for and start daemon if not alive.
+
+  """
+  result = utils.RunCmd([constants.DAEMON_UTIL, "check-and-start", name])
+  if result.failed:
+    logging.error("Can't start daemon '%s', failure %s, output: %s",
+                  name, result.fail_reason, result.output)
+    return False
+
+  return True
+
+
 class WatcherState(object):
   """Interface to a state file recording restart attempts.
 
@@ -91,11 +114,15 @@ class WatcherState(object):
     utils.LockFile(self.statefile.fileno())
 
     try:
-      self._data = serializer.Load(self.statefile.read())
-    except Exception, msg:
+      state_data = self.statefile.read()
+      if not state_data:
+        self._data = {}
+      else:
+        self._data = serializer.Load(state_data)
+    except Exception, msg: # pylint: disable-msg=W0703
      # Ignore errors while loading the file and treat it as empty
       self._data = {}
-      logging.warning(("Empty or invalid state file. Using defaults."
+      logging.warning(("Invalid state file. Using defaults."
                        " Error message: %s"), msg)
 
     if "instance" not in self._data:
@@ -160,8 +187,8 @@ class WatcherState(object):
   def NumberOfRestartAttempts(self, instance):
     """Returns number of previous restart attempts.
 
-    Args:
-      instance - the instance to look up.
+    @type instance: L{Instance}
+    @param instance: the instance to look up
 
     """
     idata = self._data["instance"]
@@ -174,8 +201,8 @@ class WatcherState(object):
   def RecordRestartAttempt(self, instance):
     """Record a restart attempt.
 
-    Args:
-      instance - the instance being restarted
+    @type instance: L{Instance}
+    @param instance: the instance being restarted
 
     """
     idata = self._data["instance"]
@@ -189,12 +216,13 @@ class WatcherState(object):
     inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1
 
   def RemoveInstance(self, instance):
-    """Update state to reflect that a machine is running, i.e. remove record.
+    """Update state to reflect that a machine is running.
 
-    Args:
-      instance - the instance to remove from books
+    This method removes the record for a named instance (as we only
+    track down instances).
 
-    This method removes the record for a named instance.
+    @type instance: L{Instance}
+    @param instance: the instance to remove from books
 
    """
     idata = self._data["instance"]
@@ -206,9 +234,6 @@ class WatcherState(object):
 class Instance(object):
   """Abstraction for a Virtual Machine instance.
 
-  Methods:
-    Restart(): issue a command to restart the represented machine.
-
   """
   def __init__(self, name, state, autostart):
     self.name = name
@@ -219,9 +244,7 @@ class Instance(object):
     """Encapsulates the start of an instance.
 
     """
-    op = opcodes.OpStartupInstance(instance_name=self.name,
-                                   force=False,
-                                   extra_args=None)
+    op = opcodes.OpStartupInstance(instance_name=self.name, force=False)
     cli.SubmitOpCode(op, cl=client)
 
   def ActivateDisks(self):
@@ -232,45 +255,49 @@ class Instance(object):
     cli.SubmitOpCode(op, cl=client)
 
 
-def GetInstanceList(with_secondaries=None):
+def GetClusterData():
   """Get a list of instances on this cluster.
 
   """
-  fields = ["name", "status", "admin_state"]
+  op1_fields = ["name", "status", "admin_state", "snodes"]
+  op1 = opcodes.OpQueryInstances(output_fields=op1_fields, names=[],
+                                 use_locking=True)
+  op2_fields = ["name", "bootid", "offline"]
+  op2 = opcodes.OpQueryNodes(output_fields=op2_fields, names=[],
+                             use_locking=True)
 
-  if with_secondaries is not None:
-    fields.append("snodes")
+  job_id = client.SubmitJob([op1, op2])
 
-  result = client.QueryInstances([], fields)
+  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
 
-  instances = []
-  for fields in result:
-    if with_secondaries is not None:
-      (name, status, autostart, snodes) = fields
+  logging.debug("Got data from cluster, writing instance status file")
 
-      if not snodes:
-        continue
+  result = all_results[0]
+  smap = {}
 
-      for node in with_secondaries:
-        if node in snodes:
-          break
-      else:
-        continue
+  instances = {}
 
-    else:
-      (name, status, autostart) = fields
+  # write the upfile
+  up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
+  utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)
 
-    instances.append(Instance(name, status, autostart))
+  for fields in result:
+    (name, status, autostart, snodes) = fields
 
-  return instances
+    # update the secondary node map
+    for node in snodes:
+      if node not in smap:
+        smap[node] = []
+      smap[node].append(name)
 
+    instances[name] = Instance(name, status, autostart)
 
-def GetNodeBootIDs():
-  """Get a dict mapping nodes to boot IDs.
+  nodes = dict([(name, (bootid, offline))
+                for name, bootid, offline in all_results[1]])
 
-  """
-  result = client.QueryNodes([], ["name", "bootid"])
-  return dict([(name, bootid) for name, bootid in result])
+  client.ArchiveJob(job_id)
+
+  return instances, nodes, smap
 
 
 class Watcher(object):
@@ -281,35 +308,47 @@ class Watcher(object):
   to restart machines that are down.
 
   """
-  def __init__(self):
-    sstore = ssconf.SimpleStore()
-    master = sstore.GetMasterNode()
+  def __init__(self, opts, notepad):
+    self.notepad = notepad
+    master = client.QueryConfigValues(["master_node"])[0]
 
     if master != utils.HostInfo().name:
       raise NotMasterError("This is not the master node")
 
-    self.instances = GetInstanceList()
-    self.bootids = GetNodeBootIDs()
+    # first archive old jobs
+    self.ArchiveJobs(opts.job_age)
+    # and only then submit new ones
+    self.instances, self.bootids, self.smap = GetClusterData()
     self.started_instances = set()
+    self.opts = opts
 
   def Run(self):
-    notepad = WatcherState()
-    try:
-      self.CheckInstances(notepad)
-      self.CheckDisks(notepad)
-      self.VerifyDisks()
-    finally:
-      notepad.Save()
+    """Watcher run sequence.
+
+    """
+    notepad = self.notepad
+    self.CheckInstances(notepad)
+    self.CheckDisks(notepad)
+    self.VerifyDisks()
+
+  @staticmethod
+  def ArchiveJobs(age):
+    """Archive old jobs.
+
+    """
+    arch_count, left_count = client.AutoArchiveJobs(age)
+    logging.debug("Archived %s jobs, left %s", arch_count, left_count)
 
   def CheckDisks(self, notepad):
     """Check all nodes for restarted ones.
 
    """
     check_nodes = []
-    for name, new_id in self.bootids.iteritems():
+    for name, (new_id, offline) in self.bootids.iteritems():
       old = notepad.GetNodeBootID(name)
       if new_id is None:
         # Bad node, not returning a boot id
-        logging.debug("Node %s missing boot id, skipping secondary checks",
-                      name)
+        if not offline:
+          logging.debug("Node %s missing boot id, skipping secondary checks",
+                        name)
         continue
       if old != new_id:
         # Node's boot ID has changed, proably through a reboot.
@@ -318,31 +357,35 @@ class Watcher(object):
     if check_nodes:
       # Activate disks for all instances with any of the checked nodes as a
       # secondary node.
-      for instance in GetInstanceList(with_secondaries=check_nodes):
-        if not instance.autostart:
-          logging.info(("Skipping disk activation for non-autostart"
-                        " instance %s"), instance.name)
+      for node in check_nodes:
+        if node not in self.smap:
          continue
-        if instance.name in self.started_instances:
-          # we already tried to start the instance, which should have
-          # activated its drives (if they can be at all)
-          continue
-        try:
-          logging.info("Activating disks for instance %s", instance.name)
-          instance.ActivateDisks()
-        except Exception:
-          logging.exception("Error while activating disks for instance %s",
-                            instance.name)
+        for instance_name in self.smap[node]:
+          instance = self.instances[instance_name]
+          if not instance.autostart:
+            logging.info(("Skipping disk activation for non-autostart"
+                          " instance %s"), instance.name)
+            continue
+          if instance.name in self.started_instances:
+            # we already tried to start the instance, which should have
+            # activated its drives (if they can be at all)
+            continue
+          try:
+            logging.info("Activating disks for instance %s", instance.name)
+            instance.ActivateDisks()
+          except Exception: # pylint: disable-msg=W0703
+            logging.exception("Error while activating disks for instance %s",
+                              instance.name)
 
       # Keep changed boot IDs
       for name in check_nodes:
-        notepad.SetNodeBootID(name, self.bootids[name])
+        notepad.SetNodeBootID(name, self.bootids[name][0])
 
   def CheckInstances(self, notepad):
     """Make a pass over the list of instances, restarting downed ones.
 
    """
-    for instance in self.instances:
+    for instance in self.instances.values():
       if instance.state in BAD_STATES:
         n = notepad.NumberOfRestartAttempts(instance)
 
@@ -361,8 +404,9 @@ class Watcher(object):
                        instance.name, last)
           instance.Restart()
           self.started_instances.add(instance.name)
-        except Exception:
-          logging.exception("Erro while restarting instance %s", instance.name)
+        except Exception: # pylint: disable-msg=W0703
+          logging.exception("Error while restarting instance %s",
+                            instance.name)
 
         notepad.RecordRestartAttempt(instance)
       elif instance.state in HELPLESS_STATES:
@@ -379,7 +423,9 @@ class Watcher(object):
 
    """
     op = opcodes.OpVerifyDisks()
-    result = cli.SubmitOpCode(op, cl=client)
+    job_id = client.SubmitJob([op])
+    result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
+    client.ArchiveJob(job_id)
     if not isinstance(result, (tuple, list)):
       logging.error("Can't get a valid result from verify-disks")
       return
@@ -388,7 +434,7 @@ class Watcher(object):
       # nothing to do
       return
    logging.debug("Will activate disks for instances %s",
-                  ", ".join(offline_disk_instances))
+                  utils.CommaJoin(offline_disk_instances))
    # we submit only one job, and wait for it. not optimal, but spams
    # less the job queue
    job = [opcodes.OpActivateInstanceDisks(instance_name=name)
@@ -401,8 +447,7 @@ def ParseOptions():
   """Parse the command line options.
 
-  Returns:
-    (options, args) as from OptionParser.parse_args()
+  @return: (options, args) as from OptionParser.parse_args()
 
   """
   parser = OptionParser(description="Ganeti cluster watcher",
@@ -410,10 +455,12 @@ def ParseOptions():
                         version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
 
-  parser.add_option("-d", "--debug", dest="debug",
-                    help="Write all messages to stderr",
-                    default=False, action="store_true")
+  parser.add_option(cli.DEBUG_OPT)
+  parser.add_option("-A", "--job-age", dest="job_age",
+                    help="Autoarchive jobs older than this age (default"
+                    " 6 hours)", default=6*3600)
 
   options, args = parser.parse_args()
+  options.job_age = cli.ParseTimespec(options.job_age)
   return options, args
 
@@ -421,23 +468,62 @@ def main():
   """Main function.
 
  """
-  global client
+  global client # pylint: disable-msg=W0603
 
   options, args = ParseOptions()
 
-  logger.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
-                      stderr_logging=options.debug)
+  if args: # watcher doesn't take any arguments
+    print >> sys.stderr, ("Usage: %s [-f] " % sys.argv[0])
+    sys.exit(constants.EXIT_FAILURE)
+
+  utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
+                     stderr_logging=options.debug)
+
+  if ShouldPause():
+    logging.debug("Pause has been set, exiting")
+    sys.exit(constants.EXIT_SUCCESS)
 
+  update_file = False
   try:
-    client = cli.GetClient()
+    # on master or not, try to start the node dameon
+    EnsureDaemon(constants.NODED)
 
+    notepad = WatcherState()
     try:
-      watcher = Watcher()
-    except errors.ConfigurationError:
-      # Just exit if there's no configuration
-      sys.exit(constants.EXIT_SUCCESS)
+      try:
+        client = cli.GetClient()
+      except errors.OpPrereqError:
+        # this is, from cli.GetClient, a not-master case
+        logging.debug("Not on master, exiting")
+        update_file = True
+        sys.exit(constants.EXIT_SUCCESS)
+      except luxi.NoMasterError, err:
+        logging.warning("Master seems to be down (%s), trying to restart",
+                        str(err))
+        if not EnsureDaemon(constants.MASTERD):
+          logging.critical("Can't start the master, exiting")
+          sys.exit(constants.EXIT_FAILURE)
+        # else retry the connection
+        client = cli.GetClient()
+
+      # we are on master now
+      EnsureDaemon(constants.RAPI)
+
+      try:
+        watcher = Watcher(options, notepad)
+      except errors.ConfigurationError:
+        # Just exit if there's no configuration
+        update_file = True
+        sys.exit(constants.EXIT_SUCCESS)
+
+      watcher.Run()
+      update_file = True
 
-    watcher.Run()
+    finally:
+      if update_file:
+        notepad.Save()
+      else:
+        logging.debug("Not updating status file due to failure")
   except SystemExit:
     raise
   except NotMasterError:
@@ -446,6 +532,10 @@ def main():
   except errors.ResolverError, err:
     logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
     sys.exit(constants.EXIT_NODESETUP_ERROR)
+  except errors.JobQueueFull:
+    logging.error("Job queue is full, can't query cluster state")
+  except errors.JobQueueDrainError:
+    logging.error("Job queue is drained, can't maintain cluster state")
   except Exception, err:
     logging.error(str(err), exc_info=True)
     sys.exit(constants.EXIT_FAILURE)