Statistics
| Branch: | Tag: | Revision:

root / lib / masterd / iallocator.py @ 0fcd0cad

History | View | Annotate | Download (18.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the iallocator code."""
23

    
24
from ganeti import compat
25
from ganeti import constants
26
from ganeti import errors
27
from ganeti import ht
28
from ganeti import objectutils
29
from ganeti import opcodes
30
from ganeti import rpc
31
from ganeti import serializer
32
from ganeti import utils
33

    
34
import ganeti.masterd.instance as gmi
35

    
36

    
37
# Validator accepting a list whose items are all strings
_STRING_LIST = ht.TListOf(ht.TString)

# Validator accepting a list of lists of dictionaries; the "OP_ID" entry of
# each dictionary has to be the ID of one of the opcodes enumerated below
_JOB_LIST = ht.TListOf(ht.TListOf(ht.TStrictDict(True, False, {
   # pylint: disable=E1101
   # Class '...' has no 'OP_ID' member
   "OP_ID": ht.TElemOf([
     opcodes.OpInstanceFailover.OP_ID,
     opcodes.OpInstanceMigrate.OP_ID,
     opcodes.OpInstanceReplaceDisks.OP_ID,
     ]),
   })))

# Validator accepting a list of 3-item entries: two non-empty strings
# followed by a list of non-empty strings
# NOTE(review): presumably (instance name, group, node names) -- confirm
# against the iallocator protocol documentation
_NEVAC_MOVED = ht.TListOf(ht.TAnd(
  ht.TIsLength(3),
  ht.TItems([
    ht.TNonEmptyString,
    ht.TNonEmptyString,
    ht.TListOf(ht.TNonEmptyString),
    ])))

# Validator accepting a list of 2-item entries: a non-empty string followed
# by a value accepted by ht.TMaybeString
# NOTE(review): presumably (instance name, failure reason) -- confirm
_NEVAC_FAILED = ht.TListOf(ht.TAnd(
  ht.TIsLength(2),
  ht.TItems([
    ht.TNonEmptyString,
    ht.TMaybeString,
    ])))

# Validator for the result of the node-evacuation and group-change requests:
# exactly three items, checked by the three validators above, in this order
_NEVAC_RESULT = ht.TAnd(
  ht.TIsLength(3),
  ht.TItems([_NEVAC_MOVED, _NEVAC_FAILED, _JOB_LIST]))
59

    
60

    
61
class _AutoReqParam(objectutils.AutoSlots):
  """Meta class deriving the slots of a request class from REQ_PARAMS.

  """
  @classmethod
  def _GetSlots(mcs, attrs):
    """Returns the parameter names found in REQ_PARAMS.

    If the class attributes contain no C{REQ_PARAMS} entry, an empty list
    is stored under that name first.

    @type attrs: dict
    @param attrs: The attributes of the class being created
    @rtype: list
    @return: The first element of every C{REQ_PARAMS} entry

    """
    if "REQ_PARAMS" not in attrs:
      attrs["REQ_PARAMS"] = []

    slot_names = []
    for (name, _) in attrs["REQ_PARAMS"]:
      slot_names.append(name)

    return slot_names
72

    
73

    
74
class IARequestBase(objectutils.ValidatedSlots):
  """Common base of all IAllocator request objects.

  Subclasses set C{MODE}, list their parameters as (name, validator)
  pairs in C{REQ_PARAMS}, set C{REQ_RESULT} to the validator used for
  the result and implement L{GetRequest}.

  """
  __metaclass__ = _AutoReqParam

  MODE = NotImplemented
  REQ_PARAMS = [
    ("required_nodes", ht.TPositiveInt)
    ]
  REQ_RESULT = NotImplemented

  def __init__(self, **kwargs):
    """Initializes the request from keyword arguments.

    C{required_nodes} is preset to 0, then the keyword arguments are
    handed to the C{ValidatedSlots} constructor and finally the request
    is validated. Only arguments named in the REQ_PARAMS attribute of
    the class should be passed.

    """
    self.required_nodes = 0
    objectutils.ValidatedSlots.__init__(self, **kwargs)

    self.Validate()

  def Validate(self):
    """Checks presence and value of every parameter in REQ_PARAMS.

    @raise errors.OpPrereqError: If a parameter is not set or is
      rejected by its validator

    """
    assert self.MODE in constants.VALID_IALLOCATOR_MODES

    for (param, validator) in self.REQ_PARAMS:
      try:
        value = getattr(self, param)
      except AttributeError:
        raise errors.OpPrereqError("Request is missing '%s' parameter" % param,
                                   errors.ECODE_INVAL)

      if validator(value):
        continue

      raise errors.OpPrereqError(("Request parameter '%s' has invalid"
                                  " type %s/value %s") %
                                  (param, type(value), value),
                                  errors.ECODE_INVAL)

  def GetRequest(self, cfg):
    """Returns the request data dictionary; subclasses must override.

    @param cfg: The configuration instance

    """
    raise NotImplementedError

  def ValidateResult(self, ia, result):
    """Checks the result of a request.

    @param ia: The IAllocator instance
    @param result: The IAllocator run result
    @raise errors.OpExecError: If the run was not successful or the
      result is rejected by C{REQ_RESULT}

    """
    if ia.success and self.REQ_RESULT(result):
      return

    raise errors.OpExecError("Iallocator returned invalid result,"
                             " expected %s, got %s" %
                             (self.REQ_RESULT, result))
138

    
139

    
140
class IAReqInstanceAlloc(IARequestBase):
  """Request for the allocation of a new instance.

  """
  MODE = constants.IALLOCATOR_MODE_ALLOC
  REQ_PARAMS = [
    ("name", ht.TString),
    ("memory", ht.TInt),
    ("spindle_use", ht.TInt),
    ("disks", ht.TListOf(ht.TDict)),
    ("disk_template", ht.TString),
    ("os", ht.TString),
    ("tags", _STRING_LIST),
    ("nics", ht.TListOf(ht.TDict)),
    ("vcpus", ht.TInt),
    ("hypervisor", ht.TString),
    ]
  REQ_RESULT = ht.TList

  def GetRequest(self, cfg):
    """Builds the request dictionary for a new instance.

    The opcode must already have been checked for completeness. As a
    side effect C{required_nodes} is set on the request object.

    @param cfg: The configuration instance (not used by this request)
    @rtype: dict
    @return: The request parameters plus the computed total disk space
      and number of required nodes

    """
    total_disk_space = gmi.ComputeDiskSize(self.disk_template, self.disks)

    # Templates listed in DTS_INT_MIRROR are given two nodes, all others one
    if self.disk_template in constants.DTS_INT_MIRROR:
      node_count = 2
    else:
      node_count = 1
    self.required_nodes = node_count

    request = {
      "name": self.name,
      "os": self.os,
      "hypervisor": self.hypervisor,
      "tags": self.tags,
      "vcpus": self.vcpus,
      "memory": self.memory,
      "spindle_use": self.spindle_use,
      "nics": self.nics,
      "disks": self.disks,
      "disk_template": self.disk_template,
      "disk_space_total": total_disk_space,
      "required_nodes": self.required_nodes,
      }

    return request
187

    
188

    
189
class IAReqRelocate(IARequestBase):
  """A relocation request.

  """
  MODE = constants.IALLOCATOR_MODE_RELOC
  REQ_PARAMS = [
    ("name", ht.TString),
    ("relocate_from", _STRING_LIST),
    ]
  REQ_RESULT = ht.TList

  def GetRequest(self, cfg):
    """Request a relocation of an instance.

    The checks for the completeness of the opcode must have already been
    done. As a side effect C{required_nodes} is set to 1 on the request
    object.

    @param cfg: The configuration instance, used to look up the instance
    @rtype: dict
    @return: The request data (name, total disk space, number of required
      nodes and the nodes to relocate from)
    @raise errors.ProgrammerError: If the instance is not known to the
      configuration
    @raise errors.OpPrereqError: If the disk template of the instance is
      not in C{DTS_MIRRORED}, or it is in C{DTS_INT_MIRROR} and the
      instance does not have exactly one secondary node

    """
    instance = cfg.GetInstanceInfo(self.name)
    if instance is None:
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
                                   " IAllocator" % self.name)

    if instance.disk_template not in constants.DTS_MIRRORED:
      raise errors.OpPrereqError("Can't relocate non-mirrored instances",
                                 errors.ECODE_INVAL)

    if instance.disk_template in constants.DTS_INT_MIRROR and \
        len(instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("Instance has not exactly one secondary node",
                                 errors.ECODE_STATE)

    self.required_nodes = 1
    disk_sizes = [{constants.IDISK_SIZE: disk.size} for disk in instance.disks]
    disk_space = gmi.ComputeDiskSize(instance.disk_template, disk_sizes)

    return {
      "name": self.name,
      "disk_space_total": disk_space,
      "required_nodes": self.required_nodes,
      "relocate_from": self.relocate_from,
      }

  def ValidateResult(self, ia, result):
    """Validates the result of a relocation request.

    The result must be accepted by C{REQ_RESULT}. For successful runs the
    node groups of the returned nodes (plus the primary node of the
    instance) must additionally be a subset of the groups of the nodes
    relocated from (plus the primary node).

    Unlike L{IARequestBase.ValidateResult}, an unsuccessful run is not
    treated as an error here; checking C{ia.success} is left to the caller.

    @param ia: The IAllocator instance
    @param result: The IAllocator run result
    @raise errors.OpExecError: If the result has the wrong type or the
      returned nodes belong to other groups than the original ones

    """
    # The result is concatenated with a list below; reject anything else
    # up-front instead of failing with a TypeError on malformed output
    if not self.REQ_RESULT(result):
      raise errors.OpExecError("Iallocator returned invalid result,"
                               " expected %s, got %s" %
                               (self.REQ_RESULT, result))

    node2group = dict((name, ndata["group"])
                      for (name, ndata) in ia.in_data["nodes"].items())

    fn = compat.partial(self._NodesToGroups, node2group,
                        ia.in_data["nodegroups"])

    instance = ia.cfg.GetInstanceInfo(self.name)
    request_groups = fn(self.relocate_from + [instance.primary_node])
    result_groups = fn(result + [instance.primary_node])

    if ia.success and not set(result_groups).issubset(request_groups):
      raise errors.OpExecError("Groups of nodes returned by iallocator (%s)"
                               " differ from original groups (%s)" %
                               (utils.CommaJoin(result_groups),
                                utils.CommaJoin(request_groups)))

  @staticmethod
  def _NodesToGroups(node2group, groups, nodes):
    """Returns a list of unique group names for a list of nodes.

    Nodes missing from C{node2group} are ignored; for groups missing from
    C{groups} the group UUID is used in place of the name.

    @type node2group: dict
    @param node2group: Map from node name to group UUID
    @type groups: dict
    @param groups: Group information
    @type nodes: list
    @param nodes: Node names
    @rtype: list
    @return: Sorted list of group names, without duplicates

    """
    result = set()

    for node in nodes:
      try:
        group_uuid = node2group[node]
      except KeyError:
        # Ignore unknown node
        pass
      else:
        try:
          group = groups[group_uuid]
        except KeyError:
          # Can't find group, let's use UUID
          group_name = group_uuid
        else:
          group_name = group["name"]

        result.add(group_name)

    return sorted(result)
284

    
285

    
286
class IAReqNodeEvac(IARequestBase):
  """Request for the evacuation of instances from nodes.

  """
  MODE = constants.IALLOCATOR_MODE_NODE_EVAC
  REQ_PARAMS = [
    ("instances", _STRING_LIST),
    ("evac_mode", ht.TElemOf(constants.IALLOCATOR_NEVAC_MODES)),
    ]
  REQ_RESULT = _NEVAC_RESULT

  def GetRequest(self, cfg):
    """Builds the request dictionary for node-evacuate requests.

    @param cfg: The configuration instance (not used by this request)
    @rtype: dict
    @return: The instance list and the evacuation mode

    """
    request = {}
    request["instances"] = self.instances
    request["evac_mode"] = self.evac_mode

    return request
305

    
306

    
307
class IAReqGroupChange(IARequestBase):
  """Request for moving instances to other node groups.

  """
  MODE = constants.IALLOCATOR_MODE_CHG_GROUP
  REQ_PARAMS = [
    ("instances", _STRING_LIST),
    ("target_groups", _STRING_LIST),
    ]
  REQ_RESULT = _NEVAC_RESULT

  def GetRequest(self, cfg):
    """Builds the request dictionary for group-change requests.

    @param cfg: The configuration instance (not used by this request)
    @rtype: dict
    @return: The instance list and the list of target groups

    """
    request = {}
    request["instances"] = self.instances
    request["target_groups"] = self.target_groups

    return request
326

    
327

    
328
class IAllocator(object):
329
  """IAllocator framework.
330

331
  An IAllocator instance has three sets of attributes:
332
    - cfg that is needed to query the cluster
333
    - input data (all members of the _KEYS class attribute are required)
334
    - four buffer attributes (in|out_data|text), that represent the
335
      input (to the external script) in text and data structure format,
336
      and the output from it, again in two formats
337
    - the result variables from the script (success, info, nodes) for
338
      easy usage
339

340
  """
341
  # pylint: disable=R0902
342
  # lots of instance attributes
343

    
344
  def __init__(self, cfg, rpc_runner, req):
345
    self.cfg = cfg
346
    self.rpc = rpc_runner
347
    self.req = req
348
    # init buffer variables
349
    self.in_text = self.out_text = self.in_data = self.out_data = None
350
    # init result fields
351
    self.success = self.info = self.result = None
352

    
353
    self._BuildInputData(req)
354

    
355
  def _ComputeClusterData(self):
356
    """Compute the generic allocator input data.
357

358
    This is the data that is independent of the actual operation.
359

360
    """
361
    cfg = self.cfg
362
    cluster_info = cfg.GetClusterInfo()
363
    # cluster data
364
    data = {
365
      "version": constants.IALLOCATOR_VERSION,
366
      "cluster_name": cfg.GetClusterName(),
367
      "cluster_tags": list(cluster_info.GetTags()),
368
      "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
369
      "ipolicy": cluster_info.ipolicy,
370
      }
371
    ninfo = cfg.GetAllNodesInfo()
372
    iinfo = cfg.GetAllInstancesInfo().values()
373
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
374

    
375
    # node data
376
    node_list = [n.name for n in ninfo.values() if n.vm_capable]
377

    
378
    if isinstance(self.req, IAReqInstanceAlloc):
379
      hypervisor_name = self.req.hypervisor
380
    elif isinstance(self.req, IAReqRelocate):
381
      hypervisor_name = cfg.GetInstanceInfo(self.req.name).hypervisor
382
    else:
383
      hypervisor_name = cluster_info.primary_hypervisor
384

    
385
    node_data = self.rpc.call_node_info(node_list, [cfg.GetVGName()],
386
                                        [hypervisor_name])
387
    node_iinfo = \
388
      self.rpc.call_all_instances_info(node_list,
389
                                       cluster_info.enabled_hypervisors)
390

    
391
    data["nodegroups"] = self._ComputeNodeGroupData(cfg)
392

    
393
    config_ndata = self._ComputeBasicNodeData(cfg, ninfo)
394
    data["nodes"] = self._ComputeDynamicNodeData(ninfo, node_data, node_iinfo,
395
                                                 i_list, config_ndata)
396
    assert len(data["nodes"]) == len(ninfo), \
397
        "Incomplete node data computed"
398

    
399
    data["instances"] = self._ComputeInstanceData(cluster_info, i_list)
400

    
401
    self.in_data = data
402

    
403
  @staticmethod
404
  def _ComputeNodeGroupData(cfg):
405
    """Compute node groups data.
406

407
    """
408
    cluster = cfg.GetClusterInfo()
409
    ng = dict((guuid, {
410
      "name": gdata.name,
411
      "alloc_policy": gdata.alloc_policy,
412
      "ipolicy": gmi.CalculateGroupIPolicy(cluster, gdata),
413
      })
414
      for guuid, gdata in cfg.GetAllNodeGroupsInfo().items())
415

    
416
    return ng
417

    
418
  @staticmethod
419
  def _ComputeBasicNodeData(cfg, node_cfg):
420
    """Compute global node data.
421

422
    @rtype: dict
423
    @returns: a dict of name: (node dict, node config)
424

425
    """
426
    # fill in static (config-based) values
427
    node_results = dict((ninfo.name, {
428
      "tags": list(ninfo.GetTags()),
429
      "primary_ip": ninfo.primary_ip,
430
      "secondary_ip": ninfo.secondary_ip,
431
      "offline": ninfo.offline,
432
      "drained": ninfo.drained,
433
      "master_candidate": ninfo.master_candidate,
434
      "group": ninfo.group,
435
      "master_capable": ninfo.master_capable,
436
      "vm_capable": ninfo.vm_capable,
437
      "ndparams": cfg.GetNdParams(ninfo),
438
      })
439
      for ninfo in node_cfg.values())
440

    
441
    return node_results
442

    
443
  @staticmethod
444
  def _ComputeDynamicNodeData(node_cfg, node_data, node_iinfo, i_list,
445
                              node_results):
446
    """Compute global node data.
447

448
    @param node_results: the basic node structures as filled from the config
449

450
    """
451
    #TODO(dynmem): compute the right data on MAX and MIN memory
452
    # make a copy of the current dict
453
    node_results = dict(node_results)
454
    for nname, nresult in node_data.items():
455
      assert nname in node_results, "Missing basic data for node %s" % nname
456
      ninfo = node_cfg[nname]
457

    
458
      if not (ninfo.offline or ninfo.drained):
459
        nresult.Raise("Can't get data for node %s" % nname)
460
        node_iinfo[nname].Raise("Can't get node instance info from node %s" %
461
                                nname)
462
        remote_info = rpc.MakeLegacyNodeInfo(nresult.payload)
463

    
464
        for attr in ["memory_total", "memory_free", "memory_dom0",
465
                     "vg_size", "vg_free", "cpu_total"]:
466
          if attr not in remote_info:
467
            raise errors.OpExecError("Node '%s' didn't return attribute"
468
                                     " '%s'" % (nname, attr))
469
          if not isinstance(remote_info[attr], int):
470
            raise errors.OpExecError("Node '%s' returned invalid value"
471
                                     " for '%s': %s" %
472
                                     (nname, attr, remote_info[attr]))
473
        # compute memory used by primary instances
474
        i_p_mem = i_p_up_mem = 0
475
        for iinfo, beinfo in i_list:
476
          if iinfo.primary_node == nname:
477
            i_p_mem += beinfo[constants.BE_MAXMEM]
478
            if iinfo.name not in node_iinfo[nname].payload:
479
              i_used_mem = 0
480
            else:
481
              i_used_mem = int(node_iinfo[nname].payload[iinfo.name]["memory"])
482
            i_mem_diff = beinfo[constants.BE_MAXMEM] - i_used_mem
483
            remote_info["memory_free"] -= max(0, i_mem_diff)
484

    
485
            if iinfo.admin_state == constants.ADMINST_UP:
486
              i_p_up_mem += beinfo[constants.BE_MAXMEM]
487

    
488
        # compute memory used by instances
489
        pnr_dyn = {
490
          "total_memory": remote_info["memory_total"],
491
          "reserved_memory": remote_info["memory_dom0"],
492
          "free_memory": remote_info["memory_free"],
493
          "total_disk": remote_info["vg_size"],
494
          "free_disk": remote_info["vg_free"],
495
          "total_cpus": remote_info["cpu_total"],
496
          "i_pri_memory": i_p_mem,
497
          "i_pri_up_memory": i_p_up_mem,
498
          }
499
        pnr_dyn.update(node_results[nname])
500
        node_results[nname] = pnr_dyn
501

    
502
    return node_results
503

    
504
  @staticmethod
505
  def _ComputeInstanceData(cluster_info, i_list):
506
    """Compute global instance data.
507

508
    """
509
    instance_data = {}
510
    for iinfo, beinfo in i_list:
511
      nic_data = []
512
      for nic in iinfo.nics:
513
        filled_params = cluster_info.SimpleFillNIC(nic.nicparams)
514
        nic_dict = {
515
          "mac": nic.mac,
516
          "ip": nic.ip,
517
          "mode": filled_params[constants.NIC_MODE],
518
          "link": filled_params[constants.NIC_LINK],
519
          }
520
        if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
521
          nic_dict["bridge"] = filled_params[constants.NIC_LINK]
522
        nic_data.append(nic_dict)
523
      pir = {
524
        "tags": list(iinfo.GetTags()),
525
        "admin_state": iinfo.admin_state,
526
        "vcpus": beinfo[constants.BE_VCPUS],
527
        "memory": beinfo[constants.BE_MAXMEM],
528
        "spindle_use": beinfo[constants.BE_SPINDLE_USE],
529
        "os": iinfo.os,
530
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
531
        "nics": nic_data,
532
        "disks": [{constants.IDISK_SIZE: dsk.size,
533
                   constants.IDISK_MODE: dsk.mode}
534
                  for dsk in iinfo.disks],
535
        "disk_template": iinfo.disk_template,
536
        "hypervisor": iinfo.hypervisor,
537
        }
538
      pir["disk_space_total"] = gmi.ComputeDiskSize(iinfo.disk_template,
539
                                                    pir["disks"])
540
      instance_data[iinfo.name] = pir
541

    
542
    return instance_data
543

    
544
  def _BuildInputData(self, req):
545
    """Build input data structures.
546

547
    """
548
    self._ComputeClusterData()
549

    
550
    request = req.GetRequest(self.cfg)
551
    request["type"] = req.MODE
552
    self.in_data["request"] = request
553

    
554
    self.in_text = serializer.Dump(self.in_data)
555

    
556
  def Run(self, name, validate=True, call_fn=None):
557
    """Run an instance allocator and return the results.
558

559
    """
560
    if call_fn is None:
561
      call_fn = self.rpc.call_iallocator_runner
562

    
563
    result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
564
    result.Raise("Failure while running the iallocator script")
565

    
566
    self.required_nodes = self.req.required_nodes
567
    self.out_text = result.payload
568
    if validate:
569
      self._ValidateResult()
570

    
571
  def _ValidateResult(self):
572
    """Process the allocator results.
573

574
    This will process and if successful save the result in
575
    self.out_data and the other parameters.
576

577
    """
578
    try:
579
      rdict = serializer.Load(self.out_text)
580
    except Exception, err:
581
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
582

    
583
    if not isinstance(rdict, dict):
584
      raise errors.OpExecError("Can't parse iallocator results: not a dict")
585

    
586
    # TODO: remove backwards compatiblity in later versions
587
    if "nodes" in rdict and "result" not in rdict:
588
      rdict["result"] = rdict["nodes"]
589
      del rdict["nodes"]
590

    
591
    for key in "success", "info", "result":
592
      if key not in rdict:
593
        raise errors.OpExecError("Can't parse iallocator results:"
594
                                 " missing key '%s'" % key)
595
      setattr(self, key, rdict[key])
596

    
597
    self.req.ValidateResult(self, self.result)
598
    self.out_data = rdict