#!/usr/bin/python
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot. Run from cron or similar.

"""
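
# The watcher is meant to be invoked at regular intervals, e.g. from cron.
# An illustrative crontab entry (the installed path is an assumption and may
# differ between distributions):
#   */5 * * * * root [ -x /usr/sbin/ganeti-watcher ] && /usr/sbin/ganeti-watcher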

import os
import sys
import time
import logging
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import serializer
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi


MAXTRIES = 5
BAD_STATES = ['ERROR_down']
HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
KEY_RESTART_COUNT = "restart_count"
KEY_RESTART_WHEN = "restart_when"
KEY_BOOT_ID = "bootid"

# Global client object
client = None
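
# The watcher state file holds a serializer.Dump()'d dictionary; an
# illustrative (made-up) example of its contents:
#   {
#     "node": {"node1.example.com": {"bootid": "2f3a..."}},
#     "instance": {"inst1.example.com": {"restart_count": 1,
#                                        "restart_when": 1234567890.12}},
#   }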


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def Indent(s, prefix='| '):
  """Indent a piece of text with a given prefix before each line.

  @param s: the string to indent
  @param prefix: the string to prepend to each line

  """
  return "%s%s\n" % (prefix, ('\n' + prefix).join(s.splitlines()))
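
# Illustrative example: Indent("foo\nbar") returns "| foo\n| bar\n".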


def StartMaster():
  """Try to start the master daemon.

  """
  result = utils.RunCmd(['ganeti-masterd'])
  if result.failed:
    logging.error("Can't start the master daemon: output '%s'", result.output)

  return not result.failed


def EnsureDaemon(daemon):
  """Check for and start daemon if not alive.

  """
  pidfile = utils.DaemonPidFileName(daemon)
  pid = utils.ReadPidFile(pidfile)
  if pid == 0 or not utils.IsProcessAlive(pid): # no file or dead pid
    logging.debug("Daemon '%s' not alive, trying to restart", daemon)
    result = utils.RunCmd([daemon])
    if result.failed:
      logging.error("Can't start daemon '%s', failure %s, output: %s",
                    daemon, result.fail_reason, result.output)
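
# Note: EnsureDaemon is invoked below with constants.NODED_PID and
# constants.RAPI_PID; as the comments in main() point out, these values match
# the daemon binary names, which is why RunCmd([daemon]) can start them.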


class WatcherState(object):
  """Interface to a state file recording restart attempts.

  """
  def __init__(self):
    """Open, lock, read and parse the file.

    Raises exception on lock contention.

    """
    # The two-step dance below is necessary to allow both opening existing
    # file read/write and creating if not existing. Vanilla open will truncate
    # an existing file -or- allow creating if not existing.
    fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
    self.statefile = os.fdopen(fd, 'w+')

    utils.LockFile(self.statefile.fileno())

    try:
      state_data = self.statefile.read()
      if not state_data:
        self._data = {}
      else:
        self._data = serializer.Load(state_data)
    except Exception, msg:
      # Ignore errors while loading the file and treat it as empty
      self._data = {}
      logging.warning(("Invalid state file. Using defaults."
                       " Error message: %s"), msg)

    if "instance" not in self._data:
      self._data["instance"] = {}
    if "node" not in self._data:
      self._data["node"] = {}

    self._orig_data = serializer.Dump(self._data)
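    # _orig_data keeps the serialized form read at startup so that Save()
    # below can tell whether the in-memory state actually changed.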

  def Save(self):
    """Save state to file, then unlock and close it.

    """
    assert self.statefile

    serialized_form = serializer.Dump(self._data)
    if self._orig_data == serialized_form:
      logging.debug("Data didn't change, just touching status file")
      os.utime(constants.WATCHER_STATEFILE, None)
      return

    # We need to make sure the file is locked before renaming it, otherwise
    # starting ganeti-watcher again at the same time will create a conflict.
    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
                         data=serialized_form,
                         prewrite=utils.LockFile, close=False)
    self.statefile = os.fdopen(fd, 'w+')

  def Close(self):
    """Unlock configuration file and close it.

    """
    assert self.statefile

    # Files are automatically unlocked when closing them
    self.statefile.close()
    self.statefile = None

  def GetNodeBootID(self, name):
    """Returns the last boot ID of a node or None.

    """
    ndata = self._data["node"]

    if name in ndata and KEY_BOOT_ID in ndata[name]:
      return ndata[name][KEY_BOOT_ID]
    return None

  def SetNodeBootID(self, name, bootid):
    """Sets the boot ID of a node.

    """
    ndata = self._data["node"]

    if name not in ndata:
      ndata[name] = {}

    ndata[name][KEY_BOOT_ID] = bootid

  def NumberOfRestartAttempts(self, instance):
    """Returns number of previous restart attempts.

    @type instance: L{Instance}
    @param instance: the instance to look up

    """
    idata = self._data["instance"]

    if instance.name in idata:
      return idata[instance.name][KEY_RESTART_COUNT]

    return 0

  def RecordRestartAttempt(self, instance):
    """Record a restart attempt.

    @type instance: L{Instance}
    @param instance: the instance being restarted

    """
    idata = self._data["instance"]

    if instance.name not in idata:
      inst = idata[instance.name] = {}
    else:
      inst = idata[instance.name]

    inst[KEY_RESTART_WHEN] = time.time()
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1

  def RemoveInstance(self, instance):
    """Update state to reflect that a machine is running.

    This method removes the record for a named instance (as we only
    track instances that are down).

    @type instance: L{Instance}
    @param instance: the instance to remove from books

    """
    idata = self._data["instance"]

    if instance.name in idata:
      del idata[instance.name]


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, state, autostart):
    self.name = name
    self.state = state
    self.autostart = autostart

  def Restart(self):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpStartupInstance(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=client)

  def ActivateDisks(self):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=client)


def GetClusterData():
  """Get a list of instances on this cluster.

  """
  op1_fields = ["name", "status", "admin_state", "snodes"]
  op1 = opcodes.OpQueryInstances(output_fields=op1_fields, names=[],
                                 use_locking=True)
  op2_fields = ["name", "bootid", "offline"]
  op2 = opcodes.OpQueryNodes(output_fields=op2_fields, names=[],
                             use_locking=True)

  job_id = client.SubmitJob([op1, op2])

  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)

  logging.debug("Got data from cluster, writing instance status file")

  result = all_results[0]
  smap = {}

  instances = {}

  up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
  utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)

  for fields in result:
    (name, status, autostart, snodes) = fields

    # update the secondary node map
    for node in snodes:
      if node not in smap:
        smap[node] = []
      smap[node].append(name)

    instances[name] = Instance(name, status, autostart)

  nodes = dict([(name, (bootid, offline))
                for name, bootid, offline in all_results[1]])

  client.ArchiveJob(job_id)

  return instances, nodes, smap
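
# For orientation, the values returned by GetClusterData look roughly like
# this (illustrative names and values, not real cluster data):
#   instances: {"inst1.example.com": Instance(name, status, autostart), ...}
#   nodes:     {"node1.example.com": (boot_id_or_None, offline_flag), ...}
#   smap:      {"node2.example.com": ["inst1.example.com", ...], ...}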


class Watcher(object):
  """Encapsulate the logic for restarting erroneously halted virtual machines.

  The calling program should periodically instantiate me and call Run().
  This will traverse the list of instances, and make up to MAXTRIES attempts
  to restart machines that are down.

  """
  def __init__(self, opts, notepad):
    self.notepad = notepad
    master = client.QueryConfigValues(["master_node"])[0]
    if master != utils.HostInfo().name:
      raise NotMasterError("This is not the master node")
    # first archive old jobs
    self.ArchiveJobs(opts.job_age)
    # and only then submit new ones
    self.instances, self.bootids, self.smap = GetClusterData()
    self.started_instances = set()
    self.opts = opts

  def Run(self):
    """Watcher run sequence.

    """
    notepad = self.notepad
    self.CheckInstances(notepad)
    self.CheckDisks(notepad)
    self.VerifyDisks()

  @staticmethod
  def ArchiveJobs(age):
    """Archive old jobs.

    """
    arch_count, left_count = client.AutoArchiveJobs(age)
    logging.debug("Archived %s jobs, left %s", arch_count, left_count)

  def CheckDisks(self, notepad):
    """Check all nodes for restarted ones.

    """
    check_nodes = []
    for name, (new_id, offline) in self.bootids.iteritems():
      old = notepad.GetNodeBootID(name)
      if new_id is None:
        # Bad node, not returning a boot id
        if not offline:
          logging.debug("Node %s missing boot id, skipping secondary checks",
                        name)
        continue
      if old != new_id:
        # Node's boot ID has changed, probably through a reboot.
        check_nodes.append(name)

    if check_nodes:
      # Activate disks for all instances with any of the checked nodes as a
      # secondary node.
      for node in check_nodes:
        if node not in self.smap:
          continue
        for instance_name in self.smap[node]:
          instance = self.instances[instance_name]
          if not instance.autostart:
            logging.info(("Skipping disk activation for non-autostart"
                          " instance %s"), instance.name)
            continue
          if instance.name in self.started_instances:
            # we already tried to start the instance, which should have
            # activated its drives (if they can be at all)
            continue
          try:
            logging.info("Activating disks for instance %s", instance.name)
            instance.ActivateDisks()
          except Exception:
            logging.exception("Error while activating disks for instance %s",
                              instance.name)

      # Keep changed boot IDs
      for name in check_nodes:
        notepad.SetNodeBootID(name, self.bootids[name][0])

  def CheckInstances(self, notepad):
    """Make a pass over the list of instances, restarting downed ones.

    """
    for instance in self.instances.values():
      if instance.state in BAD_STATES:
        n = notepad.NumberOfRestartAttempts(instance)

        if n > MAXTRIES:
          # stay quiet
          continue
        elif n < MAXTRIES:
          last = " (Attempt #%d)" % (n + 1)
        else:
          notepad.RecordRestartAttempt(instance)
          logging.error("Could not restart %s after %d attempts, giving up",
                        instance.name, MAXTRIES)
          continue
        try:
          logging.info("Restarting %s%s",
                       instance.name, last)
          instance.Restart()
          self.started_instances.add(instance.name)
        except Exception:
          logging.exception("Error while restarting instance %s",
                            instance.name)

        notepad.RecordRestartAttempt(instance)
      elif instance.state in HELPLESS_STATES:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
      else:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
          logging.info("Restart of %s succeeded", instance.name)
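
  # In short (descriptive note derived from the code above): an instance in a
  # BAD_STATES state is restarted at most MAXTRIES times; once it is seen
  # healthy again its restart record is dropped, so a later failure starts
  # counting attempts from zero.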

  @staticmethod
  def VerifyDisks():
    """Run gnt-cluster verify-disks.

    """
    op = opcodes.OpVerifyDisks()
    job_id = client.SubmitJob([op])
    result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
    client.ArchiveJob(job_id)
    if not isinstance(result, (tuple, list)):
      logging.error("Can't get a valid result from verify-disks")
      return
    offline_disk_instances = result[2]
    if not offline_disk_instances:
      # nothing to do
      return
    logging.debug("Will activate disks for instances %s",
                  ", ".join(offline_disk_instances))
    # we submit only one job, and wait for it. not optimal, but it spams
    # the job queue less
    job = [opcodes.OpActivateInstanceDisks(instance_name=name)
           for name in offline_disk_instances]
    job_id = cli.SendJob(job, cl=client)

    cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option("-d", "--debug", dest="debug",
                    help="Write all messages to stderr",
                    default=False, action="store_true")
  parser.add_option("-A", "--job-age", dest="job_age",
                    help="Autoarchive jobs older than this age (default"
                         " 6 hours)", default=6*3600)
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)
  return options, args


def main():
  """Main function.

  """
  global client

  options, args = ParseOptions()

  utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
                     stderr_logging=options.debug)

  update_file = False
  try:
    # on master or not, try to start the node daemon (use _PID but is
    # the same as daemon name)
    EnsureDaemon(constants.NODED_PID)

    notepad = WatcherState()
    try:
      try:
        client = cli.GetClient()
      except errors.OpPrereqError:
        # this is, from cli.GetClient, a not-master case
        logging.debug("Not on master, exiting")
        update_file = True
        sys.exit(constants.EXIT_SUCCESS)
      except luxi.NoMasterError, err:
        logging.warning("Master seems to be down (%s), trying to restart",
                        str(err))
        if not StartMaster():
          logging.critical("Can't start the master, exiting")
          sys.exit(constants.EXIT_FAILURE)
        # else retry the connection
        client = cli.GetClient()

      # we are on master now (use _PID but is the same as daemon name)
      EnsureDaemon(constants.RAPI_PID)

      try:
        watcher = Watcher(options, notepad)
      except errors.ConfigurationError:
        # Just exit if there's no configuration
        update_file = True
        sys.exit(constants.EXIT_SUCCESS)

      watcher.Run()
      update_file = True
    finally:
      if update_file:
        notepad.Save()
      else:
        logging.debug("Not updating status file due to failure")
  except SystemExit:
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    sys.exit(constants.EXIT_NOTMASTER)
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
    sys.exit(constants.EXIT_NODESETUP_ERROR)
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.error(str(err), exc_info=True)
    sys.exit(constants.EXIT_FAILURE)


if __name__ == '__main__':
  main()