Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib / common.py @ 22b7f6f8

History | View | Annotate | Download (34.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Common functions used by multiple logical units."""
23

    
24
import copy
25
import os
26

    
27
from ganeti import compat
28
from ganeti import constants
29
from ganeti import errors
30
from ganeti import hypervisor
31
from ganeti import locking
32
from ganeti import objects
33
from ganeti import opcodes
34
from ganeti import pathutils
35
from ganeti import rpc
36
from ganeti import ssconf
37
from ganeti import utils
38

    
39

    
40
# States of instance
41
INSTANCE_DOWN = [constants.ADMINST_DOWN]
42
INSTANCE_ONLINE = [constants.ADMINST_DOWN, constants.ADMINST_UP]
43
INSTANCE_NOT_RUNNING = [constants.ADMINST_DOWN, constants.ADMINST_OFFLINE]
44

    
45
#: Instance status in which an instance can be marked as offline/online
46
CAN_CHANGE_INSTANCE_OFFLINE = (frozenset(INSTANCE_DOWN) | frozenset([
47
  constants.ADMINST_OFFLINE,
48
  ]))
49

    
50

    
51
def _ExpandItemName(fn, name, kind):
52
  """Expand an item name.
53

54
  @param fn: the function to use for expansion
55
  @param name: requested item name
56
  @param kind: text description ('Node' or 'Instance')
57
  @return: the resolved (full) name
58
  @raise errors.OpPrereqError: if the item is not found
59

60
  """
61
  full_name = fn(name)
62
  if full_name is None:
63
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
64
                               errors.ECODE_NOENT)
65
  return full_name
66

    
67

    
68
def _ExpandInstanceName(cfg, name):
69
  """Wrapper over L{_ExpandItemName} for instance."""
70
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
71

    
72

    
73
def _ExpandNodeName(cfg, name):
74
  """Wrapper over L{_ExpandItemName} for nodes."""
75
  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")
76

    
77

    
78
def _ShareAll():
79
  """Returns a dict declaring all lock levels shared.
80

81
  """
82
  return dict.fromkeys(locking.LEVELS, 1)
83

    
84

    
85
def _CheckNodeGroupInstances(cfg, group_uuid, owned_instances):
86
  """Checks if the instances in a node group are still correct.
87

88
  @type cfg: L{config.ConfigWriter}
89
  @param cfg: The cluster configuration
90
  @type group_uuid: string
91
  @param group_uuid: Node group UUID
92
  @type owned_instances: set or frozenset
93
  @param owned_instances: List of currently owned instances
94

95
  """
96
  wanted_instances = cfg.GetNodeGroupInstances(group_uuid)
97
  if owned_instances != wanted_instances:
98
    raise errors.OpPrereqError("Instances in node group '%s' changed since"
99
                               " locks were acquired, wanted '%s', have '%s';"
100
                               " retry the operation" %
101
                               (group_uuid,
102
                                utils.CommaJoin(wanted_instances),
103
                                utils.CommaJoin(owned_instances)),
104
                               errors.ECODE_STATE)
105

    
106
  return wanted_instances
107

    
108

    
109
def _GetWantedNodes(lu, nodes):
110
  """Returns list of checked and expanded node names.
111

112
  @type lu: L{LogicalUnit}
113
  @param lu: the logical unit on whose behalf we execute
114
  @type nodes: list
115
  @param nodes: list of node names or None for all nodes
116
  @rtype: list
117
  @return: the list of nodes, sorted
118
  @raise errors.ProgrammerError: if the nodes parameter is wrong type
119

120
  """
121
  if nodes:
122
    return [_ExpandNodeName(lu.cfg, name) for name in nodes]
123

    
124
  return utils.NiceSort(lu.cfg.GetNodeList())
125

    
126

    
127
def _GetWantedInstances(lu, instances):
128
  """Returns list of checked and expanded instance names.
129

130
  @type lu: L{LogicalUnit}
131
  @param lu: the logical unit on whose behalf we execute
132
  @type instances: list
133
  @param instances: list of instance names or None for all instances
134
  @rtype: list
135
  @return: the list of instances, sorted
136
  @raise errors.OpPrereqError: if the instances parameter is wrong type
137
  @raise errors.OpPrereqError: if any of the passed instances is not found
138

139
  """
140
  if instances:
141
    wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
142
  else:
143
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
144
  return wanted
145

    
146

    
147
def _RunPostHook(lu, node_name):
148
  """Runs the post-hook for an opcode on a single node.
149

150
  """
151
  hm = lu.proc.BuildHooksManager(lu)
152
  try:
153
    hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name])
154
  except Exception, err: # pylint: disable=W0703
155
    lu.LogWarning("Errors occurred running hooks on %s: %s",
156
                  node_name, err)
157

    
158

    
159
def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
160
  """Distribute additional files which are part of the cluster configuration.
161

162
  ConfigWriter takes care of distributing the config and ssconf files, but
163
  there are more files which should be distributed to all nodes. This function
164
  makes sure those are copied.
165

166
  @param lu: calling logical unit
167
  @param additional_nodes: list of nodes not in the config to distribute to
168
  @type additional_vm: boolean
169
  @param additional_vm: whether the additional nodes are vm-capable or not
170

171
  """
172
  # Gather target nodes
173
  cluster = lu.cfg.GetClusterInfo()
174
  master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
175

    
176
  online_nodes = lu.cfg.GetOnlineNodeList()
177
  online_set = frozenset(online_nodes)
178
  vm_nodes = list(online_set.intersection(lu.cfg.GetVmCapableNodeList()))
179

    
180
  if additional_nodes is not None:
181
    online_nodes.extend(additional_nodes)
182
    if additional_vm:
183
      vm_nodes.extend(additional_nodes)
184

    
185
  # Never distribute to master node
186
  for nodelist in [online_nodes, vm_nodes]:
187
    if master_info.name in nodelist:
188
      nodelist.remove(master_info.name)
189

    
190
  # Gather file lists
191
  (files_all, _, files_mc, files_vm) = \
192
    _ComputeAncillaryFiles(cluster, True)
193

    
194
  # Never re-distribute configuration file from here
195
  assert not (pathutils.CLUSTER_CONF_FILE in files_all or
196
              pathutils.CLUSTER_CONF_FILE in files_vm)
197
  assert not files_mc, "Master candidates not handled in this function"
198

    
199
  filemap = [
200
    (online_nodes, files_all),
201
    (vm_nodes, files_vm),
202
    ]
203

    
204
  # Upload the files
205
  for (node_list, files) in filemap:
206
    for fname in files:
207
      _UploadHelper(lu, node_list, fname)
208

    
209

    
210
def _ComputeAncillaryFiles(cluster, redist):
211
  """Compute files external to Ganeti which need to be consistent.
212

213
  @type redist: boolean
214
  @param redist: Whether to include files which need to be redistributed
215

216
  """
217
  # Compute files for all nodes
218
  files_all = set([
219
    pathutils.SSH_KNOWN_HOSTS_FILE,
220
    pathutils.CONFD_HMAC_KEY,
221
    pathutils.CLUSTER_DOMAIN_SECRET_FILE,
222
    pathutils.SPICE_CERT_FILE,
223
    pathutils.SPICE_CACERT_FILE,
224
    pathutils.RAPI_USERS_FILE,
225
    ])
226

    
227
  if redist:
228
    # we need to ship at least the RAPI certificate
229
    files_all.add(pathutils.RAPI_CERT_FILE)
230
  else:
231
    files_all.update(pathutils.ALL_CERT_FILES)
232
    files_all.update(ssconf.SimpleStore().GetFileList())
233

    
234
  if cluster.modify_etc_hosts:
235
    files_all.add(pathutils.ETC_HOSTS)
236

    
237
  if cluster.use_external_mip_script:
238
    files_all.add(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
239

    
240
  # Files which are optional, these must:
241
  # - be present in one other category as well
242
  # - either exist or not exist on all nodes of that category (mc, vm all)
243
  files_opt = set([
244
    pathutils.RAPI_USERS_FILE,
245
    ])
246

    
247
  # Files which should only be on master candidates
248
  files_mc = set()
249

    
250
  if not redist:
251
    files_mc.add(pathutils.CLUSTER_CONF_FILE)
252

    
253
  # File storage
254
  if (not redist and (constants.ENABLE_FILE_STORAGE or
255
                        constants.ENABLE_SHARED_FILE_STORAGE)):
256
    files_all.add(pathutils.FILE_STORAGE_PATHS_FILE)
257
    files_opt.add(pathutils.FILE_STORAGE_PATHS_FILE)
258

    
259
  # Files which should only be on VM-capable nodes
260
  files_vm = set(
261
    filename
262
    for hv_name in cluster.enabled_hypervisors
263
    for filename in
264
    hypervisor.GetHypervisorClass(hv_name).GetAncillaryFiles()[0])
265

    
266
  files_opt |= set(
267
    filename
268
    for hv_name in cluster.enabled_hypervisors
269
    for filename in
270
    hypervisor.GetHypervisorClass(hv_name).GetAncillaryFiles()[1])
271

    
272
  # Filenames in each category must be unique
273
  all_files_set = files_all | files_mc | files_vm
274
  assert (len(all_files_set) ==
275
          sum(map(len, [files_all, files_mc, files_vm]))), \
276
    "Found file listed in more than one file list"
277

    
278
  # Optional files must be present in one other category
279
  assert all_files_set.issuperset(files_opt), \
280
    "Optional file not in a different required list"
281

    
282
  # This one file should never ever be re-distributed via RPC
283
  assert not (redist and
284
              pathutils.FILE_STORAGE_PATHS_FILE in all_files_set)
285

    
286
  return (files_all, files_opt, files_mc, files_vm)
287

    
288

    
289
def _UploadHelper(lu, nodes, fname):
290
  """Helper for uploading a file and showing warnings.
291

292
  """
293
  if os.path.exists(fname):
294
    result = lu.rpc.call_upload_file(nodes, fname)
295
    for to_node, to_result in result.items():
296
      msg = to_result.fail_msg
297
      if msg:
298
        msg = ("Copy of file %s to node %s failed: %s" %
299
               (fname, to_node, msg))
300
        lu.LogWarning(msg)
301

    
302

    
303
def _MergeAndVerifyHvState(op_input, obj_input):
304
  """Combines the hv state from an opcode with the one of the object
305

306
  @param op_input: The input dict from the opcode
307
  @param obj_input: The input dict from the objects
308
  @return: The verified and updated dict
309

310
  """
311
  if op_input:
312
    invalid_hvs = set(op_input) - constants.HYPER_TYPES
313
    if invalid_hvs:
314
      raise errors.OpPrereqError("Invalid hypervisor(s) in hypervisor state:"
315
                                 " %s" % utils.CommaJoin(invalid_hvs),
316
                                 errors.ECODE_INVAL)
317
    if obj_input is None:
318
      obj_input = {}
319
    type_check = constants.HVSTS_PARAMETER_TYPES
320
    return _UpdateAndVerifySubDict(obj_input, op_input, type_check)
321

    
322
  return None
323

    
324

    
325
def _MergeAndVerifyDiskState(op_input, obj_input):
326
  """Combines the disk state from an opcode with the one of the object
327

328
  @param op_input: The input dict from the opcode
329
  @param obj_input: The input dict from the objects
330
  @return: The verified and updated dict
331
  """
332
  if op_input:
333
    invalid_dst = set(op_input) - constants.DS_VALID_TYPES
334
    if invalid_dst:
335
      raise errors.OpPrereqError("Invalid storage type(s) in disk state: %s" %
336
                                 utils.CommaJoin(invalid_dst),
337
                                 errors.ECODE_INVAL)
338
    type_check = constants.DSS_PARAMETER_TYPES
339
    if obj_input is None:
340
      obj_input = {}
341
    return dict((key, _UpdateAndVerifySubDict(obj_input.get(key, {}), value,
342
                                              type_check))
343
                for key, value in op_input.items())
344

    
345
  return None
346

    
347

    
348
def _CheckOSParams(lu, required, nodenames, osname, osparams):
349
  """OS parameters validation.
350

351
  @type lu: L{LogicalUnit}
352
  @param lu: the logical unit for which we check
353
  @type required: boolean
354
  @param required: whether the validation should fail if the OS is not
355
      found
356
  @type nodenames: list
357
  @param nodenames: the list of nodes on which we should check
358
  @type osname: string
359
  @param osname: the name of the hypervisor we should use
360
  @type osparams: dict
361
  @param osparams: the parameters which we need to check
362
  @raise errors.OpPrereqError: if the parameters are not valid
363

364
  """
365
  nodenames = _FilterVmNodes(lu, nodenames)
366
  result = lu.rpc.call_os_validate(nodenames, required, osname,
367
                                   [constants.OS_VALIDATE_PARAMETERS],
368
                                   osparams)
369
  for node, nres in result.items():
370
    # we don't check for offline cases since this should be run only
371
    # against the master node and/or an instance's nodes
372
    nres.Raise("OS Parameters validation failed on node %s" % node)
373
    if not nres.payload:
374
      lu.LogInfo("OS %s not found on node %s, validation skipped",
375
                 osname, node)
376

    
377

    
378
def _CheckHVParams(lu, nodenames, hvname, hvparams):
379
  """Hypervisor parameter validation.
380

381
  This function abstract the hypervisor parameter validation to be
382
  used in both instance create and instance modify.
383

384
  @type lu: L{LogicalUnit}
385
  @param lu: the logical unit for which we check
386
  @type nodenames: list
387
  @param nodenames: the list of nodes on which we should check
388
  @type hvname: string
389
  @param hvname: the name of the hypervisor we should use
390
  @type hvparams: dict
391
  @param hvparams: the parameters which we need to check
392
  @raise errors.OpPrereqError: if the parameters are not valid
393

394
  """
395
  nodenames = _FilterVmNodes(lu, nodenames)
396

    
397
  cluster = lu.cfg.GetClusterInfo()
398
  hvfull = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
399

    
400
  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames, hvname, hvfull)
401
  for node in nodenames:
402
    info = hvinfo[node]
403
    if info.offline:
404
      continue
405
    info.Raise("Hypervisor parameter validation failed on node %s" % node)
406

    
407

    
408
def _AdjustCandidatePool(lu, exceptions):
409
  """Adjust the candidate pool after node operations.
410

411
  """
412
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
413
  if mod_list:
414
    lu.LogInfo("Promoted nodes to master candidate role: %s",
415
               utils.CommaJoin(node.name for node in mod_list))
416
    for name in mod_list:
417
      lu.context.ReaddNode(name)
418
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
419
  if mc_now > mc_max:
420
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
421
               (mc_now, mc_max))
422

    
423

    
424
def _CheckNodePVs(nresult, exclusive_storage):
425
  """Check node PVs.
426

427
  """
428
  pvlist_dict = nresult.get(constants.NV_PVLIST, None)
429
  if pvlist_dict is None:
430
    return (["Can't get PV list from node"], None)
431
  pvlist = map(objects.LvmPvInfo.FromDict, pvlist_dict)
432
  errlist = []
433
  # check that ':' is not present in PV names, since it's a
434
  # special character for lvcreate (denotes the range of PEs to
435
  # use on the PV)
436
  for pv in pvlist:
437
    if ":" in pv.name:
438
      errlist.append("Invalid character ':' in PV '%s' of VG '%s'" %
439
                     (pv.name, pv.vg_name))
440
  es_pvinfo = None
441
  if exclusive_storage:
442
    (errmsgs, es_pvinfo) = utils.LvmExclusiveCheckNodePvs(pvlist)
443
    errlist.extend(errmsgs)
444
    shared_pvs = nresult.get(constants.NV_EXCLUSIVEPVS, None)
445
    if shared_pvs:
446
      for (pvname, lvlist) in shared_pvs:
447
        # TODO: Check that LVs are really unrelated (snapshots, DRBD meta...)
448
        errlist.append("PV %s is shared among unrelated LVs (%s)" %
449
                       (pvname, utils.CommaJoin(lvlist)))
450
  return (errlist, es_pvinfo)
451

    
452

    
453
def _ComputeMinMaxSpec(name, qualifier, ispecs, value):
454
  """Computes if value is in the desired range.
455

456
  @param name: name of the parameter for which we perform the check
457
  @param qualifier: a qualifier used in the error message (e.g. 'disk/1',
458
      not just 'disk')
459
  @param ispecs: dictionary containing min and max values
460
  @param value: actual value that we want to use
461
  @return: None or an error string
462

463
  """
464
  if value in [None, constants.VALUE_AUTO]:
465
    return None
466
  max_v = ispecs[constants.ISPECS_MAX].get(name, value)
467
  min_v = ispecs[constants.ISPECS_MIN].get(name, value)
468
  if value > max_v or min_v > value:
469
    if qualifier:
470
      fqn = "%s/%s" % (name, qualifier)
471
    else:
472
      fqn = name
473
    return ("%s value %s is not in range [%s, %s]" %
474
            (fqn, value, min_v, max_v))
475
  return None
476

    
477

    
478
def _ComputeIPolicySpecViolation(ipolicy, mem_size, cpu_count, disk_count,
479
                                 nic_count, disk_sizes, spindle_use,
480
                                 disk_template,
481
                                 _compute_fn=_ComputeMinMaxSpec):
482
  """Verifies ipolicy against provided specs.
483

484
  @type ipolicy: dict
485
  @param ipolicy: The ipolicy
486
  @type mem_size: int
487
  @param mem_size: The memory size
488
  @type cpu_count: int
489
  @param cpu_count: Used cpu cores
490
  @type disk_count: int
491
  @param disk_count: Number of disks used
492
  @type nic_count: int
493
  @param nic_count: Number of nics used
494
  @type disk_sizes: list of ints
495
  @param disk_sizes: Disk sizes of used disk (len must match C{disk_count})
496
  @type spindle_use: int
497
  @param spindle_use: The number of spindles this instance uses
498
  @type disk_template: string
499
  @param disk_template: The disk template of the instance
500
  @param _compute_fn: The compute function (unittest only)
501
  @return: A list of violations, or an empty list of no violations are found
502

503
  """
504
  assert disk_count == len(disk_sizes)
505

    
506
  test_settings = [
507
    (constants.ISPEC_MEM_SIZE, "", mem_size),
508
    (constants.ISPEC_CPU_COUNT, "", cpu_count),
509
    (constants.ISPEC_NIC_COUNT, "", nic_count),
510
    (constants.ISPEC_SPINDLE_USE, "", spindle_use),
511
    ] + [(constants.ISPEC_DISK_SIZE, str(idx), d)
512
         for idx, d in enumerate(disk_sizes)]
513
  if disk_template != constants.DT_DISKLESS:
514
    # This check doesn't make sense for diskless instances
515
    test_settings.append((constants.ISPEC_DISK_COUNT, "", disk_count))
516
  ret = []
517
  allowed_dts = ipolicy[constants.IPOLICY_DTS]
518
  if disk_template not in allowed_dts:
519
    ret.append("Disk template %s is not allowed (allowed templates: %s)" %
520
               (disk_template, utils.CommaJoin(allowed_dts)))
521

    
522
  min_errs = None
523
  for minmax in ipolicy[constants.ISPECS_MINMAX]:
524
    errs = filter(None,
525
                  (_compute_fn(name, qualifier, minmax, value)
526
                   for (name, qualifier, value) in test_settings))
527
    if min_errs is None or len(errs) < len(min_errs):
528
      min_errs = errs
529
  assert min_errs is not None
530
  return ret + min_errs
531

    
532

    
533
def _ComputeIPolicyInstanceViolation(ipolicy, instance, cfg,
534
                                     _compute_fn=_ComputeIPolicySpecViolation):
535
  """Compute if instance meets the specs of ipolicy.
536

537
  @type ipolicy: dict
538
  @param ipolicy: The ipolicy to verify against
539
  @type instance: L{objects.Instance}
540
  @param instance: The instance to verify
541
  @type cfg: L{config.ConfigWriter}
542
  @param cfg: Cluster configuration
543
  @param _compute_fn: The function to verify ipolicy (unittest only)
544
  @see: L{_ComputeIPolicySpecViolation}
545

546
  """
547
  be_full = cfg.GetClusterInfo().FillBE(instance)
548
  mem_size = be_full[constants.BE_MAXMEM]
549
  cpu_count = be_full[constants.BE_VCPUS]
550
  spindle_use = be_full[constants.BE_SPINDLE_USE]
551
  disk_count = len(instance.disks)
552
  disk_sizes = [disk.size for disk in instance.disks]
553
  nic_count = len(instance.nics)
554
  disk_template = instance.disk_template
555

    
556
  return _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count,
557
                     disk_sizes, spindle_use, disk_template)
558

    
559

    
560
def _ComputeViolatingInstances(ipolicy, instances, cfg):
561
  """Computes a set of instances who violates given ipolicy.
562

563
  @param ipolicy: The ipolicy to verify
564
  @type instances: L{objects.Instance}
565
  @param instances: List of instances to verify
566
  @type cfg: L{config.ConfigWriter}
567
  @param cfg: Cluster configuration
568
  @return: A frozenset of instance names violating the ipolicy
569

570
  """
571
  return frozenset([inst.name for inst in instances
572
                    if _ComputeIPolicyInstanceViolation(ipolicy, inst, cfg)])
573

    
574

    
575
def _ComputeNewInstanceViolations(old_ipolicy, new_ipolicy, instances, cfg):
576
  """Computes a set of any instances that would violate the new ipolicy.
577

578
  @param old_ipolicy: The current (still in-place) ipolicy
579
  @param new_ipolicy: The new (to become) ipolicy
580
  @param instances: List of instances to verify
581
  @type cfg: L{config.ConfigWriter}
582
  @param cfg: Cluster configuration
583
  @return: A list of instances which violates the new ipolicy but
584
      did not before
585

586
  """
587
  return (_ComputeViolatingInstances(new_ipolicy, instances, cfg) -
588
          _ComputeViolatingInstances(old_ipolicy, instances, cfg))
589

    
590

    
591
def _GetUpdatedParams(old_params, update_dict,
592
                      use_default=True, use_none=False):
593
  """Return the new version of a parameter dictionary.
594

595
  @type old_params: dict
596
  @param old_params: old parameters
597
  @type update_dict: dict
598
  @param update_dict: dict containing new parameter values, or
599
      constants.VALUE_DEFAULT to reset the parameter to its default
600
      value
601
  @param use_default: boolean
602
  @type use_default: whether to recognise L{constants.VALUE_DEFAULT}
603
      values as 'to be deleted' values
604
  @param use_none: boolean
605
  @type use_none: whether to recognise C{None} values as 'to be
606
      deleted' values
607
  @rtype: dict
608
  @return: the new parameter dictionary
609

610
  """
611
  params_copy = copy.deepcopy(old_params)
612
  for key, val in update_dict.iteritems():
613
    if ((use_default and val == constants.VALUE_DEFAULT) or
614
          (use_none and val is None)):
615
      try:
616
        del params_copy[key]
617
      except KeyError:
618
        pass
619
    else:
620
      params_copy[key] = val
621
  return params_copy
622

    
623

    
624
def _GetUpdatedIPolicy(old_ipolicy, new_ipolicy, group_policy=False):
625
  """Return the new version of an instance policy.
626

627
  @param group_policy: whether this policy applies to a group and thus
628
    we should support removal of policy entries
629

630
  """
631
  ipolicy = copy.deepcopy(old_ipolicy)
632
  for key, value in new_ipolicy.items():
633
    if key not in constants.IPOLICY_ALL_KEYS:
634
      raise errors.OpPrereqError("Invalid key in new ipolicy: %s" % key,
635
                                 errors.ECODE_INVAL)
636
    if (not value or value == [constants.VALUE_DEFAULT] or
637
            value == constants.VALUE_DEFAULT):
638
      if group_policy:
639
        if key in ipolicy:
640
          del ipolicy[key]
641
      else:
642
        raise errors.OpPrereqError("Can't unset ipolicy attribute '%s'"
643
                                   " on the cluster'" % key,
644
                                   errors.ECODE_INVAL)
645
    else:
646
      if key in constants.IPOLICY_PARAMETERS:
647
        # FIXME: we assume all such values are float
648
        try:
649
          ipolicy[key] = float(value)
650
        except (TypeError, ValueError), err:
651
          raise errors.OpPrereqError("Invalid value for attribute"
652
                                     " '%s': '%s', error: %s" %
653
                                     (key, value, err), errors.ECODE_INVAL)
654
      elif key == constants.ISPECS_MINMAX:
655
        for minmax in value:
656
          for k in minmax.keys():
657
            utils.ForceDictType(minmax[k], constants.ISPECS_PARAMETER_TYPES)
658
        ipolicy[key] = value
659
      elif key == constants.ISPECS_STD:
660
        if group_policy:
661
          msg = "%s cannot appear in group instance specs" % key
662
          raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
663
        ipolicy[key] = _GetUpdatedParams(old_ipolicy.get(key, {}), value,
664
                                         use_none=False, use_default=False)
665
        utils.ForceDictType(ipolicy[key], constants.ISPECS_PARAMETER_TYPES)
666
      else:
667
        # FIXME: we assume all others are lists; this should be redone
668
        # in a nicer way
669
        ipolicy[key] = list(value)
670
  try:
671
    objects.InstancePolicy.CheckParameterSyntax(ipolicy, not group_policy)
672
  except errors.ConfigurationError, err:
673
    raise errors.OpPrereqError("Invalid instance policy: %s" % err,
674
                               errors.ECODE_INVAL)
675
  return ipolicy
676

    
677

    
678
def _AnnotateDiskParams(instance, devs, cfg):
679
  """Little helper wrapper to the rpc annotation method.
680

681
  @param instance: The instance object
682
  @type devs: List of L{objects.Disk}
683
  @param devs: The root devices (not any of its children!)
684
  @param cfg: The config object
685
  @returns The annotated disk copies
686
  @see L{rpc.AnnotateDiskParams}
687

688
  """
689
  return rpc.AnnotateDiskParams(instance.disk_template, devs,
690
                                cfg.GetInstanceDiskParams(instance))
691

    
692

    
693
def _SupportsOob(cfg, node):
694
  """Tells if node supports OOB.
695

696
  @type cfg: L{config.ConfigWriter}
697
  @param cfg: The cluster configuration
698
  @type node: L{objects.Node}
699
  @param node: The node
700
  @return: The OOB script if supported or an empty string otherwise
701

702
  """
703
  return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]
704

    
705

    
706
def _UpdateAndVerifySubDict(base, updates, type_check):
707
  """Updates and verifies a dict with sub dicts of the same type.
708

709
  @param base: The dict with the old data
710
  @param updates: The dict with the new data
711
  @param type_check: Dict suitable to ForceDictType to verify correct types
712
  @returns: A new dict with updated and verified values
713

714
  """
715
  def fn(old, value):
716
    new = _GetUpdatedParams(old, value)
717
    utils.ForceDictType(new, type_check)
718
    return new
719

    
720
  ret = copy.deepcopy(base)
721
  ret.update(dict((key, fn(base.get(key, {}), value))
722
                  for key, value in updates.items()))
723
  return ret
724

    
725

    
726
def _FilterVmNodes(lu, nodenames):
727
  """Filters out non-vm_capable nodes from a list.
728

729
  @type lu: L{LogicalUnit}
730
  @param lu: the logical unit for which we check
731
  @type nodenames: list
732
  @param nodenames: the list of nodes on which we should check
733
  @rtype: list
734
  @return: the list of vm-capable nodes
735

736
  """
737
  vm_nodes = frozenset(lu.cfg.GetNonVmCapableNodeList())
738
  return [name for name in nodenames if name not in vm_nodes]
739

    
740

    
741
def _GetDefaultIAllocator(cfg, ialloc):
742
  """Decides on which iallocator to use.
743

744
  @type cfg: L{config.ConfigWriter}
745
  @param cfg: Cluster configuration object
746
  @type ialloc: string or None
747
  @param ialloc: Iallocator specified in opcode
748
  @rtype: string
749
  @return: Iallocator name
750

751
  """
752
  if not ialloc:
753
    # Use default iallocator
754
    ialloc = cfg.GetDefaultIAllocator()
755

    
756
  if not ialloc:
757
    raise errors.OpPrereqError("No iallocator was specified, neither in the"
758
                               " opcode nor as a cluster-wide default",
759
                               errors.ECODE_INVAL)
760

    
761
  return ialloc
762

    
763

    
764
def _CheckInstancesNodeGroups(cfg, instances, owned_groups, owned_nodes,
765
                              cur_group_uuid):
766
  """Checks if node groups for locked instances are still correct.
767

768
  @type cfg: L{config.ConfigWriter}
769
  @param cfg: Cluster configuration
770
  @type instances: dict; string as key, L{objects.Instance} as value
771
  @param instances: Dictionary, instance name as key, instance object as value
772
  @type owned_groups: iterable of string
773
  @param owned_groups: List of owned groups
774
  @type owned_nodes: iterable of string
775
  @param owned_nodes: List of owned nodes
776
  @type cur_group_uuid: string or None
777
  @param cur_group_uuid: Optional group UUID to check against instance's groups
778

779
  """
780
  for (name, inst) in instances.items():
781
    assert owned_nodes.issuperset(inst.all_nodes), \
782
      "Instance %s's nodes changed while we kept the lock" % name
783

    
784
    inst_groups = _CheckInstanceNodeGroups(cfg, name, owned_groups)
785

    
786
    assert cur_group_uuid is None or cur_group_uuid in inst_groups, \
787
      "Instance %s has no node in group %s" % (name, cur_group_uuid)
788

    
789

    
790
def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups,
791
                             primary_only=False):
792
  """Checks if the owned node groups are still correct for an instance.
793

794
  @type cfg: L{config.ConfigWriter}
795
  @param cfg: The cluster configuration
796
  @type instance_name: string
797
  @param instance_name: Instance name
798
  @type owned_groups: set or frozenset
799
  @param owned_groups: List of currently owned node groups
800
  @type primary_only: boolean
801
  @param primary_only: Whether to check node groups for only the primary node
802

803
  """
804
  inst_groups = cfg.GetInstanceNodeGroups(instance_name, primary_only)
805

    
806
  if not owned_groups.issuperset(inst_groups):
807
    raise errors.OpPrereqError("Instance %s's node groups changed since"
808
                               " locks were acquired, current groups are"
809
                               " are '%s', owning groups '%s'; retry the"
810
                               " operation" %
811
                               (instance_name,
812
                                utils.CommaJoin(inst_groups),
813
                                utils.CommaJoin(owned_groups)),
814
                               errors.ECODE_STATE)
815

    
816
  return inst_groups
817

    
818

    
819
def _LoadNodeEvacResult(lu, alloc_result, early_release, use_nodes):
820
  """Unpacks the result of change-group and node-evacuate iallocator requests.
821

822
  Iallocator modes L{constants.IALLOCATOR_MODE_NODE_EVAC} and
823
  L{constants.IALLOCATOR_MODE_CHG_GROUP}.
824

825
  @type lu: L{LogicalUnit}
826
  @param lu: Logical unit instance
827
  @type alloc_result: tuple/list
828
  @param alloc_result: Result from iallocator
829
  @type early_release: bool
830
  @param early_release: Whether to release locks early if possible
831
  @type use_nodes: bool
832
  @param use_nodes: Whether to display node names instead of groups
833

834
  """
835
  (moved, failed, jobs) = alloc_result
836

    
837
  if failed:
838
    failreason = utils.CommaJoin("%s (%s)" % (name, reason)
839
                                 for (name, reason) in failed)
840
    lu.LogWarning("Unable to evacuate instances %s", failreason)
841
    raise errors.OpExecError("Unable to evacuate instances %s" % failreason)
842

    
843
  if moved:
844
    lu.LogInfo("Instances to be moved: %s",
845
               utils.CommaJoin("%s (to %s)" %
846
                               (name, _NodeEvacDest(use_nodes, group, nodes))
847
                               for (name, group, nodes) in moved))
848

    
849
  return [map(compat.partial(_SetOpEarlyRelease, early_release),
850
              map(opcodes.OpCode.LoadOpCode, ops))
851
          for ops in jobs]
852

    
853

    
854
def _NodeEvacDest(use_nodes, group, nodes):
855
  """Returns group or nodes depending on caller's choice.
856

857
  """
858
  if use_nodes:
859
    return utils.CommaJoin(nodes)
860
  else:
861
    return group
862

    
863

    
864
def _SetOpEarlyRelease(early_release, op):
865
  """Sets C{early_release} flag on opcodes if available.
866

867
  """
868
  try:
869
    op.early_release = early_release
870
  except AttributeError:
871
    assert not isinstance(op, opcodes.OpInstanceReplaceDisks)
872

    
873
  return op
874

    
875

    
876
def _MapInstanceDisksToNodes(instances):
877
  """Creates a map from (node, volume) to instance name.
878

879
  @type instances: list of L{objects.Instance}
880
  @rtype: dict; tuple of (node name, volume name) as key, instance name as value
881

882
  """
883
  return dict(((node, vol), inst.name)
884
              for inst in instances
885
              for (node, vols) in inst.MapLVsByNode().items()
886
              for vol in vols)
887

    
888

    
889
def _CheckParamsNotGlobal(params, glob_pars, kind, bad_levels, good_levels):
890
  """Make sure that none of the given paramters is global.
891

892
  If a global parameter is found, an L{errors.OpPrereqError} exception is
893
  raised. This is used to avoid setting global parameters for individual nodes.
894

895
  @type params: dictionary
896
  @param params: Parameters to check
897
  @type glob_pars: dictionary
898
  @param glob_pars: Forbidden parameters
899
  @type kind: string
900
  @param kind: Kind of parameters (e.g. "node")
901
  @type bad_levels: string
902
  @param bad_levels: Level(s) at which the parameters are forbidden (e.g.
903
      "instance")
904
  @type good_levels: strings
905
  @param good_levels: Level(s) at which the parameters are allowed (e.g.
906
      "cluster or group")
907

908
  """
909
  used_globals = glob_pars.intersection(params)
910
  if used_globals:
911
    msg = ("The following %s parameters are global and cannot"
912
           " be customized at %s level, please modify them at"
913
           " %s level: %s" %
914
           (kind, bad_levels, good_levels, utils.CommaJoin(used_globals)))
915
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
916

    
917

    
918
def _IsExclusiveStorageEnabledNode(cfg, node):
919
  """Whether exclusive_storage is in effect for the given node.
920

921
  @type cfg: L{config.ConfigWriter}
922
  @param cfg: The cluster configuration
923
  @type node: L{objects.Node}
924
  @param node: The node
925
  @rtype: bool
926
  @return: The effective value of exclusive_storage
927

928
  """
929
  return cfg.GetNdParams(node)[constants.ND_EXCLUSIVE_STORAGE]
930

    
931

    
932
def _CheckInstanceState(lu, instance, req_states, msg=None):
933
  """Ensure that an instance is in one of the required states.
934

935
  @param lu: the LU on behalf of which we make the check
936
  @param instance: the instance to check
937
  @param msg: if passed, should be a message to replace the default one
938
  @raise errors.OpPrereqError: if the instance is not in the required state
939

940
  """
941
  if msg is None:
942
    msg = ("can't use instance from outside %s states" %
943
           utils.CommaJoin(req_states))
944
  if instance.admin_state not in req_states:
945
    raise errors.OpPrereqError("Instance '%s' is marked to be %s, %s" %
946
                               (instance.name, instance.admin_state, msg),
947
                               errors.ECODE_STATE)
948

    
949
  if constants.ADMINST_UP not in req_states:
950
    pnode = instance.primary_node
951
    if not lu.cfg.GetNodeInfo(pnode).offline:
952
      ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
953
      ins_l.Raise("Can't contact node %s for instance information" % pnode,
954
                  prereq=True, ecode=errors.ECODE_ENVIRON)
955
      if instance.name in ins_l.payload:
956
        raise errors.OpPrereqError("Instance %s is running, %s" %
957
                                   (instance.name, msg), errors.ECODE_STATE)
958
    else:
959
      lu.LogWarning("Primary node offline, ignoring check that instance"
960
                     " is down")
961

    
962

    
963
def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
964
  """Check the sanity of iallocator and node arguments and use the
965
  cluster-wide iallocator if appropriate.
966

967
  Check that at most one of (iallocator, node) is specified. If none is
968
  specified, or the iallocator is L{constants.DEFAULT_IALLOCATOR_SHORTCUT},
969
  then the LU's opcode's iallocator slot is filled with the cluster-wide
970
  default iallocator.
971

972
  @type iallocator_slot: string
973
  @param iallocator_slot: the name of the opcode iallocator slot
974
  @type node_slot: string
975
  @param node_slot: the name of the opcode target node slot
976

977
  """
978
  node = getattr(lu.op, node_slot, None)
979
  ialloc = getattr(lu.op, iallocator_slot, None)
980
  if node == []:
981
    node = None
982

    
983
  if node is not None and ialloc is not None:
984
    raise errors.OpPrereqError("Do not specify both, iallocator and node",
985
                               errors.ECODE_INVAL)
986
  elif ((node is None and ialloc is None) or
987
        ialloc == constants.DEFAULT_IALLOCATOR_SHORTCUT):
988
    default_iallocator = lu.cfg.GetDefaultIAllocator()
989
    if default_iallocator:
990
      setattr(lu.op, iallocator_slot, default_iallocator)
991
    else:
992
      raise errors.OpPrereqError("No iallocator or node given and no"
993
                                 " cluster-wide default iallocator found;"
994
                                 " please specify either an iallocator or a"
995
                                 " node, or set a cluster-wide default"
996
                                 " iallocator", errors.ECODE_INVAL)
997

    
998

    
999
def _FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_name, prereq):
1000
  faulty = []
1001

    
1002
  for dev in instance.disks:
1003
    cfg.SetDiskID(dev, node_name)
1004

    
1005
  result = rpc_runner.call_blockdev_getmirrorstatus(node_name,
1006
                                                    (instance.disks,
1007
                                                     instance))
1008
  result.Raise("Failed to get disk status from node %s" % node_name,
1009
               prereq=prereq, ecode=errors.ECODE_ENVIRON)
1010

    
1011
  for idx, bdev_status in enumerate(result.payload):
1012
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
1013
      faulty.append(idx)
1014

    
1015
  return faulty
1016

    
1017

    
1018
def _CheckNodeOnline(lu, node, msg=None):
1019
  """Ensure that a given node is online.
1020

1021
  @param lu: the LU on behalf of which we make the check
1022
  @param node: the node to check
1023
  @param msg: if passed, should be a message to replace the default one
1024
  @raise errors.OpPrereqError: if the node is offline
1025

1026
  """
1027
  if msg is None:
1028
    msg = "Can't use offline node"
1029
  if lu.cfg.GetNodeInfo(node).offline:
1030
    raise errors.OpPrereqError("%s: %s" % (msg, node), errors.ECODE_STATE)