#!/usr/bin/python
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


22 """Tool to restart erronously downed virtual machines.
24 This program and set of classes implement a watchdog to restart
25 virtual machines in a Ganeti cluster that have crashed or been killed
26 by a node reboot. Run from cron or similar.
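# A minimal deployment sketch (an assumption for illustration, not shipped in
# this file): a cron entry along the lines of
#   */5 * * * * root /usr/sbin/ganeti-watcher
# so that downed instances and disks are rechecked every few minutes.
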
import os
import sys
import time
import logging
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import serializer
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi


MAXTRIES = 5
BAD_STATES = ['ERROR_down']
HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']

KEY_RESTART_COUNT = "restart_count"
KEY_RESTART_WHEN = "restart_when"
KEY_BOOT_ID = "bootid"

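# Judging by the keys above and WatcherState below, the on-disk state file is
# a serialized dict shaped roughly like this (illustrative values only):
#   {"instance": {"inst1": {"restart_count": 2,
#                           "restart_when": 1230000000.0}},
#    "node": {"node1": {"bootid": "0123-abcd-boot-id"}}}
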
# Global client object
client = None


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def Indent(s, prefix='| '):
  """Indent a piece of text with a given prefix before each line.

  @param s: the string to indent
  @param prefix: the string to prepend each line

  """
  return "%s%s\n" % (prefix, ('\n' + prefix).join(s.splitlines()))

75 """Check whether we should pause.
78 return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))
82 """Try to start the master daemon.
85 result = utils.RunCmd(['ganeti-masterd'])
87 logging.error("Can't start the master daemon: output '%s'", result.output)
88 return not result.failed


def EnsureDaemon(daemon):
  """Check for and start daemon if not alive.

  """
  pidfile = utils.DaemonPidFileName(daemon)
  pid = utils.ReadPidFile(pidfile)
  if pid == 0 or not utils.IsProcessAlive(pid):  # no file or dead pid
    logging.debug("Daemon '%s' not alive, trying to restart", daemon)
    result = utils.RunCmd([daemon])
    if result.failed:
      logging.error("Can't start daemon '%s', failure %s, output: %s",
                    daemon, result.fail_reason, result.output)
      return False

  return True
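
# Note: main() below relies on this helper for constants.NODED (on every
# node) and for constants.RAPI (on the master only).

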
class WatcherState(object):
  """Interface to a state file recording restart attempts.

  """
  def __init__(self):
    """Open, lock, read and parse the file.

    Raises exception on lock contention.

    """
    # The two-step dance below is necessary to allow both opening existing
    # file read/write and creating if not existing. Vanilla open will truncate
    # an existing file -or- allow creating if not existing.
    fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
    self.statefile = os.fdopen(fd, 'w+')

    utils.LockFile(self.statefile.fileno())

    try:
      state_data = self.statefile.read()
      if not state_data:
        self._data = {}
      else:
        self._data = serializer.Load(state_data)
    except Exception, msg:
      # Ignore errors while loading the file and treat it as empty
      self._data = {}
      logging.warning(("Invalid state file. Using defaults."
                       " Error message: %s"), msg)

    if "instance" not in self._data:
      self._data["instance"] = {}
    if "node" not in self._data:
      self._data["node"] = {}

    self._orig_data = serializer.Dump(self._data)
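    # Keeping the serialized original around lets Save() detect whether the
    # in-memory state changed, and skip rewriting an identical file.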
143 """Save state to file, then unlock and close it.
146 assert self.statefile
148 serialized_form = serializer.Dump(self._data)
149 if self._orig_data == serialized_form:
150 logging.debug("Data didn't change, just touching status file")
151 os.utime(constants.WATCHER_STATEFILE, None)
154 # We need to make sure the file is locked before renaming it, otherwise
155 # starting ganeti-watcher again at the same time will create a conflict.
156 fd = utils.WriteFile(constants.WATCHER_STATEFILE,
157 data=serialized_form,
158 prewrite=utils.LockFile, close=False)
159 self.statefile = os.fdopen(fd, 'w+')
162 """Unlock configuration file and close it.
165 assert self.statefile
167 # Files are automatically unlocked when closing them
168 self.statefile.close()
169 self.statefile = None

  def GetNodeBootID(self, name):
    """Returns the last boot ID of a node or None.

    """
    ndata = self._data["node"]

    if name in ndata and KEY_BOOT_ID in ndata[name]:
      return ndata[name][KEY_BOOT_ID]
    return None

  def SetNodeBootID(self, name, bootid):
    """Sets the boot ID of a node.

    """
    assert bootid
    ndata = self._data["node"]
    if name not in ndata:
      ndata[name] = {}
    ndata[name][KEY_BOOT_ID] = bootid

  def NumberOfRestartAttempts(self, instance):
    """Returns the number of previous restart attempts.

    @type instance: L{Instance}
    @param instance: the instance to look up

    """
    idata = self._data["instance"]

    if instance.name in idata:
      return idata[instance.name][KEY_RESTART_COUNT]

    return 0

  def RecordRestartAttempt(self, instance):
    """Record a restart attempt.

    @type instance: L{Instance}
    @param instance: the instance being restarted

    """
    idata = self._data["instance"]

    if instance.name not in idata:
      inst = idata[instance.name] = {}
    else:
      inst = idata[instance.name]

    inst[KEY_RESTART_WHEN] = time.time()
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1

  def RemoveInstance(self, instance):
    """Update state to reflect that a machine is running.

    This method removes the record for a named instance (we only keep
    records for instances that are down).

    @type instance: L{Instance}
    @param instance: the instance to remove from the books

    """
    idata = self._data["instance"]

    if instance.name in idata:
      del idata[instance.name]


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, state, autostart):
    self.name = name
    self.state = state
    self.autostart = autostart

  def Restart(self):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpStartupInstance(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=client)

  def ActivateDisks(self):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=client)


def GetClusterData():
  """Get a list of instances on this cluster.

  """
  op1_fields = ["name", "status", "admin_state", "snodes"]
  op1 = opcodes.OpQueryInstances(output_fields=op1_fields, names=[],
                                 use_locking=True)
  op2_fields = ["name", "bootid", "offline"]
  op2 = opcodes.OpQueryNodes(output_fields=op2_fields, names=[],
                             use_locking=True)

  job_id = client.SubmitJob([op1, op2])
  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)

  logging.debug("Got data from cluster, writing instance status file")

  result = all_results[0]
  smap = {}
  instances = {}

  # write the instance status file
  up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
  utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)

  for fields in result:
    (name, status, autostart, snodes) = fields

    # update the secondary node map
    for node in snodes:
      if node not in smap:
        smap[node] = []
      smap[node].append(name)

    instances[name] = Instance(name, status, autostart)

  nodes = dict([(name, (bootid, offline))
                for name, bootid, offline in all_results[1]])

  client.ArchiveJob(job_id)
  return instances, nodes, smap


class Watcher(object):
  """Encapsulate the logic for restarting erroneously halted virtual machines.

  The calling program should periodically instantiate me and call Run().
  This will traverse the list of instances, and make up to MAXTRIES attempts
  to restart machines that are down.

  """
  def __init__(self, opts, notepad):
    self.notepad = notepad
    master = client.QueryConfigValues(["master_node"])[0]
    if master != utils.HostInfo().name:
      raise NotMasterError("This is not the master node")
    # first archive old jobs
    self.ArchiveJobs(opts.job_age)
    # and only then submit new ones
    self.instances, self.bootids, self.smap = GetClusterData()
    self.started_instances = set()
331 """Watcher run sequence.
334 notepad = self.notepad
335 self.CheckInstances(notepad)
336 self.CheckDisks(notepad)

  @staticmethod
  def ArchiveJobs(age):
    """Archive old jobs."""
    arch_count, left_count = client.AutoArchiveJobs(age)
    logging.debug("Archived %s jobs, left %s", arch_count, left_count)

  def CheckDisks(self, notepad):
    """Check all nodes for restarted ones.

    """
    check_nodes = []
    for name, (new_id, offline) in self.bootids.iteritems():
      old = notepad.GetNodeBootID(name)
      if new_id is None:
        # Bad node, not returning a boot id
        if not offline:
          logging.debug("Node %s missing boot id, skipping secondary checks",
                        name)
        continue
      if old != new_id:
        # Node's boot ID has changed, probably through a reboot.
        check_nodes.append(name)

    if check_nodes:
      # Activate disks for all instances with any of the checked nodes as a
      # secondary node.
      for node in check_nodes:
        if node not in self.smap:
          continue
        for instance_name in self.smap[node]:
          instance = self.instances[instance_name]
          if not instance.autostart:
            logging.info(("Skipping disk activation for non-autostart"
                          " instance %s"), instance.name)
            continue
          if instance.name in self.started_instances:
            # we already tried to start the instance, which should have
            # activated its drives (if they can be activated at all)
            continue
          try:
            logging.info("Activating disks for instance %s", instance.name)
            instance.ActivateDisks()
          except Exception:
            logging.exception("Error while activating disks for instance %s",
                              instance.name)

      # Keep the changed boot IDs
      for name in check_nodes:
        notepad.SetNodeBootID(name, self.bootids[name][0])

  def CheckInstances(self, notepad):
    """Make a pass over the list of instances, restarting downed ones.

    """
    for instance in self.instances.values():
      if instance.state in BAD_STATES:
        n = notepad.NumberOfRestartAttempts(instance)

        if n > MAXTRIES:
          # stay quiet.
          continue
        elif n < MAXTRIES:
          last = " (Attempt #%d)" % (n + 1)
        else:
          notepad.RecordRestartAttempt(instance)
          logging.error("Could not restart %s after %d attempts, giving up",
                        instance.name, MAXTRIES)
          continue
        try:
          logging.info("Restarting %s%s",
                       instance.name, last)
          instance.Restart()
          self.started_instances.add(instance.name)
        except Exception:
          logging.exception("Error while restarting instance %s",
                            instance.name)

        notepad.RecordRestartAttempt(instance)
      elif instance.state in HELPLESS_STATES:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
      else:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
          logging.info("Restart of %s succeeded", instance.name)
429 """Run gnt-cluster verify-disks.
432 op = opcodes.OpVerifyDisks()
433 job_id = client.SubmitJob([op])
434 result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
435 client.ArchiveJob(job_id)
436 if not isinstance(result, (tuple, list)):
437 logging.error("Can't get a valid result from verify-disks")
439 offline_disk_instances = result[2]
440 if not offline_disk_instances:
443 logging.debug("Will activate disks for instances %s",
444 ", ".join(offline_disk_instances))
445 # we submit only one job, and wait for it. not optimal, but spams
447 job = [opcodes.OpActivateInstanceDisks(instance_name=name)
448 for name in offline_disk_instances]
449 job_id = cli.SendJob(job, cl=client)
451 cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
455 """Parse the command line options.
457 @return: (options, args) as from OptionParser.parse_args()
460 parser = OptionParser(description="Ganeti cluster watcher",
462 version="%%prog (ganeti) %s" %
463 constants.RELEASE_VERSION)
465 parser.add_option("-d", "--debug", dest="debug",
466 help="Write all messages to stderr",
467 default=False, action="store_true")
468 parser.add_option("-A", "--job-age", dest="job_age",
469 help="Autoarchive jobs older than this age (default"
470 " 6 hours)", default=6*3600)
471 options, args = parser.parse_args()
472 options.job_age = cli.ParseTimespec(options.job_age)
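
# Illustrative invocations (assuming the usual cli.ParseTimespec suffixes):
#   ganeti-watcher -d          # log all messages to stderr as well
#   ganeti-watcher -A 12h      # archive jobs older than twelve hours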


def main():
  """Main function.

  """
  global client

  options, args = ParseOptions()

  utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
                     stderr_logging=options.debug)

  if ShouldPause():
    logging.debug("Pause has been set, exiting")
    sys.exit(constants.EXIT_SUCCESS)

  update_file = False
  try:
    # on master or not, try to start the node daemon
    EnsureDaemon(constants.NODED)

    notepad = WatcherState()
    try:
      try:
        client = cli.GetClient()
      except errors.OpPrereqError:
        # this is, from cli.GetClient, a not-master case
        logging.debug("Not on master, exiting")
        sys.exit(constants.EXIT_SUCCESS)
      except luxi.NoMasterError, err:
        logging.warning("Master seems to be down (%s), trying to restart",
                        str(err))
        if not StartMaster():
          logging.critical("Can't start the master, exiting")
          sys.exit(constants.EXIT_FAILURE)
        # else retry the connection
        client = cli.GetClient()

      # we are on master now
      EnsureDaemon(constants.RAPI)

      try:
        watcher = Watcher(options, notepad)
      except errors.ConfigurationError:
        # Just exit if there's no configuration
        sys.exit(constants.EXIT_SUCCESS)

      watcher.Run()
      update_file = True
    finally:
      if update_file:
        notepad.Save()
      else:
        logging.debug("Not updating status file due to failure")
  except SystemExit:
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    sys.exit(constants.EXIT_NOTMASTER)
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
    sys.exit(constants.EXIT_NODESETUP_ERROR)
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.error(str(err), exc_info=True)
    sys.exit(constants.EXIT_FAILURE)


if __name__ == '__main__':
  main()