Statistics
| Branch: | Tag: | Revision:

root / lib / masterd / iallocator.py @ b1e47e2d

History | View | Annotate | Download (19 kB)

1
#
2
#
3

    
4
# Copyright (C) 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the iallocator code."""
23

    
24
from ganeti import compat
25
from ganeti import constants
26
from ganeti import errors
27
from ganeti import ht
28
from ganeti import objectutils
29
from ganeti import opcodes
30
from ganeti import rpc
31
from ganeti import serializer
32
from ganeti import utils
33

    
34
import ganeti.masterd.instance as gmi
35

    
36

    
37
_STRING_LIST = ht.TListOf(ht.TString)
38
_JOB_LIST = ht.TListOf(ht.TListOf(ht.TStrictDict(True, False, {
39
   # pylint: disable=E1101
40
   # Class '...' has no 'OP_ID' member
41
   "OP_ID": ht.TElemOf([opcodes.OpInstanceFailover.OP_ID,
42
                        opcodes.OpInstanceMigrate.OP_ID,
43
                        opcodes.OpInstanceReplaceDisks.OP_ID])
44
   })))
45

    
46
_NEVAC_MOVED = \
47
  ht.TListOf(ht.TAnd(ht.TIsLength(3),
48
                     ht.TItems([ht.TNonEmptyString,
49
                                ht.TNonEmptyString,
50
                                ht.TListOf(ht.TNonEmptyString),
51
                                ])))
52
_NEVAC_FAILED = \
53
  ht.TListOf(ht.TAnd(ht.TIsLength(2),
54
                     ht.TItems([ht.TNonEmptyString,
55
                                ht.TMaybeString,
56
                                ])))
57
_NEVAC_RESULT = ht.TAnd(ht.TIsLength(3),
58
                        ht.TItems([_NEVAC_MOVED, _NEVAC_FAILED, _JOB_LIST]))
59

    
60

    
61
class _AutoReqParam(objectutils.AutoSlots):
62
  """Meta class for request definitions.
63

64
  """
65
  @classmethod
66
  def _GetSlots(mcs, attrs):
67
    """Extract the slots out of REQ_PARAMS.
68

69
    """
70
    params = attrs.setdefault("REQ_PARAMS", [])
71
    return [slot for (slot, _) in params]
72

    
73

    
74
class IARequestBase(objectutils.ValidatedSlots):
75
  """A generic IAllocator request object.
76

77
  """
78
  __metaclass__ = _AutoReqParam
79

    
80
  MODE = NotImplemented
81
  REQ_PARAMS = [
82
    ("required_nodes", ht.TPositiveInt)
83
    ]
84
  REQ_RESULT = NotImplemented
85

    
86
  def __init__(self, **kwargs):
87
    """Constructor for IARequestBase.
88

89
    The constructor takes only keyword arguments and will set
90
    attributes on this object based on the passed arguments. As such,
91
    it means that you should not pass arguments which are not in the
92
    REQ_PARAMS attribute for this class.
93

94
    """
95
    self.required_nodes = 0
96
    objectutils.ValidatedSlots.__init__(self, **kwargs)
97

    
98
    self.Validate()
99

    
100
  def Validate(self):
101
    """Validates all parameters of the request.
102

103
    """
104
    assert self.MODE in constants.VALID_IALLOCATOR_MODES
105

    
106
    for (param, validator) in self.REQ_PARAMS:
107
      if not hasattr(self, param):
108
        raise errors.OpPrereqError("Request is missing '%s' parameter" % param,
109
                                   errors.ECODE_INVAL)
110

    
111
      value = getattr(self, param)
112
      if not validator(value):
113
        raise errors.OpPrereqError(("Request parameter '%s' has invalid"
114
                                    " type %s/value %s") %
115
                                    (param, type(value), value),
116
                                    errors.ECODE_INVAL)
117

    
118
  def GetRequest(self, cfg):
119
    """Gets the request data dict.
120

121
    @param cfg: The configuration instance
122

123
    """
124
    raise NotImplementedError
125

    
126
  def ValidateResult(self, ia, result):
127
    """Validates the result of an request.
128

129
    @param ia: The IAllocator instance
130
    @param result: The IAllocator run result
131
    @returns: If it validates
132

133
    """
134
    if not (ia.success and self.REQ_RESULT(result)):
135
      raise errors.OpExecError("Iallocator returned invalid result,"
136
                               " expected %s, got %s" %
137
                               (self.REQ_RESULT, result))
138

    
139

    
140
class IAReqInstanceAlloc(IARequestBase):
141
  """An instance allocation request.
142

143
  """
144
  MODE = constants.IALLOCATOR_MODE_ALLOC
145
  REQ_PARAMS = [
146
    ("name", ht.TString),
147
    ("memory", ht.TInt),
148
    ("spindle_use", ht.TInt),
149
    ("disks", ht.TListOf(ht.TDict)),
150
    ("disk_template", ht.TString),
151
    ("os", ht.TString),
152
    ("tags", _STRING_LIST),
153
    ("nics", ht.TListOf(ht.TDict)),
154
    ("vcpus", ht.TInt),
155
    ("hypervisor", ht.TString),
156
    ]
157
  REQ_RESULT = ht.TList
158

    
159
  def GetRequest(self, cfg):
160
    """Requests a new instance.
161

162
    The checks for the completeness of the opcode must have already been
163
    done.
164

165
    """
166
    disk_space = gmi.ComputeDiskSize(self.disk_template, self.disks)
167

    
168
    if self.disk_template in constants.DTS_INT_MIRROR:
169
      self.required_nodes = 2
170
    else:
171
      self.required_nodes = 1
172

    
173
    return {
174
      "name": self.name,
175
      "disk_template": self.disk_template,
176
      "tags": self.tags,
177
      "os": self.os,
178
      "vcpus": self.vcpus,
179
      "memory": self.memory,
180
      "spindle_use": self.spindle_use,
181
      "disks": self.disks,
182
      "disk_space_total": disk_space,
183
      "nics": self.nics,
184
      "required_nodes": self.required_nodes,
185
      "hypervisor": self.hypervisor,
186
      }
187

    
188

    
189
class IAReqMultiInstanceAlloc(IARequestBase):
190
  """An multi instance allocation request.
191

192
  """
193
  MODE = constants.IALLOCATOR_MODE_MULTI_ALLOC
194
  REQ_PARAMS = [
195
    ("instances", ht.TListOf(ht.TInstanceOf(IAReqInstanceAlloc)))
196
    ]
197
  _MASUCCESS = \
198
    ht.TListOf(ht.TAnd(ht.TIsLength(2),
199
                       ht.TItems([ht.TNonEmptyString,
200
                                  ht.TListOf(ht.TNonEmptyString),
201
                                  ])))
202
  _MAFAILED = ht.TListOf(ht.TNonEmptyString)
203
  REQ_RESULT = ht.TListOf(ht.TAnd(ht.TIsLength(2),
204
                                  ht.TItems([_MASUCCESS, _MAFAILED])))
205

    
206
  def GetRequest(self, cfg):
207
    return {
208
      "instances": [iareq.GetRequest(cfg) for iareq in self.instances]
209
      }
210

    
211

    
212
class IAReqRelocate(IARequestBase):
213
  """A relocation request.
214

215
  """
216
  MODE = constants.IALLOCATOR_MODE_RELOC
217
  REQ_PARAMS = [
218
    ("name", ht.TString),
219
    ("relocate_from", _STRING_LIST),
220
    ]
221
  REQ_RESULT = ht.TList
222

    
223
  def GetRequest(self, cfg):
224
    """Request an relocation of an instance
225

226
    The checks for the completeness of the opcode must have already been
227
    done.
228

229
    """
230
    instance = cfg.GetInstanceInfo(self.name)
231
    if instance is None:
232
      raise errors.ProgrammerError("Unknown instance '%s' passed to"
233
                                   " IAllocator" % self.name)
234

    
235
    if instance.disk_template not in constants.DTS_MIRRORED:
236
      raise errors.OpPrereqError("Can't relocate non-mirrored instances",
237
                                 errors.ECODE_INVAL)
238

    
239
    if instance.disk_template in constants.DTS_INT_MIRROR and \
240
        len(instance.secondary_nodes) != 1:
241
      raise errors.OpPrereqError("Instance has not exactly one secondary node",
242
                                 errors.ECODE_STATE)
243

    
244
    self.required_nodes = 1
245
    disk_sizes = [{constants.IDISK_SIZE: disk.size} for disk in instance.disks]
246
    disk_space = gmi.ComputeDiskSize(instance.disk_template, disk_sizes)
247

    
248
    return {
249
      "name": self.name,
250
      "disk_space_total": disk_space,
251
      "required_nodes": self.required_nodes,
252
      "relocate_from": self.relocate_from,
253
      }
254

    
255
  def ValidateResult(self, ia, result):
256
    """Validates the result of an relocation request.
257

258
    """
259
    node2group = dict((name, ndata["group"])
260
                      for (name, ndata) in ia.in_data["nodes"].items())
261

    
262
    fn = compat.partial(self._NodesToGroups, node2group,
263
                        ia.in_data["nodegroups"])
264

    
265
    instance = ia.cfg.GetInstanceInfo(self.name)
266
    request_groups = fn(self.relocate_from + [instance.primary_node])
267
    result_groups = fn(result + [instance.primary_node])
268

    
269
    if ia.success and not set(result_groups).issubset(request_groups):
270
      raise errors.OpExecError("Groups of nodes returned by iallocator (%s)"
271
                               " differ from original groups (%s)" %
272
                               (utils.CommaJoin(result_groups),
273
                                utils.CommaJoin(request_groups)))
274

    
275
  @staticmethod
276
  def _NodesToGroups(node2group, groups, nodes):
277
    """Returns a list of unique group names for a list of nodes.
278

279
    @type node2group: dict
280
    @param node2group: Map from node name to group UUID
281
    @type groups: dict
282
    @param groups: Group information
283
    @type nodes: list
284
    @param nodes: Node names
285

286
    """
287
    result = set()
288

    
289
    for node in nodes:
290
      try:
291
        group_uuid = node2group[node]
292
      except KeyError:
293
        # Ignore unknown node
294
        pass
295
      else:
296
        try:
297
          group = groups[group_uuid]
298
        except KeyError:
299
          # Can't find group, let's use UUID
300
          group_name = group_uuid
301
        else:
302
          group_name = group["name"]
303

    
304
        result.add(group_name)
305

    
306
    return sorted(result)
307

    
308

    
309
class IAReqNodeEvac(IARequestBase):
310
  """A node evacuation request.
311

312
  """
313
  MODE = constants.IALLOCATOR_MODE_NODE_EVAC
314
  REQ_PARAMS = [
315
    ("instances", _STRING_LIST),
316
    ("evac_mode", ht.TElemOf(constants.IALLOCATOR_NEVAC_MODES)),
317
    ]
318
  REQ_RESULT = _NEVAC_RESULT
319

    
320
  def GetRequest(self, cfg):
321
    """Get data for node-evacuate requests.
322

323
    """
324
    return {
325
      "instances": self.instances,
326
      "evac_mode": self.evac_mode,
327
      }
328

    
329

    
330
class IAReqGroupChange(IARequestBase):
331
  """A group change request.
332

333
  """
334
  MODE = constants.IALLOCATOR_MODE_CHG_GROUP
335
  REQ_PARAMS = [
336
    ("instances", _STRING_LIST),
337
    ("target_groups", _STRING_LIST),
338
    ]
339
  REQ_RESULT = _NEVAC_RESULT
340

    
341
  def GetRequest(self, cfg):
342
    """Get data for node-evacuate requests.
343

344
    """
345
    return {
346
      "instances": self.instances,
347
      "target_groups": self.target_groups,
348
      }
349

    
350

    
351
class IAllocator(object):
352
  """IAllocator framework.
353

354
  An IAllocator instance has three sets of attributes:
355
    - cfg that is needed to query the cluster
356
    - input data (all members of the _KEYS class attribute are required)
357
    - four buffer attributes (in|out_data|text), that represent the
358
      input (to the external script) in text and data structure format,
359
      and the output from it, again in two formats
360
    - the result variables from the script (success, info, nodes) for
361
      easy usage
362

363
  """
364
  # pylint: disable=R0902
365
  # lots of instance attributes
366

    
367
  def __init__(self, cfg, rpc_runner, req):
368
    self.cfg = cfg
369
    self.rpc = rpc_runner
370
    self.req = req
371
    # init buffer variables
372
    self.in_text = self.out_text = self.in_data = self.out_data = None
373
    # init result fields
374
    self.success = self.info = self.result = None
375

    
376
    self._BuildInputData(req)
377

    
378
  def _ComputeClusterData(self):
379
    """Compute the generic allocator input data.
380

381
    This is the data that is independent of the actual operation.
382

383
    """
384
    cfg = self.cfg
385
    cluster_info = cfg.GetClusterInfo()
386
    # cluster data
387
    data = {
388
      "version": constants.IALLOCATOR_VERSION,
389
      "cluster_name": cfg.GetClusterName(),
390
      "cluster_tags": list(cluster_info.GetTags()),
391
      "enabled_hypervisors": list(cluster_info.enabled_hypervisors),
392
      "ipolicy": cluster_info.ipolicy,
393
      }
394
    ninfo = cfg.GetAllNodesInfo()
395
    iinfo = cfg.GetAllInstancesInfo().values()
396
    i_list = [(inst, cluster_info.FillBE(inst)) for inst in iinfo]
397

    
398
    # node data
399
    node_list = [n.name for n in ninfo.values() if n.vm_capable]
400

    
401
    if isinstance(self.req, IAReqInstanceAlloc):
402
      hypervisor_name = self.req.hypervisor
403
    elif isinstance(self.req, IAReqRelocate):
404
      hypervisor_name = cfg.GetInstanceInfo(self.req.name).hypervisor
405
    else:
406
      hypervisor_name = cluster_info.primary_hypervisor
407

    
408
    node_data = self.rpc.call_node_info(node_list, [cfg.GetVGName()],
409
                                        [hypervisor_name])
410
    node_iinfo = \
411
      self.rpc.call_all_instances_info(node_list,
412
                                       cluster_info.enabled_hypervisors)
413

    
414
    data["nodegroups"] = self._ComputeNodeGroupData(cfg)
415

    
416
    config_ndata = self._ComputeBasicNodeData(cfg, ninfo)
417
    data["nodes"] = self._ComputeDynamicNodeData(ninfo, node_data, node_iinfo,
418
                                                 i_list, config_ndata)
419
    assert len(data["nodes"]) == len(ninfo), \
420
        "Incomplete node data computed"
421

    
422
    data["instances"] = self._ComputeInstanceData(cluster_info, i_list)
423

    
424
    self.in_data = data
425

    
426
  @staticmethod
427
  def _ComputeNodeGroupData(cfg):
428
    """Compute node groups data.
429

430
    """
431
    cluster = cfg.GetClusterInfo()
432
    ng = dict((guuid, {
433
      "name": gdata.name,
434
      "alloc_policy": gdata.alloc_policy,
435
      "ipolicy": gmi.CalculateGroupIPolicy(cluster, gdata),
436
      })
437
      for guuid, gdata in cfg.GetAllNodeGroupsInfo().items())
438

    
439
    return ng
440

    
441
  @staticmethod
442
  def _ComputeBasicNodeData(cfg, node_cfg):
443
    """Compute global node data.
444

445
    @rtype: dict
446
    @returns: a dict of name: (node dict, node config)
447

448
    """
449
    # fill in static (config-based) values
450
    node_results = dict((ninfo.name, {
451
      "tags": list(ninfo.GetTags()),
452
      "primary_ip": ninfo.primary_ip,
453
      "secondary_ip": ninfo.secondary_ip,
454
      "offline": ninfo.offline,
455
      "drained": ninfo.drained,
456
      "master_candidate": ninfo.master_candidate,
457
      "group": ninfo.group,
458
      "master_capable": ninfo.master_capable,
459
      "vm_capable": ninfo.vm_capable,
460
      "ndparams": cfg.GetNdParams(ninfo),
461
      })
462
      for ninfo in node_cfg.values())
463

    
464
    return node_results
465

    
466
  @staticmethod
467
  def _ComputeDynamicNodeData(node_cfg, node_data, node_iinfo, i_list,
468
                              node_results):
469
    """Compute global node data.
470

471
    @param node_results: the basic node structures as filled from the config
472

473
    """
474
    #TODO(dynmem): compute the right data on MAX and MIN memory
475
    # make a copy of the current dict
476
    node_results = dict(node_results)
477
    for nname, nresult in node_data.items():
478
      assert nname in node_results, "Missing basic data for node %s" % nname
479
      ninfo = node_cfg[nname]
480

    
481
      if not (ninfo.offline or ninfo.drained):
482
        nresult.Raise("Can't get data for node %s" % nname)
483
        node_iinfo[nname].Raise("Can't get node instance info from node %s" %
484
                                nname)
485
        remote_info = rpc.MakeLegacyNodeInfo(nresult.payload)
486

    
487
        for attr in ["memory_total", "memory_free", "memory_dom0",
488
                     "vg_size", "vg_free", "cpu_total"]:
489
          if attr not in remote_info:
490
            raise errors.OpExecError("Node '%s' didn't return attribute"
491
                                     " '%s'" % (nname, attr))
492
          if not isinstance(remote_info[attr], int):
493
            raise errors.OpExecError("Node '%s' returned invalid value"
494
                                     " for '%s': %s" %
495
                                     (nname, attr, remote_info[attr]))
496
        # compute memory used by primary instances
497
        i_p_mem = i_p_up_mem = 0
498
        for iinfo, beinfo in i_list:
499
          if iinfo.primary_node == nname:
500
            i_p_mem += beinfo[constants.BE_MAXMEM]
501
            if iinfo.name not in node_iinfo[nname].payload:
502
              i_used_mem = 0
503
            else:
504
              i_used_mem = int(node_iinfo[nname].payload[iinfo.name]["memory"])
505
            i_mem_diff = beinfo[constants.BE_MAXMEM] - i_used_mem
506
            remote_info["memory_free"] -= max(0, i_mem_diff)
507

    
508
            if iinfo.admin_state == constants.ADMINST_UP:
509
              i_p_up_mem += beinfo[constants.BE_MAXMEM]
510

    
511
        # compute memory used by instances
512
        pnr_dyn = {
513
          "total_memory": remote_info["memory_total"],
514
          "reserved_memory": remote_info["memory_dom0"],
515
          "free_memory": remote_info["memory_free"],
516
          "total_disk": remote_info["vg_size"],
517
          "free_disk": remote_info["vg_free"],
518
          "total_cpus": remote_info["cpu_total"],
519
          "i_pri_memory": i_p_mem,
520
          "i_pri_up_memory": i_p_up_mem,
521
          }
522
        pnr_dyn.update(node_results[nname])
523
        node_results[nname] = pnr_dyn
524

    
525
    return node_results
526

    
527
  @staticmethod
528
  def _ComputeInstanceData(cluster_info, i_list):
529
    """Compute global instance data.
530

531
    """
532
    instance_data = {}
533
    for iinfo, beinfo in i_list:
534
      nic_data = []
535
      for nic in iinfo.nics:
536
        filled_params = cluster_info.SimpleFillNIC(nic.nicparams)
537
        nic_dict = {
538
          "mac": nic.mac,
539
          "ip": nic.ip,
540
          "mode": filled_params[constants.NIC_MODE],
541
          "link": filled_params[constants.NIC_LINK],
542
          }
543
        if filled_params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
544
          nic_dict["bridge"] = filled_params[constants.NIC_LINK]
545
        nic_data.append(nic_dict)
546
      pir = {
547
        "tags": list(iinfo.GetTags()),
548
        "admin_state": iinfo.admin_state,
549
        "vcpus": beinfo[constants.BE_VCPUS],
550
        "memory": beinfo[constants.BE_MAXMEM],
551
        "spindle_use": beinfo[constants.BE_SPINDLE_USE],
552
        "os": iinfo.os,
553
        "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes),
554
        "nics": nic_data,
555
        "disks": [{constants.IDISK_SIZE: dsk.size,
556
                   constants.IDISK_MODE: dsk.mode}
557
                  for dsk in iinfo.disks],
558
        "disk_template": iinfo.disk_template,
559
        "hypervisor": iinfo.hypervisor,
560
        }
561
      pir["disk_space_total"] = gmi.ComputeDiskSize(iinfo.disk_template,
562
                                                    pir["disks"])
563
      instance_data[iinfo.name] = pir
564

    
565
    return instance_data
566

    
567
  def _BuildInputData(self, req):
568
    """Build input data structures.
569

570
    """
571
    self._ComputeClusterData()
572

    
573
    request = req.GetRequest(self.cfg)
574
    request["type"] = req.MODE
575
    self.in_data["request"] = request
576

    
577
    self.in_text = serializer.Dump(self.in_data)
578

    
579
  def Run(self, name, validate=True, call_fn=None):
580
    """Run an instance allocator and return the results.
581

582
    """
583
    if call_fn is None:
584
      call_fn = self.rpc.call_iallocator_runner
585

    
586
    result = call_fn(self.cfg.GetMasterNode(), name, self.in_text)
587
    result.Raise("Failure while running the iallocator script")
588

    
589
    self.required_nodes = self.req.required_nodes
590
    self.out_text = result.payload
591
    if validate:
592
      self._ValidateResult()
593

    
594
  def _ValidateResult(self):
595
    """Process the allocator results.
596

597
    This will process and if successful save the result in
598
    self.out_data and the other parameters.
599

600
    """
601
    try:
602
      rdict = serializer.Load(self.out_text)
603
    except Exception, err:
604
      raise errors.OpExecError("Can't parse iallocator results: %s" % str(err))
605

    
606
    if not isinstance(rdict, dict):
607
      raise errors.OpExecError("Can't parse iallocator results: not a dict")
608

    
609
    # TODO: remove backwards compatiblity in later versions
610
    if "nodes" in rdict and "result" not in rdict:
611
      rdict["result"] = rdict["nodes"]
612
      del rdict["nodes"]
613

    
614
    for key in "success", "info", "result":
615
      if key not in rdict:
616
        raise errors.OpExecError("Can't parse iallocator results:"
617
                                 " missing key '%s'" % key)
618
      setattr(self, key, rdict[key])
619

    
620
    self.req.ValidateResult(self, self.result)
621
    self.out_data = rdict