Statistics
| Branch: | Tag: | Revision:

root / lib / masterd / iallocator.py @ 776b6291

History | View | Annotate | Download (19.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the iallocator code."""
23

    
24
from ganeti import compat
25
from ganeti import constants
26
from ganeti import errors
27
from ganeti import ht
28
from ganeti import objectutils
29
from ganeti import opcodes
30
from ganeti import rpc
31
from ganeti import serializer
32
from ganeti import utils
33

    
34
import ganeti.masterd.instance as gmi
35

    
36

    
37
# Validator accepting a list of strings
_STRING_LIST = ht.TListOf(ht.TString)

# Validator for a list of jobs, every job being a list of opcode dicts whose
# "OP_ID" entry is checked against the instance-moving opcodes listed below
_JOB_LIST = ht.TListOf(
  ht.TListOf(
    ht.TStrictDict(True, False, {
      # pylint: disable=E1101
      # Class '...' has no 'OP_ID' member
      "OP_ID": ht.TElemOf([
        opcodes.OpInstanceFailover.OP_ID,
        opcodes.OpInstanceMigrate.OP_ID,
        opcodes.OpInstanceReplaceDisks.OP_ID,
        ]),
      })))

# Validator for a list of three-element entries: two non-empty strings
# followed by a list of non-empty strings
_NEVAC_MOVED = ht.TListOf(
  ht.TAnd(ht.TIsLength(3),
          ht.TItems([
            ht.TNonEmptyString,
            ht.TNonEmptyString,
            ht.TListOf(ht.TNonEmptyString),
            ])))

# Validator for a list of two-element entries: a non-empty string followed
# by an optional string
_NEVAC_FAILED = ht.TListOf(
  ht.TAnd(ht.TIsLength(2),
          ht.TItems([
            ht.TNonEmptyString,
            ht.TMaybeString,
            ])))

# Validator for a complete result: exactly (moved, failed, jobs)
_NEVAC_RESULT = ht.TAnd(
  ht.TIsLength(3),
  ht.TItems([
    _NEVAC_MOVED,
    _NEVAC_FAILED,
    _JOB_LIST,
    ]))
59

    
60

    
61
class _AutoReqParam(objectutils.AutoSlots):
  """Metaclass deriving the slots of a request class from its parameters.

  """
  @classmethod
  def _GetSlots(mcs, attrs):
    """Returns the slot names declared through REQ_PARAMS.

    If the class attributes do not define C{REQ_PARAMS} yet, an empty list
    is stored under that name.

    @type attrs: dict
    @param attrs: The attributes of the class being created
    @rtype: list
    @return: The first element of every REQ_PARAMS pair

    """
    slot_names = []

    for (name, _) in attrs.setdefault("REQ_PARAMS", []):
      slot_names.append(name)

    return slot_names
72

    
73

    
74
class IARequestBase(objectutils.ValidatedSlots):
  """Common base class of all IAllocator request objects.

  Subclasses are expected to set C{MODE}, C{REQ_PARAMS} (a list of
  C{(name, validator)} pairs) and C{REQ_RESULT} (a callable used to check
  the result), and to implement L{GetRequest}.

  """
  __metaclass__ = _AutoReqParam

  MODE = NotImplemented
  REQ_PARAMS = []
  REQ_RESULT = NotImplemented

  def __init__(self, **kwargs):
    """Initializes the request from keyword arguments and validates it.

    Only keyword arguments are accepted; they are handed to the parent
    constructor, which sets them as attributes of this object. Hence only
    names listed in the REQ_PARAMS attribute of the class should be passed.

    """
    objectutils.ValidatedSlots.__init__(self, **kwargs)

    self.Validate()

  def Validate(self):
    """Checks that every declared parameter is present and well-typed.

    @raise errors.OpPrereqError: If a parameter is missing or rejected by
      its validator

    """
    assert self.MODE in constants.VALID_IALLOCATOR_MODES

    for (name, check_fn) in self.REQ_PARAMS:
      if hasattr(self, name):
        current = getattr(self, name)
      else:
        raise errors.OpPrereqError("Request is missing '%s' parameter" % name,
                                   errors.ECODE_INVAL)

      if check_fn(current):
        continue

      msg = (("Request parameter '%s' has invalid"
              " type %s/value %s") %
             (name, type(current), current))
      raise errors.OpPrereqError(msg, errors.ECODE_INVAL)

  def GetRequest(self, cfg):
    """Builds the request data dict; must be overridden by subclasses.

    @param cfg: The configuration instance

    """
    raise NotImplementedError

  def ValidateResult(self, ia, result):
    """Checks the result of a request.

    The result is accepted only if the run was successful and C{REQ_RESULT}
    accepts it.

    @param ia: The IAllocator instance
    @param result: The IAllocator run result
    @raises ResultValidationError: If validation fails

    """
    if ia.success and self.REQ_RESULT(result):
      return

    raise errors.ResultValidationError("iallocator returned invalid result,"
                                       " expected %s, got %s" %
                                       (self.REQ_RESULT, result))
135

    
136

    
137
class IAReqInstanceAlloc(IARequestBase):
  """Request for the allocation of a single new instance.

  """
  # pylint: disable=E1101
  MODE = constants.IALLOCATOR_MODE_ALLOC
  REQ_PARAMS = [
    ("name", ht.TString),
    ("memory", ht.TInt),
    ("spindle_use", ht.TInt),
    ("disks", ht.TListOf(ht.TDict)),
    ("disk_template", ht.TString),
    ("os", ht.TString),
    ("tags", _STRING_LIST),
    ("nics", ht.TListOf(ht.TDict)),
    ("vcpus", ht.TInt),
    ("hypervisor", ht.TString),
    ]
  REQ_RESULT = ht.TList

  def RequiredNodes(self):
    """Returns the number of nodes needed for the disk template.

    @rtype: int
    @return: 2 if the template is in C{constants.DTS_INT_MIRROR}, else 1

    """
    if self.disk_template not in constants.DTS_INT_MIRROR:
      return 1

    return 2

  def GetRequest(self, cfg):
    """Builds the request dict describing the new instance.

    The opcode is expected to have been checked for completeness before
    this method is called.

    @param cfg: The configuration instance (not used here)
    @rtype: dict

    """
    total_size = gmi.ComputeDiskSize(self.disk_template, self.disks)

    request = {
      "name": self.name,
      "disk_template": self.disk_template,
      "tags": self.tags,
      "os": self.os,
      "vcpus": self.vcpus,
      "memory": self.memory,
      "spindle_use": self.spindle_use,
      "disks": self.disks,
      "disk_space_total": total_size,
      "nics": self.nics,
      "required_nodes": self.RequiredNodes(),
      "hypervisor": self.hypervisor,
      }

    return request

  def ValidateResult(self, ia, result):
    """Validates the result of a single instance allocation request.

    On top of the generic checks, the number of returned nodes must match
    L{RequiredNodes}.

    @param ia: The IAllocator instance
    @param result: The IAllocator run result
    @raises ResultValidationError: If validation fails

    """
    IARequestBase.ValidateResult(self, ia, result)

    returned = len(result)
    needed = self.RequiredNodes()

    if returned == needed:
      return

    raise errors.ResultValidationError("iallocator returned invalid number"
                                       " of nodes (%s), required %s" %
                                       (returned, needed))
200

    
201

    
202
class IAReqMultiInstanceAlloc(IARequestBase):
  """Request for the allocation of several instances at once.

  """
  # pylint: disable=E1101
  MODE = constants.IALLOCATOR_MODE_MULTI_ALLOC
  REQ_PARAMS = [
    ("instances", ht.TListOf(ht.TInstanceOf(IAReqInstanceAlloc))),
    ]
  # List of two-element entries: a non-empty string and a list of non-empty
  # strings
  _MASUCCESS = ht.TListOf(
    ht.TAnd(ht.TIsLength(2),
            ht.TItems([
              ht.TNonEmptyString,
              ht.TListOf(ht.TNonEmptyString),
              ])))
  # List of non-empty strings
  _MAFAILED = ht.TListOf(ht.TNonEmptyString)
  REQ_RESULT = ht.TListOf(
    ht.TAnd(ht.TIsLength(2),
            ht.TItems([_MASUCCESS, _MAFAILED])))

  def GetRequest(self, cfg):
    """Builds the request dict from the individual allocation requests.

    @param cfg: The configuration instance, passed on to every sub-request
    @rtype: dict

    """
    sub_requests = []

    for single in self.instances:
      sub_requests.append(single.GetRequest(cfg))

    return {
      "instances": sub_requests,
      }
224

    
225

    
226
class IAReqRelocate(IARequestBase):
  """A relocation request.

  """
  # pylint: disable=E1101
  MODE = constants.IALLOCATOR_MODE_RELOC
  REQ_PARAMS = [
    ("name", ht.TString),
    ("relocate_from", _STRING_LIST),
    ]
  REQ_RESULT = ht.TList

  def GetRequest(self, cfg):
    """Requests the relocation of an instance.

    The checks for the completeness of the opcode must have already been
    done.

    @param cfg: The configuration instance, used to look up the instance
    @rtype: dict
    @raise errors.ProgrammerError: If the instance is not known to C{cfg}
    @raise errors.OpPrereqError: If the instance is not mirrored, or is
      internally mirrored without exactly one secondary node

    """
    instance = cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_MIRRORED:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances",
                                 errors.ECODE_INVAL)

    if instance.disk_template in constants.DTS_INT_MIRROR and \
        len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance has not exactly one secondary node",
                                 errors.ECODE_STATE)

    disk_sizes = [{constants.IDISK_SIZE: disk.size} for disk in instance.disks]
    disk_space = gmi.ComputeDiskSize(instance.disk_template, disk_sizes)

    return {
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": 1,
      "relocate_from": self.relocate_from,
      }

  def ValidateResult(self, ia, result):
    """Validates the result of a relocation request.

    After the generic checks, the node groups of the returned nodes (plus
    the instance's primary node) must be a subset of the groups of the
    C{relocate_from} nodes (plus the primary node).

    @param ia: The IAllocator instance
    @param result: The IAllocator run result
    @raises ResultValidationError: If validation fails

    """
    IARequestBase.ValidateResult(self, ia, result)

    node2group = dict((name, ndata["group"])
                      for (name, ndata) in ia.in_data["nodes"].items())

    fn = compat.partial(self._NodesToGroups, node2group,
                        ia.in_data["nodegroups"])

    instance = ia.cfg.GetInstanceInfo(self.name)
    request_groups = fn(self.relocate_from + [instance.primary_node])
    result_groups = fn(result + [instance.primary_node])

    if ia.success and not set(result_groups).issubset(request_groups):
      # The literals are concatenated as-is, so each continuation must carry
      # its own leading space
      raise errors.ResultValidationError("Groups of nodes returned by"
                                         " iallocator (%s) differ from original"
                                         " groups (%s)" %
                                         (utils.CommaJoin(result_groups),
                                          utils.CommaJoin(request_groups)))

  @staticmethod
  def _NodesToGroups(node2group, groups, nodes):
    """Returns a list of unique group names for a list of nodes.

    Nodes missing from C{node2group} are skipped; groups missing from
    C{groups} are represented by their UUID.

    @type node2group: dict
    @param node2group: Map from node name to group UUID
    @type groups: dict
    @param groups: Group information
    @type nodes: list
    @param nodes: Node names
    @rtype: list
    @return: Sorted group names without duplicates

    """
    result = set()

    for node in nodes:
      try:
        group_uuid = node2group[node]
      except KeyError:
        # Ignore unknown node
        pass
      else:
        try:
          group = groups[group_uuid]
        except KeyError:
          # Can't find group, let's use UUID
          group_name = group_uuid
        else:
          group_name = group["name"]

        result.add(group_name)

    return sorted(result)
324

    
325

    
326
class IAReqNodeEvac(IARequestBase):
  """Request for the evacuation of instances from nodes.

  """
  # pylint: disable=E1101
  MODE = constants.IALLOCATOR_MODE_NODE_EVAC
  REQ_PARAMS = [
    ("instances", _STRING_LIST),
    ("evac_mode", ht.TElemOf(constants.IALLOCATOR_NEVAC_MODES)),
    ]
  REQ_RESULT = _NEVAC_RESULT

  def GetRequest(self, cfg):
    """Builds the request dict for a node-evacuate request.

    @param cfg: The configuration instance (not used here)
    @rtype: dict

    """
    request = {}
    request["instances"] = self.instances
    request["evac_mode"] = self.evac_mode

    return request
346

    
347

    
348
class IAReqGroupChange(IARequestBase):
  """Request for moving instances to other node groups.

  """
  # pylint: disable=E1101
  MODE = constants.IALLOCATOR_MODE_CHG_GROUP
  REQ_PARAMS = [
    ("instances", _STRING_LIST),
    ("target_groups", _STRING_LIST),
    ]
  REQ_RESULT = _NEVAC_RESULT

  def GetRequest(self, cfg):
    """Builds the request dict for a group change request.

    @param cfg: The configuration instance (not used here)
    @rtype: dict

    """
    request = {}
    request["instances"] = self.instances
    request["target_groups"] = self.target_groups

    return request
368

    
369

    
370
class IAllocator(object):
371
  """IAllocator framework.
372

373
  An IAllocator instance has four sets of attributes:
374
    - cfg that is needed to query the cluster
375
    - the request object (req) from which the input data is built
376
    - four buffer attributes (in|out_data|text), that represent the
377
      input (to the external script) in text and data structure format,
378
      and the output from it, again in two formats
379
    - the result variables from the script (success, info, result) for
380
      easy usage
381

382
  """
383
  # pylint: disable=R0902
384
  # lots of instance attributes
385

    
386
  def __init__(self, cfg, rpc_runner, req):
    """Stores the collaborators and builds the allocator input right away.

    @param cfg: The configuration instance
    @param rpc_runner: The RPC runner, kept as C{self.rpc}
    @param req: The request object for which input data is built

    """
    self.cfg = cfg
    self.rpc = rpc_runner
    self.req = req

    # Buffers for the script input and output, each one in serialized (text)
    # and structured (data) form
    self.in_text = None
    self.out_text = None
    self.in_data = None
    self.out_data = None

    # Result fields, only filled in once a result has been validated
    self.success = None
    self.info = None
    self.result = None

    self._BuildInputData(req)
396

    
397
  def _ComputeClusterData(self):
    """Compute the generic allocator input data.

    This is the data describing the cluster, its node groups, nodes and
    instances. It is mostly independent of the actual operation; only the
    hypervisor for which node information is queried depends on the type of
    the request. Two RPC calls are made (node information and instance
    information) and the outcome is stored in C{self.in_data}.

    """
    cfg = self.cfg
    cluster_info = cfg.GetClusterInfo()
    # cluster data
    data = {
      "version": constants.IALLOCATOR_VERSION,
      "cluster_name": cfg.GetClusterName(),
      "cluster_tags": list(cluster_info.GetTags()),
      "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
      "ipolicy": cluster_info.ipolicy,
      }
    ninfo = cfg.GetAllNodesInfo()
    iinfo = cfg.GetAllInstancesInfo().values()
    # pair every instance with its filled backend parameters
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]

    # node data; only vm_capable nodes are queried via RPC
    node_list = [n.name for n in ninfo.values() if n.vm_capable]

    # select the hypervisor whose node information is requested
    if isinstance(self.req, IAReqInstanceAlloc):
      hypervisor_name = self.req.hypervisor
    elif isinstance(self.req, IAReqRelocate):
      # NOTE(review): GetInstanceInfo looks like it can return None for an
      # unknown instance (IAReqRelocate.GetRequest checks for that), in which
      # case this line fails with AttributeError before that check is reached
      # -- confirm whether callers guarantee the instance exists
      hypervisor_name = cfg.GetInstanceInfo(self.req.name).hypervisor
    else:
      hypervisor_name = cluster_info.primary_hypervisor

    node_data = self.rpc.call_node_info(node_list, [cfg.GetVGName()],
                                        [hypervisor_name])
    node_iinfo = \
      self.rpc.call_all_instances_info(node_list,
                                       cluster_info.enabled_hypervisors)

    data["nodegroups"] = self._ComputeNodeGroupData(cfg)

    # static node data first, then enriched with the values obtained via RPC
    config_ndata = self._ComputeBasicNodeData(cfg, ninfo)
    data["nodes"] = self._ComputeDynamicNodeData(ninfo, node_data, node_iinfo,
                                                 i_list, config_ndata)
    assert len(data["nodes"]) == len(ninfo), \
        "Incomplete node data computed"

    data["instances"] = self._ComputeInstanceData(cluster_info, i_list)

    self.in_data = data
444

    
445
  @staticmethod
  def _ComputeNodeGroupData(cfg):
    """Builds the node group section of the allocator input.

    @param cfg: The configuration instance
    @rtype: dict
    @return: Map from the keys of C{cfg.GetAllNodeGroupsInfo()} to a dict
      with the group's name, allocation policy and instance policy

    """
    cluster = cfg.GetClusterInfo()

    groups = {}
    for (uuid, group) in cfg.GetAllNodeGroupsInfo().items():
      groups[uuid] = {
        "name": group.name,
        "alloc_policy": group.alloc_policy,
        "ipolicy": gmi.CalculateGroupIPolicy(cluster, group),
        }

    return groups
459

    
460
  @staticmethod
  def _ComputeBasicNodeData(cfg, node_cfg):
    """Builds the static (config-based) data of all nodes.

    @param cfg: The configuration instance, used for the node parameters
    @type node_cfg: dict
    @param node_cfg: The node objects; only the values are used
    @rtype: dict
    @return: Map from node name to a dict with the node's static data

    """
    static_data = {}

    for node in node_cfg.values():
      # only values taken from the configuration, no RPC involved
      static_data[node.name] = {
        "tags": list(node.GetTags()),
        "primary_ip": node.primary_ip,
        "secondary_ip": node.secondary_ip,
        "offline": node.offline,
        "drained": node.drained,
        "master_candidate": node.master_candidate,
        "group": node.group,
        "master_capable": node.master_capable,
        "vm_capable": node.vm_capable,
        "ndparams": cfg.GetNdParams(node),
        }

    return static_data
484

    
485
  @staticmethod
  def _ComputeDynamicNodeData(node_cfg, node_data, node_iinfo, i_list,
                              node_results):
    """Compute global node data.

    Extends the static node data with values obtained from the nodes
    themselves (memory, disk and CPU totals, memory used by primary
    instances). Nodes that are offline or drained, and nodes without an
    entry in C{node_data}, keep their static data only.

    @param node_cfg: node objects, indexed by node name
    @param node_data: per-node results of the node info RPC call, indexed by
      node name
    @param node_iinfo: per-node results of the instance info RPC call,
      indexed by node name; the payload is indexed by instance name
    @param i_list: list of (instance, filled backend parameters) pairs
    @param node_results: the basic node structures as filled from the config
    @rtype: dict
    @return: a shallow copy of C{node_results} in which the entries of
      queried nodes are replaced by dicts that also hold the dynamic values
    @raise errors.OpExecError: if a node did not return a required attribute
      or returned a non-integer value for it

    """
    #TODO(dynmem): compute the right data on MAX and MIN memory
    # make a (shallow) copy of the current dict, so the caller's dict is not
    # modified when entries are replaced below
    node_results = dict(node_results)
    for nname, nresult in node_data.items():
      assert nname in node_results, "Missing basic data for node %s" % nname
      ninfo = node_cfg[nname]

      # RPC results of offline or drained nodes are not looked at
      if not (ninfo.offline or ninfo.drained):
        nresult.Raise("Can't get data for node %s" % nname)
        node_iinfo[nname].Raise("Can't get node instance info from node %s" %
                                nname)
        remote_info = rpc.MakeLegacyNodeInfo(nresult.payload)

        # all of these attributes must be present and be integers
        # NOTE(review): a Python 2 "long" would be rejected by the isinstance
        # check below -- confirm the values always arrive as plain int
        for attr in ["memory_total", "memory_free", "memory_dom0",
                     "vg_size", "vg_free", "cpu_total"]:
          if attr not in remote_info:
            raise errors.OpExecError("Node '%s' didn't return attribute"
                                     " '%s'" % (nname, attr))
          if not isinstance(remote_info[attr], int):
            raise errors.OpExecError("Node '%s' returned invalid value"
                                     " for '%s': %s" %
                                     (nname, attr, remote_info[attr]))
        # compute memory used by primary instances
        i_p_mem = i_p_up_mem = 0
        for iinfo, beinfo in i_list:
          if iinfo.primary_node == nname:
            i_p_mem += beinfo[constants.BE_MAXMEM]
            # instances not reported by the node count as using no memory
            if iinfo.name not in node_iinfo[nname].payload:
              i_used_mem = 0
            else:
              i_used_mem = int(node_iinfo[nname].payload[iinfo.name]["memory"])
            # the part of the instance's maximum memory that it does not use
            # right now is subtracted from the node's free memory
            i_mem_diff = beinfo[constants.BE_MAXMEM] - i_used_mem
            remote_info["memory_free"] -= max(0, i_mem_diff)

            if iinfo.admin_state == constants.ADMINST_UP:
              i_p_up_mem += beinfo[constants.BE_MAXMEM]

        # assemble the dynamic values, then add the static (config-based)
        # data of the node on top of them
        pnr_dyn = {
          "total_memory": remote_info["memory_total"],
          "reserved_memory": remote_info["memory_dom0"],
          "free_memory": remote_info["memory_free"],
          "total_disk": remote_info["vg_size"],
          "free_disk": remote_info["vg_free"],
          "total_cpus": remote_info["cpu_total"],
          "i_pri_memory": i_p_mem,
          "i_pri_up_memory": i_p_up_mem,
          }
        pnr_dyn.update(node_results[nname])
        node_results[nname] = pnr_dyn

    return node_results
545

    
546
  @staticmethod
  def _ComputeInstanceData(cluster_info, i_list):
    """Builds the instance section of the allocator input.

    @param cluster_info: The cluster object, used to fill the NIC parameters
    @param i_list: List of (instance, filled backend parameters) pairs
    @rtype: dict
    @return: Map from instance name to a dict describing the instance

    """
    all_instances = {}

    for (inst, be_params) in i_list:
      nics = []
      for nic in inst.nics:
        nic_params = cluster_info.SimpleFillNIC(nic.nicparams)
        mode = nic_params[constants.NIC_MODE]
        link = nic_params[constants.NIC_LINK]

        entry = {
          "mac": nic.mac,
          "ip": nic.ip,
          "mode": mode,
          "link": link,
          }
        # in bridged mode the link is also exported as the bridge
        if mode == constants.NIC_MODE_BRIDGED:
          entry["bridge"] = link

        nics.append(entry)

      disks = []
      for disk in inst.disks:
        disks.append({
          constants.IDISK_SIZE: disk.size,
          constants.IDISK_MODE: disk.mode,
          })

      description = {
        "tags": list(inst.GetTags()),
        "admin_state": inst.admin_state,
        "vcpus": be_params[constants.BE_VCPUS],
        "memory": be_params[constants.BE_MAXMEM],
        "spindle_use": be_params[constants.BE_SPINDLE_USE],
        "os": inst.os,
        "nodes": [inst.primary_node] + list(inst.secondary_nodes),
        "nics": nics,
        "disks": disks,
        "disk_template": inst.disk_template,
        "hypervisor": inst.hypervisor,
        }
      description["disk_space_total"] = \
        gmi.ComputeDiskSize(inst.disk_template, disks)

      all_instances[inst.name] = description

    return all_instances
585

    
586
  def _BuildInputData(self, req):
    """Assembles the complete allocator input.

    The cluster data is computed first, then the request-specific part is
    added under the "request" key, tagged with the request's mode. The
    result is available as C{self.in_data} and, serialized, as
    C{self.in_text}.

    @param req: The request object

    """
    self._ComputeClusterData()

    req_data = req.GetRequest(self.cfg)
    req_data["type"] = req.MODE

    full_input = self.in_data
    full_input["request"] = req_data

    self.in_text = serializer.Dump(full_input)
597

    
598
  def Run(self, name, validate=True, call_fn=None):
    """Runs an instance allocator and stores its output.

    The raw output is kept in C{self.out_text}; unless validation is
    disabled, it is then parsed and checked by L{_ValidateResult}. Nothing
    is returned.

    @param name: The allocator name, passed on to the runner
    @param validate: Whether to validate the output after the run
    @param call_fn: Alternative runner function; if None, the
      C{call_iallocator_runner} method of the RPC runner is used

    """
    runner = call_fn
    if runner is None:
      runner = self.rpc.call_iallocator_runner

    outcome = runner(self.cfg.GetMasterNode(), name, self.in_text)
    outcome.Raise("Failure while running the iallocator script")

    self.out_text = outcome.payload

    if not validate:
      return

    self._ValidateResult()
611

    
612
  def _ValidateResult(self):
613
    """Process the allocator results.
614

615
    This will process and if successful save the result in
616
    self.out_data and the other parameters.
617

618
    """
619
    try:
620
      rdict = serializer.Load(self.out_text)
621
    except Exception, err:
622
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
623

    
624
    if not isinstance(rdict, dict):
625
      raise errors.OpExecError("Can't parse iallocator results: not a dict")
626

    
627
    # TODO: remove backwards compatiblity in later versions
628
    if "nodes" in rdict and "result" not in rdict:
629
      rdict["result"] = rdict["nodes"]
630
      del rdict["nodes"]
631

    
632
    for key in "success", "info", "result":
633
      if key not in rdict:
634
        raise errors.OpExecError("Can't parse iallocator results:"
635
                                 " missing key '%s'" % key)
636
      setattr(self, key, rdict[key])
637

    
638
    self.req.ValidateResult(self, self.result)
639
    self.out_data = rdict