#!/usr/bin/python
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Tool to restart erroneously downed virtual machines.
24 This program and set of classes implement a watchdog to restart
25 virtual machines in a Ganeti cluster that have crashed or been killed
26 by a node reboot. Run from cron or similar.
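
# Note on scheduling (an illustrative assumption, not taken from this file):
# a crontab entry matching the "run from cron" advice above could look like
#
#   */5 * * * * root [ -x /usr/sbin/ganeti-watcher ] && /usr/sbin/ganeti-watcher
#
# with the exact path and interval being installation-dependent.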

# pylint: disable-msg=C0103,W0142

# C0103: Invalid name ganeti-watcher

import os
import sys
import time
import logging
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import serializer
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import ssconf
from ganeti import bdev
from ganeti import hypervisor
from ganeti import rapi
from ganeti.confd import client as confd_client
from ganeti import netutils

import ganeti.rapi.client # pylint: disable-msg=W0611


MAXTRIES = 5
# Delete any record that is older than 8 hours; this value is based on
# the fact that the current retry counter is 5, and the watcher runs every
# 5 minutes, so it takes around half an hour to exceed the retry
# counter, making 8 hours (16 * 1/2h) a reasonable reset time
RETRY_EXPIRATION = 8 * 3600
BAD_STATES = ['ERROR_down']
HELPLESS_STATES = ['ERROR_nodedown', 'ERROR_nodeoffline']
NOTICE = 'NOTICE'
ERROR = 'ERROR'
KEY_RESTART_COUNT = "restart_count"
KEY_RESTART_WHEN = "restart_when"
KEY_BOOT_ID = "bootid"
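
# For orientation: the state file these keys index into is JSON written via
# the serializer module. A minimal sketch of its layout, with made-up values
# (an illustration, not a format guarantee):
#
#   {"node": {"node1.example.com": {"bootid": "5c68..."}},
#    "instance": {"inst1": {"restart_count": 2,
#                           "restart_when": 1288778000.0}}}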


# Global client object
client = None


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""
81 """Check whether we should pause.
84 return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))
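
# The pause file checked above is only ever read here; it is assumed to be
# created and removed externally (e.g. by the "gnt-cluster watcher pause"
# and "gnt-cluster watcher continue" commands).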


def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes.

  """
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well. On non candidates it will be in disabled mode.
  utils.EnsureDaemon(constants.CONFD)


def RunWatcherHooks():
  """Run the watcher hooks.

  """
  hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception, msg: # pylint: disable-msg=W0703
    logging.critical("RunParts %s failed: %s", hooks_dir, msg)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)


class NodeMaintenance(object):
  """Talks to confd daemons and possibly shuts down instances/drbd devices.

  """
  def __init__(self):
    self.store_cb = confd_client.StoreResultCallback()
    self.filter_cb = confd_client.ConfdFilterCallback(self.store_cb)
    self.confd_client = confd_client.GetConfdClient(self.filter_cb)
136 """Checks whether node maintenance should run.
140 return ssconf.SimpleStore().GetMaintainNodeHealth()
141 except errors.ConfigurationError, err:
142 logging.error("Configuration error, not activating node maintenance: %s",

  @staticmethod
  def GetRunningInstances():
    """Compute the list of (instance, hypervisor) pairs for running instances.

    """
    hyp_list = ssconf.SimpleStore().GetHypervisorList()
    results = []
    for hv_name in hyp_list:
      try:
        hv = hypervisor.GetHypervisor(hv_name)
        ilist = hv.ListInstances()
        results.extend([(iname, hv_name) for iname in ilist])
      except: # pylint: disable-msg=W0702
        logging.error("Error while listing instances for hypervisor %s",
                      hv_name, exc_info=True)
    return results
165 """Get list of used DRBD minors.
168 return bdev.DRBD8.GetUsedDevs().keys()

  @classmethod
  def DoMaintenance(cls, role):
    """Maintain the instance list.

    """
    if role == constants.CONFD_NODE_ROLE_OFFLINE:
      inst_running = cls.GetRunningInstances()
      cls.ShutdownInstances(inst_running)
      drbd_running = cls.GetUsedDRBDs()
      cls.ShutdownDRBD(drbd_running)
    else:
      logging.debug("Not doing anything for role %s", role)

  @staticmethod
  def ShutdownInstances(inst_running):
    """Shutdown running instances.

    """
    names_running = set([i[0] for i in inst_running])
    if names_running:
      logging.info("Following instances should not be running,"
                   " shutting them down: %s", utils.CommaJoin(names_running))
      # this dictionary will collapse duplicate instance names (only
      # xen pvm/hvm) into a single key, which is fine
      i2h = dict(inst_running)
      for name in names_running:
        hv_name = i2h[name]
        hv = hypervisor.GetHypervisor(hv_name)
        hv.StopInstance(None, force=True, name=name)

  @staticmethod
  def ShutdownDRBD(drbd_running):
    """Shutdown active DRBD devices.

    """
    if drbd_running:
      logging.info("Following DRBD minors should not be active,"
                   " shutting them down: %s", utils.CommaJoin(drbd_running))
      for minor in drbd_running:
        # pylint: disable-msg=W0212
        # using the private method as-is, pending enhancements to the DRBD
        # interface
        bdev.DRBD8._ShutdownAll(minor)
215 """Check node status versus cluster desired state.
218 my_name = netutils.Hostname.GetSysName()
219 req = confd_client.ConfdClientRequest(type=
220 constants.CONFD_REQ_NODE_ROLE_BYNAME,
222 self.confd_client.SendRequest(req, async=False, coverage=-1)
223 timed_out, _, _ = self.confd_client.WaitForReply(req.rsalt)
225 # should have a valid response
226 status, result = self.store_cb.GetResponse(req.rsalt)
227 assert status, "Missing result but received replies"
228 if not self.filter_cb.consistent[req.rsalt]:
229 logging.warning("Inconsistent replies, not doing anything")
231 self.DoMaintenance(result.server_reply.answer)
233 logging.warning("Confd query timed out, cannot do maintenance actions")


class WatcherState(object):
  """Interface to a state file recording restart attempts.

  """
  def __init__(self, statefile):
    """Open, lock, read and parse the file.

    @type statefile: file
    @param statefile: State file object

    """
    self.statefile = statefile

    try:
      state_data = self.statefile.read()
      if not state_data:
        self._data = {}
      else:
        self._data = serializer.Load(state_data)
    except Exception, msg: # pylint: disable-msg=W0703
      # Ignore errors while loading the file and treat it as empty
      self._data = {}
      logging.warning(("Invalid state file. Using defaults."
                       " Error message: %s"), msg)

    if "instance" not in self._data:
      self._data["instance"] = {}
    if "node" not in self._data:
      self._data["node"] = {}

    self._orig_data = serializer.Dump(self._data)
269 """Save state to file, then unlock and close it.
272 assert self.statefile
274 serialized_form = serializer.Dump(self._data)
275 if self._orig_data == serialized_form:
276 logging.debug("Data didn't change, just touching status file")
277 os.utime(constants.WATCHER_STATEFILE, None)
280 # We need to make sure the file is locked before renaming it, otherwise
281 # starting ganeti-watcher again at the same time will create a conflict.
282 fd = utils.WriteFile(constants.WATCHER_STATEFILE,
283 data=serialized_form,
284 prewrite=utils.LockFile, close=False)
285 self.statefile = os.fdopen(fd, 'w+')
288 """Unlock configuration file and close it.
291 assert self.statefile
293 # Files are automatically unlocked when closing them
294 self.statefile.close()
295 self.statefile = None

  def GetNodeBootID(self, name):
    """Returns the last boot ID of a node or None.

    """
    ndata = self._data["node"]

    if name in ndata and KEY_BOOT_ID in ndata[name]:
      return ndata[name][KEY_BOOT_ID]
    return None

  def SetNodeBootID(self, name, bootid):
    """Sets the boot ID of a node.

    """
    assert bootid

    ndata = self._data["node"]

    if name not in ndata:
      ndata[name] = {}

    ndata[name][KEY_BOOT_ID] = bootid

  def NumberOfRestartAttempts(self, instance):
    """Returns number of previous restart attempts.

    @type instance: L{Instance}
    @param instance: the instance to look up

    """
    idata = self._data["instance"]

    if instance.name in idata:
      return idata[instance.name][KEY_RESTART_COUNT]

    return 0

  def MaintainInstanceList(self, instances):
    """Perform maintenance on the recorded instances.

    @type instances: list of string
    @param instances: the list of currently existing instances

    """
    idict = self._data["instance"]
    # First, delete obsolete instances
    obsolete_instances = set(idict).difference(instances)
    for inst in obsolete_instances:
      logging.debug("Forgetting obsolete instance %s", inst)
      del idict[inst]

    # Second, delete expired records
    earliest = time.time() - RETRY_EXPIRATION
    expired_instances = [i for i in idict
                         if idict[i][KEY_RESTART_WHEN] < earliest]
    for inst in expired_instances:
      logging.debug("Expiring record for instance %s", inst)
      del idict[inst]

  def RecordRestartAttempt(self, instance):
    """Record a restart attempt.

    @type instance: L{Instance}
    @param instance: the instance being restarted

    """
    idata = self._data["instance"]

    if instance.name not in idata:
      inst = idata[instance.name] = {}
    else:
      inst = idata[instance.name]

    inst[KEY_RESTART_WHEN] = time.time()
    inst[KEY_RESTART_COUNT] = inst.get(KEY_RESTART_COUNT, 0) + 1

  def RemoveInstance(self, instance):
    """Update state to reflect that a machine is running.

    This method removes the record for a named instance (as we only
    track down instances).

    @type instance: L{Instance}
    @param instance: the instance to remove from books

    """
    idata = self._data["instance"]

    if instance.name in idata:
      del idata[instance.name]


class Instance(object):
  """Abstraction for a Virtual Machine instance.

  """
  def __init__(self, name, state, autostart):
    self.name = name
    self.state = state
    self.autostart = autostart

  def Restart(self):
    """Encapsulates the start of an instance.

    """
    op = opcodes.OpStartupInstance(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=client)

  def ActivateDisks(self):
    """Encapsulates the activation of all disks of an instance.

    """
    op = opcodes.OpActivateInstanceDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=client)


def GetClusterData():
  """Get instance, node and secondary-node data for this cluster.

  """
  op1_fields = ["name", "status", "admin_state", "snodes"]
  op1 = opcodes.OpQueryInstances(output_fields=op1_fields, names=[],
                                 use_locking=True)
  op2_fields = ["name", "bootid", "offline"]
  op2 = opcodes.OpQueryNodes(output_fields=op2_fields, names=[],
                             use_locking=True)

  job_id = client.SubmitJob([op1, op2])

  all_results = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)

  logging.debug("Got data from cluster, writing instance status file")

  result = all_results[0]
  smap = {}

  instances = {}

  # write the instance status file
  up_data = "".join(["%s %s\n" % (fields[0], fields[1]) for fields in result])
  utils.WriteFile(file_name=constants.INSTANCE_UPFILE, data=up_data)

  for fields in result:
    (name, status, autostart, snodes) = fields

    # update the secondary node map
    for node in snodes:
      if node not in smap:
        smap[node] = []
      smap[node].append(name)

    instances[name] = Instance(name, status, autostart)

  nodes = dict([(name, (bootid, offline))
                for name, bootid, offline in all_results[1]])

  client.ArchiveJob(job_id)

  return instances, nodes, smap
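
# An orientation sketch of the shapes returned by GetClusterData, with
# made-up values (illustration only, not output from a real cluster):
#
#   instances = {"inst1": Instance("inst1", "running", True)}
#   nodes     = {"node1": ("9b4bf6ba-...", False)}  # name -> (bootid, offline)
#   smap      = {"node2": ["inst1"]}                # secondary -> instances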


class Watcher(object):
  """Encapsulate the logic for restarting erroneously halted virtual machines.

  The calling program should periodically instantiate me and call Run().
  This will traverse the list of instances, and make up to MAXTRIES attempts
  to restart machines that are down.

  """
  def __init__(self, opts, notepad):
    self.notepad = notepad
    master = client.QueryConfigValues(["master_node"])[0]
    if master != netutils.Hostname.GetSysName():
      raise NotMasterError("This is not the master node")
    # first archive old jobs
    self.ArchiveJobs(opts.job_age)
    # and only then submit new ones
    self.instances, self.bootids, self.smap = GetClusterData()
    self.started_instances = set()
479 """Watcher run sequence.
482 notepad = self.notepad
483 self.CheckInstances(notepad)
484 self.CheckDisks(notepad)

  @staticmethod
  def ArchiveJobs(age):
    """Archive old jobs.

    """
    arch_count, left_count = client.AutoArchiveJobs(age)
    logging.debug("Archived %s jobs, left %s", arch_count, left_count)

  def CheckDisks(self, notepad):
    """Check all nodes for restarted ones.

    """
    check_nodes = []
    for name, (new_id, offline) in self.bootids.iteritems():
      old = notepad.GetNodeBootID(name)
      if new_id is None:
        # Bad node, not returning a boot id
        if not offline:
          logging.debug("Node %s missing boot id, skipping secondary checks",
                        name)
        continue
      if old != new_id:
        # Node's boot ID has changed, probably through a reboot
        check_nodes.append(name)

    if check_nodes:
      # Activate disks for all instances with any of the checked nodes as a
      # secondary node.
      for node in check_nodes:
        if node not in self.smap:
          continue
        for instance_name in self.smap[node]:
          instance = self.instances[instance_name]
          if not instance.autostart:
            logging.info(("Skipping disk activation for non-autostart"
                          " instance %s"), instance.name)
            continue
          if instance.name in self.started_instances:
            # we already tried to start the instance, which should have
            # activated its drives (if they can be at all)
            continue
          try:
            logging.info("Activating disks for instance %s", instance.name)
            instance.ActivateDisks()
          except Exception: # pylint: disable-msg=W0703
            logging.exception("Error while activating disks for instance %s",
                              instance.name)

      # Keep changed boot IDs
      for name in check_nodes:
        notepad.SetNodeBootID(name, self.bootids[name][0])

  def CheckInstances(self, notepad):
    """Make a pass over the list of instances, restarting downed ones.

    """
    notepad.MaintainInstanceList(self.instances.keys())

    for instance in self.instances.values():
      if instance.state in BAD_STATES:
        n = notepad.NumberOfRestartAttempts(instance)

        if n > MAXTRIES:
          logging.warning("Not restarting instance %s, retries exhausted",
                          instance.name)
          continue
        elif n < MAXTRIES:
          last = " (Attempt #%d)" % (n + 1)
        else:
          notepad.RecordRestartAttempt(instance)
          logging.error("Could not restart %s after %d attempts, giving up",
                        instance.name, MAXTRIES)
          continue
        try:
          logging.info("Restarting %s%s",
                       instance.name, last)
          instance.Restart()
          self.started_instances.add(instance.name)
        except Exception: # pylint: disable-msg=W0703
          logging.exception("Error while restarting instance %s",
                            instance.name)

        notepad.RecordRestartAttempt(instance)
      elif instance.state in HELPLESS_STATES:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
      else:
        if notepad.NumberOfRestartAttempts(instance):
          notepad.RemoveInstance(instance)
          logging.info("Restart of %s succeeded", instance.name)
580 """Run gnt-cluster verify-disks.
583 op = opcodes.OpVerifyDisks()
584 job_id = client.SubmitJob([op])
585 result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
586 client.ArchiveJob(job_id)
587 if not isinstance(result, (tuple, list)):
588 logging.error("Can't get a valid result from verify-disks")
590 offline_disk_instances = result[2]
591 if not offline_disk_instances:
594 logging.debug("Will activate disks for instances %s",
595 utils.CommaJoin(offline_disk_instances))
596 # we submit only one job, and wait for it. not optimal, but spams
598 job = [opcodes.OpActivateInstanceDisks(instance_name=name)
599 for name in offline_disk_instances]
600 job_id = cli.SendJob(job, cl=client)
603 cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
604 except Exception: # pylint: disable-msg=W0703
605 logging.exception("Error while activating disks")


def OpenStateFile(path):
  """Opens the state file and acquires a lock on it.

  @type path: string
  @param path: Path to state file

  """
  # The two-step dance below is necessary to allow both opening existing
  # file read/write and creating if not existing. Vanilla open will truncate
  # an existing file -or- allow creating if not existing.
  statefile_fd = os.open(path, os.O_RDWR | os.O_CREAT)

  # Try to acquire lock on state file. If this fails, another watcher instance
  # might already be running or another program is temporarily blocking the
  # watcher from running.
  try:
    utils.LockFile(statefile_fd)
  except errors.LockError, err:
    logging.error("Can't acquire lock on state file %s: %s", path, err)
    return None

  return os.fdopen(statefile_fd, "w+")


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI Error: CertificateError (%s)", err)
    return False
  except rapi.client.GanetiApiError, err:
    logging.warning("RAPI Error: GanetiApiError (%s)", err)
    return False
  logging.debug("RAPI Result: master_version is %s", master_version)
  return master_version == constants.RAPI_VERSION
660 """Parse the command line options.
662 @return: (options, args) as from OptionParser.parse_args()
665 parser = OptionParser(description="Ganeti cluster watcher",
667 version="%%prog (ganeti) %s" %
668 constants.RELEASE_VERSION)
670 parser.add_option(cli.DEBUG_OPT)
671 parser.add_option("-A", "--job-age", dest="job_age",
672 help="Autoarchive jobs older than this age (default"
673 " 6 hours)", default=6*3600)
674 parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
675 action="store_true", help="Ignore cluster pause setting")
676 options, args = parser.parse_args()
677 options.job_age = cli.ParseTimespec(options.job_age)
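
# Illustrative invocations (assumed, not part of this file): a plain
# "ganeti-watcher" run uses the six-hour --job-age default, while e.g.
# "ganeti-watcher -A 12h" archives jobs older than twelve hours, relying on
# cli.ParseTimespec to interpret the suffixed value.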


@rapi.client.UsesRapiClient
def Main():
  """Main function.

  """
  global client # pylint: disable-msg=W0603

  options, args = ParseOptions()

  if args: # watcher doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-d]" % sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)

  utils.SetupLogging(constants.LOG_WATCHER, debug=options.debug,
                     stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    sys.exit(constants.EXIT_SUCCESS)

  statefile = OpenStateFile(constants.WATCHER_STATEFILE)
  if not statefile:
    sys.exit(constants.EXIT_FAILURE)

  update_file = False
  try:
    StartNodeDaemons()
    RunWatcherHooks()
    # run node maintenance in all cases, even if master, so that old
    # masters can be properly cleaned up too
    if NodeMaintenance.ShouldRun():
      NodeMaintenance().Exec()

    notepad = WatcherState(statefile)
    try:
      try:
        client = cli.GetClient()
      except errors.OpPrereqError:
        # this is, from cli.GetClient, a not-master case
        logging.debug("Not on master, exiting")
        update_file = True
        sys.exit(constants.EXIT_SUCCESS)
      except luxi.NoMasterError, err:
        logging.warning("Master seems to be down (%s), trying to restart",
                        str(err))
        if not utils.EnsureDaemon(constants.MASTERD):
          logging.critical("Can't start the master, exiting")
          sys.exit(constants.EXIT_FAILURE)
        # else retry the connection
        client = cli.GetClient()

      # we are on master now
      utils.EnsureDaemon(constants.RAPI)

      # If RAPI isn't responding to queries, try one restart.
      logging.debug("Attempting to talk with RAPI.")
      if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
        logging.warning("Couldn't get answer from Ganeti RAPI daemon."
                        " Restarting Ganeti RAPI.")
        utils.StopDaemon(constants.RAPI)
        utils.EnsureDaemon(constants.RAPI)
        logging.debug("Second attempt to talk with RAPI")
        if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
          logging.fatal("RAPI is not responding. Please investigate.")
      logging.debug("Successfully talked to RAPI.")

      try:
        watcher = Watcher(options, notepad)
      except errors.ConfigurationError:
        # Just exit if there's no configuration
        update_file = True
        sys.exit(constants.EXIT_SUCCESS)

      watcher.Run()
      update_file = True

    finally:
      if update_file:
        notepad.Save()
      else:
        logging.debug("Not updating status file due to failure")
  except SystemExit:
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    sys.exit(constants.EXIT_NOTMASTER)
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting.", err.args[0])
    sys.exit(constants.EXIT_NODESETUP_ERROR)
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    sys.exit(constants.EXIT_FAILURE)


if __name__ == '__main__':
  Main()