Change OpClusterVerifyDisks to per-group opcodes
author    Michael Hanselmann <hansmi@google.com>
          Tue, 26 Jul 2011 09:34:00 +0000 (11:34 +0200)
committer Michael Hanselmann <hansmi@google.com>
          Tue, 26 Jul 2011 10:53:48 +0000 (12:53 +0200)
Until now, verifying disks, which is also used by the watcher, would
lock all nodes and instances. With this patch the opcode is changed to
operate per node group, requiring fewer locks.

Both "gnt-cluster" and "ganeti-watcher" are updated to use the new
interface.

Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Iustin Pop <iustin@google.com>

lib/client/gnt_cluster.py
lib/cmdlib.py
lib/opcodes.py
lib/watcher/__init__.py
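
For reference, the new client-side flow works as follows: the parent
OpClusterVerifyDisks job only submits one OpGroupVerifyDisks job per node
group and returns the (status, job id) pairs under constants.JOB_IDS_KEY;
each child job's result is a one-element tuple of (bad_nodes,
offline_instances, missing_lvs). The sketch below is illustrative only and
not part of the patch; it assumes a luxi client obtained via
cli.GetClient() and the helper name is made up, but the calls mirror the
watcher code changed below.

import logging

from ganeti import cli
from ganeti import constants
from ganeti import opcodes


def _CollectOfflineInstances(cl):
  """Illustrative sketch: run per-group verify-disks and merge results.

  """
  # The parent job merely submits one OpGroupVerifyDisks job per node group
  parent_id = cl.SubmitJob([opcodes.OpClusterVerifyDisks()])
  parent_result = cli.PollJob(parent_id, cl=cl, feedback_fn=logging.debug)[0]
  cl.ArchiveJob(parent_id)

  # Wait for the per-group child jobs
  jex = cli.JobExecutor(cl=cl, feedback_fn=logging.debug)
  for (status, job_id) in parent_result[constants.JOB_IDS_KEY]:
    jex.AddJobId(None, status, job_id)

  offline = set()
  for (status, job_result) in jex.GetResults():
    if not status:
      logging.error("Per-group verify-disks job failed: %s", job_result)
      continue
    # Each child job contains a single opcode, hence the one-element tuple
    ((_, instances, _), ) = job_result
    offline.update(instances)

  return offline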

diff --git a/lib/client/gnt_cluster.py b/lib/client/gnt_cluster.py
index b74b528..5ac3713 100644
@@ -503,22 +503,30 @@ def VerifyDisks(opts, args):
   cl = GetClient()
 
   op = opcodes.OpClusterVerifyDisks()
-  result = SubmitOpCode(op, opts=opts, cl=cl)
-  if not isinstance(result, (list, tuple)) or len(result) != 3:
-    raise errors.ProgrammerError("Unknown result type for OpClusterVerifyDisks")
 
-  bad_nodes, instances, missing = result
+  result = SubmitOpCode(op, cl=cl, opts=opts)
+
+  # Keep track of submitted jobs
+  jex = JobExecutor(cl=cl, opts=opts)
+
+  for (status, job_id) in result[constants.JOB_IDS_KEY]:
+    jex.AddJobId(None, status, job_id)
 
   retcode = constants.EXIT_SUCCESS
 
-  if bad_nodes:
+  for (status, result) in jex.GetResults():
+    if not status:
+      ToStdout("Job failed: %s", result)
+      continue
+
+    ((bad_nodes, instances, missing), ) = result
+
     for node, text in bad_nodes.items():
       ToStdout("Error gathering data on node %s: %s",
                node, utils.SafeEncode(text[-400:]))
-      retcode |= 1
+      retcode = constants.EXIT_FAILURE
       ToStdout("You need to fix these nodes first before fixing instances")
 
-  if instances:
     for iname in instances:
       if iname in missing:
         continue
@@ -531,24 +539,24 @@ def VerifyDisks(opts, args):
         retcode |= nret
         ToStderr("Error activating disks for instance %s: %s", iname, msg)
 
-  if missing:
-    for iname, ival in missing.iteritems():
-      all_missing = compat.all(x[0] in bad_nodes for x in ival)
-      if all_missing:
-        ToStdout("Instance %s cannot be verified as it lives on"
-                 " broken nodes", iname)
-      else:
-        ToStdout("Instance %s has missing logical volumes:", iname)
-        ival.sort()
-        for node, vol in ival:
-          if node in bad_nodes:
-            ToStdout("\tbroken node %s /dev/%s", node, vol)
-          else:
-            ToStdout("\t%s /dev/%s", node, vol)
-
-    ToStdout("You need to run replace or recreate disks for all the above"
-             " instances, if this message persist after fixing nodes.")
-    retcode |= 1
+    if missing:
+      for iname, ival in missing.iteritems():
+        all_missing = compat.all(x[0] in bad_nodes for x in ival)
+        if all_missing:
+          ToStdout("Instance %s cannot be verified as it lives on"
+                   " broken nodes", iname)
+        else:
+          ToStdout("Instance %s has missing logical volumes:", iname)
+          ival.sort()
+          for node, vol in ival:
+            if node in bad_nodes:
+              ToStdout("\tbroken node %s /dev/%s", node, vol)
+            else:
+              ToStdout("\t%s /dev/%s", node, vol)
+
+      ToStdout("You need to replace or recreate disks for all the above"
+               " instances if this message persists after fixing broken nodes.")
+      retcode = constants.EXIT_FAILURE
 
   return retcode
 
diff --git a/lib/cmdlib.py b/lib/cmdlib.py
index 50123f9..3ef83ed 100644
@@ -2867,11 +2867,109 @@ class LUClusterVerifyDisks(NoHooksLU):
   REQ_BGL = False
 
   def ExpandNames(self):
+    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
     self.needed_locks = {
-      locking.LEVEL_NODE: locking.ALL_SET,
-      locking.LEVEL_INSTANCE: locking.ALL_SET,
-    }
+      locking.LEVEL_NODEGROUP: locking.ALL_SET,
+      }
+
+  def Exec(self, feedback_fn):
+    group_names = self.glm.list_owned(locking.LEVEL_NODEGROUP)
+
+    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
+    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
+                           for group in group_names])
+
+
+class LUGroupVerifyDisks(NoHooksLU):
+  """Verifies the status of all disks in a node group.
+
+  """
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    # Raises errors.OpPrereqError on its own if group can't be found
+    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
+
     self.share_locks = dict.fromkeys(locking.LEVELS, 1)
+    self.needed_locks = {
+      locking.LEVEL_INSTANCE: [],
+      locking.LEVEL_NODEGROUP: [],
+      locking.LEVEL_NODE: [],
+      }
+
+  def DeclareLocks(self, level):
+    if level == locking.LEVEL_INSTANCE:
+      assert not self.needed_locks[locking.LEVEL_INSTANCE]
+
+      # Lock instances optimistically, needs verification once node and group
+      # locks have been acquired
+      self.needed_locks[locking.LEVEL_INSTANCE] = \
+        self.cfg.GetNodeGroupInstances(self.group_uuid)
+
+    elif level == locking.LEVEL_NODEGROUP:
+      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
+
+      self.needed_locks[locking.LEVEL_NODEGROUP] = \
+        set([self.group_uuid] +
+            # Lock all groups used by instances optimistically; this requires
+            # going via the node before it's locked, requiring verification
+            # later on
+            [group_uuid
+             for instance_name in
+               self.glm.list_owned(locking.LEVEL_INSTANCE)
+             for group_uuid in
+               self.cfg.GetInstanceNodeGroups(instance_name)])
+
+    elif level == locking.LEVEL_NODE:
+      # This will only lock the nodes in the group to be verified which contain
+      # actual instances
+      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
+      self._LockInstancesNodes()
+
+      # Lock all nodes in group to be verified
+      assert self.group_uuid in self.glm.list_owned(locking.LEVEL_NODEGROUP)
+      member_nodes = self.cfg.GetNodeGroup(self.group_uuid).members
+      self.needed_locks[locking.LEVEL_NODE].extend(member_nodes)
+
+  def CheckPrereq(self):
+    owned_instances = frozenset(self.glm.list_owned(locking.LEVEL_INSTANCE))
+    owned_groups = frozenset(self.glm.list_owned(locking.LEVEL_NODEGROUP))
+    owned_nodes = frozenset(self.glm.list_owned(locking.LEVEL_NODE))
+
+    assert self.group_uuid in owned_groups
+
+    # Check if locked instances are still correct
+    wanted_instances = self.cfg.GetNodeGroupInstances(self.group_uuid)
+    if owned_instances != wanted_instances:
+      raise errors.OpPrereqError("Instances in node group %s changed since"
+                                 " locks were acquired, wanted %s, have %s;"
+                                 " retry the operation" %
+                                 (self.op.group_name,
+                                  utils.CommaJoin(wanted_instances),
+                                  utils.CommaJoin(owned_instances)),
+                                 errors.ECODE_STATE)
+
+    # Get instance information
+    self.instances = dict((name, self.cfg.GetInstanceInfo(name))
+                          for name in owned_instances)
+
+    # Check if node groups for locked instances are still correct
+    for (instance_name, inst) in self.instances.items():
+      assert self.group_uuid in self.cfg.GetInstanceNodeGroups(instance_name), \
+        "Instance %s has no node in group %s" % (instance_name, self.group_uuid)
+      assert owned_nodes.issuperset(inst.all_nodes), \
+        "Instance %s's nodes changed while we kept the lock" % instance_name
+
+      inst_groups = self.cfg.GetInstanceNodeGroups(instance_name)
+      if not owned_groups.issuperset(inst_groups):
+        raise errors.OpPrereqError("Instance %s's node groups changed since"
+                                   " locks were acquired, current groups are"
+                                   " '%s', owning groups '%s'; retry the"
+                                   " operation" %
+                                   (instance_name,
+                                    utils.CommaJoin(inst_groups),
+                                    utils.CommaJoin(owned_groups)),
+                                   errors.ECODE_STATE)
 
   def Exec(self, feedback_fn):
     """Verify integrity of cluster disks.
@@ -2882,50 +2980,41 @@ class LUClusterVerifyDisks(NoHooksLU):
         missing volumes
 
     """
-    result = res_nodes, res_instances, res_missing = {}, [], {}
+    res_nodes = {}
+    res_instances = set()
+    res_missing = {}
 
-    nodes = utils.NiceSort(self.cfg.GetVmCapableNodeList())
-    instances = self.cfg.GetAllInstancesInfo().values()
+    nv_dict = _MapInstanceDisksToNodes([inst
+                                        for inst in self.instances.values()
+                                        if inst.admin_up])
 
-    nv_dict = {}
-    for inst in instances:
-      inst_lvs = {}
-      if not inst.admin_up:
-        continue
-      inst.MapLVsByNode(inst_lvs)
-      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
-      for node, vol_list in inst_lvs.iteritems():
-        for vol in vol_list:
-          nv_dict[(node, vol)] = inst
-
-    if not nv_dict:
-      return result
-
-    node_lvs = self.rpc.call_lv_list(nodes, [])
-    for node, node_res in node_lvs.items():
-      if node_res.offline:
-        continue
-      msg = node_res.fail_msg
-      if msg:
-        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
-        res_nodes[node] = msg
-        continue
+    if nv_dict:
+      nodes = utils.NiceSort(set(self.glm.list_owned(locking.LEVEL_NODE)) &
+                             set(self.cfg.GetVmCapableNodeList()))
 
-      lvs = node_res.payload
-      for lv_name, (_, _, lv_online) in lvs.items():
-        inst = nv_dict.pop((node, lv_name), None)
-        if (not lv_online and inst is not None
-            and inst.name not in res_instances):
-          res_instances.append(inst.name)
+      node_lvs = self.rpc.call_lv_list(nodes, [])
 
-    # any leftover items in nv_dict are missing LVs, let's arrange the
-    # data better
-    for key, inst in nv_dict.iteritems():
-      if inst.name not in res_missing:
-        res_missing[inst.name] = []
-      res_missing[inst.name].append(key)
+      for (node, node_res) in node_lvs.items():
+        if node_res.offline:
+          continue
 
-    return result
+        msg = node_res.fail_msg
+        if msg:
+          logging.warning("Error enumerating LVs on node %s: %s", node, msg)
+          res_nodes[node] = msg
+          continue
+
+        for lv_name, (_, _, lv_online) in node_res.payload.items():
+          inst = nv_dict.pop((node, lv_name), None)
+          if not (lv_online or inst is None):
+            res_instances.add(inst)
+
+      # any leftover items in nv_dict are missing LVs, let's arrange the data
+      # better
+      for key, inst in nv_dict.iteritems():
+        res_missing.setdefault(inst, []).append(key)
+
+    return (res_nodes, list(res_instances), res_missing)
 
 
 class LUClusterRepairDiskSizes(NoHooksLU):
diff --git a/lib/opcodes.py b/lib/opcodes.py
index 8d50084..1a4c00a 100644
@@ -594,11 +594,14 @@ class OpClusterVerifyGroup(OpCode):
 class OpClusterVerifyDisks(OpCode):
   """Verify the cluster disks.
 
-  Parameters: none
+  """
+
 
-  Result: a tuple of four elements:
-    - list of node names with bad data returned (unreachable, etc.)
-    - dict of node names with broken volume groups (values: error msg)
+class OpGroupVerifyDisks(OpCode):
+  """Verifies the status of all disks in a node group.
+
+  Result: a tuple of three elements:
+    - dict of node names with issues (values: error msg)
     - list of instances with degraded disks (that should be activated)
     - dict of instances with missing logical volumes (values: (node, vol)
       pairs with details about the missing volumes)
@@ -612,6 +615,10 @@ class OpClusterVerifyDisks(OpCode):
   consideration. This might need to be revisited in the future.
 
   """
+  OP_DSC_FIELD = "group_name"
+  OP_PARAMS = [
+    _PGroupName,
+    ]
 
 
 class OpClusterRepairDiskSizes(OpCode):
diff --git a/lib/watcher/__init__.py b/lib/watcher/__init__.py
index f04b032..0e62063 100644
@@ -597,20 +597,41 @@ class Watcher(object):
     """Run gnt-cluster verify-disks.
 
     """
-    op = opcodes.OpClusterVerifyDisks()
-    job_id = client.SubmitJob([op])
+    job_id = client.SubmitJob([opcodes.OpClusterVerifyDisks()])
     result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
     client.ArchiveJob(job_id)
-    if not isinstance(result, (tuple, list)):
-      logging.error("Can't get a valid result from verify-disks")
-      return
-    offline_disk_instances = result[1]
+
+    # Keep track of submitted jobs
+    jex = cli.JobExecutor(cl=client, feedback_fn=logging.debug)
+
+    archive_jobs = set()
+    for (status, job_id) in result[constants.JOB_IDS_KEY]:
+      jex.AddJobId(None, status, job_id)
+      if status:
+        archive_jobs.add(job_id)
+
+    offline_disk_instances = set()
+
+    for (status, result) in jex.GetResults():
+      if not status:
+        logging.error("Verify-disks job failed: %s", result)
+        continue
+
+      ((_, instances, _), ) = result
+
+      offline_disk_instances.update(instances)
+
+    for job_id in archive_jobs:
+      client.ArchiveJob(job_id)
+
     if not offline_disk_instances:
       # nothing to do
       logging.debug("verify-disks reported no offline disks, nothing to do")
       return
+
     logging.debug("Will activate disks for instance(s) %s",
                   utils.CommaJoin(offline_disk_instances))
+
     # we submit only one job, and wait for it. not optimal, but spams
     # less the job queue
     job = []