# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

"""Common functions used by multiple logical units."""

import copy
import os

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import objects
from ganeti import opcodes
from ganeti import pathutils
from ganeti import rpc
from ganeti import ssconf
from ganeti import utils


INSTANCE_DOWN = [constants.ADMINST_DOWN]
INSTANCE_ONLINE = [constants.ADMINST_DOWN, constants.ADMINST_UP]
INSTANCE_NOT_RUNNING = [constants.ADMINST_DOWN, constants.ADMINST_OFFLINE]

#: Instance status in which an instance can be marked as offline/online
CAN_CHANGE_INSTANCE_OFFLINE = (frozenset(INSTANCE_DOWN) | frozenset([
  constants.ADMINST_OFFLINE,
  ]))


def _ExpandItemName(expand_fn, name, kind):
  """Expand an item name.

  @param expand_fn: the function to use for expansion
  @param name: requested item name
  @param kind: text description ('Node' or 'Instance')
  @return: the result of the expand_fn, if successful
  @raise errors.OpPrereqError: if the item is not found

  """
  full_name = expand_fn(name)
  if full_name is None:
    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
                               errors.ECODE_NOENT)
  return full_name


def ExpandInstanceName(cfg, name):
  """Wrapper over L{_ExpandItemName} for instance."""
  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")


def ExpandNodeUuidAndName(cfg, expected_uuid, name):
  """Expand a short node name into the node UUID and full name.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type expected_uuid: string
  @param expected_uuid: expected UUID for the node (or None if there is no
        expectation). If it does not match, a L{errors.OpPrereqError} is
        raised.
  @type name: string
  @param name: the short node name

  """
  (uuid, full_name) = _ExpandItemName(cfg.ExpandNodeName, name, "Node")
  if expected_uuid is not None and uuid != expected_uuid:
    raise errors.OpPrereqError(
      "The node's UUID '%s' does not match the expected UUID '%s' for node"
      " '%s'. Maybe the node changed since you submitted this job." %
      (uuid, expected_uuid, full_name), errors.ECODE_NOTUNIQUE)
  return (uuid, full_name)
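
# Illustrative usage sketch (not part of the original module; the LU and
# opcode fields below are assumptions):
#
#   (node_uuid, node_name) = ExpandNodeUuidAndName(self.cfg,
#                                                  self.op.node_uuid,
#                                                  self.op.node_name)
#
# If the short name resolves to a node whose UUID differs from the expected
# one, an OpPrereqError is raised instead of silently acting on the wrong
# node.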


def ShareAll():
  """Returns a dict declaring all lock levels shared.

  """
  return dict.fromkeys(locking.LEVELS, 1)


def CheckNodeGroupInstances(cfg, group_uuid, owned_instances):
  """Checks if the instances in a node group are still correct.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type group_uuid: string
  @param group_uuid: Node group UUID
  @type owned_instances: set or frozenset
  @param owned_instances: List of currently owned instances

  """
  wanted_instances = cfg.GetNodeGroupInstances(group_uuid)
  if owned_instances != wanted_instances:
    raise errors.OpPrereqError("Instances in node group '%s' changed since"
                               " locks were acquired, wanted '%s', have '%s';"
                               " retry the operation" %
                               (group_uuid,
                                utils.CommaJoin(wanted_instances),
                                utils.CommaJoin(owned_instances)),
                               errors.ECODE_STATE)

  return wanted_instances


def GetWantedNodes(lu, short_node_names):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type short_node_names: list
  @param short_node_names: list of node names or None for all nodes
  @rtype: tuple of lists
  @return: tuple with (list of node UUIDs, list of node names)
  @raise errors.ProgrammerError: if the nodes parameter is wrong type

  """
  if short_node_names:
    node_uuids = [ExpandNodeUuidAndName(lu.cfg, None, name)[0]
                  for name in short_node_names]
  else:
    node_uuids = lu.cfg.GetNodeList()

  return (node_uuids, [lu.cfg.GetNodeInfo(uuid).name for uuid in node_uuids])
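
# Illustrative sketch (hypothetical names, not part of the original module):
# GetWantedNodes(lu, ["node1", "node2"]) returns something like
# (["uuid-of-node1", "uuid-of-node2"],
#  ["node1.example.com", "node2.example.com"]); passing an empty or None list
# expands to every node known to the configuration.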


def GetWantedInstances(lu, instances):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instances: list
  @param instances: list of instance names or None for all instances
  @rtype: list
  @return: the list of instances, sorted
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if instances:
    wanted = [ExpandInstanceName(lu.cfg, name) for name in instances]
  else:
    wanted = utils.NiceSort(lu.cfg.GetInstanceList())
  return wanted


def RunPostHook(lu, node_name):
  """Runs the post-hook for an opcode on a single node.

  """
  hm = lu.proc.BuildHooksManager(lu)
  try:
    node_names = [node_name]
    hm.RunPhase(constants.HOOKS_PHASE_POST, node_names=node_names)
  except Exception, err:  # pylint: disable=W0703
    lu.LogWarning("Errors occurred running hooks on %s: %s",
                  node_name, err)


def RedistributeAncillaryFiles(lu):
  """Distribute additional files which are part of the cluster configuration.

  ConfigWriter takes care of distributing the config and ssconf files, but
  there are more files which should be distributed to all nodes. This function
  makes sure those are copied.

  """
  # Gather target nodes
  cluster = lu.cfg.GetClusterInfo()
  master_info = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())

  online_node_uuids = lu.cfg.GetOnlineNodeList()
  online_node_uuid_set = frozenset(online_node_uuids)
  vm_node_uuids = list(online_node_uuid_set.intersection(
                         lu.cfg.GetVmCapableNodeList()))

  # Never distribute to master node
  for node_uuids in [online_node_uuids, vm_node_uuids]:
    if master_info.uuid in node_uuids:
      node_uuids.remove(master_info.uuid)

  # Gather file lists
  (files_all, _, files_mc, files_vm) = \
    ComputeAncillaryFiles(cluster, True)

  # Never re-distribute configuration file from here
  assert not (pathutils.CLUSTER_CONF_FILE in files_all or
              pathutils.CLUSTER_CONF_FILE in files_vm)
  assert not files_mc, "Master candidates not handled in this function"

  filemap = [
    (online_node_uuids, files_all),
    (vm_node_uuids, files_vm),
    ]

  # Upload the files
  for (node_uuids, files) in filemap:
    for fname in files:
      UploadHelper(lu, node_uuids, fname)


def ComputeAncillaryFiles(cluster, redist):
  """Compute files external to Ganeti which need to be consistent.

  @type redist: boolean
  @param redist: Whether to include files which need to be redistributed

  """
  # Compute files for all nodes
  files_all = set([
    pathutils.SSH_KNOWN_HOSTS_FILE,
    pathutils.CONFD_HMAC_KEY,
    pathutils.CLUSTER_DOMAIN_SECRET_FILE,
    pathutils.SPICE_CERT_FILE,
    pathutils.SPICE_CACERT_FILE,
    pathutils.RAPI_USERS_FILE,
    ])

  if redist:
    # we need to ship at least the RAPI certificate
    files_all.add(pathutils.RAPI_CERT_FILE)
  else:
    files_all.update(pathutils.ALL_CERT_FILES)
    files_all.update(ssconf.SimpleStore().GetFileList())

  if cluster.modify_etc_hosts:
    files_all.add(pathutils.ETC_HOSTS)

  if cluster.use_external_mip_script:
    files_all.add(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)

  # Files which are optional, these must:
  # - be present in one other category as well
  # - either exist or not exist on all nodes of that category (mc, vm all)
  files_opt = set([
    pathutils.RAPI_USERS_FILE,
    ])

  # Files which should only be on master candidates
  files_mc = set()

  if not redist:
    files_mc.add(pathutils.CLUSTER_CONF_FILE)

  # File storage
  if (not redist and (constants.ENABLE_FILE_STORAGE or
                      constants.ENABLE_SHARED_FILE_STORAGE)):
    files_all.add(pathutils.FILE_STORAGE_PATHS_FILE)
    files_opt.add(pathutils.FILE_STORAGE_PATHS_FILE)

  # Files which should only be on VM-capable nodes
  files_vm = set(
    filename
    for hv_name in cluster.enabled_hypervisors
    for filename in
      hypervisor.GetHypervisorClass(hv_name).GetAncillaryFiles()[0])

  files_opt |= set(
    filename
    for hv_name in cluster.enabled_hypervisors
    for filename in
      hypervisor.GetHypervisorClass(hv_name).GetAncillaryFiles()[1])

  # Filenames in each category must be unique
  all_files_set = files_all | files_mc | files_vm
  assert (len(all_files_set) ==
          sum(map(len, [files_all, files_mc, files_vm]))), \
    "Found file listed in more than one file list"

  # Optional files must be present in one other category
  assert all_files_set.issuperset(files_opt), \
    "Optional file not in a different required list"

  # This one file should never ever be re-distributed via RPC
  assert not (redist and
              pathutils.FILE_STORAGE_PATHS_FILE in all_files_set)

  return (files_all, files_opt, files_mc, files_vm)
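
# Illustrative sketch (hypothetical call, not part of the original module):
#
#   (files_all, files_opt, files_mc, files_vm) = \
#     ComputeAncillaryFiles(lu.cfg.GetClusterInfo(), True)
#
# With redist=True the cluster config file and node-local-only material are
# excluded, so the result is safe to push to other nodes via UploadHelper.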


def UploadHelper(lu, node_uuids, fname):
  """Helper for uploading a file and showing warnings.

  """
  if os.path.exists(fname):
    result = lu.rpc.call_upload_file(node_uuids, fname)
    for to_node_uuids, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        msg = ("Copy of file %s to node %s failed: %s" %
               (fname, lu.cfg.GetNodeName(to_node_uuids), msg))
        lu.LogWarning(msg)


def MergeAndVerifyHvState(op_input, obj_input):
  """Combines the hv state from an opcode with the one of the object.

  @param op_input: The input dict from the opcode
  @param obj_input: The input dict from the objects
  @return: The verified and updated dict

  """
  if op_input:
    invalid_hvs = set(op_input) - constants.HYPER_TYPES
    if invalid_hvs:
      raise errors.OpPrereqError("Invalid hypervisor(s) in hypervisor state:"
                                 " %s" % utils.CommaJoin(invalid_hvs),
                                 errors.ECODE_INVAL)
    if obj_input is None:
      obj_input = {}
    type_check = constants.HVSTS_PARAMETER_TYPES
    return _UpdateAndVerifySubDict(obj_input, op_input, type_check)

  return None
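
# Illustrative sketch (hypothetical values, not part of the original module):
# op_input of the form {"xen-pvm": {...}} is first checked against
# constants.HYPER_TYPES, then merged into the object's existing hv state via
# _UpdateAndVerifySubDict, which also enforces the per-key value types.
# An empty or None op_input leaves the object untouched and returns None.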


def MergeAndVerifyDiskState(op_input, obj_input):
  """Combines the disk state from an opcode with the one of the object.

  @param op_input: The input dict from the opcode
  @param obj_input: The input dict from the objects
  @return: The verified and updated dict

  """
  if op_input:
    invalid_dst = set(op_input) - constants.DS_VALID_TYPES
    if invalid_dst:
      raise errors.OpPrereqError("Invalid storage type(s) in disk state: %s" %
                                 utils.CommaJoin(invalid_dst),
                                 errors.ECODE_INVAL)
    type_check = constants.DSS_PARAMETER_TYPES
    if obj_input is None:
      obj_input = {}
    return dict((key, _UpdateAndVerifySubDict(obj_input.get(key, {}), value,
                                              type_check))
                for key, value in op_input.items())

  return None


def CheckOSParams(lu, required, node_uuids, osname, osparams):
  """OS parameters validation.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type required: boolean
  @param required: whether the validation should fail if the OS is not
      found
  @type node_uuids: list
  @param node_uuids: the list of nodes on which we should check
  @type osname: string
  @param osname: the name of the OS we should use
  @type osparams: dict
  @param osparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  node_uuids = _FilterVmNodes(lu, node_uuids)
  result = lu.rpc.call_os_validate(node_uuids, required, osname,
                                   [constants.OS_VALIDATE_PARAMETERS],
                                   osparams)
  for node_uuid, nres in result.items():
    # we don't check for offline cases since this should be run only
    # against the master node and/or an instance's nodes
    nres.Raise("OS Parameters validation failed on node %s" %
               lu.cfg.GetNodeName(node_uuid))
    if not nres.payload:
      lu.LogInfo("OS %s not found on node %s, validation skipped",
                 osname, lu.cfg.GetNodeName(node_uuid))


def CheckHVParams(lu, node_uuids, hvname, hvparams):
  """Hypervisor parameter validation.

  This function abstracts the hypervisor parameter validation to be
  used in both instance create and instance modify.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type node_uuids: list
  @param node_uuids: the list of nodes on which we should check
  @type hvname: string
  @param hvname: the name of the hypervisor we should use
  @type hvparams: dict
  @param hvparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  node_uuids = _FilterVmNodes(lu, node_uuids)

  cluster = lu.cfg.GetClusterInfo()
  hvfull = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)

  hvinfo = lu.rpc.call_hypervisor_validate_params(node_uuids, hvname, hvfull)
  for node_uuid in node_uuids:
    info = hvinfo[node_uuid]
    if info.offline:
      continue
    info.Raise("Hypervisor parameter validation failed on node %s" %
               lu.cfg.GetNodeName(node_uuid))


def AdjustCandidatePool(lu, exceptions):
  """Adjust the candidate pool after node operations.

  """
  mod_list = lu.cfg.MaintainCandidatePool(exceptions)
  if mod_list:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               utils.CommaJoin(node.name for node in mod_list))
    for node in mod_list:
      lu.context.ReaddNode(node)
  mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))


def CheckNodePVs(nresult, exclusive_storage):
  """Check node PVs.

  """
  pvlist_dict = nresult.get(constants.NV_PVLIST, None)
  if pvlist_dict is None:
    return (["Can't get PV list from node"], None)
  pvlist = map(objects.LvmPvInfo.FromDict, pvlist_dict)
  errlist = []
  # check that ':' is not present in PV names, since it's a
  # special character for lvcreate (denotes the range of PEs to
  # use on the PV)
  for pv in pvlist:
    if ":" in pv.name:
      errlist.append("Invalid character ':' in PV '%s' of VG '%s'" %
                     (pv.name, pv.vg_name))
  es_pvinfo = None
  if exclusive_storage:
    (errmsgs, es_pvinfo) = utils.LvmExclusiveCheckNodePvs(pvlist)
    errlist.extend(errmsgs)
    shared_pvs = nresult.get(constants.NV_EXCLUSIVEPVS, None)
    if shared_pvs:
      for (pvname, lvlist) in shared_pvs:
        # TODO: Check that LVs are really unrelated (snapshots, DRBD meta...)
        errlist.append("PV %s is shared among unrelated LVs (%s)" %
                       (pvname, utils.CommaJoin(lvlist)))
  return (errlist, es_pvinfo)


def _ComputeMinMaxSpec(name, qualifier, ispecs, value):
  """Computes if value is in the desired range.

  @param name: name of the parameter for which we perform the check
  @param qualifier: a qualifier used in the error message (e.g. 'disk/1',
      not just 'disk')
  @param ispecs: dictionary containing min and max values
  @param value: actual value that we want to use
  @return: None or an error string

  """
  if value in [None, constants.VALUE_AUTO]:
    return None
  max_v = ispecs[constants.ISPECS_MAX].get(name, value)
  min_v = ispecs[constants.ISPECS_MIN].get(name, value)
  if value > max_v or min_v > value:
    if qualifier:
      fqn = "%s/%s" % (name, qualifier)
    else:
      fqn = name
    return ("%s value %s is not in range [%s, %s]" %
            (fqn, value, min_v, max_v))
  return None
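
# Illustrative sketch (hypothetical ispecs, not part of the original module):
# with ispecs = {constants.ISPECS_MIN: {constants.ISPEC_DISK_SIZE: 1024},
#                constants.ISPECS_MAX: {constants.ISPEC_DISK_SIZE: 10240}},
# _ComputeMinMaxSpec(constants.ISPEC_DISK_SIZE, "0", ispecs, 512) returns an
# error string of the form "disk-size/0 value 512 is not in range
# [1024, 10240]", while 2048 (or None, or constants.VALUE_AUTO) yields None.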


def ComputeIPolicySpecViolation(ipolicy, mem_size, cpu_count, disk_count,
                                nic_count, disk_sizes, spindle_use,
                                disk_template,
                                _compute_fn=_ComputeMinMaxSpec):
  """Verifies ipolicy against provided specs.

  @type ipolicy: dict
  @param ipolicy: The ipolicy
  @type mem_size: int
  @param mem_size: The memory size
  @type cpu_count: int
  @param cpu_count: Used cpu cores
  @type disk_count: int
  @param disk_count: Number of disks used
  @type nic_count: int
  @param nic_count: Number of nics used
  @type disk_sizes: list of ints
  @param disk_sizes: Disk sizes of used disk (len must match C{disk_count})
  @type spindle_use: int
  @param spindle_use: The number of spindles this instance uses
  @type disk_template: string
  @param disk_template: The disk template of the instance
  @param _compute_fn: The compute function (unittest only)
  @return: A list of violations, or an empty list if no violations are found

  """
  assert disk_count == len(disk_sizes)

  test_settings = [
    (constants.ISPEC_MEM_SIZE, "", mem_size),
    (constants.ISPEC_CPU_COUNT, "", cpu_count),
    (constants.ISPEC_NIC_COUNT, "", nic_count),
    (constants.ISPEC_SPINDLE_USE, "", spindle_use),
    ] + [(constants.ISPEC_DISK_SIZE, str(idx), d)
         for idx, d in enumerate(disk_sizes)]
  if disk_template != constants.DT_DISKLESS:
    # This check doesn't make sense for diskless instances
    test_settings.append((constants.ISPEC_DISK_COUNT, "", disk_count))
  ret = []
  allowed_dts = ipolicy[constants.IPOLICY_DTS]
  if disk_template not in allowed_dts:
    ret.append("Disk template %s is not allowed (allowed templates: %s)" %
               (disk_template, utils.CommaJoin(allowed_dts)))

  min_errs = None
  for minmax in ipolicy[constants.ISPECS_MINMAX]:
    errs = filter(None,
                  (_compute_fn(name, qualifier, minmax, value)
                   for (name, qualifier, value) in test_settings))
    if min_errs is None or len(errs) < len(min_errs):
      min_errs = errs
  assert min_errs is not None
  return ret + min_errs
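
# Illustrative sketch (hypothetical values, not part of the original module):
#
#   violations = ComputeIPolicySpecViolation(ipolicy, 4096, 2, 1, 1, [20480],
#                                            1, constants.DT_PLAIN)
#
# An empty list means the spec fits at least one min/max pair of the policy
# and uses an allowed disk template; otherwise each violated parameter (and a
# disallowed template, if any) contributes one human-readable message.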


def ComputeIPolicyInstanceViolation(ipolicy, instance, cfg,
                                    _compute_fn=ComputeIPolicySpecViolation):
  """Compute if instance meets the specs of ipolicy.

  @type ipolicy: dict
  @param ipolicy: The ipolicy to verify against
  @type instance: L{objects.Instance}
  @param instance: The instance to verify
  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration
  @param _compute_fn: The function to verify ipolicy (unittest only)
  @see: L{ComputeIPolicySpecViolation}

  """
  ret = []
  be_full = cfg.GetClusterInfo().FillBE(instance)
  mem_size = be_full[constants.BE_MAXMEM]
  cpu_count = be_full[constants.BE_VCPUS]
  es_flags = rpc.GetExclusiveStorageForNodes(cfg, instance.all_nodes)
  if any(es_flags.values()):
    # With exclusive storage use the actual spindles
    try:
      spindle_use = sum([disk.spindles for disk in instance.disks])
    except TypeError:
      ret.append("Number of spindles not configured for disks of instance %s"
                 " while exclusive storage is enabled, try running gnt-cluster"
                 " repair-disk-sizes" % instance.name)
      # _ComputeMinMaxSpec ignores 'None's
      spindle_use = None
  else:
    spindle_use = be_full[constants.BE_SPINDLE_USE]
  disk_count = len(instance.disks)
  disk_sizes = [disk.size for disk in instance.disks]
  nic_count = len(instance.nics)
  disk_template = instance.disk_template

  return ret + _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count,
                           disk_sizes, spindle_use, disk_template)


def _ComputeViolatingInstances(ipolicy, instances, cfg):
  """Computes the set of instances that violate the given ipolicy.

  @param ipolicy: The ipolicy to verify
  @type instances: iterable of L{objects.Instance}
  @param instances: List of instances to verify
  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration
  @return: A frozenset of instance names violating the ipolicy

  """
  return frozenset([inst.name for inst in instances
                    if ComputeIPolicyInstanceViolation(ipolicy, inst, cfg)])


def ComputeNewInstanceViolations(old_ipolicy, new_ipolicy, instances, cfg):
  """Computes the set of instances that would violate the new ipolicy.

  @param old_ipolicy: The current (still in-place) ipolicy
  @param new_ipolicy: The new (to become) ipolicy
  @param instances: List of instances to verify
  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration
  @return: The set of instance names that violate the new ipolicy but
      did not violate the old one

  """
  return (_ComputeViolatingInstances(new_ipolicy, instances, cfg) -
          _ComputeViolatingInstances(old_ipolicy, instances, cfg))


def GetUpdatedParams(old_params, update_dict,
                     use_default=True, use_none=False):
  """Return the new version of a parameter dictionary.

  @type old_params: dict
  @param old_params: old parameters
  @type update_dict: dict
  @param update_dict: dict containing new parameter values, or
      constants.VALUE_DEFAULT to reset the parameter to its default
      value
  @type use_default: boolean
  @param use_default: whether to recognise L{constants.VALUE_DEFAULT}
      values as 'to be deleted' values
  @type use_none: boolean
  @param use_none: whether to recognise C{None} values as 'to be
      deleted' values
  @rtype: dict
  @return: the new parameter dictionary

  """
  params_copy = copy.deepcopy(old_params)
  for key, val in update_dict.iteritems():
    if ((use_default and val == constants.VALUE_DEFAULT) or
        (use_none and val is None)):
      try:
        del params_copy[key]
      except KeyError:
        pass
    else:
      params_copy[key] = val
  return params_copy
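
# Illustrative sketch (hypothetical values, not part of the original module):
# GetUpdatedParams({"a": 1, "b": 2}, {"b": constants.VALUE_DEFAULT, "c": 3})
# returns {"a": 1, "c": 3}: "b" is dropped because VALUE_DEFAULT marks it as
# "reset to default", while "c" is added. With use_none=True, None values in
# update_dict are treated as deletions as well.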


def GetUpdatedIPolicy(old_ipolicy, new_ipolicy, group_policy=False):
  """Return the new version of an instance policy.

  @param group_policy: whether this policy applies to a group and thus
    we should support removal of policy entries

  """
  ipolicy = copy.deepcopy(old_ipolicy)
  for key, value in new_ipolicy.items():
    if key not in constants.IPOLICY_ALL_KEYS:
      raise errors.OpPrereqError("Invalid key in new ipolicy: %s" % key,
                                 errors.ECODE_INVAL)
    if (not value or value == [constants.VALUE_DEFAULT] or
        value == constants.VALUE_DEFAULT):
      if group_policy:
        if key in ipolicy:
          del ipolicy[key]
      else:
        raise errors.OpPrereqError("Can't unset ipolicy attribute '%s'"
                                   " on the cluster" % key,
                                   errors.ECODE_INVAL)
    else:
      if key in constants.IPOLICY_PARAMETERS:
        # FIXME: we assume all such values are float
        try:
          ipolicy[key] = float(value)
        except (TypeError, ValueError), err:
          raise errors.OpPrereqError("Invalid value for attribute"
                                     " '%s': '%s', error: %s" %
                                     (key, value, err), errors.ECODE_INVAL)
      elif key == constants.ISPECS_MINMAX:
        for minmax in value:
          for k in minmax.keys():
            utils.ForceDictType(minmax[k], constants.ISPECS_PARAMETER_TYPES)
        ipolicy[key] = value
      elif key == constants.ISPECS_STD:
        if group_policy:
          msg = "%s cannot appear in group instance specs" % key
          raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
        ipolicy[key] = GetUpdatedParams(old_ipolicy.get(key, {}), value,
                                        use_none=False, use_default=False)
        utils.ForceDictType(ipolicy[key], constants.ISPECS_PARAMETER_TYPES)
      else:
        # FIXME: we assume all others are lists; this should be redone
        ipolicy[key] = list(value)
  try:
    objects.InstancePolicy.CheckParameterSyntax(ipolicy, not group_policy)
  except errors.ConfigurationError, err:
    raise errors.OpPrereqError("Invalid instance policy: %s" % err,
                               errors.ECODE_INVAL)
  return ipolicy


def AnnotateDiskParams(instance, devs, cfg):
  """Little helper wrapper to the rpc annotation method.

  @param instance: The instance object
  @type devs: List of L{objects.Disk}
  @param devs: The root devices (not any of its children!)
  @param cfg: The config object
  @return: The annotated disk copies
  @see: L{rpc.AnnotateDiskParams}

  """
  return rpc.AnnotateDiskParams(instance.disk_template, devs,
                                cfg.GetInstanceDiskParams(instance))


def SupportsOob(cfg, node):
  """Tells if node supports OOB.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type node: L{objects.Node}
  @param node: The node
  @return: The OOB script if supported or an empty string otherwise

  """
  return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]


def _UpdateAndVerifySubDict(base, updates, type_check):
  """Updates and verifies a dict with sub dicts of the same type.

  @param base: The dict with the old data
  @param updates: The dict with the new data
  @param type_check: Dict suitable to ForceDictType to verify correct types
  @return: A new dict with updated and verified values

  """
  def fn(old, value):
    new = GetUpdatedParams(old, value)
    utils.ForceDictType(new, type_check)
    return new

  ret = copy.deepcopy(base)
  ret.update(dict((key, fn(base.get(key, {}), value))
                  for key, value in updates.items()))

  return ret
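
# Illustrative sketch (hypothetical values, not part of the original module):
# base={"node1": {"a": 1}} updated with updates={"node1": {"b": 2},
# "node2": {"c": 3}} yields {"node1": {"a": 1, "b": 2}, "node2": {"c": 3}},
# with every merged sub-dict type-checked against type_check via
# utils.ForceDictType.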


def _FilterVmNodes(lu, node_uuids):
  """Filters out non-vm_capable nodes from a list.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type node_uuids: list
  @param node_uuids: the list of nodes on which we should check
  @rtype: list
  @return: the list of vm-capable nodes

  """
  vm_nodes = frozenset(lu.cfg.GetNonVmCapableNodeList())
  return [uuid for uuid in node_uuids if uuid not in vm_nodes]


def GetDefaultIAllocator(cfg, ialloc):
  """Decides on which iallocator to use.

  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration object
  @type ialloc: string or None
  @param ialloc: Iallocator specified in opcode
  @rtype: string
  @return: Iallocator name

  """
  if not ialloc:
    # Use default iallocator
    ialloc = cfg.GetDefaultIAllocator()

  if not ialloc:
    raise errors.OpPrereqError("No iallocator was specified, neither in the"
                               " opcode nor as a cluster-wide default",
                               errors.ECODE_INVAL)

  return ialloc
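
# Illustrative sketch (hypothetical call, not part of the original module):
#
#   ialloc = GetDefaultIAllocator(self.cfg, self.op.iallocator)
#
# A name passed in the opcode wins; otherwise the cluster-wide default is
# used, and if neither is set an OpPrereqError is raised.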


def CheckInstancesNodeGroups(cfg, instances, owned_groups, owned_node_uuids,
                             cur_group_uuid):
  """Checks if node groups for locked instances are still correct.

  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration
  @type instances: dict; string as key, L{objects.Instance} as value
  @param instances: Dictionary, instance name as key, instance object as value
  @type owned_groups: iterable of string
  @param owned_groups: List of owned groups
  @type owned_node_uuids: iterable of string
  @param owned_node_uuids: List of owned nodes
  @type cur_group_uuid: string or None
  @param cur_group_uuid: Optional group UUID to check against instance's groups

  """
  for (name, inst) in instances.items():
    assert owned_node_uuids.issuperset(inst.all_nodes), \
      "Instance %s's nodes changed while we kept the lock" % name

    inst_groups = CheckInstanceNodeGroups(cfg, name, owned_groups)

    assert cur_group_uuid is None or cur_group_uuid in inst_groups, \
      "Instance %s has no node in group %s" % (name, cur_group_uuid)


def CheckInstanceNodeGroups(cfg, instance_name, owned_groups,
                            primary_only=False):
  """Checks if the owned node groups are still correct for an instance.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type instance_name: string
  @param instance_name: Instance name
  @type owned_groups: set or frozenset
  @param owned_groups: List of currently owned node groups
  @type primary_only: boolean
  @param primary_only: Whether to check node groups for only the primary node

  """
  inst_groups = cfg.GetInstanceNodeGroups(instance_name, primary_only)

  if not owned_groups.issuperset(inst_groups):
    raise errors.OpPrereqError("Instance %s's node groups changed since"
                               " locks were acquired, current groups are"
                               " '%s', owning groups '%s'; retry the"
                               " operation" %
                               (instance_name,
                                utils.CommaJoin(inst_groups),
                                utils.CommaJoin(owned_groups)),
                               errors.ECODE_STATE)

  return inst_groups


def LoadNodeEvacResult(lu, alloc_result, early_release, use_nodes):
  """Unpacks the result of change-group and node-evacuate iallocator requests.

  Iallocator modes L{constants.IALLOCATOR_MODE_NODE_EVAC} and
  L{constants.IALLOCATOR_MODE_CHG_GROUP}.

  @type lu: L{LogicalUnit}
  @param lu: Logical unit instance
  @type alloc_result: tuple/list
  @param alloc_result: Result from iallocator
  @type early_release: bool
  @param early_release: Whether to release locks early if possible
  @type use_nodes: bool
  @param use_nodes: Whether to display node names instead of groups

  """
  (moved, failed, jobs) = alloc_result

  if failed:
    failreason = utils.CommaJoin("%s (%s)" % (name, reason)
                                 for (name, reason) in failed)
    lu.LogWarning("Unable to evacuate instances %s", failreason)
    raise errors.OpExecError("Unable to evacuate instances %s" % failreason)

  if moved:
    lu.LogInfo("Instances to be moved: %s",
               utils.CommaJoin(
                 "%s (to %s)" %
                 (name, _NodeEvacDest(use_nodes, group, node_names))
                 for (name, group, node_names) in moved))

  return [map(compat.partial(_SetOpEarlyRelease, early_release),
              map(opcodes.OpCode.LoadOpCode, ops))
          for ops in jobs]


def _NodeEvacDest(use_nodes, group, node_names):
  """Returns group or nodes depending on caller's choice.

  """
  if use_nodes:
    return utils.CommaJoin(node_names)
  else:
    return group


def _SetOpEarlyRelease(early_release, op):
  """Sets C{early_release} flag on opcodes if available.

  """
  try:
    op.early_release = early_release
  except AttributeError:
    assert not isinstance(op, opcodes.OpInstanceReplaceDisks)

  return op


def MapInstanceDisksToNodes(instances):
  """Creates a map from (node, volume) to instance name.

  @type instances: list of L{objects.Instance}
  @rtype: dict; tuple of (node uuid, volume name) as key, instance name as value

  """
  return dict(((node_uuid, vol), inst.name)
              for inst in instances
              for (node_uuid, vols) in inst.MapLVsByNode().items()
              for vol in vols)
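
# Illustrative sketch (hypothetical values, not part of the original module):
# for an instance "inst1" with an LV "xenvg/disk0" on node UUID "node-uuid-1",
# the result contains {("node-uuid-1", "xenvg/disk0"): "inst1"}, so a (node,
# volume) pair seen during verification can be traced back to its instance.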


def CheckParamsNotGlobal(params, glob_pars, kind, bad_levels, good_levels):
  """Make sure that none of the given parameters is global.

  If a global parameter is found, an L{errors.OpPrereqError} exception is
  raised. This is used to avoid setting global parameters for individual nodes.

  @type params: dictionary
  @param params: Parameters to check
  @type glob_pars: dictionary
  @param glob_pars: Forbidden parameters
  @type kind: string
  @param kind: Kind of parameters (e.g. "node")
  @type bad_levels: string
  @param bad_levels: Level(s) at which the parameters are forbidden (e.g.
      "instance")
  @type good_levels: strings
  @param good_levels: Level(s) at which the parameters are allowed (e.g.
      "cluster or group")

  """
  used_globals = glob_pars.intersection(params)
  if used_globals:
    msg = ("The following %s parameters are global and cannot"
           " be customized at %s level, please modify them at"
           " %s level: %s" %
           (kind, bad_levels, good_levels, utils.CommaJoin(used_globals)))
    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)


def IsExclusiveStorageEnabledNode(cfg, node):
  """Whether exclusive_storage is in effect for the given node.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type node: L{objects.Node}
  @param node: The node
  @rtype: bool
  @return: The effective value of exclusive_storage

  """
  return cfg.GetNdParams(node)[constants.ND_EXCLUSIVE_STORAGE]


def CheckInstanceState(lu, instance, req_states, msg=None):
  """Ensure that an instance is in one of the required states.

  @param lu: the LU on behalf of which we make the check
  @param instance: the instance to check
  @param msg: if passed, should be a message to replace the default one
  @raise errors.OpPrereqError: if the instance is not in the required state

  """
  if msg is None:
    msg = ("can't use instance from outside %s states" %
           utils.CommaJoin(req_states))
  if instance.admin_state not in req_states:
    raise errors.OpPrereqError("Instance '%s' is marked to be %s, %s" %
                               (instance.name, instance.admin_state, msg),
                               errors.ECODE_STATE)

  if constants.ADMINST_UP not in req_states:
    pnode_uuid = instance.primary_node
    if not lu.cfg.GetNodeInfo(pnode_uuid).offline:
      all_hvparams = lu.cfg.GetClusterInfo().hvparams
      ins_l = lu.rpc.call_instance_list(
                [pnode_uuid], [instance.hypervisor], all_hvparams)[pnode_uuid]
      ins_l.Raise("Can't contact node %s for instance information" %
                  lu.cfg.GetNodeName(pnode_uuid),
                  prereq=True, ecode=errors.ECODE_ENVIRON)
      if instance.name in ins_l.payload:
        raise errors.OpPrereqError("Instance %s is running, %s" %
                                   (instance.name, msg), errors.ECODE_STATE)
    else:
      lu.LogWarning("Primary node offline, ignoring check that instance"
                    " is down")


def CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
  """Check the sanity of iallocator and node arguments and use the
  cluster-wide iallocator if appropriate.

  Check that at most one of (iallocator, node) is specified. If none is
  specified, or the iallocator is L{constants.DEFAULT_IALLOCATOR_SHORTCUT},
  then the LU's opcode's iallocator slot is filled with the cluster-wide
  default iallocator.

  @type iallocator_slot: string
  @param iallocator_slot: the name of the opcode iallocator slot
  @type node_slot: string
  @param node_slot: the name of the opcode target node slot

  """
  node = getattr(lu.op, node_slot, None)
  ialloc = getattr(lu.op, iallocator_slot, None)
  if node == []:
    node = None

  if node is not None and ialloc is not None:
    raise errors.OpPrereqError("Do not specify both, iallocator and node",
                               errors.ECODE_INVAL)
  elif ((node is None and ialloc is None) or
        ialloc == constants.DEFAULT_IALLOCATOR_SHORTCUT):
    default_iallocator = lu.cfg.GetDefaultIAllocator()
    if default_iallocator:
      setattr(lu.op, iallocator_slot, default_iallocator)
    else:
      raise errors.OpPrereqError("No iallocator or node given and no"
                                 " cluster-wide default iallocator found;"
                                 " please specify either an iallocator or a"
                                 " node, or set a cluster-wide default"
                                 " iallocator", errors.ECODE_INVAL)
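
# Illustrative sketch (hypothetical opcode field names, not part of the
# original module): an LU whose opcode carries "iallocator" and "pnode"
# slots would call
#
#   CheckIAllocatorOrNode(self, "iallocator", "pnode")
#
# and afterwards either lu.op.pnode names a target node or lu.op.iallocator
# holds a usable allocator name (possibly the cluster-wide default).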


def FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_uuid, prereq):
  """Returns the indices of an instance's disks reported faulty by a node.

  """
  faulty = []

  for dev in instance.disks:
    cfg.SetDiskID(dev, node_uuid)

  result = rpc_runner.call_blockdev_getmirrorstatus(
             node_uuid, (instance.disks, instance))
  result.Raise("Failed to get disk status from node %s" %
               cfg.GetNodeName(node_uuid),
               prereq=prereq, ecode=errors.ECODE_ENVIRON)

  for idx, bdev_status in enumerate(result.payload):
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
      faulty.append(idx)

  return faulty


def CheckNodeOnline(lu, node_uuid, msg=None):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node_uuid: the node to check
  @param msg: if passed, should be a message to replace the default one
  @raise errors.OpPrereqError: if the node is offline

  """
  if msg is None:
    msg = "Can't use offline node"
  if lu.cfg.GetNodeInfo(node_uuid).offline:
    raise errors.OpPrereqError("%s: %s" % (msg, lu.cfg.GetNodeName(node_uuid)),
                               errors.ECODE_STATE)