Merge branch 'devel-2.1'
[ganeti-local] / lib / cmdlib.py
index 52744bd..9f8b1ff 100644 (file)
@@ -44,6 +44,11 @@ from ganeti import constants
 from ganeti import objects
 from ganeti import serializer
 from ganeti import ssconf
+from ganeti import uidpool
+from ganeti import compat
+from ganeti import masterd
+
+import ganeti.masterd.instance # pylint: disable-msg=W0611
 
 
 class LogicalUnit(object):
@@ -560,6 +565,17 @@ def _CheckNodeHasOS(lu, node, os_name, force_variant):
     _CheckOSVariant(result.payload, os_name)
 
 
+def _RequireFileStorage():
+  """Checks that file storage was enabled at configure time.
+
+  @raise errors.OpPrereqError: when constants.ENABLE_FILE_STORAGE is false
+
+  """
+  if not constants.ENABLE_FILE_STORAGE:
+    raise errors.OpPrereqError("File storage disabled at configure time",
+                               errors.ECODE_INVAL)
+
+
 def _CheckDiskTemplate(template):
   """Ensure a given disk template is valid.
 
@@ -568,6 +584,20 @@ def _CheckDiskTemplate(template):
     msg = ("Invalid disk template name '%s', valid templates are: %s" %
            (template, utils.CommaJoin(constants.DISK_TEMPLATES)))
     raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
+  if template == constants.DT_FILE:
+    _RequireFileStorage()
+
+
+def _CheckStorageType(storage_type):
+  """Ensure a storage type is known and, for file storage, enabled.
+
+  """
+  if storage_type not in constants.VALID_STORAGE_TYPES:
+    raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
+                               errors.ECODE_INVAL)
+  if storage_type == constants.ST_FILE:
+    _RequireFileStorage()
+
 
 
 def _CheckInstanceDown(lu, instance, reason):
@@ -893,13 +923,6 @@ def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
   return faulty
 
 
-def _FormatTimestamp(secs):
-  """Formats a Unix timestamp with the local timezone.
-
-  """
-  return time.strftime("%F %T %Z", time.gmtime(secs))
-
-
 class LUPostInitCluster(LogicalUnit):
   """Logical unit for running hooks after cluster initialization.
 
@@ -991,45 +1014,6 @@ class LUDestroyCluster(LogicalUnit):
     return master
 
 
-def _VerifyCertificateInner(filename, expired, not_before, not_after, now,
-                            warn_days=constants.SSL_CERT_EXPIRATION_WARN,
-                            error_days=constants.SSL_CERT_EXPIRATION_ERROR):
-  """Verifies certificate details for LUVerifyCluster.
-
-  """
-  if expired:
-    msg = "Certificate %s is expired" % filename
-
-    if not_before is not None and not_after is not None:
-      msg += (" (valid from %s to %s)" %
-              (_FormatTimestamp(not_before),
-               _FormatTimestamp(not_after)))
-    elif not_before is not None:
-      msg += " (valid from %s)" % _FormatTimestamp(not_before)
-    elif not_after is not None:
-      msg += " (valid until %s)" % _FormatTimestamp(not_after)
-
-    return (LUVerifyCluster.ETYPE_ERROR, msg)
-
-  elif not_before is not None and not_before > now:
-    return (LUVerifyCluster.ETYPE_WARNING,
-            "Certificate %s not yet valid (valid from %s)" %
-            (filename, _FormatTimestamp(not_before)))
-
-  elif not_after is not None:
-    remaining_days = int((not_after - now) / (24 * 3600))
-
-    msg = ("Certificate %s expires in %d days" % (filename, remaining_days))
-
-    if remaining_days <= error_days:
-      return (LUVerifyCluster.ETYPE_ERROR, msg)
-
-    if remaining_days <= warn_days:
-      return (LUVerifyCluster.ETYPE_WARNING, msg)
-
-  return (None, None)
-
-
 def _VerifyCertificate(filename):
   """Verifies a certificate for LUVerifyCluster.
 
@@ -1044,11 +1028,23 @@ def _VerifyCertificate(filename):
     return (LUVerifyCluster.ETYPE_ERROR,
             "Failed to load X509 certificate %s: %s" % (filename, err))
 
-  # Depending on the pyOpenSSL version, this can just return (None, None)
-  (not_before, not_after) = utils.GetX509CertValidity(cert)
+  (errcode, msg) = \
+    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
+                                constants.SSL_CERT_EXPIRATION_ERROR)
+
+  if msg:
+    fnamemsg = "While verifying %s: %s" % (filename, msg)
+  else:
+    fnamemsg = None
 
-  return _VerifyCertificateInner(filename, cert.has_expired(),
-                                 not_before, not_after, time.time())
+  if errcode is None:
+    return (None, fnamemsg)
+  elif errcode == utils.CERT_WARNING:
+    return (LUVerifyCluster.ETYPE_WARNING, fnamemsg)
+  elif errcode == utils.CERT_ERROR:
+    return (LUVerifyCluster.ETYPE_ERROR, fnamemsg)
+
+  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
 
 
 class LUVerifyCluster(LogicalUnit):
@@ -1662,6 +1658,7 @@ class LUVerifyCluster(LogicalUnit):
 
     vg_name = self.cfg.GetVGName()
     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
+    cluster = self.cfg.GetClusterInfo()
     nodelist = utils.NiceSort(self.cfg.GetNodeList())
     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
@@ -1680,6 +1677,8 @@ class LUVerifyCluster(LogicalUnit):
     file_names = ssconf.SimpleStore().GetFileList()
     file_names.extend(constants.ALL_CERT_FILES)
     file_names.extend(master_files)
+    if cluster.modify_etc_hosts:
+      file_names.append(constants.ETC_HOSTS)
 
     local_checksums = utils.FingerprintFiles(file_names)
 
@@ -1743,7 +1742,6 @@ class LUVerifyCluster(LogicalUnit):
                                            self.cfg.GetClusterName())
     nvinfo_endtime = time.time()
 
-    cluster = self.cfg.GetClusterInfo()
     master_node = self.cfg.GetMasterNode()
     all_drbd_map = self.cfg.ComputeDRBDMap()
 
@@ -2229,8 +2227,11 @@ class LUSetClusterParams(LogicalUnit):
     """Check parameters
 
     """
-    if not hasattr(self.op, "candidate_pool_size"):
-      self.op.candidate_pool_size = None
+    for attr in ["candidate_pool_size",
+                 "uid_pool", "add_uids", "remove_uids"]:
+      if not hasattr(self.op, attr):
+        setattr(self.op, attr, None)
+
     if self.op.candidate_pool_size is not None:
       try:
         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
@@ -2240,8 +2241,18 @@ class LUSetClusterParams(LogicalUnit):
       if self.op.candidate_pool_size < 1:
         raise errors.OpPrereqError("At least one master candidate needed",
                                    errors.ECODE_INVAL)
+
     _CheckBooleanOpField(self.op, "maintain_node_health")
 
+    if self.op.uid_pool:
+      uidpool.CheckUidPool(self.op.uid_pool)
+
+    if self.op.add_uids:
+      uidpool.CheckUidPool(self.op.add_uids)
+
+    if self.op.remove_uids:
+      uidpool.CheckUidPool(self.op.remove_uids)
+
   def ExpandNames(self):
     # FIXME: in the future maybe other cluster params won't require checking on
     # all nodes to be modified.
@@ -2333,7 +2344,7 @@ class LUSetClusterParams(LogicalUnit):
                                    "\n".join(nic_errors))
 
     # hypervisor list/parameters
-    self.new_hvparams = objects.FillDict(cluster.hvparams, {})
+    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
     if self.op.hvparams:
       if not isinstance(self.op.hvparams, dict):
         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input",
@@ -2363,6 +2374,7 @@ class LUSetClusterParams(LogicalUnit):
             else:
               self.new_os_hvp[os_name][hv_name].update(hv_dict)
 
+    # changes to the hypervisor list
     if self.op.enabled_hypervisors is not None:
       self.hv_list = self.op.enabled_hypervisors
       if not self.hv_list:
@@ -2375,6 +2387,16 @@ class LUSetClusterParams(LogicalUnit):
                                    " entries: %s" %
                                    utils.CommaJoin(invalid_hvs),
                                    errors.ECODE_INVAL)
+      for hv in self.hv_list:
+        # if the hypervisor doesn't already exist in the cluster
+        # hvparams, we initialize it to empty, and then (in both
+        # cases) we make sure to fill the defaults, as we might not
+        # have a complete defaults list if the hypervisor wasn't
+        # enabled before
+        if hv not in new_hvp:
+          new_hvp[hv] = {}
+        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
+        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
     else:
       self.hv_list = cluster.enabled_hypervisors
 
@@ -2422,6 +2444,7 @@ class LUSetClusterParams(LogicalUnit):
     if self.op.os_hvp:
       self.cluster.os_hvp = self.new_os_hvp
     if self.op.enabled_hypervisors is not None:
+      self.cluster.hvparams = self.new_hvparams
       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
     if self.op.beparams:
       self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
@@ -2436,6 +2459,15 @@ class LUSetClusterParams(LogicalUnit):
     if self.op.maintain_node_health is not None:
       self.cluster.maintain_node_health = self.op.maintain_node_health
 
+    if self.op.add_uids is not None:
+      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
+
+    if self.op.remove_uids is not None:
+      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
+
+    if self.op.uid_pool is not None:
+      self.cluster.uid_pool = self.op.uid_pool
+
     self.cfg.Update(self.cluster, feedback_fn)
 
 
@@ -2820,6 +2852,12 @@ class LURemoveNode(LogicalUnit):
       self.LogWarning("Errors encountered on the remote node while leaving"
                       " the cluster: %s", msg)
 
+    # Remove node from our /etc/hosts
+    if self.cfg.GetClusterInfo().modify_etc_hosts:
+      # FIXME: this should be done via an rpc call to node daemon
+      utils.RemoveHostFromEtcHosts(node.name)
+      _RedistributeAncillaryFiles(self)
+
 
 class LUQueryNodes(NoHooksLU):
   """Logical unit for querying nodes.
@@ -3076,17 +3114,14 @@ class LUQueryNodeStorage(NoHooksLU):
   REQ_BGL = False
   _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
 
-  def ExpandNames(self):
-    storage_type = self.op.storage_type
-
-    if storage_type not in constants.VALID_STORAGE_TYPES:
-      raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
-                                 errors.ECODE_INVAL)
+  def CheckArguments(self):
+    _CheckStorageType(self.op.storage_type)
 
     _CheckOutputFields(static=self._FIELDS_STATIC,
                        dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                        selected=self.op.output_fields)
 
+  def ExpandNames(self):
     self.needed_locks = {}
     self.share_locks[locking.LEVEL_NODE] = 1
 
@@ -3175,10 +3210,7 @@ class LUModifyNodeStorage(NoHooksLU):
   def CheckArguments(self):
     self.opnode_name = _ExpandNodeName(self.cfg, self.op.node_name)
 
-    storage_type = self.op.storage_type
-    if storage_type not in constants.VALID_STORAGE_TYPES:
-      raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
-                                 errors.ECODE_INVAL)
+    _CheckStorageType(self.op.storage_type)
 
   def ExpandNames(self):
     self.needed_locks = {
@@ -3279,15 +3311,19 @@ class LUAddNode(LogicalUnit):
       raise errors.OpPrereqError("Node %s is not in the configuration" % node,
                                  errors.ECODE_NOENT)
 
+    self.changed_primary_ip = False
+
     for existing_node_name in node_list:
       existing_node = cfg.GetNodeInfo(existing_node_name)
 
       if self.op.readd and node == existing_node_name:
-        if (existing_node.primary_ip != primary_ip or
-            existing_node.secondary_ip != secondary_ip):
+        if existing_node.secondary_ip != secondary_ip:
           raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                      " address configuration as before",
                                      errors.ECODE_INVAL)
+        if existing_node.primary_ip != primary_ip:
+          self.changed_primary_ip = True
+
         continue
 
       if (existing_node.primary_ip == primary_ip or
@@ -3359,6 +3395,8 @@ class LUAddNode(LogicalUnit):
       self.LogInfo("Readding a node, the offline/drained flags were reset")
       # if we demote the node, we do cleanup later in the procedure
       new_node.master_candidate = self.master_candidate
+      if self.changed_primary_ip:
+        new_node.primary_ip = self.op.primary_ip
 
     # notify the user about any possible mc promotion
     if new_node.master_candidate:
@@ -3394,6 +3432,7 @@ class LUAddNode(LogicalUnit):
 
     # Add node to our /etc/hosts, and add key to known_hosts
     if self.cfg.GetClusterInfo().modify_etc_hosts:
+      # FIXME: this should be done via an rpc call to node daemon
       utils.AddHostToEtcHosts(new_node.name)
 
     if new_node.secondary_ip != new_node.primary_ip:
@@ -3700,6 +3739,7 @@ class LUQueryClusterInfo(NoHooksLU):
       "mtime": cluster.mtime,
       "uuid": cluster.uuid,
       "tags": list(cluster.GetTags()),
+      "uid_pool": cluster.uid_pool,
       }
 
     return result
@@ -4580,18 +4620,29 @@ class LURemoveInstance(LogicalUnit):
                                  " node %s: %s" %
                                  (instance.name, instance.primary_node, msg))
 
-    logging.info("Removing block devices for instance %s", instance.name)
+    _RemoveInstance(self, feedback_fn, instance, self.op.ignore_failures)
 
-    if not _RemoveDisks(self, instance):
-      if self.op.ignore_failures:
-        feedback_fn("Warning: can't remove instance's disks")
-      else:
-        raise errors.OpExecError("Can't remove instance's disks")
 
-    logging.info("Removing instance %s out of cluster config", instance.name)
+def _RemoveInstance(lu, feedback_fn, instance, ignore_failures):
+  """Utility function to remove an instance's disks, config entry and lock.
+
+  """
+  logging.info("Removing block devices for instance %s", instance.name)
+
+  if not _RemoveDisks(lu, instance):
+    if not ignore_failures:
+      raise errors.OpExecError("Can't remove instance's disks")
+    feedback_fn("Warning: can't remove instance's disks")
+
+  logging.info("Removing instance %s out of cluster config", instance.name)
+
+  lu.cfg.RemoveInstance(instance.name)
 
-    self.cfg.RemoveInstance(instance.name)
-    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
+  assert not lu.remove_locks.get(locking.LEVEL_INSTANCE), \
+    "Instance lock removal conflict"
+
+  # Remove lock for the instance
+  lu.remove_locks[locking.LEVEL_INSTANCE] = instance.name
 
 
 class LUQueryInstances(NoHooksLU):
@@ -5805,6 +5856,8 @@ def _GenerateDiskTemplate(lu, template_name,
     if len(secondary_nodes) != 0:
       raise errors.ProgrammerError("Wrong template configuration")
 
+    _RequireFileStorage()
+
     for idx, disk in enumerate(disk_info):
       disk_index = idx + base_index
       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
@@ -5973,7 +6026,7 @@ class LUCreateInstance(LogicalUnit):
   """
   HPATH = "instance-add"
   HTYPE = constants.HTYPE_INSTANCE
-  _OP_REQP = ["instance_name", "disks", "disk_template",
+  _OP_REQP = ["instance_name", "disks",
               "mode", "start",
               "wait_for_sync", "ip_check", "nics",
               "hvparams", "beparams"]
@@ -5984,7 +6037,8 @@ class LUCreateInstance(LogicalUnit):
 
     """
     # set optional parameters to none if they don't exist
-    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
+    for attr in ["pnode", "snode", "iallocator", "hypervisor",
+                 "disk_template", "identify_defaults"]:
       if not hasattr(self.op, attr):
         setattr(self.op, attr, None)
 
@@ -6003,10 +6057,6 @@ class LUCreateInstance(LogicalUnit):
       # TODO: make the ip check more flexible and not depend on the name check
       raise errors.OpPrereqError("Cannot do ip checks without a name check",
                                  errors.ECODE_INVAL)
-    if (self.op.disk_template == constants.DT_FILE and
-        not constants.ENABLE_FILE_STORAGE):
-      raise errors.OpPrereqError("File storage disabled at configure time",
-                                 errors.ECODE_INVAL)
     # check disk information: either all adopt, or no adopt
     has_adopt = has_no_adopt = False
     for disk in self.op.disks:
@@ -6015,7 +6065,7 @@ class LUCreateInstance(LogicalUnit):
       else:
         has_no_adopt = True
     if has_adopt and has_no_adopt:
-      raise errors.OpPrereqError("Either all disks have are adoped or none is",
+      raise errors.OpPrereqError("Either all disks are adopted or none is",
                                  errors.ECODE_INVAL)
     if has_adopt:
       if self.op.disk_template != constants.DT_PLAIN:
@@ -6031,162 +6081,21 @@ class LUCreateInstance(LogicalUnit):
 
     self.adopt_disks = has_adopt
 
-  def ExpandNames(self):
-    """ExpandNames for CreateInstance.
-
-    Figure out the right locks for instance creation.
-
-    """
-    self.needed_locks = {}
-
-    # cheap checks, mostly valid constants given
-
     # verify creation mode
     if self.op.mode not in (constants.INSTANCE_CREATE,
                             constants.INSTANCE_IMPORT):
       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                  self.op.mode, errors.ECODE_INVAL)
 
-    # disk template and mirror node verification
-    _CheckDiskTemplate(self.op.disk_template)
-
-    if self.op.hypervisor is None:
-      self.op.hypervisor = self.cfg.GetHypervisorType()
-
-    cluster = self.cfg.GetClusterInfo()
-    enabled_hvs = cluster.enabled_hypervisors
-    if self.op.hypervisor not in enabled_hvs:
-      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
-                                 " cluster (%s)" % (self.op.hypervisor,
-                                  ",".join(enabled_hvs)),
-                                 errors.ECODE_STATE)
-
-    # check hypervisor parameter syntax (locally)
-    utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
-    filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
-                                  self.op.hvparams)
-    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
-    hv_type.CheckParameterSyntax(filled_hvp)
-    self.hv_full = filled_hvp
-    # check that we don't specify global parameters on an instance
-    _CheckGlobalHvParams(self.op.hvparams)
-
-    # fill and remember the beparams dict
-    utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
-    self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
-                                    self.op.beparams)
-
-    #### instance parameters check
-
     # instance name verification
     if self.op.name_check:
-      hostname1 = utils.GetHostInfo(self.op.instance_name)
-      self.op.instance_name = instance_name = hostname1.name
+      self.hostname1 = utils.GetHostInfo(self.op.instance_name)
+      self.op.instance_name = self.hostname1.name
       # used in CheckPrereq for ip ping check
-      self.check_ip = hostname1.ip
+      self.check_ip = self.hostname1.ip
     else:
-      instance_name = self.op.instance_name
       self.check_ip = None
 
-    # this is just a preventive check, but someone might still add this
-    # instance in the meantime, and creation will fail at lock-add time
-    if instance_name in self.cfg.GetInstanceList():
-      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
-                                 instance_name, errors.ECODE_EXISTS)
-
-    self.add_locks[locking.LEVEL_INSTANCE] = instance_name
-
-    # NIC buildup
-    self.nics = []
-    for idx, nic in enumerate(self.op.nics):
-      nic_mode_req = nic.get("mode", None)
-      nic_mode = nic_mode_req
-      if nic_mode is None:
-        nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
-
-      # in routed mode, for the first nic, the default ip is 'auto'
-      if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
-        default_ip_mode = constants.VALUE_AUTO
-      else:
-        default_ip_mode = constants.VALUE_NONE
-
-      # ip validity checks
-      ip = nic.get("ip", default_ip_mode)
-      if ip is None or ip.lower() == constants.VALUE_NONE:
-        nic_ip = None
-      elif ip.lower() == constants.VALUE_AUTO:
-        if not self.op.name_check:
-          raise errors.OpPrereqError("IP address set to auto but name checks"
-                                     " have been skipped. Aborting.",
-                                     errors.ECODE_INVAL)
-        nic_ip = hostname1.ip
-      else:
-        if not utils.IsValidIP(ip):
-          raise errors.OpPrereqError("Given IP address '%s' doesn't look"
-                                     " like a valid IP" % ip,
-                                     errors.ECODE_INVAL)
-        nic_ip = ip
-
-      # TODO: check the ip address for uniqueness
-      if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
-        raise errors.OpPrereqError("Routed nic mode requires an ip address",
-                                   errors.ECODE_INVAL)
-
-      # MAC address verification
-      mac = nic.get("mac", constants.VALUE_AUTO)
-      if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
-        mac = utils.NormalizeAndValidateMac(mac)
-
-        try:
-          self.cfg.ReserveMAC(mac, self.proc.GetECId())
-        except errors.ReservationError:
-          raise errors.OpPrereqError("MAC address %s already in use"
-                                     " in cluster" % mac,
-                                     errors.ECODE_NOTUNIQUE)
-
-      # bridge verification
-      bridge = nic.get("bridge", None)
-      link = nic.get("link", None)
-      if bridge and link:
-        raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
-                                   " at the same time", errors.ECODE_INVAL)
-      elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
-        raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic",
-                                   errors.ECODE_INVAL)
-      elif bridge:
-        link = bridge
-
-      nicparams = {}
-      if nic_mode_req:
-        nicparams[constants.NIC_MODE] = nic_mode_req
-      if link:
-        nicparams[constants.NIC_LINK] = link
-
-      check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
-                                      nicparams)
-      objects.NIC.CheckParameterSyntax(check_params)
-      self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
-
-    # disk checks/pre-build
-    self.disks = []
-    for disk in self.op.disks:
-      mode = disk.get("mode", constants.DISK_RDWR)
-      if mode not in constants.DISK_ACCESS_SET:
-        raise errors.OpPrereqError("Invalid disk access mode '%s'" %
-                                   mode, errors.ECODE_INVAL)
-      size = disk.get("size", None)
-      if size is None:
-        raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
-      try:
-        size = int(size)
-      except (TypeError, ValueError):
-        raise errors.OpPrereqError("Invalid disk size '%s'" % size,
-                                   errors.ECODE_INVAL)
-      new_disk = {"size": size, "mode": mode}
-      if "adopt" in disk:
-        new_disk["adopt"] = disk["adopt"]
-      self.disks.append(new_disk)
-
     # file storage checks
     if (self.op.file_driver and
         not self.op.file_driver in constants.FILE_DRIVER):
@@ -6203,6 +6112,41 @@ class LUCreateInstance(LogicalUnit):
                                  " node must be given",
                                  errors.ECODE_INVAL)
 
+    if self.op.mode == constants.INSTANCE_IMPORT:
+      # On import force_variant must be True, because if we forced it at
+      # initial install, our only chance when importing it back is that it
+      # works again!
+      self.op.force_variant = True
+
+      if self.op.no_install:
+        self.LogInfo("No-installation mode has no effect during import")
+
+    else: # INSTANCE_CREATE
+      if getattr(self.op, "os_type", None) is None:
+        raise errors.OpPrereqError("No guest OS specified",
+                                   errors.ECODE_INVAL)
+      self.op.force_variant = getattr(self.op, "force_variant", False)
+      if self.op.disk_template is None:
+        raise errors.OpPrereqError("No disk template specified",
+                                   errors.ECODE_INVAL)
+
+  def ExpandNames(self):
+    """ExpandNames for CreateInstance.
+
+    Figure out the right locks for instance creation.
+
+    """
+    self.needed_locks = {}
+
+    instance_name = self.op.instance_name
+    # this is just a preventive check, but someone might still add this
+    # instance in the meantime, and creation will fail at lock-add time
+    if instance_name in self.cfg.GetInstanceList():
+      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
+                                 instance_name, errors.ECODE_EXISTS)
+
+    self.add_locks[locking.LEVEL_INSTANCE] = instance_name
+
     if self.op.iallocator:
       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
     else:
@@ -6236,20 +6180,6 @@ class LUCreateInstance(LogicalUnit):
           self.op.src_path = src_path = \
             utils.PathJoin(constants.EXPORT_DIR, src_path)
 
-      # On import force_variant must be True, because if we forced it at
-      # initial install, our only chance when importing it back is that it
-      # works again!
-      self.op.force_variant = True
-
-      if self.op.no_install:
-        self.LogInfo("No-installation mode has no effect during import")
-
-    else: # INSTANCE_CREATE
-      if getattr(self.op, "os_type", None) is None:
-        raise errors.OpPrereqError("No guest OS specified",
-                                   errors.ECODE_INVAL)
-      self.op.force_variant = getattr(self.op, "force_variant", False)
-
   def _RunAllocator(self):
     """Run the allocator based on input opcode.
 
@@ -6321,51 +6251,278 @@ class LUCreateInstance(LogicalUnit):
           self.secondaries)
     return env, nl, nl
 
+  def _ReadExportInfo(self):
+    """Reads the export information from disk.
+
+    If the opcode does not name a source node, the locked nodes are
+    searched for an export matching the (relative) source path; on a
+    match the opcode source node and path are overridden with the node
+    found and with the path joined onto the export directory.
+
+    @return: the export information
+    @raise errors.OpPrereqError: if no export is found for the relative
+        path, or if the export version is not the supported one (this
+        includes versions which are not integers at all)
+    @raise errors.ProgrammerError: if the export lacks its main section
+
+    """
+    assert self.op.mode == constants.INSTANCE_IMPORT
+
+    src_node = self.op.src_node
+    src_path = self.op.src_path
+
+    if src_node is None:
+      locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
+      exp_list = self.rpc.call_export_list(locked_nodes)
+      found = False
+      for node in exp_list:
+        if exp_list[node].fail_msg:
+          # nodes whose export list call failed are skipped, not fatal
+          continue
+        if src_path in exp_list[node].payload:
+          found = True
+          self.op.src_node = src_node = node
+          self.op.src_path = src_path = utils.PathJoin(constants.EXPORT_DIR,
+                                                       src_path)
+          break
+      if not found:
+        raise errors.OpPrereqError("No export found for relative path %s" %
+                                    src_path, errors.ECODE_INVAL)
+
+    _CheckNodeOnline(self, src_node)
+    result = self.rpc.call_export_info(src_node, src_path)
+    result.Raise("No export or invalid export found in dir %s" % src_path)
+
+    export_info = objects.SerializableConfigParser.Loads(str(result.payload))
+    if not export_info.has_section(constants.INISECT_EXP):
+      # NOTE(review): a damaged export looks like an environment problem
+      # rather than a programming error; the exception type is left as-is
+      # so that callers keep seeing the same class
+      raise errors.ProgrammerError("Corrupted export config",
+                                   errors.ECODE_ENVIRON)
+
+    ei_version = export_info.get(constants.INISECT_EXP, "version")
+    try:
+      version_ok = int(ei_version) == constants.EXPORT_VERSION
+    except (TypeError, ValueError):
+      # a non-numeric version can never be the supported one; report it
+      # like any other mismatch instead of leaking the conversion error
+      version_ok = False
+    if not version_ok:
+      raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
+                                 (ei_version, constants.EXPORT_VERSION),
+                                 errors.ECODE_ENVIRON)
+    return export_info
+
+  def _ReadExportParams(self, einfo):
+    """Fill in unspecified opcode parameters from the export data.
+
+    The OS type is always taken from the export. The disk template, the
+    disks, the NICs, the hypervisor and the hypervisor/backend parameters
+    are taken from the export only where the opcode left them unset;
+    values given in the opcode are never overridden.
+
+    @param einfo: the export information to read the defaults from
+    @raise errors.OpPrereqError: if neither the opcode nor the export
+        provides the disk template or the disk information
+
+    """
+    ins_sect = constants.INISECT_INS
+
+    self.op.os_type = einfo.get(constants.INISECT_EXP, "os")
+
+    if self.op.disk_template is None:
+      if not einfo.has_option(ins_sect, "disk_template"):
+        raise errors.OpPrereqError("No disk template specified and the export"
+                                   " is missing the disk_template information",
+                                   errors.ECODE_INVAL)
+      self.op.disk_template = einfo.get(ins_sect, "disk_template")
+
+    if not self.op.disks:
+      if not einfo.has_option(ins_sect, "disk_count"):
+        raise errors.OpPrereqError("No disk info specified and the export"
+                                   " is missing the disk information",
+                                   errors.ECODE_INVAL)
+      # TODO: import the disk iv_name too
+      disk_count = einfo.getint(ins_sect, "disk_count")
+      self.op.disks = [{"size": einfo.getint(ins_sect, "disk%d_size" % num)}
+                       for num in range(disk_count)]
+
+    if not self.op.nics and einfo.has_option(ins_sect, "nic_count"):
+      nic_count = einfo.getint(ins_sect, "nic_count")
+      nic_keys = list(constants.NICS_PARAMETERS) + ["ip", "mac"]
+      imported_nics = []
+      for num in range(nic_count):
+        imported_nics.append(dict((key,
+                                   einfo.get(ins_sect,
+                                             "nic%d_%s" % (num, key)))
+                                  for key in nic_keys))
+      self.op.nics = imported_nics
+
+    if (self.op.hypervisor is None and
+        einfo.has_option(ins_sect, "hypervisor")):
+      self.op.hypervisor = einfo.get(ins_sect, "hypervisor")
+
+    if einfo.has_section(constants.INISECT_HYP):
+      # exported hypervisor parameters only fill the gaps left by the user
+      for key, val in einfo.items(constants.INISECT_HYP):
+        self.op.hvparams.setdefault(key, val)
+
+    if einfo.has_section(constants.INISECT_BEP):
+      # same for the backend parameters: never override
+      for key, val in einfo.items(constants.INISECT_BEP):
+        self.op.beparams.setdefault(key, val)
+    else:
+      # old-style exports keep the backend parameters in the main section
+      for key in constants.BES_PARAMETERS:
+        if key in self.op.beparams:
+          continue
+        if einfo.has_option(ins_sect, key):
+          self.op.beparams[key] = einfo.get(ins_sect, key)
+
+  def _RevertToDefaults(self, cluster):
+    """Drop instance parameters which merely repeat the defaults.
+
+    Every hypervisor parameter, backend parameter and per-NIC parameter
+    of the opcode whose value equals the corresponding default obtained
+    from C{cluster} is removed from the opcode.
+
+    @param cluster: the cluster object providing the default values
+
+    """
+    # hypervisor parameters
+    hv_defaults = cluster.GetHVDefaults(self.op.hypervisor, self.op.os_type)
+    redundant = [key for key in self.op.hvparams
+                 if (key in hv_defaults and
+                     hv_defaults[key] == self.op.hvparams[key])]
+    for key in redundant:
+      del self.op.hvparams[key]
+
+    # backend parameters
+    be_defaults = cluster.beparams.get(constants.PP_DEFAULT, {})
+    redundant = [key for key in self.op.beparams
+                 if (key in be_defaults and
+                     be_defaults[key] == self.op.beparams[key])]
+    for key in redundant:
+      del self.op.beparams[key]
+
+    # NIC parameters, checked separately for each NIC
+    nic_defaults = cluster.nicparams.get(constants.PP_DEFAULT, {})
+    for nic in self.op.nics:
+      for key in constants.NICS_PARAMETERS:
+        if key not in nic or key not in nic_defaults:
+          continue
+        if nic[key] == nic_defaults[key]:
+          del nic[key]
 
   def CheckPrereq(self):
     """Check prerequisites.
 
     """
+    if self.op.mode == constants.INSTANCE_IMPORT:
+      export_info = self._ReadExportInfo()
+      self._ReadExportParams(export_info)
+
+    _CheckDiskTemplate(self.op.disk_template)
+
     if (not self.cfg.GetVGName() and
         self.op.disk_template not in constants.DTS_NOT_LVM):
       raise errors.OpPrereqError("Cluster does not support lvm-based"
                                  " instances", errors.ECODE_STATE)
 
-    if self.op.mode == constants.INSTANCE_IMPORT:
-      src_node = self.op.src_node
-      src_path = self.op.src_path
+    if self.op.hypervisor is None:
+      self.op.hypervisor = self.cfg.GetHypervisorType()
 
-      if src_node is None:
-        locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
-        exp_list = self.rpc.call_export_list(locked_nodes)
-        found = False
-        for node in exp_list:
-          if exp_list[node].fail_msg:
-            continue
-          if src_path in exp_list[node].payload:
-            found = True
-            self.op.src_node = src_node = node
-            self.op.src_path = src_path = utils.PathJoin(constants.EXPORT_DIR,
-                                                         src_path)
-            break
-        if not found:
-          raise errors.OpPrereqError("No export found for relative path %s" %
-                                      src_path, errors.ECODE_INVAL)
-
-      _CheckNodeOnline(self, src_node)
-      result = self.rpc.call_export_info(src_node, src_path)
-      result.Raise("No export or invalid export found in dir %s" % src_path)
-
-      export_info = objects.SerializableConfigParser.Loads(str(result.payload))
-      if not export_info.has_section(constants.INISECT_EXP):
-        raise errors.ProgrammerError("Corrupted export config",
-                                     errors.ECODE_ENVIRON)
-
-      ei_version = export_info.get(constants.INISECT_EXP, 'version')
-      if (int(ei_version) != constants.EXPORT_VERSION):
-        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
-                                   (ei_version, constants.EXPORT_VERSION),
-                                   errors.ECODE_ENVIRON)
+    cluster = self.cfg.GetClusterInfo()
+    enabled_hvs = cluster.enabled_hypervisors
+    if self.op.hypervisor not in enabled_hvs:
+      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
+                                 " cluster (%s)" % (self.op.hypervisor,
+                                  ",".join(enabled_hvs)),
+                                 errors.ECODE_STATE)
+
+    # check hypervisor parameter syntax (locally)
+    utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
+    filled_hvp = objects.FillDict(cluster.GetHVDefaults(self.op.hypervisor,
+                                                        self.op.os_type),
+                                  self.op.hvparams)
+    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
+    hv_type.CheckParameterSyntax(filled_hvp)
+    self.hv_full = filled_hvp
+    # check that we don't specify global parameters on an instance
+    _CheckGlobalHvParams(self.op.hvparams)
+
+    # fill and remember the beparams dict
+    utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
+    self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
+                                    self.op.beparams)
+
+    # now that hvp/bep are in final format, let's reset to defaults,
+    # if told to do so
+    if self.op.identify_defaults:
+      self._RevertToDefaults(cluster)
+
+    # NIC buildup
+    self.nics = []
+    for idx, nic in enumerate(self.op.nics):
+      nic_mode_req = nic.get("mode", None)
+      nic_mode = nic_mode_req
+      if nic_mode is None:
+        nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
+
+      # in routed mode, for the first nic, the default ip is 'auto'
+      if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
+        default_ip_mode = constants.VALUE_AUTO
+      else:
+        default_ip_mode = constants.VALUE_NONE
+
+      # ip validity checks
+      ip = nic.get("ip", default_ip_mode)
+      if ip is None or ip.lower() == constants.VALUE_NONE:
+        nic_ip = None
+      elif ip.lower() == constants.VALUE_AUTO:
+        if not self.op.name_check:
+          raise errors.OpPrereqError("IP address set to auto but name checks"
+                                     " have been skipped. Aborting.",
+                                     errors.ECODE_INVAL)
+        nic_ip = self.hostname1.ip
+      else:
+        if not utils.IsValidIP(ip):
+          raise errors.OpPrereqError("Given IP address '%s' doesn't look"
+                                     " like a valid IP" % ip,
+                                     errors.ECODE_INVAL)
+        nic_ip = ip
+
+      # TODO: check the ip address for uniqueness
+      if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
+        raise errors.OpPrereqError("Routed nic mode requires an ip address",
+                                   errors.ECODE_INVAL)
+
+      # MAC address verification
+      mac = nic.get("mac", constants.VALUE_AUTO)
+      if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
+        mac = utils.NormalizeAndValidateMac(mac)
+
+        try:
+          self.cfg.ReserveMAC(mac, self.proc.GetECId())
+        except errors.ReservationError:
+          raise errors.OpPrereqError("MAC address %s already in use"
+                                     " in cluster" % mac,
+                                     errors.ECODE_NOTUNIQUE)
+
+      # bridge verification
+      bridge = nic.get("bridge", None)
+      link = nic.get("link", None)
+      if bridge and link:
+        raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
+                                   " at the same time", errors.ECODE_INVAL)
+      elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
+        raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic",
+                                   errors.ECODE_INVAL)
+      elif bridge:
+        link = bridge
+
+      nicparams = {}
+      if nic_mode_req:
+        nicparams[constants.NIC_MODE] = nic_mode_req
+      if link:
+        nicparams[constants.NIC_LINK] = link
+
+      check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
+                                      nicparams)
+      objects.NIC.CheckParameterSyntax(check_params)
+      self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
+
+    # disk checks/pre-build
+    self.disks = []
+    for disk in self.op.disks:
+      mode = disk.get("mode", constants.DISK_RDWR)
+      if mode not in constants.DISK_ACCESS_SET:
+        raise errors.OpPrereqError("Invalid disk access mode '%s'" %
+                                   mode, errors.ECODE_INVAL)
+      size = disk.get("size", None)
+      if size is None:
+        raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
+      try:
+        size = int(size)
+      except (TypeError, ValueError):
+        raise errors.OpPrereqError("Invalid disk size '%s'" % size,
+                                   errors.ECODE_INVAL)
+      new_disk = {"size": size, "mode": mode}
+      if "adopt" in disk:
+        new_disk["adopt"] = disk["adopt"]
+      self.disks.append(new_disk)
+
+    if self.op.mode == constants.INSTANCE_IMPORT:
 
       # Check that the new instance doesn't have less disks than the export
       instance_disks = len(self.disks)
@@ -6376,14 +6533,13 @@ class LUCreateInstance(LogicalUnit):
                                    (instance_disks, export_disks),
                                    errors.ECODE_INVAL)
 
-      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
       disk_images = []
       for idx in range(export_disks):
         option = 'disk%d_dump' % idx
         if export_info.has_option(constants.INISECT_INS, option):
           # FIXME: are the old os-es, disk sizes, etc. useful?
           export_name = export_info.get(constants.INISECT_INS, option)
-          image = utils.PathJoin(src_path, export_name)
+          image = utils.PathJoin(self.op.src_path, export_name)
           disk_images.append(image)
         else:
           disk_images.append(False)
@@ -6391,8 +6547,12 @@ class LUCreateInstance(LogicalUnit):
       self.src_images = disk_images
 
       old_name = export_info.get(constants.INISECT_INS, 'name')
-      # FIXME: int() here could throw a ValueError on broken exports
-      exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
+      try:
+        exp_nic_count = export_info.getint(constants.INISECT_INS, 'nic_count')
+      except (TypeError, ValueError), err:
+        raise errors.OpPrereqError("Invalid export file, nic_count is not"
+                                   " an integer: %s" % str(err),
+                                   errors.ECODE_STATE)
       if self.op.instance_name == old_name:
         for idx, nic in enumerate(self.nics):
           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
@@ -6519,19 +6679,18 @@ class LUCreateInstance(LogicalUnit):
     else:
       network_port = None
 
-    ##if self.op.vnc_bind_address is None:
-    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
+    if constants.ENABLE_FILE_STORAGE:
+      # this is needed because os.path.join does not accept None arguments
+      if self.op.file_storage_dir is None:
+        string_file_storage_dir = ""
+      else:
+        string_file_storage_dir = self.op.file_storage_dir
 
-    # this is needed because os.path.join does not accept None arguments
-    if self.op.file_storage_dir is None:
-      string_file_storage_dir = ""
+      # build the full file storage dir path
+      file_storage_dir = utils.PathJoin(self.cfg.GetFileStorageDir(),
+                                        string_file_storage_dir, instance)
     else:
-      string_file_storage_dir = self.op.file_storage_dir
-
-    # build the full file storage dir path
-    file_storage_dir = utils.PathJoin(self.cfg.GetFileStorageDir(),
-                                      string_file_storage_dir, instance)
-
+      file_storage_dir = ""
 
     disks = _GenerateDiskTemplate(self,
                                   self.op.disk_template,
@@ -6625,18 +6784,30 @@ class LUCreateInstance(LogicalUnit):
 
       elif self.op.mode == constants.INSTANCE_IMPORT:
         feedback_fn("* running the instance OS import scripts...")
-        src_node = self.op.src_node
-        src_images = self.src_images
-        cluster_name = self.cfg.GetClusterName()
-        # FIXME: pass debug option from opcode to backend
-        import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
-                                                         src_node, src_images,
-                                                         cluster_name,
-                                                         self.op.debug_level)
-        msg = import_result.fail_msg
-        if msg:
-          self.LogWarning("Error while importing the disk images for instance"
-                          " %s on node %s: %s" % (instance, pnode_name, msg))
+
+        transfers = []
+
+        for idx, image in enumerate(self.src_images):
+          if not image:
+            continue
+
+          # FIXME: pass debug option from opcode to backend
+          dt = masterd.instance.DiskTransfer("disk/%s" % idx,
+                                             constants.IEIO_FILE, (image, ),
+                                             constants.IEIO_SCRIPT,
+                                             (iobj.disks[idx], idx),
+                                             None)
+          transfers.append(dt)
+
+        import_result = \
+          masterd.instance.TransferInstanceData(self, feedback_fn,
+                                                self.op.src_node, pnode_name,
+                                                self.pnode.secondary_ip,
+                                                iobj, transfers)
+        if not compat.all(import_result):
+          self.LogWarning("Some disks for instance %s on node %s were not"
+                          " imported successfully" % (instance, pnode_name))
+
       else:
         # also checked in the prereq part
         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
@@ -7550,6 +7721,8 @@ class LURepairNodeStorage(NoHooksLU):
   def CheckArguments(self):
+    """Expand the node name and validate the requested storage type.
+
+    @raise errors.OpPrereqError: if the storage type is unknown, or if it
+        is the file storage type while file storage is disabled (both
+        checks are done by L{_CheckStorageType})
+
+    """
     self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
 
+    _CheckStorageType(self.op.storage_type)
+
   def ExpandNames(self):
     self.needed_locks = {
       locking.LEVEL_NODE: [self.op.node_name],
@@ -8639,11 +8812,22 @@ class LUExportInstance(LogicalUnit):
     """Check the arguments.
 
     """
+    _CheckBooleanOpField(self.op, "remove_instance")
+    _CheckBooleanOpField(self.op, "ignore_remove_failures")
+
     self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
                                     constants.DEFAULT_SHUTDOWN_TIMEOUT)
+    self.remove_instance = getattr(self.op, "remove_instance", False)
+    self.ignore_remove_failures = getattr(self.op, "ignore_remove_failures",
+                                          False)
+
+    if self.remove_instance and not self.op.shutdown:
+      raise errors.OpPrereqError("Can not remove instance without shutting it"
+                                 " down before")
 
   def ExpandNames(self):
     self._ExpandAndLockInstance()
+
     # FIXME: lock only instance primary and destination node
     #
     # Sad but true, for now we have to lock all nodes, as we don't know where
@@ -8668,6 +8852,8 @@ class LUExportInstance(LogicalUnit):
       "EXPORT_NODE": self.op.target_node,
       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
       "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
+      # TODO: Generic function for boolean env variables
+      "REMOVE_INSTANCE": str(bool(self.remove_instance)),
       }
     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
@@ -8694,11 +8880,100 @@ class LUExportInstance(LogicalUnit):
     _CheckNodeNotDrained(self, self.dst_node.name)
 
     # instance disk type verification
+    # TODO: Implement export support for file-based disks
     for disk in self.instance.disks:
       if disk.dev_type == constants.LD_FILE:
         raise errors.OpPrereqError("Export not supported for instances with"
                                    " file-based disks", errors.ECODE_INVAL)
 
+  def _CreateSnapshots(self, feedback_fn):
+    """Creates an LVM snapshot for every disk of the instance.
+
+    Snapshots are requested on the instance's primary node. A disk which
+    cannot be snapshotted only triggers a warning and is recorded as
+    C{False}, so the result always has one entry per instance disk.
+
+    @param feedback_fn: function called with a progress message per disk
+    @return: List of snapshots as L{objects.Disk} instances, with
+        C{False} in place of every snapshot which could not be created
+
+    """
+    inst = self.instance
+    node = inst.primary_node
+    vg_name = self.cfg.GetVGName()
+
+    snapshots = []
+    for disk_no, disk in enumerate(inst.disks):
+      feedback_fn("Creating a snapshot of disk/%s on node %s" %
+                  (disk_no, node))
+
+      # on success the payload identifies the snapshot of an lvm leaf of
+      # the disk we passed
+      result = self.rpc.call_blockdev_snapshot(node, disk)
+      err = result.fail_msg
+      if err:
+        self.LogWarning("Could not snapshot disk/%s on node %s: %s",
+                        disk_no, node, err)
+        snapshots.append(False)
+        continue
+
+      snap_id = (vg_name, result.payload)
+      snapshots.append(objects.Disk(dev_type=constants.LD_LV,
+                                    size=disk.size,
+                                    logical_id=snap_id,
+                                    physical_id=snap_id,
+                                    iv_name=disk.iv_name))
+
+    return snapshots
+
+  def _RemoveSnapshot(self, feedback_fn, snap_disks, disk_index):
+    """Removes an LVM snapshot.
+
+    @param feedback_fn: function called with a progress message
+    @type snap_disks: list or None
+    @param snap_disks: The list of all snapshots as returned by
+                       L{_CreateSnapshots}; C{None} (no snapshot list was
+                       ever produced) means there is nothing to remove
+    @type disk_index: number
+    @param disk_index: Index of the snapshot to be removed
+    @rtype: bool
+    @return: Whether removal was successful or not; C{False} is also
+             returned when there was no snapshot to remove
+
+    """
+    # The cleanup code in Exec calls this even if snapshot creation raised
+    # before the list was assigned; indexing None here would raise a
+    # TypeError and hide the error which triggered the cleanup
+    if snap_disks is None:
+      return False
+
+    disk = snap_disks[disk_index]
+    if disk:
+      src_node = self.instance.primary_node
+
+      feedback_fn("Removing snapshot of disk/%s on node %s" %
+                  (disk_index, src_node))
+
+      result = self.rpc.call_blockdev_remove(src_node, disk)
+      if not result.fail_msg:
+        return True
+
+      self.LogWarning("Could not remove snapshot for disk/%d from node"
+                      " %s: %s", disk_index, src_node, result.fail_msg)
+
+    # either the snapshot was never created or its removal failed
+    return False
+
+  def _CleanupExports(self, feedback_fn):
+    """Removes exports of current instance from all other nodes.
+
+    Every node except the export destination is asked for its exports
+    and any export named after this instance is removed from it, e.g.
+    after exporting to node C in a cluster with nodes A..D the exports
+    on A, B and D are removed. Failures are only logged as warnings.
+
+    @param feedback_fn: function called with a progress message
+
+    """
+    other_nodes = self.cfg.GetNodeList()
+    other_nodes.remove(self.dst_node.name)
+
+    instance_name = self.instance.name
+
+    # on one-node clusters nothing is left after dropping the destination;
+    # we must not go on with an empty list, as OpQueryExports substitutes
+    # an empty list with the full cluster node list and the backup which
+    # was just made would be removed
+    if not other_nodes:
+      return
+
+    feedback_fn("Removing old exports for instance %s" % instance_name)
+    exports = self.rpc.call_export_list(other_nodes)
+    for node in exports:
+      node_result = exports[node]
+      if node_result.fail_msg:
+        continue
+      if instance_name not in node_result.payload:
+        continue
+      err = self.rpc.call_export_remove(node, instance_name).fail_msg
+      if err:
+        self.LogWarning("Could not remove older export for instance %s"
+                        " on node %s: %s", instance_name, node, err)
+
   def Exec(self, feedback_fn):
     """Export an instance to an image in the cluster.
 
@@ -8712,13 +8987,10 @@ class LUExportInstance(LogicalUnit):
       feedback_fn("Shutting down instance %s" % instance.name)
       result = self.rpc.call_instance_shutdown(src_node, instance,
                                                self.shutdown_timeout)
+      # TODO: Maybe ignore failures if ignore_remove_failures is set
       result.Raise("Could not shutdown instance %s on"
                    " node %s" % (instance.name, src_node))
 
-    vgname = self.cfg.GetVGName()
-
-    snap_disks = []
-
     # set the disks ID correctly since call_instance_start needs the
     # correct drbd minor to create the symlinks
     for disk in instance.disks:
@@ -8733,94 +9005,93 @@ class LUExportInstance(LogicalUnit):
 
     try:
       # per-disk results
-      dresults = []
+      removed_snaps = [False] * len(instance.disks)
+
+      snap_disks = None
       try:
-        for idx, disk in enumerate(instance.disks):
-          feedback_fn("Creating a snapshot of disk/%s on node %s" %
-                      (idx, src_node))
-
-          # result.payload will be a snapshot of an lvm leaf of the one we
-          # passed
-          result = self.rpc.call_blockdev_snapshot(src_node, disk)
-          msg = result.fail_msg
-          if msg:
-            self.LogWarning("Could not snapshot disk/%s on node %s: %s",
-                            idx, src_node, msg)
-            snap_disks.append(False)
-          else:
-            disk_id = (vgname, result.payload)
-            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
-                                   logical_id=disk_id, physical_id=disk_id,
-                                   iv_name=disk.iv_name)
-            snap_disks.append(new_dev)
+        try:
+          snap_disks = self._CreateSnapshots(feedback_fn)
+        finally:
+          if (self.op.shutdown and instance.admin_up and
+              not self.remove_instance):
+            feedback_fn("Starting instance %s" % instance.name)
+            result = self.rpc.call_instance_start(src_node, instance,
+                                                  None, None)
+            msg = result.fail_msg
+            if msg:
+              _ShutdownInstanceDisks(self, instance)
+              raise errors.OpExecError("Could not start instance: %s" % msg)
+
+        assert len(snap_disks) == len(instance.disks)
+        assert len(removed_snaps) == len(instance.disks)
+
+        # TODO: check for size
+
+        def _TransferFinished(idx):
+          logging.debug("Transfer %s finished", idx)
+          if self._RemoveSnapshot(feedback_fn, snap_disks, idx):
+            removed_snaps[idx] = True
+
+        transfers = []
+
+        for idx, dev in enumerate(snap_disks):
+          if not dev:
+            transfers.append(None)
+            continue
 
-      finally:
-        if self.op.shutdown and instance.admin_up:
-          feedback_fn("Starting instance %s" % instance.name)
-          result = self.rpc.call_instance_start(src_node, instance, None, None)
-          msg = result.fail_msg
-          if msg:
-            _ShutdownInstanceDisks(self, instance)
-            raise errors.OpExecError("Could not start instance: %s" % msg)
-
-      # TODO: check for size
-
-      cluster_name = self.cfg.GetClusterName()
-      for idx, dev in enumerate(snap_disks):
-        feedback_fn("Exporting snapshot %s from %s to %s" %
-                    (idx, src_node, dst_node.name))
-        if dev:
-          # FIXME: pass debug from opcode to backend
-          result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
-                                                 instance, cluster_name,
-                                                 idx, self.op.debug_level)
-          msg = result.fail_msg
-          if msg:
-            self.LogWarning("Could not export disk/%s from node %s to"
-                            " node %s: %s", idx, src_node, dst_node.name, msg)
-            dresults.append(False)
-          else:
-            dresults.append(True)
-          msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
-          if msg:
-            self.LogWarning("Could not remove snapshot for disk/%d from node"
-                            " %s: %s", idx, src_node, msg)
-        else:
-          dresults.append(False)
+          path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
+                                dev.physical_id[1])
 
-      feedback_fn("Finalizing export on %s" % dst_node.name)
-      result = self.rpc.call_finalize_export(dst_node.name, instance,
-                                             snap_disks)
-      fin_resu = True
-      msg = result.fail_msg
-      if msg:
-        self.LogWarning("Could not finalize export for instance %s"
-                        " on node %s: %s", instance.name, dst_node.name, msg)
-        fin_resu = False
+          finished_fn = compat.partial(_TransferFinished, idx)
+
+          # FIXME: pass debug option from opcode to backend
+          dt = masterd.instance.DiskTransfer("snapshot/%s" % idx,
+                                             constants.IEIO_SCRIPT, (dev, idx),
+                                             constants.IEIO_FILE, (path, ),
+                                             finished_fn)
+          transfers.append(dt)
+
+        # Actually export data
+        dresults = \
+          masterd.instance.TransferInstanceData(self, feedback_fn,
+                                                src_node, dst_node.name,
+                                                dst_node.secondary_ip,
+                                                instance, transfers)
+
+        assert len(dresults) == len(instance.disks)
+
+        # Check for backwards compatibility
+        assert compat.all(isinstance(i, bool) for i in dresults), \
+               "Not all results are boolean: %r" % dresults
+
+        feedback_fn("Finalizing export on %s" % dst_node.name)
+        result = self.rpc.call_finalize_export(dst_node.name, instance,
+                                               snap_disks)
+        msg = result.fail_msg
+        fin_resu = not msg
+        if msg:
+          self.LogWarning("Could not finalize export for instance %s"
+                          " on node %s: %s", instance.name, dst_node.name, msg)
+
+      finally:
+        # Remove all snapshots
+        assert len(removed_snaps) == len(instance.disks)
+        for idx, removed in enumerate(removed_snaps):
+          if not removed:
+            self._RemoveSnapshot(feedback_fn, snap_disks, idx)
 
     finally:
       if activate_disks:
         feedback_fn("Deactivating disks for %s" % instance.name)
         _ShutdownInstanceDisks(self, instance)
 
-    nodelist = self.cfg.GetNodeList()
-    nodelist.remove(dst_node.name)
+    # Remove instance if requested
+    if self.remove_instance:
+      feedback_fn("Removing instance %s" % instance.name)
+      _RemoveInstance(self, feedback_fn, instance, self.ignore_remove_failures)
+
+    self._CleanupExports(feedback_fn)
 
-    # on one-node clusters nodelist will be empty after the removal
-    # if we proceed the backup would be removed because OpQueryExports
-    # substitutes an empty list with the full cluster node list.
-    iname = instance.name
-    if nodelist:
-      feedback_fn("Removing old exports for instance %s" % iname)
-      exportlist = self.rpc.call_export_list(nodelist)
-      for node in exportlist:
-        if exportlist[node].fail_msg:
-          continue
-        if iname in exportlist[node].payload:
-          msg = self.rpc.call_export_remove(node, iname).fail_msg
-          if msg:
-            self.LogWarning("Could not remove older export for instance %s"
-                            " on node %s: %s", iname, node, msg)
     return fin_resu, dresults