X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/3ecf6786a731c0a467f0efcbf6346914e3824ec8..8fcc0cb12adba50500dd7e0021cd8900b7da5713:/daemons/ganeti-watcher?ds=inline diff --git a/daemons/ganeti-watcher b/daemons/ganeti-watcher index ff798b1..42a2eaf 100755 --- a/daemons/ganeti-watcher +++ b/daemons/ganeti-watcher @@ -1,7 +1,7 @@ #!/usr/bin/python # -# Copyright (C) 2006, 2007 Google Inc. +# Copyright (C) 2006, 2007, 2008 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -24,235 +24,261 @@ This program and set of classes implement a watchdog to restart virtual machines in a Ganeti cluster that have crashed or been killed by a node reboot. Run from cron or similar. -""" - -LOGFILE = '/var/log/ganeti/watcher.log' -MAXTRIES = 5 -BAD_STATES = ['stopped'] -HELPLESS_STATES = ['(node down)'] -NOTICE = 'NOTICE' -ERROR = 'ERROR' +""" import os import sys import time -import fcntl -import errno -import socket +import logging from optparse import OptionParser - from ganeti import utils from ganeti import constants -from ganeti import ssconf +from ganeti import serializer +from ganeti import errors +from ganeti import opcodes +from ganeti import cli +from ganeti import luxi + + +MAXTRIES = 5 +BAD_STATES = ['ERROR_down'] +HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline'] +NOTICE = 'NOTICE' +ERROR = 'ERROR' +KEY_RESTART_COUNT = "restart_count" +KEY_RESTART_WHEN = "restart_when" +KEY_BOOT_ID = "bootid" -class Error(Exception): - """Generic custom error class.""" +# Global client object +client = None -class NotMasterError(Error): +class NotMasterError(errors.GenericError): """Exception raised when this host is not the master.""" def Indent(s, prefix='| '): """Indent a piece of text with a given prefix before each line. - Args: - s: The string to indent - prefix: The string to prepend each line. + @param s: the string to indent + @param prefix: the string to prepend each line """ return "%s%s\n" % (prefix, ('\n' + prefix).join(s.splitlines())) -def DoCmd(cmd): - """Run a shell command. - - Args: - cmd: the command to run. - - Raises CommandError with verbose commentary on error. +def StartMaster(): + """Try to start the master daemon. """ - res = utils.RunCmd(cmd) - - if res.failed: - raise Error("Command %s failed:\n%s\nstdout:\n%sstderr:\n%s" % - (repr(cmd), - Indent(res.fail_reason), - Indent(res.stdout), - Indent(res.stderr))) + result = utils.RunCmd(['ganeti-masterd']) + if result.failed: + logging.error("Can't start the master daemon: output '%s'", result.output) + return not result.failed - return res - -class RestarterState(object): +class WatcherState(object): """Interface to a state file recording restart attempts. - Methods: - Open(): open, lock, read and parse the file. - Raises StandardError on lock contention. - - NumberOfAttempts(name): returns the number of times in succession - a restart has been attempted of the named instance. - - RecordAttempt(name, when): records one restart attempt of name at - time in when. - - Remove(name): remove record given by name, if exists. - - Save(name): saves all records to file, releases lock and closes file. - """ def __init__(self): + """Open, lock, read and parse the file. + + Raises exception on lock contention. + + """ # The two-step dance below is necessary to allow both opening existing # file read/write and creating if not existing. Vanilla open will truncate # an existing file -or- allow creating if not existing. 
- f = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT) - f = os.fdopen(f, 'w+') + fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT) + self.statefile = os.fdopen(fd, 'w+') + + utils.LockFile(self.statefile.fileno()) try: - fcntl.flock(f.fileno(), fcntl.LOCK_EX|fcntl.LOCK_NB) - except IOError, x: - if x.errno == errno.EAGAIN: - raise StandardError("State file already locked") - raise + state_data = self.statefile.read() + if not state_data: + self._data = {} + else: + self._data = serializer.Load(state_data) + except Exception, msg: + # Ignore errors while loading the file and treat it as empty + self._data = {} + logging.warning(("Invalid state file. Using defaults." + " Error message: %s"), msg) - self.statefile = f - self.inst_map = {} + if "instance" not in self._data: + self._data["instance"] = {} + if "node" not in self._data: + self._data["node"] = {} - for line in f: - name, when, count = line.rstrip().split(':') + self._orig_data = serializer.Dump(self._data) - when = int(when) - count = int(count) + def Save(self): + """Save state to file, then unlock and close it. - self.inst_map[name] = (when, count) + """ + assert self.statefile - def NumberOfAttempts(self, instance): - """Returns number of previous restart attempts. + serialized_form = serializer.Dump(self._data) + if self._orig_data == serialized_form: + logging.debug("Data didn't change, just touching status file") + os.utime(constants.WATCHER_STATEFILE, None) + return - Args: - instance - the instance to look up. + # We need to make sure the file is locked before renaming it, otherwise + # starting ganeti-watcher again at the same time will create a conflict. + fd = utils.WriteFile(constants.WATCHER_STATEFILE, + data=serialized_form, + prewrite=utils.LockFile, close=False) + self.statefile = os.fdopen(fd, 'w+') + + def Close(self): + """Unlock configuration file and close it. """ assert self.statefile - if instance.name in self.inst_map: - return self.inst_map[instance.name][1] + # Files are automatically unlocked when closing them + self.statefile.close() + self.statefile = None - return 0 + def GetNodeBootID(self, name): + """Returns the last boot ID of a node or None. - def RecordAttempt(self, instance): - """Record a restart attempt. + """ + ndata = self._data["node"] + + if name in ndata and KEY_BOOT_ID in ndata[name]: + return ndata[name][KEY_BOOT_ID] + return None - Args: - instance - the instance being restarted + def SetNodeBootID(self, name, bootid): + """Sets the boot ID of a node. """ - assert self.statefile + assert bootid - when = time.time() + ndata = self._data["node"] - self.inst_map[instance.name] = (when, 1 + self.NumberOfAttempts(instance)) + if name not in ndata: + ndata[name] = {} - def Remove(self, instance): - """Update state to reflect that a machine is running, i.e. remove record. + ndata[name][KEY_BOOT_ID] = bootid - Args: - instance - the instance to remove from books + def NumberOfRestartAttempts(self, instance): + """Returns number of previous restart attempts. - This method removes the record for a named instance. + @type instance: L{Instance} + @param instance: the instance to look up """ - assert self.statefile + idata = self._data["instance"] - if instance.name in self.inst_map: - del self.inst_map[instance.name] + if instance.name in idata: + return idata[instance.name][KEY_RESTART_COUNT] - def Save(self): - """Save records to file, then unlock and close file. + return 0 + + def RecordRestartAttempt(self, instance): + """Record a restart attempt. 
+ + @type instance: L{Instance} + @param instance: the instance being restarted """ - assert self.statefile + idata = self._data["instance"] - self.statefile.seek(0) - self.statefile.truncate() + if instance.name not in idata: + inst = idata[instance.name] = {} + else: + inst = idata[instance.name] - for name in self.inst_map: - print >> self.statefile, "%s:%d:%d" % ((name,) + self.inst_map[name]) + inst[KEY_RESTART_WHEN] = time.time() + inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1 - fcntl.flock(self.statefile.fileno(), fcntl.LOCK_UN) + def RemoveInstance(self, instance): + """Update state to reflect that a machine is running. - self.statefile.close() - self.statefile = None + This method removes the record for a named instance (as we only + track down instances). + + @type instance: L{Instance} + @param instance: the instance to remove from books + + """ + idata = self._data["instance"] + + if instance.name in idata: + del idata[instance.name] class Instance(object): """Abstraction for a Virtual Machine instance. - Methods: - Restart(): issue a command to restart the represented machine. - """ - def __init__(self, name, state): + def __init__(self, name, state, autostart): self.name = name self.state = state + self.autostart = autostart def Restart(self): """Encapsulates the start of an instance. - This is currently done using the command line interface and not - the Ganeti modules. + """ + op = opcodes.OpStartupInstance(instance_name=self.name, force=False) + cli.SubmitOpCode(op, cl=client) + + def ActivateDisks(self): + """Encapsulates the activation of all disks of an instance. """ - DoCmd(['gnt-instance', 'startup', '--lock-retries=15', self.name]) + op = opcodes.OpActivateInstanceDisks(instance_name=self.name) + cli.SubmitOpCode(op, cl=client) -class InstanceList(object): - """The set of Virtual Machine instances on a cluster. +def GetClusterData(): + """Get a list of instances on this cluster. """ - cmd = ['gnt-instance', 'list', '--lock-retries=15', - '-o', 'name,admin_state,oper_state', '--no-headers', '--separator=:'] + op1_fields = ["name", "status", "admin_state", "snodes"] + op1 = opcodes.OpQueryInstances(output_fields=op1_fields, names=[], + use_locking=True) + op2_fields = ["name", "bootid", "offline"] + op2 = opcodes.OpQueryNodes(output_fields=op2_fields, names=[], + use_locking=True) - def __init__(self): - res = DoCmd(self.cmd) + job_id = client.SubmitJob([op1, op2]) - lines = res.stdout.splitlines() + all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug) - self.instances = [] - for line in lines: - fields = [fld.strip() for fld in line.split(':')] + result = all_results[0] + smap = {} - if len(fields) != 3: - continue - if fields[1] == "no": #no autostart, we don't care about this instance - continue - name, status = fields[0], fields[2] + instances = {} + for fields in result: + (name, status, autostart, snodes) = fields - self.instances.append(Instance(name, status)) + # update the secondary node map + for node in snodes: + if node not in smap: + smap[node] = [] + smap[node].append(name) - def __iter__(self): - return self.instances.__iter__() + instances[name] = Instance(name, status, autostart) + nodes = dict([(name, (bootid, offline)) + for name, bootid, offline in all_results[1]]) -class Message(object): - """Encapsulation of a notice or error message. 
+ client.ArchiveJob(job_id) - """ - def __init__(self, level, msg): - self.level = level - self.msg = msg - self.when = time.time() + return instances, nodes, smap - def __str__(self): - return self.level + ' ' + time.ctime(self.when) + '\n' + Indent(self.msg) - -class Restarter(object): +class Watcher(object): """Encapsulate the logic for restarting erronously halted virtual machines. The calling program should periodically instantiate me and call Run(). @@ -260,23 +286,83 @@ class Restarter(object): to restart machines that are down. """ - def __init__(self): - sstore = ssconf.SimpleStore() - master = sstore.GetMasterNode() - if master != socket.gethostname(): + def __init__(self, opts, notepad): + self.notepad = notepad + master = client.QueryConfigValues(["master_node"])[0] + if master != utils.HostInfo().name: raise NotMasterError("This is not the master node") - self.instances = InstanceList() - self.messages = [] + self.instances, self.bootids, self.smap = GetClusterData() + self.started_instances = set() + self.opts = opts def Run(self): - """Make a pass over the list of instances, restarting downed ones. + """Watcher run sequence. + + """ + notepad = self.notepad + self.ArchiveJobs(self.opts.job_age) + self.CheckInstances(notepad) + self.CheckDisks(notepad) + self.VerifyDisks() + + def ArchiveJobs(self, age): + """Archive old jobs. """ - notepad = RestarterState() + arch_count, left_count = client.AutoArchiveJobs(age) + logging.debug("Archived %s jobs, left %s" % (arch_count, left_count)) + + def CheckDisks(self, notepad): + """Check all nodes for restarted ones. - for instance in self.instances: + """ + check_nodes = [] + for name, (new_id, offline) in self.bootids.iteritems(): + old = notepad.GetNodeBootID(name) + if new_id is None: + # Bad node, not returning a boot id + if not offline: + logging.debug("Node %s missing boot id, skipping secondary checks", + name) + continue + if old != new_id: + # Node's boot ID has changed, proably through a reboot. + check_nodes.append(name) + + if check_nodes: + # Activate disks for all instances with any of the checked nodes as a + # secondary node. + for node in check_nodes: + if node not in self.smap: + continue + for instance_name in self.smap[node]: + instance = self.instances[instance_name] + if not instance.autostart: + logging.info(("Skipping disk activation for non-autostart" + " instance %s"), instance.name) + continue + if instance.name in self.started_instances: + # we already tried to start the instance, which should have + # activated its drives (if they can be at all) + continue + try: + logging.info("Activating disks for instance %s", instance.name) + instance.ActivateDisks() + except Exception: + logging.exception("Error while activating disks for instance %s", + instance.name) + + # Keep changed boot IDs + for name in check_nodes: + notepad.SetNodeBootID(name, self.bootids[name][0]) + + def CheckInstances(self, notepad): + """Make a pass over the list of instances, restarting downed ones. + + """ + for instance in self.instances.values(): if instance.state in BAD_STATES: - n = notepad.NumberOfAttempts(instance) + n = notepad.NumberOfRestartAttempts(instance) if n > MAXTRIES: # stay quiet. @@ -284,48 +370,59 @@ class Restarter(object): elif n < MAXTRIES: last = " (Attempt #%d)" % (n + 1) else: - notepad.RecordAttempt(instance) - self.messages.append(Message(ERROR, "Could not restart %s for %d" - " times, giving up..." 
% - (instance.name, MAXTRIES))) + notepad.RecordRestartAttempt(instance) + logging.error("Could not restart %s after %d attempts, giving up", + instance.name, MAXTRIES) continue try: - self.messages.append(Message(NOTICE, - "Restarting %s%s." % - (instance.name, last))) + logging.info("Restarting %s%s", + instance.name, last) instance.Restart() - except Error, x: - self.messages.append(Message(ERROR, str(x))) + self.started_instances.add(instance.name) + except Exception: + logging.exception("Error while restarting instance %s", + instance.name) - notepad.RecordAttempt(instance) + notepad.RecordRestartAttempt(instance) elif instance.state in HELPLESS_STATES: - if notepad.NumberOfAttempts(instance): - notepad.Remove(instance) + if notepad.NumberOfRestartAttempts(instance): + notepad.RemoveInstance(instance) else: - if notepad.NumberOfAttempts(instance): - notepad.Remove(instance) - msg = Message(NOTICE, - "Restart of %s succeeded." % instance.name) - self.messages.append(msg) - - notepad.Save() - - def WriteReport(self, logfile): - """Log all messages to file. + if notepad.NumberOfRestartAttempts(instance): + notepad.RemoveInstance(instance) + logging.info("Restart of %s succeeded", instance.name) - Args: - logfile: file object open for writing (the log file) + @staticmethod + def VerifyDisks(): + """Run gnt-cluster verify-disks. """ - for msg in self.messages: - print >> logfile, str(msg) + op = opcodes.OpVerifyDisks() + job_id = client.SubmitJob([op]) + result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0] + client.ArchiveJob(job_id) + if not isinstance(result, (tuple, list)): + logging.error("Can't get a valid result from verify-disks") + return + offline_disk_instances = result[2] + if not offline_disk_instances: + # nothing to do + return + logging.debug("Will activate disks for instances %s", + ", ".join(offline_disk_instances)) + # we submit only one job, and wait for it. not optimal, but spams + # less the job queue + job = [opcodes.OpActivateInstanceDisks(instance_name=name) + for name in offline_disk_instances] + job_id = cli.SendJob(job, cl=client) + + cli.PollJob(job_id, cl=client, feedback_fn=logging.debug) def ParseOptions(): """Parse the command line options. - Returns: - (options, args) as from OptionParser.parse_args() + @return: (options, args) as from OptionParser.parse_args() """ parser = OptionParser(description="Ganeti cluster watcher", @@ -334,9 +431,13 @@ def ParseOptions(): constants.RELEASE_VERSION) parser.add_option("-d", "--debug", dest="debug", - help="Don't redirect messages to the log file", + help="Write all messages to stderr", default=False, action="store_true") + parser.add_option("-A", "--job-age", dest="job_age", + help="Autoarchive jobs older than this age (default" + " 6 hours)", default=6*3600) options, args = parser.parse_args() + options.job_age = cli.ParseTimespec(options.job_age) return options, args @@ -344,21 +445,57 @@ def main(): """Main function. 
""" + global client + options, args = ParseOptions() - if not options.debug: - sys.stderr = sys.stdout = open(LOGFILE, 'a') + utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug, + stderr_logging=options.debug) + update_file = True try: - restarter = Restarter() - restarter.Run() - restarter.WriteReport(sys.stdout) + notepad = WatcherState() + try: + try: + client = cli.GetClient() + except errors.OpPrereqError: + # this is, from cli.GetClient, a not-master case + logging.debug("Not on master, exiting") + sys.exit(constants.EXIT_SUCCESS) + except luxi.NoMasterError, err: + logging.warning("Master seems to be down (%s), trying to restart", + str(err)) + if not StartMaster(): + logging.critical("Can't start the master, exiting") + update_file = False + sys.exit(constants.EXIT_FAILURE) + # else retry the connection + client = cli.GetClient() + + try: + watcher = Watcher(options, notepad) + except errors.ConfigurationError: + # Just exit if there's no configuration + sys.exit(constants.EXIT_SUCCESS) + + watcher.Run() + finally: + if update_file: + notepad.Save() + else: + logging.debug("Not updating status file due to failure") + except SystemExit: + raise except NotMasterError: - if options.debug: - sys.stderr.write("Not master, exiting.\n") + logging.debug("Not master, exiting") sys.exit(constants.EXIT_NOTMASTER) - except Error, err: - print err + except errors.ResolverError, err: + logging.error("Cannot resolve hostname '%s', exiting.", err.args[0]) + sys.exit(constants.EXIT_NODESETUP_ERROR) + except Exception, err: + logging.error(str(err), exc_info=True) + sys.exit(constants.EXIT_FAILURE) + if __name__ == '__main__': main()