Statistics
| Branch: | Tag: | Revision:

root / lib / masterd / iallocator.py @ 3d7d3a12

History | View | Annotate | Download (19.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the iallocator code."""
23

    
24
from ganeti import compat
25
from ganeti import constants
26
from ganeti import errors
27
from ganeti import ht
28
from ganeti import objectutils
29
from ganeti import opcodes
30
from ganeti import rpc
31
from ganeti import serializer
32
from ganeti import utils
33

    
34
import ganeti.masterd.instance as gmi
35

    
36

    
37
# Validator for a plain list of strings
_STRING_LIST = ht.TListOf(ht.TString)

# Validator for a list of lists of dicts whose "OP_ID" entry has to name one
# of the failover, migrate or replace-disks instance opcodes
_JOB_LIST = ht.TListOf(ht.TListOf(ht.TStrictDict(True, False, {
  # pylint: disable=E1101
  # Class '...' has no 'OP_ID' member
  "OP_ID": ht.TElemOf([
    opcodes.OpInstanceFailover.OP_ID,
    opcodes.OpInstanceMigrate.OP_ID,
    opcodes.OpInstanceReplaceDisks.OP_ID,
    ]),
  })))

# Validator for a list of 3-item entries: two non-empty strings followed by
# a list of non-empty strings
# (presumably instance name, target group and new nodes -- TODO confirm)
_NEVAC_MOVED = ht.TListOf(ht.TAnd(
  ht.TIsLength(3),
  ht.TItems([
    ht.TNonEmptyString,
    ht.TNonEmptyString,
    ht.TListOf(ht.TNonEmptyString),
    ])))

# Validator for a list of 2-item entries: a non-empty string followed by an
# optional string
# (presumably instance name and failure reason -- TODO confirm)
_NEVAC_FAILED = ht.TListOf(ht.TAnd(
  ht.TIsLength(2),
  ht.TItems([
    ht.TNonEmptyString,
    ht.TMaybeString,
    ])))

# Validator for the complete result shared by the node-evacuate and
# change-group requests: exactly (moved, failed, jobs)
_NEVAC_RESULT = ht.TAnd(
  ht.TIsLength(3),
  ht.TItems([_NEVAC_MOVED, _NEVAC_FAILED, _JOB_LIST]))

# Request parameter definition shared by the requests that act on a single
# named instance
_INST_NAME = ("name", ht.TNonEmptyString)
61

    
62

    
63
class _AutoReqParam(objectutils.AutoSlots):
  """Metaclass deriving the slots of a request class from REQ_PARAMS.

  """
  @classmethod
  def _GetSlots(mcs, attrs):
    """Returns the slot names declared through REQ_PARAMS.

    @param attrs: dict-like holding the class attributes; an empty
      "REQ_PARAMS" list is stored in it if the key is missing
    @return: list with the first item (the name) of every REQ_PARAMS entry

    """
    slot_names = []

    for (name, _) in attrs.setdefault("REQ_PARAMS", []):
      slot_names.append(name)

    return slot_names
74

    
75

    
76
class IARequestBase(objectutils.ValidatedSlots):
  """Common base class of all IAllocator request objects.

  Subclasses are expected to override:
    - MODE: the request mode, has to be one of
      constants.VALID_IALLOCATOR_MODES
    - REQ_PARAMS: list of (parameter name, validator) pairs
    - REQ_RESULT: validator applied to the result of the iallocator run

  """
  # The metaclass turns the names listed in REQ_PARAMS into slots
  __metaclass__ = _AutoReqParam

  MODE = NotImplemented
  REQ_PARAMS = []
  REQ_RESULT = NotImplemented

  def __init__(self, **kwargs):
    """Initializes the request from keyword arguments only.

    All keyword arguments are handed to the parent class constructor and
    the resulting object is validated right away. Therefore only names
    listed in the REQ_PARAMS attribute of the class should be passed.

    """
    objectutils.ValidatedSlots.__init__(self, **kwargs)

    self.Validate()

  def Validate(self):
    """Checks presence and validity of every request parameter.

    @raise errors.OpPrereqError: if a parameter listed in REQ_PARAMS is not
      set or is rejected by its validator

    """
    assert self.MODE in constants.VALID_IALLOCATOR_MODES

    for (name, check_fn) in self.REQ_PARAMS:
      if not hasattr(self, name):
        msg = "Request is missing '%s' parameter" % name
        raise errors.OpPrereqError(msg, errors.ECODE_INVAL)

      value = getattr(self, name)
      if check_fn(value):
        continue

      msg = ("Request parameter '%s' has invalid type %s/value %s" %
             (name, type(value), value))
      raise errors.OpPrereqError(msg, errors.ECODE_INVAL)

  def GetRequest(self, cfg):
    """Returns the request data dict; to be implemented by subclasses.

    @param cfg: The configuration instance
    @raise NotImplementedError: always, in this base class

    """
    raise NotImplementedError

  def ValidateResult(self, ia, result):
    """Validates the result of a request.

    @param ia: The IAllocator instance
    @param result: The IAllocator run result
    @raise errors.ResultValidationError: if the run was not successful or
      the result is rejected by REQ_RESULT

    """
    if ia.success and self.REQ_RESULT(result):
      return

    raise errors.ResultValidationError("iallocator returned invalid result,"
                                       " expected %s, got %s" %
                                       (self.REQ_RESULT, result))
137

    
138

    
139
class IAReqInstanceAlloc(IARequestBase):
  """Request for the allocation of a single new instance.

  """
  # pylint: disable=E1101
  MODE = constants.IALLOCATOR_MODE_ALLOC
  REQ_PARAMS = [
    _INST_NAME,
    ("memory", ht.TPositiveInt),
    ("spindle_use", ht.TPositiveInt),
    ("disks", ht.TListOf(ht.TDict)),
    ("disk_template", ht.TString),
    ("os", ht.TString),
    ("tags", _STRING_LIST),
    ("nics", ht.TListOf(ht.TDict)),
    ("vcpus", ht.TInt),
    ("hypervisor", ht.TString),
    ]
  REQ_RESULT = ht.TList

  def RequiredNodes(self):
    """Returns the number of nodes the instance needs.

    @return: 2 if the disk template is one of constants.DTS_INT_MIRROR,
      1 otherwise

    """
    if self.disk_template in constants.DTS_INT_MIRROR:
      return 2

    return 1

  def GetRequest(self, cfg):
    """Builds the request dict for a new instance.

    The completeness of the opcode must have been checked before.

    @param cfg: The configuration instance (not used by this request)
    @return: dict with the instance parameters plus the computed
      "disk_space_total" and "required_nodes" entries

    """
    total_size = gmi.ComputeDiskSize(self.disk_template, self.disks)

    request = {
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.memory,
      "spindle_use": self.spindle_use,
      "disks": self.disks,
      "disk_space_total": total_size,
      "nics": self.nics,
      "required_nodes": self.RequiredNodes(),
      "hypervisor": self.hypervisor,
      }

    return request

  def ValidateResult(self, ia, result):
    """Validates the result of a single instance allocation request.

    On top of the generic checks the number of returned nodes has to match
    L{RequiredNodes}.

    @param ia: The IAllocator instance
    @param result: The IAllocator run result
    @raise errors.ResultValidationError: if validation fails

    """
    IARequestBase.ValidateResult(self, ia, result)

    wanted = self.RequiredNodes()
    received = len(result)

    if received != wanted:
      raise errors.ResultValidationError("iallocator returned invalid number"
                                         " of nodes (%s), required %s" %
                                         (received, wanted))
202

    
203

    
204
class IAReqMultiInstanceAlloc(IARequestBase):
  """Request for the allocation of several instances at once.

  """
  # pylint: disable=E1101
  MODE = constants.IALLOCATOR_MODE_MULTI_ALLOC
  REQ_PARAMS = [
    ("instances", ht.TListOf(ht.TInstanceOf(IAReqInstanceAlloc)))
    ]

  # List of 2-item entries: a non-empty string and a list of non-empty
  # strings (presumably instance name and chosen nodes -- TODO confirm)
  _MASUCCESS = ht.TListOf(ht.TAnd(
    ht.TIsLength(2),
    ht.TItems([
      ht.TNonEmptyString,
      ht.TListOf(ht.TNonEmptyString),
      ])))

  # List of non-empty strings
  # (presumably names of instances which could not be placed -- TODO confirm)
  _MAFAILED = ht.TListOf(ht.TNonEmptyString)

  # NOTE(review): unlike _NEVAC_RESULT, the (success, failed) pair is wrapped
  # in TListOf here, so a list of such pairs is expected rather than a single
  # pair -- confirm this matches what the allocator really returns
  REQ_RESULT = ht.TListOf(ht.TAnd(
    ht.TIsLength(2),
    ht.TItems([_MASUCCESS, _MAFAILED])))

  def GetRequest(self, cfg):
    """Builds the request dict out of the individual instance requests.

    @param cfg: The configuration instance, handed on to every instance
      request
    @return: dict whose "instances" entry lists the request dict of every
      instance, in the order of the "instances" parameter

    """
    per_instance = []

    for iareq in self.instances:
      per_instance.append(iareq.GetRequest(cfg))

    return {
      "instances": per_instance,
      }
226

    
227

    
228
class IAReqRelocate(IARequestBase):
  """A relocation request.

  """
  # pylint: disable=E1101
  MODE = constants.IALLOCATOR_MODE_RELOC
  REQ_PARAMS = [
    _INST_NAME,
    ("relocate_from", _STRING_LIST),
    ]
  REQ_RESULT = ht.TList

  def GetRequest(self, cfg):
    """Requests the relocation of an instance.

    The checks for the completeness of the opcode must have already been
    done.

    @param cfg: The configuration instance, used to look up the instance
    @return: dict with the "name", "disk_space_total", "required_nodes"
      (always 1) and "relocate_from" entries
    @raise errors.ProgrammerError: if the instance is not known to the
      configuration
    @raise errors.OpPrereqError: if the disk template of the instance is not
      mirrored, or it is internally mirrored without having exactly one
      secondary node

    """
    instance = cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_MIRRORED:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances",
                                 errors.ECODE_INVAL)

    if instance.disk_template in constants.DTS_INT_MIRROR and \
        len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance has not exactly one secondary node",
                                 errors.ECODE_STATE)

    # Only the disk sizes are passed on for computing the total disk space
    disk_sizes = [{constants.IDISK_SIZE: disk.size} for disk in instance.disks]
    disk_space = gmi.ComputeDiskSize(instance.disk_template, disk_sizes)

    return {
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": 1,
      "relocate_from": self.relocate_from,
      }

  def ValidateResult(self, ia, result):
    """Validates the result of a relocation request.

    On top of the generic checks, the groups of the returned nodes have to
    be a subset of the groups of the nodes the instance is relocated from;
    the primary node of the instance is included on both sides.

    @param ia: The IAllocator instance
    @param result: The IAllocator run result
    @raise errors.ResultValidationError: if validation fails

    """
    IARequestBase.ValidateResult(self, ia, result)

    node2group = dict((name, ndata["group"])
                      for (name, ndata) in ia.in_data["nodes"].items())

    fn = compat.partial(self._NodesToGroups, node2group,
                        ia.in_data["nodegroups"])

    instance = ia.cfg.GetInstanceInfo(self.name)
    request_groups = fn(self.relocate_from + [instance.primary_node])
    result_groups = fn(result + [instance.primary_node])

    if ia.success and not set(result_groups).issubset(request_groups):
      # The separating blanks have to be part of the literals, as adjacent
      # string literals are joined without any separator
      raise errors.ResultValidationError("Groups of nodes returned by"
                                         " iallocator (%s) differ from"
                                         " original groups (%s)" %
                                         (utils.CommaJoin(result_groups),
                                          utils.CommaJoin(request_groups)))

  @staticmethod
  def _NodesToGroups(node2group, groups, nodes):
    """Returns a list of unique group names for a list of nodes.

    @type node2group: dict
    @param node2group: Map from node name to group UUID
    @type groups: dict
    @param groups: Group information
    @type nodes: list
    @param nodes: Node names
    @rtype: list
    @return: Sorted group names; the UUID is used in place of the name for
      groups missing from C{groups}

    """
    result = set()

    for node in nodes:
      try:
        group_uuid = node2group[node]
      except KeyError:
        # Ignore unknown node
        continue

      try:
        group = groups[group_uuid]
      except KeyError:
        # Can't find group, let's use UUID
        group_name = group_uuid
      else:
        group_name = group["name"]

      result.add(group_name)

    return sorted(result)
326

    
327

    
328
class IAReqNodeEvac(IARequestBase):
  """Request for the evacuation of instances from their nodes.

  """
  # pylint: disable=E1101
  MODE = constants.IALLOCATOR_MODE_NODE_EVAC
  REQ_PARAMS = [
    ("instances", _STRING_LIST),
    ("evac_mode", ht.TElemOf(constants.IALLOCATOR_NEVAC_MODES)),
    ]
  REQ_RESULT = _NEVAC_RESULT

  def GetRequest(self, cfg):
    """Builds the request dict for a node-evacuate request.

    @param cfg: The configuration instance (not used by this request)
    @return: dict holding the "instances" and "evac_mode" parameters

    """
    request = {
      "instances": self.instances,
      "evac_mode": self.evac_mode,
      }

    return request
348

    
349

    
350
class IAReqGroupChange(IARequestBase):
  """Request for moving instances to other node groups.

  """
  # pylint: disable=E1101
  MODE = constants.IALLOCATOR_MODE_CHG_GROUP
  REQ_PARAMS = [
    ("instances", _STRING_LIST),
    ("target_groups", _STRING_LIST),
    ]
  REQ_RESULT = _NEVAC_RESULT

  def GetRequest(self, cfg):
    """Builds the request dict for a change-group request.

    @param cfg: The configuration instance (not used by this request)
    @return: dict holding the "instances" and "target_groups" parameters

    """
    request = {
      "instances": self.instances,
      "target_groups": self.target_groups,
      }

    return request
370

    
371

    
372
class IAllocator(object):
373
  """IAllocator framework.
374

375
  An IAllocator instance has three sets of attributes:
376
    - cfg that is needed to query the cluster
377
    - input data (all members of the _KEYS class attribute are required)
378
    - four buffer attributes (in|out_data|text), that represent the
379
      input (to the external script) in text and data structure format,
380
      and the output from it, again in two formats
381
    - the result variables from the script (success, info, nodes) for
382
      easy usage
383

384
  """
385
  # pylint: disable=R0902
386
  # lots of instance attributes
387

    
388
  def __init__(self, cfg, rpc_runner, req):
389
    self.cfg = cfg
390
    self.rpc = rpc_runner
391
    self.req = req
392
    # init buffer variables
393
    self.in_text = self.out_text = self.in_data = self.out_data = None
394
    # init result fields
395
    self.success = self.info = self.result = None
396

    
397
    self._BuildInputData(req)
398

    
399
  def _ComputeClusterData(self):
400
    """Compute the generic allocator input data.
401

402
    This is the data that is independent of the actual operation.
403

404
    """
405
    cfg = self.cfg
406
    cluster_info = cfg.GetClusterInfo()
407
    # cluster data
408
    data = {
409
      "version": constants.IALLOCATOR_VERSION,
410
      "cluster_name": cfg.GetClusterName(),
411
      "cluster_tags": list(cluster_info.GetTags()),
412
      "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
413
      "ipolicy": cluster_info.ipolicy,
414
      }
415
    ninfo = cfg.GetAllNodesInfo()
416
    iinfo = cfg.GetAllInstancesInfo().values()
417
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
418

    
419
    # node data
420
    node_list = [n.name for n in ninfo.values() if n.vm_capable]
421

    
422
    if isinstance(self.req, IAReqInstanceAlloc):
423
      hypervisor_name = self.req.hypervisor
424
    elif isinstance(self.req, IAReqRelocate):
425
      hypervisor_name = cfg.GetInstanceInfo(self.req.name).hypervisor
426
    else:
427
      hypervisor_name = cluster_info.primary_hypervisor
428

    
429
    node_data = self.rpc.call_node_info(node_list, [cfg.GetVGName()],
430
                                        [hypervisor_name])
431
    node_iinfo = \
432
      self.rpc.call_all_instances_info(node_list,
433
                                       cluster_info.enabled_hypervisors)
434

    
435
    data["nodegroups"] = self._ComputeNodeGroupData(cfg)
436

    
437
    config_ndata = self._ComputeBasicNodeData(cfg, ninfo)
438
    data["nodes"] = self._ComputeDynamicNodeData(ninfo, node_data, node_iinfo,
439
                                                 i_list, config_ndata)
440
    assert len(data["nodes"]) == len(ninfo), \
441
        "Incomplete node data computed"
442

    
443
    data["instances"] = self._ComputeInstanceData(cluster_info, i_list)
444

    
445
    self.in_data = data
446

    
447
  @staticmethod
448
  def _ComputeNodeGroupData(cfg):
449
    """Compute node groups data.
450

451
    """
452
    cluster = cfg.GetClusterInfo()
453
    ng = dict((guuid, {
454
      "name": gdata.name,
455
      "alloc_policy": gdata.alloc_policy,
456
      "ipolicy": gmi.CalculateGroupIPolicy(cluster, gdata),
457
      })
458
      for guuid, gdata in cfg.GetAllNodeGroupsInfo().items())
459

    
460
    return ng
461

    
462
  @staticmethod
463
  def _ComputeBasicNodeData(cfg, node_cfg):
464
    """Compute global node data.
465

466
    @rtype: dict
467
    @returns: a dict of name: (node dict, node config)
468

469
    """
470
    # fill in static (config-based) values
471
    node_results = dict((ninfo.name, {
472
      "tags": list(ninfo.GetTags()),
473
      "primary_ip": ninfo.primary_ip,
474
      "secondary_ip": ninfo.secondary_ip,
475
      "offline": ninfo.offline,
476
      "drained": ninfo.drained,
477
      "master_candidate": ninfo.master_candidate,
478
      "group": ninfo.group,
479
      "master_capable": ninfo.master_capable,
480
      "vm_capable": ninfo.vm_capable,
481
      "ndparams": cfg.GetNdParams(ninfo),
482
      })
483
      for ninfo in node_cfg.values())
484

    
485
    return node_results
486

    
487
  @staticmethod
488
  def _ComputeDynamicNodeData(node_cfg, node_data, node_iinfo, i_list,
489
                              node_results):
490
    """Compute global node data.
491

492
    @param node_results: the basic node structures as filled from the config
493

494
    """
495
    #TODO(dynmem): compute the right data on MAX and MIN memory
496
    # make a copy of the current dict
497
    node_results = dict(node_results)
498
    for nname, nresult in node_data.items():
499
      assert nname in node_results, "Missing basic data for node %s" % nname
500
      ninfo = node_cfg[nname]
501

    
502
      if not (ninfo.offline or ninfo.drained):
503
        nresult.Raise("Can't get data for node %s" % nname)
504
        node_iinfo[nname].Raise("Can't get node instance info from node %s" %
505
                                nname)
506
        remote_info = rpc.MakeLegacyNodeInfo(nresult.payload)
507

    
508
        for attr in ["memory_total", "memory_free", "memory_dom0",
509
                     "vg_size", "vg_free", "cpu_total"]:
510
          if attr not in remote_info:
511
            raise errors.OpExecError("Node '%s' didn't return attribute"
512
                                     " '%s'" % (nname, attr))
513
          if not isinstance(remote_info[attr], int):
514
            raise errors.OpExecError("Node '%s' returned invalid value"
515
                                     " for '%s': %s" %
516
                                     (nname, attr, remote_info[attr]))
517
        # compute memory used by primary instances
518
        i_p_mem = i_p_up_mem = 0
519
        for iinfo, beinfo in i_list:
520
          if iinfo.primary_node == nname:
521
            i_p_mem += beinfo[constants.BE_MAXMEM]
522
            if iinfo.name not in node_iinfo[nname].payload:
523
              i_used_mem = 0
524
            else:
525
              i_used_mem = int(node_iinfo[nname].payload[iinfo.name]["memory"])
526
            i_mem_diff = beinfo[constants.BE_MAXMEM] - i_used_mem
527
            remote_info["memory_free"] -= max(0, i_mem_diff)
528

    
529
            if iinfo.admin_state == constants.ADMINST_UP:
530
              i_p_up_mem += beinfo[constants.BE_MAXMEM]
531

    
532
        # compute memory used by instances
533
        pnr_dyn = {
534
          "total_memory": remote_info["memory_total"],
535
          "reserved_memory": remote_info["memory_dom0"],
536
          "free_memory": remote_info["memory_free"],
537
          "total_disk": remote_info["vg_size"],
538
          "free_disk": remote_info["vg_free"],
539
          "total_cpus": remote_info["cpu_total"],
540
          "i_pri_memory": i_p_mem,
541
          "i_pri_up_memory": i_p_up_mem,
542
          }
543
        pnr_dyn.update(node_results[nname])
544
        node_results[nname] = pnr_dyn
545

    
546
    return node_results
547

    
548
  @staticmethod
549
  def _ComputeInstanceData(cluster_info, i_list):
550
    """Compute global instance data.
551

552
    """
553
    instance_data = {}
554
    for iinfo, beinfo in i_list:
555
      nic_data = []
556
      for nic in iinfo.nics:
557
        filled_params = cluster_info.SimpleFillNIC(nic.nicparams)
558
        nic_dict = {
559
          "mac": nic.mac,
560
          "ip": nic.ip,
561
          "mode": filled_params[constants.NIC_MODE],
562
          "link": filled_params[constants.NIC_LINK],
563
          }
564
        if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
565
          nic_dict["bridge"] = filled_params[constants.NIC_LINK]
566
        nic_data.append(nic_dict)
567
      pir = {
568
        "tags": list(iinfo.GetTags()),
569
        "admin_state": iinfo.admin_state,
570
        "vcpus": beinfo[constants.BE_VCPUS],
571
        "memory": beinfo[constants.BE_MAXMEM],
572
        "spindle_use": beinfo[constants.BE_SPINDLE_USE],
573
        "os": iinfo.os,
574
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
575
        "nics": nic_data,
576
        "disks": [{constants.IDISK_SIZE: dsk.size,
577
                   constants.IDISK_MODE: dsk.mode}
578
                  for dsk in iinfo.disks],
579
        "disk_template": iinfo.disk_template,
580
        "hypervisor": iinfo.hypervisor,
581
        }
582
      pir["disk_space_total"] = gmi.ComputeDiskSize(iinfo.disk_template,
583
                                                    pir["disks"])
584
      instance_data[iinfo.name] = pir
585

    
586
    return instance_data
587

    
588
  def _BuildInputData(self, req):
589
    """Build input data structures.
590

591
    """
592
    self._ComputeClusterData()
593

    
594
    request = req.GetRequest(self.cfg)
595
    request["type"] = req.MODE
596
    self.in_data["request"] = request
597

    
598
    self.in_text = serializer.Dump(self.in_data)
599

    
600
  def Run(self, name, validate=True, call_fn=None):
601
    """Run an instance allocator and return the results.
602

603
    """
604
    if call_fn is None:
605
      call_fn = self.rpc.call_iallocator_runner
606

    
607
    result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
608
    result.Raise("Failure while running the iallocator script")
609

    
610
    self.out_text = result.payload
611
    if validate:
612
      self._ValidateResult()
613

    
614
  def _ValidateResult(self):
615
    """Process the allocator results.
616

617
    This will process and if successful save the result in
618
    self.out_data and the other parameters.
619

620
    """
621
    try:
622
      rdict = serializer.Load(self.out_text)
623
    except Exception, err:
624
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
625

    
626
    if not isinstance(rdict, dict):
627
      raise errors.OpExecError("Can't parse iallocator results: not a dict")
628

    
629
    # TODO: remove backwards compatiblity in later versions
630
    if "nodes" in rdict and "result" not in rdict:
631
      rdict["result"] = rdict["nodes"]
632
      del rdict["nodes"]
633

    
634
    for key in "success", "info", "result":
635
      if key not in rdict:
636
        raise errors.OpExecError("Can't parse iallocator results:"
637
                                 " missing key '%s'" % key)
638
      setattr(self, key, rdict[key])
639

    
640
    self.req.ValidateResult(self, self.result)
641
    self.out_data = rdict