# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Common functions used by multiple logical units."""
27 from ganeti import compat
28 from ganeti import constants
29 from ganeti import errors
30 from ganeti import hypervisor
31 from ganeti import locking
32 from ganeti import objects
33 from ganeti import opcodes
34 from ganeti import pathutils
35 from ganeti import rpc
36 from ganeti import ssconf
37 from ganeti import utils


INSTANCE_DOWN = [constants.ADMINST_DOWN]
INSTANCE_ONLINE = [constants.ADMINST_DOWN, constants.ADMINST_UP]
INSTANCE_NOT_RUNNING = [constants.ADMINST_DOWN, constants.ADMINST_OFFLINE]

#: Instance status in which an instance can be marked as offline/online
CAN_CHANGE_INSTANCE_OFFLINE = (frozenset(INSTANCE_DOWN) | frozenset([
  constants.ADMINST_OFFLINE,
  ]))


def _ExpandItemName(expand_fn, name, kind):
  """Expand an item name.

  @param expand_fn: the function to use for expansion
  @param name: requested item name
  @param kind: text description ('Node' or 'Instance')
  @return: the result of the expand_fn, if successful
  @raise errors.OpPrereqError: if the item is not found

  """
  full_name = expand_fn(name)
  if full_name is None:
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
                               errors.ECODE_NOENT)
  return full_name


def ExpandInstanceName(cfg, name):
  """Wrapper over L{_ExpandItemName} for instance."""
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")


def ExpandNodeUuidAndName(cfg, expected_uuid, name):
  """Expand a short node name into the node UUID and full name.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type expected_uuid: string
  @param expected_uuid: expected UUID for the node (or None if there is no
        expectation). If it does not match, a L{errors.OpPrereqError} is
        raised.
  @type name: string
  @param name: the short node name

  """
  (uuid, full_name) = _ExpandItemName(cfg.ExpandNodeName, name, "Node")
  if expected_uuid is not None and uuid != expected_uuid:
    raise errors.OpPrereqError(
      "The node's UUID '%s' does not match the expected UUID '%s' for node"
      " '%s'. Maybe the node changed since you submitted this job." %
      (uuid, expected_uuid, full_name), errors.ECODE_NOTUNIQUE)
  return (uuid, full_name)
96 """Returns a dict declaring all lock levels shared.
99 return dict.fromkeys(locking.LEVELS, 1)


def CheckNodeGroupInstances(cfg, group_uuid, owned_instances):
  """Checks if the instances in a node group are still correct.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type group_uuid: string
  @param group_uuid: Node group UUID
  @type owned_instances: set or frozenset
  @param owned_instances: List of currently owned instances

  """
  wanted_instances = cfg.GetNodeGroupInstances(group_uuid)
  if owned_instances != wanted_instances:
    raise errors.OpPrereqError("Instances in node group '%s' changed since"
                               " locks were acquired, wanted '%s', have '%s';"
                               " retry the operation" %
                               (group_uuid,
                                utils.CommaJoin(wanted_instances),
                                utils.CommaJoin(owned_instances)),
                               errors.ECODE_STATE)

  return wanted_instances


def GetWantedNodes(lu, short_node_names):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type short_node_names: list
  @param short_node_names: list of node names or None for all nodes
  @rtype: tuple of lists
  @return: tuple with (list of node UUIDs, list of node names)
  @raise errors.ProgrammerError: if the nodes parameter is wrong type

  """
  if short_node_names:
    node_uuids = [ExpandNodeUuidAndName(lu.cfg, None, name)[0]
                  for name in short_node_names]
  else:
    node_uuids = lu.cfg.GetNodeList()

  return (node_uuids, [lu.cfg.GetNodeName(uuid) for uuid in node_uuids])


def GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if instances:
    wanted = [ExpandInstanceName(lu.cfg, name) for name in instances]
  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
  return wanted


def RunPostHook(lu, node_name):
  """Runs the post-hook for an opcode on a single node.

  """
  hm = lu.proc.BuildHooksManager(lu)
  try:
    hm.RunPhase(constants.HOOKS_PHASE_POST, node_names=[node_name])
  except Exception, err:  # pylint: disable=W0703
    lu.LogWarning("Errors occurred running hooks on %s: %s",
                  node_name, err)


def RedistributeAncillaryFiles(lu):
  """Distribute additional files which are part of the cluster configuration.

  ConfigWriter takes care of distributing the config and ssconf files, but
  there are more files which should be distributed to all nodes. This function
  makes sure those are copied.

  """
  # Gather target nodes
  cluster = lu.cfg.GetClusterInfo()
  master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())

  online_node_uuids = lu.cfg.GetOnlineNodeList()
  online_node_uuid_set = frozenset(online_node_uuids)
  vm_node_uuids = list(online_node_uuid_set.intersection(
                         lu.cfg.GetVmCapableNodeList()))

  # Never distribute to master node
  for node_uuids in [online_node_uuids, vm_node_uuids]:
    if master_info.uuid in node_uuids:
      node_uuids.remove(master_info.uuid)

  # Gather file lists
  (files_all, _, files_mc, files_vm) = \
    ComputeAncillaryFiles(cluster, True)

  # Never re-distribute configuration file from here
  assert not (pathutils.CLUSTER_CONF_FILE in files_all or
              pathutils.CLUSTER_CONF_FILE in files_vm)
  assert not files_mc, "Master candidates not handled in this function"

  filemap = [
    (online_node_uuids, files_all),
    (vm_node_uuids, files_vm),
    ]

  # Upload the files
  for (node_uuids, files) in filemap:
    for fname in files:
      UploadHelper(lu, node_uuids, fname)


def ComputeAncillaryFiles(cluster, redist):
  """Compute files external to Ganeti which need to be consistent.

  @type redist: boolean
  @param redist: Whether to include files which need to be redistributed

  """
  # Compute files for all nodes
  files_all = set([
    pathutils.SSH_KNOWN_HOSTS_FILE,
    pathutils.CONFD_HMAC_KEY,
    pathutils.CLUSTER_DOMAIN_SECRET_FILE,
    pathutils.SPICE_CERT_FILE,
    pathutils.SPICE_CACERT_FILE,
    pathutils.RAPI_USERS_FILE,
    ])

  if redist:
    # we need to ship at least the RAPI certificate
    files_all.add(pathutils.RAPI_CERT_FILE)
  else:
    files_all.update(pathutils.ALL_CERT_FILES)
    files_all.update(ssconf.SimpleStore().GetFileList())

  if cluster.modify_etc_hosts:
    files_all.add(pathutils.ETC_HOSTS)

  if cluster.use_external_mip_script:
    files_all.add(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)

  # Files which are optional, these must:
  # - be present in one other category as well
  # - either exist or not exist on all nodes of that category (mc, vm all)
  files_opt = set([
    pathutils.RAPI_USERS_FILE,
    ])

  # Files which should only be on master candidates
  files_mc = set()

  if not redist:
    files_mc.add(pathutils.CLUSTER_CONF_FILE)

  # File storage
  if (not redist and (constants.ENABLE_FILE_STORAGE or
                      constants.ENABLE_SHARED_FILE_STORAGE)):
    files_all.add(pathutils.FILE_STORAGE_PATHS_FILE)
    files_opt.add(pathutils.FILE_STORAGE_PATHS_FILE)

  # Files which should only be on VM-capable nodes
  files_vm = set(
    filename
    for hv_name in cluster.enabled_hypervisors
    for filename in
      hypervisor.GetHypervisorClass(hv_name).GetAncillaryFiles()[0])

  files_opt |= set(
    filename
    for hv_name in cluster.enabled_hypervisors
    for filename in
      hypervisor.GetHypervisorClass(hv_name).GetAncillaryFiles()[1])

  # Filenames in each category must be unique
  all_files_set = files_all | files_mc | files_vm
  assert (len(all_files_set) ==
          sum(map(len, [files_all, files_mc, files_vm]))), \
    "Found file listed in more than one file list"

  # Optional files must be present in one other category
  assert all_files_set.issuperset(files_opt), \
    "Optional file not in a different required list"

  # This one file should never ever be re-distributed via RPC
  assert not (redist and
              pathutils.FILE_STORAGE_PATHS_FILE in all_files_set)

  return (files_all, files_opt, files_mc, files_vm)


def UploadHelper(lu, node_uuids, fname):
  """Helper for uploading a file and showing warnings.

  """
  if os.path.exists(fname):
    result = lu.rpc.call_upload_file(node_uuids, fname)
    for to_node_uuids, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        msg = ("Copy of file %s to node %s failed: %s" %
               (fname, lu.cfg.GetNodeName(to_node_uuids), msg))
        lu.LogWarning(msg)


def MergeAndVerifyHvState(op_input, obj_input):
  """Combines the hv state from an opcode with the one of the object

  @param op_input: The input dict from the opcode
  @param obj_input: The input dict from the objects
  @return: The verified and updated dict

  """
  if op_input:
    invalid_hvs = set(op_input) - constants.HYPER_TYPES
    if invalid_hvs:
      raise errors.OpPrereqError("Invalid hypervisor(s) in hypervisor state:"
                                 " %s" % utils.CommaJoin(invalid_hvs),
                                 errors.ECODE_INVAL)
    if obj_input is None:
      obj_input = {}
    type_check = constants.HVSTS_PARAMETER_TYPES
    return _UpdateAndVerifySubDict(obj_input, op_input, type_check)

  return None


def MergeAndVerifyDiskState(op_input, obj_input):
  """Combines the disk state from an opcode with the one of the object

  @param op_input: The input dict from the opcode
  @param obj_input: The input dict from the objects
  @return: The verified and updated dict

  """
  if op_input:
    invalid_dst = set(op_input) - constants.DS_VALID_TYPES
    if invalid_dst:
      raise errors.OpPrereqError("Invalid storage type(s) in disk state: %s" %
                                 utils.CommaJoin(invalid_dst),
                                 errors.ECODE_INVAL)
    type_check = constants.DSS_PARAMETER_TYPES
    if obj_input is None:
      obj_input = {}
    return dict((key, _UpdateAndVerifySubDict(obj_input.get(key, {}), value,
                                               type_check))
                for key, value in op_input.items())

  return None


def CheckOSParams(lu, required, node_uuids, osname, osparams):
  """OS parameters validation.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type required: boolean
  @param required: whether the validation should fail if the OS is not
      found
  @type node_uuids: list
  @param node_uuids: the list of nodes on which we should check
  @type osname: string
  @param osname: the name of the OS we should use
  @type osparams: dict
  @param osparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  node_uuids = _FilterVmNodes(lu, node_uuids)
  result = lu.rpc.call_os_validate(node_uuids, required, osname,
                                   [constants.OS_VALIDATE_PARAMETERS],
                                   osparams)
  for node_uuid, nres in result.items():
    # we don't check for offline cases since this should be run only
    # against the master node and/or an instance's nodes
    nres.Raise("OS Parameters validation failed on node %s" %
               lu.cfg.GetNodeName(node_uuid))
    if not nres.payload:
      lu.LogInfo("OS %s not found on node %s, validation skipped",
                 osname, lu.cfg.GetNodeName(node_uuid))


def CheckHVParams(lu, node_uuids, hvname, hvparams):
  """Hypervisor parameter validation.

  This function abstracts the hypervisor parameter validation to be
  used in both instance create and instance modify.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type node_uuids: list
  @param node_uuids: the list of nodes on which we should check
  @type hvname: string
  @param hvname: the name of the hypervisor we should use
  @type hvparams: dict
  @param hvparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  node_uuids = _FilterVmNodes(lu, node_uuids)

  cluster = lu.cfg.GetClusterInfo()
  hvfull = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)

  hvinfo = lu.rpc.call_hypervisor_validate_params(node_uuids, hvname, hvfull)
  for node_uuid in node_uuids:
    info = hvinfo[node_uuid]
    if info.offline:
      continue
    info.Raise("Hypervisor parameter validation failed on node %s" %
               lu.cfg.GetNodeName(node_uuid))


def AdjustCandidatePool(lu, exceptions):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               utils.CommaJoin(node.name for node in mod_list))
    for node in mod_list:
      lu.context.ReaddNode(node)
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def CheckNodePVs(nresult, exclusive_storage):
  """Check node PVs.

  """
  pvlist_dict = nresult.get(constants.NV_PVLIST, None)
  if pvlist_dict is None:
    return (["Can't get PV list from node"], None)
  pvlist = map(objects.LvmPvInfo.FromDict, pvlist_dict)
  errlist = []
  # check that ':' is not present in PV names, since it's a
  # special character for lvcreate (denotes the range of PEs to
  # use on the PV)
  for pv in pvlist:
    if ":" in pv.name:
      errlist.append("Invalid character ':' in PV '%s' of VG '%s'" %
                     (pv.name, pv.vg_name))
  es_pvinfo = None
  if exclusive_storage:
    (errmsgs, es_pvinfo) = utils.LvmExclusiveCheckNodePvs(pvlist)
    errlist.extend(errmsgs)
    shared_pvs = nresult.get(constants.NV_EXCLUSIVEPVS, None)
    if shared_pvs:
      for (pvname, lvlist) in shared_pvs:
        # TODO: Check that LVs are really unrelated (snapshots, DRBD meta...)
        errlist.append("PV %s is shared among unrelated LVs (%s)" %
                       (pvname, utils.CommaJoin(lvlist)))
  return (errlist, es_pvinfo)


def _ComputeMinMaxSpec(name, qualifier, ispecs, value):
  """Computes if value is in the desired range.

  @param name: name of the parameter for which we perform the check
  @param qualifier: a qualifier used in the error message (e.g. 'disk/1',
      not just 'disk')
  @param ispecs: dictionary containing min and max values
  @param value: actual value that we want to use
  @return: None or an error string

  """
  if value in [None, constants.VALUE_AUTO]:
    return None
  max_v = ispecs[constants.ISPECS_MAX].get(name, value)
  min_v = ispecs[constants.ISPECS_MIN].get(name, value)
  if value > max_v or min_v > value:
    if qualifier:
      fqn = "%s/%s" % (name, qualifier)
    else:
      fqn = name
    return ("%s value %s is not in range [%s, %s]" %
            (fqn, value, min_v, max_v))
  return None
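

# Illustrative sketch (editorial addition, not part of the upstream module;
# the constants are real, the values are made up): with a minmax entry such as
#   minmax = {constants.ISPECS_MIN: {constants.ISPEC_MEM_SIZE: 128},
#             constants.ISPECS_MAX: {constants.ISPEC_MEM_SIZE: 4096}}
# _ComputeMinMaxSpec(constants.ISPEC_MEM_SIZE, "", minmax, 512) returns None
# (value in range), while a value of 8192 returns an error string ending in
# "is not in range [128, 4096]"; None and constants.VALUE_AUTO always pass.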


def ComputeIPolicySpecViolation(ipolicy, mem_size, cpu_count, disk_count,
                                nic_count, disk_sizes, spindle_use,
                                disk_template,
                                _compute_fn=_ComputeMinMaxSpec):
  """Verifies ipolicy against provided specs.

  @type ipolicy: dict
  @param ipolicy: The ipolicy
  @type mem_size: int
  @param mem_size: The memory size
  @type cpu_count: int
  @param cpu_count: Used cpu cores
  @type disk_count: int
  @param disk_count: Number of disks used
  @type nic_count: int
  @param nic_count: Number of nics used
  @type disk_sizes: list of ints
  @param disk_sizes: Disk sizes of used disk (len must match C{disk_count})
  @type spindle_use: int
  @param spindle_use: The number of spindles this instance uses
  @type disk_template: string
  @param disk_template: The disk template of the instance
  @param _compute_fn: The compute function (unittest only)
  @return: A list of violations, or an empty list if no violations are found

  """
  assert disk_count == len(disk_sizes)

  test_settings = [
    (constants.ISPEC_MEM_SIZE, "", mem_size),
    (constants.ISPEC_CPU_COUNT, "", cpu_count),
    (constants.ISPEC_NIC_COUNT, "", nic_count),
    (constants.ISPEC_SPINDLE_USE, "", spindle_use),
    ] + [(constants.ISPEC_DISK_SIZE, str(idx), d)
         for idx, d in enumerate(disk_sizes)]
  if disk_template != constants.DT_DISKLESS:
    # This check doesn't make sense for diskless instances
    test_settings.append((constants.ISPEC_DISK_COUNT, "", disk_count))
  ret = []
  allowed_dts = ipolicy[constants.IPOLICY_DTS]
  if disk_template not in allowed_dts:
    ret.append("Disk template %s is not allowed (allowed templates: %s)" %
               (disk_template, utils.CommaJoin(allowed_dts)))

  min_errs = None
  for minmax in ipolicy[constants.ISPECS_MINMAX]:
    errs = filter(None,
                  (_compute_fn(name, qualifier, minmax, value)
                   for (name, qualifier, value) in test_settings))
    if min_errs is None or len(errs) < len(min_errs):
      min_errs = errs
  assert min_errs is not None
  return ret + min_errs
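

# Illustrative sketch (editorial addition, not part of the upstream module):
# for an ipolicy that allows DRBD and limits instances to 1-8 CPUs, a call
# such as
#   ComputeIPolicySpecViolation(ipolicy, 1024, 16, 1, 1, [10240], 1,
#                               constants.DT_DRBD8)
# returns a one-element list complaining about the CPU count, while a fully
# compliant spec returns []; only the best-matching ISPECS_MINMAX entry (the
# one producing the fewest violations) is reported.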


def ComputeIPolicyInstanceViolation(ipolicy, instance, cfg,
                                    _compute_fn=ComputeIPolicySpecViolation):
  """Compute if instance meets the specs of ipolicy.

  @type ipolicy: dict
  @param ipolicy: The ipolicy to verify against
  @type instance: L{objects.Instance}
  @param instance: The instance to verify
  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration
  @param _compute_fn: The function to verify ipolicy (unittest only)
  @see: L{ComputeIPolicySpecViolation}

  """
  ret = []
  be_full = cfg.GetClusterInfo().FillBE(instance)
  mem_size = be_full[constants.BE_MAXMEM]
  cpu_count = be_full[constants.BE_VCPUS]
  es_flags = rpc.GetExclusiveStorageForNodes(cfg, instance.all_nodes)
  if any(es_flags.values()):
    # With exclusive storage use the actual spindles
    try:
      spindle_use = sum([disk.spindles for disk in instance.disks])
    except TypeError:
      ret.append("Number of spindles not configured for disks of instance %s"
                 " while exclusive storage is enabled, try running gnt-cluster"
                 " repair-disk-sizes" % instance.name)
      # _ComputeMinMaxSpec ignores 'None's
      spindle_use = None
  else:
    spindle_use = be_full[constants.BE_SPINDLE_USE]
  disk_count = len(instance.disks)
  disk_sizes = [disk.size for disk in instance.disks]
  nic_count = len(instance.nics)
  disk_template = instance.disk_template

  return ret + _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count,
                           disk_sizes, spindle_use, disk_template)


def _ComputeViolatingInstances(ipolicy, instances, cfg):
  """Computes a set of instances which violate the given ipolicy.

  @param ipolicy: The ipolicy to verify
  @type instances: L{objects.Instance}
  @param instances: List of instances to verify
  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration
  @return: A frozenset of instance names violating the ipolicy

  """
  return frozenset([inst.name for inst in instances
                    if ComputeIPolicyInstanceViolation(ipolicy, inst, cfg)])


def ComputeNewInstanceViolations(old_ipolicy, new_ipolicy, instances, cfg):
  """Computes a set of any instances that would violate the new ipolicy.

  @param old_ipolicy: The current (still in-place) ipolicy
  @param new_ipolicy: The new (to become) ipolicy
  @param instances: List of instances to verify
  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration
  @return: A list of instances which violate the new ipolicy but
      did not before

  """
  return (_ComputeViolatingInstances(new_ipolicy, instances, cfg) -
          _ComputeViolatingInstances(old_ipolicy, instances, cfg))
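

# Illustrative sketch (editorial addition, not part of the upstream module):
# tightening the maximum CPU count from 16 to 8 makes
# ComputeNewInstanceViolations(old, new, instances, cfg) return only the
# instances that violate the new limit but complied with the old one (e.g.
# those using 9-16 VCPUs); instances already in violation of the old policy
# are subtracted out and not reported again.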


def GetUpdatedParams(old_params, update_dict,
                     use_default=True, use_none=False):
  """Return the new version of a parameter dictionary.

  @type old_params: dict
  @param old_params: old parameters
  @type update_dict: dict
  @param update_dict: dict containing new parameter values, or
      constants.VALUE_DEFAULT to reset the parameter to its default
      value
  @type use_default: boolean
  @param use_default: whether to recognise L{constants.VALUE_DEFAULT}
      values as 'to be deleted' values
  @type use_none: boolean
  @param use_none: whether to recognise C{None} values as 'to be
      deleted' values
  @rtype: dict
  @return: the new parameter dictionary

  """
  params_copy = copy.deepcopy(old_params)
  for key, val in update_dict.iteritems():
    if ((use_default and val == constants.VALUE_DEFAULT) or
        (use_none and val is None)):
      try:
        del params_copy[key]
      except KeyError:
        pass
    else:
      params_copy[key] = val
  return params_copy
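

# Illustrative sketch (editorial addition, not part of the upstream module):
#   GetUpdatedParams({"a": 1, "b": 2}, {"b": constants.VALUE_DEFAULT, "c": 3})
# returns {"a": 1, "c": 3}: "b" is dropped because VALUE_DEFAULT (with
# use_default=True) means "reset to the default", new keys are added, and
# keys of old_params that are not mentioned in update_dict stay untouched.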


def GetUpdatedIPolicy(old_ipolicy, new_ipolicy, group_policy=False):
  """Return the new version of an instance policy.

  @param group_policy: whether this policy applies to a group and thus
    we should support removal of policy entries

  """
  ipolicy = copy.deepcopy(old_ipolicy)
  for key, value in new_ipolicy.items():
    if key not in constants.IPOLICY_ALL_KEYS:
      raise errors.OpPrereqError("Invalid key in new ipolicy: %s" % key,
                                 errors.ECODE_INVAL)
    if (not value or value == [constants.VALUE_DEFAULT] or
        value == constants.VALUE_DEFAULT):
      if group_policy:
        if key in ipolicy:
          del ipolicy[key]
      else:
        raise errors.OpPrereqError("Can't unset ipolicy attribute '%s'"
                                   " on the cluster" % key,
                                   errors.ECODE_INVAL)
    else:
      if key in constants.IPOLICY_PARAMETERS:
        # FIXME: we assume all such values are float
        try:
          ipolicy[key] = float(value)
        except (TypeError, ValueError), err:
          raise errors.OpPrereqError("Invalid value for attribute"
                                     " '%s': '%s', error: %s" %
                                     (key, value, err), errors.ECODE_INVAL)
      elif key == constants.ISPECS_MINMAX:
        for minmax in value:
          for k in minmax.keys():
            utils.ForceDictType(minmax[k], constants.ISPECS_PARAMETER_TYPES)
        ipolicy[key] = value
      elif key == constants.ISPECS_STD:
        if group_policy:
          msg = "%s cannot appear in group instance specs" % key
          raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
        ipolicy[key] = GetUpdatedParams(old_ipolicy.get(key, {}), value,
                                        use_none=False, use_default=False)
        utils.ForceDictType(ipolicy[key], constants.ISPECS_PARAMETER_TYPES)
      else:
        # FIXME: we assume all others are lists; this should be redone
        ipolicy[key] = list(value)
  try:
    objects.InstancePolicy.CheckParameterSyntax(ipolicy, not group_policy)
  except errors.ConfigurationError, err:
    raise errors.OpPrereqError("Invalid instance policy: %s" % err,
                               errors.ECODE_INVAL)
  return ipolicy
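

# Illustrative sketch (editorial addition, not part of the upstream module):
# on a group policy, passing
#   {constants.IPOLICY_VCPU_RATIO: constants.VALUE_DEFAULT}
# removes the group-level override so the cluster value applies again, whereas
# the same request at cluster level (group_policy=False) is rejected, since
# the cluster must always define a complete policy.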


def AnnotateDiskParams(instance, devs, cfg):
  """Little helper wrapper to the rpc annotation method.

  @param instance: The instance object
  @type devs: List of L{objects.Disk}
  @param devs: The root devices (not any of their children!)
  @param cfg: The config object
  @return: The annotated disk copies
  @see: L{rpc.AnnotateDiskParams}

  """
  return rpc.AnnotateDiskParams(instance.disk_template, devs,
                                cfg.GetInstanceDiskParams(instance))


def SupportsOob(cfg, node):
  """Tells if node supports OOB.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type node: L{objects.Node}
  @param node: The node
  @return: The OOB script if supported or an empty string otherwise

  """
  return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]


def _UpdateAndVerifySubDict(base, updates, type_check):
  """Updates and verifies a dict with sub dicts of the same type.

  @param base: The dict with the old data
  @param updates: The dict with the new data
  @param type_check: Dict suitable to ForceDictType to verify correct types
  @returns: A new dict with updated and verified values

  """
  def fn(old, value):
    new = GetUpdatedParams(old, value)
    utils.ForceDictType(new, type_check)
    return new

  ret = copy.deepcopy(base)
  ret.update(dict((key, fn(base.get(key, {}), value))
                  for key, value in updates.items()))
  return ret
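

# Illustrative sketch (editorial addition, not part of the upstream module;
# keys and values are hypothetical): with
#   base = {"plain": {"stripes": 1}}, updates = {"plain": {"stripes": 2}}
# and a type_check declaring "stripes" as an integer parameter, the result is
# {"plain": {"stripes": 2}}; every merged sub-dict is passed through
# utils.ForceDictType, so ill-typed or unknown values raise an error.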


def _FilterVmNodes(lu, node_uuids):
  """Filters out non-vm_capable nodes from a list.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type node_uuids: list
  @param node_uuids: the list of nodes on which we should check
  @rtype: list
  @return: the list of vm-capable nodes

  """
  vm_nodes = frozenset(lu.cfg.GetNonVmCapableNodeList())
  return [uuid for uuid in node_uuids if uuid not in vm_nodes]


def GetDefaultIAllocator(cfg, ialloc):
  """Decides on which iallocator to use.

  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration object
  @type ialloc: string or None
  @param ialloc: Iallocator specified in opcode
  @rtype: string
  @return: Iallocator name

  """
  if not ialloc:
    # Use default iallocator
    ialloc = cfg.GetDefaultIAllocator()

  if not ialloc:
    raise errors.OpPrereqError("No iallocator was specified, neither in the"
                               " opcode nor as a cluster-wide default",
                               errors.ECODE_INVAL)

  return ialloc


def CheckInstancesNodeGroups(cfg, instances, owned_groups, owned_node_uuids,
                             cur_group_uuid):
  """Checks if node groups for locked instances are still correct.

  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration
  @type instances: dict; string as key, L{objects.Instance} as value
  @param instances: Dictionary, instance name as key, instance object as value
  @type owned_groups: iterable of string
  @param owned_groups: List of owned groups
  @type owned_node_uuids: iterable of string
  @param owned_node_uuids: List of owned nodes
  @type cur_group_uuid: string or None
  @param cur_group_uuid: Optional group UUID to check against instance's groups

  """
  for (name, inst) in instances.items():
    assert owned_node_uuids.issuperset(inst.all_nodes), \
      "Instance %s's nodes changed while we kept the lock" % name

    inst_groups = CheckInstanceNodeGroups(cfg, name, owned_groups)

    assert cur_group_uuid is None or cur_group_uuid in inst_groups, \
      "Instance %s has no node in group %s" % (name, cur_group_uuid)


def CheckInstanceNodeGroups(cfg, instance_name, owned_groups,
                            primary_only=False):
  """Checks if the owned node groups are still correct for an instance.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type instance_name: string
  @param instance_name: Instance name
  @type owned_groups: set or frozenset
  @param owned_groups: List of currently owned node groups
  @type primary_only: boolean
  @param primary_only: Whether to check node groups for only the primary node

  """
  inst_groups = cfg.GetInstanceNodeGroups(instance_name, primary_only)

  if not owned_groups.issuperset(inst_groups):
    raise errors.OpPrereqError("Instance %s's node groups changed since"
                               " locks were acquired, current groups"
                               " are '%s', owning groups '%s'; retry the"
                               " operation" %
                               (instance_name,
                                utils.CommaJoin(inst_groups),
                                utils.CommaJoin(owned_groups)),
                               errors.ECODE_STATE)

  return inst_groups


def LoadNodeEvacResult(lu, alloc_result, early_release, use_nodes):
  """Unpacks the result of change-group and node-evacuate iallocator requests.

  Iallocator modes L{constants.IALLOCATOR_MODE_NODE_EVAC} and
  L{constants.IALLOCATOR_MODE_CHG_GROUP}.

  @type lu: L{LogicalUnit}
  @param lu: Logical unit instance
  @type alloc_result: tuple/list
  @param alloc_result: Result from iallocator
  @type early_release: bool
  @param early_release: Whether to release locks early if possible
  @type use_nodes: bool
  @param use_nodes: Whether to display node names instead of groups

  """
  (moved, failed, jobs) = alloc_result

  if failed:
    failreason = utils.CommaJoin("%s (%s)" % (name, reason)
                                 for (name, reason) in failed)
    lu.LogWarning("Unable to evacuate instances %s", failreason)
    raise errors.OpExecError("Unable to evacuate instances %s" % failreason)

  if moved:
    lu.LogInfo("Instances to be moved: %s",
               utils.CommaJoin(
                 "%s (to %s)" %
                 (name, _NodeEvacDest(use_nodes, group, node_names))
                 for (name, group, node_names) in moved))

  return [map(compat.partial(_SetOpEarlyRelease, early_release),
              map(opcodes.OpCode.LoadOpCode, ops))
          for ops in jobs]


def _NodeEvacDest(use_nodes, group, node_names):
  """Returns group or nodes depending on caller's choice.

  """
  if use_nodes:
    return utils.CommaJoin(node_names)
  else:
    return group


def _SetOpEarlyRelease(early_release, op):
  """Sets C{early_release} flag on opcodes if available.

  """
  try:
    op.early_release = early_release
  except AttributeError:
    assert not isinstance(op, opcodes.OpInstanceReplaceDisks)

  return op


def MapInstanceDisksToNodes(instances):
  """Creates a map from (node, volume) to instance name.

  @type instances: list of L{objects.Instance}
  @rtype: dict; tuple of (node uuid, volume name) as key, instance name as value

  """
  return dict(((node_uuid, vol), inst.name)
              for inst in instances
              for (node_uuid, vols) in inst.MapLVsByNode().items()
              for vol in vols)
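

# Illustrative sketch (editorial addition, not part of the upstream module;
# names are made up): for an instance "inst1" whose MapLVsByNode() yields
#   {"node-uuid-1": ["xenvg/disk0"]}
# the resulting dict contains {("node-uuid-1", "xenvg/disk0"): "inst1"},
# letting callers look up which instance owns a given (node, volume) pair.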


def CheckParamsNotGlobal(params, glob_pars, kind, bad_levels, good_levels):
  """Make sure that none of the given parameters is global.

  If a global parameter is found, an L{errors.OpPrereqError} exception is
  raised. This is used to avoid setting global parameters for individual nodes.

  @type params: dictionary
  @param params: Parameters to check
  @type glob_pars: dictionary
  @param glob_pars: Forbidden parameters
  @type kind: string
  @param kind: Kind of parameters (e.g. "node")
  @type bad_levels: string
  @param bad_levels: Level(s) at which the parameters are forbidden (e.g.
      "instance")
  @type good_levels: strings
  @param good_levels: Level(s) at which the parameters are allowed (e.g.
      "cluster" or "group")

  """
  used_globals = glob_pars.intersection(params)
  if used_globals:
    msg = ("The following %s parameters are global and cannot"
           " be customized at %s level, please modify them at"
           " %s level: %s" %
           (kind, bad_levels, good_levels, utils.CommaJoin(used_globals)))
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)


def IsExclusiveStorageEnabledNode(cfg, node):
  """Whether exclusive_storage is in effect for the given node.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type node: L{objects.Node}
  @param node: The node
  @rtype: bool
  @return: The effective value of exclusive_storage

  """
  return cfg.GetNdParams(node)[constants.ND_EXCLUSIVE_STORAGE]


def CheckInstanceState(lu, instance, req_states, msg=None):
  """Ensure that an instance is in one of the required states.

  @param lu: the LU on behalf of which we make the check
  @param instance: the instance to check
  @param msg: if passed, should be a message to replace the default one
  @raise errors.OpPrereqError: if the instance is not in the required state

  """
  if msg is None:
    msg = ("can't use instance from outside %s states" %
           utils.CommaJoin(req_states))
  if instance.admin_state not in req_states:
    raise errors.OpPrereqError("Instance '%s' is marked to be %s, %s" %
                               (instance.name, instance.admin_state, msg),
                               errors.ECODE_STATE)

  if constants.ADMINST_UP not in req_states:
    pnode_uuid = instance.primary_node
    if not lu.cfg.GetNodeInfo(pnode_uuid).offline:
      all_hvparams = lu.cfg.GetClusterInfo().hvparams
      ins_l = lu.rpc.call_instance_list(
        [pnode_uuid], [instance.hypervisor], all_hvparams)[pnode_uuid]
      ins_l.Raise("Can't contact node %s for instance information" %
                  lu.cfg.GetNodeName(pnode_uuid),
                  prereq=True, ecode=errors.ECODE_ENVIRON)
      if instance.name in ins_l.payload:
        raise errors.OpPrereqError("Instance %s is running, %s" %
                                   (instance.name, msg), errors.ECODE_STATE)
    else:
      lu.LogWarning("Primary node offline, ignoring check that instance"
                    " is down")


def CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
  """Check the sanity of iallocator and node arguments and use the
  cluster-wide iallocator if appropriate.

  Check that at most one of (iallocator, node) is specified. If none is
  specified, or the iallocator is L{constants.DEFAULT_IALLOCATOR_SHORTCUT},
  then the LU's opcode's iallocator slot is filled with the cluster-wide
  default iallocator.

  @type iallocator_slot: string
  @param iallocator_slot: the name of the opcode iallocator slot
  @type node_slot: string
  @param node_slot: the name of the opcode target node slot

  """
  node = getattr(lu.op, node_slot, None)
  ialloc = getattr(lu.op, iallocator_slot, None)
  if node == []:
    node = None

  if node is not None and ialloc is not None:
    raise errors.OpPrereqError("Do not specify both, iallocator and node",
                               errors.ECODE_INVAL)
  elif ((node is None and ialloc is None) or
        ialloc == constants.DEFAULT_IALLOCATOR_SHORTCUT):
    default_iallocator = lu.cfg.GetDefaultIAllocator()
    if default_iallocator:
      setattr(lu.op, iallocator_slot, default_iallocator)
    else:
      raise errors.OpPrereqError("No iallocator or node given and no"
                                 " cluster-wide default iallocator found;"
                                 " please specify either an iallocator or a"
                                 " node, or set a cluster-wide default"
                                 " iallocator", errors.ECODE_INVAL)


def FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_uuid, prereq):
  """Returns the indices of an instance's disks that are faulty on a node.

  """
  faulty = []

  for dev in instance.disks:
    cfg.SetDiskID(dev, node_uuid)

  result = rpc_runner.call_blockdev_getmirrorstatus(
             node_uuid, (instance.disks, instance))
  result.Raise("Failed to get disk status from node %s" %
               cfg.GetNodeName(node_uuid),
               prereq=prereq, ecode=errors.ECODE_ENVIRON)

  for idx, bdev_status in enumerate(result.payload):
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
      faulty.append(idx)

  return faulty


def CheckNodeOnline(lu, node_uuid, msg=None):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node_uuid: the node to check
  @param msg: if passed, should be a message to replace the default one
  @raise errors.OpPrereqError: if the node is offline

  """
  if msg is None:
    msg = "Can't use offline node"
  if lu.cfg.GetNodeInfo(node_uuid).offline:
    raise errors.OpPrereqError("%s: %s" % (msg, lu.cfg.GetNodeName(node_uuid)),
                               errors.ECODE_STATE)