#!/usr/bin/python
#
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot. Run from cron or similar.

"""
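# Illustrative only (not part of the original file): a typical deployment runs
# this tool every five minutes from cron; the path and crontab flavour below
# are assumptions and depend on how Ganeti was installed.
#
#   */5 * * * * root /usr/sbin/ganeti-watcher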
# pylint: disable-msg=C0103,W0142

# C0103: Invalid name ganeti-watcher
# W0142: Used * or ** magic
import os
import sys
import time
import logging
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import compat
from ganeti import serializer
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import ssconf
from ganeti import bdev
from ganeti import hypervisor
from ganeti import rapi
from ganeti.confd import client as confd_client
from ganeti import netutils

import ganeti.rapi.client # pylint: disable-msg=W0611
MAXTRIES = 5

# Delete any record that is older than 8 hours; this value is based on
# the fact that the current retry counter is 5 and the watcher runs every
# 5 minutes, so it takes around half an hour to exceed the retry
# counter, making 8 hours (16 * 1/2h) a reasonable reset time
RETRY_EXPIRATION = 8 * 3600
BAD_STATES = [constants.INSTST_ERRORDOWN]
HELPLESS_STATES = [constants.INSTST_NODEDOWN, constants.INSTST_NODEOFFLINE]
KEY_RESTART_COUNT = "restart_count"
KEY_RESTART_WHEN = "restart_when"
KEY_BOOT_ID = "bootid"
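# Illustrative sketch of the resulting state file (host names and values are
# assumptions): WatcherState below keeps a dict with "node" and "instance"
# sections using the keys above and serializes it via the serializer module,
# so the on-disk data looks roughly like
#
#   {"node": {"node1.example.com": {"bootid": "d1a5e9d6-..."}},
#    "instance": {"inst1.example.com": {"restart_count": 2,
#                                       "restart_when": 1317036000.0}}}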
# Global LUXI client object
client = None
class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))
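# For illustration (command syntax assumed, not taken from this file): the
# pause file checked above is normally managed with "gnt-cluster watcher
# pause <duration>" and "gnt-cluster watcher continue" rather than by editing
# constants.WATCHER_PAUSEFILE directly.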
def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well. On non-candidates it will be in disabled mode.
  utils.EnsureDaemon(constants.CONFD)
def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception: # pylint: disable-msg=W0703
    logging.exception("RunParts %s failed", hooks_dir)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)
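# Illustrative only: watcher hooks are plain executables dropped into
# <HOOKS_BASE_DIR>/<HOOKS_NAME_WATCHER> and executed through utils.RunParts();
# a non-zero exit code is reported as a failure above. A minimal hook (name
# and location assumed) could be:
#
#   #!/bin/sh
#   # e.g. /etc/ganeti/hooks/watcher/10-example
#   exit 0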
class NodeMaintenance(object):
  """Talks to confd daemons and possibly shuts down instances/DRBD devices.

  """
  def __init__(self):
    self.store_cb = confd_client.StoreResultCallback()
    self.filter_cb = confd_client.ConfdFilterCallback(self.store_cb)
    self.confd_client = confd_client.GetConfdClient(self.filter_cb)

  @staticmethod
  def ShouldRun():
    """Checks whether node maintenance should run.

    """
    try:
      return ssconf.SimpleStore().GetMaintainNodeHealth()
    except errors.ConfigurationError, err:
      logging.error("Configuration error, not activating node maintenance: %s",
                    err)
      return False
  @staticmethod
  def GetRunningInstances():
    """Compute list of hypervisor/running instances.

    """
    hyp_list = ssconf.SimpleStore().GetHypervisorList()
    results = []
    for hv_name in hyp_list:
      try:
        hv = hypervisor.GetHypervisor(hv_name)
        ilist = hv.ListInstances()
        results.extend([(iname, hv_name) for iname in ilist])
      except: # pylint: disable-msg=W0702
        logging.error("Error while listing instances for hypervisor %s",
                      hv_name, exc_info=True)
    return results

  @staticmethod
  def GetUsedDRBDs():
    """Get list of used DRBD minors.

    """
    return bdev.DRBD8.GetUsedDevs().keys()
  @classmethod
  def DoMaintenance(cls, role):
    """Maintain the instance list.

    """
    if role == constants.CONFD_NODE_ROLE_OFFLINE:
      inst_running = cls.GetRunningInstances()
      cls.ShutdownInstances(inst_running)
      drbd_running = cls.GetUsedDRBDs()
      cls.ShutdownDRBD(drbd_running)
    else:
      logging.debug("Not doing anything for role %s", role)
  @staticmethod
  def ShutdownInstances(inst_running):
    """Shutdown running instances.

    """
    names_running = set([i[0] for i in inst_running])
    if names_running:
      logging.info("Following instances should not be running,"
                   " shutting them down: %s", utils.CommaJoin(names_running))
      # this dictionary will collapse duplicate instance names (only
      # xen pvm/hvm) into a single key, which is fine
      i2h = dict(inst_running)
      for name in names_running:
        hv_name = i2h[name]
        hv = hypervisor.GetHypervisor(hv_name)
        hv.StopInstance(None, force=True, name=name)
  @staticmethod
  def ShutdownDRBD(drbd_running):
    """Shutdown active DRBD devices.

    """
    if drbd_running:
      logging.info("Following DRBD minors should not be active,"
                   " shutting them down: %s", utils.CommaJoin(drbd_running))
      for minor in drbd_running:
        # pylint: disable-msg=W0212
        # using the private method as is, pending enhancements to the DRBD
        # interface
        bdev.DRBD8._ShutdownAll(minor)
  def Exec(self):
    """Check node status versus cluster desired state.

    """
    my_name = netutils.Hostname.GetSysName()
    req = confd_client.ConfdClientRequest(type=
                                          constants.CONFD_REQ_NODE_ROLE_BYNAME,
                                          query=my_name)
    self.confd_client.SendRequest(req, async=False, coverage=-1)
    timed_out, _, _ = self.confd_client.WaitForReply(req.rsalt)
    if not timed_out:
      # should have a valid response
      status, result = self.store_cb.GetResponse(req.rsalt)
      assert status, "Missing result but received replies"
      if not self.filter_cb.consistent[req.rsalt]:
        logging.warning("Inconsistent replies, not doing anything")
        return
      self.DoMaintenance(result.server_reply.answer)
    else:
      logging.warning("Confd query timed out, cannot do maintenance actions")
class WatcherState(object):
  """Interface to a state file recording restart attempts.

  """
  def __init__(self, statefile):
    """Open, lock, read and parse the file.

    @type statefile: file
    @param statefile: State file object

    """
    self.statefile = statefile

    try:
      state_data = self.statefile.read()
      if not state_data:
        self._data = {}
      else:
        self._data = serializer.Load(state_data)
    except Exception, msg: # pylint: disable-msg=W0703
      # Ignore errors while loading the file and treat it as empty
      self._data = {}
      logging.warning(("Invalid state file. Using defaults."
                       " Error message: %s"), msg)

    if "instance" not in self._data:
      self._data["instance"] = {}
    if "node" not in self._data:
      self._data["node"] = {}

    self._orig_data = serializer.Dump(self._data)
  def Save(self):
    """Save state to file, then unlock and close it.

    """
    assert self.statefile

    serialized_form = serializer.Dump(self._data)
    if self._orig_data == serialized_form:
      logging.debug("Data didn't change, just touching status file")
      os.utime(constants.WATCHER_STATEFILE, None)
      return

    # We need to make sure the file is locked before renaming it, otherwise
    # starting ganeti-watcher again at the same time will create a conflict.
    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
                         data=serialized_form,
                         prewrite=utils.LockFile, close=False)
    self.statefile = os.fdopen(fd, 'w+')
  def Close(self):
    """Unlock configuration file and close it.

    """
    assert self.statefile

    # Files are automatically unlocked when closing them
    self.statefile.close()
    self.statefile = None
  def GetNodeBootID(self, name):
    """Returns the last boot ID of a node or None.

    """
    ndata = self._data["node"]

    if name in ndata and KEY_BOOT_ID in ndata[name]:
      return ndata[name][KEY_BOOT_ID]
    return None
  def SetNodeBootID(self, name, bootid):
    """Sets the boot ID of a node.

    """
    ndata = self._data["node"]

    if name not in ndata:
      ndata[name] = {}

    ndata[name][KEY_BOOT_ID] = bootid
  def NumberOfRestartAttempts(self, instance):
    """Returns number of previous restart attempts.

    @type instance: L{Instance}
    @param instance: the instance to look up

    """
    idata = self._data["instance"]

    if instance.name in idata:
      return idata[instance.name][KEY_RESTART_COUNT]

    return 0
  def MaintainInstanceList(self, instances):
    """Perform maintenance on the recorded instances.

    @type instances: list of string
    @param instances: the list of currently existing instances

    """
    idict = self._data["instance"]
    # First, delete obsolete instances
    obsolete_instances = set(idict).difference(instances)
    for inst in obsolete_instances:
      logging.debug("Forgetting obsolete instance %s", inst)
      idict.pop(inst, None)

    # Second, delete expired records
    earliest = time.time() - RETRY_EXPIRATION
    expired_instances = [i for i in idict
                         if idict[i][KEY_RESTART_WHEN] < earliest]
    for inst in expired_instances:
      logging.debug("Expiring record for instance %s", inst)
      idict.pop(inst, None)
  def RecordRestartAttempt(self, instance):
    """Record a restart attempt.

    @type instance: L{Instance}
    @param instance: the instance being restarted

    """
    idata = self._data["instance"]

    if instance.name not in idata:
      inst = idata[instance.name] = {}
    else:
      inst = idata[instance.name]

    inst[KEY_RESTART_WHEN] = time.time()
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1
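  # Illustrative example (values assumed): after the second restart attempt
  # for an instance, its record is {"restart_count": 2,
  # "restart_when": 1317036123.4}, and NumberOfRestartAttempts() returns 2
  # until the record expires or the instance is removed from the books.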
  def RemoveInstance(self, instance):
    """Update state to reflect that a machine is running.

    This method removes the record for a named instance (as we only
    track down instances).

    @type instance: L{Instance}
    @param instance: the instance to remove from books

    """
    idata = self._data["instance"]

    if instance.name in idata:
      del idata[instance.name]
class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, state, autostart, snodes):
    self.name = name
    self.state = state
    self.autostart = autostart
    self.snodes = snodes

  def Restart(self):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=client)

  def ActivateDisks(self):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=client)
def GetClusterData():
  """Get a list of instances on this cluster.

  """
  op1_fields = ["name", "status", "admin_state", "snodes"]
  op1 = opcodes.OpInstanceQuery(output_fields=op1_fields, names=[],
                                use_locking=True)
  op2_fields = ["name", "bootid", "offline"]
  op2 = opcodes.OpNodeQuery(output_fields=op2_fields, names=[],
                            use_locking=True)

  job_id = client.SubmitJob([op1, op2])

  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)

  logging.debug("Got data from cluster, writing instance status file")

  result = all_results[0]
  smap = {}

  instances = {}

  up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
  utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)
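  # For illustration (names and states assumed): the status file written above
  # holds one "<name> <status>" pair per line, e.g.
  #   inst1.example.com running
  #   inst2.example.com ERROR_down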
  for fields in result:
    (name, status, autostart, snodes) = fields

    # update the secondary node map
    for node in snodes:
      if node not in smap:
        smap[node] = []
      smap[node].append(name)

    instances[name] = Instance(name, status, autostart, snodes)

  nodes = dict([(name, (bootid, offline))
                for name, bootid, offline in all_results[1]])

  client.ArchiveJob(job_id)

  return instances, nodes, smap
class Watcher(object):
  """Encapsulate the logic for restarting erroneously halted virtual machines.

  The calling program should periodically instantiate me and call Run().
  This will traverse the list of instances, and make up to MAXTRIES attempts
  to restart machines that are down.

  """
  def __init__(self, opts, notepad):
    self.notepad = notepad
    master = client.QueryConfigValues(["master_node"])[0]
    if master != netutils.Hostname.GetSysName():
      raise NotMasterError("This is not the master node")
    # first archive old jobs
    self.ArchiveJobs(opts.job_age)
    # and only then submit new ones
    self.instances, self.bootids, self.smap = GetClusterData()
    self.started_instances = set()
  def Run(self):
    """Watcher run sequence.

    """
    notepad = self.notepad
    self.CheckInstances(notepad)
    self.CheckDisks(notepad)
    self.VerifyDisks()

  @staticmethod
  def ArchiveJobs(age):
    """Archive old jobs.

    """
    arch_count, left_count = client.AutoArchiveJobs(age)
    logging.debug("Archived %s jobs, left %s", arch_count, left_count)
  def CheckDisks(self, notepad):
    """Check all nodes for restarted ones.

    """
    check_nodes = []
    for name, (new_id, offline) in self.bootids.iteritems():
      old = notepad.GetNodeBootID(name)
      if new_id is None:
        # Bad node, not returning a boot id
        if not offline:
          logging.debug("Node %s missing boot id, skipping secondary checks",
                        name)
        continue

      if old != new_id:
        # Node's boot ID has changed, probably through a reboot
        check_nodes.append(name)

    # Activate disks for all instances with any of the checked nodes as a
    # secondary node.
    for node in check_nodes:
      if node not in self.smap:
        continue
      for instance_name in self.smap[node]:
        instance = self.instances[instance_name]
        if not instance.autostart:
          logging.info(("Skipping disk activation for non-autostart"
                        " instance %s"), instance.name)
          continue
        if instance.name in self.started_instances:
          # we already tried to start the instance, which should have
          # activated its drives (if they can be at all)
          logging.debug("Skipping disk activation for instance %s, as"
                        " it was already started", instance.name)
          continue
        try:
          logging.info("Activating disks for instance %s", instance.name)
          instance.ActivateDisks()
        except Exception: # pylint: disable-msg=W0703
          logging.exception("Error while activating disks for instance %s",
                            instance.name)

    # Keep changed boot IDs
    for name in check_nodes:
      notepad.SetNodeBootID(name, self.bootids[name][0])
  def CheckInstances(self, notepad):
    """Make a pass over the list of instances, restarting downed ones.

    """
    notepad.MaintainInstanceList(self.instances.keys())

    for instance in self.instances.values():
      if instance.state in BAD_STATES:
        n = notepad.NumberOfRestartAttempts(instance)

        if n > MAXTRIES:
          logging.warning("Not restarting instance %s, retries exhausted",
                          instance.name)
          continue
        elif n < MAXTRIES:
          last = " (Attempt #%d)" % (n + 1)
        else:
          notepad.RecordRestartAttempt(instance)
          logging.error("Could not restart %s after %d attempts, giving up",
                        instance.name, MAXTRIES)
          continue
        try:
          logging.info("Restarting %s%s", instance.name, last)
          instance.Restart()
          self.started_instances.add(instance.name)
        except Exception: # pylint: disable-msg=W0703
          logging.exception("Error while restarting instance %s",
                            instance.name)

        notepad.RecordRestartAttempt(instance)
      elif instance.state in HELPLESS_STATES:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
      else:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
          logging.info("Restart of %s succeeded", instance.name)
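  # Worked example of the retry logic above, using the timing described in the
  # module-level comments (one watcher run every 5 minutes): with MAXTRIES = 5,
  # a persistently down instance is restarted on runs 1-5 ("Attempt #1" to
  # "Attempt #5"), the sixth run logs "Could not restart ... giving up", and
  # later runs only note that retries are exhausted until the record expires
  # after RETRY_EXPIRATION.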
  def _CheckForOfflineNodes(self, instance):
    """Checks if a given instance has any secondary node in offline status.

    @param instance: The instance object
    @return: True if any of the secondaries is offline, False otherwise

    """
    bootids = []
    for node in instance.snodes:
      bootids.append(self.bootids[node])

    return compat.any(offline for (_, offline) in bootids)
  def VerifyDisks(self):
    """Run gnt-cluster verify-disks.

    """
    job_id = client.SubmitJob([opcodes.OpClusterVerifyDisks()])
    result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
    client.ArchiveJob(job_id)

    # Keep track of submitted jobs
    jex = cli.JobExecutor(cl=client, feedback_fn=logging.debug)

    archive_jobs = set()
    for (status, job_id) in result[constants.JOB_IDS_KEY]:
      jex.AddJobId(None, status, job_id)
      if status:
        archive_jobs.add(job_id)

    offline_disk_instances = set()

    for (status, result) in jex.GetResults():
      if not status:
        logging.error("Verify-disks job failed: %s", result)
        continue

      ((_, instances, _), ) = result

      offline_disk_instances.update(instances)

    for job_id in archive_jobs:
      client.ArchiveJob(job_id)

    if not offline_disk_instances:
      logging.debug("verify-disks reported no offline disks, nothing to do")
      return

    logging.debug("Will activate disks for instance(s) %s",
                  utils.CommaJoin(offline_disk_instances))

    # we submit only one job, and wait for it. Not optimal, but it spams
    # the job queue less
    job = []
    for name in offline_disk_instances:
      instance = self.instances[name]
      if (instance.state in HELPLESS_STATES or
          self._CheckForOfflineNodes(instance)):
        logging.info("Skip instance %s because it is in helpless state or has"
                     " one offline secondary", name)
        continue
      job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

    if job:
      job_id = cli.SendJob(job, cl=client)

      try:
        cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
      except Exception: # pylint: disable-msg=W0703
        logging.exception("Error while activating disks")
def OpenStateFile(path):
  """Opens the state file and acquires a lock on it.

  @type path: string
  @param path: Path to state file

  """
  # The two-step dance below is necessary to allow both opening existing
  # file read/write and creating if not existing. Vanilla open will truncate
  # an existing file -or- allow creating if not existing.
  statefile_fd = os.open(path, os.O_RDWR | os.O_CREAT)

  # Try to acquire lock on state file. If this fails, another watcher instance
  # might already be running or another program is temporarily blocking the
  # watcher from running.
  try:
    utils.LockFile(statefile_fd)
  except errors.LockError, err:
    logging.error("Can't acquire lock on state file %s: %s", path, err)
    return None

  return os.fdopen(statefile_fd, "w+")
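# A short note on the two-step open above (illustrative, not from the original
# sources): open(path, "w+") would truncate an existing state file and
# open(path, "r+") would fail when the file does not exist yet, while
# os.open(path, os.O_RDWR | os.O_CREAT) both preserves existing contents and
# creates the file when missing.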
def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI Error: CertificateError (%s)", err)
    return False
  except rapi.client.GanetiApiError, err:
    logging.warning("RAPI Error: GanetiApiError (%s)", err)
    return False
  logging.debug("RAPI Result: master_version is %s", master_version)
  return master_version == constants.RAPI_VERSION
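# Illustrative usage, mirroring what Main() below does for the local node:
#
#   if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
#     utils.StopDaemon(constants.RAPI)
#     utils.EnsureDaemon(constants.RAPI)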
def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
                    help="Autoarchive jobs older than this age (default"
                         " 6 hours)")
  parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
                    action="store_true", help="Ignore cluster pause setting")
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)

  if args:
    parser.error("No arguments expected")

  return (options, args)
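# Example invocations (illustrative; the exact timespec suffixes understood by
# cli.ParseTimespec are an assumption):
#
#   ganeti-watcher                  # normal periodic run
#   ganeti-watcher --ignore-pause   # run even while the watcher is paused
#   ganeti-watcher -A 1d            # auto-archive jobs older than one day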
@rapi.client.UsesRapiClient
def Main():
  """Main function.

  """
  global client # pylint: disable-msg=W0603

  (options, _) = ParseOptions()

  utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  statefile = OpenStateFile(constants.WATCHER_STATEFILE)
  if not statefile:
    return constants.EXIT_FAILURE
  update_file = False
  try:
    StartNodeDaemons()
    RunWatcherHooks()

    # run node maintenance in all cases, even on the master, so that old
    # masters can be properly cleaned up too
    if NodeMaintenance.ShouldRun():
      NodeMaintenance().Exec()

    notepad = WatcherState(statefile)
    try:
      try:
        client = cli.GetClient()
      except errors.OpPrereqError:
        # this is, from cli.GetClient, a not-master case
        logging.debug("Not on master, exiting")
        update_file = True
        return constants.EXIT_SUCCESS
      except luxi.NoMasterError, err:
        logging.warning("Master seems to be down (%s), trying to restart",
                        str(err))
        if not utils.EnsureDaemon(constants.MASTERD):
          logging.critical("Can't start the master, exiting")
          return constants.EXIT_FAILURE
        # else retry the connection
        client = cli.GetClient()
      # we are on master now
      utils.EnsureDaemon(constants.RAPI)

      # If RAPI isn't responding to queries, try one restart.
      logging.debug("Attempting to talk with RAPI.")
      if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
        logging.warning("Couldn't get answer from Ganeti RAPI daemon."
                        " Restarting Ganeti RAPI.")
        utils.StopDaemon(constants.RAPI)
        utils.EnsureDaemon(constants.RAPI)
        logging.debug("Second attempt to talk with RAPI")
        if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
          logging.fatal("RAPI is not responding. Please investigate.")
      logging.debug("Successfully talked to RAPI.")

      try:
        watcher = Watcher(options, notepad)
      except errors.ConfigurationError:
        # Just exit if there's no configuration
        update_file = True
        return constants.EXIT_SUCCESS

      watcher.Run()
      update_file = True

    finally:
      if update_file:
        notepad.Save()
      else:
        logging.debug("Not updating status file due to failure")
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS


if __name__ == '__main__':
  sys.exit(Main())