Rename OpRedistributeConfig and LURedistributeConfig
[ganeti-local] / lib / cmdlib.py
index 996489e..e6231a9 100644 (file)
@@ -1,7 +1,7 @@
 #
 #
 
-# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -1089,7 +1089,7 @@ def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
                                  " iallocator.")
 
 
-class LUPostInitCluster(LogicalUnit):
+class LUClusterPostInit(LogicalUnit):
   """Logical unit for running hooks after cluster initialization.
 
   """
@@ -1111,7 +1111,7 @@ class LUPostInitCluster(LogicalUnit):
     return True
 
 
-class LUDestroyCluster(LogicalUnit):
+class LUClusterDestroy(LogicalUnit):
   """Logical unit for destroying the cluster.
 
   """
@@ -1219,6 +1219,7 @@ class LUVerifyCluster(LogicalUnit):
   EINSTANCEMISSINGDISK = (TINSTANCE, "EINSTANCEMISSINGDISK")
   EINSTANCEFAULTYDISK = (TINSTANCE, "EINSTANCEFAULTYDISK")
   EINSTANCEWRONGNODE = (TINSTANCE, "EINSTANCEWRONGNODE")
+  EINSTANCESPLITGROUPS = (TINSTANCE, "EINSTANCESPLITGROUPS")
   ENODEDRBD = (TNODE, "ENODEDRBD")
   ENODEDRBDHELPER = (TNODE, "ENODEDRBDHELPER")
   ENODEFILECHECK = (TNODE, "ENODEFILECHECK")
@@ -1837,7 +1838,7 @@ class LUVerifyCluster(LogicalUnit):
     node = ninfo.name
     # We just have to verify the paths on master and/or master candidates
     # as the oob helper is invoked on the master
-    if ((ninfo.master_candidate or ninfo.master) and
+    if ((ninfo.master_candidate or ninfo.master_capable) and
         constants.NV_OOB_PATHS in nresult):
       for path_result in nresult[constants.NV_OOB_PATHS]:
         self._ErrorIf(path_result, self.ENODEOOBPATH, node, path_result)
@@ -2048,6 +2049,7 @@ class LUVerifyCluster(LogicalUnit):
     """Verify integrity of cluster, performing various test on nodes.
 
     """
+    # This method has too many local variables. pylint: disable-msg=R0914
     self.bad = False
     _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
     verbose = self.op.verbose
@@ -2067,9 +2069,11 @@ class LUVerifyCluster(LogicalUnit):
     cluster = self.cfg.GetClusterInfo()
     nodelist = utils.NiceSort(self.cfg.GetNodeList())
     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
+    nodeinfo_byname = dict(zip(nodelist, nodeinfo))
     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
     instanceinfo = dict((iname, self.cfg.GetInstanceInfo(iname))
                         for iname in instancelist)
+    groupinfo = self.cfg.GetAllNodeGroupsInfo()
     i_non_redundant = [] # Non redundant instances
     i_non_a_balanced = [] # Non auto-balanced instances
     n_offline = 0 # Count of offline nodes
@@ -2254,11 +2258,33 @@ class LUVerifyCluster(LogicalUnit):
       # FIXME: does not support file-backed instances
       if not inst_config.secondary_nodes:
         i_non_redundant.append(instance)
+
       _ErrorIf(len(inst_config.secondary_nodes) > 1, self.EINSTANCELAYOUT,
                instance, "instance has multiple secondary nodes: %s",
                utils.CommaJoin(inst_config.secondary_nodes),
                code=self.ETYPE_WARNING)
 
+      if inst_config.disk_template in constants.DTS_NET_MIRROR:
+        pnode = inst_config.primary_node
+        instance_nodes = utils.NiceSort(inst_config.all_nodes)
+        instance_groups = {}
+
+        for node in instance_nodes:
+          instance_groups.setdefault(nodeinfo_byname[node].group,
+                                     []).append(node)
+
+        pretty_list = [
+          "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
+          # Sort so that we always list the primary node first.
+          for group, nodes in sorted(instance_groups.items(),
+                                     key=lambda (_, nodes): pnode in nodes,
+                                     reverse=True)]
+
+        self._ErrorIf(len(instance_groups) > 1, self.EINSTANCESPLITGROUPS,
+                      instance, "instance has primary and secondary nodes in"
+                      " different groups: %s", utils.CommaJoin(pretty_list),
+                      code=self.ETYPE_WARNING)
+
       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
         i_non_a_balanced.append(instance)
 
@@ -3018,7 +3044,7 @@ def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
     _UploadHelper(lu, vm_nodes, fname)
 
 
-class LURedistributeConfig(NoHooksLU):
+class LUClusterRedistConf(NoHooksLU):
   """Force the redistribution of cluster configuration.
 
   This is a very simple LU.
@@ -3569,7 +3595,7 @@ class _NodeQuery(_QueryBase):
     return query.NodeQueryData([all_info[name] for name in nodenames],
                                live_data, lu.cfg.GetMasterNode(),
                                node_to_primary, node_to_secondary, groups,
-                               oob_support)
+                               oob_support, lu.cfg.GetClusterInfo())
 
 
 class LUQueryNodes(NoHooksLU):
@@ -4490,7 +4516,7 @@ class LUPowercycleNode(NoHooksLU):
     return result.payload
 
 
-class LUQueryClusterInfo(NoHooksLU):
+class LUClusterQuery(NoHooksLU):
   """Query cluster configuration.
 
   """
@@ -4535,6 +4561,7 @@ class LUQueryClusterInfo(NoHooksLU):
       "beparams": cluster.beparams,
       "osparams": cluster.osparams,
       "nicparams": cluster.nicparams,
+      "ndparams": cluster.ndparams,
       "candidate_pool_size": cluster.candidate_pool_size,
       "master_netdev": cluster.master_netdev,
       "volume_group_name": cluster.volume_group_name,
@@ -4555,7 +4582,7 @@ class LUQueryClusterInfo(NoHooksLU):
     return result
 
 
-class LUQueryConfigValues(NoHooksLU):
+class LUClusterConfigQuery(NoHooksLU):
   """Return configuration values.
 
   """
@@ -4880,9 +4907,8 @@ def _CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
       or we cannot check the node
 
   """
-  if req_sizes is not None:
-    for vg, req_size in req_sizes.iteritems():
-      _CheckNodesFreeDiskOnVG(lu, nodenames, vg, req_size)
+  for vg, req_size in req_sizes.items():
+    _CheckNodesFreeDiskOnVG(lu, nodenames, vg, req_size)
 
 
 def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
@@ -6501,32 +6527,52 @@ def _WipeDisks(lu, instance):
 
   """
   node = instance.primary_node
-  for idx, device in enumerate(instance.disks):
-    lu.LogInfo("* Wiping disk %d", idx)
-    logging.info("Wiping disk %d for instance %s", idx, instance.name)
-
-    # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
-    # MAX_WIPE_CHUNK at max
-    wipe_chunk_size = min(constants.MAX_WIPE_CHUNK, device.size / 100.0 *
-                          constants.MIN_WIPE_CHUNK_PERCENT)
-
-    offset = 0
-    size = device.size
-    last_output = 0
-    start_time = time.time()
-
-    while offset < size:
-      wipe_size = min(wipe_chunk_size, size - offset)
-      result = lu.rpc.call_blockdev_wipe(node, device, offset, wipe_size)
-      result.Raise("Could not wipe disk %d at offset %d for size %d" %
-                   (idx, offset, wipe_size))
-      now = time.time()
-      offset += wipe_size
-      if now - last_output >= 60:
-        eta = _CalcEta(now - start_time, offset, size)
-        lu.LogInfo(" - done: %.1f%% ETA: %s" %
-                   (offset / float(size) * 100, utils.FormatSeconds(eta)))
-        last_output = now
+  logging.info("Pause sync of instance %s disks", instance.name)
+  result = lu.rpc.call_blockdev_pause_resume_sync(node, instance.disks, True)
+
+  for idx, success in enumerate(result.payload):
+    if not success:
+      logging.warn("pause-sync of instance %s for disks %d failed",
+                   instance.name, idx)
+
+  try:
+    for idx, device in enumerate(instance.disks):
+      lu.LogInfo("* Wiping disk %d", idx)
+      logging.info("Wiping disk %d for instance %s", idx, instance.name)
+
+      # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
+      # MAX_WIPE_CHUNK at max
+      wipe_chunk_size = min(constants.MAX_WIPE_CHUNK, device.size / 100.0 *
+                            constants.MIN_WIPE_CHUNK_PERCENT)
+
+      offset = 0
+      size = device.size
+      last_output = 0
+      start_time = time.time()
+
+      while offset < size:
+        wipe_size = min(wipe_chunk_size, size - offset)
+        result = lu.rpc.call_blockdev_wipe(node, device, offset, wipe_size)
+        result.Raise("Could not wipe disk %d at offset %d for size %d" %
+                     (idx, offset, wipe_size))
+        now = time.time()
+        offset += wipe_size
+        if now - last_output >= 60:
+          eta = _CalcEta(now - start_time, offset, size)
+          lu.LogInfo(" - done: %.1f%% ETA: %s" %
+                     (offset / float(size) * 100, utils.FormatSeconds(eta)))
+          last_output = now
+  finally:
+    logging.info("Resume sync of instance %s disks", instance.name)
+
+    result = lu.rpc.call_blockdev_pause_resume_sync(node, instance.disks, False)
+
+    for idx, success in enumerate(result.payload):
+      if not success:
+        lu.LogWarning("Warning: Resume sync of disk %d failed. Please have a"
+                      " look at the status and troubleshoot the issue.", idx)
+        logging.warn("resume-sync of instance %s for disks %d failed",
+                     instance.name, idx)
 
 
 def _CreateDisks(lu, instance, to_skip=None, target_node=None):
@@ -6639,11 +6685,11 @@ def _ComputeDiskSizePerVG(disk_template, disks):
 
   # Required free disk space as a function of disk and swap space
   req_size_dict = {
-    constants.DT_DISKLESS: None,
+    constants.DT_DISKLESS: {},
     constants.DT_PLAIN: _compute(disks, 0),
     # 128 MB are added for drbd metadata for each disk
     constants.DT_DRBD8: _compute(disks, 128),
-    constants.DT_FILE: None,
+    constants.DT_FILE: {},
   }
 
   if disk_template not in req_size_dict:
@@ -7691,14 +7737,9 @@ class LUConnectConsole(NoHooksLU):
     # instance and then saving the defaults in the instance itself.
     hvparams = cluster.FillHV(instance)
     beparams = cluster.FillBE(instance)
-    console_cmd = hyper.GetShellCommandForConsole(instance, hvparams, beparams)
-
-    console = objects.InstanceConsole(instance=instance.name,
-                                      kind=constants.CONS_SSH,
-                                      host=node,
-                                      user="root",
-                                      command=console_cmd)
+    console = hyper.GetInstanceConsole(instance, hvparams, beparams)
 
+    assert console.instance == instance.name
     assert console.Validate()
 
     return console.ToDict()
@@ -8604,7 +8645,7 @@ class LUGrowDisk(LogicalUnit):
       # TODO: check the free disk space for file, when that feature
       # will be supported
       _CheckNodesFreeDiskPerVG(self, nodenames,
-                               {self.disk.physical_id[0]: self.op.amount})
+                               self.disk.ComputeGrowth(self.op.amount))
 
   def Exec(self, feedback_fn):
     """Execute disk grow.
@@ -9453,7 +9494,7 @@ class LUSetInstanceParams(LogicalUnit):
     }
 
 
-class LUQueryExports(NoHooksLU):
+class LUBackupQuery(NoHooksLU):
   """Query the exports list
 
   """
@@ -9489,7 +9530,7 @@ class LUQueryExports(NoHooksLU):
     return result
 
 
-class LUPrepareExport(NoHooksLU):
+class LUBackupPrepare(NoHooksLU):
   """Prepares an instance for an export and returns useful information.
 
   """
@@ -9540,7 +9581,7 @@ class LUPrepareExport(NoHooksLU):
     return None
 
 
-class LUExportInstance(LogicalUnit):
+class LUBackupExport(LogicalUnit):
   """Export an instance to an image in the cluster.
 
   """
@@ -9712,7 +9753,7 @@ class LUExportInstance(LogicalUnit):
     nodelist.remove(self.dst_node.name)
 
     # on one-node clusters nodelist will be empty after the removal
-    # if we proceed the backup would be removed because OpQueryExports
+    # if we proceed the backup would be removed because OpBackupQuery
     # substitutes an empty list with the full cluster node list.
     iname = self.instance.name
     if nodelist:
@@ -9828,7 +9869,7 @@ class LUExportInstance(LogicalUnit):
     return fin_resu, dresults
 
 
-class LURemoveExport(NoHooksLU):
+class LUBackupRemove(NoHooksLU):
   """Remove exports related to the named instance.
 
   """
@@ -9934,6 +9975,120 @@ class LUAddGroup(LogicalUnit):
     del self.remove_locks[locking.LEVEL_NODEGROUP]
 
 
+class LUAssignGroupNodes(NoHooksLU):
+  """Logical unit for assigning nodes to groups.
+
+  """
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    # These raise errors.OpPrereqError on their own:
+    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
+    self.op.nodes = _GetWantedNodes(self, self.op.nodes)
+
+    # We want to lock all the affected nodes and groups. We have readily
+    # available the list of nodes, and the *destination* group. To gather the
+    # list of "source" groups, we need to fetch node information.
+    self.node_data = self.cfg.GetAllNodesInfo()
+    affected_groups = set(self.node_data[node].group for node in self.op.nodes)
+    affected_groups.add(self.group_uuid)
+
+    self.needed_locks = {
+      locking.LEVEL_NODEGROUP: list(affected_groups),
+      locking.LEVEL_NODE: self.op.nodes,
+      }
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    """
+    self.group = self.cfg.GetNodeGroup(self.group_uuid)
+    instance_data = self.cfg.GetAllInstancesInfo()
+
+    if self.group is None:
+      raise errors.OpExecError("Could not retrieve group '%s' (UUID: %s)" %
+                               (self.op.group_name, self.group_uuid))
+
+    (new_splits, previous_splits) = \
+      self.CheckAssignmentForSplitInstances([(node, self.group_uuid)
+                                             for node in self.op.nodes],
+                                            self.node_data, instance_data)
+
+    if new_splits:
+      fmt_new_splits = utils.CommaJoin(utils.NiceSort(new_splits))
+
+      if not self.op.force:
+        raise errors.OpExecError("The following instances get split by this"
+                                 " change and --force was not given: %s" %
+                                 fmt_new_splits)
+      else:
+        self.LogWarning("This operation will split the following instances: %s",
+                        fmt_new_splits)
+
+        if previous_splits:
+          self.LogWarning("In addition, these already-split instances continue"
+                          " to be spit across groups: %s",
+                          utils.CommaJoin(utils.NiceSort(previous_splits)))
+
+  def Exec(self, feedback_fn):
+    """Assign nodes to a new group.
+
+    """
+    for node in self.op.nodes:
+      self.node_data[node].group = self.group_uuid
+
+    self.cfg.Update(self.group, feedback_fn) # Saves all modified nodes.
+
+  @staticmethod
+  def CheckAssignmentForSplitInstances(changes, node_data, instance_data):
+    """Check for split instances after a node assignment.
+
+    This method considers a series of node assignments as an atomic operation,
+    and returns information about split instances after applying the set of
+    changes.
+
+    In particular, it returns information about newly split instances, and
+    instances that were already split, and remain so after the change.
+
+    Only instances whose disk template is listed in constants.DTS_NET_MIRROR are
+    considered.
+
+    @type changes: list of (node_name, new_group_uuid) pairs.
+    @param changes: list of node assignments to consider.
+    @param node_data: a dict with data for all nodes
+    @param instance_data: a dict with all instances to consider
+    @rtype: a two-tuple
+    @return: a list of instances that were previously okay and result split as a
+      consequence of this change, and a list of instances that were previously
+      split and this change does not fix.
+
+    """
+    changed_nodes = dict((node, group) for node, group in changes
+                         if node_data[node].group != group)
+
+    all_split_instances = set()
+    previously_split_instances = set()
+
+    def InstanceNodes(instance):
+      return [instance.primary_node] + list(instance.secondary_nodes)
+
+    for inst in instance_data.values():
+      if inst.disk_template not in constants.DTS_NET_MIRROR:
+        continue
+
+      instance_nodes = InstanceNodes(inst)
+
+      if len(set(node_data[node].group for node in instance_nodes)) > 1:
+        previously_split_instances.add(inst.name)
+
+      if len(set(changed_nodes.get(node, node_data[node].group)
+                 for node in instance_nodes)) > 1:
+        all_split_instances.add(inst.name)
+
+    return (list(all_split_instances - previously_split_instances),
+            list(previously_split_instances & all_split_instances))
+
+
 class _GroupQuery(_QueryBase):
 
   FIELDS = query.GROUP_FIELDS
@@ -10064,8 +10219,9 @@ class LUSetGroupParams(LogicalUnit):
                                (self.op.group_name, self.group_uuid))
 
     if self.op.ndparams:
+      new_ndparams = _GetUpdatedParams(self.group.ndparams, self.op.ndparams)
       utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
-      self.new_ndparams = self.group.SimpleFillND(self.op.ndparams)
+      self.new_ndparams = new_ndparams
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -10645,11 +10801,12 @@ class IAllocator(object):
       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
       # we don't have job IDs
       }
+    ninfo = cfg.GetAllNodesInfo()
     iinfo = cfg.GetAllInstancesInfo().values()
     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
 
     # node data
-    node_list = cfg.GetNodeList()
+    node_list = [n.name for n in ninfo.values() if n.vm_capable]
 
     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
       hypervisor_name = self.hypervisor
@@ -10666,7 +10823,11 @@ class IAllocator(object):
 
     data["nodegroups"] = self._ComputeNodeGroupData(cfg)
 
-    data["nodes"] = self._ComputeNodeData(cfg, node_data, node_iinfo, i_list)
+    config_ndata = self._ComputeBasicNodeData(ninfo)
+    data["nodes"] = self._ComputeDynamicNodeData(ninfo, node_data, node_iinfo,
+                                                 i_list, config_ndata)
+    assert len(data["nodes"]) == len(ninfo), \
+        "Incomplete node data computed"
 
     data["instances"] = self._ComputeInstanceData(cluster_info, i_list)
 
@@ -10686,14 +10847,16 @@ class IAllocator(object):
     return ng
 
   @staticmethod
-  def _ComputeNodeData(cfg, node_data, node_iinfo, i_list):
+  def _ComputeBasicNodeData(node_cfg):
     """Compute global node data.
 
+    @rtype: dict
+    @returns: a dict of name: (node dict, node config)
+
     """
     node_results = {}
-    for nname, nresult in node_data.items():
-      # first fill in static (config-based) values
-      ninfo = cfg.GetNodeInfo(nname)
+    for ninfo in node_cfg.values():
+      # fill in static (config-based) values
       pnr = {
         "tags": list(ninfo.GetTags()),
         "primary_ip": ninfo.primary_ip,
@@ -10706,6 +10869,24 @@ class IAllocator(object):
         "vm_capable": ninfo.vm_capable,
         }
 
+      node_results[ninfo.name] = pnr
+
+    return node_results
+
+  @staticmethod
+  def _ComputeDynamicNodeData(node_cfg, node_data, node_iinfo, i_list,
+                              node_results):
+    """Compute global node data.
+
+    @param node_results: the basic node structures as filled from the config
+
+    """
+    # make a copy of the current dict
+    node_results = dict(node_results)
+    for nname, nresult in node_data.items():
+      assert nname in node_results, "Missing basic data for node %s" % nname
+      ninfo = node_cfg[nname]
+
       if not (ninfo.offline or ninfo.drained):
         nresult.Raise("Can't get data for node %s" % nname)
         node_iinfo[nname].Raise("Can't get node instance info from node %s" %
@@ -10747,9 +10928,9 @@ class IAllocator(object):
           "i_pri_memory": i_p_mem,
           "i_pri_up_memory": i_p_up_mem,
           }
-        pnr.update(pnr_dyn)
+        pnr_dyn.update(node_results[nname])
 
-      node_results[nname] = pnr
+      node_results[nname] = pnr_dyn
 
     return node_results