X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/3448aa2292aa4a21941da6c2c809007d678f9ef0..ba55d062da8dfb89a37afc2f13f2e689d0094829:/daemons/ganeti-watcher diff --git a/daemons/ganeti-watcher b/daemons/ganeti-watcher index f12cab5..42a2eaf 100755 --- a/daemons/ganeti-watcher +++ b/daemons/ganeti-watcher @@ -39,6 +39,7 @@ from ganeti import serializer from ganeti import errors from ganeti import opcodes from ganeti import cli +from ganeti import luxi MAXTRIES = 5 @@ -69,6 +70,16 @@ def Indent(s, prefix='| '): return "%s%s\n" % (prefix, ('\n' + prefix).join(s.splitlines())) +def StartMaster(): + """Try to start the master daemon. + + """ + result = utils.RunCmd(['ganeti-masterd']) + if result.failed: + logging.error("Can't start the master daemon: output '%s'", result.output) + return not result.failed + + class WatcherState(object): """Interface to a state file recording restart attempts. @@ -88,11 +99,15 @@ class WatcherState(object): utils.LockFile(self.statefile.fileno()) try: - self._data = serializer.Load(self.statefile.read()) + state_data = self.statefile.read() + if not state_data: + self._data = {} + else: + self._data = serializer.Load(state_data) except Exception, msg: # Ignore errors while loading the file and treat it as empty self._data = {} - logging.warning(("Empty or invalid state file. Using defaults." + logging.warning(("Invalid state file. Using defaults." " Error message: %s"), msg) if "instance" not in self._data: @@ -214,9 +229,7 @@ class Instance(object): """Encapsulates the start of an instance. """ - op = opcodes.OpStartupInstance(instance_name=self.name, - force=False, - extra_args=None) + op = opcodes.OpStartupInstance(instance_name=self.name, force=False) cli.SubmitOpCode(op, cl=client) def ActivateDisks(self): @@ -227,45 +240,42 @@ class Instance(object): cli.SubmitOpCode(op, cl=client) -def GetInstanceList(with_secondaries=None): +def GetClusterData(): """Get a list of instances on this cluster. """ - fields = ["name", "status", "admin_state"] - - if with_secondaries is not None: - fields.append("snodes") - - result = client.QueryInstances([], fields, True) + op1_fields = ["name", "status", "admin_state", "snodes"] + op1 = opcodes.OpQueryInstances(output_fields=op1_fields, names=[], + use_locking=True) + op2_fields = ["name", "bootid", "offline"] + op2 = opcodes.OpQueryNodes(output_fields=op2_fields, names=[], + use_locking=True) - instances = [] - for fields in result: - if with_secondaries is not None: - (name, status, autostart, snodes) = fields + job_id = client.SubmitJob([op1, op2]) - if not snodes: - continue + all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug) - for node in with_secondaries: - if node in snodes: - break - else: - continue + result = all_results[0] + smap = {} - else: - (name, status, autostart) = fields + instances = {} + for fields in result: + (name, status, autostart, snodes) = fields - instances.append(Instance(name, status, autostart)) + # update the secondary node map + for node in snodes: + if node not in smap: + smap[node] = [] + smap[node].append(name) - return instances + instances[name] = Instance(name, status, autostart) + nodes = dict([(name, (bootid, offline)) + for name, bootid, offline in all_results[1]]) -def GetNodeBootIDs(): - """Get a dict mapping nodes to boot IDs. + client.ArchiveJob(job_id) - """ - result = client.QueryNodes([], ["name", "bootid", "offline"], True) - return dict([(name, (bootid, offline)) for name, bootid, offline in result]) + return instances, nodes, smap class Watcher(object): @@ -276,24 +286,24 @@ class Watcher(object): to restart machines that are down. """ - def __init__(self, opts): + def __init__(self, opts, notepad): + self.notepad = notepad master = client.QueryConfigValues(["master_node"])[0] if master != utils.HostInfo().name: raise NotMasterError("This is not the master node") - self.instances = GetInstanceList() - self.bootids = GetNodeBootIDs() + self.instances, self.bootids, self.smap = GetClusterData() self.started_instances = set() self.opts = opts def Run(self): - notepad = WatcherState() - try: - self.ArchiveJobs(self.opts.job_age) - self.CheckInstances(notepad) - self.CheckDisks(notepad) - self.VerifyDisks() - finally: - notepad.Save() + """Watcher run sequence. + + """ + notepad = self.notepad + self.ArchiveJobs(self.opts.job_age) + self.CheckInstances(notepad) + self.CheckDisks(notepad) + self.VerifyDisks() def ArchiveJobs(self, age): """Archive old jobs. @@ -322,21 +332,25 @@ class Watcher(object): if check_nodes: # Activate disks for all instances with any of the checked nodes as a # secondary node. - for instance in GetInstanceList(with_secondaries=check_nodes): - if not instance.autostart: - logging.info(("Skipping disk activation for non-autostart" - " instance %s"), instance.name) - continue - if instance.name in self.started_instances: - # we already tried to start the instance, which should have - # activated its drives (if they can be at all) + for node in check_nodes: + if node not in self.smap: continue - try: - logging.info("Activating disks for instance %s", instance.name) - instance.ActivateDisks() - except Exception: - logging.exception("Error while activating disks for instance %s", - instance.name) + for instance_name in self.smap[node]: + instance = self.instances[instance_name] + if not instance.autostart: + logging.info(("Skipping disk activation for non-autostart" + " instance %s"), instance.name) + continue + if instance.name in self.started_instances: + # we already tried to start the instance, which should have + # activated its drives (if they can be at all) + continue + try: + logging.info("Activating disks for instance %s", instance.name) + instance.ActivateDisks() + except Exception: + logging.exception("Error while activating disks for instance %s", + instance.name) # Keep changed boot IDs for name in check_nodes: @@ -346,7 +360,7 @@ class Watcher(object): """Make a pass over the list of instances, restarting downed ones. """ - for instance in self.instances: + for instance in self.instances.values(): if instance.state in BAD_STATES: n = notepad.NumberOfRestartAttempts(instance) @@ -384,7 +398,9 @@ class Watcher(object): """ op = opcodes.OpVerifyDisks() - result = cli.SubmitOpCode(op, cl=client) + job_id = client.SubmitJob([op]) + result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0] + client.ArchiveJob(job_id) if not isinstance(result, (tuple, list)): logging.error("Can't get a valid result from verify-disks") return @@ -436,16 +452,38 @@ def main(): utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug, stderr_logging=options.debug) + update_file = True try: - client = cli.GetClient() - + notepad = WatcherState() try: - watcher = Watcher(options) - except errors.ConfigurationError: - # Just exit if there's no configuration - sys.exit(constants.EXIT_SUCCESS) - - watcher.Run() + try: + client = cli.GetClient() + except errors.OpPrereqError: + # this is, from cli.GetClient, a not-master case + logging.debug("Not on master, exiting") + sys.exit(constants.EXIT_SUCCESS) + except luxi.NoMasterError, err: + logging.warning("Master seems to be down (%s), trying to restart", + str(err)) + if not StartMaster(): + logging.critical("Can't start the master, exiting") + update_file = False + sys.exit(constants.EXIT_FAILURE) + # else retry the connection + client = cli.GetClient() + + try: + watcher = Watcher(options, notepad) + except errors.ConfigurationError: + # Just exit if there's no configuration + sys.exit(constants.EXIT_SUCCESS) + + watcher.Run() + finally: + if update_file: + notepad.Save() + else: + logging.debug("Not updating status file due to failure") except SystemExit: raise except NotMasterError: