# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Tool to restart erroneously downed virtual machines.
24 This program and set of classes implement a watchdog to restart
25 virtual machines in a Ganeti cluster that have crashed or been killed
26 by a node reboot. Run from cron or similar.

import os
import os.path
import sys
import time
import logging
import operator
import errno
from optparse import OptionParser

from ganeti import utils
from ganeti import constants
from ganeti import compat
from ganeti import errors
from ganeti import opcodes
from ganeti import cli
from ganeti import luxi
from ganeti import rapi
from ganeti import netutils
from ganeti import qlang
from ganeti import objects
from ganeti import ssconf
from ganeti import ht

import ganeti.rapi.client # pylint: disable=W0611

from ganeti.watcher import nodemaint
from ganeti.watcher import state


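#: Maximum number of restart attempts per instance before the watcher gives
#: up (5 is the usual Ganeti default and is assumed here)
MAXTRIES = 5

#: Instance states in which the watcher will attempt a restart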
BAD_STATES = frozenset([
  constants.INSTST_ERRORDOWN,
  ])

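#: Instance states the watcher cannot fix (the instance's node is down or
#: offline); such instances are skipped for disk activation and are not
#: reported as successfully restarted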
HELPLESS_STATES = frozenset([
  constants.INSTST_NODEDOWN,
  constants.INSTST_NODEOFFLINE,
  ])

#: Number of seconds to wait between starting child processes for node groups
CHILD_PROCESS_DELAY = 1.0

#: How many seconds to wait for instance status file lock
INSTANCE_STATUS_LOCK_TIMEOUT = 10.0


class NotMasterError(errors.GenericError):
  """Exception raised when this host is not the master."""
82 """Check whether we should pause.
85 return bool(utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE))


def StartNodeDaemons():
  """Start all the daemons that should be running on all nodes."""
  # on master or not, try to start the node daemon
  utils.EnsureDaemon(constants.NODED)
  # start confd as well. On non candidates it will be in disabled mode.
  utils.EnsureDaemon(constants.CONFD)


def RunWatcherHooks():
  """Run the watcher hooks."""
  hooks_dir = utils.PathJoin(constants.HOOKS_BASE_DIR,
                             constants.HOOKS_NAME_WATCHER)
  if not os.path.isdir(hooks_dir):
    return

  try:
    results = utils.RunParts(hooks_dir)
  except Exception, err: # pylint: disable=W0703
    logging.exception("RunParts %s failed: %s", hooks_dir, err)
    return

  for (relname, status, runresult) in results:
    if status == constants.RUNPARTS_SKIP:
      logging.debug("Watcher hook %s: skipped", relname)
    elif status == constants.RUNPARTS_ERR:
      logging.warning("Watcher hook %s: error (%s)", relname, runresult)
    elif status == constants.RUNPARTS_RUN:
      if runresult.failed:
        logging.warning("Watcher hook %s: failed (exit: %d) (output: %s)",
                        relname, runresult.exit_code, runresult.output)
      else:
        logging.debug("Watcher hook %s: success (output: %s)", relname,
                      runresult.output)
    else:
      raise errors.ProgrammerError("Unknown status %s returned by RunParts",
                                   status)


class Instance(object):
  """Abstraction for a Virtual Machine instance."""
  def __init__(self, name, status, autostart, snodes):
    self.name = name
    self.status = status
    self.autostart = autostart
    self.snodes = snodes

  def Restart(self, cl):
    """Encapsulates the start of an instance."""
    op = opcodes.OpInstanceStartup(instance_name=self.name, force=False)
    cli.SubmitOpCode(op, cl=cl)

  def ActivateDisks(self, cl):
    """Encapsulates the activation of all disks of an instance."""
    op = opcodes.OpInstanceActivateDisks(instance_name=self.name)
    cli.SubmitOpCode(op, cl=cl)
156 """Data container representing cluster node.
159 def __init__(self, name, bootid, offline, secondaries):
160 """Initializes this class.
165 self.offline = offline
166 self.secondaries = secondaries


def _CheckInstances(cl, notepad, instances):
  """Make a pass over the list of instances, restarting downed ones."""
  notepad.MaintainInstanceList(instances.keys())

  started = set()

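  # Restart instances that are in a bad state; restart attempts are recorded
  # in the watcher state file ("notepad") so each instance is retried at most
  # MAXTRIES times.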
  for inst in instances.values():
    if inst.status in BAD_STATES:
      n = notepad.NumberOfRestartAttempts(inst.name)

      if n > MAXTRIES:
        logging.warning("Not restarting instance '%s', retries exhausted",
                        inst.name)
        continue

      if n == MAXTRIES:
        notepad.RecordRestartAttempt(inst.name)
        logging.error("Could not restart instance '%s' after %s attempts,"
                      " giving up", inst.name, MAXTRIES)
        continue

      try:
        logging.info("Restarting instance '%s' (attempt #%s)",
                     inst.name, n + 1)
        inst.Restart(cl)
      except Exception: # pylint: disable=W0703
        logging.exception("Error while restarting instance '%s'", inst.name)
      else:
        started.add(inst.name)

      notepad.RecordRestartAttempt(inst.name)
    else:
      if notepad.NumberOfRestartAttempts(inst.name):
        notepad.RemoveInstance(inst.name)
        if inst.status not in HELPLESS_STATES:
          logging.info("Restart of instance '%s' succeeded", inst.name)

  return started


def _CheckDisks(cl, notepad, nodes, instances, started):
  """Check all nodes for restarted ones."""
  check_nodes = []

  for node in nodes.values():
    old = notepad.GetNodeBootID(node.name)
    if not node.bootid:
      # Bad node, not returning a boot id
      if not node.offline:
        logging.debug("Node '%s' missing boot ID, skipping secondary checks",
                      node.name)
      continue

    if old != node.bootid:
      # Node's boot ID has changed, probably through a reboot
      check_nodes.append(node)

  if check_nodes:
    # Activate disks for all instances with any of the checked nodes as a
    # secondary node.
    for node in check_nodes:
      for instance_name in node.secondaries:
        try:
          inst = instances[instance_name]
        except KeyError:
          logging.info("Can't find instance '%s', maybe it was ignored",
                       instance_name)
          continue

        if not inst.autostart:
          logging.info("Skipping disk activation for non-autostart"
                       " instance '%s'", inst.name)
          continue

        if inst.name in started:
          # we already tried to start the instance, which should have
          # activated its drives (if they can be at all)
          logging.debug("Skipping disk activation for instance '%s' as"
                        " it was already started", inst.name)
          continue

        try:
          logging.info("Activating disks for instance '%s'", inst.name)
          inst.ActivateDisks(cl)
        except Exception: # pylint: disable=W0703
          logging.exception("Error while activating disks for instance '%s'",
                            inst.name)

    # Keep changed boot IDs
    for node in check_nodes:
      notepad.SetNodeBootID(node.name, node.bootid)


def _CheckForOfflineNodes(nodes, instance):
  """Checks whether any secondary node of the given instance is offline.

  @param instance: The instance object
  @return: True if any of the secondary nodes is offline, False otherwise

  """
  return compat.any(nodes[node_name].offline for node_name in instance.snodes)


def _VerifyDisks(cl, uuid, nodes, instances):
  """Run a per-group "gnt-cluster verify-disks"."""
  job_id = cl.SubmitJob([opcodes.OpGroupVerifyDisks(group_name=uuid)])
  ((_, offline_disk_instances, _), ) = \
    cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
  cl.ArchiveJob(job_id)

  if not offline_disk_instances:
    logging.debug("Verify-disks reported no offline disks, nothing to do")
    return

  logging.debug("Will activate disks for instance(s) %s",
                utils.CommaJoin(offline_disk_instances))

  # We submit only one job, and wait for it. Not optimal, but this puts less
  # load on the job queue.
  job = []
  for name in offline_disk_instances:
    try:
      inst = instances[name]
    except KeyError:
      logging.info("Can't find instance '%s', maybe it was ignored", name)
      continue

    if inst.status in HELPLESS_STATES or _CheckForOfflineNodes(nodes, inst):
      logging.info("Skipping instance '%s' because it is in a helpless state"
                   " or has offline secondaries", name)
      continue

    job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

  if job:
    job_id = cli.SendJob(job, cl=cl)

    try:
      cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug)
    except Exception: # pylint: disable=W0703
      logging.exception("Error while activating disks")


def IsRapiResponding(hostname):
  """Connects to RAPI port and does a simple test.

  Connects to RAPI port of hostname and does a simple test. At this time, the
  test is GetVersion.

  @type hostname: string
  @param hostname: hostname of the node to connect to.
  @rtype: bool
  @return: Whether RAPI is working properly

  """
  curl_config = rapi.client.GenericCurlConfig()
  rapi_client = rapi.client.GanetiRapiClient(hostname,
                                             curl_config_fn=curl_config)
  try:
    master_version = rapi_client.GetVersion()
  except rapi.client.CertificateError, err:
    logging.warning("RAPI certificate error: %s", err)
    return False
  except rapi.client.GanetiApiError, err:
    logging.warning("RAPI error: %s", err)
    return False
  else:
    logging.debug("Reported RAPI version %s", master_version)
    return master_version == constants.RAPI_VERSION
349 """Parse the command line options.
351 @return: (options, args) as from OptionParser.parse_args()
354 parser = OptionParser(description="Ganeti cluster watcher",
356 version="%%prog (ganeti) %s" %
357 constants.RELEASE_VERSION)
359 parser.add_option(cli.DEBUG_OPT)
360 parser.add_option(cli.NODEGROUP_OPT)
361 parser.add_option("-A", "--job-age", dest="job_age", default=6 * 3600,
362 help="Autoarchive jobs older than this age (default"
364 parser.add_option("--ignore-pause", dest="ignore_pause", default=False,
365 action="store_true", help="Ignore cluster pause setting")
366 parser.add_option("--wait-children", dest="wait_children",
367 action="store_true", help="Wait for child processes")
368 parser.add_option("--no-wait-children", dest="wait_children",
369 action="store_false", help="Don't wait for child processes")
370 # See optparse documentation for why default values are not set by options
371 parser.set_defaults(wait_children=True)
372 options, args = parser.parse_args()
373 options.job_age = cli.ParseTimespec(options.job_age)
376 parser.error("No arguments expected")
378 return (options, args)


def _WriteInstanceStatus(filename, data):
  """Writes the per-group instance status file.

  The entries are sorted.

  @type filename: string
  @param filename: Path to instance status file
  @type data: list of tuple; (instance name as string, status as string)
  @param data: Instance name and status

  """
  logging.debug("Updating instance status file '%s' with %s instances",
                filename, len(data))

  utils.WriteFile(filename,
                  data="".join(map(compat.partial(operator.mod, "%s %s\n"),
                                   sorted(data))))


def _UpdateInstanceStatus(filename, instances):
  """Writes an instance status file from L{Instance} objects.

  @type filename: string
  @param filename: Path to status file
  @type instances: list of L{Instance}

  """
  _WriteInstanceStatus(filename, [(inst.name, inst.status)
                                  for inst in instances])
413 """Helper to store file handle's C{fstat}.
417 """Initializes this class.
422 def __call__(self, fh):
423 """Calls C{fstat} on file handle.
426 self.st = os.fstat(fh.fileno())


def _ReadInstanceStatus(filename):
  """Reads an instance status file.

  @type filename: string
  @param filename: Path to status file
  @rtype: tuple; (None or number, list of lists containing instance name and
    status)
  @return: File's mtime and instance status contained in the file; mtime is
    C{None} if file can't be read

  """
  logging.debug("Reading per-group instance status from '%s'", filename)

  statcb = _StatCb()
  try:
    content = utils.ReadFile(filename, preread=statcb)
  except EnvironmentError, err:
    if err.errno == errno.ENOENT:
      logging.error("Can't read '%s', does not exist (yet)", filename)
    else:
      logging.exception("Unable to read '%s', ignoring", filename)
    return (None, None)
  else:
    return (statcb.st.st_mtime, [line.split(None, 1)
                                 for line in content.splitlines()])


def _MergeInstanceStatus(filename, pergroup_filename, groups):
  """Merges all per-group instance status files into a global one.

  @type filename: string
  @param filename: Path to global instance status file
  @type pergroup_filename: string
  @param pergroup_filename: Path to per-group status files, must contain "%s"
    to be replaced with group UUID
  @type groups: sequence
  @param groups: UUIDs of known groups

  """
  # Lock global status file in exclusive mode
  lock = utils.FileLock.Open(filename)
  try:
    lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
  except errors.LockError, err:
    # All per-group processes will lock and update the file. None of them
    # should take longer than 10 seconds (the value of
    # INSTANCE_STATUS_LOCK_TIMEOUT).
    logging.error("Can't acquire lock on instance status file '%s', not"
                  " updating: %s", filename, err)
    return

  logging.debug("Acquired exclusive lock on '%s'", filename)

  data = {}

  # Load instance status from all groups
  for group_uuid in groups:
    (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)

    if mtime is not None:
      for (instance_name, status) in instdata:
        data.setdefault(instance_name, []).append((mtime, status))

  # Select last update based on file mtime
  inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
                for (instance_name, status) in data.items()]

  # Write the global status file. Don't touch file after it's been
  # updated--there is no lock anymore.
  _WriteInstanceStatus(filename, inststatus)


def GetLuxiClient(try_restart):
  """Tries to connect to the master daemon.

  @type try_restart: bool
  @param try_restart: Whether to attempt to restart the master daemon

  """
  try:
    return cli.GetClient()
  except errors.OpPrereqError, err:
    # this is, from cli.GetClient, a not-master case
    raise NotMasterError("Not on master node (%s)" % err)
  except luxi.NoMasterError, err:
    if not try_restart:
      raise

    logging.warning("Master daemon seems to be down (%s), trying to restart",
                    err)

    if not utils.EnsureDaemon(constants.MASTERD):
      raise errors.GenericError("Can't start the master daemon")

    # Retry the connection
    return cli.GetClient()


def _StartGroupChildren(cl, wait):
  """Starts a new instance of the watcher for every node group."""
  assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
                        for arg in sys.argv)

  result = cl.QueryGroups([], ["name", "uuid"], False)

  children = []

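  # Each child re-runs the watcher with the node group UUID appended via the
  # node group option, which turns it into a per-group watcher for that group.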
  for (idx, (name, uuid)) in enumerate(result):
    args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]

    if idx > 0:
      # Let's not kill the system
      time.sleep(CHILD_PROCESS_DELAY)

    logging.debug("Spawning child for group '%s' (%s), arguments %s",
                  name, uuid, args)

    try:
      # TODO: Should utils.StartDaemon be used instead?
      pid = os.spawnv(os.P_NOWAIT, args[0], args)
    except Exception: # pylint: disable=W0703
      logging.exception("Failed to start child for group '%s' (%s)",
                        name, uuid)
    else:
      logging.debug("Started with PID %s", pid)
      children.append(pid)

  if wait:
    for pid in children:
      logging.debug("Waiting for child PID %s", pid)
      try:
        result = utils.RetryOnSignal(os.waitpid, pid, 0)
      except EnvironmentError, err:
        result = str(err)
      logging.debug("Child PID %s exited with status %s", pid, result)


def _ArchiveJobs(cl, age):
  """Archives old jobs."""
  (arch_count, left_count) = cl.AutoArchiveJobs(age)
  logging.debug("Archived %s jobs, left %s", arch_count, left_count)


def _CheckMaster(cl):
  """Ensures current host is master node."""
  (master, ) = cl.QueryConfigValues(["master_node"])
  if master != netutils.Hostname.GetSysName():
    raise NotMasterError("This is not the master node")


@rapi.client.UsesRapiClient
def _GlobalWatcher(opts):
  """Main function for global watcher.

  At the end child processes are spawned for every node group.

  """
  StartNodeDaemons()
  RunWatcherHooks()

  # Run node maintenance in all cases, even if master, so that old masters can
  # be properly cleaned up
  if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable=E0602
    nodemaint.NodeMaintenance().Exec() # pylint: disable=E0602

  try:
    client = GetLuxiClient(True)
  except NotMasterError:
    # Don't proceed on non-master nodes
    return constants.EXIT_SUCCESS

  # we are on master now
  utils.EnsureDaemon(constants.RAPI)

  # If RAPI isn't responding to queries, try one restart
  logging.debug("Attempting to talk to remote API on %s",
                constants.IP4_ADDRESS_LOCALHOST)
  if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
    logging.warning("Couldn't get answer from remote API, restarting daemon")
    utils.StopDaemon(constants.RAPI)
    utils.EnsureDaemon(constants.RAPI)
    logging.debug("Second attempt to talk to remote API")
    if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
      logging.fatal("RAPI is not responding")
  logging.debug("Successfully talked to remote API")

  _CheckMaster(client)
  _ArchiveJobs(client, opts.job_age)

  # Spawn child processes for all node groups
  _StartGroupChildren(client, opts.wait_children)

  return constants.EXIT_SUCCESS


def _GetGroupData(cl, uuid):
  """Retrieves instances and nodes per node group."""
  job = [
    # Get all primary instances in group
    opcodes.OpQuery(what=constants.QR_INSTANCE,
                    fields=["name", "status", "admin_state", "snodes",
                            "pnode.group.uuid", "snodes.group.uuid"],
                    filter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid]),

    # Get all nodes in group
    opcodes.OpQuery(what=constants.QR_NODE,
                    fields=["name", "bootid", "offline"],
                    filter=[qlang.OP_EQUAL, "group.uuid", uuid]),
    ]

  job_id = cl.SubmitJob(job)
  results = map(objects.QueryResponse.FromDict,
                cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
  cl.ArchiveJob(job_id)

  results_data = map(operator.attrgetter("data"), results)

  # Ensure results are tuples with two values
  assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))

  # Extract values ignoring result status
  (raw_instances, raw_nodes) = [[map(compat.snd, values)
                                 for values in res]
                                for res in results_data]

  secondaries = {}
  instances = []

  # Load all instances, ignoring ones split across node groups
  for (name, status, autostart, snodes, pnode_group_uuid,
       snodes_group_uuid) in raw_instances:
    if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
      logging.error("Ignoring split instance '%s', primary group %s, secondary"
                    " groups %s", name, pnode_group_uuid,
                    utils.CommaJoin(snodes_group_uuid))
    else:
      instances.append(Instance(name, status, autostart, snodes))

      for node in snodes:
        secondaries.setdefault(node, set()).add(name)

  # Load all nodes
  nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
           for (name, bootid, offline) in raw_nodes]

  return (dict((node.name, node) for node in nodes),
          dict((inst.name, inst) for inst in instances))


def _LoadKnownGroups():
  """Returns a list of all node groups known by L{ssconf}."""
  groups = ssconf.SimpleStore().GetNodegroupList()

  result = list(line.split(None, 1)[0] for line in groups
                if line.strip())

  if not compat.all(map(utils.UUID_RE.match, result)):
    raise errors.GenericError("Ssconf contains invalid group UUID")

  return result


def _GroupWatcher(opts):
  """Main function for per-group watcher process."""
  group_uuid = opts.nodegroup.lower()

  if not utils.UUID_RE.match(group_uuid):
    raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
                              " got '%s'" %
                              (cli.NODEGROUP_OPT_NAME, group_uuid))

  logging.info("Watcher for node group '%s'", group_uuid)

  known_groups = _LoadKnownGroups()

  # Check if node group is known
  if group_uuid not in known_groups:
    raise errors.GenericError("Node group '%s' is not known by ssconf" %
                              group_uuid)

  # Group UUID has been verified and should not contain any dangerous
  # characters
  state_path = constants.WATCHER_GROUP_STATE_FILE % group_uuid
  inst_status_path = constants.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid

  logging.debug("Using state file %s", state_path)

  statefile = state.OpenStateFile(state_path) # pylint: disable=E0602
  if not statefile:
    return constants.EXIT_FAILURE

  notepad = state.WatcherState(statefile) # pylint: disable=E0602

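  # Everything below needs the master daemon; if any step fails, the state
  # file is not saved for this run (see the except clause below).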
  try:
    # Connect to master daemon
    client = GetLuxiClient(False)

    _CheckMaster(client)

    (nodes, instances) = _GetGroupData(client, group_uuid)

    # Update per-group instance status file
    _UpdateInstanceStatus(inst_status_path, instances.values())

    _MergeInstanceStatus(constants.INSTANCE_STATUS_FILE,
                         constants.WATCHER_GROUP_INSTANCE_STATUS_FILE,
                         known_groups)

    started = _CheckInstances(client, notepad, instances)
    _CheckDisks(client, notepad, nodes, instances, started)
    _VerifyDisks(client, group_uuid, nodes, instances)
  except Exception, err:
    logging.info("Not updating status file due to failure: %s", err)
    raise
  else:
    # Save changes for next run
    notepad.Save(state_path)

  return constants.EXIT_SUCCESS


def Main():
  """Main function."""
  (options, _) = ParseOptions()

  utils.SetupLogging(constants.LOG_WATCHER, sys.argv[0],
                     debug=options.debug, stderr_logging=options.debug)

  if ShouldPause() and not options.ignore_pause:
    logging.debug("Pause has been set, exiting")
    return constants.EXIT_SUCCESS

  # Try to acquire global watcher lock in shared mode
  lock = utils.FileLock.Open(constants.WATCHER_LOCK_FILE)
  try:
    lock.Shared(blocking=False)
  except (EnvironmentError, errors.LockError), err:
    logging.error("Can't acquire lock on %s: %s",
                  constants.WATCHER_LOCK_FILE, err)
    return constants.EXIT_SUCCESS

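  # Without a node group option this process acts as the global watcher and
  # spawns one child per node group; with the option it handles only that
  # group.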
  if options.nodegroup is None:
    fn = _GlobalWatcher
  else:
    # Per-nodegroup watcher
    fn = _GroupWatcher

  try:
    return fn(options)
  except (SystemExit, KeyboardInterrupt):
    raise
  except NotMasterError:
    logging.debug("Not master, exiting")
    return constants.EXIT_NOTMASTER
  except errors.ResolverError, err:
    logging.error("Cannot resolve hostname '%s', exiting", err.args[0])
    return constants.EXIT_NODESETUP_ERROR
  except errors.JobQueueFull:
    logging.error("Job queue is full, can't query cluster state")
  except errors.JobQueueDrainError:
    logging.error("Job queue is drained, can't maintain cluster state")
  except Exception, err:
    logging.exception(str(err))
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS