iallocator: prepare RPC call 'node_info'
[ganeti-local] / lib / masterd / iallocator.py
1 #
2 #
3
4 # Copyright (C) 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the iallocator code."""
23
24 from ganeti import compat
25 from ganeti import constants
26 from ganeti import errors
27 from ganeti import ht
28 from ganeti import outils
29 from ganeti import opcodes
30 from ganeti import rpc
31 from ganeti import serializer
32 from ganeti import utils
33
34 import ganeti.masterd.instance as gmi
35
36
37 _STRING_LIST = ht.TListOf(ht.TString)
38 _JOB_LIST = ht.TListOf(ht.TListOf(ht.TStrictDict(True, False, {
39    # pylint: disable=E1101
40    # Class '...' has no 'OP_ID' member
41    "OP_ID": ht.TElemOf([opcodes.OpInstanceFailover.OP_ID,
42                         opcodes.OpInstanceMigrate.OP_ID,
43                         opcodes.OpInstanceReplaceDisks.OP_ID]),
44    })))
45
46 _NEVAC_MOVED = \
47   ht.TListOf(ht.TAnd(ht.TIsLength(3),
48                      ht.TItems([ht.TNonEmptyString,
49                                 ht.TNonEmptyString,
50                                 ht.TListOf(ht.TNonEmptyString),
51                                 ])))
52 _NEVAC_FAILED = \
53   ht.TListOf(ht.TAnd(ht.TIsLength(2),
54                      ht.TItems([ht.TNonEmptyString,
55                                 ht.TMaybeString,
56                                 ])))
57 _NEVAC_RESULT = ht.TAnd(ht.TIsLength(3),
58                         ht.TItems([_NEVAC_MOVED, _NEVAC_FAILED, _JOB_LIST]))
59
60 _INST_NAME = ("name", ht.TNonEmptyString)
61 _INST_UUID = ("inst_uuid", ht.TNonEmptyString)
62
63
64 class _AutoReqParam(outils.AutoSlots):
65   """Meta class for request definitions.
66
67   """
68   @classmethod
69   def _GetSlots(mcs, attrs):
70     """Extract the slots out of REQ_PARAMS.
71
72     """
73     params = attrs.setdefault("REQ_PARAMS", [])
74     return [slot for (slot, _) in params]
75
76
77 class IARequestBase(outils.ValidatedSlots):
78   """A generic IAllocator request object.
79
80   """
81   __metaclass__ = _AutoReqParam
82
83   MODE = NotImplemented
84   REQ_PARAMS = []
85   REQ_RESULT = NotImplemented
86
87   def __init__(self, **kwargs):
88     """Constructor for IARequestBase.
89
90     The constructor takes only keyword arguments and will set
91     attributes on this object based on the passed arguments. As such,
92     it means that you should not pass arguments which are not in the
93     REQ_PARAMS attribute for this class.
94
95     """
96     outils.ValidatedSlots.__init__(self, **kwargs)
97
98     self.Validate()
99
100   def Validate(self):
101     """Validates all parameters of the request.
102
103     """
104     assert self.MODE in constants.VALID_IALLOCATOR_MODES
105
106     for (param, validator) in self.REQ_PARAMS:
107       if not hasattr(self, param):
108         raise errors.OpPrereqError("Request is missing '%s' parameter" % param,
109                                    errors.ECODE_INVAL)
110
111       value = getattr(self, param)
112       if not validator(value):
113         raise errors.OpPrereqError(("Request parameter '%s' has invalid"
114                                     " type %s/value %s") %
115                                     (param, type(value), value),
116                                     errors.ECODE_INVAL)
117
118   def GetRequest(self, cfg):
119     """Gets the request data dict.
120
121     @param cfg: The configuration instance
122
123     """
124     raise NotImplementedError
125
126   def ValidateResult(self, ia, result):
127     """Validates the result of an request.
128
129     @param ia: The IAllocator instance
130     @param result: The IAllocator run result
131     @raises ResultValidationError: If validation fails
132
133     """
134     if ia.success and not self.REQ_RESULT(result):
135       raise errors.ResultValidationError("iallocator returned invalid result,"
136                                          " expected %s, got %s" %
137                                          (self.REQ_RESULT, result))
138
139
140 class IAReqInstanceAlloc(IARequestBase):
141   """An instance allocation request.
142
143   """
144   # pylint: disable=E1101
145   MODE = constants.IALLOCATOR_MODE_ALLOC
146   REQ_PARAMS = [
147     _INST_NAME,
148     ("memory", ht.TNonNegativeInt),
149     ("spindle_use", ht.TNonNegativeInt),
150     ("disks", ht.TListOf(ht.TDict)),
151     ("disk_template", ht.TString),
152     ("os", ht.TString),
153     ("tags", _STRING_LIST),
154     ("nics", ht.TListOf(ht.TDict)),
155     ("vcpus", ht.TInt),
156     ("hypervisor", ht.TString),
157     ("node_whitelist", ht.TMaybeListOf(ht.TNonEmptyString)),
158     ]
159   REQ_RESULT = ht.TList
160
161   def RequiredNodes(self):
162     """Calculates the required nodes based on the disk_template.
163
164     """
165     if self.disk_template in constants.DTS_INT_MIRROR:
166       return 2
167     else:
168       return 1
169
170   def GetRequest(self, cfg):
171     """Requests a new instance.
172
173     The checks for the completeness of the opcode must have already been
174     done.
175
176     """
177     disk_space = gmi.ComputeDiskSize(self.disk_template, self.disks)
178
179     return {
180       "name": self.name,
181       "disk_template": self.disk_template,
182       "tags": self.tags,
183       "os": self.os,
184       "vcpus": self.vcpus,
185       "memory": self.memory,
186       "spindle_use": self.spindle_use,
187       "disks": self.disks,
188       "disk_space_total": disk_space,
189       "nics": self.nics,
190       "required_nodes": self.RequiredNodes(),
191       "hypervisor": self.hypervisor,
192       }
193
194   def ValidateResult(self, ia, result):
195     """Validates an single instance allocation request.
196
197     """
198     IARequestBase.ValidateResult(self, ia, result)
199
200     if ia.success and len(result) != self.RequiredNodes():
201       raise errors.ResultValidationError("iallocator returned invalid number"
202                                          " of nodes (%s), required %s" %
203                                          (len(result), self.RequiredNodes()))
204
205
206 class IAReqMultiInstanceAlloc(IARequestBase):
207   """An multi instance allocation request.
208
209   """
210   # pylint: disable=E1101
211   MODE = constants.IALLOCATOR_MODE_MULTI_ALLOC
212   REQ_PARAMS = [
213     ("instances", ht.TListOf(ht.TInstanceOf(IAReqInstanceAlloc))),
214     ]
215   _MASUCCESS = \
216     ht.TListOf(ht.TAnd(ht.TIsLength(2),
217                        ht.TItems([ht.TNonEmptyString,
218                                   ht.TListOf(ht.TNonEmptyString),
219                                   ])))
220   _MAFAILED = ht.TListOf(ht.TNonEmptyString)
221   REQ_RESULT = ht.TAnd(ht.TList, ht.TIsLength(2),
222                        ht.TItems([_MASUCCESS, _MAFAILED]))
223
224   def GetRequest(self, cfg):
225     return {
226       "instances": [iareq.GetRequest(cfg) for iareq in self.instances],
227       }
228
229
230 class IAReqRelocate(IARequestBase):
231   """A relocation request.
232
233   """
234   # pylint: disable=E1101
235   MODE = constants.IALLOCATOR_MODE_RELOC
236   REQ_PARAMS = [
237     _INST_UUID,
238     ("relocate_from_node_uuids", _STRING_LIST),
239     ]
240   REQ_RESULT = ht.TList
241
242   def GetRequest(self, cfg):
243     """Request an relocation of an instance
244
245     The checks for the completeness of the opcode must have already been
246     done.
247
248     """
249     instance = cfg.GetInstanceInfo(self.inst_uuid)
250     if instance is None:
251       raise errors.ProgrammerError("Unknown instance '%s' passed to"
252                                    " IAllocator" % self.inst_uuid)
253
254     if instance.disk_template not in constants.DTS_MIRRORED:
255       raise errors.OpPrereqError("Can't relocate non-mirrored instances",
256                                  errors.ECODE_INVAL)
257
258     if (instance.disk_template in constants.DTS_INT_MIRROR and
259         len(instance.secondary_nodes) != 1):
260       raise errors.OpPrereqError("Instance has not exactly one secondary node",
261                                  errors.ECODE_STATE)
262
263     disk_sizes = [{constants.IDISK_SIZE: disk.size} for disk in instance.disks]
264     disk_space = gmi.ComputeDiskSize(instance.disk_template, disk_sizes)
265
266     return {
267       "name": instance.name,
268       "disk_space_total": disk_space,
269       "required_nodes": 1,
270       "relocate_from": cfg.GetNodeNames(self.relocate_from_node_uuids),
271       }
272
273   def ValidateResult(self, ia, result):
274     """Validates the result of an relocation request.
275
276     """
277     IARequestBase.ValidateResult(self, ia, result)
278
279     node2group = dict((name, ndata["group"])
280                       for (name, ndata) in ia.in_data["nodes"].items())
281
282     fn = compat.partial(self._NodesToGroups, node2group,
283                         ia.in_data["nodegroups"])
284
285     instance = ia.cfg.GetInstanceInfo(self.inst_uuid)
286     request_groups = fn(ia.cfg.GetNodeNames(self.relocate_from_node_uuids) +
287                         ia.cfg.GetNodeNames([instance.primary_node]))
288     result_groups = fn(result + ia.cfg.GetNodeNames([instance.primary_node]))
289
290     if ia.success and not set(result_groups).issubset(request_groups):
291       raise errors.ResultValidationError("Groups of nodes returned by"
292                                          " iallocator (%s) differ from original"
293                                          " groups (%s)" %
294                                          (utils.CommaJoin(result_groups),
295                                           utils.CommaJoin(request_groups)))
296
297   @staticmethod
298   def _NodesToGroups(node2group, groups, nodes):
299     """Returns a list of unique group names for a list of nodes.
300
301     @type node2group: dict
302     @param node2group: Map from node name to group UUID
303     @type groups: dict
304     @param groups: Group information
305     @type nodes: list
306     @param nodes: Node names
307
308     """
309     result = set()
310
311     for node in nodes:
312       try:
313         group_uuid = node2group[node]
314       except KeyError:
315         # Ignore unknown node
316         pass
317       else:
318         try:
319           group = groups[group_uuid]
320         except KeyError:
321           # Can't find group, let's use UUID
322           group_name = group_uuid
323         else:
324           group_name = group["name"]
325
326         result.add(group_name)
327
328     return sorted(result)
329
330
331 class IAReqNodeEvac(IARequestBase):
332   """A node evacuation request.
333
334   """
335   # pylint: disable=E1101
336   MODE = constants.IALLOCATOR_MODE_NODE_EVAC
337   REQ_PARAMS = [
338     ("instances", _STRING_LIST),
339     ("evac_mode", ht.TElemOf(constants.IALLOCATOR_NEVAC_MODES)),
340     ]
341   REQ_RESULT = _NEVAC_RESULT
342
343   def GetRequest(self, cfg):
344     """Get data for node-evacuate requests.
345
346     """
347     return {
348       "instances": self.instances,
349       "evac_mode": self.evac_mode,
350       }
351
352
353 class IAReqGroupChange(IARequestBase):
354   """A group change request.
355
356   """
357   # pylint: disable=E1101
358   MODE = constants.IALLOCATOR_MODE_CHG_GROUP
359   REQ_PARAMS = [
360     ("instances", _STRING_LIST),
361     ("target_groups", _STRING_LIST),
362     ]
363   REQ_RESULT = _NEVAC_RESULT
364
365   def GetRequest(self, cfg):
366     """Get data for node-evacuate requests.
367
368     """
369     return {
370       "instances": self.instances,
371       "target_groups": self.target_groups,
372       }
373
374
375 class IAllocator(object):
376   """IAllocator framework.
377
378   An IAllocator instance has three sets of attributes:
379     - cfg that is needed to query the cluster
380     - input data (all members of the _KEYS class attribute are required)
381     - four buffer attributes (in|out_data|text), that represent the
382       input (to the external script) in text and data structure format,
383       and the output from it, again in two formats
384     - the result variables from the script (success, info, nodes) for
385       easy usage
386
387   """
388   # pylint: disable=R0902
389   # lots of instance attributes
390
391   def __init__(self, cfg, rpc_runner, req):
392     self.cfg = cfg
393     self.rpc = rpc_runner
394     self.req = req
395     # init buffer variables
396     self.in_text = self.out_text = self.in_data = self.out_data = None
397     # init result fields
398     self.success = self.info = self.result = None
399
400     self._BuildInputData(req)
401
402   def _ComputerClusterDataNodeInfo(self, node_list, cluster_info,
403                                    hypervisor_name):
404     """Prepare and execute node info call.
405
406     @type node_list: list of strings
407     @param node_list: list of nodes' UUIDs
408     @type cluster_info: L{objects.Cluster}
409     @param cluster_info: the cluster's information from the config
410     @type hypervisor_name: string
411     @param hypervisor_name: the hypervisor name
412     @rtype: same as the result of the node info RPC call
413     @return: the result of the node info RPC call
414
415     """
416     es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, node_list)
417     storage_units = utils.storage.GetStorageUnitsOfCluster(
418         self.cfg, include_spindles=True)
419     hvspecs = [(hypervisor_name, cluster_info.hvparams[hypervisor_name])]
420     return self.rpc.call_node_info(node_list, storage_units, hvspecs, es_flags)
421
422   def _ComputeClusterData(self):
423     """Compute the generic allocator input data.
424
425     This is the data that is independent of the actual operation.
426
427     """
428     cluster_info = self.cfg.GetClusterInfo()
429     # cluster data
430     data = {
431       "version": constants.IALLOCATOR_VERSION,
432       "cluster_name": self.cfg.GetClusterName(),
433       "cluster_tags": list(cluster_info.GetTags()),
434       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
435       "ipolicy": cluster_info.ipolicy,
436       }
437     ninfo = self.cfg.GetAllNodesInfo()
438     iinfo = self.cfg.GetAllInstancesInfo().values()
439     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
440
441     # node data
442     node_list = [n.uuid for n in ninfo.values() if n.vm_capable]
443
444     if isinstance(self.req, IAReqInstanceAlloc):
445       hypervisor_name = self.req.hypervisor
446       node_whitelist = self.req.node_whitelist
447     elif isinstance(self.req, IAReqRelocate):
448       hypervisor_name = self.cfg.GetInstanceInfo(self.req.inst_uuid).hypervisor
449       node_whitelist = None
450     else:
451       hypervisor_name = cluster_info.primary_hypervisor
452       node_whitelist = None
453
454     has_lvm = utils.storage.IsLvmEnabled(cluster_info.enabled_disk_templates)
455     node_data = self._ComputerClusterDataNodeInfo(node_list, cluster_info,
456                                                   hypervisor_name)
457
458     node_iinfo = \
459       self.rpc.call_all_instances_info(node_list,
460                                        cluster_info.enabled_hypervisors,
461                                        cluster_info.hvparams)
462
463     data["nodegroups"] = self._ComputeNodeGroupData(self.cfg)
464
465     config_ndata = self._ComputeBasicNodeData(self.cfg, ninfo, node_whitelist)
466     data["nodes"] = self._ComputeDynamicNodeData(ninfo, node_data, node_iinfo,
467                                                  i_list, config_ndata, has_lvm)
468     assert len(data["nodes"]) == len(ninfo), \
469         "Incomplete node data computed"
470
471     data["instances"] = self._ComputeInstanceData(self.cfg, cluster_info,
472                                                   i_list)
473
474     self.in_data = data
475
476   @staticmethod
477   def _ComputeNodeGroupData(cfg):
478     """Compute node groups data.
479
480     """
481     cluster = cfg.GetClusterInfo()
482     ng = dict((guuid, {
483       "name": gdata.name,
484       "alloc_policy": gdata.alloc_policy,
485       "networks": [net_uuid for net_uuid, _ in gdata.networks.items()],
486       "ipolicy": gmi.CalculateGroupIPolicy(cluster, gdata),
487       "tags": list(gdata.GetTags()),
488       })
489       for guuid, gdata in cfg.GetAllNodeGroupsInfo().items())
490
491     return ng
492
493   @staticmethod
494   def _ComputeBasicNodeData(cfg, node_cfg, node_whitelist):
495     """Compute global node data.
496
497     @rtype: dict
498     @returns: a dict of name: (node dict, node config)
499
500     """
501     # fill in static (config-based) values
502     node_results = dict((ninfo.name, {
503       "tags": list(ninfo.GetTags()),
504       "primary_ip": ninfo.primary_ip,
505       "secondary_ip": ninfo.secondary_ip,
506       "offline": (ninfo.offline or
507                   not (node_whitelist is None or
508                        ninfo.name in node_whitelist)),
509       "drained": ninfo.drained,
510       "master_candidate": ninfo.master_candidate,
511       "group": ninfo.group,
512       "master_capable": ninfo.master_capable,
513       "vm_capable": ninfo.vm_capable,
514       "ndparams": cfg.GetNdParams(ninfo),
515       })
516       for ninfo in node_cfg.values())
517
518     return node_results
519
520   @staticmethod
521   def _ComputeDynamicNodeData(node_cfg, node_data, node_iinfo, i_list,
522                               node_results, has_lvm):
523     """Compute global node data.
524
525     @param node_results: the basic node structures as filled from the config
526
527     """
528     #TODO(dynmem): compute the right data on MAX and MIN memory
529     # make a copy of the current dict
530     node_results = dict(node_results)
531     for nuuid, nresult in node_data.items():
532       ninfo = node_cfg[nuuid]
533       assert ninfo.name in node_results, "Missing basic data for node %s" % \
534                                          ninfo.name
535
536       if not (ninfo.offline or ninfo.drained):
537         nresult.Raise("Can't get data for node %s" % ninfo.name)
538         node_iinfo[nuuid].Raise("Can't get node instance info from node %s" %
539                                 ninfo.name)
540         remote_info = rpc.MakeLegacyNodeInfo(nresult.payload,
541                                              require_vg_info=has_lvm)
542
543         def get_attr(attr):
544           if attr not in remote_info:
545             raise errors.OpExecError("Node '%s' didn't return attribute"
546                                      " '%s'" % (ninfo.name, attr))
547           value = remote_info[attr]
548           if not isinstance(value, int):
549             raise errors.OpExecError("Node '%s' returned invalid value"
550                                      " for '%s': %s" %
551                                      (ninfo.name, attr, value))
552           return value
553
554         mem_free = get_attr("memory_free")
555
556         # compute memory used by primary instances
557         i_p_mem = i_p_up_mem = 0
558         for iinfo, beinfo in i_list:
559           if iinfo.primary_node == nuuid:
560             i_p_mem += beinfo[constants.BE_MAXMEM]
561             if iinfo.name not in node_iinfo[nuuid].payload:
562               i_used_mem = 0
563             else:
564               i_used_mem = int(node_iinfo[nuuid].payload[iinfo.name]["memory"])
565             i_mem_diff = beinfo[constants.BE_MAXMEM] - i_used_mem
566             mem_free -= max(0, i_mem_diff)
567
568             if iinfo.admin_state == constants.ADMINST_UP:
569               i_p_up_mem += beinfo[constants.BE_MAXMEM]
570
571         # TODO: replace this with proper storage reporting
572         if has_lvm:
573           total_disk = get_attr("storage_size")
574           free_disk = get_attr("storage_free")
575           total_spindles = get_attr("spindles_total")
576           free_spindles = get_attr("spindles_free")
577         else:
578           # we didn't even ask the node for VG status, so use zeros
579           total_disk = free_disk = 0
580           total_spindles = free_spindles = 0
581
582         # compute memory used by instances
583         pnr_dyn = {
584           "total_memory": get_attr("memory_total"),
585           "reserved_memory": get_attr("memory_dom0"),
586           "free_memory": mem_free,
587           "total_disk": total_disk,
588           "free_disk": free_disk,
589           "total_spindles": total_spindles,
590           "free_spindles": free_spindles,
591           "total_cpus": get_attr("cpu_total"),
592           "i_pri_memory": i_p_mem,
593           "i_pri_up_memory": i_p_up_mem,
594           }
595         pnr_dyn.update(node_results[ninfo.name])
596         node_results[ninfo.name] = pnr_dyn
597
598     return node_results
599
600   @staticmethod
601   def _ComputeInstanceData(cfg, cluster_info, i_list):
602     """Compute global instance data.
603
604     """
605     instance_data = {}
606     for iinfo, beinfo in i_list:
607       nic_data = []
608       for nic in iinfo.nics:
609         filled_params = cluster_info.SimpleFillNIC(nic.nicparams)
610         nic_dict = {
611           "mac": nic.mac,
612           "ip": nic.ip,
613           "mode": filled_params[constants.NIC_MODE],
614           "link": filled_params[constants.NIC_LINK],
615           }
616         if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
617           nic_dict["bridge"] = filled_params[constants.NIC_LINK]
618         nic_data.append(nic_dict)
619       pir = {
620         "tags": list(iinfo.GetTags()),
621         "admin_state": iinfo.admin_state,
622         "vcpus": beinfo[constants.BE_VCPUS],
623         "memory": beinfo[constants.BE_MAXMEM],
624         "spindle_use": beinfo[constants.BE_SPINDLE_USE],
625         "os": iinfo.os,
626         "nodes": [cfg.GetNodeName(iinfo.primary_node)] +
627                  cfg.GetNodeNames(iinfo.secondary_nodes),
628         "nics": nic_data,
629         "disks": [{constants.IDISK_SIZE: dsk.size,
630                    constants.IDISK_MODE: dsk.mode,
631                    constants.IDISK_SPINDLES: dsk.spindles}
632                   for dsk in iinfo.disks],
633         "disk_template": iinfo.disk_template,
634         "disks_active": iinfo.disks_active,
635         "hypervisor": iinfo.hypervisor,
636         }
637       pir["disk_space_total"] = gmi.ComputeDiskSize(iinfo.disk_template,
638                                                     pir["disks"])
639       instance_data[iinfo.name] = pir
640
641     return instance_data
642
643   def _BuildInputData(self, req):
644     """Build input data structures.
645
646     """
647     self._ComputeClusterData()
648
649     request = req.GetRequest(self.cfg)
650     request["type"] = req.MODE
651     self.in_data["request"] = request
652
653     self.in_text = serializer.Dump(self.in_data)
654
655   def Run(self, name, validate=True, call_fn=None):
656     """Run an instance allocator and return the results.
657
658     """
659     if call_fn is None:
660       call_fn = self.rpc.call_iallocator_runner
661
662     result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
663     result.Raise("Failure while running the iallocator script")
664
665     self.out_text = result.payload
666     if validate:
667       self._ValidateResult()
668
669   def _ValidateResult(self):
670     """Process the allocator results.
671
672     This will process and if successful save the result in
673     self.out_data and the other parameters.
674
675     """
676     try:
677       rdict = serializer.Load(self.out_text)
678     except Exception, err:
679       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
680
681     if not isinstance(rdict, dict):
682       raise errors.OpExecError("Can't parse iallocator results: not a dict")
683
684     # TODO: remove backwards compatiblity in later versions
685     if "nodes" in rdict and "result" not in rdict:
686       rdict["result"] = rdict["nodes"]
687       del rdict["nodes"]
688
689     for key in "success", "info", "result":
690       if key not in rdict:
691         raise errors.OpExecError("Can't parse iallocator results:"
692                                  " missing key '%s'" % key)
693       setattr(self, key, rdict[key])
694
695     self.req.ValidateResult(self, self.result)
696     self.out_data = rdict