4 # Copyright (C) 2012 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the iallocator code."""
24 from ganeti import compat
25 from ganeti import constants
26 from ganeti import errors
28 from ganeti import objectutils
29 from ganeti import opcodes
30 from ganeti import rpc
31 from ganeti import serializer
32 from ganeti import utils
34 import ganeti.masterd.instance as gmi
37 _STRING_LIST = ht.TListOf(ht.TString)
38 _JOB_LIST = ht.TListOf(ht.TListOf(ht.TStrictDict(True, False, {
39 # pylint: disable=E1101
40 # Class '...' has no 'OP_ID' member
41 "OP_ID": ht.TElemOf([opcodes.OpInstanceFailover.OP_ID,
42 opcodes.OpInstanceMigrate.OP_ID,
43 opcodes.OpInstanceReplaceDisks.OP_ID])
47 ht.TListOf(ht.TAnd(ht.TIsLength(3),
48 ht.TItems([ht.TNonEmptyString,
50 ht.TListOf(ht.TNonEmptyString),
53 ht.TListOf(ht.TAnd(ht.TIsLength(2),
54 ht.TItems([ht.TNonEmptyString,
57 _NEVAC_RESULT = ht.TAnd(ht.TIsLength(3),
58 ht.TItems([_NEVAC_MOVED, _NEVAC_FAILED, _JOB_LIST]))
60 _INST_NAME = ("name", ht.TNonEmptyString)
63 class _AutoReqParam(objectutils.AutoSlots):
64 """Meta class for request definitions.
68 def _GetSlots(mcs, attrs):
69 """Extract the slots out of REQ_PARAMS.
72 params = attrs.setdefault("REQ_PARAMS", [])
73 return [slot for (slot, _) in params]
76 class IARequestBase(objectutils.ValidatedSlots):
77 """A generic IAllocator request object.
80 __metaclass__ = _AutoReqParam
84 REQ_RESULT = NotImplemented
86 def __init__(self, **kwargs):
87 """Constructor for IARequestBase.
89 The constructor takes only keyword arguments and will set
90 attributes on this object based on the passed arguments. As such,
91 it means that you should not pass arguments which are not in the
92 REQ_PARAMS attribute for this class.
95 objectutils.ValidatedSlots.__init__(self, **kwargs)
100 """Validates all parameters of the request.
103 assert self.MODE in constants.VALID_IALLOCATOR_MODES
105 for (param, validator) in self.REQ_PARAMS:
106 if not hasattr(self, param):
107 raise errors.OpPrereqError("Request is missing '%s' parameter" % param,
110 value = getattr(self, param)
111 if not validator(value):
112 raise errors.OpPrereqError(("Request parameter '%s' has invalid"
113 " type %s/value %s") %
114 (param, type(value), value),
117 def GetRequest(self, cfg):
118 """Gets the request data dict.
120 @param cfg: The configuration instance
123 raise NotImplementedError
125 def ValidateResult(self, ia, result):
126 """Validates the result of a request.
128 @param ia: The IAllocator instance
129 @param result: The IAllocator run result
130 @raises ResultValidationError: If validation fails
133 if ia.success and not self.REQ_RESULT(result):
134 raise errors.ResultValidationError("iallocator returned invalid result,"
135 " expected %s, got %s" %
136 (self.REQ_RESULT, result))
139 class IAReqInstanceAlloc(IARequestBase):
140 """An instance allocation request.
143 # pylint: disable=E1101
144 MODE = constants.IALLOCATOR_MODE_ALLOC
147 ("memory", ht.TPositiveInt),
148 ("spindle_use", ht.TPositiveInt),
149 ("disks", ht.TListOf(ht.TDict)),
150 ("disk_template", ht.TString),
152 ("tags", _STRING_LIST),
153 ("nics", ht.TListOf(ht.TDict)),
155 ("hypervisor", ht.TString),
157 REQ_RESULT = ht.TList
159 def RequiredNodes(self):
160 """Calculates the required nodes based on the disk_template.
163 if self.disk_template in constants.DTS_INT_MIRROR:
168 def GetRequest(self, cfg):
169 """Requests a new instance.
171 The checks for the completeness of the opcode must have already been
175 disk_space = gmi.ComputeDiskSize(self.disk_template, self.disks)
179 "disk_template": self.disk_template,
183 "memory": self.memory,
184 "spindle_use": self.spindle_use,
186 "disk_space_total": disk_space,
188 "required_nodes": self.RequiredNodes(),
189 "hypervisor": self.hypervisor,
192 def ValidateResult(self, ia, result):
193 """Validates a single instance allocation request.
196 IARequestBase.ValidateResult(self, ia, result)
198 if ia.success and len(result) != self.RequiredNodes():
199 raise errors.ResultValidationError("iallocator returned invalid number"
200 " of nodes (%s), required %s" %
201 (len(result), self.RequiredNodes()))
204 class IAReqMultiInstanceAlloc(IARequestBase):
205 """A multi-instance allocation request.
208 # pylint: disable=E1101
209 MODE = constants.IALLOCATOR_MODE_MULTI_ALLOC
211 ("instances", ht.TListOf(ht.TInstanceOf(IAReqInstanceAlloc)))
214 ht.TListOf(ht.TAnd(ht.TIsLength(2),
215 ht.TItems([ht.TNonEmptyString,
216 ht.TListOf(ht.TNonEmptyString),
218 _MAFAILED = ht.TListOf(ht.TNonEmptyString)
219 REQ_RESULT = ht.TAnd(ht.TList, ht.TIsLength(2),
220 ht.TItems([_MASUCCESS, _MAFAILED]))
222 def GetRequest(self, cfg):
224 "instances": [iareq.GetRequest(cfg) for iareq in self.instances]
228 class IAReqRelocate(IARequestBase):
229 """A relocation request.
232 # pylint: disable=E1101
233 MODE = constants.IALLOCATOR_MODE_RELOC
236 ("relocate_from", _STRING_LIST),
238 REQ_RESULT = ht.TList
240 def GetRequest(self, cfg):
241 """Request a relocation of an instance
243 The checks for the completeness of the opcode must have already been
247 instance = cfg.GetInstanceInfo(self.name)
249 raise errors.ProgrammerError("Unknown instance '%s' passed to"
250 " IAllocator" % self.name)
252 if instance.disk_template not in constants.DTS_MIRRORED:
253 raise errors.OpPrereqError("Can't relocate non-mirrored instances",
256 if (instance.disk_template in constants.DTS_INT_MIRROR and
257 len(instance.secondary_nodes) != 1):
258 raise errors.OpPrereqError("Instance has not exactly one secondary node",
261 disk_sizes = [{constants.IDISK_SIZE: disk.size} for disk in instance.disks]
262 disk_space = gmi.ComputeDiskSize(instance.disk_template, disk_sizes)
266 "disk_space_total": disk_space,
268 "relocate_from": self.relocate_from,
271 def ValidateResult(self, ia, result):
272 """Validates the result of a relocation request.
275 IARequestBase.ValidateResult(self, ia, result)
277 node2group = dict((name, ndata["group"])
278 for (name, ndata) in ia.in_data["nodes"].items())
280 fn = compat.partial(self._NodesToGroups, node2group,
281 ia.in_data["nodegroups"])
283 instance = ia.cfg.GetInstanceInfo(self.name)
284 request_groups = fn(self.relocate_from + [instance.primary_node])
285 result_groups = fn(result + [instance.primary_node])
287 if ia.success and not set(result_groups).issubset(request_groups):
288 raise errors.ResultValidationError("Groups of nodes returned by"
289 "iallocator (%s) differ from original"
291 (utils.CommaJoin(result_groups),
292 utils.CommaJoin(request_groups)))
295 def _NodesToGroups(node2group, groups, nodes):
296 """Returns a list of unique group names for a list of nodes.
298 @type node2group: dict
299 @param node2group: Map from node name to group UUID
301 @param groups: Group information
303 @param nodes: Node names
310 group_uuid = node2group[node]
312 # Ignore unknown node
316 group = groups[group_uuid]
318 # Can't find group, let's use UUID
319 group_name = group_uuid
321 group_name = group["name"]
323 result.add(group_name)
325 return sorted(result)
328 class IAReqNodeEvac(IARequestBase):
329 """A node evacuation request.
332 # pylint: disable=E1101
333 MODE = constants.IALLOCATOR_MODE_NODE_EVAC
335 ("instances", _STRING_LIST),
336 ("evac_mode", ht.TElemOf(constants.IALLOCATOR_NEVAC_MODES)),
338 REQ_RESULT = _NEVAC_RESULT
340 def GetRequest(self, cfg):
341 """Get data for node-evacuate requests.
345 "instances": self.instances,
346 "evac_mode": self.evac_mode,
350 class IAReqGroupChange(IARequestBase):
351 """A group change request.
354 # pylint: disable=E1101
355 MODE = constants.IALLOCATOR_MODE_CHG_GROUP
357 ("instances", _STRING_LIST),
358 ("target_groups", _STRING_LIST),
360 REQ_RESULT = _NEVAC_RESULT
362 def GetRequest(self, cfg):
363 """Get data for group-change requests.
367 "instances": self.instances,
368 "target_groups": self.target_groups,
372 class IAllocator(object):
373 """IAllocator framework.
375 An IAllocator instance has four sets of attributes:
376 - cfg that is needed to query the cluster
377 - input data (all members of the _KEYS class attribute are required)
378 - four buffer attributes (in|out_data|text), that represent the
379 input (to the external script) in text and data structure format,
380 and the output from it, again in two formats
381 - the result variables from the script (success, info, result) for
385 # pylint: disable=R0902
386 # lots of instance attributes
388 def __init__(self, cfg, rpc_runner, req):
390 self.rpc = rpc_runner
392 # init buffer variables
393 self.in_text = self.out_text = self.in_data = self.out_data = None
395 self.success = self.info = self.result = None
397 self._BuildInputData(req)
399 def _ComputeClusterData(self):
400 """Compute the generic allocator input data.
402 This is the data that is independent of the actual operation.
406 cluster_info = cfg.GetClusterInfo()
409 "version": constants.IALLOCATOR_VERSION,
410 "cluster_name": cfg.GetClusterName(),
411 "cluster_tags": list(cluster_info.GetTags()),
412 "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
413 "ipolicy": cluster_info.ipolicy,
415 ninfo = cfg.GetAllNodesInfo()
416 iinfo = cfg.GetAllInstancesInfo().values()
417 i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
420 node_list = [n.name for n in ninfo.values() if n.vm_capable]
422 if isinstance(self.req, IAReqInstanceAlloc):
423 hypervisor_name = self.req.hypervisor
424 elif isinstance(self.req, IAReqRelocate):
425 hypervisor_name = cfg.GetInstanceInfo(self.req.name).hypervisor
427 hypervisor_name = cluster_info.primary_hypervisor
429 node_data = self.rpc.call_node_info(node_list, [cfg.GetVGName()],
432 self.rpc.call_all_instances_info(node_list,
433 cluster_info.enabled_hypervisors)
435 data["nodegroups"] = self._ComputeNodeGroupData(cfg)
437 config_ndata = self._ComputeBasicNodeData(cfg, ninfo)
438 data["nodes"] = self._ComputeDynamicNodeData(ninfo, node_data, node_iinfo,
439 i_list, config_ndata)
440 assert len(data["nodes"]) == len(ninfo), \
441 "Incomplete node data computed"
443 data["instances"] = self._ComputeInstanceData(cluster_info, i_list)
448 def _ComputeNodeGroupData(cfg):
449 """Compute node groups data.
452 cluster = cfg.GetClusterInfo()
455 "alloc_policy": gdata.alloc_policy,
456 "ipolicy": gmi.CalculateGroupIPolicy(cluster, gdata),
458 for guuid, gdata in cfg.GetAllNodeGroupsInfo().items())
463 def _ComputeBasicNodeData(cfg, node_cfg):
464 """Compute global node data.
467 @returns: a dict mapping node name to node dict
470 # fill in static (config-based) values
471 node_results = dict((ninfo.name, {
472 "tags": list(ninfo.GetTags()),
473 "primary_ip": ninfo.primary_ip,
474 "secondary_ip": ninfo.secondary_ip,
475 "offline": ninfo.offline,
476 "drained": ninfo.drained,
477 "master_candidate": ninfo.master_candidate,
478 "group": ninfo.group,
479 "master_capable": ninfo.master_capable,
480 "vm_capable": ninfo.vm_capable,
481 "ndparams": cfg.GetNdParams(ninfo),
483 for ninfo in node_cfg.values())
488 def _ComputeDynamicNodeData(node_cfg, node_data, node_iinfo, i_list,
490 """Compute global node data.
492 @param node_results: the basic node structures as filled from the config
495 #TODO(dynmem): compute the right data on MAX and MIN memory
496 # make a copy of the current dict
497 node_results = dict(node_results)
498 for nname, nresult in node_data.items():
499 assert nname in node_results, "Missing basic data for node %s" % nname
500 ninfo = node_cfg[nname]
502 if not (ninfo.offline or ninfo.drained):
503 nresult.Raise("Can't get data for node %s" % nname)
504 node_iinfo[nname].Raise("Can't get node instance info from node %s" %
506 remote_info = rpc.MakeLegacyNodeInfo(nresult.payload)
508 for attr in ["memory_total", "memory_free", "memory_dom0",
509 "vg_size", "vg_free", "cpu_total"]:
510 if attr not in remote_info:
511 raise errors.OpExecError("Node '%s' didn't return attribute"
512 " '%s'" % (nname, attr))
513 if not isinstance(remote_info[attr], int):
514 raise errors.OpExecError("Node '%s' returned invalid value"
516 (nname, attr, remote_info[attr]))
517 # compute memory used by primary instances
518 i_p_mem = i_p_up_mem = 0
519 for iinfo, beinfo in i_list:
520 if iinfo.primary_node == nname:
521 i_p_mem += beinfo[constants.BE_MAXMEM]
522 if iinfo.name not in node_iinfo[nname].payload:
525 i_used_mem = int(node_iinfo[nname].payload[iinfo.name]["memory"])
526 i_mem_diff = beinfo[constants.BE_MAXMEM] - i_used_mem
527 remote_info["memory_free"] -= max(0, i_mem_diff)
529 if iinfo.admin_state == constants.ADMINST_UP:
530 i_p_up_mem += beinfo[constants.BE_MAXMEM]
532 # compute memory used by instances
534 "total_memory": remote_info["memory_total"],
535 "reserved_memory": remote_info["memory_dom0"],
536 "free_memory": remote_info["memory_free"],
537 "total_disk": remote_info["vg_size"],
538 "free_disk": remote_info["vg_free"],
539 "total_cpus": remote_info["cpu_total"],
540 "i_pri_memory": i_p_mem,
541 "i_pri_up_memory": i_p_up_mem,
543 pnr_dyn.update(node_results[nname])
544 node_results[nname] = pnr_dyn
549 def _ComputeInstanceData(cluster_info, i_list):
550 """Compute global instance data.
554 for iinfo, beinfo in i_list:
556 for nic in iinfo.nics:
557 filled_params = cluster_info.SimpleFillNIC(nic.nicparams)
561 "mode": filled_params[constants.NIC_MODE],
562 "link": filled_params[constants.NIC_LINK],
564 if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
565 nic_dict["bridge"] = filled_params[constants.NIC_LINK]
566 nic_data.append(nic_dict)
568 "tags": list(iinfo.GetTags()),
569 "admin_state": iinfo.admin_state,
570 "vcpus": beinfo[constants.BE_VCPUS],
571 "memory": beinfo[constants.BE_MAXMEM],
572 "spindle_use": beinfo[constants.BE_SPINDLE_USE],
574 "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
576 "disks": [{constants.IDISK_SIZE: dsk.size,
577 constants.IDISK_MODE: dsk.mode}
578 for dsk in iinfo.disks],
579 "disk_template": iinfo.disk_template,
580 "hypervisor": iinfo.hypervisor,
582 pir["disk_space_total"] = gmi.ComputeDiskSize(iinfo.disk_template,
584 instance_data[iinfo.name] = pir
588 def _BuildInputData(self, req):
589 """Build input data structures.
592 self._ComputeClusterData()
594 request = req.GetRequest(self.cfg)
595 request["type"] = req.MODE
596 self.in_data["request"] = request
598 self.in_text = serializer.Dump(self.in_data)
600 def Run(self, name, validate=True, call_fn=None):
601 """Run an instance allocator and return the results.
605 call_fn = self.rpc.call_iallocator_runner
607 result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
608 result.Raise("Failure while running the iallocator script")
610 self.out_text = result.payload
612 self._ValidateResult()
614 def _ValidateResult(self):
615 """Process the allocator results.
617 This will process and if successful save the result in
618 self.out_data and the other parameters.
622 rdict = serializer.Load(self.out_text)
623 except Exception, err:
624 raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
626 if not isinstance(rdict, dict):
627 raise errors.OpExecError("Can't parse iallocator results: not a dict")
629 # TODO: remove backwards compatibility in later versions
630 if "nodes" in rdict and "result" not in rdict:
631 rdict["result"] = rdict["nodes"]
634 for key in "success", "info", "result":
636 raise errors.OpExecError("Can't parse iallocator results:"
637 " missing key '%s'" % key)
638 setattr(self, key, rdict[key])
640 self.req.ValidateResult(self, self.result)
641 self.out_data = rdict