lib/cmdlib/common.py @ b691385f
#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Common functions used by multiple logical units."""

import copy
import os

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import objects
from ganeti import opcodes
from ganeti import pathutils
from ganeti import rpc
from ganeti import ssconf
from ganeti import utils


# States of instance
INSTANCE_DOWN = [constants.ADMINST_DOWN]
INSTANCE_ONLINE = [constants.ADMINST_DOWN, constants.ADMINST_UP]
INSTANCE_NOT_RUNNING = [constants.ADMINST_DOWN, constants.ADMINST_OFFLINE]

#: Instance status in which an instance can be marked as offline/online
CAN_CHANGE_INSTANCE_OFFLINE = (frozenset(INSTANCE_DOWN) | frozenset([
  constants.ADMINST_OFFLINE,
  ]))


def _ExpandItemName(expand_fn, name, kind):
  """Expand an item name.

  @param expand_fn: the function to use for expansion
  @param name: requested item name
  @param kind: text description ('Node' or 'Instance')
  @return: the result of the expand_fn, if successful
  @raise errors.OpPrereqError: if the item is not found

  """
  full_name = expand_fn(name)
  if full_name is None:
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
                               errors.ECODE_NOENT)
  return full_name
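

# Editor's note: illustrative sketch, not part of the original module. It
# shows how _ExpandItemName behaves; the dict-based expand_fn below is a
# stand-in for cfg.ExpandInstanceName or cfg.ExpandNodeName.
def _ExampleExpandItemName():
  names = {"inst1": "inst1.example.com"}
  # Lookup succeeds, so the full name is returned
  full = _ExpandItemName(names.get, "inst1", "Instance")
  try:
    # Unknown items make the expand_fn return None, which raises
    # OpPrereqError with ECODE_NOENT
    _ExpandItemName(names.get, "missing", "Instance")
  except errors.OpPrereqError:
    pass
  return full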


def ExpandInstanceName(cfg, name):
  """Wrapper over L{_ExpandItemName} for instance."""
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")


def ExpandNodeUuidAndName(cfg, expected_uuid, name):
  """Expand a short node name into the node UUID and full name.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type expected_uuid: string
  @param expected_uuid: expected UUID for the node (or None if there is no
        expectation). If it does not match, an L{errors.OpPrereqError} is
        raised.
  @type name: string
  @param name: the short node name

  """
  (uuid, full_name) = _ExpandItemName(cfg.ExpandNodeName, name, "Node")
  if expected_uuid is not None and uuid != expected_uuid:
    raise errors.OpPrereqError(
      "The node's UUID '%s' does not match the expected UUID '%s' for node"
      " '%s'. Maybe the node changed since you submitted this job." %
      (uuid, expected_uuid, full_name), errors.ECODE_NOTUNIQUE)
  return (uuid, full_name)


def ShareAll():
  """Returns a dict declaring all lock levels shared.

  """
  return dict.fromkeys(locking.LEVELS, 1)


def CheckNodeGroupInstances(cfg, group_uuid, owned_instances):
  """Checks if the instances in a node group are still correct.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type group_uuid: string
  @param group_uuid: Node group UUID
  @type owned_instances: set or frozenset
  @param owned_instances: List of currently owned instances

  """
  wanted_instances = cfg.GetNodeGroupInstances(group_uuid)
  if owned_instances != wanted_instances:
    raise errors.OpPrereqError("Instances in node group '%s' changed since"
                               " locks were acquired, wanted '%s', have '%s';"
                               " retry the operation" %
                               (group_uuid,
                                utils.CommaJoin(wanted_instances),
                                utils.CommaJoin(owned_instances)),
                               errors.ECODE_STATE)

  return wanted_instances


def GetWantedNodes(lu, short_node_names):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type short_node_names: list
  @param short_node_names: list of node names or None for all nodes
  @rtype: tuple of lists
  @return: tuple with (list of node UUIDs, list of node names)
  @raise errors.ProgrammerError: if the nodes parameter is wrong type

  """
  if short_node_names:
    node_uuids = [ExpandNodeUuidAndName(lu.cfg, None, name)[0]
                  for name in short_node_names]
  else:
    node_uuids = lu.cfg.GetNodeList()

  return (node_uuids, [lu.cfg.GetNodeName(uuid) for uuid in node_uuids])


def GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if instances:
    wanted = [ExpandInstanceName(lu.cfg, name) for name in instances]
  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
  return wanted


def RunPostHook(lu, node_name):
  """Runs the post-hook for an opcode on a single node.

  """
  hm = lu.proc.BuildHooksManager(lu)
  try:
    hm.RunPhase(constants.HOOKS_PHASE_POST, node_names=[node_name])
  except Exception, err: # pylint: disable=W0703
    lu.LogWarning("Errors occurred running hooks on %s: %s",
                  node_name, err)


def RedistributeAncillaryFiles(lu):
  """Distribute additional files which are part of the cluster configuration.

  ConfigWriter takes care of distributing the config and ssconf files, but
  there are more files which should be distributed to all nodes. This function
  makes sure those are copied.

  """
  # Gather target nodes
  cluster = lu.cfg.GetClusterInfo()
  master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())

  online_node_uuids = lu.cfg.GetOnlineNodeList()
  online_node_uuid_set = frozenset(online_node_uuids)
  vm_node_uuids = list(online_node_uuid_set.intersection(
                         lu.cfg.GetVmCapableNodeList()))

  # Never distribute to master node
  for node_uuids in [online_node_uuids, vm_node_uuids]:
    if master_info.uuid in node_uuids:
      node_uuids.remove(master_info.uuid)

  # Gather file lists
  (files_all, _, files_mc, files_vm) = \
    ComputeAncillaryFiles(cluster, True)

  # Never re-distribute configuration file from here
  assert not (pathutils.CLUSTER_CONF_FILE in files_all or
              pathutils.CLUSTER_CONF_FILE in files_vm)
  assert not files_mc, "Master candidates not handled in this function"

  filemap = [
    (online_node_uuids, files_all),
    (vm_node_uuids, files_vm),
    ]

  # Upload the files
  for (node_uuids, files) in filemap:
    for fname in files:
      UploadHelper(lu, node_uuids, fname)


def ComputeAncillaryFiles(cluster, redist):
  """Compute files external to Ganeti which need to be consistent.

  @type redist: boolean
  @param redist: Whether to include files which need to be redistributed

  """
  # Compute files for all nodes
  files_all = set([
    pathutils.SSH_KNOWN_HOSTS_FILE,
    pathutils.CONFD_HMAC_KEY,
    pathutils.CLUSTER_DOMAIN_SECRET_FILE,
    pathutils.SPICE_CERT_FILE,
    pathutils.SPICE_CACERT_FILE,
    pathutils.RAPI_USERS_FILE,
    ])

  if redist:
    # we need to ship at least the RAPI certificate
    files_all.add(pathutils.RAPI_CERT_FILE)
  else:
    files_all.update(pathutils.ALL_CERT_FILES)
    files_all.update(ssconf.SimpleStore().GetFileList())

  if cluster.modify_etc_hosts:
    files_all.add(pathutils.ETC_HOSTS)

  if cluster.use_external_mip_script:
    files_all.add(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)

  # Files which are optional; these must:
  # - be present in one other category as well
  # - either exist or not exist on all nodes of that category (mc, vm all)
  files_opt = set([
    pathutils.RAPI_USERS_FILE,
    ])

  # Files which should only be on master candidates
  files_mc = set()

  if not redist:
    files_mc.add(pathutils.CLUSTER_CONF_FILE)

  # File storage
  if (not redist and (constants.ENABLE_FILE_STORAGE or
                        constants.ENABLE_SHARED_FILE_STORAGE)):
    files_all.add(pathutils.FILE_STORAGE_PATHS_FILE)
    files_opt.add(pathutils.FILE_STORAGE_PATHS_FILE)

  # Files which should only be on VM-capable nodes
  files_vm = set(
    filename
    for hv_name in cluster.enabled_hypervisors
    for filename in
    hypervisor.GetHypervisorClass(hv_name).GetAncillaryFiles()[0])

  files_opt |= set(
    filename
    for hv_name in cluster.enabled_hypervisors
    for filename in
    hypervisor.GetHypervisorClass(hv_name).GetAncillaryFiles()[1])

  # Filenames in each category must be unique
  all_files_set = files_all | files_mc | files_vm
  assert (len(all_files_set) ==
          sum(map(len, [files_all, files_mc, files_vm]))), \
    "Found file listed in more than one file list"

  # Optional files must be present in one other category
  assert all_files_set.issuperset(files_opt), \
    "Optional file not in a different required list"

  # This one file should never ever be re-distributed via RPC
  assert not (redist and
              pathutils.FILE_STORAGE_PATHS_FILE in all_files_set)

  return (files_all, files_opt, files_mc, files_vm)


def UploadHelper(lu, node_uuids, fname):
  """Helper for uploading a file and showing warnings.

  """
  if os.path.exists(fname):
    result = lu.rpc.call_upload_file(node_uuids, fname)
    for to_node_uuids, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        msg = ("Copy of file %s to node %s failed: %s" %
               (fname, lu.cfg.GetNodeName(to_node_uuids), msg))
        lu.LogWarning(msg)


def MergeAndVerifyHvState(op_input, obj_input):
  """Combines the hv state from an opcode with that of the object.

  @param op_input: The input dict from the opcode
  @param obj_input: The input dict from the objects
  @return: The verified and updated dict

  """
  if op_input:
    invalid_hvs = set(op_input) - constants.HYPER_TYPES
    if invalid_hvs:
      raise errors.OpPrereqError("Invalid hypervisor(s) in hypervisor state:"
                                 " %s" % utils.CommaJoin(invalid_hvs),
                                 errors.ECODE_INVAL)
    if obj_input is None:
      obj_input = {}
    type_check = constants.HVSTS_PARAMETER_TYPES
    return _UpdateAndVerifySubDict(obj_input, op_input, type_check)

  return None


def MergeAndVerifyDiskState(op_input, obj_input):
  """Combines the disk state from an opcode with that of the object.

  @param op_input: The input dict from the opcode
  @param obj_input: The input dict from the objects
  @return: The verified and updated dict

  """
  if op_input:
    invalid_dst = set(op_input) - constants.DS_VALID_TYPES
    if invalid_dst:
      raise errors.OpPrereqError("Invalid storage type(s) in disk state: %s" %
                                 utils.CommaJoin(invalid_dst),
                                 errors.ECODE_INVAL)
    type_check = constants.DSS_PARAMETER_TYPES
    if obj_input is None:
      obj_input = {}
    return dict((key, _UpdateAndVerifySubDict(obj_input.get(key, {}), value,
                                              type_check))
                for key, value in op_input.items())

  return None


def CheckOSParams(lu, required, node_uuids, osname, osparams):
  """OS parameters validation.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type required: boolean
  @param required: whether the validation should fail if the OS is not
      found
  @type node_uuids: list
  @param node_uuids: the list of nodes on which we should check
  @type osname: string
  @param osname: the name of the OS we should use
  @type osparams: dict
  @param osparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  node_uuids = _FilterVmNodes(lu, node_uuids)
  result = lu.rpc.call_os_validate(node_uuids, required, osname,
                                   [constants.OS_VALIDATE_PARAMETERS],
                                   osparams)
  for node_uuid, nres in result.items():
    # we don't check for offline cases since this should be run only
    # against the master node and/or an instance's nodes
    nres.Raise("OS Parameters validation failed on node %s" %
               lu.cfg.GetNodeName(node_uuid))
    if not nres.payload:
      lu.LogInfo("OS %s not found on node %s, validation skipped",
                 osname, lu.cfg.GetNodeName(node_uuid))


def CheckHVParams(lu, node_uuids, hvname, hvparams):
  """Hypervisor parameter validation.

  This function abstracts the hypervisor parameter validation to be
  used in both instance create and instance modify.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type node_uuids: list
  @param node_uuids: the list of nodes on which we should check
  @type hvname: string
  @param hvname: the name of the hypervisor we should use
  @type hvparams: dict
  @param hvparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  node_uuids = _FilterVmNodes(lu, node_uuids)

  cluster = lu.cfg.GetClusterInfo()
  hvfull = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)

  hvinfo = lu.rpc.call_hypervisor_validate_params(node_uuids, hvname, hvfull)
  for node_uuid in node_uuids:
    info = hvinfo[node_uuid]
    if info.offline:
      continue
    info.Raise("Hypervisor parameter validation failed on node %s" %
               lu.cfg.GetNodeName(node_uuid))


def AdjustCandidatePool(lu, exceptions):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               utils.CommaJoin(node.name for node in mod_list))
    for node in mod_list:
      lu.context.ReaddNode(node)
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def CheckNodePVs(nresult, exclusive_storage):
  """Check node PVs.

  """
  pvlist_dict = nresult.get(constants.NV_PVLIST, None)
  if pvlist_dict is None:
    return (["Can't get PV list from node"], None)
  pvlist = map(objects.LvmPvInfo.FromDict, pvlist_dict)
  errlist = []
  # check that ':' is not present in PV names, since it's a
  # special character for lvcreate (denotes the range of PEs to
  # use on the PV)
  for pv in pvlist:
    if ":" in pv.name:
      errlist.append("Invalid character ':' in PV '%s' of VG '%s'" %
                     (pv.name, pv.vg_name))
  es_pvinfo = None
  if exclusive_storage:
    (errmsgs, es_pvinfo) = utils.LvmExclusiveCheckNodePvs(pvlist)
    errlist.extend(errmsgs)
    shared_pvs = nresult.get(constants.NV_EXCLUSIVEPVS, None)
    if shared_pvs:
      for (pvname, lvlist) in shared_pvs:
        # TODO: Check that LVs are really unrelated (snapshots, DRBD meta...)
        errlist.append("PV %s is shared among unrelated LVs (%s)" %
                       (pvname, utils.CommaJoin(lvlist)))
  return (errlist, es_pvinfo)


def _ComputeMinMaxSpec(name, qualifier, ispecs, value):
  """Computes if value is in the desired range.

  @param name: name of the parameter for which we perform the check
  @param qualifier: a qualifier used in the error message (e.g. 'disk/1',
      not just 'disk')
  @param ispecs: dictionary containing min and max values
  @param value: actual value that we want to use
  @return: None or an error string

  """
  if value in [None, constants.VALUE_AUTO]:
    return None
  max_v = ispecs[constants.ISPECS_MAX].get(name, value)
  min_v = ispecs[constants.ISPECS_MIN].get(name, value)
  if value > max_v or min_v > value:
    if qualifier:
      fqn = "%s/%s" % (name, qualifier)
    else:
      fqn = name
    return ("%s value %s is not in range [%s, %s]" %
            (fqn, value, min_v, max_v))
  return None
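

# Editor's note: illustrative sketch, not part of the original module. It
# shows how _ComputeMinMaxSpec reports range violations; the ispecs dict
# below mirrors the ISPECS_MIN/ISPECS_MAX layout the function expects.
def _ExampleComputeMinMaxSpec():
  ispecs = {
    constants.ISPECS_MIN: {constants.ISPEC_MEM_SIZE: 128},
    constants.ISPECS_MAX: {constants.ISPEC_MEM_SIZE: 4096},
    }
  # Within the range: returns None
  in_range = _ComputeMinMaxSpec(constants.ISPEC_MEM_SIZE, "", ispecs, 512)
  # Outside the range: returns an error string naming the allowed interval
  too_big = _ComputeMinMaxSpec(constants.ISPEC_MEM_SIZE, "", ispecs, 8192)
  return (in_range, too_big)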


def ComputeIPolicySpecViolation(ipolicy, mem_size, cpu_count, disk_count,
                                nic_count, disk_sizes, spindle_use,
                                disk_template,
                                _compute_fn=_ComputeMinMaxSpec):
  """Verifies ipolicy against provided specs.

  @type ipolicy: dict
  @param ipolicy: The ipolicy
  @type mem_size: int
  @param mem_size: The memory size
  @type cpu_count: int
  @param cpu_count: Used cpu cores
  @type disk_count: int
  @param disk_count: Number of disks used
  @type nic_count: int
  @param nic_count: Number of nics used
  @type disk_sizes: list of ints
  @param disk_sizes: Disk sizes of used disk (len must match C{disk_count})
  @type spindle_use: int
  @param spindle_use: The number of spindles this instance uses
  @type disk_template: string
  @param disk_template: The disk template of the instance
  @param _compute_fn: The compute function (unittest only)
  @return: A list of violations, or an empty list if no violations are found

  """
  assert disk_count == len(disk_sizes)

  test_settings = [
    (constants.ISPEC_MEM_SIZE, "", mem_size),
    (constants.ISPEC_CPU_COUNT, "", cpu_count),
    (constants.ISPEC_NIC_COUNT, "", nic_count),
    (constants.ISPEC_SPINDLE_USE, "", spindle_use),
    ] + [(constants.ISPEC_DISK_SIZE, str(idx), d)
         for idx, d in enumerate(disk_sizes)]
  if disk_template != constants.DT_DISKLESS:
    # This check doesn't make sense for diskless instances
    test_settings.append((constants.ISPEC_DISK_COUNT, "", disk_count))
  ret = []
  allowed_dts = ipolicy[constants.IPOLICY_DTS]
  if disk_template not in allowed_dts:
    ret.append("Disk template %s is not allowed (allowed templates: %s)" %
               (disk_template, utils.CommaJoin(allowed_dts)))

  min_errs = None
  for minmax in ipolicy[constants.ISPECS_MINMAX]:
    errs = filter(None,
                  (_compute_fn(name, qualifier, minmax, value)
                   for (name, qualifier, value) in test_settings))
    if min_errs is None or len(errs) < len(min_errs):
      min_errs = errs
  assert min_errs is not None
  return ret + min_errs


def ComputeIPolicyInstanceViolation(ipolicy, instance, cfg,
                                    _compute_fn=ComputeIPolicySpecViolation):
  """Compute if instance meets the specs of ipolicy.

  @type ipolicy: dict
  @param ipolicy: The ipolicy to verify against
  @type instance: L{objects.Instance}
  @param instance: The instance to verify
  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration
  @param _compute_fn: The function to verify ipolicy (unittest only)
  @see: L{ComputeIPolicySpecViolation}

  """
  ret = []
  be_full = cfg.GetClusterInfo().FillBE(instance)
  mem_size = be_full[constants.BE_MAXMEM]
  cpu_count = be_full[constants.BE_VCPUS]
  es_flags = rpc.GetExclusiveStorageForNodes(cfg, instance.all_nodes)
  if any(es_flags.values()):
    # With exclusive storage use the actual spindles
    try:
      spindle_use = sum([disk.spindles for disk in instance.disks])
    except TypeError:
      ret.append("Number of spindles not configured for disks of instance %s"
                 " while exclusive storage is enabled, try running gnt-cluster"
                 " repair-disk-sizes" % instance.name)
      # _ComputeMinMaxSpec ignores 'None's
      spindle_use = None
  else:
    spindle_use = be_full[constants.BE_SPINDLE_USE]
  disk_count = len(instance.disks)
  disk_sizes = [disk.size for disk in instance.disks]
  nic_count = len(instance.nics)
  disk_template = instance.disk_template

  return ret + _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count,
                           disk_sizes, spindle_use, disk_template)


def _ComputeViolatingInstances(ipolicy, instances, cfg):
  """Computes the set of instances that violate the given ipolicy.

  @param ipolicy: The ipolicy to verify
  @type instances: list of L{objects.Instance}
  @param instances: List of instances to verify
  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration
  @return: A frozenset of instance names violating the ipolicy

  """
  return frozenset([inst.name for inst in instances
                    if ComputeIPolicyInstanceViolation(ipolicy, inst, cfg)])


def ComputeNewInstanceViolations(old_ipolicy, new_ipolicy, instances, cfg):
  """Computes a set of any instances that would violate the new ipolicy.

  @param old_ipolicy: The current (still in-place) ipolicy
  @param new_ipolicy: The new (to become) ipolicy
  @param instances: List of instances to verify
  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration
  @return: A set of instance names which violate the new ipolicy but
      did not before

  """
  return (_ComputeViolatingInstances(new_ipolicy, instances, cfg) -
          _ComputeViolatingInstances(old_ipolicy, instances, cfg))


def GetUpdatedParams(old_params, update_dict,
                      use_default=True, use_none=False):
  """Return the new version of a parameter dictionary.

  @type old_params: dict
  @param old_params: old parameters
  @type update_dict: dict
  @param update_dict: dict containing new parameter values, or
      constants.VALUE_DEFAULT to reset the parameter to its default
      value
  @type use_default: boolean
  @param use_default: whether to recognise L{constants.VALUE_DEFAULT}
      values as 'to be deleted' values
  @type use_none: boolean
  @param use_none: whether to recognise C{None} values as 'to be
      deleted' values
  @rtype: dict
  @return: the new parameter dictionary

  """
  params_copy = copy.deepcopy(old_params)
  for key, val in update_dict.iteritems():
    if ((use_default and val == constants.VALUE_DEFAULT) or
          (use_none and val is None)):
      try:
        del params_copy[key]
      except KeyError:
        pass
    else:
      params_copy[key] = val
  return params_copy
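

# Editor's note: illustrative sketch, not part of the original module, showing
# how GetUpdatedParams merges updates and drops keys set to VALUE_DEFAULT; the
# parameter names used below are made up for the example.
def _ExampleGetUpdatedParams():
  old = {"kernel_path": "/boot/vmlinuz", "root_path": "/dev/vda1"}
  update = {"kernel_path": constants.VALUE_DEFAULT, "serial_console": True}
  # Result: {"root_path": "/dev/vda1", "serial_console": True} -- the key set
  # to VALUE_DEFAULT is removed, the new key is added, the rest is kept
  return GetUpdatedParams(old, update)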


def GetUpdatedIPolicy(old_ipolicy, new_ipolicy, group_policy=False):
  """Return the new version of an instance policy.

  @param group_policy: whether this policy applies to a group and thus
    we should support removal of policy entries

  """
  ipolicy = copy.deepcopy(old_ipolicy)
  for key, value in new_ipolicy.items():
    if key not in constants.IPOLICY_ALL_KEYS:
      raise errors.OpPrereqError("Invalid key in new ipolicy: %s" % key,
                                 errors.ECODE_INVAL)
    if (not value or value == [constants.VALUE_DEFAULT] or
            value == constants.VALUE_DEFAULT):
      if group_policy:
        if key in ipolicy:
          del ipolicy[key]
      else:
        raise errors.OpPrereqError("Can't unset ipolicy attribute '%s'"
                                   " on the cluster" % key,
                                   errors.ECODE_INVAL)
    else:
      if key in constants.IPOLICY_PARAMETERS:
        # FIXME: we assume all such values are float
        try:
          ipolicy[key] = float(value)
        except (TypeError, ValueError), err:
          raise errors.OpPrereqError("Invalid value for attribute"
                                     " '%s': '%s', error: %s" %
                                     (key, value, err), errors.ECODE_INVAL)
      elif key == constants.ISPECS_MINMAX:
        for minmax in value:
          for k in minmax.keys():
            utils.ForceDictType(minmax[k], constants.ISPECS_PARAMETER_TYPES)
        ipolicy[key] = value
      elif key == constants.ISPECS_STD:
        if group_policy:
          msg = "%s cannot appear in group instance specs" % key
          raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
        ipolicy[key] = GetUpdatedParams(old_ipolicy.get(key, {}), value,
                                        use_none=False, use_default=False)
        utils.ForceDictType(ipolicy[key], constants.ISPECS_PARAMETER_TYPES)
      else:
        # FIXME: we assume all others are lists; this should be redone
        # in a nicer way
        ipolicy[key] = list(value)
  try:
    objects.InstancePolicy.CheckParameterSyntax(ipolicy, not group_policy)
  except errors.ConfigurationError, err:
    raise errors.OpPrereqError("Invalid instance policy: %s" % err,
                               errors.ECODE_INVAL)
  return ipolicy


def AnnotateDiskParams(instance, devs, cfg):
  """Little helper wrapper to the rpc annotation method.

  @param instance: The instance object
  @type devs: List of L{objects.Disk}
  @param devs: The root devices (not any of their children!)
  @param cfg: The config object
  @return: The annotated disk copies
  @see: L{rpc.AnnotateDiskParams}

  """
  return rpc.AnnotateDiskParams(instance.disk_template, devs,
                                cfg.GetInstanceDiskParams(instance))


def SupportsOob(cfg, node):
  """Tells if node supports OOB.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type node: L{objects.Node}
  @param node: The node
  @return: The OOB script if supported or an empty string otherwise

  """
  return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]


def _UpdateAndVerifySubDict(base, updates, type_check):
  """Updates and verifies a dict with sub dicts of the same type.

  @param base: The dict with the old data
  @param updates: The dict with the new data
  @param type_check: Dict suitable to ForceDictType to verify correct types
  @return: A new dict with updated and verified values

  """
  def fn(old, value):
    new = GetUpdatedParams(old, value)
    utils.ForceDictType(new, type_check)
    return new

  ret = copy.deepcopy(base)
  ret.update(dict((key, fn(base.get(key, {}), value))
                  for key, value in updates.items()))
  return ret


def _FilterVmNodes(lu, node_uuids):
  """Filters out non-vm_capable nodes from a list.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type node_uuids: list
  @param node_uuids: the list of nodes on which we should check
  @rtype: list
  @return: the list of vm-capable nodes

  """
  non_vm_nodes = frozenset(lu.cfg.GetNonVmCapableNodeList())
  return [uuid for uuid in node_uuids if uuid not in non_vm_nodes]


def GetDefaultIAllocator(cfg, ialloc):
  """Decides on which iallocator to use.

  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration object
  @type ialloc: string or None
  @param ialloc: Iallocator specified in opcode
  @rtype: string
  @return: Iallocator name

  """
  if not ialloc:
    # Use default iallocator
    ialloc = cfg.GetDefaultIAllocator()

  if not ialloc:
    raise errors.OpPrereqError("No iallocator was specified, neither in the"
                               " opcode nor as a cluster-wide default",
                               errors.ECODE_INVAL)

  return ialloc


def CheckInstancesNodeGroups(cfg, instances, owned_groups, owned_node_uuids,
                             cur_group_uuid):
  """Checks if node groups for locked instances are still correct.

  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration
  @type instances: dict; string as key, L{objects.Instance} as value
  @param instances: Dictionary, instance name as key, instance object as value
  @type owned_groups: iterable of string
  @param owned_groups: List of owned groups
  @type owned_node_uuids: iterable of string
  @param owned_node_uuids: List of owned nodes
  @type cur_group_uuid: string or None
  @param cur_group_uuid: Optional group UUID to check against instance's groups

  """
  for (name, inst) in instances.items():
    assert owned_node_uuids.issuperset(inst.all_nodes), \
      "Instance %s's nodes changed while we kept the lock" % name

    inst_groups = CheckInstanceNodeGroups(cfg, name, owned_groups)

    assert cur_group_uuid is None or cur_group_uuid in inst_groups, \
      "Instance %s has no node in group %s" % (name, cur_group_uuid)


def CheckInstanceNodeGroups(cfg, instance_name, owned_groups,
                            primary_only=False):
  """Checks if the owned node groups are still correct for an instance.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type instance_name: string
  @param instance_name: Instance name
  @type owned_groups: set or frozenset
  @param owned_groups: List of currently owned node groups
  @type primary_only: boolean
  @param primary_only: Whether to check node groups for only the primary node

  """
  inst_groups = cfg.GetInstanceNodeGroups(instance_name, primary_only)

  if not owned_groups.issuperset(inst_groups):
    raise errors.OpPrereqError("Instance %s's node groups changed since"
                               " locks were acquired, current groups are"
                               " '%s', owning groups '%s'; retry the"
                               " operation" %
                               (instance_name,
                                utils.CommaJoin(inst_groups),
                                utils.CommaJoin(owned_groups)),
                               errors.ECODE_STATE)

  return inst_groups


def LoadNodeEvacResult(lu, alloc_result, early_release, use_nodes):
  """Unpacks the result of change-group and node-evacuate iallocator requests.

  Iallocator modes L{constants.IALLOCATOR_MODE_NODE_EVAC} and
  L{constants.IALLOCATOR_MODE_CHG_GROUP}.

  @type lu: L{LogicalUnit}
  @param lu: Logical unit instance
  @type alloc_result: tuple/list
  @param alloc_result: Result from iallocator
  @type early_release: bool
  @param early_release: Whether to release locks early if possible
  @type use_nodes: bool
  @param use_nodes: Whether to display node names instead of groups

  """
  (moved, failed, jobs) = alloc_result

  if failed:
    failreason = utils.CommaJoin("%s (%s)" % (name, reason)
                                 for (name, reason) in failed)
    lu.LogWarning("Unable to evacuate instances %s", failreason)
    raise errors.OpExecError("Unable to evacuate instances %s" % failreason)

  if moved:
    lu.LogInfo("Instances to be moved: %s",
               utils.CommaJoin(
                 "%s (to %s)" %
                 (name, _NodeEvacDest(use_nodes, group, node_names))
                 for (name, group, node_names) in moved))

  return [map(compat.partial(_SetOpEarlyRelease, early_release),
              map(opcodes.OpCode.LoadOpCode, ops))
          for ops in jobs]


def _NodeEvacDest(use_nodes, group, node_names):
  """Returns group or nodes depending on caller's choice.

  """
  if use_nodes:
    return utils.CommaJoin(node_names)
  else:
    return group


def _SetOpEarlyRelease(early_release, op):
  """Sets C{early_release} flag on opcodes if available.

  """
  try:
    op.early_release = early_release
  except AttributeError:
    assert not isinstance(op, opcodes.OpInstanceReplaceDisks)

  return op


def MapInstanceDisksToNodes(instances):
  """Creates a map from (node, volume) to instance name.

  @type instances: list of L{objects.Instance}
  @rtype: dict; tuple of (node uuid, volume name) as key, instance name as
      value

  """
  return dict(((node_uuid, vol), inst.name)
              for inst in instances
              for (node_uuid, vols) in inst.MapLVsByNode().items()
              for vol in vols)


def CheckParamsNotGlobal(params, glob_pars, kind, bad_levels, good_levels):
  """Make sure that none of the given parameters is global.

  If a global parameter is found, an L{errors.OpPrereqError} exception is
  raised. This is used to avoid setting global parameters for individual nodes.

  @type params: dictionary
  @param params: Parameters to check
  @type glob_pars: dictionary
  @param glob_pars: Forbidden parameters
  @type kind: string
  @param kind: Kind of parameters (e.g. "node")
  @type bad_levels: string
  @param bad_levels: Level(s) at which the parameters are forbidden (e.g.
      "instance")
  @type good_levels: string
  @param good_levels: Level(s) at which the parameters are allowed (e.g.
      "cluster or group")

  """
  used_globals = glob_pars.intersection(params)
  if used_globals:
    msg = ("The following %s parameters are global and cannot"
           " be customized at %s level, please modify them at"
           " %s level: %s" %
           (kind, bad_levels, good_levels, utils.CommaJoin(used_globals)))
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
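

# Editor's note: illustrative sketch, not part of the original module, showing
# the intended use of CheckParamsNotGlobal; the parameter names below are made
# up for the example.
def _ExampleCheckParamsNotGlobal():
  global_pars = frozenset(["some_global_param"])
  # No forbidden parameter is used, so this passes silently
  CheckParamsNotGlobal({"some_node_param": 1}, global_pars, "node",
                       "node", "cluster or group")
  try:
    # Using a global parameter at node level raises OpPrereqError
    CheckParamsNotGlobal({"some_global_param": 1}, global_pars, "node",
                         "node", "cluster or group")
  except errors.OpPrereqError:
    pass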


def IsExclusiveStorageEnabledNode(cfg, node):
  """Whether exclusive_storage is in effect for the given node.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type node: L{objects.Node}
  @param node: The node
  @rtype: bool
  @return: The effective value of exclusive_storage

  """
  return cfg.GetNdParams(node)[constants.ND_EXCLUSIVE_STORAGE]


def CheckInstanceState(lu, instance, req_states, msg=None):
  """Ensure that an instance is in one of the required states.

  @param lu: the LU on behalf of which we make the check
  @param instance: the instance to check
  @param msg: if passed, should be a message to replace the default one
  @raise errors.OpPrereqError: if the instance is not in the required state

  """
  if msg is None:
    msg = ("can't use instance from outside %s states" %
           utils.CommaJoin(req_states))
  if instance.admin_state not in req_states:
    raise errors.OpPrereqError("Instance '%s' is marked to be %s, %s" %
                               (instance.name, instance.admin_state, msg),
                               errors.ECODE_STATE)

  if constants.ADMINST_UP not in req_states:
    pnode_uuid = instance.primary_node
    if not lu.cfg.GetNodeInfo(pnode_uuid).offline:
      all_hvparams = lu.cfg.GetClusterInfo().hvparams
      ins_l = lu.rpc.call_instance_list(
                [pnode_uuid], [instance.hypervisor], all_hvparams)[pnode_uuid]
      ins_l.Raise("Can't contact node %s for instance information" %
                  lu.cfg.GetNodeName(pnode_uuid),
                  prereq=True, ecode=errors.ECODE_ENVIRON)
      if instance.name in ins_l.payload:
        raise errors.OpPrereqError("Instance %s is running, %s" %
                                   (instance.name, msg), errors.ECODE_STATE)
    else:
      lu.LogWarning("Primary node offline, ignoring check that instance"
                    " is down")


def CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
  """Check the sanity of iallocator and node arguments and use the
  cluster-wide iallocator if appropriate.

  Check that at most one of (iallocator, node) is specified. If none is
  specified, or the iallocator is L{constants.DEFAULT_IALLOCATOR_SHORTCUT},
  then the LU's opcode's iallocator slot is filled with the cluster-wide
  default iallocator.

  @type iallocator_slot: string
  @param iallocator_slot: the name of the opcode iallocator slot
  @type node_slot: string
  @param node_slot: the name of the opcode target node slot

  """
  node = getattr(lu.op, node_slot, None)
  ialloc = getattr(lu.op, iallocator_slot, None)
  if node == []:
    node = None

  if node is not None and ialloc is not None:
    raise errors.OpPrereqError("Do not specify both, iallocator and node",
                               errors.ECODE_INVAL)
  elif ((node is None and ialloc is None) or
        ialloc == constants.DEFAULT_IALLOCATOR_SHORTCUT):
    default_iallocator = lu.cfg.GetDefaultIAllocator()
    if default_iallocator:
      setattr(lu.op, iallocator_slot, default_iallocator)
    else:
      raise errors.OpPrereqError("No iallocator or node given and no"
                                 " cluster-wide default iallocator found;"
                                 " please specify either an iallocator or a"
                                 " node, or set a cluster-wide default"
                                 " iallocator", errors.ECODE_INVAL)


def FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_uuid, prereq):
  """Returns the indices of an instance's disks found faulty on a node.

  """
  faulty = []

  for dev in instance.disks:
    cfg.SetDiskID(dev, node_uuid)

  result = rpc_runner.call_blockdev_getmirrorstatus(
             node_uuid, (instance.disks, instance))
  result.Raise("Failed to get disk status from node %s" %
               cfg.GetNodeName(node_uuid),
               prereq=prereq, ecode=errors.ECODE_ENVIRON)

  for idx, bdev_status in enumerate(result.payload):
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
      faulty.append(idx)

  return faulty


def CheckNodeOnline(lu, node_uuid, msg=None):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node_uuid: the node to check
  @param msg: if passed, should be a message to replace the default one
  @raise errors.OpPrereqError: if the node is offline

  """
  if msg is None:
    msg = "Can't use offline node"
  if lu.cfg.GetNodeInfo(node_uuid).offline:
    raise errors.OpPrereqError("%s: %s" % (msg, lu.cfg.GetNodeName(node_uuid)),
                               errors.ECODE_STATE)