4 # Copyright (C) 2012, 2013 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the iallocator code."""
24 from ganeti import compat
25 from ganeti import constants
26 from ganeti import errors
28 from ganeti import outils
29 from ganeti import opcodes
30 from ganeti import rpc
31 from ganeti import serializer
32 from ganeti import utils
34 import ganeti.masterd.instance as gmi
# Validator for a plain list of strings.
37 _STRING_LIST = ht.TListOf(ht.TString)
# Validator for a list of job lists: every inner element is a strict dict
# whose "OP_ID" must be the ID of OpInstanceFailover, OpInstanceMigrate or
# OpInstanceReplaceDisks.
38 _JOB_LIST = ht.TListOf(ht.TListOf(ht.TStrictDict(True, False, {
39 # pylint: disable=E1101
40 # Class '...' has no 'OP_ID' member
41 "OP_ID": ht.TElemOf([opcodes.OpInstanceFailover.OP_ID,
42 opcodes.OpInstanceMigrate.OP_ID,
43 opcodes.OpInstanceReplaceDisks.OP_ID]),
# NOTE(review): the name bound to the next validator is on a line not
# visible in this view; given _NEVAC_RESULT it is presumably _NEVAC_MOVED.
# It accepts a list of 3-element items that start with a non-empty string
# and include a list of non-empty strings.
47 ht.TListOf(ht.TAnd(ht.TIsLength(3),
48 ht.TItems([ht.TNonEmptyString,
50 ht.TListOf(ht.TNonEmptyString),
# NOTE(review): likewise presumably _NEVAC_FAILED: a list of 2-element
# items that start with a non-empty string.
53 ht.TListOf(ht.TAnd(ht.TIsLength(2),
54 ht.TItems([ht.TNonEmptyString,
# Result format used by the node-evacuation and group-change requests:
# a 3-element sequence (moved, failed, jobs).
57 _NEVAC_RESULT = ht.TAnd(ht.TIsLength(3),
58 ht.TItems([_NEVAC_MOVED, _NEVAC_FAILED, _JOB_LIST]))
# (name, validator) pair for a request's "name" parameter, in the format
# IARequestBase iterates over in REQ_PARAMS.
60 _INST_NAME = ("name", ht.TNonEmptyString)
# Metaclass of IARequestBase (and thus of every request class): the slot
# names it derives via _GetSlots are the parameter names in REQ_PARAMS.
63 class _AutoReqParam(outils.AutoSlots):
64 """Meta class for request definitions.
# Return the parameter names (first element of each (name, validator) pair)
# declared in the class body's REQ_PARAMS.
# NOTE(review): the "mcs" first argument suggests a classmethod whose
# decorator line is not visible in this view.
68 def _GetSlots(mcs, attrs):
69 """Extract the slots out of REQ_PARAMS.
# setdefault also stores an empty list in attrs when the class body defines
# no REQ_PARAMS (presumably attrs is the namespace the class is built from,
# so the class then always carries the attribute -- confirm in outils).
72 params = attrs.setdefault("REQ_PARAMS", [])
73 return [slot for (slot, _) in params]
# Base class of all iallocator request types. Subclasses set MODE, declare
# their parameters as (name, validator) pairs in REQ_PARAMS, set REQ_RESULT
# to a validator for the allocator's result and implement GetRequest.
76 class IARequestBase(outils.ValidatedSlots):
77 """A generic IAllocator request object.
80 __metaclass__ = _AutoReqParam
# Subclasses replace this with a callable validator; ValidateResult calls
# it on the allocator's result.
84 REQ_RESULT = NotImplemented
86 def __init__(self, **kwargs):
87 """Constructor for IARequestBase.
89 The constructor takes only keyword arguments and will set
90 attributes on this object based on the passed arguments. As such,
91 it means that you should not pass arguments which are not in the
92 REQ_PARAMS attribute for this class.
95 outils.ValidatedSlots.__init__(self, **kwargs)
# NOTE(review): the header of the parameter-validation method that owns the
# next docstring is not visible in this view.
100 """Validates all parameters of the request.
# MODE is a class constant set by each request class, hence a programmer
# assertion rather than a user-facing error.
103 assert self.MODE in constants.VALID_IALLOCATOR_MODES
# Every declared parameter must be present and accepted by its validator.
105 for (param, validator) in self.REQ_PARAMS:
106 if not hasattr(self, param):
107 raise errors.OpPrereqError("Request is missing '%s' parameter" % param,
110 value = getattr(self, param)
111 if not validator(value):
112 raise errors.OpPrereqError(("Request parameter '%s' has invalid"
113 " type %s/value %s") %
114 (param, type(value), value),
117 def GetRequest(self, cfg):
118 """Gets the request data dict.
120 @param cfg: The configuration instance
123 raise NotImplementedError
125 def ValidateResult(self, ia, result):
126 """Validates the result of a request.
128 @param ia: The IAllocator instance
129 @param result: The IAllocator run result
130 @raises ResultValidationError: If validation fails
# Only results of successful runs are checked against REQ_RESULT.
133 if ia.success and not self.REQ_RESULT(result):
134 raise errors.ResultValidationError("iallocator returned invalid result,"
135 " expected %s, got %s" %
136 (self.REQ_RESULT, result))
# Request for allocating a single new instance.
139 class IAReqInstanceAlloc(IARequestBase):
140 """An instance allocation request.
143 # pylint: disable=E1101
144 MODE = constants.IALLOCATOR_MODE_ALLOC
# REQ_PARAMS entries ((name, validator) pairs); not all of them are visible
# in this view.
147 ("memory", ht.TNonNegativeInt),
148 ("spindle_use", ht.TNonNegativeInt),
149 ("disks", ht.TListOf(ht.TDict)),
150 ("disk_template", ht.TString),
152 ("tags", _STRING_LIST),
153 ("nics", ht.TListOf(ht.TDict)),
155 ("hypervisor", ht.TString),
# May be None, meaning no restriction; otherwise IAllocator reports every
# node not in the list to the allocator as offline.
156 ("node_whitelist", ht.TMaybeListOf(ht.TNonEmptyString)),
158 REQ_RESULT = ht.TList
# NOTE(review): the return statements are not visible in this view; the
# count presumably differs for internally mirrored templates
# (DTS_INT_MIRROR). ValidateResult compares it with len(result).
160 def RequiredNodes(self):
161 """Calculates the required nodes based on the disk_template.
164 if self.disk_template in constants.DTS_INT_MIRROR:
169 def GetRequest(self, cfg):
170 """Requests a new instance.
172 The checks for the completeness of the opcode must have already been
# Sent to the allocator as "disk_space_total".
176 disk_space = gmi.ComputeDiskSize(self.disk_template, self.disks)
180 "disk_template": self.disk_template,
184 "memory": self.memory,
185 "spindle_use": self.spindle_use,
187 "disk_space_total": disk_space,
189 "required_nodes": self.RequiredNodes(),
190 "hypervisor": self.hypervisor,
193 def ValidateResult(self, ia, result):
194 """Validates the result of a single instance allocation request.
# Besides the generic list check, a successful result must contain exactly
# RequiredNodes() entries.
197 IARequestBase.ValidateResult(self, ia, result)
199 if ia.success and len(result) != self.RequiredNodes():
200 raise errors.ResultValidationError("iallocator returned invalid number"
201 " of nodes (%s), required %s" %
202 (len(result), self.RequiredNodes()))
# Request for allocating several instances in one allocator run.
205 class IAReqMultiInstanceAlloc(IARequestBase):
206 """A multi-instance allocation request.
209 # pylint: disable=E1101
210 MODE = constants.IALLOCATOR_MODE_MULTI_ALLOC
# Each element is a single-instance request; GetRequest serializes each one
# through its own GetRequest.
212 ("instances", ht.TListOf(ht.TInstanceOf(IAReqInstanceAlloc))),
# NOTE(review): the assignment target of the next validator is on a line
# not visible here; from REQ_RESULT it is presumably _MASUCCESS, a list of
# 2-element items (non-empty string, list of non-empty strings).
215 ht.TListOf(ht.TAnd(ht.TIsLength(2),
216 ht.TItems([ht.TNonEmptyString,
217 ht.TListOf(ht.TNonEmptyString),
219 _MAFAILED = ht.TListOf(ht.TNonEmptyString)
# Result: a 2-element list validated by _MASUCCESS and _MAFAILED
# (presumably successful and failed allocations, as the names suggest).
220 REQ_RESULT = ht.TAnd(ht.TList, ht.TIsLength(2),
221 ht.TItems([_MASUCCESS, _MAFAILED]))
223 def GetRequest(self, cfg):
# (Only part of the returned dict is visible in this view.)
225 "instances": [iareq.GetRequest(cfg) for iareq in self.instances],
# Request to relocate an existing instance, looked up by its "name"
# parameter.
229 class IAReqRelocate(IARequestBase):
230 """A relocation request.
233 # pylint: disable=E1101
234 MODE = constants.IALLOCATOR_MODE_RELOC
# Node names to relocate away from. ValidateResult maps them, plus the
# instance's primary node, to the node groups the answer must stay within.
# (Other REQ_PARAMS entries, e.g. the one providing self.name, are not
# visible in this view.)
237 ("relocate_from", _STRING_LIST),
239 REQ_RESULT = ht.TList
241 def GetRequest(self, cfg):
242 """Request a relocation of an instance.
244 The checks for the completeness of the opcode must have already been
248 instance = cfg.GetInstanceInfo(self.name)
# NOTE(review): the condition guarding this error is not visible in this
# view; presumably it fires when the lookup above finds no instance.
250 raise errors.ProgrammerError("Unknown instance '%s' passed to"
251 " IAllocator" % self.name)
# Only instances with a mirrored disk template can be relocated.
253 if instance.disk_template not in constants.DTS_MIRRORED:
254 raise errors.OpPrereqError("Can't relocate non-mirrored instances",
# Internally mirrored templates must have exactly one secondary node.
257 if (instance.disk_template in constants.DTS_INT_MIRROR and
258 len(instance.secondary_nodes) != 1):
259 raise errors.OpPrereqError("Instance has not exactly one secondary node",
# Only the disk sizes are passed on to the disk space computation.
262 disk_sizes = [{constants.IDISK_SIZE: disk.size} for disk in instance.disks]
263 disk_space = gmi.ComputeDiskSize(instance.disk_template, disk_sizes)
# (Only part of the returned request dict is visible in this view.)
267 "disk_space_total": disk_space,
269 "relocate_from": self.relocate_from,
def ValidateResult(self, ia, result):
  """Validates the result of a relocation request.

  Besides the generic result-type check, ensures that the node groups of
  the nodes proposed by the iallocator (together with the instance's
  primary node) are a subset of the groups spanned by the original
  request (relocate_from plus the primary node).

  @param ia: The IAllocator instance
  @param result: The IAllocator run result (list of node names)
  @raises ResultValidationError: If validation fails

  """
  IARequestBase.ValidateResult(self, ia, result)

  # A failed run has no node list to check. Computing groups anyway is
  # wasted work and, for a non-list "result", would raise a TypeError
  # that hides the actual iallocator failure.
  if not ia.success:
    return

  node2group = dict((name, ndata["group"])
                    for (name, ndata) in ia.in_data["nodes"].items())

  fn = compat.partial(self._NodesToGroups, node2group,
                      ia.in_data["nodegroups"])

  instance = ia.cfg.GetInstanceInfo(self.name)
  request_groups = fn(self.relocate_from + [instance.primary_node])
  result_groups = fn(result + [instance.primary_node])

  if not set(result_groups).issubset(request_groups):
    raise errors.ResultValidationError("Groups of nodes returned by"
                                       " iallocator (%s) differ from original"
                                       " groups (%s)" %
                                       (utils.CommaJoin(result_groups),
                                        utils.CommaJoin(request_groups)))
# NOTE(review): takes no self/cls, yet ValidateResult reaches it as
# self._NodesToGroups; presumably a @staticmethod whose decorator line is
# not visible in this view.
296 def _NodesToGroups(node2group, groups, nodes):
297 """Returns a list of unique group names for a list of nodes.
299 @type node2group: dict
300 @param node2group: Map from node name to group UUID
302 @param groups: Map from group UUID to group data (dicts with a "name" key)
304 @param nodes: Node names
# The loop header and the handling around the two lookups below are not
# visible in this view; per the comments, unknown nodes are ignored and
# groups without an entry in "groups" are reported by their UUID.
311 group_uuid = node2group[node]
313 # Ignore unknown node
317 group = groups[group_uuid]
319 # Can't find group, let's use UUID
320 group_name = group_uuid
322 group_name = group["name"]
324 result.add(group_name)
# Sorted so the output (and error messages built from it) is deterministic.
326 return sorted(result)
class IAReqNodeEvac(IARequestBase):
  """A node evacuation request.

  Carries a list of instances and an evacuation mode; the iallocator's
  answer must match _NEVAC_RESULT.

  """
  # pylint: disable=E1101
  MODE = constants.IALLOCATOR_MODE_NODE_EVAC
  REQ_PARAMS = [
    # Instances covered by the request (list of strings)
    ("instances", _STRING_LIST),
    # Must be one of constants.IALLOCATOR_NEVAC_MODES
    ("evac_mode", ht.TElemOf(constants.IALLOCATOR_NEVAC_MODES)),
    ]
  REQ_RESULT = _NEVAC_RESULT

  def GetRequest(self, cfg):
    """Build the request-specific data for a node evacuation.

    @param cfg: The configuration instance (not needed for this request)
    @rtype: dict
    @return: the instance list and the evacuation mode

    """
    return dict(instances=self.instances, evac_mode=self.evac_mode)
class IAReqGroupChange(IARequestBase):
  """A group change request.

  Carries a list of instances and the candidate target node groups; the
  iallocator's answer must match _NEVAC_RESULT.

  """
  # pylint: disable=E1101
  MODE = constants.IALLOCATOR_MODE_CHG_GROUP
  REQ_PARAMS = [
    ("instances", _STRING_LIST),
    ("target_groups", _STRING_LIST),
    ]
  REQ_RESULT = _NEVAC_RESULT

  def GetRequest(self, cfg):
    """Get data for group change requests.

    @param cfg: The configuration instance (not needed for this request)
    @rtype: dict
    @return: the instance list and the list of target groups

    """
    return {
      "instances": self.instances,
      "target_groups": self.target_groups,
      }
373 class IAllocator(object):
374 """IAllocator framework.
376 An IAllocator instance has four sets of attributes:
377 - cfg that is needed to query the cluster
378 - input data (the request object, req, given to the constructor)
379 - four buffer attributes (in|out_data|text), that represent the
380 input (to the external script) in text and data structure format,
381 and the output from it, again in two formats
382 - the result variables from the script (success, info, result) for
386 # pylint: disable=R0902
387 # lots of instance attributes
389 def __init__(self, cfg, rpc_runner, req):
# NOTE(review): self.cfg and self.req are read by the other methods, but
# their assignments are on lines not visible in this view.
391 self.rpc = rpc_runner
393 # init buffer variables
394 self.in_text = self.out_text = self.in_data = self.out_data = None
# Result variables, filled in by _ValidateResult after a run.
396 self.success = self.info = self.result = None
# The input is built eagerly: constructing an IAllocator already queries
# the configuration and the nodes (via RPC) and serializes the request.
398 self._BuildInputData(req)
400 def _ComputeClusterData(self):
401 """Compute the generic allocator input data.
403 This is the data that is independent of the actual operation.
# NOTE(review): several lines of this method are not visible in this view,
# including where cfg, data and node_iinfo are bound and what happens to
# data at the end; the comments below cover only the visible statements.
407 cluster_info = cfg.GetClusterInfo()
410 "version": constants.IALLOCATOR_VERSION,
411 "cluster_name": cfg.GetClusterName(),
412 "cluster_tags": list(cluster_info.GetTags()),
413 "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
414 "ipolicy": cluster_info.ipolicy,
# Pair every instance with its filled backend parameters (FillBE).
416 ninfo = cfg.GetAllNodesInfo()
417 iinfo = cfg.GetAllInstancesInfo().values()
418 i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
# Only vm_capable nodes are queried over RPC.
421 node_list = [n.name for n in ninfo.values() if n.vm_capable]
# Hypervisor to query node info for: the requested one for allocations, the
# instance's own for relocations, otherwise the cluster's primary one. Only
# allocation requests can restrict candidate nodes via a whitelist.
423 if isinstance(self.req, IAReqInstanceAlloc):
424 hypervisor_name = self.req.hypervisor
425 node_whitelist = self.req.node_whitelist
426 elif isinstance(self.req, IAReqRelocate):
# NOTE(review): _BuildInputData runs this method before calling
# IAReqRelocate.GetRequest, so for an unknown instance name the lookup
# result is dereferenced here first. That presumably fails with an
# AttributeError instead of GetRequest's "Unknown instance"
# ProgrammerError -- confirm what GetInstanceInfo returns for unknown
# names.
427 hypervisor_name = cfg.GetInstanceInfo(self.req.name).hypervisor
428 node_whitelist = None
430 hypervisor_name = cluster_info.primary_hypervisor
431 node_whitelist = None
433 es_flags = rpc.GetExclusiveStorageForNodeNames(cfg, node_list)
434 node_data = self.rpc.call_node_info(node_list, [cfg.GetVGName()],
435 [hypervisor_name], es_flags)
437 self.rpc.call_all_instances_info(node_list,
438 cluster_info.enabled_hypervisors)
440 data["nodegroups"] = self._ComputeNodeGroupData(cfg)
442 config_ndata = self._ComputeBasicNodeData(cfg, ninfo, node_whitelist)
443 data["nodes"] = self._ComputeDynamicNodeData(ninfo, node_data, node_iinfo,
444 i_list, config_ndata)
# Every configured node (not only the vm_capable ones queried above) must
# end up with an entry.
445 assert len(data["nodes"]) == len(ninfo), \
446 "Incomplete node data computed"
448 data["instances"] = self._ComputeInstanceData(cluster_info, i_list)
# NOTE(review): takes cfg but no self, yet _ComputeClusterData calls it as
# self._ComputeNodeGroupData(cfg); presumably a @staticmethod whose
# decorator line is not visible in this view.
453 def _ComputeNodeGroupData(cfg):
454 """Compute node groups data.
457 cluster = cfg.GetClusterInfo()
# One entry per node group (presumably keyed by guuid, per the loop
# variables); the group's ipolicy is computed by gmi.CalculateGroupIPolicy
# from the cluster and group objects. Some entry lines are not visible in
# this view.
460 "alloc_policy": gdata.alloc_policy,
461 "ipolicy": gmi.CalculateGroupIPolicy(cluster, gdata),
462 "tags": list(gdata.GetTags()),
464 for guuid, gdata in cfg.GetAllNodeGroupsInfo().items())
# NOTE(review): no self parameter, yet called as self._ComputeBasicNodeData;
# presumably a @staticmethod whose decorator line is not visible here.
469 def _ComputeBasicNodeData(cfg, node_cfg, node_whitelist):
470 """Compute global node data.
473 @returns: a dict mapping node name to its static (config-based) data dict
476 # fill in static (config-based) values
477 node_results = dict((ninfo.name, {
478 "tags": list(ninfo.GetTags()),
479 "primary_ip": ninfo.primary_ip,
480 "secondary_ip": ninfo.secondary_ip,
# A node outside a non-None whitelist is reported as offline.
# NOTE(review): "in node_whitelist" is evaluated once per node; if the
# whitelist can be long, converting it to a set once would avoid repeated
# linear scans.
481 "offline": (ninfo.offline or
482 not (node_whitelist is None or
483 ninfo.name in node_whitelist)),
484 "drained": ninfo.drained,
485 "master_candidate": ninfo.master_candidate,
486 "group": ninfo.group,
487 "master_capable": ninfo.master_capable,
488 "vm_capable": ninfo.vm_capable,
489 "ndparams": cfg.GetNdParams(ninfo),
491 for ninfo in node_cfg.values())
# NOTE(review): the last parameter (node_results, per the docstring) is on
# a line not visible in this view; as this takes no self yet is called via
# self, it is presumably a @staticmethod.
496 def _ComputeDynamicNodeData(node_cfg, node_data, node_iinfo, i_list,
498 """Compute global node data.
500 @param node_results: the basic node structures as filled from the config
503 #TODO(dynmem): compute the right data on MAX and MIN memory
504 # make a shallow copy; per-node entries are replaced below, not mutated,
# so the caller's dict and its values are left untouched
505 node_results = dict(node_results)
506 for nname, nresult in node_data.items():
507 assert nname in node_results, "Missing basic data for node %s" % nname
508 ninfo = node_cfg[nname]
# Dynamic data is gathered only for nodes that are neither offline nor
# drained (presumably the rest of the loop body sits in this branch).
510 if not (ninfo.offline or ninfo.drained):
511 nresult.Raise("Can't get data for node %s" % nname)
512 node_iinfo[nname].Raise("Can't get node instance info from node %s" %
514 remote_info = rpc.MakeLegacyNodeInfo(nresult.payload)
# Every required attribute must be present and be an int.
516 for attr in ["memory_total", "memory_free", "memory_dom0",
517 "vg_size", "vg_free", "cpu_total"]:
518 if attr not in remote_info:
519 raise errors.OpExecError("Node '%s' didn't return attribute"
520 " '%s'" % (nname, attr))
521 if not isinstance(remote_info[attr], int):
522 raise errors.OpExecError("Node '%s' returned invalid value"
524 (nname, attr, remote_info[attr]))
525 # compute memory used by primary instances
526 i_p_mem = i_p_up_mem = 0
527 for iinfo, beinfo in i_list:
528 if iinfo.primary_node == nname:
529 i_p_mem += beinfo[constants.BE_MAXMEM]
# For an instance the node reports (the branch taken for unreported ones is
# not visible here), the gap between BE_MAXMEM and its current memory use
# is subtracted from the node's reported free memory.
530 if iinfo.name not in node_iinfo[nname].payload:
533 i_used_mem = int(node_iinfo[nname].payload[iinfo.name]["memory"])
534 i_mem_diff = beinfo[constants.BE_MAXMEM] - i_used_mem
535 remote_info["memory_free"] -= max(0, i_mem_diff)
# i_pri_up_memory adds BE_MAXMEM of instances whose admin_state is
# ADMINST_UP (presumably only primary instances of this node).
537 if iinfo.admin_state == constants.ADMINST_UP:
538 i_p_up_mem += beinfo[constants.BE_MAXMEM]
540 # compute memory used by instances
542 "total_memory": remote_info["memory_total"],
543 "reserved_memory": remote_info["memory_dom0"],
544 "free_memory": remote_info["memory_free"],
545 "total_disk": remote_info["vg_size"],
546 "free_disk": remote_info["vg_free"],
547 "total_cpus": remote_info["cpu_total"],
548 "i_pri_memory": i_p_mem,
549 "i_pri_up_memory": i_p_up_mem,
# The basic config data is applied last, so it wins on any key clash.
551 pnr_dyn.update(node_results[nname])
552 node_results[nname] = pnr_dyn
# NOTE(review): no self parameter, yet called as self._ComputeInstanceData;
# presumably a @staticmethod whose decorator line is not visible here.
557 def _ComputeInstanceData(cluster_info, i_list):
558 """Compute global instance data.
562 for iinfo, beinfo in i_list:
564 for nic in iinfo.nics:
# NIC parameters are filled through the cluster's SimpleFillNIC before
# being exported.
565 filled_params = cluster_info.SimpleFillNIC(nic.nicparams)
569 "mode": filled_params[constants.NIC_MODE],
570 "link": filled_params[constants.NIC_LINK],
# Bridged NICs additionally export their link as "bridge".
572 if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
573 nic_dict["bridge"] = filled_params[constants.NIC_LINK]
574 nic_data.append(nic_dict)
576 "tags": list(iinfo.GetTags()),
577 "admin_state": iinfo.admin_state,
578 "vcpus": beinfo[constants.BE_VCPUS],
579 "memory": beinfo[constants.BE_MAXMEM],
580 "spindle_use": beinfo[constants.BE_SPINDLE_USE],
# Primary node first, followed by the secondary nodes.
582 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
584 "disks": [{constants.IDISK_SIZE: dsk.size,
585 constants.IDISK_MODE: dsk.mode}
586 for dsk in iinfo.disks],
587 "disk_template": iinfo.disk_template,
588 "hypervisor": iinfo.hypervisor,
590 pir["disk_space_total"] = gmi.ComputeDiskSize(iinfo.disk_template,
# Instances are keyed by name.
592 instance_data[iinfo.name] = pir
def _BuildInputData(self, req):
  """Assemble the iallocator input from the cluster state and a request.

  The generic cluster data is computed first (_ComputeClusterData is
  expected to populate self.in_data). The request-specific data, tagged
  with the request's MODE under "type", is then stored under the
  "request" key, and the complete structure is serialized into
  self.in_text.

  @param req: the request object (an IARequestBase subclass instance)

  """
  self._ComputeClusterData()

  req_data = req.GetRequest(self.cfg)
  req_data.update(type=req.MODE)
  self.in_data.update(request=req_data)

  self.in_text = serializer.Dump(self.in_data)
608 def Run(self, name, validate=True, call_fn=None):
609 """Run an instance allocator and return the results.
# NOTE(review): no return statement is visible. The outcome is stored on the
# instance (out_text here; success/info/result/out_data via
# _ValidateResult), so "return the results" in the docstring may be stale.
# The guards on call_fn and validate are on lines not visible in this view.
# Presumably used only when no call_fn was supplied.
613 call_fn = self.rpc.call_iallocator_runner
# The allocator script "name" runs on the master node and is fed the
# serialized input built by the constructor.
615 result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
616 result.Raise("Failure while running the iallocator script")
618 self.out_text = result.payload
# Presumably skipped when validate is False.
620 self._ValidateResult()
622 def _ValidateResult(self):
623 """Process the allocator results.
625 This will process and if successful save the result in
626 self.out_data and the other parameters.
# Any failure to deserialize out_text is reported as OpExecError.
# NOTE(review): "except Exception, err" is Python-2-only syntax; the
# "except Exception as err" form works on Python 2.6+ as well.
630 rdict = serializer.Load(self.out_text)
631 except Exception, err:
632 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
634 if not isinstance(rdict, dict):
635 raise errors.OpExecError("Can't parse iallocator results: not a dict")
637 # TODO: remove backwards compatibility in later versions
# Allocators that answer under "nodes" get that value mapped to "result".
638 if "nodes" in rdict and "result" not in rdict:
639 rdict["result"] = rdict["nodes"]
# success, info and result are mandatory and are copied onto the instance.
642 for key in "success", "info", "result":
644 raise errors.OpExecError("Can't parse iallocator results:"
645 " missing key '%s'" % key)
646 setattr(self, key, rdict[key])
# Request-type-specific checks; out_data is stored only if they pass.
648 self.req.ValidateResult(self, self.result)
649 self.out_data = rdict