# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Tool to restart erroneously downed virtual machines.

This program and set of classes implement a watchdog to restart
virtual machines in a Ganeti cluster that have crashed or been killed
by a node reboot. Run from cron or similar.

"""
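
# As the docstring says, this is meant to be run from cron or similar. For
# illustration only (the exact path and schedule depend on the installation),
# a typical cron entry would look roughly like:
#   */5 * * * * root [ -x /usr/sbin/ganeti-watcher ] && /usr/sbin/ganeti-watcher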

# pylint: disable-msg=C0103,W0142

# C0103: Invalid name ganeti-watcher

import os
import sys
import time
import logging

from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import compat
from ganeti import serializer
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import ssconf
from ganeti import bdev
from ganeti import hypervisor
from ganeti import rapi
from ganeti.confd import client as confd_client
from ganeti import netutils

import ganeti.rapi.client # pylint: disable-msg=W0611

MAXTRIES = 5

# Delete any record that is older than 8 hours; this value is based on
# the fact that the current retry counter is 5, and the watcher runs every
# 5 minutes, so it takes around half an hour to exceed the retry
# counter, so 8 hours (16 * 0.5h) seems like a reasonable reset time
RETRY_EXPIRATION = 8 * 3600
BAD_STATES = ['ERROR_down']
HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']

KEY_RESTART_COUNT = "restart_count"
KEY_RESTART_WHEN = "restart_when"
KEY_BOOT_ID = "bootid"
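
# For orientation: the watcher state file managed by WatcherState below is a
# serialized dictionary built from the keys above; roughly (an illustrative
# sketch derived from the code, not an exhaustive schema):
#   {"instance": {"<name>": {"restart_count": 1, "restart_when": 1325000000.0}},
#    "node": {"<name>": {"bootid": "<boot id>"}}}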

# Global client object
client = None


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""


def ShouldPause():
  """Check whether we should pause.

  """
  return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))


def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well; on non-candidate nodes it will run in disabled mode
  utils.EnsureDaemon(constants.CONFD)


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception, msg: # pylint: disable-msg=W0703
    logging.critical("RunParts %s failed: %s", hooks_dir, msg)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)
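
  # Note: a failing or erroring hook is only logged above; it does not abort
  # the rest of the watcher run.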


class NodeMaintenance(object):
  """Talks to confd daemons and possibly shuts down instances/DRBD devices.

  """
  def __init__(self):
    self.store_cb = confd_client.StoreResultCallback()
    self.filter_cb = confd_client.ConfdFilterCallback(self.store_cb)
    self.confd_client = confd_client.GetConfdClient(self.filter_cb)

  @staticmethod
  def ShouldRun():
    """Checks whether node maintenance should run.

    """
    try:
      return ssconf.SimpleStore().GetMaintainNodeHealth()
    except errors.ConfigurationError, err:
      logging.error("Configuration error, not activating node maintenance: %s",
                    err)
      return False

  @staticmethod
  def GetRunningInstances():
    """Compute list of hypervisor/running instances.

    """
    hyp_list = ssconf.SimpleStore().GetHypervisorList()
    results = []
    for hv_name in hyp_list:
      try:
        hv = hypervisor.GetHypervisor(hv_name)
        ilist = hv.ListInstances()
        results.extend([(iname, hv_name) for iname in ilist])
      except: # pylint: disable-msg=W0702
        logging.error("Error while listing instances for hypervisor %s",
                      hv_name, exc_info=True)
    return results

  @staticmethod
  def GetUsedDRBDs():
    """Get list of used DRBD minors.

    """
    return bdev.DRBD8.GetUsedDevs().keys()

  @classmethod
  def DoMaintenance(cls, role):
    """Maintain the instance list.

    """
    if role == constants.CONFD_NODE_ROLE_OFFLINE:
      inst_running = cls.GetRunningInstances()
      cls.ShutdownInstances(inst_running)
      drbd_running = cls.GetUsedDRBDs()
      cls.ShutdownDRBD(drbd_running)
    else:
      logging.debug("Not doing anything for role %s", role)

  @staticmethod
  def ShutdownInstances(inst_running):
    """Shutdown running instances.

    """
    names_running = set([i[0] for i in inst_running])
    if names_running:
      logging.info("Following instances should not be running,"
                   " shutting them down: %s", utils.CommaJoin(names_running))
      # this dictionary will collapse duplicate instance names (only
      # xen pvm/hvm) into a single key, which is fine
      i2h = dict(inst_running)
      for name in names_running:
        hv_name = i2h[name]
        hv = hypervisor.GetHypervisor(hv_name)
        hv.StopInstance(None, force=True, name=name)

  @staticmethod
  def ShutdownDRBD(drbd_running):
    """Shutdown active DRBD devices.

    """
    if drbd_running:
      logging.info("Following DRBD minors should not be active,"
                   " shutting them down: %s", utils.CommaJoin(drbd_running))
      for minor in drbd_running:
        # pylint: disable-msg=W0212
        # using the private method as-is, pending enhancements to the DRBD
        # interface
        bdev.DRBD8._ShutdownAll(minor)

  def Exec(self):
    """Check node status versus cluster desired state.

    """
    my_name = netutils.Hostname.GetSysName()
    req = confd_client.ConfdClientRequest(type=
                                          constants.CONFD_REQ_NODE_ROLE_BYNAME,
                                          query=my_name)
    self.confd_client.SendRequest(req, async=False, coverage=-1)
    timed_out, _, _ = self.confd_client.WaitForReply(req.rsalt)
    if not timed_out:
      # should have a valid response
      status, result = self.store_cb.GetResponse(req.rsalt)
      assert status, "Missing result but received replies"
      if not self.filter_cb.consistent[req.rsalt]:
        logging.warning("Inconsistent replies, not doing anything")
        return
      self.DoMaintenance(result.server_reply.answer)
    else:
      logging.warning("Confd query timed out, cannot do maintenance actions")
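
  # In short: Exec() asks confd for this node's intended role and only acts
  # when a reply arrived and the filter callback saw consistent answers; an
  # offline role makes DoMaintenance() stop stray instances and DRBD minors.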


class WatcherState(object):
  """Interface to a state file recording restart attempts.

  """
  def __init__(self, statefile):
    """Open, lock, read and parse the file.

    @type statefile: file
    @param statefile: State file object

    """
    self.statefile = statefile

    try:
      state_data = self.statefile.read()
      if not state_data:
        self._data = {}
      else:
        self._data = serializer.Load(state_data)
    except Exception, msg: # pylint: disable-msg=W0703
      # Ignore errors while loading the file and treat it as empty
      self._data = {}
      logging.warning(("Invalid state file. Using defaults."
                       " Error message: %s"), msg)

    if "instance" not in self._data:
      self._data["instance"] = {}
    if "node" not in self._data:
      self._data["node"] = {}

    self._orig_data = serializer.Dump(self._data)

  def Save(self):
    """Save state to file, then unlock and close it.

    """
    assert self.statefile

    serialized_form = serializer.Dump(self._data)
    if self._orig_data == serialized_form:
      logging.debug("Data didn't change, just touching status file")
      os.utime(constants.WATCHER_STATEFILE, None)
      return

    # We need to make sure the file is locked before renaming it, otherwise
    # starting ganeti-watcher again at the same time will create a conflict.
    fd = utils.WriteFile(constants.WATCHER_STATEFILE,
                         data=serialized_form,
                         prewrite=utils.LockFile, close=False)
    self.statefile = os.fdopen(fd, 'w+')

  def Close(self):
    """Unlock configuration file and close it.

    """
    assert self.statefile

    # Files are automatically unlocked when closing them
    self.statefile.close()
    self.statefile = None

  def GetNodeBootID(self, name):
    """Returns the last boot ID of a node or None.

    """
    ndata = self._data["node"]

    if name in ndata and KEY_BOOT_ID in ndata[name]:
      return ndata[name][KEY_BOOT_ID]

    return None

  def SetNodeBootID(self, name, bootid):
    """Sets the boot ID of a node.

    """
    ndata = self._data["node"]

    if name not in ndata:
      ndata[name] = {}

    ndata[name][KEY_BOOT_ID] = bootid

  def NumberOfRestartAttempts(self, instance):
    """Returns number of previous restart attempts.

    @type instance: L{Instance}
    @param instance: the instance to look up

    """
    idata = self._data["instance"]

    if instance.name in idata:
      return idata[instance.name][KEY_RESTART_COUNT]

    return 0

  def MaintainInstanceList(self, instances):
    """Perform maintenance on the recorded instances.

    @type instances: list of string
    @param instances: the list of currently existing instances

    """
    idict = self._data["instance"]

    # First, delete obsolete instances
    obsolete_instances = set(idict).difference(instances)
    for inst in obsolete_instances:
      logging.debug("Forgetting obsolete instance %s", inst)
      del idict[inst]

    # Second, delete expired records
    earliest = time.time() - RETRY_EXPIRATION
    expired_instances = [i for i in idict
                         if idict[i][KEY_RESTART_WHEN] < earliest]
    for inst in expired_instances:
      logging.debug("Expiring record for instance %s", inst)
      del idict[inst]

  def RecordRestartAttempt(self, instance):
    """Record a restart attempt.

    @type instance: L{Instance}
    @param instance: the instance being restarted

    """
    idata = self._data["instance"]

    if instance.name not in idata:
      inst = idata[instance.name] = {}
    else:
      inst = idata[instance.name]

    inst[KEY_RESTART_WHEN] = time.time()
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1

  def RemoveInstance(self, instance):
    """Update state to reflect that a machine is running.

    This method removes the record for a named instance (as we only
    track instances that are down).

    @type instance: L{Instance}
    @param instance: the instance to remove from the books

    """
    idata = self._data["instance"]

    if instance.name in idata:
      del idata[instance.name]
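
  # Lifecycle summary (descriptive only): RecordRestartAttempt() adds or
  # updates a record while an instance is down, RemoveInstance() drops the
  # record once the instance is seen running again, and MaintainInstanceList()
  # expires records that are obsolete or older than RETRY_EXPIRATION.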


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, state, autostart, snodes):
    self.name = name
    self.state = state
    self.autostart = autostart
    self.snodes = snodes

  def Restart(self):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=client)

  def ActivateDisks(self):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=client)


def GetClusterData():
  """Get a list of instances on this cluster.

  """
  op1_fields = ["name", "status", "admin_state", "snodes"]
  op1 = opcodes.OpInstanceQuery(output_fields=op1_fields, names=[],
                                use_locking=True)

  op2_fields = ["name", "bootid", "offline"]
  op2 = opcodes.OpNodeQuery(output_fields=op2_fields, names=[],
                            use_locking=True)

  job_id = client.SubmitJob([op1, op2])

  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)

  logging.debug("Got data from cluster, writing instance status file")

  result = all_results[0]
  smap = {}

  instances = {}

  # write the upfile
  up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
  utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)

  for fields in result:
    (name, status, autostart, snodes) = fields

    # update the secondary node map
    for node in snodes:
      if node not in smap:
        smap[node] = []
      smap[node].append(name)

    instances[name] = Instance(name, status, autostart, snodes)

  nodes = dict([(name, (bootid, offline))
                for name, bootid, offline in all_results[1]])

  client.ArchiveJob(job_id)

  return instances, nodes, smap
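
# For reference (descriptive only): GetClusterData() returns a dict mapping
# instance name to Instance object, a dict mapping node name to
# (bootid, offline), and the secondary-node map {node name: [instance names]},
# which is exactly what Watcher.__init__ below unpacks.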


class Watcher(object):
  """Encapsulate the logic for restarting erroneously halted virtual machines.

  The calling program should periodically instantiate me and call Run().
  This will traverse the list of instances, and make up to MAXTRIES attempts
  to restart machines that are down.

  """
  def __init__(self, opts, notepad):
    self.notepad = notepad
    master = client.QueryConfigValues(["master_node"])[0]
    if master != netutils.Hostname.GetSysName():
      raise NotMasterError("This is not the master node")
    # first archive old jobs
    self.ArchiveJobs(opts.job_age)
    # and only then submit new ones
    self.instances, self.bootids, self.smap = GetClusterData()
    self.started_instances = set()

  def Run(self):
    """Watcher run sequence.

    """
    notepad = self.notepad
    self.CheckInstances(notepad)
    self.CheckDisks(notepad)
    self.VerifyDisks()

  @staticmethod
  def ArchiveJobs(age):
    """Archive old jobs.

    """
    arch_count, left_count = client.AutoArchiveJobs(age)
    logging.debug("Archived %s jobs, left %s", arch_count, left_count)

  def CheckDisks(self, notepad):
    """Check all nodes for restarted ones.

    """
    check_nodes = []
    for name, (new_id, offline) in self.bootids.iteritems():
      old = notepad.GetNodeBootID(name)
      if new_id is None:
        # Bad node, not returning a boot id
        if not offline:
          logging.debug("Node %s missing boot id, skipping secondary checks",
                        name)
        continue

      if old != new_id:
        # Node's boot ID has changed, probably through a reboot.
        check_nodes.append(name)

    if check_nodes:
      # Activate disks for all instances with any of the checked nodes as a
      # secondary node.
      for node in check_nodes:
        if node not in self.smap:
          continue
        for instance_name in self.smap[node]:
          instance = self.instances[instance_name]
          if not instance.autostart:
            logging.info(("Skipping disk activation for non-autostart"
                          " instance %s"), instance.name)
            continue
          if instance.name in self.started_instances:
            # we already tried to start the instance, which should have
            # activated its drives (if they can be at all)
            continue
          try:
            logging.info("Activating disks for instance %s", instance.name)
            instance.ActivateDisks()
          except Exception: # pylint: disable-msg=W0703
            logging.exception("Error while activating disks for instance %s",
                              instance.name)

      # Keep changed boot IDs
      for name in check_nodes:
        notepad.SetNodeBootID(name, self.bootids[name][0])
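
  # Net effect (descriptive only): a changed boot ID marks a node as freshly
  # rebooted, so instances using it as a secondary get their disks
  # re-activated, and the new boot ID is persisted in the state file so the
  # same reboot is not handled again on the next run.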

  def CheckInstances(self, notepad):
    """Make a pass over the list of instances, restarting downed ones.

    """
    notepad.MaintainInstanceList(self.instances.keys())

    for instance in self.instances.values():
      if instance.state in BAD_STATES:
        n = notepad.NumberOfRestartAttempts(instance)

        if n > MAXTRIES:
          logging.warning("Not restarting instance %s, retries exhausted",
                          instance.name)
          continue
        elif n < MAXTRIES:
          last = " (Attempt #%d)" % (n + 1)
        else:
          notepad.RecordRestartAttempt(instance)
          logging.error("Could not restart %s after %d attempts, giving up",
                        instance.name, MAXTRIES)
          continue

        try:
          logging.info("Restarting %s%s",
                       instance.name, last)
          instance.Restart()
          self.started_instances.add(instance.name)
        except Exception: # pylint: disable-msg=W0703
          logging.exception("Error while restarting instance %s",
                            instance.name)

        notepad.RecordRestartAttempt(instance)
      elif instance.state in HELPLESS_STATES:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
      else:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
          logging.info("Restart of %s succeeded", instance.name)

  def _CheckForOfflineNodes(self, instance):
    """Checks if the given instance has any secondary node in offline status.

    @param instance: The instance object
    @return: True if any secondary node is offline, False otherwise

    """
    bootids = []
    for node in instance.snodes:
      bootids.append(self.bootids[node])

    return compat.any(offline for (_, offline) in bootids)

  def VerifyDisks(self):
    """Run gnt-cluster verify-disks.

    """
    op = opcodes.OpClusterVerifyDisks()
    job_id = client.SubmitJob([op])
    result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
    client.ArchiveJob(job_id)
    if not isinstance(result, (tuple, list)):
      logging.error("Can't get a valid result from verify-disks")
      return

    offline_disk_instances = result[1]
    if not offline_disk_instances:
      # nothing to do
      return

    logging.debug("Will activate disks for instances %s",
                  utils.CommaJoin(offline_disk_instances))

    # We submit only one job and wait for it; not optimal, but this spams the
    # job queue less
    job = []
    for name in offline_disk_instances:
      instance = self.instances[name]
      if (instance.state in HELPLESS_STATES or
          self._CheckForOfflineNodes(instance)):
        logging.info("Skip instance %s because it is in helpless state or has"
                     " one offline secondary", name)
        continue
      job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

    if job:
      job_id = cli.SendJob(job, cl=client)

      try:
        cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
      except Exception: # pylint: disable-msg=W0703
        logging.exception("Error while activating disks")


def OpenStateFile(path):
  """Opens the state file and acquires a lock on it.

  @type path: string
  @param path: Path to state file

  """
  # The two-step dance below is necessary to allow both opening existing
  # file read/write and creating if not existing. Vanilla open will truncate
  # an existing file -or- allow creating if not existing.
  statefile_fd = os.open(path, os.O_RDWR | os.O_CREAT)

  # Try to acquire lock on state file. If this fails, another watcher instance
  # might already be running or another program is temporarily blocking the
  # watcher from running.
  try:
    utils.LockFile(statefile_fd)
  except errors.LockError, err:
    logging.error("Can't acquire lock on state file %s: %s", path, err)
    return None

  return os.fdopen(statefile_fd, "w+")
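
# To make the "two-step dance" above concrete: open(path, "w+") would truncate
# an existing state file, while open(path, "r+") would fail when the file does
# not exist yet; os.open() with O_RDWR | O_CREAT avoids both problems, and the
# locked descriptor is then wrapped in a file object via os.fdopen().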


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI Error: CertificateError (%s)", err)
    return False
  except rapi.client.GanetiApiError, err:
    logging.warning("RAPI Error: GanetiApiError (%s)", err)
    return False
  logging.debug("RAPI Result: master_version is %s", master_version)
  return master_version == constants.RAPI_VERSION
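
# Note that this check is deliberately coarse: any certificate or API error,
# or a version mismatch, simply yields False, and Main() below reacts to that
# by restarting the RAPI daemon once before giving up.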


def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti cluster watcher",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option(cli.DEBUG_OPT)
  parser.add_option("-A", "--job-age", dest="job_age",
                    help="Autoarchive jobs older than this age (default"
                    " 6 hours)", default=6*3600)
  parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
                    action="store_true", help="Ignore cluster pause setting")
  options, args = parser.parse_args()
  options.job_age = cli.ParseTimespec(options.job_age)

  return options, args
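
# Illustrative invocations (assuming the installed script name is
# "ganeti-watcher"): "ganeti-watcher -d" enables debug logging,
# "ganeti-watcher -A 12h" archives jobs older than twelve hours, and
# "ganeti-watcher --ignore-pause" runs even while the watcher is paused.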


@rapi.client.UsesRapiClient
def Main():
  """Main function.

  """
  global client # pylint: disable-msg=W0603

  options, args = ParseOptions()

  if args: # watcher doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] " % sys.argv[0])
    return constants.EXIT_FAILURE

  utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
                     stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  statefile = OpenStateFile(constants.WATCHER_STATEFILE)
  if not statefile:
    return constants.EXIT_FAILURE

  update_file = False
  try:
    StartNodeDaemons()
    RunWatcherHooks()

    # run node maintenance in all cases, even if master, so that old
    # masters can be properly cleaned up too
    if NodeMaintenance.ShouldRun():
      NodeMaintenance().Exec()

    notepad = WatcherState(statefile)
    try:
      try:
        client = cli.GetClient()
      except errors.OpPrereqError:
        # this is, from cli.GetClient, a not-master case
        logging.debug("Not on master, exiting")
        update_file = True
        return constants.EXIT_SUCCESS
      except luxi.NoMasterError, err:
        logging.warning("Master seems to be down (%s), trying to restart",
                        str(err))
        if not utils.EnsureDaemon(constants.MASTERD):
          logging.critical("Can't start the master, exiting")
          return constants.EXIT_FAILURE
        # else retry the connection
        client = cli.GetClient()

      # we are on master now
      utils.EnsureDaemon(constants.RAPI)

      # If RAPI isn't responding to queries, try one restart.
      logging.debug("Attempting to talk with RAPI.")
      if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
        logging.warning("Couldn't get answer from Ganeti RAPI daemon."
                        " Restarting Ganeti RAPI.")
        utils.StopDaemon(constants.RAPI)
        utils.EnsureDaemon(constants.RAPI)
        logging.debug("Second attempt to talk with RAPI")
        if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
          logging.fatal("RAPI is not responding. Please investigate.")
      logging.debug("Successfully talked to RAPI.")

      try:
        watcher = Watcher(options, notepad)
      except errors.ConfigurationError:
        # Just exit if there's no configuration
        update_file = True
        return constants.EXIT_SUCCESS

      watcher.Run()
      update_file = True
    finally:
      if update_file:
        notepad.Save()
      else:
        logging.debug("Not updating status file due to failure")
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS


if __name__ == '__main__':
  sys.exit(Main())