# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Tool to restart erronously downed virtual machines.
24 This program and set of classes implement a watchdog to restart
25 virtual machines in a Ganeti cluster that have crashed or been killed
26 by a node reboot. Run from cron or similar.
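
# The watcher is typically driven by cron every few minutes; the path below
# is an assumption and depends on the installation prefix:
#
#   */5 * * * * root /usr/sbin/ganeti-watcher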

import os
import sys
import time
import logging

from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import serializer
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi


# Maximum number of restart attempts before the watcher gives up
MAXTRIES = 5
BAD_STATES = ['ERROR_down']
HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']

KEY_RESTART_COUNT = "restart_count"
KEY_RESTART_WHEN = "restart_when"
KEY_BOOT_ID = "bootid"
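
# For orientation, the state file (constants.WATCHER_STATEFILE) holds a
# serialized dict built from the keys above, roughly:
#   {"instance": {<name>: {"restart_count": int, "restart_when": float}},
#    "node": {<name>: {"bootid": str}}}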

# Global client object
client = None


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def Indent(s, prefix='| '):
  """Indent a piece of text with a given prefix before each line.

  @param s: the string to indent
  @param prefix: the string to prepend each line

  """
  return "%s%s\n" % (prefix, ('\n' + prefix).join(s.splitlines()))
74 """Try to start the master daemon.
77 result = utils.RunCmd(['ganeti-masterd'])
79 logging.error("Can't start the master daemon: output '%s'", result.output)
80 return not result.failed


class WatcherState(object):
  """Interface to a state file recording restart attempts.

  """
  def __init__(self):
    """Open, lock, read and parse the file.

    Raises exception on lock contention.

    """
    # The two-step dance below is necessary to allow both opening existing
    # file read/write and creating if not existing. Vanilla open will truncate
    # an existing file -or- allow creating if not existing.
    fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
    self.statefile = os.fdopen(fd, 'w+')

    utils.LockFile(self.statefile.fileno())

    try:
      state_data = self.statefile.read()
      if not state_data:
        self._data = {}
      else:
        self._data = serializer.Load(state_data)
    except Exception, msg:
      # Ignore errors while loading the file and treat it as empty
      self._data = {}
      logging.warning(("Invalid state file. Using defaults."
                       " Error message: %s"), msg)

    if "instance" not in self._data:
      self._data["instance"] = {}
    if "node" not in self._data:
      self._data["node"] = {}

    self._orig_data = serializer.Dump(self._data)
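    # Keeping the serialized original around lets Save() detect a no-op run
    # and merely touch the file's mtime instead of rewriting it.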
121 """Save state to file, then unlock and close it.
124 assert self.statefile
126 serialized_form = serializer.Dump(self._data)
127 if self._orig_data == serialized_form:
128 logging.debug("Data didn't change, just touching status file")
129 os.utime(constants.WATCHER_STATEFILE, None)
132 # We need to make sure the file is locked before renaming it, otherwise
133 # starting ganeti-watcher again at the same time will create a conflict.
134 fd = utils.WriteFile(constants.WATCHER_STATEFILE,
135 data=serialized_form,
136 prewrite=utils.LockFile, close=False)
137 self.statefile = os.fdopen(fd, 'w+')
140 """Unlock configuration file and close it.
143 assert self.statefile
145 # Files are automatically unlocked when closing them
146 self.statefile.close()
147 self.statefile = None

  def GetNodeBootID(self, name):
    """Returns the last boot ID of a node or None.

    """
    ndata = self._data["node"]

    if name in ndata and KEY_BOOT_ID in ndata[name]:
      return ndata[name][KEY_BOOT_ID]

    return None

  def SetNodeBootID(self, name, bootid):
    """Sets the boot ID of a node.

    """
    ndata = self._data["node"]

    if name not in ndata:
      ndata[name] = {}

    ndata[name][KEY_BOOT_ID] = bootid

  def NumberOfRestartAttempts(self, instance):
    """Returns number of previous restart attempts.

    @type instance: L{Instance}
    @param instance: the instance to look up

    """
    idata = self._data["instance"]

    if instance.name in idata:
      return idata[instance.name][KEY_RESTART_COUNT]

    return 0

  def RecordRestartAttempt(self, instance):
    """Record a restart attempt.

    @type instance: L{Instance}
    @param instance: the instance being restarted

    """
    idata = self._data["instance"]

    if instance.name not in idata:
      inst = idata[instance.name] = {}
    else:
      inst = idata[instance.name]

    inst[KEY_RESTART_WHEN] = time.time()
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1

  def RemoveInstance(self, instance):
    """Update state to reflect that a machine is running.

    This method removes the record for a named instance (we only keep
    records for instances that are down).

    @type instance: L{Instance}
    @param instance: the instance to remove from books

    """
    idata = self._data["instance"]

    if instance.name in idata:
      del idata[instance.name]


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, state, autostart):
    self.name = name
    self.state = state
    self.autostart = autostart

  def Restart(self):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpStartupInstance(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=client)

  def ActivateDisks(self):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=client)


def GetClusterData():
  """Get a list of instances on this cluster.

  """
  op1_fields = ["name", "status", "admin_state", "snodes"]
  op1 = opcodes.OpQueryInstances(output_fields=op1_fields, names=[],
                                 use_locking=True)
  op2_fields = ["name", "bootid", "offline"]
  op2 = opcodes.OpQueryNodes(output_fields=op2_fields, names=[],
                             use_locking=True)

  job_id = client.SubmitJob([op1, op2])

  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)

  result = all_results[0]
  smap = {}

  instances = {}
  for fields in result:
    (name, status, autostart, snodes) = fields

    # update the secondary node map
    for node in snodes:
      if node not in smap:
        smap[node] = []
      smap[node].append(name)

    instances[name] = Instance(name, status, autostart)

  nodes = dict([(name, (bootid, offline))
                for name, bootid, offline in all_results[1]])

  client.ArchiveJob(job_id)

  return instances, nodes, smap
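
# Illustrative (hypothetical) shapes of GetClusterData()'s return values:
#   instances = {"inst1.example.com": <Instance object>}
#   nodes = {"node1.example.com": ("<boot id>", False)}  # name -> (bootid, offline)
#   smap = {"node1.example.com": ["inst1.example.com"]}  # secondary -> instances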


class Watcher(object):
  """Encapsulate the logic for restarting erroneously halted virtual machines.

  The calling program should periodically instantiate me and call Run().
  This will traverse the list of instances, and make up to MAXTRIES attempts
  to restart machines that are down.

  """
  def __init__(self, opts, notepad):
    self.notepad = notepad
    master = client.QueryConfigValues(["master_node"])[0]
    if master != utils.HostInfo().name:
      raise NotMasterError("This is not the master node")
    self.instances, self.bootids, self.smap = GetClusterData()
    self.started_instances = set()
    self.opts = opts
299 """Watcher run sequence.
302 notepad = self.notepad
303 self.ArchiveJobs(self.opts.job_age)
304 self.CheckInstances(notepad)
305 self.CheckDisks(notepad)

  def ArchiveJobs(self, age):
    """Archive old jobs.

    """
    arch_count, left_count = client.AutoArchiveJobs(age)
    logging.debug("Archived %s jobs, left %s", arch_count, left_count)

  def CheckDisks(self, notepad):
    """Check all nodes for restarted ones.

    """
    check_nodes = []
    for name, (new_id, offline) in self.bootids.iteritems():
      old = notepad.GetNodeBootID(name)
      if new_id is None:
        # Bad node, not returning a boot id
        if not offline:
          logging.debug("Node %s missing boot id, skipping secondary checks",
                        name)
        continue
      if old != new_id:
        # Node's boot ID has changed, probably through a reboot.
        check_nodes.append(name)

    if check_nodes:
      # Activate disks for all instances with any of the checked nodes as a
      # secondary node.
      for node in check_nodes:
        if node not in self.smap:
          continue
        for instance_name in self.smap[node]:
          instance = self.instances[instance_name]
          if not instance.autostart:
            logging.info(("Skipping disk activation for non-autostart"
                          " instance %s"), instance.name)
            continue
          if instance.name in self.started_instances:
            # we already tried to start the instance, which should have
            # activated its drives (if they can be at all)
            continue
          try:
            logging.info("Activating disks for instance %s", instance.name)
            instance.ActivateDisks()
          except Exception:
            logging.exception("Error while activating disks for instance %s",
                              instance.name)

      # Keep changed boot IDs
      for name in check_nodes:
        notepad.SetNodeBootID(name, self.bootids[name][0])

  def CheckInstances(self, notepad):
    """Make a pass over the list of instances, restarting downed ones.

    """
    for instance in self.instances.values():
      if instance.state in BAD_STATES:
        n = notepad.NumberOfRestartAttempts(instance)

        if n > MAXTRIES:
          # stay quiet.
          continue
        elif n < MAXTRIES:
          last = " (Attempt #%d)" % (n + 1)
        else:
          notepad.RecordRestartAttempt(instance)
          logging.error("Could not restart %s after %d attempts, giving up",
                        instance.name, MAXTRIES)
          continue
        try:
          logging.info("Restarting %s%s",
                       instance.name, last)
          instance.Restart()
          self.started_instances.add(instance.name)
        except Exception:
          logging.exception("Error while restarting instance %s",
                            instance.name)

        notepad.RecordRestartAttempt(instance)
      elif instance.state in HELPLESS_STATES:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
      else:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
          logging.info("Restart of %s succeeded", instance.name)
397 """Run gnt-cluster verify-disks.
400 op = opcodes.OpVerifyDisks()
401 job_id = client.SubmitJob([op])
402 result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
403 client.ArchiveJob(job_id)
404 if not isinstance(result, (tuple, list)):
405 logging.error("Can't get a valid result from verify-disks")
407 offline_disk_instances = result[2]
408 if not offline_disk_instances:
411 logging.debug("Will activate disks for instances %s",
412 ", ".join(offline_disk_instances))
413 # we submit only one job, and wait for it. not optimal, but spams
415 job = [opcodes.OpActivateInstanceDisks(instance_name=name)
416 for name in offline_disk_instances]
417 job_id = cli.SendJob(job, cl=client)
419 cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
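
# A single watcher pass, as composed in Watcher.Run() above: archive old jobs,
# restart instances that are down, re-activate disks on rebooted secondary
# nodes, then run the cluster-wide disk verification.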
423 """Parse the command line options.
425 @return: (options, args) as from OptionParser.parse_args()
428 parser = OptionParser(description="Ganeti cluster watcher",
430 version="%%prog (ganeti) %s" %
431 constants.RELEASE_VERSION)
433 parser.add_option("-d", "--debug", dest="debug",
434 help="Write all messages to stderr",
435 default=False, action="store_true")
436 parser.add_option("-A", "--job-age", dest="job_age",
437 help="Autoarchive jobs older than this age (default"
438 " 6 hours)", default=6*3600)
439 options, args = parser.parse_args()
440 options.job_age = cli.ParseTimespec(options.job_age)
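
# Example invocations (illustrative): "ganeti-watcher -d" additionally logs
# to stderr, and "ganeti-watcher -A 1d" auto-archives jobs older than one
# day; cli.ParseTimespec accepts plain seconds as well as suffixed values.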


def main():
  """Main function.

  """
  global client

  options, args = ParseOptions()

  utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
                     stderr_logging=options.debug)

  update_file = True
  try:
    notepad = WatcherState()
    try:
      try:
        client = cli.GetClient()
      except errors.OpPrereqError:
        # this is, from cli.GetClient, a not-master case
        logging.debug("Not on master, exiting")
        sys.exit(constants.EXIT_SUCCESS)
      except luxi.NoMasterError, err:
        logging.warning("Master seems to be down (%s), trying to restart",
                        str(err))
        if not StartMaster():
          logging.critical("Can't start the master, exiting")
          update_file = False
          sys.exit(constants.EXIT_FAILURE)
        # else retry the connection
        client = cli.GetClient()

      try:
        watcher = Watcher(options, notepad)
      except errors.ConfigurationError:
        # Just exit if there's no configuration
        sys.exit(constants.EXIT_SUCCESS)
486 logging.debug("Not updating status file due to failure")
489 except NotMasterError:
490 logging.debug("Not master, exiting")
491 sys.exit(constants.EXIT_NOTMASTER)
492 except errors.ResolverError, err:
493 logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
494 sys.exit(constants.EXIT_NODESETUP_ERROR)
495 except Exception, err:
496 logging.error(str(err), exc_info=True)
497 sys.exit(constants.EXIT_FAILURE)
500 if __name__ == '__main__':