Export connected networks to IAllocator
[ganeti-local] / lib / masterd / iallocator.py
1 #
2 #
3
4 # Copyright (C) 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the iallocator code."""
23
24 from ganeti import compat
25 from ganeti import constants
26 from ganeti import errors
27 from ganeti import ht
28 from ganeti import outils
29 from ganeti import opcodes
30 from ganeti import rpc
31 from ganeti import serializer
32 from ganeti import utils
33
34 import ganeti.masterd.instance as gmi
35
36
37 _STRING_LIST = ht.TListOf(ht.TString)
38 _JOB_LIST = ht.TListOf(ht.TListOf(ht.TStrictDict(True, False, {
39    # pylint: disable=E1101
40    # Class '...' has no 'OP_ID' member
41    "OP_ID": ht.TElemOf([opcodes.OpInstanceFailover.OP_ID,
42                         opcodes.OpInstanceMigrate.OP_ID,
43                         opcodes.OpInstanceReplaceDisks.OP_ID]),
44    })))
45
46 _NEVAC_MOVED = \
47   ht.TListOf(ht.TAnd(ht.TIsLength(3),
48                      ht.TItems([ht.TNonEmptyString,
49                                 ht.TNonEmptyString,
50                                 ht.TListOf(ht.TNonEmptyString),
51                                 ])))
52 _NEVAC_FAILED = \
53   ht.TListOf(ht.TAnd(ht.TIsLength(2),
54                      ht.TItems([ht.TNonEmptyString,
55                                 ht.TMaybeString,
56                                 ])))
57 _NEVAC_RESULT = ht.TAnd(ht.TIsLength(3),
58                         ht.TItems([_NEVAC_MOVED, _NEVAC_FAILED, _JOB_LIST]))
59
60 _INST_NAME = ("name", ht.TNonEmptyString)
61
62
63 class _AutoReqParam(outils.AutoSlots):
64   """Meta class for request definitions.
65
66   """
67   @classmethod
68   def _GetSlots(mcs, attrs):
69     """Extract the slots out of REQ_PARAMS.
70
71     """
72     params = attrs.setdefault("REQ_PARAMS", [])
73     return [slot for (slot, _) in params]
74
75
76 class IARequestBase(outils.ValidatedSlots):
77   """A generic IAllocator request object.
78
79   """
80   __metaclass__ = _AutoReqParam
81
82   MODE = NotImplemented
83   REQ_PARAMS = []
84   REQ_RESULT = NotImplemented
85
86   def __init__(self, **kwargs):
87     """Constructor for IARequestBase.
88
89     The constructor takes only keyword arguments and will set
90     attributes on this object based on the passed arguments. As such,
91     it means that you should not pass arguments which are not in the
92     REQ_PARAMS attribute for this class.
93
94     """
95     outils.ValidatedSlots.__init__(self, **kwargs)
96
97     self.Validate()
98
99   def Validate(self):
100     """Validates all parameters of the request.
101
102     """
103     assert self.MODE in constants.VALID_IALLOCATOR_MODES
104
105     for (param, validator) in self.REQ_PARAMS:
106       if not hasattr(self, param):
107         raise errors.OpPrereqError("Request is missing '%s' parameter" % param,
108                                    errors.ECODE_INVAL)
109
110       value = getattr(self, param)
111       if not validator(value):
112         raise errors.OpPrereqError(("Request parameter '%s' has invalid"
113                                     " type %s/value %s") %
114                                     (param, type(value), value),
115                                     errors.ECODE_INVAL)
116
117   def GetRequest(self, cfg):
118     """Gets the request data dict.
119
120     @param cfg: The configuration instance
121
122     """
123     raise NotImplementedError
124
125   def ValidateResult(self, ia, result):
126     """Validates the result of an request.
127
128     @param ia: The IAllocator instance
129     @param result: The IAllocator run result
130     @raises ResultValidationError: If validation fails
131
132     """
133     if ia.success and not self.REQ_RESULT(result):
134       raise errors.ResultValidationError("iallocator returned invalid result,"
135                                          " expected %s, got %s" %
136                                          (self.REQ_RESULT, result))
137
138
139 class IAReqInstanceAlloc(IARequestBase):
140   """An instance allocation request.
141
142   """
143   # pylint: disable=E1101
144   MODE = constants.IALLOCATOR_MODE_ALLOC
145   REQ_PARAMS = [
146     _INST_NAME,
147     ("memory", ht.TNonNegativeInt),
148     ("spindle_use", ht.TNonNegativeInt),
149     ("disks", ht.TListOf(ht.TDict)),
150     ("disk_template", ht.TString),
151     ("os", ht.TString),
152     ("tags", _STRING_LIST),
153     ("nics", ht.TListOf(ht.TDict)),
154     ("vcpus", ht.TInt),
155     ("hypervisor", ht.TString),
156     ("node_whitelist", ht.TMaybeListOf(ht.TNonEmptyString)),
157     ]
158   REQ_RESULT = ht.TList
159
160   def RequiredNodes(self):
161     """Calculates the required nodes based on the disk_template.
162
163     """
164     if self.disk_template in constants.DTS_INT_MIRROR:
165       return 2
166     else:
167       return 1
168
169   def GetRequest(self, cfg):
170     """Requests a new instance.
171
172     The checks for the completeness of the opcode must have already been
173     done.
174
175     """
176     disk_space = gmi.ComputeDiskSize(self.disk_template, self.disks)
177
178     return {
179       "name": self.name,
180       "disk_template": self.disk_template,
181       "tags": self.tags,
182       "os": self.os,
183       "vcpus": self.vcpus,
184       "memory": self.memory,
185       "spindle_use": self.spindle_use,
186       "disks": self.disks,
187       "disk_space_total": disk_space,
188       "nics": self.nics,
189       "required_nodes": self.RequiredNodes(),
190       "hypervisor": self.hypervisor,
191       }
192
193   def ValidateResult(self, ia, result):
194     """Validates an single instance allocation request.
195
196     """
197     IARequestBase.ValidateResult(self, ia, result)
198
199     if ia.success and len(result) != self.RequiredNodes():
200       raise errors.ResultValidationError("iallocator returned invalid number"
201                                          " of nodes (%s), required %s" %
202                                          (len(result), self.RequiredNodes()))
203
204
205 class IAReqMultiInstanceAlloc(IARequestBase):
206   """An multi instance allocation request.
207
208   """
209   # pylint: disable=E1101
210   MODE = constants.IALLOCATOR_MODE_MULTI_ALLOC
211   REQ_PARAMS = [
212     ("instances", ht.TListOf(ht.TInstanceOf(IAReqInstanceAlloc))),
213     ]
214   _MASUCCESS = \
215     ht.TListOf(ht.TAnd(ht.TIsLength(2),
216                        ht.TItems([ht.TNonEmptyString,
217                                   ht.TListOf(ht.TNonEmptyString),
218                                   ])))
219   _MAFAILED = ht.TListOf(ht.TNonEmptyString)
220   REQ_RESULT = ht.TAnd(ht.TList, ht.TIsLength(2),
221                        ht.TItems([_MASUCCESS, _MAFAILED]))
222
223   def GetRequest(self, cfg):
224     return {
225       "instances": [iareq.GetRequest(cfg) for iareq in self.instances],
226       }
227
228
229 class IAReqRelocate(IARequestBase):
230   """A relocation request.
231
232   """
233   # pylint: disable=E1101
234   MODE = constants.IALLOCATOR_MODE_RELOC
235   REQ_PARAMS = [
236     _INST_NAME,
237     ("relocate_from", _STRING_LIST),
238     ]
239   REQ_RESULT = ht.TList
240
241   def GetRequest(self, cfg):
242     """Request an relocation of an instance
243
244     The checks for the completeness of the opcode must have already been
245     done.
246
247     """
248     instance = cfg.GetInstanceInfo(self.name)
249     if instance is None:
250       raise errors.ProgrammerError("Unknown instance '%s' passed to"
251                                    " IAllocator" % self.name)
252
253     if instance.disk_template not in constants.DTS_MIRRORED:
254       raise errors.OpPrereqError("Can't relocate non-mirrored instances",
255                                  errors.ECODE_INVAL)
256
257     if (instance.disk_template in constants.DTS_INT_MIRROR and
258         len(instance.secondary_nodes) != 1):
259       raise errors.OpPrereqError("Instance has not exactly one secondary node",
260                                  errors.ECODE_STATE)
261
262     disk_sizes = [{constants.IDISK_SIZE: disk.size} for disk in instance.disks]
263     disk_space = gmi.ComputeDiskSize(instance.disk_template, disk_sizes)
264
265     return {
266       "name": self.name,
267       "disk_space_total": disk_space,
268       "required_nodes": 1,
269       "relocate_from": self.relocate_from,
270       }
271
272   def ValidateResult(self, ia, result):
273     """Validates the result of an relocation request.
274
275     """
276     IARequestBase.ValidateResult(self, ia, result)
277
278     node2group = dict((name, ndata["group"])
279                       for (name, ndata) in ia.in_data["nodes"].items())
280
281     fn = compat.partial(self._NodesToGroups, node2group,
282                         ia.in_data["nodegroups"])
283
284     instance = ia.cfg.GetInstanceInfo(self.name)
285     request_groups = fn(self.relocate_from + [instance.primary_node])
286     result_groups = fn(result + [instance.primary_node])
287
288     if ia.success and not set(result_groups).issubset(request_groups):
289       raise errors.ResultValidationError("Groups of nodes returned by"
290                                          "iallocator (%s) differ from original"
291                                          " groups (%s)" %
292                                          (utils.CommaJoin(result_groups),
293                                           utils.CommaJoin(request_groups)))
294
295   @staticmethod
296   def _NodesToGroups(node2group, groups, nodes):
297     """Returns a list of unique group names for a list of nodes.
298
299     @type node2group: dict
300     @param node2group: Map from node name to group UUID
301     @type groups: dict
302     @param groups: Group information
303     @type nodes: list
304     @param nodes: Node names
305
306     """
307     result = set()
308
309     for node in nodes:
310       try:
311         group_uuid = node2group[node]
312       except KeyError:
313         # Ignore unknown node
314         pass
315       else:
316         try:
317           group = groups[group_uuid]
318         except KeyError:
319           # Can't find group, let's use UUID
320           group_name = group_uuid
321         else:
322           group_name = group["name"]
323
324         result.add(group_name)
325
326     return sorted(result)
327
328
329 class IAReqNodeEvac(IARequestBase):
330   """A node evacuation request.
331
332   """
333   # pylint: disable=E1101
334   MODE = constants.IALLOCATOR_MODE_NODE_EVAC
335   REQ_PARAMS = [
336     ("instances", _STRING_LIST),
337     ("evac_mode", ht.TElemOf(constants.IALLOCATOR_NEVAC_MODES)),
338     ]
339   REQ_RESULT = _NEVAC_RESULT
340
341   def GetRequest(self, cfg):
342     """Get data for node-evacuate requests.
343
344     """
345     return {
346       "instances": self.instances,
347       "evac_mode": self.evac_mode,
348       }
349
350
351 class IAReqGroupChange(IARequestBase):
352   """A group change request.
353
354   """
355   # pylint: disable=E1101
356   MODE = constants.IALLOCATOR_MODE_CHG_GROUP
357   REQ_PARAMS = [
358     ("instances", _STRING_LIST),
359     ("target_groups", _STRING_LIST),
360     ]
361   REQ_RESULT = _NEVAC_RESULT
362
363   def GetRequest(self, cfg):
364     """Get data for node-evacuate requests.
365
366     """
367     return {
368       "instances": self.instances,
369       "target_groups": self.target_groups,
370       }
371
372
373 class IAllocator(object):
374   """IAllocator framework.
375
376   An IAllocator instance has three sets of attributes:
377     - cfg that is needed to query the cluster
378     - input data (all members of the _KEYS class attribute are required)
379     - four buffer attributes (in|out_data|text), that represent the
380       input (to the external script) in text and data structure format,
381       and the output from it, again in two formats
382     - the result variables from the script (success, info, nodes) for
383       easy usage
384
385   """
386   # pylint: disable=R0902
387   # lots of instance attributes
388
389   def __init__(self, cfg, rpc_runner, req):
390     self.cfg = cfg
391     self.rpc = rpc_runner
392     self.req = req
393     # init buffer variables
394     self.in_text = self.out_text = self.in_data = self.out_data = None
395     # init result fields
396     self.success = self.info = self.result = None
397
398     self._BuildInputData(req)
399
400   def _ComputeClusterData(self):
401     """Compute the generic allocator input data.
402
403     This is the data that is independent of the actual operation.
404
405     """
406     cfg = self.cfg
407     cluster_info = cfg.GetClusterInfo()
408     # cluster data
409     data = {
410       "version": constants.IALLOCATOR_VERSION,
411       "cluster_name": cfg.GetClusterName(),
412       "cluster_tags": list(cluster_info.GetTags()),
413       "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
414       "ipolicy": cluster_info.ipolicy,
415       }
416     ninfo = cfg.GetAllNodesInfo()
417     iinfo = cfg.GetAllInstancesInfo().values()
418     i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
419
420     # node data
421     node_list = [n.name for n in ninfo.values() if n.vm_capable]
422
423     if isinstance(self.req, IAReqInstanceAlloc):
424       hypervisor_name = self.req.hypervisor
425       node_whitelist = self.req.node_whitelist
426     elif isinstance(self.req, IAReqRelocate):
427       hypervisor_name = cfg.GetInstanceInfo(self.req.name).hypervisor
428       node_whitelist = None
429     else:
430       hypervisor_name = cluster_info.primary_hypervisor
431       node_whitelist = None
432
433     es_flags = rpc.GetExclusiveStorageForNodeNames(cfg, node_list)
434     vg_name = cfg.GetVGName()
435     if vg_name is not None:
436       has_lvm = True
437       vg_req = [vg_name]
438     else:
439       has_lvm = False
440       vg_req = []
441     node_data = self.rpc.call_node_info(node_list, vg_req,
442                                         [hypervisor_name], es_flags)
443     node_iinfo = \
444       self.rpc.call_all_instances_info(node_list,
445                                        cluster_info.enabled_hypervisors)
446
447     data["nodegroups"] = self._ComputeNodeGroupData(cfg)
448
449     config_ndata = self._ComputeBasicNodeData(cfg, ninfo, node_whitelist)
450     data["nodes"] = self._ComputeDynamicNodeData(ninfo, node_data, node_iinfo,
451                                                  i_list, config_ndata, has_lvm)
452     assert len(data["nodes"]) == len(ninfo), \
453         "Incomplete node data computed"
454
455     data["instances"] = self._ComputeInstanceData(cluster_info, i_list)
456
457     self.in_data = data
458
459   @staticmethod
460   def _ComputeNodeGroupData(cfg):
461     """Compute node groups data.
462
463     """
464     cluster = cfg.GetClusterInfo()
465     ng = dict((guuid, {
466       "name": gdata.name,
467       "alloc_policy": gdata.alloc_policy,
468       "networks": [net_uuid for net_uuid, _ in gdata.networks.items()],
469       "ipolicy": gmi.CalculateGroupIPolicy(cluster, gdata),
470       "tags": list(gdata.GetTags()),
471       })
472       for guuid, gdata in cfg.GetAllNodeGroupsInfo().items())
473
474     return ng
475
476   @staticmethod
477   def _ComputeBasicNodeData(cfg, node_cfg, node_whitelist):
478     """Compute global node data.
479
480     @rtype: dict
481     @returns: a dict of name: (node dict, node config)
482
483     """
484     # fill in static (config-based) values
485     node_results = dict((ninfo.name, {
486       "tags": list(ninfo.GetTags()),
487       "primary_ip": ninfo.primary_ip,
488       "secondary_ip": ninfo.secondary_ip,
489       "offline": (ninfo.offline or
490                   not (node_whitelist is None or
491                        ninfo.name in node_whitelist)),
492       "drained": ninfo.drained,
493       "master_candidate": ninfo.master_candidate,
494       "group": ninfo.group,
495       "master_capable": ninfo.master_capable,
496       "vm_capable": ninfo.vm_capable,
497       "ndparams": cfg.GetNdParams(ninfo),
498       })
499       for ninfo in node_cfg.values())
500
501     return node_results
502
503   @staticmethod
504   def _ComputeDynamicNodeData(node_cfg, node_data, node_iinfo, i_list,
505                               node_results, has_lvm):
506     """Compute global node data.
507
508     @param node_results: the basic node structures as filled from the config
509
510     """
511     #TODO(dynmem): compute the right data on MAX and MIN memory
512     # make a copy of the current dict
513     node_results = dict(node_results)
514     for nname, nresult in node_data.items():
515       assert nname in node_results, "Missing basic data for node %s" % nname
516       ninfo = node_cfg[nname]
517
518       if not (ninfo.offline or ninfo.drained):
519         nresult.Raise("Can't get data for node %s" % nname)
520         node_iinfo[nname].Raise("Can't get node instance info from node %s" %
521                                 nname)
522         remote_info = rpc.MakeLegacyNodeInfo(nresult.payload,
523                                              require_vg_info=has_lvm)
524
525         def get_attr(attr):
526           if attr not in remote_info:
527             raise errors.OpExecError("Node '%s' didn't return attribute"
528                                      " '%s'" % (nname, attr))
529           value = remote_info[attr]
530           if not isinstance(value, int):
531             raise errors.OpExecError("Node '%s' returned invalid value"
532                                      " for '%s': %s" %
533                                      (nname, attr, value))
534           return value
535
536         mem_free = get_attr("memory_free")
537
538         # compute memory used by primary instances
539         i_p_mem = i_p_up_mem = 0
540         for iinfo, beinfo in i_list:
541           if iinfo.primary_node == nname:
542             i_p_mem += beinfo[constants.BE_MAXMEM]
543             if iinfo.name not in node_iinfo[nname].payload:
544               i_used_mem = 0
545             else:
546               i_used_mem = int(node_iinfo[nname].payload[iinfo.name]["memory"])
547             i_mem_diff = beinfo[constants.BE_MAXMEM] - i_used_mem
548             mem_free -= max(0, i_mem_diff)
549
550             if iinfo.admin_state == constants.ADMINST_UP:
551               i_p_up_mem += beinfo[constants.BE_MAXMEM]
552
553         # TODO: replace this with proper storage reporting
554         if has_lvm:
555           total_disk = get_attr("vg_size")
556           free_disk = get_attr("vg_free")
557         else:
558           # we didn't even ask the node for VG status, so use zeros
559           total_disk = free_disk = 0
560
561         # compute memory used by instances
562         pnr_dyn = {
563           "total_memory": get_attr("memory_total"),
564           "reserved_memory": get_attr("memory_dom0"),
565           "free_memory": mem_free,
566           "total_disk": total_disk,
567           "free_disk": free_disk,
568           "total_cpus": get_attr("cpu_total"),
569           "i_pri_memory": i_p_mem,
570           "i_pri_up_memory": i_p_up_mem,
571           }
572         pnr_dyn.update(node_results[nname])
573         node_results[nname] = pnr_dyn
574
575     return node_results
576
577   @staticmethod
578   def _ComputeInstanceData(cluster_info, i_list):
579     """Compute global instance data.
580
581     """
582     instance_data = {}
583     for iinfo, beinfo in i_list:
584       nic_data = []
585       for nic in iinfo.nics:
586         filled_params = cluster_info.SimpleFillNIC(nic.nicparams)
587         nic_dict = {
588           "mac": nic.mac,
589           "ip": nic.ip,
590           "mode": filled_params[constants.NIC_MODE],
591           "link": filled_params[constants.NIC_LINK],
592           }
593         if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
594           nic_dict["bridge"] = filled_params[constants.NIC_LINK]
595         nic_data.append(nic_dict)
596       pir = {
597         "tags": list(iinfo.GetTags()),
598         "admin_state": iinfo.admin_state,
599         "vcpus": beinfo[constants.BE_VCPUS],
600         "memory": beinfo[constants.BE_MAXMEM],
601         "spindle_use": beinfo[constants.BE_SPINDLE_USE],
602         "os": iinfo.os,
603         "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
604         "nics": nic_data,
605         "disks": [{constants.IDISK_SIZE: dsk.size,
606                    constants.IDISK_MODE: dsk.mode}
607                   for dsk in iinfo.disks],
608         "disk_template": iinfo.disk_template,
609         "disks_active": iinfo.disks_active,
610         "hypervisor": iinfo.hypervisor,
611         }
612       pir["disk_space_total"] = gmi.ComputeDiskSize(iinfo.disk_template,
613                                                     pir["disks"])
614       instance_data[iinfo.name] = pir
615
616     return instance_data
617
618   def _BuildInputData(self, req):
619     """Build input data structures.
620
621     """
622     self._ComputeClusterData()
623
624     request = req.GetRequest(self.cfg)
625     request["type"] = req.MODE
626     self.in_data["request"] = request
627
628     self.in_text = serializer.Dump(self.in_data)
629
630   def Run(self, name, validate=True, call_fn=None):
631     """Run an instance allocator and return the results.
632
633     """
634     if call_fn is None:
635       call_fn = self.rpc.call_iallocator_runner
636
637     result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
638     result.Raise("Failure while running the iallocator script")
639
640     self.out_text = result.payload
641     if validate:
642       self._ValidateResult()
643
644   def _ValidateResult(self):
645     """Process the allocator results.
646
647     This will process and if successful save the result in
648     self.out_data and the other parameters.
649
650     """
651     try:
652       rdict = serializer.Load(self.out_text)
653     except Exception, err:
654       raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
655
656     if not isinstance(rdict, dict):
657       raise errors.OpExecError("Can't parse iallocator results: not a dict")
658
659     # TODO: remove backwards compatiblity in later versions
660     if "nodes" in rdict and "result" not in rdict:
661       rdict["result"] = rdict["nodes"]
662       del rdict["nodes"]
663
664     for key in "success", "info", "result":
665       if key not in rdict:
666         raise errors.OpExecError("Can't parse iallocator results:"
667                                  " missing key '%s'" % key)
668       setattr(self, key, rdict[key])
669
670     self.req.ValidateResult(self, self.result)
671     self.out_data = rdict