4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Tool to restart erroneously downed virtual machines.
24 This program and set of classes implement a watchdog to restart
25 virtual machines in a Ganeti cluster that have crashed or been killed
26 by a node reboot. Run from cron or similar.
34 from optparse import OptionParser
36 from ganeti import utils
37 from ganeti import constants
38 from ganeti import serializer
39 from ganeti import errors
40 from ganeti import opcodes
41 from ganeti import cli
45 BAD_STATES = ['ERROR_down']
46 HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
49 KEY_RESTART_COUNT = "restart_count"
50 KEY_RESTART_WHEN = "restart_when"
51 KEY_BOOT_ID = "bootid"
54 # Global client object
class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master.

  The watcher raises it when the cluster's configured master node name
  differs from this host's name; the startup code catches it, logs a
  debug message and exits with constants.EXIT_NOTMASTER.

  """
def Indent(s, prefix='| '):
  """Indent a piece of text with a given prefix before each line.

  The text is split with str.splitlines(), so a single trailing newline in
  the input does not produce an extra (prefix-only) line. The result always
  has a newline appended; for an empty input it is just the prefix followed
  by that newline.

  @param s: the string to indent
  @param prefix: the string to prepend each line
  @return: the indented text, terminated by a newline

  """
  # Joining with "\n" + prefix puts the prefix in front of every line but
  # the first; the format string supplies the first prefix and the final
  # newline.
  return "%s%s\n" % (prefix, ('\n' + prefix).join(s.splitlines()))
72 class WatcherState(object):
73 """Interface to a state file recording restart attempts.
77 """Open, lock, read and parse the file.
79 Raises exception on lock contention.
82 # The two-step dance below is necessary to allow both opening existing
83 # file read/write and creating if not existing. Vanilla open will truncate
84 # an existing file -or- allow creating if not existing.
85 fd = os.open(constants.WATCHER_STATEFILE, os.O_RDWR | os.O_CREAT)
86 self.statefile = os.fdopen(fd, 'w+')
88 utils.LockFile(self.statefile.fileno())
91 self._data = serializer.Load(self.statefile.read())
92 except Exception, msg:
93 # Ignore errors while loading the file and treat it as empty
95 logging.warning(("Empty or invalid state file. Using defaults."
96 " Error message: %s"), msg)
98 if "instance" not in self._data:
99 self._data["instance"] = {}
100 if "node" not in self._data:
101 self._data["node"] = {}
103 self._orig_data = serializer.Dump(self._data)
106 """Save state to file, then unlock and close it.
109 assert self.statefile
111 serialized_form = serializer.Dump(self._data)
112 if self._orig_data == serialized_form:
113 logging.debug("Data didn't change, just touching status file")
114 os.utime(constants.WATCHER_STATEFILE, None)
117 # We need to make sure the file is locked before renaming it, otherwise
118 # starting ganeti-watcher again at the same time will create a conflict.
119 fd = utils.WriteFile(constants.WATCHER_STATEFILE,
120 data=serialized_form,
121 prewrite=utils.LockFile, close=False)
122 self.statefile = os.fdopen(fd, 'w+')
125 """Unlock configuration file and close it.
128 assert self.statefile
130 # Files are automatically unlocked when closing them
131 self.statefile.close()
132 self.statefile = None
134 def GetNodeBootID(self, name):
135 """Returns the last boot ID of a node or None.
138 ndata = self._data["node"]
140 if name in ndata and KEY_BOOT_ID in ndata[name]:
141 return ndata[name][KEY_BOOT_ID]
144 def SetNodeBootID(self, name, bootid):
145 """Sets the boot ID of a node.
150 ndata = self._data["node"]
152 if name not in ndata:
155 ndata[name][KEY_BOOT_ID] = bootid
157 def NumberOfRestartAttempts(self, instance):
158 """Returns number of previous restart attempts.
160 @type instance: L{Instance}
161 @param instance: the instance to look up
164 idata = self._data["instance"]
166 if instance.name in idata:
167 return idata[instance.name][KEY_RESTART_COUNT]
171 def RecordRestartAttempt(self, instance):
172 """Record a restart attempt.
174 @type instance: L{Instance}
175 @param instance: the instance being restarted
178 idata = self._data["instance"]
180 if instance.name not in idata:
181 inst = idata[instance.name] = {}
183 inst = idata[instance.name]
185 inst[KEY_RESTART_WHEN] = time.time()
186 inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1
188 def RemoveInstance(self, instance):
189 """Update state to reflect that a machine is running.
191 This method removes the record for a named instance (as we only
192 track down instances).
194 @type instance: L{Instance}
195 @param instance: the instance to remove from books
198 idata = self._data["instance"]
200 if instance.name in idata:
201 del idata[instance.name]
204 class Instance(object):
205 """Abstraction for a Virtual Machine instance.
208 def __init__(self, name, state, autostart):
211 self.autostart = autostart
214 """Encapsulates the start of an instance.
217 op = opcodes.OpStartupInstance(instance_name=self.name, force=False)
218 cli.SubmitOpCode(op, cl=client)
220 def ActivateDisks(self):
221 """Encapsulates the activation of all disks of an instance.
224 op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
225 cli.SubmitOpCode(op, cl=client)
228 def GetInstanceList(with_secondaries=None):
229 """Get a list of instances on this cluster.
232 fields = ["name", "status", "admin_state"]
234 if with_secondaries is not None:
235 fields.append("snodes")
237 result = client.QueryInstances([], fields, True)
240 for fields in result:
241 if with_secondaries is not None:
242 (name, status, autostart, snodes) = fields
247 for node in with_secondaries:
254 (name, status, autostart) = fields
256 instances.append(Instance(name, status, autostart))
def GetNodeBootIDs():
  """Get a dict mapping nodes to boot IDs.

  Queries the module-level client for the "name", "bootid" and "offline"
  fields of the nodes.

  @return: dict mapping each node name to a (bootid, offline) tuple

  """
  # NOTE(review): the empty name list and the trailing True are passed to
  # client.QueryNodes as-is; presumably "all nodes" and "use locking" --
  # confirm against the client API.
  result = client.QueryNodes([], ["name", "bootid", "offline"], True)
  return dict([(name, (bootid, offline)) for name, bootid, offline in result])
269 class Watcher(object):
270 """Encapsulate the logic for restarting erroneously halted virtual machines.
272 The calling program should periodically instantiate me and call Run().
273 This will traverse the list of instances, and make up to MAXTRIES attempts
274 to restart machines that are down.
277 def __init__(self, opts):
278 master = client.QueryConfigValues(["master_node"])[0]
279 if master != utils.HostInfo().name:
280 raise NotMasterError("This is not the master node")
281 self.instances = GetInstanceList()
282 self.bootids = GetNodeBootIDs()
283 self.started_instances = set()
287 notepad = WatcherState()
289 self.ArchiveJobs(self.opts.job_age)
290 self.CheckInstances(notepad)
291 self.CheckDisks(notepad)
296 def ArchiveJobs(self, age):
300 arch_count, left_count = client.AutoArchiveJobs(age)
301 logging.debug("Archived %s jobs, left %s" % (arch_count, left_count))
303 def CheckDisks(self, notepad):
304 """Check all nodes for restarted ones.
308 for name, (new_id, offline) in self.bootids.iteritems():
309 old = notepad.GetNodeBootID(name)
311 # Bad node, not returning a boot id
313 logging.debug("Node %s missing boot id, skipping secondary checks",
317 # Node's boot ID has changed, probably through a reboot.
318 check_nodes.append(name)
321 # Activate disks for all instances with any of the checked nodes as a
323 for instance in GetInstanceList(with_secondaries=check_nodes):
324 if not instance.autostart:
325 logging.info(("Skipping disk activation for non-autostart"
326 " instance %s"), instance.name)
328 if instance.name in self.started_instances:
329 # we already tried to start the instance, which should have
330 # activated its drives (if they can be at all)
333 logging.info("Activating disks for instance %s", instance.name)
334 instance.ActivateDisks()
336 logging.exception("Error while activating disks for instance %s",
339 # Keep changed boot IDs
340 for name in check_nodes:
341 notepad.SetNodeBootID(name, self.bootids[name][0])
343 def CheckInstances(self, notepad):
344 """Make a pass over the list of instances, restarting downed ones.
347 for instance in self.instances:
348 if instance.state in BAD_STATES:
349 n = notepad.NumberOfRestartAttempts(instance)
355 last = " (Attempt #%d)" % (n + 1)
357 notepad.RecordRestartAttempt(instance)
358 logging.error("Could not restart %s after %d attempts, giving up",
359 instance.name, MAXTRIES)
362 logging.info("Restarting %s%s",
365 self.started_instances.add(instance.name)
367 logging.exception("Error while restarting instance %s",
370 notepad.RecordRestartAttempt(instance)
371 elif instance.state in HELPLESS_STATES:
372 if notepad.NumberOfRestartAttempts(instance):
373 notepad.RemoveInstance(instance)
375 if notepad.NumberOfRestartAttempts(instance):
376 notepad.RemoveInstance(instance)
377 logging.info("Restart of %s succeeded", instance.name)
381 """Run gnt-cluster verify-disks.
384 op = opcodes.OpVerifyDisks()
385 result = cli.SubmitOpCode(op, cl=client)
386 if not isinstance(result, (tuple, list)):
387 logging.error("Can't get a valid result from verify-disks")
389 offline_disk_instances = result[2]
390 if not offline_disk_instances:
393 logging.debug("Will activate disks for instances %s",
394 ", ".join(offline_disk_instances))
395 # we submit only one job, and wait for it. not optimal, but spams
397 job = [opcodes.OpActivateInstanceDisks(instance_name=name)
398 for name in offline_disk_instances]
399 job_id = cli.SendJob(job, cl=client)
401 cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
405 """Parse the command line options.
407 @return: (options, args) as from OptionParser.parse_args()
410 parser = OptionParser(description="Ganeti cluster watcher",
412 version="%%prog (ganeti) %s" %
413 constants.RELEASE_VERSION)
415 parser.add_option("-d", "--debug", dest="debug",
416 help="Write all messages to stderr",
417 default=False, action="store_true")
418 parser.add_option("-A", "--job-age", dest="job_age",
419 help="Autoarchive jobs older than this age (default"
420 " 6 hours)", default=6*3600)
421 options, args = parser.parse_args()
422 options.job_age = cli.ParseTimespec(options.job_age)
432 options, args = ParseOptions()
434 utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
435 stderr_logging=options.debug)
438 client = cli.GetClient()
441 watcher = Watcher(options)
442 except errors.ConfigurationError:
443 # Just exit if there's no configuration
444 sys.exit(constants.EXIT_SUCCESS)
449 except NotMasterError:
450 logging.debug("Not master, exiting")
451 sys.exit(constants.EXIT_NOTMASTER)
452 except errors.ResolverError, err:
453 logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
454 sys.exit(constants.EXIT_NODESETUP_ERROR)
455 except Exception, err:
456 logging.error(str(err), exc_info=True)
457 sys.exit(constants.EXIT_FAILURE)
460 if __name__ == '__main__':