diff --git a/lib/cmdlib.py b/lib/cmdlib.py
index 2068882..fbc63e1 100644
--- a/lib/cmdlib.py
+++ b/lib/cmdlib.py
@@ -60,6 +60,7 @@ from ganeti import ht
 from ganeti import rpc
 from ganeti import runtime
 from ganeti import pathutils
+from ganeti import vcluster
 from ganeti.masterd import iallocator

 import ganeti.masterd.instance # pylint: disable=W0611
@@ -690,6 +691,18 @@ def _SupportsOob(cfg, node):
   return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]


+def _CopyLockList(names):
+  """Makes a copy of a list of lock names.
+
+  Handles L{locking.ALL_SET} correctly.
+
+  """
+  if names == locking.ALL_SET:
+    return locking.ALL_SET
+  else:
+    return names[:]
+
+
 def _GetWantedNodes(lu, nodes):
   """Returns list of checked and expanded node names.

@@ -1610,8 +1623,9 @@ def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
   cluster-wide iallocator if appropriate.

   Check that at most one of (iallocator, node) is specified. If none is
-  specified, then the LU's opcode's iallocator slot is filled with the
-  cluster-wide default iallocator.
+  specified, or the iallocator is L{constants.DEFAULT_IALLOCATOR_SHORTCUT},
+  then the LU's opcode's iallocator slot is filled with the cluster-wide
+  default iallocator.

   @type iallocator_slot: string
   @param iallocator_slot: the name of the opcode iallocator slot
@@ -1621,11 +1635,14 @@ def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
   """
   node = getattr(lu.op, node_slot, None)
   ialloc = getattr(lu.op, iallocator_slot, None)
+  if node == []:
+    node = None

   if node is not None and ialloc is not None:
     raise errors.OpPrereqError("Do not specify both, iallocator and node",
                                errors.ECODE_INVAL)
-  elif node is None and ialloc is None:
+  elif ((node is None and ialloc is None) or
+        ialloc == constants.DEFAULT_IALLOCATOR_SHORTCUT):
     default_iallocator = lu.cfg.GetDefaultIAllocator()
     if default_iallocator:
       setattr(lu.op, iallocator_slot, default_iallocator)
@@ -1660,6 +1677,27 @@ def _GetDefaultIAllocator(cfg, ialloc):
   return ialloc


+def _CheckHostnameSane(lu, name):
+  """Ensures that a given hostname resolves to a 'sane' name.
+
+  The given name is required to be a prefix of the resolved hostname,
+  to prevent accidental mismatches.
+
+  @param lu: the logical unit on behalf of which we're checking
+  @param name: the name we should resolve and check
+  @return: the resolved hostname object
+
+  """
+  hostname = netutils.GetHostname(name=name)
+  if hostname.name != name:
+    lu.LogInfo("Resolved given name '%s' to '%s'", name, hostname.name)
+  if not utils.MatchNameComponent(name, [hostname.name]):
+    raise errors.OpPrereqError(("Resolved hostname '%s' does not look the"
+                                " same as given hostname '%s'") %
+                               (hostname.name, name), errors.ECODE_INVAL)
+  return hostname
+
+
 class LUClusterPostInit(LogicalUnit):
   """Logical unit for running hooks after cluster initialization.
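
Note: a minimal sketch (not part of the patch) of the behaviour _CopyLockList
guards against. It assumes locking.ALL_SET is a non-list sentinel (None in
Ganeti), so a plain "names[:]" would fail on it, while a plain assignment
would alias the caller's list:

  # Hypothetical stand-ins for locking.ALL_SET and _CopyLockList,
  # for illustration only.
  ALL_SET = None

  def copy_lock_list(names):
    # Pass the "all locks" sentinel through unchanged, copy real lists.
    if names == ALL_SET:
      return ALL_SET
    return names[:]

  assert copy_lock_list(ALL_SET) is None
  node_locks = ["node1.example.com", "node2.example.com"]
  res_locks = copy_lock_list(node_locks)
  res_locks.append("node3.example.com")  # must not modify the original
  assert node_locks == ["node1.example.com", "node2.example.com"]
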
@@ -2549,7 +2587,10 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
       if nresult.fail_msg or not nresult.payload:
         node_files = None
       else:
-        node_files = nresult.payload.get(constants.NV_FILELIST, None)
+        fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
+        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
+                          for (key, value) in fingerprints.items())
+        del fingerprints

       test = not (node_files and isinstance(node_files, dict))
       errorif(test, constants.CV_ENODEFILECHECK, node.name,
@@ -2764,6 +2805,37 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
                   "OSes present on reference node %s but missing on this node: %s",
                   base.name, utils.CommaJoin(missing))

+  def _VerifyFileStoragePaths(self, ninfo, nresult, is_master):
+    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
+
+    @type ninfo: L{objects.Node}
+    @param ninfo: the node to check
+    @param nresult: the remote results for the node
+    @type is_master: bool
+    @param is_master: Whether node is the master node
+
+    """
+    node = ninfo.name
+
+    if (is_master and
+        (constants.ENABLE_FILE_STORAGE or
+         constants.ENABLE_SHARED_FILE_STORAGE)):
+      try:
+        fspaths = nresult[constants.NV_FILE_STORAGE_PATHS]
+      except KeyError:
+        # This should never happen
+        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, node,
+                      "Node did not return forbidden file storage paths")
+      else:
+        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, node,
+                      "Found forbidden file storage paths: %s",
+                      utils.CommaJoin(fspaths))
+    else:
+      self._ErrorIf(constants.NV_FILE_STORAGE_PATHS in nresult,
+                    constants.CV_ENODEFILESTORAGEPATHS, node,
+                    "Node should not have returned forbidden file storage"
+                    " paths")
+
   def _VerifyOob(self, ninfo, nresult):
     """Verifies out of band functionality of a node.

@@ -3073,9 +3145,10 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):

     node_verify_param = {
       constants.NV_FILELIST:
-        utils.UniqueSequence(filename
-                             for files in filemap
-                             for filename in files),
+        map(vcluster.MakeVirtualPath,
+            utils.UniqueSequence(filename
+                                 for files in filemap
+                                 for filename in files)),
       constants.NV_NODELIST:
         self._SelectSshCheckNodes(node_data_list, self.group_uuid,
                                   self.all_node_info.values()),
@@ -3100,11 +3173,15 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
       node_verify_param[constants.NV_VGLIST] = None
       node_verify_param[constants.NV_LVLIST] = vg_name
       node_verify_param[constants.NV_PVLIST] = [vg_name]
-      node_verify_param[constants.NV_DRBDLIST] = None
     if drbd_helper:
+      node_verify_param[constants.NV_DRBDLIST] = None
       node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

+    if constants.ENABLE_FILE_STORAGE or constants.ENABLE_SHARED_FILE_STORAGE:
+      # Load file storage paths only from master node
+      node_verify_param[constants.NV_FILE_STORAGE_PATHS] = master_node
+
     # bridge checks
     # FIXME: this needs to be changed per node-group, not cluster-wide
     bridges = set()
@@ -3258,6 +3335,8 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
       self._VerifyNodeNetwork(node_i, nresult)
       self._VerifyNodeUserScripts(node_i, nresult)
       self._VerifyOob(node_i, nresult)
+      self._VerifyFileStoragePaths(node_i, nresult,
+                                   node == master_node)

       if nimg.vm_capable:
         self._VerifyNodeLVM(node_i, nresult, vg_name)
@@ -3313,11 +3392,8 @@ class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
                inst_config.primary_node)

       # If the instance is non-redundant we cannot survive losing its primary
-      # node, so we are not N+1 compliant. On the other hand we have no disk
-      # templates with more than one secondary so that situation is not well
-      # supported either.
-      # FIXME: does not support file-backed instances
-      if not inst_config.secondary_nodes:
+      # node, so we are not N+1 compliant.
+      if inst_config.disk_template not in constants.DTS_MIRRORED:
         i_non_redundant.append(instance)

       _ErrorIf(len(inst_config.secondary_nodes) > 1,
@@ -4287,15 +4363,15 @@ def _ComputeAncillaryFiles(cluster, redist):
     pathutils.RAPI_USERS_FILE,
     ])

-  if not redist:
-    files_all.update(pathutils.ALL_CERT_FILES)
-    files_all.update(ssconf.SimpleStore().GetFileList())
-  else:
+  if redist:
     # we need to ship at least the RAPI certificate
     files_all.add(pathutils.RAPI_CERT_FILE)
+  else:
+    files_all.update(pathutils.ALL_CERT_FILES)
+    files_all.update(ssconf.SimpleStore().GetFileList())

   if cluster.modify_etc_hosts:
-    files_all.add(constants.ETC_HOSTS)
+    files_all.add(pathutils.ETC_HOSTS)

   if cluster.use_external_mip_script:
     files_all.add(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
@@ -4313,6 +4389,12 @@ def _ComputeAncillaryFiles(cluster, redist):
   if not redist:
     files_mc.add(pathutils.CLUSTER_CONF_FILE)

+  # File storage
+  if (not redist and
+      (constants.ENABLE_FILE_STORAGE or constants.ENABLE_SHARED_FILE_STORAGE)):
+    files_all.add(pathutils.FILE_STORAGE_PATHS_FILE)
+    files_opt.add(pathutils.FILE_STORAGE_PATHS_FILE)
+
   # Files which should only be on VM-capable nodes
   files_vm = set(
     filename
@@ -4334,6 +4416,10 @@ def _ComputeAncillaryFiles(cluster, redist):
   assert all_files_set.issuperset(files_opt), \
     "Optional file not in a different required list"

+  # This one file should never ever be re-distributed via RPC
+  assert not (redist and
+              pathutils.FILE_STORAGE_PATHS_FILE in all_files_set)
+
   return (files_all, files_opt, files_mc, files_vm)


@@ -7137,9 +7223,10 @@ class LUInstanceRecreateDisks(LogicalUnit):
                                  " once: %s" % utils.CommaJoin(duplicates),
                                  errors.ECODE_INVAL)

-    if self.op.iallocator and self.op.nodes:
-      raise errors.OpPrereqError("Give either the iallocator or the new"
-                                 " nodes, not both", errors.ECODE_INVAL)
+    # We don't want _CheckIAllocatorOrNode selecting the default iallocator
+    # when neither iallocator nor nodes are specified
+    if self.op.iallocator or self.op.nodes:
+      _CheckIAllocatorOrNode(self, "iallocator", "nodes")

     for (idx, params) in self.op.disks:
       utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
@@ -7195,7 +7282,7 @@ class LUInstanceRecreateDisks(LogicalUnit):
     elif level == locking.LEVEL_NODE_RES:
       # Copy node locks
       self.needed_locks[locking.LEVEL_NODE_RES] = \
-        self.needed_locks[locking.LEVEL_NODE][:]
+        _CopyLockList(self.needed_locks[locking.LEVEL_NODE])

   def BuildHooksEnv(self):
     """Build hooks env.
@@ -7277,10 +7364,9 @@ class LUInstanceRecreateDisks(LogicalUnit):

     if self.op.iallocator:
       self._RunAllocator()
-
-    # Release unneeded node and node resource locks
-    _ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes)
-    _ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes)
+      # Release unneeded node and node resource locks
+      _ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes)
+      _ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes)

   def Exec(self, feedback_fn):
     """Recreate the disks.
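
Note: the N+1 hunk above switches from "has no secondary nodes" to a disk
template check, so file-backed instances are now classified correctly as
non-redundant. A rough sketch of the new test, assuming DTS_MIRRORED
contains the mirrored templates (the exact contents of
constants.DTS_MIRRORED are not shown in this patch):

  # Illustration only; "drbd" as the sole mirrored template is an assumption.
  DTS_MIRRORED = frozenset(["drbd"])

  def non_redundant(instances):
    # An instance survives losing a node only if its disks are mirrored.
    return [name for (name, disk_template) in instances
            if disk_template not in DTS_MIRRORED]

  print non_redundant([("web", "drbd"), ("db", "plain"), ("nfs", "file")])
  # -> ['db', 'nfs']
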
@@ -7338,6 +7424,9 @@ class LUInstanceRecreateDisks(LogicalUnit):
     if self.op.nodes:
       self.cfg.Update(instance, feedback_fn)

+    # All touched nodes must be locked
+    mylocks = self.owned_locks(locking.LEVEL_NODE)
+    assert mylocks.issuperset(frozenset(instance.all_nodes))
     _CreateDisks(self, instance, to_skip=to_skip)


@@ -7391,15 +7480,7 @@ class LUInstanceRename(LogicalUnit):
     new_name = self.op.new_name
     if self.op.name_check:
-      hostname = netutils.GetHostname(name=new_name)
-      if hostname.name != new_name:
-        self.LogInfo("Resolved given name '%s' to '%s'", new_name,
-                     hostname.name)
-      if not utils.MatchNameComponent(self.op.new_name, [hostname.name]):
-        raise errors.OpPrereqError(("Resolved hostname '%s' does not look the"
-                                    " same as given hostname '%s'") %
-                                   (hostname.name, self.op.new_name),
-                                   errors.ECODE_INVAL)
+      hostname = _CheckHostnameSane(self, new_name)
       new_name = self.op.new_name = hostname.name
       if (self.op.ip_check and
           netutils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT)):
@@ -7446,6 +7527,15 @@ class LUInstanceRename(LogicalUnit):
                                  new_file_storage_dir))

     _StartInstanceDisks(self, inst, None)
+    # update info on disks
+    info = _GetInstanceInfoText(inst)
+    for (idx, disk) in enumerate(inst.disks):
+      for node in inst.all_nodes:
+        self.cfg.SetDiskID(disk, node)
+        result = self.rpc.call_blockdev_setinfo(node, disk, info)
+        if result.fail_msg:
+          self.LogWarning("Error setting info on node %s for disk %s: %s",
+                          node, idx, result.fail_msg)
     try:
       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
                                                  old_name, self.op.debug_level)
@@ -7481,7 +7571,7 @@ class LUInstanceRemove(LogicalUnit):
     elif level == locking.LEVEL_NODE_RES:
       # Copy node locks
       self.needed_locks[locking.LEVEL_NODE_RES] = \
-        self.needed_locks[locking.LEVEL_NODE][:]
+        _CopyLockList(self.needed_locks[locking.LEVEL_NODE])

   def BuildHooksEnv(self):
     """Build hooks env.
@@ -7634,7 +7724,7 @@ class LUInstanceFailover(LogicalUnit):
     elif level == locking.LEVEL_NODE_RES:
       # Copy node locks
       self.needed_locks[locking.LEVEL_NODE_RES] = \
-        self.needed_locks[locking.LEVEL_NODE][:]
+        _CopyLockList(self.needed_locks[locking.LEVEL_NODE])

   def BuildHooksEnv(self):
     """Build hooks env.
@@ -7718,7 +7808,7 @@ class LUInstanceMigrate(LogicalUnit):
     elif level == locking.LEVEL_NODE_RES:
       # Copy node locks
       self.needed_locks[locking.LEVEL_NODE_RES] = \
-        self.needed_locks[locking.LEVEL_NODE][:]
+        _CopyLockList(self.needed_locks[locking.LEVEL_NODE])

   def BuildHooksEnv(self):
     """Build hooks env.
@@ -7777,7 +7867,7 @@ class LUInstanceMove(LogicalUnit):
     elif level == locking.LEVEL_NODE_RES:
       # Copy node locks
       self.needed_locks[locking.LEVEL_NODE_RES] = \
-        self.needed_locks[locking.LEVEL_NODE][:]
+        _CopyLockList(self.needed_locks[locking.LEVEL_NODE])

   def BuildHooksEnv(self):
     """Build hooks env.
@@ -8970,7 +9060,7 @@ def _CalcEta(time_taken, written, total_size):
   return (total_size - written) * avg_time


-def _WipeDisks(lu, instance):
+def _WipeDisks(lu, instance, disks=None):
   """Wipes instance disks.
   @type lu: L{LogicalUnit}
@@ -8982,72 +9072,85 @@ def _WipeDisks(lu, instance):

   """
   node = instance.primary_node

-  for device in instance.disks:
+  if disks is None:
+    disks = [(idx, disk, 0)
+             for (idx, disk) in enumerate(instance.disks)]
+
+  for (_, device, _) in disks:
     lu.cfg.SetDiskID(device, node)

-  logging.info("Pause sync of instance %s disks", instance.name)
+  logging.info("Pausing synchronization of disks of instance '%s'",
+               instance.name)
   result = lu.rpc.call_blockdev_pause_resume_sync(node,
-                                                  (instance.disks, instance),
+                                                  (map(compat.snd, disks),
+                                                   instance),
                                                   True)
-  result.Raise("Failed RPC to node %s for pausing the disk syncing" % node)
+  result.Raise("Failed to pause disk synchronization on node '%s'" % node)

   for idx, success in enumerate(result.payload):
     if not success:
-      logging.warn("pause-sync of instance %s for disks %d failed",
-                   instance.name, idx)
+      logging.warn("Pausing synchronization of disk %s of instance '%s'"
+                   " failed", idx, instance.name)

   try:
-    for idx, device in enumerate(instance.disks):
+    for (idx, device, offset) in disks:
       # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
-      # MAX_WIPE_CHUNK at max
-      wipe_chunk_size = min(constants.MAX_WIPE_CHUNK, device.size / 100.0 *
-                            constants.MIN_WIPE_CHUNK_PERCENT)
-      # we _must_ make this an int, otherwise rounding errors will
-      # occur
-      wipe_chunk_size = int(wipe_chunk_size)
-
-      lu.LogInfo("* Wiping disk %d", idx)
-      logging.info("Wiping disk %d for instance %s, node %s using"
-                   " chunk size %s", idx, instance.name, node, wipe_chunk_size)
+      # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
+      wipe_chunk_size = \
+        int(min(constants.MAX_WIPE_CHUNK,
+                device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))

-      offset = 0
       size = device.size
       last_output = 0
       start_time = time.time()

+      if offset == 0:
+        info_text = ""
+      else:
+        info_text = (" (from %s to %s)" %
+                     (utils.FormatUnit(offset, "h"),
+                      utils.FormatUnit(size, "h")))
+
+      lu.LogInfo("* Wiping disk %s%s", idx, info_text)
+
+      logging.info("Wiping disk %d for instance %s on node %s using"
+                   " chunk size %s", idx, instance.name, node, wipe_chunk_size)
+
       while offset < size:
         wipe_size = min(wipe_chunk_size, size - offset)
+
         logging.debug("Wiping disk %d, offset %s, chunk %s",
                       idx, offset, wipe_size)
+
         result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset,
                                            wipe_size)
         result.Raise("Could not wipe disk %d at offset %d for size %d" %
                      (idx, offset, wipe_size))
+
         now = time.time()
         offset += wipe_size
         if now - last_output >= 60:
           eta = _CalcEta(now - start_time, offset, size)
-          lu.LogInfo(" - done: %.1f%% ETA: %s" %
-                     (offset / float(size) * 100, utils.FormatSeconds(eta)))
+          lu.LogInfo(" - done: %.1f%% ETA: %s",
+                     offset / float(size) * 100, utils.FormatSeconds(eta))
           last_output = now
   finally:
-    logging.info("Resume sync of instance %s disks", instance.name)
+    logging.info("Resuming synchronization of disks for instance '%s'",
+                 instance.name)

     result = lu.rpc.call_blockdev_pause_resume_sync(node,
-                                                    (instance.disks, instance),
+                                                    (map(compat.snd, disks),
+                                                     instance),
                                                     False)
+
     if result.fail_msg:
-      lu.LogWarning("RPC call to %s for resuming disk syncing failed,"
-                    " please have a look at the status and troubleshoot"
-                    " the issue: %s", node, result.fail_msg)
+      lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
+                    node, result.fail_msg)
     else:
       for idx, success in enumerate(result.payload):
         if not success:
-          lu.LogWarning("Resume sync of disk %d failed, please have a"
issue", idx) - logging.warn("resume-sync of instance %s for disks %d failed", - instance.name, idx) + lu.LogWarning("Resuming synchronization of disk %s of instance '%s'" + " failed", idx, instance.name) def _CreateDisks(lu, instance, to_skip=None, target_node=None): @@ -9139,7 +9242,7 @@ def _RemoveDisks(lu, instance, target_node=None, ignore_failures=False): for port in ports_to_release: lu.cfg.AddTcpUdpPort(port) - if instance.disk_template == constants.DT_FILE: + if instance.disk_template in constants.DTS_FILEBASED: file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1]) if target_node: tgt = target_node @@ -9261,6 +9364,163 @@ def _CheckOSParams(lu, required, nodenames, osname, osparams): osname, node) +def _CreateInstanceAllocRequest(op, disks, nics, beparams): + """Wrapper around IAReqInstanceAlloc. + + @param op: The instance opcode + @param disks: The computed disks + @param nics: The computed nics + @param beparams: The full filled beparams + + @returns: A filled L{iallocator.IAReqInstanceAlloc} + + """ + spindle_use = beparams[constants.BE_SPINDLE_USE] + return iallocator.IAReqInstanceAlloc(name=op.instance_name, + disk_template=op.disk_template, + tags=op.tags, + os=op.os_type, + vcpus=beparams[constants.BE_VCPUS], + memory=beparams[constants.BE_MAXMEM], + spindle_use=spindle_use, + disks=disks, + nics=[n.ToDict() for n in nics], + hypervisor=op.hypervisor) + + +def _ComputeNics(op, cluster, default_ip, cfg, proc): + """Computes the nics. + + @param op: The instance opcode + @param cluster: Cluster configuration object + @param default_ip: The default ip to assign + @param cfg: An instance of the configuration object + @param proc: The executer instance + + @returns: The build up nics + + """ + nics = [] + for idx, nic in enumerate(op.nics): + nic_mode_req = nic.get(constants.INIC_MODE, None) + nic_mode = nic_mode_req + if nic_mode is None or nic_mode == constants.VALUE_AUTO: + nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE] + + # in routed mode, for the first nic, the default ip is 'auto' + if nic_mode == constants.NIC_MODE_ROUTED and idx == 0: + default_ip_mode = constants.VALUE_AUTO + else: + default_ip_mode = constants.VALUE_NONE + + # ip validity checks + ip = nic.get(constants.INIC_IP, default_ip_mode) + if ip is None or ip.lower() == constants.VALUE_NONE: + nic_ip = None + elif ip.lower() == constants.VALUE_AUTO: + if not op.name_check: + raise errors.OpPrereqError("IP address set to auto but name checks" + " have been skipped", + errors.ECODE_INVAL) + nic_ip = default_ip + else: + if not netutils.IPAddress.IsValid(ip): + raise errors.OpPrereqError("Invalid IP address '%s'" % ip, + errors.ECODE_INVAL) + nic_ip = ip + + # TODO: check the ip address for uniqueness + if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip: + raise errors.OpPrereqError("Routed nic mode requires an ip address", + errors.ECODE_INVAL) + + # MAC address verification + mac = nic.get(constants.INIC_MAC, constants.VALUE_AUTO) + if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE): + mac = utils.NormalizeAndValidateMac(mac) + + try: + # TODO: We need to factor this out + cfg.ReserveMAC(mac, proc.GetECId()) + except errors.ReservationError: + raise errors.OpPrereqError("MAC address %s already in use" + " in cluster" % mac, + errors.ECODE_NOTUNIQUE) + + # Build nic parameters + link = nic.get(constants.INIC_LINK, None) + if link == constants.VALUE_AUTO: + link = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_LINK] + nicparams = {} + if nic_mode_req: 
+      nicparams[constants.NIC_MODE] = nic_mode
+    if link:
+      nicparams[constants.NIC_LINK] = link
+
+    check_params = cluster.SimpleFillNIC(nicparams)
+    objects.NIC.CheckParameterSyntax(check_params)
+    nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
+
+  return nics
+
+
+def _ComputeDisks(op, default_vg):
+  """Computes the instance disks.
+
+  @param op: The instance opcode
+  @param default_vg: The default_vg to assume
+
+  @return: The computed disks
+
+  """
+  disks = []
+  for disk in op.disks:
+    mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
+    if mode not in constants.DISK_ACCESS_SET:
+      raise errors.OpPrereqError("Invalid disk access mode '%s'" %
+                                 mode, errors.ECODE_INVAL)
+    size = disk.get(constants.IDISK_SIZE, None)
+    if size is None:
+      raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
+    try:
+      size = int(size)
+    except (TypeError, ValueError):
+      raise errors.OpPrereqError("Invalid disk size '%s'" % size,
+                                 errors.ECODE_INVAL)
+
+    data_vg = disk.get(constants.IDISK_VG, default_vg)
+    new_disk = {
+      constants.IDISK_SIZE: size,
+      constants.IDISK_MODE: mode,
+      constants.IDISK_VG: data_vg,
+      }
+    if constants.IDISK_METAVG in disk:
+      new_disk[constants.IDISK_METAVG] = disk[constants.IDISK_METAVG]
+    if constants.IDISK_ADOPT in disk:
+      new_disk[constants.IDISK_ADOPT] = disk[constants.IDISK_ADOPT]
+    disks.append(new_disk)
+
+  return disks
+
+
+def _ComputeFullBeParams(op, cluster):
+  """Computes the full beparams.
+
+  @param op: The instance opcode
+  @param cluster: The cluster config object
+
+  @return: The fully filled beparams
+
+  """
+  default_beparams = cluster.beparams[constants.PP_DEFAULT]
+  for param, value in op.beparams.iteritems():
+    if value == constants.VALUE_AUTO:
+      op.beparams[param] = default_beparams[param]
+  objects.UpgradeBeParams(op.beparams)
+  utils.ForceDictType(op.beparams, constants.BES_PARAMETER_TYPES)
+  return cluster.SimpleFillBE(op.beparams)
+
+
 class LUInstanceCreate(LogicalUnit):
   """Create an instance.

@@ -9325,7 +9585,7 @@ class LUInstanceCreate(LogicalUnit):

     # instance name verification
     if self.op.name_check:
-      self.hostname1 = netutils.GetHostname(name=self.op.instance_name)
+      self.hostname1 = _CheckHostnameSane(self, self.op.instance_name)
       self.op.instance_name = self.hostname1.name
       # used in CheckPrereq for ip ping check
       self.check_ip = self.hostname1.ip
@@ -9485,19 +9745,8 @@ class LUInstanceCreate(LogicalUnit):
     """Run the allocator based on input opcode.
""" - nics = [n.ToDict() for n in self.nics] - memory = self.be_full[constants.BE_MAXMEM] - spindle_use = self.be_full[constants.BE_SPINDLE_USE] - req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name, - disk_template=self.op.disk_template, - tags=self.op.tags, - os=self.op.os_type, - vcpus=self.be_full[constants.BE_VCPUS], - memory=memory, - spindle_use=spindle_use, - disks=self.disks, - nics=nics, - hypervisor=self.op.hypervisor) + req = _CreateInstanceAllocRequest(self.op, self.disks, + self.nics, self.be_full) ial = iallocator.IAllocator(self.cfg, self.rpc, req) ial.Run(self.op.iallocator) @@ -9795,13 +10044,7 @@ class LUInstanceCreate(LogicalUnit): _CheckGlobalHvParams(self.op.hvparams) # fill and remember the beparams dict - default_beparams = cluster.beparams[constants.PP_DEFAULT] - for param, value in self.op.beparams.iteritems(): - if value == constants.VALUE_AUTO: - self.op.beparams[param] = default_beparams[param] - objects.UpgradeBeParams(self.op.beparams) - utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES) - self.be_full = cluster.SimpleFillBE(self.op.beparams) + self.be_full = _ComputeFullBeParams(self.op, cluster) # build os parameters self.os_full = cluster.SimpleFillOS(self.op.os_type, self.op.osparams) @@ -9812,94 +10055,12 @@ class LUInstanceCreate(LogicalUnit): self._RevertToDefaults(cluster) # NIC buildup - self.nics = [] - for idx, nic in enumerate(self.op.nics): - nic_mode_req = nic.get(constants.INIC_MODE, None) - nic_mode = nic_mode_req - if nic_mode is None or nic_mode == constants.VALUE_AUTO: - nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE] - - # in routed mode, for the first nic, the default ip is 'auto' - if nic_mode == constants.NIC_MODE_ROUTED and idx == 0: - default_ip_mode = constants.VALUE_AUTO - else: - default_ip_mode = constants.VALUE_NONE - - # ip validity checks - ip = nic.get(constants.INIC_IP, default_ip_mode) - if ip is None or ip.lower() == constants.VALUE_NONE: - nic_ip = None - elif ip.lower() == constants.VALUE_AUTO: - if not self.op.name_check: - raise errors.OpPrereqError("IP address set to auto but name checks" - " have been skipped", - errors.ECODE_INVAL) - nic_ip = self.hostname1.ip - else: - if not netutils.IPAddress.IsValid(ip): - raise errors.OpPrereqError("Invalid IP address '%s'" % ip, - errors.ECODE_INVAL) - nic_ip = ip - - # TODO: check the ip address for uniqueness - if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip: - raise errors.OpPrereqError("Routed nic mode requires an ip address", - errors.ECODE_INVAL) - - # MAC address verification - mac = nic.get(constants.INIC_MAC, constants.VALUE_AUTO) - if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE): - mac = utils.NormalizeAndValidateMac(mac) - - try: - self.cfg.ReserveMAC(mac, self.proc.GetECId()) - except errors.ReservationError: - raise errors.OpPrereqError("MAC address %s already in use" - " in cluster" % mac, - errors.ECODE_NOTUNIQUE) - - # Build nic parameters - link = nic.get(constants.INIC_LINK, None) - if link == constants.VALUE_AUTO: - link = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_LINK] - nicparams = {} - if nic_mode_req: - nicparams[constants.NIC_MODE] = nic_mode - if link: - nicparams[constants.NIC_LINK] = link - - check_params = cluster.SimpleFillNIC(nicparams) - objects.NIC.CheckParameterSyntax(check_params) - self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams)) + self.nics = _ComputeNics(self.op, cluster, self.hostname1.ip, self.cfg, + self.proc) # disk 
     default_vg = self.cfg.GetVGName()
-    self.disks = []
-    for disk in self.op.disks:
-      mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
-      if mode not in constants.DISK_ACCESS_SET:
-        raise errors.OpPrereqError("Invalid disk access mode '%s'" %
-                                   mode, errors.ECODE_INVAL)
-      size = disk.get(constants.IDISK_SIZE, None)
-      if size is None:
-        raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
-      try:
-        size = int(size)
-      except (TypeError, ValueError):
-        raise errors.OpPrereqError("Invalid disk size '%s'" % size,
-                                   errors.ECODE_INVAL)
-
-      data_vg = disk.get(constants.IDISK_VG, default_vg)
-      new_disk = {
-        constants.IDISK_SIZE: size,
-        constants.IDISK_MODE: mode,
-        constants.IDISK_VG: data_vg,
-        }
-      if constants.IDISK_METAVG in disk:
-        new_disk[constants.IDISK_METAVG] = disk[constants.IDISK_METAVG]
-      if constants.IDISK_ADOPT in disk:
-        new_disk[constants.IDISK_ADOPT] = disk[constants.IDISK_ADOPT]
-      self.disks.append(new_disk)
+    self.disks = _ComputeDisks(self.op, default_vg)

     if self.op.mode == constants.INSTANCE_IMPORT:
       disk_images = []
@@ -10092,6 +10253,27 @@ class LUInstanceCreate(LogicalUnit):
         dsk[constants.IDISK_SIZE] = \
           int(float(node_disks[dsk[constants.IDISK_ADOPT]]))

+    # Verify instance specs
+    spindle_use = self.be_full.get(constants.BE_SPINDLE_USE, None)
+    ispec = {
+      constants.ISPEC_MEM_SIZE: self.be_full.get(constants.BE_MAXMEM, None),
+      constants.ISPEC_CPU_COUNT: self.be_full.get(constants.BE_VCPUS, None),
+      constants.ISPEC_DISK_COUNT: len(self.disks),
+      constants.ISPEC_DISK_SIZE: [disk[constants.IDISK_SIZE]
+                                  for disk in self.disks],
+      constants.ISPEC_NIC_COUNT: len(self.nics),
+      constants.ISPEC_SPINDLE_USE: spindle_use,
+      }
+
+    group_info = self.cfg.GetNodeGroup(pnode.group)
+    ipolicy = _CalculateGroupIPolicy(cluster, group_info)
+    res = _ComputeIPolicyInstanceSpecViolation(ipolicy, ispec)
+    if not self.op.ignore_ipolicy and res:
+      raise errors.OpPrereqError(("Instance allocation to group %s violates"
+                                  " policy: %s") % (pnode.group,
+                                                    utils.CommaJoin(res)),
+                                 errors.ECODE_INVAL)
+
     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)

     _CheckNodeHasOS(self, pnode.name, self.op.os_type, self.op.force_variant)
@@ -10351,6 +10533,137 @@ class LUInstanceCreate(LogicalUnit):
     return list(iobj.all_nodes)


+class LUInstanceMultiAlloc(NoHooksLU):
+  """Allocates multiple instances at the same time.
+
+  """
+  REQ_BGL = False
+
+  def CheckArguments(self):
+    """Check arguments.
+ + """ + nodes = [] + for inst in self.op.instances: + if inst.iallocator is not None: + raise errors.OpPrereqError("iallocator are not allowed to be set on" + " instance objects", errors.ECODE_INVAL) + nodes.append(bool(inst.pnode)) + if inst.disk_template in constants.DTS_INT_MIRROR: + nodes.append(bool(inst.snode)) + + has_nodes = compat.any(nodes) + if compat.all(nodes) ^ has_nodes: + raise errors.OpPrereqError("There are instance objects providing" + " pnode/snode while others do not", + errors.ECODE_INVAL) + + if self.op.iallocator is None: + default_iallocator = self.cfg.GetDefaultIAllocator() + if default_iallocator and has_nodes: + self.op.iallocator = default_iallocator + else: + raise errors.OpPrereqError("No iallocator or nodes on the instances" + " given and no cluster-wide default" + " iallocator found; please specify either" + " an iallocator or nodes on the instances" + " or set a cluster-wide default iallocator", + errors.ECODE_INVAL) + + dups = utils.FindDuplicates([op.instance_name for op in self.op.instances]) + if dups: + raise errors.OpPrereqError("There are duplicate instance names: %s" % + utils.CommaJoin(dups), errors.ECODE_INVAL) + + def ExpandNames(self): + """Calculate the locks. + + """ + self.share_locks = _ShareAll() + self.needed_locks = {} + + if self.op.iallocator: + self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET + self.needed_locks[locking.LEVEL_NODE_RES] = locking.ALL_SET + else: + nodeslist = [] + for inst in self.op.instances: + inst.pnode = _ExpandNodeName(self.cfg, inst.pnode) + nodeslist.append(inst.pnode) + if inst.snode is not None: + inst.snode = _ExpandNodeName(self.cfg, inst.snode) + nodeslist.append(inst.snode) + + self.needed_locks[locking.LEVEL_NODE] = nodeslist + # Lock resources of instance's primary and secondary nodes (copy to + # prevent accidential modification) + self.needed_locks[locking.LEVEL_NODE_RES] = list(nodeslist) + + def CheckPrereq(self): + """Check prerequisite. + + """ + cluster = self.cfg.GetClusterInfo() + default_vg = self.cfg.GetVGName() + insts = [_CreateInstanceAllocRequest(op, _ComputeDisks(op, default_vg), + _ComputeNics(op, cluster, None, + self.cfg, self.proc), + _ComputeFullBeParams(op, cluster)) + for op in self.op.instances] + req = iallocator.IAReqMultiInstanceAlloc(instances=insts) + ial = iallocator.IAllocator(self.cfg, self.rpc, req) + + ial.Run(self.op.iallocator) + + if not ial.success: + raise errors.OpPrereqError("Can't compute nodes using" + " iallocator '%s': %s" % + (self.op.iallocator, ial.info), + errors.ECODE_NORES) + + self.ia_result = ial.result + + if self.op.dry_run: + self.dry_run_rsult = objects.FillDict(self._ConstructPartialResult(), { + constants.JOB_IDS_KEY: [], + }) + + def _ConstructPartialResult(self): + """Contructs the partial result. + + """ + (allocatable, failed) = self.ia_result + return { + opcodes.OpInstanceMultiAlloc.ALLOCATABLE_KEY: + map(compat.fst, allocatable), + opcodes.OpInstanceMultiAlloc.FAILED_KEY: failed, + } + + def Exec(self, feedback_fn): + """Executes the opcode. 
+ + """ + op2inst = dict((op.instance_name, op) for op in self.op.instances) + (allocatable, failed) = self.ia_result + + jobs = [] + for (name, nodes) in allocatable: + op = op2inst.pop(name) + + if len(nodes) > 1: + (op.pnode, op.snode) = nodes + else: + (op.pnode,) = nodes + + jobs.append([op]) + + missing = set(op2inst.keys()) - set(failed) + assert not missing, \ + "Iallocator did return incomplete result: %s" % utils.CommaJoin(missing) + + return ResultWithJobs(jobs, **self._ConstructPartialResult()) + + def _CheckRADOSFreeSpace(): """Compute disk size requirements inside the RADOS cluster. @@ -10440,8 +10753,24 @@ class LUInstanceReplaceDisks(LogicalUnit): REQ_BGL = False def CheckArguments(self): - TLReplaceDisks.CheckArguments(self.op.mode, self.op.remote_node, - self.op.iallocator) + """Check arguments. + + """ + remote_node = self.op.remote_node + ialloc = self.op.iallocator + if self.op.mode == constants.REPLACE_DISK_CHG: + if remote_node is None and ialloc is None: + raise errors.OpPrereqError("When changing the secondary either an" + " iallocator script must be used or the" + " new node given", errors.ECODE_INVAL) + else: + _CheckIAllocatorOrNode(self, "iallocator", "remote_node") + + elif remote_node is not None or ialloc is not None: + # Not replacing the secondary + raise errors.OpPrereqError("The iallocator and new node options can" + " only be used when changing the" + " secondary node", errors.ECODE_INVAL) def ExpandNames(self): self._ExpandAndLockInstance() @@ -10583,28 +10912,6 @@ class TLReplaceDisks(Tasklet): self.node_secondary_ip = None @staticmethod - def CheckArguments(mode, remote_node, ialloc): - """Helper function for users of this class. - - """ - # check for valid parameter combination - if mode == constants.REPLACE_DISK_CHG: - if remote_node is None and ialloc is None: - raise errors.OpPrereqError("When changing the secondary either an" - " iallocator script must be used or the" - " new node given", errors.ECODE_INVAL) - - if remote_node is not None and ialloc is not None: - raise errors.OpPrereqError("Give either the iallocator or the new" - " secondary, not both", errors.ECODE_INVAL) - - elif remote_node is not None or ialloc is not None: - # Not replacing the secondary - raise errors.OpPrereqError("The iallocator and new node options can" - " only be used when changing the" - " secondary node", errors.ECODE_INVAL) - - @staticmethod def _RunAllocator(lu, iallocator_name, instance_name, relocate_from): """Compute a new secondary node using an IAllocator. @@ -10848,11 +11155,15 @@ class TLReplaceDisks(Tasklet): "Should not own any node group lock at this point" if not self.disks: - feedback_fn("No disks need replacement") + feedback_fn("No disks need replacement for instance '%s'" % + self.instance.name) return - feedback_fn("Replacing disk(s) %s for %s" % + feedback_fn("Replacing disk(s) %s for instance '%s'" % (utils.CommaJoin(self.disks), self.instance.name)) + feedback_fn("Current primary node: %s", self.instance.primary_node) + feedback_fn("Current seconary node: %s", + utils.CommaJoin(self.instance.secondary_nodes)) activate_disks = (self.instance.admin_state != constants.ADMINST_UP) @@ -11641,6 +11952,23 @@ def _LoadNodeEvacResult(lu, alloc_result, early_release, use_nodes): for ops in jobs] +def _DiskSizeInBytesToMebibytes(lu, size): + """Converts a disk size in bytes to mebibytes. + + Warns and rounds up if the size isn't an even multiple of 1 MiB. 
+ + """ + (mib, remainder) = divmod(size, 1024 * 1024) + + if remainder != 0: + lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up" + " to not overwrite existing data (%s bytes will not be" + " wiped)", (1024 * 1024) - remainder) + mib += 1 + + return mib + + class LUInstanceGrowDisk(LogicalUnit): """Grow a disk of an instance. @@ -11662,7 +11990,7 @@ class LUInstanceGrowDisk(LogicalUnit): elif level == locking.LEVEL_NODE_RES: # Copy node locks self.needed_locks[locking.LEVEL_NODE_RES] = \ - self.needed_locks[locking.LEVEL_NODE][:] + _CopyLockList(self.needed_locks[locking.LEVEL_NODE]) def BuildHooksEnv(self): """Build hooks env. @@ -11742,6 +12070,8 @@ class LUInstanceGrowDisk(LogicalUnit): assert (self.owned_locks(locking.LEVEL_NODE) == self.owned_locks(locking.LEVEL_NODE_RES)) + wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks + disks_ok, _ = _AssembleInstanceDisks(self, self.instance, disks=[disk]) if not disks_ok: raise errors.OpExecError("Cannot activate block device to grow") @@ -11756,7 +12086,27 @@ class LUInstanceGrowDisk(LogicalUnit): self.cfg.SetDiskID(disk, node) result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta, True, True) - result.Raise("Grow request failed to node %s" % node) + result.Raise("Dry-run grow request failed to node %s" % node) + + if wipe_disks: + # Get disk size from primary node for wiping + result = self.rpc.call_blockdev_getsize(instance.primary_node, [disk]) + result.Raise("Failed to retrieve disk size from node '%s'" % + instance.primary_node) + + (disk_size_in_bytes, ) = result.payload + + if disk_size_in_bytes is None: + raise errors.OpExecError("Failed to retrieve disk size from primary" + " node '%s'" % instance.primary_node) + + old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes) + + assert old_disk_size >= disk.size, \ + ("Retrieved disk size too small (got %s, should be at least %s)" % + (old_disk_size, disk.size)) + else: + old_disk_size = None # We know that (as far as we can test) operations across different # nodes will succeed, time to run it for real on the backing storage @@ -11782,6 +12132,15 @@ class LUInstanceGrowDisk(LogicalUnit): # Downgrade lock while waiting for sync self.glm.downgrade(locking.LEVEL_INSTANCE) + assert wipe_disks ^ (old_disk_size is None) + + if wipe_disks: + assert instance.disks[self.op.disk] == disk + + # Wipe newly added disk space + _WipeDisks(self, instance, + disks=[(self.op.disk, disk, old_disk_size)]) + if self.op.wait_for_sync: disk_abort = not _WaitForSync(self, instance, disks=[disk]) if disk_abort: @@ -12326,7 +12685,7 @@ class LUInstanceSetParams(LogicalUnit): elif level == locking.LEVEL_NODE_RES and self.op.disk_template: # Copy node locks self.needed_locks[locking.LEVEL_NODE_RES] = \ - self.needed_locks[locking.LEVEL_NODE][:] + _CopyLockList(self.needed_locks[locking.LEVEL_NODE]) def BuildHooksEnv(self): """Build hooks env. @@ -12645,12 +13004,11 @@ class LUInstanceSetParams(LogicalUnit): self.be_proposed[constants.BE_MAXMEM]), errors.ECODE_INVAL) - if self.op.runtime_mem > current_memory: + delta = self.op.runtime_mem - current_memory + if delta > 0: _CheckNodeFreeMemory(self, instance.primary_node, "ballooning memory for instance %s" % - instance.name, - self.op.memory - current_memory, - instance.hypervisor) + instance.name, delta, instance.hypervisor) if self.op.disks and instance.disk_template == constants.DT_DISKLESS: raise errors.OpPrereqError("Disk operations not supported for"