+def _WriteInstanceStatus(filename, data):
+ """Writes the per-group instance status file.
+
+ The entries are sorted.
+
+ @type filename: string
+ @param filename: Path to instance status file
+ @type data: list of tuple; (instance name as string, status as string)
+ @param data: Instance name and status
+
+ """
+ logging.debug("Updating instance status file '%s' with %s instances",
+ filename, len(data))
+
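+  # Format every (name, status) entry as a "<name> <status>" line; sorting
+  # keeps the file contents deterministic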
+ utils.WriteFile(filename,
+ data="".join(map(compat.partial(operator.mod, "%s %s\n"),
+ sorted(data))))
+
+
+def _UpdateInstanceStatus(filename, instances):
+ """Writes an instance status file from L{Instance} objects.
+
+ @type filename: string
+ @param filename: Path to status file
+ @type instances: list of L{Instance}
+
+ """
+ _WriteInstanceStatus(filename, [(inst.name, inst.status)
+ for inst in instances])
+
+
+def _ReadInstanceStatus(filename):
+ """Reads an instance status file.
+
+ @type filename: string
+ @param filename: Path to status file
+  @rtype: tuple; (None or number, list of lists containing instance name and
+    status)
+  @return: File's mtime and the instance status contained in the file; both
+    values are C{None} if the file can't be read
+
+ """
+ logging.debug("Reading per-group instance status from '%s'", filename)
+
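+  # FileStatHelper records the file's stat information (including mtime)
+  # while the file is being read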
+ statcb = utils.FileStatHelper()
+ try:
+ content = utils.ReadFile(filename, preread=statcb)
+ except EnvironmentError, err:
+ if err.errno == errno.ENOENT:
+ logging.error("Can't read '%s', does not exist (yet)", filename)
+ else:
+ logging.exception("Unable to read '%s', ignoring", filename)
+ return (None, None)
+ else:
+ return (statcb.st.st_mtime, [line.split(None, 1)
+ for line in content.splitlines()])
+
+
+def _MergeInstanceStatus(filename, pergroup_filename, groups):
+ """Merges all per-group instance status files into a global one.
+
+ @type filename: string
+ @param filename: Path to global instance status file
+ @type pergroup_filename: string
+ @param pergroup_filename: Path to per-group status files, must contain "%s"
+ to be replaced with group UUID
+ @type groups: sequence
+ @param groups: UUIDs of known groups
+
+ """
+ # Lock global status file in exclusive mode
+ lock = utils.FileLock.Open(filename)
+ try:
+ lock.Exclusive(blocking=True, timeout=INSTANCE_STATUS_LOCK_TIMEOUT)
+ except errors.LockError, err:
+ # All per-group processes will lock and update the file. None of them
+ # should take longer than 10 seconds (the value of
+ # INSTANCE_STATUS_LOCK_TIMEOUT).
+ logging.error("Can't acquire lock on instance status file '%s', not"
+ " updating: %s", filename, err)
+ return
+
+ logging.debug("Acquired exclusive lock on '%s'", filename)
+
+ data = {}
+
+ # Load instance status from all groups
+ for group_uuid in groups:
+ (mtime, instdata) = _ReadInstanceStatus(pergroup_filename % group_uuid)
+
+ if mtime is not None:
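+      # An instance can appear in more than one per-group file; remember the
+      # mtime so that the most recent entry wins below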
+ for (instance_name, status) in instdata:
+ data.setdefault(instance_name, []).append((mtime, status))
+
+ # Select last update based on file mtime
+ inststatus = [(instance_name, sorted(status, reverse=True)[0][1])
+ for (instance_name, status) in data.items()]
+
+ # Write the global status file. Don't touch file after it's been
+ # updated--there is no lock anymore.
+ _WriteInstanceStatus(filename, inststatus)
+
+
+def GetLuxiClient(try_restart):
+ """Tries to connect to the master daemon.
+
+ @type try_restart: bool
+ @param try_restart: Whether to attempt to restart the master daemon
+
+ """
+ try:
+ return cli.GetClient()
+ except errors.OpPrereqError, err:
+    # cli.GetClient raises OpPrereqError when this node is not the master
+ raise NotMasterError("Not on master node (%s)" % err)
+
+ except luxi.NoMasterError, err:
+ if not try_restart:
+ raise
+
+ logging.warning("Master daemon seems to be down (%s), trying to restart",
+ err)
+
+ if not utils.EnsureDaemon(constants.MASTERD):
+ raise errors.GenericError("Can't start the master daemon")
+
+ # Retry the connection
+ return cli.GetClient()
+
+
+def _StartGroupChildren(cl, wait):
+ """Starts a new instance of the watcher for every node group.
+
+ """
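+  # Children are spawned with the node group option appended to our own
+  # arguments, hence it must not be present yet; only the global watcher
+  # spawns per-group children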
+ assert not compat.any(arg.startswith(cli.NODEGROUP_OPT_NAME)
+ for arg in sys.argv)
+
+ result = cl.QueryGroups([], ["name", "uuid"], False)
+
+ children = []
+
+ for (idx, (name, uuid)) in enumerate(result):
+ args = sys.argv + [cli.NODEGROUP_OPT_NAME, uuid]
+
+ if idx > 0:
+ # Let's not kill the system
+ time.sleep(CHILD_PROCESS_DELAY)
+
+ logging.debug("Spawning child for group '%s' (%s), arguments %s",
+ name, uuid, args)
+
+ try:
+ # TODO: Should utils.StartDaemon be used instead?
+ pid = os.spawnv(os.P_NOWAIT, args[0], args)
+ except Exception: # pylint: disable=W0703
+ logging.exception("Failed to start child for group '%s' (%s)",
+ name, uuid)
+ else:
+ logging.debug("Started with PID %s", pid)
+ children.append(pid)
+
+ if wait:
+ for pid in children:
+ logging.debug("Waiting for child PID %s", pid)
+ try:
+ result = utils.RetryOnSignal(os.waitpid, pid, 0)
+ except EnvironmentError, err:
+ result = str(err)
+
+ logging.debug("Child PID %s exited with status %s", pid, result)
+
+
+def _ArchiveJobs(cl, age):
+ """Archives old jobs.
+
+ """
+ (arch_count, left_count) = cl.AutoArchiveJobs(age)
+ logging.debug("Archived %s jobs, left %s", arch_count, left_count)
+
+
+def _CheckMaster(cl):
+ """Ensures current host is master node.
+
+ """
+ (master, ) = cl.QueryConfigValues(["master_node"])
+ if master != netutils.Hostname.GetSysName():
+ raise NotMasterError("This is not the master node")
+
+
+@UsesRapiClient
+def _GlobalWatcher(opts):
+ """Main function for global watcher.
+
+ At the end child processes are spawned for every node group.
+
+ """
+ StartNodeDaemons()
+ RunWatcherHooks()
+
+ # Run node maintenance in all cases, even if master, so that old masters can
+ # be properly cleaned up
+ if nodemaint.NodeMaintenance.ShouldRun(): # pylint: disable=E0602
+ nodemaint.NodeMaintenance().Exec() # pylint: disable=E0602
+
+ try:
+ client = GetLuxiClient(True)
+ except NotMasterError:
+ # Don't proceed on non-master nodes
+ return constants.EXIT_SUCCESS
+
+ # we are on master now
+ utils.EnsureDaemon(constants.RAPI)
+
+ # If RAPI isn't responding to queries, try one restart
+ logging.debug("Attempting to talk to remote API on %s",
+ constants.IP4_ADDRESS_LOCALHOST)
+ if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
+    logging.warning("Couldn't get answer from remote API, restarting daemon")
+ utils.StopDaemon(constants.RAPI)
+ utils.EnsureDaemon(constants.RAPI)
+ logging.debug("Second attempt to talk to remote API")
+ if not IsRapiResponding(constants.IP4_ADDRESS_LOCALHOST):
+ logging.fatal("RAPI is not responding")
+ logging.debug("Successfully talked to remote API")
+
+ _CheckMaster(client)
+ _ArchiveJobs(client, opts.job_age)
+
+ # Spawn child processes for all node groups
+ _StartGroupChildren(client, opts.wait_children)
+
+ return constants.EXIT_SUCCESS
+
+
+def _GetGroupData(cl, uuid):
+ """Retrieves instances and nodes per node group.
+
+  @param cl: Luxi client
+  @type uuid: string
+  @param uuid: Node group UUID
+  @return: Tuple of two dictionaries, mapping node names to L{Node} objects
+    and instance names to L{Instance} objects
+
+  """
+ job = [
+ # Get all primary instances in group
+ opcodes.OpQuery(what=constants.QR_INSTANCE,
+ fields=["name", "status", "admin_state", "snodes",
+ "pnode.group.uuid", "snodes.group.uuid"],
+ qfilter=[qlang.OP_EQUAL, "pnode.group.uuid", uuid],
+ use_locking=True),
+
+ # Get all nodes in group
+ opcodes.OpQuery(what=constants.QR_NODE,
+ fields=["name", "bootid", "offline"],
+ qfilter=[qlang.OP_EQUAL, "group.uuid", uuid],
+ use_locking=True),
+ ]
+
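+  # Submit both queries as a single job, wait for its completion and archive
+  # the job afterwards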
+ job_id = cl.SubmitJob(job)
+ results = map(objects.QueryResponse.FromDict,
+ cli.PollJob(job_id, cl=cl, feedback_fn=logging.debug))
+ cl.ArchiveJob(job_id)
+
+ results_data = map(operator.attrgetter("data"), results)
+
+ # Ensure results are tuples with two values
+ assert compat.all(map(ht.TListOf(ht.TListOf(ht.TIsLength(2))), results_data))
+
+ # Extract values ignoring result status
+ (raw_instances, raw_nodes) = [[map(compat.snd, values)
+ for values in res]
+ for res in results_data]
+
+ secondaries = {}
+ instances = []
+
+ # Load all instances
+ for (name, status, autostart, snodes, pnode_group_uuid,
+ snodes_group_uuid) in raw_instances:
+ if snodes and set([pnode_group_uuid]) != set(snodes_group_uuid):
+ logging.error("Ignoring split instance '%s', primary group %s, secondary"
+ " groups %s", name, pnode_group_uuid,
+ utils.CommaJoin(snodes_group_uuid))
+ else:
+ instances.append(Instance(name, status, autostart, snodes))
+
+ for node in snodes:
+ secondaries.setdefault(node, set()).add(name)
+
+ # Load all nodes
+ nodes = [Node(name, bootid, offline, secondaries.get(name, set()))
+ for (name, bootid, offline) in raw_nodes]
+
+ return (dict((node.name, node) for node in nodes),
+ dict((inst.name, inst) for inst in instances))
+
+
+def _LoadKnownGroups():
+  """Returns a list of all node group UUIDs known by L{ssconf}.
+
+ """
+ groups = ssconf.SimpleStore().GetNodegroupList()
+
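+  # Every line starts with the group's UUID; anything after the first field
+  # is ignored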
+ result = list(line.split(None, 1)[0] for line in groups
+ if line.strip())
+
+ if not compat.all(map(utils.UUID_RE.match, result)):
+ raise errors.GenericError("Ssconf contains invalid group UUID")
+
+ return result
+
+
+def _GroupWatcher(opts):
+ """Main function for per-group watcher process.
+
+ """
+ group_uuid = opts.nodegroup.lower()
+
+ if not utils.UUID_RE.match(group_uuid):
+ raise errors.GenericError("Node group parameter (%s) must be given a UUID,"
+ " got '%s'" %
+ (cli.NODEGROUP_OPT_NAME, group_uuid))
+
+ logging.info("Watcher for node group '%s'", group_uuid)
+
+ known_groups = _LoadKnownGroups()
+
+ # Check if node group is known
+ if group_uuid not in known_groups:
+ raise errors.GenericError("Node group '%s' is not known by ssconf" %
+ group_uuid)
+
+ # Group UUID has been verified and should not contain any dangerous
+ # characters
+ state_path = pathutils.WATCHER_GROUP_STATE_FILE % group_uuid
+ inst_status_path = pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE % group_uuid
+
+ logging.debug("Using state file %s", state_path)
+
+  # Open the state file for this node group
+ statefile = state.OpenStateFile(state_path) # pylint: disable=E0602
+ if not statefile:
+ return constants.EXIT_FAILURE
+
+ notepad = state.WatcherState(statefile) # pylint: disable=E0602
+ try:
+ # Connect to master daemon
+ client = GetLuxiClient(False)
+
+ _CheckMaster(client)
+
+ (nodes, instances) = _GetGroupData(client, group_uuid)
+
+ # Update per-group instance status file
+ _UpdateInstanceStatus(inst_status_path, instances.values())
+
+ _MergeInstanceStatus(pathutils.INSTANCE_STATUS_FILE,
+ pathutils.WATCHER_GROUP_INSTANCE_STATUS_FILE,
+ known_groups)
+
+ started = _CheckInstances(client, notepad, instances)
+ _CheckDisks(client, notepad, nodes, instances, started)
+ _VerifyDisks(client, group_uuid, nodes, instances)
+ except Exception, err:
+ logging.info("Not updating status file due to failure: %s", err)
+ raise
+ else:
+ # Save changes for next run
+ notepad.Save(state_path)
+
+ return constants.EXIT_SUCCESS
+
+