cl = GetClient()
op = opcodes.OpClusterVerifyDisks()
- result = SubmitOpCode(op, opts=opts, cl=cl)
- if not isinstance(result, (list, tuple)) or len(result) != 3:
- raise errors.ProgrammerError("Unknown result type for OpClusterVerifyDisks")
- bad_nodes, instances, missing = result
+ result = SubmitOpCode(op, cl=cl, opts=opts)
+
+ # Keep track of submitted jobs
+ jex = JobExecutor(cl=cl, opts=opts)
+
+ for (status, job_id) in result[constants.JOB_IDS_KEY]:
+ jex.AddJobId(None, status, job_id)
retcode = constants.EXIT_SUCCESS
- if bad_nodes:
+ for (status, result) in jex.GetResults():
+ if not status:
+ ToStdout("Job failed: %s", result)
+ continue
+
+ ((bad_nodes, instances, missing), ) = result
+
for node, text in bad_nodes.items():
ToStdout("Error gathering data on node %s: %s",
node, utils.SafeEncode(text[-400:]))
- retcode |= 1
+ retcode = constants.EXIT_FAILURE
ToStdout("You need to fix these nodes first before fixing instances")
- if instances:
for iname in instances:
  if iname in missing:
    continue
  op = opcodes.OpInstanceActivateDisks(instance_name=iname)
  try:
    SubmitOpCode(op, opts=opts, cl=cl)
  except errors.GenericError, err:
    nret, msg = FormatError(err)
    retcode |= nret
    ToStderr("Error activating disks for instance %s: %s", iname, msg)
- if missing:
- for iname, ival in missing.iteritems():
- all_missing = compat.all(x[0] in bad_nodes for x in ival)
- if all_missing:
- ToStdout("Instance %s cannot be verified as it lives on"
- " broken nodes", iname)
- else:
- ToStdout("Instance %s has missing logical volumes:", iname)
- ival.sort()
- for node, vol in ival:
- if node in bad_nodes:
- ToStdout("\tbroken node %s /dev/%s", node, vol)
- else:
- ToStdout("\t%s /dev/%s", node, vol)
-
- ToStdout("You need to run replace or recreate disks for all the above"
- " instances, if this message persist after fixing nodes.")
- retcode |= 1
+ if missing:
+ for iname, ival in missing.iteritems():
+ all_missing = compat.all(x[0] in bad_nodes for x in ival)
+ if all_missing:
+ ToStdout("Instance %s cannot be verified as it lives on"
+ " broken nodes", iname)
+ else:
+ ToStdout("Instance %s has missing logical volumes:", iname)
+ ival.sort()
+ for node, vol in ival:
+ if node in bad_nodes:
+ ToStdout("\tbroken node %s /dev/%s", node, vol)
+ else:
+ ToStdout("\t%s /dev/%s", node, vol)
+
+ ToStdout("You need to replace or recreate disks for all the above"
+ " instances if this message persists after fixing broken nodes.")
+ retcode = constants.EXIT_FAILURE
return retcode
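A note on the result handling above, since this patch changes the data flow: OpClusterVerifyDisks no longer returns the verification data itself, only the IDs of the per-group jobs it spawned, and each of those jobs wraps a single OpGroupVerifyDisks opcode. The sketch below illustrates the shapes the loop relies on; the concrete values are made-up examples, not output from the patch.

# Hypothetical illustration of the result shapes consumed above:
#
#   result == {constants.JOB_IDS_KEY: [(True, job_id1), (True, job_id2)]}
#
# Each child job carries exactly one OpGroupVerifyDisks, so a successful entry
# from jex.GetResults() looks like
#
#   (True, [(bad_nodes, instances, missing)])
#
# which is why the one-element list is unpacked as
# ((bad_nodes, instances, missing), ) = result

The hunk that follows is the cmdlib side of the same change: the cluster-level LU now only spawns one OpGroupVerifyDisks job per node group instead of doing the verification itself.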
REQ_BGL = False
def ExpandNames(self):
+ self.share_locks = dict.fromkeys(locking.LEVELS, 1)
self.needed_locks = {
- locking.LEVEL_NODE: locking.ALL_SET,
- locking.LEVEL_INSTANCE: locking.ALL_SET,
- }
+ locking.LEVEL_NODEGROUP: locking.ALL_SET,
+ }
+
+ def Exec(self, feedback_fn):
+ group_names = self.glm.list_owned(locking.LEVEL_NODEGROUP)
+
+ # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
+ return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
+ for group in group_names])
+
+
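The new LU below handles opcodes.OpGroupVerifyDisks, whose definition is not part of this excerpt. Going by the group_name parameter used here and the usual opcode conventions, it presumably looks roughly like the following sketch (an assumption for orientation, not a quote from the patch):

class OpGroupVerifyDisks(OpCode):
  """Verifies the status of all disks in a node group.

  """
  OP_DSC_FIELD = "group_name"
  OP_PARAMS = [
    _PGroupName,
    ]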
+class LUGroupVerifyDisks(NoHooksLU):
+ """Verifies the status of all disks in a node group.
+
+ """
+ REQ_BGL = False
+
+ def ExpandNames(self):
+ # Raises errors.OpPrereqError on its own if group can't be found
+ self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
+
self.share_locks = dict.fromkeys(locking.LEVELS, 1)
+ self.needed_locks = {
+ locking.LEVEL_INSTANCE: [],
+ locking.LEVEL_NODEGROUP: [],
+ locking.LEVEL_NODE: [],
+ }
+
+ def DeclareLocks(self, level):
+ if level == locking.LEVEL_INSTANCE:
+ assert not self.needed_locks[locking.LEVEL_INSTANCE]
+
+ # Lock instances optimistically, needs verification once node and group
+ # locks have been acquired
+ self.needed_locks[locking.LEVEL_INSTANCE] = \
+ self.cfg.GetNodeGroupInstances(self.group_uuid)
+
+ elif level == locking.LEVEL_NODEGROUP:
+ assert not self.needed_locks[locking.LEVEL_NODEGROUP]
+
+ self.needed_locks[locking.LEVEL_NODEGROUP] = \
+ set([self.group_uuid] +
+ # Lock all groups used by instances optimistically; this requires
+ # going via the node before it's locked, requiring verification
+ # later on
+ [group_uuid
+ for instance_name in
+ self.glm.list_owned(locking.LEVEL_INSTANCE)
+ for group_uuid in
+ self.cfg.GetInstanceNodeGroups(instance_name)])
+
+ elif level == locking.LEVEL_NODE:
+ # This will only lock the nodes in the group to be verified which contain
+ # actual instances
+ self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
+ self._LockInstancesNodes()
+
+ # Lock all nodes in group to be verified
+ assert self.group_uuid in self.glm.list_owned(locking.LEVEL_NODEGROUP)
+ member_nodes = self.cfg.GetNodeGroup(self.group_uuid).members
+ self.needed_locks[locking.LEVEL_NODE].extend(member_nodes)
+
+ def CheckPrereq(self):
+ owned_instances = frozenset(self.glm.list_owned(locking.LEVEL_INSTANCE))
+ owned_groups = frozenset(self.glm.list_owned(locking.LEVEL_NODEGROUP))
+ owned_nodes = frozenset(self.glm.list_owned(locking.LEVEL_NODE))
+
+ assert self.group_uuid in owned_groups
+
+ # Check if locked instances are still correct
+ wanted_instances = self.cfg.GetNodeGroupInstances(self.group_uuid)
+ if owned_instances != wanted_instances:
+ raise errors.OpPrereqError("Instances in node group %s changed since"
+ " locks were acquired, wanted %s, have %s;"
+ " retry the operation" %
+ (self.op.group_name,
+ utils.CommaJoin(wanted_instances),
+ utils.CommaJoin(owned_instances)),
+ errors.ECODE_STATE)
+
+ # Get instance information
+ self.instances = dict((name, self.cfg.GetInstanceInfo(name))
+ for name in owned_instances)
+
+ # Check if node groups for locked instances are still correct
+ for (instance_name, inst) in self.instances.items():
+ assert self.group_uuid in self.cfg.GetInstanceNodeGroups(instance_name), \
+ "Instance %s has no node in group %s" % (instance_name, self.group_uuid)
+ assert owned_nodes.issuperset(inst.all_nodes), \
+ "Instance %s's nodes changed while we kept the lock" % instance_name
+
+ inst_groups = self.cfg.GetInstanceNodeGroups(instance_name)
+ if not owned_groups.issuperset(inst_groups):
+ raise errors.OpPrereqError("Instance %s's node groups changed since"
+ " locks were acquired, current groups are"
+ " are '%s', owning groups '%s'; retry the"
+ " operation" %
+ (instance_name,
+ utils.CommaJoin(inst_groups),
+ utils.CommaJoin(owned_groups)),
+ errors.ECODE_STATE)
def Exec(self, feedback_fn):
"""Verify integrity of cluster disks.
missing volumes
"""
- result = res_nodes, res_instances, res_missing = {}, [], {}
+ res_nodes = {}
+ res_instances = set()
+ res_missing = {}
- nodes = utils.NiceSort(self.cfg.GetVmCapableNodeList())
- instances = self.cfg.GetAllInstancesInfo().values()
+ nv_dict = _MapInstanceDisksToNodes([inst
+ for inst in self.instances.values()
+ if inst.admin_up])
- nv_dict = {}
- for inst in instances:
- inst_lvs = {}
- if not inst.admin_up:
- continue
- inst.MapLVsByNode(inst_lvs)
- # transform { iname: {node: [vol,],},} to {(node, vol): iname}
- for node, vol_list in inst_lvs.iteritems():
- for vol in vol_list:
- nv_dict[(node, vol)] = inst
-
- if not nv_dict:
- return result
-
- node_lvs = self.rpc.call_lv_list(nodes, [])
- for node, node_res in node_lvs.items():
- if node_res.offline:
- continue
- msg = node_res.fail_msg
- if msg:
- logging.warning("Error enumerating LVs on node %s: %s", node, msg)
- res_nodes[node] = msg
- continue
+ if nv_dict:
+ nodes = utils.NiceSort(set(self.glm.list_owned(locking.LEVEL_NODE)) &
+ set(self.cfg.GetVmCapableNodeList()))
- lvs = node_res.payload
- for lv_name, (_, _, lv_online) in lvs.items():
- inst = nv_dict.pop((node, lv_name), None)
- if (not lv_online and inst is not None
- and inst.name not in res_instances):
- res_instances.append(inst.name)
+ node_lvs = self.rpc.call_lv_list(nodes, [])
- # any leftover items in nv_dict are missing LVs, let's arrange the
- # data better
- for key, inst in nv_dict.iteritems():
- if inst.name not in res_missing:
- res_missing[inst.name] = []
- res_missing[inst.name].append(key)
+ for (node, node_res) in node_lvs.items():
+ if node_res.offline:
+ continue
- return result
+ msg = node_res.fail_msg
+ if msg:
+ logging.warning("Error enumerating LVs on node %s: %s", node, msg)
+ res_nodes[node] = msg
+ continue
+
+ for lv_name, (_, _, lv_online) in node_res.payload.items():
+ inst = nv_dict.pop((node, lv_name), None)
+ if not (lv_online or inst is None):
+ res_instances.add(inst)
+
+ # any leftover items in nv_dict are missing LVs, let's arrange the data
+ # better
+ for key, inst in nv_dict.iteritems():
+ res_missing.setdefault(inst, []).append(key)
+
+ return (res_nodes, list(res_instances), res_missing)
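The new Exec relies on _MapInstanceDisksToNodes, which is defined outside this hunk. Judging from the removed loop above, which built the same {(node, vol): instance} mapping by hand, the helper is presumably along these lines; treat it as a sketch rather than the patch's actual definition.

def _MapInstanceDisksToNodes(instances):
  """Creates a map from (node, volume) to instance name.

  """
  # inst.MapLVsByNode() yields {node: [vol, ...]}, the same structure the
  # removed code flattened into {(node, vol): instance name}
  return dict(((node, vol), inst.name)
              for inst in instances
              for (node, vols) in inst.MapLVsByNode().items()
              for vol in vols)

The final hunk below applies the same job-based conversion to the periodic verify-disks call driven through cli.PollJob and logging.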
class LUClusterRepairDiskSizes(NoHooksLU):
"""Run gnt-cluster verify-disks.
"""
- op = opcodes.OpClusterVerifyDisks()
- job_id = client.SubmitJob([op])
+ job_id = client.SubmitJob([opcodes.OpClusterVerifyDisks()])
result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
client.ArchiveJob(job_id)
- if not isinstance(result, (tuple, list)):
- logging.error("Can't get a valid result from verify-disks")
- return
- offline_disk_instances = result[1]
+
+ # Keep track of submitted jobs
+ jex = cli.JobExecutor(cl=client, feedback_fn=logging.debug)
+
+ archive_jobs = set()
+ for (status, job_id) in result[constants.JOB_IDS_KEY]:
+ jex.AddJobId(None, status, job_id)
+ if status:
+ archive_jobs.add(job_id)
+
+ offline_disk_instances = set()
+
+ for (status, result) in jex.GetResults():
+ if not status:
+ logging.error("Verify-disks job failed: %s", result)
+ continue
+
+ ((_, instances, _), ) = result
+
+ offline_disk_instances.update(instances)
+
+ for job_id in archive_jobs:
+ client.ArchiveJob(job_id)
+
if not offline_disk_instances:
# nothing to do
logging.debug("verify-disks reported no offline disks, nothing to do")
return
+
logging.debug("Will activate disks for instance(s) %s",
utils.CommaJoin(offline_disk_instances))
+
# we submit only one job, and wait for it. not optimal, but spams
# less the job queue
job = []
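The excerpt stops here, mid-function. For completeness, the single batched job announced by the comment above is presumably assembled and waited on roughly as follows; this is a sketch of the likely continuation, not text from the patch.

# One OpInstanceActivateDisks per affected instance, submitted together as a
# single job to keep job-queue noise down, then polled until completion.
for name in offline_disk_instances:
  job.append(opcodes.OpInstanceActivateDisks(instance_name=name))

job_id = cli.SendJob(job, cl=client)

try:
  cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)
except Exception:
  logging.exception("Error while activating disks")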