Revision 0fcd0cad

b/Makefile.am
283 283

  
284 284
masterd_PYTHON = \
285 285
	lib/masterd/__init__.py \
286
	lib/masterd/iallocator.py \
286 287
	lib/masterd/instance.py
287 288

  
288 289
impexpd_PYTHON = \
b/lib/cmdlib.py
48 48
from ganeti import locking
49 49
from ganeti import constants
50 50
from ganeti import objects
51
from ganeti import serializer
52 51
from ganeti import ssconf
53 52
from ganeti import uidpool
54 53
from ganeti import compat
......
14811 14810
    return True
14812 14811

  
14813 14812

  
14814
class IAllocator(object):
14815
  """IAllocator framework.
14816

  
14817
  An IAllocator instance has four sets of attributes:
14818
    - cfg that is needed to query the cluster
14819
    - input data (every key listed for the mode in _MODE_DATA is required)
14820
    - four buffer attributes (in|out_data|text), that represent the
14821
      input (to the external script) in text and data structure format,
14822
      and the output from it, again in two formats
14823
    - the result variables from the script (success, info, result) for
14824
      easy usage
14825

  
14826
  """
14827
  # pylint: disable=R0902
14828
  # lots of instance attributes
14829

  
14830
  def __init__(self, cfg, rpc_runner, mode, **kwargs):
14831
    self.cfg = cfg
14832
    self.rpc = rpc_runner
14833
    # init buffer variables
14834
    self.in_text = self.out_text = self.in_data = self.out_data = None
14835
    # init all input fields so that pylint is happy
14836
    self.mode = mode
14837
    self.memory = self.disks = self.disk_template = self.spindle_use = None
14838
    self.os = self.tags = self.nics = self.vcpus = None
14839
    self.hypervisor = None
14840
    self.relocate_from = None
14841
    self.name = None
14842
    self.instances = None
14843
    self.evac_mode = None
14844
    self.target_groups = []
14845
    # computed fields
14846
    self.required_nodes = None
14847
    # init result fields
14848
    self.success = self.info = self.result = None
14849

  
14850
    try:
14851
      (fn, keydata, self._result_check) = self._MODE_DATA[self.mode]
14852
    except KeyError:
14853
      raise errors.ProgrammerError("Unknown mode '%s' passed to the"
14854
                                   " IAllocator" % self.mode)
14855

  
14856
    keyset = [n for (n, _) in keydata]
14857

  
14858
    for key in kwargs:
14859
      if key not in keyset:
14860
        raise errors.ProgrammerError("Invalid input parameter '%s' to"
14861
                                     " IAllocator" % key)
14862
      setattr(self, key, kwargs[key])
14863

  
14864
    for key in keyset:
14865
      if key not in kwargs:
14866
        raise errors.ProgrammerError("Missing input parameter '%s' to"
14867
                                     " IAllocator" % key)
14868
    self._BuildInputData(compat.partial(fn, self), keydata)
14869

  
14870
  def _ComputeClusterData(self):
14871
    """Compute the generic allocator input data.
14872

  
14873
    This is the data that is independent of the actual operation.
14874

  
14875
    """
14876
    cfg = self.cfg
14877
    cluster_info = cfg.GetClusterInfo()
14878
    # cluster data
14879
    data = {
14880
      "version": constants.IALLOCATOR_VERSION,
14881
      "cluster_name": cfg.GetClusterName(),
14882
      "cluster_tags": list(cluster_info.GetTags()),
14883
      "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
14884
      "ipolicy": cluster_info.ipolicy,
14885
      }
14886
    ninfo = cfg.GetAllNodesInfo()
14887
    iinfo = cfg.GetAllInstancesInfo().values()
14888
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
14889

  
14890
    # node data
14891
    node_list = [n.name for n in ninfo.values() if n.vm_capable]
14892

  
14893
    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
14894
      hypervisor_name = self.hypervisor
14895
    elif self.mode == constants.IALLOCATOR_MODE_RELOC:
14896
      hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
14897
    else:
14898
      hypervisor_name = cluster_info.primary_hypervisor
14899

  
14900
    node_data = self.rpc.call_node_info(node_list, [cfg.GetVGName()],
14901
                                        [hypervisor_name])
14902
    node_iinfo = \
14903
      self.rpc.call_all_instances_info(node_list,
14904
                                       cluster_info.enabled_hypervisors)
14905

  
14906
    data["nodegroups"] = self._ComputeNodeGroupData(cfg)
14907

  
14908
    config_ndata = self._ComputeBasicNodeData(cfg, ninfo)
14909
    data["nodes"] = self._ComputeDynamicNodeData(ninfo, node_data, node_iinfo,
14910
                                                 i_list, config_ndata)
14911
    assert len(data["nodes"]) == len(ninfo), \
14912
        "Incomplete node data computed"
14913

  
14914
    data["instances"] = self._ComputeInstanceData(cluster_info, i_list)
14915

  
14916
    self.in_data = data
14917

  
14918
  @staticmethod
14919
  def _ComputeNodeGroupData(cfg):
14920
    """Compute node groups data.
14921

  
14922
    """
14923
    cluster = cfg.GetClusterInfo()
14924
    ng = dict((guuid, {
14925
      "name": gdata.name,
14926
      "alloc_policy": gdata.alloc_policy,
14927
      "ipolicy": _CalculateGroupIPolicy(cluster, gdata),
14928
      })
14929
      for guuid, gdata in cfg.GetAllNodeGroupsInfo().items())
14930

  
14931
    return ng
14932

  
14933
  @staticmethod
14934
  def _ComputeBasicNodeData(cfg, node_cfg):
14935
    """Compute global node data.
14936

  
14937
    @rtype: dict
14938
    @return: a dict mapping each node name to its static (config-based) data
14939

  
14940
    """
14941
    # fill in static (config-based) values
14942
    node_results = dict((ninfo.name, {
14943
      "tags": list(ninfo.GetTags()),
14944
      "primary_ip": ninfo.primary_ip,
14945
      "secondary_ip": ninfo.secondary_ip,
14946
      "offline": ninfo.offline,
14947
      "drained": ninfo.drained,
14948
      "master_candidate": ninfo.master_candidate,
14949
      "group": ninfo.group,
14950
      "master_capable": ninfo.master_capable,
14951
      "vm_capable": ninfo.vm_capable,
14952
      "ndparams": cfg.GetNdParams(ninfo),
14953
      })
14954
      for ninfo in node_cfg.values())
14955

  
14956
    return node_results
14957

  
14958
  @staticmethod
14959
  def _ComputeDynamicNodeData(node_cfg, node_data, node_iinfo, i_list,
14960
                              node_results):
14961
    """Compute dynamic node data on top of the basic node data.
14962

  
14963
    @param node_results: the basic node structures as filled from the config
14964

  
14965
    """
14966
    #TODO(dynmem): compute the right data on MAX and MIN memory
14967
    # make a copy of the current dict
14968
    node_results = dict(node_results)
14969
    for nname, nresult in node_data.items():
14970
      assert nname in node_results, "Missing basic data for node %s" % nname
14971
      ninfo = node_cfg[nname]
14972

  
14973
      if not (ninfo.offline or ninfo.drained):
14974
        nresult.Raise("Can't get data for node %s" % nname)
14975
        node_iinfo[nname].Raise("Can't get node instance info from node %s" %
14976
                                nname)
14977
        remote_info = _MakeLegacyNodeInfo(nresult.payload)
14978

  
14979
        for attr in ["memory_total", "memory_free", "memory_dom0",
14980
                     "vg_size", "vg_free", "cpu_total"]:
14981
          if attr not in remote_info:
14982
            raise errors.OpExecError("Node '%s' didn't return attribute"
14983
                                     " '%s'" % (nname, attr))
14984
          if not isinstance(remote_info[attr], int):
14985
            raise errors.OpExecError("Node '%s' returned invalid value"
14986
                                     " for '%s': %s" %
14987
                                     (nname, attr, remote_info[attr]))
14988
        # compute memory used by primary instances
14989
        i_p_mem = i_p_up_mem = 0
14990
        for iinfo, beinfo in i_list:
14991
          if iinfo.primary_node == nname:
14992
            i_p_mem += beinfo[constants.BE_MAXMEM]
14993
            if iinfo.name not in node_iinfo[nname].payload:
14994
              i_used_mem = 0
14995
            else:
14996
              i_used_mem = int(node_iinfo[nname].payload[iinfo.name]["memory"])
14997
            i_mem_diff = beinfo[constants.BE_MAXMEM] - i_used_mem
14998
            remote_info["memory_free"] -= max(0, i_mem_diff)
14999

  
15000
            if iinfo.admin_state == constants.ADMINST_UP:
15001
              i_p_up_mem += beinfo[constants.BE_MAXMEM]
15002

  
15003
        # assemble the dynamic (runtime) data for this node
15004
        pnr_dyn = {
15005
          "total_memory": remote_info["memory_total"],
15006
          "reserved_memory": remote_info["memory_dom0"],
15007
          "free_memory": remote_info["memory_free"],
15008
          "total_disk": remote_info["vg_size"],
15009
          "free_disk": remote_info["vg_free"],
15010
          "total_cpus": remote_info["cpu_total"],
15011
          "i_pri_memory": i_p_mem,
15012
          "i_pri_up_memory": i_p_up_mem,
15013
          }
15014
        pnr_dyn.update(node_results[nname])
15015
        node_results[nname] = pnr_dyn
15016

  
15017
    return node_results
15018

  
15019
  @staticmethod
15020
  def _ComputeInstanceData(cluster_info, i_list):
15021
    """Compute global instance data.
15022

  
15023
    """
15024
    instance_data = {}
15025
    for iinfo, beinfo in i_list:
15026
      nic_data = []
15027
      for nic in iinfo.nics:
15028
        filled_params = cluster_info.SimpleFillNIC(nic.nicparams)
15029
        nic_dict = {
15030
          "mac": nic.mac,
15031
          "ip": nic.ip,
15032
          "mode": filled_params[constants.NIC_MODE],
15033
          "link": filled_params[constants.NIC_LINK],
15034
          }
15035
        if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
15036
          nic_dict["bridge"] = filled_params[constants.NIC_LINK]
15037
        nic_data.append(nic_dict)
15038
      pir = {
15039
        "tags": list(iinfo.GetTags()),
15040
        "admin_state": iinfo.admin_state,
15041
        "vcpus": beinfo[constants.BE_VCPUS],
15042
        "memory": beinfo[constants.BE_MAXMEM],
15043
        "spindle_use": beinfo[constants.BE_SPINDLE_USE],
15044
        "os": iinfo.os,
15045
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
15046
        "nics": nic_data,
15047
        "disks": [{constants.IDISK_SIZE: dsk.size,
15048
                   constants.IDISK_MODE: dsk.mode}
15049
                  for dsk in iinfo.disks],
15050
        "disk_template": iinfo.disk_template,
15051
        "hypervisor": iinfo.hypervisor,
15052
        }
15053
      pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
15054
                                                 pir["disks"])
15055
      instance_data[iinfo.name] = pir
15056

  
15057
    return instance_data
15058

  
15059
  def _AddNewInstance(self):
15060
    """Add new instance data to allocator structure.
15061

  
15062
    This in combination with _ComputeClusterData will create the
15063
    correct structure needed as input for the allocator.
15064

  
15065
    The checks for the completeness of the opcode must have already been
15066
    done.
15067

  
15068
    """
15069
    disk_space = _ComputeDiskSize(self.disk_template, self.disks)
15070

  
15071
    if self.disk_template in constants.DTS_INT_MIRROR:
15072
      self.required_nodes = 2
15073
    else:
15074
      self.required_nodes = 1
15075

  
15076
    request = {
15077
      "name": self.name,
15078
      "disk_template": self.disk_template,
15079
      "tags": self.tags,
15080
      "os": self.os,
15081
      "vcpus": self.vcpus,
15082
      "memory": self.memory,
15083
      "spindle_use": self.spindle_use,
15084
      "disks": self.disks,
15085
      "disk_space_total": disk_space,
15086
      "nics": self.nics,
15087
      "required_nodes": self.required_nodes,
15088
      "hypervisor": self.hypervisor,
15089
      }
15090

  
15091
    return request
15092

  
15093
  def _AddRelocateInstance(self):
15094
    """Add relocate instance data to allocator structure.
15095

  
15096
    This in combination with _ComputeClusterData will create the
15097
    correct structure needed as input for the allocator.
15098

  
15099
    The checks for the completeness of the opcode must have already been
15100
    done.
15101

  
15102
    """
15103
    instance = self.cfg.GetInstanceInfo(self.name)
15104
    if instance is None:
15105
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
15106
                                   " IAllocator" % self.name)
15107

  
15108
    if instance.disk_template not in constants.DTS_MIRRORED:
15109
      raise errors.OpPrereqError("Can't relocate non-mirrored instances",
15110
                                 errors.ECODE_INVAL)
15111

  
15112
    if instance.disk_template in constants.DTS_INT_MIRROR and \
15113
        len(instance.secondary_nodes) != 1:
15114
      raise errors.OpPrereqError("Instance has not exactly one secondary node",
15115
                                 errors.ECODE_STATE)
15116

  
15117
    self.required_nodes = 1
15118
    disk_sizes = [{constants.IDISK_SIZE: disk.size} for disk in instance.disks]
15119
    disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
15120

  
15121
    request = {
15122
      "name": self.name,
15123
      "disk_space_total": disk_space,
15124
      "required_nodes": self.required_nodes,
15125
      "relocate_from": self.relocate_from,
15126
      }
15127
    return request
15128

  
15129
  def _AddNodeEvacuate(self):
15130
    """Get data for node-evacuate requests.
15131

  
15132
    """
15133
    return {
15134
      "instances": self.instances,
15135
      "evac_mode": self.evac_mode,
15136
      }
15137

  
15138
  def _AddChangeGroup(self):
15139
    """Get data for change-group requests.
15140

  
15141
    """
15142
    return {
15143
      "instances": self.instances,
15144
      "target_groups": self.target_groups,
15145
      }
15146

  
15147
  def _BuildInputData(self, fn, keydata):
15148
    """Build input data structures.
15149

  
15150
    """
15151
    self._ComputeClusterData()
15152

  
15153
    request = fn()
15154
    request["type"] = self.mode
15155
    for keyname, keytype in keydata:
15156
      if keyname not in request:
15157
        raise errors.ProgrammerError("Request parameter %s is missing" %
15158
                                     keyname)
15159
      val = request[keyname]
15160
      if not keytype(val):
15161
        raise errors.ProgrammerError("Request parameter %s doesn't pass"
15162
                                     " validation, value %s, expected"
15163
                                     " type %s" % (keyname, val, keytype))
15164
    self.in_data["request"] = request
15165

  
15166
    self.in_text = serializer.Dump(self.in_data)
15167

  
15168
  _STRING_LIST = ht.TListOf(ht.TString)
15169
  _JOB_LIST = ht.TListOf(ht.TListOf(ht.TStrictDict(True, False, {
15170
     # pylint: disable=E1101
15171
     # Class '...' has no 'OP_ID' member
15172
     "OP_ID": ht.TElemOf([opcodes.OpInstanceFailover.OP_ID,
15173
                          opcodes.OpInstanceMigrate.OP_ID,
15174
                          opcodes.OpInstanceReplaceDisks.OP_ID])
15175
     })))
15176

  
15177
  _NEVAC_MOVED = \
15178
    ht.TListOf(ht.TAnd(ht.TIsLength(3),
15179
                       ht.TItems([ht.TNonEmptyString,
15180
                                  ht.TNonEmptyString,
15181
                                  ht.TListOf(ht.TNonEmptyString),
15182
                                  ])))
15183
  _NEVAC_FAILED = \
15184
    ht.TListOf(ht.TAnd(ht.TIsLength(2),
15185
                       ht.TItems([ht.TNonEmptyString,
15186
                                  ht.TMaybeString,
15187
                                  ])))
15188
  _NEVAC_RESULT = ht.TAnd(ht.TIsLength(3),
15189
                          ht.TItems([_NEVAC_MOVED, _NEVAC_FAILED, _JOB_LIST]))
15190

  
15191
  _MODE_DATA = {
15192
    constants.IALLOCATOR_MODE_ALLOC:
15193
      (_AddNewInstance,
15194
       [
15195
        ("name", ht.TString),
15196
        ("memory", ht.TInt),
15197
        ("spindle_use", ht.TInt),
15198
        ("disks", ht.TListOf(ht.TDict)),
15199
        ("disk_template", ht.TString),
15200
        ("os", ht.TString),
15201
        ("tags", _STRING_LIST),
15202
        ("nics", ht.TListOf(ht.TDict)),
15203
        ("vcpus", ht.TInt),
15204
        ("hypervisor", ht.TString),
15205
        ], ht.TList),
15206
    constants.IALLOCATOR_MODE_RELOC:
15207
      (_AddRelocateInstance,
15208
       [("name", ht.TString), ("relocate_from", _STRING_LIST)],
15209
       ht.TList),
15210
     constants.IALLOCATOR_MODE_NODE_EVAC:
15211
      (_AddNodeEvacuate, [
15212
        ("instances", _STRING_LIST),
15213
        ("evac_mode", ht.TElemOf(constants.IALLOCATOR_NEVAC_MODES)),
15214
        ], _NEVAC_RESULT),
15215
     constants.IALLOCATOR_MODE_CHG_GROUP:
15216
      (_AddChangeGroup, [
15217
        ("instances", _STRING_LIST),
15218
        ("target_groups", _STRING_LIST),
15219
        ], _NEVAC_RESULT),
15220
    }
15221

  
15222
  def Run(self, name, validate=True, call_fn=None):
15223
    """Run an instance allocator and return the results.
15224

  
15225
    """
15226
    if call_fn is None:
15227
      call_fn = self.rpc.call_iallocator_runner
15228

  
15229
    result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
15230
    result.Raise("Failure while running the iallocator script")
15231

  
15232
    self.out_text = result.payload
15233
    if validate:
15234
      self._ValidateResult()
15235

  
15236
  def _ValidateResult(self):
15237
    """Process the allocator results.
15238

  
15239
    This will process and if successful save the result in
15240
    self.out_data and the other parameters.
15241

  
15242
    """
15243
    try:
15244
      rdict = serializer.Load(self.out_text)
15245
    except Exception, err:
15246
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
15247

  
15248
    if not isinstance(rdict, dict):
15249
      raise errors.OpExecError("Can't parse iallocator results: not a dict")
15250

  
15251
    # TODO: remove backwards compatibility in later versions
15252
    if "nodes" in rdict and "result" not in rdict:
15253
      rdict["result"] = rdict["nodes"]
15254
      del rdict["nodes"]
15255

  
15256
    for key in "success", "info", "result":
15257
      if key not in rdict:
15258
        raise errors.OpExecError("Can't parse iallocator results:"
15259
                                 " missing key '%s'" % key)
15260
      setattr(self, key, rdict[key])
15261

  
15262
    if not self._result_check(self.result):
15263
      raise errors.OpExecError("Iallocator returned invalid result,"
15264
                               " expected %s, got %s" %
15265
                               (self._result_check, self.result),
15266
                               errors.ECODE_INVAL)
15267

  
15268
    if self.mode == constants.IALLOCATOR_MODE_RELOC:
15269
      assert self.relocate_from is not None
15270
      assert self.required_nodes == 1
15271

  
15272
      node2group = dict((name, ndata["group"])
15273
                        for (name, ndata) in self.in_data["nodes"].items())
15274

  
15275
      fn = compat.partial(self._NodesToGroups, node2group,
15276
                          self.in_data["nodegroups"])
15277

  
15278
      instance = self.cfg.GetInstanceInfo(self.name)
15279
      request_groups = fn(self.relocate_from + [instance.primary_node])
15280
      result_groups = fn(rdict["result"] + [instance.primary_node])
15281

  
15282
      if self.success and not set(result_groups).issubset(request_groups):
15283
        raise errors.OpExecError("Groups of nodes returned by iallocator (%s)"
15284
                                 " differ from original groups (%s)" %
15285
                                 (utils.CommaJoin(result_groups),
15286
                                  utils.CommaJoin(request_groups)))
15287

  
15288
    elif self.mode == constants.IALLOCATOR_MODE_NODE_EVAC:
15289
      assert self.evac_mode in constants.IALLOCATOR_NEVAC_MODES
15290

  
15291
    self.out_data = rdict
15292

  
15293
  @staticmethod
15294
  def _NodesToGroups(node2group, groups, nodes):
15295
    """Returns a list of unique group names for a list of nodes.
15296

  
15297
    @type node2group: dict
15298
    @param node2group: Map from node name to group UUID
15299
    @type groups: dict
15300
    @param groups: Group information
15301
    @type nodes: list
15302
    @param nodes: Node names
15303

  
15304
    """
15305
    result = set()
15306

  
15307
    for node in nodes:
15308
      try:
15309
        group_uuid = node2group[node]
15310
      except KeyError:
15311
        # Ignore unknown node
15312
        pass
15313
      else:
15314
        try:
15315
          group = groups[group_uuid]
15316
        except KeyError:
15317
          # Can't find group, let's use UUID
15318
          group_name = group_uuid
15319
        else:
15320
          group_name = group["name"]
15321

  
15322
        result.add(group_name)
15323

  
15324
    return sorted(result)
15325

  
15326

  
15327 14813
class LUTestAllocator(NoHooksLU):
15328 14814
  """Run allocator tests.
15329 14815

  
b/lib/masterd/iallocator.py
1
#
2
#
3

  
4
# Copyright (C) 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

  
21

  
22
"""Module implementing the iallocator code."""
23

  
24
from ganeti import compat
25
from ganeti import constants
26
from ganeti import errors
27
from ganeti import ht
28
from ganeti import objectutils
29
from ganeti import opcodes
30
from ganeti import rpc
31
from ganeti import serializer
32
from ganeti import utils
33

  
34
import ganeti.masterd.instance as gmi
35

  
36

  
37
_STRING_LIST = ht.TListOf(ht.TString)
38
_JOB_LIST = ht.TListOf(ht.TListOf(ht.TStrictDict(True, False, {
39
   # pylint: disable=E1101
40
   # Class '...' has no 'OP_ID' member
41
   "OP_ID": ht.TElemOf([opcodes.OpInstanceFailover.OP_ID,
42
                        opcodes.OpInstanceMigrate.OP_ID,
43
                        opcodes.OpInstanceReplaceDisks.OP_ID])
44
   })))
45

  
46
_NEVAC_MOVED = \
47
  ht.TListOf(ht.TAnd(ht.TIsLength(3),
48
                     ht.TItems([ht.TNonEmptyString,
49
                                ht.TNonEmptyString,
50
                                ht.TListOf(ht.TNonEmptyString),
51
                                ])))
52
_NEVAC_FAILED = \
53
  ht.TListOf(ht.TAnd(ht.TIsLength(2),
54
                     ht.TItems([ht.TNonEmptyString,
55
                                ht.TMaybeString,
56
                                ])))
57
_NEVAC_RESULT = ht.TAnd(ht.TIsLength(3),
58
                        ht.TItems([_NEVAC_MOVED, _NEVAC_FAILED, _JOB_LIST]))
59

  
60

  
61
class _AutoReqParam(objectutils.AutoSlots):
62
  """Meta class for request definitions.
63

  
64
  """
65
  @classmethod
66
  def _GetSlots(mcs, attrs):
67
    """Extract the slots out of REQ_PARAMS.
68

  
69
    """
70
    params = attrs.setdefault("REQ_PARAMS", [])
71
    return [slot for (slot, _) in params]
72

  
73

  
74
class IARequestBase(objectutils.ValidatedSlots):
75
  """A generic IAllocator request object.
76

  
77
  """
78
  __metaclass__ = _AutoReqParam
79

  
80
  MODE = NotImplemented
81
  REQ_PARAMS = [
82
    ("required_nodes", ht.TPositiveInt)
83
    ]
84
  REQ_RESULT = NotImplemented
85

  
86
  def __init__(self, **kwargs):
87
    """Constructor for IARequestBase.
88

  
89
    The constructor takes only keyword arguments and will set
90
    attributes on this object based on the passed arguments. As such,
91
    it means that you should not pass arguments which are not in the
92
    REQ_PARAMS attribute for this class.
93

  
94
    """
95
    self.required_nodes = 0
96
    objectutils.ValidatedSlots.__init__(self, **kwargs)
97

  
98
    self.Validate()
99

  
100
  def Validate(self):
101
    """Validates all parameters of the request.
102

  
103
    """
104
    assert self.MODE in constants.VALID_IALLOCATOR_MODES
105

  
106
    for (param, validator) in self.REQ_PARAMS:
107
      if not hasattr(self, param):
108
        raise errors.OpPrereqError("Request is missing '%s' parameter" % param,
109
                                   errors.ECODE_INVAL)
110

  
111
      value = getattr(self, param)
112
      if not validator(value):
113
        raise errors.OpPrereqError(("Request parameter '%s' has invalid"
114
                                    " type %s/value %s") %
115
                                    (param, type(value), value),
116
                                    errors.ECODE_INVAL)
117

  
118
  def GetRequest(self, cfg):
119
    """Gets the request data dict.
120

  
121
    @param cfg: The configuration instance
122

  
123
    """
124
    raise NotImplementedError
125

  
126
  def ValidateResult(self, ia, result):
127
    """Validates the result of a request.
128

  
129
    @param ia: The IAllocator instance
130
    @param result: The IAllocator run result
131
    @raise errors.OpExecError: If the run failed or the result is invalid
132

  
133
    """
134
    if not (ia.success and self.REQ_RESULT(result)):
135
      raise errors.OpExecError("Iallocator returned invalid result,"
136
                               " expected %s, got %s" %
137
                               (self.REQ_RESULT, result))
138

  
139

  
140
class IAReqInstanceAlloc(IARequestBase):
141
  """An instance allocation request.
142

  
143
  """
144
  MODE = constants.IALLOCATOR_MODE_ALLOC
145
  REQ_PARAMS = [
146
    ("name", ht.TString),
147
    ("memory", ht.TInt),
148
    ("spindle_use", ht.TInt),
149
    ("disks", ht.TListOf(ht.TDict)),
150
    ("disk_template", ht.TString),
151
    ("os", ht.TString),
152
    ("tags", _STRING_LIST),
153
    ("nics", ht.TListOf(ht.TDict)),
154
    ("vcpus", ht.TInt),
155
    ("hypervisor", ht.TString),
156
    ]
157
  REQ_RESULT = ht.TList
158

  
159
  def GetRequest(self, cfg):
160
    """Requests a new instance.
161

  
162
    The checks for the completeness of the opcode must have already been
163
    done.
164

  
165
    """
166
    disk_space = gmi.ComputeDiskSize(self.disk_template, self.disks)
167

  
168
    if self.disk_template in constants.DTS_INT_MIRROR:
169
      self.required_nodes = 2
170
    else:
171
      self.required_nodes = 1
172

  
173
    return {
174
      "name": self.name,
175
      "disk_template": self.disk_template,
176
      "tags": self.tags,
177
      "os": self.os,
178
      "vcpus": self.vcpus,
179
      "memory": self.memory,
180
      "spindle_use": self.spindle_use,
181
      "disks": self.disks,
182
      "disk_space_total": disk_space,
183
      "nics": self.nics,
184
      "required_nodes": self.required_nodes,
185
      "hypervisor": self.hypervisor,
186
      }
187

  
188

  
189
class IAReqRelocate(IARequestBase):
190
  """A relocation request.
191

  
192
  """
193
  MODE = constants.IALLOCATOR_MODE_RELOC
194
  REQ_PARAMS = [
195
    ("name", ht.TString),
196
    ("relocate_from", _STRING_LIST),
197
    ]
198
  REQ_RESULT = ht.TList
199

  
200
  def GetRequest(self, cfg):
201
    """Request a relocation of an instance.
202

  
203
    The checks for the completeness of the opcode must have already been
204
    done.
205

  
206
    """
207
    instance = cfg.GetInstanceInfo(self.name)
208
    if instance is None:
209
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
210
                                   " IAllocator" % self.name)
211

  
212
    if instance.disk_template not in constants.DTS_MIRRORED:
213
      raise errors.OpPrereqError("Can't relocate non-mirrored instances",
214
                                 errors.ECODE_INVAL)
215

  
216
    if instance.disk_template in constants.DTS_INT_MIRROR and \
217
        len(instance.secondary_nodes) != 1:
218
      raise errors.OpPrereqError("Instance has not exactly one secondary node",
219
                                 errors.ECODE_STATE)
220

  
221
    self.required_nodes = 1
222
    disk_sizes = [{constants.IDISK_SIZE: disk.size} for disk in instance.disks]
223
    disk_space = gmi.ComputeDiskSize(instance.disk_template, disk_sizes)
224

  
225
    return {
226
      "name": self.name,
227
      "disk_space_total": disk_space,
228
      "required_nodes": self.required_nodes,
229
      "relocate_from": self.relocate_from,
230
      }
231

  
232
  def ValidateResult(self, ia, result):
233
    """Validates the result of a relocation request.
234

  
235
    """
236
    node2group = dict((name, ndata["group"])
237
                      for (name, ndata) in ia.in_data["nodes"].items())
238

  
239
    fn = compat.partial(self._NodesToGroups, node2group,
240
                        ia.in_data["nodegroups"])
241

  
242
    instance = ia.cfg.GetInstanceInfo(self.name)
243
    request_groups = fn(self.relocate_from + [instance.primary_node])
244
    result_groups = fn(result + [instance.primary_node])
245

  
246
    if ia.success and not set(result_groups).issubset(request_groups):
247
      raise errors.OpExecError("Groups of nodes returned by iallocator (%s)"
248
                               " differ from original groups (%s)" %
249
                               (utils.CommaJoin(result_groups),
250
                                utils.CommaJoin(request_groups)))
251

  
252
  @staticmethod
253
  def _NodesToGroups(node2group, groups, nodes):
254
    """Returns a list of unique group names for a list of nodes.
255

  
256
    @type node2group: dict
257
    @param node2group: Map from node name to group UUID
258
    @type groups: dict
259
    @param groups: Group information
260
    @type nodes: list
261
    @param nodes: Node names
262

  
263
    """
264
    result = set()
265

  
266
    for node in nodes:
267
      try:
268
        group_uuid = node2group[node]
269
      except KeyError:
270
        # Ignore unknown node
271
        pass
272
      else:
273
        try:
274
          group = groups[group_uuid]
275
        except KeyError:
276
          # Can't find group, let's use UUID
277
          group_name = group_uuid
278
        else:
279
          group_name = group["name"]
280

  
281
        result.add(group_name)
282

  
283
    return sorted(result)
284

  
285

  
286
class IAReqNodeEvac(IARequestBase):
287
  """A node evacuation request.
288

  
289
  """
290
  MODE = constants.IALLOCATOR_MODE_NODE_EVAC
291
  REQ_PARAMS = [
292
    ("instances", _STRING_LIST),
293
    ("evac_mode", ht.TElemOf(constants.IALLOCATOR_NEVAC_MODES)),
294
    ]
295
  REQ_RESULT = _NEVAC_RESULT
296

  
297
  def GetRequest(self, cfg):
298
    """Get data for node-evacuate requests.
299

  
300
    """
301
    return {
302
      "instances": self.instances,
303
      "evac_mode": self.evac_mode,
304
      }
305

  
306

  
307
class IAReqGroupChange(IARequestBase):
  """A group change request.

  Carries the C{instances} and C{target_groups} parameters declared in
  L{REQ_PARAMS}.

  """
  MODE = constants.IALLOCATOR_MODE_CHG_GROUP
  REQ_PARAMS = [
    ("instances", _STRING_LIST),
    ("target_groups", _STRING_LIST),
    ]
  REQ_RESULT = _NEVAC_RESULT

  def GetRequest(self, cfg):
    """Get data for change-group requests.

    @param cfg: Cluster configuration (not used by this request type)
    @rtype: dict
    @return: Dictionary with the C{"instances"} and C{"target_groups"} keys

    """
    return {
      "instances": self.instances,
      "target_groups": self.target_groups,
      }
326

  
327

  
328
class IAllocator(object):
  """IAllocator framework.

  An IAllocator instance has four sets of attributes:
    - cfg and rpc, which are needed to query the cluster and its nodes
    - req, the request object; it has to provide C{MODE},
      C{GetRequest(cfg)}, C{ValidateResult(ia, result)} and
      C{required_nodes}
    - four buffer attributes (in|out_data|text), that represent the
      input (to the external script) in text and data structure format,
      and the output from it, again in two formats
    - the result variables from the script (success, info, result) for
      easy usage

  """
  # pylint: disable=R0902
  # lots of instance attributes

  def __init__(self, cfg, rpc_runner, req):
    """Initializes the instance and builds the input for the script.

    @param cfg: Cluster configuration
    @param rpc_runner: RPC runner used to query the nodes
    @param req: The request object

    """
    self.cfg = cfg
    self.rpc = rpc_runner
    self.req = req
    # init buffer variables
    self.in_text = self.out_text = self.in_data = self.out_data = None
    # computed fields; the real value is copied from the request in Run(),
    # but the attribute has to exist before that
    self.required_nodes = None
    # init result fields
    self.success = self.info = self.result = None

    self._BuildInputData(req)

  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data that is independent of the actual operation. The
    result is stored in C{self.in_data}.

    """
    cfg = self.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": constants.IALLOCATOR_VERSION,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
      "ipolicy": cluster_info.ipolicy,
      }
    ninfo = cfg.GetAllNodesInfo()
    iinfo = cfg.GetAllInstancesInfo().values()
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]

    # node data; only VM-capable nodes are queried
    node_list = [n.name for n in ninfo.values() if n.vm_capable]

    # the hypervisor to ask the nodes about depends on the request type
    if isinstance(self.req, IAReqInstanceAlloc):
      hypervisor_name = self.req.hypervisor
    elif isinstance(self.req, IAReqRelocate):
      hypervisor_name = cfg.GetInstanceInfo(self.req.name).hypervisor
    else:
      hypervisor_name = cluster_info.primary_hypervisor

    node_data = self.rpc.call_node_info(node_list, [cfg.GetVGName()],
                                        [hypervisor_name])
    node_iinfo = \
      self.rpc.call_all_instances_info(node_list,
                                       cluster_info.enabled_hypervisors)

    data["nodegroups"] = self._ComputeNodeGroupData(cfg)

    config_ndata = self._ComputeBasicNodeData(cfg, ninfo)
    data["nodes"] = self._ComputeDynamicNodeData(ninfo, node_data, node_iinfo,
                                                 i_list, config_ndata)
    assert len(data["nodes"]) == len(ninfo), \
        "Incomplete node data computed"

    data["instances"] = self._ComputeInstanceData(cluster_info, i_list)

    self.in_data = data

  @staticmethod
  def _ComputeNodeGroupData(cfg):
    """Compute node groups data.

    @param cfg: Cluster configuration
    @rtype: dict
    @return: a dict of group UUID: dict with the group's name, allocation
      policy and instance policy

    """
    cluster = cfg.GetClusterInfo()
    ng = dict((guuid, {
      "name": gdata.name,
      "alloc_policy": gdata.alloc_policy,
      "ipolicy": gmi.CalculateGroupIPolicy(cluster, gdata),
      })
      for guuid, gdata in cfg.GetAllNodeGroupsInfo().items())

    return ng

  @staticmethod
  def _ComputeBasicNodeData(cfg, node_cfg):
    """Compute global node data.

    Only values available from the configuration are used here.

    @param cfg: Cluster configuration
    @type node_cfg: dict
    @param node_cfg: the node objects to describe (only the values of
      the dict are used)
    @rtype: dict
    @return: a dict of node name: dict of static node attributes

    """
    # fill in static (config-based) values
    node_results = dict((ninfo.name, {
      "tags": list(ninfo.GetTags()),
      "primary_ip": ninfo.primary_ip,
      "secondary_ip": ninfo.secondary_ip,
      "offline": ninfo.offline,
      "drained": ninfo.drained,
      "master_candidate": ninfo.master_candidate,
      "group": ninfo.group,
      "master_capable": ninfo.master_capable,
      "vm_capable": ninfo.vm_capable,
      "ndparams": cfg.GetNdParams(ninfo),
      })
      for ninfo in node_cfg.values())

    return node_results

  @staticmethod
  def _ComputeDynamicNodeData(node_cfg, node_data, node_iinfo, i_list,
                              node_results):
    """Merge the runtime node data into the basic node data.

    Nodes which are offline or drained keep only their basic data.

    @param node_cfg: node objects, indexed by node name
    @param node_data: per-node results of the node info RPC
    @param node_iinfo: per-node results of the "all instances info" RPC
    @param i_list: list of (instance object, filled backend parameters)
    @param node_results: the basic node structures as filled from the config
    @rtype: dict
    @return: a new dict of node name: node attribute dict; the dict passed
      in as C{node_results} is not modified
    @raise errors.OpExecError: if a node's reply lacks a required attribute
      or contains a non-integer value for it

    """
    #TODO(dynmem): compute the right data on MAX and MIN memory
    # make a copy of the current dict
    node_results = dict(node_results)
    for nname, nresult in node_data.items():
      assert nname in node_results, "Missing basic data for node %s" % nname
      ninfo = node_cfg[nname]

      if not (ninfo.offline or ninfo.drained):
        nresult.Raise("Can't get data for node %s" % nname)
        node_iinfo[nname].Raise("Can't get node instance info from node %s" %
                                nname)
        remote_info = rpc.MakeLegacyNodeInfo(nresult.payload)

        # sanity-check the values reported by the node
        for attr in ["memory_total", "memory_free", "memory_dom0",
                     "vg_size", "vg_free", "cpu_total"]:
          if attr not in remote_info:
            raise errors.OpExecError("Node '%s' didn't return attribute"
                                     " '%s'" % (nname, attr))
          if not isinstance(remote_info[attr], int):
            raise errors.OpExecError("Node '%s' returned invalid value"
                                     " for '%s': %s" %
                                     (nname, attr, remote_info[attr]))
        # compute memory used by primary instances
        i_p_mem = i_p_up_mem = 0
        for iinfo, beinfo in i_list:
          if iinfo.primary_node == nname:
            i_p_mem += beinfo[constants.BE_MAXMEM]
            # instances not reported by the node count as using no memory
            if iinfo.name not in node_iinfo[nname].payload:
              i_used_mem = 0
            else:
              i_used_mem = int(node_iinfo[nname].payload[iinfo.name]["memory"])
            # reduce the free memory by the part of the instance's maximum
            # memory which it is not using at the moment
            i_mem_diff = beinfo[constants.BE_MAXMEM] - i_used_mem
            remote_info["memory_free"] -= max(0, i_mem_diff)

            if iinfo.admin_state == constants.ADMINST_UP:
              i_p_up_mem += beinfo[constants.BE_MAXMEM]

        # assemble the dynamic data for this node
        pnr_dyn = {
          "total_memory": remote_info["memory_total"],
          "reserved_memory": remote_info["memory_dom0"],
          "free_memory": remote_info["memory_free"],
          "total_disk": remote_info["vg_size"],
          "free_disk": remote_info["vg_free"],
          "total_cpus": remote_info["cpu_total"],
          "i_pri_memory": i_p_mem,
          "i_pri_up_memory": i_p_up_mem,
          }
        # the basic data is merged last, hence it takes precedence
        pnr_dyn.update(node_results[nname])
        node_results[nname] = pnr_dyn

    return node_results

  @staticmethod
  def _ComputeInstanceData(cluster_info, i_list):
    """Compute global instance data.

    @param cluster_info: the cluster object, used to fill the NIC parameters
    @param i_list: list of (instance object, filled backend parameters)
    @rtype: dict
    @return: a dict of instance name: instance attribute dict

    """
    instance_data = {}
    for iinfo, beinfo in i_list:
      nic_data = []
      for nic in iinfo.nics:
        filled_params = cluster_info.SimpleFillNIC(nic.nicparams)
        nic_dict = {
          "mac": nic.mac,
          "ip": nic.ip,
          "mode": filled_params[constants.NIC_MODE],
          "link": filled_params[constants.NIC_LINK],
          }
        # for bridged NICs the link is also exported as "bridge"
        if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          nic_dict["bridge"] = filled_params[constants.NIC_LINK]
        nic_data.append(nic_dict)
      pir = {
        "tags": list(iinfo.GetTags()),
        "admin_state": iinfo.admin_state,
        "vcpus": beinfo[constants.BE_VCPUS],
        "memory": beinfo[constants.BE_MAXMEM],
        "spindle_use": beinfo[constants.BE_SPINDLE_USE],
        "os": iinfo.os,
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
        "nics": nic_data,
        "disks": [{constants.IDISK_SIZE: dsk.size,
                   constants.IDISK_MODE: dsk.mode}
                  for dsk in iinfo.disks],
        "disk_template": iinfo.disk_template,
        "hypervisor": iinfo.hypervisor,
        }
      pir["disk_space_total"] = gmi.ComputeDiskSize(iinfo.disk_template,
                                                    pir["disks"])
      instance_data[iinfo.name] = pir

    return instance_data

  def _BuildInputData(self, req):
    """Build input data structures.

    Fills C{self.in_data} with the cluster data plus the request and
    stores its serialized form in C{self.in_text}.

    @param req: The request object

    """
    self._ComputeClusterData()

    request = req.GetRequest(self.cfg)
    request["type"] = req.MODE
    self.in_data["request"] = request

    self.in_text = serializer.Dump(self.in_data)

  def Run(self, name, validate=True, call_fn=None):
    """Run an instance allocator and return the results.

    The output of the script is stored in C{self.out_text}; nothing is
    returned directly.

    @param name: passed to C{call_fn} to select the iallocator to run
    @param validate: whether to parse and validate the output of the script
    @param call_fn: function used instead of the C{call_iallocator_runner}
      RPC; called with the master node, C{name} and the input text

    """
    if call_fn is None:
      call_fn = self.rpc.call_iallocator_runner

    result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
    result.Raise("Failure while running the iallocator script")

    self.required_nodes = self.req.required_nodes
    self.out_text = result.payload
    if validate:
      self._ValidateResult()

  def _ValidateResult(self):
    """Process the allocator results.

    This will process and if successful save the result in
    self.out_data and the other parameters.

    @raise errors.OpExecError: if the output can't be parsed, is not a
      dictionary or lacks one of the required keys

    """
    # Local import: only needed to retrieve the exception being handled
    import sys

    try:
      rdict = serializer.Load(self.out_text)
    except Exception: # pylint: disable=W0703
      # sys.exc_info() is used as neither "except Exception, err" nor
      # "except Exception as err" is accepted by all Python versions
      err = sys.exc_info()[1]
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))

    if not isinstance(rdict, dict):
      raise errors.OpExecError("Can't parse iallocator results: not a dict")

    # TODO: remove backwards compatibility in later versions
    if "nodes" in rdict and "result" not in rdict:
      rdict["result"] = rdict["nodes"]
      del rdict["nodes"]

    for key in "success", "info", "result":
      if key not in rdict:
        raise errors.OpExecError("Can't parse iallocator results:"
                                 " missing key '%s'" % key)
      setattr(self, key, rdict[key])

    # request-specific checks on the result
    self.req.ValidateResult(self, self.result)
    self.out_data = rdict
b/test/ganeti.cmdlib_unittest.py
42 42
from ganeti import objects
43 43
from ganeti import compat
44 44
from ganeti import rpc
45
from ganeti.masterd import iallocator
45 46
from ganeti.hypervisor import hv_xen
46 47

  
47 48
import testutils
......
409 410
            ]
410 411

  
411 412
          alloc_result = (moved, [], jobs)
412
          assert cmdlib.IAllocator._NEVAC_RESULT(alloc_result)
413
          assert iallocator._NEVAC_RESULT(alloc_result)
413 414

  
414 415
          lu = _FakeLU()
415 416
          result = cmdlib._LoadNodeEvacResult(lu, alloc_result,
......
438 439
    alloc_result = ([], [
439 440
      ("inst5191.example.com", "errormsg21178"),
440 441
      ], [])
441
    assert cmdlib.IAllocator._NEVAC_RESULT(alloc_result)
442
    assert iallocator._NEVAC_RESULT(alloc_result)
442 443

  
443 444
    lu = _FakeLU()
444 445
    self.assertRaises(errors.OpExecError, cmdlib._LoadNodeEvacResult,

Also available in: Unified diff