Statistics
| Branch: | Tag: | Revision:

root / lib / masterd / iallocator.py @ c269efc3

History | View | Annotate | Download (19.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the iallocator code."""
23

    
24
from ganeti import compat
25
from ganeti import constants
26
from ganeti import errors
27
from ganeti import ht
28
from ganeti import objectutils
29
from ganeti import opcodes
30
from ganeti import rpc
31
from ganeti import serializer
32
from ganeti import utils
33

    
34
import ganeti.masterd.instance as gmi
35

    
36

    
37
_STRING_LIST = ht.TListOf(ht.TString)
38
_JOB_LIST = ht.TListOf(ht.TListOf(ht.TStrictDict(True, False, {
39
   # pylint: disable=E1101
40
   # Class '...' has no 'OP_ID' member
41
   "OP_ID": ht.TElemOf([opcodes.OpInstanceFailover.OP_ID,
42
                        opcodes.OpInstanceMigrate.OP_ID,
43
                        opcodes.OpInstanceReplaceDisks.OP_ID])
44
   })))
45

    
46
_NEVAC_MOVED = \
47
  ht.TListOf(ht.TAnd(ht.TIsLength(3),
48
                     ht.TItems([ht.TNonEmptyString,
49
                                ht.TNonEmptyString,
50
                                ht.TListOf(ht.TNonEmptyString),
51
                                ])))
52
_NEVAC_FAILED = \
53
  ht.TListOf(ht.TAnd(ht.TIsLength(2),
54
                     ht.TItems([ht.TNonEmptyString,
55
                                ht.TMaybeString,
56
                                ])))
57
_NEVAC_RESULT = ht.TAnd(ht.TIsLength(3),
58
                        ht.TItems([_NEVAC_MOVED, _NEVAC_FAILED, _JOB_LIST]))
59

    
60

    
61
class _AutoReqParam(objectutils.AutoSlots):
62
  """Meta class for request definitions.
63

64
  """
65
  @classmethod
66
  def _GetSlots(mcs, attrs):
67
    """Extract the slots out of REQ_PARAMS.
68

69
    """
70
    params = attrs.setdefault("REQ_PARAMS", [])
71
    return [slot for (slot, _) in params]
72

    
73

    
74
class IARequestBase(objectutils.ValidatedSlots):
75
  """A generic IAllocator request object.
76

77
  """
78
  __metaclass__ = _AutoReqParam
79

    
80
  MODE = NotImplemented
81
  REQ_PARAMS = [
82
    ("required_nodes", ht.TPositiveInt)
83
    ]
84
  REQ_RESULT = NotImplemented
85

    
86
  def __init__(self, **kwargs):
87
    """Constructor for IARequestBase.
88

89
    The constructor takes only keyword arguments and will set
90
    attributes on this object based on the passed arguments. As such,
91
    it means that you should not pass arguments which are not in the
92
    REQ_PARAMS attribute for this class.
93

94
    """
95
    self.required_nodes = 0
96
    objectutils.ValidatedSlots.__init__(self, **kwargs)
97

    
98
    self.Validate()
99

    
100
  def Validate(self):
101
    """Validates all parameters of the request.
102

103
    """
104
    assert self.MODE in constants.VALID_IALLOCATOR_MODES
105

    
106
    for (param, validator) in self.REQ_PARAMS:
107
      if not hasattr(self, param):
108
        raise errors.OpPrereqError("Request is missing '%s' parameter" % param,
109
                                   errors.ECODE_INVAL)
110

    
111
      value = getattr(self, param)
112
      if not validator(value):
113
        raise errors.OpPrereqError(("Request parameter '%s' has invalid"
114
                                    " type %s/value %s") %
115
                                    (param, type(value), value),
116
                                    errors.ECODE_INVAL)
117

    
118
  def GetRequest(self, cfg):
119
    """Gets the request data dict.
120

121
    @param cfg: The configuration instance
122

123
    """
124
    raise NotImplementedError
125

    
126
  def ValidateResult(self, ia, result):
127
    """Validates the result of an request.
128

129
    @param ia: The IAllocator instance
130
    @param result: The IAllocator run result
131
    @returns: If it validates
132

133
    """
134
    if not (ia.success and self.REQ_RESULT(result)):
135
      raise errors.OpExecError("Iallocator returned invalid result,"
136
                               " expected %s, got %s" %
137
                               (self.REQ_RESULT, result))
138

    
139

    
140
class IAReqInstanceAlloc(IARequestBase):
141
  """An instance allocation request.
142

143
  """
144
  # pylint: disable=E1101
145
  MODE = constants.IALLOCATOR_MODE_ALLOC
146
  REQ_PARAMS = [
147
    ("name", ht.TString),
148
    ("memory", ht.TInt),
149
    ("spindle_use", ht.TInt),
150
    ("disks", ht.TListOf(ht.TDict)),
151
    ("disk_template", ht.TString),
152
    ("os", ht.TString),
153
    ("tags", _STRING_LIST),
154
    ("nics", ht.TListOf(ht.TDict)),
155
    ("vcpus", ht.TInt),
156
    ("hypervisor", ht.TString),
157
    ]
158
  REQ_RESULT = ht.TList
159

    
160
  def GetRequest(self, cfg):
161
    """Requests a new instance.
162

163
    The checks for the completeness of the opcode must have already been
164
    done.
165

166
    """
167
    disk_space = gmi.ComputeDiskSize(self.disk_template, self.disks)
168

    
169
    if self.disk_template in constants.DTS_INT_MIRROR:
170
      self.required_nodes = 2
171
    else:
172
      self.required_nodes = 1
173

    
174
    return {
175
      "name": self.name,
176
      "disk_template": self.disk_template,
177
      "tags": self.tags,
178
      "os": self.os,
179
      "vcpus": self.vcpus,
180
      "memory": self.memory,
181
      "spindle_use": self.spindle_use,
182
      "disks": self.disks,
183
      "disk_space_total": disk_space,
184
      "nics": self.nics,
185
      "required_nodes": self.required_nodes,
186
      "hypervisor": self.hypervisor,
187
      }
188

    
189

    
190
class IAReqMultiInstanceAlloc(IARequestBase):
191
  """An multi instance allocation request.
192

193
  """
194
  # pylint: disable=E1101
195
  MODE = constants.IALLOCATOR_MODE_MULTI_ALLOC
196
  REQ_PARAMS = [
197
    ("instances", ht.TListOf(ht.TInstanceOf(IAReqInstanceAlloc)))
198
    ]
199
  _MASUCCESS = \
200
    ht.TListOf(ht.TAnd(ht.TIsLength(2),
201
                       ht.TItems([ht.TNonEmptyString,
202
                                  ht.TListOf(ht.TNonEmptyString),
203
                                  ])))
204
  _MAFAILED = ht.TListOf(ht.TNonEmptyString)
205
  REQ_RESULT = ht.TListOf(ht.TAnd(ht.TIsLength(2),
206
                                  ht.TItems([_MASUCCESS, _MAFAILED])))
207

    
208
  def GetRequest(self, cfg):
209
    return {
210
      "instances": [iareq.GetRequest(cfg) for iareq in self.instances]
211
      }
212

    
213

    
214
class IAReqRelocate(IARequestBase):
215
  """A relocation request.
216

217
  """
218
  # pylint: disable=E1101
219
  MODE = constants.IALLOCATOR_MODE_RELOC
220
  REQ_PARAMS = [
221
    ("name", ht.TString),
222
    ("relocate_from", _STRING_LIST),
223
    ]
224
  REQ_RESULT = ht.TList
225

    
226
  def GetRequest(self, cfg):
227
    """Request an relocation of an instance
228

229
    The checks for the completeness of the opcode must have already been
230
    done.
231

232
    """
233
    instance = cfg.GetInstanceInfo(self.name)
234
    if instance is None:
235
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
236
                                   " IAllocator" % self.name)
237

    
238
    if instance.disk_template not in constants.DTS_MIRRORED:
239
      raise errors.OpPrereqError("Can't relocate non-mirrored instances",
240
                                 errors.ECODE_INVAL)
241

    
242
    if instance.disk_template in constants.DTS_INT_MIRROR and \
243
        len(instance.secondary_nodes) != 1:
244
      raise errors.OpPrereqError("Instance has not exactly one secondary node",
245
                                 errors.ECODE_STATE)
246

    
247
    self.required_nodes = 1
248
    disk_sizes = [{constants.IDISK_SIZE: disk.size} for disk in instance.disks]
249
    disk_space = gmi.ComputeDiskSize(instance.disk_template, disk_sizes)
250

    
251
    return {
252
      "name": self.name,
253
      "disk_space_total": disk_space,
254
      "required_nodes": self.required_nodes,
255
      "relocate_from": self.relocate_from,
256
      }
257

    
258
  def ValidateResult(self, ia, result):
259
    """Validates the result of an relocation request.
260

261
    """
262
    node2group = dict((name, ndata["group"])
263
                      for (name, ndata) in ia.in_data["nodes"].items())
264

    
265
    fn = compat.partial(self._NodesToGroups, node2group,
266
                        ia.in_data["nodegroups"])
267

    
268
    instance = ia.cfg.GetInstanceInfo(self.name)
269
    request_groups = fn(self.relocate_from + [instance.primary_node])
270
    result_groups = fn(result + [instance.primary_node])
271

    
272
    if ia.success and not set(result_groups).issubset(request_groups):
273
      raise errors.OpExecError("Groups of nodes returned by iallocator (%s)"
274
                               " differ from original groups (%s)" %
275
                               (utils.CommaJoin(result_groups),
276
                                utils.CommaJoin(request_groups)))
277

    
278
  @staticmethod
279
  def _NodesToGroups(node2group, groups, nodes):
280
    """Returns a list of unique group names for a list of nodes.
281

282
    @type node2group: dict
283
    @param node2group: Map from node name to group UUID
284
    @type groups: dict
285
    @param groups: Group information
286
    @type nodes: list
287
    @param nodes: Node names
288

289
    """
290
    result = set()
291

    
292
    for node in nodes:
293
      try:
294
        group_uuid = node2group[node]
295
      except KeyError:
296
        # Ignore unknown node
297
        pass
298
      else:
299
        try:
300
          group = groups[group_uuid]
301
        except KeyError:
302
          # Can't find group, let's use UUID
303
          group_name = group_uuid
304
        else:
305
          group_name = group["name"]
306

    
307
        result.add(group_name)
308

    
309
    return sorted(result)
310

    
311

    
312
class IAReqNodeEvac(IARequestBase):
313
  """A node evacuation request.
314

315
  """
316
  # pylint: disable=E1101
317
  MODE = constants.IALLOCATOR_MODE_NODE_EVAC
318
  REQ_PARAMS = [
319
    ("instances", _STRING_LIST),
320
    ("evac_mode", ht.TElemOf(constants.IALLOCATOR_NEVAC_MODES)),
321
    ]
322
  REQ_RESULT = _NEVAC_RESULT
323

    
324
  def GetRequest(self, cfg):
325
    """Get data for node-evacuate requests.
326

327
    """
328
    return {
329
      "instances": self.instances,
330
      "evac_mode": self.evac_mode,
331
      }
332

    
333

    
334
class IAReqGroupChange(IARequestBase):
335
  """A group change request.
336

337
  """
338
  # pylint: disable=E1101
339
  MODE = constants.IALLOCATOR_MODE_CHG_GROUP
340
  REQ_PARAMS = [
341
    ("instances", _STRING_LIST),
342
    ("target_groups", _STRING_LIST),
343
    ]
344
  REQ_RESULT = _NEVAC_RESULT
345

    
346
  def GetRequest(self, cfg):
347
    """Get data for node-evacuate requests.
348

349
    """
350
    return {
351
      "instances": self.instances,
352
      "target_groups": self.target_groups,
353
      }
354

    
355

    
356
class IAllocator(object):
357
  """IAllocator framework.
358

359
  An IAllocator instance has three sets of attributes:
360
    - cfg that is needed to query the cluster
361
    - input data (all members of the _KEYS class attribute are required)
362
    - four buffer attributes (in|out_data|text), that represent the
363
      input (to the external script) in text and data structure format,
364
      and the output from it, again in two formats
365
    - the result variables from the script (success, info, nodes) for
366
      easy usage
367

368
  """
369
  # pylint: disable=R0902
370
  # lots of instance attributes
371

    
372
  def __init__(self, cfg, rpc_runner, req):
373
    self.cfg = cfg
374
    self.rpc = rpc_runner
375
    self.req = req
376
    # init buffer variables
377
    self.in_text = self.out_text = self.in_data = self.out_data = None
378
    # init result fields
379
    self.success = self.info = self.result = None
380

    
381
    self._BuildInputData(req)
382

    
383
  def _ComputeClusterData(self):
384
    """Compute the generic allocator input data.
385

386
    This is the data that is independent of the actual operation.
387

388
    """
389
    cfg = self.cfg
390
    cluster_info = cfg.GetClusterInfo()
391
    # cluster data
392
    data = {
393
      "version": constants.IALLOCATOR_VERSION,
394
      "cluster_name": cfg.GetClusterName(),
395
      "cluster_tags": list(cluster_info.GetTags()),
396
      "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
397
      "ipolicy": cluster_info.ipolicy,
398
      }
399
    ninfo = cfg.GetAllNodesInfo()
400
    iinfo = cfg.GetAllInstancesInfo().values()
401
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
402

    
403
    # node data
404
    node_list = [n.name for n in ninfo.values() if n.vm_capable]
405

    
406
    if isinstance(self.req, IAReqInstanceAlloc):
407
      hypervisor_name = self.req.hypervisor
408
    elif isinstance(self.req, IAReqRelocate):
409
      hypervisor_name = cfg.GetInstanceInfo(self.req.name).hypervisor
410
    else:
411
      hypervisor_name = cluster_info.primary_hypervisor
412

    
413
    node_data = self.rpc.call_node_info(node_list, [cfg.GetVGName()],
414
                                        [hypervisor_name])
415
    node_iinfo = \
416
      self.rpc.call_all_instances_info(node_list,
417
                                       cluster_info.enabled_hypervisors)
418

    
419
    data["nodegroups"] = self._ComputeNodeGroupData(cfg)
420

    
421
    config_ndata = self._ComputeBasicNodeData(cfg, ninfo)
422
    data["nodes"] = self._ComputeDynamicNodeData(ninfo, node_data, node_iinfo,
423
                                                 i_list, config_ndata)
424
    assert len(data["nodes"]) == len(ninfo), \
425
        "Incomplete node data computed"
426

    
427
    data["instances"] = self._ComputeInstanceData(cluster_info, i_list)
428

    
429
    self.in_data = data
430

    
431
  @staticmethod
432
  def _ComputeNodeGroupData(cfg):
433
    """Compute node groups data.
434

435
    """
436
    cluster = cfg.GetClusterInfo()
437
    ng = dict((guuid, {
438
      "name": gdata.name,
439
      "alloc_policy": gdata.alloc_policy,
440
      "ipolicy": gmi.CalculateGroupIPolicy(cluster, gdata),
441
      })
442
      for guuid, gdata in cfg.GetAllNodeGroupsInfo().items())
443

    
444
    return ng
445

    
446
  @staticmethod
447
  def _ComputeBasicNodeData(cfg, node_cfg):
448
    """Compute global node data.
449

450
    @rtype: dict
451
    @returns: a dict of name: (node dict, node config)
452

453
    """
454
    # fill in static (config-based) values
455
    node_results = dict((ninfo.name, {
456
      "tags": list(ninfo.GetTags()),
457
      "primary_ip": ninfo.primary_ip,
458
      "secondary_ip": ninfo.secondary_ip,
459
      "offline": ninfo.offline,
460
      "drained": ninfo.drained,
461
      "master_candidate": ninfo.master_candidate,
462
      "group": ninfo.group,
463
      "master_capable": ninfo.master_capable,
464
      "vm_capable": ninfo.vm_capable,
465
      "ndparams": cfg.GetNdParams(ninfo),
466
      })
467
      for ninfo in node_cfg.values())
468

    
469
    return node_results
470

    
471
  @staticmethod
472
  def _ComputeDynamicNodeData(node_cfg, node_data, node_iinfo, i_list,
473
                              node_results):
474
    """Compute global node data.
475

476
    @param node_results: the basic node structures as filled from the config
477

478
    """
479
    #TODO(dynmem): compute the right data on MAX and MIN memory
480
    # make a copy of the current dict
481
    node_results = dict(node_results)
482
    for nname, nresult in node_data.items():
483
      assert nname in node_results, "Missing basic data for node %s" % nname
484
      ninfo = node_cfg[nname]
485

    
486
      if not (ninfo.offline or ninfo.drained):
487
        nresult.Raise("Can't get data for node %s" % nname)
488
        node_iinfo[nname].Raise("Can't get node instance info from node %s" %
489
                                nname)
490
        remote_info = rpc.MakeLegacyNodeInfo(nresult.payload)
491

    
492
        for attr in ["memory_total", "memory_free", "memory_dom0",
493
                     "vg_size", "vg_free", "cpu_total"]:
494
          if attr not in remote_info:
495
            raise errors.OpExecError("Node '%s' didn't return attribute"
496
                                     " '%s'" % (nname, attr))
497
          if not isinstance(remote_info[attr], int):
498
            raise errors.OpExecError("Node '%s' returned invalid value"
499
                                     " for '%s': %s" %
500
                                     (nname, attr, remote_info[attr]))
501
        # compute memory used by primary instances
502
        i_p_mem = i_p_up_mem = 0
503
        for iinfo, beinfo in i_list:
504
          if iinfo.primary_node == nname:
505
            i_p_mem += beinfo[constants.BE_MAXMEM]
506
            if iinfo.name not in node_iinfo[nname].payload:
507
              i_used_mem = 0
508
            else:
509
              i_used_mem = int(node_iinfo[nname].payload[iinfo.name]["memory"])
510
            i_mem_diff = beinfo[constants.BE_MAXMEM] - i_used_mem
511
            remote_info["memory_free"] -= max(0, i_mem_diff)
512

    
513
            if iinfo.admin_state == constants.ADMINST_UP:
514
              i_p_up_mem += beinfo[constants.BE_MAXMEM]
515

    
516
        # compute memory used by instances
517
        pnr_dyn = {
518
          "total_memory": remote_info["memory_total"],
519
          "reserved_memory": remote_info["memory_dom0"],
520
          "free_memory": remote_info["memory_free"],
521
          "total_disk": remote_info["vg_size"],
522
          "free_disk": remote_info["vg_free"],
523
          "total_cpus": remote_info["cpu_total"],
524
          "i_pri_memory": i_p_mem,
525
          "i_pri_up_memory": i_p_up_mem,
526
          }
527
        pnr_dyn.update(node_results[nname])
528
        node_results[nname] = pnr_dyn
529

    
530
    return node_results
531

    
532
  @staticmethod
533
  def _ComputeInstanceData(cluster_info, i_list):
534
    """Compute global instance data.
535

536
    """
537
    instance_data = {}
538
    for iinfo, beinfo in i_list:
539
      nic_data = []
540
      for nic in iinfo.nics:
541
        filled_params = cluster_info.SimpleFillNIC(nic.nicparams)
542
        nic_dict = {
543
          "mac": nic.mac,
544
          "ip": nic.ip,
545
          "mode": filled_params[constants.NIC_MODE],
546
          "link": filled_params[constants.NIC_LINK],
547
          }
548
        if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
549
          nic_dict["bridge"] = filled_params[constants.NIC_LINK]
550
        nic_data.append(nic_dict)
551
      pir = {
552
        "tags": list(iinfo.GetTags()),
553
        "admin_state": iinfo.admin_state,
554
        "vcpus": beinfo[constants.BE_VCPUS],
555
        "memory": beinfo[constants.BE_MAXMEM],
556
        "spindle_use": beinfo[constants.BE_SPINDLE_USE],
557
        "os": iinfo.os,
558
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
559
        "nics": nic_data,
560
        "disks": [{constants.IDISK_SIZE: dsk.size,
561
                   constants.IDISK_MODE: dsk.mode}
562
                  for dsk in iinfo.disks],
563
        "disk_template": iinfo.disk_template,
564
        "hypervisor": iinfo.hypervisor,
565
        }
566
      pir["disk_space_total"] = gmi.ComputeDiskSize(iinfo.disk_template,
567
                                                    pir["disks"])
568
      instance_data[iinfo.name] = pir
569

    
570
    return instance_data
571

    
572
  def _BuildInputData(self, req):
573
    """Build input data structures.
574

575
    """
576
    self._ComputeClusterData()
577

    
578
    request = req.GetRequest(self.cfg)
579
    request["type"] = req.MODE
580
    self.in_data["request"] = request
581

    
582
    self.in_text = serializer.Dump(self.in_data)
583

    
584
  def Run(self, name, validate=True, call_fn=None):
585
    """Run an instance allocator and return the results.
586

587
    """
588
    if call_fn is None:
589
      call_fn = self.rpc.call_iallocator_runner
590

    
591
    result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
592
    result.Raise("Failure while running the iallocator script")
593

    
594
    self.required_nodes = self.req.required_nodes
595
    self.out_text = result.payload
596
    if validate:
597
      self._ValidateResult()
598

    
599
  def _ValidateResult(self):
600
    """Process the allocator results.
601

602
    This will process and if successful save the result in
603
    self.out_data and the other parameters.
604

605
    """
606
    try:
607
      rdict = serializer.Load(self.out_text)
608
    except Exception, err:
609
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
610

    
611
    if not isinstance(rdict, dict):
612
      raise errors.OpExecError("Can't parse iallocator results: not a dict")
613

    
614
    # TODO: remove backwards compatiblity in later versions
615
    if "nodes" in rdict and "result" not in rdict:
616
      rdict["result"] = rdict["nodes"]
617
      del rdict["nodes"]
618

    
619
    for key in "success", "info", "result":
620
      if key not in rdict:
621
        raise errors.OpExecError("Can't parse iallocator results:"
622
                                 " missing key '%s'" % key)
623
      setattr(self, key, rdict[key])
624

    
625
    self.req.ValidateResult(self, self.result)
626
    self.out_data = rdict