X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/41e079ce36d9b91ce9c0ee18688f35f188801901..c4929a8bcca4a43dc6434394a91a8ea67d854844:/lib/cmdlib.py diff --git a/lib/cmdlib.py b/lib/cmdlib.py index 53eb657..04d26f9 100644 --- a/lib/cmdlib.py +++ b/lib/cmdlib.py @@ -67,6 +67,13 @@ import ganeti.masterd.instance # pylint: disable=W0611 #: Size of DRBD meta block device DRBD_META_SIZE = 128 +# States of instance +INSTANCE_UP = [constants.ADMINST_UP] +INSTANCE_DOWN = [constants.ADMINST_DOWN] +INSTANCE_OFFLINE = [constants.ADMINST_OFFLINE] +INSTANCE_ONLINE = [constants.ADMINST_DOWN, constants.ADMINST_UP] +INSTANCE_NOT_RUNNING = [constants.ADMINST_DOWN, constants.ADMINST_OFFLINE] + class ResultWithJobs: """Data container for LU results with jobs. @@ -349,7 +356,8 @@ class LogicalUnit(object): self.op.instance_name) self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name - def _LockInstancesNodes(self, primary_only=False): + def _LockInstancesNodes(self, primary_only=False, + level=locking.LEVEL_NODE): """Helper function to declare instances' nodes for locking. This function should be called after locking one or more instances to lock @@ -370,9 +378,10 @@ class LogicalUnit(object): @type primary_only: boolean @param primary_only: only lock primary nodes of locked instances + @param level: Which lock level to use for locking nodes """ - assert locking.LEVEL_NODE in self.recalculate_locks, \ + assert level in self.recalculate_locks, \ "_LockInstancesNodes helper function called with no nodes to recalculate" # TODO: check if we're really been called with the instance locks held @@ -387,12 +396,14 @@ class LogicalUnit(object): if not primary_only: wanted_nodes.extend(instance.secondary_nodes) - if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE: - self.needed_locks[locking.LEVEL_NODE] = wanted_nodes - elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND: - self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes) + if self.recalculate_locks[level] == constants.LOCKS_REPLACE: + self.needed_locks[level] = wanted_nodes + elif self.recalculate_locks[level] == constants.LOCKS_APPEND: + self.needed_locks[level].extend(wanted_nodes) + else: + raise errors.ProgrammerError("Unknown recalculation mode") - del self.recalculate_locks[locking.LEVEL_NODE] + del self.recalculate_locks[level] class NoHooksLU(LogicalUnit): # pylint: disable=W0223 @@ -562,6 +573,20 @@ def _ShareAll(): return dict.fromkeys(locking.LEVELS, 1) +def _MakeLegacyNodeInfo(data): + """Formats the data returned by L{rpc.RpcRunner.call_node_info}. + + Converts the data into a single dictionary. This is fine for most use cases, + but some require information from more than one volume group or hypervisor. + + """ + (bootid, (vg_info, ), (hv_info, )) = data + + return utils.JoinDisjointDicts(utils.JoinDisjointDicts(vg_info, hv_info), { + "bootid": bootid, + }) + + def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups): """Checks if the owned node groups are still correct for an instance. @@ -696,6 +721,71 @@ def _GetUpdatedParams(old_params, update_dict, return params_copy +def _UpdateAndVerifySubDict(base, updates, type_check): + """Updates and verifies a dict with sub dicts of the same type. 
+ + @param base: The dict with the old data + @param updates: The dict with the new data + @param type_check: Dict suitable to ForceDictType to verify correct types + @returns: A new dict with updated and verified values + + """ + def fn(old, value): + new = _GetUpdatedParams(old, value) + utils.ForceDictType(new, type_check) + return new + + ret = copy.deepcopy(base) + ret.update(dict((key, fn(base.get(key, {}), value)) + for key, value in updates.items())) + return ret + + +def _MergeAndVerifyHvState(op_input, obj_input): + """Combines the hv state from an opcode with the one of the object + + @param op_input: The input dict from the opcode + @param obj_input: The input dict from the objects + @return: The verified and updated dict + + """ + if op_input: + invalid_hvs = set(op_input) - constants.HYPER_TYPES + if invalid_hvs: + raise errors.OpPrereqError("Invalid hypervisor(s) in hypervisor state:" + " %s" % utils.CommaJoin(invalid_hvs), + errors.ECODE_INVAL) + if obj_input is None: + obj_input = {} + type_check = constants.HVSTS_PARAMETER_TYPES + return _UpdateAndVerifySubDict(obj_input, op_input, type_check) + + return None + + +def _MergeAndVerifyDiskState(op_input, obj_input): + """Combines the disk state from an opcode with the one of the object + + @param op_input: The input dict from the opcode + @param obj_input: The input dict from the objects + @return: The verified and updated dict + """ + if op_input: + invalid_dst = set(op_input) - constants.DS_VALID_TYPES + if invalid_dst: + raise errors.OpPrereqError("Invalid storage type(s) in disk state: %s" % + utils.CommaJoin(invalid_dst), + errors.ECODE_INVAL) + type_check = constants.DSS_PARAMETER_TYPES + if obj_input is None: + obj_input = {} + return dict((key, _UpdateAndVerifySubDict(obj_input.get(key, {}), value, + type_check)) + for key, value in op_input.items()) + + return None + + def _ReleaseLocks(lu, level, names=None, keep=None): """Releases locks owned by an LU. @@ -717,12 +807,17 @@ def _ReleaseLocks(lu, level, names=None, keep=None): else: should_release = None - if should_release: + owned = lu.owned_locks(level) + if not owned: + # Not owning any lock at this level, do nothing + pass + + elif should_release: retain = [] release = [] # Determine which locks to release - for name in lu.owned_locks(level): + for name in owned: if should_release(name): release.append(name) else: @@ -894,20 +989,51 @@ def _GetClusterDomainSecret(): strict=True) -def _CheckInstanceDown(lu, instance, reason): - """Ensure that an instance is not running.""" - if instance.admin_up: - raise errors.OpPrereqError("Instance %s is marked to be up, %s" % - (instance.name, reason), errors.ECODE_STATE) +def _CheckInstanceState(lu, instance, req_states, msg=None): + """Ensure that an instance is in one of the required states. 
+ + @param lu: the LU on behalf of which we make the check + @param instance: the instance to check + @param msg: if passed, should be a message to replace the default one + @raise errors.OpPrereqError: if the instance is not in the required state + + """ + if msg is None: + msg = "can't use instance from outside %s states" % ", ".join(req_states) + if instance.admin_state not in req_states: + raise errors.OpPrereqError("Instance %s is marked to be %s, %s" % + (instance, instance.admin_state, msg), + errors.ECODE_STATE) + + if constants.ADMINST_UP not in req_states: + pnode = instance.primary_node + ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode] + ins_l.Raise("Can't contact node %s for instance information" % pnode, + prereq=True, ecode=errors.ECODE_ENVIRON) - pnode = instance.primary_node - ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode] - ins_l.Raise("Can't contact node %s for instance information" % pnode, - prereq=True, ecode=errors.ECODE_ENVIRON) + if instance.name in ins_l.payload: + raise errors.OpPrereqError("Instance %s is running, %s" % + (instance.name, msg), errors.ECODE_STATE) - if instance.name in ins_l.payload: - raise errors.OpPrereqError("Instance %s is running, %s" % - (instance.name, reason), errors.ECODE_STATE) + +def _CheckMinMaxSpecs(name, ipolicy, value): + """Checks if value is in the desired range. + + @param name: name of the parameter for which we perform the check + @param ipolicy: dictionary containing min, max and std values + @param value: actual value that we want to use + @return: None or element not meeting the criteria + + + """ + if value in [None, constants.VALUE_AUTO]: + return None + max_v = ipolicy[constants.ISPECS_MAX].get(name, value) + min_v = ipolicy[constants.ISPECS_MIN].get(name, value) + if value > max_v or min_v > value: + return ("%s value %s is not in range [%s, %s]" % + (name, value, min_v, max_v)) + return None def _ExpandItemName(fn, name, kind): @@ -938,7 +1064,7 @@ def _ExpandInstanceName(cfg, name): def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status, - memory, vcpus, nics, disk_template, disks, + minmem, maxmem, vcpus, nics, disk_template, disks, bep, hvp, hypervisor_name, tags): """Builds instance related env variables for hooks @@ -952,10 +1078,12 @@ def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status, @param secondary_nodes: list of secondary nodes as strings @type os_type: string @param os_type: the name of the instance's OS - @type status: boolean - @param status: the should_run status of the instance - @type memory: string - @param memory: the memory size of the instance + @type status: string + @param status: the desired status of the instance + @type minmem: string + @param minmem: the minimum memory size of the instance + @type maxmem: string + @param maxmem: the maximum memory size of the instance @type vcpus: string @param vcpus: the count of VCPUs the instance has @type nics: list @@ -977,23 +1105,21 @@ def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status, @return: the hook environment for this instance """ - if status: - str_status = "up" - else: - str_status = "down" env = { "OP_TARGET": name, "INSTANCE_NAME": name, "INSTANCE_PRIMARY": primary_node, "INSTANCE_SECONDARIES": " ".join(secondary_nodes), "INSTANCE_OS_TYPE": os_type, - "INSTANCE_STATUS": str_status, - "INSTANCE_MEMORY": memory, + "INSTANCE_STATUS": status, + "INSTANCE_MINMEM": minmem, + "INSTANCE_MAXMEM": maxmem, + # TODO(2.7) 
remove deprecated "memory" value + "INSTANCE_MEMORY": maxmem, "INSTANCE_VCPUS": vcpus, "INSTANCE_DISK_TEMPLATE": disk_template, "INSTANCE_HYPERVISOR": hypervisor_name, } - if nics: nic_count = len(nics) for idx, (ip, mac, mode, link) in enumerate(nics): @@ -1079,8 +1205,9 @@ def _BuildInstanceHookEnvByObject(lu, instance, override=None): "primary_node": instance.primary_node, "secondary_nodes": instance.secondary_nodes, "os_type": instance.os, - "status": instance.admin_up, - "memory": bep[constants.BE_MEMORY], + "status": instance.admin_state, + "maxmem": bep[constants.BE_MAXMEM], + "minmem": bep[constants.BE_MINMEM], "vcpus": bep[constants.BE_VCPUS], "nics": _NICListToTuple(lu, instance.nics), "disk_template": instance.disk_template, @@ -1122,6 +1249,14 @@ def _DecideSelfPromotion(lu, exceptions=None): return mc_now < mc_should +def _CalculateGroupIPolicy(cfg, group): + """Calculate instance policy for group. + + """ + cluster = cfg.GetClusterInfo() + return cluster.SimpleFillIPolicy(group.ipolicy) + + def _CheckNicsBridgesExist(lu, target_nics, target_node): """Check that the brigdes needed by a list of nics exist. @@ -1355,15 +1490,17 @@ class LUClusterDestroy(LogicalUnit): """Destroys the cluster. """ - (master, ip, dev, netmask, _) = self.cfg.GetMasterNetworkParameters() + master_params = self.cfg.GetMasterNetworkParameters() # Run post hooks on master node before it's removed - _RunPostHook(self, master) + _RunPostHook(self, master_params.name) - result = self.rpc.call_node_deactivate_master_ip(master, ip, netmask, dev) + ems = self.cfg.GetUseExternalMipScript() + result = self.rpc.call_node_deactivate_master_ip(master_params.name, + master_params, ems) result.Raise("Could not disable the master role") - return master + return master_params.name def _VerifyCertificate(filename): @@ -1936,6 +2073,26 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): _ErrorIf(bool(missing), constants.CV_ENODENET, node, "missing bridges: %s" % utils.CommaJoin(sorted(missing))) + def _VerifyNodeUserScripts(self, ninfo, nresult): + """Check the results of user scripts presence and executability on the node + + @type ninfo: L{objects.Node} + @param ninfo: the node to check + @param nresult: the remote results for the node + + """ + node = ninfo.name + + test = not constants.NV_USERSCRIPTS in nresult + self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, node, + "did not return user scripts information") + + broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None) + if not test: + self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, node, + "user scripts not present or not executable: %s" % + utils.CommaJoin(sorted(broken_scripts))) + def _VerifyNodeNetwork(self, ninfo, nresult): """Check the node network connectivity results. @@ -1978,6 +2135,34 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): msg = "cannot reach the master IP" _ErrorIf(True, constants.CV_ENODENET, node, msg) + def _VerifyInstancePolicy(self, instance): + """Verify instance specs against instance policy set on node group level. 
+ + + """ + cluster = self.cfg.GetClusterInfo() + full_beparams = cluster.FillBE(instance) + ipolicy = cluster.SimpleFillIPolicy(self.group_info.ipolicy) + + mem_size = full_beparams.get(constants.BE_MAXMEM, None) + cpu_count = full_beparams.get(constants.BE_VCPUS, None) + disk_count = len(instance.disks) + disk_sizes = [disk.size for disk in instance.disks] + nic_count = len(instance.nics) + + test_settings = [ + (constants.ISPEC_MEM_SIZE, mem_size), + (constants.ISPEC_CPU_COUNT, cpu_count), + (constants.ISPEC_DISK_COUNT, disk_count), + (constants.ISPEC_NIC_COUNT, nic_count), + ] + map((lambda d: (constants.ISPEC_DISK_SIZE, d)), disk_sizes) + + for (name, value) in test_settings: + test_result = _CheckMinMaxSpecs(name, ipolicy, value) + self._ErrorIf(test_result is not None, + constants.CV_EINSTANCEPOLICY, instance.name, + test_result) + def _VerifyInstance(self, instance, instanceconfig, node_image, diskstatus): """Verify an instance. @@ -1992,6 +2177,8 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): node_vol_should = {} instanceconfig.MapLVsByNode(node_vol_should) + self._VerifyInstancePolicy(instanceconfig) + for node in node_vol_should: n_img = node_image[node] if n_img.offline or n_img.rpc_fail or n_img.lvm_fail: @@ -2002,7 +2189,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): _ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance, "volume %s missing on node %s", volume, node) - if instanceconfig.admin_up: + if instanceconfig.admin_state == constants.ADMINST_UP: pri_img = node_image[node_current] test = instance not in pri_img.instances and not pri_img.offline _ErrorIf(test, constants.CV_EINSTANCEDOWN, instance, @@ -2018,12 +2205,13 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): # node here snode = node_image[nname] bad_snode = snode.ghost or snode.offline - _ErrorIf(instanceconfig.admin_up and not success and not bad_snode, + _ErrorIf(instanceconfig.admin_state == constants.ADMINST_UP and + not success and not bad_snode, constants.CV_EINSTANCEFAULTYDISK, instance, "couldn't retrieve status for disk/%s on %s: %s", idx, nname, bdev_status) - _ErrorIf((instanceconfig.admin_up and success and - bdev_status.ldisk_status == constants.LDS_FAULTY), + _ErrorIf((instanceconfig.admin_state == constants.ADMINST_UP and + success and bdev_status.ldisk_status == constants.LDS_FAULTY), constants.CV_EINSTANCEFAULTYDISK, instance, "disk/%s on %s is faulty", idx, nname) @@ -2071,12 +2259,14 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): # we already list instances living on such nodes, and that's # enough warning continue + #TODO(dynmem): use MINMEM for checking + #TODO(dynmem): also consider ballooning out other instances for prinode, instances in n_img.sbp.items(): needed_mem = 0 for instance in instances: bep = cluster_info.FillBE(instance_cfg[instance]) if bep[constants.BE_AUTO_BALANCE]: - needed_mem += bep[constants.BE_MEMORY] + needed_mem += bep[constants.BE_MAXMEM] test = n_img.mfree < needed_mem self._ErrorIf(test, constants.CV_ENODEN1, node, "not enough memory to accomodate instance failovers" @@ -2231,7 +2421,8 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): node_drbd[minor] = (instance, False) else: instance = instanceinfo[instance] - node_drbd[minor] = (instance.name, instance.admin_up) + node_drbd[minor] = (instance.name, + instance.admin_state == constants.ADMINST_UP) # and now check them used_minors = nresult.get(constants.NV_DRBDLIST, []) @@ -2629,6 +2820,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): 
i_non_redundant = [] # Non redundant instances i_non_a_balanced = [] # Non auto-balanced instances + i_offline = 0 # Count of offline instances n_offline = 0 # Count of offline nodes n_drained = 0 # Count of nodes being drained node_vol_should = {} @@ -2644,6 +2836,10 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names)) + user_scripts = [] + if self.cfg.GetUseExternalMipScript(): + user_scripts.append(constants.EXTERNAL_MASTER_SETUP_SCRIPT) + node_verify_param = { constants.NV_FILELIST: utils.UniqueSequence(filename @@ -2666,6 +2862,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): constants.NV_MASTERIP: (master_node, master_ip), constants.NV_OSLIST: None, constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(), + constants.NV_USERSCRIPTS: user_scripts, } if vg_name is not None: @@ -2824,6 +3021,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): nimg.call_ok = self._VerifyNode(node_i, nresult) self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime) self._VerifyNodeNetwork(node_i, nresult) + self._VerifyNodeUserScripts(node_i, nresult) self._VerifyOob(node_i, nresult) if nimg.vm_capable: @@ -2848,6 +3046,12 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): non_primary_inst = set(nimg.instances).difference(nimg.pinst) for inst in non_primary_inst: + # FIXME: investigate best way to handle offline insts + if inst.admin_state == constants.ADMINST_OFFLINE: + if verbose: + feedback_fn("* Skipping offline instance %s" % inst.name) + i_offline += 1 + continue test = inst in self.all_inst_info _ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, inst, "instance should not run on node %s", node_i.name) @@ -2873,7 +3077,8 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): constants.CV_ENODERPC, pnode, "instance %s, connection to" " primary node failed", instance) - _ErrorIf(inst_config.admin_up and pnode_img.offline, + _ErrorIf(inst_config.admin_state == constants.ADMINST_UP and + pnode_img.offline, constants.CV_EINSTANCEBADNODE, instance, "instance is marked as running and lives on offline node %s", inst_config.primary_node) @@ -2965,6 +3170,9 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found." % len(i_non_a_balanced)) + if i_offline: + feedback_fn(" - NOTICE: %d offline instance(s) found." % i_offline) + if n_offline: feedback_fn(" - NOTICE: %d offline node(s) found." 
% n_offline) @@ -3127,8 +3335,8 @@ class LUGroupVerifyDisks(NoHooksLU): res_missing = {} nv_dict = _MapInstanceDisksToNodes([inst - for inst in self.instances.values() - if inst.admin_up]) + for inst in self.instances.values() + if inst.admin_state == constants.ADMINST_UP]) if nv_dict: nodes = utils.NiceSort(set(self.owned_locks(locking.LEVEL_NODE)) & @@ -3169,21 +3377,24 @@ class LUClusterRepairDiskSizes(NoHooksLU): if self.op.instances: self.wanted_names = _GetWantedInstances(self, self.op.instances) self.needed_locks = { - locking.LEVEL_NODE: [], + locking.LEVEL_NODE_RES: [], locking.LEVEL_INSTANCE: self.wanted_names, } - self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE else: self.wanted_names = None self.needed_locks = { - locking.LEVEL_NODE: locking.ALL_SET, + locking.LEVEL_NODE_RES: locking.ALL_SET, locking.LEVEL_INSTANCE: locking.ALL_SET, } - self.share_locks = _ShareAll() + self.share_locks = { + locking.LEVEL_NODE_RES: 1, + locking.LEVEL_INSTANCE: 0, + } def DeclareLocks(self, level): - if level == locking.LEVEL_NODE and self.wanted_names is not None: - self._LockInstancesNodes(primary_only=True) + if level == locking.LEVEL_NODE_RES and self.wanted_names is not None: + self._LockInstancesNodes(primary_only=True, level=level) def CheckPrereq(self): """Check prerequisites. @@ -3234,6 +3445,11 @@ class LUClusterRepairDiskSizes(NoHooksLU): for idx, disk in enumerate(instance.disks): per_node_disks[pnode].append((instance, idx, disk)) + assert not (frozenset(per_node_disks.keys()) - + self.owned_locks(locking.LEVEL_NODE_RES)), \ + "Not owning correct locks" + assert not self.owned_locks(locking.LEVEL_NODE) + changed = [] for node, dskl in per_node_disks.items(): newl = [v[2].Copy() for v in dskl] @@ -3326,8 +3542,10 @@ class LUClusterRename(LogicalUnit): new_ip = self.ip # shutdown the master IP - (master, ip, dev, netmask, family) = self.cfg.GetMasterNetworkParameters() - result = self.rpc.call_node_deactivate_master_ip(master, ip, netmask, dev) + master_params = self.cfg.GetMasterNetworkParameters() + ems = self.cfg.GetUseExternalMipScript() + result = self.rpc.call_node_deactivate_master_ip(master_params.name, + master_params, ems) result.Raise("Could not disable the master role") try: @@ -3340,13 +3558,14 @@ class LUClusterRename(LogicalUnit): ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE) node_list = self.cfg.GetOnlineNodeList() try: - node_list.remove(master) + node_list.remove(master_params.name) except ValueError: pass _UploadHelper(self, node_list, constants.SSH_KNOWN_HOSTS_FILE) finally: - result = self.rpc.call_node_activate_master_ip(master, new_ip, netmask, - dev, family) + master_params.ip = new_ip + result = self.rpc.call_node_activate_master_ip(master_params.name, + master_params, ems) msg = result.fail_msg if msg: self.LogWarning("Could not re-enable the master role on" @@ -3400,6 +3619,10 @@ class LUClusterSetParams(LogicalUnit): if self.op.master_netmask is not None: _ValidateNetmask(self.cfg, self.op.master_netmask) + if self.op.diskparams: + for dt_params in self.op.diskparams.values(): + utils.ForceDictType(dt_params, constants.DISK_DT_TYPES) + def ExpandNames(self): # FIXME: in the future maybe other cluster params won't require checking on # all nodes to be modified. 
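The last hunk above validates each per-template entry in op.diskparams against a type map (utils.ForceDictType with constants.DISK_DT_TYPES) before it is merged into the cluster configuration. A minimal standalone sketch of that kind of check, with invented parameter names and a plain ValueError standing in for Ganeti's OpPrereqError:

    # Illustrative only; not part of this patch. The allowed-types map and
    # parameter names below are made up for the example.
    ALLOWED_TYPES = {"resync-rate": int, "metavg": str}

    def check_disk_params(diskparams):
        """Reject unknown or wrongly typed disk template parameters."""
        for template, params in diskparams.items():
            for key, value in params.items():
                expected = ALLOWED_TYPES.get(key)
                if expected is None:
                    raise ValueError("unknown parameter %r for template %r"
                                     % (key, template))
                if not isinstance(value, expected):
                    raise ValueError("parameter %r must be of type %s"
                                     % (key, expected.__name__))

    check_disk_params({"drbd": {"resync-rate": 1024, "metavg": "xenvg"}})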
@@ -3481,6 +3704,7 @@ class LUClusterSetParams(LogicalUnit): self.cluster = cluster = self.cfg.GetClusterInfo() # validate params changes if self.op.beparams: + objects.UpgradeBeParams(self.op.beparams) utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES) self.new_beparams = cluster.SimpleFillBE(self.op.beparams) @@ -3494,6 +3718,29 @@ class LUClusterSetParams(LogicalUnit): self.new_ndparams["oob_program"] = \ constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM] + if self.op.hv_state: + new_hv_state = _MergeAndVerifyHvState(self.op.hv_state, + self.cluster.hv_state_static) + self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values)) + for hv, values in new_hv_state.items()) + + if self.op.disk_state: + new_disk_state = _MergeAndVerifyDiskState(self.op.disk_state, + self.cluster.disk_state_static) + self.new_disk_state = \ + dict((storage, dict((name, cluster.SimpleFillDiskState(values)) + for name, values in svalues.items())) + for storage, svalues in new_disk_state.items()) + + if self.op.ipolicy: + ipolicy = {} + for key, value in self.op.ipolicy.items(): + utils.ForceDictType(value, constants.ISPECS_PARAMETER_TYPES) + ipolicy[key] = _GetUpdatedParams(cluster.ipolicy.get(key, {}), + value) + objects.InstancePolicy.CheckParameterSyntax(ipolicy) + self.new_ipolicy = ipolicy + if self.op.nicparams: utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES) self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams) @@ -3531,6 +3778,15 @@ class LUClusterSetParams(LogicalUnit): else: self.new_hvparams[hv_name].update(hv_dict) + # disk template parameters + self.new_diskparams = objects.FillDict(cluster.diskparams, {}) + if self.op.diskparams: + for dt_name, dt_params in self.op.diskparams.items(): + if dt_name not in self.op.diskparams: + self.new_diskparams[dt_name] = dt_params + else: + self.new_diskparams[dt_name].update(dt_params) + # os hypervisor parameters self.new_os_hvp = objects.FillDict(cluster.os_hvp, {}) if self.op.os_hvp: @@ -3645,10 +3901,18 @@ class LUClusterSetParams(LogicalUnit): self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams if self.op.nicparams: self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams + if self.op.ipolicy: + self.cluster.ipolicy = self.new_ipolicy if self.op.osparams: self.cluster.osparams = self.new_osp if self.op.ndparams: self.cluster.ndparams = self.new_ndparams + if self.op.diskparams: + self.cluster.diskparams = self.new_diskparams + if self.op.hv_state: + self.cluster.hv_state_static = self.new_hv_state + if self.op.disk_state: + self.cluster.disk_state_static = self.new_disk_state if self.op.candidate_pool_size is not None: self.cluster.candidate_pool_size = self.op.candidate_pool_size @@ -3656,6 +3920,9 @@ class LUClusterSetParams(LogicalUnit): _AdjustCandidatePool(self, []) if self.op.maintain_node_health is not None: + if self.op.maintain_node_health and not constants.ENABLE_CONFD: + feedback_fn("Note: CONFD was disabled at build time, node health" + " maintenance is not useful (still enabling it)") self.cluster.maintain_node_health = self.op.maintain_node_health if self.op.prealloc_wipe_disks is not None: @@ -3676,6 +3943,9 @@ class LUClusterSetParams(LogicalUnit): if self.op.reserved_lvs is not None: self.cluster.reserved_lvs = self.op.reserved_lvs + if self.op.use_external_mip_script is not None: + self.cluster.use_external_mip_script = self.op.use_external_mip_script + def helper_os(aname, mods, desc): desc += " OS list" lst = getattr(self.cluster, aname) @@ -3700,36 +3970,40 @@ class 
LUClusterSetParams(LogicalUnit): helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted") if self.op.master_netdev: - (master, ip, dev, netmask, _) = self.cfg.GetMasterNetworkParameters() + master_params = self.cfg.GetMasterNetworkParameters() + ems = self.cfg.GetUseExternalMipScript() feedback_fn("Shutting down master ip on the current netdev (%s)" % self.cluster.master_netdev) - result = self.rpc.call_node_deactivate_master_ip(master, ip, netmask, dev) + result = self.rpc.call_node_deactivate_master_ip(master_params.name, + master_params, ems) result.Raise("Could not disable the master ip") feedback_fn("Changing master_netdev from %s to %s" % - (dev, self.op.master_netdev)) + (master_params.netdev, self.op.master_netdev)) self.cluster.master_netdev = self.op.master_netdev if self.op.master_netmask: - (master, ip, dev, old_netmask, _) = self.cfg.GetMasterNetworkParameters() + master_params = self.cfg.GetMasterNetworkParameters() feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask) - result = self.rpc.call_node_change_master_netmask(master, old_netmask, + result = self.rpc.call_node_change_master_netmask(master_params.name, + master_params.netmask, self.op.master_netmask, - ip, dev) + master_params.ip, + master_params.netdev) if result.fail_msg: msg = "Could not change the master IP netmask: %s" % result.fail_msg - self.LogWarning(msg) feedback_fn(msg) - else: - self.cluster.master_netmask = self.op.master_netmask + + self.cluster.master_netmask = self.op.master_netmask self.cfg.Update(self.cluster, feedback_fn) if self.op.master_netdev: - (master, ip, dev, netmask, family) = self.cfg.GetMasterNetworkParameters() + master_params = self.cfg.GetMasterNetworkParameters() feedback_fn("Starting the master ip on the new master netdev (%s)" % self.op.master_netdev) - result = self.rpc.call_node_activate_master_ip(master, ip, netmask, dev, - family) + ems = self.cfg.GetUseExternalMipScript() + result = self.rpc.call_node_activate_master_ip(master_params.name, + master_params, ems) if result.fail_msg: self.LogWarning("Could not re-enable the master ip on" " the master, please restart manually: %s", @@ -3786,9 +4060,14 @@ def _ComputeAncillaryFiles(cluster, redist): # Files which should only be on master candidates files_mc = set() + if not redist: files_mc.add(constants.CLUSTER_CONF_FILE) + # FIXME: this should also be replicated but Ganeti doesn't support files_mc + # replication + files_mc.add(constants.DEFAULT_MASTER_SETUP_SCRIPT) + # Files which should only be on VM-capable nodes files_vm = set(filename for hv_name in cluster.enabled_hypervisors @@ -3891,8 +4170,11 @@ class LUClusterActivateMasterIp(NoHooksLU): """Activate the master IP. """ - (master, ip, dev, netmask, family) = self.cfg.GetMasterNetworkParameters() - self.rpc.call_node_activate_master_ip(master, ip, netmask, dev, family) + master_params = self.cfg.GetMasterNetworkParameters() + ems = self.cfg.GetUseExternalMipScript() + result = self.rpc.call_node_activate_master_ip(master_params.name, + master_params, ems) + result.Raise("Could not activate the master IP") class LUClusterDeactivateMasterIp(NoHooksLU): @@ -3903,8 +4185,11 @@ class LUClusterDeactivateMasterIp(NoHooksLU): """Deactivate the master IP. 
""" - (master, ip, dev, netmask, _) = self.cfg.GetMasterNetworkParameters() - self.rpc.call_node_deactivate_master_ip(master, ip, netmask, dev) + master_params = self.cfg.GetMasterNetworkParameters() + ems = self.cfg.GetUseExternalMipScript() + result = self.rpc.call_node_deactivate_master_ip(master_params.name, + master_params, ems) + result.Raise("Could not deactivate the master IP") def _WaitForSync(lu, instance, disks=None, oneshot=False): @@ -4408,7 +4693,7 @@ class LUNodeRemove(LogicalUnit): raise errors.OpPrereqError("Node is the master node, failover to another" " node is required", errors.ECODE_INVAL) - for instance_name, instance in self.cfg.GetAllInstancesInfo(): + for instance_name, instance in self.cfg.GetAllInstancesInfo().items(): if node.name in instance.all_nodes: raise errors.OpPrereqError("Instance %s is still running on the node," " please remove first" % instance_name, @@ -4426,6 +4711,9 @@ class LUNodeRemove(LogicalUnit): modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup + assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \ + "Not owning BGL" + # Promote nodes to master candidate as needed _AdjustCandidatePool(self, exceptions=[node.name]) self.context.RemoveNode(node.name) @@ -4484,9 +4772,9 @@ class _NodeQuery(_QueryBase): # filter out non-vm_capable nodes toquery_nodes = [name for name in nodenames if all_info[name].vm_capable] - node_data = lu.rpc.call_node_info(toquery_nodes, lu.cfg.GetVGName(), - lu.cfg.GetHypervisorType()) - live_data = dict((name, nresult.payload) + node_data = lu.rpc.call_node_info(toquery_nodes, [lu.cfg.GetVGName()], + [lu.cfg.GetHypervisorType()]) + live_data = dict((name, _MakeLegacyNodeInfo(nresult.payload)) for (name, nresult) in node_data.items() if not nresult.fail_msg and nresult.payload) else: @@ -4539,6 +4827,9 @@ class LUNodeQuery(NoHooksLU): def ExpandNames(self): self.nq.ExpandNames(self) + def DeclareLocks(self, level): + self.nq.DeclareLocks(self, level) + def Exec(self, feedback_fn): return self.nq.OldStyleQuery(self) @@ -4557,8 +4848,9 @@ class LUNodeQueryvols(NoHooksLU): selected=self.op.output_fields) def ExpandNames(self): + self.share_locks = _ShareAll() self.needed_locks = {} - self.share_locks[locking.LEVEL_NODE] = 1 + if not self.op.nodes: self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET else: @@ -4625,8 +4917,8 @@ class LUNodeQueryStorage(NoHooksLU): selected=self.op.output_fields) def ExpandNames(self): + self.share_locks = _ShareAll() self.needed_locks = {} - self.share_locks[locking.LEVEL_NODE] = 1 if self.op.nodes: self.needed_locks[locking.LEVEL_NODE] = \ @@ -5092,6 +5384,9 @@ class LUNodeAdd(LogicalUnit): new_node = self.new_node node = new_node.name + assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \ + "Not owning BGL" + # We adding a new node so we assume it's powered new_node.powered = True @@ -5206,7 +5501,8 @@ class LUNodeSetParams(LogicalUnit): self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name) all_mods = [self.op.offline, self.op.master_candidate, self.op.drained, self.op.master_capable, self.op.vm_capable, - self.op.secondary_ip, self.op.ndparams] + self.op.secondary_ip, self.op.ndparams, self.op.hv_state, + self.op.disk_state] if all_mods.count(None) == len(all_mods): raise errors.OpPrereqError("Please pass at least one modification", errors.ECODE_INVAL) @@ -5230,35 +5526,32 @@ class LUNodeSetParams(LogicalUnit): self.lock_all = self.op.auto_promote and self.might_demote self.lock_instances = self.op.secondary_ip is not None + def 
_InstanceFilter(self, instance): + """Filter for getting affected instances. + + """ + return (instance.disk_template in constants.DTS_INT_MIRROR and + self.op.node_name in instance.all_nodes) + def ExpandNames(self): if self.lock_all: self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET} else: self.needed_locks = {locking.LEVEL_NODE: self.op.node_name} - if self.lock_instances: - self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET + # Since modifying a node can have severe effects on currently running + # operations the resource lock is at least acquired in shared mode + self.needed_locks[locking.LEVEL_NODE_RES] = \ + self.needed_locks[locking.LEVEL_NODE] - def DeclareLocks(self, level): - # If we have locked all instances, before waiting to lock nodes, release - # all the ones living on nodes unrelated to the current operation. - if level == locking.LEVEL_NODE and self.lock_instances: - self.affected_instances = [] - if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET: - instances_keep = [] - - # Build list of instances to release - locked_i = self.owned_locks(locking.LEVEL_INSTANCE) - for instance_name, instance in self.cfg.GetMultiInstanceInfo(locked_i): - if (instance.disk_template in constants.DTS_INT_MIRROR and - self.op.node_name in instance.all_nodes): - instances_keep.append(instance_name) - self.affected_instances.append(instance) - - _ReleaseLocks(self, locking.LEVEL_INSTANCE, keep=instances_keep) - - assert (set(self.owned_locks(locking.LEVEL_INSTANCE)) == - set(instances_keep)) + # Get node resource and instance locks in shared mode; they are not used + # for anything but read-only access + self.share_locks[locking.LEVEL_NODE_RES] = 1 + self.share_locks[locking.LEVEL_INSTANCE] = 1 + + if self.lock_instances: + self.needed_locks[locking.LEVEL_INSTANCE] = \ + frozenset(self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)) def BuildHooksEnv(self): """Build hooks env. 
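The ExpandNames hunk above replaces the old "lock all instances, then release the unrelated ones" dance with an up-front predicate (_InstanceFilter passed to cfg.GetInstancesInfoByFilter), so only instances that actually keep data on the node are locked. A rough standalone illustration of that filtering idea; the record layout and the mirrored-template set are assumptions for the example, not Ganeti's real structures:

    # Illustrative only; stand-in for constants.DTS_INT_MIRROR.
    MIRRORED_TEMPLATES = frozenset(["drbd"])

    def affected_instances(all_instances, node_name):
        """Return the names of instances that keep data on node_name."""
        return frozenset(name for name, inst in all_instances.items()
                         if inst["disk_template"] in MIRRORED_TEMPLATES
                         and node_name in inst["all_nodes"])

    instances = {
        "web1": {"disk_template": "drbd", "all_nodes": ["node1", "node2"]},
        "db1": {"disk_template": "plain", "all_nodes": ["node2"]},
    }
    print(sorted(affected_instances(instances, "node2")))  # ['web1']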
@@ -5290,6 +5583,25 @@ class LUNodeSetParams(LogicalUnit): """ node = self.node = self.cfg.GetNodeInfo(self.op.node_name) + if self.lock_instances: + affected_instances = \ + self.cfg.GetInstancesInfoByFilter(self._InstanceFilter) + + # Verify instance locks + owned_instances = self.owned_locks(locking.LEVEL_INSTANCE) + wanted_instances = frozenset(affected_instances.keys()) + if wanted_instances - owned_instances: + raise errors.OpPrereqError("Instances affected by changing node %s's" + " secondary IP address have changed since" + " locks were acquired, wanted '%s', have" + " '%s'; retry the operation" % + (self.op.node_name, + utils.CommaJoin(wanted_instances), + utils.CommaJoin(owned_instances)), + errors.ECODE_STATE) + else: + affected_instances = None + if (self.op.master_candidate is not None or self.op.drained is not None or self.op.offline is not None): @@ -5400,16 +5712,21 @@ class LUNodeSetParams(LogicalUnit): raise errors.OpPrereqError("Cannot change the secondary ip on a single" " homed cluster", errors.ECODE_INVAL) + assert not (frozenset(affected_instances) - + self.owned_locks(locking.LEVEL_INSTANCE)) + if node.offline: - if self.affected_instances: - raise errors.OpPrereqError("Cannot change secondary ip: offline" - " node has instances (%s) configured" - " to use it" % self.affected_instances) + if affected_instances: + raise errors.OpPrereqError("Cannot change secondary IP address:" + " offline node has instances (%s)" + " configured to use it" % + utils.CommaJoin(affected_instances.keys())) else: # On online nodes, check that no instances are running, and that # the node has the new ip and we can reach it. - for instance in self.affected_instances: - _CheckInstanceDown(self, instance, "cannot change secondary ip") + for instance in affected_instances.values(): + _CheckInstanceState(self, instance, INSTANCE_DOWN, + msg="cannot change secondary ip") _CheckNodeHasSecondaryIP(self, node.name, self.op.secondary_ip, True) if master.name != node.name: @@ -5426,6 +5743,15 @@ class LUNodeSetParams(LogicalUnit): utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES) self.new_ndparams = new_ndparams + if self.op.hv_state: + self.new_hv_state = _MergeAndVerifyHvState(self.op.hv_state, + self.node.hv_state_static) + + if self.op.disk_state: + self.new_disk_state = \ + _MergeAndVerifyDiskState(self.op.disk_state, + self.node.disk_state_static) + def Exec(self, feedback_fn): """Modifies a node. 
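The secondary-IP branch above now calls _CheckInstanceState(self, instance, INSTANCE_DOWN, ...) instead of the old _CheckInstanceDown, so one helper can express any set of acceptable admin states (up, down, offline). A condensed sketch of that membership check, with the state constants inlined and a plain RuntimeError standing in for OpPrereqError:

    # Illustrative only; mirrors the ADMINST_* lists added at the top of
    # this diff.
    ADMINST_UP, ADMINST_DOWN, ADMINST_OFFLINE = "up", "down", "offline"
    INSTANCE_DOWN = [ADMINST_DOWN]
    INSTANCE_NOT_RUNNING = [ADMINST_DOWN, ADMINST_OFFLINE]

    def check_instance_state(instance, req_states, msg):
        """Fail unless the instance's admin state is one of req_states."""
        if instance["admin_state"] not in req_states:
            raise RuntimeError("Instance %s is marked to be %s, %s"
                               % (instance["name"], instance["admin_state"],
                                  msg))

    # Passes: the instance is administratively down, as required.
    check_instance_state({"name": "web1", "admin_state": ADMINST_DOWN},
                         INSTANCE_DOWN, "cannot change secondary ip")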
@@ -5442,6 +5768,12 @@ class LUNodeSetParams(LogicalUnit): if self.op.powered is not None: node.powered = self.op.powered + if self.op.hv_state: + node.hv_state_static = self.new_hv_state + + if self.op.disk_state: + node.disk_state_static = self.new_disk_state + for attr in ["master_capable", "vm_capable"]: val = getattr(self.op, attr) if val is not None: @@ -5549,18 +5881,20 @@ class LUClusterQuery(NoHooksLU): "architecture": (platform.architecture()[0], platform.machine()), "name": cluster.cluster_name, "master": cluster.master_node, - "default_hypervisor": cluster.enabled_hypervisors[0], + "default_hypervisor": cluster.primary_hypervisor, "enabled_hypervisors": cluster.enabled_hypervisors, "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name]) for hypervisor_name in cluster.enabled_hypervisors]), "os_hvp": os_hvp, "beparams": cluster.beparams, "osparams": cluster.osparams, + "ipolicy": cluster.ipolicy, "nicparams": cluster.nicparams, "ndparams": cluster.ndparams, "candidate_pool_size": cluster.candidate_pool_size, "master_netdev": cluster.master_netdev, "master_netmask": cluster.master_netmask, + "use_external_mip_script": cluster.use_external_mip_script, "volume_group_name": cluster.volume_group_name, "drbd_usermode_helper": cluster.drbd_usermode_helper, "file_storage_dir": cluster.file_storage_dir, @@ -5806,7 +6140,7 @@ def _SafeShutdownInstanceDisks(lu, instance, disks=None): _ShutdownInstanceDisks. """ - _CheckInstanceDown(lu, instance, "cannot shutdown disks") + _CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks") _ShutdownInstanceDisks(lu, instance, disks=disks) @@ -5876,10 +6210,12 @@ def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name): we cannot check the node """ - nodeinfo = lu.rpc.call_node_info([node], None, hypervisor_name) + nodeinfo = lu.rpc.call_node_info([node], None, [hypervisor_name]) nodeinfo[node].Raise("Can't get data from node %s" % node, prereq=True, ecode=errors.ECODE_ENVIRON) - free_mem = nodeinfo[node].payload.get("memory_free", None) + (_, _, (hv_info, )) = nodeinfo[node].payload + + free_mem = hv_info.get("memory_free", None) if not isinstance(free_mem, int): raise errors.OpPrereqError("Can't compute free memory on node %s, result" " was '%s'" % (node, free_mem), @@ -5934,12 +6270,13 @@ def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested): or we cannot check the node """ - nodeinfo = lu.rpc.call_node_info(nodenames, vg, None) + nodeinfo = lu.rpc.call_node_info(nodenames, [vg], None) for node in nodenames: info = nodeinfo[node] info.Raise("Cannot get current information from node %s" % node, prereq=True, ecode=errors.ECODE_ENVIRON) - vg_free = info.payload.get("vg_free", None) + (_, (vg_info, ), _) = info.payload + vg_free = vg_info.get("vg_free", None) if not isinstance(vg_free, int): raise errors.OpPrereqError("Can't compute free disk space on node" " %s for vg %s, result was '%s'" % @@ -5969,12 +6306,13 @@ def _CheckNodesPhysicalCPUs(lu, nodenames, requested, hypervisor_name): or we cannot check the node """ - nodeinfo = lu.rpc.call_node_info(nodenames, None, hypervisor_name) + nodeinfo = lu.rpc.call_node_info(nodenames, None, [hypervisor_name]) for node in nodenames: info = nodeinfo[node] info.Raise("Cannot get current information from node %s" % node, prereq=True, ecode=errors.ECODE_ENVIRON) - num_cpus = info.payload.get("cpu_total", None) + (_, _, (hv_info, )) = info.payload + num_cpus = hv_info.get("cpu_total", None) if not isinstance(num_cpus, int): raise errors.OpPrereqError("Can't 
compute the number of physical CPUs" " on node %s, result was '%s'" % @@ -5997,6 +6335,7 @@ class LUInstanceStartup(LogicalUnit): # extra beparams if self.op.beparams: # fill the beparams dict + objects.UpgradeBeParams(self.op.beparams) utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES) def ExpandNames(self): @@ -6044,6 +6383,8 @@ class LUInstanceStartup(LogicalUnit): hv_type.CheckParameterSyntax(filled_hvp) _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp) + _CheckInstanceState(self, instance, INSTANCE_ONLINE) + self.primary_offline = self.cfg.GetNodeInfo(instance.primary_node).offline if self.primary_offline and self.op.ignore_offline_nodes: @@ -6067,7 +6408,7 @@ class LUInstanceStartup(LogicalUnit): if not remote_info.payload: # not running already _CheckNodeFreeMemory(self, instance.primary_node, "starting instance %s" % instance.name, - bep[constants.BE_MEMORY], instance.hypervisor) + bep[constants.BE_MAXMEM], instance.hypervisor) def Exec(self, feedback_fn): """Start the instance. @@ -6141,7 +6482,7 @@ class LUInstanceReboot(LogicalUnit): self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name) assert self.instance is not None, \ "Cannot retrieve locked instance %s" % self.op.instance_name - + _CheckInstanceState(self, instance, INSTANCE_ONLINE) _CheckNodeOnline(self, instance.primary_node) # check bridges existence @@ -6230,6 +6571,8 @@ class LUInstanceShutdown(LogicalUnit): assert self.instance is not None, \ "Cannot retrieve locked instance %s" % self.op.instance_name + _CheckInstanceState(self, self.instance, INSTANCE_ONLINE) + self.primary_offline = \ self.cfg.GetNodeInfo(self.instance.primary_node).offline @@ -6306,7 +6649,7 @@ class LUInstanceReinstall(LogicalUnit): raise errors.OpPrereqError("Instance '%s' has no disks" % self.op.instance_name, errors.ECODE_INVAL) - _CheckInstanceDown(self, instance, "cannot reinstall") + _CheckInstanceState(self, instance, INSTANCE_DOWN, msg="cannot reinstall") if self.op.os_type is not None: # OS verification @@ -6379,6 +6722,10 @@ class LUInstanceRecreateDisks(LogicalUnit): # otherwise we need to lock all nodes for disk re-creation primary_only = bool(self.op.nodes) self._LockInstancesNodes(primary_only=primary_only) + elif level == locking.LEVEL_NODE_RES: + # Copy node locks + self.needed_locks[locking.LEVEL_NODE_RES] = \ + self.needed_locks[locking.LEVEL_NODE][:] def BuildHooksEnv(self): """Build hooks env. 
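The memory, disk and CPU checks above all unpack the node_info RPC payload as (_, (vg_info, ), (hv_info, )) and read individual keys from the sub-dicts; _MakeLegacyNodeInfo, added near the top of this diff, flattens the same shape for callers that still expect one dict. A small self-contained sketch of that flattening, with invented sample values:

    # Illustrative only; follows the _MakeLegacyNodeInfo helper added
    # earlier in this diff. The payload values are invented.
    def make_legacy_node_info(data):
        """Flatten (bootid, (vg_info,), (hv_info,)) into a single dict."""
        (bootid, (vg_info, ), (hv_info, )) = data
        merged = dict(vg_info)
        merged.update(hv_info)      # keys assumed disjoint, as in the helper
        merged["bootid"] = bootid
        return merged

    payload = ("abc-123",
               ({"vg_free": 2048, "vg_size": 4096}, ),
               ({"memory_free": 1024, "cpu_total": 8}, ))
    info = make_legacy_node_info(payload)
    assert info["memory_free"] == 1024 and info["vg_free"] == 2048
    assert info["bootid"] == "abc-123"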
@@ -6425,10 +6772,12 @@ class LUInstanceRecreateDisks(LogicalUnit): self.op.instance_name, errors.ECODE_INVAL) # if we replace nodes *and* the old primary is offline, we don't # check - assert instance.primary_node in self.needed_locks[locking.LEVEL_NODE] + assert instance.primary_node in self.owned_locks(locking.LEVEL_NODE) + assert instance.primary_node in self.owned_locks(locking.LEVEL_NODE_RES) old_pnode = self.cfg.GetNodeInfo(instance.primary_node) if not (self.op.nodes and old_pnode.offline): - _CheckInstanceDown(self, instance, "cannot recreate disks") + _CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING, + msg="cannot recreate disks") if not self.op.disks: self.op.disks = range(len(instance.disks)) @@ -6449,6 +6798,9 @@ class LUInstanceRecreateDisks(LogicalUnit): """ instance = self.instance + assert (self.owned_locks(locking.LEVEL_NODE) == + self.owned_locks(locking.LEVEL_NODE_RES)) + to_skip = [] mods = [] # keeps track of needed logical_id changes @@ -6531,13 +6883,14 @@ class LUInstanceRename(LogicalUnit): instance = self.cfg.GetInstanceInfo(self.op.instance_name) assert instance is not None _CheckNodeOnline(self, instance.primary_node) - _CheckInstanceDown(self, instance, "cannot rename") + _CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING, + msg="cannot rename") self.instance = instance new_name = self.op.new_name if self.op.name_check: hostname = netutils.GetHostname(name=new_name) - if hostname != new_name: + if hostname.name != new_name: self.LogInfo("Resolved given name '%s' to '%s'", new_name, hostname.name) if not utils.MatchNameComponent(self.op.new_name, [hostname.name]): @@ -6617,11 +6970,16 @@ class LUInstanceRemove(LogicalUnit): def ExpandNames(self): self._ExpandAndLockInstance() self.needed_locks[locking.LEVEL_NODE] = [] + self.needed_locks[locking.LEVEL_NODE_RES] = [] self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE def DeclareLocks(self, level): if level == locking.LEVEL_NODE: self._LockInstancesNodes() + elif level == locking.LEVEL_NODE_RES: + # Copy node locks + self.needed_locks[locking.LEVEL_NODE_RES] = \ + self.needed_locks[locking.LEVEL_NODE][:] def BuildHooksEnv(self): """Build hooks env. @@ -6670,6 +7028,12 @@ class LUInstanceRemove(LogicalUnit): " node %s: %s" % (instance.name, instance.primary_node, msg)) + assert (self.owned_locks(locking.LEVEL_NODE) == + self.owned_locks(locking.LEVEL_NODE_RES)) + assert not (set(instance.all_nodes) - + self.owned_locks(locking.LEVEL_NODE)), \ + "Not owning correct locks" + _RemoveInstance(self, feedback_fn, instance, self.op.ignore_failures) @@ -6883,11 +7247,16 @@ class LUInstanceMove(LogicalUnit): target_node = _ExpandNodeName(self.cfg, self.op.target_node) self.op.target_node = target_node self.needed_locks[locking.LEVEL_NODE] = [target_node] + self.needed_locks[locking.LEVEL_NODE_RES] = [] self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND def DeclareLocks(self, level): if level == locking.LEVEL_NODE: self._LockInstancesNodes(primary_only=True) + elif level == locking.LEVEL_NODE_RES: + # Copy node locks + self.needed_locks[locking.LEVEL_NODE_RES] = \ + self.needed_locks[locking.LEVEL_NODE][:] def BuildHooksEnv(self): """Build hooks env. 
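As in several earlier LUs, the DeclareLocks hunk above mirrors the node locks into the new LEVEL_NODE_RES level by copying the list with [:] rather than aliasing it. A toy illustration of why the copy matters:

    # Illustrative only; dictionary keys are stand-ins for the locking levels.
    needed_locks = {"node": ["node1", "node2"]}

    # Copy, as the hunks above do: later edits to the node list do not leak
    # into the resource level.
    needed_locks["node_res"] = needed_locks["node"][:]
    needed_locks["node"].append("node3")
    assert needed_locks["node_res"] == ["node1", "node2"]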
@@ -6945,10 +7314,10 @@ class LUInstanceMove(LogicalUnit): _CheckNodeNotDrained(self, target_node) _CheckNodeVmCapable(self, target_node) - if instance.admin_up: + if instance.admin_state == constants.ADMINST_UP: # check memory requirements on the secondary node _CheckNodeFreeMemory(self, target_node, "failing over instance %s" % - instance.name, bep[constants.BE_MEMORY], + instance.name, bep[constants.BE_MAXMEM], instance.hypervisor) else: self.LogInfo("Not checking memory on the secondary node as" @@ -6972,6 +7341,9 @@ class LUInstanceMove(LogicalUnit): self.LogInfo("Shutting down instance %s on source node %s", instance.name, source_node) + assert (self.owned_locks(locking.LEVEL_NODE) == + self.owned_locks(locking.LEVEL_NODE_RES)) + result = self.rpc.call_instance_shutdown(source_node, instance, self.op.shutdown_timeout) msg = result.fail_msg @@ -7036,7 +7408,7 @@ class LUInstanceMove(LogicalUnit): _RemoveDisks(self, instance, target_node=source_node) # Only start the instance if it's marked as up - if instance.admin_up: + if instance.admin_state == constants.ADMINST_UP: self.LogInfo("Starting instance %s on node %s", instance.name, target_node) @@ -7174,10 +7546,11 @@ class TLMigrateInstance(Tasklet): assert instance is not None self.instance = instance - if (not self.cleanup and not instance.admin_up and not self.failover and - self.fallback): - self.lu.LogInfo("Instance is marked down, fallback allowed, switching" - " to failover") + if (not self.cleanup and + not instance.admin_state == constants.ADMINST_UP and + not self.failover and self.fallback): + self.lu.LogInfo("Instance is marked down or offline, fallback allowed," + " switching to failover") self.failover = True if instance.disk_template not in constants.DTS_MIRRORED: @@ -7236,14 +7609,26 @@ class TLMigrateInstance(Tasklet): i_be = self.cfg.GetClusterInfo().FillBE(instance) # check memory requirements on the secondary node - if not self.failover or instance.admin_up: + if not self.failover or instance.admin_state == constants.ADMINST_UP: _CheckNodeFreeMemory(self.lu, target_node, "migrating instance %s" % - instance.name, i_be[constants.BE_MEMORY], + instance.name, i_be[constants.BE_MAXMEM], instance.hypervisor) else: self.lu.LogInfo("Not checking memory on the secondary node as" " instance will not be started") + # check if failover must be forced instead of migration + if (not self.cleanup and not self.failover and + i_be[constants.BE_ALWAYS_FAILOVER]): + if self.fallback: + self.lu.LogInfo("Instance configured to always failover; fallback" + " to failover") + self.failover = True + else: + raise errors.OpPrereqError("This instance has been configured to" + " always failover, please allow failover", + errors.ECODE_STATE) + # check bridge existance _CheckInstanceBridgesExist(self.lu, instance, node=target_node) @@ -7507,14 +7892,17 @@ class TLMigrateInstance(Tasklet): # Check for hypervisor version mismatch and warn the user. 
nodeinfo = self.rpc.call_node_info([source_node, target_node], - None, self.instance.hypervisor) - src_info = nodeinfo[source_node] - dst_info = nodeinfo[target_node] - - if ((constants.HV_NODEINFO_KEY_VERSION in src_info.payload) and - (constants.HV_NODEINFO_KEY_VERSION in dst_info.payload)): - src_version = src_info.payload[constants.HV_NODEINFO_KEY_VERSION] - dst_version = dst_info.payload[constants.HV_NODEINFO_KEY_VERSION] + None, [self.instance.hypervisor]) + for ninfo in nodeinfo.values(): + ninfo.Raise("Unable to retrieve node information from node '%s'" % + ninfo.node) + (_, _, (src_info, )) = nodeinfo[source_node].payload + (_, _, (dst_info, )) = nodeinfo[target_node].payload + + if ((constants.HV_NODEINFO_KEY_VERSION in src_info) and + (constants.HV_NODEINFO_KEY_VERSION in dst_info)): + src_version = src_info[constants.HV_NODEINFO_KEY_VERSION] + dst_version = dst_info[constants.HV_NODEINFO_KEY_VERSION] if src_version != dst_version: self.feedback_fn("* warning: hypervisor version mismatch between" " source (%s) and target (%s) node" % @@ -7653,7 +8041,7 @@ class TLMigrateInstance(Tasklet): source_node = instance.primary_node target_node = self.target_node - if instance.admin_up: + if instance.admin_state == constants.ADMINST_UP: self.feedback_fn("* checking disk consistency between source and target") for dev in instance.disks: # for drbd, these are drbd over lvm @@ -7696,7 +8084,7 @@ class TLMigrateInstance(Tasklet): self.cfg.Update(instance, self.feedback_fn) # Only start the instance if it's marked as up - if instance.admin_up: + if instance.admin_state == constants.ADMINST_UP: self.feedback_fn("* activating the instance's disks on target node %s" % target_node) logging.info("Starting instance %s on node %s", @@ -7832,24 +8220,104 @@ def _GenerateUniqueNames(lu, exts): return results +def _ComputeLDParams(disk_template, disk_params): + """Computes Logical Disk parameters from Disk Template parameters. + + @type disk_template: string + @param disk_template: disk template, one of L{constants.DISK_TEMPLATES} + @type disk_params: dict + @param disk_params: disk template parameters; dict(template_name -> parameters + @rtype: list(dict) + @return: a list of dicts, one for each node of the disk hierarchy. Each dict + contains the LD parameters of the node. The tree is flattened in-order. 
+ + """ + if disk_template not in constants.DISK_TEMPLATES: + raise errors.ProgrammerError("Unknown disk template %s" % disk_template) + + result = list() + dt_params = disk_params[disk_template] + if disk_template == constants.DT_DRBD8: + drbd_params = { + constants.LDP_RESYNC_RATE: dt_params[constants.DRBD_RESYNC_RATE], + constants.LDP_BARRIERS: dt_params[constants.DRBD_DISK_BARRIERS], + constants.LDP_NO_META_FLUSH: dt_params[constants.DRBD_META_BARRIERS], + constants.LDP_DEFAULT_METAVG: dt_params[constants.DRBD_DEFAULT_METAVG], + constants.LDP_DISK_CUSTOM: dt_params[constants.DRBD_DISK_CUSTOM], + constants.LDP_NET_CUSTOM: dt_params[constants.DRBD_NET_CUSTOM], + constants.LDP_DYNAMIC_RESYNC: dt_params[constants.DRBD_DYNAMIC_RESYNC], + constants.LDP_PLAN_AHEAD: dt_params[constants.DRBD_PLAN_AHEAD], + constants.LDP_FILL_TARGET: dt_params[constants.DRBD_FILL_TARGET], + constants.LDP_DELAY_TARGET: dt_params[constants.DRBD_DELAY_TARGET], + constants.LDP_MAX_RATE: dt_params[constants.DRBD_MAX_RATE], + constants.LDP_MIN_RATE: dt_params[constants.DRBD_MIN_RATE], + } + + drbd_params = \ + objects.FillDict(constants.DISK_LD_DEFAULTS[constants.LD_DRBD8], + drbd_params) + + result.append(drbd_params) + + # data LV + data_params = { + constants.LDP_STRIPES: dt_params[constants.DRBD_DATA_STRIPES], + } + data_params = \ + objects.FillDict(constants.DISK_LD_DEFAULTS[constants.LD_LV], + data_params) + result.append(data_params) + + # metadata LV + meta_params = { + constants.LDP_STRIPES: dt_params[constants.DRBD_META_STRIPES], + } + meta_params = \ + objects.FillDict(constants.DISK_LD_DEFAULTS[constants.LD_LV], + meta_params) + result.append(meta_params) + + elif (disk_template == constants.DT_FILE or + disk_template == constants.DT_SHARED_FILE): + result.append(constants.DISK_LD_DEFAULTS[constants.LD_FILE]) + + elif disk_template == constants.DT_PLAIN: + params = { + constants.LDP_STRIPES: dt_params[constants.LV_STRIPES], + } + params = \ + objects.FillDict(constants.DISK_LD_DEFAULTS[constants.LD_LV], + params) + result.append(params) + + elif disk_template == constants.DT_BLOCK: + result.append(constants.DISK_LD_DEFAULTS[constants.LD_BLOCKDEV]) + + return result + + def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names, - iv_name, p_minor, s_minor): + iv_name, p_minor, s_minor, drbd_params, data_params, + meta_params): """Generate a drbd8 device complete with its children. """ assert len(vgnames) == len(names) == 2 port = lu.cfg.AllocatePort() shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId()) + dev_data = objects.Disk(dev_type=constants.LD_LV, size=size, - logical_id=(vgnames[0], names[0])) + logical_id=(vgnames[0], names[0]), + params=data_params) dev_meta = objects.Disk(dev_type=constants.LD_LV, size=DRBD_META_SIZE, - logical_id=(vgnames[1], names[1])) + logical_id=(vgnames[1], names[1]), + params=meta_params) drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size, logical_id=(primary, secondary, port, p_minor, s_minor, shared_secret), children=[dev_data, dev_meta], - iv_name=iv_name) + iv_name=iv_name, params=drbd_params) return drbd_dev @@ -7857,7 +8325,7 @@ def _GenerateDiskTemplate(lu, template_name, instance_name, primary_node, secondary_nodes, disk_info, file_storage_dir, file_driver, - base_index, feedback_fn): + base_index, feedback_fn, disk_params): """Generate the entire disk layout for a given template type. 
""" @@ -7866,6 +8334,7 @@ def _GenerateDiskTemplate(lu, template_name, vgname = lu.cfg.GetVGName() disk_count = len(disk_info) disks = [] + ld_params = _ComputeLDParams(template_name, disk_params) if template_name == constants.DT_DISKLESS: pass elif template_name == constants.DT_PLAIN: @@ -7882,9 +8351,11 @@ def _GenerateDiskTemplate(lu, template_name, size=disk[constants.IDISK_SIZE], logical_id=(vg, names[idx]), iv_name="disk/%d" % disk_index, - mode=disk[constants.IDISK_MODE]) + mode=disk[constants.IDISK_MODE], + params=ld_params[0]) disks.append(disk_dev) elif template_name == constants.DT_DRBD8: + drbd_params, data_params, meta_params = ld_params if len(secondary_nodes) != 1: raise errors.ProgrammerError("Wrong template configuration") remote_node = secondary_nodes[0] @@ -7898,14 +8369,16 @@ def _GenerateDiskTemplate(lu, template_name, names.append(lv_prefix + "_meta") for idx, disk in enumerate(disk_info): disk_index = idx + base_index + drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG] data_vg = disk.get(constants.IDISK_VG, vgname) - meta_vg = disk.get(constants.IDISK_METAVG, data_vg) + meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg) disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node, disk[constants.IDISK_SIZE], [data_vg, meta_vg], names[idx * 2:idx * 2 + 2], "disk/%d" % disk_index, - minors[idx * 2], minors[idx * 2 + 1]) + minors[idx * 2], minors[idx * 2 + 1], + drbd_params, data_params, meta_params) disk_dev.mode = disk[constants.IDISK_MODE] disks.append(disk_dev) elif template_name == constants.DT_FILE: @@ -7922,7 +8395,8 @@ def _GenerateDiskTemplate(lu, template_name, logical_id=(file_driver, "%s/disk%d" % (file_storage_dir, disk_index)), - mode=disk[constants.IDISK_MODE]) + mode=disk[constants.IDISK_MODE], + params=ld_params[0]) disks.append(disk_dev) elif template_name == constants.DT_SHARED_FILE: if len(secondary_nodes) != 0: @@ -7938,7 +8412,8 @@ def _GenerateDiskTemplate(lu, template_name, logical_id=(file_driver, "%s/disk%d" % (file_storage_dir, disk_index)), - mode=disk[constants.IDISK_MODE]) + mode=disk[constants.IDISK_MODE], + params=ld_params[0]) disks.append(disk_dev) elif template_name == constants.DT_BLOCK: if len(secondary_nodes) != 0: @@ -7951,7 +8426,8 @@ def _GenerateDiskTemplate(lu, template_name, logical_id=(constants.BLOCKDEV_DRIVER_MANUAL, disk[constants.IDISK_ADOPT]), iv_name="disk/%d" % disk_index, - mode=disk[constants.IDISK_MODE]) + mode=disk[constants.IDISK_MODE], + params=ld_params[0]) disks.append(disk_dev) else: @@ -8127,6 +8603,11 @@ def _RemoveDisks(lu, instance, target_node=None): " continuing anyway: %s", device.iv_name, node, msg) all_result = False + # if this is a DRBD disk, return its port to the pool + if device.dev_type in constants.LDS_DRBD: + tcp_port = device.logical_id[2] + lu.cfg.AddTcpUdpPort(tcp_port) + if instance.disk_template == constants.DT_FILE: file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1]) if target_node: @@ -8453,7 +8934,11 @@ class LUInstanceCreate(LogicalUnit): self.add_locks[locking.LEVEL_INSTANCE] = instance_name if self.op.iallocator: + # TODO: Find a solution to not lock all nodes in the cluster, e.g. 
by + # specifying a group on instance creation and then selecting nodes from + # that group self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET + self.needed_locks[locking.LEVEL_NODE_RES] = locking.ALL_SET else: self.op.pnode = _ExpandNodeName(self.cfg, self.op.pnode) nodelist = [self.op.pnode] @@ -8461,6 +8946,9 @@ class LUInstanceCreate(LogicalUnit): self.op.snode = _ExpandNodeName(self.cfg, self.op.snode) nodelist.append(self.op.snode) self.needed_locks[locking.LEVEL_NODE] = nodelist + # Lock resources of instance's primary and secondary nodes (copy to + # prevent accidential modification) + self.needed_locks[locking.LEVEL_NODE_RES] = list(nodelist) # in case of import lock the source node too if self.op.mode == constants.INSTANCE_IMPORT: @@ -8497,7 +8985,7 @@ class LUInstanceCreate(LogicalUnit): tags=self.op.tags, os=self.op.os_type, vcpus=self.be_full[constants.BE_VCPUS], - memory=self.be_full[constants.BE_MEMORY], + memory=self.be_full[constants.BE_MAXMEM], disks=self.disks, nics=nics, hypervisor=self.op.hypervisor, @@ -8542,7 +9030,8 @@ class LUInstanceCreate(LogicalUnit): secondary_nodes=self.secondaries, status=self.op.start, os_type=self.op.os_type, - memory=self.be_full[constants.BE_MEMORY], + minmem=self.be_full[constants.BE_MINMEM], + maxmem=self.be_full[constants.BE_MAXMEM], vcpus=self.be_full[constants.BE_VCPUS], nics=_NICListToTuple(self, self.nics), disk_template=self.op.disk_template, @@ -8678,6 +9167,12 @@ class LUInstanceCreate(LogicalUnit): for name, value in einfo.items(constants.INISECT_BEP): if name not in self.op.beparams: self.op.beparams[name] = value + # Compatibility for the old "memory" be param + if name == constants.BE_MEMORY: + if constants.BE_MAXMEM not in self.op.beparams: + self.op.beparams[constants.BE_MAXMEM] = value + if constants.BE_MINMEM not in self.op.beparams: + self.op.beparams[constants.BE_MINMEM] = value else: # try to read the parameters old style, from the main section for name in constants.BES_PARAMETERS: @@ -8791,6 +9286,7 @@ class LUInstanceCreate(LogicalUnit): for param, value in self.op.beparams.iteritems(): if value == constants.VALUE_AUTO: self.op.beparams[param] = default_beparams[param] + objects.UpgradeBeParams(self.op.beparams) utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES) self.be_full = cluster.SimpleFillBE(self.op.beparams) @@ -8885,8 +9381,9 @@ class LUInstanceCreate(LogicalUnit): constants.IDISK_SIZE: size, constants.IDISK_MODE: mode, constants.IDISK_VG: data_vg, - constants.IDISK_METAVG: disk.get(constants.IDISK_METAVG, data_vg), } + if constants.IDISK_METAVG in disk: + new_disk[constants.IDISK_METAVG] = disk[constants.IDISK_METAVG] if constants.IDISK_ADOPT in disk: new_disk[constants.IDISK_ADOPT] = disk[constants.IDISK_ADOPT] self.disks.append(new_disk) @@ -8938,6 +9435,11 @@ class LUInstanceCreate(LogicalUnit): if self.op.iallocator is not None: self._RunAllocator() + # Release all unneeded node locks + _ReleaseLocks(self, locking.LEVEL_NODE, + keep=filter(None, [self.op.pnode, self.op.snode, + self.op.src_node])) + #### node related checks # check primary node @@ -8966,8 +9468,19 @@ class LUInstanceCreate(LogicalUnit): _CheckNodeVmCapable(self, self.op.snode) self.secondaries.append(self.op.snode) + snode = self.cfg.GetNodeInfo(self.op.snode) + if pnode.group != snode.group: + self.LogWarning("The primary and secondary nodes are in two" + " different node groups; the disk parameters" + " from the first disk's node group will be" + " used") + nodenames = [pnode.name] + self.secondaries + # disk 
parameters (not customizable at instance or node level) + # just use the primary node parameters, ignoring the secondary. + self.diskparams = self.cfg.GetNodeGroup(pnode.group).diskparams + if not self.adopt_disks: # Check lv size requirements, if not adopting req_sizes = _ComputeDiskSizePerVG(self.op.disk_template, self.disks) @@ -9052,10 +9565,11 @@ class LUInstanceCreate(LogicalUnit): _CheckNicsBridgesExist(self, self.nics, self.pnode.name) # memory check on primary node + #TODO(dynmem): use MINMEM for checking if self.op.start: _CheckNodeFreeMemory(self, self.pnode.name, "creating instance %s" % self.op.instance_name, - self.be_full[constants.BE_MEMORY], + self.be_full[constants.BE_MAXMEM], self.op.hypervisor) self.dry_run_result = list(nodenames) @@ -9067,6 +9581,10 @@ class LUInstanceCreate(LogicalUnit): instance = self.op.instance_name pnode_name = self.pnode.name + assert not (self.owned_locks(locking.LEVEL_NODE_RES) - + self.owned_locks(locking.LEVEL_NODE)), \ + "Node locks differ from node resource locks" + ht_kind = self.op.hypervisor if ht_kind in constants.HTS_REQ_PORT: network_port = self.cfg.AllocatePort() @@ -9081,13 +9599,14 @@ class LUInstanceCreate(LogicalUnit): self.instance_file_storage_dir, self.op.file_driver, 0, - feedback_fn) + feedback_fn, + self.diskparams) iobj = objects.Instance(name=instance, os=self.op.os_type, primary_node=pnode_name, nics=self.nics, disks=disks, disk_template=self.op.disk_template, - admin_up=False, + admin_state=constants.ADMINST_DOWN, network_port=network_port, beparams=self.op.beparams, hvparams=self.op.hvparams, @@ -9169,6 +9688,9 @@ class LUInstanceCreate(LogicalUnit): raise errors.OpExecError("There are some degraded disks for" " this instance") + # Release all node resource locks + _ReleaseLocks(self, locking.LEVEL_NODE_RES) + if iobj.disk_template != constants.DT_DISKLESS and not self.adopt_disks: if self.op.mode == constants.INSTANCE_CREATE: if not self.op.no_install: @@ -9261,8 +9783,10 @@ class LUInstanceCreate(LogicalUnit): raise errors.ProgrammerError("Unknown OS initialization mode '%s'" % self.op.mode) + assert not self.owned_locks(locking.LEVEL_NODE_RES) + if self.op.start: - iobj.admin_up = True + iobj.admin_state = constants.ADMINST_UP self.cfg.Update(iobj, feedback_fn) logging.info("Starting instance %s on node %s", instance, pnode_name) feedback_fn("* starting instance...") @@ -9284,6 +9808,7 @@ class LUInstanceConsole(NoHooksLU): REQ_BGL = False def ExpandNames(self): + self.share_locks = _ShareAll() self._ExpandAndLockInstance() def CheckPrereq(self): @@ -9309,10 +9834,12 @@ class LUInstanceConsole(NoHooksLU): node_insts.Raise("Can't get node information from %s" % node) if instance.name not in node_insts.payload: - if instance.admin_up: + if instance.admin_state == constants.ADMINST_UP: state = constants.INSTST_ERRORDOWN - else: + elif instance.admin_state == constants.ADMINST_DOWN: state = constants.INSTST_ADMINDOWN + else: + state = constants.INSTST_ADMINOFFLINE raise errors.OpExecError("Instance %s is not running (state %s)" % (instance.name, state)) @@ -9358,6 +9885,7 @@ class LUInstanceReplaceDisks(LogicalUnit): self._ExpandAndLockInstance() assert locking.LEVEL_NODE not in self.needed_locks + assert locking.LEVEL_NODE_RES not in self.needed_locks assert locking.LEVEL_NODEGROUP not in self.needed_locks assert self.op.iallocator is None or self.op.remote_node is None, \ @@ -9380,6 +9908,8 @@ class LUInstanceReplaceDisks(LogicalUnit): # iallocator will select a new node in the same group 
self.needed_locks[locking.LEVEL_NODEGROUP] = [] + self.needed_locks[locking.LEVEL_NODE_RES] = [] + self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode, self.op.iallocator, self.op.remote_node, self.op.disks, False, self.op.early_release) @@ -9393,6 +9923,8 @@ class LUInstanceReplaceDisks(LogicalUnit): assert not self.needed_locks[locking.LEVEL_NODEGROUP] self.share_locks[locking.LEVEL_NODEGROUP] = 1 + # Lock all groups used by instance optimistically; this requires going + # via the node before it's locked, requiring verification later on self.needed_locks[locking.LEVEL_NODEGROUP] = \ self.cfg.GetInstanceNodeGroups(self.op.instance_name) @@ -9407,6 +9939,10 @@ class LUInstanceReplaceDisks(LogicalUnit): for node_name in self.cfg.GetNodeGroup(group_uuid).members] else: self._LockInstancesNodes() + elif level == locking.LEVEL_NODE_RES: + # Reuse node locks + self.needed_locks[locking.LEVEL_NODE_RES] = \ + self.needed_locks[locking.LEVEL_NODE] def BuildHooksEnv(self): """Build hooks env. @@ -9443,6 +9979,7 @@ class LUInstanceReplaceDisks(LogicalUnit): assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or self.op.iallocator is None) + # Verify if node group locks are still correct owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP) if owned_groups: _CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups) @@ -9693,6 +10230,16 @@ class TLReplaceDisks(Tasklet): if not self.disks: self.disks = range(len(self.instance.disks)) + # TODO: compute disk parameters + primary_node_info = self.cfg.GetNodeInfo(instance.primary_node) + secondary_node_info = self.cfg.GetNodeInfo(secondary_node) + if primary_node_info.group != secondary_node_info.group: + self.lu.LogInfo("The instance primary and secondary nodes are in two" + " different node groups; the disk parameters of the" + " primary node's group will be applied.") + + self.diskparams = self.cfg.GetNodeGroup(primary_node_info.group).diskparams + for node in check_nodes: _CheckNodeOnline(self.lu, node) @@ -9701,8 +10248,9 @@ class TLReplaceDisks(Tasklet): self.target_node] if node_name is not None) - # Release unneeded node locks + # Release unneeded node and node resource locks _ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes) + _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes) # Release any owned node group if self.lu.glm.is_owned(locking.LEVEL_NODEGROUP): @@ -9731,6 +10279,8 @@ class TLReplaceDisks(Tasklet): assert set(owned_nodes) == set(self.node_secondary_ip), \ ("Incorrect node locks, owning %s, expected %s" % (owned_nodes, self.node_secondary_ip.keys())) + assert (self.lu.owned_locks(locking.LEVEL_NODE) == + self.lu.owned_locks(locking.LEVEL_NODE_RES)) owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE) assert list(owned_instances) == [self.instance_name], \ @@ -9746,7 +10296,7 @@ class TLReplaceDisks(Tasklet): feedback_fn("Replacing disk(s) %s for %s" % (utils.CommaJoin(self.disks), self.instance.name)) - activate_disks = (not self.instance.admin_up) + activate_disks = (self.instance.admin_state != constants.ADMINST_UP) # Activate the instance disks if we're replacing them on a down instance if activate_disks: @@ -9766,9 +10316,11 @@ class TLReplaceDisks(Tasklet): if activate_disks: _SafeShutdownInstanceDisks(self.lu, self.instance) + assert not self.lu.owned_locks(locking.LEVEL_NODE) + if __debug__: # Verify owned locks - owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE) + owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES) nodes = 
frozenset(self.node_secondary_ip) assert ((self.early_release and not owned_nodes) or (not self.early_release and not (set(owned_nodes) - nodes))), \ @@ -9847,12 +10399,14 @@ class TLReplaceDisks(Tasklet): lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]] names = _GenerateUniqueNames(self.lu, lv_names) + _, data_p, meta_p = _ComputeLDParams(constants.DT_DRBD8, self.diskparams) + vg_data = dev.children[0].logical_id[0] lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size, - logical_id=(vg_data, names[0])) + logical_id=(vg_data, names[0]), params=data_p) vg_meta = dev.children[1].logical_id[0] lv_meta = objects.Disk(dev_type=constants.LD_LV, size=DRBD_META_SIZE, - logical_id=(vg_meta, names[1])) + logical_id=(vg_meta, names[1]), params=meta_p) new_lvs = [lv_data, lv_meta] old_lvs = [child.Copy() for child in dev.children] @@ -10003,21 +10557,28 @@ class TLReplaceDisks(Tasklet): "volumes")) raise errors.OpExecError("Can't add local storage to drbd: %s" % msg) - cstep = 5 + cstep = itertools.count(5) + if self.early_release: - self.lu.LogStep(cstep, steps_total, "Removing old storage") - cstep += 1 + self.lu.LogStep(cstep.next(), steps_total, "Removing old storage") self._RemoveOldStorage(self.target_node, iv_names) - # WARNING: we release both node locks here, do not do other RPCs - # than WaitForSync to the primary node - _ReleaseLocks(self.lu, locking.LEVEL_NODE, - names=[self.target_node, self.other_node]) + # TODO: Check if releasing locks early still makes sense + _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES) + else: + # Release all resource locks except those used by the instance + _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, + keep=self.node_secondary_ip.keys()) + + # Release all node locks while waiting for sync + _ReleaseLocks(self.lu, locking.LEVEL_NODE) + + # TODO: Can the instance lock be downgraded here? Take the optional disk + # shutdown in the caller into consideration. 
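Both here and in the secondary-replacement path below, the step counter is an itertools.count iterator started at 5, so whichever branch logs "Removing old storage" first simply pulls the next step number; there is no manually incremented variable to keep consistent across the early-release and normal paths. A minimal, self-contained sketch of the pattern, assuming Python 2 as in the surrounding code (steps_total below is a sample value, not taken from the real LU):

    import itertools

    steps_total = 6                 # sample value for illustration only
    cstep = itertools.count(5)      # first value yielded is 5

    # each .next() call returns the current step number and advances the counter
    print("STEP %d/%d Removing old storage" % (cstep.next(), steps_total))
    print("STEP %d/%d Sync devices" % (cstep.next(), steps_total))
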
# Wait for sync # This can fail as the old devices are degraded and _WaitForSync # does a combined result over all disks, so we don't check its return value - self.lu.LogStep(cstep, steps_total, "Sync devices") - cstep += 1 + self.lu.LogStep(cstep.next(), steps_total, "Sync devices") _WaitForSync(self.lu, self.instance) # Check all devices manually @@ -10025,8 +10586,7 @@ class TLReplaceDisks(Tasklet): # Step: remove old storage if not self.early_release: - self.lu.LogStep(cstep, steps_total, "Removing old storage") - cstep += 1 + self.lu.LogStep(cstep.next(), steps_total, "Removing old storage") self._RemoveOldStorage(self.target_node, iv_names) def _ExecDrbd8Secondary(self, feedback_fn): @@ -10103,10 +10663,12 @@ class TLReplaceDisks(Tasklet): iv_names[idx] = (dev, dev.children, new_net_id) logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor, new_net_id) + drbd_params, _, _ = _ComputeLDParams(constants.DT_DRBD8, self.diskparams) new_drbd = objects.Disk(dev_type=constants.LD_DRBD8, logical_id=new_alone_id, children=dev.children, - size=dev.size) + size=dev.size, + params=drbd_params) try: _CreateSingleBlockDev(self.lu, self.new_node, self.instance, new_drbd, _GetInstanceInfoText(self.instance), False) @@ -10145,6 +10707,9 @@ class TLReplaceDisks(Tasklet): self.cfg.Update(self.instance, feedback_fn) + # Release all node locks (the configuration has been updated) + _ReleaseLocks(self.lu, locking.LEVEL_NODE) + # and now perform the drbd attach self.lu.LogInfo("Attaching primary drbds to new secondary" " (standalone => connected)") @@ -10161,23 +10726,26 @@ class TLReplaceDisks(Tasklet): to_node, msg, hint=("please do a gnt-instance info to see the" " status of disks")) - cstep = 5 + + cstep = itertools.count(5) + if self.early_release: - self.lu.LogStep(cstep, steps_total, "Removing old storage") - cstep += 1 + self.lu.LogStep(cstep.next(), steps_total, "Removing old storage") self._RemoveOldStorage(self.target_node, iv_names) - # WARNING: we release all node locks here, do not do other RPCs - # than WaitForSync to the primary node - _ReleaseLocks(self.lu, locking.LEVEL_NODE, - names=[self.instance.primary_node, - self.target_node, - self.new_node]) + # TODO: Check if releasing locks early still makes sense + _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES) + else: + # Release all resource locks except those used by the instance + _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, + keep=self.node_secondary_ip.keys()) + + # TODO: Can the instance lock be downgraded here? Take the optional disk + # shutdown in the caller into consideration. 
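The three-way unpacking of _ComputeLDParams(constants.DT_DRBD8, ...) used in both replacement paths relies on the order in which the DRBD8 template's logical-disk parameters are returned: the DRBD device's parameters first, then the data LV's, then the metadata LV's. A stand-in value illustrating only that ordering (the dict contents below are made up, not real cluster defaults):

    # Stand-in for the list returned for the DRBD8 template; callers depend
    # only on the order: DRBD device, data LV, metadata LV.
    ld_params = [{"resync-rate": 1024}, {"stripes": 1}, {"stripes": 2}]

    drbd_params, data_params, meta_params = ld_params
    print("drbd resync-rate: %s" % drbd_params["resync-rate"])
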
# Wait for sync # This can fail as the old devices are degraded and _WaitForSync # does a combined result over all disks, so we don't check its return value - self.lu.LogStep(cstep, steps_total, "Sync devices") - cstep += 1 + self.lu.LogStep(cstep.next(), steps_total, "Sync devices") _WaitForSync(self.lu, self.instance) # Check all devices manually @@ -10185,7 +10753,7 @@ class TLReplaceDisks(Tasklet): # Step: remove old storage if not self.early_release: - self.lu.LogStep(cstep, steps_total, "Removing old storage") + self.lu.LogStep(cstep.next(), steps_total, "Removing old storage") self._RemoveOldStorage(self.target_node, iv_names) @@ -10231,7 +10799,7 @@ class LURepairNodeStorage(NoHooksLU): """ # Check whether any instance on this node has faulty disks for inst in _GetNodeInstances(self.cfg, self.op.node_name): - if not inst.admin_up: + if inst.admin_state != constants.ADMINST_UP: continue check_nodes = set(inst.all_nodes) check_nodes.discard(self.op.node_name) @@ -10257,6 +10825,15 @@ class LUNodeEvacuate(NoHooksLU): """ REQ_BGL = False + _MODE2IALLOCATOR = { + constants.NODE_EVAC_PRI: constants.IALLOCATOR_NEVAC_PRI, + constants.NODE_EVAC_SEC: constants.IALLOCATOR_NEVAC_SEC, + constants.NODE_EVAC_ALL: constants.IALLOCATOR_NEVAC_ALL, + } + assert frozenset(_MODE2IALLOCATOR.keys()) == constants.NODE_EVAC_MODES + assert (frozenset(_MODE2IALLOCATOR.values()) == + constants.IALLOCATOR_NEVAC_MODES) + def CheckArguments(self): _CheckIAllocatorOrNode(self, "iallocator", "remote_node") @@ -10271,7 +10848,7 @@ class LUNodeEvacuate(NoHooksLU): raise errors.OpPrereqError("Can not use evacuated node as a new" " secondary node", errors.ECODE_INVAL) - if self.op.mode != constants.IALLOCATOR_NEVAC_SEC: + if self.op.mode != constants.NODE_EVAC_SEC: raise errors.OpPrereqError("Without the use of an iallocator only" " secondary instances can be evacuated", errors.ECODE_INVAL) @@ -10284,6 +10861,14 @@ class LUNodeEvacuate(NoHooksLU): locking.LEVEL_NODE: [], } + # Determine nodes (via group) optimistically, needs verification once locks + # have been acquired + self.lock_nodes = self._DetermineNodes() + + def _DetermineNodes(self): + """Gets the list of nodes to operate on. + + """ if self.op.remote_node is None: # Iallocator will choose any node(s) in the same group group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_name]) @@ -10291,26 +10876,34 @@ class LUNodeEvacuate(NoHooksLU): group_nodes = frozenset([self.op.remote_node]) # Determine nodes to be locked - self.lock_nodes = set([self.op.node_name]) | group_nodes + return set([self.op.node_name]) | group_nodes def _DetermineInstances(self): """Builds list of instances to operate on. 
""" - assert self.op.mode in constants.IALLOCATOR_NEVAC_MODES + assert self.op.mode in constants.NODE_EVAC_MODES - if self.op.mode == constants.IALLOCATOR_NEVAC_PRI: + if self.op.mode == constants.NODE_EVAC_PRI: # Primary instances only inst_fn = _GetNodePrimaryInstances assert self.op.remote_node is None, \ "Evacuating primary instances requires iallocator" - elif self.op.mode == constants.IALLOCATOR_NEVAC_SEC: + elif self.op.mode == constants.NODE_EVAC_SEC: # Secondary instances only inst_fn = _GetNodeSecondaryInstances else: # All instances - assert self.op.mode == constants.IALLOCATOR_NEVAC_ALL + assert self.op.mode == constants.NODE_EVAC_ALL inst_fn = _GetNodeInstances + # TODO: In 2.6, change the iallocator interface to take an evacuation mode + # per instance + raise errors.OpPrereqError("Due to an issue with the iallocator" + " interface it is not possible to evacuate" + " all instances at once; specify explicitly" + " whether to evacuate primary or secondary" + " instances", + errors.ECODE_INVAL) return inst_fn(self.cfg, self.op.node_name) @@ -10322,8 +10915,8 @@ class LUNodeEvacuate(NoHooksLU): set(i.name for i in self._DetermineInstances()) elif level == locking.LEVEL_NODEGROUP: - # Lock node groups optimistically, needs verification once nodes have - # been acquired + # Lock node groups for all potential target nodes optimistically, needs + # verification once nodes have been acquired self.needed_locks[locking.LEVEL_NODEGROUP] = \ self.cfg.GetNodeGroupsFromNodes(self.lock_nodes) @@ -10336,12 +10929,23 @@ class LUNodeEvacuate(NoHooksLU): owned_nodes = self.owned_locks(locking.LEVEL_NODE) owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP) - assert owned_nodes == self.lock_nodes + need_nodes = self._DetermineNodes() + + if not owned_nodes.issuperset(need_nodes): + raise errors.OpPrereqError("Nodes in same group as '%s' changed since" + " locks were acquired, current nodes are" + " are '%s', used to be '%s'; retry the" + " operation" % + (self.op.node_name, + utils.CommaJoin(need_nodes), + utils.CommaJoin(owned_nodes)), + errors.ECODE_STATE) wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes) if owned_groups != wanted_groups: raise errors.OpExecError("Node groups changed since locks were acquired," - " current groups are '%s', used to be '%s'" % + " current groups are '%s', used to be '%s';" + " retry the operation" % (utils.CommaJoin(wanted_groups), utils.CommaJoin(owned_groups))) @@ -10352,7 +10956,7 @@ class LUNodeEvacuate(NoHooksLU): if set(self.instance_names) != owned_instances: raise errors.OpExecError("Instances on node '%s' changed since locks" " were acquired, current instances are '%s'," - " used to be '%s'" % + " used to be '%s'; retry the operation" % (self.op.node_name, utils.CommaJoin(self.instance_names), utils.CommaJoin(owned_instances))) @@ -10384,7 +10988,7 @@ class LUNodeEvacuate(NoHooksLU): elif self.op.iallocator is not None: # TODO: Implement relocation to other group ial = IAllocator(self.cfg, self.rpc, constants.IALLOCATOR_MODE_NODE_EVAC, - evac_mode=self.op.mode, + evac_mode=self._MODE2IALLOCATOR[self.op.mode], instances=list(self.instance_names)) ial.Run(self.op.iallocator) @@ -10398,7 +11002,7 @@ class LUNodeEvacuate(NoHooksLU): jobs = _LoadNodeEvacResult(self, ial.result, self.op.early_release, True) elif self.op.remote_node is not None: - assert self.op.mode == constants.IALLOCATOR_NEVAC_SEC + assert self.op.mode == constants.NODE_EVAC_SEC jobs = [ [opcodes.OpInstanceReplaceDisks(instance_name=instance_name, 
remote_node=self.op.remote_node, @@ -10455,9 +11059,10 @@ def _LoadNodeEvacResult(lu, alloc_result, early_release, use_nodes): (moved, failed, jobs) = alloc_result if failed: - lu.LogWarning("Unable to evacuate instances %s", - utils.CommaJoin("%s (%s)" % (name, reason) - for (name, reason) in failed)) + failreason = utils.CommaJoin("%s (%s)" % (name, reason) + for (name, reason) in failed) + lu.LogWarning("Unable to evacuate instances %s", failreason) + raise errors.OpExecError("Unable to evacuate instances %s" % failreason) if moved: lu.LogInfo("Instances to be moved: %s", @@ -10481,11 +11086,16 @@ class LUInstanceGrowDisk(LogicalUnit): def ExpandNames(self): self._ExpandAndLockInstance() self.needed_locks[locking.LEVEL_NODE] = [] - self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + self.needed_locks[locking.LEVEL_NODE_RES] = [] + self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE def DeclareLocks(self, level): if level == locking.LEVEL_NODE: self._LockInstancesNodes() + elif level == locking.LEVEL_NODE_RES: + # Copy node locks + self.needed_locks[locking.LEVEL_NODE_RES] = \ + self.needed_locks[locking.LEVEL_NODE][:] def BuildHooksEnv(self): """Build hooks env. @@ -10542,10 +11152,18 @@ class LUInstanceGrowDisk(LogicalUnit): instance = self.instance disk = self.disk + assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE) + assert (self.owned_locks(locking.LEVEL_NODE) == + self.owned_locks(locking.LEVEL_NODE_RES)) + disks_ok, _ = _AssembleInstanceDisks(self, self.instance, disks=[disk]) if not disks_ok: raise errors.OpExecError("Cannot activate block device to grow") + feedback_fn("Growing disk %s of instance '%s' by %s" % + (self.op.disk, instance.name, + utils.FormatUnit(self.op.amount, "h"))) + # First run all grow ops in dry-run mode for node in instance.all_nodes: self.cfg.SetDiskID(disk, node) @@ -10568,18 +11186,28 @@ class LUInstanceGrowDisk(LogicalUnit): disk.RecordGrow(self.op.amount) self.cfg.Update(instance, feedback_fn) + + # Changes have been recorded, release node lock + _ReleaseLocks(self, locking.LEVEL_NODE) + + # Downgrade lock while waiting for sync + self.glm.downgrade(locking.LEVEL_INSTANCE) + if self.op.wait_for_sync: disk_abort = not _WaitForSync(self, instance, disks=[disk]) if disk_abort: self.proc.LogWarning("Disk sync-ing has not returned a good" " status; please check the instance") - if not instance.admin_up: + if instance.admin_state != constants.ADMINST_UP: _SafeShutdownInstanceDisks(self, instance, disks=[disk]) - elif not instance.admin_up: + elif instance.admin_state != constants.ADMINST_UP: self.proc.LogWarning("Not shutting down the disk even if the instance is" " not supposed to be running because no wait for" " sync mode was requested") + assert self.owned_locks(locking.LEVEL_NODE_RES) + assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE) + class LUInstanceQueryData(NoHooksLU): """Query runtime instance data. 
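In LUInstanceGrowDisk the node-resource lock level is seeded with a copy of the node lock list (note the [:] slice), not a reference to it, because the two levels are released at different points of Exec and sharing one list object would let a change at one level leak into the other. A short self-contained illustration of why the copy matters (the "node"/"node-res" keys below are stand-ins for the real locking levels):

    needed_locks = {}
    needed_locks["node"] = ["node1", "node2"]

    # copy, not alias - mutating one level must not affect the other
    needed_locks["node-res"] = needed_locks["node"][:]

    needed_locks["node"].remove("node2")
    print(needed_locks["node-res"])    # ['node1', 'node2'] - still intact
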
@@ -10711,19 +11339,17 @@ class LUInstanceQueryData(NoHooksLU): if remote_info and "state" in remote_info: remote_state = "up" else: - remote_state = "down" - - if instance.admin_up: - config_state = "up" - else: - config_state = "down" + if instance.admin_state == constants.ADMINST_UP: + remote_state = "down" + else: + remote_state = instance.admin_state disks = map(compat.partial(self._ComputeDiskStatus, instance, None), instance.disks) result[instance.name] = { "name": instance.name, - "config_state": config_state, + "config_state": instance.admin_state, "run_state": remote_state, "pnode": instance.primary_node, "snodes": instance.secondary_nodes, @@ -10759,7 +11385,8 @@ class LUInstanceSetParams(LogicalUnit): def CheckArguments(self): if not (self.op.nics or self.op.disks or self.op.disk_template or - self.op.hvparams or self.op.beparams or self.op.os_name): + self.op.hvparams or self.op.beparams or self.op.os_name or + self.op.online_inst or self.op.offline_inst): raise errors.OpPrereqError("No changes submitted", errors.ECODE_INVAL) if self.op.hvparams: @@ -10875,7 +11502,10 @@ class LUInstanceSetParams(LogicalUnit): def ExpandNames(self): self._ExpandAndLockInstance() + # Can't even acquire node locks in shared mode as upcoming changes in + # Ganeti 2.6 will start to modify the node object on disk conversion self.needed_locks[locking.LEVEL_NODE] = [] + self.needed_locks[locking.LEVEL_NODE_RES] = [] self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE def DeclareLocks(self, level): @@ -10884,6 +11514,10 @@ class LUInstanceSetParams(LogicalUnit): if self.op.disk_template and self.op.remote_node: self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node) self.needed_locks[locking.LEVEL_NODE].append(self.op.remote_node) + elif level == locking.LEVEL_NODE_RES and self.op.disk_template: + # Copy node locks + self.needed_locks[locking.LEVEL_NODE_RES] = \ + self.needed_locks[locking.LEVEL_NODE][:] def BuildHooksEnv(self): """Build hooks env. @@ -10892,8 +11526,10 @@ class LUInstanceSetParams(LogicalUnit): """ args = dict() - if constants.BE_MEMORY in self.be_new: - args["memory"] = self.be_new[constants.BE_MEMORY] + if constants.BE_MINMEM in self.be_new: + args["minmem"] = self.be_new[constants.BE_MINMEM] + if constants.BE_MAXMEM in self.be_new: + args["maxmem"] = self.be_new[constants.BE_MAXMEM] if constants.BE_VCPUS in self.be_new: args["vcpus"] = self.be_new[constants.BE_VCPUS] # TODO: export disk changes. 
Note: _BuildInstanceHookEnv* don't export disk @@ -10958,6 +11594,8 @@ class LUInstanceSetParams(LogicalUnit): "Cannot retrieve locked instance %s" % self.op.instance_name pnode = instance.primary_node nodelist = list(instance.all_nodes) + pnode_info = self.cfg.GetNodeInfo(pnode) + self.diskparams = self.cfg.GetNodeGroup(pnode_info.group).diskparams # OS change if self.op.os_name and not self.op.force: @@ -10978,7 +11616,8 @@ class LUInstanceSetParams(LogicalUnit): " %s to %s" % (instance.disk_template, self.op.disk_template), errors.ECODE_INVAL) - _CheckInstanceDown(self, instance, "cannot change disk template") + _CheckInstanceState(self, instance, INSTANCE_DOWN, + msg="cannot change disk template") if self.op.disk_template in constants.DTS_INT_MIRROR: if self.op.remote_node == pnode: raise errors.OpPrereqError("Given new secondary node %s is the same" @@ -10994,6 +11633,13 @@ class LUInstanceSetParams(LogicalUnit): required = _ComputeDiskSizePerVG(self.op.disk_template, disks) _CheckNodesFreeDiskPerVG(self, [self.op.remote_node], required) + snode_info = self.cfg.GetNodeInfo(self.op.remote_node) + if pnode_info.group != snode_info.group: + self.LogWarning("The primary and secondary nodes are in two" + " different node groups; the disk parameters" + " from the first disk's node group will be" + " used") + # hvparams processing if self.op.hvparams: hv_type = instance.hypervisor @@ -11015,6 +11661,7 @@ class LUInstanceSetParams(LogicalUnit): if self.op.beparams: i_bedict = _GetUpdatedParams(instance.beparams, self.op.beparams, use_none=True) + objects.UpgradeBeParams(i_bedict) utils.ForceDictType(i_bedict, constants.BES_PARAMETER_TYPES) be_new = cluster.SimpleFillBE(i_bedict) self.be_proposed = self.be_new = be_new # the new actual values @@ -11061,8 +11708,9 @@ class LUInstanceSetParams(LogicalUnit): self.warn = [] - if (constants.BE_MEMORY in self.op.beparams and not self.op.force and - be_new[constants.BE_MEMORY] > be_old[constants.BE_MEMORY]): + #TODO(dynmem): do the appropriate check involving MINMEM + if (constants.BE_MAXMEM in self.op.beparams and not self.op.force and + be_new[constants.BE_MAXMEM] > be_old[constants.BE_MAXMEM]): mem_check_list = [pnode] if be_new[constants.BE_AUTO_BALANCE]: # either we changed auto_balance to yes or it was from before @@ -11070,34 +11718,39 @@ class LUInstanceSetParams(LogicalUnit): instance_info = self.rpc.call_instance_info(pnode, instance.name, instance.hypervisor) nodeinfo = self.rpc.call_node_info(mem_check_list, None, - instance.hypervisor) + [instance.hypervisor]) pninfo = nodeinfo[pnode] msg = pninfo.fail_msg if msg: # Assume the primary node is unreachable and go ahead self.warn.append("Can't get info from primary node %s: %s" % (pnode, msg)) - elif not isinstance(pninfo.payload.get("memory_free", None), int): - self.warn.append("Node data from primary node %s doesn't contain" - " free memory information" % pnode) - elif instance_info.fail_msg: - self.warn.append("Can't get instance runtime information: %s" % - instance_info.fail_msg) else: - if instance_info.payload: - current_mem = int(instance_info.payload["memory"]) + (_, _, (pnhvinfo, )) = pninfo.payload + if not isinstance(pnhvinfo.get("memory_free", None), int): + self.warn.append("Node data from primary node %s doesn't contain" + " free memory information" % pnode) + elif instance_info.fail_msg: + self.warn.append("Can't get instance runtime information: %s" % + instance_info.fail_msg) else: - # Assume instance not running - # (there is a slight race condition here, but it's not 
very probable, - # and we have no other way to check) - current_mem = 0 - miss_mem = (be_new[constants.BE_MEMORY] - current_mem - - pninfo.payload["memory_free"]) - if miss_mem > 0: - raise errors.OpPrereqError("This change will prevent the instance" - " from starting, due to %d MB of memory" - " missing on its primary node" % miss_mem, - errors.ECODE_NORES) + if instance_info.payload: + current_mem = int(instance_info.payload["memory"]) + else: + # Assume instance not running + # (there is a slight race condition here, but it's not very + # probable, and we have no other way to check) + # TODO: Describe race condition + current_mem = 0 + #TODO(dynmem): do the appropriate check involving MINMEM + miss_mem = (be_new[constants.BE_MAXMEM] - current_mem - + pnhvinfo["memory_free"]) + if miss_mem > 0: + raise errors.OpPrereqError("This change will prevent the instance" + " from starting, due to %d MB of memory" + " missing on its primary node" % + miss_mem, + errors.ECODE_NORES) if be_new[constants.BE_AUTO_BALANCE]: for node, nres in nodeinfo.items(): @@ -11105,11 +11758,13 @@ class LUInstanceSetParams(LogicalUnit): continue nres.Raise("Can't get info from secondary node %s" % node, prereq=True, ecode=errors.ECODE_STATE) - if not isinstance(nres.payload.get("memory_free", None), int): + (_, _, (nhvinfo, )) = nres.payload + if not isinstance(nhvinfo.get("memory_free", None), int): raise errors.OpPrereqError("Secondary node %s didn't return free" " memory information" % node, errors.ECODE_STATE) - elif be_new[constants.BE_MEMORY] > nres.payload["memory_free"]: + #TODO(dynmem): do the appropriate check involving MINMEM + elif be_new[constants.BE_MAXMEM] > nhvinfo["memory_free"]: raise errors.OpPrereqError("This change will prevent the instance" " from failover to its secondary node" " %s, due to not enough memory" % node, @@ -11202,7 +11857,8 @@ class LUInstanceSetParams(LogicalUnit): if len(instance.disks) == 1: raise errors.OpPrereqError("Cannot remove the last disk of" " an instance", errors.ECODE_INVAL) - _CheckInstanceDown(self, instance, "cannot remove disks") + _CheckInstanceState(self, instance, INSTANCE_DOWN, + msg="cannot remove disks") if (disk_op == constants.DDM_ADD and len(instance.disks) >= constants.MAX_DISKS): @@ -11217,7 +11873,15 @@ class LUInstanceSetParams(LogicalUnit): (disk_op, len(instance.disks)), errors.ECODE_INVAL) - return + # disabling the instance + if self.op.offline_inst: + _CheckInstanceState(self, instance, INSTANCE_DOWN, + msg="cannot change instance state to offline") + + # enabling the instance + if self.op.online_inst: + _CheckInstanceState(self, instance, INSTANCE_OFFLINE, + msg="cannot make instance go online") def _ConvertPlainToDrbd(self, feedback_fn): """Converts an instance from plain to drbd. 
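The memory checks above follow the new node_info RPC, which is called with lists of volume groups and hypervisors and returns the per-hypervisor data nested in a one-element sequence, hence the (_, _, (hv_info, ))-style unpacking of each node's payload. A sample payload in that shape (all values are made up purely for illustration):

    # Illustrative payload only; the real structure is produced per node by
    # the node_info RPC, with the hypervisor dict nested one level deep.
    payload = ("unused-here", ({"vg_free": 51200}, ), ({"memory_free": 2048}, ))

    (_, _, (hv_info, )) = payload
    print(hv_info.get("memory_free"))    # 2048
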
@@ -11228,13 +11892,16 @@ class LUInstanceSetParams(LogicalUnit): pnode = instance.primary_node snode = self.op.remote_node + assert instance.disk_template == constants.DT_PLAIN + # create a fake disk info for _GenerateDiskTemplate disk_info = [{constants.IDISK_SIZE: d.size, constants.IDISK_MODE: d.mode, constants.IDISK_VG: d.logical_id[0]} for d in instance.disks] new_disks = _GenerateDiskTemplate(self, self.op.disk_template, instance.name, pnode, [snode], - disk_info, None, None, 0, feedback_fn) + disk_info, None, None, 0, feedback_fn, + self.diskparams) info = _GetInstanceInfoText(instance) feedback_fn("Creating aditional volumes...") # first, create the missing data and meta devices @@ -11264,6 +11931,9 @@ class LUInstanceSetParams(LogicalUnit): instance.disks = new_disks self.cfg.Update(instance, feedback_fn) + # Release node locks while waiting for sync + _ReleaseLocks(self, locking.LEVEL_NODE) + # disks are created, waiting for sync disk_abort = not _WaitForSync(self, instance, oneshot=not self.op.wait_for_sync) @@ -11271,12 +11941,17 @@ class LUInstanceSetParams(LogicalUnit): raise errors.OpExecError("There are some degraded disks for" " this instance, please cleanup manually") + # Node resource locks will be released by caller + def _ConvertDrbdToPlain(self, feedback_fn): """Converts an instance from drbd to plain. """ instance = self.instance + assert len(instance.secondary_nodes) == 1 + assert instance.disk_template == constants.DT_DRBD8 + pnode = instance.primary_node snode = instance.secondary_nodes[0] feedback_fn("Converting template to plain") @@ -11294,6 +11969,9 @@ class LUInstanceSetParams(LogicalUnit): instance.disk_template = constants.DT_PLAIN self.cfg.Update(instance, feedback_fn) + # Release locks in case removing disks takes a while + _ReleaseLocks(self, locking.LEVEL_NODE) + feedback_fn("Removing volumes on the secondary node...") for disk in old_disks: self.cfg.SetDiskID(disk, snode) @@ -11311,6 +11989,13 @@ class LUInstanceSetParams(LogicalUnit): self.LogWarning("Could not remove metadata for disk %d on node %s," " continuing anyway: %s", idx, pnode, msg) + # this is a DRBD disk, return its port to the pool + for disk in old_disks: + tcp_port = disk.logical_id[2] + self.cfg.AddTcpUdpPort(tcp_port) + + # Node resource locks will be released by caller + def Exec(self, feedback_fn): """Modifies an instance. 
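The DRBD-to-plain conversion above, like the disk-removal helper earlier, now hands the freed DRBD network port back to the cluster's TCP/UDP port pool via cfg.AddTcpUdpPort. For a DRBD8 disk the logical_id is the tuple (primary, secondary, port, p_minor, s_minor, secret), so index 2 is the port being recycled; a tiny illustration with made-up values:

    # Made-up logical_id in the DRBD8 layout used by these disks.
    logical_id = ("node1.example.com", "node2.example.com", 11000, 0, 1, "s3cr3t")

    tcp_port = logical_id[2]
    print("returning TCP/UDP port %d to the pool" % tcp_port)
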
@@ -11322,6 +12007,10 @@ class LUInstanceSetParams(LogicalUnit): for warn in self.warn: feedback_fn("WARNING: %s" % warn) + assert ((self.op.disk_template is None) ^ + bool(self.owned_locks(locking.LEVEL_NODE_RES))), \ + "Not owning any node resource locks" + result = [] instance = self.instance # disk changes @@ -11337,6 +12026,11 @@ class LUInstanceSetParams(LogicalUnit): self.LogWarning("Could not remove disk/%d on node %s: %s," " continuing anyway", device_idx, node, msg) result.append(("disk/%d" % device_idx, "remove")) + + # if this is a DRBD disk, return its port to the pool + if device.dev_type in constants.LDS_DRBD: + tcp_port = device.logical_id[2] + self.cfg.AddTcpUdpPort(tcp_port) elif disk_op == constants.DDM_ADD: # add a new disk if instance.disk_template in (constants.DT_FILE, @@ -11353,7 +12047,9 @@ class LUInstanceSetParams(LogicalUnit): [disk_dict], file_path, file_driver, - disk_idx_base, feedback_fn)[0] + disk_idx_base, + feedback_fn, + self.diskparams)[0] instance.disks.append(new_disk) info = _GetInstanceInfoText(instance) @@ -11379,6 +12075,16 @@ class LUInstanceSetParams(LogicalUnit): disk_dict[constants.IDISK_MODE])) if self.op.disk_template: + if __debug__: + check_nodes = set(instance.all_nodes) + if self.op.remote_node: + check_nodes.add(self.op.remote_node) + for level in [locking.LEVEL_NODE, locking.LEVEL_NODE_RES]: + owned = self.owned_locks(level) + assert not (check_nodes - owned), \ + ("Not owning the correct locks, owning %r, expected at least %r" % + (owned, check_nodes)) + r_shut = _ShutdownInstanceDisks(self, instance) if not r_shut: raise errors.OpExecError("Cannot shutdown instance disks, unable to" @@ -11391,6 +12097,15 @@ class LUInstanceSetParams(LogicalUnit): raise result.append(("disk_template", self.op.disk_template)) + assert instance.disk_template == self.op.disk_template, \ + ("Expected disk template '%s', found '%s'" % + (self.op.disk_template, instance.disk_template)) + + # Release node and resource locks if there are any (they might already have + # been released during disk conversion) + _ReleaseLocks(self, locking.LEVEL_NODE) + _ReleaseLocks(self, locking.LEVEL_NODE_RES) + # NIC changes for nic_op, nic_dict in self.op.nics: if nic_op == constants.DDM_REMOVE: @@ -11441,8 +12156,20 @@ class LUInstanceSetParams(LogicalUnit): for key, val in self.op.osparams.iteritems(): result.append(("os/%s" % key, val)) + # online/offline instance + if self.op.online_inst: + self.cfg.MarkInstanceDown(instance.name) + result.append(("admin_state", constants.ADMINST_DOWN)) + if self.op.offline_inst: + self.cfg.MarkInstanceOffline(instance.name) + result.append(("admin_state", constants.ADMINST_OFFLINE)) + self.cfg.Update(instance, feedback_fn) + assert not (self.owned_locks(locking.LEVEL_NODE_RES) or + self.owned_locks(locking.LEVEL_NODE)), \ + "All node locks should have been released by now" + return result _DISK_CONVERSIONS = { @@ -11765,7 +12492,8 @@ class LUBackupExport(LogicalUnit): "Cannot retrieve locked instance %s" % self.op.instance_name _CheckNodeOnline(self, self.instance.primary_node) - if (self.op.remove_instance and self.instance.admin_up and + if (self.op.remove_instance and + self.instance.admin_state == constants.ADMINST_UP and not self.op.shutdown): raise errors.OpPrereqError("Can not remove instance without shutting it" " down before") @@ -11895,7 +12623,7 @@ class LUBackupExport(LogicalUnit): for disk in instance.disks: self.cfg.SetDiskID(disk, src_node) - activate_disks = (not instance.admin_up) + activate_disks = 
(instance.admin_state != constants.ADMINST_UP) if activate_disks: # Activate the instance disks if we'exporting a stopped instance @@ -11908,7 +12636,8 @@ class LUBackupExport(LogicalUnit): helper.CreateSnapshots() try: - if (self.op.shutdown and instance.admin_up and + if (self.op.shutdown and + instance.admin_state == constants.ADMINST_UP and not self.op.remove_instance): assert not activate_disks feedback_fn("Starting instance %s" % instance.name) @@ -12057,6 +12786,19 @@ class LUGroupAdd(LogicalUnit): if self.op.ndparams: utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES) + if self.op.diskparams: + for templ in constants.DISK_TEMPLATES: + if templ not in self.op.diskparams: + self.op.diskparams[templ] = {} + utils.ForceDictType(self.op.diskparams[templ], constants.DISK_DT_TYPES) + else: + self.op.diskparams = self.cfg.GetClusterInfo().diskparams + + if self.op.ipolicy: + cluster = self.cfg.GetClusterInfo() + full_ipolicy = cluster.SimpleFillIPolicy(self.op.ipolicy) + objects.InstancePolicy.CheckParameterSyntax(full_ipolicy) + def BuildHooksEnv(self): """Build hooks env. @@ -12079,7 +12821,9 @@ class LUGroupAdd(LogicalUnit): group_obj = objects.NodeGroup(name=self.op.group_name, members=[], uuid=self.group_uuid, alloc_policy=self.op.alloc_policy, - ndparams=self.op.ndparams) + ndparams=self.op.ndparams, + diskparams=self.op.diskparams, + ipolicy=self.op.ipolicy) self.cfg.AddNodeGroup(group_obj, self.proc.GetECId(), check_uuid=False) del self.remove_locks[locking.LEVEL_NODEGROUP] @@ -12164,13 +12908,9 @@ class LUGroupAssignNodes(NoHooksLU): """Assign nodes to a new group. """ - for node in self.op.nodes: - self.node_data[node].group = self.group_uuid + mods = [(node_name, self.group_uuid) for node_name in self.op.nodes] - # FIXME: Depends on side-effects of modifying the result of - # C{cfg.GetAllNodesInfo} - - self.cfg.Update(self.group, feedback_fn) # Saves all modified nodes. + self.cfg.AssignGroupNodes(mods) @staticmethod def CheckAssignmentForSplitInstances(changes, node_data, instance_data): @@ -12229,6 +12969,7 @@ class _GroupQuery(_QueryBase): lu.needed_locks = {} self._all_groups = lu.cfg.GetAllNodeGroupsInfo() + self._cluster = lu.cfg.GetClusterInfo() name_to_uuid = dict((g.name, g.uuid) for g in self._all_groups.values()) if not self.names: @@ -12294,7 +13035,8 @@ class _GroupQuery(_QueryBase): # Do not pass on node information if it was not requested. 
group_to_nodes = None - return query.GroupQueryData([self._all_groups[uuid] + return query.GroupQueryData(self._cluster, + [self._all_groups[uuid] for uuid in self.wanted], group_to_nodes, group_to_instances) @@ -12330,7 +13072,11 @@ class LUGroupSetParams(LogicalUnit): def CheckArguments(self): all_changes = [ self.op.ndparams, + self.op.diskparams, self.op.alloc_policy, + self.op.hv_state, + self.op.disk_state, + self.op.ipolicy, ] if all_changes.count(None) == len(all_changes): @@ -12360,6 +13106,35 @@ class LUGroupSetParams(LogicalUnit): utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES) self.new_ndparams = new_ndparams + if self.op.diskparams: + self.new_diskparams = dict() + for templ in constants.DISK_TEMPLATES: + if templ not in self.op.diskparams: + self.op.diskparams[templ] = {} + new_templ_params = _GetUpdatedParams(self.group.diskparams[templ], + self.op.diskparams[templ]) + utils.ForceDictType(new_templ_params, constants.DISK_DT_TYPES) + self.new_diskparams[templ] = new_templ_params + + if self.op.hv_state: + self.new_hv_state = _MergeAndVerifyHvState(self.op.hv_state, + self.group.hv_state_static) + + if self.op.disk_state: + self.new_disk_state = \ + _MergeAndVerifyDiskState(self.op.disk_state, + self.group.disk_state_static) + + if self.op.ipolicy: + g_ipolicy = {} + for key, value in self.op.ipolicy.iteritems(): + g_ipolicy[key] = _GetUpdatedParams(self.group.ipolicy.get(key, {}), + value, + use_none=True) + utils.ForceDictType(g_ipolicy[key], constants.ISPECS_PARAMETER_TYPES) + self.new_ipolicy = g_ipolicy + objects.InstancePolicy.CheckParameterSyntax(self.new_ipolicy) + def BuildHooksEnv(self): """Build hooks env. @@ -12386,9 +13161,22 @@ class LUGroupSetParams(LogicalUnit): self.group.ndparams = self.new_ndparams result.append(("ndparams", str(self.group.ndparams))) + if self.op.diskparams: + self.group.diskparams = self.new_diskparams + result.append(("diskparams", str(self.group.diskparams))) + if self.op.alloc_policy: self.group.alloc_policy = self.op.alloc_policy + if self.op.hv_state: + self.group.hv_state_static = self.new_hv_state + + if self.op.disk_state: + self.group.disk_state_static = self.new_disk_state + + if self.op.ipolicy: + self.group.ipolicy = self.new_ipolicy + self.cfg.Update(self.group, feedback_fn) return result @@ -13113,10 +13901,10 @@ class IAllocator(object): elif self.mode == constants.IALLOCATOR_MODE_RELOC: hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor else: - hypervisor_name = cluster_info.enabled_hypervisors[0] + hypervisor_name = cluster_info.primary_hypervisor - node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(), - hypervisor_name) + node_data = self.rpc.call_node_info(node_list, [cfg.GetVGName()], + [hypervisor_name]) node_iinfo = \ self.rpc.call_all_instances_info(node_list, cluster_info.enabled_hypervisors) @@ -13178,6 +13966,7 @@ class IAllocator(object): @param node_results: the basic node structures as filled from the config """ + #TODO(dynmem): compute the right data on MAX and MIN memory # make a copy of the current dict node_results = dict(node_results) for nname, nresult in node_data.items(): @@ -13188,7 +13977,7 @@ class IAllocator(object): nresult.Raise("Can't get data for node %s" % nname) node_iinfo[nname].Raise("Can't get node instance info from node %s" % nname) - remote_info = nresult.payload + remote_info = _MakeLegacyNodeInfo(nresult.payload) for attr in ["memory_total", "memory_free", "memory_dom0", "vg_size", "vg_free", "cpu_total"]: @@ -13203,16 +13992,16 @@ class 
IAllocator(object): i_p_mem = i_p_up_mem = 0 for iinfo, beinfo in i_list: if iinfo.primary_node == nname: - i_p_mem += beinfo[constants.BE_MEMORY] + i_p_mem += beinfo[constants.BE_MAXMEM] if iinfo.name not in node_iinfo[nname].payload: i_used_mem = 0 else: i_used_mem = int(node_iinfo[nname].payload[iinfo.name]["memory"]) - i_mem_diff = beinfo[constants.BE_MEMORY] - i_used_mem + i_mem_diff = beinfo[constants.BE_MAXMEM] - i_used_mem remote_info["memory_free"] -= max(0, i_mem_diff) - if iinfo.admin_up: - i_p_up_mem += beinfo[constants.BE_MEMORY] + if iinfo.admin_state == constants.ADMINST_UP: + i_p_up_mem += beinfo[constants.BE_MAXMEM] # compute memory used by instances pnr_dyn = { @@ -13251,9 +14040,9 @@ class IAllocator(object): nic_data.append(nic_dict) pir = { "tags": list(iinfo.GetTags()), - "admin_up": iinfo.admin_up, + "admin_state": iinfo.admin_state, "vcpus": beinfo[constants.BE_VCPUS], - "memory": beinfo[constants.BE_MEMORY], + "memory": beinfo[constants.BE_MAXMEM], "os": iinfo.os, "nodes": [iinfo.primary_node] + list(iinfo.secondary_nodes), "nics": nic_data,