X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/da4fd3b0527ff0b277dd95128c20be77a60cfd80..c57add7e6fb973525928b868f157ab6b68399639:/lib/cmdlib.py diff --git a/lib/cmdlib.py b/lib/cmdlib.py index 8ec6f4a..f6afefe 100644 --- a/lib/cmdlib.py +++ b/lib/cmdlib.py @@ -59,10 +59,15 @@ from ganeti import query from ganeti import qlang from ganeti import opcodes from ganeti import ht +from ganeti import rpc import ganeti.masterd.instance # pylint: disable=W0611 +#: Size of DRBD meta block device +DRBD_META_SIZE = 128 + + class ResultWithJobs: """Data container for LU results with jobs. @@ -108,7 +113,7 @@ class LogicalUnit(object): HTYPE = None REQ_BGL = True - def __init__(self, processor, op, context, rpc): + def __init__(self, processor, op, context, rpc_runner): """Constructor for LogicalUnit. This needs to be overridden in derived classes in order to check op @@ -122,7 +127,7 @@ class LogicalUnit(object): # readability alias self.owned_locks = context.glm.list_owned self.context = context - self.rpc = rpc + self.rpc = rpc_runner # Dicts used to declare locking needs to mcpu self.needed_locks = None self.share_locks = dict.fromkeys(locking.LEVELS, 0) @@ -344,7 +349,8 @@ class LogicalUnit(object): self.op.instance_name) self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name - def _LockInstancesNodes(self, primary_only=False): + def _LockInstancesNodes(self, primary_only=False, + level=locking.LEVEL_NODE): """Helper function to declare instances' nodes for locking. This function should be called after locking one or more instances to lock @@ -365,9 +371,10 @@ class LogicalUnit(object): @type primary_only: boolean @param primary_only: only lock primary nodes of locked instances + @param level: Which lock level to use for locking nodes """ - assert locking.LEVEL_NODE in self.recalculate_locks, \ + assert level in self.recalculate_locks, \ "_LockInstancesNodes helper function called with no nodes to recalculate" # TODO: check if we're really been called with the instance locks held @@ -382,12 +389,14 @@ class LogicalUnit(object): if not primary_only: wanted_nodes.extend(instance.secondary_nodes) - if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE: - self.needed_locks[locking.LEVEL_NODE] = wanted_nodes - elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND: - self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes) + if self.recalculate_locks[level] == constants.LOCKS_REPLACE: + self.needed_locks[level] = wanted_nodes + elif self.recalculate_locks[level] == constants.LOCKS_APPEND: + self.needed_locks[level].extend(wanted_nodes) + else: + raise errors.ProgrammerError("Unknown recalculation mode") - del self.recalculate_locks[locking.LEVEL_NODE] + del self.recalculate_locks[level] class NoHooksLU(LogicalUnit): # pylint: disable=W0223 @@ -753,7 +762,7 @@ def _RunPostHook(lu, node_name): """Runs the post-hook for an opcode on a single node. """ - hm = lu.proc.hmclass(lu.rpc.call_hooks_runner, lu) + hm = lu.proc.BuildHooksManager(lu) try: hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name]) except: @@ -1204,13 +1213,13 @@ def _GetStorageTypeArgs(cfg, storage_type): return [] -def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq): +def _FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_name, prereq): faulty = [] for dev in instance.disks: cfg.SetDiskID(dev, node_name) - result = rpc.call_blockdev_getmirrorstatus(node_name, instance.disks) + result = rpc_runner.call_blockdev_getmirrorstatus(node_name, instance.disks) result.Raise("Failed to get disk status from node %s" % node_name, prereq=prereq, ecode=errors.ECODE_ENVIRON) @@ -1350,15 +1359,17 @@ class LUClusterDestroy(LogicalUnit): """Destroys the cluster. """ - master = self.cfg.GetMasterNode() + master_params = self.cfg.GetMasterNetworkParameters() # Run post hooks on master node before it's removed - _RunPostHook(self, master) + _RunPostHook(self, master_params.name) - result = self.rpc.call_node_deactivate_master_ip(master) + ems = self.cfg.GetUseExternalMipScript() + result = self.rpc.call_node_deactivate_master_ip(master_params.name, + master_params, ems) result.Raise("Could not disable the master role") - return master + return master_params.name def _VerifyCertificate(filename): @@ -1931,6 +1942,26 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): _ErrorIf(bool(missing), constants.CV_ENODENET, node, "missing bridges: %s" % utils.CommaJoin(sorted(missing))) + def _VerifyNodeUserScripts(self, ninfo, nresult): + """Check the results of user scripts presence and executability on the node + + @type ninfo: L{objects.Node} + @param ninfo: the node to check + @param nresult: the remote results for the node + + """ + node = ninfo.name + + test = not constants.NV_USERSCRIPTS in nresult + self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, node, + "did not return user scripts information") + + broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None) + if not test: + self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, node, + "user scripts not present or not executable: %s" % + utils.CommaJoin(sorted(broken_scripts))) + def _VerifyNodeNetwork(self, ninfo, nresult): """Check the node network connectivity results. @@ -2080,7 +2111,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): @classmethod def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo, - (files_all, files_all_opt, files_mc, files_vm)): + (files_all, files_opt, files_mc, files_vm)): """Verifies file checksums collected from all nodes. @param errorif: Callback for reporting errors @@ -2089,14 +2120,9 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): @param all_nvinfo: RPC results """ - assert (len(files_all | files_all_opt | files_mc | files_vm) == - sum(map(len, [files_all, files_all_opt, files_mc, files_vm]))), \ - "Found file listed in more than one file list" - # Define functions determining which nodes to consider for a file files2nodefn = [ (files_all, None), - (files_all_opt, None), (files_mc, lambda node: (node.master_candidate or node.name == master_node)), (files_vm, lambda node: node.vm_capable), @@ -2113,7 +2139,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): frozenset(map(operator.attrgetter("name"), filenodes))) for filename in files) - assert set(nodefiles) == (files_all | files_all_opt | files_mc | files_vm) + assert set(nodefiles) == (files_all | files_mc | files_vm) fileinfo = dict((filename, {}) for filename in nodefiles) ignore_nodes = set() @@ -2155,7 +2181,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): # Nodes missing file missing_file = expected_nodes - with_file - if filename in files_all_opt: + if filename in files_opt: # All or no nodes errorif(missing_file and missing_file != expected_nodes, constants.CV_ECLUSTERFILECHECK, None, @@ -2644,6 +2670,10 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names)) + user_scripts = [] + if self.cfg.GetUseExternalMipScript(): + user_scripts.append(constants.EXTERNAL_MASTER_SETUP_SCRIPT) + node_verify_param = { constants.NV_FILELIST: utils.UniqueSequence(filename @@ -2666,6 +2696,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): constants.NV_MASTERIP: (master_node, master_ip), constants.NV_OSLIST: None, constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(), + constants.NV_USERSCRIPTS: user_scripts, } if vg_name is not None: @@ -2824,6 +2855,7 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors): nimg.call_ok = self._VerifyNode(node_i, nresult) self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime) self._VerifyNodeNetwork(node_i, nresult) + self._VerifyNodeUserScripts(node_i, nresult) self._VerifyOob(node_i, nresult) if nimg.vm_capable: @@ -3154,7 +3186,7 @@ class LUGroupVerifyDisks(NoHooksLU): # any leftover items in nv_dict are missing LVs, let's arrange the data # better for key, inst in nv_dict.iteritems(): - res_missing.setdefault(inst, []).append(key) + res_missing.setdefault(inst, []).append(list(key)) return (res_nodes, list(res_instances), res_missing) @@ -3169,21 +3201,21 @@ class LUClusterRepairDiskSizes(NoHooksLU): if self.op.instances: self.wanted_names = _GetWantedInstances(self, self.op.instances) self.needed_locks = { - locking.LEVEL_NODE: [], + locking.LEVEL_NODE_RES: [], locking.LEVEL_INSTANCE: self.wanted_names, } - self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE else: self.wanted_names = None self.needed_locks = { - locking.LEVEL_NODE: locking.ALL_SET, + locking.LEVEL_NODE_RES: locking.ALL_SET, locking.LEVEL_INSTANCE: locking.ALL_SET, } self.share_locks = _ShareAll() def DeclareLocks(self, level): - if level == locking.LEVEL_NODE and self.wanted_names is not None: - self._LockInstancesNodes(primary_only=True) + if level == locking.LEVEL_NODE_RES and self.wanted_names is not None: + self._LockInstancesNodes(primary_only=True, level=level) def CheckPrereq(self): """Check prerequisites. @@ -3234,6 +3266,11 @@ class LUClusterRepairDiskSizes(NoHooksLU): for idx, disk in enumerate(instance.disks): per_node_disks[pnode].append((instance, idx, disk)) + assert not (frozenset(per_node_disks.keys()) - + self.owned_locks(locking.LEVEL_NODE_RES)), \ + "Not owning correct locks" + assert not self.owned_locks(locking.LEVEL_NODE) + changed = [] for node, dskl in per_node_disks.items(): newl = [v[2].Copy() for v in dskl] @@ -3323,29 +3360,33 @@ class LUClusterRename(LogicalUnit): """ clustername = self.op.name - ip = self.ip + new_ip = self.ip # shutdown the master IP - master = self.cfg.GetMasterNode() - result = self.rpc.call_node_deactivate_master_ip(master) + master_params = self.cfg.GetMasterNetworkParameters() + ems = self.cfg.GetUseExternalMipScript() + result = self.rpc.call_node_deactivate_master_ip(master_params.name, + master_params, ems) result.Raise("Could not disable the master role") try: cluster = self.cfg.GetClusterInfo() cluster.cluster_name = clustername - cluster.master_ip = ip + cluster.master_ip = new_ip self.cfg.Update(cluster, feedback_fn) # update the known hosts file ssh.WriteKnownHostsFile(self.cfg, constants.SSH_KNOWN_HOSTS_FILE) node_list = self.cfg.GetOnlineNodeList() try: - node_list.remove(master) + node_list.remove(master_params.name) except ValueError: pass _UploadHelper(self, node_list, constants.SSH_KNOWN_HOSTS_FILE) finally: - result = self.rpc.call_node_activate_master_ip(master) + master_params.ip = new_ip + result = self.rpc.call_node_activate_master_ip(master_params.name, + master_params, ems) msg = result.fail_msg if msg: self.LogWarning("Could not re-enable the master role on" @@ -3675,6 +3716,9 @@ class LUClusterSetParams(LogicalUnit): if self.op.reserved_lvs is not None: self.cluster.reserved_lvs = self.op.reserved_lvs + if self.op.use_external_mip_script is not None: + self.cluster.use_external_mip_script = self.op.use_external_mip_script + def helper_os(aname, mods, desc): desc += " OS list" lst = getattr(self.cluster, aname) @@ -3699,33 +3743,40 @@ class LUClusterSetParams(LogicalUnit): helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted") if self.op.master_netdev: - master = self.cfg.GetMasterNode() + master_params = self.cfg.GetMasterNetworkParameters() + ems = self.cfg.GetUseExternalMipScript() feedback_fn("Shutting down master ip on the current netdev (%s)" % self.cluster.master_netdev) - result = self.rpc.call_node_deactivate_master_ip(master) + result = self.rpc.call_node_deactivate_master_ip(master_params.name, + master_params, ems) result.Raise("Could not disable the master ip") feedback_fn("Changing master_netdev from %s to %s" % - (self.cluster.master_netdev, self.op.master_netdev)) + (master_params.netdev, self.op.master_netdev)) self.cluster.master_netdev = self.op.master_netdev if self.op.master_netmask: - master = self.cfg.GetMasterNode() + master_params = self.cfg.GetMasterNetworkParameters() feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask) - result = self.rpc.call_node_change_master_netmask(master, - self.op.master_netmask) + result = self.rpc.call_node_change_master_netmask(master_params.name, + master_params.netmask, + self.op.master_netmask, + master_params.ip, + master_params.netdev) if result.fail_msg: msg = "Could not change the master IP netmask: %s" % result.fail_msg - self.LogWarning(msg) feedback_fn(msg) - else: - self.cluster.master_netmask = self.op.master_netmask + + self.cluster.master_netmask = self.op.master_netmask self.cfg.Update(self.cluster, feedback_fn) if self.op.master_netdev: + master_params = self.cfg.GetMasterNetworkParameters() feedback_fn("Starting the master ip on the new master netdev (%s)" % self.op.master_netdev) - result = self.rpc.call_node_activate_master_ip(master) + ems = self.cfg.GetUseExternalMipScript() + result = self.rpc.call_node_activate_master_ip(master_params.name, + master_params, ems) if result.fail_msg: self.LogWarning("Could not re-enable the master ip on" " the master, please restart manually: %s", @@ -3760,6 +3811,7 @@ def _ComputeAncillaryFiles(cluster, redist): constants.CLUSTER_DOMAIN_SECRET_FILE, constants.SPICE_CERT_FILE, constants.SPICE_CACERT_FILE, + constants.RAPI_USERS_FILE, ]) if not redist: @@ -3772,27 +3824,43 @@ def _ComputeAncillaryFiles(cluster, redist): if cluster.modify_etc_hosts: files_all.add(constants.ETC_HOSTS) - # Files which must either exist on all nodes or on none - files_all_opt = set([ + # Files which are optional, these must: + # - be present in one other category as well + # - either exist or not exist on all nodes of that category (mc, vm all) + files_opt = set([ constants.RAPI_USERS_FILE, ]) # Files which should only be on master candidates files_mc = set() + if not redist: files_mc.add(constants.CLUSTER_CONF_FILE) + # FIXME: this should also be replicated but Ganeti doesn't support files_mc + # replication + files_mc.add(constants.DEFAULT_MASTER_SETUP_SCRIPT) + # Files which should only be on VM-capable nodes files_vm = set(filename for hv_name in cluster.enabled_hypervisors - for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()) + for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()[0]) + + files_opt |= set(filename + for hv_name in cluster.enabled_hypervisors + for filename in hypervisor.GetHypervisor(hv_name).GetAncillaryFiles()[1]) - # Filenames must be unique - assert (len(files_all | files_all_opt | files_mc | files_vm) == - sum(map(len, [files_all, files_all_opt, files_mc, files_vm]))), \ + # Filenames in each category must be unique + all_files_set = files_all | files_mc | files_vm + assert (len(all_files_set) == + sum(map(len, [files_all, files_mc, files_vm]))), \ "Found file listed in more than one file list" - return (files_all, files_all_opt, files_mc, files_vm) + # Optional files must be present in one other category + assert all_files_set.issuperset(files_opt), \ + "Optional file not in a different required list" + + return (files_all, files_opt, files_mc, files_vm) def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True): @@ -3826,7 +3894,7 @@ def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True): nodelist.remove(master_info.name) # Gather file lists - (files_all, files_all_opt, files_mc, files_vm) = \ + (files_all, _, files_mc, files_vm) = \ _ComputeAncillaryFiles(cluster, True) # Never re-distribute configuration file from here @@ -3836,7 +3904,6 @@ def _RedistributeAncillaryFiles(lu, additional_nodes=None, additional_vm=True): filemap = [ (online_nodes, files_all), - (online_nodes, files_all_opt), (vm_nodes, files_vm), ] @@ -3876,8 +3943,10 @@ class LUClusterActivateMasterIp(NoHooksLU): """Activate the master IP. """ - master = self.cfg.GetMasterNode() - self.rpc.call_node_activate_master_ip(master) + master_params = self.cfg.GetMasterNetworkParameters() + ems = self.cfg.GetUseExternalMipScript() + self.rpc.call_node_activate_master_ip(master_params.name, + master_params, ems) class LUClusterDeactivateMasterIp(NoHooksLU): @@ -3888,8 +3957,10 @@ class LUClusterDeactivateMasterIp(NoHooksLU): """Deactivate the master IP. """ - master = self.cfg.GetMasterNode() - self.rpc.call_node_deactivate_master_ip(master) + master_params = self.cfg.GetMasterNetworkParameters() + ems = self.cfg.GetUseExternalMipScript() + self.rpc.call_node_deactivate_master_ip(master_params.name, master_params, + ems) def _WaitForSync(lu, instance, disks=None, oneshot=False): @@ -4411,6 +4482,9 @@ class LUNodeRemove(LogicalUnit): modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup + assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \ + "Not owning BGL" + # Promote nodes to master candidate as needed _AdjustCandidatePool(self, exceptions=[node.name]) self.context.RemoveNode(node.name) @@ -4524,6 +4598,9 @@ class LUNodeQuery(NoHooksLU): def ExpandNames(self): self.nq.ExpandNames(self) + def DeclareLocks(self, level): + self.nq.DeclareLocks(self, level) + def Exec(self, feedback_fn): return self.nq.OldStyleQuery(self) @@ -4542,8 +4619,9 @@ class LUNodeQueryvols(NoHooksLU): selected=self.op.output_fields) def ExpandNames(self): + self.share_locks = _ShareAll() self.needed_locks = {} - self.share_locks[locking.LEVEL_NODE] = 1 + if not self.op.nodes: self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET else: @@ -4610,8 +4688,8 @@ class LUNodeQueryStorage(NoHooksLU): selected=self.op.output_fields) def ExpandNames(self): + self.share_locks = _ShareAll() self.needed_locks = {} - self.share_locks[locking.LEVEL_NODE] = 1 if self.op.nodes: self.needed_locks[locking.LEVEL_NODE] = \ @@ -5077,6 +5155,9 @@ class LUNodeAdd(LogicalUnit): new_node = self.new_node node = new_node.name + assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \ + "Not owning BGL" + # We adding a new node so we assume it's powered new_node.powered = True @@ -5215,6 +5296,13 @@ class LUNodeSetParams(LogicalUnit): self.lock_all = self.op.auto_promote and self.might_demote self.lock_instances = self.op.secondary_ip is not None + def _InstanceFilter(self, instance): + """Filter for getting affected instances. + + """ + return (instance.disk_template in constants.DTS_INT_MIRROR and + self.op.node_name in instance.all_nodes) + def ExpandNames(self): if self.lock_all: self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET} @@ -5222,28 +5310,8 @@ class LUNodeSetParams(LogicalUnit): self.needed_locks = {locking.LEVEL_NODE: self.op.node_name} if self.lock_instances: - self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET - - def DeclareLocks(self, level): - # If we have locked all instances, before waiting to lock nodes, release - # all the ones living on nodes unrelated to the current operation. - if level == locking.LEVEL_NODE and self.lock_instances: - self.affected_instances = [] - if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET: - instances_keep = [] - - # Build list of instances to release - locked_i = self.owned_locks(locking.LEVEL_INSTANCE) - for instance_name, instance in self.cfg.GetMultiInstanceInfo(locked_i): - if (instance.disk_template in constants.DTS_INT_MIRROR and - self.op.node_name in instance.all_nodes): - instances_keep.append(instance_name) - self.affected_instances.append(instance) - - _ReleaseLocks(self, locking.LEVEL_INSTANCE, keep=instances_keep) - - assert (set(self.owned_locks(locking.LEVEL_INSTANCE)) == - set(instances_keep)) + self.needed_locks[locking.LEVEL_INSTANCE] = \ + frozenset(self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)) def BuildHooksEnv(self): """Build hooks env. @@ -5275,6 +5343,25 @@ class LUNodeSetParams(LogicalUnit): """ node = self.node = self.cfg.GetNodeInfo(self.op.node_name) + if self.lock_instances: + affected_instances = \ + self.cfg.GetInstancesInfoByFilter(self._InstanceFilter) + + # Verify instance locks + owned_instances = self.owned_locks(locking.LEVEL_INSTANCE) + wanted_instances = frozenset(affected_instances.keys()) + if wanted_instances - owned_instances: + raise errors.OpPrereqError("Instances affected by changing node %s's" + " secondary IP address have changed since" + " locks were acquired, wanted '%s', have" + " '%s'; retry the operation" % + (self.op.node_name, + utils.CommaJoin(wanted_instances), + utils.CommaJoin(owned_instances)), + errors.ECODE_STATE) + else: + affected_instances = None + if (self.op.master_candidate is not None or self.op.drained is not None or self.op.offline is not None): @@ -5364,7 +5451,9 @@ class LUNodeSetParams(LogicalUnit): if old_role == self._ROLE_OFFLINE and new_role != old_role: # Trying to transition out of offline status - result = self.rpc.call_version([node.name])[node.name] + # TODO: Use standard RPC runner, but make sure it works when the node is + # still marked offline + result = rpc.BootstrapRunner().call_version([node.name])[node.name] if result.fail_msg: raise errors.OpPrereqError("Node %s is being de-offlined but fails" " to report its version: %s" % @@ -5383,15 +5472,19 @@ class LUNodeSetParams(LogicalUnit): raise errors.OpPrereqError("Cannot change the secondary ip on a single" " homed cluster", errors.ECODE_INVAL) + assert not (frozenset(affected_instances) - + self.owned_locks(locking.LEVEL_INSTANCE)) + if node.offline: - if self.affected_instances: - raise errors.OpPrereqError("Cannot change secondary ip: offline" - " node has instances (%s) configured" - " to use it" % self.affected_instances) + if affected_instances: + raise errors.OpPrereqError("Cannot change secondary IP address:" + " offline node has instances (%s)" + " configured to use it" % + utils.CommaJoin(affected_instances.keys())) else: # On online nodes, check that no instances are running, and that # the node has the new ip and we can reach it. - for instance in self.affected_instances: + for instance in affected_instances.values(): _CheckInstanceDown(self, instance, "cannot change secondary ip") _CheckNodeHasSecondaryIP(self, node.name, self.op.secondary_ip, True) @@ -5544,6 +5637,7 @@ class LUClusterQuery(NoHooksLU): "candidate_pool_size": cluster.candidate_pool_size, "master_netdev": cluster.master_netdev, "master_netmask": cluster.master_netmask, + "use_external_mip_script": cluster.use_external_mip_script, "volume_group_name": cluster.volume_group_name, "drbd_usermode_helper": cluster.drbd_usermode_helper, "file_storage_dir": cluster.file_storage_dir, @@ -6070,9 +6164,11 @@ class LUInstanceStartup(LogicalUnit): _StartInstanceDisks(self, instance, force) - result = self.rpc.call_instance_start(node_current, instance, - self.op.hvparams, self.op.beparams, - self.op.startup_paused) + result = \ + self.rpc.call_instance_start(node_current, + (instance, self.op.hvparams, + self.op.beparams), + self.op.startup_paused) msg = result.fail_msg if msg: _ShutdownInstanceDisks(self, instance) @@ -6162,8 +6258,8 @@ class LUInstanceReboot(LogicalUnit): self.LogInfo("Instance %s was already stopped, starting now", instance.name) _StartInstanceDisks(self, instance, ignore_secondaries) - result = self.rpc.call_instance_start(node_current, instance, - None, None, False) + result = self.rpc.call_instance_start(node_current, + (instance, None, None), False) msg = result.fail_msg if msg: _ShutdownInstanceDisks(self, instance) @@ -6324,9 +6420,9 @@ class LUInstanceReinstall(LogicalUnit): try: feedback_fn("Running the instance OS create scripts...") # FIXME: pass debug option from opcode to backend - result = self.rpc.call_instance_os_add(inst.primary_node, inst, True, - self.op.debug_level, - osparams=self.os_inst) + result = self.rpc.call_instance_os_add(inst.primary_node, + (inst, self.os_inst), True, + self.op.debug_level) result.Raise("Could not install OS for instance %s on node %s" % (inst.name, inst.primary_node)) finally: @@ -6360,6 +6456,10 @@ class LUInstanceRecreateDisks(LogicalUnit): # otherwise we need to lock all nodes for disk re-creation primary_only = bool(self.op.nodes) self._LockInstancesNodes(primary_only=primary_only) + elif level == locking.LEVEL_NODE_RES: + # Copy node locks + self.needed_locks[locking.LEVEL_NODE_RES] = \ + self.needed_locks[locking.LEVEL_NODE][:] def BuildHooksEnv(self): """Build hooks env. @@ -6406,7 +6506,8 @@ class LUInstanceRecreateDisks(LogicalUnit): self.op.instance_name, errors.ECODE_INVAL) # if we replace nodes *and* the old primary is offline, we don't # check - assert instance.primary_node in self.needed_locks[locking.LEVEL_NODE] + assert instance.primary_node in self.owned_locks(locking.LEVEL_NODE) + assert instance.primary_node in self.owned_locks(locking.LEVEL_NODE_RES) old_pnode = self.cfg.GetNodeInfo(instance.primary_node) if not (self.op.nodes and old_pnode.offline): _CheckInstanceDown(self, instance, "cannot recreate disks") @@ -6430,6 +6531,9 @@ class LUInstanceRecreateDisks(LogicalUnit): """ instance = self.instance + assert (self.owned_locks(locking.LEVEL_NODE) == + self.owned_locks(locking.LEVEL_NODE_RES)) + to_skip = [] mods = [] # keeps track of needed logical_id changes @@ -6598,11 +6702,16 @@ class LUInstanceRemove(LogicalUnit): def ExpandNames(self): self._ExpandAndLockInstance() self.needed_locks[locking.LEVEL_NODE] = [] + self.needed_locks[locking.LEVEL_NODE_RES] = [] self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE def DeclareLocks(self, level): if level == locking.LEVEL_NODE: self._LockInstancesNodes() + elif level == locking.LEVEL_NODE_RES: + # Copy node locks + self.needed_locks[locking.LEVEL_NODE_RES] = \ + self.needed_locks[locking.LEVEL_NODE][:] def BuildHooksEnv(self): """Build hooks env. @@ -6651,6 +6760,12 @@ class LUInstanceRemove(LogicalUnit): " node %s: %s" % (instance.name, instance.primary_node, msg)) + assert (self.owned_locks(locking.LEVEL_NODE) == + self.owned_locks(locking.LEVEL_NODE_RES)) + assert not (set(instance.all_nodes) - + self.owned_locks(locking.LEVEL_NODE)), \ + "Not owning correct locks" + _RemoveInstance(self, feedback_fn, instance, self.op.ignore_failures) @@ -6864,11 +6979,16 @@ class LUInstanceMove(LogicalUnit): target_node = _ExpandNodeName(self.cfg, self.op.target_node) self.op.target_node = target_node self.needed_locks[locking.LEVEL_NODE] = [target_node] + self.needed_locks[locking.LEVEL_NODE_RES] = [] self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND def DeclareLocks(self, level): if level == locking.LEVEL_NODE: self._LockInstancesNodes(primary_only=True) + elif level == locking.LEVEL_NODE_RES: + # Copy node locks + self.needed_locks[locking.LEVEL_NODE_RES] = \ + self.needed_locks[locking.LEVEL_NODE][:] def BuildHooksEnv(self): """Build hooks env. @@ -6953,6 +7073,9 @@ class LUInstanceMove(LogicalUnit): self.LogInfo("Shutting down instance %s on source node %s", instance.name, source_node) + assert (self.owned_locks(locking.LEVEL_NODE) == + self.owned_locks(locking.LEVEL_NODE_RES)) + result = self.rpc.call_instance_shutdown(source_node, instance, self.op.shutdown_timeout) msg = result.fail_msg @@ -7027,8 +7150,8 @@ class LUInstanceMove(LogicalUnit): _ShutdownInstanceDisks(self, instance) raise errors.OpExecError("Can't activate the instance's disks") - result = self.rpc.call_instance_start(target_node, instance, - None, None, False) + result = self.rpc.call_instance_start(target_node, + (instance, None, None), False) msg = result.fail_msg if msg: _ShutdownInstanceDisks(self, instance) @@ -7691,7 +7814,7 @@ class TLMigrateInstance(Tasklet): self.feedback_fn("* starting the instance on the target node %s" % target_node) - result = self.rpc.call_instance_start(target_node, instance, None, None, + result = self.rpc.call_instance_start(target_node, (instance, None, None), False) msg = result.fail_msg if msg: @@ -7823,7 +7946,7 @@ def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names, shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId()) dev_data = objects.Disk(dev_type=constants.LD_LV, size=size, logical_id=(vgnames[0], names[0])) - dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128, + dev_meta = objects.Disk(dev_type=constants.LD_LV, size=DRBD_META_SIZE, logical_id=(vgnames[1], names[1])) drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size, logical_id=(primary, secondary, port, @@ -8143,7 +8266,7 @@ def _ComputeDiskSizePerVG(disk_template, disks): constants.DT_DISKLESS: {}, constants.DT_PLAIN: _compute(disks, 0), # 128 MB are added for drbd metadata for each disk - constants.DT_DRBD8: _compute(disks, 128), + constants.DT_DRBD8: _compute(disks, DRBD_META_SIZE), constants.DT_FILE: {}, constants.DT_SHARED_FILE: {}, } @@ -8164,7 +8287,8 @@ def _ComputeDiskSize(disk_template, disks): constants.DT_DISKLESS: None, constants.DT_PLAIN: sum(d[constants.IDISK_SIZE] for d in disks), # 128 MB are added for drbd metadata for each disk - constants.DT_DRBD8: sum(d[constants.IDISK_SIZE] + 128 for d in disks), + constants.DT_DRBD8: + sum(d[constants.IDISK_SIZE] + DRBD_META_SIZE for d in disks), constants.DT_FILE: None, constants.DT_SHARED_FILE: 0, constants.DT_BLOCK: 0, @@ -8210,9 +8334,11 @@ def _CheckHVParams(lu, nodenames, hvname, hvparams): """ nodenames = _FilterVmNodes(lu, nodenames) - hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames, - hvname, - hvparams) + + cluster = lu.cfg.GetClusterInfo() + hvfull = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams) + + hvinfo = lu.rpc.call_hypervisor_validate_params(nodenames, hvname, hvfull) for node in nodenames: info = hvinfo[node] if info.offline: @@ -8238,7 +8364,7 @@ def _CheckOSParams(lu, required, nodenames, osname, osparams): """ nodenames = _FilterVmNodes(lu, nodenames) - result = lu.rpc.call_os_validate(required, nodenames, osname, + result = lu.rpc.call_os_validate(nodenames, required, osname, [constants.OS_VALIDATE_PARAMETERS], osparams) for node, nres in result.items(): @@ -8431,7 +8557,11 @@ class LUInstanceCreate(LogicalUnit): self.add_locks[locking.LEVEL_INSTANCE] = instance_name if self.op.iallocator: + # TODO: Find a solution to not lock all nodes in the cluster, e.g. by + # specifying a group on instance creation and then selecting nodes from + # that group self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET + self.needed_locks[locking.LEVEL_NODE_RES] = locking.ALL_SET else: self.op.pnode = _ExpandNodeName(self.cfg, self.op.pnode) nodelist = [self.op.pnode] @@ -8439,6 +8569,9 @@ class LUInstanceCreate(LogicalUnit): self.op.snode = _ExpandNodeName(self.cfg, self.op.snode) nodelist.append(self.op.snode) self.needed_locks[locking.LEVEL_NODE] = nodelist + # Lock resources of instance's primary and secondary nodes (copy to + # prevent accidential modification) + self.needed_locks[locking.LEVEL_NODE_RES] = list(nodelist) # in case of import lock the source node too if self.op.mode == constants.INSTANCE_IMPORT: @@ -9045,6 +9178,10 @@ class LUInstanceCreate(LogicalUnit): instance = self.op.instance_name pnode_name = self.pnode.name + assert not (self.owned_locks(locking.LEVEL_NODE_RES) - + self.owned_locks(locking.LEVEL_NODE)), \ + "Node locks differ from node resource locks" + ht_kind = self.op.hypervisor if ht_kind in constants.HTS_REQ_PORT: network_port = self.cfg.AllocatePort() @@ -9147,6 +9284,9 @@ class LUInstanceCreate(LogicalUnit): raise errors.OpExecError("There are some degraded disks for" " this instance") + # Release all node resource locks + _ReleaseLocks(self, locking.LEVEL_NODE_RES) + if iobj.disk_template != constants.DT_DISKLESS and not self.adopt_disks: if self.op.mode == constants.INSTANCE_CREATE: if not self.op.no_install: @@ -9164,7 +9304,7 @@ class LUInstanceCreate(LogicalUnit): feedback_fn("* running the instance OS create scripts...") # FIXME: pass debug option from opcode to backend os_add_result = \ - self.rpc.call_instance_os_add(pnode_name, iobj, False, + self.rpc.call_instance_os_add(pnode_name, (iobj, None), False, self.op.debug_level) if pause_sync: feedback_fn("* resuming disk sync") @@ -9239,13 +9379,15 @@ class LUInstanceCreate(LogicalUnit): raise errors.ProgrammerError("Unknown OS initialization mode '%s'" % self.op.mode) + assert not self.owned_locks(locking.LEVEL_NODE_RES) + if self.op.start: iobj.admin_up = True self.cfg.Update(iobj, feedback_fn) logging.info("Starting instance %s on node %s", instance, pnode_name) feedback_fn("* starting instance...") - result = self.rpc.call_instance_start(pnode_name, iobj, - None, None, False) + result = self.rpc.call_instance_start(pnode_name, (iobj, None, None), + False) result.Raise("Could not start instance") return list(iobj.all_nodes) @@ -9262,6 +9404,7 @@ class LUInstanceConsole(NoHooksLU): REQ_BGL = False def ExpandNames(self): + self.share_locks = _ShareAll() self._ExpandAndLockInstance() def CheckPrereq(self): @@ -9829,7 +9972,7 @@ class TLReplaceDisks(Tasklet): lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size, logical_id=(vg_data, names[0])) vg_meta = dev.children[1].logical_id[0] - lv_meta = objects.Disk(dev_type=constants.LD_LV, size=128, + lv_meta = objects.Disk(dev_type=constants.LD_LV, size=DRBD_META_SIZE, logical_id=(vg_meta, names[1])) new_lvs = [lv_data, lv_meta] @@ -10459,11 +10602,16 @@ class LUInstanceGrowDisk(LogicalUnit): def ExpandNames(self): self._ExpandAndLockInstance() self.needed_locks[locking.LEVEL_NODE] = [] - self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE + self.needed_locks[locking.LEVEL_NODE_RES] = [] + self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE def DeclareLocks(self, level): if level == locking.LEVEL_NODE: self._LockInstancesNodes() + elif level == locking.LEVEL_NODE_RES: + # Copy node locks + self.needed_locks[locking.LEVEL_NODE_RES] = \ + self.needed_locks[locking.LEVEL_NODE][:] def BuildHooksEnv(self): """Build hooks env. @@ -10520,10 +10668,18 @@ class LUInstanceGrowDisk(LogicalUnit): instance = self.instance disk = self.disk + assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE) + assert (self.owned_locks(locking.LEVEL_NODE) == + self.owned_locks(locking.LEVEL_NODE_RES)) + disks_ok, _ = _AssembleInstanceDisks(self, self.instance, disks=[disk]) if not disks_ok: raise errors.OpExecError("Cannot activate block device to grow") + feedback_fn("Growing disk %s of instance '%s' by %s" % + (self.op.disk, instance.name, + utils.FormatUnit(self.op.amount, "h"))) + # First run all grow ops in dry-run mode for node in instance.all_nodes: self.cfg.SetDiskID(disk, node) @@ -10546,6 +10702,13 @@ class LUInstanceGrowDisk(LogicalUnit): disk.RecordGrow(self.op.amount) self.cfg.Update(instance, feedback_fn) + + # Changes have been recorded, release node lock + _ReleaseLocks(self, locking.LEVEL_NODE) + + # Downgrade lock while waiting for sync + self.glm.downgrade(locking.LEVEL_INSTANCE) + if self.op.wait_for_sync: disk_abort = not _WaitForSync(self, instance, disks=[disk]) if disk_abort: @@ -10558,6 +10721,9 @@ class LUInstanceGrowDisk(LogicalUnit): " not supposed to be running because no wait for" " sync mode was requested") + assert self.owned_locks(locking.LEVEL_NODE_RES) + assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE) + class LUInstanceQueryData(NoHooksLU): """Query runtime instance data. @@ -11890,8 +12056,8 @@ class LUBackupExport(LogicalUnit): not self.op.remove_instance): assert not activate_disks feedback_fn("Starting instance %s" % instance.name) - result = self.rpc.call_instance_start(src_node, instance, - None, None, False) + result = self.rpc.call_instance_start(src_node, + (instance, None, None), False) msg = result.fail_msg if msg: feedback_fn("Failed to start instance: %s" % msg) @@ -13023,9 +13189,9 @@ class IAllocator(object): # pylint: disable=R0902 # lots of instance attributes - def __init__(self, cfg, rpc, mode, **kwargs): + def __init__(self, cfg, rpc_runner, mode, **kwargs): self.cfg = cfg - self.rpc = rpc + self.rpc = rpc_runner # init buffer variables self.in_text = self.out_text = self.in_data = self.out_data = None # init all input fields so that pylint is happy