# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Tool to restart erroneously downed virtual machines.
24 This program and set of classes implement a watchdog to restart
25 virtual machines in a Ganeti cluster that have crashed or been killed
26 by a node reboot. Run from cron or similar.
import os
import sys
import time
import logging
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import compat
from ganeti import serializer
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import rapi
from ganeti import netutils

import ganeti.rapi.client # pylint: disable-msg=W0611
import ganeti.watcher.nodemaint # pylint: disable-msg=W0611
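
# (Assuming this module is ganeti/watcher/__init__.py: the import above binds
# the "nodemaint" submodule as an attribute of the package, i.e. into this
# module's namespace, so the bare name "nodemaint" used further down does
# resolve at runtime; pylint cannot see that, hence the E0602 disables at the
# call sites.)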

# Maximum number of restart attempts before giving up on an instance
MAXTRIES = 5

# Delete any record that is older than 8 hours; this value is based on
# the fact that the current retry counter is 5, and the watcher runs every
# 5 minutes, so it takes around half an hour to exceed the retry
# counter; 8 hours (16 * 1/2h) therefore seems like a reasonable reset time
RETRY_EXPIRATION = 8 * 3600

BAD_STATES = [constants.INSTST_ERRORDOWN]
HELPLESS_STATES = [constants.INSTST_NODEDOWN, constants.INSTST_NODEOFFLINE]

KEY_RESTART_COUNT = "restart_count"
KEY_RESTART_WHEN = "restart_when"
KEY_BOOT_ID = "bootid"

# Global LUXI client object
client = None


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))
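
# (The pause file is normally managed via "gnt-cluster watcher pause
# <timespec>" and "gnt-cluster watcher continue"; while it is in effect the
# watcher exits immediately -- see Main -- unless --ignore-pause is given.)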


def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # whether we are on the master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well. On non-candidates it will be in disabled mode.
  utils.EnsureDaemon(constants.CONFD)


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception: # pylint: disable-msg=W0703
    logging.exception("RunParts %s failed", hooks_dir)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)


class WatcherState(object):
  """Interface to a state file recording restart attempts.

  """
  def __init__(self, statefile):
    """Open, lock, read and parse the file.

    @type statefile: file
    @param statefile: State file object

    """
    self.statefile = statefile

    try:
      state_data = self.statefile.read()
      if not state_data:
        self._data = {}
      else:
        self._data = serializer.Load(state_data)
    except Exception, msg: # pylint: disable-msg=W0703
      # Ignore errors while loading the file and treat it as empty
      self._data = {}
      logging.warning(("Invalid state file. Using defaults."
                       " Error message: %s"), msg)

    if "instance" not in self._data:
      self._data["instance"] = {}
    if "node" not in self._data:
      self._data["node"] = {}

    self._orig_data = serializer.Dump(self._data)
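
  # After __init__, self._data is thus shaped as follows (values illustrative):
  #   {"instance": {"inst1.example.com": {"restart_count": 2,
  #                                       "restart_when": 1325000000.0}},
  #    "node": {"node1.example.com": {"bootid": "dcf42f2a-..."}}}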
154 """Save state to file, then unlock and close it.
157 assert self.statefile
159 serialized_form = serializer.Dump(self._data)
160 if self._orig_data == serialized_form:
161 logging.debug("Data didn't change, just touching status file")
162 os.utime(constants.WATCHER_STATEFILE, None)
165 # We need to make sure the file is locked before renaming it, otherwise
166 # starting ganeti-watcher again at the same time will create a conflict.
167 fd = utils.WriteFile(constants.WATCHER_STATEFILE,
168 data=serialized_form,
169 prewrite=utils.LockFile, close=False)
170 self.statefile = os.fdopen(fd, 'w+')
173 """Unlock configuration file and close it.
176 assert self.statefile
178 # Files are automatically unlocked when closing them
179 self.statefile.close()
180 self.statefile = None

  def GetNodeBootID(self, name):
    """Returns the last boot ID of a node or None.

    """
    ndata = self._data["node"]

    if name in ndata and KEY_BOOT_ID in ndata[name]:
      return ndata[name][KEY_BOOT_ID]

    return None

  def SetNodeBootID(self, name, bootid):
    """Sets the boot ID of a node.

    """
    ndata = self._data["node"]

    if name not in ndata:
      ndata[name] = {}

    ndata[name][KEY_BOOT_ID] = bootid

  def NumberOfRestartAttempts(self, instance):
    """Returns number of previous restart attempts.

    @type instance: L{Instance}
    @param instance: the instance to look up

    """
    idata = self._data["instance"]

    if instance.name in idata:
      return idata[instance.name][KEY_RESTART_COUNT]

    return 0

  def MaintainInstanceList(self, instances):
    """Perform maintenance on the recorded instances.

    @type instances: list of string
    @param instances: the list of currently existing instances

    """
    idict = self._data["instance"]

    # First, delete obsolete instances
    obsolete_instances = set(idict).difference(instances)
    for inst in obsolete_instances:
      logging.debug("Forgetting obsolete instance %s", inst)
      del idict[inst]

    # Second, delete expired records
    earliest = time.time() - RETRY_EXPIRATION
    expired_instances = [i for i in idict
                         if idict[i][KEY_RESTART_WHEN] < earliest]
    for inst in expired_instances:
      logging.debug("Expiring record for instance %s", inst)
      del idict[inst]

  def RecordRestartAttempt(self, instance):
    """Record a restart attempt.

    @type instance: L{Instance}
    @param instance: the instance being restarted

    """
    idata = self._data["instance"]

    if instance.name not in idata:
      inst = idata[instance.name] = {}
    else:
      inst = idata[instance.name]

    inst[KEY_RESTART_WHEN] = time.time()
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1
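
  # Tying the retry constants together: with MAXTRIES = 5 and the watcher run
  # every 5 minutes from cron, a persistently down instance gets restart
  # attempts for roughly half an hour before CheckInstances gives up on it;
  # once the record is older than RETRY_EXPIRATION (8 hours),
  # MaintainInstanceList expires it and the retry cycle can start afresh.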

  def RemoveInstance(self, instance):
    """Update state to reflect that a machine is running.

    This method removes the record for a named instance (as we only
    track instances that are down).

    @type instance: L{Instance}
    @param instance: the instance to remove from the books

    """
    idata = self._data["instance"]

    if instance.name in idata:
      del idata[instance.name]


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, state, autostart, snodes):
    self.name = name
    self.state = state
    self.autostart = autostart
    self.snodes = snodes

  def Restart(self):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=client)

  def ActivateDisks(self):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=client)


def GetClusterData():
  """Get a list of instances on this cluster.

  """
  op1_fields = ["name", "status", "admin_state", "snodes"]
  op1 = opcodes.OpInstanceQuery(output_fields=op1_fields, names=[],
                                use_locking=True)
  op2_fields = ["name", "bootid", "offline"]
  op2 = opcodes.OpNodeQuery(output_fields=op2_fields, names=[],
                            use_locking=True)

  job_id = client.SubmitJob([op1, op2])

  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)

  logging.debug("Got data from cluster, writing instance status file")

  result = all_results[0]
  smap = {}

  instances = {}

  # write the instance-up file
  up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
  utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)

  for fields in result:
    (name, status, autostart, snodes) = fields

    # update the secondary node map
    for node in snodes:
      if node not in smap:
        smap[node] = []
      smap[node].append(name)

    instances[name] = Instance(name, status, autostart, snodes)

  nodes = dict([(name, (bootid, offline))
                for name, bootid, offline in all_results[1]])

  client.ArchiveJob(job_id)

  return instances, nodes, smap
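
# The shapes of the returned structures are therefore (values illustrative):
#   instances: {"inst1": Instance(...), ...}
#   nodes:     {"node1": (boot_id_or_None, offline_flag), ...}
#   smap:      {"node2": ["inst1", "inst3"], ...}  # secondary node -> instances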


class Watcher(object):
  """Encapsulate the logic for restarting erroneously halted virtual machines.

  The calling program should periodically instantiate me and call Run().
  This will traverse the list of instances, and make up to MAXTRIES attempts
  to restart machines that are down.

  """
  def __init__(self, opts, notepad):
    self.notepad = notepad
    master = client.QueryConfigValues(["master_node"])[0]
    if master != netutils.Hostname.GetSysName():
      raise NotMasterError("This is not the master node")
    # first archive old jobs
    self.ArchiveJobs(opts.job_age)
    # and only then submit new ones
    self.instances, self.bootids, self.smap = GetClusterData()
    self.started_instances = set()
365 """Watcher run sequence.
368 notepad = self.notepad
369 self.CheckInstances(notepad)
370 self.CheckDisks(notepad)
374 def ArchiveJobs(age):
378 arch_count, left_count = client.AutoArchiveJobs(age)
379 logging.debug("Archived %s jobs, left %s", arch_count, left_count)

  def CheckDisks(self, notepad):
    """Check all nodes for restarted ones.

    """
    check_nodes = []
    for name, (new_id, offline) in self.bootids.iteritems():
      old = notepad.GetNodeBootID(name)
      if new_id is None:
        # Bad node, not returning a boot id
        if not offline:
          logging.debug("Node %s missing boot id, skipping secondary checks",
                        name)
        continue
      if old != new_id:
        # Node's boot ID has changed, probably through a reboot
        check_nodes.append(name)

    if check_nodes:
      # Activate disks for all instances with any of the checked nodes as a
      # secondary node.
      for node in check_nodes:
        if node not in self.smap:
          continue
        for instance_name in self.smap[node]:
          instance = self.instances[instance_name]
          if not instance.autostart:
            logging.info(("Skipping disk activation for non-autostart"
                          " instance %s"), instance.name)
            continue
          if instance.name in self.started_instances:
            # we already tried to start the instance, which should have
            # activated its drives (if they can be activated at all)
            logging.debug("Skipping disk activation for instance %s, as"
                          " it was already started", instance.name)
            continue
          try:
            logging.info("Activating disks for instance %s", instance.name)
            instance.ActivateDisks()
          except Exception: # pylint: disable-msg=W0703
            logging.exception("Error while activating disks for instance %s",
                              instance.name)

      # Keep changed boot IDs
      for name in check_nodes:
        notepad.SetNodeBootID(name, self.bootids[name][0])
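
  # A node's boot ID is (on Linux nodes, as an implementation note) read from
  # /proc/sys/kernel/random/boot_id, which yields a fresh UUID on every boot,
  # so a changed value is a reliable sign the node rebooted since our last run.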

  def CheckInstances(self, notepad):
    """Make a pass over the list of instances, restarting downed ones.

    """
    notepad.MaintainInstanceList(self.instances.keys())

    for instance in self.instances.values():
      if instance.state in BAD_STATES:
        n = notepad.NumberOfRestartAttempts(instance)

        if n > MAXTRIES:
          logging.warning("Not restarting instance %s, retries exhausted",
                          instance.name)
          continue
        elif n < MAXTRIES:
          last = " (Attempt #%d)" % (n + 1)
        else:
          notepad.RecordRestartAttempt(instance)
          logging.error("Could not restart %s after %d attempts, giving up",
                        instance.name, MAXTRIES)
          continue
        try:
          logging.info("Restarting %s%s", instance.name, last)
          instance.Restart()
          self.started_instances.add(instance.name)
        except Exception: # pylint: disable-msg=W0703
          logging.exception("Error while restarting instance %s",
                            instance.name)

        notepad.RecordRestartAttempt(instance)
      elif instance.state in HELPLESS_STATES:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
      else:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
          logging.info("Restart of %s succeeded", instance.name)

  def _CheckForOfflineNodes(self, instance):
    """Checks whether the given instance has any secondary node offline.

    @param instance: The instance object
    @return: True if any of the secondaries is offline, False otherwise

    """
    bootids = []
    for node in instance.snodes:
      bootids.append(self.bootids[node])

    return compat.any(offline for (_, offline) in bootids)
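
  # compat.any mirrors the any() builtin; the compat module carries such
  # helpers so the code also runs on older Python versions that lack them.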

  def VerifyDisks(self):
    """Run gnt-cluster verify-disks.

    """
    job_id = client.SubmitJob([opcodes.OpClusterVerifyDisks()])
    result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
    client.ArchiveJob(job_id)

    # Keep track of submitted jobs
    jex = cli.JobExecutor(cl=client, feedback_fn=logging.debug)

    archive_jobs = set()
    for (status, job_id) in result[constants.JOB_IDS_KEY]:
      jex.AddJobId(None, status, job_id)
      if status:
        archive_jobs.add(job_id)

    offline_disk_instances = set()

    for (status, result) in jex.GetResults():
      if not status:
        logging.error("Verify-disks job failed: %s", result)
        continue

      ((_, instances, _), ) = result

      offline_disk_instances.update(instances)

    for job_id in archive_jobs:
      client.ArchiveJob(job_id)

    if not offline_disk_instances:
      # nothing to do
      logging.debug("verify-disks reported no offline disks, nothing to do")
      return

    logging.debug("Will activate disks for instance(s) %s",
                  utils.CommaJoin(offline_disk_instances))

    # we submit only one job and wait for it: not optimal, but it puts
    # less pressure on the job queue
    job = []
    for name in offline_disk_instances:
      instance = self.instances[name]
      if (instance.state in HELPLESS_STATES or
          self._CheckForOfflineNodes(instance)):
        logging.info("Skipping instance %s because it is in a helpless state"
                     " or has an offline secondary", name)
        continue
      job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

    if job:
      job_id = cli.SendJob(job, cl=client)

      try:
        cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
      except Exception: # pylint: disable-msg=W0703
        logging.exception("Error while activating disks")


def OpenStateFile(path):
  """Opens the state file and acquires a lock on it.

  @type path: string
  @param path: Path to state file

  """
  # The two-step dance below is necessary to allow both opening an existing
  # file read/write and creating it if missing. The modes of vanilla open()
  # either truncate an existing file ("w+") or fail when it doesn't exist
  # ("r+"), but never do both open-or-create without truncation.
  statefile_fd = os.open(path, os.O_RDWR | os.O_CREAT)

  # Try to acquire lock on state file. If this fails, another watcher instance
  # might already be running or another program is temporarily blocking the
  # watcher from running.
  try:
    utils.LockFile(statefile_fd)
  except errors.LockError, err:
    logging.error("Can't acquire lock on state file %s: %s", path, err)
    return None

  return os.fdopen(statefile_fd, "w+")
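
# The lock taken above lives on the file descriptor: it is released
# automatically once the descriptor (or the file object wrapping it, as in
# WatcherState.Close) is closed, so no explicit unlock is ever issued.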


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  @type hostname: string
  @param hostname: hostname of the node to connect to.

  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI Error: CertificateError (%s)", err)
    return False
  except rapi.client.GanetiApiError, err:
    logging.warning("RAPI Error: GanetiApiError (%s)", err)
    return False
  logging.debug("RAPI Result: master_version is %s", master_version)
  return master_version == constants.RAPI_VERSION
590 """Parse the command line options.
592 @return: (options, args) as from OptionParser.parse_args()
595 parser = OptionParser(description="Ganeti cluster watcher",
597 version="%%prog (ganeti) %s" %
598 constants.RELEASE_VERSION)
600 parser.add_option(cli.DEBUG_OPT)
601 parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
602 help="Autoarchive jobs older than this age (default"
604 parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
605 action="store_true", help="Ignore cluster pause setting")
606 options, args = parser.parse_args()
607 options.job_age = cli.ParseTimespec(options.job_age)
610 parser.error("No arguments expected")
612 return (options, args)
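
# The watcher is designed to be run from cron; a typical crontab entry (the
# installed path may differ per distribution) looks like:
#   */5 * * * * root [ -x /usr/sbin/ganeti-watcher ] && /usr/sbin/ganeti-watcher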


@rapi.client.UsesRapiClient
def Main():
  """Main function.

  """
  global client # pylint: disable-msg=W0603

  (options, _) = ParseOptions()

  utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  statefile = OpenStateFile(constants.WATCHER_STATEFILE)
  if not statefile:
    return constants.EXIT_FAILURE

  update_file = False
  try:
    StartNodeDaemons()
    RunWatcherHooks()

    # run node maintenance in all cases, even if master, so that old
    # masters can be properly cleaned up too
    if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable-msg=E0602
      nodemaint.NodeMaintenance().Exec() # pylint: disable-msg=E0602

    notepad = WatcherState(statefile)
    try:
      try:
        client = cli.GetClient()
      except errors.OpPrereqError:
        # this is, from cli.GetClient, a not-master case
        logging.debug("Not on master, exiting")
        update_file = True
        return constants.EXIT_SUCCESS
      except luxi.NoMasterError, err:
        logging.warning("Master seems to be down (%s), trying to restart",
                        str(err))
        if not utils.EnsureDaemon(constants.MASTERD):
          logging.critical("Can't start the master, exiting")
          return constants.EXIT_FAILURE
        # else retry the connection
        client = cli.GetClient()

      # we are on master now
      utils.EnsureDaemon(constants.RAPI)

      # If RAPI isn't responding to queries, try one restart.
      logging.debug("Attempting to talk with RAPI.")
      if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
        logging.warning("Couldn't get answer from Ganeti RAPI daemon."
                        " Restarting Ganeti RAPI.")
        utils.StopDaemon(constants.RAPI)
        utils.EnsureDaemon(constants.RAPI)
        logging.debug("Second attempt to talk with RAPI")
        if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
          logging.fatal("RAPI is not responding. Please investigate.")
      logging.debug("Successfully talked to RAPI.")

      try:
        watcher = Watcher(options, notepad)
      except errors.ConfigurationError:
        # Just exit if there's no configuration
        update_file = True
        return constants.EXIT_SUCCESS

      watcher.Run()
      update_file = True
    finally:
      if update_file:
        notepad.Save()
      else:
        logging.debug("Not updating status file due to failure")
  except SystemExit:
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS