Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib / common.py @ f380d53c

History | View | Annotate | Download (29.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Common functions used by multiple logical units."""
23

    
24
import copy
25
import os
26

    
27
from ganeti import compat
28
from ganeti import constants
29
from ganeti import errors
30
from ganeti import hypervisor
31
from ganeti import locking
32
from ganeti import objects
33
from ganeti import opcodes
34
from ganeti import pathutils
35
from ganeti import rpc
36
from ganeti import ssconf
37
from ganeti import utils
38

    
39

    
40
def _ExpandItemName(fn, name, kind):
41
  """Expand an item name.
42

43
  @param fn: the function to use for expansion
44
  @param name: requested item name
45
  @param kind: text description ('Node' or 'Instance')
46
  @return: the resolved (full) name
47
  @raise errors.OpPrereqError: if the item is not found
48

49
  """
50
  full_name = fn(name)
51
  if full_name is None:
52
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
53
                               errors.ECODE_NOENT)
54
  return full_name
55

    
56

    
57
def _ExpandInstanceName(cfg, name):
58
  """Wrapper over L{_ExpandItemName} for instance."""
59
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
60

    
61

    
62
def _ExpandNodeName(cfg, name):
63
  """Wrapper over L{_ExpandItemName} for nodes."""
64
  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")
65

    
66

    
67
def _ShareAll():
68
  """Returns a dict declaring all lock levels shared.
69

70
  """
71
  return dict.fromkeys(locking.LEVELS, 1)
72

    
73

    
74
def _CheckNodeGroupInstances(cfg, group_uuid, owned_instances):
75
  """Checks if the instances in a node group are still correct.
76

77
  @type cfg: L{config.ConfigWriter}
78
  @param cfg: The cluster configuration
79
  @type group_uuid: string
80
  @param group_uuid: Node group UUID
81
  @type owned_instances: set or frozenset
82
  @param owned_instances: List of currently owned instances
83

84
  """
85
  wanted_instances = cfg.GetNodeGroupInstances(group_uuid)
86
  if owned_instances != wanted_instances:
87
    raise errors.OpPrereqError("Instances in node group '%s' changed since"
88
                               " locks were acquired, wanted '%s', have '%s';"
89
                               " retry the operation" %
90
                               (group_uuid,
91
                                utils.CommaJoin(wanted_instances),
92
                                utils.CommaJoin(owned_instances)),
93
                               errors.ECODE_STATE)
94

    
95
  return wanted_instances
96

    
97

    
98
def _GetWantedNodes(lu, nodes):
99
  """Returns list of checked and expanded node names.
100

101
  @type lu: L{LogicalUnit}
102
  @param lu: the logical unit on whose behalf we execute
103
  @type nodes: list
104
  @param nodes: list of node names or None for all nodes
105
  @rtype: list
106
  @return: the list of nodes, sorted
107
  @raise errors.ProgrammerError: if the nodes parameter is wrong type
108

109
  """
110
  if nodes:
111
    return [_ExpandNodeName(lu.cfg, name) for name in nodes]
112

    
113
  return utils.NiceSort(lu.cfg.GetNodeList())
114

    
115

    
116
def _GetWantedInstances(lu, instances):
117
  """Returns list of checked and expanded instance names.
118

119
  @type lu: L{LogicalUnit}
120
  @param lu: the logical unit on whose behalf we execute
121
  @type instances: list
122
  @param instances: list of instance names or None for all instances
123
  @rtype: list
124
  @return: the list of instances, sorted
125
  @raise errors.OpPrereqError: if the instances parameter is wrong type
126
  @raise errors.OpPrereqError: if any of the passed instances is not found
127

128
  """
129
  if instances:
130
    wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
131
  else:
132
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
133
  return wanted
134

    
135

    
136
def _RunPostHook(lu, node_name):
137
  """Runs the post-hook for an opcode on a single node.
138

139
  """
140
  hm = lu.proc.BuildHooksManager(lu)
141
  try:
142
    hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name])
143
  except Exception, err: # pylint: disable=W0703
144
    lu.LogWarning("Errors occurred running hooks on %s: %s",
145
                  node_name, err)
146

    
147

    
148
def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
149
  """Distribute additional files which are part of the cluster configuration.
150

151
  ConfigWriter takes care of distributing the config and ssconf files, but
152
  there are more files which should be distributed to all nodes. This function
153
  makes sure those are copied.
154

155
  @param lu: calling logical unit
156
  @param additional_nodes: list of nodes not in the config to distribute to
157
  @type additional_vm: boolean
158
  @param additional_vm: whether the additional nodes are vm-capable or not
159

160
  """
161
  # Gather target nodes
162
  cluster = lu.cfg.GetClusterInfo()
163
  master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
164

    
165
  online_nodes = lu.cfg.GetOnlineNodeList()
166
  online_set = frozenset(online_nodes)
167
  vm_nodes = list(online_set.intersection(lu.cfg.GetVmCapableNodeList()))
168

    
169
  if additional_nodes is not None:
170
    online_nodes.extend(additional_nodes)
171
    if additional_vm:
172
      vm_nodes.extend(additional_nodes)
173

    
174
  # Never distribute to master node
175
  for nodelist in [online_nodes, vm_nodes]:
176
    if master_info.name in nodelist:
177
      nodelist.remove(master_info.name)
178

    
179
  # Gather file lists
180
  (files_all, _, files_mc, files_vm) = \
181
    _ComputeAncillaryFiles(cluster, True)
182

    
183
  # Never re-distribute configuration file from here
184
  assert not (pathutils.CLUSTER_CONF_FILE in files_all or
185
              pathutils.CLUSTER_CONF_FILE in files_vm)
186
  assert not files_mc, "Master candidates not handled in this function"
187

    
188
  filemap = [
189
    (online_nodes, files_all),
190
    (vm_nodes, files_vm),
191
    ]
192

    
193
  # Upload the files
194
  for (node_list, files) in filemap:
195
    for fname in files:
196
      _UploadHelper(lu, node_list, fname)
197

    
198

    
199
def _ComputeAncillaryFiles(cluster, redist):
200
  """Compute files external to Ganeti which need to be consistent.
201

202
  @type redist: boolean
203
  @param redist: Whether to include files which need to be redistributed
204

205
  """
206
  # Compute files for all nodes
207
  files_all = set([
208
    pathutils.SSH_KNOWN_HOSTS_FILE,
209
    pathutils.CONFD_HMAC_KEY,
210
    pathutils.CLUSTER_DOMAIN_SECRET_FILE,
211
    pathutils.SPICE_CERT_FILE,
212
    pathutils.SPICE_CACERT_FILE,
213
    pathutils.RAPI_USERS_FILE,
214
    ])
215

    
216
  if redist:
217
    # we need to ship at least the RAPI certificate
218
    files_all.add(pathutils.RAPI_CERT_FILE)
219
  else:
220
    files_all.update(pathutils.ALL_CERT_FILES)
221
    files_all.update(ssconf.SimpleStore().GetFileList())
222

    
223
  if cluster.modify_etc_hosts:
224
    files_all.add(pathutils.ETC_HOSTS)
225

    
226
  if cluster.use_external_mip_script:
227
    files_all.add(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
228

    
229
  # Files which are optional, these must:
230
  # - be present in one other category as well
231
  # - either exist or not exist on all nodes of that category (mc, vm all)
232
  files_opt = set([
233
    pathutils.RAPI_USERS_FILE,
234
    ])
235

    
236
  # Files which should only be on master candidates
237
  files_mc = set()
238

    
239
  if not redist:
240
    files_mc.add(pathutils.CLUSTER_CONF_FILE)
241

    
242
  # File storage
243
  if (not redist and (constants.ENABLE_FILE_STORAGE or
244
                        constants.ENABLE_SHARED_FILE_STORAGE)):
245
    files_all.add(pathutils.FILE_STORAGE_PATHS_FILE)
246
    files_opt.add(pathutils.FILE_STORAGE_PATHS_FILE)
247

    
248
  # Files which should only be on VM-capable nodes
249
  files_vm = set(
250
    filename
251
    for hv_name in cluster.enabled_hypervisors
252
    for filename in
253
    hypervisor.GetHypervisorClass(hv_name).GetAncillaryFiles()[0])
254

    
255
  files_opt |= set(
256
    filename
257
    for hv_name in cluster.enabled_hypervisors
258
    for filename in
259
    hypervisor.GetHypervisorClass(hv_name).GetAncillaryFiles()[1])
260

    
261
  # Filenames in each category must be unique
262
  all_files_set = files_all | files_mc | files_vm
263
  assert (len(all_files_set) ==
264
          sum(map(len, [files_all, files_mc, files_vm]))), \
265
    "Found file listed in more than one file list"
266

    
267
  # Optional files must be present in one other category
268
  assert all_files_set.issuperset(files_opt), \
269
    "Optional file not in a different required list"
270

    
271
  # This one file should never ever be re-distributed via RPC
272
  assert not (redist and
273
              pathutils.FILE_STORAGE_PATHS_FILE in all_files_set)
274

    
275
  return (files_all, files_opt, files_mc, files_vm)
276

    
277

    
278
def _UploadHelper(lu, nodes, fname):
279
  """Helper for uploading a file and showing warnings.
280

281
  """
282
  if os.path.exists(fname):
283
    result = lu.rpc.call_upload_file(nodes, fname)
284
    for to_node, to_result in result.items():
285
      msg = to_result.fail_msg
286
      if msg:
287
        msg = ("Copy of file %s to node %s failed: %s" %
288
               (fname, to_node, msg))
289
        lu.LogWarning(msg)
290

    
291

    
292
def _MergeAndVerifyHvState(op_input, obj_input):
293
  """Combines the hv state from an opcode with the one of the object
294

295
  @param op_input: The input dict from the opcode
296
  @param obj_input: The input dict from the objects
297
  @return: The verified and updated dict
298

299
  """
300
  if op_input:
301
    invalid_hvs = set(op_input) - constants.HYPER_TYPES
302
    if invalid_hvs:
303
      raise errors.OpPrereqError("Invalid hypervisor(s) in hypervisor state:"
304
                                 " %s" % utils.CommaJoin(invalid_hvs),
305
                                 errors.ECODE_INVAL)
306
    if obj_input is None:
307
      obj_input = {}
308
    type_check = constants.HVSTS_PARAMETER_TYPES
309
    return _UpdateAndVerifySubDict(obj_input, op_input, type_check)
310

    
311
  return None
312

    
313

    
314
def _MergeAndVerifyDiskState(op_input, obj_input):
315
  """Combines the disk state from an opcode with the one of the object
316

317
  @param op_input: The input dict from the opcode
318
  @param obj_input: The input dict from the objects
319
  @return: The verified and updated dict
320
  """
321
  if op_input:
322
    invalid_dst = set(op_input) - constants.DS_VALID_TYPES
323
    if invalid_dst:
324
      raise errors.OpPrereqError("Invalid storage type(s) in disk state: %s" %
325
                                 utils.CommaJoin(invalid_dst),
326
                                 errors.ECODE_INVAL)
327
    type_check = constants.DSS_PARAMETER_TYPES
328
    if obj_input is None:
329
      obj_input = {}
330
    return dict((key, _UpdateAndVerifySubDict(obj_input.get(key, {}), value,
331
                                              type_check))
332
                for key, value in op_input.items())
333

    
334
  return None
335

    
336

    
337
def _CheckOSParams(lu, required, nodenames, osname, osparams):
338
  """OS parameters validation.
339

340
  @type lu: L{LogicalUnit}
341
  @param lu: the logical unit for which we check
342
  @type required: boolean
343
  @param required: whether the validation should fail if the OS is not
344
      found
345
  @type nodenames: list
346
  @param nodenames: the list of nodes on which we should check
347
  @type osname: string
348
  @param osname: the name of the hypervisor we should use
349
  @type osparams: dict
350
  @param osparams: the parameters which we need to check
351
  @raise errors.OpPrereqError: if the parameters are not valid
352

353
  """
354
  nodenames = _FilterVmNodes(lu, nodenames)
355
  result = lu.rpc.call_os_validate(nodenames, required, osname,
356
                                   [constants.OS_VALIDATE_PARAMETERS],
357
                                   osparams)
358
  for node, nres in result.items():
359
    # we don't check for offline cases since this should be run only
360
    # against the master node and/or an instance's nodes
361
    nres.Raise("OS Parameters validation failed on node %s" % node)
362
    if not nres.payload:
363
      lu.LogInfo("OS %s not found on node %s, validation skipped",
364
                 osname, node)
365

    
366

    
367
def _CheckHVParams(lu, nodenames, hvname, hvparams):
368
  """Hypervisor parameter validation.
369

370
  This function abstract the hypervisor parameter validation to be
371
  used in both instance create and instance modify.
372

373
  @type lu: L{LogicalUnit}
374
  @param lu: the logical unit for which we check
375
  @type nodenames: list
376
  @param nodenames: the list of nodes on which we should check
377
  @type hvname: string
378
  @param hvname: the name of the hypervisor we should use
379
  @type hvparams: dict
380
  @param hvparams: the parameters which we need to check
381
  @raise errors.OpPrereqError: if the parameters are not valid
382

383
  """
384
  nodenames = _FilterVmNodes(lu, nodenames)
385

    
386
  cluster = lu.cfg.GetClusterInfo()
387
  hvfull = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
388

    
389
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames, hvname, hvfull)
390
  for node in nodenames:
391
    info = hvinfo[node]
392
    if info.offline:
393
      continue
394
    info.Raise("Hypervisor parameter validation failed on node %s" % node)
395

    
396

    
397
def _AdjustCandidatePool(lu, exceptions):
398
  """Adjust the candidate pool after node operations.
399

400
  """
401
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
402
  if mod_list:
403
    lu.LogInfo("Promoted nodes to master candidate role: %s",
404
               utils.CommaJoin(node.name for node in mod_list))
405
    for name in mod_list:
406
      lu.context.ReaddNode(name)
407
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
408
  if mc_now > mc_max:
409
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
410
               (mc_now, mc_max))
411

    
412

    
413
def _CheckNodePVs(nresult, exclusive_storage):
414
  """Check node PVs.
415

416
  """
417
  pvlist_dict = nresult.get(constants.NV_PVLIST, None)
418
  if pvlist_dict is None:
419
    return (["Can't get PV list from node"], None)
420
  pvlist = map(objects.LvmPvInfo.FromDict, pvlist_dict)
421
  errlist = []
422
  # check that ':' is not present in PV names, since it's a
423
  # special character for lvcreate (denotes the range of PEs to
424
  # use on the PV)
425
  for pv in pvlist:
426
    if ":" in pv.name:
427
      errlist.append("Invalid character ':' in PV '%s' of VG '%s'" %
428
                     (pv.name, pv.vg_name))
429
  es_pvinfo = None
430
  if exclusive_storage:
431
    (errmsgs, es_pvinfo) = utils.LvmExclusiveCheckNodePvs(pvlist)
432
    errlist.extend(errmsgs)
433
    shared_pvs = nresult.get(constants.NV_EXCLUSIVEPVS, None)
434
    if shared_pvs:
435
      for (pvname, lvlist) in shared_pvs:
436
        # TODO: Check that LVs are really unrelated (snapshots, DRBD meta...)
437
        errlist.append("PV %s is shared among unrelated LVs (%s)" %
438
                       (pvname, utils.CommaJoin(lvlist)))
439
  return (errlist, es_pvinfo)
440

    
441

    
442
def _ComputeMinMaxSpec(name, qualifier, ispecs, value):
443
  """Computes if value is in the desired range.
444

445
  @param name: name of the parameter for which we perform the check
446
  @param qualifier: a qualifier used in the error message (e.g. 'disk/1',
447
      not just 'disk')
448
  @param ispecs: dictionary containing min and max values
449
  @param value: actual value that we want to use
450
  @return: None or an error string
451

452
  """
453
  if value in [None, constants.VALUE_AUTO]:
454
    return None
455
  max_v = ispecs[constants.ISPECS_MAX].get(name, value)
456
  min_v = ispecs[constants.ISPECS_MIN].get(name, value)
457
  if value > max_v or min_v > value:
458
    if qualifier:
459
      fqn = "%s/%s" % (name, qualifier)
460
    else:
461
      fqn = name
462
    return ("%s value %s is not in range [%s, %s]" %
463
            (fqn, value, min_v, max_v))
464
  return None
465

    
466

    
467
def _ComputeIPolicySpecViolation(ipolicy, mem_size, cpu_count, disk_count,
468
                                 nic_count, disk_sizes, spindle_use,
469
                                 disk_template,
470
                                 _compute_fn=_ComputeMinMaxSpec):
471
  """Verifies ipolicy against provided specs.
472

473
  @type ipolicy: dict
474
  @param ipolicy: The ipolicy
475
  @type mem_size: int
476
  @param mem_size: The memory size
477
  @type cpu_count: int
478
  @param cpu_count: Used cpu cores
479
  @type disk_count: int
480
  @param disk_count: Number of disks used
481
  @type nic_count: int
482
  @param nic_count: Number of nics used
483
  @type disk_sizes: list of ints
484
  @param disk_sizes: Disk sizes of used disk (len must match C{disk_count})
485
  @type spindle_use: int
486
  @param spindle_use: The number of spindles this instance uses
487
  @type disk_template: string
488
  @param disk_template: The disk template of the instance
489
  @param _compute_fn: The compute function (unittest only)
490
  @return: A list of violations, or an empty list of no violations are found
491

492
  """
493
  assert disk_count == len(disk_sizes)
494

    
495
  test_settings = [
496
    (constants.ISPEC_MEM_SIZE, "", mem_size),
497
    (constants.ISPEC_CPU_COUNT, "", cpu_count),
498
    (constants.ISPEC_NIC_COUNT, "", nic_count),
499
    (constants.ISPEC_SPINDLE_USE, "", spindle_use),
500
    ] + [(constants.ISPEC_DISK_SIZE, str(idx), d)
501
         for idx, d in enumerate(disk_sizes)]
502
  if disk_template != constants.DT_DISKLESS:
503
    # This check doesn't make sense for diskless instances
504
    test_settings.append((constants.ISPEC_DISK_COUNT, "", disk_count))
505
  ret = []
506
  allowed_dts = ipolicy[constants.IPOLICY_DTS]
507
  if disk_template not in allowed_dts:
508
    ret.append("Disk template %s is not allowed (allowed templates: %s)" %
509
               (disk_template, utils.CommaJoin(allowed_dts)))
510

    
511
  min_errs = None
512
  for minmax in ipolicy[constants.ISPECS_MINMAX]:
513
    errs = filter(None,
514
                  (_compute_fn(name, qualifier, minmax, value)
515
                   for (name, qualifier, value) in test_settings))
516
    if min_errs is None or len(errs) < len(min_errs):
517
      min_errs = errs
518
  assert min_errs is not None
519
  return ret + min_errs
520

    
521

    
522
def _ComputeIPolicyInstanceViolation(ipolicy, instance, cfg,
523
                                     _compute_fn=_ComputeIPolicySpecViolation):
524
  """Compute if instance meets the specs of ipolicy.
525

526
  @type ipolicy: dict
527
  @param ipolicy: The ipolicy to verify against
528
  @type instance: L{objects.Instance}
529
  @param instance: The instance to verify
530
  @type cfg: L{config.ConfigWriter}
531
  @param cfg: Cluster configuration
532
  @param _compute_fn: The function to verify ipolicy (unittest only)
533
  @see: L{_ComputeIPolicySpecViolation}
534

535
  """
536
  be_full = cfg.GetClusterInfo().FillBE(instance)
537
  mem_size = be_full[constants.BE_MAXMEM]
538
  cpu_count = be_full[constants.BE_VCPUS]
539
  spindle_use = be_full[constants.BE_SPINDLE_USE]
540
  disk_count = len(instance.disks)
541
  disk_sizes = [disk.size for disk in instance.disks]
542
  nic_count = len(instance.nics)
543
  disk_template = instance.disk_template
544

    
545
  return _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count,
546
                     disk_sizes, spindle_use, disk_template)
547

    
548

    
549
def _ComputeViolatingInstances(ipolicy, instances, cfg):
550
  """Computes a set of instances who violates given ipolicy.
551

552
  @param ipolicy: The ipolicy to verify
553
  @type instances: L{objects.Instance}
554
  @param instances: List of instances to verify
555
  @type cfg: L{config.ConfigWriter}
556
  @param cfg: Cluster configuration
557
  @return: A frozenset of instance names violating the ipolicy
558

559
  """
560
  return frozenset([inst.name for inst in instances
561
                    if _ComputeIPolicyInstanceViolation(ipolicy, inst, cfg)])
562

    
563

    
564
def _ComputeNewInstanceViolations(old_ipolicy, new_ipolicy, instances, cfg):
565
  """Computes a set of any instances that would violate the new ipolicy.
566

567
  @param old_ipolicy: The current (still in-place) ipolicy
568
  @param new_ipolicy: The new (to become) ipolicy
569
  @param instances: List of instances to verify
570
  @type cfg: L{config.ConfigWriter}
571
  @param cfg: Cluster configuration
572
  @return: A list of instances which violates the new ipolicy but
573
      did not before
574

575
  """
576
  return (_ComputeViolatingInstances(new_ipolicy, instances, cfg) -
577
          _ComputeViolatingInstances(old_ipolicy, instances, cfg))
578

    
579

    
580
def _GetUpdatedParams(old_params, update_dict,
581
                      use_default=True, use_none=False):
582
  """Return the new version of a parameter dictionary.
583

584
  @type old_params: dict
585
  @param old_params: old parameters
586
  @type update_dict: dict
587
  @param update_dict: dict containing new parameter values, or
588
      constants.VALUE_DEFAULT to reset the parameter to its default
589
      value
590
  @param use_default: boolean
591
  @type use_default: whether to recognise L{constants.VALUE_DEFAULT}
592
      values as 'to be deleted' values
593
  @param use_none: boolean
594
  @type use_none: whether to recognise C{None} values as 'to be
595
      deleted' values
596
  @rtype: dict
597
  @return: the new parameter dictionary
598

599
  """
600
  params_copy = copy.deepcopy(old_params)
601
  for key, val in update_dict.iteritems():
602
    if ((use_default and val == constants.VALUE_DEFAULT) or
603
          (use_none and val is None)):
604
      try:
605
        del params_copy[key]
606
      except KeyError:
607
        pass
608
    else:
609
      params_copy[key] = val
610
  return params_copy
611

    
612

    
613
def _GetUpdatedIPolicy(old_ipolicy, new_ipolicy, group_policy=False):
614
  """Return the new version of an instance policy.
615

616
  @param group_policy: whether this policy applies to a group and thus
617
    we should support removal of policy entries
618

619
  """
620
  ipolicy = copy.deepcopy(old_ipolicy)
621
  for key, value in new_ipolicy.items():
622
    if key not in constants.IPOLICY_ALL_KEYS:
623
      raise errors.OpPrereqError("Invalid key in new ipolicy: %s" % key,
624
                                 errors.ECODE_INVAL)
625
    if (not value or value == [constants.VALUE_DEFAULT] or
626
            value == constants.VALUE_DEFAULT):
627
      if group_policy:
628
        if key in ipolicy:
629
          del ipolicy[key]
630
      else:
631
        raise errors.OpPrereqError("Can't unset ipolicy attribute '%s'"
632
                                   " on the cluster'" % key,
633
                                   errors.ECODE_INVAL)
634
    else:
635
      if key in constants.IPOLICY_PARAMETERS:
636
        # FIXME: we assume all such values are float
637
        try:
638
          ipolicy[key] = float(value)
639
        except (TypeError, ValueError), err:
640
          raise errors.OpPrereqError("Invalid value for attribute"
641
                                     " '%s': '%s', error: %s" %
642
                                     (key, value, err), errors.ECODE_INVAL)
643
      elif key == constants.ISPECS_MINMAX:
644
        for minmax in value:
645
          for k in minmax.keys():
646
            utils.ForceDictType(minmax[k], constants.ISPECS_PARAMETER_TYPES)
647
        ipolicy[key] = value
648
      elif key == constants.ISPECS_STD:
649
        if group_policy:
650
          msg = "%s cannot appear in group instance specs" % key
651
          raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
652
        ipolicy[key] = _GetUpdatedParams(old_ipolicy.get(key, {}), value,
653
                                         use_none=False, use_default=False)
654
        utils.ForceDictType(ipolicy[key], constants.ISPECS_PARAMETER_TYPES)
655
      else:
656
        # FIXME: we assume all others are lists; this should be redone
657
        # in a nicer way
658
        ipolicy[key] = list(value)
659
  try:
660
    objects.InstancePolicy.CheckParameterSyntax(ipolicy, not group_policy)
661
  except errors.ConfigurationError, err:
662
    raise errors.OpPrereqError("Invalid instance policy: %s" % err,
663
                               errors.ECODE_INVAL)
664
  return ipolicy
665

    
666

    
667
def _AnnotateDiskParams(instance, devs, cfg):
668
  """Little helper wrapper to the rpc annotation method.
669

670
  @param instance: The instance object
671
  @type devs: List of L{objects.Disk}
672
  @param devs: The root devices (not any of its children!)
673
  @param cfg: The config object
674
  @returns The annotated disk copies
675
  @see L{rpc.AnnotateDiskParams}
676

677
  """
678
  return rpc.AnnotateDiskParams(instance.disk_template, devs,
679
                                cfg.GetInstanceDiskParams(instance))
680

    
681

    
682
def _SupportsOob(cfg, node):
683
  """Tells if node supports OOB.
684

685
  @type cfg: L{config.ConfigWriter}
686
  @param cfg: The cluster configuration
687
  @type node: L{objects.Node}
688
  @param node: The node
689
  @return: The OOB script if supported or an empty string otherwise
690

691
  """
692
  return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]
693

    
694

    
695
def _UpdateAndVerifySubDict(base, updates, type_check):
696
  """Updates and verifies a dict with sub dicts of the same type.
697

698
  @param base: The dict with the old data
699
  @param updates: The dict with the new data
700
  @param type_check: Dict suitable to ForceDictType to verify correct types
701
  @returns: A new dict with updated and verified values
702

703
  """
704
  def fn(old, value):
705
    new = _GetUpdatedParams(old, value)
706
    utils.ForceDictType(new, type_check)
707
    return new
708

    
709
  ret = copy.deepcopy(base)
710
  ret.update(dict((key, fn(base.get(key, {}), value))
711
                  for key, value in updates.items()))
712
  return ret
713

    
714

    
715
def _FilterVmNodes(lu, nodenames):
716
  """Filters out non-vm_capable nodes from a list.
717

718
  @type lu: L{LogicalUnit}
719
  @param lu: the logical unit for which we check
720
  @type nodenames: list
721
  @param nodenames: the list of nodes on which we should check
722
  @rtype: list
723
  @return: the list of vm-capable nodes
724

725
  """
726
  vm_nodes = frozenset(lu.cfg.GetNonVmCapableNodeList())
727
  return [name for name in nodenames if name not in vm_nodes]
728

    
729

    
730
def _GetDefaultIAllocator(cfg, ialloc):
731
  """Decides on which iallocator to use.
732

733
  @type cfg: L{config.ConfigWriter}
734
  @param cfg: Cluster configuration object
735
  @type ialloc: string or None
736
  @param ialloc: Iallocator specified in opcode
737
  @rtype: string
738
  @return: Iallocator name
739

740
  """
741
  if not ialloc:
742
    # Use default iallocator
743
    ialloc = cfg.GetDefaultIAllocator()
744

    
745
  if not ialloc:
746
    raise errors.OpPrereqError("No iallocator was specified, neither in the"
747
                               " opcode nor as a cluster-wide default",
748
                               errors.ECODE_INVAL)
749

    
750
  return ialloc
751

    
752

    
753
def _CheckInstancesNodeGroups(cfg, instances, owned_groups, owned_nodes,
754
                              cur_group_uuid):
755
  """Checks if node groups for locked instances are still correct.
756

757
  @type cfg: L{config.ConfigWriter}
758
  @param cfg: Cluster configuration
759
  @type instances: dict; string as key, L{objects.Instance} as value
760
  @param instances: Dictionary, instance name as key, instance object as value
761
  @type owned_groups: iterable of string
762
  @param owned_groups: List of owned groups
763
  @type owned_nodes: iterable of string
764
  @param owned_nodes: List of owned nodes
765
  @type cur_group_uuid: string or None
766
  @param cur_group_uuid: Optional group UUID to check against instance's groups
767

768
  """
769
  for (name, inst) in instances.items():
770
    assert owned_nodes.issuperset(inst.all_nodes), \
771
      "Instance %s's nodes changed while we kept the lock" % name
772

    
773
    inst_groups = _CheckInstanceNodeGroups(cfg, name, owned_groups)
774

    
775
    assert cur_group_uuid is None or cur_group_uuid in inst_groups, \
776
      "Instance %s has no node in group %s" % (name, cur_group_uuid)
777

    
778

    
779
def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups,
780
                             primary_only=False):
781
  """Checks if the owned node groups are still correct for an instance.
782

783
  @type cfg: L{config.ConfigWriter}
784
  @param cfg: The cluster configuration
785
  @type instance_name: string
786
  @param instance_name: Instance name
787
  @type owned_groups: set or frozenset
788
  @param owned_groups: List of currently owned node groups
789
  @type primary_only: boolean
790
  @param primary_only: Whether to check node groups for only the primary node
791

792
  """
793
  inst_groups = cfg.GetInstanceNodeGroups(instance_name, primary_only)
794

    
795
  if not owned_groups.issuperset(inst_groups):
796
    raise errors.OpPrereqError("Instance %s's node groups changed since"
797
                               " locks were acquired, current groups are"
798
                               " are '%s', owning groups '%s'; retry the"
799
                               " operation" %
800
                               (instance_name,
801
                                utils.CommaJoin(inst_groups),
802
                                utils.CommaJoin(owned_groups)),
803
                               errors.ECODE_STATE)
804

    
805
  return inst_groups
806

    
807

    
808
def _LoadNodeEvacResult(lu, alloc_result, early_release, use_nodes):
809
  """Unpacks the result of change-group and node-evacuate iallocator requests.
810

811
  Iallocator modes L{constants.IALLOCATOR_MODE_NODE_EVAC} and
812
  L{constants.IALLOCATOR_MODE_CHG_GROUP}.
813

814
  @type lu: L{LogicalUnit}
815
  @param lu: Logical unit instance
816
  @type alloc_result: tuple/list
817
  @param alloc_result: Result from iallocator
818
  @type early_release: bool
819
  @param early_release: Whether to release locks early if possible
820
  @type use_nodes: bool
821
  @param use_nodes: Whether to display node names instead of groups
822

823
  """
824
  (moved, failed, jobs) = alloc_result
825

    
826
  if failed:
827
    failreason = utils.CommaJoin("%s (%s)" % (name, reason)
828
                                 for (name, reason) in failed)
829
    lu.LogWarning("Unable to evacuate instances %s", failreason)
830
    raise errors.OpExecError("Unable to evacuate instances %s" % failreason)
831

    
832
  if moved:
833
    lu.LogInfo("Instances to be moved: %s",
834
               utils.CommaJoin("%s (to %s)" %
835
                               (name, _NodeEvacDest(use_nodes, group, nodes))
836
                               for (name, group, nodes) in moved))
837

    
838
  return [map(compat.partial(_SetOpEarlyRelease, early_release),
839
              map(opcodes.OpCode.LoadOpCode, ops))
840
          for ops in jobs]
841

    
842

    
843
def _NodeEvacDest(use_nodes, group, nodes):
844
  """Returns group or nodes depending on caller's choice.
845

846
  """
847
  if use_nodes:
848
    return utils.CommaJoin(nodes)
849
  else:
850
    return group
851

    
852

    
853
def _SetOpEarlyRelease(early_release, op):
854
  """Sets C{early_release} flag on opcodes if available.
855

856
  """
857
  try:
858
    op.early_release = early_release
859
  except AttributeError:
860
    assert not isinstance(op, opcodes.OpInstanceReplaceDisks)
861

    
862
  return op
863

    
864

    
865
def _MapInstanceDisksToNodes(instances):
866
  """Creates a map from (node, volume) to instance name.
867

868
  @type instances: list of L{objects.Instance}
869
  @rtype: dict; tuple of (node name, volume name) as key, instance name as value
870

871
  """
872
  return dict(((node, vol), inst.name)
873
              for inst in instances
874
              for (node, vols) in inst.MapLVsByNode().items()
875
              for vol in vols)