Revision ae1a845c

b/lib/client/gnt_cluster.py
   cl = GetClient()
 
   op = opcodes.OpClusterVerifyDisks()
-  result = SubmitOpCode(op, opts=opts, cl=cl)
-  if not isinstance(result, (list, tuple)) or len(result) != 3:
-    raise errors.ProgrammerError("Unknown result type for OpClusterVerifyDisks")
 
-  bad_nodes, instances, missing = result
+  result = SubmitOpCode(op, cl=cl, opts=opts)
+
+  # Keep track of submitted jobs
+  jex = JobExecutor(cl=cl, opts=opts)
+
+  for (status, job_id) in result[constants.JOB_IDS_KEY]:
+    jex.AddJobId(None, status, job_id)
 
   retcode = constants.EXIT_SUCCESS
 
-  if bad_nodes:
+  for (status, result) in jex.GetResults():
+    if not status:
+      ToStdout("Job failed: %s", result)
+      continue
+
+    ((bad_nodes, instances, missing), ) = result
+
     for node, text in bad_nodes.items():
       ToStdout("Error gathering data on node %s: %s",
                node, utils.SafeEncode(text[-400:]))
-      retcode |= 1
+      retcode = constants.EXIT_FAILURE
       ToStdout("You need to fix these nodes first before fixing instances")
 
-  if instances:
     for iname in instances:
       if iname in missing:
         continue
......
         retcode |= nret
         ToStderr("Error activating disks for instance %s: %s", iname, msg)
 
-  if missing:
-    for iname, ival in missing.iteritems():
-      all_missing = compat.all(x[0] in bad_nodes for x in ival)
-      if all_missing:
-        ToStdout("Instance %s cannot be verified as it lives on"
-                 " broken nodes", iname)
-      else:
-        ToStdout("Instance %s has missing logical volumes:", iname)
-        ival.sort()
-        for node, vol in ival:
-          if node in bad_nodes:
-            ToStdout("\tbroken node %s /dev/%s", node, vol)
-          else:
-            ToStdout("\t%s /dev/%s", node, vol)
-
-    ToStdout("You need to run replace or recreate disks for all the above"
-             " instances, if this message persist after fixing nodes.")
-    retcode |= 1
+    if missing:
+      for iname, ival in missing.iteritems():
+        all_missing = compat.all(x[0] in bad_nodes for x in ival)
+        if all_missing:
+          ToStdout("Instance %s cannot be verified as it lives on"
+                   " broken nodes", iname)
+        else:
+          ToStdout("Instance %s has missing logical volumes:", iname)
+          ival.sort()
+          for node, vol in ival:
+            if node in bad_nodes:
+              ToStdout("\tbroken node %s /dev/%s", node, vol)
+            else:
+              ToStdout("\t%s /dev/%s", node, vol)
+
+      ToStdout("You need to replace or recreate disks for all the above"
+               " instances if this message persists after fixing broken nodes.")
+      retcode = constants.EXIT_FAILURE
 
   return retcode
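
With this revision, gnt-cluster verify-disks no longer receives a single (bad_nodes, instances, missing) tuple. OpClusterVerifyDisks now returns the IDs of one child job per node group under constants.JOB_IDS_KEY, and each child job result wraps the familiar triple in a one-element tuple, which is why the code above unpacks ((bad_nodes, instances, missing), ). A minimal standalone sketch of consuming such results, with made-up sample data:

    # Minimal standalone sketch; the host names and results below are made up.
    # Each per-group entry is (job_succeeded, job_result), and a successful
    # job_result is a one-element tuple wrapping (bad_nodes, instances, missing).
    group_results = [
        (True, (({}, ["inst1.example.com"], {}), )),
        (False, "job was cancelled"),
    ]

    retcode = 0
    for (status, result) in group_results:
        if not status:
            print("Job failed: %s" % result)
            continue
        ((bad_nodes, instances, missing), ) = result
        for node, text in bad_nodes.items():
            print("Error gathering data on node %s: %s" % (node, text[-400:]))
            retcode = 1
        for iname in instances:
            if iname not in missing:
                print("Would activate disks for instance %s" % iname)
    print("exit code: %d" % retcode)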

  
b/lib/cmdlib.py
   REQ_BGL = False
 
   def ExpandNames(self):
+    self.share_locks = dict.fromkeys(locking.LEVELS, 1)
     self.needed_locks = {
-      locking.LEVEL_NODE: locking.ALL_SET,
-      locking.LEVEL_INSTANCE: locking.ALL_SET,
-    }
+      locking.LEVEL_NODEGROUP: locking.ALL_SET,
+      }
+
+  def Exec(self, feedback_fn):
+    group_names = self.glm.list_owned(locking.LEVEL_NODEGROUP)
+
+    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
+    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
+                           for group in group_names])
+
+
+class LUGroupVerifyDisks(NoHooksLU):
+  """Verifies the status of all disks in a node group.
+
+  """
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    # Raises errors.OpPrereqError on its own if group can't be found
+    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
+
     self.share_locks = dict.fromkeys(locking.LEVELS, 1)
+    self.needed_locks = {
+      locking.LEVEL_INSTANCE: [],
+      locking.LEVEL_NODEGROUP: [],
+      locking.LEVEL_NODE: [],
+      }
+
+  def DeclareLocks(self, level):
+    if level == locking.LEVEL_INSTANCE:
+      assert not self.needed_locks[locking.LEVEL_INSTANCE]
+
+      # Lock instances optimistically, needs verification once node and group
+      # locks have been acquired
+      self.needed_locks[locking.LEVEL_INSTANCE] = \
+        self.cfg.GetNodeGroupInstances(self.group_uuid)
+
+    elif level == locking.LEVEL_NODEGROUP:
+      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
+
+      self.needed_locks[locking.LEVEL_NODEGROUP] = \
+        set([self.group_uuid] +
+            # Lock all groups used by instances optimistically; this requires
+            # going via the node before it's locked, requiring verification
+            # later on
+            [group_uuid
+             for instance_name in
+               self.glm.list_owned(locking.LEVEL_INSTANCE)
+             for group_uuid in
+               self.cfg.GetInstanceNodeGroups(instance_name)])
+
+    elif level == locking.LEVEL_NODE:
+      # This will only lock the nodes in the group to be verified which contain
+      # actual instances
+      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
+      self._LockInstancesNodes()
+
+      # Lock all nodes in group to be verified
+      assert self.group_uuid in self.glm.list_owned(locking.LEVEL_NODEGROUP)
+      member_nodes = self.cfg.GetNodeGroup(self.group_uuid).members
+      self.needed_locks[locking.LEVEL_NODE].extend(member_nodes)
+
+  def CheckPrereq(self):
+    owned_instances = frozenset(self.glm.list_owned(locking.LEVEL_INSTANCE))
+    owned_groups = frozenset(self.glm.list_owned(locking.LEVEL_NODEGROUP))
+    owned_nodes = frozenset(self.glm.list_owned(locking.LEVEL_NODE))
+
+    assert self.group_uuid in owned_groups
+
+    # Check if locked instances are still correct
+    wanted_instances = self.cfg.GetNodeGroupInstances(self.group_uuid)
+    if owned_instances != wanted_instances:
+      raise errors.OpPrereqError("Instances in node group %s changed since"
+                                 " locks were acquired, wanted %s, have %s;"
+                                 " retry the operation" %
+                                 (self.op.group_name,
+                                  utils.CommaJoin(wanted_instances),
+                                  utils.CommaJoin(owned_instances)),
+                                 errors.ECODE_STATE)
+
+    # Get instance information
+    self.instances = dict((name, self.cfg.GetInstanceInfo(name))
+                          for name in owned_instances)
+
+    # Check if node groups for locked instances are still correct
+    for (instance_name, inst) in self.instances.items():
+      assert self.group_uuid in self.cfg.GetInstanceNodeGroups(instance_name), \
+        "Instance %s has no node in group %s" % (instance_name, self.group_uuid)
+      assert owned_nodes.issuperset(inst.all_nodes), \
+        "Instance %s's nodes changed while we kept the lock" % instance_name
+
+      inst_groups = self.cfg.GetInstanceNodeGroups(instance_name)
+      if not owned_groups.issuperset(inst_groups):
+        raise errors.OpPrereqError("Instance %s's node groups changed since"
+                                   " locks were acquired, current groups are"
+                                   " '%s', owning groups '%s'; retry the"
+                                   " operation" %
+                                   (instance_name,
+                                    utils.CommaJoin(inst_groups),
+                                    utils.CommaJoin(owned_groups)),
+                                   errors.ECODE_STATE)
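
The comments above describe the optimistic locking scheme used by the new LUGroupVerifyDisks: the instance and group lock sets are computed from the configuration before those locks are held, so CheckPrereq re-reads the configuration and raises an error with ECODE_STATE if the membership changed in the meantime, asking the caller to retry. A generic sketch of that compute/acquire/re-check cycle (cfg and locker are hypothetical stand-ins, not Ganeti objects); the diff continues below with the new Exec implementation:

    # Generic compute/acquire/re-check sketch of optimistic locking; cfg and
    # locker are hypothetical stand-ins, not Ganeti APIs.
    def lock_group_instances(cfg, locker, group_uuid):
        wanted = set(cfg.group_instances(group_uuid))   # optimistic read, no locks held
        locker.acquire_shared(wanted)                   # lock what we think we need
        current = set(cfg.group_instances(group_uuid))  # re-read under the locks
        if current != wanted:
            locker.release(wanted)
            raise RuntimeError("instances in group changed; retry the operation")
        return current                                  # verified set, safe to use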

  
   def Exec(self, feedback_fn):
     """Verify integrity of cluster disks.
......
         missing volumes
 
     """
-    result = res_nodes, res_instances, res_missing = {}, [], {}
+    res_nodes = {}
+    res_instances = set()
+    res_missing = {}
 
-    nodes = utils.NiceSort(self.cfg.GetVmCapableNodeList())
-    instances = self.cfg.GetAllInstancesInfo().values()
+    nv_dict = _MapInstanceDisksToNodes([inst
+                                        for inst in self.instances.values()
+                                        if inst.admin_up])
 
-    nv_dict = {}
-    for inst in instances:
-      inst_lvs = {}
-      if not inst.admin_up:
-        continue
-      inst.MapLVsByNode(inst_lvs)
-      # transform { iname: {node: [vol,],},} to {(node, vol): iname}
-      for node, vol_list in inst_lvs.iteritems():
-        for vol in vol_list:
-          nv_dict[(node, vol)] = inst
-
-    if not nv_dict:
-      return result
-
-    node_lvs = self.rpc.call_lv_list(nodes, [])
-    for node, node_res in node_lvs.items():
-      if node_res.offline:
-        continue
-      msg = node_res.fail_msg
-      if msg:
-        logging.warning("Error enumerating LVs on node %s: %s", node, msg)
-        res_nodes[node] = msg
-        continue
+    if nv_dict:
+      nodes = utils.NiceSort(set(self.glm.list_owned(locking.LEVEL_NODE)) &
+                             set(self.cfg.GetVmCapableNodeList()))
 
-      lvs = node_res.payload
-      for lv_name, (_, _, lv_online) in lvs.items():
-        inst = nv_dict.pop((node, lv_name), None)
-        if (not lv_online and inst is not None
-            and inst.name not in res_instances):
-          res_instances.append(inst.name)
+      node_lvs = self.rpc.call_lv_list(nodes, [])
 
-    # any leftover items in nv_dict are missing LVs, let's arrange the
-    # data better
-    for key, inst in nv_dict.iteritems():
-      if inst.name not in res_missing:
-        res_missing[inst.name] = []
-      res_missing[inst.name].append(key)
+      for (node, node_res) in node_lvs.items():
+        if node_res.offline:
+          continue
 
-    return result
+        msg = node_res.fail_msg
+        if msg:
+          logging.warning("Error enumerating LVs on node %s: %s", node, msg)
+          res_nodes[node] = msg
+          continue
+
+        for lv_name, (_, _, lv_online) in node_res.payload.items():
+          inst = nv_dict.pop((node, lv_name), None)
+          if not (lv_online or inst is None):
+            res_instances.add(inst)
+
+      # any leftover items in nv_dict are missing LVs, let's arrange the data
+      # better
+      for key, inst in nv_dict.iteritems():
+        res_missing.setdefault(inst, []).append(key)
+
+    return (res_nodes, list(res_instances), res_missing)
 
 
 class LUClusterRepairDiskSizes(NoHooksLU):
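
The rewritten Exec relies on _MapInstanceDisksToNodes, a helper not shown in this revision. Judging by the lookups above (nv_dict.pop((node, lv_name), None)), it flattens the per-instance LV lists into a dict keyed by (node, volume), much like the inline loop it replaces. A rough equivalent under that assumption:

    # Rough equivalent of the mapping the new Exec consumes; this is an
    # assumption based on the removed inline loop, not the helper's source.
    def map_instance_disks_to_nodes(instances):
        nv_dict = {}
        for inst in instances:
            inst_lvs = {}               # filled as {node_name: [volume, ...]}
            inst.MapLVsByNode(inst_lvs)
            for node, vol_list in inst_lvs.iteritems():
                for vol in vol_list:
                    nv_dict[(node, vol)] = inst
        return nv_dict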
b/lib/opcodes.py
 class OpClusterVerifyDisks(OpCode):
   """Verify the cluster disks.
 
-  Parameters: none
+  """
+
 
-  Result: a tuple of four elements:
-    - list of node names with bad data returned (unreachable, etc.)
-    - dict of node names with broken volume groups (values: error msg)
+class OpGroupVerifyDisks(OpCode):
+  """Verifies the status of all disks in a node group.
+
+  Result: a tuple of three elements:
+    - dict of node names with issues (values: error msg)
     - list of instances with degraded disks (that should be activated)
     - dict of instances with missing logical volumes (values: (node, vol)
       pairs with details about the missing volumes)
......
   consideration. This might need to be revisited in the future.
 
   """
+  OP_DSC_FIELD = "group_name"
+  OP_PARAMS = [
+    _PGroupName,
+    ]
 
 
 class OpClusterRepairDiskSizes(OpCode):
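
For illustration, a result of the shape the OpGroupVerifyDisks docstring describes could look like the following; all node, instance and volume names are invented:

    # Illustrative value only; every name below is invented.
    example_result = (
        {"node3.example.com": "Error while querying node"},   # nodes with issues
        ["instance5.example.com"],                             # degraded disks to activate
        {"instance2.example.com":                              # missing logical volumes,
            [("node1.example.com", "xenvg/disk0")]},           # as (node, volume) pairs
    )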
b/lib/watcher/__init__.py
     """Run gnt-cluster verify-disks.
 
     """
-    op = opcodes.OpClusterVerifyDisks()
-    job_id = client.SubmitJob([op])
+    job_id = client.SubmitJob([opcodes.OpClusterVerifyDisks()])
     result = cli.PollJob(job_id, cl=client, feedback_fn=logging.debug)[0]
     client.ArchiveJob(job_id)
-    if not isinstance(result, (tuple, list)):
-      logging.error("Can't get a valid result from verify-disks")
-      return
-    offline_disk_instances = result[1]
+
+    # Keep track of submitted jobs
+    jex = cli.JobExecutor(cl=client, feedback_fn=logging.debug)
+
+    archive_jobs = set()
+    for (status, job_id) in result[constants.JOB_IDS_KEY]:
+      jex.AddJobId(None, status, job_id)
+      if status:
+        archive_jobs.add(job_id)
+
+    offline_disk_instances = set()
+
+    for (status, result) in jex.GetResults():
+      if not status:
+        logging.error("Verify-disks job failed: %s", result)
+        continue
+
+      ((_, instances, _), ) = result
+
+      offline_disk_instances.update(instances)
+
+    for job_id in archive_jobs:
+      client.ArchiveJob(job_id)
+
     if not offline_disk_instances:
       # nothing to do
       logging.debug("verify-disks reported no offline disks, nothing to do")
       return
+
     logging.debug("Will activate disks for instance(s) %s",
                   utils.CommaJoin(offline_disk_instances))
+
     # we submit only one job, and wait for it. not optimal, but spams
     # less the job queue
     job = []
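
The watcher now follows the same pattern as the command-line client: submit the cluster opcode, feed the child job IDs from constants.JOB_IDS_KEY into cli.JobExecutor, union the per-group lists of degraded instances, and archive only the child jobs that were submitted successfully. A condensed sketch of the aggregation step, as a hypothetical helper rather than actual watcher code:

    # Hypothetical helper condensing the aggregation loop above; group_results
    # is an iterable of (status, result) pairs as yielded by GetResults().
    def collect_offline_instances(group_results):
        offline = set()
        for (status, result) in group_results:
            if not status:
                continue                     # failed child jobs are only logged
            ((_, instances, _), ) = result   # per-group (bad_nodes, instances, missing)
            offline.update(instances)
        return offline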
