# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

22 """Tool to restart erronously downed virtual machines.
24 This program and set of classes implement a watchdog to restart
25 virtual machines in a Ganeti cluster that have crashed or been killed
26 by a node reboot. Run from cron or similar.
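
# The watcher is meant to be run periodically; a typical (hypothetical) cron
# entry invoking it every five minutes could look like:
#   */5 * * * * root /usr/sbin/ganeti-watcher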

import os
import sys
import time
import logging

from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import serializer
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi


BAD_STATES = ['ERROR_down']
HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']

KEY_RESTART_COUNT = "restart_count"
KEY_RESTART_WHEN = "restart_when"
KEY_BOOT_ID = "bootid"
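
# The watcher state file managed by WatcherState below is a serialized dict
# using the keys above; an illustrative (made-up) example of its contents:
#   {"instance": {"inst1": {"restart_count": 2, "restart_when": 1230000000.0}},
#    "node": {"node1": {"bootid": "77ffb1cc-..."}}}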

# Global client object
client = None


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def Indent(s, prefix='| '):
  """Indent a piece of text with a given prefix before each line.

  @param s: the string to indent
  @param prefix: the string to prepend each line

  """
  return "%s%s\n" % (prefix, ('\n' + prefix).join(s.splitlines()))
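
# Worked example: Indent("a\nb") returns "| a\n| b\n" -- each input line gains
# the prefix and the result always ends with a newline.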
74 """Try to start the master daemon.
77 result = utils.RunCmd(['ganeti-masterd'])
79 logging.error("Can't start the master daemon: output '%s'", result.output)
80 return not result.failed


class WatcherState(object):
  """Interface to a state file recording restart attempts.

  """
  def __init__(self):
    """Open, lock, read and parse the file.

    Raises exception on lock contention.

    """
    # The two-step dance below is necessary to allow both opening an existing
    # file read/write and creating it if missing: a vanilla open() either
    # truncates an existing file or refuses to create a new one.
    fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
    self.statefile = os.fdopen(fd, 'w+')
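
    # utils.LockFile is expected to take an exclusive, non-blocking lock on
    # the descriptor and raise on contention, so a concurrently started
    # watcher fails here rather than racing on the state file.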
    utils.LockFile(self.statefile.fileno())

    state_data = self.statefile.read()
    try:
      self._data = serializer.Load(state_data)
    except Exception, msg:
      # Ignore errors while loading the file and treat it as empty
      self._data = {}
      logging.warning(("Invalid state file. Using defaults."
                       " Error message: %s"), msg)
113 if "instance" not in self._data:
114 self._data["instance"] = {}
115 if "node" not in self._data:
116 self._data["node"] = {}
118 self._orig_data = serializer.Dump(self._data)
121 """Save state to file, then unlock and close it.
124 assert self.statefile
126 serialized_form = serializer.Dump(self._data)
127 if self._orig_data == serialized_form:
128 logging.debug("Data didn't change, just touching status file")
129 os.utime(constants.WATCHER_STATEFILE, None)

    # We need to make sure the file is locked before renaming it, otherwise
    # starting ganeti-watcher again at the same time will create a conflict.
    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
                         data=serialized_form,
                         prewrite=utils.LockFile, close=False)
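    # utils.WriteFile presumably writes a temporary file and renames it over
    # the old state file; locking it first (prewrite) and keeping the
    # descriptor open (close=False) means the new file is already locked when
    # it replaces the old one, and we can keep using it below.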
    self.statefile = os.fdopen(fd, 'w+')
140 """Unlock configuration file and close it.
143 assert self.statefile
145 # Files are automatically unlocked when closing them
146 self.statefile.close()
147 self.statefile = None

  def GetNodeBootID(self, name):
    """Returns the last boot ID of a node or None.

    """
    ndata = self._data["node"]

    if name in ndata and KEY_BOOT_ID in ndata[name]:
      return ndata[name][KEY_BOOT_ID]
    return None

  def SetNodeBootID(self, name, bootid):
    """Sets the boot ID of a node.

    """
    ndata = self._data["node"]

    if name not in ndata:
      ndata[name] = {}

    ndata[name][KEY_BOOT_ID] = bootid

  def NumberOfRestartAttempts(self, instance):
    """Returns number of previous restart attempts.

    @type instance: L{Instance}
    @param instance: the instance to look up

    """
    idata = self._data["instance"]

    if instance.name in idata:
      return idata[instance.name][KEY_RESTART_COUNT]

    return 0

  def RecordRestartAttempt(self, instance):
    """Record a restart attempt.

    @type instance: L{Instance}
    @param instance: the instance being restarted

    """
    idata = self._data["instance"]

    if instance.name not in idata:
      inst = idata[instance.name] = {}
    else:
      inst = idata[instance.name]

    inst[KEY_RESTART_WHEN] = time.time()
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1

  def RemoveInstance(self, instance):
    """Update state to reflect that a machine is running.

    This method removes the record for a named instance (we only keep
    records for instances that are down).

    @type instance: L{Instance}
    @param instance: the instance to remove from books

    """
    idata = self._data["instance"]

    if instance.name in idata:
      del idata[instance.name]


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, state, autostart):
    self.name = name
    self.state = state
    self.autostart = autostart
229 """Encapsulates the start of an instance.
232 op = opcodes.OpStartupInstance(instance_name=self.name, force=False)
233 cli.SubmitOpCode(op, cl=client)

  def ActivateDisks(self):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=client)


def GetClusterData():
  """Get a list of instances on this cluster.

  """
  op1_fields = ["name", "status", "admin_state", "snodes"]
  op1 = opcodes.OpQueryInstances(output_fields=op1_fields, names=[],
                                 use_locking=True)

  op2_fields = ["name", "bootid", "offline"]
  op2 = opcodes.OpQueryNodes(output_fields=op2_fields, names=[],
                             use_locking=True)

  job_id = client.SubmitJob([op1, op2])

  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)

  logging.debug("Got data from cluster, writing instance status file")

  result = all_results[0]
  smap = {}

  instances = {}

  up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
  utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)
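  # Each line of the status file written above is "<instance name> <status>",
  # e.g. "instance1.example.com running" (illustrative values only).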

  for fields in result:
    (name, status, autostart, snodes) = fields

    # update the secondary node map
    for node in snodes:
      if node not in smap:
        smap[node] = []
      smap[node].append(name)

    instances[name] = Instance(name, status, autostart)

  nodes = dict([(name, (bootid, offline))
                for name, bootid, offline in all_results[1]])

  client.ArchiveJob(job_id)

  return instances, nodes, smap


class Watcher(object):
  """Encapsulate the logic for restarting erroneously halted virtual machines.

  The calling program should periodically instantiate me and call Run().
  This will traverse the list of instances, and make up to MAXTRIES attempts
  to restart machines that are down.

  """
  def __init__(self, opts, notepad):
    self.notepad = notepad
    master = client.QueryConfigValues(["master_node"])[0]
    if master != utils.HostInfo().name:
      raise NotMasterError("This is not the master node")
    # first archive old jobs
    self.ArchiveJobs(opts.job_age)
    # and only then submit new ones
    self.instances, self.bootids, self.smap = GetClusterData()
    self.started_instances = set()
309 """Watcher run sequence.
312 notepad = self.notepad
313 self.CheckInstances(notepad)
314 self.CheckDisks(notepad)

  @staticmethod
  def ArchiveJobs(age):
    """Archive old jobs.

    """
    arch_count, left_count = client.AutoArchiveJobs(age)
    logging.debug("Archived %s jobs, left %s", arch_count, left_count)

  def CheckDisks(self, notepad):
    """Check all nodes for restarted ones.

    """
    check_nodes = []
    for name, (new_id, offline) in self.bootids.iteritems():
      old = notepad.GetNodeBootID(name)
      if new_id is None:
        # Bad node, not returning a boot id
        logging.debug("Node %s missing boot id, skipping secondary checks",
                      name)
        continue
      if old != new_id:
        # Node's boot ID has changed, probably through a reboot.
        check_nodes.append(name)

    if check_nodes:
      # Activate disks for all instances with any of the checked nodes as a
      # secondary node.
      for node in check_nodes:
        if node not in self.smap:
          continue
        for instance_name in self.smap[node]:
          instance = self.instances[instance_name]
          if not instance.autostart:
            logging.info(("Skipping disk activation for non-autostart"
                          " instance %s"), instance.name)
            continue
          if instance.name in self.started_instances:
            # we already tried to start the instance, which should have
            # activated its drives (if they can be at all)
            continue
          try:
            logging.info("Activating disks for instance %s", instance.name)
            instance.ActivateDisks()
          except Exception:
            logging.exception("Error while activating disks for instance %s",
                              instance.name)

      # Keep changed boot IDs
      for name in check_nodes:
        notepad.SetNodeBootID(name, self.bootids[name][0])

  def CheckInstances(self, notepad):
    """Make a pass over the list of instances, restarting downed ones.

    """
    for instance in self.instances.values():
      if instance.state in BAD_STATES:
        n = notepad.NumberOfRestartAttempts(instance)
        if n > MAXTRIES:
          continue
        elif n < MAXTRIES:
          last = " (Attempt #%d)" % (n + 1)
        else:
          notepad.RecordRestartAttempt(instance)
          logging.error("Could not restart %s after %d attempts, giving up",
                        instance.name, MAXTRIES)
          continue
        try:
          logging.info("Restarting %s%s", instance.name, last)
          instance.Restart()
          self.started_instances.add(instance.name)
        except Exception:
          logging.exception("Error while restarting instance %s",
                            instance.name)

        notepad.RecordRestartAttempt(instance)
      elif instance.state in HELPLESS_STATES:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
      else:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
          logging.info("Restart of %s succeeded", instance.name)
407 """Run gnt-cluster verify-disks.
410 op = opcodes.OpVerifyDisks()
411 job_id = client.SubmitJob([op])
412 result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
413 client.ArchiveJob(job_id)
414 if not isinstance(result, (tuple, list)):
415 logging.error("Can't get a valid result from verify-disks")
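    # The verify-disks result is assumed to be a tuple whose third element is
    # the list of instances with (partially) offline disks.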
    offline_disk_instances = result[2]
    if not offline_disk_instances:
      return

    logging.debug("Will activate disks for instances %s",
                  ", ".join(offline_disk_instances))

    # we submit only one job, and wait for it. not optimal, but it spams the
    # job queue less
    job = [opcodes.OpActivateInstanceDisks(instance_name=name)
           for name in offline_disk_instances]
    job_id = cli.SendJob(job, cl=client)

    cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
433 """Parse the command line options.
435 @return: (options, args) as from OptionParser.parse_args()
438 parser = OptionParser(description="Ganeti cluster watcher",
440 version="%%prog (ganeti) %s" %
441 constants.RELEASE_VERSION)
443 parser.add_option("-d", "--debug", dest="debug",
444 help="Write all messages to stderr",
445 default=False, action="store_true")
446 parser.add_option("-A", "--job-age", dest="job_age",
447 help="Autoarchive jobs older than this age (default"
448 " 6 hours)", default=6*3600)
449 options, args = parser.parse_args()
450 options.job_age = cli.ParseTimespec(options.job_age)
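
# A hypothetical invocation such as "ganeti-watcher -A 1d" would auto-archive
# jobs older than one day, assuming cli.ParseTimespec accepts suffixed values
# like "1d" in addition to plain seconds.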


def main():
  """Main function.

  """
  global client

  options, args = ParseOptions()

  utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
                     stderr_logging=options.debug)

  try:
    notepad = WatcherState()
    try:
      client = cli.GetClient()
    except errors.OpPrereqError:
      # this is, from cli.GetClient, a not-master case
      logging.debug("Not on master, exiting")
      sys.exit(constants.EXIT_SUCCESS)
    except luxi.NoMasterError, err:
      logging.warning("Master seems to be down (%s), trying to restart",
                      str(err))
      if not StartMaster():
        logging.critical("Can't start the master, exiting")
        sys.exit(constants.EXIT_FAILURE)
      # else retry the connection
      client = cli.GetClient()

    try:
      watcher = Watcher(options, notepad)
    except errors.ConfigurationError:
      # Just exit if there's no configuration
      sys.exit(constants.EXIT_SUCCESS)

    try:
      watcher.Run()
      notepad.Save()
    except Exception:
      logging.debug("Not updating status file due to failure")
      raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    sys.exit(constants.EXIT_NOTMASTER)
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
    sys.exit(constants.EXIT_NODESETUP_ERROR)
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.error(str(err), exc_info=True)
    sys.exit(constants.EXIT_FAILURE)


if __name__ == '__main__':
  main()