4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot.  Run from cron or similar.

"""
import os
import sys
import time
import logging
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import serializer
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
# NOTE(review): MAXTRIES is used by Watcher.CheckInstances but its definition
# is not visible in this listing; the value 5 must be confirmed against the
# original file.
MAXTRIES = 5

# Instance states for which a restart is attempted
BAD_STATES = ['ERROR_down']
# Instance states for which no restart is attempted; any restart record kept
# for such an instance is dropped
HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']

# Keys used in the per-instance and per-node entries of the state file
KEY_RESTART_COUNT = "restart_count"
KEY_RESTART_WHEN = "restart_when"
KEY_BOOT_ID = "bootid"


# Global client object
# NOTE(review): the assignment following this comment is missing from the
# listing; None is used as the "not yet connected" placeholder.
client = None
class NotMasterError(errors.GenericError):
    """Exception raised when this host is not the master."""
def Indent(s, prefix='| '):
    """Indent a piece of text with a given prefix before each line.

    @param s: the string to indent
    @param prefix: the string to prepend each line
    @return: the indented text, terminated by a newline; an empty input
        yields the prefix followed by a newline

    """
    return "%s%s\n" % (prefix, ('\n' + prefix).join(s.splitlines()))
class WatcherState(object):
    """Interface to a state file recording restart attempts.

    The loaded state is a dictionary with two top-level keys, "instance"
    and "node", each mapping a name to a dictionary of recorded values.

    """
    def __init__(self):
        """Open, lock, read and parse the file.

        Raises exception on lock contention.

        """
        # The two-step dance below is necessary to allow both opening existing
        # file read/write and creating if not existing. Vanilla open will
        # truncate an existing file -or- allow creating if not existing.
        fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
        self.statefile = os.fdopen(fd, 'w+')

        utils.LockFile(self.statefile.fileno())

        try:
            self._data = serializer.Load(self.statefile.read())
        except Exception as msg:
            # Ignore errors while loading the file and treat it as empty
            self._data = {}
            logging.warning(("Empty or invalid state file. Using defaults."
                             " Error message: %s"), msg)

        if "instance" not in self._data:
            self._data["instance"] = {}
        if "node" not in self._data:
            self._data["node"] = {}

        # Serialized snapshot used by Save() to detect whether anything changed
        self._orig_data = serializer.Dump(self._data)

    # NOTE(review): the "def" line of this method is missing from the listing;
    # the name Save is inferred from its docstring.
    def Save(self):
        """Save state to file, then unlock and close it.

        """
        assert self.statefile

        serialized_form = serializer.Dump(self._data)
        if self._orig_data == serialized_form:
            logging.debug("Data didn't change, just touching status file")
            os.utime(constants.WATCHER_STATEFILE, None)
            return

        # We need to make sure the file is locked before renaming it, otherwise
        # starting ganeti-watcher again at the same time will create a conflict.
        fd = utils.WriteFile(constants.WATCHER_STATEFILE,
                             data=serialized_form,
                             prewrite=utils.LockFile, close=False)
        self.statefile = os.fdopen(fd, 'w+')

    # NOTE(review): the "def" line of this method is missing from the listing;
    # the name Close is inferred from its docstring.
    def Close(self):
        """Unlock configuration file and close it.

        """
        assert self.statefile

        # Files are automatically unlocked when closing them
        self.statefile.close()
        self.statefile = None

    def GetNodeBootID(self, name):
        """Returns the last boot ID of a node or None.

        @param name: the node name to look up

        """
        ndata = self._data["node"]

        if name in ndata and KEY_BOOT_ID in ndata[name]:
            return ndata[name][KEY_BOOT_ID]
        return None

    def SetNodeBootID(self, name, bootid):
        """Sets the boot ID of a node.

        @param name: the node name
        @param bootid: the boot ID to record for the node

        """
        # NOTE(review): a short statement preceding this point is missing from
        # the listing and has not been reconstructed.
        ndata = self._data["node"]

        if name not in ndata:
            ndata[name] = {}

        ndata[name][KEY_BOOT_ID] = bootid

    def NumberOfRestartAttempts(self, instance):
        """Returns number of previous restart attempts.

        @type instance: L{Instance}
        @param instance: the instance to look up
        @return: the recorded restart count, or 0 for an unknown instance

        """
        idata = self._data["instance"]

        if instance.name in idata:
            return idata[instance.name][KEY_RESTART_COUNT]

        return 0

    def RecordRestartAttempt(self, instance):
        """Record a restart attempt.

        Stores the current time and increments the restart counter.

        @type instance: L{Instance}
        @param instance: the instance being restarted

        """
        idata = self._data["instance"]

        if instance.name not in idata:
            inst = idata[instance.name] = {}
        else:
            inst = idata[instance.name]

        inst[KEY_RESTART_WHEN] = time.time()
        inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1

    def RemoveInstance(self, instance):
        """Update state to reflect that a machine is running.

        This method removes the record for a named instance (as we only
        track down instances).

        @type instance: L{Instance}
        @param instance: the instance to remove from books

        """
        idata = self._data["instance"]

        if instance.name in idata:
            del idata[instance.name]
class Instance(object):
    """Abstraction for a Virtual Machine instance.

    """
    def __init__(self, name, state, autostart):
        """Store the queried properties of an instance.

        @param name: the instance name
        @param state: the instance status as returned by the instance query
        @param autostart: the instance's "admin_state" field as returned by
            the instance query

        """
        self.name = name
        self.state = state
        self.autostart = autostart

    # NOTE(review): the "def" line and the trailing arguments of the opcode
    # call are missing from the listing; the method name and the force /
    # extra_args arguments below are reconstructed and must be confirmed.
    def Restart(self):
        """Encapsulates the start of an instance.

        """
        op = opcodes.OpStartupInstance(instance_name=self.name,
                                       force=False,
                                       extra_args=None)
        cli.SubmitOpCode(op, cl=client)

    def ActivateDisks(self):
        """Encapsulates the activation of all disks of an instance.

        """
        op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
        cli.SubmitOpCode(op, cl=client)
def GetInstanceList(with_secondaries=None):
    """Get a list of instances on this cluster.

    @param with_secondaries: if not None, a list of node names; only
        instances having at least one of these nodes among their secondary
        nodes are returned
    @return: list of L{Instance} objects

    """
    fields = ["name", "status", "admin_state"]

    if with_secondaries is not None:
        fields.append("snodes")

    result = client.QueryInstances([], fields, True)

    instances = []
    for row in result:
        if with_secondaries is not None:
            (name, status, autostart, snodes) = row

            # NOTE(review): the filter body is missing from the listing; it is
            # reconstructed as "skip instances with none of the requested
            # nodes among their secondaries".
            if not any(node in snodes for node in with_secondaries):
                continue
        else:
            (name, status, autostart) = row

        instances.append(Instance(name, status, autostart))

    return instances
def GetNodeBootIDs():
    """Get a dict mapping nodes to boot IDs.

    @return: dictionary mapping each node name to a (bootid, offline) tuple

    """
    result = client.QueryNodes([], ["name", "bootid", "offline"], True)
    return dict((name, (bootid, offline)) for name, bootid, offline in result)
class Watcher(object):
    """Encapsulate the logic for restarting erroneously halted virtual machines.

    The calling program should periodically instantiate me and call Run().
    This will traverse the list of instances, and make up to MAXTRIES attempts
    to restart machines that are down.

    """
    def __init__(self, opts):
        """Query the cluster for the data needed by a watcher run.

        @param opts: the parsed command line options; its job_age attribute
            is used by Run()
        @raise NotMasterError: if this host is not the master node

        """
        master = client.QueryConfigValues(["master_node"])[0]
        if master != utils.HostInfo().name:
            raise NotMasterError("This is not the master node")
        self.instances = GetInstanceList()
        self.bootids = GetNodeBootIDs()
        self.started_instances = set()
        self.opts = opts

    def Run(self):
        """Run all watcher checks, then save the state file.

        """
        notepad = WatcherState()
        # NOTE(review): the lines around the calls below are missing from the
        # listing; the try/finally, the VerifyDisks call and the final Save
        # are reconstructed and must be confirmed.
        try:
            self.ArchiveJobs(self.opts.job_age)
            self.CheckInstances(notepad)
            self.CheckDisks(notepad)
            self.VerifyDisks()
        finally:
            notepad.Save()

    def ArchiveJobs(self, age):
        """Archive old jobs.

        @param age: the age passed to the client's AutoArchiveJobs call

        """
        arch_count, left_count = client.AutoArchiveJobs(age)
        logging.debug("Archived %s jobs, left %s", arch_count, left_count)

    def CheckDisks(self, notepad):
        """Check all nodes for restarted ones.

        For nodes whose boot ID differs from the recorded one, the disks of
        autostart instances using such a node as secondary are activated,
        and the new boot IDs are recorded.

        @type notepad: L{WatcherState}
        @param notepad: the watcher state to read and update

        """
        check_nodes = []

        # NOTE(review): the condition lines of this loop are missing from the
        # listing and are reconstructed from the surrounding comments.
        for name, (new_id, offline) in self.bootids.items():
            old = notepad.GetNodeBootID(name)
            if new_id is None:
                # Bad node, not returning a boot id
                if not offline:
                    logging.debug("Node %s missing boot id, skipping"
                                  " secondary checks", name)
                continue
            if old != new_id:
                # Node's boot ID has changed, probably through a reboot.
                check_nodes.append(name)

        if not check_nodes:
            return

        # Activate disks for all instances with any of the checked nodes as a
        # secondary node.
        for instance in GetInstanceList(with_secondaries=check_nodes):
            if not instance.autostart:
                logging.info(("Skipping disk activation for non-autostart"
                              " instance %s"), instance.name)
                continue
            if instance.name in self.started_instances:
                # we already tried to start the instance, which should have
                # activated its drives (if they can be at all)
                continue
            try:
                logging.info("Activating disks for instance %s", instance.name)
                instance.ActivateDisks()
            except Exception:
                logging.exception("Error while activating disks for instance %s",
                                  instance.name)

        # Keep changed boot IDs. self.bootids maps names to (bootid, offline)
        # tuples, so only the boot ID itself is stored; storing the whole
        # tuple would make every later comparison against the plain boot ID
        # report a change.
        for name in check_nodes:
            notepad.SetNodeBootID(name, self.bootids[name][0])

    def CheckInstances(self, notepad):
        """Make a pass over the list of instances, restarting downed ones.

        @type notepad: L{WatcherState}
        @param notepad: the watcher state recording restart attempts

        """
        for instance in self.instances:
            if instance.state in BAD_STATES:
                n = notepad.NumberOfRestartAttempts(instance)

                # NOTE(review): the branch conditions and the restart call
                # are missing from the listing; they are reconstructed and
                # must be confirmed against the original file.
                if n > MAXTRIES:
                    # stay quiet.
                    continue
                elif n < MAXTRIES:
                    last = " (Attempt #%d)" % (n + 1)
                else:
                    notepad.RecordRestartAttempt(instance)
                    logging.error("Could not restart %s after %d attempts,"
                                  " giving up", instance.name, MAXTRIES)
                    continue
                try:
                    logging.info("Restarting %s%s",
                                 instance.name, last)
                    instance.Restart()
                    self.started_instances.add(instance.name)
                except Exception:
                    logging.exception("Error while restarting instance %s",
                                      instance.name)

                notepad.RecordRestartAttempt(instance)
            elif instance.state in HELPLESS_STATES:
                if notepad.NumberOfRestartAttempts(instance):
                    notepad.RemoveInstance(instance)
            else:
                if notepad.NumberOfRestartAttempts(instance):
                    notepad.RemoveInstance(instance)
                    logging.info("Restart of %s succeeded", instance.name)

    # NOTE(review): the "def" line is missing from the listing; the name is
    # inferred from the docstring, and staticmethod is used because the body
    # does not touch instance state.
    @staticmethod
    def VerifyDisks():
        """Run gnt-cluster verify-disks.

        Submits a single job activating the disks of all instances that the
        verify-disks result (third element) lists, and waits for it.

        """
        op = opcodes.OpVerifyDisks()
        result = cli.SubmitOpCode(op, cl=client)
        if not isinstance(result, (tuple, list)):
            logging.error("Can't get a valid result from verify-disks")
            return
        offline_disk_instances = result[2]
        if not offline_disk_instances:
            # nothing to do
            return
        logging.debug("Will activate disks for instances %s",
                      ", ".join(offline_disk_instances))
        # we submit only one job, and wait for it. not optimal, but spams
        # less the job queue
        job = [opcodes.OpActivateInstanceDisks(instance_name=name)
               for name in offline_disk_instances]
        job_id = cli.SendJob(job, cl=client)

        cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
# NOTE(review): the "def" line and the final return are missing from the
# listing; the name is taken from the caller and the return value from the
# docstring.
def ParseOptions():
    """Parse the command line options.

    The job age option is converted with cli.ParseTimespec before returning.

    @return: (options, args) as from OptionParser.parse_args()

    """
    # NOTE(review): one short argument line between "description" and
    # "version" is missing from the listing and has not been reconstructed.
    parser = OptionParser(description="Ganeti cluster watcher",
                          version="%%prog (ganeti) %s" %
                          constants.RELEASE_VERSION)

    parser.add_option("-d", "--debug", dest="debug",
                      help="Write all messages to stderr",
                      default=False, action="store_true")
    parser.add_option("-A", "--job-age", dest="job_age",
                      help="Autoarchive jobs older than this age (default"
                      " 6 hours)", default=6*3600)
    options, args = parser.parse_args()
    options.job_age = cli.ParseTimespec(options.job_age)
    return options, args
# NOTE(review): the "def" line, the "global" statement, the outer "try:", the
# Run() call and the guard body are missing from the listing; they are
# reconstructed here and must be confirmed against the original file.
def main():
    """Main function.

    Parses the options, sets up logging, connects the global client and
    runs the watcher, mapping known failures to exit codes.

    """
    global client

    options, args = ParseOptions()

    utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
                       stderr_logging=options.debug)

    try:
        client = cli.GetClient()

        try:
            watcher = Watcher(options)
        except errors.ConfigurationError:
            # Just exit if there's no configuration
            sys.exit(constants.EXIT_SUCCESS)

        watcher.Run()
    except SystemExit:
        # Let the sys.exit() calls above propagate untouched
        raise
    except NotMasterError:
        logging.debug("Not master, exiting")
        sys.exit(constants.EXIT_NOTMASTER)
    except errors.ResolverError as err:
        logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
        sys.exit(constants.EXIT_NODESETUP_ERROR)
    except Exception as err:
        logging.error(str(err), exc_info=True)
        sys.exit(constants.EXIT_FAILURE)


if __name__ == '__main__':
    main()