# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Tool to restart erroneously downed virtual machines.
24 This program and set of classes implement a watchdog to restart
25 virtual machines in a Ganeti cluster that have crashed or been killed
26 by a node reboot. Run from cron or similar.
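
# The watcher is typically run from cron every five minutes; an example
# crontab entry could look like the following (the installation path is an
# assumption and depends on the local setup):
#   */5 * * * * root /usr/sbin/ganeti-watcher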

# pylint: disable-msg=C0103,W0142

# C0103: Invalid name ganeti-watcher

import os
import sys
import time
import logging

from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import compat
from ganeti import serializer
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import ssconf
from ganeti import bdev
from ganeti import hypervisor
from ganeti import rapi
from ganeti.confd import client as confd_client
from ganeti import netutils

import ganeti.rapi.client # pylint: disable-msg=W0611


MAXTRIES = 5

# Delete any record that is older than 8 hours; this value is based on
# the fact that the current retry counter is 5, and the watcher runs every
# 5 minutes, so it takes around half an hour to exceed the retry
# counter, so 8 hours (16*1/2h) seems like a reasonable reset time
RETRY_EXPIRATION = 8 * 3600
BAD_STATES = [constants.INSTST_ERRORDOWN]
HELPLESS_STATES = [constants.INSTST_NODEDOWN, constants.INSTST_NODEOFFLINE]
KEY_RESTART_COUNT = "restart_count"
KEY_RESTART_WHEN = "restart_when"
KEY_BOOT_ID = "bootid"
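
# Each instance record in the state file tracks the number of restart
# attempts (KEY_RESTART_COUNT) and the time of the last attempt
# (KEY_RESTART_WHEN); each node record tracks the last boot ID seen
# (KEY_BOOT_ID).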

# Global client object
client = None


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""
83 """Check whether we should pause.
86 return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))


def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well; on non-candidates it will run in disabled mode
  utils.EnsureDaemon(constants.CONFD)


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception, msg: # pylint: disable-msg=W0703
    logging.critical("RunParts %s failed: %s", hooks_dir, msg)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)


class NodeMaintenance(object):
  """Talks to confd daemons and possibly shuts down instances/drbd devices.

  """
  def __init__(self):
    self.store_cb = confd_client.StoreResultCallback()
    self.filter_cb = confd_client.ConfdFilterCallback(self.store_cb)
    self.confd_client = confd_client.GetConfdClient(self.filter_cb)

  @staticmethod
  def ShouldRun():
    """Checks whether node maintenance should run.

    """
    try:
      return ssconf.SimpleStore().GetMaintainNodeHealth()
    except errors.ConfigurationError, err:
      logging.error("Configuration error, not activating node maintenance: %s",
                    err)
      return False

  @staticmethod
  def GetRunningInstances():
    """Compute the list of (instance, hypervisor) pairs currently running.

    """
    hyp_list = ssconf.SimpleStore().GetHypervisorList()
    results = []
    for hv_name in hyp_list:
      try:
        hv = hypervisor.GetHypervisor(hv_name)
        ilist = hv.ListInstances()
        results.extend([(iname, hv_name) for iname in ilist])
      except: # pylint: disable-msg=W0702
        logging.error("Error while listing instances for hypervisor %s",
                      hv_name, exc_info=True)
    return results

  @staticmethod
  def GetUsedDRBDs():
    """Get the list of used DRBD minors.

    """
    return bdev.DRBD8.GetUsedDevs().keys()

  @classmethod
  def DoMaintenance(cls, role):
    """Maintain the instance list.

    """
    if role == constants.CONFD_NODE_ROLE_OFFLINE:
      inst_running = cls.GetRunningInstances()
      cls.ShutdownInstances(inst_running)
      drbd_running = cls.GetUsedDRBDs()
      cls.ShutdownDRBD(drbd_running)
    else:
      logging.debug("Not doing anything for role %s", role)

  @staticmethod
  def ShutdownInstances(inst_running):
    """Shutdown running instances.

    """
    names_running = set([i[0] for i in inst_running])
    if names_running:
      logging.info("Following instances should not be running,"
                   " shutting them down: %s", utils.CommaJoin(names_running))
      # this dictionary will collapse duplicate instance names (only
      # xen pvm/hvm) into a single key, which is fine
      i2h = dict(inst_running)
      for name in names_running:
        hv_name = i2h[name]
        hv = hypervisor.GetHypervisor(hv_name)
        hv.StopInstance(None, force=True, name=name)

  @staticmethod
  def ShutdownDRBD(drbd_running):
    """Shutdown active DRBD devices.

    """
    if drbd_running:
      logging.info("Following DRBD minors should not be active,"
                   " shutting them down: %s", utils.CommaJoin(drbd_running))
      for minor in drbd_running:
        # pylint: disable-msg=W0212
        # using the private method as is, pending enhancements to the DRBD
        # interface
        bdev.DRBD8._ShutdownAll(minor)

  def Exec(self):
    """Check node status versus cluster desired state.

    """
    my_name = netutils.Hostname.GetSysName()
    req = confd_client.ConfdClientRequest(type=
                                          constants.CONFD_REQ_NODE_ROLE_BYNAME,
                                          query=my_name)
    self.confd_client.SendRequest(req, async=False, coverage=-1)
    timed_out, _, _ = self.confd_client.WaitForReply(req.rsalt)
    if not timed_out:
      # should have a valid response
      status, result = self.store_cb.GetResponse(req.rsalt)
      assert status, "Missing result but received replies"
      if not self.filter_cb.consistent[req.rsalt]:
        logging.warning("Inconsistent replies, not doing anything")
        return
      self.DoMaintenance(result.server_reply.answer)
    else:
      logging.warning("Confd query timed out, cannot do maintenance actions")


class WatcherState(object):
  """Interface to a state file recording restart attempts.

  """
  def __init__(self, statefile):
    """Open, lock, read and parse the file.

    @type statefile: file
    @param statefile: State file object

    """
    self.statefile = statefile

    try:
      state_data = self.statefile.read()
      if not state_data:
        self._data = {}
      else:
        self._data = serializer.Load(state_data)
    except Exception, msg: # pylint: disable-msg=W0703
      # Ignore errors while loading the file and treat it as empty
      self._data = {}
      logging.warning(("Invalid state file. Using defaults."
                       " Error message: %s"), msg)

    if "instance" not in self._data:
      self._data["instance"] = {}
    if "node" not in self._data:
      self._data["node"] = {}

    self._orig_data = serializer.Dump(self._data)
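
  # The serialized copy kept in _orig_data lets Save() detect that nothing
  # changed and merely touch the state file instead of rewriting it.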
271 """Save state to file, then unlock and close it.
274 assert self.statefile
276 serialized_form = serializer.Dump(self._data)
277 if self._orig_data == serialized_form:
278 logging.debug("Data didn't change, just touching status file")
279 os.utime(constants.WATCHER_STATEFILE, None)
282 # We need to make sure the file is locked before renaming it, otherwise
283 # starting ganeti-watcher again at the same time will create a conflict.
284 fd = utils.WriteFile(constants.WATCHER_STATEFILE,
285 data=serialized_form,
286 prewrite=utils.LockFile, close=False)
287 self.statefile = os.fdopen(fd, 'w+')
290 """Unlock configuration file and close it.
293 assert self.statefile
295 # Files are automatically unlocked when closing them
296 self.statefile.close()
297 self.statefile = None

  def GetNodeBootID(self, name):
    """Returns the last boot ID of a node or None.

    """
    ndata = self._data["node"]

    if name in ndata and KEY_BOOT_ID in ndata[name]:
      return ndata[name][KEY_BOOT_ID]

    return None

  def SetNodeBootID(self, name, bootid):
    """Sets the boot ID of a node.

    """
    assert bootid

    ndata = self._data["node"]

    if name not in ndata:
      ndata[name] = {}

    ndata[name][KEY_BOOT_ID] = bootid

  def NumberOfRestartAttempts(self, instance):
    """Returns the number of previous restart attempts.

    @type instance: L{Instance}
    @param instance: the instance to look up

    """
    idata = self._data["instance"]

    if instance.name in idata:
      return idata[instance.name][KEY_RESTART_COUNT]

    return 0

  def MaintainInstanceList(self, instances):
    """Perform maintenance on the recorded instances.

    @type instances: list of string
    @param instances: the list of currently existing instances

    """
    idict = self._data["instance"]

    # First, delete obsolete instances
    obsolete_instances = set(idict).difference(instances)
    for inst in obsolete_instances:
      logging.debug("Forgetting obsolete instance %s", inst)
      del idict[inst]

    # Second, delete expired records
    earliest = time.time() - RETRY_EXPIRATION
    expired_instances = [i for i in idict
                         if idict[i][KEY_RESTART_WHEN] < earliest]
    for inst in expired_instances:
      logging.debug("Expiring record for instance %s", inst)
      del idict[inst]

  def RecordRestartAttempt(self, instance):
    """Record a restart attempt.

    @type instance: L{Instance}
    @param instance: the instance being restarted

    """
    idata = self._data["instance"]

    if instance.name not in idata:
      inst = idata[instance.name] = {}
    else:
      inst = idata[instance.name]

    inst[KEY_RESTART_WHEN] = time.time()
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1
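
  # Note that the restart counter is never decremented in place: stale
  # records are dropped by MaintainInstanceList once they expire, and
  # RemoveInstance deletes the record when an instance is seen running again.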

  def RemoveInstance(self, instance):
    """Update state to reflect that a machine is running.

    This method removes the record for a named instance (as we only
    track down instances).

    @type instance: L{Instance}
    @param instance: the instance to remove from the books

    """
    idata = self._data["instance"]

    if instance.name in idata:
      del idata[instance.name]


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, state, autostart, snodes):
    self.name = name
    self.state = state
    self.autostart = autostart
    self.snodes = snodes

  def Restart(self):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=client)

  def ActivateDisks(self):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=client)


def GetClusterData():
  """Get a list of instances on this cluster.

  """
  op1_fields = ["name", "status", "admin_state", "snodes"]
  op1 = opcodes.OpInstanceQuery(output_fields=op1_fields, names=[],
                                use_locking=True)
  op2_fields = ["name", "bootid", "offline"]
  op2 = opcodes.OpNodeQuery(output_fields=op2_fields, names=[],
                            use_locking=True)

  job_id = client.SubmitJob([op1, op2])

  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)

  logging.debug("Got data from cluster, writing instance status file")

  result = all_results[0]
  smap = {}

  instances = {}

  # write the instance status file: one "name status" pair per line
  up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
  utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)

  for fields in result:
    (name, status, autostart, snodes) = fields

    # update the secondary node map
    for node in snodes:
      if node not in smap:
        smap[node] = []
      smap[node].append(name)

    instances[name] = Instance(name, status, autostart, snodes)

  nodes = dict([(name, (bootid, offline))
                for name, bootid, offline in all_results[1]])

  client.ArchiveJob(job_id)

  return instances, nodes, smap


class Watcher(object):
  """Encapsulate the logic for restarting erroneously halted virtual machines.

  The calling program should periodically instantiate me and call Run().
  This will traverse the list of instances, and make up to MAXTRIES attempts
  to restart machines that are down.

  """
  def __init__(self, opts, notepad):
    self.notepad = notepad
    master = client.QueryConfigValues(["master_node"])[0]
    if master != netutils.Hostname.GetSysName():
      raise NotMasterError("This is not the master node")
    # first archive old jobs
    self.ArchiveJobs(opts.job_age)
    # and only then submit new ones
    self.instances, self.bootids, self.smap = GetClusterData()
    self.started_instances = set()
    self.opts = opts
482 """Watcher run sequence.
485 notepad = self.notepad
486 self.CheckInstances(notepad)
487 self.CheckDisks(notepad)

  @staticmethod
  def ArchiveJobs(age):
    """Archive old jobs.

    """
    arch_count, left_count = client.AutoArchiveJobs(age)
    logging.debug("Archived %s jobs, left %s", arch_count, left_count)

  def CheckDisks(self, notepad):
    """Check all nodes for restarted ones.

    """
    check_nodes = []
    for name, (new_id, offline) in self.bootids.iteritems():
      old = notepad.GetNodeBootID(name)
      if new_id is None:
        # Bad node, not returning a boot id
        if not offline:
          logging.debug("Node %s missing boot id, skipping secondary checks",
                        name)
        continue
      if old != new_id:
        # Node's boot ID has changed, probably through a reboot.
        check_nodes.append(name)

    if check_nodes:
      # Activate disks for all instances with any of the checked nodes as a
      # secondary node.
      for node in check_nodes:
        if node not in self.smap:
          continue
        for instance_name in self.smap[node]:
          instance = self.instances[instance_name]
          if not instance.autostart:
            logging.info(("Skipping disk activation for non-autostart"
                          " instance %s"), instance.name)
            continue
          if instance.name in self.started_instances:
            # we already tried to start the instance, which should have
            # activated its drives (if they can be at all)
            logging.debug("Skipping disk activation for instance %s, as"
                          " it was already started", instance.name)
            continue
          try:
            logging.info("Activating disks for instance %s", instance.name)
            instance.ActivateDisks()
          except Exception: # pylint: disable-msg=W0703
            logging.exception("Error while activating disks for instance %s",
                              instance.name)

      # Keep changed boot IDs
      for name in check_nodes:
        notepad.SetNodeBootID(name, self.bootids[name][0])
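
  # The updated boot IDs recorded above are only persisted when the state
  # file is saved at the end of a successful run, so a failed run will simply
  # re-check the same nodes on the next watcher invocation.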

  def CheckInstances(self, notepad):
    """Make a pass over the list of instances, restarting downed ones.

    """
    notepad.MaintainInstanceList(self.instances.keys())

    for instance in self.instances.values():
      if instance.state in BAD_STATES:
        n = notepad.NumberOfRestartAttempts(instance)

        if n > MAXTRIES:
          logging.warning("Not restarting instance %s, retries exhausted",
                          instance.name)
          continue
        elif n < MAXTRIES:
          last = " (Attempt #%d)" % (n + 1)
        else:
          notepad.RecordRestartAttempt(instance)
          logging.error("Could not restart %s after %d attempts, giving up",
                        instance.name, MAXTRIES)
          continue
        try:
          logging.info("Restarting %s%s", instance.name, last)
          instance.Restart()
          self.started_instances.add(instance.name)
        except Exception: # pylint: disable-msg=W0703
          logging.exception("Error while restarting instance %s",
                            instance.name)

        notepad.RecordRestartAttempt(instance)
      elif instance.state in HELPLESS_STATES:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
      else:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
          logging.info("Restart of %s succeeded", instance.name)

  def _CheckForOfflineNodes(self, instance):
    """Checks if the given instance has any secondary node in offline status.

    @param instance: The instance object
    @return: True if any of the secondaries is offline, False otherwise

    """
    bootids = []
    for node in instance.snodes:
      bootids.append(self.bootids[node])

    return compat.any(offline for (_, offline) in bootids)

  def VerifyDisks(self):
    """Run gnt-cluster verify-disks.

    """
    op = opcodes.OpClusterVerifyDisks()
    job_id = client.SubmitJob([op])
    result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
    client.ArchiveJob(job_id)
    if not isinstance(result, (tuple, list)):
      logging.error("Can't get a valid result from verify-disks")
      return

    offline_disk_instances = result[1]
    if not offline_disk_instances:
      # nothing to do
      logging.debug("verify-disks reported no offline disks, nothing to do")
      return

    logging.debug("Will activate disks for instance(s) %s",
                  utils.CommaJoin(offline_disk_instances))

    # we submit only one job and wait for it; not optimal, but it spams the
    # job queue less
    job = []
    for name in offline_disk_instances:
      instance = self.instances[name]
      if (instance.state in HELPLESS_STATES or
          self._CheckForOfflineNodes(instance)):
        logging.info("Skipping instance %s because it is in a helpless state"
                     " or has an offline secondary", name)
        continue
      job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

    if job:
      job_id = cli.SendJob(job, cl=client)

      try:
        cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
      except Exception: # pylint: disable-msg=W0703
        logging.exception("Error while activating disks")
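
  # Disk activation failures above are only logged, not fatal: the next
  # watcher run will pick up any still-offline disks via verify-disks again.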


def OpenStateFile(path):
  """Opens the state file and acquires a lock on it.

  @type path: string
  @param path: Path to state file

  """
  # The two-step dance below is necessary to allow both opening existing
  # file read/write and creating if not existing. Vanilla open will truncate
  # an existing file -or- allow creating if not existing.
  statefile_fd = os.open(path, os.O_RDWR | os.O_CREAT)

  # Try to acquire lock on state file. If this fails, another watcher instance
  # might already be running or another program is temporarily blocking the
  # watcher from running.
  try:
    utils.LockFile(statefile_fd)
  except errors.LockError, err:
    logging.error("Can't acquire lock on state file %s: %s", path, err)
    return None

  return os.fdopen(statefile_fd, "w+")


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  @type hostname: string
  @param hostname: hostname of the node to connect to.

  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI Error: CertificateError (%s)", err)
    return False
  except rapi.client.GanetiApiError, err:
    logging.warning("RAPI Error: GanetiApiError (%s)", err)
    return False
  logging.debug("RAPI Result: master_version is %s", master_version)
  return master_version == constants.RAPI_VERSION
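
# Note that IsRapiResponding treats anything but an exact RAPI protocol
# version match as a failure; certificate and API errors are likewise
# logged and reported as "not responding".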
686 """Parse the command line options.
688 @return: (options, args) as from OptionParser.parse_args()
691 parser = OptionParser(description="Ganeti cluster watcher",
693 version="%%prog (ganeti) %s" %
694 constants.RELEASE_VERSION)
696 parser.add_option(cli.DEBUG_OPT)
697 parser.add_option("-A", "--job-age", dest="job_age",
698 help="Autoarchive jobs older than this age (default"
699 " 6 hours)", default=6*3600)
700 parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
701 action="store_true", help="Ignore cluster pause setting")
702 options, args = parser.parse_args()
703 options.job_age = cli.ParseTimespec(options.job_age)


@rapi.client.UsesRapiClient
def Main():
  """Main function.

  """
  global client # pylint: disable-msg=W0603

  options, args = ParseOptions()

  if args: # watcher doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] " % sys.argv[0])
    return constants.EXIT_FAILURE

  utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  statefile = OpenStateFile(constants.WATCHER_STATEFILE)
  if not statefile:
    return constants.EXIT_FAILURE

  update_file = False
  try:
    StartNodeDaemons()
    RunWatcherHooks()

    # run node maintenance in all cases, even if master, so that old
    # masters can be properly cleaned up too
    if NodeMaintenance.ShouldRun():
      NodeMaintenance().Exec()

    notepad = WatcherState(statefile)
    try:
      try:
        client = cli.GetClient()
      except errors.OpPrereqError:
        # this is, from cli.GetClient, a not-master case
        logging.debug("Not on master, exiting")
        update_file = True
        return constants.EXIT_SUCCESS
      except luxi.NoMasterError, err:
        logging.warning("Master seems to be down (%s), trying to restart",
                        str(err))
        if not utils.EnsureDaemon(constants.MASTERD):
          logging.critical("Can't start the master, exiting")
          return constants.EXIT_FAILURE
        # else retry the connection
        client = cli.GetClient()

      # we are on master now
      utils.EnsureDaemon(constants.RAPI)

      # If RAPI isn't responding to queries, try one restart.
      logging.debug("Attempting to talk with RAPI.")
      if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
        logging.warning("Couldn't get answer from Ganeti RAPI daemon."
                        " Restarting Ganeti RAPI.")
        utils.StopDaemon(constants.RAPI)
        utils.EnsureDaemon(constants.RAPI)
        logging.debug("Second attempt to talk with RAPI")
        if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
          logging.fatal("RAPI is not responding. Please investigate.")
      logging.debug("Successfully talked to RAPI.")

      try:
        watcher = Watcher(options, notepad)
      except errors.ConfigurationError:
        # Just exit if there's no configuration
        update_file = True
        return constants.EXIT_SUCCESS

      watcher.Run()
      update_file = True

    finally:
      if update_file:
        notepad.Save()
      else:
        logging.debug("Not updating status file due to failure")
  except SystemExit:
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS


if __name__ == '__main__':
  sys.exit(Main())