# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

22 """Common functions used by multiple logical units."""
import copy
import os

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import objects
from ganeti import opcodes
from ganeti import pathutils
from ganeti import rpc
from ganeti import ssconf
from ganeti import utils

INSTANCE_DOWN = [constants.ADMINST_DOWN]
INSTANCE_ONLINE = [constants.ADMINST_DOWN, constants.ADMINST_UP]
INSTANCE_NOT_RUNNING = [constants.ADMINST_DOWN, constants.ADMINST_OFFLINE]

#: Instance status in which an instance can be marked as offline/online
CAN_CHANGE_INSTANCE_OFFLINE = (frozenset(INSTANCE_DOWN) | frozenset([
  constants.ADMINST_OFFLINE,
  ]))

def _ExpandItemName(fn, name, kind):
  """Expand an item name.

  @param fn: the function to use for expansion
  @param name: requested item name
  @param kind: text description ('Node' or 'Instance')
  @return: the resolved (full) name
  @raise errors.OpPrereqError: if the item is not found

  """
  full_name = fn(name)
  if full_name is None:
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
                               errors.ECODE_NOENT)
  return full_name

def ExpandInstanceName(cfg, name):
  """Wrapper over L{_ExpandItemName} for instance."""
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")

def ExpandNodeName(cfg, name):
  """Wrapper over L{_ExpandItemName} for nodes."""
  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")

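
# Illustrative example (not part of the original module): the expansion
# helpers turn a possibly shortened name into the fully qualified name known
# to the configuration, raising OpPrereqError otherwise. The names used here
# are hypothetical.
#
#   full_node = ExpandNodeName(lu.cfg, "node1")       # e.g. "node1.example.com"
#   full_inst = ExpandInstanceName(lu.cfg, "web")     # e.g. "web.example.com"
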
79 """Returns a dict declaring all lock levels shared.
82 return dict.fromkeys(locking.LEVELS, 1)
def CheckNodeGroupInstances(cfg, group_uuid, owned_instances):
  """Checks if the instances in a node group are still correct.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type group_uuid: string
  @param group_uuid: Node group UUID
  @type owned_instances: set or frozenset
  @param owned_instances: List of currently owned instances

  """
  wanted_instances = cfg.GetNodeGroupInstances(group_uuid)
  if owned_instances != wanted_instances:
    raise errors.OpPrereqError("Instances in node group '%s' changed since"
                               " locks were acquired, wanted '%s', have '%s';"
                               " retry the operation" %
                               (group_uuid,
                                utils.CommaJoin(wanted_instances),
                                utils.CommaJoin(owned_instances)),
                               errors.ECODE_STATE)

  return wanted_instances

def GetWantedNodes(lu, nodes):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type nodes: list
  @param nodes: list of node names or None for all nodes
  @rtype: list
  @return: the list of nodes, sorted
  @raise errors.ProgrammerError: if the nodes parameter is wrong type

  """
  if nodes:
    return [ExpandNodeName(lu.cfg, name) for name in nodes]

  return utils.NiceSort(lu.cfg.GetNodeList())

def GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if instances:
    wanted = [ExpandInstanceName(lu.cfg, name) for name in instances]
  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
  return wanted

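
# Illustrative example (not part of the original module): inside a logical
# unit's ExpandNames() or CheckPrereq(), opcode-supplied names are usually
# normalized with these helpers before locks are acquired; "lu" stands for
# the LogicalUnit instance.
#
#   lu.op.nodes = GetWantedNodes(lu, lu.op.nodes)
#   lu.op.instances = GetWantedInstances(lu, lu.op.instances)
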
def RunPostHook(lu, node_name):
  """Runs the post-hook for an opcode on a single node.

  """
  hm = lu.proc.BuildHooksManager(lu)
  try:
    hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name])
  except Exception, err:  # pylint: disable=W0703
    lu.LogWarning("Errors occurred running hooks on %s: %s",
                  node_name, err)

def RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True):
  """Distribute additional files which are part of the cluster configuration.

  ConfigWriter takes care of distributing the config and ssconf files, but
  there are more files which should be distributed to all nodes. This function
  makes sure those are copied.

  @param lu: calling logical unit
  @param additional_nodes: list of nodes not in the config to distribute to
  @type additional_vm: boolean
  @param additional_vm: whether the additional nodes are vm-capable or not

  """
  # Gather target nodes
  cluster = lu.cfg.GetClusterInfo()
  master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())

  online_nodes = lu.cfg.GetOnlineNodeList()
  online_set = frozenset(online_nodes)
  vm_nodes = list(online_set.intersection(lu.cfg.GetVmCapableNodeList()))

  if additional_nodes is not None:
    online_nodes.extend(additional_nodes)
    if additional_vm:
      vm_nodes.extend(additional_nodes)

  # Never distribute to master node
  for nodelist in [online_nodes, vm_nodes]:
    if master_info.name in nodelist:
      nodelist.remove(master_info.name)

  # Gather file lists
  (files_all, _, files_mc, files_vm) = \
    ComputeAncillaryFiles(cluster, True)

  # Never re-distribute configuration file from here
  assert not (pathutils.CLUSTER_CONF_FILE in files_all or
              pathutils.CLUSTER_CONF_FILE in files_vm)
  assert not files_mc, "Master candidates not handled in this function"

  filemap = [
    (online_nodes, files_all),
    (vm_nodes, files_vm),
    ]

  # Upload the files
  for (node_list, files) in filemap:
    for fname in files:
      UploadHelper(lu, node_list, fname)

def ComputeAncillaryFiles(cluster, redist):
  """Compute files external to Ganeti which need to be consistent.

  @type redist: boolean
  @param redist: Whether to include files which need to be redistributed

  """
  # Compute files for all nodes
  files_all = set([
    pathutils.SSH_KNOWN_HOSTS_FILE,
    pathutils.CONFD_HMAC_KEY,
    pathutils.CLUSTER_DOMAIN_SECRET_FILE,
    pathutils.SPICE_CERT_FILE,
    pathutils.SPICE_CACERT_FILE,
    pathutils.RAPI_USERS_FILE,
    ])

  if redist:
    # we need to ship at least the RAPI certificate
    files_all.add(pathutils.RAPI_CERT_FILE)
  else:
    files_all.update(pathutils.ALL_CERT_FILES)
    files_all.update(ssconf.SimpleStore().GetFileList())

  if cluster.modify_etc_hosts:
    files_all.add(pathutils.ETC_HOSTS)

  if cluster.use_external_mip_script:
    files_all.add(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)

  # Files which are optional, these must:
  # - be present in one other category as well
  # - either exist or not exist on all nodes of that category (mc, vm all)
  files_opt = set([
    pathutils.RAPI_USERS_FILE,
    ])

  # Files which should only be on master candidates
  files_mc = set()

  if not redist:
    files_mc.add(pathutils.CLUSTER_CONF_FILE)

  # File storage
  if (not redist and (constants.ENABLE_FILE_STORAGE or
                      constants.ENABLE_SHARED_FILE_STORAGE)):
    files_all.add(pathutils.FILE_STORAGE_PATHS_FILE)
    files_opt.add(pathutils.FILE_STORAGE_PATHS_FILE)

  # Files which should only be on VM-capable nodes
  files_vm = set(
    filename
    for hv_name in cluster.enabled_hypervisors
    for filename in
      hypervisor.GetHypervisorClass(hv_name).GetAncillaryFiles()[0])

  files_opt |= set(
    filename
    for hv_name in cluster.enabled_hypervisors
    for filename in
      hypervisor.GetHypervisorClass(hv_name).GetAncillaryFiles()[1])

  # Filenames in each category must be unique
  all_files_set = files_all | files_mc | files_vm
  assert (len(all_files_set) ==
          sum(map(len, [files_all, files_mc, files_vm]))), \
    "Found file listed in more than one file list"

  # Optional files must be present in one other category
  assert all_files_set.issuperset(files_opt), \
    "Optional file not in a different required list"

  # This one file should never ever be re-distributed via RPC
  assert not (redist and
              pathutils.FILE_STORAGE_PATHS_FILE in all_files_set)

  return (files_all, files_opt, files_mc, files_vm)

def UploadHelper(lu, nodes, fname):
  """Helper for uploading a file and showing warnings.

  """
  if os.path.exists(fname):
    result = lu.rpc.call_upload_file(nodes, fname)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        msg = ("Copy of file %s to node %s failed: %s" %
               (fname, to_node, msg))
        lu.LogWarning(msg)

def MergeAndVerifyHvState(op_input, obj_input):
  """Combines the hv state from an opcode with the one of the object.

  @param op_input: The input dict from the opcode
  @param obj_input: The input dict from the objects
  @return: The verified and updated dict

  """
  if op_input:
    invalid_hvs = set(op_input) - constants.HYPER_TYPES
    if invalid_hvs:
      raise errors.OpPrereqError("Invalid hypervisor(s) in hypervisor state:"
                                 " %s" % utils.CommaJoin(invalid_hvs),
                                 errors.ECODE_INVAL)
    if obj_input is None:
      obj_input = {}
    type_check = constants.HVSTS_PARAMETER_TYPES
    return _UpdateAndVerifySubDict(obj_input, op_input, type_check)

  return None

def MergeAndVerifyDiskState(op_input, obj_input):
  """Combines the disk state from an opcode with the one of the object.

  @param op_input: The input dict from the opcode
  @param obj_input: The input dict from the objects
  @return: The verified and updated dict

  """
  if op_input:
    invalid_dst = set(op_input) - constants.DS_VALID_TYPES
    if invalid_dst:
      raise errors.OpPrereqError("Invalid storage type(s) in disk state: %s" %
                                 utils.CommaJoin(invalid_dst),
                                 errors.ECODE_INVAL)
    type_check = constants.DSS_PARAMETER_TYPES
    if obj_input is None:
      obj_input = {}
    return dict((key, _UpdateAndVerifySubDict(obj_input.get(key, {}), value,
                                              type_check))
                for key, value in op_input.items())

  return None

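
# Illustrative example (not part of the original module): both merge helpers
# overlay the opcode-supplied state dict on the state already stored on the
# object; keys must be known hypervisor/storage types. The values shown are
# hypothetical.
#
#   merged = MergeAndVerifyHvState({constants.HT_XEN_PVM: {"mem_total": 1024}},
#                                  cluster.hv_state_static)
#   # merged[constants.HT_XEN_PVM] is the verified, updated sub-dict
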
def CheckOSParams(lu, required, nodenames, osname, osparams):
  """OS parameters validation.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type required: boolean
  @param required: whether the validation should fail if the OS is not found
  @type nodenames: list
  @param nodenames: the list of nodes on which we should check
  @type osname: string
  @param osname: the name of the OS we should use
  @type osparams: dict
  @param osparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  nodenames = _FilterVmNodes(lu, nodenames)
  result = lu.rpc.call_os_validate(nodenames, required, osname,
                                   [constants.OS_VALIDATE_PARAMETERS],
                                   osparams)
  for node, nres in result.items():
    # we don't check for offline cases since this should be run only
    # against the master node and/or an instance's nodes
    nres.Raise("OS Parameters validation failed on node %s" % node)
    if not nres.payload:
      lu.LogInfo("OS %s not found on node %s, validation skipped",
                 osname, node)

def CheckHVParams(lu, nodenames, hvname, hvparams):
  """Hypervisor parameter validation.

  This function abstracts the hypervisor parameter validation to be
  used in both instance create and instance modify.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type nodenames: list
  @param nodenames: the list of nodes on which we should check
  @type hvname: string
  @param hvname: the name of the hypervisor we should use
  @type hvparams: dict
  @param hvparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  nodenames = _FilterVmNodes(lu, nodenames)

  cluster = lu.cfg.GetClusterInfo()
  hvfull = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)

  hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames, hvname, hvfull)
  for node in nodenames:
    info = hvinfo[node]
    if info.offline:
      continue
    info.Raise("Hypervisor parameter validation failed on node %s" % node)

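
# Illustrative example (not part of the original module): an instance-create
# or instance-modify LU would typically validate the opcode-level hvparams on
# the instance's nodes before committing any change; the node variables below
# are hypothetical.
#
#   CheckHVParams(lu, [primary_node, secondary_node],
#                 lu.op.hypervisor, lu.op.hvparams)
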
def AdjustCandidatePool(lu, exceptions):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               utils.CommaJoin(node.name for node in mod_list))
    for name in mod_list:
      lu.context.ReaddNode(name)
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))

def CheckNodePVs(nresult, exclusive_storage):
  """Check node PVs.

  """
  pvlist_dict = nresult.get(constants.NV_PVLIST, None)
  if pvlist_dict is None:
    return (["Can't get PV list from node"], None)
  pvlist = map(objects.LvmPvInfo.FromDict, pvlist_dict)
  errlist = []

  # check that ':' is not present in PV names, since it's a
  # special character for lvcreate (denotes the range of PEs to
  # use on the PV)
  for pv in pvlist:
    if ":" in pv.name:
      errlist.append("Invalid character ':' in PV '%s' of VG '%s'" %
                     (pv.name, pv.vg_name))

  es_pvinfo = None
  if exclusive_storage:
    (errmsgs, es_pvinfo) = utils.LvmExclusiveCheckNodePvs(pvlist)
    errlist.extend(errmsgs)
    shared_pvs = nresult.get(constants.NV_EXCLUSIVEPVS, None)
    if shared_pvs:
      for (pvname, lvlist) in shared_pvs:
        # TODO: Check that LVs are really unrelated (snapshots, DRBD meta...)
        errlist.append("PV %s is shared among unrelated LVs (%s)" %
                       (pvname, utils.CommaJoin(lvlist)))
  return (errlist, es_pvinfo)

def _ComputeMinMaxSpec(name, qualifier, ispecs, value):
  """Computes if value is in the desired range.

  @param name: name of the parameter for which we perform the check
  @param qualifier: a qualifier used in the error message (e.g. 'disk/1',
      can be None)
  @param ispecs: dictionary containing min and max values
  @param value: actual value that we want to use
  @return: None or an error string

  """
  if value in [None, constants.VALUE_AUTO]:
    return None
  max_v = ispecs[constants.ISPECS_MAX].get(name, value)
  min_v = ispecs[constants.ISPECS_MIN].get(name, value)
  if value > max_v or min_v > value:
    if qualifier:
      fqn = "%s/%s" % (name, qualifier)
    else:
      fqn = name
    return ("%s value %s is not in range [%s, %s]" %
            (fqn, value, min_v, max_v))
  return None

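
# Illustrative example (not part of the original module): "ispecs" is one
# min/max pair of an instance policy, keyed by constants.ISPECS_MIN and
# constants.ISPECS_MAX; the numbers below are made up.
#
#   ispecs = {constants.ISPECS_MIN: {constants.ISPEC_MEM_SIZE: 128},
#             constants.ISPECS_MAX: {constants.ISPEC_MEM_SIZE: 32768}}
#   _ComputeMinMaxSpec(constants.ISPEC_MEM_SIZE, None, ispecs, 512)    # None
#   _ComputeMinMaxSpec(constants.ISPEC_MEM_SIZE, None, ispecs, 65536)  # error
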
def ComputeIPolicySpecViolation(ipolicy, mem_size, cpu_count, disk_count,
                                nic_count, disk_sizes, spindle_use,
                                disk_template,
                                _compute_fn=_ComputeMinMaxSpec):
  """Verifies ipolicy against provided specs.

  @type ipolicy: dict
  @param ipolicy: The ipolicy
  @type mem_size: int
  @param mem_size: The memory size
  @type cpu_count: int
  @param cpu_count: Used cpu cores
  @type disk_count: int
  @param disk_count: Number of disks used
  @type nic_count: int
  @param nic_count: Number of nics used
  @type disk_sizes: list of ints
  @param disk_sizes: Disk sizes of used disk (len must match C{disk_count})
  @type spindle_use: int
  @param spindle_use: The number of spindles this instance uses
  @type disk_template: string
  @param disk_template: The disk template of the instance
  @param _compute_fn: The compute function (unittest only)
  @return: A list of violations, or an empty list if no violations are found

  """
  assert disk_count == len(disk_sizes)

  test_settings = [
    (constants.ISPEC_MEM_SIZE, "", mem_size),
    (constants.ISPEC_CPU_COUNT, "", cpu_count),
    (constants.ISPEC_NIC_COUNT, "", nic_count),
    (constants.ISPEC_SPINDLE_USE, "", spindle_use),
    ] + [(constants.ISPEC_DISK_SIZE, str(idx), d)
         for idx, d in enumerate(disk_sizes)]
  if disk_template != constants.DT_DISKLESS:
    # This check doesn't make sense for diskless instances
    test_settings.append((constants.ISPEC_DISK_COUNT, "", disk_count))
  ret = []
  allowed_dts = ipolicy[constants.IPOLICY_DTS]
  if disk_template not in allowed_dts:
    ret.append("Disk template %s is not allowed (allowed templates: %s)" %
               (disk_template, utils.CommaJoin(allowed_dts)))

  min_errs = None
  for minmax in ipolicy[constants.ISPECS_MINMAX]:
    errs = filter(None,
                  (_compute_fn(name, qualifier, minmax, value)
                   for (name, qualifier, value) in test_settings))
    if min_errs is None or len(errs) < len(min_errs):
      min_errs = errs
  assert min_errs is not None
  return ret + min_errs

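
# Illustrative example (not part of the original module): check a prospective
# instance (1 GiB of memory, 2 VCPUs, 1 NIC, one 10 GiB plain disk, one
# spindle) against the cluster-wide policy; an empty list means no
# violations. The values are hypothetical.
#
#   ipolicy = lu.cfg.GetClusterInfo().ipolicy
#   violations = ComputeIPolicySpecViolation(ipolicy, 1024, 2, 1, 1, [10240],
#                                            1, constants.DT_PLAIN)
#   if violations:
#     raise errors.OpPrereqError("Instance violates policy: %s" %
#                                utils.CommaJoin(violations),
#                                errors.ECODE_INVAL)
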
def ComputeIPolicyInstanceViolation(ipolicy, instance, cfg,
                                    _compute_fn=ComputeIPolicySpecViolation):
  """Compute if instance meets the specs of ipolicy.

  @type ipolicy: dict
  @param ipolicy: The ipolicy to verify against
  @type instance: L{objects.Instance}
  @param instance: The instance to verify
  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration
  @param _compute_fn: The function to verify ipolicy (unittest only)
  @see: L{ComputeIPolicySpecViolation}

  """
  ret = []
  be_full = cfg.GetClusterInfo().FillBE(instance)
  mem_size = be_full[constants.BE_MAXMEM]
  cpu_count = be_full[constants.BE_VCPUS]
  es_flags = rpc.GetExclusiveStorageForNodeNames(cfg, instance.all_nodes)
  if any(es_flags.values()):
    # With exclusive storage use the actual spindles
    try:
      spindle_use = sum([disk.spindles for disk in instance.disks])
    except TypeError:
      ret.append("Number of spindles not configured for disks of instance %s"
                 " while exclusive storage is enabled, try running gnt-cluster"
                 " repair-disk-sizes" % instance.name)
      # _ComputeMinMaxSpec ignores 'None's
      spindle_use = None
  else:
    spindle_use = be_full[constants.BE_SPINDLE_USE]
  disk_count = len(instance.disks)
  disk_sizes = [disk.size for disk in instance.disks]
  nic_count = len(instance.nics)
  disk_template = instance.disk_template

  return ret + _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count,
                           disk_sizes, spindle_use, disk_template)

def _ComputeViolatingInstances(ipolicy, instances, cfg):
  """Computes the set of instances that violate the given ipolicy.

  @param ipolicy: The ipolicy to verify
  @type instances: iterable of L{objects.Instance}
  @param instances: List of instances to verify
  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration
  @return: A frozenset of instance names violating the ipolicy

  """
  return frozenset([inst.name for inst in instances
                    if ComputeIPolicyInstanceViolation(ipolicy, inst, cfg)])

def ComputeNewInstanceViolations(old_ipolicy, new_ipolicy, instances, cfg):
  """Computes a set of any instances that would violate the new ipolicy.

  @param old_ipolicy: The current (still in-place) ipolicy
  @param new_ipolicy: The new (to become) ipolicy
  @param instances: List of instances to verify
  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration
  @return: A list of instances which violate the new ipolicy but
      did not before

  """
  return (_ComputeViolatingInstances(new_ipolicy, instances, cfg) -
          _ComputeViolatingInstances(old_ipolicy, instances, cfg))

def GetUpdatedParams(old_params, update_dict,
                     use_default=True, use_none=False):
  """Return the new version of a parameter dictionary.

  @type old_params: dict
  @param old_params: old parameters
  @type update_dict: dict
  @param update_dict: dict containing new parameter values, or
      constants.VALUE_DEFAULT to reset the parameter to its default
      value
  @type use_default: boolean
  @param use_default: whether to recognise L{constants.VALUE_DEFAULT}
      values as 'to be deleted' values
  @type use_none: boolean
  @param use_none: whether to recognise C{None} values as 'to be
      deleted' values
  @rtype: dict
  @return: the new parameter dictionary

  """
  params_copy = copy.deepcopy(old_params)
  for key, val in update_dict.iteritems():
    if ((use_default and val == constants.VALUE_DEFAULT) or
        (use_none and val is None)):
      try:
        del params_copy[key]
      except KeyError:
        pass
    else:
      params_copy[key] = val
  return params_copy

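
# Illustrative example (not part of the original module): merging opcode
# overrides into existing parameters, where VALUE_DEFAULT drops a key so that
# the cluster default applies again. The parameter names are hypothetical.
#
#   old = {"kernel_path": "/boot/vmlinuz", "root_path": "/dev/xvda1"}
#   new = GetUpdatedParams(old, {"root_path": constants.VALUE_DEFAULT,
#                                "serial_console": True})
#   # new == {"kernel_path": "/boot/vmlinuz", "serial_console": True}
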
def GetUpdatedIPolicy(old_ipolicy, new_ipolicy, group_policy=False):
  """Return the new version of an instance policy.

  @param group_policy: whether this policy applies to a group and thus
    we should support removal of policy entries

  """
  ipolicy = copy.deepcopy(old_ipolicy)
  for key, value in new_ipolicy.items():
    if key not in constants.IPOLICY_ALL_KEYS:
      raise errors.OpPrereqError("Invalid key in new ipolicy: %s" % key,
                                 errors.ECODE_INVAL)
    if (not value or value == [constants.VALUE_DEFAULT] or
        value == constants.VALUE_DEFAULT):
      if group_policy:
        if key in ipolicy:
          del ipolicy[key]
      else:
        raise errors.OpPrereqError("Can't unset ipolicy attribute '%s'"
                                   " on the cluster" % key,
                                   errors.ECODE_INVAL)
    else:
      if key in constants.IPOLICY_PARAMETERS:
        # FIXME: we assume all such values are float
        try:
          ipolicy[key] = float(value)
        except (TypeError, ValueError), err:
          raise errors.OpPrereqError("Invalid value for attribute"
                                     " '%s': '%s', error: %s" %
                                     (key, value, err), errors.ECODE_INVAL)
      elif key == constants.ISPECS_MINMAX:
        for minmax in value:
          for k in minmax.keys():
            utils.ForceDictType(minmax[k], constants.ISPECS_PARAMETER_TYPES)
        ipolicy[key] = value
      elif key == constants.ISPECS_STD:
        if group_policy:
          msg = "%s cannot appear in group instance specs" % key
          raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
        ipolicy[key] = GetUpdatedParams(old_ipolicy.get(key, {}), value,
                                        use_none=False, use_default=False)
        utils.ForceDictType(ipolicy[key], constants.ISPECS_PARAMETER_TYPES)
      else:
        # FIXME: we assume all others are lists; this should be redone
        ipolicy[key] = list(value)
  try:
    objects.InstancePolicy.CheckParameterSyntax(ipolicy, not group_policy)
  except errors.ConfigurationError, err:
    raise errors.OpPrereqError("Invalid instance policy: %s" % err,
                               errors.ECODE_INVAL)
  return ipolicy

def AnnotateDiskParams(instance, devs, cfg):
  """Little helper wrapper to the rpc annotation method.

  @param instance: The instance object
  @type devs: List of L{objects.Disk}
  @param devs: The root devices (not any of its children!)
  @param cfg: The config object
  @return: The annotated disk copies
  @see: L{rpc.AnnotateDiskParams}

  """
  return rpc.AnnotateDiskParams(instance.disk_template, devs,
                                cfg.GetInstanceDiskParams(instance))

def SupportsOob(cfg, node):
  """Tells if node supports OOB.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type node: L{objects.Node}
  @param node: The node
  @return: The OOB script if supported or an empty string otherwise

  """
  return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]

def _UpdateAndVerifySubDict(base, updates, type_check):
  """Updates and verifies a dict with sub dicts of the same type.

  @param base: The dict with the old data
  @param updates: The dict with the new data
  @param type_check: Dict suitable to ForceDictType to verify correct types
  @returns: A new dict with updated and verified values

  """
  def fn(old, value):
    new = GetUpdatedParams(old, value)
    utils.ForceDictType(new, type_check)
    return new

  ret = copy.deepcopy(base)
  ret.update(dict((key, fn(base.get(key, {}), value))
                  for key, value in updates.items()))
  return ret

def _FilterVmNodes(lu, nodenames):
  """Filters out non-vm_capable nodes from a list.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type nodenames: list
  @param nodenames: the list of nodes on which we should check
  @return: the list of vm-capable nodes

  """
  vm_nodes = frozenset(lu.cfg.GetNonVmCapableNodeList())
  return [name for name in nodenames if name not in vm_nodes]

def GetDefaultIAllocator(cfg, ialloc):
  """Decides on which iallocator to use.

  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration object
  @type ialloc: string or None
  @param ialloc: Iallocator specified in opcode
  @rtype: string
  @return: Iallocator name

  """
  if not ialloc:
    # Use default iallocator
    ialloc = cfg.GetDefaultIAllocator()

  if not ialloc:
    raise errors.OpPrereqError("No iallocator was specified, neither in the"
                               " opcode nor as a cluster-wide default",
                               errors.ECODE_INVAL)

  return ialloc

def CheckInstancesNodeGroups(cfg, instances, owned_groups, owned_nodes,
                             cur_group_uuid):
  """Checks if node groups for locked instances are still correct.

  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration
  @type instances: dict; string as key, L{objects.Instance} as value
  @param instances: Dictionary, instance name as key, instance object as value
  @type owned_groups: iterable of string
  @param owned_groups: List of owned groups
  @type owned_nodes: iterable of string
  @param owned_nodes: List of owned nodes
  @type cur_group_uuid: string or None
  @param cur_group_uuid: Optional group UUID to check against instance's groups

  """
  for (name, inst) in instances.items():
    assert owned_nodes.issuperset(inst.all_nodes), \
      "Instance %s's nodes changed while we kept the lock" % name

    inst_groups = CheckInstanceNodeGroups(cfg, name, owned_groups)

    assert cur_group_uuid is None or cur_group_uuid in inst_groups, \
      "Instance %s has no node in group %s" % (name, cur_group_uuid)

def CheckInstanceNodeGroups(cfg, instance_name, owned_groups,
                            primary_only=False):
  """Checks if the owned node groups are still correct for an instance.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type instance_name: string
  @param instance_name: Instance name
  @type owned_groups: set or frozenset
  @param owned_groups: List of currently owned node groups
  @type primary_only: boolean
  @param primary_only: Whether to check node groups for only the primary node

  """
  inst_groups = cfg.GetInstanceNodeGroups(instance_name, primary_only)

  if not owned_groups.issuperset(inst_groups):
    raise errors.OpPrereqError("Instance %s's node groups changed since"
                               " locks were acquired, current groups are"
                               " '%s', owning groups '%s'; retry the"
                               " operation" %
                               (instance_name,
                                utils.CommaJoin(inst_groups),
                                utils.CommaJoin(owned_groups)),
                               errors.ECODE_STATE)

  return inst_groups

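
# Illustrative example (not part of the original module): in a CheckPrereq()
# running under per-group locks, the owned locks are re-validated against the
# configuration before acting; "lu" and the opcode attribute are assumed to
# come from the surrounding LU.
#
#   owned_groups = frozenset(lu.owned_locks(locking.LEVEL_NODEGROUP))
#   CheckInstanceNodeGroups(lu.cfg, lu.op.instance_name, owned_groups)
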
def LoadNodeEvacResult(lu, alloc_result, early_release, use_nodes):
  """Unpacks the result of change-group and node-evacuate iallocator requests.

  Iallocator modes L{constants.IALLOCATOR_MODE_NODE_EVAC} and
  L{constants.IALLOCATOR_MODE_CHG_GROUP}.

  @type lu: L{LogicalUnit}
  @param lu: Logical unit instance
  @type alloc_result: tuple/list
  @param alloc_result: Result from iallocator
  @type early_release: bool
  @param early_release: Whether to release locks early if possible
  @type use_nodes: bool
  @param use_nodes: Whether to display node names instead of groups

  """
  (moved, failed, jobs) = alloc_result

  if failed:
    failreason = utils.CommaJoin("%s (%s)" % (name, reason)
                                 for (name, reason) in failed)
    lu.LogWarning("Unable to evacuate instances %s", failreason)
    raise errors.OpExecError("Unable to evacuate instances %s" % failreason)

  if moved:
    lu.LogInfo("Instances to be moved: %s",
               utils.CommaJoin("%s (to %s)" %
                               (name, _NodeEvacDest(use_nodes, group, nodes))
                               for (name, group, nodes) in moved))

  return [map(compat.partial(_SetOpEarlyRelease, early_release),
              map(opcodes.OpCode.LoadOpCode, ops))
          for ops in jobs]

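
# Illustrative example (not part of the original module): the caller gets back
# a list of job definitions (each a list of opcodes) which it typically
# submits as follow-up jobs; "ial" stands for an already-run IAllocator
# request and the exact return-value wrapper used by the calling LU is
# assumed here.
#
#   jobs = LoadNodeEvacResult(lu, ial.result, lu.op.early_release,
#                             use_nodes=True)
#   # each element of "jobs" is one job, i.e. a list of opcodes
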
def _NodeEvacDest(use_nodes, group, nodes):
  """Returns group or nodes depending on caller's choice.

  """
  if use_nodes:
    return utils.CommaJoin(nodes)
  else:
    return group

def _SetOpEarlyRelease(early_release, op):
  """Sets C{early_release} flag on opcodes if available.

  """
  try:
    op.early_release = early_release
  except AttributeError:
    assert not isinstance(op, opcodes.OpInstanceReplaceDisks)

  return op

def MapInstanceDisksToNodes(instances):
  """Creates a map from (node, volume) to instance name.

  @type instances: list of L{objects.Instance}
  @rtype: dict; tuple of (node name, volume name) as key, instance name as value

  """
  return dict(((node, vol), inst.name)
              for inst in instances
              for (node, vols) in inst.MapLVsByNode().items()
              for vol in vols)

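
# Illustrative example (not part of the original module): the resulting dict
# maps every (node, logical volume) pair to the owning instance, e.g.
#
#   {("node1.example.com", "xenvg/disk0"): "web.example.com",
#    ("node2.example.com", "xenvg/disk0"): "web.example.com"}
#
# (node names and volume paths above are hypothetical).
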
def CheckParamsNotGlobal(params, glob_pars, kind, bad_levels, good_levels):
  """Make sure that none of the given parameters is global.

  If a global parameter is found, an L{errors.OpPrereqError} exception is
  raised. This is used to avoid setting global parameters for individual nodes.

  @type params: dictionary
  @param params: Parameters to check
  @type glob_pars: set or frozenset
  @param glob_pars: Forbidden parameters
  @type kind: string
  @param kind: Kind of parameters (e.g. "node")
  @type bad_levels: string
  @param bad_levels: Level(s) at which the parameters are forbidden (e.g.
      "instance")
  @type good_levels: strings
  @param good_levels: Level(s) at which the parameters are allowed (e.g.
      "cluster or group")

  """
  used_globals = glob_pars.intersection(params)
  if used_globals:
    msg = ("The following %s parameters are global and cannot"
           " be customized at %s level, please modify them at"
           " %s level: %s" %
           (kind, bad_levels, good_levels, utils.CommaJoin(used_globals)))
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)

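
# Illustrative example (not part of the original module): node LUs use this to
# reject attempts to set node-level values for parameters that are only
# meaningful cluster- or group-wide; the constant holding the forbidden keys
# is assumed here.
#
#   CheckParamsNotGlobal(lu.op.ndparams, constants.NDC_GLOBALS,
#                        "node", "node", "cluster or group")
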
def IsExclusiveStorageEnabledNode(cfg, node):
  """Whether exclusive_storage is in effect for the given node.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type node: L{objects.Node}
  @param node: The node
  @rtype: bool
  @return: The effective value of exclusive_storage

  """
  return cfg.GetNdParams(node)[constants.ND_EXCLUSIVE_STORAGE]

def CheckInstanceState(lu, instance, req_states, msg=None):
  """Ensure that an instance is in one of the required states.

  @param lu: the LU on behalf of which we make the check
  @param instance: the instance to check
  @param msg: if passed, should be a message to replace the default one
  @raise errors.OpPrereqError: if the instance is not in the required state

  """
  if msg is None:
    msg = ("can't use instance from outside %s states" %
           utils.CommaJoin(req_states))
  if instance.admin_state not in req_states:
    raise errors.OpPrereqError("Instance '%s' is marked to be %s, %s" %
                               (instance.name, instance.admin_state, msg),
                               errors.ECODE_STATE)

  if constants.ADMINST_UP not in req_states:
    pnode = instance.primary_node
    if not lu.cfg.GetNodeInfo(pnode).offline:
      ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
      ins_l.Raise("Can't contact node %s for instance information" % pnode,
                  prereq=True, ecode=errors.ECODE_ENVIRON)
      if instance.name in ins_l.payload:
        raise errors.OpPrereqError("Instance %s is running, %s" %
                                   (instance.name, msg), errors.ECODE_STATE)
    else:
      lu.LogWarning("Primary node offline, ignoring check that instance"
                    " is down")

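
# Illustrative example (not part of the original module): an LU that must only
# act on stopped instances combines this check with the admin-state lists
# defined at the top of this module; the message text is made up.
#
#   CheckInstanceState(lu, instance, INSTANCE_DOWN,
#                      msg="cannot modify a running instance")
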
def CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
  """Check the sanity of iallocator and node arguments and use the
  cluster-wide iallocator if appropriate.

  Check that at most one of (iallocator, node) is specified. If none is
  specified, or the iallocator is L{constants.DEFAULT_IALLOCATOR_SHORTCUT},
  then the LU's opcode's iallocator slot is filled with the cluster-wide
  default iallocator.

  @type iallocator_slot: string
  @param iallocator_slot: the name of the opcode iallocator slot
  @type node_slot: string
  @param node_slot: the name of the opcode target node slot

  """
  node = getattr(lu.op, node_slot, None)
  ialloc = getattr(lu.op, iallocator_slot, None)

  if node is not None and ialloc is not None:
    raise errors.OpPrereqError("Do not specify both, iallocator and node",
                               errors.ECODE_INVAL)
  elif ((node is None and ialloc is None) or
        ialloc == constants.DEFAULT_IALLOCATOR_SHORTCUT):
    default_iallocator = lu.cfg.GetDefaultIAllocator()
    if default_iallocator:
      setattr(lu.op, iallocator_slot, default_iallocator)
    else:
      raise errors.OpPrereqError("No iallocator or node given and no"
                                 " cluster-wide default iallocator found;"
                                 " please specify either an iallocator or a"
                                 " node, or set a cluster-wide default"
                                 " iallocator", errors.ECODE_INVAL)

def FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_name, prereq):
  """Returns the indices of the instance's disks reported faulty on a node.

  """
  faulty = []

  for dev in instance.disks:
    cfg.SetDiskID(dev, node_name)

  result = rpc_runner.call_blockdev_getmirrorstatus(node_name,
                                                    (instance.disks,
                                                     instance))
  result.Raise("Failed to get disk status from node %s" % node_name,
               prereq=prereq, ecode=errors.ECODE_ENVIRON)

  for idx, bdev_status in enumerate(result.payload):
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
      faulty.append(idx)

  return faulty

def CheckNodeOnline(lu, node, msg=None):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node: the node to check
  @param msg: if passed, should be a message to replace the default one
  @raise errors.OpPrereqError: if the node is offline

  """
  if msg is None:
    msg = "Can't use offline node"
  if lu.cfg.GetNodeInfo(node).offline:
    raise errors.OpPrereqError("%s: %s" % (msg, node), errors.ECODE_STATE)