Merge 'EvacNode' and 'NodeEvacMode'
[ganeti-local] / lib / masterd / iallocator.py
index 8a04b26..e380db6 100644 (file)
@@ -1,7 +1,7 @@
 #
 #
 
-# Copyright (C) 2012 Google Inc.
+# Copyright (C) 2012, 2013 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -25,7 +25,7 @@ from ganeti import compat
 from ganeti import constants
 from ganeti import errors
 from ganeti import ht
-from ganeti import objectutils
+from ganeti import outils
 from ganeti import opcodes
 from ganeti import rpc
 from ganeti import serializer
@@ -40,7 +40,7 @@ _JOB_LIST = ht.TListOf(ht.TListOf(ht.TStrictDict(True, False, {
    # Class '...' has no 'OP_ID' member
    "OP_ID": ht.TElemOf([opcodes.OpInstanceFailover.OP_ID,
                         opcodes.OpInstanceMigrate.OP_ID,
-                        opcodes.OpInstanceReplaceDisks.OP_ID])
+                        opcodes.OpInstanceReplaceDisks.OP_ID]),
    })))
 
 _NEVAC_MOVED = \
@@ -58,9 +58,10 @@ _NEVAC_RESULT = ht.TAnd(ht.TIsLength(3),
                         ht.TItems([_NEVAC_MOVED, _NEVAC_FAILED, _JOB_LIST]))
 
 _INST_NAME = ("name", ht.TNonEmptyString)
+_INST_UUID = ("inst_uuid", ht.TNonEmptyString)
 
 
-class _AutoReqParam(objectutils.AutoSlots):
+class _AutoReqParam(outils.AutoSlots):
   """Meta class for request definitions.
 
   """
@@ -73,7 +74,7 @@ class _AutoReqParam(objectutils.AutoSlots):
     return [slot for (slot, _) in params]
 
 
-class IARequestBase(objectutils.ValidatedSlots):
+class IARequestBase(outils.ValidatedSlots):
   """A generic IAllocator request object.
 
   """
@@ -92,7 +93,7 @@ class IARequestBase(objectutils.ValidatedSlots):
     REQ_PARAMS attribute for this class.
 
     """
-    objectutils.ValidatedSlots.__init__(self, **kwargs)
+    outils.ValidatedSlots.__init__(self, **kwargs)
 
     self.Validate()
 
@@ -130,7 +131,7 @@ class IARequestBase(objectutils.ValidatedSlots):
     @raises ResultValidationError: If validation fails
 
     """
-    if not (ia.success and self.REQ_RESULT(result)):
+    if ia.success and not self.REQ_RESULT(result):
       raise errors.ResultValidationError("iallocator returned invalid result,"
                                          " expected %s, got %s" %
                                          (self.REQ_RESULT, result))
@@ -144,8 +145,8 @@ class IAReqInstanceAlloc(IARequestBase):
   MODE = constants.IALLOCATOR_MODE_ALLOC
   REQ_PARAMS = [
     _INST_NAME,
-    ("memory", ht.TPositiveInt),
-    ("spindle_use", ht.TPositiveInt),
+    ("memory", ht.TNonNegativeInt),
+    ("spindle_use", ht.TNonNegativeInt),
     ("disks", ht.TListOf(ht.TDict)),
     ("disk_template", ht.TString),
     ("os", ht.TString),
@@ -153,6 +154,7 @@ class IAReqInstanceAlloc(IARequestBase):
     ("nics", ht.TListOf(ht.TDict)),
     ("vcpus", ht.TInt),
     ("hypervisor", ht.TString),
+    ("node_whitelist", ht.TMaybeListOf(ht.TNonEmptyString)),
     ]
   REQ_RESULT = ht.TList
 
@@ -195,7 +197,7 @@ class IAReqInstanceAlloc(IARequestBase):
     """
     IARequestBase.ValidateResult(self, ia, result)
 
-    if len(result) != self.RequiredNodes():
+    if ia.success and len(result) != self.RequiredNodes():
       raise errors.ResultValidationError("iallocator returned invalid number"
                                          " of nodes (%s), required %s" %
                                          (len(result), self.RequiredNodes()))
@@ -208,7 +210,7 @@ class IAReqMultiInstanceAlloc(IARequestBase):
   # pylint: disable=E1101
   MODE = constants.IALLOCATOR_MODE_MULTI_ALLOC
   REQ_PARAMS = [
-    ("instances", ht.TListOf(ht.TInstanceOf(IAReqInstanceAlloc)))
+    ("instances", ht.TListOf(ht.TInstanceOf(IAReqInstanceAlloc))),
     ]
   _MASUCCESS = \
     ht.TListOf(ht.TAnd(ht.TIsLength(2),
@@ -216,12 +218,12 @@ class IAReqMultiInstanceAlloc(IARequestBase):
                                   ht.TListOf(ht.TNonEmptyString),
                                   ])))
   _MAFAILED = ht.TListOf(ht.TNonEmptyString)
-  REQ_RESULT = ht.TListOf(ht.TAnd(ht.TIsLength(2),
-                                  ht.TItems([_MASUCCESS, _MAFAILED])))
+  REQ_RESULT = ht.TAnd(ht.TList, ht.TIsLength(2),
+                       ht.TItems([_MASUCCESS, _MAFAILED]))
 
   def GetRequest(self, cfg):
+    """Build the iallocator request dict holding all per-instance requests.
+
+    """
     return {
-      "instances": [iareq.GetRequest(cfg) for iareq in self.instances]
+      "instances": [iareq.GetRequest(cfg) for iareq in self.instances],
       }
 
 
@@ -232,8 +234,8 @@ class IAReqRelocate(IARequestBase):
   # pylint: disable=E1101
   MODE = constants.IALLOCATOR_MODE_RELOC
   REQ_PARAMS = [
-    _INST_NAME,
-    ("relocate_from", _STRING_LIST),
+    _INST_UUID,
+    ("relocate_from_node_uuids", _STRING_LIST),
     ]
   REQ_RESULT = ht.TList
 
@@ -244,10 +246,10 @@ class IAReqRelocate(IARequestBase):
     done.
 
     """
-    instance = cfg.GetInstanceInfo(self.name)
+    instance = cfg.GetInstanceInfo(self.inst_uuid)
     if instance is None:
       raise errors.ProgrammerError("Unknown instance '%s' passed to"
-                                   " IAllocator" % self.name)
+                                   " IAllocator" % self.inst_uuid)
 
     if instance.disk_template not in constants.DTS_MIRRORED:
       raise errors.OpPrereqError("Can't relocate non-mirrored instances",
@@ -262,10 +264,10 @@ class IAReqRelocate(IARequestBase):
     disk_space = gmi.ComputeDiskSize(instance.disk_template, disk_sizes)
 
     return {
-      "name": self.name,
+      "name": instance.name,
       "disk_space_total": disk_space,
       "required_nodes": 1,
-      "relocate_from": self.relocate_from,
+      "relocate_from": cfg.GetNodeNames(self.relocate_from_node_uuids),
       }
 
   def ValidateResult(self, ia, result):
@@ -280,13 +282,14 @@ class IAReqRelocate(IARequestBase):
     fn = compat.partial(self._NodesToGroups, node2group,
                         ia.in_data["nodegroups"])
 
-    instance = ia.cfg.GetInstanceInfo(self.name)
-    request_groups = fn(self.relocate_from + [instance.primary_node])
-    result_groups = fn(result + [instance.primary_node])
+    instance = ia.cfg.GetInstanceInfo(self.inst_uuid)
+    request_groups = fn(ia.cfg.GetNodeNames(self.relocate_from_node_uuids) +
+                        ia.cfg.GetNodeNames([instance.primary_node]))
+    result_groups = fn(result + ia.cfg.GetNodeNames([instance.primary_node]))
 
     if ia.success and not set(result_groups).issubset(request_groups):
       raise errors.ResultValidationError("Groups of nodes returned by"
-                                         "iallocator (%s) differ from original"
+                                         " iallocator (%s) differ from original"
                                          " groups (%s)" %
                                          (utils.CommaJoin(result_groups),
                                           utils.CommaJoin(request_groups)))
@@ -333,7 +336,7 @@ class IAReqNodeEvac(IARequestBase):
   MODE = constants.IALLOCATOR_MODE_NODE_EVAC
   REQ_PARAMS = [
     ("instances", _STRING_LIST),
-    ("evac_mode", ht.TElemOf(constants.IALLOCATOR_NEVAC_MODES)),
+    ("evac_mode", ht.TEvacMode),
     ]
   REQ_RESULT = _NEVAC_RESULT
 
@@ -396,51 +399,78 @@ class IAllocator(object):
 
     self._BuildInputData(req)
 
+  def _ComputeClusterDataNodeInfo(self, node_list, cluster_info,
+                                   hypervisor_name):
+    """Prepare and execute node info call.
+
+    @type node_list: list of strings
+    @param node_list: list of nodes' UUIDs
+    @type cluster_info: L{objects.Cluster}
+    @param cluster_info: the cluster's information from the config
+    @type hypervisor_name: string
+    @param hypervisor_name: the hypervisor name
+    @rtype: same as the result of the node info RPC call
+    @return: the result of the node info RPC call
+
+    """
+    # Spindle information is requested explicitly together with the
+    # cluster's storage units.
+    storage_units_raw = utils.storage.GetStorageUnitsOfCluster(
+        self.cfg, include_spindles=True)
+    storage_units = rpc.PrepareStorageUnitsForNodes(self.cfg, storage_units_raw,
+                                                    node_list)
+    # Pair the hypervisor with its cluster-level parameters for the RPC.
+    hvspecs = [(hypervisor_name, cluster_info.hvparams[hypervisor_name])]
+    return self.rpc.call_node_info(node_list, storage_units, hvspecs)
+
   def _ComputeClusterData(self):
     """Compute the generic allocator input data.
 
     This is the data that is independent of the actual operation.
 
     """
-    cfg = self.cfg
-    cluster_info = cfg.GetClusterInfo()
+    cluster_info = self.cfg.GetClusterInfo()
     # cluster data
     data = {
       "version": constants.IALLOCATOR_VERSION,
-      "cluster_name": cfg.GetClusterName(),
+      "cluster_name": self.cfg.GetClusterName(),
       "cluster_tags": list(cluster_info.GetTags()),
       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
       "ipolicy": cluster_info.ipolicy,
       }
-    ninfo = cfg.GetAllNodesInfo()
-    iinfo = cfg.GetAllInstancesInfo().values()
+    ninfo = self.cfg.GetAllNodesInfo()
+    iinfo = self.cfg.GetAllInstancesInfo().values()
     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
 
     # node data
-    node_list = [n.name for n in ninfo.values() if n.vm_capable]
+    node_list = [n.uuid for n in ninfo.values() if n.vm_capable]
 
+    # The hypervisor to query and the optional node whitelist depend on the
+    # request type; only allocation requests carry a whitelist.
     if isinstance(self.req, IAReqInstanceAlloc):
       hypervisor_name = self.req.hypervisor
+      node_whitelist = self.req.node_whitelist
     elif isinstance(self.req, IAReqRelocate):
-      hypervisor_name = cfg.GetInstanceInfo(self.req.name).hypervisor
+      hypervisor_name = self.cfg.GetInstanceInfo(self.req.inst_uuid).hypervisor
+      node_whitelist = None
     else:
       hypervisor_name = cluster_info.primary_hypervisor
+      node_whitelist = None
+
+    has_lvm = utils.storage.IsLvmEnabled(cluster_info.enabled_disk_templates)
+    node_data = self._ComputeClusterDataNodeInfo(node_list, cluster_info,
+                                                 hypervisor_name)
 
-    node_data = self.rpc.call_node_info(node_list, [cfg.GetVGName()],
-                                        [hypervisor_name])
     node_iinfo = \
       self.rpc.call_all_instances_info(node_list,
-                                       cluster_info.enabled_hypervisors)
+                                       cluster_info.enabled_hypervisors,
+                                       cluster_info.hvparams)
 
-    data["nodegroups"] = self._ComputeNodeGroupData(cfg)
+    data["nodegroups"] = self._ComputeNodeGroupData(self.cfg)
 
-    config_ndata = self._ComputeBasicNodeData(cfg, ninfo)
+    config_ndata = self._ComputeBasicNodeData(self.cfg, ninfo, node_whitelist)
     data["nodes"] = self._ComputeDynamicNodeData(ninfo, node_data, node_iinfo,
-                                                 i_list, config_ndata)
+                                                 i_list, config_ndata, has_lvm)
     assert len(data["nodes"]) == len(ninfo), \
         "Incomplete node data computed"
 
-    data["instances"] = self._ComputeInstanceData(cluster_info, i_list)
+    data["instances"] = self._ComputeInstanceData(self.cfg, cluster_info,
+                                                  i_list)
 
     self.in_data = data
 
@@ -453,14 +483,16 @@ class IAllocator(object):
     ng = dict((guuid, {
       "name": gdata.name,
       "alloc_policy": gdata.alloc_policy,
+      "networks": [net_uuid for net_uuid, _ in gdata.networks.items()],
       "ipolicy": gmi.CalculateGroupIPolicy(cluster, gdata),
+      "tags": list(gdata.GetTags()),
       })
       for guuid, gdata in cfg.GetAllNodeGroupsInfo().items())
 
     return ng
 
   @staticmethod
-  def _ComputeBasicNodeData(cfg, node_cfg):
+  def _ComputeBasicNodeData(cfg, node_cfg, node_whitelist):
     """Compute global node data.
 
     @rtype: dict
@@ -472,7 +504,9 @@ class IAllocator(object):
       "tags": list(ninfo.GetTags()),
       "primary_ip": ninfo.primary_ip,
       "secondary_ip": ninfo.secondary_ip,
-      "offline": ninfo.offline,
+      "offline": (ninfo.offline or
+                  not (node_whitelist is None or
+                       ninfo.name in node_whitelist)),
       "drained": ninfo.drained,
       "master_candidate": ninfo.master_candidate,
       "group": ninfo.group,
@@ -485,8 +519,104 @@ class IAllocator(object):
     return node_results
 
   @staticmethod
-  def _ComputeDynamicNodeData(node_cfg, node_data, node_iinfo, i_list,
-                              node_results):
+  def _GetAttributeFromHypervisorNodeData(hv_info, node_name, attr):
+    """Extract an attribute from the hypervisor's node information.
+
+    This is a helper function to extract data from the hypervisor's information
+    about the node, as part of the result of a node_info query.
+
+    @type hv_info: dict
+    @param hv_info: dictionary of node information from the hypervisor; the
+      values read through this helper are required to be integers
+    @type node_name: string
+    @param node_name: name of the node
+    @type attr: string
+    @param attr: key of the attribute in the hv_info dictionary
+    @rtype: integer
+    @return: the value of the attribute
+    @raises errors.OpExecError: if key not in dictionary or value not
+      integer
+
+    """
+    if attr not in hv_info:
+      raise errors.OpExecError("Node '%s' didn't return attribute"
+                               " '%s'" % (node_name, attr))
+    value = hv_info[attr]
+    # NOTE(review): isinstance(value, int) also accepts booleans (bool is a
+    # subclass of int) — confirm that is acceptable for these attributes.
+    if not isinstance(value, int):
+      raise errors.OpExecError("Node '%s' returned invalid value"
+                               " for '%s': %s" %
+                               (node_name, attr, value))
+    return value
+
+  @staticmethod
+  def _ComputeStorageDataFromSpaceInfo(space_info, node_name, has_lvm):
+    """Extract storage data from node info.
+
+    @type space_info: see result of the RPC call node info
+    @param space_info: the storage reporting part of the result of the RPC call
+      node info
+    @type node_name: string
+    @param node_name: the node's name
+    @type has_lvm: boolean
+    @param has_lvm: whether or not LVM storage information is requested
+    @rtype: 4-tuple of integers
+    @return: tuple of storage info (total_disk, free_disk, total_spindles,
+       free_spindles)
+
+    """
+    # TODO: replace this with proper storage reporting
+    if has_lvm:
+      lvm_vg_info = utils.storage.LookupSpaceInfoByStorageType(
+         space_info, constants.ST_LVM_VG)
+      if not lvm_vg_info:
+        raise errors.OpExecError("Node '%s' didn't return LVM vg space info."
+                                 % (node_name))
+      total_disk = lvm_vg_info["storage_size"]
+      free_disk = lvm_vg_info["storage_free"]
+      lvm_pv_info = utils.storage.LookupSpaceInfoByStorageType(
+         space_info, constants.ST_LVM_PV)
+      if not lvm_vg_info:
+        raise errors.OpExecError("Node '%s' didn't return LVM pv space info."
+                                 % (node_name))
+      total_spindles = lvm_pv_info["storage_size"]
+      free_spindles = lvm_pv_info["storage_free"]
+    else:
+      # we didn't even ask the node for VG status, so use zeros
+      total_disk = free_disk = 0
+      total_spindles = free_spindles = 0
+    return (total_disk, free_disk, total_spindles, free_spindles)
+
+  @staticmethod
+  def _ComputeInstanceMemory(instance_list, node_instances_info, node_uuid,
+                             input_mem_free):
+    """Compute memory used by primary instances.
+
+    @type instance_list: list of tuples
+    @param instance_list: list of (instance object, filled beparams) pairs;
+      only entries whose primary node is C{node_uuid} are counted
+    @type node_instances_info: dict
+    @param node_instances_info: per-node RPC results whose payload maps
+      instance names to live-info dictionaries with a "memory" entry
+    @type node_uuid: string
+    @param node_uuid: UUID of the node whose primary instances are considered
+    @type input_mem_free: integer
+    @param input_mem_free: free memory reported for the node, before adjusting
+      for instance usage
+    @rtype: tuple (int, int, int)
+    @returns: A tuple of three integers: 1. the sum of memory used by primary
+      instances on the node (including the ones that are currently down), 2.
+      the sum of memory used by primary instances of the node that are up, 3.
+      the amount of memory that is free on the node considering the current
+      usage of the instances.
+
+    """
+    i_p_mem = i_p_up_mem = 0
+    mem_free = input_mem_free
+    for iinfo, beinfo in instance_list:
+      if iinfo.primary_node == node_uuid:
+        i_p_mem += beinfo[constants.BE_MAXMEM]
+        if iinfo.name not in node_instances_info[node_uuid].payload:
+          i_used_mem = 0
+        else:
+          i_used_mem = int(node_instances_info[node_uuid]
+                           .payload[iinfo.name]["memory"])
+        # Subtract the unused-but-reserved slice (up to BE_MAXMEM) from free
+        # memory; max(0, ...) avoids crediting instances that exceed it.
+        i_mem_diff = beinfo[constants.BE_MAXMEM] - i_used_mem
+        mem_free -= max(0, i_mem_diff)
+
+        if iinfo.admin_state == constants.ADMINST_UP:
+          i_p_up_mem += beinfo[constants.BE_MAXMEM]
+    return (i_p_mem, i_p_up_mem, mem_free)
+
+  def _ComputeDynamicNodeData(self, node_cfg, node_data, node_iinfo, i_list,
+                              node_results, has_lvm):
     """Compute global node data.
 
     @param node_results: the basic node structures as filled from the config
@@ -495,58 +625,51 @@ class IAllocator(object):
     #TODO(dynmem): compute the right data on MAX and MIN memory
     # make a copy of the current dict
     node_results = dict(node_results)
-    for nname, nresult in node_data.items():
-      assert nname in node_results, "Missing basic data for node %s" % nname
-      ninfo = node_cfg[nname]
+    for nuuid, nresult in node_data.items():
+      ninfo = node_cfg[nuuid]
+      assert ninfo.name in node_results, "Missing basic data for node %s" % \
+                                         ninfo.name
 
       if not (ninfo.offline or ninfo.drained):
-        nresult.Raise("Can't get data for node %s" % nname)
-        node_iinfo[nname].Raise("Can't get node instance info from node %s" %
-                                nname)
-        remote_info = rpc.MakeLegacyNodeInfo(nresult.payload)
-
-        for attr in ["memory_total", "memory_free", "memory_dom0",
-                     "vg_size", "vg_free", "cpu_total"]:
-          if attr not in remote_info:
-            raise errors.OpExecError("Node '%s' didn't return attribute"
-                                     " '%s'" % (nname, attr))
-          if not isinstance(remote_info[attr], int):
-            raise errors.OpExecError("Node '%s' returned invalid value"
-                                     " for '%s': %s" %
-                                     (nname, attr, remote_info[attr]))
-        # compute memory used by primary instances
-        i_p_mem = i_p_up_mem = 0
-        for iinfo, beinfo in i_list:
-          if iinfo.primary_node == nname:
-            i_p_mem += beinfo[constants.BE_MAXMEM]
-            if iinfo.name not in node_iinfo[nname].payload:
-              i_used_mem = 0
-            else:
-              i_used_mem = int(node_iinfo[nname].payload[iinfo.name]["memory"])
-            i_mem_diff = beinfo[constants.BE_MAXMEM] - i_used_mem
-            remote_info["memory_free"] -= max(0, i_mem_diff)
-
-            if iinfo.admin_state == constants.ADMINST_UP:
-              i_p_up_mem += beinfo[constants.BE_MAXMEM]
+        nresult.Raise("Can't get data for node %s" % ninfo.name)
+        node_iinfo[nuuid].Raise("Can't get node instance info from node %s" %
+                                ninfo.name)
+        (_, space_info, (hv_info, )) = nresult.payload
+
+        mem_free = self._GetAttributeFromHypervisorNodeData(hv_info, ninfo.name,
+                                                            "memory_free")
+
+        (i_p_mem, i_p_up_mem, mem_free) = self._ComputeInstanceMemory(
+             i_list, node_iinfo, nuuid, mem_free)
+        (total_disk, free_disk, total_spindles, free_spindles) = \
+            self._ComputeStorageDataFromSpaceInfo(space_info, ninfo.name,
+                                                  has_lvm)
 
         # compute memory used by instances
         pnr_dyn = {
-          "total_memory": remote_info["memory_total"],
-          "reserved_memory": remote_info["memory_dom0"],
-          "free_memory": remote_info["memory_free"],
-          "total_disk": remote_info["vg_size"],
-          "free_disk": remote_info["vg_free"],
-          "total_cpus": remote_info["cpu_total"],
+          "total_memory": self._GetAttributeFromHypervisorNodeData(
+              hv_info, ninfo.name, "memory_total"),
+          "reserved_memory": self._GetAttributeFromHypervisorNodeData(
+              hv_info, ninfo.name, "memory_dom0"),
+          "free_memory": mem_free,
+          "total_disk": total_disk,
+          "free_disk": free_disk,
+          "total_spindles": total_spindles,
+          "free_spindles": free_spindles,
+          "total_cpus": self._GetAttributeFromHypervisorNodeData(
+              hv_info, ninfo.name, "cpu_total"),
+          "reserved_cpus": self._GetAttributeFromHypervisorNodeData(
+            hv_info, ninfo.name, "cpu_dom0"),
           "i_pri_memory": i_p_mem,
           "i_pri_up_memory": i_p_up_mem,
           }
-        pnr_dyn.update(node_results[nname])
-        node_results[nname] = pnr_dyn
+        pnr_dyn.update(node_results[ninfo.name])
+        node_results[ninfo.name] = pnr_dyn
 
     return node_results
 
   @staticmethod
-  def _ComputeInstanceData(cluster_info, i_list):
+  def _ComputeInstanceData(cfg, cluster_info, i_list):
     """Compute global instance data.
 
     """
@@ -571,12 +694,15 @@ class IAllocator(object):
         "memory": beinfo[constants.BE_MAXMEM],
         "spindle_use": beinfo[constants.BE_SPINDLE_USE],
         "os": iinfo.os,
-        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
+        "nodes": [cfg.GetNodeName(iinfo.primary_node)] +
+                 cfg.GetNodeNames(iinfo.secondary_nodes),
         "nics": nic_data,
         "disks": [{constants.IDISK_SIZE: dsk.size,
-                   constants.IDISK_MODE: dsk.mode}
+                   constants.IDISK_MODE: dsk.mode,
+                   constants.IDISK_SPINDLES: dsk.spindles}
                   for dsk in iinfo.disks],
         "disk_template": iinfo.disk_template,
+        "disks_active": iinfo.disks_active,
         "hypervisor": iinfo.hypervisor,
         }
       pir["disk_space_total"] = gmi.ComputeDiskSize(iinfo.disk_template,