X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/77031881461755393de6d43dbee0e6a6ad72e5c5..da961187f97344fde390140ebb2f10d10d334d51:/lib/cmdlib.py

diff --git a/lib/cmdlib.py b/lib/cmdlib.py
index 6bdc263..27dfe5f 100644
--- a/lib/cmdlib.py
+++ b/lib/cmdlib.py
@@ -25,7 +25,6 @@
 
 import os
 import os.path
-import sha
 import time
 import tempfile
 import re
@@ -782,12 +781,13 @@ class LUVerifyCluster(LogicalUnit):
     else:
       for minor, (iname, must_exist) in drbd_map.items():
         if minor not in used_minors and must_exist:
-          feedback_fn("  - ERROR: drbd minor %d of instance %s is not active" %
-                      (minor, iname))
+          feedback_fn("  - ERROR: drbd minor %d of instance %s is"
+                      " not active" % (minor, iname))
           bad = True
     for minor in used_minors:
       if minor not in drbd_map:
-        feedback_fn("  - ERROR: unallocated drbd minor %d is in use" % minor)
+        feedback_fn("  - ERROR: unallocated drbd minor %d is in use" %
+                    minor)
         bad = True
 
     return bad
@@ -915,8 +915,12 @@ class LUVerifyCluster(LogicalUnit):
 
     """
     all_nodes = self.cfg.GetNodeList()
-    # TODO: populate the environment with useful information for verify hooks
-    env = {}
+    env = {
+      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
+      }
+    for node in self.cfg.GetAllNodesInfo().values():
+      env["NODE_TAGS_%s" % node.name] = " ".join(node.GetTags())
+
     return env, [], all_nodes
 
   def Exec(self, feedback_fn):
@@ -1006,8 +1010,16 @@ class LUVerifyCluster(LogicalUnit):
 
       node_drbd = {}
       for minor, instance in all_drbd_map[node].items():
-        instance = instanceinfo[instance]
-        node_drbd[minor] = (instance.name, instance.admin_up)
+        if instance not in instanceinfo:
+          feedback_fn("  - ERROR: ghost instance '%s' in temporary DRBD map" %
+                      instance)
+          # ghost instance should not be running, but otherwise we
+          # don't give double warnings (both ghost instance and
+          # unallocated minor in use)
+          node_drbd[minor] = (instance, False)
+        else:
+          instance = instanceinfo[instance]
+          node_drbd[minor] = (instance.name, instance.admin_up)
       result = self._VerifyNode(node_i, file_names, local_checksums,
                                 nresult, feedback_fn, master_files,
                                 node_drbd, vg_name)
@@ -1060,9 +1072,17 @@ class LUVerifyCluster(LogicalUnit):
           }
       # FIXME: devise a free space model for file based instances as well
       if vg_name is not None:
+        if (constants.NV_VGLIST not in nresult or
+            vg_name not in nresult[constants.NV_VGLIST]):
+          feedback_fn("  - ERROR: node %s didn't return data for the"
+                      " volume group '%s' - it is either missing or broken" %
+                      (node, vg_name))
+          bad = True
+          continue
        node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
-    except ValueError:
-      feedback_fn("  - ERROR: invalid value returned from node %s" % (node,))
+    except (ValueError, KeyError):
+      feedback_fn("  - ERROR: invalid nodeinfo value returned"
+                  " from node %s" % (node,))
       bad = True
       continue
@@ -1269,6 +1289,7 @@ class LUVerifyDisks(NoHooksLU):
       if isinstance(lvs, basestring):
         logging.warning("Error enumerating LVs on node %s: %s", node, lvs)
         res_nlvm[node] = lvs
+        continue
      elif not isinstance(lvs, dict):
        logging.warning("Connection to node %s failed or invalid data"
                        " returned", node)
@@ -1396,7 +1417,7 @@ class LUSetClusterParams(LogicalUnit):
   _OP_REQP = []
   REQ_BGL = False
 
-  def CheckParameters(self):
+  def CheckArguments(self):
     """Check parameters

     """
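The LUVerifyCluster hunks above share one defensive pattern: anything coming back from a node RPC is treated as untrusted, so a missing NV_VGLIST key, an unknown volume group, a ghost instance in the DRBD map and a malformed free-space value are each reported through feedback_fn and flagged as bad, instead of escaping as a traceback that aborts the whole verification run. The same reasoning explains widening except ValueError to except (ValueError, KeyError). A standalone sketch of the volume-group part of that pattern, using a plain dict payload and a made-up function name rather than Ganeti's RPC API:

    def parse_vg_free(nresult, vg_name, feedback_fn):
      """Return free space in MiB for vg_name, or None after reporting."""
      vg_list = nresult.get("vglist")  # stand-in for the NV_VGLIST payload
      if not vg_list or vg_name not in vg_list:
        feedback_fn("  - ERROR: no data for volume group '%s'" % vg_name)
        return None
      try:
        # the reported value itself may be malformed, so guard the cast too
        return int(vg_list[vg_name])
      except (ValueError, TypeError):
        feedback_fn("  - ERROR: invalid free-space value %r" %
                    (vg_list[vg_name],))
        return None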
errors.OpPrereqError("Invalid candidate_pool_size value: %s" % str(err)) if self.op.candidate_pool_size < 1: @@ -1502,8 +1523,11 @@ class LUSetClusterParams(LogicalUnit): """ if self.op.vg_name is not None: - if self.op.vg_name != self.cfg.GetVGName(): - self.cfg.SetVGName(self.op.vg_name) + new_volume = self.op.vg_name + if not new_volume: + new_volume = None + if new_volume != self.cfg.GetVGName(): + self.cfg.SetVGName(new_volume) else: feedback_fn("Cluster LVM configuration already in desired" " state, not changing") @@ -1524,6 +1548,45 @@ class LUSetClusterParams(LogicalUnit): _AdjustCandidatePool(self) +def _RedistributeAncillaryFiles(lu, additional_nodes=None): + """Distribute additional files which are part of the cluster configuration. + + ConfigWriter takes care of distributing the config and ssconf files, but + there are more files which should be distributed to all nodes. This function + makes sure those are copied. + + @param lu: calling logical unit + @param additional_nodes: list of nodes not in the config to distribute to + + """ + # 1. Gather target nodes + myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode()) + dist_nodes = lu.cfg.GetNodeList() + if additional_nodes is not None: + dist_nodes.extend(additional_nodes) + if myself.name in dist_nodes: + dist_nodes.remove(myself.name) + # 2. Gather files to distribute + dist_files = set([constants.ETC_HOSTS, + constants.SSH_KNOWN_HOSTS_FILE, + constants.RAPI_CERT_FILE, + constants.RAPI_USERS_FILE, + ]) + + enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors + for hv_name in enabled_hypervisors: + hv_class = hypervisor.GetHypervisor(hv_name) + dist_files.update(hv_class.GetAncillaryFiles()) + + # 3. Perform the files upload + for fname in dist_files: + if os.path.exists(fname): + result = lu.rpc.call_upload_file(dist_nodes, fname) + for to_node, to_result in result.items(): + if to_result.failed or not to_result.data: + logging.error("Copy of file %s to node %s failed", fname, to_node) + + class LURedistributeConfig(NoHooksLU): """Force the redistribution of cluster configuration. @@ -1549,6 +1612,7 @@ class LURedistributeConfig(NoHooksLU): """ self.cfg.Update(self.cfg.GetClusterInfo()) + _RedistributeAncillaryFiles(self) def _WaitForSync(lu, instance, oneshot=False, unlock=False): @@ -1660,9 +1724,11 @@ class LUDiagnoseOS(NoHooksLU): selected=self.op.output_fields) # Lock all nodes, in shared mode + # Temporary removal of locks, should be reverted later + # TODO: reintroduce locks when they are lighter-weight self.needed_locks = {} - self.share_locks[locking.LEVEL_NODE] = 1 - self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET + #self.share_locks[locking.LEVEL_NODE] = 1 + #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET def CheckPrereq(self): """Check prerequisites. 
@@ -1686,6 +1752,11 @@ class LUDiagnoseOS(NoHooksLU):
 
     """
     all_os = {}
+    # we build here the list of nodes that didn't fail the RPC (at RPC
+    # level), so that nodes with a non-responding node daemon don't
+    # make all OSes invalid
+    good_nodes = [node_name for node_name in rlist
+                  if not rlist[node_name].failed]
     for node_name, nr in rlist.iteritems():
       if nr.failed or not nr.data:
         continue
@@ -1694,7 +1765,7 @@ class LUDiagnoseOS(NoHooksLU):
         # build a list of nodes for this os containing empty lists
         # for each node in node_list
         all_os[os_obj.name] = {}
-        for nname in node_list:
+        for nname in good_nodes:
           all_os[os_obj.name][nname] = []
       all_os[os_obj.name][node_name].append(os_obj)
     return all_os
@@ -1703,9 +1774,7 @@ class LUDiagnoseOS(NoHooksLU):
     """Compute the list of OSes.
 
     """
-    node_list = self.acquired_locks[locking.LEVEL_NODE]
-    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()
-                   if node in node_list]
+    valid_nodes = [node for node in self.cfg.GetOnlineNodeList()]
     node_data = self.rpc.call_os_diagnose(valid_nodes)
     if node_data == False:
       raise errors.OpExecError("Can't gather the list of OSes")
@@ -2219,35 +2288,11 @@ class LUAddNode(LogicalUnit):
                      (verifier, result[verifier].data['nodelist'][failed]))
       raise errors.OpExecError("ssh/hostname verification failed.")
 
-    # Distribute updated /etc/hosts and known_hosts to all nodes,
-    # including the node just added
-    myself = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
-    dist_nodes = self.cfg.GetNodeList()
-    if not self.op.readd:
-      dist_nodes.append(node)
-    if myself.name in dist_nodes:
-      dist_nodes.remove(myself.name)
-
-    logging.debug("Copying hosts and known_hosts to all nodes")
-    for fname in (constants.ETC_HOSTS, constants.SSH_KNOWN_HOSTS_FILE):
-      result = self.rpc.call_upload_file(dist_nodes, fname)
-      for to_node, to_result in result.iteritems():
-        if to_result.failed or not to_result.data:
-          logging.error("Copy of file %s to node %s failed", fname, to_node)
-
-    to_copy = []
-    enabled_hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
-    if constants.HTS_COPY_VNC_PASSWORD.intersection(enabled_hypervisors):
-      to_copy.append(constants.VNC_PASSWORD_FILE)
-
-    for fname in to_copy:
-      result = self.rpc.call_upload_file([node], fname)
-      if result[node].failed or not result[node]:
-        logging.error("Could not copy file %s to node %s", fname, node)
-
     if self.op.readd:
+      _RedistributeAncillaryFiles(self)
       self.context.ReaddNode(new_node)
     else:
+      _RedistributeAncillaryFiles(self, additional_nodes=node)
       self.context.AddNode(new_node)
 
@@ -2322,7 +2367,7 @@ class LUSetNodeParams(LogicalUnit):
         ((node.offline and not self.op.offline == False) or
          (node.drained and not self.op.drained == False))):
       raise errors.OpPrereqError("Node '%s' is offline or drained, can't set"
-                                 " to master_candidate")
+                                 " to master_candidate" % node.name)
 
     return
 
@@ -2414,6 +2459,10 @@ class LUQueryClusterInfo(NoHooksLU):
                         for hypervisor in cluster.enabled_hypervisors]),
       "beparams": cluster.beparams,
       "candidate_pool_size": cluster.candidate_pool_size,
+      "default_bridge": cluster.default_bridge,
+      "master_netdev": cluster.master_netdev,
+      "volume_group_name": cluster.volume_group_name,
+      "file_storage_dir": cluster.file_storage_dir,
       }
 
     return result
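In the _DiagnoseByOS hunks above, the per-OS node maps are now seeded only from good_nodes, the nodes whose RPC succeeded at the transport level, so one dead node daemon no longer makes every OS look partially invalid. A runnable sketch of the same seeding logic, modelling each RPC result as a (failed, payload) pair instead of Ganeti's RpcResult objects:

    def diagnose_by_os(rlist):
      # nodes that answered at all; only these get "OS missing here" entries
      good_nodes = [name for name, (failed, _) in rlist.items() if not failed]
      all_os = {}
      for node_name, (failed, oses) in rlist.items():
        if failed or not oses:
          continue
        for os_name in oses:
          if os_name not in all_os:
            # seed an empty list per healthy node: an entry that stays
            # empty means "OS absent on this node", not "node unreachable"
            all_os[os_name] = dict((nname, []) for nname in good_nodes)
          all_os[os_name][node_name].append(os_name)
      return all_os

    # diagnose_by_os({"n1": (False, ["debootstrap"]), "n2": (True, None)})
    # returns {"debootstrap": {"n1": ["debootstrap"]}}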
errors.OpPrereqError("Invalid beparams passed: %s, expected" + " dict" % (type(self.beparams), )) + # fill the beparams dict + utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES) + self.op.beparams = self.beparams + + # extra hvparams + self.hvparams = getattr(self.op, "hvparams", {}) + if self.hvparams: + if not isinstance(self.hvparams, dict): + raise errors.OpPrereqError("Invalid hvparams passed: %s, expected" + " dict" % (type(self.hvparams), )) + + # check hypervisor parameter syntax (locally) + cluster = self.cfg.GetClusterInfo() + utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES) + filled_hvp = cluster.FillDict(cluster.hvparams[instance.hypervisor], + instance.hvparams) + filled_hvp.update(self.hvparams) + hv_type = hypervisor.GetHypervisor(instance.hypervisor) + hv_type.CheckParameterSyntax(filled_hvp) + _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp) + self.op.hvparams = self.hvparams + _CheckNodeOnline(self, instance.primary_node) bep = self.cfg.GetClusterInfo().FillBE(instance) # check bridges existance _CheckInstanceBridgesExist(self, instance) - _CheckNodeFreeMemory(self, instance.primary_node, - "starting instance %s" % instance.name, - bep[constants.BE_MEMORY], instance.hypervisor) + remote_info = self.rpc.call_instance_info(instance.primary_node, + instance.name, + instance.hypervisor) + remote_info.Raise() + if not remote_info.data: + _CheckNodeFreeMemory(self, instance.primary_node, + "starting instance %s" % instance.name, + bep[constants.BE_MEMORY], instance.hypervisor) def Exec(self, feedback_fn): """Start the instance. @@ -2751,7 +2833,8 @@ class LUStartupInstance(LogicalUnit): _StartInstanceDisks(self, instance, force) - result = self.rpc.call_instance_start(node_current, instance) + result = self.rpc.call_instance_start(node_current, instance, + self.hvparams, self.beparams) msg = result.RemoteFailMsg() if msg: _ShutdownInstanceDisks(self, instance) @@ -2833,7 +2916,7 @@ class LURebootInstance(LogicalUnit): " full reboot: %s" % msg) _ShutdownInstanceDisks(self, instance) _StartInstanceDisks(self, instance, ignore_secondaries) - result = self.rpc.call_instance_start(node_current, instance) + result = self.rpc.call_instance_start(node_current, instance, None, None) msg = result.RemoteFailMsg() if msg: _ShutdownInstanceDisks(self, instance) @@ -2933,7 +3016,8 @@ class LUReinstallInstance(LogicalUnit): remote_info = self.rpc.call_instance_info(instance.primary_node, instance.name, instance.hypervisor) - if remote_info.failed or remote_info.data: + remote_info.Raise() + if remote_info.data: raise errors.OpPrereqError("Instance '%s' is running on the node %s" % (self.op.instance_name, instance.primary_node)) @@ -2968,7 +3052,7 @@ class LUReinstallInstance(LogicalUnit): _StartInstanceDisks(self, inst, None) try: feedback_fn("Running the instance OS create scripts...") - result = self.rpc.call_instance_os_add(inst.primary_node, inst) + result = self.rpc.call_instance_os_add(inst.primary_node, inst, True) msg = result.RemoteFailMsg() if msg: raise errors.OpExecError("Could not install OS for instance %s" @@ -3523,7 +3607,7 @@ class LUFailoverInstance(LogicalUnit): raise errors.OpExecError("Can't activate the instance's disks") feedback_fn("* starting the instance on the target node") - result = self.rpc.call_instance_start(target_node, instance) + result = self.rpc.call_instance_start(target_node, instance, None, None) msg = result.RemoteFailMsg() if msg: _ShutdownInstanceDisks(self, instance) @@ -4207,8 +4291,8 @@ def 
@@ -4207,8 +4291,8 @@ def _CheckHVParams(lu, nodenames, hvname, hvparams):
       continue
     msg = info.RemoteFailMsg()
     if msg:
-      raise errors.OpPrereqError("Hypervisor parameter validation failed:"
-                                 " %s" % msg)
+      raise errors.OpPrereqError("Hypervisor parameter validation"
+                                 " failed on node %s: %s" % (node, msg))
 
 
 class LUCreateInstance(LogicalUnit):
@@ -4736,7 +4820,7 @@ class LUCreateInstance(LogicalUnit):
     if iobj.disk_template != constants.DT_DISKLESS:
       if self.op.mode == constants.INSTANCE_CREATE:
         feedback_fn("* running the instance OS create scripts...")
-        result = self.rpc.call_instance_os_add(pnode_name, iobj)
+        result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
         msg = result.RemoteFailMsg()
         if msg:
           raise errors.OpExecError("Could not add os for instance %s"
@@ -4767,7 +4851,7 @@ class LUCreateInstance(LogicalUnit):
       self.cfg.Update(iobj)
       logging.info("Starting instance %s on node %s", instance, pnode_name)
       feedback_fn("* starting instance...")
-      result = self.rpc.call_instance_start(pnode_name, iobj)
+      result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
       msg = result.RemoteFailMsg()
       if msg:
         raise errors.OpExecError("Could not start instance: %s" % msg)
@@ -5292,7 +5376,7 @@ class LUReplaceDisks(LogicalUnit):
     try:
       _CreateSingleBlockDev(self, new_node, instance, new_drbd,
                             _GetInstanceInfoText(instance), False)
-    except errors.BlockDeviceError:
+    except errors.GenericError:
       self.cfg.ReleaseDRBDMinors(instance.name)
       raise
 
@@ -5871,7 +5955,7 @@ class LUSetInstanceParams(LogicalUnit):
         self.warn.append("Can't get info from primary node %s" % pnode)
       else:
         if not instance_info.failed and instance_info.data:
-          current_mem = instance_info.data['memory']
+          current_mem = int(instance_info.data['memory'])
         else:
           # Assume instance not running
           # (there is a slight race condition here, but it's not very probable,
@@ -6215,7 +6299,7 @@ class LUExportInstance(LogicalUnit):
 
     finally:
       if self.op.shutdown and instance.admin_up:
-        result = self.rpc.call_instance_start(src_node, instance)
+        result = self.rpc.call_instance_start(src_node, instance, None, None)
         msg = result.RemoteFailMsg()
         if msg:
           _ShutdownInstanceDisks(self, instance)
@@ -6684,6 +6768,8 @@ class IAllocator(object):
         "disk_template": iinfo.disk_template,
         "hypervisor": iinfo.hypervisor,
         }
+      pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
+                                                 pir["disks"])
       instance_data[iinfo.name] = pir
 
     data["instances"] = instance_data
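The final hunk feeds the iallocator a precomputed disk_space_total per instance via _ComputeDiskSize, i.e. the sum of the logical disk sizes plus any template-specific overhead, rather than leaving each allocator script to rederive it. A rough sketch of such a computation; the per-template rules and the 128 MiB DRBD metadata overhead below are assumptions of this sketch, not values taken from the diff:

    def compute_disk_size(disk_template, disks):
      """Rough per-template disk accounting.

      disks is a list of dicts with a "size" key in MiB.
      """
      if disk_template == "diskless":
        return 0
      if disk_template in ("plain", "file"):
        return sum(d["size"] for d in disks)
      if disk_template == "drbd":
        # assumed overhead: DRBD metadata per disk
        return sum(d["size"] + 128 for d in disks)
      raise ValueError("Unknown disk template '%s'" % disk_template)

    # compute_disk_size("drbd", [{"size": 1024}, {"size": 2048}]) -> 3328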