Merge branch 'devel-2.6' into master
[ganeti-local] / lib / masterd / iallocator.py
1 #
2 #
3
4 # Copyright (C) 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the iallocator code."""
23
24 from ganeti import compat
25 from ganeti import constants
26 from ganeti import errors
27 from ganeti import ht
28 from ganeti import objectutils
29 from ganeti import opcodes
30 from ganeti import rpc
31 from ganeti import serializer
32 from ganeti import utils
33
34 import ganeti.masterd.instance as gmi
35
36
37 _STRING_LIST = ht.TListOf(ht.TString)
38 _JOB_LIST = ht.TListOf(ht.TListOf(ht.TStrictDict(True, False, {
39    # pylint: disable=E1101
40    # Class '...' has no 'OP_ID' member
41    "OP_ID": ht.TElemOf([opcodes.OpInstanceFailover.OP_ID,
42                         opcodes.OpInstanceMigrate.OP_ID,
43                         opcodes.OpInstanceReplaceDisks.OP_ID])
44    })))
45
46 _NEVAC_MOVED = \
47   ht.TListOf(ht.TAnd(ht.TIsLength(3),
48                      ht.TItems([ht.TNonEmptyString,
49                                 ht.TNonEmptyString,
50                                 ht.TListOf(ht.TNonEmptyString),
51                                 ])))
52 _NEVAC_FAILED = \
53   ht.TListOf(ht.TAnd(ht.TIsLength(2),
54                      ht.TItems([ht.TNonEmptyString,
55                                 ht.TMaybeString,
56                                 ])))
57 _NEVAC_RESULT = ht.TAnd(ht.TIsLength(3),
58                         ht.TItems([_NEVAC_MOVED, _NEVAC_FAILED, _JOB_LIST]))
59
60 _INST_NAME = ("name", ht.TNonEmptyString)
61
62
63 class _AutoReqParam(objectutils.AutoSlots):
64   """Meta class for request definitions.
65
66   """
67   @classmethod
68   def _GetSlots(mcs, attrs):
69     """Extract the slots out of REQ_PARAMS.
70
71     """
72     params = attrs.setdefault("REQ_PARAMS", [])
73     return [slot for (slot, _) in params]
74
75
76 class IARequestBase(objectutils.ValidatedSlots):
77   """A generic IAllocator request object.
78
79   """
80   __metaclass__ = _AutoReqParam
81
82   MODE = NotImplemented
83   REQ_PARAMS = []
84   REQ_RESULT = NotImplemented
85
86   def __init__(self, **kwargs):
87     """Constructor for IARequestBase.
88
89     The constructor takes only keyword arguments and will set
90     attributes on this object based on the passed arguments. As such,
91     it means that you should not pass arguments which are not in the
92     REQ_PARAMS attribute for this class.
93
94     """
95     objectutils.ValidatedSlots.__init__(self, **kwargs)
96
97     self.Validate()
98
99   def Validate(self):
100     """Validates all parameters of the request.
101
102     """
103     assert self.MODE in constants.VALID_IALLOCATOR_MODES
104
105     for (param, validator) in self.REQ_PARAMS:
106       if not hasattr(self, param):
107         raise errors.OpPrereqError("Request is missing '%s' parameter" % param,
108                                    errors.ECODE_INVAL)
109
110       value = getattr(self, param)
111       if not validator(value):
112         raise errors.OpPrereqError(("Request parameter '%s' has invalid"
113                                     " type %s/value %s") %
114                                     (param, type(value), value),
115                                     errors.ECODE_INVAL)
116
117   def GetRequest(self, cfg):
118     """Gets the request data dict.
119
120     @param cfg: The configuration instance
121
122     """
123     raise NotImplementedError
124
125   def ValidateResult(self, ia, result):
126     """Validates the result of an request.
127
128     @param ia: The IAllocator instance
129     @param result: The IAllocator run result
130     @raises ResultValidationError: If validation fails
131
132     """
133     if ia.success and not self.REQ_RESULT(result):
134       raise errors.ResultValidationError("iallocator returned invalid result,"
135                                          " expected %s, got %s" %
136                                          (self.REQ_RESULT, result))
137
138
139 class IAReqInstanceAlloc(IARequestBase):
140   """An instance allocation request.
141
142   """
143   # pylint: disable=E1101
144   MODE = constants.IALLOCATOR_MODE_ALLOC
145   REQ_PARAMS = [
146     _INST_NAME,
147     ("memory", ht.TPositiveInt),
148     ("spindle_use", ht.TPositiveInt),
149     ("disks", ht.TListOf(ht.TDict)),
150     ("disk_template", ht.TString),
151     ("os", ht.TString),
152     ("tags", _STRING_LIST),
153     ("nics", ht.TListOf(ht.TDict)),
154     ("vcpus", ht.TInt),
155     ("hypervisor", ht.TString),
156     ]
157   REQ_RESULT = ht.TList
158
159   def RequiredNodes(self):
160     """Calculates the required nodes based on the disk_template.
161
162     """
163     if self.disk_template in constants.DTS_INT_MIRROR:
164       return 2
165     else:
166       return 1
167
168   def GetRequest(self, cfg):
169     """Requests a new instance.
170
171     The checks for the completeness of the opcode must have already been
172     done.
173
174     """
175     disk_space = gmi.ComputeDiskSize(self.disk_template, self.disks)
176
177     return {
178       "name": self.name,
179       "disk_template": self.disk_template,
180       "tags": self.tags,
181       "os": self.os,
182       "vcpus": self.vcpus,
183       "memory": self.memory,
184       "spindle_use": self.spindle_use,
185       "disks": self.disks,
186       "disk_space_total": disk_space,
187       "nics": self.nics,
188       "required_nodes": self.RequiredNodes(),
189       "hypervisor": self.hypervisor,
190       }
191
192   def ValidateResult(self, ia, result):
193     """Validates an single instance allocation request.
194
195     """
196     IARequestBase.ValidateResult(self, ia, result)
197
198     if ia.success and len(result) != self.RequiredNodes():
199       raise errors.ResultValidationError("iallocator returned invalid number"
200                                          " of nodes (%s), required %s" %
201                                          (len(result), self.RequiredNodes()))
202
203
204 class IAReqMultiInstanceAlloc(IARequestBase):
205   """An multi instance allocation request.
206
207   """
208   # pylint: disable=E1101
209   MODE = constants.IALLOCATOR_MODE_MULTI_ALLOC
210   REQ_PARAMS = [
211     ("instances", ht.TListOf(ht.TInstanceOf(IAReqInstanceAlloc)))
212     ]
213   _MASUCCESS = \
214     ht.TListOf(ht.TAnd(ht.TIsLength(2),
215                        ht.TItems([ht.TNonEmptyString,
216                                   ht.TListOf(ht.TNonEmptyString),
217                                   ])))
218   _MAFAILED = ht.TListOf(ht.TNonEmptyString)
219   REQ_RESULT = ht.TAnd(ht.TList, ht.TIsLength(2),
220                        ht.TItems([_MASUCCESS, _MAFAILED]))
221
222   def GetRequest(self, cfg):
223     return {
224       "instances": [iareq.GetRequest(cfg) for iareq in self.instances]
225       }
226
227
228 class IAReqRelocate(IARequestBase):
229   """A relocation request.
230
231   """
232   # pylint: disable=E1101
233   MODE = constants.IALLOCATOR_MODE_RELOC
234   REQ_PARAMS = [
235     _INST_NAME,
236     ("relocate_from", _STRING_LIST),
237     ]
238   REQ_RESULT = ht.TList
239
240   def GetRequest(self, cfg):
241     """Request an relocation of an instance
242
243     The checks for the completeness of the opcode must have already been
244     done.
245
246     """
247     instance = cfg.GetInstanceInfo(self.name)
248     if instance is None:
249       raise errors.ProgrammerError("Unknown instance '%s' passed to"
250                                    " IAllocator" % self.name)
251
252     if instance.disk_template not in constants.DTS_MIRRORED:
253       raise errors.OpPrereqError("Can't relocate non-mirrored instances",
254                                  errors.ECODE_INVAL)
255
256     if (instance.disk_template in constants.DTS_INT_MIRROR and
257         len(instance.secondary_nodes) != 1):
258       raise errors.OpPrereqError("Instance has not exactly one secondary node",
259                                  errors.ECODE_STATE)
260
261     disk_sizes = [{constants.IDISK_SIZE: disk.size} for disk in instance.disks]
262     disk_space = gmi.ComputeDiskSize(instance.disk_template, disk_sizes)
263
264     return {
265       "name": self.name,
266       "disk_space_total": disk_space,
267       "required_nodes": 1,
268       "relocate_from": self.relocate_from,
269       }
270
271   def ValidateResult(self, ia, result):
272     """Validates the result of an relocation request.
273
274     """
275     IARequestBase.ValidateResult(self, ia, result)
276
277     node2group = dict((name, ndata["group"])
278                       for (name, ndata) in ia.in_data["nodes"].items())
279
280     fn = compat.partial(self._NodesToGroups, node2group,
281                         ia.in_data["nodegroups"])
282
283     instance = ia.cfg.GetInstanceInfo(self.name)
284     request_groups = fn(self.relocate_from + [instance.primary_node])
285     result_groups = fn(result + [instance.primary_node])
286
287     if ia.success and not set(result_groups).issubset(request_groups):
288       raise errors.ResultValidationError("Groups of nodes returned by"
289                                          "iallocator (%s) differ from original"
290                                          " groups (%s)" %
291                                          (utils.CommaJoin(result_groups),
292                                           utils.CommaJoin(request_groups)))
293
294   @staticmethod
295   def _NodesToGroups(node2group, groups, nodes):
296     """Returns a list of unique group names for a list of nodes.
297
298     @type node2group: dict
299     @param node2group: Map from node name to group UUID
300     @type groups: dict
301     @param groups: Group information
302     @type nodes: list
303     @param nodes: Node names
304
305     """
306     result = set()
307
308     for node in nodes:
309       try:
310         group_uuid = node2group[node]
311       except KeyError:
312         # Ignore unknown node
313         pass
314       else:
315         try:
316           group = groups[group_uuid]
317         except KeyError:
318           # Can't find group, let's use UUID
319           group_name = group_uuid
320         else:
321           group_name = group["name"]
322
323         result.add(group_name)
324
325     return sorted(result)
326
327
328 class IAReqNodeEvac(IARequestBase):
329   """A node evacuation request.
330
331   """
332   # pylint: disable=E1101
333   MODE = constants.IALLOCATOR_MODE_NODE_EVAC
334   REQ_PARAMS = [
335     ("instances", _STRING_LIST),
336     ("evac_mode", ht.TElemOf(constants.IALLOCATOR_NEVAC_MODES)),
337     ]
338   REQ_RESULT = _NEVAC_RESULT
339
340   def GetRequest(self, cfg):
341     """Get data for node-evacuate requests.
342
343     """
344     return {
345       "instances": self.instances,
346       "evac_mode": self.evac_mode,
347       }
348
349
350 class IAReqGroupChange(IARequestBase):
351   """A group change request.
352
353   """
354   # pylint: disable=E1101
355   MODE = constants.IALLOCATOR_MODE_CHG_GROUP
356   REQ_PARAMS = [
357     ("instances", _STRING_LIST),
358     ("target_groups", _STRING_LIST),
359     ]
360   REQ_RESULT = _NEVAC_RESULT
361
362   def GetRequest(self, cfg):
363     """Get data for node-evacuate requests.
364
365     """
366     return {
367       "instances": self.instances,
368       "target_groups": self.target_groups,
369       }
370
371
372 class IAllocator(object):
373   """IAllocator framework.
374
375   An IAllocator instance has three sets of attributes:
376     - cfg that is needed to query the cluster
377     - input data (all members of the _KEYS class attribute are required)
378     - four buffer attributes (in|out_data|text), that represent the
379       input (to the external script) in text and data structure format,
380       and the output from it, again in two formats
381     - the result variables from the script (success, info, nodes) for
382       easy usage
383
384   """
385   # pylint: disable=R0902
386   # lots of instance attributes
387
388   def __init__(self, cfg, rpc_runner, req):
389     self.cfg = cfg
390     self.rpc = rpc_runner
391     self.req = req
392     # init buffer variables
393     self.in_text = self.out_text = self.in_data = self.out_data = None
394     # init result fields
395     self.success = self.info = self.result = None
396
397     self._BuildInputData(req)
398
399   def _ComputeClusterData(self):
400     """Compute the generic allocator input data.
401
402     This is the data that is independent of the actual operation.
403
404     """
405     cfg = self.cfg
406     cluster_info = cfg.GetClusterInfo()
407     # cluster data
408     data = {
409       "version": constants.IALLOCATOR_VERSION,
410       "cluster_name": cfg.GetClusterName(),
411       "cluster_tags": list(cluster_info.GetTags()),
412       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
413       "ipolicy": cluster_info.ipolicy,
414       }
415     ninfo = cfg.GetAllNodesInfo()
416     iinfo = cfg.GetAllInstancesInfo().values()
417     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
418
419     # node data
420     node_list = [n.name for n in ninfo.values() if n.vm_capable]
421
422     if isinstance(self.req, IAReqInstanceAlloc):
423       hypervisor_name = self.req.hypervisor
424     elif isinstance(self.req, IAReqRelocate):
425       hypervisor_name = cfg.GetInstanceInfo(self.req.name).hypervisor
426     else:
427       hypervisor_name = cluster_info.primary_hypervisor
428
429     node_data = self.rpc.call_node_info(node_list, [cfg.GetVGName()],
430                                         [hypervisor_name])
431     node_iinfo = \
432       self.rpc.call_all_instances_info(node_list,
433                                        cluster_info.enabled_hypervisors)
434
435     data["nodegroups"] = self._ComputeNodeGroupData(cfg)
436
437     config_ndata = self._ComputeBasicNodeData(cfg, ninfo)
438     data["nodes"] = self._ComputeDynamicNodeData(ninfo, node_data, node_iinfo,
439                                                  i_list, config_ndata)
440     assert len(data["nodes"]) == len(ninfo), \
441         "Incomplete node data computed"
442
443     data["instances"] = self._ComputeInstanceData(cluster_info, i_list)
444
445     self.in_data = data
446
447   @staticmethod
448   def _ComputeNodeGroupData(cfg):
449     """Compute node groups data.
450
451     """
452     cluster = cfg.GetClusterInfo()
453     ng = dict((guuid, {
454       "name": gdata.name,
455       "alloc_policy": gdata.alloc_policy,
456       "ipolicy": gmi.CalculateGroupIPolicy(cluster, gdata),
457       "tags": list(gdata.tags),
458       })
459       for guuid, gdata in cfg.GetAllNodeGroupsInfo().items())
460
461     return ng
462
463   @staticmethod
464   def _ComputeBasicNodeData(cfg, node_cfg):
465     """Compute global node data.
466
467     @rtype: dict
468     @returns: a dict of name: (node dict, node config)
469
470     """
471     # fill in static (config-based) values
472     node_results = dict((ninfo.name, {
473       "tags": list(ninfo.GetTags()),
474       "primary_ip": ninfo.primary_ip,
475       "secondary_ip": ninfo.secondary_ip,
476       "offline": ninfo.offline,
477       "drained": ninfo.drained,
478       "master_candidate": ninfo.master_candidate,
479       "group": ninfo.group,
480       "master_capable": ninfo.master_capable,
481       "vm_capable": ninfo.vm_capable,
482       "ndparams": cfg.GetNdParams(ninfo),
483       })
484       for ninfo in node_cfg.values())
485
486     return node_results
487
488   @staticmethod
489   def _ComputeDynamicNodeData(node_cfg, node_data, node_iinfo, i_list,
490                               node_results):
491     """Compute global node data.
492
493     @param node_results: the basic node structures as filled from the config
494
495     """
496     #TODO(dynmem): compute the right data on MAX and MIN memory
497     # make a copy of the current dict
498     node_results = dict(node_results)
499     for nname, nresult in node_data.items():
500       assert nname in node_results, "Missing basic data for node %s" % nname
501       ninfo = node_cfg[nname]
502
503       if not (ninfo.offline or ninfo.drained):
504         nresult.Raise("Can't get data for node %s" % nname)
505         node_iinfo[nname].Raise("Can't get node instance info from node %s" %
506                                 nname)
507         remote_info = rpc.MakeLegacyNodeInfo(nresult.payload)
508
509         for attr in ["memory_total", "memory_free", "memory_dom0",
510                      "vg_size", "vg_free", "cpu_total"]:
511           if attr not in remote_info:
512             raise errors.OpExecError("Node '%s' didn't return attribute"
513                                      " '%s'" % (nname, attr))
514           if not isinstance(remote_info[attr], int):
515             raise errors.OpExecError("Node '%s' returned invalid value"
516                                      " for '%s': %s" %
517                                      (nname, attr, remote_info[attr]))
518         # compute memory used by primary instances
519         i_p_mem = i_p_up_mem = 0
520         for iinfo, beinfo in i_list:
521           if iinfo.primary_node == nname:
522             i_p_mem += beinfo[constants.BE_MAXMEM]
523             if iinfo.name not in node_iinfo[nname].payload:
524               i_used_mem = 0
525             else:
526               i_used_mem = int(node_iinfo[nname].payload[iinfo.name]["memory"])
527             i_mem_diff = beinfo[constants.BE_MAXMEM] - i_used_mem
528             remote_info["memory_free"] -= max(0, i_mem_diff)
529
530             if iinfo.admin_state == constants.ADMINST_UP:
531               i_p_up_mem += beinfo[constants.BE_MAXMEM]
532
533         # compute memory used by instances
534         pnr_dyn = {
535           "total_memory": remote_info["memory_total"],
536           "reserved_memory": remote_info["memory_dom0"],
537           "free_memory": remote_info["memory_free"],
538           "total_disk": remote_info["vg_size"],
539           "free_disk": remote_info["vg_free"],
540           "total_cpus": remote_info["cpu_total"],
541           "i_pri_memory": i_p_mem,
542           "i_pri_up_memory": i_p_up_mem,
543           }
544         pnr_dyn.update(node_results[nname])
545         node_results[nname] = pnr_dyn
546
547     return node_results
548
549   @staticmethod
550   def _ComputeInstanceData(cluster_info, i_list):
551     """Compute global instance data.
552
553     """
554     instance_data = {}
555     for iinfo, beinfo in i_list:
556       nic_data = []
557       for nic in iinfo.nics:
558         filled_params = cluster_info.SimpleFillNIC(nic.nicparams)
559         nic_dict = {
560           "mac": nic.mac,
561           "ip": nic.ip,
562           "mode": filled_params[constants.NIC_MODE],
563           "link": filled_params[constants.NIC_LINK],
564           }
565         if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
566           nic_dict["bridge"] = filled_params[constants.NIC_LINK]
567         nic_data.append(nic_dict)
568       pir = {
569         "tags": list(iinfo.GetTags()),
570         "admin_state": iinfo.admin_state,
571         "vcpus": beinfo[constants.BE_VCPUS],
572         "memory": beinfo[constants.BE_MAXMEM],
573         "spindle_use": beinfo[constants.BE_SPINDLE_USE],
574         "os": iinfo.os,
575         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
576         "nics": nic_data,
577         "disks": [{constants.IDISK_SIZE: dsk.size,
578                    constants.IDISK_MODE: dsk.mode}
579                   for dsk in iinfo.disks],
580         "disk_template": iinfo.disk_template,
581         "hypervisor": iinfo.hypervisor,
582         }
583       pir["disk_space_total"] = gmi.ComputeDiskSize(iinfo.disk_template,
584                                                     pir["disks"])
585       instance_data[iinfo.name] = pir
586
587     return instance_data
588
589   def _BuildInputData(self, req):
590     """Build input data structures.
591
592     """
593     self._ComputeClusterData()
594
595     request = req.GetRequest(self.cfg)
596     request["type"] = req.MODE
597     self.in_data["request"] = request
598
599     self.in_text = serializer.Dump(self.in_data)
600
601   def Run(self, name, validate=True, call_fn=None):
602     """Run an instance allocator and return the results.
603
604     """
605     if call_fn is None:
606       call_fn = self.rpc.call_iallocator_runner
607
608     result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
609     result.Raise("Failure while running the iallocator script")
610
611     self.out_text = result.payload
612     if validate:
613       self._ValidateResult()
614
615   def _ValidateResult(self):
616     """Process the allocator results.
617
618     This will process and if successful save the result in
619     self.out_data and the other parameters.
620
621     """
622     try:
623       rdict = serializer.Load(self.out_text)
624     except Exception, err:
625       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
626
627     if not isinstance(rdict, dict):
628       raise errors.OpExecError("Can't parse iallocator results: not a dict")
629
630     # TODO: remove backwards compatiblity in later versions
631     if "nodes" in rdict and "result" not in rdict:
632       rdict["result"] = rdict["nodes"]
633       del rdict["nodes"]
634
635     for key in "success", "info", "result":
636       if key not in rdict:
637         raise errors.OpExecError("Can't parse iallocator results:"
638                                  " missing key '%s'" % key)
639       setattr(self, key, rdict[key])
640
641     self.req.ValidateResult(self, self.result)
642     self.out_data = rdict