Merge branch 'devel-2.1'
[ganeti-local] / lib / cmdlib.py
index 39e7d06..9f8b1ff 100644
@@ -33,6 +33,7 @@ import re
 import platform
 import logging
 import copy
+import OpenSSL
 
 from ganeti import ssh
 from ganeti import utils
@@ -43,6 +44,11 @@ from ganeti import constants
 from ganeti import objects
 from ganeti import serializer
 from ganeti import ssconf
+from ganeti import uidpool
+from ganeti import compat
+from ganeti import masterd
+
+import ganeti.masterd.instance # pylint: disable-msg=W0611
 
 
 class LogicalUnit(object):
@@ -541,6 +547,75 @@ def _CheckNodeNotDrained(lu, node):
                                errors.ECODE_INVAL)
 
 
+def _CheckNodeHasOS(lu, node, os_name, force_variant):
+  """Ensure that a node supports a given OS.
+
+  @param lu: the LU on behalf of which we make the check
+  @param node: the node to check
+  @param os_name: the OS to query about
+  @param force_variant: whether to ignore variant errors
+  @raise errors.OpPrereqError: if the node does not support the OS
+
+  """
+  result = lu.rpc.call_os_get(node, os_name)
+  result.Raise("OS '%s' not in supported OS list for node %s" %
+               (os_name, node),
+               prereq=True, ecode=errors.ECODE_INVAL)
+  if not force_variant:
+    _CheckOSVariant(result.payload, os_name)
+
+
+def _RequireFileStorage():
+  """Checks that file storage is enabled.
+
+  @raise errors.OpPrereqError: when file storage is disabled
+
+  """
+  if not constants.ENABLE_FILE_STORAGE:
+    raise errors.OpPrereqError("File storage disabled at configure time",
+                               errors.ECODE_INVAL)
+
+
+def _CheckDiskTemplate(template):
+  """Ensure a given disk template is valid.
+
+  """
+  if template not in constants.DISK_TEMPLATES:
+    msg = ("Invalid disk template name '%s', valid templates are: %s" %
+           (template, utils.CommaJoin(constants.DISK_TEMPLATES)))
+    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
+  if template == constants.DT_FILE:
+    _RequireFileStorage()
+
+
+def _CheckStorageType(storage_type):
+  """Ensure a given storage type is valid.
+
+  """
+  if storage_type not in constants.VALID_STORAGE_TYPES:
+    raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
+                               errors.ECODE_INVAL)
+  if storage_type == constants.ST_FILE:
+    _RequireFileStorage()
+
+
+
+def _CheckInstanceDown(lu, instance, reason):
+  """Ensure that an instance is not running."""
+  if instance.admin_up:
+    raise errors.OpPrereqError("Instance %s is marked to be up, %s" %
+                               (instance.name, reason), errors.ECODE_STATE)
+
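+  # the config says the instance is down; also check with the hypervisor
+  # on the primary node to be sure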
+  pnode = instance.primary_node
+  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
+  ins_l.Raise("Can't contact node %s for instance information" % pnode,
+              prereq=True, ecode=errors.ECODE_ENVIRON)
+
+  if instance.name in ins_l.payload:
+    raise errors.OpPrereqError("Instance %s is running, %s" %
+                               (instance.name, reason), errors.ECODE_STATE)
+
+
 def _ExpandItemName(fn, name, kind):
   """Expand an item name.
 
@@ -939,6 +1014,39 @@ class LUDestroyCluster(LogicalUnit):
     return master
 
 
+def _VerifyCertificate(filename):
+  """Verifies a certificate for LUVerifyCluster.
+
+  @type filename: string
+  @param filename: Path to PEM file
+
+  """
+  try:
+    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
+                                           utils.ReadFile(filename))
+  except Exception, err: # pylint: disable-msg=W0703
+    return (LUVerifyCluster.ETYPE_ERROR,
+            "Failed to load X509 certificate %s: %s" % (filename, err))
+
+  (errcode, msg) = \
+    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
+                                constants.SSL_CERT_EXPIRATION_ERROR)
+
+  if msg:
+    fnamemsg = "While verifying %s: %s" % (filename, msg)
+  else:
+    fnamemsg = None
+
+  if errcode is None:
+    return (None, fnamemsg)
+  elif errcode == utils.CERT_WARNING:
+    return (LUVerifyCluster.ETYPE_WARNING, fnamemsg)
+  elif errcode == utils.CERT_ERROR:
+    return (LUVerifyCluster.ETYPE_ERROR, fnamemsg)
+
+  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
+
+
 class LUVerifyCluster(LogicalUnit):
   """Verifies the cluster status.
 
@@ -953,6 +1061,7 @@ class LUVerifyCluster(LogicalUnit):
   TINSTANCE = "instance"
 
   ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
+  ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT")
   EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
   EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
   EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
@@ -978,6 +1087,44 @@ class LUVerifyCluster(LogicalUnit):
   ETYPE_ERROR = "ERROR"
   ETYPE_WARNING = "WARNING"
 
+  class NodeImage(object):
+    """A class representing the logical and physical status of a node.
+
+    @ivar volumes: a structure as returned from
+        L{ganeti.backend.GetVolumeList} (runtime)
+    @ivar instances: a list of running instances (runtime)
+    @ivar pinst: list of configured primary instances (config)
+    @ivar sinst: list of configured secondary instances (config)
+    @ivar sbp: dictionary of {primary-node: list of instances} for all
+        instances for which this node is secondary (config)
+    @ivar mfree: free memory, as reported by hypervisor (runtime)
+    @ivar dfree: free disk, as reported by the node (runtime)
+    @ivar offline: the offline status (config)
+    @type rpc_fail: boolean
+    @ivar rpc_fail: whether the RPC verify call was successful (overall,
+        not whether the individual keys were correct) (runtime)
+    @type lvm_fail: boolean
+    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
+    @type hyp_fail: boolean
+    @ivar hyp_fail: whether the RPC call didn't return the instance list
+    @type ghost: boolean
+    @ivar ghost: whether this node is unknown to the configuration (config)
+
+    """
+    def __init__(self, offline=False):
+      self.volumes = {}
+      self.instances = []
+      self.pinst = []
+      self.sinst = []
+      self.sbp = {}
+      self.mfree = 0
+      self.dfree = 0
+      self.offline = offline
+      self.rpc_fail = False
+      self.lvm_fail = False
+      self.hyp_fail = False
+      self.ghost = False
+
   def ExpandNames(self):
     self.needed_locks = {
       locking.LEVEL_NODE: locking.ALL_SET,
@@ -1022,8 +1169,7 @@ class LUVerifyCluster(LogicalUnit):
     if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
       self.bad = self.bad or cond
 
-  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
-                  node_result, master_files, drbd_map, vg_name):
+  def _VerifyNode(self, ninfo, nresult):
     """Run multiple tests against a node.
 
     Test list:
@@ -1033,45 +1179,41 @@ class LUVerifyCluster(LogicalUnit):
       - checks config file checksum
       - checks ssh to other nodes
 
-    @type nodeinfo: L{objects.Node}
-    @param nodeinfo: the node to check
-    @param file_list: required list of files
-    @param local_cksum: dictionary of local files and their checksums
-    @param node_result: the results from the node
-    @param master_files: list of files that only masters should have
-    @param drbd_map: the useddrbd minors for this node, in
-        form of minor: (instance, must_exist) which correspond to instances
-        and their running status
-    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
+    @type ninfo: L{objects.Node}
+    @param ninfo: the node to check
+    @param nresult: the results from the node
+    @rtype: boolean
+    @return: whether overall this call was successful (and we can expect
+         reasonable values in the response)
 
     """
-    node = nodeinfo.name
+    node = ninfo.name
     _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
 
-    # main result, node_result should be a non-empty dict
-    test = not node_result or not isinstance(node_result, dict)
+    # main result, nresult should be a non-empty dict
+    test = not nresult or not isinstance(nresult, dict)
     _ErrorIf(test, self.ENODERPC, node,
                   "unable to verify node: no data returned")
     if test:
-      return
+      return False
 
     # compares ganeti version
     local_version = constants.PROTOCOL_VERSION
-    remote_version = node_result.get('version', None)
+    remote_version = nresult.get("version", None)
     test = not (remote_version and
                 isinstance(remote_version, (list, tuple)) and
                 len(remote_version) == 2)
     _ErrorIf(test, self.ENODERPC, node,
              "connection to node returned invalid data")
     if test:
-      return
+      return False
 
     test = local_version != remote_version[0]
     _ErrorIf(test, self.ENODEVERSION, node,
              "incompatible protocol versions: master %s,"
              " node %s", local_version, remote_version[0])
     if test:
-      return
+      return False
 
     # node seems compatible, we can actually try to look into its results
 
@@ -1082,111 +1224,122 @@ class LUVerifyCluster(LogicalUnit):
                   constants.RELEASE_VERSION, remote_version[1],
                   code=self.ETYPE_WARNING)
 
-    # checks vg existence and size > 20G
-    if vg_name is not None:
-      vglist = node_result.get(constants.NV_VGLIST, None)
-      test = not vglist
-      _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
-      if not test:
-        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
-                                              constants.MIN_VG_SIZE)
-        _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)
+    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
+    if isinstance(hyp_result, dict):
+      for hv_name, hv_result in hyp_result.iteritems():
+        test = hv_result is not None
+        _ErrorIf(test, self.ENODEHV, node,
+                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
 
-    # checks config file checksum
 
-    remote_cksum = node_result.get(constants.NV_FILELIST, None)
-    test = not isinstance(remote_cksum, dict)
-    _ErrorIf(test, self.ENODEFILECHECK, node,
-             "node hasn't returned file checksum data")
+    test = nresult.get(constants.NV_NODESETUP,
+                           ["Missing NODESETUP results"])
+    _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
+             "; ".join(test))
+
+    return True
+
+  def _VerifyNodeTime(self, ninfo, nresult,
+                      nvinfo_starttime, nvinfo_endtime):
+    """Check the node time.
+
+    @type ninfo: L{objects.Node}
+    @param ninfo: the node to check
+    @param nresult: the remote results for the node
+    @param nvinfo_starttime: the start time of the RPC call
+    @param nvinfo_endtime: the end time of the RPC call
+
+    """
+    node = ninfo.name
+    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
+
+    ntime = nresult.get(constants.NV_TIME, None)
+    try:
+      ntime_merged = utils.MergeTime(ntime)
+    except (ValueError, TypeError):
+      _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
+      return
+
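+    # the node time must fall within the RPC call window, extended on
+    # both sides by the allowed clock skew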
+    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
+      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
+    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
+      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
+    else:
+      ntime_diff = None
+
+    _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
+             "Node time diverges by at least %s from master node time",
+             ntime_diff)
+
+  def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
+    """Check the node time.
+
+    @type ninfo: L{objects.Node}
+    @param ninfo: the node to check
+    @param nresult: the remote results for the node
+    @param vg_name: the configured VG name
+
+    """
+    if vg_name is None:
+      return
+
+    node = ninfo.name
+    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
+
+    # checks vg existence and size > 20G
+    vglist = nresult.get(constants.NV_VGLIST, None)
+    test = not vglist
+    _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
+    if not test:
+      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
+                                            constants.MIN_VG_SIZE)
+      _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)
+
+    # check pv names
+    pvlist = nresult.get(constants.NV_PVLIST, None)
+    test = pvlist is None
+    _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node")
     if not test:
-      for file_name in file_list:
-        node_is_mc = nodeinfo.master_candidate
-        must_have = (file_name not in master_files) or node_is_mc
-        # missing
-        test1 = file_name not in remote_cksum
-        # invalid checksum
-        test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name]
-        # existing and good
-        test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name]
-        _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node,
-                 "file '%s' missing", file_name)
-        _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node,
-                 "file '%s' has wrong checksum", file_name)
-        # not candidate and this is not a must-have file
-        _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node,
-                 "file '%s' should not exist on non master"
-                 " candidates (and the file is outdated)", file_name)
-        # all good, except non-master/non-must have combination
-        _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node,
-                 "file '%s' should not exist"
-                 " on non master candidates", file_name)
-
-    # checks ssh to any
-
-    test = constants.NV_NODELIST not in node_result
+      # check that ':' is not present in PV names, since it's a
+      # special character for lvcreate (denotes the range of PEs to
+      # use on the PV)
+      for _, pvname, owner_vg in pvlist:
+        test = ":" in pvname
+        _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
+                 " '%s' of VG '%s'", pvname, owner_vg)
+
+  def _VerifyNodeNetwork(self, ninfo, nresult):
+    """Check the node time.
+
+    @type ninfo: L{objects.Node}
+    @param ninfo: the node to check
+    @param nresult: the remote results for the node
+
+    """
+    node = ninfo.name
+    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
+
+    test = constants.NV_NODELIST not in nresult
     _ErrorIf(test, self.ENODESSH, node,
              "node hasn't returned node ssh connectivity data")
     if not test:
-      if node_result[constants.NV_NODELIST]:
-        for a_node, a_msg in node_result[constants.NV_NODELIST].items():
+      if nresult[constants.NV_NODELIST]:
+        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
           _ErrorIf(True, self.ENODESSH, node,
                    "ssh communication with node '%s': %s", a_node, a_msg)
 
-    test = constants.NV_NODENETTEST not in node_result
+    test = constants.NV_NODENETTEST not in nresult
     _ErrorIf(test, self.ENODENET, node,
              "node hasn't returned node tcp connectivity data")
     if not test:
-      if node_result[constants.NV_NODENETTEST]:
-        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
+      if nresult[constants.NV_NODENETTEST]:
+        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
         for anode in nlist:
           _ErrorIf(True, self.ENODENET, node,
                    "tcp communication with node '%s': %s",
-                   anode, node_result[constants.NV_NODENETTEST][anode])
-
-    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
-    if isinstance(hyp_result, dict):
-      for hv_name, hv_result in hyp_result.iteritems():
-        test = hv_result is not None
-        _ErrorIf(test, self.ENODEHV, node,
-                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
+                   anode, nresult[constants.NV_NODENETTEST][anode])
 
-    # check used drbd list
-    if vg_name is not None:
-      used_minors = node_result.get(constants.NV_DRBDLIST, [])
-      test = not isinstance(used_minors, (tuple, list))
-      _ErrorIf(test, self.ENODEDRBD, node,
-               "cannot parse drbd status file: %s", str(used_minors))
-      if not test:
-        for minor, (iname, must_exist) in drbd_map.items():
-          test = minor not in used_minors and must_exist
-          _ErrorIf(test, self.ENODEDRBD, node,
-                   "drbd minor %d of instance %s is not active",
-                   minor, iname)
-        for minor in used_minors:
-          test = minor not in drbd_map
-          _ErrorIf(test, self.ENODEDRBD, node,
-                   "unallocated drbd minor %d is in use", minor)
-    test = node_result.get(constants.NV_NODESETUP,
-                           ["Missing NODESETUP results"])
-    _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
-             "; ".join(test))
-
-    # check pv names
-    if vg_name is not None:
-      pvlist = node_result.get(constants.NV_PVLIST, None)
-      test = pvlist is None
-      _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node")
-      if not test:
-        # check that ':' is not present in PV names, since it's a
-        # special character for lvcreate (denotes the range of PEs to
-        # use on the PV)
-        for _, pvname, owner_vg in pvlist:
-          test = ":" in pvname
-          _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
-                   " '%s' of VG '%s'", pvname, owner_vg)
-
-  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
-                      node_instance, n_offline):
+  def _VerifyInstance(self, instance, instanceconfig, node_image):
     """Verify an instance.
 
     This function checks to see if the required block devices are
@@ -1200,81 +1353,264 @@ class LUVerifyCluster(LogicalUnit):
     instanceconfig.MapLVsByNode(node_vol_should)
 
     for node in node_vol_should:
-      if node in n_offline:
-        # ignore missing volumes on offline nodes
+      n_img = node_image[node]
+      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
+        # ignore missing volumes on offline or broken nodes
         continue
       for volume in node_vol_should[node]:
-        test = node not in node_vol_is or volume not in node_vol_is[node]
+        test = volume not in n_img.volumes
         _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
                  "volume %s missing on node %s", volume, node)
 
     if instanceconfig.admin_up:
-      test = ((node_current not in node_instance or
-               not instance in node_instance[node_current]) and
-              node_current not in n_offline)
+      pri_img = node_image[node_current]
+      test = instance not in pri_img.instances and not pri_img.offline
       _ErrorIf(test, self.EINSTANCEDOWN, instance,
                "instance not running on its primary node %s",
                node_current)
 
-    for node in node_instance:
+    for node, n_img in node_image.items():
       if (not node == node_current):
-        test = instance in node_instance[node]
+        test = instance in n_img.instances
         _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
                  "instance should not run on node %s", node)
 
-  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is):
+  def _VerifyOrphanVolumes(self, node_vol_should, node_image):
     """Verify if there are any unknown volumes in the cluster.
 
     The .os, .swap and backup volumes are ignored. All other volumes are
     reported as unknown.
 
     """
-    for node in node_vol_is:
-      for volume in node_vol_is[node]:
+    for node, n_img in node_image.items():
+      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
+        # skip non-healthy nodes
+        continue
+      for volume in n_img.volumes:
         test = (node not in node_vol_should or
                 volume not in node_vol_should[node])
         self._ErrorIf(test, self.ENODEORPHANLV, node,
                       "volume %s is unknown", volume)
 
-  def _VerifyOrphanInstances(self, instancelist, node_instance):
+  def _VerifyOrphanInstances(self, instancelist, node_image):
     """Verify the list of running instances.
 
     This checks what instances are running but unknown to the cluster.
 
     """
-    for node in node_instance:
-      for o_inst in node_instance[node]:
+    for node, n_img in node_image.items():
+      for o_inst in n_img.instances:
         test = o_inst not in instancelist
         self._ErrorIf(test, self.ENODEORPHANINSTANCE, node,
                       "instance %s on node %s should not exist", o_inst, node)
 
-  def _VerifyNPlusOneMemory(self, node_info, instance_cfg):
+  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
     """Verify N+1 Memory Resilience.
 
-    Check that if one single node dies we can still start all the instances it
-    was primary for.
+    Check that if one single node dies we can still start all the
+    instances it was primary for.
 
     """
-    for node, nodeinfo in node_info.iteritems():
-      # This code checks that every node which is now listed as secondary has
-      # enough memory to host all instances it is supposed to should a single
-      # other node in the cluster fail.
+    for node, n_img in node_image.items():
+      # This code checks that every node which is now listed as
+      # secondary has enough memory to host all instances it is
+      # supposed to should a single other node in the cluster fail.
       # FIXME: not ready for failover to an arbitrary node
       # FIXME: does not support file-backed instances
-      # WARNING: we currently take into account down instances as well as up
-      # ones, considering that even if they're down someone might want to start
-      # them even in the event of a node failure.
-      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
+      # WARNING: we currently take into account down instances as well
+      # as up ones, considering that even if they're down someone
+      # might want to start them even in the event of a node failure.
+      for prinode, instances in n_img.sbp.items():
         needed_mem = 0
         for instance in instances:
           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
           if bep[constants.BE_AUTO_BALANCE]:
             needed_mem += bep[constants.BE_MEMORY]
-        test = nodeinfo['mfree'] < needed_mem
+        test = n_img.mfree < needed_mem
         self._ErrorIf(test, self.ENODEN1, node,
                       "not enough memory on to accommodate"
                       " failovers should peer node %s fail", prinode)
 
+  def _VerifyNodeFiles(self, ninfo, nresult, file_list, local_cksum,
+                       master_files):
+    """Verifies and computes the node required file checksums.
+
+    @type ninfo: L{objects.Node}
+    @param ninfo: the node to check
+    @param nresult: the remote results for the node
+    @param file_list: required list of files
+    @param local_cksum: dictionary of local files and their checksums
+    @param master_files: list of files that only masters should have
+
+    """
+    node = ninfo.name
+    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
+
+    remote_cksum = nresult.get(constants.NV_FILELIST, None)
+    test = not isinstance(remote_cksum, dict)
+    _ErrorIf(test, self.ENODEFILECHECK, node,
+             "node hasn't returned file checksum data")
+    if test:
+      return
+
+    for file_name in file_list:
+      node_is_mc = ninfo.master_candidate
+      must_have = (file_name not in master_files) or node_is_mc
+      # missing
+      test1 = file_name not in remote_cksum
+      # invalid checksum
+      test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name]
+      # existing and good
+      test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name]
+      _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node,
+               "file '%s' missing", file_name)
+      _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node,
+               "file '%s' has wrong checksum", file_name)
+      # not candidate and this is not a must-have file
+      _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node,
+               "file '%s' should not exist on non master"
+               " candidates (and the file is outdated)", file_name)
+      # all good, except non-master/non-must have combination
+      _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node,
+               "file '%s' should not exist"
+               " on non master candidates", file_name)
+
+  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_map):
+    """Verifies and the node DRBD status.
+
+    @type ninfo: L{objects.Node}
+    @param ninfo: the node to check
+    @param nresult: the remote results for the node
+    @param instanceinfo: the dict of instances
+    @param drbd_map: the DRBD map as returned by
+        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
+
+    """
+    node = ninfo.name
+    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
+
+    # compute the DRBD minors
+    node_drbd = {}
+    for minor, instance in drbd_map[node].items():
+      test = instance not in instanceinfo
+      _ErrorIf(test, self.ECLUSTERCFG, None,
+               "ghost instance '%s' in temporary DRBD map", instance)
+      # ghost instance should not be running, but otherwise we
+      # don't give double warnings (both ghost instance and
+      # unallocated minor in use)
+      if test:
+        node_drbd[minor] = (instance, False)
+      else:
+        instance = instanceinfo[instance]
+        node_drbd[minor] = (instance.name, instance.admin_up)
+
+    # and now check them
+    used_minors = nresult.get(constants.NV_DRBDLIST, [])
+    test = not isinstance(used_minors, (tuple, list))
+    _ErrorIf(test, self.ENODEDRBD, node,
+             "cannot parse drbd status file: %s", str(used_minors))
+    if test:
+      # we cannot check drbd status
+      return
+
+    for minor, (iname, must_exist) in node_drbd.items():
+      test = minor not in used_minors and must_exist
+      _ErrorIf(test, self.ENODEDRBD, node,
+               "drbd minor %d of instance %s is not active", minor, iname)
+    for minor in used_minors:
+      test = minor not in node_drbd
+      _ErrorIf(test, self.ENODEDRBD, node,
+               "unallocated drbd minor %d is in use", minor)
+
+  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
+    """Verifies and updates the node volume data.
+
+    This function will update a L{NodeImage}'s internal structures
+    with data from the remote call.
+
+    @type ninfo: L{objects.Node}
+    @param ninfo: the node to check
+    @param nresult: the remote results for the node
+    @param nimg: the node image object
+    @param vg_name: the configured VG name
+
+    """
+    node = ninfo.name
+    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
+
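+    # assume the LVM data is broken until we successfully parse it below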
+    nimg.lvm_fail = True
+    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
+    if vg_name is None:
+      pass
+    elif isinstance(lvdata, basestring):
+      _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
+               utils.SafeEncode(lvdata))
+    elif not isinstance(lvdata, dict):
+      _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
+    else:
+      nimg.volumes = lvdata
+      nimg.lvm_fail = False
+
+  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
+    """Verifies and updates the node instance list.
+
+    If the listing was successful, then updates this node's instance
+    list. Otherwise, it marks the RPC call as failed for the instance
+    list key.
+
+    @type ninfo: L{objects.Node}
+    @param ninfo: the node to check
+    @param nresult: the remote results for the node
+    @param nimg: the node image object
+
+    """
+    idata = nresult.get(constants.NV_INSTANCELIST, None)
+    test = not isinstance(idata, list)
+    self._ErrorIf(test, self.ENODEHV, ninfo.name, "rpc call to node failed"
+                  " (instancelist): %s", utils.SafeEncode(str(idata)))
+    if test:
+      nimg.hyp_fail = True
+    else:
+      nimg.instances = idata
+
+  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
+    """Verifies and computes a node information map
+
+    @type ninfo: L{objects.Node}
+    @param ninfo: the node to check
+    @param nresult: the remote results for the node
+    @param nimg: the node image object
+    @param vg_name: the configured VG name
+
+    """
+    node = ninfo.name
+    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
+
+    # try to read free memory (from the hypervisor)
+    hv_info = nresult.get(constants.NV_HVINFO, None)
+    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
+    _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
+    if not test:
+      try:
+        nimg.mfree = int(hv_info["memory_free"])
+      except (ValueError, TypeError):
+        _ErrorIf(True, self.ENODERPC, node,
+                 "node returned invalid nodeinfo, check hypervisor")
+
+    # FIXME: devise a free space model for file based instances as well
+    if vg_name is not None:
+      test = (constants.NV_VGLIST not in nresult or
+              vg_name not in nresult[constants.NV_VGLIST])
+      _ErrorIf(test, self.ENODELVM, node,
+               "node didn't return data for the volume group '%s'"
+               " - it is either missing or broken", vg_name)
+      if not test:
+        try:
+          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
+        except (ValueError, TypeError):
+          _ErrorIf(True, self.ENODERPC, node,
+                   "node returned invalid LVM info, check LVM status")
+
   def CheckPrereq(self):
     """Check prerequisites.
 
@@ -1315,8 +1651,14 @@ class LUVerifyCluster(LogicalUnit):
     for msg in self.cfg.VerifyConfig():
       _ErrorIf(True, self.ECLUSTERCFG, None, msg)
 
+    # Check the cluster certificates
+    for cert_filename in constants.ALL_CERT_FILES:
+      (errcode, msg) = _VerifyCertificate(cert_filename)
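+      # errcode is None for a clean certificate, keeping _ErrorIf silent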
+      _ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode)
+
     vg_name = self.cfg.GetVGName()
     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
+    cluster = self.cfg.GetClusterInfo()
     nodelist = utils.NiceSort(self.cfg.GetNodeList())
     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
@@ -1324,21 +1666,19 @@ class LUVerifyCluster(LogicalUnit):
                         for iname in instancelist)
     i_non_redundant = [] # Non redundant instances
     i_non_a_balanced = [] # Non auto-balanced instances
-    n_offline = [] # List of offline nodes
-    n_drained = [] # List of nodes being drained
-    node_volume = {}
-    node_instance = {}
-    node_info = {}
-    instance_cfg = {}
+    n_offline = 0 # Count of offline nodes
+    n_drained = 0 # Count of nodes being drained
+    node_vol_should = {}
 
     # FIXME: verify OS list
     # do local checksums
     master_files = [constants.CLUSTER_CONF_FILE]
 
     file_names = ssconf.SimpleStore().GetFileList()
-    file_names.append(constants.SSL_CERT_FILE)
-    file_names.append(constants.RAPI_CERT_FILE)
+    file_names.extend(constants.ALL_CERT_FILES)
     file_names.extend(master_files)
+    if cluster.modify_etc_hosts:
+      file_names.append(constants.ETC_HOSTS)
 
     local_checksums = utils.FingerprintFiles(file_names)
 
@@ -1364,6 +1704,35 @@ class LUVerifyCluster(LogicalUnit):
       node_verify_param[constants.NV_PVLIST] = [vg_name]
       node_verify_param[constants.NV_DRBDLIST] = None
 
+    # Build our expected cluster state
+    node_image = dict((node.name, self.NodeImage(offline=node.offline))
+                      for node in nodeinfo)
+
+    for instance in instancelist:
+      inst_config = instanceinfo[instance]
+
+      for nname in inst_config.all_nodes:
+        if nname not in node_image:
+          # ghost node
+          gnode = self.NodeImage()
+          gnode.ghost = True
+          node_image[nname] = gnode
+
+      inst_config.MapLVsByNode(node_vol_should)
+
+      pnode = inst_config.primary_node
+      node_image[pnode].pinst.append(instance)
+
+      for snode in inst_config.secondary_nodes:
+        nimg = node_image[snode]
+        nimg.sinst.append(instance)
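+        # remember which instances have this node as secondary, grouped
+        # by their primary node (used for the N+1 memory checks)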
+        if pnode not in nimg.sbp:
+          nimg.sbp[pnode] = []
+        nimg.sbp[pnode].append(instance)
+
+    # At this point, we have the in-memory data structures complete,
+    # except for the runtime information, which we'll gather next
+
     # Due to the way our RPC system works, exact response times cannot be
     # guaranteed (e.g. a broken node could run into a timeout). By keeping the
     # time before and after executing the request, we can at least have a time
@@ -1373,18 +1742,18 @@ class LUVerifyCluster(LogicalUnit):
                                            self.cfg.GetClusterName())
     nvinfo_endtime = time.time()
 
-    cluster = self.cfg.GetClusterInfo()
     master_node = self.cfg.GetMasterNode()
     all_drbd_map = self.cfg.ComputeDRBDMap()
 
     feedback_fn("* Verifying node status")
     for node_i in nodeinfo:
       node = node_i.name
+      nimg = node_image[node]
 
       if node_i.offline:
         if verbose:
           feedback_fn("* Skipping offline node %s" % (node,))
-        n_offline.append(node)
+        n_offline += 1
         continue
 
       if node == master_node:
@@ -1393,7 +1762,7 @@ class LUVerifyCluster(LogicalUnit):
         ntype = "master candidate"
       elif node_i.drained:
         ntype = "drained"
-        n_drained.append(node)
+        n_drained += 1
       else:
         ntype = "regular"
       if verbose:
@@ -1402,128 +1771,38 @@ class LUVerifyCluster(LogicalUnit):
       msg = all_nvinfo[node].fail_msg
       _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
       if msg:
+        nimg.rpc_fail = True
         continue
 
       nresult = all_nvinfo[node].payload
-      node_drbd = {}
-      for minor, instance in all_drbd_map[node].items():
-        test = instance not in instanceinfo
-        _ErrorIf(test, self.ECLUSTERCFG, None,
-                 "ghost instance '%s' in temporary DRBD map", instance)
-          # ghost instance should not be running, but otherwise we
-          # don't give double warnings (both ghost instance and
-          # unallocated minor in use)
-        if test:
-          node_drbd[minor] = (instance, False)
-        else:
-          instance = instanceinfo[instance]
-          node_drbd[minor] = (instance.name, instance.admin_up)
-
-      self._VerifyNode(node_i, file_names, local_checksums,
-                       nresult, master_files, node_drbd, vg_name)
-
-      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
-      if vg_name is None:
-        node_volume[node] = {}
-      elif isinstance(lvdata, basestring):
-        _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
-                 utils.SafeEncode(lvdata))
-        node_volume[node] = {}
-      elif not isinstance(lvdata, dict):
-        _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
-        continue
-      else:
-        node_volume[node] = lvdata
-
-      # node_instance
-      idata = nresult.get(constants.NV_INSTANCELIST, None)
-      test = not isinstance(idata, list)
-      _ErrorIf(test, self.ENODEHV, node,
-               "rpc call to node failed (instancelist)")
-      if test:
-        continue
-
-      node_instance[node] = idata
 
-      # node_info
-      nodeinfo = nresult.get(constants.NV_HVINFO, None)
-      test = not isinstance(nodeinfo, dict)
-      _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
-      if test:
-        continue
-
-      # Node time
-      ntime = nresult.get(constants.NV_TIME, None)
-      try:
-        ntime_merged = utils.MergeTime(ntime)
-      except (ValueError, TypeError):
-        _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
-
-      if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
-        ntime_diff = abs(nvinfo_starttime - ntime_merged)
-      elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
-        ntime_diff = abs(ntime_merged - nvinfo_endtime)
-      else:
-        ntime_diff = None
-
-      _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
-               "Node time diverges by at least %0.1fs from master node time",
-               ntime_diff)
-
-      if ntime_diff is not None:
-        continue
-
-      try:
-        node_info[node] = {
-          "mfree": int(nodeinfo['memory_free']),
-          "pinst": [],
-          "sinst": [],
-          # dictionary holding all instances this node is secondary for,
-          # grouped by their primary node. Each key is a cluster node, and each
-          # value is a list of instances which have the key as primary and the
-          # current node as secondary.  this is handy to calculate N+1 memory
-          # availability if you can only failover from a primary to its
-          # secondary.
-          "sinst-by-pnode": {},
-        }
-        # FIXME: devise a free space model for file based instances as well
-        if vg_name is not None:
-          test = (constants.NV_VGLIST not in nresult or
-                  vg_name not in nresult[constants.NV_VGLIST])
-          _ErrorIf(test, self.ENODELVM, node,
-                   "node didn't return data for the volume group '%s'"
-                   " - it is either missing or broken", vg_name)
-          if test:
-            continue
-          node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
-      except (ValueError, KeyError):
-        _ErrorIf(True, self.ENODERPC, node,
-                 "node returned invalid nodeinfo, check lvm/hypervisor")
-        continue
+      nimg.call_ok = self._VerifyNode(node_i, nresult)
+      self._VerifyNodeNetwork(node_i, nresult)
+      self._VerifyNodeLVM(node_i, nresult, vg_name)
+      self._VerifyNodeFiles(node_i, nresult, file_names, local_checksums,
+                            master_files)
+      self._VerifyNodeDrbd(node_i, nresult, instanceinfo, all_drbd_map)
+      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
 
-    node_vol_should = {}
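+      # fill the node image with the runtime data from the RPC results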
+      self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
+      self._UpdateNodeInstances(node_i, nresult, nimg)
+      self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
 
     feedback_fn("* Verifying instance status")
     for instance in instancelist:
       if verbose:
         feedback_fn("* Verifying instance %s" % instance)
       inst_config = instanceinfo[instance]
-      self._VerifyInstance(instance, inst_config, node_volume,
-                           node_instance, n_offline)
+      self._VerifyInstance(instance, inst_config, node_image)
       inst_nodes_offline = []
 
-      inst_config.MapLVsByNode(node_vol_should)
-
-      instance_cfg[instance] = inst_config
-
       pnode = inst_config.primary_node
-      _ErrorIf(pnode not in node_info and pnode not in n_offline,
+      pnode_img = node_image[pnode]
+      _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
                self.ENODERPC, pnode, "instance %s, connection to"
                " primary node failed", instance)
-      if pnode in node_info:
-        node_info[pnode]['pinst'].append(instance)
 
-      if pnode in n_offline:
+      if pnode_img.offline:
         inst_nodes_offline.append(pnode)
 
       # If the instance is non-redundant we cannot survive losing its primary
@@ -1531,44 +1810,42 @@ class LUVerifyCluster(LogicalUnit):
       # templates with more than one secondary so that situation is not well
       # supported either.
       # FIXME: does not support file-backed instances
-      if len(inst_config.secondary_nodes) == 0:
+      if not inst_config.secondary_nodes:
         i_non_redundant.append(instance)
-      _ErrorIf(len(inst_config.secondary_nodes) > 1,
-               self.EINSTANCELAYOUT, instance,
-               "instance has multiple secondary nodes", code="WARNING")
+      _ErrorIf(len(inst_config.secondary_nodes) > 1, self.EINSTANCELAYOUT,
+               instance, "instance has multiple secondary nodes: %s",
+               utils.CommaJoin(inst_config.secondary_nodes),
+               code=self.ETYPE_WARNING)
 
       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
         i_non_a_balanced.append(instance)
 
       for snode in inst_config.secondary_nodes:
-        _ErrorIf(snode not in node_info and snode not in n_offline,
-                 self.ENODERPC, snode,
-                 "instance %s, connection to secondary node"
-                 "failed", instance)
-
-        if snode in node_info:
-          node_info[snode]['sinst'].append(instance)
-          if pnode not in node_info[snode]['sinst-by-pnode']:
-            node_info[snode]['sinst-by-pnode'][pnode] = []
-          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
-
-        if snode in n_offline:
+        s_img = node_image[snode]
+        _ErrorIf(s_img.rpc_fail and not s_img.offline, self.ENODERPC, snode,
+                 "instance %s, connection to secondary node failed", instance)
+
+        if s_img.offline:
           inst_nodes_offline.append(snode)
 
       # warn that the instance lives on offline nodes
       _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
                "instance lives on offline node(s) %s",
                utils.CommaJoin(inst_nodes_offline))
+      # ... or ghost nodes
+      for node in inst_config.all_nodes:
+        _ErrorIf(node_image[node].ghost, self.EINSTANCEBADNODE, instance,
+                 "instance lives on ghost node %s", node)
 
     feedback_fn("* Verifying orphan volumes")
-    self._VerifyOrphanVolumes(node_vol_should, node_volume)
+    self._VerifyOrphanVolumes(node_vol_should, node_image)
 
-    feedback_fn("* Verifying remaining instances")
-    self._VerifyOrphanInstances(instancelist, node_instance)
+    feedback_fn("* Verifying oprhan instances")
+    self._VerifyOrphanInstances(instancelist, node_image)
 
     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
       feedback_fn("* Verifying N+1 Memory redundancy")
-      self._VerifyNPlusOneMemory(node_info, instance_cfg)
+      self._VerifyNPlusOneMemory(node_image, instanceinfo)
 
     feedback_fn("* Other Notes")
     if i_non_redundant:
@@ -1580,10 +1857,10 @@ class LUVerifyCluster(LogicalUnit):
                   % len(i_non_a_balanced))
 
     if n_offline:
-      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
+      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
 
     if n_drained:
-      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
+      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
 
     return not self.bad
 
@@ -1950,8 +2227,11 @@ class LUSetClusterParams(LogicalUnit):
     """Check parameters
 
     """
-    if not hasattr(self.op, "candidate_pool_size"):
-      self.op.candidate_pool_size = None
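+    # make sure the optional opcode attributes exist, defaulting to None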
+    for attr in ["candidate_pool_size",
+                 "uid_pool", "add_uids", "remove_uids"]:
+      if not hasattr(self.op, attr):
+        setattr(self.op, attr, None)
+
     if self.op.candidate_pool_size is not None:
       try:
         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
@@ -1962,6 +2242,17 @@ class LUSetClusterParams(LogicalUnit):
         raise errors.OpPrereqError("At least one master candidate needed",
                                    errors.ECODE_INVAL)
 
+    _CheckBooleanOpField(self.op, "maintain_node_health")
+
+    if self.op.uid_pool:
+      uidpool.CheckUidPool(self.op.uid_pool)
+
+    if self.op.add_uids:
+      uidpool.CheckUidPool(self.op.add_uids)
+
+    if self.op.remove_uids:
+      uidpool.CheckUidPool(self.op.remove_uids)
+
   def ExpandNames(self):
     # FIXME: in the future maybe other cluster params won't require checking on
     # all nodes to be modified.
@@ -2053,7 +2344,7 @@ class LUSetClusterParams(LogicalUnit):
                                    "\n".join(nic_errors))
 
     # hypervisor list/parameters
-    self.new_hvparams = objects.FillDict(cluster.hvparams, {})
+    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
     if self.op.hvparams:
       if not isinstance(self.op.hvparams, dict):
         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input",
@@ -2083,6 +2374,7 @@ class LUSetClusterParams(LogicalUnit):
             else:
               self.new_os_hvp[os_name][hv_name].update(hv_dict)
 
+    # changes to the hypervisor list
     if self.op.enabled_hypervisors is not None:
       self.hv_list = self.op.enabled_hypervisors
       if not self.hv_list:
@@ -2095,6 +2387,16 @@ class LUSetClusterParams(LogicalUnit):
                                    " entries: %s" %
                                    utils.CommaJoin(invalid_hvs),
                                    errors.ECODE_INVAL)
+      for hv in self.hv_list:
+        # if the hypervisor doesn't already exist in the cluster
+        # hvparams, we initialize it to empty, and then (in both
+        # cases) we make sure to fill the defaults, as we might not
+        # have a complete defaults list if the hypervisor wasn't
+        # enabled before
+        if hv not in new_hvp:
+          new_hvp[hv] = {}
+        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
+        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
     else:
       self.hv_list = cluster.enabled_hypervisors
 
@@ -2110,6 +2412,20 @@ class LUSetClusterParams(LogicalUnit):
           hv_class.CheckParameterSyntax(hv_params)
           _CheckHVParams(self, node_list, hv_name, hv_params)
 
+    if self.op.os_hvp:
+      # no need to check any newly-enabled hypervisors, since the
+      # defaults have already been checked in the above code-block
+      for os_name, os_hvp in self.new_os_hvp.items():
+        for hv_name, hv_params in os_hvp.items():
+          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
+          # we need to fill in the new os_hvp on top of the actual hv_p
+          cluster_defaults = self.new_hvparams.get(hv_name, {})
+          new_osp = objects.FillDict(cluster_defaults, hv_params)
+          hv_class = hypervisor.GetHypervisor(hv_name)
+          hv_class.CheckParameterSyntax(new_osp)
+          _CheckHVParams(self, node_list, hv_name, new_osp)
+
+
   def Exec(self, feedback_fn):
     """Change the parameters of the cluster.
 
@@ -2128,6 +2444,7 @@ class LUSetClusterParams(LogicalUnit):
     if self.op.os_hvp:
       self.cluster.os_hvp = self.new_os_hvp
     if self.op.enabled_hypervisors is not None:
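+      # persist the hvparams too, since defaults for newly enabled
+      # hypervisors were filled in during CheckPrereq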
+      self.cluster.hvparams = self.new_hvparams
       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
     if self.op.beparams:
       self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
@@ -2139,6 +2456,18 @@ class LUSetClusterParams(LogicalUnit):
       # we need to update the pool size here, otherwise the save will fail
       _AdjustCandidatePool(self, [])
 
+    if self.op.maintain_node_health is not None:
+      self.cluster.maintain_node_health = self.op.maintain_node_health
+
+    if self.op.add_uids is not None:
+      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
+
+    if self.op.remove_uids is not None:
+      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
+
+    if self.op.uid_pool is not None:
+      self.cluster.uid_pool = self.op.uid_pool
+
     self.cfg.Update(self.cluster, feedback_fn)
 
 
@@ -2166,7 +2495,7 @@ def _RedistributeAncillaryFiles(lu, additional_nodes=None):
                     constants.SSH_KNOWN_HOSTS_FILE,
                     constants.RAPI_CERT_FILE,
                     constants.RAPI_USERS_FILE,
-                    constants.HMAC_CLUSTER_KEY,
+                    constants.CONFD_HMAC_KEY,
                    ])
 
   enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
@@ -2523,6 +2852,12 @@ class LURemoveNode(LogicalUnit):
       self.LogWarning("Errors encountered on the remote node while leaving"
                       " the cluster: %s", msg)
 
+    # Remove node from our /etc/hosts
+    if self.cfg.GetClusterInfo().modify_etc_hosts:
+      # FIXME: this should be done via an rpc call to node daemon
+      utils.RemoveHostFromEtcHosts(node.name)
+      _RedistributeAncillaryFiles(self)
+
 
 class LUQueryNodes(NoHooksLU):
   """Logical unit for querying nodes.
@@ -2779,17 +3114,14 @@ class LUQueryNodeStorage(NoHooksLU):
   REQ_BGL = False
   _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
 
-  def ExpandNames(self):
-    storage_type = self.op.storage_type
-
-    if storage_type not in constants.VALID_STORAGE_TYPES:
-      raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
-                                 errors.ECODE_INVAL)
+  def CheckArguments(self):
+    _CheckStorageType(self.op.storage_type)
 
     _CheckOutputFields(static=self._FIELDS_STATIC,
                        dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                        selected=self.op.output_fields)
 
+  def ExpandNames(self):
     self.needed_locks = {}
     self.share_locks[locking.LEVEL_NODE] = 1
 
@@ -2878,10 +3210,7 @@ class LUModifyNodeStorage(NoHooksLU):
   def CheckArguments(self):
     self.opnode_name = _ExpandNodeName(self.cfg, self.op.node_name)
 
-    storage_type = self.op.storage_type
-    if storage_type not in constants.VALID_STORAGE_TYPES:
-      raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
-                                 errors.ECODE_INVAL)
+    _CheckStorageType(self.op.storage_type)
 
   def ExpandNames(self):
     self.needed_locks = {
@@ -2982,15 +3311,19 @@ class LUAddNode(LogicalUnit):
       raise errors.OpPrereqError("Node %s is not in the configuration" % node,
                                  errors.ECODE_NOENT)
 
+    self.changed_primary_ip = False
+
     for existing_node_name in node_list:
       existing_node = cfg.GetNodeInfo(existing_node_name)
 
       if self.op.readd and node == existing_node_name:
-        if (existing_node.primary_ip != primary_ip or
-            existing_node.secondary_ip != secondary_ip):
+        if existing_node.secondary_ip != secondary_ip:
           raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                      " address configuration as before",
                                      errors.ECODE_INVAL)
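+        # a changed primary IP on readd is allowed; remember it so that
+        # Exec can update the node object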
+        if existing_node.primary_ip != primary_ip:
+          self.changed_primary_ip = True
+
         continue
 
       if (existing_node.primary_ip == primary_ip or
@@ -3062,6 +3395,8 @@ class LUAddNode(LogicalUnit):
       self.LogInfo("Readding a node, the offline/drained flags were reset")
       # if we demote the node, we do cleanup later in the procedure
       new_node.master_candidate = self.master_candidate
+      if self.changed_primary_ip:
+        new_node.primary_ip = self.op.primary_ip
 
     # notify the user about any possible mc promotion
     if new_node.master_candidate:
@@ -3097,6 +3432,7 @@ class LUAddNode(LogicalUnit):
 
     # Add node to our /etc/hosts, and add key to known_hosts
     if self.cfg.GetClusterInfo().modify_etc_hosts:
+      # FIXME: this should be done via an rpc call to node daemon
       utils.AddHostToEtcHosts(new_node.name)
 
     if new_node.secondary_ip != new_node.primary_ip:
@@ -3225,7 +3561,7 @@ class LUSetNodeParams(LogicalUnit):
       # candidates
       (mc_remaining, mc_should, _) = \
           self.cfg.GetMasterCandidateStats(exceptions=[node.name])
-      if mc_remaining != mc_should:
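+      # demotion is only a problem if it leaves too few master candidates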
+      if mc_remaining < mc_should:
         raise errors.OpPrereqError("Not enough master candidates, please"
                                    " pass auto_promote to allow promotion",
                                    errors.ECODE_INVAL)
@@ -3398,10 +3734,12 @@ class LUQueryClusterInfo(NoHooksLU):
       "master_netdev": cluster.master_netdev,
       "volume_group_name": cluster.volume_group_name,
       "file_storage_dir": cluster.file_storage_dir,
+      "maintain_node_health": cluster.maintain_node_health,
       "ctime": cluster.ctime,
       "mtime": cluster.mtime,
       "uuid": cluster.uuid,
       "tags": list(cluster.GetTags()),
+      "uid_pool": cluster.uid_pool,
       }
 
     return result
@@ -3632,14 +3970,7 @@ def _SafeShutdownInstanceDisks(lu, instance):
   _ShutdownInstanceDisks.
 
   """
-  pnode = instance.primary_node
-  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
-  ins_l.Raise("Can't contact node %s" % pnode)
-
-  if instance.name in ins_l.payload:
-    raise errors.OpExecError("Instance is running, can't shutdown"
-                             " block devices.")
-
+  _CheckInstanceDown(lu, instance, "cannot shutdown disks")
   _ShutdownInstanceDisks(lu, instance)
 
 
@@ -3703,14 +4034,50 @@ def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
                                errors.ECODE_NORES)
 
 
-class LUStartupInstance(LogicalUnit):
-  """Starts an instance.
+def _CheckNodesFreeDisk(lu, nodenames, requested):
+  """Checks if nodes have enough free disk space in the default VG.
 
-  """
-  HPATH = "instance-start"
-  HTYPE = constants.HTYPE_INSTANCE
-  _OP_REQP = ["instance_name", "force"]
-  REQ_BGL = False
+  This function checks that all given nodes have the needed amount of
+  free disk. If any node has less disk, or we cannot get the
+  information from it, this function raises an OpPrereqError
+  exception.
+
+  @type lu: C{LogicalUnit}
+  @param lu: a logical unit from which we get configuration data
+  @type nodenames: C{list}
+  @param nodenames: the list of node names to check
+  @type requested: C{int}
+  @param requested: the amount of disk in MiB to check for
+  @raise errors.OpPrereqError: if the node doesn't have enough disk, or
+      we cannot check the node
+
+  """
+  nodeinfo = lu.rpc.call_node_info(nodenames, lu.cfg.GetVGName(),
+                                   lu.cfg.GetHypervisorType())
+  for node in nodenames:
+    info = nodeinfo[node]
+    info.Raise("Cannot get current information from node %s" % node,
+               prereq=True, ecode=errors.ECODE_ENVIRON)
+    vg_free = info.payload.get("vg_free", None)
+    if not isinstance(vg_free, int):
+      raise errors.OpPrereqError("Can't compute free disk space on node %s,"
+                                 " result was '%s'" % (node, vg_free),
+                                 errors.ECODE_ENVIRON)
+    if requested > vg_free:
+      raise errors.OpPrereqError("Not enough disk space on target node %s:"
+                                 " required %d MiB, available %d MiB" %
+                                 (node, requested, vg_free),
+                                 errors.ECODE_NORES)
+
+
+class LUStartupInstance(LogicalUnit):
+  """Starts an instance.
+
+  """
+  HPATH = "instance-start"
+  HTYPE = constants.HTYPE_INSTANCE
+  _OP_REQP = ["instance_name", "force"]
+  REQ_BGL = False
 
   def ExpandNames(self):
     self._ExpandAndLockInstance()
@@ -3989,32 +4356,14 @@ class LUReinstallInstance(LogicalUnit):
       raise errors.OpPrereqError("Instance '%s' has no disks" %
                                  self.op.instance_name,
                                  errors.ECODE_INVAL)
-    if instance.admin_up:
-      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
-                                 self.op.instance_name,
-                                 errors.ECODE_STATE)
-    remote_info = self.rpc.call_instance_info(instance.primary_node,
-                                              instance.name,
-                                              instance.hypervisor)
-    remote_info.Raise("Error checking node %s" % instance.primary_node,
-                      prereq=True, ecode=errors.ECODE_ENVIRON)
-    if remote_info.payload:
-      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
-                                 (self.op.instance_name,
-                                  instance.primary_node),
-                                 errors.ECODE_STATE)
+    _CheckInstanceDown(self, instance, "cannot reinstall")
 
     self.op.os_type = getattr(self.op, "os_type", None)
     self.op.force_variant = getattr(self.op, "force_variant", False)
     if self.op.os_type is not None:
       # OS verification
       pnode = _ExpandNodeName(self.cfg, instance.primary_node)
-      result = self.rpc.call_os_get(pnode, self.op.os_type)
-      result.Raise("OS '%s' not in supported OS list for primary node %s" %
-                   (self.op.os_type, pnode),
-                   prereq=True, ecode=errors.ECODE_INVAL)
-      if not self.op.force_variant:
-        _CheckOSVariant(result.payload, self.op.os_type)
+      _CheckNodeHasOS(self, pnode, self.op.os_type, self.op.force_variant)
 
     self.instance = instance
 
@@ -4089,18 +4438,7 @@ class LURecreateInstanceDisks(LogicalUnit):
     if instance.disk_template == constants.DT_DISKLESS:
       raise errors.OpPrereqError("Instance '%s' has no disks" %
                                  self.op.instance_name, errors.ECODE_INVAL)
-    if instance.admin_up:
-      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
-                                 self.op.instance_name, errors.ECODE_STATE)
-    remote_info = self.rpc.call_instance_info(instance.primary_node,
-                                              instance.name,
-                                              instance.hypervisor)
-    remote_info.Raise("Error checking node %s" % instance.primary_node,
-                      prereq=True, ecode=errors.ECODE_ENVIRON)
-    if remote_info.payload:
-      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
-                                 (self.op.instance_name,
-                                  instance.primary_node), errors.ECODE_STATE)
+    _CheckInstanceDown(self, instance, "cannot recreate disks")
 
     if not self.op.disks:
       self.op.disks = range(len(instance.disks))
@@ -4155,19 +4493,7 @@ class LURenameInstance(LogicalUnit):
     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
     assert instance is not None
     _CheckNodeOnline(self, instance.primary_node)
-
-    if instance.admin_up:
-      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
-                                 self.op.instance_name, errors.ECODE_STATE)
-    remote_info = self.rpc.call_instance_info(instance.primary_node,
-                                              instance.name,
-                                              instance.hypervisor)
-    remote_info.Raise("Error checking node %s" % instance.primary_node,
-                      prereq=True, ecode=errors.ECODE_ENVIRON)
-    if remote_info.payload:
-      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
-                                 (self.op.instance_name,
-                                  instance.primary_node), errors.ECODE_STATE)
+    _CheckInstanceDown(self, instance, "cannot rename")
     self.instance = instance
 
     # new name verification
@@ -4294,18 +4620,29 @@ class LURemoveInstance(LogicalUnit):
                                  " node %s: %s" %
                                  (instance.name, instance.primary_node, msg))
 
-    logging.info("Removing block devices for instance %s", instance.name)
+    _RemoveInstance(self, feedback_fn, instance, self.op.ignore_failures)
 
-    if not _RemoveDisks(self, instance):
-      if self.op.ignore_failures:
-        feedback_fn("Warning: can't remove instance's disks")
-      else:
-        raise errors.OpExecError("Can't remove instance's disks")
 
-    logging.info("Removing instance %s out of cluster config", instance.name)
+def _RemoveInstance(lu, feedback_fn, instance, ignore_failures):
+  """Utility function to remove an instance.
+
+  """
+  logging.info("Removing block devices for instance %s", instance.name)
+
+  if not _RemoveDisks(lu, instance):
+    if not ignore_failures:
+      raise errors.OpExecError("Can't remove instance's disks")
+    feedback_fn("Warning: can't remove instance's disks")
+
+  logging.info("Removing instance %s out of cluster config", instance.name)
 
-    self.cfg.RemoveInstance(instance.name)
-    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
+  lu.cfg.RemoveInstance(instance.name)
+
+  assert not lu.remove_locks.get(locking.LEVEL_INSTANCE), \
+    "Instance lock removal conflict"
+
+  # Remove lock for the instance
+  lu.remove_locks[locking.LEVEL_INSTANCE] = instance.name
 
 
 class LUQueryInstances(NoHooksLU):
@@ -5519,6 +5856,8 @@ def _GenerateDiskTemplate(lu, template_name,
     if len(secondary_nodes) != 0:
       raise errors.ProgrammerError("Wrong template configuration")
 
+    _RequireFileStorage()
+
     for idx, disk in enumerate(disk_info):
       disk_index = idx + base_index
       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
@@ -5687,7 +6026,7 @@ class LUCreateInstance(LogicalUnit):
   """
   HPATH = "instance-add"
   HTYPE = constants.HTYPE_INSTANCE
-  _OP_REQP = ["instance_name", "disks", "disk_template",
+  _OP_REQP = ["instance_name", "disks",
               "mode", "start",
               "wait_for_sync", "ip_check", "nics",
               "hvparams", "beparams"]
@@ -5697,35 +6036,50 @@ class LUCreateInstance(LogicalUnit):
     """Check arguments.
 
     """
+    # set optional parameters to None if they don't exist
+    for attr in ["pnode", "snode", "iallocator", "hypervisor",
+                 "disk_template", "identify_defaults"]:
+      if not hasattr(self.op, attr):
+        setattr(self.op, attr, None)
+
     # do not require name_check to ease forward/backward compatibility
     # for tools
     if not hasattr(self.op, "name_check"):
       self.op.name_check = True
+    if not hasattr(self.op, "no_install"):
+      self.op.no_install = False
+    if self.op.no_install and self.op.start:
+      self.LogInfo("No-installation mode selected, disabling startup")
+      self.op.start = False
     # validate/normalize the instance name
     self.op.instance_name = utils.HostInfo.NormalizeName(self.op.instance_name)
     if self.op.ip_check and not self.op.name_check:
       # TODO: make the ip check more flexible and not depend on the name check
       raise errors.OpPrereqError("Cannot do ip checks without a name check",
                                  errors.ECODE_INVAL)
-    if (self.op.disk_template == constants.DT_FILE and
-        not constants.ENABLE_FILE_STORAGE):
-      raise errors.OpPrereqError("File storage disabled at configure time",
+    # check disk information: either all adopt, or no adopt
+    has_adopt = has_no_adopt = False
+    for disk in self.op.disks:
+      if "adopt" in disk:
+        has_adopt = True
+      else:
+        has_no_adopt = True
+    if has_adopt and has_no_adopt:
+      raise errors.OpPrereqError("Either all disks are adopted or none is",
                                  errors.ECODE_INVAL)
+    if has_adopt:
+      if self.op.disk_template != constants.DT_PLAIN:
+        raise errors.OpPrereqError("Disk adoption is only supported for the"
+                                   " 'plain' disk template",
+                                   errors.ECODE_INVAL)
+      if self.op.iallocator is not None:
+        raise errors.OpPrereqError("Disk adoption not allowed with an"
+                                   " iallocator script", errors.ECODE_INVAL)
+      if self.op.mode == constants.INSTANCE_IMPORT:
+        raise errors.OpPrereqError("Disk adoption not allowed for"
+                                   " instance import", errors.ECODE_INVAL)
 
-  def ExpandNames(self):
-    """ExpandNames for CreateInstance.
-
-    Figure out the right locks for instance creation.
-
-    """
-    self.needed_locks = {}
-
-    # set optional parameters to none if they don't exist
-    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
-      if not hasattr(self.op, attr):
-        setattr(self.op, attr, None)
-
-    # cheap checks, mostly valid constants given
+    self.adopt_disks = has_adopt
 
     # verify creation mode
     if self.op.mode not in (constants.INSTANCE_CREATE,
@@ -5733,145 +6087,15 @@ class LUCreateInstance(LogicalUnit):
       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                  self.op.mode, errors.ECODE_INVAL)
 
-    # disk template and mirror node verification
-    if self.op.disk_template not in constants.DISK_TEMPLATES:
-      raise errors.OpPrereqError("Invalid disk template name",
-                                 errors.ECODE_INVAL)
-
-    if self.op.hypervisor is None:
-      self.op.hypervisor = self.cfg.GetHypervisorType()
-
-    cluster = self.cfg.GetClusterInfo()
-    enabled_hvs = cluster.enabled_hypervisors
-    if self.op.hypervisor not in enabled_hvs:
-      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
-                                 " cluster (%s)" % (self.op.hypervisor,
-                                  ",".join(enabled_hvs)),
-                                 errors.ECODE_STATE)
-
-    # check hypervisor parameter syntax (locally)
-    utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
-    filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
-                                  self.op.hvparams)
-    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
-    hv_type.CheckParameterSyntax(filled_hvp)
-    self.hv_full = filled_hvp
-    # check that we don't specify global parameters on an instance
-    _CheckGlobalHvParams(self.op.hvparams)
-
-    # fill and remember the beparams dict
-    utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
-    self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
-                                    self.op.beparams)
-
-    #### instance parameters check
-
     # instance name verification
     if self.op.name_check:
-      hostname1 = utils.GetHostInfo(self.op.instance_name)
-      self.op.instance_name = instance_name = hostname1.name
+      self.hostname1 = utils.GetHostInfo(self.op.instance_name)
+      self.op.instance_name = self.hostname1.name
       # used in CheckPrereq for ip ping check
-      self.check_ip = hostname1.ip
+      self.check_ip = self.hostname1.ip
     else:
-      instance_name = self.op.instance_name
       self.check_ip = None
 
-    # this is just a preventive check, but someone might still add this
-    # instance in the meantime, and creation will fail at lock-add time
-    if instance_name in self.cfg.GetInstanceList():
-      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
-                                 instance_name, errors.ECODE_EXISTS)
-
-    self.add_locks[locking.LEVEL_INSTANCE] = instance_name
-
-    # NIC buildup
-    self.nics = []
-    for idx, nic in enumerate(self.op.nics):
-      nic_mode_req = nic.get("mode", None)
-      nic_mode = nic_mode_req
-      if nic_mode is None:
-        nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
-
-      # in routed mode, for the first nic, the default ip is 'auto'
-      if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
-        default_ip_mode = constants.VALUE_AUTO
-      else:
-        default_ip_mode = constants.VALUE_NONE
-
-      # ip validity checks
-      ip = nic.get("ip", default_ip_mode)
-      if ip is None or ip.lower() == constants.VALUE_NONE:
-        nic_ip = None
-      elif ip.lower() == constants.VALUE_AUTO:
-        if not self.op.name_check:
-          raise errors.OpPrereqError("IP address set to auto but name checks"
-                                     " have been skipped. Aborting.",
-                                     errors.ECODE_INVAL)
-        nic_ip = hostname1.ip
-      else:
-        if not utils.IsValidIP(ip):
-          raise errors.OpPrereqError("Given IP address '%s' doesn't look"
-                                     " like a valid IP" % ip,
-                                     errors.ECODE_INVAL)
-        nic_ip = ip
-
-      # TODO: check the ip address for uniqueness
-      if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
-        raise errors.OpPrereqError("Routed nic mode requires an ip address",
-                                   errors.ECODE_INVAL)
-
-      # MAC address verification
-      mac = nic.get("mac", constants.VALUE_AUTO)
-      if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
-        mac = utils.NormalizeAndValidateMac(mac)
-
-        try:
-          self.cfg.ReserveMAC(mac, self.proc.GetECId())
-        except errors.ReservationError:
-          raise errors.OpPrereqError("MAC address %s already in use"
-                                     " in cluster" % mac,
-                                     errors.ECODE_NOTUNIQUE)
-
-      # bridge verification
-      bridge = nic.get("bridge", None)
-      link = nic.get("link", None)
-      if bridge and link:
-        raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
-                                   " at the same time", errors.ECODE_INVAL)
-      elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
-        raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic",
-                                   errors.ECODE_INVAL)
-      elif bridge:
-        link = bridge
-
-      nicparams = {}
-      if nic_mode_req:
-        nicparams[constants.NIC_MODE] = nic_mode_req
-      if link:
-        nicparams[constants.NIC_LINK] = link
-
-      check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
-                                      nicparams)
-      objects.NIC.CheckParameterSyntax(check_params)
-      self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
-
-    # disk checks/pre-build
-    self.disks = []
-    for disk in self.op.disks:
-      mode = disk.get("mode", constants.DISK_RDWR)
-      if mode not in constants.DISK_ACCESS_SET:
-        raise errors.OpPrereqError("Invalid disk access mode '%s'" %
-                                   mode, errors.ECODE_INVAL)
-      size = disk.get("size", None)
-      if size is None:
-        raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
-      try:
-        size = int(size)
-      except (TypeError, ValueError):
-        raise errors.OpPrereqError("Invalid disk size '%s'" % size,
-                                   errors.ECODE_INVAL)
-      self.disks.append({"size": size, "mode": mode})
-
     # file storage checks
     if (self.op.file_driver and
         not self.op.file_driver in constants.FILE_DRIVER):
@@ -5888,6 +6112,41 @@ class LUCreateInstance(LogicalUnit):
                                  " node must be given",
                                  errors.ECODE_INVAL)
 
+    if self.op.mode == constants.INSTANCE_IMPORT:
+      # On import force_variant must be True, because if we forced it at
+      # initial install, our only chance when importing it back is that it
+      # works again!
+      self.op.force_variant = True
+
+      if self.op.no_install:
+        self.LogInfo("No-installation mode has no effect during import")
+
+    else: # INSTANCE_CREATE
+      if getattr(self.op, "os_type", None) is None:
+        raise errors.OpPrereqError("No guest OS specified",
+                                   errors.ECODE_INVAL)
+      self.op.force_variant = getattr(self.op, "force_variant", False)
+      if self.op.disk_template is None:
+        raise errors.OpPrereqError("No disk template specified",
+                                   errors.ECODE_INVAL)
+
+  def ExpandNames(self):
+    """ExpandNames for CreateInstance.
+
+    Figure out the right locks for instance creation.
+
+    """
+    self.needed_locks = {}
+
+    instance_name = self.op.instance_name
+    # this is just a preventive check, but someone might still add this
+    # instance in the meantime, and creation will fail at lock-add time
+    if instance_name in self.cfg.GetInstanceList():
+      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
+                                 instance_name, errors.ECODE_EXISTS)
+
+    self.add_locks[locking.LEVEL_INSTANCE] = instance_name
+
     if self.op.iallocator:
       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
     else:
@@ -5921,17 +6180,6 @@ class LUCreateInstance(LogicalUnit):
           self.op.src_path = src_path = \
             utils.PathJoin(constants.EXPORT_DIR, src_path)
 
-      # On import force_variant must be True, because if we forced it at
-      # initial install, our only chance when importing it back is that it
-      # works again!
-      self.op.force_variant = True
-
-    else: # INSTANCE_CREATE
-      if getattr(self.op, "os_type", None) is None:
-        raise errors.OpPrereqError("No guest OS specified",
-                                   errors.ECODE_INVAL)
-      self.op.force_variant = getattr(self.op, "force_variant", False)
-
   def _RunAllocator(self):
     """Run the allocator based on input opcode.
 
@@ -5972,82 +6220,309 @@ class LUCreateInstance(LogicalUnit):
   def BuildHooksEnv(self):
     """Build hooks env.
 
-    This runs on master, primary and secondary nodes of the instance.
+    This runs on master, primary and secondary nodes of the instance.
+
+    """
+    env = {
+      "ADD_MODE": self.op.mode,
+      }
+    if self.op.mode == constants.INSTANCE_IMPORT:
+      env["SRC_NODE"] = self.op.src_node
+      env["SRC_PATH"] = self.op.src_path
+      env["SRC_IMAGES"] = self.src_images
+
+    env.update(_BuildInstanceHookEnv(
+      name=self.op.instance_name,
+      primary_node=self.op.pnode,
+      secondary_nodes=self.secondaries,
+      status=self.op.start,
+      os_type=self.op.os_type,
+      memory=self.be_full[constants.BE_MEMORY],
+      vcpus=self.be_full[constants.BE_VCPUS],
+      nics=_NICListToTuple(self, self.nics),
+      disk_template=self.op.disk_template,
+      disks=[(d["size"], d["mode"]) for d in self.disks],
+      bep=self.be_full,
+      hvp=self.hv_full,
+      hypervisor_name=self.op.hypervisor,
+    ))
+
+    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
+          self.secondaries)
+    return env, nl, nl
+
+  def _ReadExportInfo(self):
+    """Reads the export information from disk.
+
+    It will override the opcode source node and path with the actual
+    information, if these two were not specified before.
+
+    @return: the export information
+
+    """
+    assert self.op.mode == constants.INSTANCE_IMPORT
+
+    src_node = self.op.src_node
+    src_path = self.op.src_path
+
+    if src_node is None:
+      locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
+      exp_list = self.rpc.call_export_list(locked_nodes)
+      found = False
+      for node in exp_list:
+        if exp_list[node].fail_msg:
+          continue
+        if src_path in exp_list[node].payload:
+          found = True
+          self.op.src_node = src_node = node
+          self.op.src_path = src_path = utils.PathJoin(constants.EXPORT_DIR,
+                                                       src_path)
+          break
+      if not found:
+        raise errors.OpPrereqError("No export found for relative path %s" %
+                                    src_path, errors.ECODE_INVAL)
+
+    _CheckNodeOnline(self, src_node)
+    result = self.rpc.call_export_info(src_node, src_path)
+    result.Raise("No export or invalid export found in dir %s" % src_path)
+
+    export_info = objects.SerializableConfigParser.Loads(str(result.payload))
+    if not export_info.has_section(constants.INISECT_EXP):
+      raise errors.ProgrammerError("Corrupted export config",
+                                   errors.ECODE_ENVIRON)
+
+    ei_version = export_info.get(constants.INISECT_EXP, "version")
+    if (int(ei_version) != constants.EXPORT_VERSION):
+      raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
+                                 (ei_version, constants.EXPORT_VERSION),
+                                 errors.ECODE_ENVIRON)
+    return export_info
+
+  def _ReadExportParams(self, einfo):
+    """Use export parameters as defaults.
+
+    In case the opcode doesn't specify (as in override) some instance
+    parameters, then try to use them from the export information, if
+    that declares them.
+
+    """
+    self.op.os_type = einfo.get(constants.INISECT_EXP, "os")
+
+    if self.op.disk_template is None:
+      if einfo.has_option(constants.INISECT_INS, "disk_template"):
+        self.op.disk_template = einfo.get(constants.INISECT_INS,
+                                          "disk_template")
+      else:
+        raise errors.OpPrereqError("No disk template specified and the export"
+                                   " is missing the disk_template information",
+                                   errors.ECODE_INVAL)
+
+    if not self.op.disks:
+      if einfo.has_option(constants.INISECT_INS, "disk_count"):
+        disks = []
+        # TODO: import the disk iv_name too
+        for idx in range(einfo.getint(constants.INISECT_INS, "disk_count")):
+          disk_sz = einfo.getint(constants.INISECT_INS, "disk%d_size" % idx)
+          disks.append({"size": disk_sz})
+        self.op.disks = disks
+      else:
+        raise errors.OpPrereqError("No disk info specified and the export"
+                                   " is missing the disk information",
+                                   errors.ECODE_INVAL)
+
+    if (not self.op.nics and
+        einfo.has_option(constants.INISECT_INS, "nic_count")):
+      nics = []
+      for idx in range(einfo.getint(constants.INISECT_INS, "nic_count")):
+        ndict = {}
+        for name in list(constants.NICS_PARAMETERS) + ["ip", "mac"]:
+          v = einfo.get(constants.INISECT_INS, "nic%d_%s" % (idx, name))
+          ndict[name] = v
+        nics.append(ndict)
+      self.op.nics = nics
+
+    if (self.op.hypervisor is None and
+        einfo.has_option(constants.INISECT_INS, "hypervisor")):
+      self.op.hypervisor = einfo.get(constants.INISECT_INS, "hypervisor")
+    if einfo.has_section(constants.INISECT_HYP):
+      # use the export parameters but do not override the ones
+      # specified by the user
+      for name, value in einfo.items(constants.INISECT_HYP):
+        if name not in self.op.hvparams:
+          self.op.hvparams[name] = value
+
+    if einfo.has_section(constants.INISECT_BEP):
+      # use the parameters, without overriding
+      for name, value in einfo.items(constants.INISECT_BEP):
+        if name not in self.op.beparams:
+          self.op.beparams[name] = value
+    else:
+      # try to read the parameters old style, from the main section
+      for name in constants.BES_PARAMETERS:
+        if (name not in self.op.beparams and
+            einfo.has_option(constants.INISECT_INS, name)):
+          self.op.beparams[name] = einfo.get(constants.INISECT_INS, name)
+
+  def _RevertToDefaults(self, cluster):
+    """Revert the instance parameters to the default values.
+
+    """
+    # hvparams
+    hv_defs = cluster.GetHVDefaults(self.op.hypervisor, self.op.os_type)
+    for name in self.op.hvparams.keys():
+      if name in hv_defs and hv_defs[name] == self.op.hvparams[name]:
+        del self.op.hvparams[name]
+    # beparams
+    be_defs = cluster.beparams.get(constants.PP_DEFAULT, {})
+    for name in self.op.beparams.keys():
+      if name in be_defs and be_defs[name] == self.op.beparams[name]:
+        del self.op.beparams[name]
+    # nic params
+    nic_defs = cluster.nicparams.get(constants.PP_DEFAULT, {})
+    for nic in self.op.nics:
+      for name in constants.NICS_PARAMETERS:
+        if name in nic and name in nic_defs and nic[name] == nic_defs[name]:
+          del nic[name]
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    """
+    if self.op.mode == constants.INSTANCE_IMPORT:
+      export_info = self._ReadExportInfo()
+      self._ReadExportParams(export_info)
+
+    _CheckDiskTemplate(self.op.disk_template)
+
+    if (not self.cfg.GetVGName() and
+        self.op.disk_template not in constants.DTS_NOT_LVM):
+      raise errors.OpPrereqError("Cluster does not support lvm-based"
+                                 " instances", errors.ECODE_STATE)
+
+    if self.op.hypervisor is None:
+      self.op.hypervisor = self.cfg.GetHypervisorType()
+
+    cluster = self.cfg.GetClusterInfo()
+    enabled_hvs = cluster.enabled_hypervisors
+    if self.op.hypervisor not in enabled_hvs:
+      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
+                                 " cluster (%s)" % (self.op.hypervisor,
+                                  ",".join(enabled_hvs)),
+                                 errors.ECODE_STATE)
+
+    # check hypervisor parameter syntax (locally)
+    utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
+    filled_hvp = objects.FillDict(cluster.GetHVDefaults(self.op.hypervisor,
+                                                        self.op.os_type),
+                                  self.op.hvparams)
+    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
+    hv_type.CheckParameterSyntax(filled_hvp)
+    self.hv_full = filled_hvp
+    # check that we don't specify global parameters on an instance
+    _CheckGlobalHvParams(self.op.hvparams)
+
+    # fill and remember the beparams dict
+    utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
+    self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
+                                    self.op.beparams)
+
+    # now that hvp/bep are in final format, let's reset to defaults,
+    # if told to do so
+    if self.op.identify_defaults:
+      self._RevertToDefaults(cluster)
+
+    # NIC buildup
+    self.nics = []
+    for idx, nic in enumerate(self.op.nics):
+      nic_mode_req = nic.get("mode", None)
+      nic_mode = nic_mode_req
+      if nic_mode is None:
+        nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
+
+      # in routed mode, for the first nic, the default ip is 'auto'
+      if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
+        default_ip_mode = constants.VALUE_AUTO
+      else:
+        default_ip_mode = constants.VALUE_NONE
+
+      # ip validity checks
+      ip = nic.get("ip", default_ip_mode)
+      if ip is None or ip.lower() == constants.VALUE_NONE:
+        nic_ip = None
+      elif ip.lower() == constants.VALUE_AUTO:
+        if not self.op.name_check:
+          raise errors.OpPrereqError("IP address set to auto but name checks"
+                                     " have been skipped. Aborting.",
+                                     errors.ECODE_INVAL)
+        nic_ip = self.hostname1.ip
+      else:
+        if not utils.IsValidIP(ip):
+          raise errors.OpPrereqError("Given IP address '%s' doesn't look"
+                                     " like a valid IP" % ip,
+                                     errors.ECODE_INVAL)
+        nic_ip = ip
+
+      # TODO: check the ip address for uniqueness
+      if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
+        raise errors.OpPrereqError("Routed nic mode requires an ip address",
+                                   errors.ECODE_INVAL)
 
-    """
-    env = {
-      "ADD_MODE": self.op.mode,
-      }
-    if self.op.mode == constants.INSTANCE_IMPORT:
-      env["SRC_NODE"] = self.op.src_node
-      env["SRC_PATH"] = self.op.src_path
-      env["SRC_IMAGES"] = self.src_images
+      # MAC address verification
+      mac = nic.get("mac", constants.VALUE_AUTO)
+      if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
+        mac = utils.NormalizeAndValidateMac(mac)
 
-    env.update(_BuildInstanceHookEnv(
-      name=self.op.instance_name,
-      primary_node=self.op.pnode,
-      secondary_nodes=self.secondaries,
-      status=self.op.start,
-      os_type=self.op.os_type,
-      memory=self.be_full[constants.BE_MEMORY],
-      vcpus=self.be_full[constants.BE_VCPUS],
-      nics=_NICListToTuple(self, self.nics),
-      disk_template=self.op.disk_template,
-      disks=[(d["size"], d["mode"]) for d in self.disks],
-      bep=self.be_full,
-      hvp=self.hv_full,
-      hypervisor_name=self.op.hypervisor,
-    ))
+        try:
+          self.cfg.ReserveMAC(mac, self.proc.GetECId())
+        except errors.ReservationError:
+          raise errors.OpPrereqError("MAC address %s already in use"
+                                     " in cluster" % mac,
+                                     errors.ECODE_NOTUNIQUE)
 
-    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
-          self.secondaries)
-    return env, nl, nl
+      # bridge verification
+      bridge = nic.get("bridge", None)
+      link = nic.get("link", None)
+      if bridge and link:
+        raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
+                                   " at the same time", errors.ECODE_INVAL)
+      elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
+        raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic",
+                                   errors.ECODE_INVAL)
+      elif bridge:
+        link = bridge
 
+      nicparams = {}
+      if nic_mode_req:
+        nicparams[constants.NIC_MODE] = nic_mode_req
+      if link:
+        nicparams[constants.NIC_LINK] = link
 
-  def CheckPrereq(self):
-    """Check prerequisites.
+      check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
+                                      nicparams)
+      objects.NIC.CheckParameterSyntax(check_params)
+      self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
 
-    """
-    if (not self.cfg.GetVGName() and
-        self.op.disk_template not in constants.DTS_NOT_LVM):
-      raise errors.OpPrereqError("Cluster does not support lvm-based"
-                                 " instances", errors.ECODE_STATE)
+    # disk checks/pre-build
+    self.disks = []
+    for disk in self.op.disks:
+      mode = disk.get("mode", constants.DISK_RDWR)
+      if mode not in constants.DISK_ACCESS_SET:
+        raise errors.OpPrereqError("Invalid disk access mode '%s'" %
+                                   mode, errors.ECODE_INVAL)
+      size = disk.get("size", None)
+      if size is None:
+        raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
+      try:
+        size = int(size)
+      except (TypeError, ValueError):
+        raise errors.OpPrereqError("Invalid disk size '%s'" % size,
+                                   errors.ECODE_INVAL)
+      new_disk = {"size": size, "mode": mode}
+      if "adopt" in disk:
+        new_disk["adopt"] = disk["adopt"]
+      self.disks.append(new_disk)
 
     if self.op.mode == constants.INSTANCE_IMPORT:
-      src_node = self.op.src_node
-      src_path = self.op.src_path
-
-      if src_node is None:
-        locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
-        exp_list = self.rpc.call_export_list(locked_nodes)
-        found = False
-        for node in exp_list:
-          if exp_list[node].fail_msg:
-            continue
-          if src_path in exp_list[node].payload:
-            found = True
-            self.op.src_node = src_node = node
-            self.op.src_path = src_path = utils.PathJoin(constants.EXPORT_DIR,
-                                                         src_path)
-            break
-        if not found:
-          raise errors.OpPrereqError("No export found for relative path %s" %
-                                      src_path, errors.ECODE_INVAL)
-
-      _CheckNodeOnline(self, src_node)
-      result = self.rpc.call_export_info(src_node, src_path)
-      result.Raise("No export or invalid export found in dir %s" % src_path)
-
-      export_info = objects.SerializableConfigParser.Loads(str(result.payload))
-      if not export_info.has_section(constants.INISECT_EXP):
-        raise errors.ProgrammerError("Corrupted export config",
-                                     errors.ECODE_ENVIRON)
-
-      ei_version = export_info.get(constants.INISECT_EXP, 'version')
-      if (int(ei_version) != constants.EXPORT_VERSION):
-        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
-                                   (ei_version, constants.EXPORT_VERSION),
-                                   errors.ECODE_ENVIRON)
 
       # Check that the new instance doesn't have less disks than the export
       instance_disks = len(self.disks)
@@ -6058,14 +6533,13 @@ class LUCreateInstance(LogicalUnit):
                                    (instance_disks, export_disks),
                                    errors.ECODE_INVAL)
 
-      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
       disk_images = []
       for idx in range(export_disks):
         option = 'disk%d_dump' % idx
         if export_info.has_option(constants.INISECT_INS, option):
           # FIXME: are the old os-es, disk sizes, etc. useful?
           export_name = export_info.get(constants.INISECT_INS, option)
-          image = utils.PathJoin(src_path, export_name)
+          image = utils.PathJoin(self.op.src_path, export_name)
           disk_images.append(image)
         else:
           disk_images.append(False)
@@ -6073,8 +6547,12 @@ class LUCreateInstance(LogicalUnit):
       self.src_images = disk_images
 
       old_name = export_info.get(constants.INISECT_INS, 'name')
-      # FIXME: int() here could throw a ValueError on broken exports
-      exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
+      try:
+        exp_nic_count = export_info.getint(constants.INISECT_INS, 'nic_count')
+      except (TypeError, ValueError), err:
+        raise errors.OpPrereqError("Invalid export file, nic_count is not"
+                                   " an integer: %s" % str(err),
+                                   errors.ECODE_STATE)
       if self.op.instance_name == old_name:
         for idx, nic in enumerate(self.nics):
           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
@@ -6139,33 +6617,43 @@ class LUCreateInstance(LogicalUnit):
     req_size = _ComputeDiskSize(self.op.disk_template,
                                 self.disks)
 
-    # Check lv size requirements
-    if req_size is not None:
-      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
-                                         self.op.hypervisor)
-      for node in nodenames:
-        info = nodeinfo[node]
-        info.Raise("Cannot get current information from node %s" % node)
-        info = info.payload
-        vg_free = info.get('vg_free', None)
-        if not isinstance(vg_free, int):
-          raise errors.OpPrereqError("Can't compute free disk space on"
-                                     " node %s" % node, errors.ECODE_ENVIRON)
-        if req_size > vg_free:
-          raise errors.OpPrereqError("Not enough disk space on target node %s."
-                                     " %d MB available, %d MB required" %
-                                     (node, vg_free, req_size),
-                                     errors.ECODE_NORES)
+    # Check lv size requirements, if not adopting
+    if req_size is not None and not self.adopt_disks:
+      _CheckNodesFreeDisk(self, nodenames, req_size)
+
+    if self.adopt_disks: # instead, we must check the adoption data
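+      # for adoption we check that the requested LVs are unique, not
+      # already reserved, present on the primary node and not in use;
+      # their current size then becomes the disk size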
+      all_lvs = set([i["adopt"] for i in self.disks])
+      if len(all_lvs) != len(self.disks):
+        raise errors.OpPrereqError("Duplicate volume names given for adoption",
+                                   errors.ECODE_INVAL)
+      for lv_name in all_lvs:
+        try:
+          self.cfg.ReserveLV(lv_name, self.proc.GetECId())
+        except errors.ReservationError:
+          raise errors.OpPrereqError("LV named %s used by another instance" %
+                                     lv_name, errors.ECODE_NOTUNIQUE)
+
+      node_lvs = self.rpc.call_lv_list([pnode.name],
+                                       self.cfg.GetVGName())[pnode.name]
+      node_lvs.Raise("Cannot get LV information from node %s" % pnode.name)
+      node_lvs = node_lvs.payload
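+      # the lv_list payload maps each LV name to its attributes: field 0
+      # is the size in MiB, field 2 whether the volume is online (open)
+      # and therefore not suitable for adoption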
+      delta = all_lvs.difference(node_lvs.keys())
+      if delta:
+        raise errors.OpPrereqError("Missing logical volume(s): %s" %
+                                   utils.CommaJoin(delta),
+                                   errors.ECODE_INVAL)
+      online_lvs = [lv for lv in all_lvs if node_lvs[lv][2]]
+      if online_lvs:
+        raise errors.OpPrereqError("Online logical volumes found, cannot"
+                                   " adopt: %s" % utils.CommaJoin(online_lvs),
+                                   errors.ECODE_STATE)
+      # update the size of disk based on what is found
+      for dsk in self.disks:
+        dsk["size"] = int(float(node_lvs[dsk["adopt"]][0]))
 
     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
 
-    # os verification
-    result = self.rpc.call_os_get(pnode.name, self.op.os_type)
-    result.Raise("OS '%s' not in supported os list for primary node %s" %
-                 (self.op.os_type, pnode.name),
-                 prereq=True, ecode=errors.ECODE_INVAL)
-    if not self.op.force_variant:
-      _CheckOSVariant(result.payload, self.op.os_type)
+    _CheckNodeHasOS(self, pnode.name, self.op.os_type, self.op.force_variant)
 
     _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
 
@@ -6191,19 +6679,18 @@ class LUCreateInstance(LogicalUnit):
     else:
       network_port = None
 
-    ##if self.op.vnc_bind_address is None:
-    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
+    if constants.ENABLE_FILE_STORAGE:
+      # this is needed because os.path.join does not accept None arguments
+      if self.op.file_storage_dir is None:
+        string_file_storage_dir = ""
+      else:
+        string_file_storage_dir = self.op.file_storage_dir
 
-    # this is needed because os.path.join does not accept None arguments
-    if self.op.file_storage_dir is None:
-      string_file_storage_dir = ""
+      # build the full file storage dir path
+      file_storage_dir = utils.PathJoin(self.cfg.GetFileStorageDir(),
+                                        string_file_storage_dir, instance)
     else:
-      string_file_storage_dir = self.op.file_storage_dir
-
-    # build the full file storage dir path
-    file_storage_dir = utils.PathJoin(self.cfg.GetFileStorageDir(),
-                                      string_file_storage_dir, instance)
-
+      file_storage_dir = ""
 
     disks = _GenerateDiskTemplate(self,
                                   self.op.disk_template,
@@ -6225,16 +6712,29 @@ class LUCreateInstance(LogicalUnit):
                             hypervisor=self.op.hypervisor,
                             )
 
-    feedback_fn("* creating instance disks...")
-    try:
-      _CreateDisks(self, iobj)
-    except errors.OpExecError:
-      self.LogWarning("Device creation failed, reverting...")
+    if self.adopt_disks:
+      # rename LVs to the newly-generated names; we need to construct
+      # 'fake' LV disks with the old data, plus the new unique_id
+      tmp_disks = [objects.Disk.FromDict(v.ToDict()) for v in disks]
+      rename_to = []
+      for t_dsk, a_dsk in zip(tmp_disks, self.disks):
+        rename_to.append(t_dsk.logical_id)
+        t_dsk.logical_id = (t_dsk.logical_id[0], a_dsk["adopt"])
+        self.cfg.SetDiskID(t_dsk, pnode_name)
+      result = self.rpc.call_blockdev_rename(pnode_name,
+                                             zip(tmp_disks, rename_to))
+      result.Raise("Failed to rename adoped LVs")
+    else:
+      feedback_fn("* creating instance disks...")
       try:
-        _RemoveDisks(self, iobj)
-      finally:
-        self.cfg.ReleaseDRBDMinors(instance)
-        raise
+        _CreateDisks(self, iobj)
+      except errors.OpExecError:
+        self.LogWarning("Device creation failed, reverting...")
+        try:
+          _RemoveDisks(self, iobj)
+        finally:
+          self.cfg.ReleaseDRBDMinors(instance)
+          raise
 
     feedback_fn("adding instance %s to cluster config" % instance)
 
@@ -6272,32 +6772,42 @@ class LUCreateInstance(LogicalUnit):
       raise errors.OpExecError("There are some degraded disks for"
                                " this instance")
 
-    feedback_fn("creating os for instance %s on node %s" %
-                (instance, pnode_name))
-
-    if iobj.disk_template != constants.DT_DISKLESS:
+    if iobj.disk_template != constants.DT_DISKLESS and not self.adopt_disks:
       if self.op.mode == constants.INSTANCE_CREATE:
-        feedback_fn("* running the instance OS create scripts...")
-        # FIXME: pass debug option from opcode to backend
-        result = self.rpc.call_instance_os_add(pnode_name, iobj, False,
-                                               self.op.debug_level)
-        result.Raise("Could not add os for instance %s"
-                     " on node %s" % (instance, pnode_name))
+        if not self.op.no_install:
+          feedback_fn("* running the instance OS create scripts...")
+          # FIXME: pass debug option from opcode to backend
+          result = self.rpc.call_instance_os_add(pnode_name, iobj, False,
+                                                 self.op.debug_level)
+          result.Raise("Could not add os for instance %s"
+                       " on node %s" % (instance, pnode_name))
 
       elif self.op.mode == constants.INSTANCE_IMPORT:
         feedback_fn("* running the instance OS import scripts...")
-        src_node = self.op.src_node
-        src_images = self.src_images
-        cluster_name = self.cfg.GetClusterName()
-        # FIXME: pass debug option from opcode to backend
-        import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
-                                                         src_node, src_images,
-                                                         cluster_name,
-                                                         self.op.debug_level)
-        msg = import_result.fail_msg
-        if msg:
-          self.LogWarning("Error while importing the disk images for instance"
-                          " %s on node %s: %s" % (instance, pnode_name, msg))
+
+        transfers = []
+
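+        # each dumped disk image of the export becomes one DiskTransfer:
+        # the source is the image file on the export node, the
+        # destination the matching new disk, imported through the
+        # IEIO_SCRIPT (OS import script) mechanism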
+        for idx, image in enumerate(self.src_images):
+          if not image:
+            continue
+
+          # FIXME: pass debug option from opcode to backend
+          dt = masterd.instance.DiskTransfer("disk/%s" % idx,
+                                             constants.IEIO_FILE, (image, ),
+                                             constants.IEIO_SCRIPT,
+                                             (iobj.disks[idx], idx),
+                                             None)
+          transfers.append(dt)
+
+        import_result = \
+          masterd.instance.TransferInstanceData(self, feedback_fn,
+                                                self.op.src_node, pnode_name,
+                                                self.pnode.secondary_ip,
+                                                iobj, transfers)
+        if not compat.all(import_result):
+          self.LogWarning("Some disks for instance %s on node %s were not"
+                          " imported successfully" % (instance, pnode_name))
+
       else:
         # also checked in the prereq part
         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
@@ -7211,6 +7721,8 @@ class LURepairNodeStorage(NoHooksLU):
   def CheckArguments(self):
     self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
 
+    _CheckStorageType(self.op.storage_type)
+
   def ExpandNames(self):
     self.needed_locks = {
       locking.LEVEL_NODE: [self.op.node_name],
@@ -7366,26 +7878,16 @@ class LUGrowDisk(LogicalUnit):
 
     self.instance = instance
 
-    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
+    if instance.disk_template not in constants.DTS_GROWABLE:
       raise errors.OpPrereqError("Instance's disk layout does not support"
                                  " growing.", errors.ECODE_INVAL)
 
     self.disk = instance.FindDisk(self.op.disk)
 
-    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
-                                       instance.hypervisor)
-    for node in nodenames:
-      info = nodeinfo[node]
-      info.Raise("Cannot get current information from node %s" % node)
-      vg_free = info.payload.get('vg_free', None)
-      if not isinstance(vg_free, int):
-        raise errors.OpPrereqError("Can't compute free disk space on"
-                                   " node %s" % node, errors.ECODE_ENVIRON)
-      if self.op.amount > vg_free:
-        raise errors.OpPrereqError("Not enough disk space on target node %s:"
-                                   " %d MiB available, %d MiB required" %
-                                   (node, vg_free, self.op.amount),
-                                   errors.ECODE_NORES)
+    if instance.disk_template != constants.DT_FILE:
+      # TODO: check the free disk space for file-based instances, once
+      # that feature is supported
+      _CheckNodesFreeDisk(self, nodenames, self.op.amount)
 
   def Exec(self, feedback_fn):
     """Execute disk grow.
@@ -7589,9 +8091,17 @@ class LUSetInstanceParams(LogicalUnit):
       self.op.beparams = {}
     if not hasattr(self.op, 'hvparams'):
       self.op.hvparams = {}
+    if not hasattr(self.op, "disk_template"):
+      self.op.disk_template = None
+    if not hasattr(self.op, "remote_node"):
+      self.op.remote_node = None
+    if not hasattr(self.op, "os_name"):
+      self.op.os_name = None
+    if not hasattr(self.op, "force_variant"):
+      self.op.force_variant = False
     self.op.force = getattr(self.op, "force", False)
-    if not (self.op.nics or self.op.disks or
-            self.op.hvparams or self.op.beparams):
+    if not (self.op.nics or self.op.disks or self.op.disk_template or
+            self.op.hvparams or self.op.beparams or self.op.os_name):
       raise errors.OpPrereqError("No changes submitted", errors.ECODE_INVAL)
 
     if self.op.hvparams:
@@ -7637,6 +8147,19 @@ class LUSetInstanceParams(LogicalUnit):
       raise errors.OpPrereqError("Only one disk add or remove operation"
                                  " supported at a time", errors.ECODE_INVAL)
 
+    if self.op.disks and self.op.disk_template is not None:
+      raise errors.OpPrereqError("Disk template conversion and other disk"
+                                 " changes not supported at the same time",
+                                 errors.ECODE_INVAL)
+
+    if self.op.disk_template:
+      _CheckDiskTemplate(self.op.disk_template)
+      if (self.op.disk_template in constants.DTS_NET_MIRROR and
+          self.op.remote_node is None):
+        raise errors.OpPrereqError("Changing the disk template to a mirrored"
+                                   " one requires specifying a secondary node",
+                                   errors.ECODE_INVAL)
+
     # NIC validation
     nic_addremove = 0
     for nic_op, nic_dict in self.op.nics:
@@ -7699,6 +8222,9 @@ class LUSetInstanceParams(LogicalUnit):
   def DeclareLocks(self, level):
     if level == locking.LEVEL_NODE:
       self._LockInstancesNodes()
+      if self.op.disk_template and self.op.remote_node:
+        self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
+        self.needed_locks[locking.LEVEL_NODE].append(self.op.remote_node)
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -7748,6 +8274,8 @@ class LUSetInstanceParams(LogicalUnit):
         del args['nics'][-1]
 
     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
+    if self.op.disk_template:
+      env["NEW_DISK_TEMPLATE"] = self.op.disk_template
     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
     return env, nl, nl
 
@@ -7801,6 +8329,25 @@ class LUSetInstanceParams(LogicalUnit):
     pnode = instance.primary_node
     nodelist = list(instance.all_nodes)
 
+    if self.op.disk_template:
+      if instance.disk_template == self.op.disk_template:
+        raise errors.OpPrereqError("Instance already has disk template %s" %
+                                   instance.disk_template, errors.ECODE_INVAL)
+
+      if (instance.disk_template,
+          self.op.disk_template) not in self._DISK_CONVERSIONS:
+        raise errors.OpPrereqError("Unsupported disk template conversion from"
+                                   " %s to %s" % (instance.disk_template,
+                                                  self.op.disk_template),
+                                   errors.ECODE_INVAL)
+      if self.op.disk_template in constants.DTS_NET_MIRROR:
+        _CheckNodeOnline(self, self.op.remote_node)
+        _CheckNodeNotDrained(self, self.op.remote_node)
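+        # the new secondary node must have enough free space for a full
+        # copy of the instance's disks under the new template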
+        disks = [{"size": d.size} for d in instance.disks]
+        required = _ComputeDiskSize(self.op.disk_template, disks)
+        _CheckNodesFreeDisk(self, [self.op.remote_node], required)
+        _CheckInstanceDown(self, instance, "cannot change disk template")
+
     # hvparams processing
     if self.op.hvparams:
       i_hvdict, hv_new = self._GetUpdatedParams(
@@ -7966,17 +8513,8 @@ class LUSetInstanceParams(LogicalUnit):
       if disk_op == constants.DDM_REMOVE:
         if len(instance.disks) == 1:
           raise errors.OpPrereqError("Cannot remove the last disk of"
-                                     " an instance",
-                                     errors.ECODE_INVAL)
-        ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
-        ins_l = ins_l[pnode]
-        msg = ins_l.fail_msg
-        if msg:
-          raise errors.OpPrereqError("Can't contact node %s: %s" %
-                                     (pnode, msg), errors.ECODE_ENVIRON)
-        if instance.name in ins_l.payload:
-          raise errors.OpPrereqError("Instance is running, can't remove"
-                                     " disks.", errors.ECODE_STATE)
+                                     " an instance", errors.ECODE_INVAL)
+        _CheckInstanceDown(self, instance, "cannot remove disks")
 
       if (disk_op == constants.DDM_ADD and
           len(instance.nics) >= constants.MAX_DISKS):
@@ -7991,8 +8529,103 @@ class LUSetInstanceParams(LogicalUnit):
                                      (disk_op, len(instance.disks)),
                                      errors.ECODE_INVAL)
 
+    # OS change
+    if self.op.os_name and not self.op.force:
+      _CheckNodeHasOS(self, instance.primary_node, self.op.os_name,
+                      self.op.force_variant)
+
     return
 
+  def _ConvertPlainToDrbd(self, feedback_fn):
+    """Converts an instance from plain to drbd.
+
+    """
+    feedback_fn("Converting template to drbd")
+    instance = self.instance
+    pnode = instance.primary_node
+    snode = self.op.remote_node
+
+    # create a fake disk info for _GenerateDiskTemplate
+    disk_info = [{"size": d.size, "mode": d.mode} for d in instance.disks]
+    new_disks = _GenerateDiskTemplate(self, self.op.disk_template,
+                                      instance.name, pnode, [snode],
+                                      disk_info, None, None, 0)
+    info = _GetInstanceInfoText(instance)
+    feedback_fn("Creating aditional volumes...")
+    # first, create the missing data and meta devices
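+    # on the primary node only the meta LV (children[1]) is new, as the
+    # existing plain LV will be renamed into the data LV below; on the
+    # secondary node both data and meta LVs have to be created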
+    for disk in new_disks:
+      # unfortunately this is... not too nice
+      _CreateSingleBlockDev(self, pnode, instance, disk.children[1],
+                            info, True)
+      for child in disk.children:
+        _CreateSingleBlockDev(self, snode, instance, child, info, True)
+    # at this stage, all new LVs have been created, we can rename the
+    # old ones
+    feedback_fn("Renaming original volumes...")
+    rename_list = [(o, n.children[0].logical_id)
+                   for (o, n) in zip(instance.disks, new_disks)]
+    result = self.rpc.call_blockdev_rename(pnode, rename_list)
+    result.Raise("Failed to rename original LVs")
+
+    feedback_fn("Initializing DRBD devices...")
+    # all child devices are in place, we can now create the DRBD devices
+    for disk in new_disks:
+      for node in [pnode, snode]:
+        f_create = node == pnode
+        _CreateSingleBlockDev(self, node, instance, disk, info, f_create)
+
+    # at this point, the instance has been modified
+    instance.disk_template = constants.DT_DRBD8
+    instance.disks = new_disks
+    self.cfg.Update(instance, feedback_fn)
+
+    # disks are created, waiting for sync
+    disk_abort = not _WaitForSync(self, instance)
+    if disk_abort:
+      raise errors.OpExecError("There are some degraded disks for"
+                               " this instance, please cleanup manually")
+
+  def _ConvertDrbdToPlain(self, feedback_fn):
+    """Converts an instance from drbd to plain.
+
+    """
+    instance = self.instance
+    assert len(instance.secondary_nodes) == 1
+    pnode = instance.primary_node
+    snode = instance.secondary_nodes[0]
+    feedback_fn("Converting template to plain")
+
+    old_disks = instance.disks
+    new_disks = [d.children[0] for d in old_disks]
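+    # the data LVs (first child of each DRBD disk) are kept and become
+    # the instance's plain disks; the meta LVs are removed further down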
+
+    # copy over size and mode
+    for parent, child in zip(old_disks, new_disks):
+      child.size = parent.size
+      child.mode = parent.mode
+
+    # update instance structure
+    instance.disks = new_disks
+    instance.disk_template = constants.DT_PLAIN
+    self.cfg.Update(instance, feedback_fn)
+
+    feedback_fn("Removing volumes on the secondary node...")
+    for disk in old_disks:
+      self.cfg.SetDiskID(disk, snode)
+      msg = self.rpc.call_blockdev_remove(snode, disk).fail_msg
+      if msg:
+        self.LogWarning("Could not remove block device %s on node %s,"
+                        " continuing anyway: %s", disk.iv_name, snode, msg)
+
+    feedback_fn("Removing unneeded volumes on the primary node...")
+    for idx, disk in enumerate(old_disks):
+      meta = disk.children[1]
+      self.cfg.SetDiskID(meta, pnode)
+      msg = self.rpc.call_blockdev_remove(pnode, meta).fail_msg
+      if msg:
+        self.LogWarning("Could not remove metadata for disk %d on node %s,"
+                        " continuing anyway: %s", idx, pnode, msg)
+
+
   def Exec(self, feedback_fn):
     """Modifies an instance.
 
@@ -8057,6 +8690,20 @@ class LUSetInstanceParams(LogicalUnit):
         # change a given disk
         instance.disks[disk_op].mode = disk_dict['mode']
         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
+
+    if self.op.disk_template:
+      r_shut = _ShutdownInstanceDisks(self, instance)
+      if not r_shut:
+        raise errors.OpExecError("Cannot shutdow instance disks, unable to"
+                                 " proceed with disk template conversion")
+      mode = (instance.disk_template, self.op.disk_template)
+      try:
+        self._DISK_CONVERSIONS[mode](self, feedback_fn)
+      except:
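+        # the conversion may have reserved new DRBD minors in the
+        # configuration; release them before re-raising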
+        self.cfg.ReleaseDRBDMinors(instance.name)
+        raise
+      result.append(("disk_template", self.op.disk_template))
+
     # NIC changes
     for nic_op, nic_dict in self.op.nics:
       if nic_op == constants.DDM_REMOVE:
@@ -8097,10 +8744,18 @@ class LUSetInstanceParams(LogicalUnit):
       for key, val in self.op.beparams.iteritems():
         result.append(("be/%s" % key, val))
 
+    # OS change
+    if self.op.os_name:
+      instance.os = self.op.os_name
+
     self.cfg.Update(instance, feedback_fn)
 
     return result
 
+  # supported disk template conversions, as a map from (old template,
+  # new template) pairs to the method implementing the conversion
+  _DISK_CONVERSIONS = {
+    (constants.DT_PLAIN, constants.DT_DRBD8): _ConvertPlainToDrbd,
+    (constants.DT_DRBD8, constants.DT_PLAIN): _ConvertDrbdToPlain,
+    }
+
 
 class LUQueryExports(NoHooksLU):
   """Query the exports list
@@ -8157,11 +8812,22 @@ class LUExportInstance(LogicalUnit):
     """Check the arguments.
 
     """
+    _CheckBooleanOpField(self.op, "remove_instance")
+    _CheckBooleanOpField(self.op, "ignore_remove_failures")
+
     self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
                                     constants.DEFAULT_SHUTDOWN_TIMEOUT)
+    self.remove_instance = getattr(self.op, "remove_instance", False)
+    self.ignore_remove_failures = getattr(self.op, "ignore_remove_failures",
+                                          False)
+
+    if self.remove_instance and not self.op.shutdown:
+      raise errors.OpPrereqError("Can not remove instance without shutting it"
+                                 " down before")
 
   def ExpandNames(self):
     self._ExpandAndLockInstance()
+
     # FIXME: lock only instance primary and destination node
     #
     # Sad but true, for now we have do lock all nodes, as we don't know where
@@ -8186,6 +8852,8 @@ class LUExportInstance(LogicalUnit):
       "EXPORT_NODE": self.op.target_node,
       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
       "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
+      # TODO: Generic function for boolean env variables
+      "REMOVE_INSTANCE": str(bool(self.remove_instance)),
       }
     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
@@ -8212,11 +8880,100 @@ class LUExportInstance(LogicalUnit):
     _CheckNodeNotDrained(self, self.dst_node.name)
 
     # instance disk type verification
+    # TODO: Implement export support for file-based disks
     for disk in self.instance.disks:
       if disk.dev_type == constants.LD_FILE:
         raise errors.OpPrereqError("Export not supported for instances with"
                                    " file-based disks", errors.ECODE_INVAL)
 
+  def _CreateSnapshots(self, feedback_fn):
+    """Creates an LVM snapshot for every disk of the instance.
+
+    @return: List of snapshots as L{objects.Disk} instances
+
+    """
+    instance = self.instance
+    src_node = instance.primary_node
+
+    vgname = self.cfg.GetVGName()
+
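+    # One entry per instance disk; a failed snapshot is recorded as False
+    # so that the indices stay aligned with instance.disks.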
+    snap_disks = []
+
+    for idx, disk in enumerate(instance.disks):
+      feedback_fn("Creating a snapshot of disk/%s on node %s" %
+                  (idx, src_node))
+
+      # result.payload will be a snapshot of an lvm leaf of the one we
+      # passed
+      result = self.rpc.call_blockdev_snapshot(src_node, disk)
+      msg = result.fail_msg
+      if msg:
+        self.LogWarning("Could not snapshot disk/%s on node %s: %s",
+                        idx, src_node, msg)
+        snap_disks.append(False)
+      else:
+        disk_id = (vgname, result.payload)
+        new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
+                               logical_id=disk_id, physical_id=disk_id,
+                               iv_name=disk.iv_name)
+        snap_disks.append(new_dev)
+
+    return snap_disks
+
+  def _RemoveSnapshot(self, feedback_fn, snap_disks, disk_index):
+    """Removes an LVM snapshot.
+
+    @type snap_disks: list
+    @param snap_disks: The list of all snapshots as returned by
+                       L{_CreateSnapshots}
+    @type disk_index: number
+    @param disk_index: Index of the snapshot to be removed
+    @rtype: bool
+    @return: Whether removal was successful or not
+
+    """
+    disk = snap_disks[disk_index]
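+    # The entry is False if the snapshot for this disk was never created, in
+    # which case there is nothing to remove.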
+    if disk:
+      src_node = self.instance.primary_node
+
+      feedback_fn("Removing snapshot of disk/%s on node %s" %
+                  (disk_index, src_node))
+
+      result = self.rpc.call_blockdev_remove(src_node, disk)
+      if not result.fail_msg:
+        return True
+
+      self.LogWarning("Could not remove snapshot for disk/%d from node"
+                      " %s: %s", disk_index, src_node, result.fail_msg)
+
+    return False
+
+  def _CleanupExports(self, feedback_fn):
+    """Removes exports of current instance from all other nodes.
+
+    If an instance in a cluster with nodes A..D was exported to node C, its
+    exports will be removed from the nodes A, B and D.
+
+    """
+    nodelist = self.cfg.GetNodeList()
+    nodelist.remove(self.dst_node.name)
+
+    # On one-node clusters, nodelist will be empty after the removal; if we
+    # proceeded, the backup would be removed because OpQueryExports
+    # substitutes an empty list with the full cluster node list.
+    iname = self.instance.name
+    if nodelist:
+      feedback_fn("Removing old exports for instance %s" % iname)
+      exportlist = self.rpc.call_export_list(nodelist)
+      for node in exportlist:
+        if exportlist[node].fail_msg:
+          continue
+        if iname in exportlist[node].payload:
+          msg = self.rpc.call_export_remove(node, iname).fail_msg
+          if msg:
+            self.LogWarning("Could not remove older export for instance %s"
+                            " on node %s: %s", iname, node, msg)
+
   def Exec(self, feedback_fn):
     """Export an instance to an image in the cluster.
 
@@ -8230,13 +8987,10 @@ class LUExportInstance(LogicalUnit):
       feedback_fn("Shutting down instance %s" % instance.name)
       result = self.rpc.call_instance_shutdown(src_node, instance,
                                                self.shutdown_timeout)
+      # TODO: Maybe ignore failures if ignore_remove_failures is set
       result.Raise("Could not shutdown instance %s on"
                    " node %s" % (instance.name, src_node))
 
-    vgname = self.cfg.GetVGName()
-
-    snap_disks = []
-
     # set the disks ID correctly since call_instance_start needs the
     # correct drbd minor to create the symlinks
     for disk in instance.disks:
@@ -8251,94 +9005,93 @@ class LUExportInstance(LogicalUnit):
 
     try:
       # per-disk results
-      dresults = []
+      removed_snaps = [False] * len(instance.disks)
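+      # removed_snaps records which snapshots were already removed by the
+      # transfer-finished callback; anything still False is cleaned up in the
+      # "finally" block below.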
+
+      snap_disks = None
       try:
-        for idx, disk in enumerate(instance.disks):
-          feedback_fn("Creating a snapshot of disk/%s on node %s" %
-                      (idx, src_node))
-
-          # result.payload will be a snapshot of an lvm leaf of the one we
-          # passed
-          result = self.rpc.call_blockdev_snapshot(src_node, disk)
-          msg = result.fail_msg
-          if msg:
-            self.LogWarning("Could not snapshot disk/%s on node %s: %s",
-                            idx, src_node, msg)
-            snap_disks.append(False)
-          else:
-            disk_id = (vgname, result.payload)
-            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
-                                   logical_id=disk_id, physical_id=disk_id,
-                                   iv_name=disk.iv_name)
-            snap_disks.append(new_dev)
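+        # The snapshots are taken while the instance is (optionally) shut
+        # down; the "finally" clause restarts it right away, unless it is
+        # going to be removed after the export anyway.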
+        try:
+          snap_disks = self._CreateSnapshots(feedback_fn)
+        finally:
+          if (self.op.shutdown and instance.admin_up and
+              not self.remove_instance):
+            feedback_fn("Starting instance %s" % instance.name)
+            result = self.rpc.call_instance_start(src_node, instance,
+                                                  None, None)
+            msg = result.fail_msg
+            if msg:
+              _ShutdownInstanceDisks(self, instance)
+              raise errors.OpExecError("Could not start instance: %s" % msg)
+
+        assert len(snap_disks) == len(instance.disks)
+        assert len(removed_snaps) == len(instance.disks)
+
+        # TODO: check for size
+
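+        # Called by the transfer framework as soon as a disk has been
+        # exported; its snapshot can then be removed without waiting for the
+        # cleanup in the "finally" block below.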
+        def _TransferFinished(idx):
+          logging.debug("Transfer %s finished", idx)
+          if self._RemoveSnapshot(feedback_fn, snap_disks, idx):
+            removed_snaps[idx] = True
+
+        transfers = []
+
+        for idx, dev in enumerate(snap_disks):
+          if not dev:
+            transfers.append(None)
+            continue
 
-      finally:
-        if self.op.shutdown and instance.admin_up:
-          feedback_fn("Starting instance %s" % instance.name)
-          result = self.rpc.call_instance_start(src_node, instance, None, None)
-          msg = result.fail_msg
-          if msg:
-            _ShutdownInstanceDisks(self, instance)
-            raise errors.OpExecError("Could not start instance: %s" % msg)
-
-      # TODO: check for size
-
-      cluster_name = self.cfg.GetClusterName()
-      for idx, dev in enumerate(snap_disks):
-        feedback_fn("Exporting snapshot %s from %s to %s" %
-                    (idx, src_node, dst_node.name))
-        if dev:
-          # FIXME: pass debug from opcode to backend
-          result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
-                                                 instance, cluster_name,
-                                                 idx, self.op.debug_level)
-          msg = result.fail_msg
-          if msg:
-            self.LogWarning("Could not export disk/%s from node %s to"
-                            " node %s: %s", idx, src_node, dst_node.name, msg)
-            dresults.append(False)
-          else:
-            dresults.append(True)
-          msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
-          if msg:
-            self.LogWarning("Could not remove snapshot for disk/%d from node"
-                            " %s: %s", idx, src_node, msg)
-        else:
-          dresults.append(False)
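+          # Target file under the temporary "<instance>.new" export
+          # directory on the destination node, named after the snapshot LV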
+          path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
+                                dev.physical_id[1])
 
-      feedback_fn("Finalizing export on %s" % dst_node.name)
-      result = self.rpc.call_finalize_export(dst_node.name, instance,
-                                             snap_disks)
-      fin_resu = True
-      msg = result.fail_msg
-      if msg:
-        self.LogWarning("Could not finalize export for instance %s"
-                        " on node %s: %s", instance.name, dst_node.name, msg)
-        fin_resu = False
+          finished_fn = compat.partial(_TransferFinished, idx)
+
+          # FIXME: pass debug option from opcode to backend
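+          # The source side reads the snapshot via the script-based I/O
+          # mode (IEIO_SCRIPT), the destination writes it to "path"
+          # (IEIO_FILE), and "finished_fn" is invoked when the transfer
+          # is done.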
+          dt = masterd.instance.DiskTransfer("snapshot/%s" % idx,
+                                             constants.IEIO_SCRIPT, (dev, idx),
+                                             constants.IEIO_FILE, (path, ),
+                                             finished_fn)
+          transfers.append(dt)
+
+        # Actually export data
+        dresults = \
+          masterd.instance.TransferInstanceData(self, feedback_fn,
+                                                src_node, dst_node.name,
+                                                dst_node.secondary_ip,
+                                                instance, transfers)
+
+        assert len(dresults) == len(instance.disks)
+
+        # Check for backwards compatibility
+        assert compat.all(isinstance(i, bool) for i in dresults), \
+               "Not all results are boolean: %r" % dresults
+
+        feedback_fn("Finalizing export on %s" % dst_node.name)
+        result = self.rpc.call_finalize_export(dst_node.name, instance,
+                                               snap_disks)
+        msg = result.fail_msg
+        fin_resu = not msg
+        if msg:
+          self.LogWarning("Could not finalize export for instance %s"
+                          " on node %s: %s", instance.name, dst_node.name, msg)
+
+      finally:
+        # Remove all snapshots
+        assert len(removed_snaps) == len(instance.disks)
+        for idx, removed in enumerate(removed_snaps):
+          if not removed:
+            self._RemoveSnapshot(feedback_fn, snap_disks, idx)
 
     finally:
       if activate_disks:
         feedback_fn("Deactivating disks for %s" % instance.name)
         _ShutdownInstanceDisks(self, instance)
 
-    nodelist = self.cfg.GetNodeList()
-    nodelist.remove(dst_node.name)
+    # Remove instance if requested
+    if self.remove_instance:
+      feedback_fn("Removing instance %s" % instance.name)
+      _RemoveInstance(self, feedback_fn, instance, self.ignore_remove_failures)
+
+    self._CleanupExports(feedback_fn)
 
-    # on one-node clusters nodelist will be empty after the removal
-    # if we proceed the backup would be removed because OpQueryExports
-    # substitutes an empty list with the full cluster node list.
-    iname = instance.name
-    if nodelist:
-      feedback_fn("Removing old exports for instance %s" % iname)
-      exportlist = self.rpc.call_export_list(nodelist)
-      for node in exportlist:
-        if exportlist[node].fail_msg:
-          continue
-        if iname in exportlist[node].payload:
-          msg = self.rpc.call_export_remove(node, iname).fail_msg
-          if msg:
-            self.LogWarning("Could not remove older export for instance %s"
-                            " on node %s: %s", iname, node, msg)
     return fin_resu, dresults