X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/e7a25b0864e9ffa10b35e1d808bb092ec161e7e9..c1e7897de040fb86a71100684b64f46c8b08e2eb:/lib/cmdlib.py

diff --git a/lib/cmdlib.py b/lib/cmdlib.py
index cf07862..9f8b1ff 100644
--- a/lib/cmdlib.py
+++ b/lib/cmdlib.py
@@ -44,6 +44,11 @@ from ganeti import constants
 from ganeti import objects
 from ganeti import serializer
 from ganeti import ssconf
+from ganeti import uidpool
+from ganeti import compat
+from ganeti import masterd
+
+import ganeti.masterd.instance # pylint: disable-msg=W0611
 
 
 class LogicalUnit(object):
@@ -560,6 +565,17 @@ def _CheckNodeHasOS(lu, node, os_name, force_variant):
     _CheckOSVariant(result.payload, os_name)
 
 
+def _RequireFileStorage():
+  """Checks that file storage is enabled.
+
+  @raise errors.OpPrereqError: when file storage is disabled
+
+  """
+  if not constants.ENABLE_FILE_STORAGE:
+    raise errors.OpPrereqError("File storage disabled at configure time",
+                               errors.ECODE_INVAL)
+
+
 def _CheckDiskTemplate(template):
   """Ensure a given disk template is valid.
 
@@ -568,6 +584,20 @@ def _CheckDiskTemplate(template):
     msg = ("Invalid disk template name '%s', valid templates are: %s" %
            (template, utils.CommaJoin(constants.DISK_TEMPLATES)))
     raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
+  if template == constants.DT_FILE:
+    _RequireFileStorage()
+
+
+def _CheckStorageType(storage_type):
+  """Ensure a given storage type is valid.
+
+  """
+  if storage_type not in constants.VALID_STORAGE_TYPES:
+    raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
+                               errors.ECODE_INVAL)
+  if storage_type == constants.ST_FILE:
+    _RequireFileStorage()
+
 
 def _CheckInstanceDown(lu, instance, reason):
@@ -893,13 +923,6 @@ def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
   return faulty
 
 
-def _FormatTimestamp(secs):
-  """Formats a Unix timestamp with the local timezone.
-
-  """
-  return time.strftime("%F %T %Z", time.gmtime(secs))
-
-
 class LUPostInitCluster(LogicalUnit):
   """Logical unit for running hooks after cluster initialization.
 
@@ -991,45 +1014,6 @@ class LUDestroyCluster(LogicalUnit):
     return master
 
 
-def _VerifyCertificateInner(filename, expired, not_before, not_after, now,
-                            warn_days=constants.SSL_CERT_EXPIRATION_WARN,
-                            error_days=constants.SSL_CERT_EXPIRATION_ERROR):
-  """Verifies certificate details for LUVerifyCluster.
-
-  """
-  if expired:
-    msg = "Certificate %s is expired" % filename
-
-    if not_before is not None and not_after is not None:
-      msg += (" (valid from %s to %s)" %
-              (_FormatTimestamp(not_before),
-               _FormatTimestamp(not_after)))
-    elif not_before is not None:
-      msg += " (valid from %s)" % _FormatTimestamp(not_before)
-    elif not_after is not None:
-      msg += " (valid until %s)" % _FormatTimestamp(not_after)
-
-    return (LUVerifyCluster.ETYPE_ERROR, msg)
-
-  elif not_before is not None and not_before > now:
-    return (LUVerifyCluster.ETYPE_WARNING,
-            "Certificate %s not yet valid (valid from %s)" %
-            (filename, _FormatTimestamp(not_before)))
-
-  elif not_after is not None:
-    remaining_days = int((not_after - now) / (24 * 3600))
-
-    msg = ("Certificate %s expires in %d days" % (filename, remaining_days))
-
-    if remaining_days <= error_days:
-      return (LUVerifyCluster.ETYPE_ERROR, msg)
-
-    if remaining_days <= warn_days:
-      return (LUVerifyCluster.ETYPE_WARNING, msg)
-
-  return (None, None)
-
-
 def _VerifyCertificate(filename):
   """Verifies a certificate for LUVerifyCluster.
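
The two guards added above funnel every file-storage user through one configure-time switch. A minimal standalone sketch of that chain, with ganeti.constants and ganeti.errors replaced by local stand-ins:

    # All names below are stand-ins for ganeti.constants / ganeti.errors.
    ENABLE_FILE_STORAGE = False
    ST_FILE = "file"
    VALID_STORAGE_TYPES = frozenset(["file", "lvm-pv", "lvm-vg"])

    class OpPrereqError(Exception):
      pass

    def _RequireFileStorage():
      # single configure-time gate, as in the helper above
      if not ENABLE_FILE_STORAGE:
        raise OpPrereqError("File storage disabled at configure time")

    def _CheckStorageType(storage_type):
      # validate the type first, then apply the file-storage gate
      if storage_type not in VALID_STORAGE_TYPES:
        raise OpPrereqError("Unknown storage type: %s" % storage_type)
      if storage_type == ST_FILE:
        _RequireFileStorage()

    _CheckStorageType("lvm-vg")     # passes silently
    try:
      _CheckStorageType("file")     # rejected: file storage compiled out
    except OpPrereqError, err:
      print "rejected: %s" % err
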
@@ -1044,11 +1028,23 @@ def _VerifyCertificate(filename):
     return (LUVerifyCluster.ETYPE_ERROR,
             "Failed to load X509 certificate %s: %s" % (filename, err))
 
-  # Depending on the pyOpenSSL version, this can just return (None, None)
-  (not_before, not_after) = utils.GetX509CertValidity(cert)
+  (errcode, msg) = \
+    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
+                                constants.SSL_CERT_EXPIRATION_ERROR)
+
+  if msg:
+    fnamemsg = "While verifying %s: %s" % (filename, msg)
+  else:
+    fnamemsg = None
+
+  if errcode is None:
+    return (None, fnamemsg)
+  elif errcode == utils.CERT_WARNING:
+    return (LUVerifyCluster.ETYPE_WARNING, fnamemsg)
+  elif errcode == utils.CERT_ERROR:
+    return (LUVerifyCluster.ETYPE_ERROR, fnamemsg)
 
-  return _VerifyCertificateInner(filename, cert.has_expired(),
-                                 not_before, not_after, time.time())
+  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
 
 
 class LUVerifyCluster(LogicalUnit):
@@ -1662,6 +1658,7 @@ class LUVerifyCluster(LogicalUnit):
 
     vg_name = self.cfg.GetVGName()
     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
+    cluster = self.cfg.GetClusterInfo()
    nodelist = utils.NiceSort(self.cfg.GetNodeList())
     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
@@ -1680,6 +1677,8 @@ class LUVerifyCluster(LogicalUnit):
     file_names = ssconf.SimpleStore().GetFileList()
     file_names.extend(constants.ALL_CERT_FILES)
     file_names.extend(master_files)
+    if cluster.modify_etc_hosts:
+      file_names.append(constants.ETC_HOSTS)
 
     local_checksums = utils.FingerprintFiles(file_names)
 
@@ -1743,7 +1742,6 @@ class LUVerifyCluster(LogicalUnit):
                                            self.cfg.GetClusterName())
     nvinfo_endtime = time.time()
 
-    cluster = self.cfg.GetClusterInfo()
     master_node = self.cfg.GetMasterNode()
     all_drbd_map = self.cfg.ComputeDRBDMap()
 
@@ -2229,8 +2227,11 @@ class LUSetClusterParams(LogicalUnit):
     """Check parameters
 
     """
-    if not hasattr(self.op, "candidate_pool_size"):
-      self.op.candidate_pool_size = None
+    for attr in ["candidate_pool_size",
+                 "uid_pool", "add_uids", "remove_uids"]:
+      if not hasattr(self.op, attr):
+        setattr(self.op, attr, None)
+
     if self.op.candidate_pool_size is not None:
       try:
         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
@@ -2240,8 +2241,18 @@ class LUSetClusterParams(LogicalUnit):
       if self.op.candidate_pool_size < 1:
         raise errors.OpPrereqError("At least one master candidate needed",
                                    errors.ECODE_INVAL)
+
     _CheckBooleanOpField(self.op, "maintain_node_health")
 
+    if self.op.uid_pool:
+      uidpool.CheckUidPool(self.op.uid_pool)
+
+    if self.op.add_uids:
+      uidpool.CheckUidPool(self.op.add_uids)
+
+    if self.op.remove_uids:
+      uidpool.CheckUidPool(self.op.remove_uids)
+
   def ExpandNames(self):
     # FIXME: in the future maybe other cluster params won't require checking on
     # all nodes to be modified.
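
The rewritten _VerifyCertificate above is now a thin mapping from the (errcode, msg) pair returned by utils.VerifyX509Certificate to LUVerifyCluster error types. A sketch of just that mapping, with the utils.CERT_* codes and ETYPE_* values stubbed:

    # Stand-ins for utils.CERT_* and LUVerifyCluster.ETYPE_*.
    CERT_WARNING, CERT_ERROR = "warning", "error"
    ETYPE_WARNING, ETYPE_ERROR = "WARNING", "ERROR"

    def map_cert_code(filename, errcode, msg):
      if msg:
        fnamemsg = "While verifying %s: %s" % (filename, msg)
      else:
        fnamemsg = None
      codemap = {None: None,
                 CERT_WARNING: ETYPE_WARNING,
                 CERT_ERROR: ETYPE_ERROR}
      if errcode not in codemap:
        raise AssertionError("Unhandled certificate error code %r" % errcode)
      return (codemap[errcode], fnamemsg)

    print map_cert_code("server.pem", CERT_WARNING, "Certificate expires soon")
    # -> ('WARNING', 'While verifying server.pem: Certificate expires soon')
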
@@ -2333,7 +2344,7 @@ class LUSetClusterParams(LogicalUnit):
                                    "\n".join(nic_errors))
 
     # hypervisor list/parameters
-    self.new_hvparams = objects.FillDict(cluster.hvparams, {})
+    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
     if self.op.hvparams:
       if not isinstance(self.op.hvparams, dict):
         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input",
@@ -2363,6 +2374,7 @@ class LUSetClusterParams(LogicalUnit):
         else:
           self.new_os_hvp[os_name][hv_name].update(hv_dict)
 
+    # changes to the hypervisor list
     if self.op.enabled_hypervisors is not None:
       self.hv_list = self.op.enabled_hypervisors
       if not self.hv_list:
@@ -2375,6 +2387,16 @@ class LUSetClusterParams(LogicalUnit):
                                    " entries: %s" % utils.CommaJoin(invalid_hvs),
                                    errors.ECODE_INVAL)
+      for hv in self.hv_list:
+        # if the hypervisor doesn't already exist in the cluster
+        # hvparams, we initialize it to empty, and then (in both
+        # cases) we make sure to fill the defaults, as we might not
+        # have a complete defaults list if the hypervisor wasn't
+        # enabled before
+        if hv not in new_hvp:
+          new_hvp[hv] = {}
+        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
+        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
     else:
       self.hv_list = cluster.enabled_hypervisors
 
@@ -2422,6 +2444,7 @@ class LUSetClusterParams(LogicalUnit):
       if self.op.os_hvp:
         self.cluster.os_hvp = self.new_os_hvp
       if self.op.enabled_hypervisors is not None:
+        self.cluster.hvparams = self.new_hvparams
         self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
       if self.op.beparams:
         self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
@@ -2436,6 +2459,15 @@ class LUSetClusterParams(LogicalUnit):
     if self.op.maintain_node_health is not None:
       self.cluster.maintain_node_health = self.op.maintain_node_health
 
+    if self.op.add_uids is not None:
+      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
+
+    if self.op.remove_uids is not None:
+      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
+
+    if self.op.uid_pool is not None:
+      self.cluster.uid_pool = self.op.uid_pool
+
     self.cfg.Update(self.cluster, feedback_fn)
 
 
@@ -2820,6 +2852,12 @@ class LURemoveNode(LogicalUnit):
       self.LogWarning("Errors encountered on the remote node while leaving"
                       " the cluster: %s", msg)
 
+    # Remove node from our /etc/hosts
+    if self.cfg.GetClusterInfo().modify_etc_hosts:
+      # FIXME: this should be done via an rpc call to node daemon
+      utils.RemoveHostFromEtcHosts(node.name)
+      _RedistributeAncillaryFiles(self)
+
 
 class LUQueryNodes(NoHooksLU):
   """Logical unit for querying nodes.
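
The Exec() changes above apply the three uid-pool fields in a fixed order: add_uids first, remove_uids second, and uid_pool as a wholesale override. A sketch of that ordering with plain integer sets standing in for the (first, last) range pairs handled by ganeti.uidpool:

    def apply_uid_ops(pool, add_uids=None, remove_uids=None, uid_pool=None):
      pool = set(pool)
      if add_uids is not None:
        pool |= set(add_uids)         # AddToUidPool
      if remove_uids is not None:
        pool -= set(remove_uids)      # RemoveFromUidPool
      if uid_pool is not None:
        pool = set(uid_pool)          # full override wins over add/remove
      return sorted(pool)

    print apply_uid_ops([4000, 4001], add_uids=[4002], remove_uids=[4000])
    # -> [4001, 4002]
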
@@ -3076,17 +3114,14 @@ class LUQueryNodeStorage(NoHooksLU):
   REQ_BGL = False
   _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
 
-  def ExpandNames(self):
-    storage_type = self.op.storage_type
-
-    if storage_type not in constants.VALID_STORAGE_TYPES:
-      raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
-                                 errors.ECODE_INVAL)
+  def CheckArguments(self):
+    _CheckStorageType(self.op.storage_type)
 
     _CheckOutputFields(static=self._FIELDS_STATIC,
                        dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                        selected=self.op.output_fields)
 
+  def ExpandNames(self):
     self.needed_locks = {}
     self.share_locks[locking.LEVEL_NODE] = 1
 
@@ -3175,10 +3210,7 @@ class LUModifyNodeStorage(NoHooksLU):
   def CheckArguments(self):
     self.opnode_name = _ExpandNodeName(self.cfg, self.op.node_name)
 
-    storage_type = self.op.storage_type
-    if storage_type not in constants.VALID_STORAGE_TYPES:
-      raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
-                                 errors.ECODE_INVAL)
+    _CheckStorageType(self.op.storage_type)
 
   def ExpandNames(self):
     self.needed_locks = {
@@ -3279,15 +3311,19 @@ class LUAddNode(LogicalUnit):
       raise errors.OpPrereqError("Node %s is not in the configuration" % node,
                                  errors.ECODE_NOENT)
 
+    self.changed_primary_ip = False
+
     for existing_node_name in node_list:
       existing_node = cfg.GetNodeInfo(existing_node_name)
 
       if self.op.readd and node == existing_node_name:
-        if (existing_node.primary_ip != primary_ip or
-            existing_node.secondary_ip != secondary_ip):
+        if existing_node.secondary_ip != secondary_ip:
           raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                      " address configuration as before",
                                      errors.ECODE_INVAL)
+        if existing_node.primary_ip != primary_ip:
+          self.changed_primary_ip = True
+
         continue
 
       if (existing_node.primary_ip == primary_ip or
@@ -3359,6 +3395,8 @@ class LUAddNode(LogicalUnit):
         self.LogInfo("Readding a node, the offline/drained flags were reset")
       # if we demote the node, we do cleanup later in the procedure
       new_node.master_candidate = self.master_candidate
+      if self.changed_primary_ip:
+        new_node.primary_ip = self.op.primary_ip
 
     # notify the user about any possible mc promotion
     if new_node.master_candidate:
@@ -3394,6 +3432,7 @@ class LUAddNode(LogicalUnit):
 
     # Add node to our /etc/hosts, and add key to known_hosts
     if self.cfg.GetClusterInfo().modify_etc_hosts:
+      # FIXME: this should be done via an rpc call to node daemon
       utils.AddHostToEtcHosts(new_node.name)
 
     if new_node.secondary_ip != new_node.primary_ip:
@@ -3700,6 +3739,7 @@ class LUQueryClusterInfo(NoHooksLU):
       "mtime": cluster.mtime,
       "uuid": cluster.uuid,
       "tags": list(cluster.GetTags()),
+      "uid_pool": cluster.uid_pool,
       }
 
     return result
@@ -5816,6 +5856,8 @@ def _GenerateDiskTemplate(lu, template_name,
     if len(secondary_nodes) != 0:
       raise errors.ProgrammerError("Wrong template configuration")
 
+    _RequireFileStorage()
+
     for idx, disk in enumerate(disk_info):
       disk_index = idx + base_index
       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
@@ -5984,7 +6026,7 @@ class LUCreateInstance(LogicalUnit):
   """
   HPATH = "instance-add"
   HTYPE = constants.HTYPE_INSTANCE
-  _OP_REQP = ["instance_name", "disks", "disk_template",
+  _OP_REQP = ["instance_name", "disks",
               "mode", "start",
               "wait_for_sync", "ip_check", "nics",
               "hvparams", "beparams"]
@@ -5995,7 +6037,8 @@ class LUCreateInstance(LogicalUnit):
 
     """
     # set optional parameters to none if they don't exist
-    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
+    for attr in ["pnode", "snode", "iallocator", "hypervisor",
+                 "disk_template", "identify_defaults"]:
      if not hasattr(self.op, attr):
        setattr(self.op, attr, None)
@@ -6014,10 +6057,6 @@ class LUCreateInstance(LogicalUnit):
       # TODO: make the ip check more flexible and not depend on the name check
       raise errors.OpPrereqError("Cannot do ip checks without a name check",
                                  errors.ECODE_INVAL)
-    if (self.op.disk_template == constants.DT_FILE and
-        not constants.ENABLE_FILE_STORAGE):
-      raise errors.OpPrereqError("File storage disabled at configure time",
-                                 errors.ECODE_INVAL)
     # check disk information: either all adopt, or no adopt
     has_adopt = has_no_adopt = False
     for disk in self.op.disks:
@@ -6026,7 +6065,7 @@ class LUCreateInstance(LogicalUnit):
       else:
         has_no_adopt = True
     if has_adopt and has_no_adopt:
-      raise errors.OpPrereqError("Either all disks have are adoped or none is",
+      raise errors.OpPrereqError("Either all disks are adopted or none is",
                                  errors.ECODE_INVAL)
     if has_adopt:
       if self.op.disk_template != constants.DT_PLAIN:
@@ -6042,162 +6081,21 @@ class LUCreateInstance(LogicalUnit):
 
     self.adopt_disks = has_adopt
 
-  def ExpandNames(self):
-    """ExpandNames for CreateInstance.
-
-    Figure out the right locks for instance creation.
-
-    """
-    self.needed_locks = {}
-
-    # cheap checks, mostly valid constants given
-
-    # verify creation mode
     if self.op.mode not in (constants.INSTANCE_CREATE,
                             constants.INSTANCE_IMPORT):
       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                  self.op.mode, errors.ECODE_INVAL)
 
-    # disk template and mirror node verification
-    _CheckDiskTemplate(self.op.disk_template)
-
-    if self.op.hypervisor is None:
-      self.op.hypervisor = self.cfg.GetHypervisorType()
-
-    cluster = self.cfg.GetClusterInfo()
-    enabled_hvs = cluster.enabled_hypervisors
-    if self.op.hypervisor not in enabled_hvs:
-      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
-                                 " cluster (%s)" % (self.op.hypervisor,
-                                  ",".join(enabled_hvs)),
-                                 errors.ECODE_STATE)
-
-    # check hypervisor parameter syntax (locally)
-    utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
-    filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
-                                  self.op.hvparams)
-    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
-    hv_type.CheckParameterSyntax(filled_hvp)
-    self.hv_full = filled_hvp
-    # check that we don't specify global parameters on an instance
-    _CheckGlobalHvParams(self.op.hvparams)
-
-    # fill and remember the beparams dict
-    utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
-    self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
-                                    self.op.beparams)
-
-    #### instance parameters check
-
-    # instance name verification
     if self.op.name_check:
-      hostname1 = utils.GetHostInfo(self.op.instance_name)
-      self.op.instance_name = instance_name = hostname1.name
+      self.hostname1 = utils.GetHostInfo(self.op.instance_name)
+      self.op.instance_name = self.hostname1.name
       # used in CheckPrereq for ip ping check
-      self.check_ip = hostname1.ip
+      self.check_ip = self.hostname1.ip
     else:
-      instance_name = self.op.instance_name
       self.check_ip = None
 
-    # this is just a preventive check, but someone might still add this
-    # instance in the meantime, and creation will fail at lock-add time
-    if instance_name in self.cfg.GetInstanceList():
-      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
-                                 instance_name, errors.ECODE_EXISTS)
-
-    self.add_locks[locking.LEVEL_INSTANCE] = instance_name
-
-    # NIC buildup
-    self.nics = []
-    for idx, nic in enumerate(self.op.nics):
-      nic_mode_req = nic.get("mode", None)
-      nic_mode = nic_mode_req
-
-      if nic_mode is None:
-        nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
-
-      # in routed mode, for the first nic, the default ip is 'auto'
-      if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
-        default_ip_mode = constants.VALUE_AUTO
-      else:
-        default_ip_mode = constants.VALUE_NONE
-
-      # ip validity checks
-      ip = nic.get("ip", default_ip_mode)
-      if ip is None or ip.lower() == constants.VALUE_NONE:
-        nic_ip = None
-      elif ip.lower() == constants.VALUE_AUTO:
-        if not self.op.name_check:
-          raise errors.OpPrereqError("IP address set to auto but name checks"
-                                     " have been skipped. Aborting.",
-                                     errors.ECODE_INVAL)
-        nic_ip = hostname1.ip
-      else:
-        if not utils.IsValidIP(ip):
-          raise errors.OpPrereqError("Given IP address '%s' doesn't look"
-                                     " like a valid IP" % ip,
-                                     errors.ECODE_INVAL)
-        nic_ip = ip
-
-      # TODO: check the ip address for uniqueness
-      if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
-        raise errors.OpPrereqError("Routed nic mode requires an ip address",
-                                   errors.ECODE_INVAL)
-
-      # MAC address verification
-      mac = nic.get("mac", constants.VALUE_AUTO)
-      if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
-        mac = utils.NormalizeAndValidateMac(mac)
-
-        try:
-          self.cfg.ReserveMAC(mac, self.proc.GetECId())
-        except errors.ReservationError:
-          raise errors.OpPrereqError("MAC address %s already in use"
-                                     " in cluster" % mac,
-                                     errors.ECODE_NOTUNIQUE)
-
-      # bridge verification
-      bridge = nic.get("bridge", None)
-      link = nic.get("link", None)
-      if bridge and link:
-        raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
-                                   " at the same time", errors.ECODE_INVAL)
-      elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
-        raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic",
-                                   errors.ECODE_INVAL)
-      elif bridge:
-        link = bridge
-
-      nicparams = {}
-      if nic_mode_req:
-        nicparams[constants.NIC_MODE] = nic_mode_req
-      if link:
-        nicparams[constants.NIC_LINK] = link
-
-      check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
-                                      nicparams)
-      objects.NIC.CheckParameterSyntax(check_params)
-      self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
-
-    # disk checks/pre-build
-    self.disks = []
-    for disk in self.op.disks:
-      mode = disk.get("mode", constants.DISK_RDWR)
-      if mode not in constants.DISK_ACCESS_SET:
-        raise errors.OpPrereqError("Invalid disk access mode '%s'" %
-                                   mode, errors.ECODE_INVAL)
-      size = disk.get("size", None)
-      if size is None:
-        raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
-      try:
-        size = int(size)
-      except (TypeError, ValueError):
-        raise errors.OpPrereqError("Invalid disk size '%s'" % size,
-                                   errors.ECODE_INVAL)
-      new_disk = {"size": size, "mode": mode}
-      if "adopt" in disk:
-        new_disk["adopt"] = disk["adopt"]
-      self.disks.append(new_disk)
-
     # file storage checks
     if (self.op.file_driver and
         not self.op.file_driver in constants.FILE_DRIVER):
@@ -6214,6 +6112,41 @@ class LUCreateInstance(LogicalUnit):
                                  " node must be given",
                                  errors.ECODE_INVAL)
 
+    if self.op.mode == constants.INSTANCE_IMPORT:
+      # On import force_variant must be True, because if we forced it at
+      # initial install, our only chance when importing it back is that it
+      # works again!
+      self.op.force_variant = True
+
+      if self.op.no_install:
+        self.LogInfo("No-installation mode has no effect during import")
+
+    else: # INSTANCE_CREATE
+      if getattr(self.op, "os_type", None) is None:
+        raise errors.OpPrereqError("No guest OS specified",
+                                   errors.ECODE_INVAL)
+      self.op.force_variant = getattr(self.op, "force_variant", False)
+      if self.op.disk_template is None:
+        raise errors.OpPrereqError("No disk template specified",
+                                   errors.ECODE_INVAL)
+
+  def ExpandNames(self):
+    """ExpandNames for CreateInstance.
+
+    Figure out the right locks for instance creation.
+
+    """
+    self.needed_locks = {}
+
+    instance_name = self.op.instance_name
+    # this is just a preventive check, but someone might still add this
+    # instance in the meantime, and creation will fail at lock-add time
+    if instance_name in self.cfg.GetInstanceList():
+      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
+                                 instance_name, errors.ECODE_EXISTS)
+
+    self.add_locks[locking.LEVEL_INSTANCE] = instance_name
+
     if self.op.iallocator:
       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
     else:
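
The CheckArguments refactoring above leans on one small pattern: every optional opcode attribute is defaulted to None up front, so the later mode-specific branches can test "is None" instead of sprinkling hasattr() calls. The pattern in isolation (FakeOpCode is an illustration, not a Ganeti class):

    class FakeOpCode(object):
      def __init__(self, **kwargs):
        self.__dict__.update(kwargs)

    op = FakeOpCode(instance_name="inst1", pnode="node1")
    for attr in ["pnode", "snode", "iallocator", "hypervisor",
                 "disk_template", "identify_defaults"]:
      if not hasattr(op, attr):
        setattr(op, attr, None)

    print op.pnode          # -> node1 (kept as given)
    print op.disk_template  # -> None (may be filled in from an export later)
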
@@ -6247,20 +6180,6 @@ class LUCreateInstance(LogicalUnit):
       self.op.src_path = src_path = \
         utils.PathJoin(constants.EXPORT_DIR, src_path)
 
-      # On import force_variant must be True, because if we forced it at
-      # initial install, our only chance when importing it back is that it
-      # works again!
-      self.op.force_variant = True
-
-      if self.op.no_install:
-        self.LogInfo("No-installation mode has no effect during import")
-
-    else: # INSTANCE_CREATE
-      if getattr(self.op, "os_type", None) is None:
-        raise errors.OpPrereqError("No guest OS specified",
-                                   errors.ECODE_INVAL)
-      self.op.force_variant = getattr(self.op, "force_variant", False)
-
   def _RunAllocator(self):
     """Run the allocator based on input opcode.
 
@@ -6332,50 +6251,278 @@ class LUCreateInstance(LogicalUnit):
                          self.secondaries)
     return env, nl, nl
 
+  def _ReadExportInfo(self):
+    """Reads the export information from disk.
+
+    It will override the opcode source node and path with the actual
+    information, if these two were not specified before.
+
+    @return: the export information
+
+    """
+    assert self.op.mode == constants.INSTANCE_IMPORT
+
+    src_node = self.op.src_node
+    src_path = self.op.src_path
+
+    if src_node is None:
+      locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
+      exp_list = self.rpc.call_export_list(locked_nodes)
+      found = False
+      for node in exp_list:
+        if exp_list[node].fail_msg:
+          continue
+        if src_path in exp_list[node].payload:
+          found = True
+          self.op.src_node = src_node = node
+          self.op.src_path = src_path = utils.PathJoin(constants.EXPORT_DIR,
+                                                       src_path)
+          break
+      if not found:
+        raise errors.OpPrereqError("No export found for relative path %s" %
+                                   src_path, errors.ECODE_INVAL)
+
+    _CheckNodeOnline(self, src_node)
+    result = self.rpc.call_export_info(src_node, src_path)
+    result.Raise("No export or invalid export found in dir %s" % src_path)
+
+    export_info = objects.SerializableConfigParser.Loads(str(result.payload))
+    if not export_info.has_section(constants.INISECT_EXP):
+      raise errors.ProgrammerError("Corrupted export config",
+                                   errors.ECODE_ENVIRON)
+
+    ei_version = export_info.get(constants.INISECT_EXP, "version")
+    if (int(ei_version) != constants.EXPORT_VERSION):
+      raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
+                                 (ei_version, constants.EXPORT_VERSION),
+                                 errors.ECODE_ENVIRON)
+    return export_info
+
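
The source-node autodetection in _ReadExportInfo above walks the export lists of all locked nodes and picks the first one that contains the requested relative path. The same loop with the RPC replaced by a plain dict of node -> (failed, exports); the data is made up for illustration:

    import os

    EXPORT_DIR = "/var/lib/ganeti/export"  # assumed default location

    def find_export(exp_list, src_path):
      for node in exp_list:
        failed, exports = exp_list[node]
        if failed:
          continue                    # skip nodes that did not answer
        if src_path in exports:
          return node, os.path.join(EXPORT_DIR, src_path)
      raise ValueError("No export found for relative path %s" % src_path)

    exp_list = {"node1": (True, []),
                "node2": (False, ["inst3.example.com"])}
    print find_export(exp_list, "inst3.example.com")
    # -> ('node2', '/var/lib/ganeti/export/inst3.example.com')
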
+  def _ReadExportParams(self, einfo):
+    """Use export parameters as defaults.
+
+    In case the opcode doesn't specify (as in override) some instance
+    parameters, then try to use them from the export information, if
+    that declares them.
+
+    """
+    self.op.os_type = einfo.get(constants.INISECT_EXP, "os")
+
+    if self.op.disk_template is None:
+      if einfo.has_option(constants.INISECT_INS, "disk_template"):
+        self.op.disk_template = einfo.get(constants.INISECT_INS,
+                                          "disk_template")
+      else:
+        raise errors.OpPrereqError("No disk template specified and the export"
+                                   " is missing the disk_template information",
+                                   errors.ECODE_INVAL)
+
+    if not self.op.disks:
+      if einfo.has_option(constants.INISECT_INS, "disk_count"):
+        disks = []
+        # TODO: import the disk iv_name too
+        for idx in range(einfo.getint(constants.INISECT_INS, "disk_count")):
+          disk_sz = einfo.getint(constants.INISECT_INS, "disk%d_size" % idx)
+          disks.append({"size": disk_sz})
+        self.op.disks = disks
+      else:
+        raise errors.OpPrereqError("No disk info specified and the export"
+                                   " is missing the disk information",
+                                   errors.ECODE_INVAL)
+
+    if (not self.op.nics and
+        einfo.has_option(constants.INISECT_INS, "nic_count")):
+      nics = []
+      for idx in range(einfo.getint(constants.INISECT_INS, "nic_count")):
+        ndict = {}
+        for name in list(constants.NICS_PARAMETERS) + ["ip", "mac"]:
+          v = einfo.get(constants.INISECT_INS, "nic%d_%s" % (idx, name))
+          ndict[name] = v
+        nics.append(ndict)
+      self.op.nics = nics
+
+    if (self.op.hypervisor is None and
+        einfo.has_option(constants.INISECT_INS, "hypervisor")):
+      self.op.hypervisor = einfo.get(constants.INISECT_INS, "hypervisor")
+
+    if einfo.has_section(constants.INISECT_HYP):
+      # use the export parameters but do not override the ones
+      # specified by the user
+      for name, value in einfo.items(constants.INISECT_HYP):
+        if name not in self.op.hvparams:
+          self.op.hvparams[name] = value
+
+    if einfo.has_section(constants.INISECT_BEP):
+      # use the parameters, without overriding
+      for name, value in einfo.items(constants.INISECT_BEP):
+        if name not in self.op.beparams:
+          self.op.beparams[name] = value
+    else:
+      # try to read the parameters old style, from the main section
+      for name in constants.BES_PARAMETERS:
+        if (name not in self.op.beparams and
+            einfo.has_option(constants.INISECT_INS, name)):
+          self.op.beparams[name] = einfo.get(constants.INISECT_INS, name)
+
+  def _RevertToDefaults(self, cluster):
+    """Revert the instance parameters to the default values.
+
+    """
+    # hvparams
+    hv_defs = cluster.GetHVDefaults(self.op.hypervisor, self.op.os_type)
+    for name in self.op.hvparams.keys():
+      if name in hv_defs and hv_defs[name] == self.op.hvparams[name]:
+        del self.op.hvparams[name]
+    # beparams
+    be_defs = cluster.beparams.get(constants.PP_DEFAULT, {})
+    for name in self.op.beparams.keys():
+      if name in be_defs and be_defs[name] == self.op.beparams[name]:
+        del self.op.beparams[name]
+    # nic params
+    nic_defs = cluster.nicparams.get(constants.PP_DEFAULT, {})
+    for nic in self.op.nics:
+      for name in constants.NICS_PARAMETERS:
+        if name in nic and name in nic_defs and nic[name] == nic_defs[name]:
+          del nic[name]
+
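
_RevertToDefaults above applies one pruning rule three times: a parameter is deleted from the opcode when its value equals the cluster default, so the new instance keeps tracking that default if it changes later. The rule on its own, with made-up hypervisor parameters:

    def revert_to_defaults(params, defaults):
      for name in list(params.keys()):
        if name in defaults and defaults[name] == params[name]:
          del params[name]
      return params

    hvparams = {"kernel_path": "/boot/vmlinuz", "root_path": "/dev/sda1"}
    defaults = {"kernel_path": "/boot/vmlinuz", "root_path": "/dev/xvda1"}
    print revert_to_defaults(hvparams, defaults)
    # -> {'root_path': '/dev/sda1'}; kernel_path now follows the default
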
""" + if self.op.mode == constants.INSTANCE_IMPORT: + export_info = self._ReadExportInfo() + self._ReadExportParams(export_info) + + _CheckDiskTemplate(self.op.disk_template) + if (not self.cfg.GetVGName() and self.op.disk_template not in constants.DTS_NOT_LVM): raise errors.OpPrereqError("Cluster does not support lvm-based" " instances", errors.ECODE_STATE) - if self.op.mode == constants.INSTANCE_IMPORT: - src_node = self.op.src_node - src_path = self.op.src_path + if self.op.hypervisor is None: + self.op.hypervisor = self.cfg.GetHypervisorType() - if src_node is None: - locked_nodes = self.acquired_locks[locking.LEVEL_NODE] - exp_list = self.rpc.call_export_list(locked_nodes) - found = False - for node in exp_list: - if exp_list[node].fail_msg: - continue - if src_path in exp_list[node].payload: - found = True - self.op.src_node = src_node = node - self.op.src_path = src_path = utils.PathJoin(constants.EXPORT_DIR, - src_path) - break - if not found: - raise errors.OpPrereqError("No export found for relative path %s" % - src_path, errors.ECODE_INVAL) - - _CheckNodeOnline(self, src_node) - result = self.rpc.call_export_info(src_node, src_path) - result.Raise("No export or invalid export found in dir %s" % src_path) - - export_info = objects.SerializableConfigParser.Loads(str(result.payload)) - if not export_info.has_section(constants.INISECT_EXP): - raise errors.ProgrammerError("Corrupted export config", - errors.ECODE_ENVIRON) - - ei_version = export_info.get(constants.INISECT_EXP, 'version') - if (int(ei_version) != constants.EXPORT_VERSION): - raise errors.OpPrereqError("Wrong export version %s (wanted %d)" % - (ei_version, constants.EXPORT_VERSION), - errors.ECODE_ENVIRON) + cluster = self.cfg.GetClusterInfo() + enabled_hvs = cluster.enabled_hypervisors + if self.op.hypervisor not in enabled_hvs: + raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the" + " cluster (%s)" % (self.op.hypervisor, + ",".join(enabled_hvs)), + errors.ECODE_STATE) + + # check hypervisor parameter syntax (locally) + utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES) + filled_hvp = objects.FillDict(cluster.GetHVDefaults(self.op.hypervisor, + self.op.os_type), + self.op.hvparams) + hv_type = hypervisor.GetHypervisor(self.op.hypervisor) + hv_type.CheckParameterSyntax(filled_hvp) + self.hv_full = filled_hvp + # check that we don't specify global parameters on an instance + _CheckGlobalHvParams(self.op.hvparams) + + # fill and remember the beparams dict + utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES) + self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT], + self.op.beparams) + + # now that hvp/bep are in final format, let's reset to defaults, + # if told to do so + if self.op.identify_defaults: + self._RevertToDefaults(cluster) + + # NIC buildup + self.nics = [] + for idx, nic in enumerate(self.op.nics): + nic_mode_req = nic.get("mode", None) + nic_mode = nic_mode_req + if nic_mode is None: + nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE] + + # in routed mode, for the first nic, the default ip is 'auto' + if nic_mode == constants.NIC_MODE_ROUTED and idx == 0: + default_ip_mode = constants.VALUE_AUTO + else: + default_ip_mode = constants.VALUE_NONE + + # ip validity checks + ip = nic.get("ip", default_ip_mode) + if ip is None or ip.lower() == constants.VALUE_NONE: + nic_ip = None + elif ip.lower() == constants.VALUE_AUTO: + if not self.op.name_check: + raise errors.OpPrereqError("IP address set to auto but name 
checks" + " have been skipped. Aborting.", + errors.ECODE_INVAL) + nic_ip = self.hostname1.ip + else: + if not utils.IsValidIP(ip): + raise errors.OpPrereqError("Given IP address '%s' doesn't look" + " like a valid IP" % ip, + errors.ECODE_INVAL) + nic_ip = ip + + # TODO: check the ip address for uniqueness + if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip: + raise errors.OpPrereqError("Routed nic mode requires an ip address", + errors.ECODE_INVAL) + + # MAC address verification + mac = nic.get("mac", constants.VALUE_AUTO) + if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE): + mac = utils.NormalizeAndValidateMac(mac) + + try: + self.cfg.ReserveMAC(mac, self.proc.GetECId()) + except errors.ReservationError: + raise errors.OpPrereqError("MAC address %s already in use" + " in cluster" % mac, + errors.ECODE_NOTUNIQUE) + + # bridge verification + bridge = nic.get("bridge", None) + link = nic.get("link", None) + if bridge and link: + raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'" + " at the same time", errors.ECODE_INVAL) + elif bridge and nic_mode == constants.NIC_MODE_ROUTED: + raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic", + errors.ECODE_INVAL) + elif bridge: + link = bridge + + nicparams = {} + if nic_mode_req: + nicparams[constants.NIC_MODE] = nic_mode_req + if link: + nicparams[constants.NIC_LINK] = link + + check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT], + nicparams) + objects.NIC.CheckParameterSyntax(check_params) + self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams)) + + # disk checks/pre-build + self.disks = [] + for disk in self.op.disks: + mode = disk.get("mode", constants.DISK_RDWR) + if mode not in constants.DISK_ACCESS_SET: + raise errors.OpPrereqError("Invalid disk access mode '%s'" % + mode, errors.ECODE_INVAL) + size = disk.get("size", None) + if size is None: + raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL) + try: + size = int(size) + except (TypeError, ValueError): + raise errors.OpPrereqError("Invalid disk size '%s'" % size, + errors.ECODE_INVAL) + new_disk = {"size": size, "mode": mode} + if "adopt" in disk: + new_disk["adopt"] = disk["adopt"] + self.disks.append(new_disk) + + if self.op.mode == constants.INSTANCE_IMPORT: # Check that the new instance doesn't have less disks than the export instance_disks = len(self.disks) @@ -6386,14 +6533,13 @@ class LUCreateInstance(LogicalUnit): (instance_disks, export_disks), errors.ECODE_INVAL) - self.op.os_type = export_info.get(constants.INISECT_EXP, 'os') disk_images = [] for idx in range(export_disks): option = 'disk%d_dump' % idx if export_info.has_option(constants.INISECT_INS, option): # FIXME: are the old os-es, disk sizes, etc. useful? 
@@ -6386,14 +6533,13 @@ class LUCreateInstance(LogicalUnit):
                                    (instance_disks, export_disks),
                                    errors.ECODE_INVAL)
 
-      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
       disk_images = []
       for idx in range(export_disks):
         option = 'disk%d_dump' % idx
         if export_info.has_option(constants.INISECT_INS, option):
           # FIXME: are the old os-es, disk sizes, etc. useful?
           export_name = export_info.get(constants.INISECT_INS, option)
-          image = utils.PathJoin(src_path, export_name)
+          image = utils.PathJoin(self.op.src_path, export_name)
           disk_images.append(image)
         else:
           disk_images.append(False)
 
@@ -6401,8 +6547,12 @@
       self.src_images = disk_images
 
       old_name = export_info.get(constants.INISECT_INS, 'name')
-      # FIXME: int() here could throw a ValueError on broken exports
-      exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
+      try:
+        exp_nic_count = export_info.getint(constants.INISECT_INS, 'nic_count')
+      except (TypeError, ValueError), err:
+        raise errors.OpPrereqError("Invalid export file, nic_count is not"
+                                   " an integer: %s" % str(err),
+                                   errors.ECODE_STATE)
       if self.op.instance_name == old_name:
         for idx, nic in enumerate(self.nics):
           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
@@ -6529,15 +6679,18 @@ class LUCreateInstance(LogicalUnit):
     else:
       network_port = None
 
-    # this is needed because os.path.join does not accept None arguments
-    if self.op.file_storage_dir is None:
-      string_file_storage_dir = ""
-    else:
-      string_file_storage_dir = self.op.file_storage_dir
+    if constants.ENABLE_FILE_STORAGE:
+      # this is needed because os.path.join does not accept None arguments
+      if self.op.file_storage_dir is None:
+        string_file_storage_dir = ""
+      else:
+        string_file_storage_dir = self.op.file_storage_dir
 
-    # build the full file storage dir path
-    file_storage_dir = utils.PathJoin(self.cfg.GetFileStorageDir(),
-                                      string_file_storage_dir, instance)
+      # build the full file storage dir path
+      file_storage_dir = utils.PathJoin(self.cfg.GetFileStorageDir(),
+                                        string_file_storage_dir, instance)
+    else:
+      file_storage_dir = ""
 
     disks = _GenerateDiskTemplate(self,
                                   self.op.disk_template,
@@ -6631,18 +6784,30 @@ class LUCreateInstance(LogicalUnit):
 
     elif self.op.mode == constants.INSTANCE_IMPORT:
       feedback_fn("* running the instance OS import scripts...")
-      src_node = self.op.src_node
-      src_images = self.src_images
-      cluster_name = self.cfg.GetClusterName()
-      # FIXME: pass debug option from opcode to backend
-      import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
-                                                       src_node, src_images,
-                                                       cluster_name,
-                                                       self.op.debug_level)
-      msg = import_result.fail_msg
-      if msg:
-        self.LogWarning("Error while importing the disk images for instance"
-                        " %s on node %s: %s" % (instance, pnode_name, msg))
+
+      transfers = []
+
+      for idx, image in enumerate(self.src_images):
+        if not image:
+          continue
+
+        # FIXME: pass debug option from opcode to backend
+        dt = masterd.instance.DiskTransfer("disk/%s" % idx,
+                                           constants.IEIO_FILE, (image, ),
+                                           constants.IEIO_SCRIPT,
+                                           (iobj.disks[idx], idx),
+                                           None)
+        transfers.append(dt)
+
+      import_result = \
+        masterd.instance.TransferInstanceData(self, feedback_fn,
+                                              self.op.src_node, pnode_name,
+                                              self.pnode.secondary_ip,
+                                              iobj, transfers)
+      if not compat.all(import_result):
+        self.LogWarning("Some disks for instance %s on node %s were not"
+                        " imported successfully" % (instance, pnode_name))
 
     else:
       # also checked in the prereq part
       raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
@@ -7556,6 +7721,8 @@ class LURepairNodeStorage(NoHooksLU):
   def CheckArguments(self):
     self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
 
+    _CheckStorageType(self.op.storage_type)
+
   def ExpandNames(self):
     self.needed_locks = {
       locking.LEVEL_NODE: [self.op.node_name],
@@ -8719,6 +8886,94 @@ class LUExportInstance(LogicalUnit):
       raise errors.OpPrereqError("Export not supported for instances with"
                                  " file-based disks", errors.ECODE_INVAL)
 
+  def _CreateSnapshots(self, feedback_fn):
+    """Creates an LVM snapshot for every disk of the instance.
+
+    @return: List of snapshots as L{objects.Disk} instances
+
+    """
+    instance = self.instance
+    src_node = instance.primary_node
+
+    vgname = self.cfg.GetVGName()
+
+    snap_disks = []
+
+    for idx, disk in enumerate(instance.disks):
+      feedback_fn("Creating a snapshot of disk/%s on node %s" %
+                  (idx, src_node))
+
+      # result.payload will be a snapshot of an lvm leaf of the one we
+      # passed
+      result = self.rpc.call_blockdev_snapshot(src_node, disk)
+      msg = result.fail_msg
+      if msg:
+        self.LogWarning("Could not snapshot disk/%s on node %s: %s",
+                        idx, src_node, msg)
+        snap_disks.append(False)
+      else:
+        disk_id = (vgname, result.payload)
+        new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
+                               logical_id=disk_id, physical_id=disk_id,
+                               iv_name=disk.iv_name)
+        snap_disks.append(new_dev)
+
+    return snap_disks
+
+  def _RemoveSnapshot(self, feedback_fn, snap_disks, disk_index):
+    """Removes an LVM snapshot.
+
+    @type snap_disks: list
+    @param snap_disks: The list of all snapshots as returned by
+      L{_CreateSnapshots}
+    @type disk_index: number
+    @param disk_index: Index of the snapshot to be removed
+    @rtype: bool
+    @return: Whether removal was successful or not
+
+    """
+    disk = snap_disks[disk_index]
+    if disk:
+      src_node = self.instance.primary_node
+
+      feedback_fn("Removing snapshot of disk/%s on node %s" %
+                  (disk_index, src_node))
+
+      result = self.rpc.call_blockdev_remove(src_node, disk)
+      if not result.fail_msg:
+        return True
+
+      self.LogWarning("Could not remove snapshot for disk/%d from node"
+                      " %s: %s", disk_index, src_node, result.fail_msg)
+
+    return False
+
+  def _CleanupExports(self, feedback_fn):
+    """Removes exports of current instance from all other nodes.
+
+    If an instance in a cluster with nodes A..D was exported to node C, its
+    exports will be removed from the nodes A, B and D.
+
+    """
+    nodelist = self.cfg.GetNodeList()
+    nodelist.remove(self.dst_node.name)
+
+    # on one-node clusters nodelist will be empty after the removal
+    # if we proceed the backup would be removed because OpQueryExports
+    # substitutes an empty list with the full cluster node list.
+    iname = self.instance.name
+    if nodelist:
+      feedback_fn("Removing old exports for instance %s" % iname)
+      exportlist = self.rpc.call_export_list(nodelist)
+      for node in exportlist:
+        if exportlist[node].fail_msg:
+          continue
+        if iname in exportlist[node].payload:
+          msg = self.rpc.call_export_remove(node, iname).fail_msg
+          if msg:
+            self.LogWarning("Could not remove older export for instance %s"
+                            " on node %s: %s", iname, node, msg)
+
   def Exec(self, feedback_fn):
     """Export an instance to an image in the cluster.
@@ -8736,10 +8991,6 @@ class LUExportInstance(LogicalUnit):
           result.Raise("Could not shutdown instance %s on"
                        " node %s" % (instance.name, src_node))
 
-    vgname = self.cfg.GetVGName()
-
-    snap_disks = []
-
     # set the disks ID correctly since call_instance_start needs the
     # correct drbd minor to create the symlinks
     for disk in instance.disks:
@@ -8754,70 +9005,80 @@ class LUExportInstance(LogicalUnit):
 
     try:
       # per-disk results
-      dresults = []
+      removed_snaps = [False] * len(instance.disks)
+
+      snap_disks = None
       try:
-        for idx, disk in enumerate(instance.disks):
-          feedback_fn("Creating a snapshot of disk/%s on node %s" %
-                      (idx, src_node))
-
-          # result.payload will be a snapshot of an lvm leaf of the one we
-          # passed
-          result = self.rpc.call_blockdev_snapshot(src_node, disk)
-          msg = result.fail_msg
-          if msg:
-            self.LogWarning("Could not snapshot disk/%s on node %s: %s",
-                            idx, src_node, msg)
-            snap_disks.append(False)
-          else:
-            disk_id = (vgname, result.payload)
-            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
-                                   logical_id=disk_id, physical_id=disk_id,
-                                   iv_name=disk.iv_name)
-            snap_disks.append(new_dev)
+        try:
+          snap_disks = self._CreateSnapshots(feedback_fn)
+        finally:
+          if (self.op.shutdown and instance.admin_up and
+              not self.remove_instance):
+            feedback_fn("Starting instance %s" % instance.name)
+            result = self.rpc.call_instance_start(src_node, instance,
+                                                  None, None)
+            msg = result.fail_msg
+            if msg:
+              _ShutdownInstanceDisks(self, instance)
+              raise errors.OpExecError("Could not start instance: %s" % msg)
+
+        assert len(snap_disks) == len(instance.disks)
+        assert len(removed_snaps) == len(instance.disks)
+
+        # TODO: check for size
+
+        def _TransferFinished(idx):
+          logging.debug("Transfer %s finished", idx)
+          if self._RemoveSnapshot(feedback_fn, snap_disks, idx):
+            removed_snaps[idx] = True
+
+        transfers = []
+
+        for idx, dev in enumerate(snap_disks):
+          if not dev:
+            transfers.append(None)
+            continue
 
-      finally:
-        if self.op.shutdown and instance.admin_up and not self.remove_instance:
-          feedback_fn("Starting instance %s" % instance.name)
-          result = self.rpc.call_instance_start(src_node, instance, None, None)
-          msg = result.fail_msg
-          if msg:
-            _ShutdownInstanceDisks(self, instance)
-            raise errors.OpExecError("Could not start instance: %s" % msg)
-
-      # TODO: check for size
-
-      cluster_name = self.cfg.GetClusterName()
-      for idx, dev in enumerate(snap_disks):
-        feedback_fn("Exporting snapshot %s from %s to %s" %
-                    (idx, src_node, dst_node.name))
-        if dev:
-          # FIXME: pass debug from opcode to backend
-          result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
-                                                 instance, cluster_name,
-                                                 idx, self.op.debug_level)
-          msg = result.fail_msg
-          if msg:
-            self.LogWarning("Could not export disk/%s from node %s to"
-                            " node %s: %s", idx, src_node, dst_node.name, msg)
-            dresults.append(False)
-          else:
-            dresults.append(True)
-          msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
-          if msg:
-            self.LogWarning("Could not remove snapshot for disk/%d from node"
-                            " %s: %s", idx, src_node, msg)
-        else:
-          dresults.append(False)
+          path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
+                                dev.physical_id[1])
 
-      feedback_fn("Finalizing export on %s" % dst_node.name)
-      result = self.rpc.call_finalize_export(dst_node.name, instance,
-                                             snap_disks)
-      fin_resu = True
-      msg = result.fail_msg
-      if msg:
-        self.LogWarning("Could not finalize export for instance %s"
-                        " on node %s: %s", instance.name, dst_node.name, msg)
-        fin_resu = False
+          finished_fn = compat.partial(_TransferFinished, idx)
+
+          # FIXME: pass debug option from opcode to backend
+          dt = masterd.instance.DiskTransfer("snapshot/%s" % idx,
+                                             constants.IEIO_SCRIPT, (dev, idx),
+                                             constants.IEIO_FILE, (path, ),
+                                             finished_fn)
+          transfers.append(dt)
+
+        # Actually export data
+        dresults = \
+          masterd.instance.TransferInstanceData(self, feedback_fn,
+                                                src_node, dst_node.name,
+                                                dst_node.secondary_ip,
+                                                instance, transfers)
+
+        assert len(dresults) == len(instance.disks)
+
+        # Check for backwards compatibility
+        assert compat.all(isinstance(i, bool) for i in dresults), \
+          "Not all results are boolean: %r" % dresults
+
+        feedback_fn("Finalizing export on %s" % dst_node.name)
+        result = self.rpc.call_finalize_export(dst_node.name, instance,
+                                               snap_disks)
+        msg = result.fail_msg
+        fin_resu = not msg
+        if msg:
+          self.LogWarning("Could not finalize export for instance %s"
+                          " on node %s: %s", instance.name, dst_node.name, msg)
+
+      finally:
+        # Remove all snapshots
+        assert len(removed_snaps) == len(instance.disks)
+        for idx, removed in enumerate(removed_snaps):
+          if not removed:
+            self._RemoveSnapshot(feedback_fn, snap_disks, idx)
 
     finally:
       if activate_disks:
@@ -8829,24 +9090,7 @@
       feedback_fn("Removing instance %s" % instance.name)
       _RemoveInstance(self, feedback_fn, instance,
                       self.ignore_remove_failures)
 
-    nodelist = self.cfg.GetNodeList()
-    nodelist.remove(dst_node.name)
-
-    # on one-node clusters nodelist will be empty after the removal
-    # if we proceed the backup would be removed because OpQueryExports
-    # substitutes an empty list with the full cluster node list.
-    iname = instance.name
-    if nodelist:
-      feedback_fn("Removing old exports for instance %s" % iname)
-      exportlist = self.rpc.call_export_list(nodelist)
-      for node in exportlist:
-        if exportlist[node].fail_msg:
-          continue
-        if iname in exportlist[node].payload:
-          msg = self.rpc.call_export_remove(node, iname).fail_msg
-          if msg:
-            self.LogWarning("Could not remove older export for instance %s"
-                            " on node %s: %s", iname, node, msg)
+    self._CleanupExports(feedback_fn)
 
     return fin_resu, dresults
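
Both the import and the export path now funnel disk data through masterd.instance.TransferInstanceData, which returns one boolean per disk; the export side additionally binds a cleanup callback to each disk index. A sketch of that callback wiring, using functools.partial (ganeti.compat provides an equivalent where the stdlib one is missing):

    import functools, logging

    def transfer_finished(removed_snaps, idx):
      logging.debug("Transfer %s finished", idx)
      removed_snaps[idx] = True

    removed_snaps = [False, False]
    callbacks = [functools.partial(transfer_finished, removed_snaps, idx)
                 for idx in range(len(removed_snaps))]
    for cb in callbacks:
      cb()          # invoked by the transfer machinery, once per disk

    dresults = [True, True]  # shape of a TransferInstanceData result
    assert all(isinstance(i, bool) for i in dresults), \
      "Not all results are boolean: %r" % dresults
    print removed_snaps      # -> [True, True]
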