# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Common functions used by multiple logical units."""

import copy
import os

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import objects
from ganeti import opcodes
from ganeti import pathutils
from ganeti import rpc
from ganeti import ssconf
from ganeti import utils


# States an instance can be in, grouped by the checks that use them
INSTANCE_DOWN = [constants.ADMINST_DOWN]
INSTANCE_ONLINE = [constants.ADMINST_DOWN, constants.ADMINST_UP]
INSTANCE_NOT_RUNNING = [constants.ADMINST_DOWN, constants.ADMINST_OFFLINE]

#: Instance status in which an instance can be marked as offline/online
CAN_CHANGE_INSTANCE_OFFLINE = (frozenset(INSTANCE_DOWN) | frozenset([
  constants.ADMINST_OFFLINE,
  ]))


def _ExpandItemName(expand_fn, name, kind):
  """Expand an item name.

  @param expand_fn: the function to use for expansion
  @param name: requested item name
  @param kind: text description ('Node' or 'Instance')
  @return: the result of the expand_fn, if successful
  @raise errors.OpPrereqError: if the item is not found

  """
  (uuid, full_name) = expand_fn(name)
  if uuid is None or full_name is None:
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
                               errors.ECODE_NOENT)
  return (uuid, full_name)


def ExpandInstanceUuidAndName(cfg, expected_uuid, name):
  """Wrapper over L{_ExpandItemName} for instance."""
  (uuid, full_name) = _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
  if expected_uuid is not None and uuid != expected_uuid:
    raise errors.OpPrereqError(
      "The instance's UUID '%s' does not match the expected UUID '%s' for"
      " instance '%s'. Maybe the instance changed since you submitted this"
      " job." % (uuid, expected_uuid, full_name), errors.ECODE_NOTUNIQUE)
  return (uuid, full_name)


def ExpandNodeUuidAndName(cfg, expected_uuid, name):
  """Expand a short node name into the node UUID and full name.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type expected_uuid: string
  @param expected_uuid: expected UUID for the node (or None if there is no
        expectation). If it does not match, a L{errors.OpPrereqError} is
        raised.
  @type name: string
  @param name: the short node name

  """
  (uuid, full_name) = _ExpandItemName(cfg.ExpandNodeName, name, "Node")
  if expected_uuid is not None and uuid != expected_uuid:
    raise errors.OpPrereqError(
      "The node's UUID '%s' does not match the expected UUID '%s' for node"
      " '%s'. Maybe the node changed since you submitted this job." %
      (uuid, expected_uuid, full_name), errors.ECODE_NOTUNIQUE)
  return (uuid, full_name)


def ShareAll():
  """Returns a dict declaring all lock levels shared.

  """
  return dict.fromkeys(locking.LEVELS, 1)
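
# Illustrative usage sketch (not part of the original module; the attribute
# shown is how logical units typically consume this helper): a read-only LU
# would declare all of its locks shared in ExpandNames, e.g.
#
#   def ExpandNames(self):
#     self.share_locks = ShareAll()
#
# i.e. every level in locking.LEVELS is mapped to 1 ("shared").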


def CheckNodeGroupInstances(cfg, group_uuid, owned_instance_names):
  """Checks if the instances in a node group are still correct.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type group_uuid: string
  @param group_uuid: Node group UUID
  @type owned_instance_names: set or frozenset
  @param owned_instance_names: List of currently owned instances

  """
  wanted_instances = frozenset(cfg.GetInstanceNames(
                                 cfg.GetNodeGroupInstances(group_uuid)))
  if owned_instance_names != wanted_instances:
    raise errors.OpPrereqError("Instances in node group '%s' changed since"
                               " locks were acquired, wanted '%s', have '%s';"
                               " retry the operation" %
                               (group_uuid,
                                utils.CommaJoin(wanted_instances),
                                utils.CommaJoin(owned_instance_names)),
                               errors.ECODE_STATE)

  return wanted_instances


def GetWantedNodes(lu, short_node_names):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type short_node_names: list
  @param short_node_names: list of node names or None for all nodes
  @rtype: tuple of lists
  @return: tuple with (list of node UUIDs, list of node names)
  @raise errors.ProgrammerError: if the nodes parameter is wrong type

  """
  if short_node_names:
    node_uuids = [ExpandNodeUuidAndName(lu.cfg, None, name)[0]
                  for name in short_node_names]
  else:
    node_uuids = lu.cfg.GetNodeList()

  return (node_uuids, [lu.cfg.GetNodeName(uuid) for uuid in node_uuids])


def GetWantedInstances(lu, short_inst_names):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type short_inst_names: list
  @param short_inst_names: list of instance names or None for all instances
  @rtype: tuple of lists
  @return: tuple of (instance UUIDs, instance names)
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if short_inst_names:
    inst_uuids = [ExpandInstanceUuidAndName(lu.cfg, None, name)[0]
                  for name in short_inst_names]
  else:
    inst_uuids = lu.cfg.GetInstanceList()
  return (inst_uuids, [lu.cfg.GetInstanceName(uuid) for uuid in inst_uuids])


def RunPostHook(lu, node_name):
  """Runs the post-hook for an opcode on a single node.

  """
  hm = lu.proc.BuildHooksManager(lu)
  try:
    hm.RunPhase(constants.HOOKS_PHASE_POST, node_names=[node_name])
  except Exception, err: # pylint: disable=W0703
    lu.LogWarning("Errors occurred running hooks on %s: %s",
                  node_name, err)


def RedistributeAncillaryFiles(lu):
  """Distribute additional files which are part of the cluster configuration.

  ConfigWriter takes care of distributing the config and ssconf files, but
  there are more files which should be distributed to all nodes. This function
  makes sure those are copied.

  """
  # Gather target nodes
  cluster = lu.cfg.GetClusterInfo()
  master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())

  online_node_uuids = lu.cfg.GetOnlineNodeList()
  online_node_uuid_set = frozenset(online_node_uuids)
  vm_node_uuids = list(online_node_uuid_set.intersection(
                         lu.cfg.GetVmCapableNodeList()))

  # Never distribute to master node
  for node_uuids in [online_node_uuids, vm_node_uuids]:
    if master_info.uuid in node_uuids:
      node_uuids.remove(master_info.uuid)

  # Gather file lists
  (files_all, _, files_mc, files_vm) = \
    ComputeAncillaryFiles(cluster, True)

  # Never re-distribute configuration file from here
  assert not (pathutils.CLUSTER_CONF_FILE in files_all or
              pathutils.CLUSTER_CONF_FILE in files_vm)
  assert not files_mc, "Master candidates not handled in this function"

  filemap = [
    (online_node_uuids, files_all),
    (vm_node_uuids, files_vm),
    ]

  # Upload the files
  for (node_uuids, files) in filemap:
    for fname in files:
      UploadHelper(lu, node_uuids, fname)


def ComputeAncillaryFiles(cluster, redist):
  """Compute files external to Ganeti which need to be consistent.

  @type redist: boolean
  @param redist: Whether to include files which need to be redistributed

  """
  # Compute files for all nodes
  files_all = set([
    pathutils.SSH_KNOWN_HOSTS_FILE,
    pathutils.CONFD_HMAC_KEY,
    pathutils.CLUSTER_DOMAIN_SECRET_FILE,
    pathutils.SPICE_CERT_FILE,
    pathutils.SPICE_CACERT_FILE,
    pathutils.RAPI_USERS_FILE,
    ])

  if redist:
    # we need to ship at least the RAPI certificate
    files_all.add(pathutils.RAPI_CERT_FILE)
  else:
    files_all.update(pathutils.ALL_CERT_FILES)
    files_all.update(ssconf.SimpleStore().GetFileList())

  if cluster.modify_etc_hosts:
    files_all.add(pathutils.ETC_HOSTS)

  if cluster.use_external_mip_script:
    files_all.add(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)

  # Files which are optional, these must:
  # - be present in one other category as well
  # - either exist or not exist on all nodes of that category (mc, vm all)
  files_opt = set([
    pathutils.RAPI_USERS_FILE,
    ])

  # Files which should only be on master candidates
  files_mc = set()

  if not redist:
    files_mc.add(pathutils.CLUSTER_CONF_FILE)

  # File storage
  if (not redist and (constants.ENABLE_FILE_STORAGE or
                      constants.ENABLE_SHARED_FILE_STORAGE)):
    files_all.add(pathutils.FILE_STORAGE_PATHS_FILE)
    files_opt.add(pathutils.FILE_STORAGE_PATHS_FILE)

  # Files which should only be on VM-capable nodes
  files_vm = set(
    filename
    for hv_name in cluster.enabled_hypervisors
    for filename in
      hypervisor.GetHypervisorClass(hv_name).GetAncillaryFiles()[0])

  files_opt |= set(
    filename
    for hv_name in cluster.enabled_hypervisors
    for filename in
      hypervisor.GetHypervisorClass(hv_name).GetAncillaryFiles()[1])

  # Filenames in each category must be unique
  all_files_set = files_all | files_mc | files_vm
  assert (len(all_files_set) ==
          sum(map(len, [files_all, files_mc, files_vm]))), \
    "Found file listed in more than one file list"

  # Optional files must be present in one other category
  assert all_files_set.issuperset(files_opt), \
    "Optional file not in a different required list"

  # This one file should never ever be re-distributed via RPC
  assert not (redist and
              pathutils.FILE_STORAGE_PATHS_FILE in all_files_set)

  return (files_all, files_opt, files_mc, files_vm)


def UploadHelper(lu, node_uuids, fname):
  """Helper for uploading a file and showing warnings.

  """
  if os.path.exists(fname):
    result = lu.rpc.call_upload_file(node_uuids, fname)
    for to_node_uuids, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        msg = ("Copy of file %s to node %s failed: %s" %
               (fname, lu.cfg.GetNodeName(to_node_uuids), msg))
        lu.LogWarning(msg)


def MergeAndVerifyHvState(op_input, obj_input):
  """Combines the hv state from an opcode with the one of the object

  @param op_input: The input dict from the opcode
  @param obj_input: The input dict from the objects
  @return: The verified and updated dict

  """
  if op_input:
    invalid_hvs = set(op_input) - constants.HYPER_TYPES
    if invalid_hvs:
      raise errors.OpPrereqError("Invalid hypervisor(s) in hypervisor state:"
                                 " %s" % utils.CommaJoin(invalid_hvs),
                                 errors.ECODE_INVAL)
    if obj_input is None:
      obj_input = {}
    type_check = constants.HVSTS_PARAMETER_TYPES
    return _UpdateAndVerifySubDict(obj_input, op_input, type_check)

  return None


def MergeAndVerifyDiskState(op_input, obj_input):
  """Combines the disk state from an opcode with the one of the object

  @param op_input: The input dict from the opcode
  @param obj_input: The input dict from the objects
  @return: The verified and updated dict

  """
  if op_input:
    invalid_dst = set(op_input) - constants.DS_VALID_TYPES
    if invalid_dst:
      raise errors.OpPrereqError("Invalid storage type(s) in disk state: %s" %
                                 utils.CommaJoin(invalid_dst),
                                 errors.ECODE_INVAL)
    type_check = constants.DSS_PARAMETER_TYPES
    if obj_input is None:
      obj_input = {}
    return dict((key, _UpdateAndVerifySubDict(obj_input.get(key, {}), value,
                                              type_check))
                for key, value in op_input.items())

  return None


def CheckOSParams(lu, required, node_uuids, osname, osparams):
  """OS parameters validation.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type required: boolean
  @param required: whether the validation should fail if the OS is not
      found
  @type node_uuids: list
  @param node_uuids: the list of nodes on which we should check
  @type osname: string
  @param osname: the name of the OS we should use
  @type osparams: dict
  @param osparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  node_uuids = _FilterVmNodes(lu, node_uuids)
  result = lu.rpc.call_os_validate(node_uuids, required, osname,
                                   [constants.OS_VALIDATE_PARAMETERS],
                                   osparams)
  for node_uuid, nres in result.items():
    # we don't check for offline cases since this should be run only
    # against the master node and/or an instance's nodes
    nres.Raise("OS Parameters validation failed on node %s" %
               lu.cfg.GetNodeName(node_uuid))
    if not nres.payload:
      lu.LogInfo("OS %s not found on node %s, validation skipped",
                 osname, lu.cfg.GetNodeName(node_uuid))


def CheckHVParams(lu, node_uuids, hvname, hvparams):
  """Hypervisor parameter validation.

  This function abstracts the hypervisor parameter validation to be
  used in both instance create and instance modify.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type node_uuids: list
  @param node_uuids: the list of nodes on which we should check
  @type hvname: string
  @param hvname: the name of the hypervisor we should use
  @type hvparams: dict
  @param hvparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  node_uuids = _FilterVmNodes(lu, node_uuids)

  cluster = lu.cfg.GetClusterInfo()
  hvfull = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)

  hvinfo = lu.rpc.call_hypervisor_validate_params(node_uuids, hvname, hvfull)
  for node_uuid in node_uuids:
    info = hvinfo[node_uuid]
    if info.offline:
      continue
    info.Raise("Hypervisor parameter validation failed on node %s" %
               lu.cfg.GetNodeName(node_uuid))


def AdjustCandidatePool(lu, exceptions):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               utils.CommaJoin(node.name for node in mod_list))
    for node in mod_list:
      lu.context.ReaddNode(node)
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def CheckNodePVs(nresult, exclusive_storage):
  """Check node PVs.

  """
  pvlist_dict = nresult.get(constants.NV_PVLIST, None)
  if pvlist_dict is None:
    return (["Can't get PV list from node"], None)
  pvlist = map(objects.LvmPvInfo.FromDict, pvlist_dict)
  errlist = []
  # check that ':' is not present in PV names, since it's a
  # special character for lvcreate (denotes the range of PEs to
  # allocate on this PV)
  for pv in pvlist:
    if ":" in pv.name:
      errlist.append("Invalid character ':' in PV '%s' of VG '%s'" %
                     (pv.name, pv.vg_name))
  es_pvinfo = None
  if exclusive_storage:
    (errmsgs, es_pvinfo) = utils.LvmExclusiveCheckNodePvs(pvlist)
    errlist.extend(errmsgs)
    shared_pvs = nresult.get(constants.NV_EXCLUSIVEPVS, None)
    if shared_pvs:
      for (pvname, lvlist) in shared_pvs:
        # TODO: Check that LVs are really unrelated (snapshots, DRBD meta...)
        errlist.append("PV %s is shared among unrelated LVs (%s)" %
                       (pvname, utils.CommaJoin(lvlist)))
  return (errlist, es_pvinfo)


def _ComputeMinMaxSpec(name, qualifier, ispecs, value):
  """Computes if value is in the desired range.

  @param name: name of the parameter for which we perform the check
  @param qualifier: a qualifier used in the error message (e.g. 'disk/1',
      not just 'disk')
  @param ispecs: dictionary containing min and max values
  @param value: actual value that we want to use
  @return: None or an error string

  """
  if value in [None, constants.VALUE_AUTO]:
    return None
  max_v = ispecs[constants.ISPECS_MAX].get(name, value)
  min_v = ispecs[constants.ISPECS_MIN].get(name, value)
  if value > max_v or min_v > value:
    if qualifier:
      fqn = "%s/%s" % (name, qualifier)
    else:
      fqn = name
    return ("%s value %s is not in range [%s, %s]" %
            (fqn, value, min_v, max_v))
  return None
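
# Behaviour sketch for the range check above (hypothetical ispecs contents,
# for illustration only):
#
#   ispecs = {constants.ISPECS_MIN: {constants.ISPEC_MEM_SIZE: 128},
#             constants.ISPECS_MAX: {constants.ISPEC_MEM_SIZE: 32768}}
#   _ComputeMinMaxSpec(constants.ISPEC_MEM_SIZE, "", ispecs, 64)
#
# would return an error string (64 is below the minimum of 128), while a
# value of 4096, None or constants.VALUE_AUTO would return None.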


def ComputeIPolicySpecViolation(ipolicy, mem_size, cpu_count, disk_count,
                                nic_count, disk_sizes, spindle_use,
                                disk_template,
                                _compute_fn=_ComputeMinMaxSpec):
  """Verifies ipolicy against provided specs.

  @type ipolicy: dict
  @param ipolicy: The ipolicy
  @type mem_size: int
  @param mem_size: The memory size
  @type cpu_count: int
  @param cpu_count: Used cpu cores
  @type disk_count: int
  @param disk_count: Number of disks used
  @type nic_count: int
  @param nic_count: Number of nics used
  @type disk_sizes: list of ints
  @param disk_sizes: Disk sizes of used disk (len must match C{disk_count})
  @type spindle_use: int
  @param spindle_use: The number of spindles this instance uses
  @type disk_template: string
  @param disk_template: The disk template of the instance
  @param _compute_fn: The compute function (unittest only)
  @return: A list of violations, or an empty list if no violations are found

  """
  assert disk_count == len(disk_sizes)

  test_settings = [
    (constants.ISPEC_MEM_SIZE, "", mem_size),
    (constants.ISPEC_CPU_COUNT, "", cpu_count),
    (constants.ISPEC_NIC_COUNT, "", nic_count),
    (constants.ISPEC_SPINDLE_USE, "", spindle_use),
    ] + [(constants.ISPEC_DISK_SIZE, str(idx), d)
         for idx, d in enumerate(disk_sizes)]
  if disk_template != constants.DT_DISKLESS:
    # This check doesn't make sense for diskless instances
    test_settings.append((constants.ISPEC_DISK_COUNT, "", disk_count))
  ret = []
  allowed_dts = ipolicy[constants.IPOLICY_DTS]
  if disk_template not in allowed_dts:
    ret.append("Disk template %s is not allowed (allowed templates: %s)" %
               (disk_template, utils.CommaJoin(allowed_dts)))

  min_errs = None
  for minmax in ipolicy[constants.ISPECS_MINMAX]:
    errs = filter(None,
                  (_compute_fn(name, qualifier, minmax, value)
                   for (name, qualifier, value) in test_settings))
    if min_errs is None or len(errs) < len(min_errs):
      min_errs = errs
  assert min_errs is not None
  return ret + min_errs
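
# Illustrative call sketch (hypothetical values): for an ipolicy whose
# ISPECS_MINMAX list holds a single min/max pair and whose IPOLICY_DTS only
# allows "plain", something like
#
#   ComputeIPolicySpecViolation(ipolicy, 4096, 2, 1, 1, [10240], 1, "drbd")
#
# would report the disallowed disk template plus any spec outside the min/max
# pair; an empty return value means the specs satisfy the policy.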


def ComputeIPolicyInstanceViolation(ipolicy, instance, cfg,
                                    _compute_fn=ComputeIPolicySpecViolation):
  """Compute if instance meets the specs of ipolicy.

  @type ipolicy: dict
  @param ipolicy: The ipolicy to verify against
  @type instance: L{objects.Instance}
  @param instance: The instance to verify
  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration
  @param _compute_fn: The function to verify ipolicy (unittest only)
  @see: L{ComputeIPolicySpecViolation}

  """
  ret = []
  be_full = cfg.GetClusterInfo().FillBE(instance)
  mem_size = be_full[constants.BE_MAXMEM]
  cpu_count = be_full[constants.BE_VCPUS]
  es_flags = rpc.GetExclusiveStorageForNodes(cfg, instance.all_nodes)
  if any(es_flags.values()):
    # With exclusive storage use the actual spindles
    try:
      spindle_use = sum([disk.spindles for disk in instance.disks])
    except TypeError:
      ret.append("Number of spindles not configured for disks of instance %s"
                 " while exclusive storage is enabled, try running gnt-cluster"
                 " repair-disk-sizes" % instance.name)
      # _ComputeMinMaxSpec ignores 'None's
      spindle_use = None
  else:
    spindle_use = be_full[constants.BE_SPINDLE_USE]
  disk_count = len(instance.disks)
  disk_sizes = [disk.size for disk in instance.disks]
  nic_count = len(instance.nics)
  disk_template = instance.disk_template

  return ret + _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count,
                           disk_sizes, spindle_use, disk_template)


def _ComputeViolatingInstances(ipolicy, instances, cfg):
  """Computes the set of instances that violate the given ipolicy.

  @param ipolicy: The ipolicy to verify
  @type instances: L{objects.Instance}
  @param instances: List of instances to verify
  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration
  @return: A frozenset of instance names violating the ipolicy

  """
  return frozenset([inst.name for inst in instances
                    if ComputeIPolicyInstanceViolation(ipolicy, inst, cfg)])


def ComputeNewInstanceViolations(old_ipolicy, new_ipolicy, instances, cfg):
  """Computes a set of any instances that would violate the new ipolicy.

  @param old_ipolicy: The current (still in-place) ipolicy
  @param new_ipolicy: The new (to become) ipolicy
  @param instances: List of instances to verify
  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration
  @return: A list of instances which violate the new ipolicy but
      did not before

  """
  return (_ComputeViolatingInstances(new_ipolicy, instances, cfg) -
          _ComputeViolatingInstances(old_ipolicy, instances, cfg))


def GetUpdatedParams(old_params, update_dict,
                     use_default=True, use_none=False):
  """Return the new version of a parameter dictionary.

  @type old_params: dict
  @param old_params: old parameters
  @type update_dict: dict
  @param update_dict: dict containing new parameter values, or
      constants.VALUE_DEFAULT to reset the parameter to its default
      value
  @type use_default: boolean
  @param use_default: whether to recognise L{constants.VALUE_DEFAULT}
      values as 'to be deleted' values
  @type use_none: boolean
  @param use_none: whether to recognise C{None} values as 'to be
      deleted' values
  @rtype: dict
  @return: the new parameter dictionary

  """
  params_copy = copy.deepcopy(old_params)
  for key, val in update_dict.iteritems():
    if ((use_default and val == constants.VALUE_DEFAULT) or
        (use_none and val is None)):
      try:
        del params_copy[key]
      except KeyError:
        pass
    else:
      params_copy[key] = val
  return params_copy
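
# Behaviour sketch (hypothetical parameter names and values):
#
#   GetUpdatedParams({"vcpus": 2, "memory": 512},
#                    {"memory": constants.VALUE_DEFAULT, "vcpus": 4})
#
# returns {"vcpus": 4}: with the default use_default=True, VALUE_DEFAULT
# removes "memory" (resetting it to its inherited default) while "vcpus" is
# simply overridden. With use_none=True, a C{None} value is treated the same
# way as VALUE_DEFAULT.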


def GetUpdatedIPolicy(old_ipolicy, new_ipolicy, group_policy=False):
  """Return the new version of an instance policy.

  @param group_policy: whether this policy applies to a group and thus
    we should support removal of policy entries

  """
  ipolicy = copy.deepcopy(old_ipolicy)
  for key, value in new_ipolicy.items():
    if key not in constants.IPOLICY_ALL_KEYS:
      raise errors.OpPrereqError("Invalid key in new ipolicy: %s" % key,
                                 errors.ECODE_INVAL)
    if (not value or value == [constants.VALUE_DEFAULT] or
        value == constants.VALUE_DEFAULT):
      if group_policy:
        if key in ipolicy:
          del ipolicy[key]
      else:
        raise errors.OpPrereqError("Can't unset ipolicy attribute '%s'"
                                   " on the cluster" % key,
                                   errors.ECODE_INVAL)
    else:
      if key in constants.IPOLICY_PARAMETERS:
        # FIXME: we assume all such values are float
        try:
          ipolicy[key] = float(value)
        except (TypeError, ValueError), err:
          raise errors.OpPrereqError("Invalid value for attribute"
                                     " '%s': '%s', error: %s" %
                                     (key, value, err), errors.ECODE_INVAL)
      elif key == constants.ISPECS_MINMAX:
        for minmax in value:
          for k in minmax.keys():
            utils.ForceDictType(minmax[k], constants.ISPECS_PARAMETER_TYPES)
        ipolicy[key] = value
      elif key == constants.ISPECS_STD:
        if group_policy:
          msg = "%s cannot appear in group instance specs" % key
          raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
        ipolicy[key] = GetUpdatedParams(old_ipolicy.get(key, {}), value,
                                        use_none=False, use_default=False)
        utils.ForceDictType(ipolicy[key], constants.ISPECS_PARAMETER_TYPES)
      else:
        # FIXME: we assume all others are lists; this should be redone
        # in a nicer way
        ipolicy[key] = list(value)
  try:
    objects.InstancePolicy.CheckParameterSyntax(ipolicy, not group_policy)
  except errors.ConfigurationError, err:
    raise errors.OpPrereqError("Invalid instance policy: %s" % err,
                               errors.ECODE_INVAL)
  return ipolicy
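
# Illustrative sketch (hypothetical input): at group level (group_policy=True)
# passing {constants.IPOLICY_VCPU_RATIO: constants.VALUE_DEFAULT} removes the
# group-level override so the cluster-wide value applies again; the same input
# at cluster level raises errors.OpPrereqError, since cluster-wide attributes
# cannot be unset.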


def AnnotateDiskParams(instance, devs, cfg):
  """Little helper wrapper to the rpc annotation method.

  @param instance: The instance object
  @type devs: List of L{objects.Disk}
  @param devs: The root devices (not any of its children!)
  @param cfg: The config object
  @returns: The annotated disk copies
  @see: L{rpc.AnnotateDiskParams}

  """
  return rpc.AnnotateDiskParams(instance.disk_template, devs,
                                cfg.GetInstanceDiskParams(instance))


def SupportsOob(cfg, node):
  """Tells if node supports OOB.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type node: L{objects.Node}
  @param node: The node
  @return: The OOB script if supported or an empty string otherwise

  """
  return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]


def _UpdateAndVerifySubDict(base, updates, type_check):
  """Updates and verifies a dict with sub dicts of the same type.

  @param base: The dict with the old data
  @param updates: The dict with the new data
  @param type_check: Dict suitable to ForceDictType to verify correct types
  @returns: A new dict with updated and verified values

  """
  def fn(old, value):
    new = GetUpdatedParams(old, value)
    utils.ForceDictType(new, type_check)
    return new

  ret = copy.deepcopy(base)
  ret.update(dict((key, fn(base.get(key, {}), value))
                  for key, value in updates.items()))

  return ret
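
# Merge semantics sketch (hypothetical sub-dict keys, assuming both keys are
# valid for the given type_check dictionary):
#
#   _UpdateAndVerifySubDict({"xen-pvm": {"cpu_node": 1}},
#                           {"xen-pvm": {"mem_node": 2}}, type_check)
#
# merges each sub-dict via GetUpdatedParams and type-checks the result with
# utils.ForceDictType, yielding {"xen-pvm": {"cpu_node": 1, "mem_node": 2}}.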


def _FilterVmNodes(lu, node_uuids):
  """Filters out non-vm_capable nodes from a list.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type node_uuids: list
  @param node_uuids: the list of nodes on which we should check
  @rtype: list
  @return: the list of vm-capable nodes

  """
  vm_nodes = frozenset(lu.cfg.GetNonVmCapableNodeList())
  return [uuid for uuid in node_uuids if uuid not in vm_nodes]


def GetDefaultIAllocator(cfg, ialloc):
  """Decides on which iallocator to use.

  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration object
  @type ialloc: string or None
  @param ialloc: Iallocator specified in opcode
  @rtype: string
  @return: Iallocator name

  """
  if not ialloc:
    # Use default iallocator
    ialloc = cfg.GetDefaultIAllocator()

  if not ialloc:
    raise errors.OpPrereqError("No iallocator was specified, neither in the"
                               " opcode nor as a cluster-wide default",
                               errors.ECODE_INVAL)

  return ialloc


def CheckInstancesNodeGroups(cfg, instances, owned_groups, owned_node_uuids,
                             cur_group_uuid):
  """Checks if node groups for locked instances are still correct.

  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration
  @type instances: dict; string as key, L{objects.Instance} as value
  @param instances: Dictionary, instance UUID as key, instance object as value
  @type owned_groups: iterable of string
  @param owned_groups: List of owned groups
  @type owned_node_uuids: iterable of string
  @param owned_node_uuids: List of owned nodes
  @type cur_group_uuid: string or None
  @param cur_group_uuid: Optional group UUID to check against instance's groups

  """
  for (uuid, inst) in instances.items():
    assert owned_node_uuids.issuperset(inst.all_nodes), \
      "Instance %s's nodes changed while we kept the lock" % inst.name

    inst_groups = CheckInstanceNodeGroups(cfg, uuid, owned_groups)

    assert cur_group_uuid is None or cur_group_uuid in inst_groups, \
      "Instance %s has no node in group %s" % (inst.name, cur_group_uuid)


def CheckInstanceNodeGroups(cfg, inst_uuid, owned_groups, primary_only=False):
  """Checks if the owned node groups are still correct for an instance.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type inst_uuid: string
  @param inst_uuid: Instance UUID
  @type owned_groups: set or frozenset
  @param owned_groups: List of currently owned node groups
  @type primary_only: boolean
  @param primary_only: Whether to check node groups for only the primary node

  """
  inst_groups = cfg.GetInstanceNodeGroups(inst_uuid, primary_only)

  if not owned_groups.issuperset(inst_groups):
    raise errors.OpPrereqError("Instance %s's node groups changed since"
                               " locks were acquired, current groups are"
                               " '%s', owning groups '%s'; retry the"
                               " operation" %
                               (cfg.GetInstanceName(inst_uuid),
                                utils.CommaJoin(inst_groups),
                                utils.CommaJoin(owned_groups)),
                               errors.ECODE_STATE)

  return inst_groups


def LoadNodeEvacResult(lu, alloc_result, early_release, use_nodes):
  """Unpacks the result of change-group and node-evacuate iallocator requests.

  Iallocator modes L{constants.IALLOCATOR_MODE_NODE_EVAC} and
  L{constants.IALLOCATOR_MODE_CHG_GROUP}.

  @type lu: L{LogicalUnit}
  @param lu: Logical unit instance
  @type alloc_result: tuple/list
  @param alloc_result: Result from iallocator
  @type early_release: bool
  @param early_release: Whether to release locks early if possible
  @type use_nodes: bool
  @param use_nodes: Whether to display node names instead of groups

  """
  (moved, failed, jobs) = alloc_result

  if failed:
    failreason = utils.CommaJoin("%s (%s)" % (name, reason)
                                 for (name, reason) in failed)
    lu.LogWarning("Unable to evacuate instances %s", failreason)
    raise errors.OpExecError("Unable to evacuate instances %s" % failreason)

  if moved:
    lu.LogInfo("Instances to be moved: %s",
               utils.CommaJoin(
                 "%s (to %s)" %
                 (name, _NodeEvacDest(use_nodes, group, node_names))
                 for (name, group, node_names) in moved))

  return [map(compat.partial(_SetOpEarlyRelease, early_release),
              map(opcodes.OpCode.LoadOpCode, ops))
          for ops in jobs]


def _NodeEvacDest(use_nodes, group, node_names):
  """Returns group or nodes depending on caller's choice.

  """
  if use_nodes:
    return utils.CommaJoin(node_names)
  else:
    return group


def _SetOpEarlyRelease(early_release, op):
  """Sets C{early_release} flag on opcodes if available.

  """
  try:
    op.early_release = early_release
  except AttributeError:
    assert not isinstance(op, opcodes.OpInstanceReplaceDisks)

  return op


def MapInstanceLvsToNodes(instances):
  """Creates a map from (node, volume) to instance name.

  @type instances: list of L{objects.Instance}
  @rtype: dict; tuple of (node uuid, volume name) as key, L{objects.Instance}
          object as value

  """
  return dict(((node_uuid, vol), inst)
              for inst in instances
              for (node_uuid, vols) in inst.MapLVsByNode().items()
              for vol in vols)
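
# Shape of the result (illustrative names only): an instance "inst1" with an
# LV "xenvg/disk0" on node UUID "node-uuid-1" contributes the entry
#   ("node-uuid-1", "xenvg/disk0") -> <inst1 object>
# so callers (e.g. verification code) can find the owning instance of a
# (node, volume) pair with a single dict lookup.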


def CheckParamsNotGlobal(params, glob_pars, kind, bad_levels, good_levels):
  """Make sure that none of the given parameters is global.

  If a global parameter is found, an L{errors.OpPrereqError} exception is
  raised. This is used to avoid setting global parameters for individual nodes.

  @type params: dictionary
  @param params: Parameters to check
  @type glob_pars: dictionary
  @param glob_pars: Forbidden parameters
  @type kind: string
  @param kind: Kind of parameters (e.g. "node")
  @type bad_levels: string
  @param bad_levels: Level(s) at which the parameters are forbidden (e.g.
      "instance")
  @type good_levels: strings
  @param good_levels: Level(s) at which the parameters are allowed (e.g.
      "cluster or group")

  """
  used_globals = glob_pars.intersection(params)
  if used_globals:
    msg = ("The following %s parameters are global and cannot"
           " be customized at %s level, please modify them at"
           " %s level: %s" %
           (kind, bad_levels, good_levels, utils.CommaJoin(used_globals)))
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
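
# Typical call sketch (the exact constant used for the forbidden set is an
# assumption here): a node-level LU rejecting cluster-global node parameters
# could call
#
#   CheckParamsNotGlobal(op.ndparams, constants.NDC_GLOBALS, "node",
#                        "node", "cluster or group")
#
# which raises errors.OpPrereqError if any forbidden key is present in
# op.ndparams.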


def IsExclusiveStorageEnabledNode(cfg, node):
  """Whether exclusive_storage is in effect for the given node.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type node: L{objects.Node}
  @param node: The node
  @rtype: bool
  @return: The effective value of exclusive_storage

  """
  return cfg.GetNdParams(node)[constants.ND_EXCLUSIVE_STORAGE]


def CheckInstanceState(lu, instance, req_states, msg=None):
  """Ensure that an instance is in one of the required states.

  @param lu: the LU on behalf of which we make the check
  @param instance: the instance to check
  @param msg: if passed, should be a message to replace the default one
  @raise errors.OpPrereqError: if the instance is not in the required state

  """
  if msg is None:
    msg = ("can't use instance from outside %s states" %
           utils.CommaJoin(req_states))
  if instance.admin_state not in req_states:
    raise errors.OpPrereqError("Instance '%s' is marked to be %s, %s" %
                               (instance.name, instance.admin_state, msg),
                               errors.ECODE_STATE)

  if constants.ADMINST_UP not in req_states:
    pnode_uuid = instance.primary_node
    if not lu.cfg.GetNodeInfo(pnode_uuid).offline:
      all_hvparams = lu.cfg.GetClusterInfo().hvparams
      ins_l = lu.rpc.call_instance_list(
                [pnode_uuid], [instance.hypervisor], all_hvparams)[pnode_uuid]
      ins_l.Raise("Can't contact node %s for instance information" %
                  lu.cfg.GetNodeName(pnode_uuid),
                  prereq=True, ecode=errors.ECODE_ENVIRON)
      if instance.name in ins_l.payload:
        raise errors.OpPrereqError("Instance %s is running, %s" %
                                   (instance.name, msg), errors.ECODE_STATE)
    else:
      lu.LogWarning("Primary node offline, ignoring check that instance"
                    " is down")


def CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
  """Check the sanity of iallocator and node arguments and use the
  cluster-wide iallocator if appropriate.

  Check that at most one of (iallocator, node) is specified. If none is
  specified, or the iallocator is L{constants.DEFAULT_IALLOCATOR_SHORTCUT},
  then the LU's opcode's iallocator slot is filled with the cluster-wide
  default iallocator.

  @type iallocator_slot: string
  @param iallocator_slot: the name of the opcode iallocator slot
  @type node_slot: string
  @param node_slot: the name of the opcode target node slot

  """
  node = getattr(lu.op, node_slot, None)
  ialloc = getattr(lu.op, iallocator_slot, None)
  if node == []:
    node = None

  if node is not None and ialloc is not None:
    raise errors.OpPrereqError("Do not specify both, iallocator and node",
                               errors.ECODE_INVAL)
  elif ((node is None and ialloc is None) or
        ialloc == constants.DEFAULT_IALLOCATOR_SHORTCUT):
    default_iallocator = lu.cfg.GetDefaultIAllocator()
    if default_iallocator:
      setattr(lu.op, iallocator_slot, default_iallocator)
    else:
      raise errors.OpPrereqError("No iallocator or node given and no"
                                 " cluster-wide default iallocator found;"
                                 " please specify either an iallocator or a"
                                 " node, or set a cluster-wide default"
                                 " iallocator", errors.ECODE_INVAL)


def FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_uuid, prereq):
  """Returns the indices of the instance's disks that are faulty on a node.

  """
  faulty = []

  for dev in instance.disks:
    cfg.SetDiskID(dev, node_uuid)

  result = rpc_runner.call_blockdev_getmirrorstatus(
             node_uuid, (instance.disks, instance))
  result.Raise("Failed to get disk status from node %s" %
               cfg.GetNodeName(node_uuid),
               prereq=prereq, ecode=errors.ECODE_ENVIRON)

  for idx, bdev_status in enumerate(result.payload):
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
      faulty.append(idx)

  return faulty


def CheckNodeOnline(lu, node_uuid, msg=None):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node_uuid: the node to check
  @param msg: if passed, should be a message to replace the default one
  @raise errors.OpPrereqError: if the node is offline

  """
  if msg is None:
    msg = "Can't use offline node"
  if lu.cfg.GetNodeInfo(node_uuid).offline:
    raise errors.OpPrereqError("%s: %s" % (msg, lu.cfg.GetNodeName(node_uuid)),