Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib / common.py @ 7352d33b

History | View | Annotate | Download (24.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Common functions used by multiple logical units."""
23
import copy
24
import os
25

    
26
from ganeti import constants
27
from ganeti import errors
28
from ganeti import hypervisor
29
from ganeti import locking
30
from ganeti import objects
31
from ganeti import pathutils
32
from ganeti import rpc
33
from ganeti import ssconf
34
from ganeti import utils
35

    
36

    
37
def _ExpandItemName(fn, name, kind):
38
  """Expand an item name.
39

40
  @param fn: the function to use for expansion
41
  @param name: requested item name
42
  @param kind: text description ('Node' or 'Instance')
43
  @return: the resolved (full) name
44
  @raise errors.OpPrereqError: if the item is not found
45

46
  """
47
  full_name = fn(name)
48
  if full_name is None:
49
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
50
                               errors.ECODE_NOENT)
51
  return full_name
52

    
53

    
54
def _ExpandInstanceName(cfg, name):
55
  """Wrapper over L{_ExpandItemName} for instance."""
56
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
57

    
58

    
59
def _ExpandNodeName(cfg, name):
60
  """Wrapper over L{_ExpandItemName} for nodes."""
61
  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")
62

    
63

    
64
def _ShareAll():
65
  """Returns a dict declaring all lock levels shared.
66

67
  """
68
  return dict.fromkeys(locking.LEVELS, 1)
69

    
70

    
71
def _CheckNodeGroupInstances(cfg, group_uuid, owned_instances):
72
  """Checks if the instances in a node group are still correct.
73

74
  @type cfg: L{config.ConfigWriter}
75
  @param cfg: The cluster configuration
76
  @type group_uuid: string
77
  @param group_uuid: Node group UUID
78
  @type owned_instances: set or frozenset
79
  @param owned_instances: List of currently owned instances
80

81
  """
82
  wanted_instances = cfg.GetNodeGroupInstances(group_uuid)
83
  if owned_instances != wanted_instances:
84
    raise errors.OpPrereqError("Instances in node group '%s' changed since"
85
                               " locks were acquired, wanted '%s', have '%s';"
86
                               " retry the operation" %
87
                               (group_uuid,
88
                                utils.CommaJoin(wanted_instances),
89
                                utils.CommaJoin(owned_instances)),
90
                               errors.ECODE_STATE)
91

    
92
  return wanted_instances
93

    
94

    
95
def _GetWantedNodes(lu, nodes):
96
  """Returns list of checked and expanded node names.
97

98
  @type lu: L{LogicalUnit}
99
  @param lu: the logical unit on whose behalf we execute
100
  @type nodes: list
101
  @param nodes: list of node names or None for all nodes
102
  @rtype: list
103
  @return: the list of nodes, sorted
104
  @raise errors.ProgrammerError: if the nodes parameter is wrong type
105

106
  """
107
  if nodes:
108
    return [_ExpandNodeName(lu.cfg, name) for name in nodes]
109

    
110
  return utils.NiceSort(lu.cfg.GetNodeList())
111

    
112

    
113
def _GetWantedInstances(lu, instances):
114
  """Returns list of checked and expanded instance names.
115

116
  @type lu: L{LogicalUnit}
117
  @param lu: the logical unit on whose behalf we execute
118
  @type instances: list
119
  @param instances: list of instance names or None for all instances
120
  @rtype: list
121
  @return: the list of instances, sorted
122
  @raise errors.OpPrereqError: if the instances parameter is wrong type
123
  @raise errors.OpPrereqError: if any of the passed instances is not found
124

125
  """
126
  if instances:
127
    wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
128
  else:
129
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
130
  return wanted
131

    
132

    
133
def _RunPostHook(lu, node_name):
134
  """Runs the post-hook for an opcode on a single node.
135

136
  """
137
  hm = lu.proc.BuildHooksManager(lu)
138
  try:
139
    hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name])
140
  except Exception, err: # pylint: disable=W0703
141
    lu.LogWarning("Errors occurred running hooks on %s: %s",
142
                  node_name, err)
143

    
144

    
145
def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
146
  """Distribute additional files which are part of the cluster configuration.
147

148
  ConfigWriter takes care of distributing the config and ssconf files, but
149
  there are more files which should be distributed to all nodes. This function
150
  makes sure those are copied.
151

152
  @param lu: calling logical unit
153
  @param additional_nodes: list of nodes not in the config to distribute to
154
  @type additional_vm: boolean
155
  @param additional_vm: whether the additional nodes are vm-capable or not
156

157
  """
158
  # Gather target nodes
159
  cluster = lu.cfg.GetClusterInfo()
160
  master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
161

    
162
  online_nodes = lu.cfg.GetOnlineNodeList()
163
  online_set = frozenset(online_nodes)
164
  vm_nodes = list(online_set.intersection(lu.cfg.GetVmCapableNodeList()))
165

    
166
  if additional_nodes is not None:
167
    online_nodes.extend(additional_nodes)
168
    if additional_vm:
169
      vm_nodes.extend(additional_nodes)
170

    
171
  # Never distribute to master node
172
  for nodelist in [online_nodes, vm_nodes]:
173
    if master_info.name in nodelist:
174
      nodelist.remove(master_info.name)
175

    
176
  # Gather file lists
177
  (files_all, _, files_mc, files_vm) = \
178
    _ComputeAncillaryFiles(cluster, True)
179

    
180
  # Never re-distribute configuration file from here
181
  assert not (pathutils.CLUSTER_CONF_FILE in files_all or
182
              pathutils.CLUSTER_CONF_FILE in files_vm)
183
  assert not files_mc, "Master candidates not handled in this function"
184

    
185
  filemap = [
186
    (online_nodes, files_all),
187
    (vm_nodes, files_vm),
188
    ]
189

    
190
  # Upload the files
191
  for (node_list, files) in filemap:
192
    for fname in files:
193
      _UploadHelper(lu, node_list, fname)
194

    
195

    
196
def _ComputeAncillaryFiles(cluster, redist):
197
  """Compute files external to Ganeti which need to be consistent.
198

199
  @type redist: boolean
200
  @param redist: Whether to include files which need to be redistributed
201

202
  """
203
  # Compute files for all nodes
204
  files_all = set([
205
    pathutils.SSH_KNOWN_HOSTS_FILE,
206
    pathutils.CONFD_HMAC_KEY,
207
    pathutils.CLUSTER_DOMAIN_SECRET_FILE,
208
    pathutils.SPICE_CERT_FILE,
209
    pathutils.SPICE_CACERT_FILE,
210
    pathutils.RAPI_USERS_FILE,
211
    ])
212

    
213
  if redist:
214
    # we need to ship at least the RAPI certificate
215
    files_all.add(pathutils.RAPI_CERT_FILE)
216
  else:
217
    files_all.update(pathutils.ALL_CERT_FILES)
218
    files_all.update(ssconf.SimpleStore().GetFileList())
219

    
220
  if cluster.modify_etc_hosts:
221
    files_all.add(pathutils.ETC_HOSTS)
222

    
223
  if cluster.use_external_mip_script:
224
    files_all.add(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
225

    
226
  # Files which are optional, these must:
227
  # - be present in one other category as well
228
  # - either exist or not exist on all nodes of that category (mc, vm all)
229
  files_opt = set([
230
    pathutils.RAPI_USERS_FILE,
231
    ])
232

    
233
  # Files which should only be on master candidates
234
  files_mc = set()
235

    
236
  if not redist:
237
    files_mc.add(pathutils.CLUSTER_CONF_FILE)
238

    
239
  # File storage
240
  if (not redist and (constants.ENABLE_FILE_STORAGE or
241
                        constants.ENABLE_SHARED_FILE_STORAGE)):
242
    files_all.add(pathutils.FILE_STORAGE_PATHS_FILE)
243
    files_opt.add(pathutils.FILE_STORAGE_PATHS_FILE)
244

    
245
  # Files which should only be on VM-capable nodes
246
  files_vm = set(
247
    filename
248
    for hv_name in cluster.enabled_hypervisors
249
    for filename in
250
    hypervisor.GetHypervisorClass(hv_name).GetAncillaryFiles()[0])
251

    
252
  files_opt |= set(
253
    filename
254
    for hv_name in cluster.enabled_hypervisors
255
    for filename in
256
    hypervisor.GetHypervisorClass(hv_name).GetAncillaryFiles()[1])
257

    
258
  # Filenames in each category must be unique
259
  all_files_set = files_all | files_mc | files_vm
260
  assert (len(all_files_set) ==
261
          sum(map(len, [files_all, files_mc, files_vm]))), \
262
    "Found file listed in more than one file list"
263

    
264
  # Optional files must be present in one other category
265
  assert all_files_set.issuperset(files_opt), \
266
    "Optional file not in a different required list"
267

    
268
  # This one file should never ever be re-distributed via RPC
269
  assert not (redist and
270
              pathutils.FILE_STORAGE_PATHS_FILE in all_files_set)
271

    
272
  return (files_all, files_opt, files_mc, files_vm)
273

    
274

    
275
def _UploadHelper(lu, nodes, fname):
276
  """Helper for uploading a file and showing warnings.
277

278
  """
279
  if os.path.exists(fname):
280
    result = lu.rpc.call_upload_file(nodes, fname)
281
    for to_node, to_result in result.items():
282
      msg = to_result.fail_msg
283
      if msg:
284
        msg = ("Copy of file %s to node %s failed: %s" %
285
               (fname, to_node, msg))
286
        lu.LogWarning(msg)
287

    
288

    
289
def _MergeAndVerifyHvState(op_input, obj_input):
290
  """Combines the hv state from an opcode with the one of the object
291

292
  @param op_input: The input dict from the opcode
293
  @param obj_input: The input dict from the objects
294
  @return: The verified and updated dict
295

296
  """
297
  if op_input:
298
    invalid_hvs = set(op_input) - constants.HYPER_TYPES
299
    if invalid_hvs:
300
      raise errors.OpPrereqError("Invalid hypervisor(s) in hypervisor state:"
301
                                 " %s" % utils.CommaJoin(invalid_hvs),
302
                                 errors.ECODE_INVAL)
303
    if obj_input is None:
304
      obj_input = {}
305
    type_check = constants.HVSTS_PARAMETER_TYPES
306
    return _UpdateAndVerifySubDict(obj_input, op_input, type_check)
307

    
308
  return None
309

    
310

    
311
def _MergeAndVerifyDiskState(op_input, obj_input):
312
  """Combines the disk state from an opcode with the one of the object
313

314
  @param op_input: The input dict from the opcode
315
  @param obj_input: The input dict from the objects
316
  @return: The verified and updated dict
317
  """
318
  if op_input:
319
    invalid_dst = set(op_input) - constants.DS_VALID_TYPES
320
    if invalid_dst:
321
      raise errors.OpPrereqError("Invalid storage type(s) in disk state: %s" %
322
                                 utils.CommaJoin(invalid_dst),
323
                                 errors.ECODE_INVAL)
324
    type_check = constants.DSS_PARAMETER_TYPES
325
    if obj_input is None:
326
      obj_input = {}
327
    return dict((key, _UpdateAndVerifySubDict(obj_input.get(key, {}), value,
328
                                              type_check))
329
                for key, value in op_input.items())
330

    
331
  return None
332

    
333

    
334
def _CheckOSParams(lu, required, nodenames, osname, osparams):
335
  """OS parameters validation.
336

337
  @type lu: L{LogicalUnit}
338
  @param lu: the logical unit for which we check
339
  @type required: boolean
340
  @param required: whether the validation should fail if the OS is not
341
      found
342
  @type nodenames: list
343
  @param nodenames: the list of nodes on which we should check
344
  @type osname: string
345
  @param osname: the name of the hypervisor we should use
346
  @type osparams: dict
347
  @param osparams: the parameters which we need to check
348
  @raise errors.OpPrereqError: if the parameters are not valid
349

350
  """
351
  nodenames = _FilterVmNodes(lu, nodenames)
352
  result = lu.rpc.call_os_validate(nodenames, required, osname,
353
                                   [constants.OS_VALIDATE_PARAMETERS],
354
                                   osparams)
355
  for node, nres in result.items():
356
    # we don't check for offline cases since this should be run only
357
    # against the master node and/or an instance's nodes
358
    nres.Raise("OS Parameters validation failed on node %s" % node)
359
    if not nres.payload:
360
      lu.LogInfo("OS %s not found on node %s, validation skipped",
361
                 osname, node)
362

    
363

    
364
def _CheckHVParams(lu, nodenames, hvname, hvparams):
365
  """Hypervisor parameter validation.
366

367
  This function abstract the hypervisor parameter validation to be
368
  used in both instance create and instance modify.
369

370
  @type lu: L{LogicalUnit}
371
  @param lu: the logical unit for which we check
372
  @type nodenames: list
373
  @param nodenames: the list of nodes on which we should check
374
  @type hvname: string
375
  @param hvname: the name of the hypervisor we should use
376
  @type hvparams: dict
377
  @param hvparams: the parameters which we need to check
378
  @raise errors.OpPrereqError: if the parameters are not valid
379

380
  """
381
  nodenames = _FilterVmNodes(lu, nodenames)
382

    
383
  cluster = lu.cfg.GetClusterInfo()
384
  hvfull = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
385

    
386
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames, hvname, hvfull)
387
  for node in nodenames:
388
    info = hvinfo[node]
389
    if info.offline:
390
      continue
391
    info.Raise("Hypervisor parameter validation failed on node %s" % node)
392

    
393

    
394
def _AdjustCandidatePool(lu, exceptions):
395
  """Adjust the candidate pool after node operations.
396

397
  """
398
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
399
  if mod_list:
400
    lu.LogInfo("Promoted nodes to master candidate role: %s",
401
               utils.CommaJoin(node.name for node in mod_list))
402
    for name in mod_list:
403
      lu.context.ReaddNode(name)
404
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
405
  if mc_now > mc_max:
406
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
407
               (mc_now, mc_max))
408

    
409

    
410
def _CheckNodePVs(nresult, exclusive_storage):
411
  """Check node PVs.
412

413
  """
414
  pvlist_dict = nresult.get(constants.NV_PVLIST, None)
415
  if pvlist_dict is None:
416
    return (["Can't get PV list from node"], None)
417
  pvlist = map(objects.LvmPvInfo.FromDict, pvlist_dict)
418
  errlist = []
419
  # check that ':' is not present in PV names, since it's a
420
  # special character for lvcreate (denotes the range of PEs to
421
  # use on the PV)
422
  for pv in pvlist:
423
    if ":" in pv.name:
424
      errlist.append("Invalid character ':' in PV '%s' of VG '%s'" %
425
                     (pv.name, pv.vg_name))
426
  es_pvinfo = None
427
  if exclusive_storage:
428
    (errmsgs, es_pvinfo) = utils.LvmExclusiveCheckNodePvs(pvlist)
429
    errlist.extend(errmsgs)
430
    shared_pvs = nresult.get(constants.NV_EXCLUSIVEPVS, None)
431
    if shared_pvs:
432
      for (pvname, lvlist) in shared_pvs:
433
        # TODO: Check that LVs are really unrelated (snapshots, DRBD meta...)
434
        errlist.append("PV %s is shared among unrelated LVs (%s)" %
435
                       (pvname, utils.CommaJoin(lvlist)))
436
  return (errlist, es_pvinfo)
437

    
438

    
439
def _ComputeMinMaxSpec(name, qualifier, ispecs, value):
440
  """Computes if value is in the desired range.
441

442
  @param name: name of the parameter for which we perform the check
443
  @param qualifier: a qualifier used in the error message (e.g. 'disk/1',
444
      not just 'disk')
445
  @param ispecs: dictionary containing min and max values
446
  @param value: actual value that we want to use
447
  @return: None or an error string
448

449
  """
450
  if value in [None, constants.VALUE_AUTO]:
451
    return None
452
  max_v = ispecs[constants.ISPECS_MAX].get(name, value)
453
  min_v = ispecs[constants.ISPECS_MIN].get(name, value)
454
  if value > max_v or min_v > value:
455
    if qualifier:
456
      fqn = "%s/%s" % (name, qualifier)
457
    else:
458
      fqn = name
459
    return ("%s value %s is not in range [%s, %s]" %
460
            (fqn, value, min_v, max_v))
461
  return None
462

    
463

    
464
def _ComputeIPolicySpecViolation(ipolicy, mem_size, cpu_count, disk_count,
465
                                 nic_count, disk_sizes, spindle_use,
466
                                 disk_template,
467
                                 _compute_fn=_ComputeMinMaxSpec):
468
  """Verifies ipolicy against provided specs.
469

470
  @type ipolicy: dict
471
  @param ipolicy: The ipolicy
472
  @type mem_size: int
473
  @param mem_size: The memory size
474
  @type cpu_count: int
475
  @param cpu_count: Used cpu cores
476
  @type disk_count: int
477
  @param disk_count: Number of disks used
478
  @type nic_count: int
479
  @param nic_count: Number of nics used
480
  @type disk_sizes: list of ints
481
  @param disk_sizes: Disk sizes of used disk (len must match C{disk_count})
482
  @type spindle_use: int
483
  @param spindle_use: The number of spindles this instance uses
484
  @type disk_template: string
485
  @param disk_template: The disk template of the instance
486
  @param _compute_fn: The compute function (unittest only)
487
  @return: A list of violations, or an empty list of no violations are found
488

489
  """
490
  assert disk_count == len(disk_sizes)
491

    
492
  test_settings = [
493
    (constants.ISPEC_MEM_SIZE, "", mem_size),
494
    (constants.ISPEC_CPU_COUNT, "", cpu_count),
495
    (constants.ISPEC_NIC_COUNT, "", nic_count),
496
    (constants.ISPEC_SPINDLE_USE, "", spindle_use),
497
    ] + [(constants.ISPEC_DISK_SIZE, str(idx), d)
498
         for idx, d in enumerate(disk_sizes)]
499
  if disk_template != constants.DT_DISKLESS:
500
    # This check doesn't make sense for diskless instances
501
    test_settings.append((constants.ISPEC_DISK_COUNT, "", disk_count))
502
  ret = []
503
  allowed_dts = ipolicy[constants.IPOLICY_DTS]
504
  if disk_template not in allowed_dts:
505
    ret.append("Disk template %s is not allowed (allowed templates: %s)" %
506
               (disk_template, utils.CommaJoin(allowed_dts)))
507

    
508
  min_errs = None
509
  for minmax in ipolicy[constants.ISPECS_MINMAX]:
510
    errs = filter(None,
511
                  (_compute_fn(name, qualifier, minmax, value)
512
                   for (name, qualifier, value) in test_settings))
513
    if min_errs is None or len(errs) < len(min_errs):
514
      min_errs = errs
515
  assert min_errs is not None
516
  return ret + min_errs
517

    
518

    
519
def _ComputeIPolicyInstanceViolation(ipolicy, instance, cfg,
520
                                     _compute_fn=_ComputeIPolicySpecViolation):
521
  """Compute if instance meets the specs of ipolicy.
522

523
  @type ipolicy: dict
524
  @param ipolicy: The ipolicy to verify against
525
  @type instance: L{objects.Instance}
526
  @param instance: The instance to verify
527
  @type cfg: L{config.ConfigWriter}
528
  @param cfg: Cluster configuration
529
  @param _compute_fn: The function to verify ipolicy (unittest only)
530
  @see: L{_ComputeIPolicySpecViolation}
531

532
  """
533
  be_full = cfg.GetClusterInfo().FillBE(instance)
534
  mem_size = be_full[constants.BE_MAXMEM]
535
  cpu_count = be_full[constants.BE_VCPUS]
536
  spindle_use = be_full[constants.BE_SPINDLE_USE]
537
  disk_count = len(instance.disks)
538
  disk_sizes = [disk.size for disk in instance.disks]
539
  nic_count = len(instance.nics)
540
  disk_template = instance.disk_template
541

    
542
  return _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count,
543
                     disk_sizes, spindle_use, disk_template)
544

    
545

    
546
def _ComputeViolatingInstances(ipolicy, instances, cfg):
547
  """Computes a set of instances who violates given ipolicy.
548

549
  @param ipolicy: The ipolicy to verify
550
  @type instances: L{objects.Instance}
551
  @param instances: List of instances to verify
552
  @type cfg: L{config.ConfigWriter}
553
  @param cfg: Cluster configuration
554
  @return: A frozenset of instance names violating the ipolicy
555

556
  """
557
  return frozenset([inst.name for inst in instances
558
                    if _ComputeIPolicyInstanceViolation(ipolicy, inst, cfg)])
559

    
560

    
561
def _ComputeNewInstanceViolations(old_ipolicy, new_ipolicy, instances, cfg):
562
  """Computes a set of any instances that would violate the new ipolicy.
563

564
  @param old_ipolicy: The current (still in-place) ipolicy
565
  @param new_ipolicy: The new (to become) ipolicy
566
  @param instances: List of instances to verify
567
  @type cfg: L{config.ConfigWriter}
568
  @param cfg: Cluster configuration
569
  @return: A list of instances which violates the new ipolicy but
570
      did not before
571

572
  """
573
  return (_ComputeViolatingInstances(new_ipolicy, instances, cfg) -
574
          _ComputeViolatingInstances(old_ipolicy, instances, cfg))
575

    
576

    
577
def _GetUpdatedParams(old_params, update_dict,
578
                      use_default=True, use_none=False):
579
  """Return the new version of a parameter dictionary.
580

581
  @type old_params: dict
582
  @param old_params: old parameters
583
  @type update_dict: dict
584
  @param update_dict: dict containing new parameter values, or
585
      constants.VALUE_DEFAULT to reset the parameter to its default
586
      value
587
  @param use_default: boolean
588
  @type use_default: whether to recognise L{constants.VALUE_DEFAULT}
589
      values as 'to be deleted' values
590
  @param use_none: boolean
591
  @type use_none: whether to recognise C{None} values as 'to be
592
      deleted' values
593
  @rtype: dict
594
  @return: the new parameter dictionary
595

596
  """
597
  params_copy = copy.deepcopy(old_params)
598
  for key, val in update_dict.iteritems():
599
    if ((use_default and val == constants.VALUE_DEFAULT) or
600
          (use_none and val is None)):
601
      try:
602
        del params_copy[key]
603
      except KeyError:
604
        pass
605
    else:
606
      params_copy[key] = val
607
  return params_copy
608

    
609

    
610
def _GetUpdatedIPolicy(old_ipolicy, new_ipolicy, group_policy=False):
611
  """Return the new version of an instance policy.
612

613
  @param group_policy: whether this policy applies to a group and thus
614
    we should support removal of policy entries
615

616
  """
617
  ipolicy = copy.deepcopy(old_ipolicy)
618
  for key, value in new_ipolicy.items():
619
    if key not in constants.IPOLICY_ALL_KEYS:
620
      raise errors.OpPrereqError("Invalid key in new ipolicy: %s" % key,
621
                                 errors.ECODE_INVAL)
622
    if (not value or value == [constants.VALUE_DEFAULT] or
623
            value == constants.VALUE_DEFAULT):
624
      if group_policy:
625
        if key in ipolicy:
626
          del ipolicy[key]
627
      else:
628
        raise errors.OpPrereqError("Can't unset ipolicy attribute '%s'"
629
                                   " on the cluster'" % key,
630
                                   errors.ECODE_INVAL)
631
    else:
632
      if key in constants.IPOLICY_PARAMETERS:
633
        # FIXME: we assume all such values are float
634
        try:
635
          ipolicy[key] = float(value)
636
        except (TypeError, ValueError), err:
637
          raise errors.OpPrereqError("Invalid value for attribute"
638
                                     " '%s': '%s', error: %s" %
639
                                     (key, value, err), errors.ECODE_INVAL)
640
      elif key == constants.ISPECS_MINMAX:
641
        for minmax in value:
642
          for k in minmax.keys():
643
            utils.ForceDictType(minmax[k], constants.ISPECS_PARAMETER_TYPES)
644
        ipolicy[key] = value
645
      elif key == constants.ISPECS_STD:
646
        if group_policy:
647
          msg = "%s cannot appear in group instance specs" % key
648
          raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
649
        ipolicy[key] = _GetUpdatedParams(old_ipolicy.get(key, {}), value,
650
                                         use_none=False, use_default=False)
651
        utils.ForceDictType(ipolicy[key], constants.ISPECS_PARAMETER_TYPES)
652
      else:
653
        # FIXME: we assume all others are lists; this should be redone
654
        # in a nicer way
655
        ipolicy[key] = list(value)
656
  try:
657
    objects.InstancePolicy.CheckParameterSyntax(ipolicy, not group_policy)
658
  except errors.ConfigurationError, err:
659
    raise errors.OpPrereqError("Invalid instance policy: %s" % err,
660
                               errors.ECODE_INVAL)
661
  return ipolicy
662

    
663

    
664
def _AnnotateDiskParams(instance, devs, cfg):
665
  """Little helper wrapper to the rpc annotation method.
666

667
  @param instance: The instance object
668
  @type devs: List of L{objects.Disk}
669
  @param devs: The root devices (not any of its children!)
670
  @param cfg: The config object
671
  @returns The annotated disk copies
672
  @see L{rpc.AnnotateDiskParams}
673

674
  """
675
  return rpc.AnnotateDiskParams(instance.disk_template, devs,
676
                                cfg.GetInstanceDiskParams(instance))
677

    
678

    
679
def _SupportsOob(cfg, node):
680
  """Tells if node supports OOB.
681

682
  @type cfg: L{config.ConfigWriter}
683
  @param cfg: The cluster configuration
684
  @type node: L{objects.Node}
685
  @param node: The node
686
  @return: The OOB script if supported or an empty string otherwise
687

688
  """
689
  return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]
690

    
691

    
692
def _UpdateAndVerifySubDict(base, updates, type_check):
693
  """Updates and verifies a dict with sub dicts of the same type.
694

695
  @param base: The dict with the old data
696
  @param updates: The dict with the new data
697
  @param type_check: Dict suitable to ForceDictType to verify correct types
698
  @returns: A new dict with updated and verified values
699

700
  """
701
  def fn(old, value):
702
    new = _GetUpdatedParams(old, value)
703
    utils.ForceDictType(new, type_check)
704
    return new
705

    
706
  ret = copy.deepcopy(base)
707
  ret.update(dict((key, fn(base.get(key, {}), value))
708
                  for key, value in updates.items()))
709
  return ret
710

    
711

    
712
def _FilterVmNodes(lu, nodenames):
713
  """Filters out non-vm_capable nodes from a list.
714

715
  @type lu: L{LogicalUnit}
716
  @param lu: the logical unit for which we check
717
  @type nodenames: list
718
  @param nodenames: the list of nodes on which we should check
719
  @rtype: list
720
  @return: the list of vm-capable nodes
721

722
  """
723
  vm_nodes = frozenset(lu.cfg.GetNonVmCapableNodeList())
724
  return [name for name in nodenames if name not in vm_nodes]