Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib / common.py @ 28756f80

History | View | Annotate | Download (44.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Common functions used by multiple logical units."""
23

    
24
import copy
25
import os
26

    
27
from ganeti import compat
28
from ganeti import constants
29
from ganeti import errors
30
from ganeti import hypervisor
31
from ganeti import locking
32
from ganeti import objects
33
from ganeti import opcodes
34
from ganeti import pathutils
35
import ganeti.rpc.node as rpc
36
from ganeti import ssconf
37
from ganeti import utils
38

    
39

    
40
# States of instance: admin states in which an instance may legitimately be
# for a given kind of operation.

# Instance must be fully stopped
INSTANCE_DOWN = [constants.ADMINST_DOWN]
# Instance is administratively up or down, but not offline
INSTANCE_ONLINE = [constants.ADMINST_DOWN, constants.ADMINST_UP]
# Instance is known not to be running (stopped or marked offline)
INSTANCE_NOT_RUNNING = [constants.ADMINST_DOWN, constants.ADMINST_OFFLINE]

#: Instance status in which an instance can be marked as offline/online
CAN_CHANGE_INSTANCE_OFFLINE = (frozenset(INSTANCE_DOWN) | frozenset([
  constants.ADMINST_OFFLINE,
  ]))
49

    
50

    
51
def _ExpandItemName(expand_fn, name, kind):
52
  """Expand an item name.
53

54
  @param expand_fn: the function to use for expansion
55
  @param name: requested item name
56
  @param kind: text description ('Node' or 'Instance')
57
  @return: the result of the expand_fn, if successful
58
  @raise errors.OpPrereqError: if the item is not found
59

60
  """
61
  (uuid, full_name) = expand_fn(name)
62
  if uuid is None or full_name is None:
63
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
64
                               errors.ECODE_NOENT)
65
  return (uuid, full_name)
66

    
67

    
68
def ExpandInstanceUuidAndName(cfg, expected_uuid, name):
  """Resolve an instance short name, optionally verifying its UUID.

  @param cfg: the cluster configuration
  @param expected_uuid: expected instance UUID, or None for no check
  @param name: the short instance name
  @return: tuple of (uuid, full_name)
  @raise errors.OpPrereqError: if the instance is unknown or its UUID
      differs from C{expected_uuid}

  """
  (uuid, full_name) = _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
  uuid_mismatch = expected_uuid is not None and uuid != expected_uuid
  if uuid_mismatch:
    raise errors.OpPrereqError(
      "The instances UUID '%s' does not match the expected UUID '%s' for"
      " instance '%s'. Maybe the instance changed since you submitted this"
      " job." % (uuid, expected_uuid, full_name), errors.ECODE_NOTUNIQUE)
  return (uuid, full_name)
77

    
78

    
79
def ExpandNodeUuidAndName(cfg, expected_uuid, name):
  """Expand a short node name into the node UUID and full name.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type expected_uuid: string
  @param expected_uuid: expected UUID for the node (or None if there is no
        expectation). If it does not match, a L{errors.OpPrereqError} is
        raised.
  @type name: string
  @param name: the short node name

  """
  (uuid, full_name) = _ExpandItemName(cfg.ExpandNodeName, name, "Node")
  uuid_mismatch = expected_uuid is not None and uuid != expected_uuid
  if uuid_mismatch:
    raise errors.OpPrereqError(
      "The nodes UUID '%s' does not match the expected UUID '%s' for node"
      " '%s'. Maybe the node changed since you submitted this job." %
      (uuid, expected_uuid, full_name), errors.ECODE_NOTUNIQUE)
  return (uuid, full_name)
99

    
100

    
101
def ShareAll():
  """Build a lock-sharing declaration covering every lock level.

  @rtype: dict
  @return: mapping of each level in L{locking.LEVELS} to 1 (shared)

  """
  return dict((level, 1) for level in locking.LEVELS)
106

    
107

    
108
def CheckNodeGroupInstances(cfg, group_uuid, owned_instance_names):
  """Verify that the set of instances in a node group is still correct.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type group_uuid: string
  @param group_uuid: Node group UUID
  @type owned_instance_names: set or frozenset
  @param owned_instance_names: List of currently owned instances
  @return: the current set of instance names in the group
  @raise errors.OpPrereqError: if the group membership changed since the
      locks were taken

  """
  group_inst_uuids = cfg.GetNodeGroupInstances(group_uuid)
  wanted_instances = frozenset(cfg.GetInstanceNames(group_inst_uuids))
  if owned_instance_names != wanted_instances:
    raise errors.OpPrereqError("Instances in node group '%s' changed since"
                               " locks were acquired, wanted '%s', have '%s';"
                               " retry the operation" %
                               (group_uuid,
                                utils.CommaJoin(wanted_instances),
                                utils.CommaJoin(owned_instance_names)),
                               errors.ECODE_STATE)

  return wanted_instances
131

    
132

    
133
def GetWantedNodes(lu, short_node_names):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type short_node_names: list
  @param short_node_names: list of node names or None for all nodes
  @rtype: tuple of lists
  @return: tuple with (list of node UUIDs, list of node names)
  @raise errors.OpPrereqError: if any of the passed node names is unknown

  """
  if not short_node_names:
    # No explicit request: take every node in the cluster
    node_uuids = lu.cfg.GetNodeList()
  else:
    node_uuids = [ExpandNodeUuidAndName(lu.cfg, None, short_name)[0]
                  for short_name in short_node_names]

  node_names = [lu.cfg.GetNodeName(node_uuid) for node_uuid in node_uuids]
  return (node_uuids, node_names)
152

    
153

    
154
def GetWantedInstances(lu, short_inst_names):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type short_inst_names: list
  @param short_inst_names: list of instance names or None for all instances
  @rtype: tuple of lists
  @return: tuple of (instance UUIDs, instance names)
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not short_inst_names:
    # No explicit request: take every instance in the cluster
    inst_uuids = lu.cfg.GetInstanceList()
  else:
    inst_uuids = [ExpandInstanceUuidAndName(lu.cfg, None, short_name)[0]
                  for short_name in short_inst_names]

  inst_names = [lu.cfg.GetInstanceName(inst_uuid) for inst_uuid in inst_uuids]
  return (inst_uuids, inst_names)
173

    
174

    
175
def RunPostHook(lu, node_name):
  """Runs the post-hook for an opcode on a single node.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf the hook runs
  @type node_name: string
  @param node_name: name of the node to run the post-phase hooks on

  Hook failures are logged as warnings rather than propagated, so a
  failing post-hook never aborts the calling operation.

  """
  hm = lu.proc.BuildHooksManager(lu)
  try:
    hm.RunPhase(constants.HOOKS_PHASE_POST, node_names=[node_name])
  # Use the "as" form, valid on Python 2.6+ and Python 3, instead of the
  # Python-2-only comma form
  except Exception as err: # pylint: disable=W0703
    lu.LogWarning("Errors occurred running hooks on %s: %s",
                  node_name, err)
185

    
186

    
187
def RedistributeAncillaryFiles(lu):
  """Distribute additional files which are part of the cluster configuration.

  ConfigWriter takes care of distributing the config and ssconf files, but
  there are more files which should be distributed to all nodes. This function
  makes sure those are copied.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we redistribute; provides
      access to the cluster configuration and the RPC runner

  """
  # Gather target nodes
  cluster = lu.cfg.GetClusterInfo()
  master_info = lu.cfg.GetMasterNodeInfo()

  online_node_uuids = lu.cfg.GetOnlineNodeList()
  online_node_uuid_set = frozenset(online_node_uuids)
  # vm-capable targets are restricted to nodes that are also online
  vm_node_uuids = list(online_node_uuid_set.intersection(
                         lu.cfg.GetVmCapableNodeList()))

  # Never distribute to master node: it already has the files, being the
  # source of the redistribution
  for node_uuids in [online_node_uuids, vm_node_uuids]:
    if master_info.uuid in node_uuids:
      node_uuids.remove(master_info.uuid)

  # Gather file lists; redist=True, so the master-candidate list is
  # expected to be empty (see assertion below)
  (files_all, _, files_mc, files_vm) = \
    ComputeAncillaryFiles(cluster, True)

  # Never re-distribute configuration file from here
  assert not (pathutils.CLUSTER_CONF_FILE in files_all or
              pathutils.CLUSTER_CONF_FILE in files_vm)
  assert not files_mc, "Master candidates not handled in this function"

  # Pair each target node list with the files it should receive
  filemap = [
    (online_node_uuids, files_all),
    (vm_node_uuids, files_vm),
    ]

  # Upload the files
  for (node_uuids, files) in filemap:
    for fname in files:
      UploadHelper(lu, node_uuids, fname)
227

    
228

    
229
def ComputeAncillaryFiles(cluster, redist):
  """Compute files external to Ganeti which need to be consistent.

  @type cluster: L{objects.Cluster}
  @param cluster: the cluster object, queried for enabled features and
      hypervisors
  @type redist: boolean
  @param redist: Whether to include files which need to be redistributed
  @rtype: tuple of four sets
  @return: (files_all, files_opt, files_mc, files_vm) — files for all
      nodes, optional files, master-candidate-only files and
      vm-capable-node-only files

  """
  # Compute files for all nodes
  files_all = set([
    pathutils.SSH_KNOWN_HOSTS_FILE,
    pathutils.CONFD_HMAC_KEY,
    pathutils.CLUSTER_DOMAIN_SECRET_FILE,
    pathutils.SPICE_CERT_FILE,
    pathutils.SPICE_CACERT_FILE,
    pathutils.RAPI_USERS_FILE,
    ])

  if redist:
    # we need to ship at least the RAPI certificate
    files_all.add(pathutils.RAPI_CERT_FILE)
  else:
    # not redistributing: include every certificate and ssconf file
    files_all.update(pathutils.ALL_CERT_FILES)
    files_all.update(ssconf.SimpleStore().GetFileList())

  if cluster.modify_etc_hosts:
    files_all.add(pathutils.ETC_HOSTS)

  if cluster.use_external_mip_script:
    files_all.add(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)

  # Files which are optional, these must:
  # - be present in one other category as well
  # - either exist or not exist on all nodes of that category (mc, vm all)
  files_opt = set([
    pathutils.RAPI_USERS_FILE,
    ])

  # Files which should only be on master candidates
  files_mc = set()

  if not redist:
    files_mc.add(pathutils.CLUSTER_CONF_FILE)

  # File storage
  if (not redist and (cluster.IsFileStorageEnabled() or
                        cluster.IsSharedFileStorageEnabled())):
    files_all.add(pathutils.FILE_STORAGE_PATHS_FILE)
    files_opt.add(pathutils.FILE_STORAGE_PATHS_FILE)

  # Files which should only be on VM-capable nodes; index [0] of each
  # hypervisor's GetAncillaryFiles() holds its required files
  files_vm = set(
    filename
    for hv_name in cluster.enabled_hypervisors
    for filename in
    hypervisor.GetHypervisorClass(hv_name).GetAncillaryFiles()[0])

  # index [1] holds the hypervisor's optional files
  files_opt |= set(
    filename
    for hv_name in cluster.enabled_hypervisors
    for filename in
    hypervisor.GetHypervisorClass(hv_name).GetAncillaryFiles()[1])

  # Filenames in each category must be unique
  all_files_set = files_all | files_mc | files_vm
  assert (len(all_files_set) ==
          sum(map(len, [files_all, files_mc, files_vm]))), \
    "Found file listed in more than one file list"

  # Optional files must be present in one other category
  assert all_files_set.issuperset(files_opt), \
    "Optional file not in a different required list"

  # This one file should never ever be re-distributed via RPC
  assert not (redist and
              pathutils.FILE_STORAGE_PATHS_FILE in all_files_set)

  return (files_all, files_opt, files_mc, files_vm)
306

    
307

    
308
def UploadHelper(lu, node_uuids, fname):
  """Upload a file to a set of nodes, logging per-node failures as warnings.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit providing RPC and logging facilities
  @param node_uuids: UUIDs of the target nodes
  @type fname: string
  @param fname: path of the file to upload; silently skipped if it does
      not exist locally

  """
  if not os.path.exists(fname):
    return
  results = lu.rpc.call_upload_file(node_uuids, fname)
  for node_uuid, node_result in results.items():
    fail_msg = node_result.fail_msg
    if not fail_msg:
      continue
    warning = ("Copy of file %s to node %s failed: %s" %
               (fname, lu.cfg.GetNodeName(node_uuid), fail_msg))
    lu.LogWarning(warning)
320

    
321

    
322
def MergeAndVerifyHvState(op_input, obj_input):
  """Combines the hv state from an opcode with the one of the object

  @param op_input: The input dict from the opcode
  @param obj_input: The input dict from the objects
  @return: The verified and updated dict, or None if the opcode carried
      no hv state

  """
  if not op_input:
    return None

  invalid_hvs = set(op_input) - constants.HYPER_TYPES
  if invalid_hvs:
    raise errors.OpPrereqError("Invalid hypervisor(s) in hypervisor state:"
                               " %s" % utils.CommaJoin(invalid_hvs),
                               errors.ECODE_INVAL)
  base = obj_input if obj_input is not None else {}
  return _UpdateAndVerifySubDict(base, op_input,
                                 constants.HVSTS_PARAMETER_TYPES)
342

    
343

    
344
def MergeAndVerifyDiskState(op_input, obj_input):
  """Combines the disk state from an opcode with the one of the object

  @param op_input: The input dict from the opcode
  @param obj_input: The input dict from the objects
  @return: The verified and updated dict, or None if the opcode carried
      no disk state

  """
  if not op_input:
    return None

  invalid_dst = set(op_input) - constants.DS_VALID_TYPES
  if invalid_dst:
    raise errors.OpPrereqError("Invalid storage type(s) in disk state: %s" %
                               utils.CommaJoin(invalid_dst),
                               errors.ECODE_INVAL)
  base = obj_input if obj_input is not None else {}
  type_check = constants.DSS_PARAMETER_TYPES
  merged = {}
  for key, value in op_input.items():
    merged[key] = _UpdateAndVerifySubDict(base.get(key, {}), value,
                                          type_check)
  return merged
365

    
366

    
367
def CheckOSParams(lu, required, node_uuids, osname, osparams):
  """OS parameters validation.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type required: boolean
  @param required: whether the validation should fail if the OS is not
      found
  @type node_uuids: list
  @param node_uuids: the list of nodes on which we should check
  @type osname: string
  @param osname: the name of the OS we should use
  @type osparams: dict
  @param osparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  # non-vm-capable nodes cannot validate OS definitions
  node_uuids = _FilterVmNodes(lu, node_uuids)
  result = lu.rpc.call_os_validate(node_uuids, required, osname,
                                   [constants.OS_VALIDATE_PARAMETERS],
                                   osparams)
  for node_uuid, nres in result.items():
    # we don't check for offline cases since this should be run only
    # against the master node and/or an instance's nodes
    nres.Raise("OS Parameters validation failed on node %s" %
               lu.cfg.GetNodeName(node_uuid))
    if not nres.payload:
      lu.LogInfo("OS %s not found on node %s, validation skipped",
                 osname, lu.cfg.GetNodeName(node_uuid))
396

    
397

    
398
def CheckHVParams(lu, node_uuids, hvname, hvparams):
  """Hypervisor parameter validation.

  This function abstract the hypervisor parameter validation to be
  used in both instance create and instance modify.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type node_uuids: list
  @param node_uuids: the list of nodes on which we should check
  @type hvname: string
  @param hvname: the name of the hypervisor we should use
  @type hvparams: dict
  @param hvparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  vm_capable_uuids = _FilterVmNodes(lu, node_uuids)

  cluster_info = lu.cfg.GetClusterInfo()
  # fill the opcode parameters on top of the cluster-level defaults
  full_params = objects.FillDict(cluster_info.hvparams.get(hvname, {}),
                                 hvparams)

  results = lu.rpc.call_hypervisor_validate_params(vm_capable_uuids, hvname,
                                                   full_params)
  for node_uuid in vm_capable_uuids:
    node_result = results[node_uuid]
    if not node_result.offline:
      node_result.Raise("Hypervisor parameter validation failed on node %s" %
                        lu.cfg.GetNodeName(node_uuid))
427

    
428

    
429
def AdjustCandidatePool(lu, exceptions):
  """Adjust the candidate pool after node operations.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we operate
  @param exceptions: forwarded to
      L{config.ConfigWriter}'s C{MaintainCandidatePool} and
      C{GetMasterCandidateStats}

  """
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               utils.CommaJoin(node.name for node in mod_list))
    for node in mod_list:
      lu.context.ReaddNode(node)
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  if mc_now > mc_max:
    # use the same lazy-argument LogInfo call style as above instead of
    # eager %-interpolation
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)",
               mc_now, mc_max)
443

    
444

    
445
def CheckNodePVs(nresult, exclusive_storage):
  """Check node PVs.

  @param nresult: verification payload from the node, read for the
      C{NV_PVLIST} (and, with exclusive storage, C{NV_EXCLUSIVEPVS}) keys
  @type exclusive_storage: boolean
  @param exclusive_storage: whether exclusive-storage checks should be run
  @rtype: tuple
  @return: (list of error messages, exclusive-storage PV info or None)

  """
  pvlist_dict = nresult.get(constants.NV_PVLIST, None)
  if pvlist_dict is None:
    return (["Can't get PV list from node"], None)
  # Use a list comprehension instead of map(): the result is iterated below
  # and also passed to LvmExclusiveCheckNodePvs, so it must be a real list
  # (under Python 3, map() would return a one-shot iterator)
  pvlist = [objects.LvmPvInfo.FromDict(d) for d in pvlist_dict]
  errlist = []
  # check that ':' is not present in PV names, since it's a
  # special character for lvcreate (denotes the range of PEs to
  # use on the PV)
  for pv in pvlist:
    if ":" in pv.name:
      errlist.append("Invalid character ':' in PV '%s' of VG '%s'" %
                     (pv.name, pv.vg_name))
  es_pvinfo = None
  if exclusive_storage:
    (errmsgs, es_pvinfo) = utils.LvmExclusiveCheckNodePvs(pvlist)
    errlist.extend(errmsgs)
    shared_pvs = nresult.get(constants.NV_EXCLUSIVEPVS, None)
    if shared_pvs:
      for (pvname, lvlist) in shared_pvs:
        # TODO: Check that LVs are really unrelated (snapshots, DRBD meta...)
        errlist.append("PV %s is shared among unrelated LVs (%s)" %
                       (pvname, utils.CommaJoin(lvlist)))
  return (errlist, es_pvinfo)
472

    
473

    
474
def _ComputeMinMaxSpec(name, qualifier, ispecs, value):
  """Computes if value is in the desired range.

  @param name: name of the parameter for which we perform the check
  @param qualifier: a qualifier used in the error message (e.g. 'disk/1',
      not just 'disk')
  @param ispecs: dictionary containing min and max values
  @param value: actual value that we want to use
  @return: None or an error string

  """
  # unset or auto-computed values are never range-checked
  if value is None or value == constants.VALUE_AUTO:
    return None
  # missing bounds default to the value itself, i.e. "always in range"
  upper = ispecs[constants.ISPECS_MAX].get(name, value)
  lower = ispecs[constants.ISPECS_MIN].get(name, value)
  if lower <= value <= upper:
    return None
  fqn = "%s/%s" % (name, qualifier) if qualifier else name
  return ("%s value %s is not in range [%s, %s]" %
          (fqn, value, lower, upper))
497

    
498

    
499
def ComputeIPolicySpecViolation(ipolicy, mem_size, cpu_count, disk_count,
                                nic_count, disk_sizes, spindle_use,
                                disk_template,
                                _compute_fn=_ComputeMinMaxSpec):
  """Verifies ipolicy against provided specs.

  @type ipolicy: dict
  @param ipolicy: The ipolicy
  @type mem_size: int
  @param mem_size: The memory size
  @type cpu_count: int
  @param cpu_count: Used cpu cores
  @type disk_count: int
  @param disk_count: Number of disks used
  @type nic_count: int
  @param nic_count: Number of nics used
  @type disk_sizes: list of ints
  @param disk_sizes: Disk sizes of used disk (len must match C{disk_count})
  @type spindle_use: int
  @param spindle_use: The number of spindles this instance uses
  @type disk_template: string
  @param disk_template: The disk template of the instance
  @param _compute_fn: The compute function (unittest only)
  @return: A list of violations, or an empty list of no violations are found

  """
  assert disk_count == len(disk_sizes)

  test_settings = [
    (constants.ISPEC_MEM_SIZE, "", mem_size),
    (constants.ISPEC_CPU_COUNT, "", cpu_count),
    (constants.ISPEC_NIC_COUNT, "", nic_count),
    (constants.ISPEC_SPINDLE_USE, "", spindle_use),
    ] + [(constants.ISPEC_DISK_SIZE, str(idx), d)
         for idx, d in enumerate(disk_sizes)]
  if disk_template != constants.DT_DISKLESS:
    # This check doesn't make sense for diskless instances
    test_settings.append((constants.ISPEC_DISK_COUNT, "", disk_count))
  ret = []
  allowed_dts = ipolicy[constants.IPOLICY_DTS]
  if disk_template not in allowed_dts:
    ret.append("Disk template %s is not allowed (allowed templates: %s)" %
               (disk_template, utils.CommaJoin(allowed_dts)))

  # Keep the min/max group producing the fewest violations; an instance is
  # acceptable if it matches any one group
  min_errs = None
  for minmax in ipolicy[constants.ISPECS_MINMAX]:
    # List comprehension instead of filter(): len() is applied below, which
    # would fail on Python 3's lazy filter object
    errs = [res for res in (_compute_fn(name, qualifier, minmax, value)
                            for (name, qualifier, value) in test_settings)
            if res]
    if min_errs is None or len(errs) < len(min_errs):
      min_errs = errs
  assert min_errs is not None
  return ret + min_errs
552

    
553

    
554
def ComputeIPolicyInstanceViolation(ipolicy, instance, cfg,
                                    _compute_fn=ComputeIPolicySpecViolation):
  """Compute if instance meets the specs of ipolicy.

  @type ipolicy: dict
  @param ipolicy: The ipolicy to verify against
  @type instance: L{objects.Instance}
  @param instance: The instance to verify
  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration
  @param _compute_fn: The function to verify ipolicy (unittest only)
  @see: L{ComputeIPolicySpecViolation}

  """
  violations = []
  filled_be = cfg.GetClusterInfo().FillBE(instance)
  mem_size = filled_be[constants.BE_MAXMEM]
  cpu_count = filled_be[constants.BE_VCPUS]
  es_flags = rpc.GetExclusiveStorageForNodes(cfg, instance.all_nodes)
  if not any(es_flags.values()):
    spindle_use = filled_be[constants.BE_SPINDLE_USE]
  else:
    # With exclusive storage use the actual spindles
    try:
      spindle_use = sum([disk.spindles for disk in instance.disks])
    except TypeError:
      violations.append("Number of spindles not configured for disks of"
                        " instance %s while exclusive storage is enabled,"
                        " try running gnt-cluster repair-disk-sizes"
                        % instance.name)
      # _ComputeMinMaxSpec ignores 'None's
      spindle_use = None
  disk_sizes = [disk.size for disk in instance.disks]

  spec_violations = _compute_fn(ipolicy, mem_size, cpu_count,
                                len(instance.disks), len(instance.nics),
                                disk_sizes, spindle_use,
                                instance.disk_template)
  return violations + spec_violations
592

    
593

    
594
def _ComputeViolatingInstances(ipolicy, instances, cfg):
  """Computes a set of instances who violates given ipolicy.

  @param ipolicy: The ipolicy to verify
  @type instances: L{objects.Instance}
  @param instances: List of instances to verify
  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration
  @return: A frozenset of instance names violating the ipolicy

  """
  return frozenset(inst.name for inst in instances
                   if ComputeIPolicyInstanceViolation(ipolicy, inst, cfg))
607

    
608

    
609
def ComputeNewInstanceViolations(old_ipolicy, new_ipolicy, instances, cfg):
  """Computes a set of any instances that would violate the new ipolicy.

  @param old_ipolicy: The current (still in-place) ipolicy
  @param new_ipolicy: The new (to become) ipolicy
  @param instances: List of instances to verify
  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration
  @return: A list of instances which violates the new ipolicy but
      did not before

  """
  new_violations = _ComputeViolatingInstances(new_ipolicy, instances, cfg)
  old_violations = _ComputeViolatingInstances(old_ipolicy, instances, cfg)
  return new_violations - old_violations
623

    
624

    
625
def GetUpdatedParams(old_params, update_dict,
                      use_default=True, use_none=False):
  """Return the new version of a parameter dictionary.

  @type old_params: dict
  @param old_params: old parameters
  @type update_dict: dict
  @param update_dict: dict containing new parameter values, or
      constants.VALUE_DEFAULT to reset the parameter to its default
      value
  @type use_default: boolean
  @param use_default: whether to recognise L{constants.VALUE_DEFAULT}
      values as 'to be deleted' values
  @type use_none: boolean
  @param use_none: whether to recognise C{None} values as 'to be
      deleted' values
  @rtype: dict
  @return: the new parameter dictionary

  """
  # deep copy so nested values in old_params are never shared with the result
  params_copy = copy.deepcopy(old_params)
  # items() instead of the Python-2-only iteritems()
  for key, val in update_dict.items():
    if ((use_default and val == constants.VALUE_DEFAULT) or
          (use_none and val is None)):
      try:
        del params_copy[key]
      except KeyError:
        # deleting a parameter that was never set is not an error
        pass
    else:
      params_copy[key] = val
  return params_copy
656

    
657

    
658
def GetUpdatedIPolicy(old_ipolicy, new_ipolicy, group_policy=False):
  """Return the new version of an instance policy.

  @param old_ipolicy: the current (deep-copied, never modified) policy
  @param new_ipolicy: dict of updates to apply on top of the old policy
  @param group_policy: whether this policy applies to a group and thus
    we should support removal of policy entries
  @rtype: dict
  @return: the merged and syntax-checked policy
  @raise errors.OpPrereqError: on invalid keys, values or final policy

  """
  ipolicy = copy.deepcopy(old_ipolicy)
  for key, value in new_ipolicy.items():
    if key not in constants.IPOLICY_ALL_KEYS:
      raise errors.OpPrereqError("Invalid key in new ipolicy: %s" % key,
                                 errors.ECODE_INVAL)
    if (not value or value == [constants.VALUE_DEFAULT] or
            value == constants.VALUE_DEFAULT):
      # an empty/default value means "remove this entry"; only group
      # policies may drop entries (the cluster policy must stay complete)
      if group_policy:
        if key in ipolicy:
          del ipolicy[key]
      else:
        # fixed stray trailing apostrophe in the message
        raise errors.OpPrereqError("Can't unset ipolicy attribute '%s'"
                                   " on the cluster" % key,
                                   errors.ECODE_INVAL)
    else:
      if key in constants.IPOLICY_PARAMETERS:
        # FIXME: we assume all such values are float
        try:
          ipolicy[key] = float(value)
        # "as" form: valid on Python 2.6+ and Python 3
        except (TypeError, ValueError) as err:
          raise errors.OpPrereqError("Invalid value for attribute"
                                     " '%s': '%s', error: %s" %
                                     (key, value, err), errors.ECODE_INVAL)
      elif key == constants.ISPECS_MINMAX:
        for minmax in value:
          for k in minmax.keys():
            utils.ForceDictType(minmax[k], constants.ISPECS_PARAMETER_TYPES)
        ipolicy[key] = value
      elif key == constants.ISPECS_STD:
        if group_policy:
          msg = "%s cannot appear in group instance specs" % key
          raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
        ipolicy[key] = GetUpdatedParams(old_ipolicy.get(key, {}), value,
                                        use_none=False, use_default=False)
        utils.ForceDictType(ipolicy[key], constants.ISPECS_PARAMETER_TYPES)
      else:
        # FIXME: we assume all others are lists; this should be redone
        # in a nicer way
        ipolicy[key] = list(value)
  try:
    objects.InstancePolicy.CheckParameterSyntax(ipolicy, not group_policy)
  except errors.ConfigurationError as err:
    raise errors.OpPrereqError("Invalid instance policy: %s" % err,
                               errors.ECODE_INVAL)
  return ipolicy
710

    
711

    
712
def AnnotateDiskParams(instance, devs, cfg):
  """Little helper wrapper to the rpc annotation method.

  @param instance: The instance object
  @type devs: List of L{objects.Disk}
  @param devs: The root devices (not any of its children!)
  @param cfg: The config object
  @returns The annotated disk copies
  @see L{rpc.node.AnnotateDiskParams}

  """
  disk_params = cfg.GetInstanceDiskParams(instance)
  return rpc.AnnotateDiskParams(devs, disk_params)
724

    
725

    
726
def SupportsOob(cfg, node):
  """Tells if node supports OOB.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type node: L{objects.Node}
  @param node: The node
  @return: The OOB script if supported or an empty string otherwise

  """
  nd_params = cfg.GetNdParams(node)
  return nd_params[constants.ND_OOB_PROGRAM]
737

    
738

    
739
def _UpdateAndVerifySubDict(base, updates, type_check):
  """Updates and verifies a dict with sub dicts of the same type.

  @param base: The dict with the old data
  @param updates: The dict with the new data
  @param type_check: Dict suitable to ForceDictType to verify correct types
  @returns: A new dict with updated and verified values

  """
  def _merge(old, new_values):
    # merge one sub-dict and type-check the result
    merged = GetUpdatedParams(old, new_values)
    utils.ForceDictType(merged, type_check)
    return merged

  ret = copy.deepcopy(base)
  for key, value in updates.items():
    ret[key] = _merge(base.get(key, {}), value)
  return ret
757

    
758

    
759
def _FilterVmNodes(lu, node_uuids):
  """Filters out non-vm_capable nodes from a list.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type node_uuids: list
  @param node_uuids: the list of nodes on which we should check
  @rtype: list
  @return: the list of vm-capable nodes, in the original order

  """
  excluded = frozenset(lu.cfg.GetNonVmCapableNodeList())
  return [node_uuid for node_uuid in node_uuids if node_uuid not in excluded]
772

    
773

    
774
def GetDefaultIAllocator(cfg, ialloc):
  """Decides on which iallocator to use.

  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration object
  @type ialloc: string or None
  @param ialloc: Iallocator specified in opcode
  @rtype: string
  @return: Iallocator name
  @raise errors.OpPrereqError: if no iallocator is specified and the
      cluster has no default either

  """
  if ialloc:
    return ialloc

  # Fall back to the cluster-wide default iallocator
  default_ialloc = cfg.GetDefaultIAllocator()
  if not default_ialloc:
    raise errors.OpPrereqError("No iallocator was specified, neither in the"
                               " opcode nor as a cluster-wide default",
                               errors.ECODE_INVAL)

  return default_ialloc
795

    
796

    
797
def CheckInstancesNodeGroups(cfg, instances, owned_groups, owned_node_uuids,
                             cur_group_uuid):
  """Checks if node groups for locked instances are still correct.

  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration
  @type instances: dict; string as key, L{objects.Instance} as value
  @param instances: Dictionary, instance UUID as key, instance object as value
  @type owned_groups: iterable of string
  @param owned_groups: List of owned groups
  @type owned_node_uuids: iterable of string
  @param owned_node_uuids: List of owned nodes
  @type cur_group_uuid: string or None
  @param cur_group_uuid: Optional group UUID to check against instance's groups

  """
  for inst_uuid in instances:
    inst = instances[inst_uuid]

    # Every node of the instance must still be covered by our node locks.
    assert owned_node_uuids.issuperset(inst.all_nodes), \
      "Instance %s's nodes changed while we kept the lock" % inst.name

    # Raises OpPrereqError if the instance's groups drifted from our locks.
    groups = CheckInstanceNodeGroups(cfg, inst_uuid, owned_groups)

    assert cur_group_uuid is None or cur_group_uuid in groups, \
      "Instance %s has no node in group %s" % (inst.name, cur_group_uuid)
821

    
822

    
823
def CheckInstanceNodeGroups(cfg, inst_uuid, owned_groups, primary_only=False):
  """Checks if the owned node groups are still correct for an instance.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type inst_uuid: string
  @param inst_uuid: Instance UUID
  @type owned_groups: set or frozenset
  @param owned_groups: List of currently owned node groups
  @type primary_only: boolean
  @param primary_only: Whether to check node groups for only the primary node
  @return: the instance's current node groups
  @raise errors.OpPrereqError: if the instance's node groups are no longer a
      subset of the owned groups (i.e. they changed after locking)

  """
  inst_groups = cfg.GetInstanceNodeGroups(inst_uuid, primary_only)

  if not owned_groups.issuperset(inst_groups):
    # Fix: the original message contained a duplicated word ("are are").
    raise errors.OpPrereqError("Instance %s's node groups changed since"
                               " locks were acquired, current groups"
                               " are '%s', owning groups '%s'; retry the"
                               " operation" %
                               (cfg.GetInstanceName(inst_uuid),
                                utils.CommaJoin(inst_groups),
                                utils.CommaJoin(owned_groups)),
                               errors.ECODE_STATE)

  return inst_groups
849

    
850

    
851
def LoadNodeEvacResult(lu, alloc_result, early_release, use_nodes):
  """Unpacks the result of change-group and node-evacuate iallocator requests.

  Iallocator modes L{constants.IALLOCATOR_MODE_NODE_EVAC} and
  L{constants.IALLOCATOR_MODE_CHG_GROUP}.

  @type lu: L{LogicalUnit}
  @param lu: Logical unit instance
  @type alloc_result: tuple/list
  @param alloc_result: Result from iallocator
  @type early_release: bool
  @param early_release: Whether to release locks early if possible
  @type use_nodes: bool
  @param use_nodes: Whether to display node names instead of groups

  """
  (moved, failed, jobs) = alloc_result

  if failed:
    # Report every failed instance together with the iallocator's reason.
    failreason = utils.CommaJoin("%s (%s)" % (name, reason)
                                 for (name, reason) in failed)
    lu.LogWarning("Unable to evacuate instances %s", failreason)
    raise errors.OpExecError("Unable to evacuate instances %s" % failreason)

  if moved:
    descriptions = ["%s (to %s)" %
                    (name, _NodeEvacDest(use_nodes, group, node_names))
                    for (name, group, node_names) in moved]
    lu.LogInfo("Instances to be moved: %s", utils.CommaJoin(descriptions))

  # Deserialize each job's opcodes and propagate the early_release flag
  # to those opcodes that support it.
  return [[_SetOpEarlyRelease(early_release, opcodes.OpCode.LoadOpCode(op))
           for op in ops]
          for ops in jobs]
885

    
886

    
887
def _NodeEvacDest(use_nodes, group, node_names):
888
  """Returns group or nodes depending on caller's choice.
889

890
  """
891
  if use_nodes:
892
    return utils.CommaJoin(node_names)
893
  else:
894
    return group
895

    
896

    
897
def _SetOpEarlyRelease(early_release, op):
898
  """Sets C{early_release} flag on opcodes if available.
899

900
  """
901
  try:
902
    op.early_release = early_release
903
  except AttributeError:
904
    assert not isinstance(op, opcodes.OpInstanceReplaceDisks)
905

    
906
  return op
907

    
908

    
909
def MapInstanceLvsToNodes(instances):
  """Creates a map from (node, volume) to instance name.

  @type instances: list of L{objects.Instance}
  @rtype: dict; tuple of (node uuid, volume name) as key, L{objects.Instance}
          object as value

  """
  lvs_to_instance = {}
  for inst in instances:
    for (node_uuid, vols) in inst.MapLVsByNode().items():
      for vol in vols:
        lvs_to_instance[(node_uuid, vol)] = inst
  return lvs_to_instance
921

    
922

    
923
def CheckParamsNotGlobal(params, glob_pars, kind, bad_levels, good_levels):
  """Make sure that none of the given parameters is global.

  If a global parameter is found, an L{errors.OpPrereqError} exception is
  raised. This is used to avoid setting global parameters for individual nodes.

  @type params: dictionary
  @param params: Parameters to check
  @type glob_pars: dictionary
  @param glob_pars: Forbidden parameters
  @type kind: string
  @param kind: Kind of parameters (e.g. "node")
  @type bad_levels: string
  @param bad_levels: Level(s) at which the parameters are forbidden (e.g.
      "instance")
  @type good_levels: strings
  @param good_levels: Level(s) at which the parameters are allowed (e.g.
      "cluster or group")

  """
  offending = glob_pars.intersection(params)
  if not offending:
    return

  msg = ("The following %s parameters are global and cannot"
         " be customized at %s level, please modify them at"
         " %s level: %s" %
         (kind, bad_levels, good_levels, utils.CommaJoin(offending)))
  raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
950

    
951

    
952
def IsExclusiveStorageEnabledNode(cfg, node):
  """Whether exclusive_storage is in effect for the given node.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type node: L{objects.Node}
  @param node: The node
  @rtype: bool
  @return: The effective value of exclusive_storage

  """
  ndparams = cfg.GetNdParams(node)
  return ndparams[constants.ND_EXCLUSIVE_STORAGE]
964

    
965

    
966
def CheckInstanceState(lu, instance, req_states, msg=None):
  """Ensure that an instance is in one of the required states.

  @param lu: the LU on behalf of which we make the check
  @param instance: the instance to check
  @param req_states: iterable of admin states the instance must be in
  @param msg: if passed, should be a message to replace the default one
  @raise errors.OpPrereqError: if the instance is not in the required state

  """
  if msg is None:
    msg = ("can't use instance from outside %s states" %
           utils.CommaJoin(req_states))
  if instance.admin_state not in req_states:
    raise errors.OpPrereqError("Instance '%s' is marked to be %s, %s" %
                               (instance.name, instance.admin_state, msg),
                               errors.ECODE_STATE)

  # If "up" is not an acceptable state, double-check via RPC that the
  # instance is really not running on its primary node (the config's
  # admin_state alone does not prove that).
  if constants.ADMINST_UP not in req_states:
    pnode_uuid = instance.primary_node
    if not lu.cfg.GetNodeInfo(pnode_uuid).offline:
      all_hvparams = lu.cfg.GetClusterInfo().hvparams
      ins_l = lu.rpc.call_instance_list(
                [pnode_uuid], [instance.hypervisor], all_hvparams)[pnode_uuid]
      ins_l.Raise("Can't contact node %s for instance information" %
                  lu.cfg.GetNodeName(pnode_uuid),
                  prereq=True, ecode=errors.ECODE_ENVIRON)
      if instance.name in ins_l.payload:
        raise errors.OpPrereqError("Instance %s is running, %s" %
                                   (instance.name, msg), errors.ECODE_STATE)
    else:
      # An offline primary can't be queried; proceed with only a warning.
      lu.LogWarning("Primary node offline, ignoring check that instance"
                     " is down")
998

    
999

    
1000
def CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
  """Check the sanity of iallocator and node arguments and use the
  cluster-wide iallocator if appropriate.

  Check that at most one of (iallocator, node) is specified. If none is
  specified, or the iallocator is L{constants.DEFAULT_IALLOCATOR_SHORTCUT},
  then the LU's opcode's iallocator slot is filled with the cluster-wide
  default iallocator.

  @type iallocator_slot: string
  @param iallocator_slot: the name of the opcode iallocator slot
  @type node_slot: string
  @param node_slot: the name of the opcode target node slot

  """
  node = getattr(lu.op, node_slot, None)
  ialloc = getattr(lu.op, iallocator_slot, None)
  # An empty node list counts as "no node given".
  if node == []:
    node = None

  if node is not None and ialloc is not None:
    raise errors.OpPrereqError("Do not specify both, iallocator and node",
                               errors.ECODE_INVAL)

  if ((node is None and ialloc is None) or
      ialloc == constants.DEFAULT_IALLOCATOR_SHORTCUT):
    default_iallocator = lu.cfg.GetDefaultIAllocator()
    if not default_iallocator:
      raise errors.OpPrereqError("No iallocator or node given and no"
                                 " cluster-wide default iallocator found;"
                                 " please specify either an iallocator or a"
                                 " node, or set a cluster-wide default"
                                 " iallocator", errors.ECODE_INVAL)
    setattr(lu.op, iallocator_slot, default_iallocator)
1034

    
1035

    
1036
def FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_uuid, prereq):
  """Returns the indices of an instance's disks reported faulty on a node.

  @type cfg: L{config.ConfigWriter}
  @param cfg: the cluster configuration, used to resolve the node name
  @param rpc_runner: RPC runner used to query the mirror status
  @param instance: the instance whose disks are checked
  @type node_uuid: string
  @param node_uuid: the node to query
  @param prereq: passed through as the C{prereq} flag of the RPC result's
      C{Raise} method
  @rtype: list of int
  @return: indices of the disks whose local disk status is faulty

  """
  result = rpc_runner.call_blockdev_getmirrorstatus(
             node_uuid, (instance.disks, instance))
  result.Raise("Failed to get disk status from node %s" %
               cfg.GetNodeName(node_uuid),
               prereq=prereq, ecode=errors.ECODE_ENVIRON)

  return [idx for idx, bdev_status in enumerate(result.payload)
          if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY]
1050

    
1051

    
1052
def CheckNodeOnline(lu, node_uuid, msg=None):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node_uuid: the node to check
  @param msg: if passed, should be a message to replace the default one
  @raise errors.OpPrereqError: if the node is offline

  """
  if msg is None:
    msg = "Can't use offline node"

  node_info = lu.cfg.GetNodeInfo(node_uuid)
  if node_info.offline:
    raise errors.OpPrereqError("%s: %s" % (msg, lu.cfg.GetNodeName(node_uuid)),
                               errors.ECODE_STATE)
1066

    
1067

    
1068
def CheckDiskTemplateEnabled(cluster, disk_template):
  """Helper function to check if a disk template is enabled.

  @type cluster: C{objects.Cluster}
  @param cluster: the cluster's configuration
  @type disk_template: str
  @param disk_template: the disk template to be checked
  @raise errors.OpPrereqError: if the disk template is unknown or not
      enabled on this cluster

  """
  assert disk_template is not None
  if disk_template not in constants.DISK_TEMPLATES:
    raise errors.OpPrereqError("'%s' is not a valid disk template."
                               " Valid disk templates are: %s" %
                               (disk_template,
                                ",".join(constants.DISK_TEMPLATES)))
  # Idiom fix: "x not in y" instead of "not x in y" (PEP 8)
  if disk_template not in cluster.enabled_disk_templates:
    raise errors.OpPrereqError("Disk template '%s' is not enabled in cluster."
                               " Enabled disk templates are: %s" %
                               (disk_template,
                                ",".join(cluster.enabled_disk_templates)))
1088

    
1089

    
1090
def CheckStorageTypeEnabled(cluster, storage_type):
  """Helper function to check if a storage type is enabled.

  @type cluster: C{objects.Cluster}
  @param cluster: the cluster's configuration
  @type storage_type: str
  @param storage_type: the storage type to be checked

  """
  assert storage_type is not None
  assert storage_type in constants.STORAGE_TYPES

  # lvm-pv is special: it cannot be enabled via disk templates directly,
  # so check for the corresponding volume-group storage instead.
  if storage_type == constants.ST_LVM_PV:
    CheckStorageTypeEnabled(cluster, constants.ST_LVM_VG)
    return

  possible_disk_templates = \
      utils.storage.GetDiskTemplatesOfStorageTypes(storage_type)
  if any(disk_template in cluster.enabled_disk_templates
         for disk_template in possible_disk_templates):
    return

  raise errors.OpPrereqError("No disk template of storage type '%s' is"
                             " enabled in this cluster. Enabled disk"
                             " templates are: %s" % (storage_type,
                             ",".join(cluster.enabled_disk_templates)))
1115

    
1116

    
1117
def CheckIpolicyVsDiskTemplates(ipolicy, enabled_disk_templates):
  """Checks ipolicy disk templates against enabled disk templates.

  @type ipolicy: dict
  @param ipolicy: the new ipolicy
  @type enabled_disk_templates: list of string
  @param enabled_disk_templates: list of enabled disk templates on the
    cluster
  @raises errors.OpPrereqError: if there is at least one allowed disk
    template that is not also enabled.

  """
  assert constants.IPOLICY_DTS in ipolicy
  allowed_disk_templates = ipolicy[constants.IPOLICY_DTS]
  not_enabled = set(allowed_disk_templates) - set(enabled_disk_templates)
  if not_enabled:
    # Fix: grammar of the user-visible message ("template are" -> "templates
    # are"); also fixed docstring typo "tempaltes".
    raise errors.OpPrereqError("The following disk templates are allowed"
                               " by the ipolicy, but not enabled on the"
                               " cluster: %s" % utils.CommaJoin(not_enabled))
1136

    
1137

    
1138
def CheckDiskAccessModeValidity(parameters):
  """Checks if the access parameter is legal.

  @see: L{CheckDiskAccessModeConsistency} for cluster consistency checks.
  @raise errors.OpPrereqError: if the check fails.

  """
  for disk_template, dt_params in parameters.items():
    access = dt_params.get(constants.LDP_ACCESS, constants.DISK_KERNELSPACE)
    if access in constants.DISK_VALID_ACCESS_MODES:
      continue

    valid_vals_str = utils.CommaJoin(constants.DISK_VALID_ACCESS_MODES)
    raise errors.OpPrereqError("Invalid value of '{d}:{a}': '{v}' (expected"
                               " one of {o})".format(d=disk_template,
                                                     a=constants.LDP_ACCESS,
                                                     v=access,
                                                     o=valid_vals_str))
1155

    
1156

    
1157
def CheckDiskAccessModeConsistency(parameters, cfg, group=None):
  """Checks if the access param is consistent with the cluster configuration.

  @note: requires a configuration lock to run.
  @param parameters: the parameters to validate
  @param cfg: the cfg object of the cluster
  @param group: if set, only check for consistency within this group.
  @raise errors.OpPrereqError: if the LU attempts to change the access parameter
                               to an invalid value, such as "pink bunny".
  @raise errors.OpPrereqError: if the LU attempts to change the access parameter
                               to an inconsistent value, such as asking for RBD
                               userspace access to the chroot hypervisor.

  """
  # First validate the access values themselves.
  CheckDiskAccessModeValidity(parameters)

  for disk_template in parameters:
    access = parameters[disk_template].get(constants.LDP_ACCESS,
                                           constants.DISK_KERNELSPACE)

    # Templates without an access concept need no consistency check.
    if disk_template not in constants.DTS_HAVE_ACCESS:
      continue

    # Check the combination of instance hypervisor, disk template and access
    # protocol is sane for every affected instance (group-scoped if given).
    inst_uuids = cfg.GetNodeGroupInstances(group) if group else \
                 cfg.GetInstanceList()

    for entry in inst_uuids:
      inst = cfg.GetInstanceInfo(entry)
      hv = inst.hypervisor
      dt = inst.disk_template

      if not IsValidDiskAccessModeCombination(hv, dt, access):
        raise errors.OpPrereqError("Instance {i}: cannot use '{a}' access"
                                   " setting with {h} hypervisor and {d} disk"
                                   " type.".format(i=inst.name,
                                                   a=access,
                                                   h=hv,
                                                   d=dt))
1197

    
1198

    
1199
def IsValidDiskAccessModeCombination(hv, disk_template, mode):
  """Checks if an hypervisor can read a disk template with given mode.

  @param hv: the hypervisor that will access the data
  @param disk_template: the disk template the data is stored as
  @param mode: how the hypervisor should access the data
  @return: True if the hypervisor can read a given read disk_template
           in the specified mode.

  """
  # Kernelspace access is always acceptable.
  if mode == constants.DISK_KERNELSPACE:
    return True

  # Userspace access is only supported for KVM with RBD or Gluster disks;
  # every other combination is rejected.
  return (hv == constants.HT_KVM and
          disk_template in (constants.DT_RBD, constants.DT_GLUSTER) and
          mode == constants.DISK_USERSPACE)
1219

    
1220

    
1221
def AddNodeCertToCandidateCerts(lu, node_uuid, cluster):
  """Add the node's client SSL certificate digest to the candidate certs.

  @param lu: the LU whose RPC runner is used to query the node
  @type node_uuid: string
  @param node_uuid: the node's UUID
  @type cluster: C{object.Cluster}
  @param cluster: the cluster's configuration

  """
  # Ask the node itself for the digest of its client SSL certificate.
  result = lu.rpc.call_node_crypto_tokens(
             node_uuid,
             [(constants.CRYPTO_TYPE_SSL_DIGEST, constants.CRYPTO_ACTION_GET,
               None)])
  result.Raise("Could not retrieve the node's (uuid %s) SSL digest."
               % node_uuid)
  # Exactly one token was requested above, so the payload holds a single
  # (crypto type, digest) pair.
  ((crypto_type, digest), ) = result.payload
  assert crypto_type == constants.CRYPTO_TYPE_SSL_DIGEST

  utils.AddNodeToCandidateCerts(node_uuid, digest, cluster.candidate_certs)
1240

    
1241

    
1242
def RemoveNodeCertFromCandidateCerts(node_uuid, cluster):
  """Removes the node's certificate from the candidate certificates list.

  @type node_uuid: string
  @param node_uuid: the node's UUID
  @type cluster: C{objects.Cluster}
  @param cluster: the cluster's configuration

  """
  # Delegates to the generic helper, which mutates the candidate cert map
  # stored on the cluster object.
  candidate_certs = cluster.candidate_certs
  utils.RemoveNodeFromCandidateCerts(node_uuid, candidate_certs)
1252

    
1253

    
1254
def CreateNewClientCert(self, node_uuid, filename=None):
  """Creates a new client SSL certificate for the node.

  @param self: object providing an C{rpc} attribute with a node RPC runner;
      NOTE(review): this is a module-level function with a C{self} parameter —
      presumably extracted from an LU class; confirm what callers pass here.
  @type node_uuid: string
  @param node_uuid: the node's UUID
  @type filename: string
  @param filename: the certificate's filename
  @rtype: string
  @return: the digest of the newly created certificate

  """
  options = {}
  if filename:
    # Only pass the target file when explicitly requested.
    options[constants.CRYPTO_OPTION_CERT_FILE] = filename
  result = self.rpc.call_node_crypto_tokens(
             node_uuid,
             [(constants.CRYPTO_TYPE_SSL_DIGEST,
               constants.CRYPTO_ACTION_CREATE,
               options)])
  result.Raise("Could not create the node's (uuid %s) SSL client"
               " certificate." % node_uuid)
  # A single token was requested, so the payload is one (type, digest) pair.
  ((crypto_type, new_digest), ) = result.payload
  assert crypto_type == constants.CRYPTO_TYPE_SSL_DIGEST
  return new_digest