#!/usr/bin/python
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Tool to restart erronously downed virtual machines.
24 This program and set of classes implement a watchdog to restart
25 virtual machines in a Ganeti cluster that have crashed or been killed
26 by a node reboot. Run from cron or similar.
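
# A typical deployment runs this program every few minutes from cron, for
# example (assumed path and interval, adjust to the local installation):
#   */5 * * * * root /usr/sbin/ganeti-watcher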

import os
import sys
import time
import logging
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import serializer
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi


# Maximum number of restart attempts per instance before giving up
MAXTRIES = 5
BAD_STATES = ['ERROR_down']
HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
KEY_RESTART_COUNT = "restart_count"
KEY_RESTART_WHEN = "restart_when"
KEY_BOOT_ID = "bootid"
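
# The KEY_* names above index the records kept in the watcher state file,
# which is a serialized (JSON) document shaped roughly like this
# (illustrative values):
#   {"instance": {"inst1.example.com": {"restart_count": 1,
#                                       "restart_when": 1234567890.12}},
#    "node": {"node1.example.com": {"bootid": "..."}}}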

# Global client object
client = None


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def Indent(s, prefix='| '):
  """Indent a piece of text with a given prefix before each line.

  @param s: the string to indent
  @param prefix: the string to prepend each line

  """
  return "%s%s\n" % (prefix, ('\n' + prefix).join(s.splitlines()))
74 """Try to start the master daemon.
77 result = utils.RunCmd(['ganeti-masterd'])
79 logging.error("Can't start the master daemon: output '%s'", result.output)
80 return not result.failed
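# Note: StartMaster() is used by main() below when the initial LUXI
# connection fails with luxi.NoMasterError.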


class WatcherState(object):
  """Interface to a state file recording restart attempts.

  """
  def __init__(self):
    """Open, lock, read and parse the file.

    Raises exception on lock contention.

    """
    # The two-step dance below is necessary to allow both opening existing
    # file read/write and creating if not existing.  Vanilla open will
    # truncate an existing file -or- allow creating if not existing.
    fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
    self.statefile = os.fdopen(fd, 'w+')

    utils.LockFile(self.statefile.fileno())

    try:
      state_data = self.statefile.read()
      if not state_data:
        self._data = {}
      else:
        self._data = serializer.Load(state_data)
    except Exception, msg:
      # Ignore errors while loading the file and treat it as empty
      self._data = {}
      logging.warning(("Invalid state file. Using defaults."
                       " Error message: %s"), msg)

    if "instance" not in self._data:
      self._data["instance"] = {}
    if "node" not in self._data:
      self._data["node"] = {}

    self._orig_data = serializer.Dump(self._data)
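    # The serialized snapshot above lets Save() detect that nothing changed
    # and avoid rewriting the state file.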
121 """Save state to file, then unlock and close it.
124 assert self.statefile
126 serialized_form = serializer.Dump(self._data)
127 if self._orig_data == serialized_form:
128 logging.debug("Data didn't change, just touching status file")
129 os.utime(constants.WATCHER_STATEFILE, None)
132 # We need to make sure the file is locked before renaming it, otherwise
133 # starting ganeti-watcher again at the same time will create a conflict.
134 fd = utils.WriteFile(constants.WATCHER_STATEFILE,
135 data=serialized_form,
136 prewrite=utils.LockFile, close=False)
137 self.statefile = os.fdopen(fd, 'w+')
140 """Unlock configuration file and close it.
143 assert self.statefile
145 # Files are automatically unlocked when closing them
146 self.statefile.close()
147 self.statefile = None

  def GetNodeBootID(self, name):
    """Returns the last boot ID of a node or None.

    """
    ndata = self._data["node"]

    if name in ndata and KEY_BOOT_ID in ndata[name]:
      return ndata[name][KEY_BOOT_ID]
    return None

  def SetNodeBootID(self, name, bootid):
    """Sets the boot ID of a node.

    """
    assert bootid
    ndata = self._data["node"]
    if name not in ndata:
      ndata[name] = {}
    ndata[name][KEY_BOOT_ID] = bootid

  def NumberOfRestartAttempts(self, instance):
    """Returns number of previous restart attempts.

    @type instance: L{Instance}
    @param instance: the instance to look up

    """
    idata = self._data["instance"]
    if instance.name in idata:
      return idata[instance.name][KEY_RESTART_COUNT]
    return 0

  def RecordRestartAttempt(self, instance):
    """Record a restart attempt.

    @type instance: L{Instance}
    @param instance: the instance being restarted

    """
    idata = self._data["instance"]
    if instance.name not in idata:
      inst = idata[instance.name] = {}
    else:
      inst = idata[instance.name]

    inst[KEY_RESTART_WHEN] = time.time()
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1

  def RemoveInstance(self, instance):
    """Update state to reflect that a machine is running.

    This method removes the record for a named instance (as we only
    track instances that are down).

    @type instance: L{Instance}
    @param instance: the instance to remove from books

    """
    idata = self._data["instance"]
    if instance.name in idata:
      del idata[instance.name]
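
# Typical WatcherState lifecycle, as used in main() below: construct it
# (which locks the state file), query and record restart attempts during
# the run, then Save() once at the end.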


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, state, autostart):
    self.name = name
    self.state = state
    self.autostart = autostart

  def Restart(self):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpStartupInstance(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=client)

  def ActivateDisks(self):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=client)


def GetClusterData():
  """Get instance and node data from this cluster.

  @return: a tuple of (instances dict, node boot-ID dict, secondary-node map)

  """
  op1_fields = ["name", "status", "admin_state", "snodes"]
  op1 = opcodes.OpQueryInstances(output_fields=op1_fields, names=[],
                                 use_locking=True)
  op2_fields = ["name", "bootid", "offline"]
  op2 = opcodes.OpQueryNodes(output_fields=op2_fields, names=[],
                             use_locking=True)

  job_id = client.SubmitJob([op1, op2])

  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)

  logging.debug("Got data from cluster, writing instance status file")

  result = all_results[0]
  smap = {}
  instances = {}

  # write the instance status file
  up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
  utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)

  for fields in result:
    (name, status, autostart, snodes) = fields

    # update the secondary node map
    for node in snodes:
      if node not in smap:
        smap[node] = []
      smap[node].append(name)

    instances[name] = Instance(name, status, autostart)

  nodes = dict([(name, (bootid, offline))
                for name, bootid, offline in all_results[1]])

  client.ArchiveJob(job_id)

  return instances, nodes, smap


class Watcher(object):
  """Encapsulate the logic for restarting erroneously halted virtual machines.

  The calling program should periodically instantiate me and call Run().
  This will traverse the list of instances, and make up to MAXTRIES attempts
  to restart machines that are down.

  """
  def __init__(self, opts, notepad):
    self.notepad = notepad
    master = client.QueryConfigValues(["master_node"])[0]
    if master != utils.HostInfo().name:
      raise NotMasterError("This is not the master node")
    self.instances, self.bootids, self.smap = GetClusterData()
    self.started_instances = set()
    self.opts = opts
306 """Watcher run sequence.
309 notepad = self.notepad
310 self.ArchiveJobs(self.opts.job_age)
311 self.CheckInstances(notepad)
312 self.CheckDisks(notepad)

  def ArchiveJobs(self, age):
    """Archive old jobs.

    """
    arch_count, left_count = client.AutoArchiveJobs(age)
    logging.debug("Archived %s jobs, left %s", arch_count, left_count)

  def CheckDisks(self, notepad):
    """Check all nodes for restarted ones.

    """
    check_nodes = []
    for name, (new_id, offline) in self.bootids.iteritems():
      old = notepad.GetNodeBootID(name)
      if new_id is None:
        # Bad node, not returning a boot id
        if not offline:
          logging.debug("Node %s missing boot id, skipping secondary checks",
                        name)
        continue
      if old != new_id:
        # Node's boot ID has changed, probably through a reboot.
        check_nodes.append(name)

    if check_nodes:
      # Activate disks for all instances with any of the checked nodes as a
      # secondary node.
      for node in check_nodes:
        if node not in self.smap:
          continue
        for instance_name in self.smap[node]:
          instance = self.instances[instance_name]
          if not instance.autostart:
            logging.info(("Skipping disk activation for non-autostart"
                          " instance %s"), instance.name)
            continue
          if instance.name in self.started_instances:
            # we already tried to start the instance, which should have
            # activated its drives (if they can be at all)
            continue
          try:
            logging.info("Activating disks for instance %s", instance.name)
            instance.ActivateDisks()
          except Exception:
            logging.exception("Error while activating disks for instance %s",
                              instance.name)

      # Keep changed boot IDs
      for name in check_nodes:
        notepad.SetNodeBootID(name, self.bootids[name][0])

  def CheckInstances(self, notepad):
    """Make a pass over the list of instances, restarting downed ones.

    """
    for instance in self.instances.values():
      if instance.state in BAD_STATES:
        n = notepad.NumberOfRestartAttempts(instance)

        if n > MAXTRIES:
          # stay quiet.
          continue
        elif n < MAXTRIES:
          last = " (Attempt #%d)" % (n + 1)
        else:
          notepad.RecordRestartAttempt(instance)
          logging.error("Could not restart %s after %d attempts, giving up",
                        instance.name, MAXTRIES)
          continue
        try:
          logging.info("Restarting %s%s",
                       instance.name, last)
          instance.Restart()
          self.started_instances.add(instance.name)
        except Exception:
          logging.exception("Error while restarting instance %s",
                            instance.name)

        notepad.RecordRestartAttempt(instance)
      elif instance.state in HELPLESS_STATES:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
      else:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
          logging.info("Restart of %s succeeded", instance.name)
404 """Run gnt-cluster verify-disks.
407 op = opcodes.OpVerifyDisks()
408 job_id = client.SubmitJob([op])
409 result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
410 client.ArchiveJob(job_id)
411 if not isinstance(result, (tuple, list)):
412 logging.error("Can't get a valid result from verify-disks")
414 offline_disk_instances = result[2]
415 if not offline_disk_instances:
418 logging.debug("Will activate disks for instances %s",
419 ", ".join(offline_disk_instances))
420 # we submit only one job, and wait for it. not optimal, but spams
422 job = [opcodes.OpActivateInstanceDisks(instance_name=name)
423 for name in offline_disk_instances]
424 job_id = cli.SendJob(job, cl=client)
426 cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
430 """Parse the command line options.
432 @return: (options, args) as from OptionParser.parse_args()
435 parser = OptionParser(description="Ganeti cluster watcher",
437 version="%%prog (ganeti) %s" %
438 constants.RELEASE_VERSION)
440 parser.add_option("-d", "--debug", dest="debug",
441 help="Write all messages to stderr",
442 default=False, action="store_true")
443 parser.add_option("-A", "--job-age", dest="job_age",
444 help="Autoarchive jobs older than this age (default"
445 " 6 hours)", default=6*3600)
446 options, args = parser.parse_args()
447 options.job_age = cli.ParseTimespec(options.job_age)
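
# Note: cli.ParseTimespec accepts plain seconds as well as suffixed values
# such as "12h" or "1d", so --job-age can be given either way.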


def main():
  """Main function.

  """
  global client

  options, args = ParseOptions()

  utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
                     stderr_logging=options.debug)

  update_file = False
  try:
    notepad = WatcherState()
    try:
      try:
        client = cli.GetClient()
      except errors.OpPrereqError:
        # this is, from cli.GetClient, a not-master case
        logging.debug("Not on master, exiting")
        sys.exit(constants.EXIT_SUCCESS)
      except luxi.NoMasterError, err:
        logging.warning("Master seems to be down (%s), trying to restart",
                        str(err))
        if not StartMaster():
          logging.critical("Can't start the master, exiting")
          sys.exit(constants.EXIT_FAILURE)
        # else retry the connection
        client = cli.GetClient()

      try:
        watcher = Watcher(options, notepad)
      except errors.ConfigurationError:
        # Just exit if there's no configuration
        sys.exit(constants.EXIT_SUCCESS)

      watcher.Run()
      update_file = True
    finally:
      if update_file:
        notepad.Save()
      else:
        logging.debug("Not updating status file due to failure")
  except SystemExit:
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    sys.exit(constants.EXIT_NOTMASTER)
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
    sys.exit(constants.EXIT_NODESETUP_ERROR)
  except Exception, err:
    logging.error(str(err), exc_info=True)
    sys.exit(constants.EXIT_FAILURE)


if __name__ == '__main__':
  main()