lib/cmdlib/common.py @ 5a13489b

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Common functions used by multiple logical units."""

import copy
import os

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import objects
from ganeti import opcodes
from ganeti import pathutils
from ganeti import rpc
from ganeti import ssconf
from ganeti import utils


# States of instance
INSTANCE_DOWN = [constants.ADMINST_DOWN]
INSTANCE_ONLINE = [constants.ADMINST_DOWN, constants.ADMINST_UP]
INSTANCE_NOT_RUNNING = [constants.ADMINST_DOWN, constants.ADMINST_OFFLINE]

#: Instance status in which an instance can be marked as offline/online
CAN_CHANGE_INSTANCE_OFFLINE = (frozenset(INSTANCE_DOWN) | frozenset([
  constants.ADMINST_OFFLINE,
  ]))


def _ExpandItemName(fn, name, kind):
  """Expand an item name.

  @param fn: the function to use for expansion
  @param name: requested item name
  @param kind: text description ('Node' or 'Instance')
  @return: the resolved (full) name
  @raise errors.OpPrereqError: if the item is not found

  """
  full_name = fn(name)
  if full_name is None:
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
                               errors.ECODE_NOENT)
  return full_name


def ExpandInstanceName(cfg, name):
  """Wrapper over L{_ExpandItemName} for instance."""
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")


def ExpandNodeName(cfg, name):
  """Wrapper over L{_ExpandItemName} for nodes."""
  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")


def ShareAll():
  """Returns a dict declaring all lock levels shared.

  """
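  # The value 1 declares each locking level as shared (see docstring)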
  return dict.fromkeys(locking.LEVELS, 1)


def CheckNodeGroupInstances(cfg, group_uuid, owned_instances):
  """Checks if the instances in a node group are still correct.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type group_uuid: string
  @param group_uuid: Node group UUID
  @type owned_instances: set or frozenset
  @param owned_instances: List of currently owned instances

  """
  wanted_instances = cfg.GetNodeGroupInstances(group_uuid)
  if owned_instances != wanted_instances:
    raise errors.OpPrereqError("Instances in node group '%s' changed since"
                               " locks were acquired, wanted '%s', have '%s';"
                               " retry the operation" %
                               (group_uuid,
                                utils.CommaJoin(wanted_instances),
                                utils.CommaJoin(owned_instances)),
                               errors.ECODE_STATE)

  return wanted_instances


def GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.ProgrammerError: if the nodes parameter is wrong type

  """
  if nodes:
    return [ExpandNodeName(lu.cfg, name) for name in nodes]

  return utils.NiceSort(lu.cfg.GetNodeList())


def GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if instances:
    wanted = [ExpandInstanceName(lu.cfg, name) for name in instances]
  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
  return wanted


def RunPostHook(lu, node_name):
  """Runs the post-hook for an opcode on a single node.

  """
  hm = lu.proc.BuildHooksManager(lu)
  try:
    hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name])
  except Exception, err: # pylint: disable=W0703
    lu.LogWarning("Errors occurred running hooks on %s: %s",
                  node_name, err)


def RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
  """Distribute additional files which are part of the cluster configuration.

  ConfigWriter takes care of distributing the config and ssconf files, but
  there are more files which should be distributed to all nodes. This function
  makes sure those are copied.

  @param lu: calling logical unit
  @param additional_nodes: list of nodes not in the config to distribute to
  @type additional_vm: boolean
  @param additional_vm: whether the additional nodes are vm-capable or not

  """
  # Gather target nodes
  cluster = lu.cfg.GetClusterInfo()
  master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())

  online_nodes = lu.cfg.GetOnlineNodeList()
  online_set = frozenset(online_nodes)
  vm_nodes = list(online_set.intersection(lu.cfg.GetVmCapableNodeList()))

  if additional_nodes is not None:
    online_nodes.extend(additional_nodes)
    if additional_vm:
      vm_nodes.extend(additional_nodes)

  # Never distribute to master node
  for nodelist in [online_nodes, vm_nodes]:
    if master_info.name in nodelist:
      nodelist.remove(master_info.name)

  # Gather file lists
  (files_all, _, files_mc, files_vm) = \
    ComputeAncillaryFiles(cluster, True)

  # Never re-distribute configuration file from here
  assert not (pathutils.CLUSTER_CONF_FILE in files_all or
              pathutils.CLUSTER_CONF_FILE in files_vm)
  assert not files_mc, "Master candidates not handled in this function"

  filemap = [
    (online_nodes, files_all),
    (vm_nodes, files_vm),
    ]

  # Upload the files
  for (node_list, files) in filemap:
    for fname in files:
      UploadHelper(lu, node_list, fname)


def ComputeAncillaryFiles(cluster, redist):
  """Compute files external to Ganeti which need to be consistent.

  @type redist: boolean
  @param redist: Whether to include files which need to be redistributed

  """
  # Compute files for all nodes
  files_all = set([
    pathutils.SSH_KNOWN_HOSTS_FILE,
    pathutils.CONFD_HMAC_KEY,
    pathutils.CLUSTER_DOMAIN_SECRET_FILE,
    pathutils.SPICE_CERT_FILE,
    pathutils.SPICE_CACERT_FILE,
    pathutils.RAPI_USERS_FILE,
    ])

  if redist:
    # we need to ship at least the RAPI certificate
    files_all.add(pathutils.RAPI_CERT_FILE)
  else:
    files_all.update(pathutils.ALL_CERT_FILES)
    files_all.update(ssconf.SimpleStore().GetFileList())

  if cluster.modify_etc_hosts:
    files_all.add(pathutils.ETC_HOSTS)

  if cluster.use_external_mip_script:
    files_all.add(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)

  # Files which are optional, these must:
  # - be present in one other category as well
  # - either exist or not exist on all nodes of that category (mc, vm all)
  files_opt = set([
    pathutils.RAPI_USERS_FILE,
    ])

  # Files which should only be on master candidates
  files_mc = set()

  if not redist:
    files_mc.add(pathutils.CLUSTER_CONF_FILE)

  # File storage
  if (not redist and (constants.ENABLE_FILE_STORAGE or
                      constants.ENABLE_SHARED_FILE_STORAGE)):
    files_all.add(pathutils.FILE_STORAGE_PATHS_FILE)
    files_opt.add(pathutils.FILE_STORAGE_PATHS_FILE)

  # Files which should only be on VM-capable nodes
  files_vm = set(
    filename
    for hv_name in cluster.enabled_hypervisors
    for filename in
    hypervisor.GetHypervisorClass(hv_name).GetAncillaryFiles()[0])

  files_opt |= set(
    filename
    for hv_name in cluster.enabled_hypervisors
    for filename in
    hypervisor.GetHypervisorClass(hv_name).GetAncillaryFiles()[1])

  # Filenames in each category must be unique
  all_files_set = files_all | files_mc | files_vm
  assert (len(all_files_set) ==
          sum(map(len, [files_all, files_mc, files_vm]))), \
    "Found file listed in more than one file list"

  # Optional files must be present in one other category
  assert all_files_set.issuperset(files_opt), \
    "Optional file not in a different required list"

  # This one file should never ever be re-distributed via RPC
  assert not (redist and
              pathutils.FILE_STORAGE_PATHS_FILE in all_files_set)

  return (files_all, files_opt, files_mc, files_vm)


def UploadHelper(lu, nodes, fname):
  """Helper for uploading a file and showing warnings.

  """
  if os.path.exists(fname):
    result = lu.rpc.call_upload_file(nodes, fname)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        msg = ("Copy of file %s to node %s failed: %s" %
               (fname, to_node, msg))
        lu.LogWarning(msg)


def MergeAndVerifyHvState(op_input, obj_input):
  """Combines the hv state from an opcode with the one of the object

  @param op_input: The input dict from the opcode
  @param obj_input: The input dict from the objects
  @return: The verified and updated dict

  """
  if op_input:
    invalid_hvs = set(op_input) - constants.HYPER_TYPES
    if invalid_hvs:
      raise errors.OpPrereqError("Invalid hypervisor(s) in hypervisor state:"
                                 " %s" % utils.CommaJoin(invalid_hvs),
                                 errors.ECODE_INVAL)
    if obj_input is None:
      obj_input = {}
    type_check = constants.HVSTS_PARAMETER_TYPES
    return _UpdateAndVerifySubDict(obj_input, op_input, type_check)

  return None


def MergeAndVerifyDiskState(op_input, obj_input):
  """Combines the disk state from an opcode with the one of the object

  @param op_input: The input dict from the opcode
  @param obj_input: The input dict from the objects
  @return: The verified and updated dict
  """
  if op_input:
    invalid_dst = set(op_input) - constants.DS_VALID_TYPES
    if invalid_dst:
      raise errors.OpPrereqError("Invalid storage type(s) in disk state: %s" %
                                 utils.CommaJoin(invalid_dst),
                                 errors.ECODE_INVAL)
    type_check = constants.DSS_PARAMETER_TYPES
    if obj_input is None:
      obj_input = {}
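    # Unlike the hypervisor state above, disk state is keyed by storage type,
    # so each sub-dict is updated and verified separately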
    return dict((key, _UpdateAndVerifySubDict(obj_input.get(key, {}), value,
                                              type_check))
                for key, value in op_input.items())

  return None


def CheckOSParams(lu, required, nodenames, osname, osparams):
  """OS parameters validation.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type required: boolean
  @param required: whether the validation should fail if the OS is not
      found
  @type nodenames: list
  @param nodenames: the list of nodes on which we should check
  @type osname: string
  @param osname: the name of the OS we should use
  @type osparams: dict
  @param osparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  nodenames = _FilterVmNodes(lu, nodenames)
  result = lu.rpc.call_os_validate(nodenames, required, osname,
                                   [constants.OS_VALIDATE_PARAMETERS],
                                   osparams)
  for node, nres in result.items():
    # we don't check for offline cases since this should be run only
    # against the master node and/or an instance's nodes
    nres.Raise("OS Parameters validation failed on node %s" % node)
    if not nres.payload:
      lu.LogInfo("OS %s not found on node %s, validation skipped",
                 osname, node)


def CheckHVParams(lu, nodenames, hvname, hvparams):
  """Hypervisor parameter validation.

  This function abstracts the hypervisor parameter validation to be
  used in both instance create and instance modify.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type nodenames: list
  @param nodenames: the list of nodes on which we should check
  @type hvname: string
  @param hvname: the name of the hypervisor we should use
  @type hvparams: dict
  @param hvparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  nodenames = _FilterVmNodes(lu, nodenames)

  cluster = lu.cfg.GetClusterInfo()
  hvfull = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)

  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames, hvname, hvfull)
  for node in nodenames:
    info = hvinfo[node]
    if info.offline:
      continue
    info.Raise("Hypervisor parameter validation failed on node %s" % node)


def AdjustCandidatePool(lu, exceptions):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               utils.CommaJoin(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def CheckNodePVs(nresult, exclusive_storage):
  """Check node PVs.

  """
  pvlist_dict = nresult.get(constants.NV_PVLIST, None)
  if pvlist_dict is None:
    return (["Can't get PV list from node"], None)
  pvlist = map(objects.LvmPvInfo.FromDict, pvlist_dict)
  errlist = []
  # check that ':' is not present in PV names, since it's a
  # special character for lvcreate (denotes the range of PEs to
  # use on the PV)
  for pv in pvlist:
    if ":" in pv.name:
      errlist.append("Invalid character ':' in PV '%s' of VG '%s'" %
                     (pv.name, pv.vg_name))
  es_pvinfo = None
  if exclusive_storage:
    (errmsgs, es_pvinfo) = utils.LvmExclusiveCheckNodePvs(pvlist)
    errlist.extend(errmsgs)
    shared_pvs = nresult.get(constants.NV_EXCLUSIVEPVS, None)
    if shared_pvs:
      for (pvname, lvlist) in shared_pvs:
        # TODO: Check that LVs are really unrelated (snapshots, DRBD meta...)
        errlist.append("PV %s is shared among unrelated LVs (%s)" %
                       (pvname, utils.CommaJoin(lvlist)))
  return (errlist, es_pvinfo)


def _ComputeMinMaxSpec(name, qualifier, ispecs, value):
  """Computes if value is in the desired range.

  @param name: name of the parameter for which we perform the check
  @param qualifier: a qualifier used in the error message (e.g. 'disk/1',
      not just 'disk')
  @param ispecs: dictionary containing min and max values
  @param value: actual value that we want to use
  @return: None or an error string

  """
  if value in [None, constants.VALUE_AUTO]:
    return None
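  # Parameters missing from the spec default to the value itself and can
  # therefore never violate the range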
  max_v = ispecs[constants.ISPECS_MAX].get(name, value)
  min_v = ispecs[constants.ISPECS_MIN].get(name, value)
  if value > max_v or min_v > value:
    if qualifier:
      fqn = "%s/%s" % (name, qualifier)
    else:
      fqn = name
    return ("%s value %s is not in range [%s, %s]" %
            (fqn, value, min_v, max_v))
  return None


def ComputeIPolicySpecViolation(ipolicy, mem_size, cpu_count, disk_count,
                                nic_count, disk_sizes, spindle_use,
                                disk_template,
                                _compute_fn=_ComputeMinMaxSpec):
  """Verifies ipolicy against provided specs.

  @type ipolicy: dict
  @param ipolicy: The ipolicy
  @type mem_size: int
  @param mem_size: The memory size
  @type cpu_count: int
  @param cpu_count: Used cpu cores
  @type disk_count: int
  @param disk_count: Number of disks used
  @type nic_count: int
  @param nic_count: Number of nics used
  @type disk_sizes: list of ints
  @param disk_sizes: Disk sizes of used disk (len must match C{disk_count})
  @type spindle_use: int
  @param spindle_use: The number of spindles this instance uses
  @type disk_template: string
  @param disk_template: The disk template of the instance
  @param _compute_fn: The compute function (unittest only)
  @return: A list of violations, or an empty list if no violations are found

  """
  assert disk_count == len(disk_sizes)

  test_settings = [
    (constants.ISPEC_MEM_SIZE, "", mem_size),
    (constants.ISPEC_CPU_COUNT, "", cpu_count),
    (constants.ISPEC_NIC_COUNT, "", nic_count),
    (constants.ISPEC_SPINDLE_USE, "", spindle_use),
    ] + [(constants.ISPEC_DISK_SIZE, str(idx), d)
         for idx, d in enumerate(disk_sizes)]
  if disk_template != constants.DT_DISKLESS:
    # This check doesn't make sense for diskless instances
    test_settings.append((constants.ISPEC_DISK_COUNT, "", disk_count))
  ret = []
  allowed_dts = ipolicy[constants.IPOLICY_DTS]
  if disk_template not in allowed_dts:
    ret.append("Disk template %s is not allowed (allowed templates: %s)" %
               (disk_template, utils.CommaJoin(allowed_dts)))

  min_errs = None
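  # The specs only have to be satisfied by one of the min/max pairs; report
  # the violations against the pair that comes closest (fewest violations)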
  for minmax in ipolicy[constants.ISPECS_MINMAX]:
    errs = filter(None,
                  (_compute_fn(name, qualifier, minmax, value)
                   for (name, qualifier, value) in test_settings))
    if min_errs is None or len(errs) < len(min_errs):
      min_errs = errs
  assert min_errs is not None
  return ret + min_errs


def ComputeIPolicyInstanceViolation(ipolicy, instance, cfg,
                                    _compute_fn=ComputeIPolicySpecViolation):
  """Compute if instance meets the specs of ipolicy.

  @type ipolicy: dict
  @param ipolicy: The ipolicy to verify against
  @type instance: L{objects.Instance}
  @param instance: The instance to verify
  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration
  @param _compute_fn: The function to verify ipolicy (unittest only)
  @see: L{ComputeIPolicySpecViolation}

  """
  ret = []
  be_full = cfg.GetClusterInfo().FillBE(instance)
  mem_size = be_full[constants.BE_MAXMEM]
  cpu_count = be_full[constants.BE_VCPUS]
  es_flags = rpc.GetExclusiveStorageForNodeNames(cfg, instance.all_nodes)
  if any(es_flags.values()):
    # With exclusive storage use the actual spindles
    try:
      spindle_use = sum([disk.spindles for disk in instance.disks])
    except TypeError:
      ret.append("Number of spindles not configured for disks of instance %s"
                 " while exclusive storage is enabled, try running gnt-cluster"
                 " repair-disk-sizes" % instance.name)
      # _ComputeMinMaxSpec ignores 'None's
      spindle_use = None
  else:
    spindle_use = be_full[constants.BE_SPINDLE_USE]
  disk_count = len(instance.disks)
  disk_sizes = [disk.size for disk in instance.disks]
  nic_count = len(instance.nics)
  disk_template = instance.disk_template

  return ret + _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count,
                           disk_sizes, spindle_use, disk_template)


def _ComputeViolatingInstances(ipolicy, instances, cfg):
  """Computes the set of instances that violate the given ipolicy.

  @param ipolicy: The ipolicy to verify
  @type instances: L{objects.Instance}
  @param instances: List of instances to verify
  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration
  @return: A frozenset of instance names violating the ipolicy

  """
  return frozenset([inst.name for inst in instances
                    if ComputeIPolicyInstanceViolation(ipolicy, inst, cfg)])


def ComputeNewInstanceViolations(old_ipolicy, new_ipolicy, instances, cfg):
  """Computes a set of any instances that would violate the new ipolicy.

  @param old_ipolicy: The current (still in-place) ipolicy
  @param new_ipolicy: The new (to become) ipolicy
  @param instances: List of instances to verify
  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration
  @return: A list of instances which violate the new ipolicy but
      did not before

  """
  return (_ComputeViolatingInstances(new_ipolicy, instances, cfg) -
          _ComputeViolatingInstances(old_ipolicy, instances, cfg))


def GetUpdatedParams(old_params, update_dict,
                      use_default=True, use_none=False):
  """Return the new version of a parameter dictionary.

  @type old_params: dict
  @param old_params: old parameters
  @type update_dict: dict
  @param update_dict: dict containing new parameter values, or
      constants.VALUE_DEFAULT to reset the parameter to its default
      value
  @type use_default: boolean
  @param use_default: whether to recognise L{constants.VALUE_DEFAULT}
      values as 'to be deleted' values
  @type use_none: boolean
  @param use_none: whether to recognise C{None} values as 'to be
      deleted' values
  @rtype: dict
  @return: the new parameter dictionary

  """
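  # For example, with the default flags,
  # GetUpdatedParams({"a": 1, "b": 2}, {"b": constants.VALUE_DEFAULT, "c": 3})
  # drops "b" and adds "c", returning {"a": 1, "c": 3}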
  params_copy = copy.deepcopy(old_params)
  for key, val in update_dict.iteritems():
    if ((use_default and val == constants.VALUE_DEFAULT) or
          (use_none and val is None)):
      try:
        del params_copy[key]
      except KeyError:
        pass
    else:
      params_copy[key] = val
  return params_copy


def GetUpdatedIPolicy(old_ipolicy, new_ipolicy, group_policy=False):
  """Return the new version of an instance policy.

  @param group_policy: whether this policy applies to a group and thus
    we should support removal of policy entries

  """
  ipolicy = copy.deepcopy(old_ipolicy)
  for key, value in new_ipolicy.items():
    if key not in constants.IPOLICY_ALL_KEYS:
      raise errors.OpPrereqError("Invalid key in new ipolicy: %s" % key,
                                 errors.ECODE_INVAL)
    if (not value or value == [constants.VALUE_DEFAULT] or
            value == constants.VALUE_DEFAULT):
      if group_policy:
        if key in ipolicy:
          del ipolicy[key]
      else:
        raise errors.OpPrereqError("Can't unset ipolicy attribute '%s'"
                                   " on the cluster" % key,
                                   errors.ECODE_INVAL)
    else:
      if key in constants.IPOLICY_PARAMETERS:
        # FIXME: we assume all such values are float
        try:
          ipolicy[key] = float(value)
        except (TypeError, ValueError), err:
          raise errors.OpPrereqError("Invalid value for attribute"
                                     " '%s': '%s', error: %s" %
                                     (key, value, err), errors.ECODE_INVAL)
      elif key == constants.ISPECS_MINMAX:
        for minmax in value:
          for k in minmax.keys():
            utils.ForceDictType(minmax[k], constants.ISPECS_PARAMETER_TYPES)
        ipolicy[key] = value
      elif key == constants.ISPECS_STD:
        if group_policy:
          msg = "%s cannot appear in group instance specs" % key
          raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
        ipolicy[key] = GetUpdatedParams(old_ipolicy.get(key, {}), value,
                                        use_none=False, use_default=False)
        utils.ForceDictType(ipolicy[key], constants.ISPECS_PARAMETER_TYPES)
      else:
        # FIXME: we assume all others are lists; this should be redone
        # in a nicer way
        ipolicy[key] = list(value)
  try:
    objects.InstancePolicy.CheckParameterSyntax(ipolicy, not group_policy)
  except errors.ConfigurationError, err:
    raise errors.OpPrereqError("Invalid instance policy: %s" % err,
                               errors.ECODE_INVAL)
  return ipolicy


def AnnotateDiskParams(instance, devs, cfg):
  """Little helper wrapper to the rpc annotation method.

  @param instance: The instance object
  @type devs: List of L{objects.Disk}
  @param devs: The root devices (not any of its children!)
  @param cfg: The config object
  @return: The annotated disk copies
  @see: L{rpc.AnnotateDiskParams}

  """
  return rpc.AnnotateDiskParams(instance.disk_template, devs,
                                cfg.GetInstanceDiskParams(instance))


def SupportsOob(cfg, node):
  """Tells if node supports OOB.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type node: L{objects.Node}
  @param node: The node
  @return: The OOB script if supported or an empty string otherwise

  """
  return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]


def _UpdateAndVerifySubDict(base, updates, type_check):
  """Updates and verifies a dict with sub dicts of the same type.

  @param base: The dict with the old data
  @param updates: The dict with the new data
  @param type_check: Dict suitable to ForceDictType to verify correct types
  @returns: A new dict with updated and verified values

  """
  def fn(old, value):
    new = GetUpdatedParams(old, value)
    utils.ForceDictType(new, type_check)
    return new

  ret = copy.deepcopy(base)
  ret.update(dict((key, fn(base.get(key, {}), value))
                  for key, value in updates.items()))
  return ret


def _FilterVmNodes(lu, nodenames):
  """Filters out non-vm_capable nodes from a list.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type nodenames: list
  @param nodenames: the list of nodes on which we should check
  @rtype: list
  @return: the list of vm-capable nodes

  """
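  # Note that GetNonVmCapableNodeList returns the nodes to be excluded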
  vm_nodes = frozenset(lu.cfg.GetNonVmCapableNodeList())
  return [name for name in nodenames if name not in vm_nodes]


def GetDefaultIAllocator(cfg, ialloc):
  """Decides on which iallocator to use.

  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration object
  @type ialloc: string or None
  @param ialloc: Iallocator specified in opcode
  @rtype: string
  @return: Iallocator name

  """
  if not ialloc:
    # Use default iallocator
    ialloc = cfg.GetDefaultIAllocator()

  if not ialloc:
    raise errors.OpPrereqError("No iallocator was specified, neither in the"
                               " opcode nor as a cluster-wide default",
                               errors.ECODE_INVAL)

  return ialloc


def CheckInstancesNodeGroups(cfg, instances, owned_groups, owned_nodes,
                             cur_group_uuid):
  """Checks if node groups for locked instances are still correct.

  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration
  @type instances: dict; string as key, L{objects.Instance} as value
  @param instances: Dictionary, instance name as key, instance object as value
  @type owned_groups: iterable of string
  @param owned_groups: List of owned groups
  @type owned_nodes: iterable of string
  @param owned_nodes: List of owned nodes
  @type cur_group_uuid: string or None
  @param cur_group_uuid: Optional group UUID to check against instance's groups

  """
  for (name, inst) in instances.items():
    assert owned_nodes.issuperset(inst.all_nodes), \
      "Instance %s's nodes changed while we kept the lock" % name

    inst_groups = CheckInstanceNodeGroups(cfg, name, owned_groups)

    assert cur_group_uuid is None or cur_group_uuid in inst_groups, \
      "Instance %s has no node in group %s" % (name, cur_group_uuid)


def CheckInstanceNodeGroups(cfg, instance_name, owned_groups,
                            primary_only=False):
  """Checks if the owned node groups are still correct for an instance.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type instance_name: string
  @param instance_name: Instance name
  @type owned_groups: set or frozenset
  @param owned_groups: List of currently owned node groups
  @type primary_only: boolean
  @param primary_only: Whether to check node groups for only the primary node

  """
  inst_groups = cfg.GetInstanceNodeGroups(instance_name, primary_only)

  if not owned_groups.issuperset(inst_groups):
    raise errors.OpPrereqError("Instance %s's node groups changed since"
                               " locks were acquired, current groups are"
                               " '%s', owning groups '%s'; retry the"
                               " operation" %
                               (instance_name,
                                utils.CommaJoin(inst_groups),
                                utils.CommaJoin(owned_groups)),
                               errors.ECODE_STATE)

  return inst_groups


def LoadNodeEvacResult(lu, alloc_result, early_release, use_nodes):
  """Unpacks the result of change-group and node-evacuate iallocator requests.

  Iallocator modes L{constants.IALLOCATOR_MODE_NODE_EVAC} and
  L{constants.IALLOCATOR_MODE_CHG_GROUP}.

  @type lu: L{LogicalUnit}
  @param lu: Logical unit instance
  @type alloc_result: tuple/list
  @param alloc_result: Result from iallocator
  @type early_release: bool
  @param early_release: Whether to release locks early if possible
  @type use_nodes: bool
  @param use_nodes: Whether to display node names instead of groups

  """
  (moved, failed, jobs) = alloc_result

  if failed:
    failreason = utils.CommaJoin("%s (%s)" % (name, reason)
                                 for (name, reason) in failed)
    lu.LogWarning("Unable to evacuate instances %s", failreason)
    raise errors.OpExecError("Unable to evacuate instances %s" % failreason)

  if moved:
    lu.LogInfo("Instances to be moved: %s",
               utils.CommaJoin("%s (to %s)" %
                               (name, _NodeEvacDest(use_nodes, group, nodes))
                               for (name, group, nodes) in moved))

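  # Turn the opcode dicts produced by the iallocator into OpCode objects and
  # propagate the early_release flag to each of them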
  return [map(compat.partial(_SetOpEarlyRelease, early_release),
              map(opcodes.OpCode.LoadOpCode, ops))
          for ops in jobs]


def _NodeEvacDest(use_nodes, group, nodes):
  """Returns group or nodes depending on caller's choice.

  """
  if use_nodes:
    return utils.CommaJoin(nodes)
  else:
    return group


def _SetOpEarlyRelease(early_release, op):
  """Sets C{early_release} flag on opcodes if available.

  """
  try:
    op.early_release = early_release
  except AttributeError:
    assert not isinstance(op, opcodes.OpInstanceReplaceDisks)

  return op


def MapInstanceDisksToNodes(instances):
  """Creates a map from (node, volume) to instance name.

  @type instances: list of L{objects.Instance}
  @rtype: dict; tuple of (node name, volume name) as key, instance name as value

  """
  return dict(((node, vol), inst.name)
              for inst in instances
              for (node, vols) in inst.MapLVsByNode().items()
              for vol in vols)


def CheckParamsNotGlobal(params, glob_pars, kind, bad_levels, good_levels):
  """Make sure that none of the given parameters is global.

  If a global parameter is found, an L{errors.OpPrereqError} exception is
  raised. This is used to avoid setting global parameters for individual nodes.

  @type params: dictionary
  @param params: Parameters to check
  @type glob_pars: dictionary
  @param glob_pars: Forbidden parameters
  @type kind: string
  @param kind: Kind of parameters (e.g. "node")
  @type bad_levels: string
  @param bad_levels: Level(s) at which the parameters are forbidden (e.g.
      "instance")
  @type good_levels: strings
  @param good_levels: Level(s) at which the parameters are allowed (e.g.
      "cluster or group")

  """
  used_globals = glob_pars.intersection(params)
  if used_globals:
    msg = ("The following %s parameters are global and cannot"
           " be customized at %s level, please modify them at"
           " %s level: %s" %
           (kind, bad_levels, good_levels, utils.CommaJoin(used_globals)))
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)


def IsExclusiveStorageEnabledNode(cfg, node):
  """Whether exclusive_storage is in effect for the given node.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type node: L{objects.Node}
  @param node: The node
  @rtype: bool
  @return: The effective value of exclusive_storage

  """
  return cfg.GetNdParams(node)[constants.ND_EXCLUSIVE_STORAGE]


def CheckInstanceState(lu, instance, req_states, msg=None):
  """Ensure that an instance is in one of the required states.

  @param lu: the LU on behalf of which we make the check
  @param instance: the instance to check
  @param msg: if passed, should be a message to replace the default one
  @raise errors.OpPrereqError: if the instance is not in the required state

  """
  if msg is None:
    msg = ("can't use instance from outside %s states" %
           utils.CommaJoin(req_states))
  if instance.admin_state not in req_states:
    raise errors.OpPrereqError("Instance '%s' is marked to be %s, %s" %
                               (instance.name, instance.admin_state, msg),
                               errors.ECODE_STATE)

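  # If being up is not an acceptable state, also make sure the instance is
  # not actually running on its (online) primary node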
  if constants.ADMINST_UP not in req_states:
    pnode = instance.primary_node
    if not lu.cfg.GetNodeInfo(pnode).offline:
      ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
      ins_l.Raise("Can't contact node %s for instance information" % pnode,
                  prereq=True, ecode=errors.ECODE_ENVIRON)
      if instance.name in ins_l.payload:
        raise errors.OpPrereqError("Instance %s is running, %s" %
                                   (instance.name, msg), errors.ECODE_STATE)
    else:
      lu.LogWarning("Primary node offline, ignoring check that instance"
                    " is down")


def CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
  """Check the sanity of iallocator and node arguments and use the
  cluster-wide iallocator if appropriate.

  Check that at most one of (iallocator, node) is specified. If none is
  specified, or the iallocator is L{constants.DEFAULT_IALLOCATOR_SHORTCUT},
  then the LU's opcode's iallocator slot is filled with the cluster-wide
  default iallocator.

  @type iallocator_slot: string
  @param iallocator_slot: the name of the opcode iallocator slot
  @type node_slot: string
  @param node_slot: the name of the opcode target node slot

  """
  node = getattr(lu.op, node_slot, None)
  ialloc = getattr(lu.op, iallocator_slot, None)
  if node == []:
    node = None

  if node is not None and ialloc is not None:
    raise errors.OpPrereqError("Do not specify both, iallocator and node",
                               errors.ECODE_INVAL)
  elif ((node is None and ialloc is None) or
        ialloc == constants.DEFAULT_IALLOCATOR_SHORTCUT):
    default_iallocator = lu.cfg.GetDefaultIAllocator()
    if default_iallocator:
      setattr(lu.op, iallocator_slot, default_iallocator)
    else:
      raise errors.OpPrereqError("No iallocator or node given and no"
                                 " cluster-wide default iallocator found;"
                                 " please specify either an iallocator or a"
                                 " node, or set a cluster-wide default"
                                 " iallocator", errors.ECODE_INVAL)


def FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_name, prereq):
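  """Returns the indices of the instance's disks which are faulty on a node.

  @param cfg: The cluster configuration
  @param rpc_runner: RPC runner used to query the mirror status
  @param instance: The instance whose disks are checked
  @param node_name: Name of the node on which the disks are queried
  @param prereq: Whether errors should be raised as prerequisite errors
  @return: List of indices of the faulty disks

  """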
  faulty = []

  for dev in instance.disks:
    cfg.SetDiskID(dev, node_name)

  result = rpc_runner.call_blockdev_getmirrorstatus(node_name,
                                                    (instance.disks,
                                                     instance))
  result.Raise("Failed to get disk status from node %s" % node_name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)

  for idx, bdev_status in enumerate(result.payload):
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
      faulty.append(idx)

  return faulty


def CheckNodeOnline(lu, node, msg=None):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @param msg: if passed, should be a message to replace the default one
  @raise errors.OpPrereqError: if the node is offline

  """
  if msg is None:
    msg = "Can't use offline node"
  if lu.cfg.GetNodeInfo(node).offline:
    raise errors.OpPrereqError("%s: %s" % (msg, node), errors.ECODE_STATE)