#!/usr/bin/python
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""
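
# Illustrative only: the watcher is meant to be invoked periodically from
# cron; a typical crontab entry might look like the following (the installed
# path is an assumption and may differ per distribution):
#
#   */5 * * * * root /usr/sbin/ganeti-watcher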

import os
import sys
import time
import logging
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import serializer
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi


# maximum number of restart attempts before giving up on an instance
MAXTRIES = 5
BAD_STATES = ['ERROR_down']
HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
KEY_RESTART_COUNT = "restart_count"
KEY_RESTART_WHEN = "restart_when"
KEY_BOOT_ID = "bootid"


# Global LUXI client object, initialized in main()
client = None


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def Indent(s, prefix='| '):
  """Indent a piece of text with a given prefix before each line.

  @param s: the string to indent
  @param prefix: the string to prepend each line

  """
  return "%s%s\n" % (prefix, ('\n' + prefix).join(s.splitlines()))
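
# For illustration: Indent("error:\ndetails") evaluates to
# "| error:\n| details\n", i.e. every input line gains the "| " prefix.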


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))


def EnsureDaemon(name):
  """Check for and start a daemon if it is not alive; return success.

  """
  result = utils.RunCmd([constants.DAEMON_UTIL, "check-and-start", name])
  if result.failed:
    logging.error("Can't start daemon '%s', failure %s, output: %s",
                  name, result.fail_reason, result.output)
    return False

  return True


class WatcherState(object):
  """Interface to a state file recording restart attempts.

  """
  def __init__(self):
    """Open, lock, read and parse the file.

    Raises exception on lock contention.

    """
    # The two-step dance below is necessary to allow both opening an existing
    # file read/write and creating it if not existing. Vanilla open will
    # truncate an existing file -or- allow creating it if not existing.
    fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
    self.statefile = os.fdopen(fd, 'w+')

    utils.LockFile(self.statefile.fileno())

    try:
      state_data = self.statefile.read()
      if not state_data:
        self._data = {}
      else:
        self._data = serializer.Load(state_data)
    except Exception, msg:
      # Ignore errors while loading the file and treat it as empty
      self._data = {}
      logging.warning(("Invalid state file. Using defaults."
                       " Error message: %s"), msg)

    if "instance" not in self._data:
      self._data["instance"] = {}
    if "node" not in self._data:
      self._data["node"] = {}

    self._orig_data = serializer.Dump(self._data)
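
  # For illustration, the serialized state is JSON of roughly this shape
  # (all values below are made up):
  #
  #   {"node": {"node1.example.com": {"bootid": "9f3e-..."}},
  #    "instance": {"inst1.example.com": {"restart_count": 2,
  #                                       "restart_when": 1230000000.0}}}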

  def Save(self):
    """Save state to file, then unlock and close it.

    """
    assert self.statefile

    serialized_form = serializer.Dump(self._data)
    if self._orig_data == serialized_form:
      logging.debug("Data didn't change, just touching status file")
      os.utime(constants.WATCHER_STATEFILE, None)
      return

    # We need to make sure the file is locked before renaming it, otherwise
    # starting ganeti-watcher again at the same time will create a conflict.
    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
                         data=serialized_form,
                         prewrite=utils.LockFile, close=False)
    self.statefile = os.fdopen(fd, 'w+')

  def Close(self):
    """Unlock configuration file and close it.

    """
    assert self.statefile

    # Files are automatically unlocked when closing them
    self.statefile.close()
    self.statefile = None

  def GetNodeBootID(self, name):
    """Returns the last boot ID of a node or None.

    """
    ndata = self._data["node"]

    if name in ndata and KEY_BOOT_ID in ndata[name]:
      return ndata[name][KEY_BOOT_ID]

    return None

  def SetNodeBootID(self, name, bootid):
    """Sets the boot ID of a node.

    """
    assert bootid

    ndata = self._data["node"]

    if name not in ndata:
      ndata[name] = {}

    ndata[name][KEY_BOOT_ID] = bootid

  def NumberOfRestartAttempts(self, instance):
    """Returns number of previous restart attempts.

    @type instance: L{Instance}
    @param instance: the instance to look up

    """
    idata = self._data["instance"]

    if instance.name in idata:
      return idata[instance.name][KEY_RESTART_COUNT]

    return 0

  def RecordRestartAttempt(self, instance):
    """Record a restart attempt.

    @type instance: L{Instance}
    @param instance: the instance being restarted

    """
    idata = self._data["instance"]

    if instance.name not in idata:
      inst = idata[instance.name] = {}
    else:
      inst = idata[instance.name]

    inst[KEY_RESTART_WHEN] = time.time()
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1

  def RemoveInstance(self, instance):
    """Update state to reflect that a machine is running.

    This method removes the record for a named instance (as we only
    track instances that are down).

    @type instance: L{Instance}
    @param instance: the instance to remove from books

    """
    idata = self._data["instance"]

    if instance.name in idata:
      del idata[instance.name]


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, state, autostart):
    self.name = name
    self.state = state
    self.autostart = autostart

  def Restart(self):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpStartupInstance(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=client)

  def ActivateDisks(self):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=client)
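
# For illustration (hypothetical instance name), restarting a downed
# instance through this abstraction would look like:
#
#   inst = Instance("inst1.example.com", "ERROR_down", True)
#   inst.Restart()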


def GetClusterData():
  """Get a list of instances on this cluster.

  """
  op1_fields = ["name", "status", "admin_state", "snodes"]
  op1 = opcodes.OpQueryInstances(output_fields=op1_fields, names=[],
                                 use_locking=True)
  op2_fields = ["name", "bootid", "offline"]
  op2 = opcodes.OpQueryNodes(output_fields=op2_fields, names=[],
                             use_locking=True)

  job_id = client.SubmitJob([op1, op2])

  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)

  logging.debug("Got data from cluster, writing instance status file")

  result = all_results[0]
  smap = {}

  instances = {}

  # write the instance status file: one "<name> <status>" line per instance
  up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
  utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)

  for fields in result:
    (name, status, autostart, snodes) = fields

    # update the secondary node map
    for node in snodes:
      if node not in smap:
        smap[node] = []
      smap[node].append(name)

    instances[name] = Instance(name, status, autostart)

  nodes = dict([(name, (bootid, offline))
                for name, bootid, offline in all_results[1]])

  client.ArchiveJob(job_id)

  return instances, nodes, smap
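
# Schematically, GetClusterData returns (values illustrative):
#
#   instances = {"inst1": Instance("inst1", "running", True)}
#   nodes = {"node1": ("<boot id>", False)}   # name -> (bootid, offline)
#   smap = {"node1": ["inst1"]}               # secondary node -> instances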


class Watcher(object):
  """Encapsulate the logic for restarting erroneously halted virtual machines.

  The calling program should periodically instantiate me and call Run().
  This will traverse the list of instances, and make up to MAXTRIES attempts
  to restart machines that are down.

  """
  def __init__(self, opts, notepad):
    self.notepad = notepad
    master = client.QueryConfigValues(["master_node"])[0]
    if master != utils.HostInfo().name:
      raise NotMasterError("This is not the master node")
    # first archive old jobs
    self.ArchiveJobs(opts.job_age)
    # and only then submit new ones
    self.instances, self.bootids, self.smap = GetClusterData()
    self.started_instances = set()

  def Run(self):
    """Watcher run sequence.

    """
    notepad = self.notepad
    self.CheckInstances(notepad)
    self.CheckDisks(notepad)
    self.VerifyDisks()

  @staticmethod
  def ArchiveJobs(age):
    """Archive old jobs.

    """
    arch_count, left_count = client.AutoArchiveJobs(age)
    logging.debug("Archived %s jobs, left %s", arch_count, left_count)

  def CheckDisks(self, notepad):
    """Check all nodes for restarted ones.

    """
    check_nodes = []
    for name, (new_id, offline) in self.bootids.iteritems():
      old = notepad.GetNodeBootID(name)
      if new_id is None:
        # Bad node, not returning a boot id
        if not offline:
          logging.debug("Node %s missing boot id, skipping secondary checks",
                        name)
        continue
      if old != new_id:
        # Node's boot ID has changed, probably through a reboot
        check_nodes.append(name)

    if check_nodes:
      # Activate disks for all instances with any of the checked nodes as a
      # secondary node
      for node in check_nodes:
        if node not in self.smap:
          continue
        for instance_name in self.smap[node]:
          instance = self.instances[instance_name]
          if not instance.autostart:
            logging.info(("Skipping disk activation for non-autostart"
                          " instance %s"), instance.name)
            continue
          if instance.name in self.started_instances:
            # we already tried to start the instance, which should have
            # activated its drives (if they can be at all)
            continue
          try:
            logging.info("Activating disks for instance %s", instance.name)
            instance.ActivateDisks()
          except Exception:
            logging.exception("Error while activating disks for instance %s",
                              instance.name)

      # Keep changed boot IDs
      for name in check_nodes:
        notepad.SetNodeBootID(name, self.bootids[name][0])

  def CheckInstances(self, notepad):
    """Make a pass over the list of instances, restarting downed ones.

    """
    for instance in self.instances.values():
      if instance.state in BAD_STATES:
        n = notepad.NumberOfRestartAttempts(instance)

        if n > MAXTRIES:
          # stay quiet, the give-up message has already been logged
          continue
        elif n < MAXTRIES:
          last = " (Attempt #%d)" % (n + 1)
        else:
          notepad.RecordRestartAttempt(instance)
          logging.error("Could not restart %s after %d attempts, giving up",
                        instance.name, MAXTRIES)
          continue
        try:
          logging.info("Restarting %s%s", instance.name, last)
          instance.Restart()
          self.started_instances.add(instance.name)
        except Exception:
          logging.exception("Error while restarting instance %s",
                            instance.name)

        notepad.RecordRestartAttempt(instance)
      elif instance.state in HELPLESS_STATES:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
      else:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
          logging.info("Restart of %s succeeded", instance.name)
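
  # Worked example of the retry policy above: with MAXTRIES = 5, an instance
  # stuck in ERROR_down is restarted on attempts #1 through #5; on the next
  # pass (n == MAXTRIES) the watcher logs "giving up" once and bumps the
  # counter, and on all later passes (n > MAXTRIES) it stays quiet until the
  # instance leaves the bad state and its record is removed.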

  @staticmethod
  def VerifyDisks():
    """Run gnt-cluster verify-disks.

    """
    op = opcodes.OpVerifyDisks()
    job_id = client.SubmitJob([op])
    result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
    client.ArchiveJob(job_id)
    if not isinstance(result, (tuple, list)):
      logging.error("Can't get a valid result from verify-disks")
      return
    offline_disk_instances = result[2]
    if not offline_disk_instances:
      # nothing to do
      return
    logging.debug("Will activate disks for instances %s",
                  utils.CommaJoin(offline_disk_instances))
    # we submit only one job and wait for it; not optimal, but it spams the
    # job queue less
    job = [opcodes.OpActivateInstanceDisks(instance_name=name)
           for name in offline_disk_instances]
    job_id = cli.SendJob(job, cl=client)

    cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        usage="%prog [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option("-A", "--job-age", dest="job_age",
                    help="Autoarchive jobs older than this age (default"
                    " 6 hours)", default=6*3600)
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)

  return options, args
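
# For illustration, passing "-A 1d" archives jobs older than one day
# (cli.ParseTimespec understands suffixes such as "h" and "d"); the default
# of 6*3600 seconds corresponds to six hours.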


def main():
  """Main function.

  """
  global client

  options, args = ParseOptions()

  utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
                     stderr_logging=options.debug)

  if ShouldPause():
    logging.debug("Pause has been set, exiting")
    sys.exit(constants.EXIT_SUCCESS)

  update_file = False
  try:
    # on master or not, try to start the node daemon
    EnsureDaemon(constants.NODED)

    notepad = WatcherState()
    try:
      try:
        client = cli.GetClient()
      except errors.OpPrereqError:
        # this is, from cli.GetClient, a not-master case
        logging.debug("Not on master, exiting")
        update_file = True
        sys.exit(constants.EXIT_SUCCESS)
      except luxi.NoMasterError, err:
        logging.warning("Master seems to be down (%s), trying to restart",
                        str(err))
        if not EnsureDaemon(constants.MASTERD):
          logging.critical("Can't start the master, exiting")
          sys.exit(constants.EXIT_FAILURE)
        # else retry the connection
        client = cli.GetClient()

      # we are on master now
      EnsureDaemon(constants.RAPI)

      try:
        watcher = Watcher(options, notepad)
      except errors.ConfigurationError:
        # Just exit if there's no configuration
        update_file = True
        sys.exit(constants.EXIT_SUCCESS)

      watcher.Run()
      update_file = True
    finally:
      if update_file:
        notepad.Save()
      else:
        logging.debug("Not updating status file due to failure")
  except SystemExit:
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    sys.exit(constants.EXIT_NOTMASTER)
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
    sys.exit(constants.EXIT_NODESETUP_ERROR)
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.error(str(err), exc_info=True)
    sys.exit(constants.EXIT_FAILURE)


if __name__ == '__main__':
  main()