#
#
-# Copyright (C) 2010 Google Inc.
+# Copyright (C) 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
import logging
import time
+import OpenSSL
from ganeti import constants
from ganeti import errors
from ganeti import compat
+from ganeti import utils
+from ganeti import objects
+from ganeti import netutils
class _ImportExportError(Exception):
#: Time after which daemon must be listening
DEFAULT_LISTEN_TIMEOUT = 10
+ #: Progress update interval
+ DEFAULT_PROGRESS_INTERVAL = 60
+
__slots__ = [
"error",
"ready",
"listen",
"connect",
+ "progress",
]
def __init__(self, connect,
listen=DEFAULT_LISTEN_TIMEOUT,
error=DEFAULT_ERROR_TIMEOUT,
- ready=DEFAULT_READY_TIMEOUT):
+ ready=DEFAULT_READY_TIMEOUT,
+ progress=DEFAULT_PROGRESS_INTERVAL):
"""Initializes this class.
@type connect: number
@param error: Length of time until errors cause hard failure
@type ready: number
@param ready: Timeout for daemon to become ready
+ @type progress: number
+ @param progress: Progress update interval
"""
self.error = error
self.ready = ready
self.listen = listen
self.connect = connect
+ self.progress = progress
class ImportExportCbBase(object):
"""
+  def ReportProgress(self, ie, private):
+    """Called when new progress information should be reported.
+
+    Default implementation is a no-op; subclasses override as needed.
+
+    @type ie: Subclass of L{_DiskImportExportBase}
+    @param ie: Import/export object
+    @param private: Private data passed to import/export object
+
+    """
+
def ReportFinished(self, ie, private):
"""Called when a transfer has finished.
class _DiskImportExportBase(object):
MODE_TEXT = None
- def __init__(self, lu, node_name, x509_key_name, remote_x509_ca,
+ def __init__(self, lu, node_name, opts,
instance, timeouts, cbs, private=None):
"""Initializes this class.
@param lu: Logical unit instance
@type node_name: string
@param node_name: Node name for import
- @type x509_key_name: string
- @param x509_key_name: Name of X509 key (None for node daemon key)
- @type remote_x509_ca: string
- @param remote_x509_ca: Remote peer's CA (None for node daemon certificate)
+ @type opts: L{objects.ImportExportOptions}
+ @param opts: Import/export daemon options
@type instance: L{objects.Instance}
@param instance: Instance object
@type timeouts: L{ImportExportTimeouts}
self._lu = lu
self.node_name = node_name
- self._x509_key_name = x509_key_name
- self._remote_x509_ca = remote_x509_ca
+ self._opts = opts.Copy()
self._instance = instance
self._timeouts = timeouts
self._cbs = cbs
self._private = private
+ # Set master daemon's timeout in options for import/export daemon
+ assert self._opts.connect_timeout is None
+ self._opts.connect_timeout = timeouts.connect
+
# Parent loop
self._loop = None
self._ts_connected = None
self._ts_finished = None
self._ts_cleanup = None
+ self._ts_last_progress = None
self._ts_last_error = None
# Transfer status
"""
if self._daemon:
- return self._daemon.recent_output
+ return "\n".join(self._daemon.recent_output)
return None
@property
+  def progress(self):
+    """Returns transfer progress information.
+
+    @rtype: tuple or None
+    @return: (mbytes, throughput, percent, ETA) as reported by the daemon,
+      or C{None} if no daemon data is available
+
+    """
+    if not self._daemon:
+      return None
+
+    return (self._daemon.progress_mbytes,
+            self._daemon.progress_throughput,
+            self._daemon.progress_percent,
+            self._daemon.progress_eta)
+
+  @property
+  def magic(self):
+    """Returns the magic value for this import/export.
+
+    The value is taken from the daemon options (see
+    L{objects.ImportExportOptions}).
+
+    """
+    return self._opts.magic
+
+ @property
def active(self):
"""Determines whether this transport is still active.
return False
+  def _CheckProgress(self):
+    """Checks whether a progress update should be reported.
+
+    Calls the C{ReportProgress} callback at most once per
+    C{timeouts.progress} seconds, and only once the daemon has reported
+    both byte count and throughput.
+
+    """
+    if ((self._ts_last_progress is None or
+         _TimeoutExpired(self._ts_last_progress, self._timeouts.progress)) and
+        self._daemon and
+        self._daemon.progress_mbytes is not None and
+        self._daemon.progress_throughput is not None):
+      self._cbs.ReportProgress(self, self._private)
+      self._ts_last_progress = time.time()
+
def CheckFinished(self):
"""Checks whether the daemon exited.
return True
if self._daemon.exit_status is None:
+ # TODO: Adjust delay for ETA expiring soon
+ self._CheckProgress()
return False
self._ts_finished = time.time()
"""Finalizes this import/export.
"""
- assert error or self.success is not None
-
if self._daemon_name:
logging.info("Finalizing %s %r on %s",
self.MODE_TEXT, self._daemon_name, self.node_name)
class DiskImport(_DiskImportExportBase):
MODE_TEXT = "import"
- def __init__(self, lu, node_name, x509_key_name, source_x509_ca, instance,
+ def __init__(self, lu, node_name, opts, instance,
dest, dest_args, timeouts, cbs, private=None):
"""Initializes this class.
@param lu: Logical unit instance
@type node_name: string
@param node_name: Node name for import
- @type x509_key_name: string
- @param x509_key_name: Name of X509 key (None for node daemon key)
- @type source_x509_ca: string
- @param source_x509_ca: Remote peer's CA (None for node daemon certificate)
+ @type opts: L{objects.ImportExportOptions}
+ @param opts: Import/export daemon options
@type instance: L{objects.Instance}
@param instance: Instance object
@param dest: I/O destination
@param private: Private data for callback functions
"""
- _DiskImportExportBase.__init__(self, lu, node_name,
- x509_key_name, source_x509_ca,
+ _DiskImportExportBase.__init__(self, lu, node_name, opts,
instance, timeouts, cbs, private)
self._dest = dest
self._dest_args = dest_args
"""Starts the import daemon.
"""
- return self._lu.rpc.call_import_start(self.node_name,
- self._x509_key_name,
- self._remote_x509_ca, self._instance,
+ return self._lu.rpc.call_import_start(self.node_name, self._opts,
+ self._instance,
self._dest, self._dest_args)
def CheckListening(self):
class DiskExport(_DiskImportExportBase):
MODE_TEXT = "export"
- def __init__(self, lu, node_name, x509_key_name, dest_x509_ca,
+ def __init__(self, lu, node_name, opts,
dest_host, dest_port, instance, source, source_args,
timeouts, cbs, private=None):
"""Initializes this class.
@param lu: Logical unit instance
@type node_name: string
@param node_name: Node name for import
- @type x509_key_name: string
- @param x509_key_name: Name of X509 key (None for node daemon key)
- @type dest_x509_ca: string
- @param dest_x509_ca: Remote peer's CA (None for node daemon certificate)
+ @type opts: L{objects.ImportExportOptions}
+ @param opts: Import/export daemon options
@type dest_host: string
@param dest_host: Destination host name or IP address
@type dest_port: number
@param private: Private data for callback functions
"""
- _DiskImportExportBase.__init__(self, lu, node_name,
- x509_key_name, dest_x509_ca,
+ _DiskImportExportBase.__init__(self, lu, node_name, opts,
instance, timeouts, cbs, private)
self._dest_host = dest_host
self._dest_port = dest_port
"""Starts the export daemon.
"""
- return self._lu.rpc.call_export_start(self.node_name, self._x509_key_name,
- self._remote_x509_ca,
+ return self._lu.rpc.call_export_start(self.node_name, self._opts,
self._dest_host, self._dest_port,
self._instance, self._source,
self._source_args)
return self._ts_begin
+def FormatProgress(progress):
+  """Formats progress information for user consumption.
+
+  @type progress: tuple
+  @param progress: Tuple as returned by L{_DiskImportExportBase.progress}:
+    (mbytes, throughput, percent, ETA); percent and ETA can be C{None}
+  @rtype: string
+
+  """
+  (mbytes, throughput, percent, eta) = progress
+
+  parts = [
+    utils.FormatUnit(mbytes, "h"),
+
+    # Not using FormatUnit as it doesn't support kilobytes
+    "%0.1f MiB/s" % throughput,
+    ]
+
+  if percent is not None:
+    parts.append("%d%%" % percent)
+
+  if eta is not None:
+    parts.append("ETA %s" % utils.FormatSeconds(eta))
+
+  return utils.CommaJoin(parts)
+
+
class ImportExportLoop:
MIN_DELAY = 1.0
MAX_DELAY = 20.0
logging.exception("%s failed", diskie.MODE_TEXT)
diskie.Finalize(error=str(err))
- if not compat.any([diskie.active for diskie in self._queue]):
+ if not compat.any(diskie.active for diskie in self._queue):
break
# Wait a bit
self.feedback_fn("%s is sending data on %s" %
(dtp.data.name, ie.node_name))
+  def ReportProgress(self, ie, dtp):
+    """Called when new progress information should be reported.
+
+    @param ie: Import/export object
+    @param dtp: Disk transfer private data (provides C{data.name})
+
+    """
+    progress = ie.progress
+    if not progress:
+      return
+
+    self.feedback_fn("%s sent %s" % (dtp.data.name, FormatProgress(progress)))
+
def ReportFinished(self, ie, dtp):
"""Called when a transfer has finished.
if ie.success:
self.feedback_fn("%s finished sending data" % dtp.data.name)
else:
- self.feedback_fn("%s failed to send data: %s (recent output: %r)" %
+ self.feedback_fn("%s failed to send data: %s (recent output: %s)" %
(dtp.data.name, ie.final_message, ie.recent_output))
dtp.RecordResult(ie.success)
assert self.src_cbs
assert dtp.src_export is None
assert dtp.dest_import
+ assert dtp.export_opts
self.feedback_fn("%s is now listening, starting export" % dtp.data.name)
# Start export on source node
- de = DiskExport(self.lu, self.src_node, None, None, self.dest_ip,
- ie.listen_port, self.instance,
- dtp.data.src_io, dtp.data.src_ioargs,
+ de = DiskExport(self.lu, self.src_node, dtp.export_opts,
+ self.dest_ip, ie.listen_port,
+ self.instance, dtp.data.src_io, dtp.data.src_ioargs,
self.timeouts, self.src_cbs, private=dtp)
ie.loop.Add(de)
if ie.success:
self.feedback_fn("%s finished receiving data" % dtp.data.name)
else:
- self.feedback_fn("%s failed to receive data: %s (recent output: %r)" %
+ self.feedback_fn("%s failed to receive data: %s (recent output: %s)" %
(dtp.data.name, ie.final_message, ie.recent_output))
dtp.RecordResult(ie.success)
class _DiskTransferPrivate(object):
- def __init__(self, data, success):
+ def __init__(self, data, success, export_opts):
"""Initializes this class.
@type data: L{DiskTransfer}
"""
self.data = data
+ self.success = success
+ self.export_opts = export_opts
self.src_export = None
self.dest_import = None
- self.success = success
-
def RecordResult(self, success):
"""Updates the status.
self.success = self.success and success
+def _GetInstDiskMagic(base, instance_name, index):
+  """Computes the magic value for a disk export or import.
+
+  The hash covers the RIE protocol version, the random seed, the instance
+  name and the disk index.
+
+  @type base: string
+  @param base: Random seed value (can be the same for all disks of a transfer)
+  @type instance_name: string
+  @param instance_name: Name of instance
+  @type index: number
+  @param index: Disk index
+  @rtype: string
+  @return: SHA1 hex digest
+
+  """
+  h = compat.sha1_hash()
+  h.update(str(constants.RIE_VERSION))
+  h.update(base)
+  h.update(instance_name)
+  h.update(str(index))
+  return h.hexdigest()
+
+
def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
instance, all_transfers):
"""Transfers an instance's data from one node to another.
each transfer
"""
+ # Disable compression for all moves as these are all within the same cluster
+ compress = constants.IEC_NONE
+
+ logging.debug("Source node %s, destination node %s, compression '%s'",
+ src_node, dest_node, compress)
+
timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
src_node, None, dest_node, dest_ip)
all_dtp = []
+ base_magic = utils.GenerateSecret(6)
+
ieloop = ImportExportLoop(lu)
try:
- for transfer in all_transfers:
+ for idx, transfer in enumerate(all_transfers):
if transfer:
feedback_fn("Exporting %s from %s to %s" %
(transfer.name, src_node, dest_node))
- dtp = _DiskTransferPrivate(transfer, True)
+ magic = _GetInstDiskMagic(base_magic, instance.name, idx)
+ opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
+ compress=compress, magic=magic)
- di = DiskImport(lu, dest_node, None, None, instance,
+ dtp = _DiskTransferPrivate(transfer, True, opts)
+
+ di = DiskImport(lu, dest_node, opts, instance,
transfer.dest_io, transfer.dest_ioargs,
timeouts, dest_cbs, private=dtp)
ieloop.Add(di)
dtp.dest_import = di
else:
- dtp = _DiskTransferPrivate(None, False)
+ dtp = _DiskTransferPrivate(None, False, None)
all_dtp.append(dtp)
ieloop.FinalizeAll()
assert len(all_dtp) == len(all_transfers)
- assert compat.all([(dtp.src_export is None or
+ assert compat.all((dtp.src_export is None or
dtp.src_export.success is not None) and
(dtp.dest_import is None or
dtp.dest_import.success is not None)
- for dtp in all_dtp]), \
+ for dtp in all_dtp), \
"Not all imports/exports are finalized"
return [bool(dtp.success) for dtp in all_dtp]
+
+
+class _RemoteExportCb(ImportExportCbBase):
+  """Callbacks for exporting disks to a remote cluster.
+
+  The C{private} data passed to each callback is a C{(disk index,
+  finished_fn)} tuple; C{finished_fn} may be C{None}.
+
+  """
+  def __init__(self, feedback_fn, disk_count):
+    """Initializes this class.
+
+    @param feedback_fn: Feedback function
+    @type disk_count: number
+    @param disk_count: Number of disks
+
+    """
+    ImportExportCbBase.__init__(self)
+    self._feedback_fn = feedback_fn
+    # Per-disk results; None until the corresponding transfer has finished
+    self._dresults = [None] * disk_count
+
+  @property
+  def disk_results(self):
+    """Returns per-disk results.
+
+    @rtype: list
+    @return: One boolean (or C{None} if not finished) per disk
+
+    """
+    return self._dresults
+
+  def ReportConnected(self, ie, private):
+    """Called when a connection has been established.
+
+    """
+    (idx, _) = private
+
+    self._feedback_fn("Disk %s is now sending data" % idx)
+
+  def ReportProgress(self, ie, private):
+    """Called when new progress information should be reported.
+
+    """
+    (idx, _) = private
+
+    progress = ie.progress
+    if not progress:
+      return
+
+    self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))
+
+  def ReportFinished(self, ie, private):
+    """Called when a transfer has finished.
+
+    """
+    (idx, finished_fn) = private
+
+    if ie.success:
+      self._feedback_fn("Disk %s finished sending data" % idx)
+    else:
+      self._feedback_fn("Disk %s failed to send data: %s (recent output: %s)" %
+                        (idx, ie.final_message, ie.recent_output))
+
+    self._dresults[idx] = bool(ie.success)
+
+    if finished_fn:
+      finished_fn()
+
+
+class ExportInstanceHelper:
+  """Helper for instance exports, managing per-disk LVM snapshots.
+
+  """
+  def __init__(self, lu, feedback_fn, instance):
+    """Initializes this class.
+
+    @param lu: Logical unit instance
+    @param feedback_fn: Feedback function
+    @type instance: L{objects.Instance}
+    @param instance: Instance object
+
+    """
+    self._lu = lu
+    self._feedback_fn = feedback_fn
+    self._instance = instance
+
+    # Snapshot disks, filled by L{CreateSnapshots}; entries for failed
+    # snapshots are false values
+    self._snap_disks = []
+    # Per-disk flag telling whether the snapshot has been removed
+    self._removed_snaps = [False] * len(instance.disks)
+
+  def CreateSnapshots(self):
+    """Creates an LVM snapshot for every disk of the instance.
+
+    Failures are logged as warnings only; a failed snapshot leaves a
+    false value in the internal snapshot list.
+
+    """
+    assert not self._snap_disks
+
+    instance = self._instance
+    src_node = instance.primary_node
+
+    for idx, disk in enumerate(instance.disks):
+      self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
+                        (idx, src_node))
+
+      # result.payload will be a snapshot of an lvm leaf of the one we
+      # passed
+      result = self._lu.rpc.call_blockdev_snapshot(src_node, disk)
+      new_dev = False
+      msg = result.fail_msg
+      if msg:
+        self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
+                            idx, src_node, msg)
+      elif (not isinstance(result.payload, (tuple, list)) or
+            len(result.payload) != 2):
+        self._lu.LogWarning("Could not snapshot disk/%s on node %s: invalid"
+                            " result '%s'", idx, src_node, result.payload)
+      else:
+        disk_id = tuple(result.payload)
+        new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
+                               logical_id=disk_id, physical_id=disk_id,
+                               iv_name=disk.iv_name)
+
+      self._snap_disks.append(new_dev)
+
+    assert len(self._snap_disks) == len(instance.disks)
+    assert len(self._removed_snaps) == len(instance.disks)
+
+  def _RemoveSnapshot(self, disk_index):
+    """Removes an LVM snapshot.
+
+    This is a no-op if the snapshot doesn't exist or was already removed;
+    removal failures are logged as warnings only.
+
+    @type disk_index: number
+    @param disk_index: Index of the snapshot to be removed
+
+    """
+    disk = self._snap_disks[disk_index]
+    if disk and not self._removed_snaps[disk_index]:
+      src_node = self._instance.primary_node
+
+      self._feedback_fn("Removing snapshot of disk/%s on node %s" %
+                        (disk_index, src_node))
+
+      result = self._lu.rpc.call_blockdev_remove(src_node, disk)
+      if result.fail_msg:
+        self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
+                            " %s: %s", disk_index, src_node, result.fail_msg)
+      else:
+        self._removed_snaps[disk_index] = True
+
+  def LocalExport(self, dest_node):
+    """Intra-cluster instance export.
+
+    @type dest_node: L{objects.Node}
+    @param dest_node: Destination node
+    @rtype: tuple
+    @return: (finalization success, per-disk transfer results)
+
+    """
+    instance = self._instance
+    src_node = instance.primary_node
+
+    assert len(self._snap_disks) == len(instance.disks)
+
+    transfers = []
+
+    for idx, dev in enumerate(self._snap_disks):
+      if not dev:
+        # Snapshot for this disk failed; nothing to transfer
+        transfers.append(None)
+        continue
+
+      path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
+                            dev.physical_id[1])
+
+      finished_fn = compat.partial(self._TransferFinished, idx)
+
+      # FIXME: pass debug option from opcode to backend
+      dt = DiskTransfer("snapshot/%s" % idx,
+                        constants.IEIO_SCRIPT, (dev, idx),
+                        constants.IEIO_FILE, (path, ),
+                        finished_fn)
+      transfers.append(dt)
+
+    # Actually export data
+    dresults = TransferInstanceData(self._lu, self._feedback_fn,
+                                    src_node, dest_node.name,
+                                    dest_node.secondary_ip,
+                                    instance, transfers)
+
+    assert len(dresults) == len(instance.disks)
+
+    self._feedback_fn("Finalizing export on %s" % dest_node.name)
+    result = self._lu.rpc.call_finalize_export(dest_node.name, instance,
+                                               self._snap_disks)
+    msg = result.fail_msg
+    fin_resu = not msg
+    if msg:
+      self._lu.LogWarning("Could not finalize export for instance %s"
+                          " on node %s: %s", instance.name, dest_node.name, msg)
+
+    return (fin_resu, dresults)
+
+  def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts):
+    """Inter-cluster instance export.
+
+    @type disk_info: list
+    @param disk_info: Per-disk destination information
+    @type key_name: string
+    @param key_name: Name of X509 key to use
+    @type dest_ca_pem: string
+    @param dest_ca_pem: Destination X509 CA in PEM format
+    @type timeouts: L{ImportExportTimeouts}
+    @param timeouts: Timeouts for this import
+    @rtype: tuple
+    @return: (True, per-disk transfer results)
+
+    """
+    instance = self._instance
+
+    assert len(disk_info) == len(instance.disks)
+
+    cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))
+
+    ieloop = ImportExportLoop(self._lu)
+    try:
+      for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks,
+                                                           disk_info)):
+        # Decide whether to use IPv6
+        ipv6 = netutils.IP6Address.IsValid(host)
+
+        opts = objects.ImportExportOptions(key_name=key_name,
+                                           ca_pem=dest_ca_pem,
+                                           magic=magic, ipv6=ipv6)
+
+        self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
+        finished_fn = compat.partial(self._TransferFinished, idx)
+        ieloop.Add(DiskExport(self._lu, instance.primary_node,
+                              opts, host, port, instance,
+                              constants.IEIO_SCRIPT, (dev, idx),
+                              timeouts, cbs, private=(idx, finished_fn)))
+
+      ieloop.Run()
+    finally:
+      ieloop.FinalizeAll()
+
+    return (True, cbs.disk_results)
+
+  def _TransferFinished(self, idx):
+    """Called once a transfer has finished.
+
+    Removes the snapshot for the finished disk.
+
+    @type idx: number
+    @param idx: Disk index
+
+    """
+    logging.debug("Transfer %s finished", idx)
+    self._RemoveSnapshot(idx)
+
+  def Cleanup(self):
+    """Remove all snapshots.
+
+    """
+    assert len(self._removed_snaps) == len(self._instance.disks)
+    for idx in range(len(self._instance.disks)):
+      self._RemoveSnapshot(idx)
+
+
+class _RemoteImportCb(ImportExportCbBase):
+  """Callbacks for importing disks from a remote cluster.
+
+  The C{private} data passed to each callback is a C{(disk index, )} tuple.
+
+  """
+  def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
+               external_address):
+    """Initializes this class.
+
+    @param feedback_fn: Feedback function
+    @type cds: string
+    @param cds: Cluster domain secret
+    @type x509_cert_pem: string
+    @param x509_cert_pem: CA used for signing import key
+    @type disk_count: number
+    @param disk_count: Number of disks
+    @type external_address: string
+    @param external_address: External address of destination node
+
+    """
+    ImportExportCbBase.__init__(self)
+    self._feedback_fn = feedback_fn
+    self._cds = cds
+    self._x509_cert_pem = x509_cert_pem
+    self._disk_count = disk_count
+    self._external_address = external_address
+
+    # Per-disk results; None until the corresponding transfer has finished
+    self._dresults = [None] * disk_count
+    # Per-disk (listen port, magic) tuples while the daemons are listening
+    self._daemon_port = [None] * disk_count
+
+    self._salt = utils.GenerateSecret(8)
+
+  @property
+  def disk_results(self):
+    """Returns per-disk results.
+
+    @rtype: list
+    @return: One boolean (or C{None} if not finished) per disk
+
+    """
+    return self._dresults
+
+  def _CheckAllListening(self):
+    """Checks whether all daemons are listening.
+
+    If all daemons are listening, the information is sent to the client.
+
+    """
+    if not compat.all(dp is not None for dp in self._daemon_port):
+      return
+
+    host = self._external_address
+
+    disks = []
+    for idx, (port, magic) in enumerate(self._daemon_port):
+      disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
+                                               idx, host, port, magic))
+
+    assert len(disks) == self._disk_count
+
+    self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
+      "disks": disks,
+      "x509_ca": self._x509_cert_pem,
+      })
+
+  def ReportListening(self, ie, private):
+    """Called when daemon started listening.
+
+    """
+    (idx, ) = private
+
+    self._feedback_fn("Disk %s is now listening" % idx)
+
+    assert self._daemon_port[idx] is None
+
+    self._daemon_port[idx] = (ie.listen_port, ie.magic)
+
+    self._CheckAllListening()
+
+  def ReportConnected(self, ie, private):
+    """Called when a connection has been established.
+
+    """
+    (idx, ) = private
+
+    self._feedback_fn("Disk %s is now receiving data" % idx)
+
+  def ReportFinished(self, ie, private):
+    """Called when a transfer has finished.
+
+    """
+    (idx, ) = private
+
+    # Daemon is certainly no longer listening
+    self._daemon_port[idx] = None
+
+    if ie.success:
+      self._feedback_fn("Disk %s finished receiving data" % idx)
+    else:
+      self._feedback_fn(("Disk %s failed to receive data: %s"
+                         " (recent output: %s)") %
+                        (idx, ie.final_message, ie.recent_output))
+
+    self._dresults[idx] = bool(ie.success)
+
+
+def RemoteImport(lu, feedback_fn, instance, pnode, source_x509_ca,
+                 cds, timeouts):
+  """Imports an instance from another cluster.
+
+  @param lu: Logical unit instance
+  @param feedback_fn: Feedback function
+  @type instance: L{objects.Instance}
+  @param instance: Instance object
+  @type pnode: L{objects.Node}
+  @param pnode: Primary node of instance as an object
+  @type source_x509_ca: OpenSSL.crypto.X509
+  @param source_x509_ca: Import source's X509 CA
+  @type cds: string
+  @param cds: Cluster domain secret
+  @type timeouts: L{ImportExportTimeouts}
+  @param timeouts: Timeouts for this import
+  @rtype: list
+  @return: Per-disk import results (see L{_RemoteImportCb.disk_results})
+
+  """
+  source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
+                                                  source_x509_ca)
+
+  # Random base for per-disk magic values
+  magic_base = utils.GenerateSecret(6)
+
+  # Decide whether to use IPv6
+  ipv6 = netutils.IP6Address.IsValid(pnode.primary_ip)
+
+  # Create crypto key
+  result = lu.rpc.call_x509_cert_create(instance.primary_node,
+                                        constants.RIE_CERT_VALIDITY)
+  result.Raise("Can't create X509 key and certificate on %s" % result.node)
+
+  (x509_key_name, x509_cert_pem) = result.payload
+  try:
+    # Load certificate
+    x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
+                                                x509_cert_pem)
+
+    # Sign certificate
+    signed_x509_cert_pem = \
+      utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))
+
+    cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
+                          len(instance.disks), pnode.primary_ip)
+
+    ieloop = ImportExportLoop(lu)
+    try:
+      for idx, dev in enumerate(instance.disks):
+        magic = _GetInstDiskMagic(magic_base, instance.name, idx)
+
+        # Import daemon options
+        opts = objects.ImportExportOptions(key_name=x509_key_name,
+                                           ca_pem=source_ca_pem,
+                                           magic=magic, ipv6=ipv6)
+
+        ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
+                              constants.IEIO_SCRIPT, (dev, idx),
+                              timeouts, cbs, private=(idx, )))
+
+      ieloop.Run()
+    finally:
+      ieloop.FinalizeAll()
+  finally:
+    # Remove crypto key and certificate
+    result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
+    result.Raise("Can't remove X509 key and certificate on %s" % result.node)
+
+  return cbs.disk_results
+
+
+def _GetImportExportHandshakeMessage(version):
+  """Returns the handshake message for a RIE protocol version.
+
+  @type version: number
+  @param version: RIE protocol version
+  @rtype: string
+
+  """
+  return "%s:%s" % (version, constants.RIE_HANDSHAKE)
+
+
+def ComputeRemoteExportHandshake(cds):
+  """Computes the remote import/export handshake.
+
+  @type cds: string
+  @param cds: Cluster domain secret
+  @rtype: tuple
+  @return: (protocol version, HMAC of handshake message, salt)
+
+  """
+  salt = utils.GenerateSecret(8)
+  msg = _GetImportExportHandshakeMessage(constants.RIE_VERSION)
+  return (constants.RIE_VERSION, utils.Sha1Hmac(cds, msg, salt=salt), salt)
+
+
+def CheckRemoteExportHandshake(cds, handshake):
+  """Checks the handshake of a remote import/export.
+
+  @type cds: string
+  @param cds: Cluster domain secret
+  @type handshake: sequence
+  @param handshake: Handshake sent by remote peer
+  @rtype: string or None
+  @return: Error message if the handshake is invalid, C{None} otherwise
+
+  """
+  try:
+    (version, hmac_digest, hmac_salt) = handshake
+  except (TypeError, ValueError), err:
+    return "Invalid data: %s" % err
+
+  # The HMAC covers the version (see _GetImportExportHandshakeMessage), so
+  # it is verified before the version numbers are compared
+  if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
+                              hmac_digest, salt=hmac_salt):
+    return "Hash didn't match, clusters don't share the same domain secret"
+
+  if version != constants.RIE_VERSION:
+    return ("Clusters don't have the same remote import/export protocol"
+            " (local=%s, remote=%s)" %
+            (constants.RIE_VERSION, version))
+
+  return None
+
+
+def _GetRieDiskInfoMessage(disk_index, host, port, magic):
+  """Returns the hashed text for import/export disk information.
+
+  @type disk_index: number
+  @param disk_index: Index of disk (included in hash)
+  @type host: string
+  @param host: Hostname
+  @type port: number
+  @param port: Daemon port
+  @type magic: string
+  @param magic: Magic value
+  @rtype: string
+
+  """
+  return "%s:%s:%s:%s" % (disk_index, host, port, magic)
+
+
+def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
+  """Verifies received disk information for an export.
+
+  @type cds: string
+  @param cds: Cluster domain secret
+  @type disk_index: number
+  @param disk_index: Index of disk (included in hash)
+  @type disk_info: sequence
+  @param disk_info: Disk information sent by remote peer
+  @rtype: tuple
+  @return: (destination host, validated port, magic)
+  @raise errors.GenericError: If the data is malformed, incomplete or the
+    HMAC doesn't match
+
+  """
+  try:
+    (host, port, magic, hmac_digest, hmac_salt) = disk_info
+  except (TypeError, ValueError), err:
+    raise errors.GenericError("Invalid data: %s" % err)
+
+  if not (host and port and magic):
+    raise errors.GenericError("Missing destination host, port or magic")
+
+  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
+
+  if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
+    raise errors.GenericError("HMAC is wrong")
+
+  # IP addresses are used verbatim; hostnames are normalized
+  if netutils.IP6Address.IsValid(host) or netutils.IP4Address.IsValid(host):
+    destination = host
+  else:
+    destination = netutils.Hostname.GetNormalizedName(host)
+
+  return (destination,
+          utils.ValidateServiceName(port),
+          magic)
+
+
+def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
+  """Computes the signed disk information for a remote import.
+
+  @type cds: string
+  @param cds: Cluster domain secret
+  @type salt: string
+  @param salt: HMAC salt
+  @type disk_index: number
+  @param disk_index: Index of disk (included in hash)
+  @type host: string
+  @param host: Hostname
+  @type port: number
+  @param port: Daemon port
+  @type magic: string
+  @param magic: Magic value
+  @rtype: tuple
+  @return: (host, port, magic, HMAC digest, salt), verifiable with
+    L{CheckRemoteExportDiskInfo}
+
+  """
+  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
+  hmac_digest = utils.Sha1Hmac(cds, msg, salt=salt)
+  return (host, port, magic, hmac_digest, salt)