Merge branch 'devel-2.1' into master
[ganeti-local] / lib / cmdlib.py
index 880c5a0..30c431e 100644 (file)
@@ -46,6 +46,9 @@ from ganeti import serializer
 from ganeti import ssconf
 from ganeti import uidpool
 from ganeti import compat
+from ganeti import masterd
+
+import ganeti.masterd.instance # pylint: disable-msg=W0611
 
 
 class LogicalUnit(object):
@@ -596,6 +599,13 @@ def _CheckStorageType(storage_type):
     _RequireFileStorage()
 
 
+def _GetClusterDomainSecret():
+  """Reads the cluster domain secret.
+
+  """
+  return utils.ReadOneLineFile(constants.CLUSTER_DOMAIN_SECRET_FILE,
+                               strict=True)
+
 
 def _CheckInstanceDown(lu, instance, reason):
   """Ensure that an instance is not running."""
@@ -920,13 +930,6 @@ def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
   return faulty
 
 
-def _FormatTimestamp(secs):
-  """Formats a Unix timestamp with the local timezone.
-
-  """
-  return time.strftime("%F %T %Z", time.gmtime(secs))
-
-
 class LUPostInitCluster(LogicalUnit):
   """Logical unit for running hooks after cluster initialization.
 
@@ -1018,45 +1021,6 @@ class LUDestroyCluster(LogicalUnit):
     return master
 
 
-def _VerifyCertificateInner(filename, expired, not_before, not_after, now,
-                            warn_days=constants.SSL_CERT_EXPIRATION_WARN,
-                            error_days=constants.SSL_CERT_EXPIRATION_ERROR):
-  """Verifies certificate details for LUVerifyCluster.
-
-  """
-  if expired:
-    msg = "Certificate %s is expired" % filename
-
-    if not_before is not None and not_after is not None:
-      msg += (" (valid from %s to %s)" %
-              (_FormatTimestamp(not_before),
-               _FormatTimestamp(not_after)))
-    elif not_before is not None:
-      msg += " (valid from %s)" % _FormatTimestamp(not_before)
-    elif not_after is not None:
-      msg += " (valid until %s)" % _FormatTimestamp(not_after)
-
-    return (LUVerifyCluster.ETYPE_ERROR, msg)
-
-  elif not_before is not None and not_before > now:
-    return (LUVerifyCluster.ETYPE_WARNING,
-            "Certificate %s not yet valid (valid from %s)" %
-            (filename, _FormatTimestamp(not_before)))
-
-  elif not_after is not None:
-    remaining_days = int((not_after - now) / (24 * 3600))
-
-    msg = ("Certificate %s expires in %d days" % (filename, remaining_days))
-
-    if remaining_days <= error_days:
-      return (LUVerifyCluster.ETYPE_ERROR, msg)
-
-    if remaining_days <= warn_days:
-      return (LUVerifyCluster.ETYPE_WARNING, msg)
-
-  return (None, None)
-
-
 def _VerifyCertificate(filename):
   """Verifies a certificate for LUVerifyCluster.
 
@@ -1071,11 +1035,23 @@ def _VerifyCertificate(filename):
     return (LUVerifyCluster.ETYPE_ERROR,
             "Failed to load X509 certificate %s: %s" % (filename, err))
 
-  # Depending on the pyOpenSSL version, this can just return (None, None)
-  (not_before, not_after) = utils.GetX509CertValidity(cert)
+  (errcode, msg) = \
+    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
+                                constants.SSL_CERT_EXPIRATION_ERROR)
 
-  return _VerifyCertificateInner(filename, cert.has_expired(),
-                                 not_before, not_after, time.time())
+  if msg:
+    fnamemsg = "While verifying %s: %s" % (filename, msg)
+  else:
+    fnamemsg = None
+
+  if errcode is None:
+    return (None, fnamemsg)
+  elif errcode == utils.CERT_WARNING:
+    return (LUVerifyCluster.ETYPE_WARNING, fnamemsg)
+  elif errcode == utils.CERT_ERROR:
+    return (LUVerifyCluster.ETYPE_ERROR, fnamemsg)
+
+  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
 
 
 class LUVerifyCluster(LogicalUnit):
@@ -2541,6 +2517,7 @@ def _RedistributeAncillaryFiles(lu, additional_nodes=None):
                     constants.RAPI_CERT_FILE,
                     constants.RAPI_USERS_FILE,
                     constants.CONFD_HMAC_KEY,
+                    constants.CLUSTER_DOMAIN_SECRET_FILE,
                    ])
 
   enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
@@ -2636,7 +2613,8 @@ def _WaitForSync(lu, instance, disks=None, oneshot=False):
       if mstat.sync_percent is not None:
         done = False
         if mstat.estimated_time is not None:
-          rem_time = "%d estimated seconds remaining" % mstat.estimated_time
+          rem_time = ("%s remaining (estimated)" %
+                      utils.FormatSeconds(mstat.estimated_time))
           max_time = mstat.estimated_time
         else:
           rem_time = "no time estimate"
@@ -4689,18 +4667,29 @@ class LURemoveInstance(LogicalUnit):
                                  " node %s: %s" %
                                  (instance.name, instance.primary_node, msg))
 
-    logging.info("Removing block devices for instance %s", instance.name)
+    _RemoveInstance(self, feedback_fn, instance, self.op.ignore_failures)
 
-    if not _RemoveDisks(self, instance):
-      if self.op.ignore_failures:
-        feedback_fn("Warning: can't remove instance's disks")
-      else:
-        raise errors.OpExecError("Can't remove instance's disks")
 
-    logging.info("Removing instance %s out of cluster config", instance.name)
+def _RemoveInstance(lu, feedback_fn, instance, ignore_failures):
+  """Utility function to remove an instance.
+
+  """
+  logging.info("Removing block devices for instance %s", instance.name)
+
+  if not _RemoveDisks(lu, instance):
+    if not ignore_failures:
+      raise errors.OpExecError("Can't remove instance's disks")
+    feedback_fn("Warning: can't remove instance's disks")
+
+  logging.info("Removing instance %s out of cluster config", instance.name)
 
-    self.cfg.RemoveInstance(instance.name)
-    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
+  lu.cfg.RemoveInstance(instance.name)
+
+  assert not lu.remove_locks.get(locking.LEVEL_INSTANCE), \
+    "Instance lock removal conflict"
+
+  # Remove lock for the instance
+  lu.remove_locks[locking.LEVEL_INSTANCE] = instance.name
 
 
 class LUQueryInstances(NoHooksLU):
@@ -6146,8 +6135,7 @@ class LUCreateInstance(LogicalUnit):
     self.adopt_disks = has_adopt
 
     # verify creation mode
-    if self.op.mode not in (constants.INSTANCE_CREATE,
-                            constants.INSTANCE_IMPORT):
+    if self.op.mode not in constants.INSTANCE_CREATE_MODES:
       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                  self.op.mode, errors.ECODE_INVAL)
 
@@ -6157,6 +6145,9 @@ class LUCreateInstance(LogicalUnit):
       self.op.instance_name = self.hostname1.name
       # used in CheckPrereq for ip ping check
       self.check_ip = self.hostname1.ip
+    elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT:
+      raise errors.OpPrereqError("Remote imports require names to be checked",
+                                 errors.ECODE_INVAL)
     else:
       self.check_ip = None
 
@@ -6176,6 +6167,8 @@ class LUCreateInstance(LogicalUnit):
                                  " node must be given",
                                  errors.ECODE_INVAL)
 
+    self._cds = _GetClusterDomainSecret()
+
     if self.op.mode == constants.INSTANCE_IMPORT:
       # On import force_variant must be True, because if we forced it at
       # initial install, our only chance when importing it back is that it
@@ -6185,7 +6178,7 @@ class LUCreateInstance(LogicalUnit):
       if self.op.no_install:
         self.LogInfo("No-installation mode has no effect during import")
 
-    else: # INSTANCE_CREATE
+    elif self.op.mode == constants.INSTANCE_CREATE:
       if getattr(self.op, "os_type", None) is None:
         raise errors.OpPrereqError("No guest OS specified",
                                    errors.ECODE_INVAL)
@@ -6194,6 +6187,51 @@ class LUCreateInstance(LogicalUnit):
         raise errors.OpPrereqError("No disk template specified",
                                    errors.ECODE_INVAL)
 
+    elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT:
+      # Check handshake to ensure both clusters have the same domain secret
+      src_handshake = getattr(self.op, "source_handshake", None)
+      if not src_handshake:
+        raise errors.OpPrereqError("Missing source handshake",
+                                   errors.ECODE_INVAL)
+
+      errmsg = masterd.instance.CheckRemoteExportHandshake(self._cds,
+                                                           src_handshake)
+      if errmsg:
+        raise errors.OpPrereqError("Invalid handshake: %s" % errmsg,
+                                   errors.ECODE_INVAL)
+
+      # Load and check source CA
+      self.source_x509_ca_pem = getattr(self.op, "source_x509_ca", None)
+      if not self.source_x509_ca_pem:
+        raise errors.OpPrereqError("Missing source X509 CA",
+                                   errors.ECODE_INVAL)
+
+      try:
+        (cert, _) = utils.LoadSignedX509Certificate(self.source_x509_ca_pem,
+                                                    self._cds)
+      except OpenSSL.crypto.Error, err:
+        raise errors.OpPrereqError("Unable to load source X509 CA (%s)" %
+                                   (err, ), errors.ECODE_INVAL)
+
+      (errcode, msg) = utils.VerifyX509Certificate(cert, None, None)
+      if errcode is not None:
+        raise errors.OpPrereqError("Invalid source X509 CA (%s)" % (msg, ),
+                                   errors.ECODE_INVAL)
+
+      self.source_x509_ca = cert
+
+      src_instance_name = getattr(self.op, "source_instance_name", None)
+      if not src_instance_name:
+        raise errors.OpPrereqError("Missing source instance name",
+                                   errors.ECODE_INVAL)
+
+      self.source_instance_name = \
+        utils.GetHostInfo(utils.HostInfo.NormalizeName(src_instance_name)).name
+
+    else:
+      raise errors.OpPrereqError("Invalid instance creation mode %r" %
+                                 self.op.mode, errors.ECODE_INVAL)
+
   def ExpandNames(self):
     """ExpandNames for CreateInstance.
 
@@ -6756,7 +6794,6 @@ class LUCreateInstance(LogicalUnit):
     else:
       file_storage_dir = ""
 
-
     disks = _GenerateDiskTemplate(self,
                                   self.op.disk_template,
                                   instance, pnode_name,
@@ -6849,18 +6886,54 @@ class LUCreateInstance(LogicalUnit):
 
       elif self.op.mode == constants.INSTANCE_IMPORT:
         feedback_fn("* running the instance OS import scripts...")
-        src_node = self.op.src_node
-        src_images = self.src_images
-        cluster_name = self.cfg.GetClusterName()
-        # FIXME: pass debug option from opcode to backend
-        import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
-                                                         src_node, src_images,
-                                                         cluster_name,
-                                                         self.op.debug_level)
-        msg = import_result.fail_msg
-        if msg:
-          self.LogWarning("Error while importing the disk images for instance"
-                          " %s on node %s: %s" % (instance, pnode_name, msg))
+
+        transfers = []
+
+        for idx, image in enumerate(self.src_images):
+          if not image:
+            continue
+
+          # FIXME: pass debug option from opcode to backend
+          dt = masterd.instance.DiskTransfer("disk/%s" % idx,
+                                             constants.IEIO_FILE, (image, ),
+                                             constants.IEIO_SCRIPT,
+                                             (iobj.disks[idx], idx),
+                                             None)
+          transfers.append(dt)
+
+        import_result = \
+          masterd.instance.TransferInstanceData(self, feedback_fn,
+                                                self.op.src_node, pnode_name,
+                                                self.pnode.secondary_ip,
+                                                iobj, transfers)
+        if not compat.all(import_result):
+          self.LogWarning("Some disks for instance %s on node %s were not"
+                          " imported successfully" % (instance, pnode_name))
+
+      elif self.op.mode == constants.INSTANCE_REMOTE_IMPORT:
+        feedback_fn("* preparing remote import...")
+        connect_timeout = constants.RIE_CONNECT_TIMEOUT
+        timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)
+
+        disk_results = masterd.instance.RemoteImport(self, feedback_fn, iobj,
+                                                     self.source_x509_ca,
+                                                     self._cds, timeouts)
+        if not compat.all(disk_results):
+          # TODO: Should the instance still be started, even if some disks
+          # failed to import (valid for local imports, too)?
+          self.LogWarning("Some disks for instance %s on node %s were not"
+                          " imported successfully" % (instance, pnode_name))
+
+        # Run rename script on newly imported instance
+        assert iobj.name == instance
+        feedback_fn("Running rename script for %s" % instance)
+        result = self.rpc.call_instance_run_rename(pnode_name, iobj,
+                                                   self.source_instance_name,
+                                                   self.op.debug_level)
+        if result.fail_msg:
+          self.LogWarning("Failed to run rename script for %s on node"
+                          " %s: %s" % (instance, pnode_name, result.fail_msg))
+
       else:
         # also checked in the prereq part
         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
@@ -8824,6 +8897,7 @@ class LUSetInstanceParams(LogicalUnit):
     (constants.DT_DRBD8, constants.DT_PLAIN): _ConvertDrbdToPlain,
     }
 
+
 class LUQueryExports(NoHooksLU):
   """Query the exports list
 
@@ -8866,6 +8940,66 @@ class LUQueryExports(NoHooksLU):
     return result
 
 
+class LUPrepareExport(NoHooksLU):
+  """Prepares an instance for an export and returns useful information.
+
+  """
+  _OP_REQP = ["instance_name", "mode"]
+  REQ_BGL = False
+
+  def CheckArguments(self):
+    """Check the arguments.
+
+    """
+    if self.op.mode not in constants.EXPORT_MODES:
+      raise errors.OpPrereqError("Invalid export mode %r" % self.op.mode,
+                                 errors.ECODE_INVAL)
+
+  def ExpandNames(self):
+    self._ExpandAndLockInstance()
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    """
+    instance_name = self.op.instance_name
+
+    self.instance = self.cfg.GetInstanceInfo(instance_name)
+    assert self.instance is not None, \
+          "Cannot retrieve locked instance %s" % self.op.instance_name
+    _CheckNodeOnline(self, self.instance.primary_node)
+
+    self._cds = _GetClusterDomainSecret()
+
+  def Exec(self, feedback_fn):
+    """Prepares an instance for an export.
+
+    """
+    instance = self.instance
+
+    if self.op.mode == constants.EXPORT_MODE_REMOTE:
+      salt = utils.GenerateSecret(8)
+
+      feedback_fn("Generating X509 certificate on %s" % instance.primary_node)
+      result = self.rpc.call_x509_cert_create(instance.primary_node,
+                                              constants.RIE_CERT_VALIDITY)
+      result.Raise("Can't create X509 key and certificate on %s" % result.node)
+
+      (name, cert_pem) = result.payload
+
+      cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
+                                             cert_pem)
+
+      return {
+        "handshake": masterd.instance.ComputeRemoteExportHandshake(self._cds),
+        "x509_key_name": (name, utils.Sha1Hmac(self._cds, name, salt=salt),
+                          salt),
+        "x509_ca": utils.SignX509Certificate(cert, self._cds, salt),
+        }
+
+    return None
+
+
 class LUExportInstance(LogicalUnit):
   """Export an instance to an image in the cluster.
 
@@ -8879,20 +9013,49 @@ class LUExportInstance(LogicalUnit):
     """Check the arguments.
 
     """
+    _CheckBooleanOpField(self.op, "remove_instance")
+    _CheckBooleanOpField(self.op, "ignore_remove_failures")
+
     self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
                                     constants.DEFAULT_SHUTDOWN_TIMEOUT)
+    self.remove_instance = getattr(self.op, "remove_instance", False)
+    self.ignore_remove_failures = getattr(self.op, "ignore_remove_failures",
+                                          False)
+    self.export_mode = getattr(self.op, "mode", constants.EXPORT_MODE_LOCAL)
+    self.x509_key_name = getattr(self.op, "x509_key_name", None)
+    self.dest_x509_ca_pem = getattr(self.op, "destination_x509_ca", None)
+
+    if self.remove_instance and not self.op.shutdown:
+      raise errors.OpPrereqError("Can not remove instance without shutting it"
+                                 " down before", errors.ECODE_INVAL)
+
+    if self.export_mode not in constants.EXPORT_MODES:
+      raise errors.OpPrereqError("Invalid export mode %r" % self.export_mode,
+                                 errors.ECODE_INVAL)
+
+    if self.export_mode == constants.EXPORT_MODE_REMOTE:
+      if not self.x509_key_name:
+        raise errors.OpPrereqError("Missing X509 key name for encryption",
+                                   errors.ECODE_INVAL)
+
+      if not self.dest_x509_ca_pem:
+        raise errors.OpPrereqError("Missing destination X509 CA",
+                                   errors.ECODE_INVAL)
 
   def ExpandNames(self):
     self._ExpandAndLockInstance()
-    # FIXME: lock only instance primary and destination node
-    #
-    # Sad but true, for now we have do lock all nodes, as we don't know where
-    # the previous export might be, and and in this LU we search for it and
-    # remove it from its current node. In the future we could fix this by:
-    #  - making a tasklet to search (share-lock all), then create the new one,
-    #    then one to remove, after
-    #  - removing the removal operation altogether
-    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+
+    # Lock all nodes for local exports
+    if self.export_mode == constants.EXPORT_MODE_LOCAL:
+      # FIXME: lock only instance primary and destination node
+      #
+      # Sad but true, for now we have to lock all nodes, as we don't know where
+      # the previous export might be, and in this LU we search for it and
+      # remove it from its current node. In the future we could fix this by:
+      #  - making a tasklet to search (share-lock all), then create the new one,
+      #    then one to remove, after
+      #  - removing the removal operation altogether
+      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
 
   def DeclareLocks(self, level):
     """Last minute lock declaration."""
@@ -8905,13 +9068,21 @@ class LUExportInstance(LogicalUnit):
 
     """
     env = {
+      "EXPORT_MODE": self.export_mode,
       "EXPORT_NODE": self.op.target_node,
       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
       "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
+      # TODO: Generic function for boolean env variables
+      "REMOVE_INSTANCE": str(bool(self.remove_instance)),
       }
+
     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
-    nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
-          self.op.target_node]
+
+    nl = [self.cfg.GetMasterNode(), self.instance.primary_node]
+
+    if self.export_mode == constants.EXPORT_MODE_LOCAL:
+      nl.append(self.op.target_node)
+
     return env, nl, nl
 
   def CheckPrereq(self):
@@ -8921,85 +9092,85 @@ class LUExportInstance(LogicalUnit):
 
     """
     instance_name = self.op.instance_name
+
     self.instance = self.cfg.GetInstanceInfo(instance_name)
     assert self.instance is not None, \
           "Cannot retrieve locked instance %s" % self.op.instance_name
     _CheckNodeOnline(self, self.instance.primary_node)
 
-    self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node)
-    self.dst_node = self.cfg.GetNodeInfo(self.op.target_node)
-    assert self.dst_node is not None
-
-    _CheckNodeOnline(self, self.dst_node.name)
-    _CheckNodeNotDrained(self, self.dst_node.name)
-
-    # instance disk type verification
-    for disk in self.instance.disks:
-      if disk.dev_type == constants.LD_FILE:
-        raise errors.OpPrereqError("Export not supported for instances with"
-                                   " file-based disks", errors.ECODE_INVAL)
-
-  def _CreateSnapshots(self, feedback_fn):
-    """Creates an LVM snapshot for every disk of the instance.
+    if self.export_mode == constants.EXPORT_MODE_LOCAL:
+      self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node)
+      self.dst_node = self.cfg.GetNodeInfo(self.op.target_node)
+      assert self.dst_node is not None
 
-    @return: List of snapshots as L{objects.Disk} instances
+      _CheckNodeOnline(self, self.dst_node.name)
+      _CheckNodeNotDrained(self, self.dst_node.name)
 
-    """
-    instance = self.instance
-    src_node = instance.primary_node
+      self._cds = None
+      self.dest_disk_info = None
+      self.dest_x509_ca = None
 
-    vgname = self.cfg.GetVGName()
+    elif self.export_mode == constants.EXPORT_MODE_REMOTE:
+      self.dst_node = None
 
-    snap_disks = []
+      if len(self.op.target_node) != len(self.instance.disks):
+        raise errors.OpPrereqError(("Received destination information for %s"
+                                    " disks, but instance %s has %s disks") %
+                                   (len(self.op.target_node), instance_name,
+                                    len(self.instance.disks)),
+                                   errors.ECODE_INVAL)
 
-    for idx, disk in enumerate(instance.disks):
-      feedback_fn("Creating a snapshot of disk/%s on node %s" %
-                  (idx, src_node))
+      cds = _GetClusterDomainSecret()
 
-      # result.payload will be a snapshot of an lvm leaf of the one we
-      # passed
-      result = self.rpc.call_blockdev_snapshot(src_node, disk)
-      msg = result.fail_msg
-      if msg:
-        self.LogWarning("Could not snapshot disk/%s on node %s: %s",
-                        idx, src_node, msg)
-        snap_disks.append(False)
-      else:
-        disk_id = (vgname, result.payload)
-        new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
-                               logical_id=disk_id, physical_id=disk_id,
-                               iv_name=disk.iv_name)
-        snap_disks.append(new_dev)
+      # Check X509 key name
+      try:
+        (key_name, hmac_digest, hmac_salt) = self.x509_key_name
+      except (TypeError, ValueError), err:
+        raise errors.OpPrereqError("Invalid data for X509 key name: %s" % err)
 
-    return snap_disks
+      if not utils.VerifySha1Hmac(cds, key_name, hmac_digest, salt=hmac_salt):
+        raise errors.OpPrereqError("HMAC for X509 key name is wrong",
+                                   errors.ECODE_INVAL)
 
-  def _RemoveSnapshot(self, feedback_fn, snap_disks, disk_index):
-    """Removes an LVM snapshot.
+      # Load and verify CA
+      try:
+        (cert, _) = utils.LoadSignedX509Certificate(self.dest_x509_ca_pem, cds)
+      except OpenSSL.crypto.Error, err:
+        raise errors.OpPrereqError("Unable to load destination X509 CA (%s)" %
+                                   (err, ), errors.ECODE_INVAL)
+
+      (errcode, msg) = utils.VerifyX509Certificate(cert, None, None)
+      if errcode is not None:
+        raise errors.OpPrereqError("Invalid destination X509 CA (%s)" % (msg, ),
+                                   errors.ECODE_INVAL)
 
-    @type snap_disks: list
-    @param snap_disks: The list of all snapshots as returned by
-                       L{_CreateSnapshots}
-    @type disk_index: number
-    @param disk_index: Index of the snapshot to be removed
-    @rtype: bool
-    @return: Whether removal was successful or not
+      self.dest_x509_ca = cert
 
-    """
-    disk = snap_disks[disk_index]
-    if disk:
-      src_node = self.instance.primary_node
+      # Verify target information
+      disk_info = []
+      for idx, disk_data in enumerate(self.op.target_node):
+        try:
+          (host, port, magic) = \
+            masterd.instance.CheckRemoteExportDiskInfo(cds, idx, disk_data)
+        except errors.GenericError, err:
+          raise errors.OpPrereqError("Target info for disk %s: %s" % (idx, err),
+                                     errors.ECODE_INVAL)
 
-      feedback_fn("Removing snapshot of disk/%s on node %s" %
-                  (disk_index, src_node))
+        disk_info.append((host, port, magic))
 
-      result = self.rpc.call_blockdev_remove(src_node, disk)
-      if not result.fail_msg:
-        return True
+      assert len(disk_info) == len(self.op.target_node)
+      self.dest_disk_info = disk_info
 
-      self.LogWarning("Could not remove snapshot for disk/%d from node"
-                      " %s: %s", disk_index, src_node, result.fail_msg)
+    else:
+      raise errors.ProgrammerError("Unhandled export mode %r" %
+                                   self.export_mode)
 
-    return False
+    # instance disk type verification
+    # TODO: Implement export support for file-based disks
+    for disk in self.instance.disks:
+      if disk.dev_type == constants.LD_FILE:
+        raise errors.OpPrereqError("Export not supported for instances with"
+                                   " file-based disks", errors.ECODE_INVAL)
 
   def _CleanupExports(self, feedback_fn):
     """Removes exports of current instance from all other nodes.
@@ -9008,6 +9179,8 @@ class LUExportInstance(LogicalUnit):
     exports will be removed from the nodes A, B and D.
 
     """
+    assert self.export_mode != constants.EXPORT_MODE_REMOTE
+
     nodelist = self.cfg.GetNodeList()
     nodelist.remove(self.dst_node.name)
 
@@ -9031,8 +9204,9 @@ class LUExportInstance(LogicalUnit):
     """Export an instance to an image in the cluster.
 
     """
+    assert self.export_mode in constants.EXPORT_MODES
+
     instance = self.instance
-    dst_node = self.dst_node
     src_node = instance.primary_node
 
     if self.op.shutdown:
@@ -9040,6 +9214,7 @@ class LUExportInstance(LogicalUnit):
       feedback_fn("Shutting down instance %s" % instance.name)
       result = self.rpc.call_instance_shutdown(src_node, instance,
                                                self.shutdown_timeout)
+      # TODO: Maybe ignore failures if ignore_remove_failures is set
       result.Raise("Could not shutdown instance %s on"
                    " node %s" % (instance.name, src_node))
 
@@ -9056,80 +9231,62 @@ class LUExportInstance(LogicalUnit):
       _StartInstanceDisks(self, instance, None)
 
     try:
-      # per-disk results
-      dresults = []
-      removed_snaps = [False] * len(instance.disks)
+      helper = masterd.instance.ExportInstanceHelper(self, feedback_fn,
+                                                     instance)
 
-      snap_disks = None
+      helper.CreateSnapshots()
       try:
-        try:
-          snap_disks = self._CreateSnapshots(feedback_fn)
-        finally:
-          if self.op.shutdown and instance.admin_up:
-            feedback_fn("Starting instance %s" % instance.name)
-            result = self.rpc.call_instance_start(src_node, instance,
-                                                  None, None)
-            msg = result.fail_msg
-            if msg:
-              _ShutdownInstanceDisks(self, instance)
-              raise errors.OpExecError("Could not start instance: %s" % msg)
-
-        assert len(snap_disks) == len(instance.disks)
-        assert len(removed_snaps) == len(instance.disks)
-
-        # TODO: check for size
-
-        cluster_name = self.cfg.GetClusterName()
-        for idx, dev in enumerate(snap_disks):
-          feedback_fn("Exporting snapshot %s from %s to %s" %
-                      (idx, src_node, dst_node.name))
-          if dev:
-            # FIXME: pass debug from opcode to backend
-            result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
-                                                   instance, cluster_name,
-                                                   idx, self.op.debug_level)
-            msg = result.fail_msg
-            if msg:
-              self.LogWarning("Could not export disk/%s from node %s to"
-                              " node %s: %s", idx, src_node, dst_node.name, msg)
-              dresults.append(False)
-            else:
-              dresults.append(True)
+        if (self.op.shutdown and instance.admin_up and
+            not self.remove_instance):
+          assert not activate_disks
+          feedback_fn("Starting instance %s" % instance.name)
+          result = self.rpc.call_instance_start(src_node, instance, None, None)
+          msg = result.fail_msg
+          if msg:
+            feedback_fn("Failed to start instance: %s" % msg)
+            _ShutdownInstanceDisks(self, instance)
+            raise errors.OpExecError("Could not start instance: %s" % msg)
 
-            # Remove snapshot
-            if self._RemoveSnapshot(feedback_fn, snap_disks, idx):
-              removed_snaps[idx] = True
-          else:
-            dresults.append(False)
+        if self.export_mode == constants.EXPORT_MODE_LOCAL:
+          (fin_resu, dresults) = helper.LocalExport(self.dst_node)
+        elif self.export_mode == constants.EXPORT_MODE_REMOTE:
+          connect_timeout = constants.RIE_CONNECT_TIMEOUT
+          timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)
 
-        assert len(dresults) == len(instance.disks)
+          (key_name, _, _) = self.x509_key_name
 
-        # Check for backwards compatibility
-        assert compat.all(isinstance(i, bool) for i in dresults), \
-               "Not all results are boolean: %r" % dresults
-
-        feedback_fn("Finalizing export on %s" % dst_node.name)
-        result = self.rpc.call_finalize_export(dst_node.name, instance,
-                                               snap_disks)
-        msg = result.fail_msg
-        fin_resu = not msg
-        if msg:
-          self.LogWarning("Could not finalize export for instance %s"
-                          " on node %s: %s", instance.name, dst_node.name, msg)
+          dest_ca_pem = \
+            OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
+                                            self.dest_x509_ca)
 
+          (fin_resu, dresults) = helper.RemoteExport(self.dest_disk_info,
+                                                     key_name, dest_ca_pem,
+                                                     timeouts)
       finally:
-        # Remove all snapshots
-        assert len(removed_snaps) == len(instance.disks)
-        for idx, removed in enumerate(removed_snaps):
-          if not removed:
-            self._RemoveSnapshot(feedback_fn, snap_disks, idx)
+        helper.Cleanup()
+
+      # Check for backwards compatibility
+      assert len(dresults) == len(instance.disks)
+      assert compat.all(isinstance(i, bool) for i in dresults), \
+             "Not all results are boolean: %r" % dresults
 
     finally:
       if activate_disks:
         feedback_fn("Deactivating disks for %s" % instance.name)
         _ShutdownInstanceDisks(self, instance)
 
-    self._CleanupExports(feedback_fn)
+    # Remove instance if requested
+    if self.remove_instance:
+      if not (compat.all(dresults) and fin_resu):
+        feedback_fn("Not removing instance %s as parts of the export failed" %
+                    instance.name)
+      else:
+        feedback_fn("Removing instance %s" % instance.name)
+        _RemoveInstance(self, feedback_fn, instance,
+                        self.ignore_remove_failures)
+
+    if self.export_mode == constants.EXPORT_MODE_LOCAL:
+      self._CleanupExports(feedback_fn)
 
     return fin_resu, dresults