X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/9f3ac97059d3b565805a7b1d7e472d82a9782f22..c1e7897de040fb86a71100684b64f46c8b08e2eb:/lib/cmdlib.py

diff --git a/lib/cmdlib.py b/lib/cmdlib.py
index 66cd416..9f8b1ff 100644
--- a/lib/cmdlib.py
+++ b/lib/cmdlib.py
@@ -45,6 +45,10 @@ from ganeti import objects
 from ganeti import serializer
 from ganeti import ssconf
 from ganeti import uidpool
+from ganeti import compat
+from ganeti import masterd
+
+import ganeti.masterd.instance # pylint: disable-msg=W0611
 
 
 class LogicalUnit(object):
@@ -919,13 +923,6 @@ def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
   return faulty
 
 
-def _FormatTimestamp(secs):
-  """Formats a Unix timestamp with the local timezone.
-
-  """
-  return time.strftime("%F %T %Z", time.gmtime(secs))
-
-
 class LUPostInitCluster(LogicalUnit):
   """Logical unit for running hooks after cluster initialization.
 
@@ -1017,45 +1014,6 @@ class LUDestroyCluster(LogicalUnit):
     return master
 
 
-def _VerifyCertificateInner(filename, expired, not_before, not_after, now,
-                            warn_days=constants.SSL_CERT_EXPIRATION_WARN,
-                            error_days=constants.SSL_CERT_EXPIRATION_ERROR):
-  """Verifies certificate details for LUVerifyCluster.
-
-  """
-  if expired:
-    msg = "Certificate %s is expired" % filename
-
-    if not_before is not None and not_after is not None:
-      msg += (" (valid from %s to %s)" %
-              (_FormatTimestamp(not_before),
-               _FormatTimestamp(not_after)))
-    elif not_before is not None:
-      msg += " (valid from %s)" % _FormatTimestamp(not_before)
-    elif not_after is not None:
-      msg += " (valid until %s)" % _FormatTimestamp(not_after)
-
-    return (LUVerifyCluster.ETYPE_ERROR, msg)
-
-  elif not_before is not None and not_before > now:
-    return (LUVerifyCluster.ETYPE_WARNING,
-            "Certificate %s not yet valid (valid from %s)" %
-            (filename, _FormatTimestamp(not_before)))
-
-  elif not_after is not None:
-    remaining_days = int((not_after - now) / (24 * 3600))
-
-    msg = ("Certificate %s expires in %d days" % (filename, remaining_days))
-
-    if remaining_days <= error_days:
-      return (LUVerifyCluster.ETYPE_ERROR, msg)
-
-    if remaining_days <= warn_days:
-      return (LUVerifyCluster.ETYPE_WARNING, msg)
-
-  return (None, None)
-
-
 def _VerifyCertificate(filename):
   """Verifies a certificate for LUVerifyCluster.
 
@@ -1070,11 +1028,23 @@ def _VerifyCertificate(filename):
     return (LUVerifyCluster.ETYPE_ERROR,
             "Failed to load X509 certificate %s: %s" % (filename, err))
 
-  # Depending on the pyOpenSSL version, this can just return (None, None)
-  (not_before, not_after) = utils.GetX509CertValidity(cert)
+  (errcode, msg) = \
+    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
+                                constants.SSL_CERT_EXPIRATION_ERROR)
 
-  return _VerifyCertificateInner(filename, cert.has_expired(),
-                                 not_before, not_after, time.time())
+  if msg:
+    fnamemsg = "While verifying %s: %s" % (filename, msg)
+  else:
+    fnamemsg = None
+
+  if errcode is None:
+    return (None, fnamemsg)
+  elif errcode == utils.CERT_WARNING:
+    return (LUVerifyCluster.ETYPE_WARNING, fnamemsg)
+  elif errcode == utils.CERT_ERROR:
+    return (LUVerifyCluster.ETYPE_ERROR, fnamemsg)
+
+  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
 
 
 class LUVerifyCluster(LogicalUnit):
@@ -1688,6 +1658,7 @@ class LUVerifyCluster(LogicalUnit):
     vg_name = self.cfg.GetVGName()
     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
+    cluster = self.cfg.GetClusterInfo()
     nodelist = utils.NiceSort(self.cfg.GetNodeList())
     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
@@ -1706,6 +1677,8 @@ class LUVerifyCluster(LogicalUnit):
     file_names = ssconf.SimpleStore().GetFileList()
     file_names.extend(constants.ALL_CERT_FILES)
     file_names.extend(master_files)
+    if cluster.modify_etc_hosts:
+      file_names.append(constants.ETC_HOSTS)
 
     local_checksums = utils.FingerprintFiles(file_names)
 
@@ -1769,7 +1742,6 @@ class LUVerifyCluster(LogicalUnit):
                                            self.cfg.GetClusterName())
     nvinfo_endtime = time.time()
 
-    cluster = self.cfg.GetClusterInfo()
     master_node = self.cfg.GetMasterNode()
     all_drbd_map = self.cfg.ComputeDRBDMap()
 
@@ -2255,8 +2227,11 @@ class LUSetClusterParams(LogicalUnit):
     """Check parameters
 
     """
-    if not hasattr(self.op, "candidate_pool_size"):
-      self.op.candidate_pool_size = None
+    for attr in ["candidate_pool_size",
+                 "uid_pool", "add_uids", "remove_uids"]:
+      if not hasattr(self.op, attr):
+        setattr(self.op, attr, None)
+
     if self.op.candidate_pool_size is not None:
       try:
         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
@@ -2877,6 +2852,12 @@ class LURemoveNode(LogicalUnit):
       self.LogWarning("Errors encountered on the remote node while leaving"
                       " the cluster: %s", msg)
 
+    # Remove node from our /etc/hosts
+    if self.cfg.GetClusterInfo().modify_etc_hosts:
+      # FIXME: this should be done via an rpc call to node daemon
+      utils.RemoveHostFromEtcHosts(node.name)
+      _RedistributeAncillaryFiles(self)
+
 
 class LUQueryNodes(NoHooksLU):
   """Logical unit for querying nodes.
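The rewritten _VerifyCertificate() above delegates the actual expiry checks to utils.VerifyX509Certificate() and only maps its (errcode, msg) result onto the LU error types. The following is a minimal, self-contained sketch of that mapping, assuming stand-in constants instead of the real utils.CERT_WARNING/CERT_ERROR and LUVerifyCluster.ETYPE_* values; the names below are illustrative only and not part of the patch:

    # Sketch of the new _VerifyCertificate() control flow with stand-in
    # constants; the real code uses utils.CERT_* and LUVerifyCluster.ETYPE_*.
    CERT_WARNING, CERT_ERROR = "cert-warning", "cert-error"
    ETYPE_WARNING, ETYPE_ERROR = "WARNING", "ERROR"

    def map_cert_result(filename, errcode, msg):
      """Translates a certificate check result into an (etype, message) pair."""
      if msg:
        fnamemsg = "While verifying %s: %s" % (filename, msg)
      else:
        fnamemsg = None

      if errcode is None:
        return (None, fnamemsg)
      elif errcode == CERT_WARNING:
        return (ETYPE_WARNING, fnamemsg)
      elif errcode == CERT_ERROR:
        return (ETYPE_ERROR, fnamemsg)

      raise AssertionError("Unhandled certificate error code %r" % errcode)

    assert map_cert_result("server.pem", None, None) == (None, None)
    assert map_cert_result("server.pem", CERT_ERROR, "certificate is expired") == \
        (ETYPE_ERROR, "While verifying server.pem: certificate is expired")

Keeping the translation in one place means new error codes from the checker fail loudly instead of being silently ignored, which is what the final ProgrammerError in the patch is for.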
@@ -3451,6 +3432,7 @@ class LUAddNode(LogicalUnit):
 
     # Add node to our /etc/hosts, and add key to known_hosts
     if self.cfg.GetClusterInfo().modify_etc_hosts:
+      # FIXME: this should be done via an rpc call to node daemon
       utils.AddHostToEtcHosts(new_node.name)
 
     if new_node.secondary_ip != new_node.primary_ip:
@@ -4638,18 +4620,29 @@ class LURemoveInstance(LogicalUnit):
                                  " node %s: %s" %
                                  (instance.name, instance.primary_node, msg))
 
-    logging.info("Removing block devices for instance %s", instance.name)
+    _RemoveInstance(self, feedback_fn, instance, self.op.ignore_failures)
 
-    if not _RemoveDisks(self, instance):
-      if self.op.ignore_failures:
-        feedback_fn("Warning: can't remove instance's disks")
-      else:
-        raise errors.OpExecError("Can't remove instance's disks")
 
-    logging.info("Removing instance %s out of cluster config", instance.name)
+def _RemoveInstance(lu, feedback_fn, instance, ignore_failures):
+  """Utility function to remove an instance.
+
+  """
+  logging.info("Removing block devices for instance %s", instance.name)
+
+  if not _RemoveDisks(lu, instance):
+    if not ignore_failures:
+      raise errors.OpExecError("Can't remove instance's disks")
+    feedback_fn("Warning: can't remove instance's disks")
 
-    self.cfg.RemoveInstance(instance.name)
-    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
+  logging.info("Removing instance %s out of cluster config", instance.name)
+
+  lu.cfg.RemoveInstance(instance.name)
+
+  assert not lu.remove_locks.get(locking.LEVEL_INSTANCE), \
+    "Instance lock removal conflict"
+
+  # Remove lock for the instance
+  lu.remove_locks[locking.LEVEL_INSTANCE] = instance.name
 
 
 class LUQueryInstances(NoHooksLU):
@@ -6699,7 +6692,6 @@ class LUCreateInstance(LogicalUnit):
     else:
       file_storage_dir = ""
 
-
     disks = _GenerateDiskTemplate(self,
                                   self.op.disk_template,
                                   instance, pnode_name,
@@ -6792,18 +6784,30 @@ class LUCreateInstance(LogicalUnit):
     elif self.op.mode == constants.INSTANCE_IMPORT:
       feedback_fn("* running the instance OS import scripts...")
-      src_node = self.op.src_node
-      src_images = self.src_images
-      cluster_name = self.cfg.GetClusterName()
-      # FIXME: pass debug option from opcode to backend
-      import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
-                                                       src_node, src_images,
-                                                       cluster_name,
-                                                       self.op.debug_level)
-      msg = import_result.fail_msg
-      if msg:
-        self.LogWarning("Error while importing the disk images for instance"
-                        " %s on node %s: %s" % (instance, pnode_name, msg))
+
+      transfers = []
+
+      for idx, image in enumerate(self.src_images):
+        if not image:
+          continue
+
+        # FIXME: pass debug option from opcode to backend
+        dt = masterd.instance.DiskTransfer("disk/%s" % idx,
+                                           constants.IEIO_FILE, (image, ),
+                                           constants.IEIO_SCRIPT,
+                                           (iobj.disks[idx], idx),
+                                           None)
+        transfers.append(dt)
+
+      import_result = \
+        masterd.instance.TransferInstanceData(self, feedback_fn,
+                                              self.op.src_node, pnode_name,
+                                              self.pnode.secondary_ip,
+                                              iobj, transfers)
+      if not compat.all(import_result):
+        self.LogWarning("Some disks for instance %s on node %s were not"
+                        " imported successfully" % (instance, pnode_name))
+
     else:
       # also checked in the prereq part
       raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
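The import hunk above, like the export rewrite further below, follows one pattern: build a masterd.instance.DiskTransfer per disk, hand the list to masterd.instance.TransferInstanceData(), then check the per-disk boolean results. A rough, self-contained sketch of that pattern follows; DiskTransfer and run_transfers here are simplified, hypothetical stand-ins for the real masterd API, not its actual signatures:

    class DiskTransfer(object):
      # Hypothetical stand-in for masterd.instance.DiskTransfer: it only
      # records a name plus source and destination descriptions.
      def __init__(self, name, src, dst):
        self.name = name
        self.src = src
        self.dst = dst

    def run_transfers(transfers):
      # Hypothetical stand-in for masterd.instance.TransferInstanceData: the
      # real function streams disk data between nodes; here every transfer
      # "succeeds" and we return one boolean per transfer.
      return [True for _ in transfers]

    src_images = ["disk0.img", "", "disk2.img"]  # "" means no image for disk 1

    transfers = []
    for idx, image in enumerate(src_images):
      if not image:
        continue  # nothing to import for this disk, as in the patch above
      transfers.append(DiskTransfer("disk/%s" % idx, image,
                                    "disk %d on target node" % idx))

    results = run_transfers(transfers)
    if not all(results):  # compat.all() in the patch plays the role of all()
      print("Some disks were not imported successfully")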
""" + _CheckBooleanOpField(self.op, "remove_instance") + _CheckBooleanOpField(self.op, "ignore_remove_failures") + self.shutdown_timeout = getattr(self.op, "shutdown_timeout", constants.DEFAULT_SHUTDOWN_TIMEOUT) + self.remove_instance = getattr(self.op, "remove_instance", False) + self.ignore_remove_failures = getattr(self.op, "ignore_remove_failures", + False) + + if self.remove_instance and not self.op.shutdown: + raise errors.OpPrereqError("Can not remove instance without shutting it" + " down before") def ExpandNames(self): self._ExpandAndLockInstance() + # FIXME: lock only instance primary and destination node # # Sad but true, for now we have do lock all nodes, as we don't know where @@ -8837,6 +8852,8 @@ class LUExportInstance(LogicalUnit): "EXPORT_NODE": self.op.target_node, "EXPORT_DO_SHUTDOWN": self.op.shutdown, "SHUTDOWN_TIMEOUT": self.shutdown_timeout, + # TODO: Generic function for boolean env variables + "REMOVE_INSTANCE": str(bool(self.remove_instance)), } env.update(_BuildInstanceHookEnvByObject(self, self.instance)) nl = [self.cfg.GetMasterNode(), self.instance.primary_node, @@ -8863,11 +8880,100 @@ class LUExportInstance(LogicalUnit): _CheckNodeNotDrained(self, self.dst_node.name) # instance disk type verification + # TODO: Implement export support for file-based disks for disk in self.instance.disks: if disk.dev_type == constants.LD_FILE: raise errors.OpPrereqError("Export not supported for instances with" " file-based disks", errors.ECODE_INVAL) + def _CreateSnapshots(self, feedback_fn): + """Creates an LVM snapshot for every disk of the instance. + + @return: List of snapshots as L{objects.Disk} instances + + """ + instance = self.instance + src_node = instance.primary_node + + vgname = self.cfg.GetVGName() + + snap_disks = [] + + for idx, disk in enumerate(instance.disks): + feedback_fn("Creating a snapshot of disk/%s on node %s" % + (idx, src_node)) + + # result.payload will be a snapshot of an lvm leaf of the one we + # passed + result = self.rpc.call_blockdev_snapshot(src_node, disk) + msg = result.fail_msg + if msg: + self.LogWarning("Could not snapshot disk/%s on node %s: %s", + idx, src_node, msg) + snap_disks.append(False) + else: + disk_id = (vgname, result.payload) + new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size, + logical_id=disk_id, physical_id=disk_id, + iv_name=disk.iv_name) + snap_disks.append(new_dev) + + return snap_disks + + def _RemoveSnapshot(self, feedback_fn, snap_disks, disk_index): + """Removes an LVM snapshot. + + @type snap_disks: list + @param snap_disks: The list of all snapshots as returned by + L{_CreateSnapshots} + @type disk_index: number + @param disk_index: Index of the snapshot to be removed + @rtype: bool + @return: Whether removal was successful or not + + """ + disk = snap_disks[disk_index] + if disk: + src_node = self.instance.primary_node + + feedback_fn("Removing snapshot of disk/%s on node %s" % + (disk_index, src_node)) + + result = self.rpc.call_blockdev_remove(src_node, disk) + if not result.fail_msg: + return True + + self.LogWarning("Could not remove snapshot for disk/%d from node" + " %s: %s", disk_index, src_node, result.fail_msg) + + return False + + def _CleanupExports(self, feedback_fn): + """Removes exports of current instance from all other nodes. + + If an instance in a cluster with nodes A..D was exported to node C, its + exports will be removed from the nodes A, B and D. 
+ + """ + nodelist = self.cfg.GetNodeList() + nodelist.remove(self.dst_node.name) + + # on one-node clusters nodelist will be empty after the removal + # if we proceed the backup would be removed because OpQueryExports + # substitutes an empty list with the full cluster node list. + iname = self.instance.name + if nodelist: + feedback_fn("Removing old exports for instance %s" % iname) + exportlist = self.rpc.call_export_list(nodelist) + for node in exportlist: + if exportlist[node].fail_msg: + continue + if iname in exportlist[node].payload: + msg = self.rpc.call_export_remove(node, iname).fail_msg + if msg: + self.LogWarning("Could not remove older export for instance %s" + " on node %s: %s", iname, node, msg) + def Exec(self, feedback_fn): """Export an instance to an image in the cluster. @@ -8881,13 +8987,10 @@ class LUExportInstance(LogicalUnit): feedback_fn("Shutting down instance %s" % instance.name) result = self.rpc.call_instance_shutdown(src_node, instance, self.shutdown_timeout) + # TODO: Maybe ignore failures if ignore_remove_failures is set result.Raise("Could not shutdown instance %s on" " node %s" % (instance.name, src_node)) - vgname = self.cfg.GetVGName() - - snap_disks = [] - # set the disks ID correctly since call_instance_start needs the # correct drbd minor to create the symlinks for disk in instance.disks: @@ -8902,94 +9005,93 @@ class LUExportInstance(LogicalUnit): try: # per-disk results - dresults = [] + removed_snaps = [False] * len(instance.disks) + + snap_disks = None try: - for idx, disk in enumerate(instance.disks): - feedback_fn("Creating a snapshot of disk/%s on node %s" % - (idx, src_node)) - - # result.payload will be a snapshot of an lvm leaf of the one we - # passed - result = self.rpc.call_blockdev_snapshot(src_node, disk) - msg = result.fail_msg - if msg: - self.LogWarning("Could not snapshot disk/%s on node %s: %s", - idx, src_node, msg) - snap_disks.append(False) - else: - disk_id = (vgname, result.payload) - new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size, - logical_id=disk_id, physical_id=disk_id, - iv_name=disk.iv_name) - snap_disks.append(new_dev) + try: + snap_disks = self._CreateSnapshots(feedback_fn) + finally: + if (self.op.shutdown and instance.admin_up and + not self.remove_instance): + feedback_fn("Starting instance %s" % instance.name) + result = self.rpc.call_instance_start(src_node, instance, + None, None) + msg = result.fail_msg + if msg: + _ShutdownInstanceDisks(self, instance) + raise errors.OpExecError("Could not start instance: %s" % msg) + + assert len(snap_disks) == len(instance.disks) + assert len(removed_snaps) == len(instance.disks) + + # TODO: check for size + + def _TransferFinished(idx): + logging.debug("Transfer %s finished", idx) + if self._RemoveSnapshot(feedback_fn, snap_disks, idx): + removed_snaps[idx] = True + + transfers = [] + + for idx, dev in enumerate(snap_disks): + if not dev: + transfers.append(None) + continue - finally: - if self.op.shutdown and instance.admin_up: - feedback_fn("Starting instance %s" % instance.name) - result = self.rpc.call_instance_start(src_node, instance, None, None) - msg = result.fail_msg - if msg: - _ShutdownInstanceDisks(self, instance) - raise errors.OpExecError("Could not start instance: %s" % msg) - - # TODO: check for size - - cluster_name = self.cfg.GetClusterName() - for idx, dev in enumerate(snap_disks): - feedback_fn("Exporting snapshot %s from %s to %s" % - (idx, src_node, dst_node.name)) - if dev: - # FIXME: pass debug from opcode to backend - result = 
-          result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
-                                                 instance, cluster_name,
-                                                 idx, self.op.debug_level)
-          msg = result.fail_msg
-          if msg:
-            self.LogWarning("Could not export disk/%s from node %s to"
-                            " node %s: %s", idx, src_node, dst_node.name, msg)
-            dresults.append(False)
-          else:
-            dresults.append(True)
-          msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
-          if msg:
-            self.LogWarning("Could not remove snapshot for disk/%d from node"
-                            " %s: %s", idx, src_node, msg)
-        else:
-          dresults.append(False)
+          path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
+                                dev.physical_id[1])
 
-      feedback_fn("Finalizing export on %s" % dst_node.name)
-      result = self.rpc.call_finalize_export(dst_node.name, instance,
-                                             snap_disks)
-      fin_resu = True
-      msg = result.fail_msg
-      if msg:
-        self.LogWarning("Could not finalize export for instance %s"
-                        " on node %s: %s", instance.name, dst_node.name, msg)
-        fin_resu = False
+          finished_fn = compat.partial(_TransferFinished, idx)
+
+          # FIXME: pass debug option from opcode to backend
+          dt = masterd.instance.DiskTransfer("snapshot/%s" % idx,
+                                             constants.IEIO_SCRIPT, (dev, idx),
+                                             constants.IEIO_FILE, (path, ),
+                                             finished_fn)
+          transfers.append(dt)
+
+        # Actually export data
+        dresults = \
+          masterd.instance.TransferInstanceData(self, feedback_fn,
+                                                src_node, dst_node.name,
+                                                dst_node.secondary_ip,
+                                                instance, transfers)
+
+        assert len(dresults) == len(instance.disks)
+
+        # Check for backwards compatibility
+        assert compat.all(isinstance(i, bool) for i in dresults), \
+          "Not all results are boolean: %r" % dresults
+
+        feedback_fn("Finalizing export on %s" % dst_node.name)
+        result = self.rpc.call_finalize_export(dst_node.name, instance,
+                                               snap_disks)
+        msg = result.fail_msg
+        fin_resu = not msg
+        if msg:
+          self.LogWarning("Could not finalize export for instance %s"
+                          " on node %s: %s", instance.name, dst_node.name, msg)
+
+      finally:
+        # Remove all snapshots
+        assert len(removed_snaps) == len(instance.disks)
+        for idx, removed in enumerate(removed_snaps):
+          if not removed:
+            self._RemoveSnapshot(feedback_fn, snap_disks, idx)
 
     finally:
       if activate_disks:
         feedback_fn("Deactivating disks for %s" % instance.name)
         _ShutdownInstanceDisks(self, instance)
 
-    nodelist = self.cfg.GetNodeList()
-    nodelist.remove(dst_node.name)
+    # Remove instance if requested
+    if self.remove_instance:
+      feedback_fn("Removing instance %s" % instance.name)
+      _RemoveInstance(self, feedback_fn, instance, self.ignore_remove_failures)
+
+    self._CleanupExports(feedback_fn)
 
-    # on one-node clusters nodelist will be empty after the removal
-    # if we proceed the backup would be removed because OpQueryExports
-    # substitutes an empty list with the full cluster node list.
-    iname = instance.name
-    if nodelist:
-      feedback_fn("Removing old exports for instance %s" % iname)
-      exportlist = self.rpc.call_export_list(nodelist)
-      for node in exportlist:
-        if exportlist[node].fail_msg:
-          continue
-        if iname in exportlist[node].payload:
-          msg = self.rpc.call_export_remove(node, iname).fail_msg
-          if msg:
-            self.LogWarning("Could not remove older export for instance %s"
-                            " on node %s: %s", iname, node, msg)
 
     return fin_resu, dresults
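The heart of the rewritten Exec() above is the snapshot bookkeeping: one "removed" flag per disk, a per-transfer callback built with compat.partial() that deletes each snapshot as soon as its transfer finishes, and an outer finally: block that deletes whatever is left. A condensed, self-contained sketch of that life cycle; functools.partial stands in for compat.partial, and the snapshot names and simulated transfers are made up for illustration:

    import functools

    snapshots = ["snap0", "snap1", "snap2"]  # hypothetical snapshot names
    removed = [False] * len(snapshots)       # mirrors removed_snaps above

    def remove_snapshot(idx):
      # The real code issues call_blockdev_remove and may fail; this stand-in
      # always succeeds.
      print("removing %s" % snapshots[idx])
      return True

    def transfer_finished(idx):
      # Counterpart of _TransferFinished above: called as soon as the transfer
      # of snapshot idx is done, so the snapshot is removed early.
      if remove_snapshot(idx):
        removed[idx] = True

    # One callback per snapshot, bound to its index (compat.partial in the patch).
    callbacks = [functools.partial(transfer_finished, idx)
                 for idx in range(len(snapshots))]

    try:
      # Simulate transfers: only the first two complete and fire their callbacks.
      callbacks[0]()
      callbacks[1]()
    finally:
      # The finally: block removes every snapshot the callbacks missed.
      for idx, done in enumerate(removed):
        if not done:
          remove_snapshot(idx)

Removing each snapshot as soon as its transfer finishes keeps the volume group from filling up with stale snapshots during long exports, while the finally: block guarantees cleanup even when a transfer fails.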