mlock: fail gracefully if libc.so.6 cannot be loaded
[ganeti-local] / lib / masterd / instance.py
index 9872467..45abf49 100644 (file)
@@ -1,7 +1,7 @@
 #
 #
 
-# Copyright (C) 2010 Google Inc.
+# Copyright (C) 2010, 2011 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 
 import logging
 import time
+import OpenSSL
 
 from ganeti import constants
 from ganeti import errors
 from ganeti import compat
+from ganeti import utils
+from ganeti import objects
+from ganeti import netutils
 
 
 class _ImportExportError(Exception):
@@ -47,17 +51,22 @@ class ImportExportTimeouts(object):
   #: Time after which daemon must be listening
   DEFAULT_LISTEN_TIMEOUT = 10
 
+  #: Progress update interval
+  DEFAULT_PROGRESS_INTERVAL = 60
+
   __slots__ = [
     "error",
     "ready",
     "listen",
     "connect",
+    "progress",
     ]
 
   def __init__(self, connect,
                listen=DEFAULT_LISTEN_TIMEOUT,
                error=DEFAULT_ERROR_TIMEOUT,
-               ready=DEFAULT_READY_TIMEOUT):
+               ready=DEFAULT_READY_TIMEOUT,
+               progress=DEFAULT_PROGRESS_INTERVAL):
     """Initializes this class.
 
     @type connect: number
@@ -68,12 +77,15 @@ class ImportExportTimeouts(object):
     @param error: Length of time until errors cause hard failure
     @type ready: number
     @param ready: Timeout for daemon to become ready
+    @type progress: number
+    @param progress: Progress update interval
 
     """
     self.error = error
     self.ready = ready
     self.listen = listen
     self.connect = connect
+    self.progress = progress
 
 
 class ImportExportCbBase(object):
@@ -98,6 +110,15 @@ class ImportExportCbBase(object):
 
     """
 
+  def ReportProgress(self, ie, private):
+    """Called when new progress information should be reported.
+
+    @type ie: Subclass of L{_DiskImportExportBase}
+    @param ie: Import/export object
+    @param private: Private data passed to import/export object
+
+    """
+
   def ReportFinished(self, ie, private):
     """Called when a transfer has finished.
 
@@ -118,17 +139,15 @@ def _TimeoutExpired(epoch, timeout, _time_fn=time.time):
 class _DiskImportExportBase(object):
   MODE_TEXT = None
 
-  def __init__(self, lu, node_name, x509_key_name, remote_x509_ca,
+  def __init__(self, lu, node_name, opts,
                instance, timeouts, cbs, private=None):
     """Initializes this class.
 
     @param lu: Logical unit instance
     @type node_name: string
     @param node_name: Node name for import
-    @type x509_key_name: string
-    @param x509_key_name: Name of X509 key (None for node daemon key)
-    @type remote_x509_ca: string
-    @param remote_x509_ca: Remote peer's CA (None for node daemon certificate)
+    @type opts: L{objects.ImportExportOptions}
+    @param opts: Import/export daemon options
     @type instance: L{objects.Instance}
     @param instance: Instance object
     @type timeouts: L{ImportExportTimeouts}
@@ -142,13 +161,16 @@ class _DiskImportExportBase(object):
 
     self._lu = lu
     self.node_name = node_name
-    self._x509_key_name = x509_key_name
-    self._remote_x509_ca = remote_x509_ca
+    self._opts = opts.Copy()
     self._instance = instance
     self._timeouts = timeouts
     self._cbs = cbs
     self._private = private
 
+    # Set master daemon's timeout in options for import/export daemon
+    assert self._opts.connect_timeout is None
+    self._opts.connect_timeout = timeouts.connect
+
     # Parent loop
     self._loop = None
 
@@ -157,6 +179,7 @@ class _DiskImportExportBase(object):
     self._ts_connected = None
     self._ts_finished = None
     self._ts_cleanup = None
+    self._ts_last_progress = None
     self._ts_last_error = None
 
     # Transfer status
@@ -173,11 +196,31 @@ class _DiskImportExportBase(object):
 
     """
     if self._daemon:
-      return self._daemon.recent_output
+      return "\n".join(self._daemon.recent_output)
 
     return None
 
   @property
+  def progress(self):
+    """Returns transfer progress information.
+
+    """
+    if not self._daemon:
+      return None
+
+    return (self._daemon.progress_mbytes,
+            self._daemon.progress_throughput,
+            self._daemon.progress_percent,
+            self._daemon.progress_eta)
+
+  @property
+  def magic(self):
+    """Returns the magic value for this import/export.
+
+    """
+    return self._opts.magic
+
+  @property
   def active(self):
     """Determines whether this transport is still active.
 
@@ -345,6 +388,18 @@ class _DiskImportExportBase(object):
 
     return False
 
+  def _CheckProgress(self):
+    """Checks whether a progress update should be reported.
+
+    """
+    if ((self._ts_last_progress is None or
+         _TimeoutExpired(self._ts_last_progress, self._timeouts.progress)) and
+        self._daemon and
+        self._daemon.progress_mbytes is not None and
+        self._daemon.progress_throughput is not None):
+      self._cbs.ReportProgress(self, self._private)
+      self._ts_last_progress = time.time()
+
   def CheckFinished(self):
     """Checks whether the daemon exited.
 
@@ -358,6 +413,8 @@ class _DiskImportExportBase(object):
       return True
 
     if self._daemon.exit_status is None:
+      # TODO: Adjust delay for ETA expiring soon
+      self._CheckProgress()
       return False
 
     self._ts_finished = time.time()
@@ -404,8 +461,6 @@ class _DiskImportExportBase(object):
     """Finalizes this import/export.
 
     """
-    assert error or self.success is not None
-
     if self._daemon_name:
       logging.info("Finalizing %s %r on %s",
                    self.MODE_TEXT, self._daemon_name, self.node_name)
@@ -430,17 +485,15 @@ class _DiskImportExportBase(object):
 class DiskImport(_DiskImportExportBase):
   MODE_TEXT = "import"
 
-  def __init__(self, lu, node_name, x509_key_name, source_x509_ca, instance,
+  def __init__(self, lu, node_name, opts, instance,
                dest, dest_args, timeouts, cbs, private=None):
     """Initializes this class.
 
     @param lu: Logical unit instance
     @type node_name: string
     @param node_name: Node name for import
-    @type x509_key_name: string
-    @param x509_key_name: Name of X509 key (None for node daemon key)
-    @type source_x509_ca: string
-    @param source_x509_ca: Remote peer's CA (None for node daemon certificate)
+    @type opts: L{objects.ImportExportOptions}
+    @param opts: Import/export daemon options
     @type instance: L{objects.Instance}
     @param instance: Instance object
     @param dest: I/O destination
@@ -452,8 +505,7 @@ class DiskImport(_DiskImportExportBase):
     @param private: Private data for callback functions
 
     """
-    _DiskImportExportBase.__init__(self, lu, node_name,
-                                   x509_key_name, source_x509_ca,
+    _DiskImportExportBase.__init__(self, lu, node_name, opts,
                                    instance, timeouts, cbs, private)
     self._dest = dest
     self._dest_args = dest_args
@@ -475,9 +527,8 @@ class DiskImport(_DiskImportExportBase):
     """Starts the import daemon.
 
     """
-    return self._lu.rpc.call_import_start(self.node_name,
-                                          self._x509_key_name,
-                                          self._remote_x509_ca, self._instance,
+    return self._lu.rpc.call_import_start(self.node_name, self._opts,
+                                          self._instance,
                                           self._dest, self._dest_args)
 
   def CheckListening(self):
@@ -523,7 +574,7 @@ class DiskImport(_DiskImportExportBase):
 class DiskExport(_DiskImportExportBase):
   MODE_TEXT = "export"
 
-  def __init__(self, lu, node_name, x509_key_name, dest_x509_ca,
+  def __init__(self, lu, node_name, opts,
                dest_host, dest_port, instance, source, source_args,
                timeouts, cbs, private=None):
     """Initializes this class.
@@ -531,10 +582,8 @@ class DiskExport(_DiskImportExportBase):
     @param lu: Logical unit instance
     @type node_name: string
     @param node_name: Node name for import
-    @type x509_key_name: string
-    @param x509_key_name: Name of X509 key (None for node daemon key)
-    @type dest_x509_ca: string
-    @param dest_x509_ca: Remote peer's CA (None for node daemon certificate)
+    @type opts: L{objects.ImportExportOptions}
+    @param opts: Import/export daemon options
     @type dest_host: string
     @param dest_host: Destination host name or IP address
     @type dest_port: number
@@ -550,8 +599,7 @@ class DiskExport(_DiskImportExportBase):
     @param private: Private data for callback functions
 
     """
-    _DiskImportExportBase.__init__(self, lu, node_name,
-                                   x509_key_name, dest_x509_ca,
+    _DiskImportExportBase.__init__(self, lu, node_name, opts,
                                    instance, timeouts, cbs, private)
     self._dest_host = dest_host
     self._dest_port = dest_port
@@ -562,8 +610,7 @@ class DiskExport(_DiskImportExportBase):
     """Starts the export daemon.
 
     """
-    return self._lu.rpc.call_export_start(self.node_name, self._x509_key_name,
-                                          self._remote_x509_ca,
+    return self._lu.rpc.call_export_start(self.node_name, self._opts,
                                           self._dest_host, self._dest_port,
                                           self._instance, self._source,
                                           self._source_args)
@@ -584,6 +631,28 @@ class DiskExport(_DiskImportExportBase):
     return self._ts_begin
 
 
+def FormatProgress(progress):
+  """Formats progress information for user consumption
+
+  """
+  (mbytes, throughput, percent, eta) = progress
+
+  parts = [
+    utils.FormatUnit(mbytes, "h"),
+
+    # Not using FormatUnit as it doesn't support kilobytes
+    "%0.1f MiB/s" % throughput,
+    ]
+
+  if percent is not None:
+    parts.append("%d%%" % percent)
+
+  if eta is not None:
+    parts.append("ETA %s" % utils.FormatSeconds(eta))
+
+  return utils.CommaJoin(parts)
+
+
 class ImportExportLoop:
   MIN_DELAY = 1.0
   MAX_DELAY = 20.0
@@ -729,7 +798,7 @@ class ImportExportLoop:
           logging.exception("%s failed", diskie.MODE_TEXT)
           diskie.Finalize(error=str(err))
 
-      if not compat.any([diskie.active for diskie in self._queue]):
+      if not compat.any(diskie.active for diskie in self._queue):
         break
 
       # Wait a bit
@@ -779,6 +848,16 @@ class _TransferInstSourceCb(_TransferInstCbBase):
     self.feedback_fn("%s is sending data on %s" %
                      (dtp.data.name, ie.node_name))
 
+  def ReportProgress(self, ie, dtp):
+    """Called when new progress information should be reported.
+
+    """
+    progress = ie.progress
+    if not progress:
+      return
+
+    self.feedback_fn("%s sent %s" % (dtp.data.name, FormatProgress(progress)))
+
   def ReportFinished(self, ie, dtp):
     """Called when a transfer has finished.
 
@@ -790,7 +869,7 @@ class _TransferInstSourceCb(_TransferInstCbBase):
     if ie.success:
       self.feedback_fn("%s finished sending data" % dtp.data.name)
     else:
-      self.feedback_fn("%s failed to send data: %s (recent output: %r)" %
+      self.feedback_fn("%s failed to send data: %s (recent output: %s)" %
                        (dtp.data.name, ie.final_message, ie.recent_output))
 
     dtp.RecordResult(ie.success)
@@ -813,13 +892,14 @@ class _TransferInstDestCb(_TransferInstCbBase):
     assert self.src_cbs
     assert dtp.src_export is None
     assert dtp.dest_import
+    assert dtp.export_opts
 
     self.feedback_fn("%s is now listening, starting export" % dtp.data.name)
 
     # Start export on source node
-    de = DiskExport(self.lu, self.src_node, None, None, self.dest_ip,
-                    ie.listen_port, self.instance,
-                    dtp.data.src_io, dtp.data.src_ioargs,
+    de = DiskExport(self.lu, self.src_node, dtp.export_opts,
+                    self.dest_ip, ie.listen_port,
+                    self.instance, dtp.data.src_io, dtp.data.src_ioargs,
                     self.timeouts, self.src_cbs, private=dtp)
     ie.loop.Add(de)
 
@@ -839,7 +919,7 @@ class _TransferInstDestCb(_TransferInstCbBase):
     if ie.success:
       self.feedback_fn("%s finished receiving data" % dtp.data.name)
     else:
-      self.feedback_fn("%s failed to receive data: %s (recent output: %r)" %
+      self.feedback_fn("%s failed to receive data: %s (recent output: %s)" %
                        (dtp.data.name, ie.final_message, ie.recent_output))
 
     dtp.RecordResult(ie.success)
@@ -877,7 +957,7 @@ class DiskTransfer(object):
 
 
 class _DiskTransferPrivate(object):
-  def __init__(self, data, success):
+  def __init__(self, data, success, export_opts):
     """Initializes this class.
 
     @type data: L{DiskTransfer}
@@ -885,12 +965,12 @@ class _DiskTransferPrivate(object):
 
     """
     self.data = data
+    self.success = success
+    self.export_opts = export_opts
 
     self.src_export = None
     self.dest_import = None
 
-    self.success = success
-
   def RecordResult(self, success):
     """Updates the status.
 
@@ -900,6 +980,25 @@ class _DiskTransferPrivate(object):
     self.success = self.success and success
 
 
+def _GetInstDiskMagic(base, instance_name, index):
+  """Computes the magic value for a disk export or import.
+
+  @type base: string
+  @param base: Random seed value (can be the same for all disks of a transfer)
+  @type instance_name: string
+  @param instance_name: Name of instance
+  @type index: number
+  @param index: Disk index
+
+  """
+  h = compat.sha1_hash()
+  h.update(str(constants.RIE_VERSION))
+  h.update(base)
+  h.update(instance_name)
+  h.update(str(index))
+  return h.hexdigest()
+
+
 def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
                          instance, all_transfers):
   """Transfers an instance's data from one node to another.
@@ -921,6 +1020,12 @@ def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
            each transfer
 
   """
+  # Disable compression for all moves as these are all within the same cluster
+  compress = constants.IEC_NONE
+
+  logging.debug("Source node %s, destination node %s, compression '%s'",
+                src_node, dest_node, compress)
+
   timeouts = ImportExportTimeouts(constants.DISK_TRANSFER_CONNECT_TIMEOUT)
   src_cbs = _TransferInstSourceCb(lu, feedback_fn, instance, timeouts,
                                   src_node, None, dest_node, dest_ip)
@@ -929,23 +1034,29 @@ def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
 
   all_dtp = []
 
+  base_magic = utils.GenerateSecret(6)
+
   ieloop = ImportExportLoop(lu)
   try:
-    for transfer in all_transfers:
+    for idx, transfer in enumerate(all_transfers):
       if transfer:
         feedback_fn("Exporting %s from %s to %s" %
                     (transfer.name, src_node, dest_node))
 
-        dtp = _DiskTransferPrivate(transfer, True)
+        magic = _GetInstDiskMagic(base_magic, instance.name, idx)
+        opts = objects.ImportExportOptions(key_name=None, ca_pem=None,
+                                           compress=compress, magic=magic)
 
-        di = DiskImport(lu, dest_node, None, None, instance,
+        dtp = _DiskTransferPrivate(transfer, True, opts)
+
+        di = DiskImport(lu, dest_node, opts, instance,
                         transfer.dest_io, transfer.dest_ioargs,
                         timeouts, dest_cbs, private=dtp)
         ieloop.Add(di)
 
         dtp.dest_import = di
       else:
-        dtp = _DiskTransferPrivate(None, False)
+        dtp = _DiskTransferPrivate(None, False, None)
 
       all_dtp.append(dtp)
 
@@ -954,11 +1065,537 @@ def TransferInstanceData(lu, feedback_fn, src_node, dest_node, dest_ip,
     ieloop.FinalizeAll()
 
   assert len(all_dtp) == len(all_transfers)
-  assert compat.all([(dtp.src_export is None or
+  assert compat.all((dtp.src_export is None or
                       dtp.src_export.success is not None) and
                      (dtp.dest_import is None or
                       dtp.dest_import.success is not None)
-                     for dtp in all_dtp]), \
+                     for dtp in all_dtp), \
          "Not all imports/exports are finalized"
 
   return [bool(dtp.success) for dtp in all_dtp]
+
+
+class _RemoteExportCb(ImportExportCbBase):
+  def __init__(self, feedback_fn, disk_count):
+    """Initializes this class.
+
+    """
+    ImportExportCbBase.__init__(self)
+    self._feedback_fn = feedback_fn
+    self._dresults = [None] * disk_count
+
+  @property
+  def disk_results(self):
+    """Returns per-disk results.
+
+    """
+    return self._dresults
+
+  def ReportConnected(self, ie, private):
+    """Called when a connection has been established.
+
+    """
+    (idx, _) = private
+
+    self._feedback_fn("Disk %s is now sending data" % idx)
+
+  def ReportProgress(self, ie, private):
+    """Called when new progress information should be reported.
+
+    """
+    (idx, _) = private
+
+    progress = ie.progress
+    if not progress:
+      return
+
+    self._feedback_fn("Disk %s sent %s" % (idx, FormatProgress(progress)))
+
+  def ReportFinished(self, ie, private):
+    """Called when a transfer has finished.
+
+    """
+    (idx, finished_fn) = private
+
+    if ie.success:
+      self._feedback_fn("Disk %s finished sending data" % idx)
+    else:
+      self._feedback_fn("Disk %s failed to send data: %s (recent output: %s)" %
+                        (idx, ie.final_message, ie.recent_output))
+
+    self._dresults[idx] = bool(ie.success)
+
+    if finished_fn:
+      finished_fn()
+
+
+class ExportInstanceHelper:
+  def __init__(self, lu, feedback_fn, instance):
+    """Initializes this class.
+
+    @param lu: Logical unit instance
+    @param feedback_fn: Feedback function
+    @type instance: L{objects.Instance}
+    @param instance: Instance object
+
+    """
+    self._lu = lu
+    self._feedback_fn = feedback_fn
+    self._instance = instance
+
+    self._snap_disks = []
+    self._removed_snaps = [False] * len(instance.disks)
+
+  def CreateSnapshots(self):
+    """Creates an LVM snapshot for every disk of the instance.
+
+    """
+    assert not self._snap_disks
+
+    instance = self._instance
+    src_node = instance.primary_node
+
+    for idx, disk in enumerate(instance.disks):
+      self._feedback_fn("Creating a snapshot of disk/%s on node %s" %
+                        (idx, src_node))
+
+      # result.payload will be a snapshot of an lvm leaf of the one we
+      # passed
+      result = self._lu.rpc.call_blockdev_snapshot(src_node, disk)
+      new_dev = False
+      msg = result.fail_msg
+      if msg:
+        self._lu.LogWarning("Could not snapshot disk/%s on node %s: %s",
+                            idx, src_node, msg)
+      elif (not isinstance(result.payload, (tuple, list)) or
+            len(result.payload) != 2):
+        self._lu.LogWarning("Could not snapshot disk/%s on node %s: invalid"
+                            " result '%s'", idx, src_node, result.payload)
+      else:
+        disk_id = tuple(result.payload)
+        new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
+                               logical_id=disk_id, physical_id=disk_id,
+                               iv_name=disk.iv_name)
+
+      self._snap_disks.append(new_dev)
+
+    assert len(self._snap_disks) == len(instance.disks)
+    assert len(self._removed_snaps) == len(instance.disks)
+
+  def _RemoveSnapshot(self, disk_index):
+    """Removes an LVM snapshot.
+
+    @type disk_index: number
+    @param disk_index: Index of the snapshot to be removed
+
+    """
+    disk = self._snap_disks[disk_index]
+    if disk and not self._removed_snaps[disk_index]:
+      src_node = self._instance.primary_node
+
+      self._feedback_fn("Removing snapshot of disk/%s on node %s" %
+                        (disk_index, src_node))
+
+      result = self._lu.rpc.call_blockdev_remove(src_node, disk)
+      if result.fail_msg:
+        self._lu.LogWarning("Could not remove snapshot for disk/%d from node"
+                            " %s: %s", disk_index, src_node, result.fail_msg)
+      else:
+        self._removed_snaps[disk_index] = True
+
+  def LocalExport(self, dest_node):
+    """Intra-cluster instance export.
+
+    @type dest_node: L{objects.Node}
+    @param dest_node: Destination node
+
+    """
+    instance = self._instance
+    src_node = instance.primary_node
+
+    assert len(self._snap_disks) == len(instance.disks)
+
+    transfers = []
+
+    for idx, dev in enumerate(self._snap_disks):
+      if not dev:
+        transfers.append(None)
+        continue
+
+      path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
+                            dev.physical_id[1])
+
+      finished_fn = compat.partial(self._TransferFinished, idx)
+
+      # FIXME: pass debug option from opcode to backend
+      dt = DiskTransfer("snapshot/%s" % idx,
+                        constants.IEIO_SCRIPT, (dev, idx),
+                        constants.IEIO_FILE, (path, ),
+                        finished_fn)
+      transfers.append(dt)
+
+    # Actually export data
+    dresults = TransferInstanceData(self._lu, self._feedback_fn,
+                                    src_node, dest_node.name,
+                                    dest_node.secondary_ip,
+                                    instance, transfers)
+
+    assert len(dresults) == len(instance.disks)
+
+    self._feedback_fn("Finalizing export on %s" % dest_node.name)
+    result = self._lu.rpc.call_finalize_export(dest_node.name, instance,
+                                               self._snap_disks)
+    msg = result.fail_msg
+    fin_resu = not msg
+    if msg:
+      self._lu.LogWarning("Could not finalize export for instance %s"
+                          " on node %s: %s", instance.name, dest_node.name, msg)
+
+    return (fin_resu, dresults)
+
+  def RemoteExport(self, disk_info, key_name, dest_ca_pem, timeouts):
+    """Inter-cluster instance export.
+
+    @type disk_info: list
+    @param disk_info: Per-disk destination information
+    @type key_name: string
+    @param key_name: Name of X509 key to use
+    @type dest_ca_pem: string
+    @param dest_ca_pem: Destination X509 CA in PEM format
+    @type timeouts: L{ImportExportTimeouts}
+    @param timeouts: Timeouts for this import
+
+    """
+    instance = self._instance
+
+    assert len(disk_info) == len(instance.disks)
+
+    cbs = _RemoteExportCb(self._feedback_fn, len(instance.disks))
+
+    ieloop = ImportExportLoop(self._lu)
+    try:
+      for idx, (dev, (host, port, magic)) in enumerate(zip(instance.disks,
+                                                           disk_info)):
+        # Decide whether to use IPv6
+        ipv6 = netutils.IP6Address.IsValid(host)
+
+        opts = objects.ImportExportOptions(key_name=key_name,
+                                           ca_pem=dest_ca_pem,
+                                           magic=magic, ipv6=ipv6)
+
+        self._feedback_fn("Sending disk %s to %s:%s" % (idx, host, port))
+        finished_fn = compat.partial(self._TransferFinished, idx)
+        ieloop.Add(DiskExport(self._lu, instance.primary_node,
+                              opts, host, port, instance,
+                              constants.IEIO_SCRIPT, (dev, idx),
+                              timeouts, cbs, private=(idx, finished_fn)))
+
+      ieloop.Run()
+    finally:
+      ieloop.FinalizeAll()
+
+    return (True, cbs.disk_results)
+
+  def _TransferFinished(self, idx):
+    """Called once a transfer has finished.
+
+    @type idx: number
+    @param idx: Disk index
+
+    """
+    logging.debug("Transfer %s finished", idx)
+    self._RemoveSnapshot(idx)
+
+  def Cleanup(self):
+    """Remove all snapshots.
+
+    """
+    assert len(self._removed_snaps) == len(self._instance.disks)
+    for idx in range(len(self._instance.disks)):
+      self._RemoveSnapshot(idx)
+
+
+class _RemoteImportCb(ImportExportCbBase):
+  def __init__(self, feedback_fn, cds, x509_cert_pem, disk_count,
+               external_address):
+    """Initializes this class.
+
+    @type cds: string
+    @param cds: Cluster domain secret
+    @type x509_cert_pem: string
+    @param x509_cert_pem: CA used for signing import key
+    @type disk_count: number
+    @param disk_count: Number of disks
+    @type external_address: string
+    @param external_address: External address of destination node
+
+    """
+    ImportExportCbBase.__init__(self)
+    self._feedback_fn = feedback_fn
+    self._cds = cds
+    self._x509_cert_pem = x509_cert_pem
+    self._disk_count = disk_count
+    self._external_address = external_address
+
+    self._dresults = [None] * disk_count
+    self._daemon_port = [None] * disk_count
+
+    self._salt = utils.GenerateSecret(8)
+
+  @property
+  def disk_results(self):
+    """Returns per-disk results.
+
+    """
+    return self._dresults
+
+  def _CheckAllListening(self):
+    """Checks whether all daemons are listening.
+
+    If all daemons are listening, the information is sent to the client.
+
+    """
+    if not compat.all(dp is not None for dp in self._daemon_port):
+      return
+
+    host = self._external_address
+
+    disks = []
+    for idx, (port, magic) in enumerate(self._daemon_port):
+      disks.append(ComputeRemoteImportDiskInfo(self._cds, self._salt,
+                                               idx, host, port, magic))
+
+    assert len(disks) == self._disk_count
+
+    self._feedback_fn(constants.ELOG_REMOTE_IMPORT, {
+      "disks": disks,
+      "x509_ca": self._x509_cert_pem,
+      })
+
+  def ReportListening(self, ie, private):
+    """Called when daemon started listening.
+
+    """
+    (idx, ) = private
+
+    self._feedback_fn("Disk %s is now listening" % idx)
+
+    assert self._daemon_port[idx] is None
+
+    self._daemon_port[idx] = (ie.listen_port, ie.magic)
+
+    self._CheckAllListening()
+
+  def ReportConnected(self, ie, private):
+    """Called when a connection has been established.
+
+    """
+    (idx, ) = private
+
+    self._feedback_fn("Disk %s is now receiving data" % idx)
+
+  def ReportFinished(self, ie, private):
+    """Called when a transfer has finished.
+
+    """
+    (idx, ) = private
+
+    # Daemon is certainly no longer listening
+    self._daemon_port[idx] = None
+
+    if ie.success:
+      self._feedback_fn("Disk %s finished receiving data" % idx)
+    else:
+      self._feedback_fn(("Disk %s failed to receive data: %s"
+                         " (recent output: %s)") %
+                        (idx, ie.final_message, ie.recent_output))
+
+    self._dresults[idx] = bool(ie.success)
+
+
+def RemoteImport(lu, feedback_fn, instance, pnode, source_x509_ca,
+                 cds, timeouts):
+  """Imports an instance from another cluster.
+
+  @param lu: Logical unit instance
+  @param feedback_fn: Feedback function
+  @type instance: L{objects.Instance}
+  @param instance: Instance object
+  @type pnode: L{objects.Node}
+  @param pnode: Primary node of instance as an object
+  @type source_x509_ca: OpenSSL.crypto.X509
+  @param source_x509_ca: Import source's X509 CA
+  @type cds: string
+  @param cds: Cluster domain secret
+  @type timeouts: L{ImportExportTimeouts}
+  @param timeouts: Timeouts for this import
+
+  """
+  source_ca_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
+                                                  source_x509_ca)
+
+  magic_base = utils.GenerateSecret(6)
+
+  # Decide whether to use IPv6
+  ipv6 = netutils.IP6Address.IsValid(pnode.primary_ip)
+
+  # Create crypto key
+  result = lu.rpc.call_x509_cert_create(instance.primary_node,
+                                        constants.RIE_CERT_VALIDITY)
+  result.Raise("Can't create X509 key and certificate on %s" % result.node)
+
+  (x509_key_name, x509_cert_pem) = result.payload
+  try:
+    # Load certificate
+    x509_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
+                                                x509_cert_pem)
+
+    # Sign certificate
+    signed_x509_cert_pem = \
+      utils.SignX509Certificate(x509_cert, cds, utils.GenerateSecret(8))
+
+    cbs = _RemoteImportCb(feedback_fn, cds, signed_x509_cert_pem,
+                          len(instance.disks), pnode.primary_ip)
+
+    ieloop = ImportExportLoop(lu)
+    try:
+      for idx, dev in enumerate(instance.disks):
+        magic = _GetInstDiskMagic(magic_base, instance.name, idx)
+
+        # Import daemon options
+        opts = objects.ImportExportOptions(key_name=x509_key_name,
+                                           ca_pem=source_ca_pem,
+                                           magic=magic, ipv6=ipv6)
+
+        ieloop.Add(DiskImport(lu, instance.primary_node, opts, instance,
+                              constants.IEIO_SCRIPT, (dev, idx),
+                              timeouts, cbs, private=(idx, )))
+
+      ieloop.Run()
+    finally:
+      ieloop.FinalizeAll()
+  finally:
+    # Remove crypto key and certificate
+    result = lu.rpc.call_x509_cert_remove(instance.primary_node, x509_key_name)
+    result.Raise("Can't remove X509 key and certificate on %s" % result.node)
+
+  return cbs.disk_results
+
+
+def _GetImportExportHandshakeMessage(version):
+  """Returns the handshake message for a RIE protocol version.
+
+  @type version: number
+
+  """
+  return "%s:%s" % (version, constants.RIE_HANDSHAKE)
+
+
+def ComputeRemoteExportHandshake(cds):
+  """Computes the remote import/export handshake.
+
+  @type cds: string
+  @param cds: Cluster domain secret
+
+  """
+  salt = utils.GenerateSecret(8)
+  msg = _GetImportExportHandshakeMessage(constants.RIE_VERSION)
+  return (constants.RIE_VERSION, utils.Sha1Hmac(cds, msg, salt=salt), salt)
+
+
+def CheckRemoteExportHandshake(cds, handshake):
+  """Checks the handshake of a remote import/export.
+
+  @type cds: string
+  @param cds: Cluster domain secret
+  @type handshake: sequence
+  @param handshake: Handshake sent by remote peer
+
+  """
+  try:
+    (version, hmac_digest, hmac_salt) = handshake
+  except (TypeError, ValueError), err:
+    return "Invalid data: %s" % err
+
+  if not utils.VerifySha1Hmac(cds, _GetImportExportHandshakeMessage(version),
+                              hmac_digest, salt=hmac_salt):
+    return "Hash didn't match, clusters don't share the same domain secret"
+
+  if version != constants.RIE_VERSION:
+    return ("Clusters don't have the same remote import/export protocol"
+            " (local=%s, remote=%s)" %
+            (constants.RIE_VERSION, version))
+
+  return None
+
+
+def _GetRieDiskInfoMessage(disk_index, host, port, magic):
+  """Returns the hashed text for import/export disk information.
+
+  @type disk_index: number
+  @param disk_index: Index of disk (included in hash)
+  @type host: string
+  @param host: Hostname
+  @type port: number
+  @param port: Daemon port
+  @type magic: string
+  @param magic: Magic value
+
+  """
+  return "%s:%s:%s:%s" % (disk_index, host, port, magic)
+
+
+def CheckRemoteExportDiskInfo(cds, disk_index, disk_info):
+  """Verifies received disk information for an export.
+
+  @type cds: string
+  @param cds: Cluster domain secret
+  @type disk_index: number
+  @param disk_index: Index of disk (included in hash)
+  @type disk_info: sequence
+  @param disk_info: Disk information sent by remote peer
+
+  """
+  try:
+    (host, port, magic, hmac_digest, hmac_salt) = disk_info
+  except (TypeError, ValueError), err:
+    raise errors.GenericError("Invalid data: %s" % err)
+
+  if not (host and port and magic):
+    raise errors.GenericError("Missing destination host, port or magic")
+
+  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
+
+  if not utils.VerifySha1Hmac(cds, msg, hmac_digest, salt=hmac_salt):
+    raise errors.GenericError("HMAC is wrong")
+
+  if netutils.IP6Address.IsValid(host) or netutils.IP4Address.IsValid(host):
+    destination = host
+  else:
+    destination = netutils.Hostname.GetNormalizedName(host)
+
+  return (destination,
+          utils.ValidateServiceName(port),
+          magic)
+
+
+def ComputeRemoteImportDiskInfo(cds, salt, disk_index, host, port, magic):
+  """Computes the signed disk information for a remote import.
+
+  @type cds: string
+  @param cds: Cluster domain secret
+  @type salt: string
+  @param salt: HMAC salt
+  @type disk_index: number
+  @param disk_index: Index of disk (included in hash)
+  @type host: string
+  @param host: Hostname
+  @type port: number
+  @param port: Daemon port
+  @type magic: string
+  @param magic: Magic value
+
+  """
+  msg = _GetRieDiskInfoMessage(disk_index, host, port, magic)
+  hmac_digest = utils.Sha1Hmac(cds, msg, salt=salt)
+  return (host, port, magic, hmac_digest, salt)