X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/5d97d6ddf838c40a52cc81eadd9513e5743ef4f5..adc523ab4aace8063588ab3715e72edb6634da1b:/lib/masterd/instance.py diff --git a/lib/masterd/instance.py b/lib/masterd/instance.py index 9872467..45abf49 100644 --- a/lib/masterd/instance.py +++ b/lib/masterd/instance.py @@ -1,7 +1,7 @@ # # -# Copyright (C) 2010 Google Inc. +# Copyright (C) 2010, 2011 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -25,10 +25,14 @@ import logging import time +import OpenSSL from ganeti import constants from ganeti import errors from ganeti import compat +from ganeti import utils +from ganeti import objects +from ganeti import netutils class _ImportExportError(Exception): @@ -47,17 +51,22 @@ class ImportExportTimeouts(object): #: Time after which daemon must be listening DEFAULT_LISTEN_TIMEOUT = 10 + #: Progress update interval + DEFAULT_PROGRESS_INTERVAL = 60 + __slots__ = [ "error", "ready", "listen", "connect", + "progress", ] def __init__(self, connect, listen=DEFAULT_LISTEN_TIMEOUT, error=DEFAULT_ERROR_TIMEOUT, - ready=DEFAULT_READY_TIMEOUT): + ready=DEFAULT_READY_TIMEOUT, + progress=DEFAULT_PROGRESS_INTERVAL): """Initializes this class. @type connect: number @@ -68,12 +77,15 @@ class ImportExportTimeouts(object): @param error: Length of time until errors cause hard failure @type ready: number @param ready: Timeout for daemon to become ready + @type progress: number + @param progress: Progress update interval """ self.error = error self.ready = ready self.listen = listen self.connect = connect + self.progress = progress class ImportExportCbBase(object): @@ -98,6 +110,15 @@ class ImportExportCbBase(object): """ + def ReportProgress(self, ie, private): + """Called when new progress information should be reported. + + @type ie: Subclass of L{_DiskImportExportBase} + @param ie: Import/export object + @param private: Private data passed to import/export object + + """ + def ReportFinished(self, ie, private): """Called when a transfer has finished. @@ -118,17 +139,15 @@ def _TimeoutExpired(epoch, timeout, _time_fn=time.time): class _DiskImportExportBase(object): MODE_TEXT = None - def __init__(self, lu, node_name, x509_key_name, remote_x509_ca, + def __init__(self, lu, node_name, opts, instance, timeouts, cbs, private=None): """Initializes this class. @param lu: Logical unit instance @type node_name: string @param node_name: Node name for import - @type x509_key_name: string - @param x509_key_name: Name of X509 key (None for node daemon key) - @type remote_x509_ca: string - @param remote_x509_ca: Remote peer's CA (None for node daemon certificate) + @type opts: L{objects.ImportExportOptions} + @param opts: Import/export daemon options @type instance: L{objects.Instance} @param instance: Instance object @type timeouts: L{ImportExportTimeouts} @@ -142,13 +161,16 @@ class _DiskImportExportBase(object): self._lu = lu self.node_name = node_name - self._x509_key_name = x509_key_name - self._remote_x509_ca = remote_x509_ca + self._opts = opts.Copy() self._instance = instance self._timeouts = timeouts self._cbs = cbs self._private = private + # Set master daemon's timeout in options for import/export daemon + assert self._opts.connect_timeout is None + self._opts.connect_timeout = timeouts.connect + # Parent loop self._loop = None @@ -157,6 +179,7 @@ class _DiskImportExportBase(object): self._ts_connected = None self._ts_finished = None self._ts_cleanup = None + self._ts_last_progress = None self._ts_last_error = None # Transfer status @@ -173,11 +196,31 @@ class _DiskImportExportBase(object): """ if self._daemon: - return self._daemon.recent_output + return "\n".join(self._daemon.recent_output) return None @property + def progress(self): + """Returns transfer progress information. + + """ + if not self._daemon: + return None + + return (self._daemon.progress_mbytes, + self._daemon.progress_throughput, + self._daemon.progress_percent, + self._daemon.progress_eta) + + @property + def magic(self): + """Returns the magic value for this import/export. + + """ + return self._opts.magic + + @property def active(self): """Determines whether this transport is still active. @@ -345,6 +388,18 @@ class _DiskImportExportBase(object): return False + def _CheckProgress(self): + """Checks whether a progress update should be reported. + + """ + if ((self._ts_last_progress is None or + _TimeoutExpired(self._ts_last_progress, self._timeouts.progress)) and + self._daemon and + self._daemon.progress_mbytes is not None and + self._daemon.progress_throughput is not None): + self._cbs.ReportProgress(self, self._private) + self._ts_last_progress = time.time() + def CheckFinished(self): """Checks whether the daemon exited. @@ -358,6 +413,8 @@ class _DiskImportExportBase(object): return True if self._daemon.exit_status is None: + # TODO: Adjust delay for ETA expiring soon + self._CheckProgress() return False self._ts_finished = time.time() @@ -404,8 +461,6 @@ class _DiskImportExportBase(object): """Finalizes this import/export. """ - assert error or self.success is not None - if self._daemon_name: logging.info("Finalizing %s %r on %s", self.MODE_TEXT, self._daemon_name, self.node_name) @@ -430,17 +485,15 @@ class _DiskImportExportBase(object): class DiskImport(_DiskImportExportBase): MODE_TEXT = "import" - def __init__(self, lu, node_name, x509_key_name, source_x509_ca, instance, + def __init__(self, lu, node_name, opts, instance, dest, dest_args, timeouts, cbs, private=None): """Initializes this class. @param lu: Logical unit instance @type node_name: string @param node_name: Node name for import - @type x509_key_name: string - @param x509_key_name: Name of X509 key (None for node daemon key) - @type source_x509_ca: string - @param source_x509_ca: Remote peer's CA (None for node daemon certificate) + @type opts: L{objects.ImportExportOptions} + @param opts: Import/export daemon options @type instance: L{objects.Instance} @param instance: Instance object @param dest: I/O destination @@ -452,8 +505,7 @@ class DiskImport(_DiskImportExportBase): @param private: Private data for callback functions """ - _DiskImportExportBase.__init__(self, lu, node_name, - x509_key_name, source_x509_ca, + _DiskImportExportBase.__init__(self, lu, node_name, opts, instance, timeouts, cbs, private) self._dest = dest self._dest_args = dest_args @@ -475,9 +527,8 @@ class DiskImport(_DiskImportExportBase): """Starts the import daemon. """ - return self._lu.rpc.call_import_start(self.node_name, - self._x509_key_name, - self._remote_x509_ca, self._instance, + return self._lu.rpc.call_import_start(self.node_name, self._opts, + self._instance, self._dest, self._dest_args) def CheckListening(self): @@ -523,7 +574,7 @@ class DiskImport(_DiskImportExportBase): class DiskExport(_DiskImportExportBase): MODE_TEXT = "export" - def __init__(self, lu, node_name, x509_key_name, dest_x509_ca, + def __init__(self, lu, node_name, opts, dest_host, dest_port, instance, source, source_args, timeouts, cbs, private=None): """Initializes this class. @@ -531,10 +582,8 @@ class DiskExport(_DiskImportExportBase): @param lu: Logical unit instance @type node_name: string @param node_name: Node name for import - @type x509_key_name: string - @param x509_key_name: Name of X509 key (None for node daemon key) - @type dest_x509_ca: string - @param dest_x509_ca: Remote peer's CA (None for node daemon certificate) + @type opts: L{objects.ImportExportOptions} + @param opts: Import/export daemon options @type dest_host: string @param dest_host: Destination host name or IP address @type dest_port: number @@ -550,8 +599,7 @@ class DiskExport(_DiskImportExportBase): @param private: Private data for callback functions """ - _DiskImportExportBase.__init__(self, lu, node_name, - x509_key_name, dest_x509_ca, + _DiskImportExportBase.__init__(self, lu, node_name, opts, instance, timeouts, cbs, private) self._dest_host = dest_host self._dest_port = dest_port @@ -562,8 +610,7 @@ class DiskExport(_DiskImportExportBase): """Starts the export daemon. """ - return self._lu.rpc.call_export_start(self.node_name, self._x509_key_name, - self._remote_x509_ca, + return self._lu.rpc.call_export_start(self.node_name, self._opts, self._dest_host, self._dest_port, self._instance, self._source, self._source_args) @@ -584,6 +631,28 @@ class DiskExport(_DiskImportExportBase): return self._ts_begin +def FormatProgress(progress): + """Formats progress information for user consumption + + """ + (mbytes, throughput, percent, eta) = progress + + parts = [ + utils.FormatUnit(mbytes, "h"), + + # Not using FormatUnit as it doesn't support kilobytes + "%0.1f MiB/s" % throughput, + ] + + if percent is not None: + parts.append("%d%%" % percent) + + if eta is not None: + parts.append("ETA %s" % utils.FormatSeconds(eta)) + + return utils.CommaJoin(parts) + + class ImportExportLoop: MIN_DELAY = 1.0 MAX_DELAY = 20.0 @@ -729,7 +798,7 @@ class ImportExportLoop: logging.exception("%s failed", diskie.MODE_TEXT) diskie.Finalize(error=str(err)) - if not compat.any([diskie.active for diskie in self._queue]): + if not compat.any(diskie.active for diskie in self._queue): break # Wait a bit @@ -779,6 +848,16 @@ class _TransferInstSourceCb(_TransferInstCbBase): self.feedback_fn("%s is sending data on %s" % (dtp.data.name, ie.node_name)) + def ReportProgress(self, ie, dtp): + """Called when new progress information should be reported. + + """ + progress = ie.progress + if not progress: + return + + self.feedback_fn("%s sent %s" % (dtp.data.name, FormatProgress(progress))) + def ReportFinished(self, ie, dtp): """Called when a transfer has finished. @@ -790,7 +869,7 @@ class _TransferInstSourceCb(_TransferInstCbBase): if ie.success: self.feedback_fn("%s finished sending data" % dtp.data.name) else: - self.feedback_fn("%s failed to send data: %s (recent output: %r)" % + self.feedback_fn("%s failed to send data: %s (recent output: %s)" % (dtp.data.name, ie.final_message, ie.recent_output)) dtp.RecordResult(ie.success) @@ -813,13 +892,14 @@ class _TransferInstDestCb(_TransferInstCbBase): assert self.src_cbs assert dtp.src_export is None assert dtp.dest_import + assert dtp.export_opts self.feedback_fn("%s is now listening, starting export" % dtp.data.name) # Start export on source node - de = DiskExport(self.lu, self.src_node, None, None, self.dest_ip, - ie.listen_port, self.instance, - dtp.data.src_io, dtp.data.src_ioargs, + de = DiskExport(self.lu, self.src_node, dtp.export_opts, + self.dest_ip, ie.listen_port, + self.instance, dtp.data.src_io, dtp.data.src_ioargs, self.timeouts, self.src_cbs, private=dtp) ie.loop.Add(de) @@ -839,7 +919,7 @@ class _TransferInstDestCb(_TransferInstCbBase): if ie.success: self.feedback_fn("%s finished receiving data" % dtp.data.name) else: - self.feedback_fn("%s failed to receive data: %s (recent output: %r)" % + self.feedback_fn("%s failed to receive data: %s (recent output: %s)" % (dtp.data.name, ie.final_message, ie.recent_output)) dtp.RecordResult(ie.success) @@ -877,7 +957,7 @@ class DiskTransfer(object): class _DiskTransferPrivate(object): - def __init__(self, data, success): + def __init__(self, data, success, export_opts): """Initializes this class. @type data: L{DiskTransfer} @@ -885,12 +965,12 @@ class _DiskTransferPrivate(object): """ self.data = data + self.success = success + self.export_opts = export_opts self.src_export = None self.dest_import = None - self.success = success - def RecordResult(self, success): """Updates the status. @@ -900,6 +980,25 @@ class _DiskTransferPrivate(object): self.success = self.success and success +def _GetInstDiskMagic(base, instance_name, index): + """Computes the magic value for a disk export or import. + + @type base: string + @param base: Random seed value (can be the same for all disks of a transfer) + @type instance_name: string + @param instance_name: Name of instance + @type index: number + @param index: Disk index + + """ + h = compat.sha1_hash() + h.update(str(constants.RIE_VERSION)) + h.update(base) + h.update(instance_name) + h.update(str(index)) + return h.hexdigest() + + def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip, instance, all_transfers): """Transfers an instance's data from one node to another. @@ -921,6 +1020,12 @@ def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip, each transfer """ + # Disable compression for all moves as these are all within the same cluster + compress = constants.IEC_NONE + + logging.debug("Source node %s, destination node %s, compression '%s'", + src_node, dest_node, compress) + timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT) src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts, src_node, None, dest_node, dest_ip) @@ -929,23 +1034,29 @@ def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip, all_dtp = [] + base_magic = utils.GenerateSecret(6) + ieloop = ImportExportLoop(lu) try: - for transfer in all_transfers: + for idx, transfer in enumerate(all_transfers): if transfer: feedback_fn("Exporting %s from %s to %s" % (transfer.name, src_node, dest_node)) - dtp = _DiskTransferPrivate(transfer, True) + magic = _GetInstDiskMagic(base_magic, instance.name, idx) + opts = objects.ImportExportOptions(key_name=None, ca_pem=None, + compress=compress, magic=magic) - di = DiskImport(lu, dest_node, None, None, instance, + dtp = _DiskTransferPrivate(transfer, True, opts) + + di = DiskImport(lu, dest_node, opts, instance, transfer.dest_io, transfer.dest_ioargs, timeouts, dest_cbs, private=dtp) ieloop.Add(di) dtp.dest_import = di else: - dtp = _DiskTransferPrivate(None, False) + dtp = _DiskTransferPrivate(None, False, None) all_dtp.append(dtp) @@ -954,11 +1065,537 @@ def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip, ieloop.FinalizeAll() assert len(all_dtp) == len(all_transfers) - assert compat.all([(dtp.src_export is None or + assert compat.all((dtp.src_export is None or dtp.src_export.success is not None) and (dtp.dest_import is None or dtp.dest_import.success is not None) - for dtp in all_dtp]), \ + for dtp in all_dtp), \ "Not all imports/exports are finalized" return [bool(dtp.success) for dtp in all_dtp] + + +class _RemoteExportCb(ImportExportCbBase): + def __init__(self, feedback_fn, disk_count): + """Initializes this class. + + """ + ImportExportCbBase.__init__(self) + self._feedback_fn = feedback_fn + self._dresults = [None] * disk_count + + @property + def disk_results(self): + """Returns per-disk results. + + """ + return self._dresults + + def ReportConnected(self, ie, private): + """Called when a connection has been established. + + """ + (idx, _) = private + + self._feedback_fn("Disk %s is now sending data" % idx) + + def ReportProgress(self, ie, private): + """Called when new progress information should be reported. + + """ + (idx, _) = private + + progress = ie.progress + if not progress: + return + + self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress))) + + def ReportFinished(self, ie, private): + """Called when a transfer has finished. + + """ + (idx, finished_fn) = private + + if ie.success: + self._feedback_fn("Disk %s finished sending data" % idx) + else: + self._feedback_fn("Disk %s failed to send data: %s (recent output: %s)" % + (idx, ie.final_message, ie.recent_output)) + + self._dresults[idx] = bool(ie.success) + + if finished_fn: + finished_fn() + + +class ExportInstanceHelper: + def __init__(self, lu, feedback_fn, instance): + """Initializes this class. + + @param lu: Logical unit instance + @param feedback_fn: Feedback function + @type instance: L{objects.Instance} + @param instance: Instance object + + """ + self._lu = lu + self._feedback_fn = feedback_fn + self._instance = instance + + self._snap_disks = [] + self._removed_snaps = [False] * len(instance.disks) + + def CreateSnapshots(self): + """Creates an LVM snapshot for every disk of the instance. + + """ + assert not self._snap_disks + + instance = self._instance + src_node = instance.primary_node + + for idx, disk in enumerate(instance.disks): + self._feedback_fn("Creating a snapshot of disk/%s on node %s" % + (idx, src_node)) + + # result.payload will be a snapshot of an lvm leaf of the one we + # passed + result = self._lu.rpc.call_blockdev_snapshot(src_node, disk) + new_dev = False + msg = result.fail_msg + if msg: + self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s", + idx, src_node, msg) + elif (not isinstance(result.payload, (tuple, list)) or + len(result.payload) != 2): + self._lu.LogWarning("Could not snapshot disk/%s on node %s: invalid" + " result '%s'", idx, src_node, result.payload) + else: + disk_id = tuple(result.payload) + new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size, + logical_id=disk_id, physical_id=disk_id, + iv_name=disk.iv_name) + + self._snap_disks.append(new_dev) + + assert len(self._snap_disks) == len(instance.disks) + assert len(self._removed_snaps) == len(instance.disks) + + def _RemoveSnapshot(self, disk_index): + """Removes an LVM snapshot. + + @type disk_index: number + @param disk_index: Index of the snapshot to be removed + + """ + disk = self._snap_disks[disk_index] + if disk and not self._removed_snaps[disk_index]: + src_node = self._instance.primary_node + + self._feedback_fn("Removing snapshot of disk/%s on node %s" % + (disk_index, src_node)) + + result = self._lu.rpc.call_blockdev_remove(src_node, disk) + if result.fail_msg: + self._lu.LogWarning("Could not remove snapshot for disk/%d from node" + " %s: %s", disk_index, src_node, result.fail_msg) + else: + self._removed_snaps[disk_index] = True + + def LocalExport(self, dest_node): + """Intra-cluster instance export. + + @type dest_node: L{objects.Node} + @param dest_node: Destination node + + """ + instance = self._instance + src_node = instance.primary_node + + assert len(self._snap_disks) == len(instance.disks) + + transfers = [] + + for idx, dev in enumerate(self._snap_disks): + if not dev: + transfers.append(None) + continue + + path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name, + dev.physical_id[1]) + + finished_fn = compat.partial(self._TransferFinished, idx) + + # FIXME: pass debug option from opcode to backend + dt = DiskTransfer("snapshot/%s" % idx, + constants.IEIO_SCRIPT, (dev, idx), + constants.IEIO_FILE, (path, ), + finished_fn) + transfers.append(dt) + + # Actually export data + dresults = TransferInstanceData(self._lu, self._feedback_fn, + src_node, dest_node.name, + dest_node.secondary_ip, + instance, transfers) + + assert len(dresults) == len(instance.disks) + + self._feedback_fn("Finalizing export on %s" % dest_node.name) + result = self._lu.rpc.call_finalize_export(dest_node.name, instance, + self._snap_disks) + msg = result.fail_msg + fin_resu = not msg + if msg: + self._lu.LogWarning("Could not finalize export for instance %s" + " on node %s: %s", instance.name, dest_node.name, msg) + + return (fin_resu, dresults) + + def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts): + """Inter-cluster instance export. + + @type disk_info: list + @param disk_info: Per-disk destination information + @type key_name: string + @param key_name: Name of X509 key to use + @type dest_ca_pem: string + @param dest_ca_pem: Destination X509 CA in PEM format + @type timeouts: L{ImportExportTimeouts} + @param timeouts: Timeouts for this import + + """ + instance = self._instance + + assert len(disk_info) == len(instance.disks) + + cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks)) + + ieloop = ImportExportLoop(self._lu) + try: + for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks, + disk_info)): + # Decide whether to use IPv6 + ipv6 = netutils.IP6Address.IsValid(host) + + opts = objects.ImportExportOptions(key_name=key_name, + ca_pem=dest_ca_pem, + magic=magic, ipv6=ipv6) + + self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port)) + finished_fn = compat.partial(self._TransferFinished, idx) + ieloop.Add(DiskExport(self._lu, instance.primary_node, + opts, host, port, instance, + constants.IEIO_SCRIPT, (dev, idx), + timeouts, cbs, private=(idx, finished_fn))) + + ieloop.Run() + finally: + ieloop.FinalizeAll() + + return (True, cbs.disk_results) + + def _TransferFinished(self, idx): + """Called once a transfer has finished. + + @type idx: number + @param idx: Disk index + + """ + logging.debug("Transfer %s finished", idx) + self._RemoveSnapshot(idx) + + def Cleanup(self): + """Remove all snapshots. + + """ + assert len(self._removed_snaps) == len(self._instance.disks) + for idx in range(len(self._instance.disks)): + self._RemoveSnapshot(idx) + + +class _RemoteImportCb(ImportExportCbBase): + def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count, + external_address): + """Initializes this class. + + @type cds: string + @param cds: Cluster domain secret + @type x509_cert_pem: string + @param x509_cert_pem: CA used for signing import key + @type disk_count: number + @param disk_count: Number of disks + @type external_address: string + @param external_address: External address of destination node + + """ + ImportExportCbBase.__init__(self) + self._feedback_fn = feedback_fn + self._cds = cds + self._x509_cert_pem = x509_cert_pem + self._disk_count = disk_count + self._external_address = external_address + + self._dresults = [None] * disk_count + self._daemon_port = [None] * disk_count + + self._salt = utils.GenerateSecret(8) + + @property + def disk_results(self): + """Returns per-disk results. + + """ + return self._dresults + + def _CheckAllListening(self): + """Checks whether all daemons are listening. + + If all daemons are listening, the information is sent to the client. + + """ + if not compat.all(dp is not None for dp in self._daemon_port): + return + + host = self._external_address + + disks = [] + for idx, (port, magic) in enumerate(self._daemon_port): + disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt, + idx, host, port, magic)) + + assert len(disks) == self._disk_count + + self._feedback_fn(constants.ELOG_REMOTE_IMPORT, { + "disks": disks, + "x509_ca": self._x509_cert_pem, + }) + + def ReportListening(self, ie, private): + """Called when daemon started listening. + + """ + (idx, ) = private + + self._feedback_fn("Disk %s is now listening" % idx) + + assert self._daemon_port[idx] is None + + self._daemon_port[idx] = (ie.listen_port, ie.magic) + + self._CheckAllListening() + + def ReportConnected(self, ie, private): + """Called when a connection has been established. + + """ + (idx, ) = private + + self._feedback_fn("Disk %s is now receiving data" % idx) + + def ReportFinished(self, ie, private): + """Called when a transfer has finished. + + """ + (idx, ) = private + + # Daemon is certainly no longer listening + self._daemon_port[idx] = None + + if ie.success: + self._feedback_fn("Disk %s finished receiving data" % idx) + else: + self._feedback_fn(("Disk %s failed to receive data: %s" + " (recent output: %s)") % + (idx, ie.final_message, ie.recent_output)) + + self._dresults[idx] = bool(ie.success) + + +def RemoteImport(lu, feedback_fn, instance, pnode, source_x509_ca, + cds, timeouts): + """Imports an instance from another cluster. + + @param lu: Logical unit instance + @param feedback_fn: Feedback function + @type instance: L{objects.Instance} + @param instance: Instance object + @type pnode: L{objects.Node} + @param pnode: Primary node of instance as an object + @type source_x509_ca: OpenSSL.crypto.X509 + @param source_x509_ca: Import source's X509 CA + @type cds: string + @param cds: Cluster domain secret + @type timeouts: L{ImportExportTimeouts} + @param timeouts: Timeouts for this import + + """ + source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, + source_x509_ca) + + magic_base = utils.GenerateSecret(6) + + # Decide whether to use IPv6 + ipv6 = netutils.IP6Address.IsValid(pnode.primary_ip) + + # Create crypto key + result = lu.rpc.call_x509_cert_create(instance.primary_node, + constants.RIE_CERT_VALIDITY) + result.Raise("Can't create X509 key and certificate on %s" % result.node) + + (x509_key_name, x509_cert_pem) = result.payload + try: + # Load certificate + x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, + x509_cert_pem) + + # Sign certificate + signed_x509_cert_pem = \ + utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8)) + + cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem, + len(instance.disks), pnode.primary_ip) + + ieloop = ImportExportLoop(lu) + try: + for idx, dev in enumerate(instance.disks): + magic = _GetInstDiskMagic(magic_base, instance.name, idx) + + # Import daemon options + opts = objects.ImportExportOptions(key_name=x509_key_name, + ca_pem=source_ca_pem, + magic=magic, ipv6=ipv6) + + ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance, + constants.IEIO_SCRIPT, (dev, idx), + timeouts, cbs, private=(idx, ))) + + ieloop.Run() + finally: + ieloop.FinalizeAll() + finally: + # Remove crypto key and certificate + result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name) + result.Raise("Can't remove X509 key and certificate on %s" % result.node) + + return cbs.disk_results + + +def _GetImportExportHandshakeMessage(version): + """Returns the handshake message for a RIE protocol version. + + @type version: number + + """ + return "%s:%s" % (version, constants.RIE_HANDSHAKE) + + +def ComputeRemoteExportHandshake(cds): + """Computes the remote import/export handshake. + + @type cds: string + @param cds: Cluster domain secret + + """ + salt = utils.GenerateSecret(8) + msg = _GetImportExportHandshakeMessage(constants.RIE_VERSION) + return (constants.RIE_VERSION, utils.Sha1Hmac(cds, msg, salt=salt), salt) + + +def CheckRemoteExportHandshake(cds, handshake): + """Checks the handshake of a remote import/export. + + @type cds: string + @param cds: Cluster domain secret + @type handshake: sequence + @param handshake: Handshake sent by remote peer + + """ + try: + (version, hmac_digest, hmac_salt) = handshake + except (TypeError, ValueError), err: + return "Invalid data: %s" % err + + if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version), + hmac_digest, salt=hmac_salt): + return "Hash didn't match, clusters don't share the same domain secret" + + if version != constants.RIE_VERSION: + return ("Clusters don't have the same remote import/export protocol" + " (local=%s, remote=%s)" % + (constants.RIE_VERSION, version)) + + return None + + +def _GetRieDiskInfoMessage(disk_index, host, port, magic): + """Returns the hashed text for import/export disk information. + + @type disk_index: number + @param disk_index: Index of disk (included in hash) + @type host: string + @param host: Hostname + @type port: number + @param port: Daemon port + @type magic: string + @param magic: Magic value + + """ + return "%s:%s:%s:%s" % (disk_index, host, port, magic) + + +def CheckRemoteExportDiskInfo(cds, disk_index, disk_info): + """Verifies received disk information for an export. + + @type cds: string + @param cds: Cluster domain secret + @type disk_index: number + @param disk_index: Index of disk (included in hash) + @type disk_info: sequence + @param disk_info: Disk information sent by remote peer + + """ + try: + (host, port, magic, hmac_digest, hmac_salt) = disk_info + except (TypeError, ValueError), err: + raise errors.GenericError("Invalid data: %s" % err) + + if not (host and port and magic): + raise errors.GenericError("Missing destination host, port or magic") + + msg = _GetRieDiskInfoMessage(disk_index, host, port, magic) + + if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt): + raise errors.GenericError("HMAC is wrong") + + if netutils.IP6Address.IsValid(host) or netutils.IP4Address.IsValid(host): + destination = host + else: + destination = netutils.Hostname.GetNormalizedName(host) + + return (destination, + utils.ValidateServiceName(port), + magic) + + +def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic): + """Computes the signed disk information for a remote import. + + @type cds: string + @param cds: Cluster domain secret + @type salt: string + @param salt: HMAC salt + @type disk_index: number + @param disk_index: Index of disk (included in hash) + @type host: string + @param host: Hostname + @type port: number + @param port: Daemon port + @type magic: string + @param magic: Magic value + + """ + msg = _GetRieDiskInfoMessage(disk_index, host, port, magic) + hmac_digest = utils.Sha1Hmac(cds, msg, salt=salt) + return (host, port, magic, hmac_digest, salt)