Merge branch 'devel-2.1'
[ganeti-local] / lib / cmdlib.py
index 39e7d06..9f8b1ff 100644 (file)
@@ -33,6 +33,7 @@ import re
 import platform
 import logging
 import copy
 import platform
 import logging
 import copy
+import OpenSSL
 
 from ganeti import ssh
 from ganeti import utils
 
 from ganeti import ssh
 from ganeti import utils
@@ -43,6 +44,11 @@ from ganeti import constants
 from ganeti import objects
 from ganeti import serializer
 from ganeti import ssconf
 from ganeti import objects
 from ganeti import serializer
 from ganeti import ssconf
+from ganeti import uidpool
+from ganeti import compat
+from ganeti import masterd
+
+import ganeti.masterd.instance # pylint: disable-msg=W0611
 
 
 class LogicalUnit(object):
 
 
 class LogicalUnit(object):
@@ -541,6 +547,75 @@ def _CheckNodeNotDrained(lu, node):
                                errors.ECODE_INVAL)
 
 
                                errors.ECODE_INVAL)
 
 
+def _CheckNodeHasOS(lu, node, os_name, force_variant):
+  """Ensure that a node supports a given OS.
+
+  @param lu: the LU on behalf of which we make the check
+  @param node: the node to check
+  @param os_name: the OS to query about
+  @param force_variant: whether to ignore variant errors
+  @raise errors.OpPrereqError: if the node does not support the OS
+
+  """
+  result = lu.rpc.call_os_get(node, os_name)
+  result.Raise("OS '%s' not in supported OS list for node %s" %
+               (os_name, node),
+               prereq=True, ecode=errors.ECODE_INVAL)
+  if not force_variant:
+    _CheckOSVariant(result.payload, os_name)
+
+
+def _RequireFileStorage():
+  """Checks that file storage is enabled.
+
+  @raise errors.OpPrereqError: when file storage is disabled
+
+  """
+  if not constants.ENABLE_FILE_STORAGE:
+    raise errors.OpPrereqError("File storage disabled at configure time",
+                               errors.ECODE_INVAL)
+
+
+def _CheckDiskTemplate(template):
+  """Ensure a given disk template is valid.
+
+  """
+  if template not in constants.DISK_TEMPLATES:
+    msg = ("Invalid disk template name '%s', valid templates are: %s" %
+           (template, utils.CommaJoin(constants.DISK_TEMPLATES)))
+    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
+  if template == constants.DT_FILE:
+    _RequireFileStorage()
+
+
+def _CheckStorageType(storage_type):
+  """Ensure a given storage type is valid.
+
+  """
+  if storage_type not in constants.VALID_STORAGE_TYPES:
+    raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
+                               errors.ECODE_INVAL)
+  if storage_type == constants.ST_FILE:
+    _RequireFileStorage()
+
+
+
+def _CheckInstanceDown(lu, instance, reason):
+  """Ensure that an instance is not running."""
+  if instance.admin_up:
+    raise errors.OpPrereqError("Instance %s is marked to be up, %s" %
+                               (instance.name, reason), errors.ECODE_STATE)
+
+  pnode = instance.primary_node
+  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
+  ins_l.Raise("Can't contact node %s for instance information" % pnode,
+              prereq=True, ecode=errors.ECODE_ENVIRON)
+
+  if instance.name in ins_l.payload:
+    raise errors.OpPrereqError("Instance %s is running, %s" %
+                               (instance.name, reason), errors.ECODE_STATE)
+
+
 def _ExpandItemName(fn, name, kind):
   """Expand an item name.
 
 def _ExpandItemName(fn, name, kind):
   """Expand an item name.
 
@@ -939,6 +1014,39 @@ class LUDestroyCluster(LogicalUnit):
     return master
 
 
     return master
 
 
+def _VerifyCertificate(filename):
+  """Verifies a certificate for LUVerifyCluster.
+
+  @type filename: string
+  @param filename: Path to PEM file
+
+  """
+  try:
+    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
+                                           utils.ReadFile(filename))
+  except Exception, err: # pylint: disable-msg=W0703
+    return (LUVerifyCluster.ETYPE_ERROR,
+            "Failed to load X509 certificate %s: %s" % (filename, err))
+
+  (errcode, msg) = \
+    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
+                                constants.SSL_CERT_EXPIRATION_ERROR)
+
+  if msg:
+    fnamemsg = "While verifying %s: %s" % (filename, msg)
+  else:
+    fnamemsg = None
+
+  if errcode is None:
+    return (None, fnamemsg)
+  elif errcode == utils.CERT_WARNING:
+    return (LUVerifyCluster.ETYPE_WARNING, fnamemsg)
+  elif errcode == utils.CERT_ERROR:
+    return (LUVerifyCluster.ETYPE_ERROR, fnamemsg)
+
+  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
+
+
 class LUVerifyCluster(LogicalUnit):
   """Verifies the cluster status.
 
 class LUVerifyCluster(LogicalUnit):
   """Verifies the cluster status.
 
@@ -953,6 +1061,7 @@ class LUVerifyCluster(LogicalUnit):
   TINSTANCE = "instance"
 
   ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
   TINSTANCE = "instance"
 
   ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
+  ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT")
   EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
   EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
   EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
   EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
   EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
   EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
@@ -978,6 +1087,44 @@ class LUVerifyCluster(LogicalUnit):
   ETYPE_ERROR = "ERROR"
   ETYPE_WARNING = "WARNING"
 
   ETYPE_ERROR = "ERROR"
   ETYPE_WARNING = "WARNING"
 
+  class NodeImage(object):
+    """A class representing the logical and physical status of a node.
+
+    @ivar volumes: a structure as returned from
+        L{ganeti.backend.GetVolumeList} (runtime)
+    @ivar instances: a list of running instances (runtime)
+    @ivar pinst: list of configured primary instances (config)
+    @ivar sinst: list of configured secondary instances (config)
+    @ivar sbp: dictionary of {primary-node: list of instances} for all
+        instances for which this node is secondary (config)
+    @ivar mfree: free memory, as reported by hypervisor (runtime)
+    @ivar dfree: free disk, as reported by the node (runtime)
+    @ivar offline: the offline status (config)
+    @type rpc_fail: boolean
+    @ivar rpc_fail: whether the RPC verify call failed (overall,
+        not whether the individual keys were correct) (runtime)
+    @type lvm_fail: boolean
+    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
+    @type hyp_fail: boolean
+    @ivar hyp_fail: whether the RPC call didn't return the instance list
+    @type ghost: boolean
+    @ivar ghost: whether this is a known node or not (config)
+
+    """
+    def __init__(self, offline=False):
+      self.volumes = {}
+      self.instances = []
+      self.pinst = []
+      self.sinst = []
+      self.sbp = {}
+      self.mfree = 0
+      self.dfree = 0
+      self.offline = offline
+      self.rpc_fail = False
+      self.lvm_fail = False
+      self.hyp_fail = False
+      self.ghost = False
+
   def ExpandNames(self):
     self.needed_locks = {
       locking.LEVEL_NODE: locking.ALL_SET,
   def ExpandNames(self):
     self.needed_locks = {
       locking.LEVEL_NODE: locking.ALL_SET,
@@ -1022,8 +1169,7 @@ class LUVerifyCluster(LogicalUnit):
     if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
       self.bad = self.bad or cond
 
     if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
       self.bad = self.bad or cond
 
-  def _VerifyNode(self, nodeinfo, file_list, local_cksum,
-                  node_result, master_files, drbd_map, vg_name):
+  def _VerifyNode(self, ninfo, nresult):
     """Run multiple tests against a node.
 
     Test list:
     """Run multiple tests against a node.
 
     Test list:
@@ -1033,45 +1179,41 @@ class LUVerifyCluster(LogicalUnit):
       - checks config file checksum
       - checks ssh to other nodes
 
       - checks config file checksum
       - checks ssh to other nodes
 
-    @type nodeinfo: L{objects.Node}
-    @param nodeinfo: the node to check
-    @param file_list: required list of files
-    @param local_cksum: dictionary of local files and their checksums
-    @param node_result: the results from the node
-    @param master_files: list of files that only masters should have
-    @param drbd_map: the useddrbd minors for this node, in
-        form of minor: (instance, must_exist) which correspond to instances
-        and their running status
-    @param vg_name: Ganeti Volume Group (result of self.cfg.GetVGName())
+    @type ninfo: L{objects.Node}
+    @param ninfo: the node to check
+    @param nresult: the results from the node
+    @rtype: boolean
+    @return: whether overall this call was successful (and we can expect
+         reasonable values in the response)
 
     """
 
     """
-    node = nodeinfo.name
+    node = ninfo.name
     _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
 
     _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
 
-    # main result, node_result should be a non-empty dict
-    test = not node_result or not isinstance(node_result, dict)
+    # main result, nresult should be a non-empty dict
+    test = not nresult or not isinstance(nresult, dict)
     _ErrorIf(test, self.ENODERPC, node,
                   "unable to verify node: no data returned")
     if test:
     _ErrorIf(test, self.ENODERPC, node,
                   "unable to verify node: no data returned")
     if test:
-      return
+      return False
 
     # compares ganeti version
     local_version = constants.PROTOCOL_VERSION
 
     # compares ganeti version
     local_version = constants.PROTOCOL_VERSION
-    remote_version = node_result.get('version', None)
+    remote_version = nresult.get("version", None)
     test = not (remote_version and
                 isinstance(remote_version, (list, tuple)) and
                 len(remote_version) == 2)
     _ErrorIf(test, self.ENODERPC, node,
              "connection to node returned invalid data")
     if test:
     test = not (remote_version and
                 isinstance(remote_version, (list, tuple)) and
                 len(remote_version) == 2)
     _ErrorIf(test, self.ENODERPC, node,
              "connection to node returned invalid data")
     if test:
-      return
+      return False
 
     test = local_version != remote_version[0]
     _ErrorIf(test, self.ENODEVERSION, node,
              "incompatible protocol versions: master %s,"
              " node %s", local_version, remote_version[0])
     if test:
 
     test = local_version != remote_version[0]
     _ErrorIf(test, self.ENODEVERSION, node,
              "incompatible protocol versions: master %s,"
              " node %s", local_version, remote_version[0])
     if test:
-      return
+      return False
 
     # node seems compatible, we can actually try to look into its results
 
 
     # node seems compatible, we can actually try to look into its results
 
@@ -1082,111 +1224,122 @@ class LUVerifyCluster(LogicalUnit):
                   constants.RELEASE_VERSION, remote_version[1],
                   code=self.ETYPE_WARNING)
 
                   constants.RELEASE_VERSION, remote_version[1],
                   code=self.ETYPE_WARNING)
 
-    # checks vg existence and size > 20G
-    if vg_name is not None:
-      vglist = node_result.get(constants.NV_VGLIST, None)
-      test = not vglist
-      _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
-      if not test:
-        vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
-                                              constants.MIN_VG_SIZE)
-        _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)
+    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
+    if isinstance(hyp_result, dict):
+      for hv_name, hv_result in hyp_result.iteritems():
+        test = hv_result is not None
+        _ErrorIf(test, self.ENODEHV, node,
+                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
 
 
-    # checks config file checksum
 
 
-    remote_cksum = node_result.get(constants.NV_FILELIST, None)
-    test = not isinstance(remote_cksum, dict)
-    _ErrorIf(test, self.ENODEFILECHECK, node,
-             "node hasn't returned file checksum data")
+    test = nresult.get(constants.NV_NODESETUP,
+                           ["Missing NODESETUP results"])
+    _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
+             "; ".join(test))
+
+    return True
+
+  def _VerifyNodeTime(self, ninfo, nresult,
+                      nvinfo_starttime, nvinfo_endtime):
+    """Check the node time.
+
+    @type ninfo: L{objects.Node}
+    @param ninfo: the node to check
+    @param nresult: the remote results for the node
+    @param nvinfo_starttime: the start time of the RPC call
+    @param nvinfo_endtime: the end time of the RPC call
+
+    """
+    node = ninfo.name
+    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
+
+    ntime = nresult.get(constants.NV_TIME, None)
+    try:
+      ntime_merged = utils.MergeTime(ntime)
+    except (ValueError, TypeError):
+      _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
+      return
+
+    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
+      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
+    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
+      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
+    else:
+      ntime_diff = None
+
+    _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
+             "Node time diverges by at least %s from master node time",
+             ntime_diff)
+
+  def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
+    """Check the node LVM results.
+
+    @type ninfo: L{objects.Node}
+    @param ninfo: the node to check
+    @param nresult: the remote results for the node
+    @param vg_name: the configured VG name
+
+    """
+    if vg_name is None:
+      return
+
+    node = ninfo.name
+    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
+
+    # checks vg existence and size >= constants.MIN_VG_SIZE
+    vglist = nresult.get(constants.NV_VGLIST, None)
+    test = not vglist
+    _ErrorIf(test, self.ENODELVM, node, "unable to check volume groups")
+    if not test:
+      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
+                                            constants.MIN_VG_SIZE)
+      _ErrorIf(vgstatus, self.ENODELVM, node, vgstatus)
+
+    # check pv names
+    pvlist = nresult.get(constants.NV_PVLIST, None)
+    test = pvlist is None
+    _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node")
     if not test:
     if not test:
-      for file_name in file_list:
-        node_is_mc = nodeinfo.master_candidate
-        must_have = (file_name not in master_files) or node_is_mc
-        # missing
-        test1 = file_name not in remote_cksum
-        # invalid checksum
-        test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name]
-        # existing and good
-        test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name]
-        _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node,
-                 "file '%s' missing", file_name)
-        _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node,
-                 "file '%s' has wrong checksum", file_name)
-        # not candidate and this is not a must-have file
-        _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node,
-                 "file '%s' should not exist on non master"
-                 " candidates (and the file is outdated)", file_name)
-        # all good, except non-master/non-must have combination
-        _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node,
-                 "file '%s' should not exist"
-                 " on non master candidates", file_name)
-
-    # checks ssh to any
-
-    test = constants.NV_NODELIST not in node_result
+      # check that ':' is not present in PV names, since it's a
+      # special character for lvcreate (denotes the range of PEs to
+      # use on the PV)
+      for _, pvname, owner_vg in pvlist:
+        test = ":" in pvname
+        _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
+                 " '%s' of VG '%s'", pvname, owner_vg)
+
+  def _VerifyNodeNetwork(self, ninfo, nresult):
+    """Check the node network connectivity results.
+
+    @type ninfo: L{objects.Node}
+    @param ninfo: the node to check
+    @param nresult: the remote results for the node
+
+    """
+    node = ninfo.name
+    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
+
+    test = constants.NV_NODELIST not in nresult
     _ErrorIf(test, self.ENODESSH, node,
              "node hasn't returned node ssh connectivity data")
     if not test:
     _ErrorIf(test, self.ENODESSH, node,
              "node hasn't returned node ssh connectivity data")
     if not test:
-      if node_result[constants.NV_NODELIST]:
-        for a_node, a_msg in node_result[constants.NV_NODELIST].items():
+      if nresult[constants.NV_NODELIST]:
+        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
           _ErrorIf(True, self.ENODESSH, node,
                    "ssh communication with node '%s': %s", a_node, a_msg)
 
           _ErrorIf(True, self.ENODESSH, node,
                    "ssh communication with node '%s': %s", a_node, a_msg)
 
-    test = constants.NV_NODENETTEST not in node_result
+    test = constants.NV_NODENETTEST not in nresult
     _ErrorIf(test, self.ENODENET, node,
              "node hasn't returned node tcp connectivity data")
     if not test:
     _ErrorIf(test, self.ENODENET, node,
              "node hasn't returned node tcp connectivity data")
     if not test:
-      if node_result[constants.NV_NODENETTEST]:
-        nlist = utils.NiceSort(node_result[constants.NV_NODENETTEST].keys())
+      if nresult[constants.NV_NODENETTEST]:
+        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
         for anode in nlist:
           _ErrorIf(True, self.ENODENET, node,
                    "tcp communication with node '%s': %s",
         for anode in nlist:
           _ErrorIf(True, self.ENODENET, node,
                    "tcp communication with node '%s': %s",
-                   anode, node_result[constants.NV_NODENETTEST][anode])
-
-    hyp_result = node_result.get(constants.NV_HYPERVISOR, None)
-    if isinstance(hyp_result, dict):
-      for hv_name, hv_result in hyp_result.iteritems():
-        test = hv_result is not None
-        _ErrorIf(test, self.ENODEHV, node,
-                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
+                   anode, nresult[constants.NV_NODENETTEST][anode])
 
 
-    # check used drbd list
-    if vg_name is not None:
-      used_minors = node_result.get(constants.NV_DRBDLIST, [])
-      test = not isinstance(used_minors, (tuple, list))
-      _ErrorIf(test, self.ENODEDRBD, node,
-               "cannot parse drbd status file: %s", str(used_minors))
-      if not test:
-        for minor, (iname, must_exist) in drbd_map.items():
-          test = minor not in used_minors and must_exist
-          _ErrorIf(test, self.ENODEDRBD, node,
-                   "drbd minor %d of instance %s is not active",
-                   minor, iname)
-        for minor in used_minors:
-          test = minor not in drbd_map
-          _ErrorIf(test, self.ENODEDRBD, node,
-                   "unallocated drbd minor %d is in use", minor)
-    test = node_result.get(constants.NV_NODESETUP,
-                           ["Missing NODESETUP results"])
-    _ErrorIf(test, self.ENODESETUP, node, "node setup error: %s",
-             "; ".join(test))
-
-    # check pv names
-    if vg_name is not None:
-      pvlist = node_result.get(constants.NV_PVLIST, None)
-      test = pvlist is None
-      _ErrorIf(test, self.ENODELVM, node, "Can't get PV list from node")
-      if not test:
-        # check that ':' is not present in PV names, since it's a
-        # special character for lvcreate (denotes the range of PEs to
-        # use on the PV)
-        for _, pvname, owner_vg in pvlist:
-          test = ":" in pvname
-          _ErrorIf(test, self.ENODELVM, node, "Invalid character ':' in PV"
-                   " '%s' of VG '%s'", pvname, owner_vg)
-
-  def _VerifyInstance(self, instance, instanceconfig, node_vol_is,
-                      node_instance, n_offline):
+  def _VerifyInstance(self, instance, instanceconfig, node_image):
     """Verify an instance.
 
     This function checks to see if the required block devices are
     """Verify an instance.
 
     This function checks to see if the required block devices are
@@ -1200,81 +1353,264 @@ class LUVerifyCluster(LogicalUnit):
     instanceconfig.MapLVsByNode(node_vol_should)
 
     for node in node_vol_should:
     instanceconfig.MapLVsByNode(node_vol_should)
 
     for node in node_vol_should:
-      if node in n_offline:
-        # ignore missing volumes on offline nodes
+      n_img = node_image[node]
+      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
+        # ignore missing volumes on offline or broken nodes
         continue
       for volume in node_vol_should[node]:
         continue
       for volume in node_vol_should[node]:
-        test = node not in node_vol_is or volume not in node_vol_is[node]
+        test = volume not in n_img.volumes
         _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
                  "volume %s missing on node %s", volume, node)
 
     if instanceconfig.admin_up:
         _ErrorIf(test, self.EINSTANCEMISSINGDISK, instance,
                  "volume %s missing on node %s", volume, node)
 
     if instanceconfig.admin_up:
-      test = ((node_current not in node_instance or
-               not instance in node_instance[node_current]) and
-              node_current not in n_offline)
+      pri_img = node_image[node_current]
+      test = instance not in pri_img.instances and not pri_img.offline
       _ErrorIf(test, self.EINSTANCEDOWN, instance,
                "instance not running on its primary node %s",
                node_current)
 
       _ErrorIf(test, self.EINSTANCEDOWN, instance,
                "instance not running on its primary node %s",
                node_current)
 
-    for node in node_instance:
+    for node, n_img in node_image.items():
       if (not node == node_current):
       if (not node == node_current):
-        test = instance in node_instance[node]
+        test = instance in n_img.instances
         _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
                  "instance should not run on node %s", node)
 
         _ErrorIf(test, self.EINSTANCEWRONGNODE, instance,
                  "instance should not run on node %s", node)
 
-  def _VerifyOrphanVolumes(self, node_vol_should, node_vol_is):
+  def _VerifyOrphanVolumes(self, node_vol_should, node_image):
     """Verify if there are any unknown volumes in the cluster.
 
     The .os, .swap and backup volumes are ignored. All other volumes are
     reported as unknown.
 
     """
     """Verify if there are any unknown volumes in the cluster.
 
     The .os, .swap and backup volumes are ignored. All other volumes are
     reported as unknown.
 
     """
-    for node in node_vol_is:
-      for volume in node_vol_is[node]:
+    for node, n_img in node_image.items():
+      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
+        # skip non-healthy nodes
+        continue
+      for volume in n_img.volumes:
         test = (node not in node_vol_should or
                 volume not in node_vol_should[node])
         self._ErrorIf(test, self.ENODEORPHANLV, node,
                       "volume %s is unknown", volume)
 
         test = (node not in node_vol_should or
                 volume not in node_vol_should[node])
         self._ErrorIf(test, self.ENODEORPHANLV, node,
                       "volume %s is unknown", volume)
 
-  def _VerifyOrphanInstances(self, instancelist, node_instance):
+  def _VerifyOrphanInstances(self, instancelist, node_image):
     """Verify the list of running instances.
 
     This checks what instances are running but unknown to the cluster.
 
     """
     """Verify the list of running instances.
 
     This checks what instances are running but unknown to the cluster.
 
     """
-    for node in node_instance:
-      for o_inst in node_instance[node]:
+    for node, n_img in node_image.items():
+      for o_inst in n_img.instances:
         test = o_inst not in instancelist
         self._ErrorIf(test, self.ENODEORPHANINSTANCE, node,
                       "instance %s on node %s should not exist", o_inst, node)
 
         test = o_inst not in instancelist
         self._ErrorIf(test, self.ENODEORPHANINSTANCE, node,
                       "instance %s on node %s should not exist", o_inst, node)
 
-  def _VerifyNPlusOneMemory(self, node_info, instance_cfg):
+  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
     """Verify N+1 Memory Resilience.
 
     """Verify N+1 Memory Resilience.
 
-    Check that if one single node dies we can still start all the instances it
-    was primary for.
+    Check that if one single node dies we can still start all the
+    instances it was primary for.
 
     """
 
     """
-    for node, nodeinfo in node_info.iteritems():
-      # This code checks that every node which is now listed as secondary has
-      # enough memory to host all instances it is supposed to should a single
-      # other node in the cluster fail.
+    for node, n_img in node_image.items():
+      # This code checks that every node which is now listed as
+      # secondary has enough memory to host all instances it is
+      # supposed to should a single other node in the cluster fail.
       # FIXME: not ready for failover to an arbitrary node
       # FIXME: does not support file-backed instances
       # FIXME: not ready for failover to an arbitrary node
       # FIXME: does not support file-backed instances
-      # WARNING: we currently take into account down instances as well as up
-      # ones, considering that even if they're down someone might want to start
-      # them even in the event of a node failure.
-      for prinode, instances in nodeinfo['sinst-by-pnode'].iteritems():
+      # WARNING: we currently take into account down instances as well
+      # as up ones, considering that even if they're down someone
+      # might want to start them even in the event of a node failure.
+      for prinode, instances in n_img.sbp.items():
         needed_mem = 0
         for instance in instances:
           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
           if bep[constants.BE_AUTO_BALANCE]:
             needed_mem += bep[constants.BE_MEMORY]
         needed_mem = 0
         for instance in instances:
           bep = self.cfg.GetClusterInfo().FillBE(instance_cfg[instance])
           if bep[constants.BE_AUTO_BALANCE]:
             needed_mem += bep[constants.BE_MEMORY]
-        test = nodeinfo['mfree'] < needed_mem
+        test = n_img.mfree < needed_mem
         self._ErrorIf(test, self.ENODEN1, node,
                       "not enough memory on to accommodate"
                       " failovers should peer node %s fail", prinode)
 
         self._ErrorIf(test, self.ENODEN1, node,
                       "not enough memory on to accommodate"
                       " failovers should peer node %s fail", prinode)
 
+  def _VerifyNodeFiles(self, ninfo, nresult, file_list, local_cksum,
+                       master_files):
+    """Verifies the node's required file checksums.
+
+    @type ninfo: L{objects.Node}
+    @param ninfo: the node to check
+    @param nresult: the remote results for the node
+    @param file_list: required list of files
+    @param local_cksum: dictionary of local files and their checksums
+    @param master_files: list of files that only masters should have
+
+    """
+    node = ninfo.name
+    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
+
+    remote_cksum = nresult.get(constants.NV_FILELIST, None)
+    test = not isinstance(remote_cksum, dict)
+    _ErrorIf(test, self.ENODEFILECHECK, node,
+             "node hasn't returned file checksum data")
+    if test:
+      return
+
+    for file_name in file_list:
+      node_is_mc = ninfo.master_candidate
+      must_have = (file_name not in master_files) or node_is_mc
+      # missing
+      test1 = file_name not in remote_cksum
+      # invalid checksum
+      test2 = not test1 and remote_cksum[file_name] != local_cksum[file_name]
+      # existing and good
+      test3 = not test1 and remote_cksum[file_name] == local_cksum[file_name]
+      _ErrorIf(test1 and must_have, self.ENODEFILECHECK, node,
+               "file '%s' missing", file_name)
+      _ErrorIf(test2 and must_have, self.ENODEFILECHECK, node,
+               "file '%s' has wrong checksum", file_name)
+      # not candidate and this is not a must-have file
+      _ErrorIf(test2 and not must_have, self.ENODEFILECHECK, node,
+               "file '%s' should not exist on non master"
+               " candidates (and the file is outdated)", file_name)
+      # all good, except non-master/non-must have combination
+      _ErrorIf(test3 and not must_have, self.ENODEFILECHECK, node,
+               "file '%s' should not exist"
+               " on non master candidates", file_name)
+
+  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_map):
+    """Verifies the node DRBD status.
+
+    @type ninfo: L{objects.Node}
+    @param ninfo: the node to check
+    @param nresult: the remote results for the node
+    @param instanceinfo: the dict of instances
+    @param drbd_map: the DRBD map as returned by
+        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
+
+    """
+    node = ninfo.name
+    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
+
+    # compute the DRBD minors
+    node_drbd = {}
+    for minor, instance in drbd_map[node].items():
+      test = instance not in instanceinfo
+      _ErrorIf(test, self.ECLUSTERCFG, None,
+               "ghost instance '%s' in temporary DRBD map", instance)
+        # ghost instance should not be running, but otherwise we
+        # don't give double warnings (both ghost instance and
+        # unallocated minor in use)
+      if test:
+        node_drbd[minor] = (instance, False)
+      else:
+        instance = instanceinfo[instance]
+        node_drbd[minor] = (instance.name, instance.admin_up)
+
+    # and now check them
+    used_minors = nresult.get(constants.NV_DRBDLIST, [])
+    test = not isinstance(used_minors, (tuple, list))
+    _ErrorIf(test, self.ENODEDRBD, node,
+             "cannot parse drbd status file: %s", str(used_minors))
+    if test:
+      # we cannot check drbd status
+      return
+
+    for minor, (iname, must_exist) in node_drbd.items():
+      test = minor not in used_minors and must_exist
+      _ErrorIf(test, self.ENODEDRBD, node,
+               "drbd minor %d of instance %s is not active", minor, iname)
+    for minor in used_minors:
+      test = minor not in node_drbd
+      _ErrorIf(test, self.ENODEDRBD, node,
+               "unallocated drbd minor %d is in use", minor)
+
+  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
+    """Verifies and updates the node volume data.
+
+    This function will update a L{NodeImage}'s internal structures
+    with data from the remote call.
+
+    @type ninfo: L{objects.Node}
+    @param ninfo: the node to check
+    @param nresult: the remote results for the node
+    @param nimg: the node image object
+    @param vg_name: the configured VG name
+
+    """
+    node = ninfo.name
+    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
+
+    nimg.lvm_fail = True
+    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
+    if vg_name is None:
+      pass
+    elif isinstance(lvdata, basestring):
+      _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
+               utils.SafeEncode(lvdata))
+    elif not isinstance(lvdata, dict):
+      _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
+    else:
+      nimg.volumes = lvdata
+      nimg.lvm_fail = False
+
+  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
+    """Verifies and updates the node instance list.
+
+    If the listing was successful, then updates this node's instance
+    list. Otherwise, it marks the RPC call as failed for the instance
+    list key.
+
+    @type ninfo: L{objects.Node}
+    @param ninfo: the node to check
+    @param nresult: the remote results for the node
+    @param nimg: the node image object
+
+    """
+    idata = nresult.get(constants.NV_INSTANCELIST, None)
+    test = not isinstance(idata, list)
+    self._ErrorIf(test, self.ENODEHV, ninfo.name, "rpc call to node failed"
+                  " (instancelist): %s", utils.SafeEncode(str(idata)))
+    if test:
+      nimg.hyp_fail = True
+    else:
+      nimg.instances = idata
+
+  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
+    """Verifies and updates the node's free memory and free disk info.
+
+    @type ninfo: L{objects.Node}
+    @param ninfo: the node to check
+    @param nresult: the remote results for the node
+    @param nimg: the node image object
+    @param vg_name: the configured VG name
+
+    """
+    node = ninfo.name
+    _ErrorIf = self._ErrorIf # pylint: disable-msg=C0103
+
+    # try to read free memory (from the hypervisor)
+    hv_info = nresult.get(constants.NV_HVINFO, None)
+    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
+    _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
+    if not test:
+      try:
+        nimg.mfree = int(hv_info["memory_free"])
+      except (ValueError, TypeError):
+        _ErrorIf(True, self.ENODERPC, node,
+                 "node returned invalid nodeinfo, check hypervisor")
+
+    # FIXME: devise a free space model for file based instances as well
+    if vg_name is not None:
+      test = (constants.NV_VGLIST not in nresult or
+              vg_name not in nresult[constants.NV_VGLIST])
+      _ErrorIf(test, self.ENODELVM, node,
+               "node didn't return data for the volume group '%s'"
+               " - it is either missing or broken", vg_name)
+      if not test:
+        try:
+          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
+        except (ValueError, TypeError):
+          _ErrorIf(True, self.ENODERPC, node,
+                   "node returned invalid LVM info, check LVM status")
+
   def CheckPrereq(self):
     """Check prerequisites.
 
   def CheckPrereq(self):
     """Check prerequisites.
 
@@ -1315,8 +1651,14 @@ class LUVerifyCluster(LogicalUnit):
     for msg in self.cfg.VerifyConfig():
       _ErrorIf(True, self.ECLUSTERCFG, None, msg)
 
     for msg in self.cfg.VerifyConfig():
       _ErrorIf(True, self.ECLUSTERCFG, None, msg)
 
+    # Check the cluster certificates
+    for cert_filename in constants.ALL_CERT_FILES:
+      (errcode, msg) = _VerifyCertificate(cert_filename)
+      _ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode)
+
     vg_name = self.cfg.GetVGName()
     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
     vg_name = self.cfg.GetVGName()
     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
+    cluster = self.cfg.GetClusterInfo()
     nodelist = utils.NiceSort(self.cfg.GetNodeList())
     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
     nodelist = utils.NiceSort(self.cfg.GetNodeList())
     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
@@ -1324,21 +1666,19 @@ class LUVerifyCluster(LogicalUnit):
                         for iname in instancelist)
     i_non_redundant = [] # Non redundant instances
     i_non_a_balanced = [] # Non auto-balanced instances
                         for iname in instancelist)
     i_non_redundant = [] # Non redundant instances
     i_non_a_balanced = [] # Non auto-balanced instances
-    n_offline = [] # List of offline nodes
-    n_drained = [] # List of nodes being drained
-    node_volume = {}
-    node_instance = {}
-    node_info = {}
-    instance_cfg = {}
+    n_offline = 0 # Count of offline nodes
+    n_drained = 0 # Count of nodes being drained
+    node_vol_should = {}
 
     # FIXME: verify OS list
     # do local checksums
     master_files = [constants.CLUSTER_CONF_FILE]
 
     file_names = ssconf.SimpleStore().GetFileList()
 
     # FIXME: verify OS list
     # do local checksums
     master_files = [constants.CLUSTER_CONF_FILE]
 
     file_names = ssconf.SimpleStore().GetFileList()
-    file_names.append(constants.SSL_CERT_FILE)
-    file_names.append(constants.RAPI_CERT_FILE)
+    file_names.extend(constants.ALL_CERT_FILES)
     file_names.extend(master_files)
     file_names.extend(master_files)
+    if cluster.modify_etc_hosts:
+      file_names.append(constants.ETC_HOSTS)
 
     local_checksums = utils.FingerprintFiles(file_names)
 
 
     local_checksums = utils.FingerprintFiles(file_names)
 
@@ -1364,6 +1704,35 @@ class LUVerifyCluster(LogicalUnit):
       node_verify_param[constants.NV_PVLIST] = [vg_name]
       node_verify_param[constants.NV_DRBDLIST] = None
 
       node_verify_param[constants.NV_PVLIST] = [vg_name]
       node_verify_param[constants.NV_DRBDLIST] = None
 
+    # Build our expected cluster state
+    node_image = dict((node.name, self.NodeImage(offline=node.offline))
+                      for node in nodeinfo)
+
+    for instance in instancelist:
+      inst_config = instanceinfo[instance]
+
+      for nname in inst_config.all_nodes:
+        if nname not in node_image:
+          # ghost node
+          gnode = self.NodeImage()
+          gnode.ghost = True
+          node_image[nname] = gnode
+
+      inst_config.MapLVsByNode(node_vol_should)
+
+      pnode = inst_config.primary_node
+      node_image[pnode].pinst.append(instance)
+
+      for snode in inst_config.secondary_nodes:
+        nimg = node_image[snode]
+        nimg.sinst.append(instance)
+        if pnode not in nimg.sbp:
+          nimg.sbp[pnode] = []
+        nimg.sbp[pnode].append(instance)
+
+    # At this point, we have the in-memory data structures complete,
+    # except for the runtime information, which we'll gather next
+
     # Due to the way our RPC system works, exact response times cannot be
     # guaranteed (e.g. a broken node could run into a timeout). By keeping the
     # time before and after executing the request, we can at least have a time
     # Due to the way our RPC system works, exact response times cannot be
     # guaranteed (e.g. a broken node could run into a timeout). By keeping the
     # time before and after executing the request, we can at least have a time
@@ -1373,18 +1742,18 @@ class LUVerifyCluster(LogicalUnit):
                                            self.cfg.GetClusterName())
     nvinfo_endtime = time.time()
 
                                            self.cfg.GetClusterName())
     nvinfo_endtime = time.time()
 
-    cluster = self.cfg.GetClusterInfo()
     master_node = self.cfg.GetMasterNode()
     all_drbd_map = self.cfg.ComputeDRBDMap()
 
     feedback_fn("* Verifying node status")
     for node_i in nodeinfo:
       node = node_i.name
     master_node = self.cfg.GetMasterNode()
     all_drbd_map = self.cfg.ComputeDRBDMap()
 
     feedback_fn("* Verifying node status")
     for node_i in nodeinfo:
       node = node_i.name
+      nimg = node_image[node]
 
       if node_i.offline:
         if verbose:
           feedback_fn("* Skipping offline node %s" % (node,))
 
       if node_i.offline:
         if verbose:
           feedback_fn("* Skipping offline node %s" % (node,))
-        n_offline.append(node)
+        n_offline += 1
         continue
 
       if node == master_node:
         continue
 
       if node == master_node:
@@ -1393,7 +1762,7 @@ class LUVerifyCluster(LogicalUnit):
         ntype = "master candidate"
       elif node_i.drained:
         ntype = "drained"
         ntype = "master candidate"
       elif node_i.drained:
         ntype = "drained"
-        n_drained.append(node)
+        n_drained += 1
       else:
         ntype = "regular"
       if verbose:
       else:
         ntype = "regular"
       if verbose:
@@ -1402,128 +1771,38 @@ class LUVerifyCluster(LogicalUnit):
       msg = all_nvinfo[node].fail_msg
       _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
       if msg:
       msg = all_nvinfo[node].fail_msg
       _ErrorIf(msg, self.ENODERPC, node, "while contacting node: %s", msg)
       if msg:
+        nimg.rpc_fail = True
         continue
 
       nresult = all_nvinfo[node].payload
         continue
 
       nresult = all_nvinfo[node].payload
-      node_drbd = {}
-      for minor, instance in all_drbd_map[node].items():
-        test = instance not in instanceinfo
-        _ErrorIf(test, self.ECLUSTERCFG, None,
-                 "ghost instance '%s' in temporary DRBD map", instance)
-          # ghost instance should not be running, but otherwise we
-          # don't give double warnings (both ghost instance and
-          # unallocated minor in use)
-        if test:
-          node_drbd[minor] = (instance, False)
-        else:
-          instance = instanceinfo[instance]
-          node_drbd[minor] = (instance.name, instance.admin_up)
-
-      self._VerifyNode(node_i, file_names, local_checksums,
-                       nresult, master_files, node_drbd, vg_name)
-
-      lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
-      if vg_name is None:
-        node_volume[node] = {}
-      elif isinstance(lvdata, basestring):
-        _ErrorIf(True, self.ENODELVM, node, "LVM problem on node: %s",
-                 utils.SafeEncode(lvdata))
-        node_volume[node] = {}
-      elif not isinstance(lvdata, dict):
-        _ErrorIf(True, self.ENODELVM, node, "rpc call to node failed (lvlist)")
-        continue
-      else:
-        node_volume[node] = lvdata
-
-      # node_instance
-      idata = nresult.get(constants.NV_INSTANCELIST, None)
-      test = not isinstance(idata, list)
-      _ErrorIf(test, self.ENODEHV, node,
-               "rpc call to node failed (instancelist)")
-      if test:
-        continue
-
-      node_instance[node] = idata
 
 
-      # node_info
-      nodeinfo = nresult.get(constants.NV_HVINFO, None)
-      test = not isinstance(nodeinfo, dict)
-      _ErrorIf(test, self.ENODEHV, node, "rpc call to node failed (hvinfo)")
-      if test:
-        continue
-
-      # Node time
-      ntime = nresult.get(constants.NV_TIME, None)
-      try:
-        ntime_merged = utils.MergeTime(ntime)
-      except (ValueError, TypeError):
-        _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
-
-      if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
-        ntime_diff = abs(nvinfo_starttime - ntime_merged)
-      elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
-        ntime_diff = abs(ntime_merged - nvinfo_endtime)
-      else:
-        ntime_diff = None
-
-      _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
-               "Node time diverges by at least %0.1fs from master node time",
-               ntime_diff)
-
-      if ntime_diff is not None:
-        continue
-
-      try:
-        node_info[node] = {
-          "mfree": int(nodeinfo['memory_free']),
-          "pinst": [],
-          "sinst": [],
-          # dictionary holding all instances this node is secondary for,
-          # grouped by their primary node. Each key is a cluster node, and each
-          # value is a list of instances which have the key as primary and the
-          # current node as secondary.  this is handy to calculate N+1 memory
-          # availability if you can only failover from a primary to its
-          # secondary.
-          "sinst-by-pnode": {},
-        }
-        # FIXME: devise a free space model for file based instances as well
-        if vg_name is not None:
-          test = (constants.NV_VGLIST not in nresult or
-                  vg_name not in nresult[constants.NV_VGLIST])
-          _ErrorIf(test, self.ENODELVM, node,
-                   "node didn't return data for the volume group '%s'"
-                   " - it is either missing or broken", vg_name)
-          if test:
-            continue
-          node_info[node]["dfree"] = int(nresult[constants.NV_VGLIST][vg_name])
-      except (ValueError, KeyError):
-        _ErrorIf(True, self.ENODERPC, node,
-                 "node returned invalid nodeinfo, check lvm/hypervisor")
-        continue
+      nimg.call_ok = self._VerifyNode(node_i, nresult)
+      self._VerifyNodeNetwork(node_i, nresult)
+      self._VerifyNodeLVM(node_i, nresult, vg_name)
+      self._VerifyNodeFiles(node_i, nresult, file_names, local_checksums,
+                            master_files)
+      self._VerifyNodeDrbd(node_i, nresult, instanceinfo, all_drbd_map)
+      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
 
 
-    node_vol_should = {}
+      self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
+      self._UpdateNodeInstances(node_i, nresult, nimg)
+      self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
 
     feedback_fn("* Verifying instance status")
     for instance in instancelist:
       if verbose:
         feedback_fn("* Verifying instance %s" % instance)
       inst_config = instanceinfo[instance]
 
     feedback_fn("* Verifying instance status")
     for instance in instancelist:
       if verbose:
         feedback_fn("* Verifying instance %s" % instance)
       inst_config = instanceinfo[instance]
-      self._VerifyInstance(instance, inst_config, node_volume,
-                           node_instance, n_offline)
+      self._VerifyInstance(instance, inst_config, node_image)
       inst_nodes_offline = []
 
       inst_nodes_offline = []
 
-      inst_config.MapLVsByNode(node_vol_should)
-
-      instance_cfg[instance] = inst_config
-
       pnode = inst_config.primary_node
       pnode = inst_config.primary_node
-      _ErrorIf(pnode not in node_info and pnode not in n_offline,
+      pnode_img = node_image[pnode]
+      _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
                self.ENODERPC, pnode, "instance %s, connection to"
                " primary node failed", instance)
                self.ENODERPC, pnode, "instance %s, connection to"
                " primary node failed", instance)
-      if pnode in node_info:
-        node_info[pnode]['pinst'].append(instance)
 
 
-      if pnode in n_offline:
+      if pnode_img.offline:
         inst_nodes_offline.append(pnode)
 
       # If the instance is non-redundant we cannot survive losing its primary
         inst_nodes_offline.append(pnode)
 
       # If the instance is non-redundant we cannot survive losing its primary
@@ -1531,44 +1810,42 @@ class LUVerifyCluster(LogicalUnit):
       # templates with more than one secondary so that situation is not well
       # supported either.
       # FIXME: does not support file-backed instances
       # templates with more than one secondary so that situation is not well
       # supported either.
       # FIXME: does not support file-backed instances
-      if len(inst_config.secondary_nodes) == 0:
+      if not inst_config.secondary_nodes:
         i_non_redundant.append(instance)
         i_non_redundant.append(instance)
-      _ErrorIf(len(inst_config.secondary_nodes) > 1,
-               self.EINSTANCELAYOUT, instance,
-               "instance has multiple secondary nodes", code="WARNING")
+      _ErrorIf(len(inst_config.secondary_nodes) > 1, self.EINSTANCELAYOUT,
+               instance, "instance has multiple secondary nodes: %s",
+               utils.CommaJoin(inst_config.secondary_nodes),
+               code=self.ETYPE_WARNING)
 
       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
         i_non_a_balanced.append(instance)
 
       for snode in inst_config.secondary_nodes:
 
       if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
         i_non_a_balanced.append(instance)
 
       for snode in inst_config.secondary_nodes:
-        _ErrorIf(snode not in node_info and snode not in n_offline,
-                 self.ENODERPC, snode,
-                 "instance %s, connection to secondary node"
-                 "failed", instance)
-
-        if snode in node_info:
-          node_info[snode]['sinst'].append(instance)
-          if pnode not in node_info[snode]['sinst-by-pnode']:
-            node_info[snode]['sinst-by-pnode'][pnode] = []
-          node_info[snode]['sinst-by-pnode'][pnode].append(instance)
-
-        if snode in n_offline:
+        s_img = node_image[snode]
+        _ErrorIf(s_img.rpc_fail and not s_img.offline, self.ENODERPC, snode,
+                 "instance %s, connection to secondary node failed", instance)
+
+        if s_img.offline:
           inst_nodes_offline.append(snode)
 
       # warn that the instance lives on offline nodes
       _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
                "instance lives on offline node(s) %s",
                utils.CommaJoin(inst_nodes_offline))
           inst_nodes_offline.append(snode)
 
       # warn that the instance lives on offline nodes
       _ErrorIf(inst_nodes_offline, self.EINSTANCEBADNODE, instance,
                "instance lives on offline node(s) %s",
                utils.CommaJoin(inst_nodes_offline))
+      # ... or ghost nodes
+      for node in inst_config.all_nodes:
+        _ErrorIf(node_image[node].ghost, self.EINSTANCEBADNODE, instance,
+                 "instance lives on ghost node %s", node)
 
     feedback_fn("* Verifying orphan volumes")
 
     feedback_fn("* Verifying orphan volumes")
-    self._VerifyOrphanVolumes(node_vol_should, node_volume)
+    self._VerifyOrphanVolumes(node_vol_should, node_image)
 
 
-    feedback_fn("* Verifying remaining instances")
-    self._VerifyOrphanInstances(instancelist, node_instance)
+    feedback_fn("* Verifying oprhan instances")
+    self._VerifyOrphanInstances(instancelist, node_image)
 
     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
       feedback_fn("* Verifying N+1 Memory redundancy")
 
     if constants.VERIFY_NPLUSONE_MEM not in self.skip_set:
       feedback_fn("* Verifying N+1 Memory redundancy")
-      self._VerifyNPlusOneMemory(node_info, instance_cfg)
+      self._VerifyNPlusOneMemory(node_image, instanceinfo)
 
     feedback_fn("* Other Notes")
     if i_non_redundant:
 
     feedback_fn("* Other Notes")
     if i_non_redundant:
@@ -1580,10 +1857,10 @@ class LUVerifyCluster(LogicalUnit):
                   % len(i_non_a_balanced))
 
     if n_offline:
                   % len(i_non_a_balanced))
 
     if n_offline:
-      feedback_fn("  - NOTICE: %d offline node(s) found." % len(n_offline))
+      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
 
     if n_drained:
 
     if n_drained:
-      feedback_fn("  - NOTICE: %d drained node(s) found." % len(n_drained))
+      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
 
     return not self.bad
 
 
     return not self.bad
 
@@ -1950,8 +2227,11 @@ class LUSetClusterParams(LogicalUnit):
     """Check parameters
 
     """
     """Check parameters
 
     """
-    if not hasattr(self.op, "candidate_pool_size"):
-      self.op.candidate_pool_size = None
+    for attr in ["candidate_pool_size",
+                 "uid_pool", "add_uids", "remove_uids"]:
+      if not hasattr(self.op, attr):
+        setattr(self.op, attr, None)
+
     if self.op.candidate_pool_size is not None:
       try:
         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
     if self.op.candidate_pool_size is not None:
       try:
         self.op.candidate_pool_size = int(self.op.candidate_pool_size)
@@ -1962,6 +2242,17 @@ class LUSetClusterParams(LogicalUnit):
         raise errors.OpPrereqError("At least one master candidate needed",
                                    errors.ECODE_INVAL)
 
         raise errors.OpPrereqError("At least one master candidate needed",
                                    errors.ECODE_INVAL)
 
+    _CheckBooleanOpField(self.op, "maintain_node_health")
+
+    if self.op.uid_pool:
+      uidpool.CheckUidPool(self.op.uid_pool)
+
+    if self.op.add_uids:
+      uidpool.CheckUidPool(self.op.add_uids)
+
+    if self.op.remove_uids:
+      uidpool.CheckUidPool(self.op.remove_uids)
+
   def ExpandNames(self):
     # FIXME: in the future maybe other cluster params won't require checking on
     # all nodes to be modified.
   def ExpandNames(self):
     # FIXME: in the future maybe other cluster params won't require checking on
     # all nodes to be modified.
@@ -2053,7 +2344,7 @@ class LUSetClusterParams(LogicalUnit):
                                    "\n".join(nic_errors))
 
     # hypervisor list/parameters
                                    "\n".join(nic_errors))
 
     # hypervisor list/parameters
-    self.new_hvparams = objects.FillDict(cluster.hvparams, {})
+    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
     if self.op.hvparams:
       if not isinstance(self.op.hvparams, dict):
         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input",
     if self.op.hvparams:
       if not isinstance(self.op.hvparams, dict):
         raise errors.OpPrereqError("Invalid 'hvparams' parameter on input",
@@ -2083,6 +2374,7 @@ class LUSetClusterParams(LogicalUnit):
             else:
               self.new_os_hvp[os_name][hv_name].update(hv_dict)
 
             else:
               self.new_os_hvp[os_name][hv_name].update(hv_dict)
 
+    # changes to the hypervisor list
     if self.op.enabled_hypervisors is not None:
       self.hv_list = self.op.enabled_hypervisors
       if not self.hv_list:
     if self.op.enabled_hypervisors is not None:
       self.hv_list = self.op.enabled_hypervisors
       if not self.hv_list:
@@ -2095,6 +2387,16 @@ class LUSetClusterParams(LogicalUnit):
                                    " entries: %s" %
                                    utils.CommaJoin(invalid_hvs),
                                    errors.ECODE_INVAL)
                                    " entries: %s" %
                                    utils.CommaJoin(invalid_hvs),
                                    errors.ECODE_INVAL)
+      for hv in self.hv_list:
+        # if the hypervisor doesn't already exist in the cluster
+        # hvparams, we initialize it to empty, and then (in both
+        # cases) we make sure to fill the defaults, as we might not
+        # have a complete defaults list if the hypervisor wasn't
+        # enabled before
+        if hv not in new_hvp:
+          new_hvp[hv] = {}
+        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
+        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
     else:
       self.hv_list = cluster.enabled_hypervisors
 
     else:
       self.hv_list = cluster.enabled_hypervisors
 
@@ -2110,6 +2412,20 @@ class LUSetClusterParams(LogicalUnit):
           hv_class.CheckParameterSyntax(hv_params)
           _CheckHVParams(self, node_list, hv_name, hv_params)
 
           hv_class.CheckParameterSyntax(hv_params)
           _CheckHVParams(self, node_list, hv_name, hv_params)
 
+    if self.op.os_hvp:
+      # no need to check any newly-enabled hypervisors, since the
+      # defaults have already been checked in the above code-block
+      for os_name, os_hvp in self.new_os_hvp.items():
+        for hv_name, hv_params in os_hvp.items():
+          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
+          # we need to fill in the new os_hvp on top of the actual hv_p
+          cluster_defaults = self.new_hvparams.get(hv_name, {})
+          new_osp = objects.FillDict(cluster_defaults, hv_params)
+          hv_class = hypervisor.GetHypervisor(hv_name)
+          hv_class.CheckParameterSyntax(new_osp)
+          _CheckHVParams(self, node_list, hv_name, new_osp)
+
+
   def Exec(self, feedback_fn):
     """Change the parameters of the cluster.
 
   def Exec(self, feedback_fn):
     """Change the parameters of the cluster.
 
@@ -2128,6 +2444,7 @@ class LUSetClusterParams(LogicalUnit):
     if self.op.os_hvp:
       self.cluster.os_hvp = self.new_os_hvp
     if self.op.enabled_hypervisors is not None:
     if self.op.os_hvp:
       self.cluster.os_hvp = self.new_os_hvp
     if self.op.enabled_hypervisors is not None:
+      self.cluster.hvparams = self.new_hvparams
       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
     if self.op.beparams:
       self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
     if self.op.beparams:
       self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
@@ -2139,6 +2456,18 @@ class LUSetClusterParams(LogicalUnit):
       # we need to update the pool size here, otherwise the save will fail
       _AdjustCandidatePool(self, [])
 
       # we need to update the pool size here, otherwise the save will fail
       _AdjustCandidatePool(self, [])
 
+    if self.op.maintain_node_health is not None:
+      self.cluster.maintain_node_health = self.op.maintain_node_health
+
+    if self.op.add_uids is not None:
+      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
+
+    if self.op.remove_uids is not None:
+      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
+
+    if self.op.uid_pool is not None:
+      self.cluster.uid_pool = self.op.uid_pool
+
     self.cfg.Update(self.cluster, feedback_fn)
 
 
     self.cfg.Update(self.cluster, feedback_fn)
 
 
@@ -2166,7 +2495,7 @@ def _RedistributeAncillaryFiles(lu, additional_nodes=None):
                     constants.SSH_KNOWN_HOSTS_FILE,
                     constants.RAPI_CERT_FILE,
                     constants.RAPI_USERS_FILE,
                     constants.SSH_KNOWN_HOSTS_FILE,
                     constants.RAPI_CERT_FILE,
                     constants.RAPI_USERS_FILE,
-                    constants.HMAC_CLUSTER_KEY,
+                    constants.CONFD_HMAC_KEY,
                    ])
 
   enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
                    ])
 
   enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
@@ -2523,6 +2852,12 @@ class LURemoveNode(LogicalUnit):
       self.LogWarning("Errors encountered on the remote node while leaving"
                       " the cluster: %s", msg)
 
       self.LogWarning("Errors encountered on the remote node while leaving"
                       " the cluster: %s", msg)
 
+    # Remove node from our /etc/hosts
+    if self.cfg.GetClusterInfo().modify_etc_hosts:
+      # FIXME: this should be done via an rpc call to node daemon
+      utils.RemoveHostFromEtcHosts(node.name)
+      _RedistributeAncillaryFiles(self)
+
 
 class LUQueryNodes(NoHooksLU):
   """Logical unit for querying nodes.
 
 class LUQueryNodes(NoHooksLU):
   """Logical unit for querying nodes.
@@ -2779,17 +3114,14 @@ class LUQueryNodeStorage(NoHooksLU):
   REQ_BGL = False
   _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
 
   REQ_BGL = False
   _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
 
-  def ExpandNames(self):
-    storage_type = self.op.storage_type
-
-    if storage_type not in constants.VALID_STORAGE_TYPES:
-      raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
-                                 errors.ECODE_INVAL)
+  def CheckArguments(self):
+    _CheckStorageType(self.op.storage_type)
 
     _CheckOutputFields(static=self._FIELDS_STATIC,
                        dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                        selected=self.op.output_fields)
 
 
     _CheckOutputFields(static=self._FIELDS_STATIC,
                        dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
                        selected=self.op.output_fields)
 
+  def ExpandNames(self):
     self.needed_locks = {}
     self.share_locks[locking.LEVEL_NODE] = 1
 
     self.needed_locks = {}
     self.share_locks[locking.LEVEL_NODE] = 1
 
@@ -2878,10 +3210,7 @@ class LUModifyNodeStorage(NoHooksLU):
   def CheckArguments(self):
     self.opnode_name = _ExpandNodeName(self.cfg, self.op.node_name)
 
   def CheckArguments(self):
     self.opnode_name = _ExpandNodeName(self.cfg, self.op.node_name)
 
-    storage_type = self.op.storage_type
-    if storage_type not in constants.VALID_STORAGE_TYPES:
-      raise errors.OpPrereqError("Unknown storage type: %s" % storage_type,
-                                 errors.ECODE_INVAL)
+    _CheckStorageType(self.op.storage_type)
 
   def ExpandNames(self):
     self.needed_locks = {
 
   def ExpandNames(self):
     self.needed_locks = {
@@ -2982,15 +3311,19 @@ class LUAddNode(LogicalUnit):
       raise errors.OpPrereqError("Node %s is not in the configuration" % node,
                                  errors.ECODE_NOENT)
 
       raise errors.OpPrereqError("Node %s is not in the configuration" % node,
                                  errors.ECODE_NOENT)
 
+    self.changed_primary_ip = False
+
     for existing_node_name in node_list:
       existing_node = cfg.GetNodeInfo(existing_node_name)
 
       if self.op.readd and node == existing_node_name:
     for existing_node_name in node_list:
       existing_node = cfg.GetNodeInfo(existing_node_name)
 
       if self.op.readd and node == existing_node_name:
-        if (existing_node.primary_ip != primary_ip or
-            existing_node.secondary_ip != secondary_ip):
+        if existing_node.secondary_ip != secondary_ip:
           raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                      " address configuration as before",
                                      errors.ECODE_INVAL)
           raise errors.OpPrereqError("Readded node doesn't have the same IP"
                                      " address configuration as before",
                                      errors.ECODE_INVAL)
+        if existing_node.primary_ip != primary_ip:
+          self.changed_primary_ip = True
+
         continue
 
       if (existing_node.primary_ip == primary_ip or
         continue
 
       if (existing_node.primary_ip == primary_ip or
@@ -3062,6 +3395,8 @@ class LUAddNode(LogicalUnit):
       self.LogInfo("Readding a node, the offline/drained flags were reset")
       # if we demote the node, we do cleanup later in the procedure
       new_node.master_candidate = self.master_candidate
       self.LogInfo("Readding a node, the offline/drained flags were reset")
       # if we demote the node, we do cleanup later in the procedure
       new_node.master_candidate = self.master_candidate
+      if self.changed_primary_ip:
+        new_node.primary_ip = self.op.primary_ip
 
     # notify the user about any possible mc promotion
     if new_node.master_candidate:
 
     # notify the user about any possible mc promotion
     if new_node.master_candidate:
@@ -3097,6 +3432,7 @@ class LUAddNode(LogicalUnit):
 
     # Add node to our /etc/hosts, and add key to known_hosts
     if self.cfg.GetClusterInfo().modify_etc_hosts:
 
     # Add node to our /etc/hosts, and add key to known_hosts
     if self.cfg.GetClusterInfo().modify_etc_hosts:
+      # FIXME: this should be done via an rpc call to node daemon
       utils.AddHostToEtcHosts(new_node.name)
 
     if new_node.secondary_ip != new_node.primary_ip:
       utils.AddHostToEtcHosts(new_node.name)
 
     if new_node.secondary_ip != new_node.primary_ip:
@@ -3225,7 +3561,7 @@ class LUSetNodeParams(LogicalUnit):
       # candidates
       (mc_remaining, mc_should, _) = \
           self.cfg.GetMasterCandidateStats(exceptions=[node.name])
       # candidates
       (mc_remaining, mc_should, _) = \
           self.cfg.GetMasterCandidateStats(exceptions=[node.name])
-      if mc_remaining != mc_should:
+      if mc_remaining < mc_should:
         raise errors.OpPrereqError("Not enough master candidates, please"
                                    " pass auto_promote to allow promotion",
                                    errors.ECODE_INVAL)
         raise errors.OpPrereqError("Not enough master candidates, please"
                                    " pass auto_promote to allow promotion",
                                    errors.ECODE_INVAL)
@@ -3398,10 +3734,12 @@ class LUQueryClusterInfo(NoHooksLU):
       "master_netdev": cluster.master_netdev,
       "volume_group_name": cluster.volume_group_name,
       "file_storage_dir": cluster.file_storage_dir,
       "master_netdev": cluster.master_netdev,
       "volume_group_name": cluster.volume_group_name,
       "file_storage_dir": cluster.file_storage_dir,
+      "maintain_node_health": cluster.maintain_node_health,
       "ctime": cluster.ctime,
       "mtime": cluster.mtime,
       "uuid": cluster.uuid,
       "tags": list(cluster.GetTags()),
       "ctime": cluster.ctime,
       "mtime": cluster.mtime,
       "uuid": cluster.uuid,
       "tags": list(cluster.GetTags()),
+      "uid_pool": cluster.uid_pool,
       }
 
     return result
       }
 
     return result
@@ -3632,14 +3970,7 @@ def _SafeShutdownInstanceDisks(lu, instance):
   _ShutdownInstanceDisks.
 
   """
   _ShutdownInstanceDisks.
 
   """
-  pnode = instance.primary_node
-  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
-  ins_l.Raise("Can't contact node %s" % pnode)
-
-  if instance.name in ins_l.payload:
-    raise errors.OpExecError("Instance is running, can't shutdown"
-                             " block devices.")
-
+  _CheckInstanceDown(lu, instance, "cannot shutdown disks")
   _ShutdownInstanceDisks(lu, instance)
 
 
   _ShutdownInstanceDisks(lu, instance)
 
 
@@ -3703,14 +4034,50 @@ def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
                                errors.ECODE_NORES)
 
 
                                errors.ECODE_NORES)
 
 
-class LUStartupInstance(LogicalUnit):
-  """Starts an instance.
+def _CheckNodesFreeDisk(lu, nodenames, requested):
+  """Checks if nodes have enough free disk space in the default VG.
 
 
-  """
-  HPATH = "instance-start"
-  HTYPE = constants.HTYPE_INSTANCE
-  _OP_REQP = ["instance_name", "force"]
-  REQ_BGL = False
+  This function checks whether all given nodes have the needed amount
+  of free disk. In case any node has less disk or we cannot get the
+  information from the node, this function raises an OpPrereqError
+  exception.
+
+  @type lu: C{LogicalUnit}
+  @param lu: a logical unit from which we get configuration data
+  @type nodenames: C{list}
+  @param nodenames: the list of node names to check
+  @type requested: C{int}
+  @param requested: the amount of disk in MiB to check for
+  @raise errors.OpPrereqError: if the node doesn't have enough disk, or
+      we cannot check the node
+
+  """
+  # one RPC call is issued for the whole node list; the per-node answers
+  # are then examined one by one
+  nodeinfo = lu.rpc.call_node_info(nodenames, lu.cfg.GetVGName(),
+                                   lu.cfg.GetHypervisorType())
+  for node in nodenames:
+    info = nodeinfo[node]
+    info.Raise("Cannot get current information from node %s" % node,
+               prereq=True, ecode=errors.ECODE_ENVIRON)
+    vg_free = info.payload.get("vg_free", None)
+    # a missing or non-integer value is reported as an environment error,
+    # since no meaningful comparison can be made with it
+    if not isinstance(vg_free, int):
+      raise errors.OpPrereqError("Can't compute free disk space on node %s,"
+                                 " result was '%s'" % (node, vg_free),
+                                 errors.ECODE_ENVIRON)
+    if requested > vg_free:
+      raise errors.OpPrereqError("Not enough disk space on target node %s:"
+                                 " required %d MiB, available %d MiB" %
+                                 (node, requested, vg_free),
+                                 errors.ECODE_NORES)
+
+
+class LUStartupInstance(LogicalUnit):
+  """Starts an instance.
+
+  """
+  HPATH = "instance-start"
+  HTYPE = constants.HTYPE_INSTANCE
+  _OP_REQP = ["instance_name", "force"]
+  REQ_BGL = False
 
   def ExpandNames(self):
     self._ExpandAndLockInstance()
 
   def ExpandNames(self):
     self._ExpandAndLockInstance()
@@ -3989,32 +4356,14 @@ class LUReinstallInstance(LogicalUnit):
       raise errors.OpPrereqError("Instance '%s' has no disks" %
                                  self.op.instance_name,
                                  errors.ECODE_INVAL)
       raise errors.OpPrereqError("Instance '%s' has no disks" %
                                  self.op.instance_name,
                                  errors.ECODE_INVAL)
-    if instance.admin_up:
-      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
-                                 self.op.instance_name,
-                                 errors.ECODE_STATE)
-    remote_info = self.rpc.call_instance_info(instance.primary_node,
-                                              instance.name,
-                                              instance.hypervisor)
-    remote_info.Raise("Error checking node %s" % instance.primary_node,
-                      prereq=True, ecode=errors.ECODE_ENVIRON)
-    if remote_info.payload:
-      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
-                                 (self.op.instance_name,
-                                  instance.primary_node),
-                                 errors.ECODE_STATE)
+    _CheckInstanceDown(self, instance, "cannot reinstall")
 
     self.op.os_type = getattr(self.op, "os_type", None)
     self.op.force_variant = getattr(self.op, "force_variant", False)
     if self.op.os_type is not None:
       # OS verification
       pnode = _ExpandNodeName(self.cfg, instance.primary_node)
 
     self.op.os_type = getattr(self.op, "os_type", None)
     self.op.force_variant = getattr(self.op, "force_variant", False)
     if self.op.os_type is not None:
       # OS verification
       pnode = _ExpandNodeName(self.cfg, instance.primary_node)
-      result = self.rpc.call_os_get(pnode, self.op.os_type)
-      result.Raise("OS '%s' not in supported OS list for primary node %s" %
-                   (self.op.os_type, pnode),
-                   prereq=True, ecode=errors.ECODE_INVAL)
-      if not self.op.force_variant:
-        _CheckOSVariant(result.payload, self.op.os_type)
+      _CheckNodeHasOS(self, pnode, self.op.os_type, self.op.force_variant)
 
     self.instance = instance
 
 
     self.instance = instance
 
@@ -4089,18 +4438,7 @@ class LURecreateInstanceDisks(LogicalUnit):
     if instance.disk_template == constants.DT_DISKLESS:
       raise errors.OpPrereqError("Instance '%s' has no disks" %
                                  self.op.instance_name, errors.ECODE_INVAL)
     if instance.disk_template == constants.DT_DISKLESS:
       raise errors.OpPrereqError("Instance '%s' has no disks" %
                                  self.op.instance_name, errors.ECODE_INVAL)
-    if instance.admin_up:
-      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
-                                 self.op.instance_name, errors.ECODE_STATE)
-    remote_info = self.rpc.call_instance_info(instance.primary_node,
-                                              instance.name,
-                                              instance.hypervisor)
-    remote_info.Raise("Error checking node %s" % instance.primary_node,
-                      prereq=True, ecode=errors.ECODE_ENVIRON)
-    if remote_info.payload:
-      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
-                                 (self.op.instance_name,
-                                  instance.primary_node), errors.ECODE_STATE)
+    _CheckInstanceDown(self, instance, "cannot recreate disks")
 
     if not self.op.disks:
       self.op.disks = range(len(instance.disks))
 
     if not self.op.disks:
       self.op.disks = range(len(instance.disks))
@@ -4155,19 +4493,7 @@ class LURenameInstance(LogicalUnit):
     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
     assert instance is not None
     _CheckNodeOnline(self, instance.primary_node)
     instance = self.cfg.GetInstanceInfo(self.op.instance_name)
     assert instance is not None
     _CheckNodeOnline(self, instance.primary_node)
-
-    if instance.admin_up:
-      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
-                                 self.op.instance_name, errors.ECODE_STATE)
-    remote_info = self.rpc.call_instance_info(instance.primary_node,
-                                              instance.name,
-                                              instance.hypervisor)
-    remote_info.Raise("Error checking node %s" % instance.primary_node,
-                      prereq=True, ecode=errors.ECODE_ENVIRON)
-    if remote_info.payload:
-      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
-                                 (self.op.instance_name,
-                                  instance.primary_node), errors.ECODE_STATE)
+    _CheckInstanceDown(self, instance, "cannot rename")
     self.instance = instance
 
     # new name verification
     self.instance = instance
 
     # new name verification
@@ -4294,18 +4620,29 @@ class LURemoveInstance(LogicalUnit):
                                  " node %s: %s" %
                                  (instance.name, instance.primary_node, msg))
 
                                  " node %s: %s" %
                                  (instance.name, instance.primary_node, msg))
 
-    logging.info("Removing block devices for instance %s", instance.name)
+    _RemoveInstance(self, feedback_fn, instance, self.op.ignore_failures)
 
 
-    if not _RemoveDisks(self, instance):
-      if self.op.ignore_failures:
-        feedback_fn("Warning: can't remove instance's disks")
-      else:
-        raise errors.OpExecError("Can't remove instance's disks")
 
 
-    logging.info("Removing instance %s out of cluster config", instance.name)
+def _RemoveInstance(lu, feedback_fn, instance, ignore_failures):
+  """Utility function to remove an instance.
+
+  Removes the disks of the instance, drops the instance from the
+  cluster configuration and registers its lock for removal in the
+  logical unit.
+
+  @param lu: the logical unit on whose behalf we execute
+  @param feedback_fn: function used to report warnings to the caller
+  @param instance: the instance object to be removed
+  @param ignore_failures: whether a failure to remove the disks is only
+      reported as a warning instead of aborting the removal
+  @raise errors.OpExecError: if the disks cannot be removed and
+      C{ignore_failures} is not set
+
+  """
+  logging.info("Removing block devices for instance %s", instance.name)
+
+  if not _RemoveDisks(lu, instance):
+    if not ignore_failures:
+      raise errors.OpExecError("Can't remove instance's disks")
+    feedback_fn("Warning: can't remove instance's disks")
+
+  logging.info("Removing instance %s out of cluster config", instance.name)
 
 
-    self.cfg.RemoveInstance(instance.name)
-    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
+  lu.cfg.RemoveInstance(instance.name)
+
+  # the assignment below would overwrite any instance lock removal that
+  # is already registered, hence make sure there is none
+  assert not lu.remove_locks.get(locking.LEVEL_INSTANCE), \
+    "Instance lock removal conflict"
+
+  # Remove lock for the instance
+  lu.remove_locks[locking.LEVEL_INSTANCE] = instance.name
 
 
 class LUQueryInstances(NoHooksLU):
 
 
 class LUQueryInstances(NoHooksLU):
@@ -5519,6 +5856,8 @@ def _GenerateDiskTemplate(lu, template_name,
     if len(secondary_nodes) != 0:
       raise errors.ProgrammerError("Wrong template configuration")
 
     if len(secondary_nodes) != 0:
       raise errors.ProgrammerError("Wrong template configuration")
 
+    _RequireFileStorage()
+
     for idx, disk in enumerate(disk_info):
       disk_index = idx + base_index
       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
     for idx, disk in enumerate(disk_info):
       disk_index = idx + base_index
       disk_dev = objects.Disk(dev_type=constants.LD_FILE, size=disk["size"],
@@ -5687,7 +6026,7 @@ class LUCreateInstance(LogicalUnit):
   """
   HPATH = "instance-add"
   HTYPE = constants.HTYPE_INSTANCE
   """
   HPATH = "instance-add"
   HTYPE = constants.HTYPE_INSTANCE
-  _OP_REQP = ["instance_name", "disks", "disk_template",
+  _OP_REQP = ["instance_name", "disks",
               "mode", "start",
               "wait_for_sync", "ip_check", "nics",
               "hvparams", "beparams"]
               "mode", "start",
               "wait_for_sync", "ip_check", "nics",
               "hvparams", "beparams"]
@@ -5697,35 +6036,50 @@ class LUCreateInstance(LogicalUnit):
     """Check arguments.
 
     """
     """Check arguments.
 
     """
+    # set optional parameters to none if they don't exist
+    for attr in ["pnode", "snode", "iallocator", "hypervisor",
+                 "disk_template", "identify_defaults"]:
+      if not hasattr(self.op, attr):
+        setattr(self.op, attr, None)
+
     # do not require name_check to ease forward/backward compatibility
     # for tools
     if not hasattr(self.op, "name_check"):
       self.op.name_check = True
     # do not require name_check to ease forward/backward compatibility
     # for tools
     if not hasattr(self.op, "name_check"):
       self.op.name_check = True
+    if not hasattr(self.op, "no_install"):
+      self.op.no_install = False
+    if self.op.no_install and self.op.start:
+      self.LogInfo("No-installation mode selected, disabling startup")
+      self.op.start = False
     # validate/normalize the instance name
     self.op.instance_name = utils.HostInfo.NormalizeName(self.op.instance_name)
     if self.op.ip_check and not self.op.name_check:
       # TODO: make the ip check more flexible and not depend on the name check
       raise errors.OpPrereqError("Cannot do ip checks without a name check",
                                  errors.ECODE_INVAL)
     # validate/normalize the instance name
     self.op.instance_name = utils.HostInfo.NormalizeName(self.op.instance_name)
     if self.op.ip_check and not self.op.name_check:
       # TODO: make the ip check more flexible and not depend on the name check
       raise errors.OpPrereqError("Cannot do ip checks without a name check",
                                  errors.ECODE_INVAL)
-    if (self.op.disk_template == constants.DT_FILE and
-        not constants.ENABLE_FILE_STORAGE):
-      raise errors.OpPrereqError("File storage disabled at configure time",
+    # check disk information: either all adopt, or no adopt
+    has_adopt = has_no_adopt = False
+    for disk in self.op.disks:
+      if "adopt" in disk:
+        has_adopt = True
+      else:
+        has_no_adopt = True
+    if has_adopt and has_no_adopt:
+      raise errors.OpPrereqError("Either all disks are adopted or none is",
                                  errors.ECODE_INVAL)
                                  errors.ECODE_INVAL)
+    if has_adopt:
+      if self.op.disk_template != constants.DT_PLAIN:
+        raise errors.OpPrereqError("Disk adoption is only supported for the"
+                                   " 'plain' disk template",
+                                   errors.ECODE_INVAL)
+      if self.op.iallocator is not None:
+        raise errors.OpPrereqError("Disk adoption not allowed with an"
+                                   " iallocator script", errors.ECODE_INVAL)
+      if self.op.mode == constants.INSTANCE_IMPORT:
+        raise errors.OpPrereqError("Disk adoption not allowed for"
+                                   " instance import", errors.ECODE_INVAL)
 
 
-  def ExpandNames(self):
-    """ExpandNames for CreateInstance.
-
-    Figure out the right locks for instance creation.
-
-    """
-    self.needed_locks = {}
-
-    # set optional parameters to none if they don't exist
-    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
-      if not hasattr(self.op, attr):
-        setattr(self.op, attr, None)
-
-    # cheap checks, mostly valid constants given
+    self.adopt_disks = has_adopt
 
     # verify creation mode
     if self.op.mode not in (constants.INSTANCE_CREATE,
 
     # verify creation mode
     if self.op.mode not in (constants.INSTANCE_CREATE,
@@ -5733,145 +6087,15 @@ class LUCreateInstance(LogicalUnit):
       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                  self.op.mode, errors.ECODE_INVAL)
 
       raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
                                  self.op.mode, errors.ECODE_INVAL)
 
-    # disk template and mirror node verification
-    if self.op.disk_template not in constants.DISK_TEMPLATES:
-      raise errors.OpPrereqError("Invalid disk template name",
-                                 errors.ECODE_INVAL)
-
-    if self.op.hypervisor is None:
-      self.op.hypervisor = self.cfg.GetHypervisorType()
-
-    cluster = self.cfg.GetClusterInfo()
-    enabled_hvs = cluster.enabled_hypervisors
-    if self.op.hypervisor not in enabled_hvs:
-      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
-                                 " cluster (%s)" % (self.op.hypervisor,
-                                  ",".join(enabled_hvs)),
-                                 errors.ECODE_STATE)
-
-    # check hypervisor parameter syntax (locally)
-    utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
-    filled_hvp = objects.FillDict(cluster.hvparams[self.op.hypervisor],
-                                  self.op.hvparams)
-    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
-    hv_type.CheckParameterSyntax(filled_hvp)
-    self.hv_full = filled_hvp
-    # check that we don't specify global parameters on an instance
-    _CheckGlobalHvParams(self.op.hvparams)
-
-    # fill and remember the beparams dict
-    utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
-    self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
-                                    self.op.beparams)
-
-    #### instance parameters check
-
     # instance name verification
     if self.op.name_check:
     # instance name verification
     if self.op.name_check:
-      hostname1 = utils.GetHostInfo(self.op.instance_name)
-      self.op.instance_name = instance_name = hostname1.name
+      self.hostname1 = utils.GetHostInfo(self.op.instance_name)
+      self.op.instance_name = self.hostname1.name
       # used in CheckPrereq for ip ping check
       # used in CheckPrereq for ip ping check
-      self.check_ip = hostname1.ip
+      self.check_ip = self.hostname1.ip
     else:
     else:
-      instance_name = self.op.instance_name
       self.check_ip = None
 
       self.check_ip = None
 
-    # this is just a preventive check, but someone might still add this
-    # instance in the meantime, and creation will fail at lock-add time
-    if instance_name in self.cfg.GetInstanceList():
-      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
-                                 instance_name, errors.ECODE_EXISTS)
-
-    self.add_locks[locking.LEVEL_INSTANCE] = instance_name
-
-    # NIC buildup
-    self.nics = []
-    for idx, nic in enumerate(self.op.nics):
-      nic_mode_req = nic.get("mode", None)
-      nic_mode = nic_mode_req
-      if nic_mode is None:
-        nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
-
-      # in routed mode, for the first nic, the default ip is 'auto'
-      if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
-        default_ip_mode = constants.VALUE_AUTO
-      else:
-        default_ip_mode = constants.VALUE_NONE
-
-      # ip validity checks
-      ip = nic.get("ip", default_ip_mode)
-      if ip is None or ip.lower() == constants.VALUE_NONE:
-        nic_ip = None
-      elif ip.lower() == constants.VALUE_AUTO:
-        if not self.op.name_check:
-          raise errors.OpPrereqError("IP address set to auto but name checks"
-                                     " have been skipped. Aborting.",
-                                     errors.ECODE_INVAL)
-        nic_ip = hostname1.ip
-      else:
-        if not utils.IsValidIP(ip):
-          raise errors.OpPrereqError("Given IP address '%s' doesn't look"
-                                     " like a valid IP" % ip,
-                                     errors.ECODE_INVAL)
-        nic_ip = ip
-
-      # TODO: check the ip address for uniqueness
-      if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
-        raise errors.OpPrereqError("Routed nic mode requires an ip address",
-                                   errors.ECODE_INVAL)
-
-      # MAC address verification
-      mac = nic.get("mac", constants.VALUE_AUTO)
-      if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
-        mac = utils.NormalizeAndValidateMac(mac)
-
-        try:
-          self.cfg.ReserveMAC(mac, self.proc.GetECId())
-        except errors.ReservationError:
-          raise errors.OpPrereqError("MAC address %s already in use"
-                                     " in cluster" % mac,
-                                     errors.ECODE_NOTUNIQUE)
-
-      # bridge verification
-      bridge = nic.get("bridge", None)
-      link = nic.get("link", None)
-      if bridge and link:
-        raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
-                                   " at the same time", errors.ECODE_INVAL)
-      elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
-        raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic",
-                                   errors.ECODE_INVAL)
-      elif bridge:
-        link = bridge
-
-      nicparams = {}
-      if nic_mode_req:
-        nicparams[constants.NIC_MODE] = nic_mode_req
-      if link:
-        nicparams[constants.NIC_LINK] = link
-
-      check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
-                                      nicparams)
-      objects.NIC.CheckParameterSyntax(check_params)
-      self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
-
-    # disk checks/pre-build
-    self.disks = []
-    for disk in self.op.disks:
-      mode = disk.get("mode", constants.DISK_RDWR)
-      if mode not in constants.DISK_ACCESS_SET:
-        raise errors.OpPrereqError("Invalid disk access mode '%s'" %
-                                   mode, errors.ECODE_INVAL)
-      size = disk.get("size", None)
-      if size is None:
-        raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
-      try:
-        size = int(size)
-      except (TypeError, ValueError):
-        raise errors.OpPrereqError("Invalid disk size '%s'" % size,
-                                   errors.ECODE_INVAL)
-      self.disks.append({"size": size, "mode": mode})
-
     # file storage checks
     if (self.op.file_driver and
         not self.op.file_driver in constants.FILE_DRIVER):
     # file storage checks
     if (self.op.file_driver and
         not self.op.file_driver in constants.FILE_DRIVER):
@@ -5888,6 +6112,41 @@ class LUCreateInstance(LogicalUnit):
                                  " node must be given",
                                  errors.ECODE_INVAL)
 
                                  " node must be given",
                                  errors.ECODE_INVAL)
 
+    if self.op.mode == constants.INSTANCE_IMPORT:
+      # On import force_variant must be True, because if we forced it at
+      # initial install, our only chance when importing it back is that it
+      # works again!
+      self.op.force_variant = True
+
+      if self.op.no_install:
+        self.LogInfo("No-installation mode has no effect during import")
+
+    else: # INSTANCE_CREATE
+      if getattr(self.op, "os_type", None) is None:
+        raise errors.OpPrereqError("No guest OS specified",
+                                   errors.ECODE_INVAL)
+      self.op.force_variant = getattr(self.op, "force_variant", False)
+      if self.op.disk_template is None:
+        raise errors.OpPrereqError("No disk template specified",
+                                   errors.ECODE_INVAL)
+
+  def ExpandNames(self):
+    """ExpandNames for CreateInstance.
+
+    Figure out the right locks for instance creation.
+
+    """
+    self.needed_locks = {}
+
+    instance_name = self.op.instance_name
+    # this is just a preventive check, but someone might still add this
+    # instance in the meantime, and creation will fail at lock-add time
+    if instance_name in self.cfg.GetInstanceList():
+      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
+                                 instance_name, errors.ECODE_EXISTS)
+
+    self.add_locks[locking.LEVEL_INSTANCE] = instance_name
+
     if self.op.iallocator:
       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
     else:
     if self.op.iallocator:
       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
     else:
@@ -5921,17 +6180,6 @@ class LUCreateInstance(LogicalUnit):
           self.op.src_path = src_path = \
             utils.PathJoin(constants.EXPORT_DIR, src_path)
 
           self.op.src_path = src_path = \
             utils.PathJoin(constants.EXPORT_DIR, src_path)
 
-      # On import force_variant must be True, because if we forced it at
-      # initial install, our only chance when importing it back is that it
-      # works again!
-      self.op.force_variant = True
-
-    else: # INSTANCE_CREATE
-      if getattr(self.op, "os_type", None) is None:
-        raise errors.OpPrereqError("No guest OS specified",
-                                   errors.ECODE_INVAL)
-      self.op.force_variant = getattr(self.op, "force_variant", False)
-
   def _RunAllocator(self):
     """Run the allocator based on input opcode.
 
   def _RunAllocator(self):
     """Run the allocator based on input opcode.
 
@@ -5972,82 +6220,309 @@ class LUCreateInstance(LogicalUnit):
   def BuildHooksEnv(self):
     """Build hooks env.
 
   def BuildHooksEnv(self):
     """Build hooks env.
 
-    This runs on master, primary and secondary nodes of the instance.
+    This runs on master, primary and secondary nodes of the instance.
+
+    """
+    env = {
+      "ADD_MODE": self.op.mode,
+      }
+    if self.op.mode == constants.INSTANCE_IMPORT:
+      env["SRC_NODE"] = self.op.src_node
+      env["SRC_PATH"] = self.op.src_path
+      env["SRC_IMAGES"] = self.src_images
+
+    env.update(_BuildInstanceHookEnv(
+      name=self.op.instance_name,
+      primary_node=self.op.pnode,
+      secondary_nodes=self.secondaries,
+      status=self.op.start,
+      os_type=self.op.os_type,
+      memory=self.be_full[constants.BE_MEMORY],
+      vcpus=self.be_full[constants.BE_VCPUS],
+      nics=_NICListToTuple(self, self.nics),
+      disk_template=self.op.disk_template,
+      disks=[(d["size"], d["mode"]) for d in self.disks],
+      bep=self.be_full,
+      hvp=self.hv_full,
+      hypervisor_name=self.op.hypervisor,
+    ))
+
+    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
+          self.secondaries)
+    return env, nl, nl
+
+  def _ReadExportInfo(self):
+    """Loads and validates the export information for an import.
+
+    If the opcode does not name a source node, the export is looked up
+    by its relative path on the locked nodes; once found, the opcode
+    source node and source path are updated with the actual values.
+
+    @return: the export information
+
+    """
+    assert self.op.mode == constants.INSTANCE_IMPORT
+
+    node_name = self.op.src_node
+    export_path = self.op.src_path
+
+    if node_name is None:
+      # no source node given: search the locked nodes for the export
+      exports = self.rpc.call_export_list(
+        self.acquired_locks[locking.LEVEL_NODE])
+      for candidate in exports:
+        answer = exports[candidate]
+        if answer.fail_msg or export_path not in answer.payload:
+          continue
+        self.op.src_node = node_name = candidate
+        self.op.src_path = export_path = \
+          utils.PathJoin(constants.EXPORT_DIR, export_path)
+        break
+      else:
+        # the loop finished without finding any node holding the export
+        raise errors.OpPrereqError("No export found for relative path %s" %
+                                    export_path, errors.ECODE_INVAL)
+
+    _CheckNodeOnline(self, node_name)
+    result = self.rpc.call_export_info(node_name, export_path)
+    result.Raise("No export or invalid export found in dir %s" % export_path)
+
+    parsed = objects.SerializableConfigParser.Loads(str(result.payload))
+    if not parsed.has_section(constants.INISECT_EXP):
+      raise errors.ProgrammerError("Corrupted export config",
+                                   errors.ECODE_ENVIRON)
+
+    version = parsed.get(constants.INISECT_EXP, "version")
+    if int(version) == constants.EXPORT_VERSION:
+      return parsed
+
+    raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
+                               (version, constants.EXPORT_VERSION),
+                               errors.ECODE_ENVIRON)
+
+  def _ReadExportParams(self, einfo):
+    """Fill in missing opcode parameters from the export information.
+
+    The OS type is always taken from the export. The disk template, the
+    disk list, the NIC list and the hypervisor are read from the export
+    only when the opcode leaves them unset, while hypervisor and backend
+    parameters found in the export complement (but never override) the
+    ones given in the opcode.
+
+    @param einfo: the parsed export information
+    @raise errors.OpPrereqError: if neither the opcode nor the export
+        provide the disk template or the disk information
+
+    """
+    ins_sect = constants.INISECT_INS
+
+    self.op.os_type = einfo.get(constants.INISECT_EXP, "os")
+
+    if self.op.disk_template is None:
+      if not einfo.has_option(ins_sect, "disk_template"):
+        raise errors.OpPrereqError("No disk template specified and the export"
+                                   " is missing the disk_template information",
+                                   errors.ECODE_INVAL)
+      self.op.disk_template = einfo.get(ins_sect, "disk_template")
+
+    if not self.op.disks:
+      if not einfo.has_option(ins_sect, "disk_count"):
+        raise errors.OpPrereqError("No disk info specified and the export"
+                                   " is missing the disk information",
+                                   errors.ECODE_INVAL)
+      # TODO: import the disk iv_name too
+      disk_count = einfo.getint(ins_sect, "disk_count")
+      self.op.disks = [{"size": einfo.getint(ins_sect, "disk%d_size" % idx)}
+                       for idx in range(disk_count)]
+
+    if not self.op.nics and einfo.has_option(ins_sect, "nic_count"):
+      nic_keys = list(constants.NICS_PARAMETERS) + ["ip", "mac"]
+      imported_nics = []
+      for idx in range(einfo.getint(ins_sect, "nic_count")):
+        imported_nics.append(dict((key, einfo.get(ins_sect,
+                                                  "nic%d_%s" % (idx, key)))
+                                  for key in nic_keys))
+      self.op.nics = imported_nics
+
+    if self.op.hypervisor is None:
+      if einfo.has_option(ins_sect, "hypervisor"):
+        self.op.hypervisor = einfo.get(ins_sect, "hypervisor")
+
+    if einfo.has_section(constants.INISECT_HYP):
+      # export values only fill the gaps left by the user
+      for key, val in einfo.items(constants.INISECT_HYP):
+        self.op.hvparams.setdefault(key, val)
+
+    if einfo.has_section(constants.INISECT_BEP):
+      # same policy as above: never override what the user passed
+      for key, val in einfo.items(constants.INISECT_BEP):
+        self.op.beparams.setdefault(key, val)
+    else:
+      # old style export: the parameters live in the main section
+      for key in constants.BES_PARAMETERS:
+        if key in self.op.beparams:
+          continue
+        if einfo.has_option(ins_sect, key):
+          self.op.beparams[key] = einfo.get(ins_sect, key)
+
+  def _RevertToDefaults(self, cluster):
+    """Drop the opcode parameters which are equal to the cluster defaults.
+
+    Hypervisor, backend and NIC parameters whose value matches the
+    corresponding default are removed from the opcode.
+
+    @param cluster: the cluster object providing the default values
+
+    """
+    # hvparams: collect the redundant names first, then remove them
+    hv_defs = cluster.GetHVDefaults(self.op.hypervisor, self.op.os_type)
+    redundant = [key for key in self.op.hvparams
+                 if key in hv_defs and hv_defs[key] == self.op.hvparams[key]]
+    for key in redundant:
+      del self.op.hvparams[key]
+
+    # beparams, handled the same way
+    be_defs = cluster.beparams.get(constants.PP_DEFAULT, {})
+    redundant = [key for key in self.op.beparams
+                 if key in be_defs and be_defs[key] == self.op.beparams[key]]
+    for key in redundant:
+      del self.op.beparams[key]
+
+    # nic params: only the known NIC parameters are considered
+    nic_defs = cluster.nicparams.get(constants.PP_DEFAULT, {})
+    for nic in self.op.nics:
+      for key in constants.NICS_PARAMETERS:
+        if key not in nic or key not in nic_defs:
+          continue
+        if nic[key] == nic_defs[key]:
+          del nic[key]
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    """
+    if self.op.mode == constants.INSTANCE_IMPORT:
+      export_info = self._ReadExportInfo()
+      self._ReadExportParams(export_info)
+
+    _CheckDiskTemplate(self.op.disk_template)
+
+    if (not self.cfg.GetVGName() and
+        self.op.disk_template not in constants.DTS_NOT_LVM):
+      raise errors.OpPrereqError("Cluster does not support lvm-based"
+                                 " instances", errors.ECODE_STATE)
+
+    if self.op.hypervisor is None:
+      self.op.hypervisor = self.cfg.GetHypervisorType()
+
+    cluster = self.cfg.GetClusterInfo()
+    enabled_hvs = cluster.enabled_hypervisors
+    if self.op.hypervisor not in enabled_hvs:
+      raise errors.OpPrereqError("Selected hypervisor (%s) not enabled in the"
+                                 " cluster (%s)" % (self.op.hypervisor,
+                                  ",".join(enabled_hvs)),
+                                 errors.ECODE_STATE)
+
+    # check hypervisor parameter syntax (locally)
+    utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
+    filled_hvp = objects.FillDict(cluster.GetHVDefaults(self.op.hypervisor,
+                                                        self.op.os_type),
+                                  self.op.hvparams)
+    hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
+    hv_type.CheckParameterSyntax(filled_hvp)
+    self.hv_full = filled_hvp
+    # check that we don't specify global parameters on an instance
+    _CheckGlobalHvParams(self.op.hvparams)
+
+    # fill and remember the beparams dict
+    utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
+    self.be_full = objects.FillDict(cluster.beparams[constants.PP_DEFAULT],
+                                    self.op.beparams)
+
+    # now that hvp/bep are in final format, let's reset to defaults,
+    # if told to do so
+    if self.op.identify_defaults:
+      self._RevertToDefaults(cluster)
+
+    # NIC buildup
+    self.nics = []
+    for idx, nic in enumerate(self.op.nics):
+      nic_mode_req = nic.get("mode", None)
+      nic_mode = nic_mode_req
+      if nic_mode is None:
+        nic_mode = cluster.nicparams[constants.PP_DEFAULT][constants.NIC_MODE]
+
+      # in routed mode, for the first nic, the default ip is 'auto'
+      if nic_mode == constants.NIC_MODE_ROUTED and idx == 0:
+        default_ip_mode = constants.VALUE_AUTO
+      else:
+        default_ip_mode = constants.VALUE_NONE
+
+      # ip validity checks
+      ip = nic.get("ip", default_ip_mode)
+      if ip is None or ip.lower() == constants.VALUE_NONE:
+        nic_ip = None
+      elif ip.lower() == constants.VALUE_AUTO:
+        if not self.op.name_check:
+          raise errors.OpPrereqError("IP address set to auto but name checks"
+                                     " have been skipped. Aborting.",
+                                     errors.ECODE_INVAL)
+        nic_ip = self.hostname1.ip
+      else:
+        if not utils.IsValidIP(ip):
+          raise errors.OpPrereqError("Given IP address '%s' doesn't look"
+                                     " like a valid IP" % ip,
+                                     errors.ECODE_INVAL)
+        nic_ip = ip
+
+      # TODO: check the ip address for uniqueness
+      if nic_mode == constants.NIC_MODE_ROUTED and not nic_ip:
+        raise errors.OpPrereqError("Routed nic mode requires an ip address",
+                                   errors.ECODE_INVAL)
 
 
-    """
-    env = {
-      "ADD_MODE": self.op.mode,
-      }
-    if self.op.mode == constants.INSTANCE_IMPORT:
-      env["SRC_NODE"] = self.op.src_node
-      env["SRC_PATH"] = self.op.src_path
-      env["SRC_IMAGES"] = self.src_images
+      # MAC address verification
+      mac = nic.get("mac", constants.VALUE_AUTO)
+      if mac not in (constants.VALUE_AUTO, constants.VALUE_GENERATE):
+        mac = utils.NormalizeAndValidateMac(mac)
 
 
-    env.update(_BuildInstanceHookEnv(
-      name=self.op.instance_name,
-      primary_node=self.op.pnode,
-      secondary_nodes=self.secondaries,
-      status=self.op.start,
-      os_type=self.op.os_type,
-      memory=self.be_full[constants.BE_MEMORY],
-      vcpus=self.be_full[constants.BE_VCPUS],
-      nics=_NICListToTuple(self, self.nics),
-      disk_template=self.op.disk_template,
-      disks=[(d["size"], d["mode"]) for d in self.disks],
-      bep=self.be_full,
-      hvp=self.hv_full,
-      hypervisor_name=self.op.hypervisor,
-    ))
+        try:
+          self.cfg.ReserveMAC(mac, self.proc.GetECId())
+        except errors.ReservationError:
+          raise errors.OpPrereqError("MAC address %s already in use"
+                                     " in cluster" % mac,
+                                     errors.ECODE_NOTUNIQUE)
 
 
-    nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
-          self.secondaries)
-    return env, nl, nl
+      # bridge verification
+      bridge = nic.get("bridge", None)
+      link = nic.get("link", None)
+      if bridge and link:
+        raise errors.OpPrereqError("Cannot pass 'bridge' and 'link'"
+                                   " at the same time", errors.ECODE_INVAL)
+      elif bridge and nic_mode == constants.NIC_MODE_ROUTED:
+        raise errors.OpPrereqError("Cannot pass 'bridge' on a routed nic",
+                                   errors.ECODE_INVAL)
+      elif bridge:
+        link = bridge
 
 
+      nicparams = {}
+      if nic_mode_req:
+        nicparams[constants.NIC_MODE] = nic_mode_req
+      if link:
+        nicparams[constants.NIC_LINK] = link
 
 
-  def CheckPrereq(self):
-    """Check prerequisites.
+      check_params = objects.FillDict(cluster.nicparams[constants.PP_DEFAULT],
+                                      nicparams)
+      objects.NIC.CheckParameterSyntax(check_params)
+      self.nics.append(objects.NIC(mac=mac, ip=nic_ip, nicparams=nicparams))
 
 
-    """
-    if (not self.cfg.GetVGName() and
-        self.op.disk_template not in constants.DTS_NOT_LVM):
-      raise errors.OpPrereqError("Cluster does not support lvm-based"
-                                 " instances", errors.ECODE_STATE)
+    # disk checks/pre-build
+    self.disks = []
+    for disk in self.op.disks:
+      mode = disk.get("mode", constants.DISK_RDWR)
+      if mode not in constants.DISK_ACCESS_SET:
+        raise errors.OpPrereqError("Invalid disk access mode '%s'" %
+                                   mode, errors.ECODE_INVAL)
+      size = disk.get("size", None)
+      if size is None:
+        raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
+      try:
+        size = int(size)
+      except (TypeError, ValueError):
+        raise errors.OpPrereqError("Invalid disk size '%s'" % size,
+                                   errors.ECODE_INVAL)
+      new_disk = {"size": size, "mode": mode}
+      if "adopt" in disk:
+        new_disk["adopt"] = disk["adopt"]
+      self.disks.append(new_disk)
 
     if self.op.mode == constants.INSTANCE_IMPORT:
 
     if self.op.mode == constants.INSTANCE_IMPORT:
-      src_node = self.op.src_node
-      src_path = self.op.src_path
-
-      if src_node is None:
-        locked_nodes = self.acquired_locks[locking.LEVEL_NODE]
-        exp_list = self.rpc.call_export_list(locked_nodes)
-        found = False
-        for node in exp_list:
-          if exp_list[node].fail_msg:
-            continue
-          if src_path in exp_list[node].payload:
-            found = True
-            self.op.src_node = src_node = node
-            self.op.src_path = src_path = utils.PathJoin(constants.EXPORT_DIR,
-                                                         src_path)
-            break
-        if not found:
-          raise errors.OpPrereqError("No export found for relative path %s" %
-                                      src_path, errors.ECODE_INVAL)
-
-      _CheckNodeOnline(self, src_node)
-      result = self.rpc.call_export_info(src_node, src_path)
-      result.Raise("No export or invalid export found in dir %s" % src_path)
-
-      export_info = objects.SerializableConfigParser.Loads(str(result.payload))
-      if not export_info.has_section(constants.INISECT_EXP):
-        raise errors.ProgrammerError("Corrupted export config",
-                                     errors.ECODE_ENVIRON)
-
-      ei_version = export_info.get(constants.INISECT_EXP, 'version')
-      if (int(ei_version) != constants.EXPORT_VERSION):
-        raise errors.OpPrereqError("Wrong export version %s (wanted %d)" %
-                                   (ei_version, constants.EXPORT_VERSION),
-                                   errors.ECODE_ENVIRON)
 
       # Check that the new instance doesn't have less disks than the export
       instance_disks = len(self.disks)
 
       # Check that the new instance doesn't have less disks than the export
       instance_disks = len(self.disks)
@@ -6058,14 +6533,13 @@ class LUCreateInstance(LogicalUnit):
                                    (instance_disks, export_disks),
                                    errors.ECODE_INVAL)
 
                                    (instance_disks, export_disks),
                                    errors.ECODE_INVAL)
 
-      self.op.os_type = export_info.get(constants.INISECT_EXP, 'os')
       disk_images = []
       for idx in range(export_disks):
         option = 'disk%d_dump' % idx
         if export_info.has_option(constants.INISECT_INS, option):
           # FIXME: are the old os-es, disk sizes, etc. useful?
           export_name = export_info.get(constants.INISECT_INS, option)
       disk_images = []
       for idx in range(export_disks):
         option = 'disk%d_dump' % idx
         if export_info.has_option(constants.INISECT_INS, option):
           # FIXME: are the old os-es, disk sizes, etc. useful?
           export_name = export_info.get(constants.INISECT_INS, option)
-          image = utils.PathJoin(src_path, export_name)
+          image = utils.PathJoin(self.op.src_path, export_name)
           disk_images.append(image)
         else:
           disk_images.append(False)
           disk_images.append(image)
         else:
           disk_images.append(False)
@@ -6073,8 +6547,12 @@ class LUCreateInstance(LogicalUnit):
       self.src_images = disk_images
 
       old_name = export_info.get(constants.INISECT_INS, 'name')
       self.src_images = disk_images
 
       old_name = export_info.get(constants.INISECT_INS, 'name')
-      # FIXME: int() here could throw a ValueError on broken exports
-      exp_nic_count = int(export_info.get(constants.INISECT_INS, 'nic_count'))
+      try:
+        exp_nic_count = export_info.getint(constants.INISECT_INS, 'nic_count')
+      except (TypeError, ValueError), err:
+        raise errors.OpPrereqError("Invalid export file, nic_count is not"
+                                   " an integer: %s" % str(err),
+                                   errors.ECODE_STATE)
       if self.op.instance_name == old_name:
         for idx, nic in enumerate(self.nics):
           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
       if self.op.instance_name == old_name:
         for idx, nic in enumerate(self.nics):
           if nic.mac == constants.VALUE_AUTO and exp_nic_count >= idx:
@@ -6139,33 +6617,43 @@ class LUCreateInstance(LogicalUnit):
     req_size = _ComputeDiskSize(self.op.disk_template,
                                 self.disks)
 
     req_size = _ComputeDiskSize(self.op.disk_template,
                                 self.disks)
 
-    # Check lv size requirements
-    if req_size is not None:
-      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
-                                         self.op.hypervisor)
-      for node in nodenames:
-        info = nodeinfo[node]
-        info.Raise("Cannot get current information from node %s" % node)
-        info = info.payload
-        vg_free = info.get('vg_free', None)
-        if not isinstance(vg_free, int):
-          raise errors.OpPrereqError("Can't compute free disk space on"
-                                     " node %s" % node, errors.ECODE_ENVIRON)
-        if req_size > vg_free:
-          raise errors.OpPrereqError("Not enough disk space on target node %s."
-                                     " %d MB available, %d MB required" %
-                                     (node, vg_free, req_size),
-                                     errors.ECODE_NORES)
+    # Check lv size requirements, if not adopting
+    if req_size is not None and not self.adopt_disks:
+      _CheckNodesFreeDisk(self, nodenames, req_size)
+
+    if self.adopt_disks: # instead, we must check the adoption data
+      all_lvs = set([i["adopt"] for i in self.disks])
+      if len(all_lvs) != len(self.disks):
+        raise errors.OpPrereqError("Duplicate volume names given for adoption",
+                                   errors.ECODE_INVAL)
+      for lv_name in all_lvs:
+        try:
+          self.cfg.ReserveLV(lv_name, self.proc.GetECId())
+        except errors.ReservationError:
+          raise errors.OpPrereqError("LV named %s used by another instance" %
+                                     lv_name, errors.ECODE_NOTUNIQUE)
+
+      node_lvs = self.rpc.call_lv_list([pnode.name],
+                                       self.cfg.GetVGName())[pnode.name]
+      node_lvs.Raise("Cannot get LV information from node %s" % pnode.name)
+      node_lvs = node_lvs.payload
+      delta = all_lvs.difference(node_lvs.keys())
+      if delta:
+        raise errors.OpPrereqError("Missing logical volume(s): %s" %
+                                   utils.CommaJoin(delta),
+                                   errors.ECODE_INVAL)
+      online_lvs = [lv for lv in all_lvs if node_lvs[lv][2]]
+      if online_lvs:
+        raise errors.OpPrereqError("Online logical volumes found, cannot"
+                                   " adopt: %s" % utils.CommaJoin(online_lvs),
+                                   errors.ECODE_STATE)
+      # update the size of disk based on what is found
+      for dsk in self.disks:
+        dsk["size"] = int(float(node_lvs[dsk["adopt"]][0]))
 
     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
 
 
     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
 
-    # os verification
-    result = self.rpc.call_os_get(pnode.name, self.op.os_type)
-    result.Raise("OS '%s' not in supported os list for primary node %s" %
-                 (self.op.os_type, pnode.name),
-                 prereq=True, ecode=errors.ECODE_INVAL)
-    if not self.op.force_variant:
-      _CheckOSVariant(result.payload, self.op.os_type)
+    _CheckNodeHasOS(self, pnode.name, self.op.os_type, self.op.force_variant)
 
     _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
 
 
     _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
 
@@ -6191,19 +6679,18 @@ class LUCreateInstance(LogicalUnit):
     else:
       network_port = None
 
     else:
       network_port = None
 
-    ##if self.op.vnc_bind_address is None:
-    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
+    if constants.ENABLE_FILE_STORAGE:
+      # this is needed because os.path.join does not accept None arguments
+      if self.op.file_storage_dir is None:
+        string_file_storage_dir = ""
+      else:
+        string_file_storage_dir = self.op.file_storage_dir
 
 
-    # this is needed because os.path.join does not accept None arguments
-    if self.op.file_storage_dir is None:
-      string_file_storage_dir = ""
+      # build the full file storage dir path
+      file_storage_dir = utils.PathJoin(self.cfg.GetFileStorageDir(),
+                                        string_file_storage_dir, instance)
     else:
     else:
-      string_file_storage_dir = self.op.file_storage_dir
-
-    # build the full file storage dir path
-    file_storage_dir = utils.PathJoin(self.cfg.GetFileStorageDir(),
-                                      string_file_storage_dir, instance)
-
+      file_storage_dir = ""
 
     disks = _GenerateDiskTemplate(self,
                                   self.op.disk_template,
 
     disks = _GenerateDiskTemplate(self,
                                   self.op.disk_template,
@@ -6225,16 +6712,29 @@ class LUCreateInstance(LogicalUnit):
                             hypervisor=self.op.hypervisor,
                             )
 
                             hypervisor=self.op.hypervisor,
                             )
 
-    feedback_fn("* creating instance disks...")
-    try:
-      _CreateDisks(self, iobj)
-    except errors.OpExecError:
-      self.LogWarning("Device creation failed, reverting...")
+    if self.adopt_disks:
+      # rename LVs to the newly-generated names; we need to construct
+      # 'fake' LV disks with the old data, plus the new unique_id
+      tmp_disks = [objects.Disk.FromDict(v.ToDict()) for v in disks]
+      rename_to = []
+      for t_dsk, a_dsk in zip (tmp_disks, self.disks):
+        rename_to.append(t_dsk.logical_id)
+        t_dsk.logical_id = (t_dsk.logical_id[0], a_dsk["adopt"])
+        self.cfg.SetDiskID(t_dsk, pnode_name)
+      result = self.rpc.call_blockdev_rename(pnode_name,
+                                             zip(tmp_disks, rename_to))
+      result.Raise("Failed to rename adoped LVs")
+    else:
+      feedback_fn("* creating instance disks...")
       try:
       try:
-        _RemoveDisks(self, iobj)
-      finally:
-        self.cfg.ReleaseDRBDMinors(instance)
-        raise
+        _CreateDisks(self, iobj)
+      except errors.OpExecError:
+        self.LogWarning("Device creation failed, reverting...")
+        try:
+          _RemoveDisks(self, iobj)
+        finally:
+          self.cfg.ReleaseDRBDMinors(instance)
+          raise
 
     feedback_fn("adding instance %s to cluster config" % instance)
 
 
     feedback_fn("adding instance %s to cluster config" % instance)
 
@@ -6272,32 +6772,42 @@ class LUCreateInstance(LogicalUnit):
       raise errors.OpExecError("There are some degraded disks for"
                                " this instance")
 
       raise errors.OpExecError("There are some degraded disks for"
                                " this instance")
 
-    feedback_fn("creating os for instance %s on node %s" %
-                (instance, pnode_name))
-
-    if iobj.disk_template != constants.DT_DISKLESS:
+    if iobj.disk_template != constants.DT_DISKLESS and not self.adopt_disks:
       if self.op.mode == constants.INSTANCE_CREATE:
       if self.op.mode == constants.INSTANCE_CREATE:
-        feedback_fn("* running the instance OS create scripts...")
-        # FIXME: pass debug option from opcode to backend
-        result = self.rpc.call_instance_os_add(pnode_name, iobj, False,
-                                               self.op.debug_level)
-        result.Raise("Could not add os for instance %s"
-                     " on node %s" % (instance, pnode_name))
+        if not self.op.no_install:
+          feedback_fn("* running the instance OS create scripts...")
+          # FIXME: pass debug option from opcode to backend
+          result = self.rpc.call_instance_os_add(pnode_name, iobj, False,
+                                                 self.op.debug_level)
+          result.Raise("Could not add os for instance %s"
+                       " on node %s" % (instance, pnode_name))
 
       elif self.op.mode == constants.INSTANCE_IMPORT:
         feedback_fn("* running the instance OS import scripts...")
 
       elif self.op.mode == constants.INSTANCE_IMPORT:
         feedback_fn("* running the instance OS import scripts...")
-        src_node = self.op.src_node
-        src_images = self.src_images
-        cluster_name = self.cfg.GetClusterName()
-        # FIXME: pass debug option from opcode to backend
-        import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
-                                                         src_node, src_images,
-                                                         cluster_name,
-                                                         self.op.debug_level)
-        msg = import_result.fail_msg
-        if msg:
-          self.LogWarning("Error while importing the disk images for instance"
-                          " %s on node %s: %s" % (instance, pnode_name, msg))
+
+        transfers = []
+
+        for idx, image in enumerate(self.src_images):
+          if not image:
+            continue
+
+          # FIXME: pass debug option from opcode to backend
+          dt = masterd.instance.DiskTransfer("disk/%s" % idx,
+                                             constants.IEIO_FILE, (image, ),
+                                             constants.IEIO_SCRIPT,
+                                             (iobj.disks[idx], idx),
+                                             None)
+          transfers.append(dt)
+
+        import_result = \
+          masterd.instance.TransferInstanceData(self, feedback_fn,
+                                                self.op.src_node, pnode_name,
+                                                self.pnode.secondary_ip,
+                                                iobj, transfers)
+        if not compat.all(import_result):
+          self.LogWarning("Some disks for instance %s on node %s were not"
+                          " imported successfully" % (instance, pnode_name))
+
       else:
         # also checked in the prereq part
         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
       else:
         # also checked in the prereq part
         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
@@ -7211,6 +7721,8 @@ class LURepairNodeStorage(NoHooksLU):
   def CheckArguments(self):
     self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
 
   def CheckArguments(self):
     self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
 
+    _CheckStorageType(self.op.storage_type)
+
   def ExpandNames(self):
     self.needed_locks = {
       locking.LEVEL_NODE: [self.op.node_name],
   def ExpandNames(self):
     self.needed_locks = {
       locking.LEVEL_NODE: [self.op.node_name],
@@ -7366,26 +7878,16 @@ class LUGrowDisk(LogicalUnit):
 
     self.instance = instance
 
 
     self.instance = instance
 
-    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
+    if instance.disk_template not in constants.DTS_GROWABLE:
       raise errors.OpPrereqError("Instance's disk layout does not support"
                                  " growing.", errors.ECODE_INVAL)
 
     self.disk = instance.FindDisk(self.op.disk)
 
       raise errors.OpPrereqError("Instance's disk layout does not support"
                                  " growing.", errors.ECODE_INVAL)
 
     self.disk = instance.FindDisk(self.op.disk)
 
-    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
-                                       instance.hypervisor)
-    for node in nodenames:
-      info = nodeinfo[node]
-      info.Raise("Cannot get current information from node %s" % node)
-      vg_free = info.payload.get('vg_free', None)
-      if not isinstance(vg_free, int):
-        raise errors.OpPrereqError("Can't compute free disk space on"
-                                   " node %s" % node, errors.ECODE_ENVIRON)
-      if self.op.amount > vg_free:
-        raise errors.OpPrereqError("Not enough disk space on target node %s:"
-                                   " %d MiB available, %d MiB required" %
-                                   (node, vg_free, self.op.amount),
-                                   errors.ECODE_NORES)
+    if instance.disk_template != constants.DT_FILE:
+      # TODO: check the free disk space for file, when that feature will be
+      # supported
+      _CheckNodesFreeDisk(self, nodenames, self.op.amount)
 
   def Exec(self, feedback_fn):
     """Execute disk grow.
 
   def Exec(self, feedback_fn):
     """Execute disk grow.
@@ -7589,9 +8091,17 @@ class LUSetInstanceParams(LogicalUnit):
       self.op.beparams = {}
     if not hasattr(self.op, 'hvparams'):
       self.op.hvparams = {}
       self.op.beparams = {}
     if not hasattr(self.op, 'hvparams'):
       self.op.hvparams = {}
+    if not hasattr(self.op, "disk_template"):
+      self.op.disk_template = None
+    if not hasattr(self.op, "remote_node"):
+      self.op.remote_node = None
+    if not hasattr(self.op, "os_name"):
+      self.op.os_name = None
+    if not hasattr(self.op, "force_variant"):
+      self.op.force_variant = False
     self.op.force = getattr(self.op, "force", False)
     self.op.force = getattr(self.op, "force", False)
-    if not (self.op.nics or self.op.disks or
-            self.op.hvparams or self.op.beparams):
+    if not (self.op.nics or self.op.disks or self.op.disk_template or
+            self.op.hvparams or self.op.beparams or self.op.os_name):
       raise errors.OpPrereqError("No changes submitted", errors.ECODE_INVAL)
 
     if self.op.hvparams:
       raise errors.OpPrereqError("No changes submitted", errors.ECODE_INVAL)
 
     if self.op.hvparams:
@@ -7637,6 +8147,19 @@ class LUSetInstanceParams(LogicalUnit):
       raise errors.OpPrereqError("Only one disk add or remove operation"
                                  " supported at a time", errors.ECODE_INVAL)
 
       raise errors.OpPrereqError("Only one disk add or remove operation"
                                  " supported at a time", errors.ECODE_INVAL)
 
+    if self.op.disks and self.op.disk_template is not None:
+      raise errors.OpPrereqError("Disk template conversion and other disk"
+                                 " changes not supported at the same time",
+                                 errors.ECODE_INVAL)
+
+    if self.op.disk_template:
+      _CheckDiskTemplate(self.op.disk_template)
+      if (self.op.disk_template in constants.DTS_NET_MIRROR and
+          self.op.remote_node is None):
+        raise errors.OpPrereqError("Changing the disk template to a mirrored"
+                                   " one requires specifying a secondary node",
+                                   errors.ECODE_INVAL)
+
     # NIC validation
     nic_addremove = 0
     for nic_op, nic_dict in self.op.nics:
     # NIC validation
     nic_addremove = 0
     for nic_op, nic_dict in self.op.nics:
@@ -7699,6 +8222,9 @@ class LUSetInstanceParams(LogicalUnit):
   def DeclareLocks(self, level):
     if level == locking.LEVEL_NODE:
       self._LockInstancesNodes()
   def DeclareLocks(self, level):
     if level == locking.LEVEL_NODE:
       self._LockInstancesNodes()
+      if self.op.disk_template and self.op.remote_node:
+        self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
+        self.needed_locks[locking.LEVEL_NODE].append(self.op.remote_node)
 
   def BuildHooksEnv(self):
     """Build hooks env.
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -7748,6 +8274,8 @@ class LUSetInstanceParams(LogicalUnit):
         del args['nics'][-1]
 
     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
         del args['nics'][-1]
 
     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
+    if self.op.disk_template:
+      env["NEW_DISK_TEMPLATE"] = self.op.disk_template
     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
     return env, nl, nl
 
     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
     return env, nl, nl
 
@@ -7801,6 +8329,25 @@ class LUSetInstanceParams(LogicalUnit):
     pnode = instance.primary_node
     nodelist = list(instance.all_nodes)
 
     pnode = instance.primary_node
     nodelist = list(instance.all_nodes)
 
+    if self.op.disk_template:
+      if instance.disk_template == self.op.disk_template:
+        raise errors.OpPrereqError("Instance already has disk template %s" %
+                                   instance.disk_template, errors.ECODE_INVAL)
+
+      if (instance.disk_template,
+          self.op.disk_template) not in self._DISK_CONVERSIONS:
+        raise errors.OpPrereqError("Unsupported disk template conversion from"
+                                   " %s to %s" % (instance.disk_template,
+                                                  self.op.disk_template),
+                                   errors.ECODE_INVAL)
+      if self.op.disk_template in constants.DTS_NET_MIRROR:
+        _CheckNodeOnline(self, self.op.remote_node)
+        _CheckNodeNotDrained(self, self.op.remote_node)
+        disks = [{"size": d.size} for d in instance.disks]
+        required = _ComputeDiskSize(self.op.disk_template, disks)
+        _CheckNodesFreeDisk(self, [self.op.remote_node], required)
+        _CheckInstanceDown(self, instance, "cannot change disk template")
+
     # hvparams processing
     if self.op.hvparams:
       i_hvdict, hv_new = self._GetUpdatedParams(
     # hvparams processing
     if self.op.hvparams:
       i_hvdict, hv_new = self._GetUpdatedParams(
@@ -7966,17 +8513,8 @@ class LUSetInstanceParams(LogicalUnit):
       if disk_op == constants.DDM_REMOVE:
         if len(instance.disks) == 1:
           raise errors.OpPrereqError("Cannot remove the last disk of"
       if disk_op == constants.DDM_REMOVE:
         if len(instance.disks) == 1:
           raise errors.OpPrereqError("Cannot remove the last disk of"
-                                     " an instance",
-                                     errors.ECODE_INVAL)
-        ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
-        ins_l = ins_l[pnode]
-        msg = ins_l.fail_msg
-        if msg:
-          raise errors.OpPrereqError("Can't contact node %s: %s" %
-                                     (pnode, msg), errors.ECODE_ENVIRON)
-        if instance.name in ins_l.payload:
-          raise errors.OpPrereqError("Instance is running, can't remove"
-                                     " disks.", errors.ECODE_STATE)
+                                     " an instance", errors.ECODE_INVAL)
+        _CheckInstanceDown(self, instance, "cannot remove disks")
 
       if (disk_op == constants.DDM_ADD and
           len(instance.nics) >= constants.MAX_DISKS):
 
       if (disk_op == constants.DDM_ADD and
           len(instance.nics) >= constants.MAX_DISKS):
@@ -7991,8 +8529,103 @@ class LUSetInstanceParams(LogicalUnit):
                                      (disk_op, len(instance.disks)),
                                      errors.ECODE_INVAL)
 
                                      (disk_op, len(instance.disks)),
                                      errors.ECODE_INVAL)
 
+    # OS change
+    if self.op.os_name and not self.op.force:
+      _CheckNodeHasOS(self, instance.primary_node, self.op.os_name,
+                      self.op.force_variant)
+
     return
 
     return
 
+  def _ConvertPlainToDrbd(self, feedback_fn):
+    """Converts an instance from plain to drbd.
+
+    The primary node is taken from the instance and the secondary node
+    from C{self.op.remote_node}. The steps, in order, are:
+      - generate the new disk objects via L{_GenerateDiskTemplate}, using
+        the sizes and modes of the current disks
+      - create C{children[1]} of every new disk on the primary node and
+        all children of every new disk on the secondary node
+      - rename the current disks on the primary node to the logical_id
+        of C{children[0]} of the matching new disk
+      - create the new top-level disks on both nodes
+      - store the new disk list and template in the configuration
+      - wait for the disks to sync
+
+    @param feedback_fn: callable invoked with progress messages; it is
+        also passed on to the configuration update
+    @raise errors.OpExecError: if L{_WaitForSync} reports a failure after
+        the configuration has already been updated
+
+    """
+    feedback_fn("Converting template to drbd")
+    instance = self.instance
+    pnode = instance.primary_node
+    snode = self.op.remote_node
+
+    # describe the existing disks (size and mode only) so that
+    disk_info = [{"size": d.size, "mode": d.mode} for d in instance.disks]
+    new_disks = _GenerateDiskTemplate(self, self.op.disk_template,
+                                      instance.name, pnode, [snode],
+                                      disk_info, None, None, 0)
+    info = _GetInstanceInfoText(instance)
+    feedback_fn("Creating aditional volumes...")
+    # first, create the child devices that do not exist yet
+    for disk in new_disks:
+      # on the primary only children[1] is new; children[0] is obtained
+      _CreateSingleBlockDev(self, pnode, instance, disk.children[1],
+                            info, True)
+      for child in disk.children:
+        _CreateSingleBlockDev(self, snode, instance, child, info, True)
+    # at this stage, all new LVs have been created, so the original
+    # disks can be renamed to the names expected for children[0]
+    feedback_fn("Renaming original volumes...")
+    rename_list = [(o, n.children[0].logical_id)
+                   for (o, n) in zip(instance.disks, new_disks)]
+    result = self.rpc.call_blockdev_rename(pnode, rename_list)
+    result.Raise("Failed to rename original LVs")
+
+    feedback_fn("Initializing DRBD devices...")
+    # all child devices are in place, we can now create the DRBD devices
+    for disk in new_disks:
+      for node in [pnode, snode]:
+        f_create = node == pnode  # True only for the primary node
+        _CreateSingleBlockDev(self, node, instance, disk, info, f_create)
+
+    # from here on the stored instance definition uses the new disks
+    instance.disk_template = constants.DT_DRBD8
+    instance.disks = new_disks
+    self.cfg.Update(instance, feedback_fn)
+
+    # disks are created, waiting for sync; a failure is only reported
+    disk_abort = not _WaitForSync(self, instance)
+    if disk_abort:
+      raise errors.OpExecError("There are some degraded disks for"
+                               " this instance, please cleanup manually")
+
+  def _ConvertDrbdToPlain(self, feedback_fn):
+    """Converts an instance from drbd to plain.
+
+    The first child (C{children[0]}) of every current disk becomes the
+    new disk of the instance, inheriting the size and mode of its
+    parent. The configuration is updated first; afterwards the old
+    disks are removed on the secondary node and C{children[1]} of every
+    old disk is removed on the primary node. Removal failures are only
+    logged as warnings and do not abort the conversion.
+
+    The instance must have exactly one secondary node (checked with an
+    assertion).
+
+    @param feedback_fn: callable invoked with progress messages; it is
+        also passed on to the configuration update
+
+    """
+    instance = self.instance
+    assert len(instance.secondary_nodes) == 1
+    pnode = instance.primary_node
+    snode = instance.secondary_nodes[0]
+    feedback_fn("Converting template to plain")
+
+    old_disks = instance.disks
+    new_disks = [d.children[0] for d in old_disks]
+
+    # each new disk takes over size and mode from the disk it was under
+    for parent, child in zip(old_disks, new_disks):
+      child.size = parent.size
+      child.mode = parent.mode
+
+    # store the new layout before any device is removed
+    instance.disks = new_disks
+    instance.disk_template = constants.DT_PLAIN
+    self.cfg.Update(instance, feedback_fn)
+
+    feedback_fn("Removing volumes on the secondary node...")
+    for disk in old_disks:
+      self.cfg.SetDiskID(disk, snode)
+      msg = self.rpc.call_blockdev_remove(snode, disk).fail_msg
+      if msg:
+        self.LogWarning("Could not remove block device %s on node %s,"
+                        " continuing anyway: %s", disk.iv_name, snode, msg)
+
+    feedback_fn("Removing unneeded volumes on the primary node...")
+    for idx, disk in enumerate(old_disks):
+      meta = disk.children[1]
+      self.cfg.SetDiskID(meta, pnode)
+      msg = self.rpc.call_blockdev_remove(pnode, meta).fail_msg
+      if msg:
+        self.LogWarning("Could not remove metadata for disk %d on node %s,"
+                        " continuing anyway: %s", idx, pnode, msg)
+
+
   def Exec(self, feedback_fn):
     """Modifies an instance.
 
   def Exec(self, feedback_fn):
     """Modifies an instance.
 
@@ -8057,6 +8690,20 @@ class LUSetInstanceParams(LogicalUnit):
         # change a given disk
         instance.disks[disk_op].mode = disk_dict['mode']
         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
         # change a given disk
         instance.disks[disk_op].mode = disk_dict['mode']
         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
+
+    # Disk template conversion: the instance disks must be shut down
+    # before the conversion routine selected from _DISK_CONVERSIONS runs
+    if self.op.disk_template:
+      r_shut = _ShutdownInstanceDisks(self, instance)
+      if not r_shut:
+        raise errors.OpExecError("Cannot shutdown instance disks, unable to"
+                                 " proceed with disk template conversion")
+      # the table is keyed by (current template, requested template)
+      mode = (instance.disk_template, self.op.disk_template)
+      try:
+        # the table stores plain functions, hence the explicit self
+        self._DISK_CONVERSIONS[mode](self, feedback_fn)
+      except:
+        # deliberately catches everything: release the DRBD minors held
+        # for this instance, then re-raise the original error unchanged
+        self.cfg.ReleaseDRBDMinors(instance.name)
+        raise
+      result.append(("disk_template", self.op.disk_template))
+
     # NIC changes
     for nic_op, nic_dict in self.op.nics:
       if nic_op == constants.DDM_REMOVE:
     # NIC changes
     for nic_op, nic_dict in self.op.nics:
       if nic_op == constants.DDM_REMOVE:
@@ -8097,10 +8744,18 @@ class LUSetInstanceParams(LogicalUnit):
       for key, val in self.op.beparams.iteritems():
         result.append(("be/%s" % key, val))
 
       for key, val in self.op.beparams.iteritems():
         result.append(("be/%s" % key, val))
 
+    # OS change
+    if self.op.os_name:
+      instance.os = self.op.os_name
+
     self.cfg.Update(instance, feedback_fn)
 
     return result
 
     self.cfg.Update(instance, feedback_fn)
 
     return result
 
+  # Dispatch table for disk template conversions, keyed by the tuple
+  # (current disk template, requested disk template). The values are the
+  # plain functions defined in this class body, so Exec calls them as
+  # self._DISK_CONVERSIONS[mode](self, feedback_fn), passing self itself.
+  _DISK_CONVERSIONS = {
+    (constants.DT_PLAIN, constants.DT_DRBD8): _ConvertPlainToDrbd,
+    (constants.DT_DRBD8, constants.DT_PLAIN): _ConvertDrbdToPlain,
+    }
 
 class LUQueryExports(NoHooksLU):
   """Query the exports list
 
 class LUQueryExports(NoHooksLU):
   """Query the exports list
@@ -8157,11 +8812,22 @@ class LUExportInstance(LogicalUnit):
     """Check the arguments.
 
     """
     """Check the arguments.
 
     """
+    _CheckBooleanOpField(self.op, "remove_instance")
+    _CheckBooleanOpField(self.op, "ignore_remove_failures")
+
     self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
                                     constants.DEFAULT_SHUTDOWN_TIMEOUT)
     self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
                                     constants.DEFAULT_SHUTDOWN_TIMEOUT)
+    self.remove_instance = getattr(self.op, "remove_instance", False)
+    self.ignore_remove_failures = getattr(self.op, "ignore_remove_failures",
+                                          False)
+
+    # removing the instance after the export requires shutting it down
+    if self.remove_instance and not self.op.shutdown:
+      raise errors.OpPrereqError("Can not remove instance without shutting it"
+                                 " down before", errors.ECODE_INVAL)
 
   def ExpandNames(self):
     self._ExpandAndLockInstance()
 
   def ExpandNames(self):
     self._ExpandAndLockInstance()
+
     # FIXME: lock only instance primary and destination node
     #
     # Sad but true, for now we have do lock all nodes, as we don't know where
     # FIXME: lock only instance primary and destination node
     #
     # Sad but true, for now we have do lock all nodes, as we don't know where
@@ -8186,6 +8852,8 @@ class LUExportInstance(LogicalUnit):
       "EXPORT_NODE": self.op.target_node,
       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
       "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
       "EXPORT_NODE": self.op.target_node,
       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
       "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
+      # TODO: Generic function for boolean env variables
+      "REMOVE_INSTANCE": str(bool(self.remove_instance)),
       }
     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
       }
     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
@@ -8212,11 +8880,100 @@ class LUExportInstance(LogicalUnit):
     _CheckNodeNotDrained(self, self.dst_node.name)
 
     # instance disk type verification
     _CheckNodeNotDrained(self, self.dst_node.name)
 
     # instance disk type verification
+    # TODO: Implement export support for file-based disks
     for disk in self.instance.disks:
       if disk.dev_type == constants.LD_FILE:
         raise errors.OpPrereqError("Export not supported for instances with"
                                    " file-based disks", errors.ECODE_INVAL)
 
     for disk in self.instance.disks:
       if disk.dev_type == constants.LD_FILE:
         raise errors.OpPrereqError("Export not supported for instances with"
                                    " file-based disks", errors.ECODE_INVAL)
 
+  def _CreateSnapshots(self, feedback_fn):
+    """Snapshot each disk of the instance on its primary node.
+
+    A failed snapshot does not abort the operation: it is logged as a
+    warning and recorded as C{False} at that disk's position.
+
+    @param feedback_fn: function used to report progress
+    @rtype: list
+    @return: one entry per instance disk, in disk order; either an
+        L{objects.Disk} describing the snapshot or C{False} on failure
+
+    """
+    inst = self.instance
+    node = inst.primary_node
+    vg_name = self.cfg.GetVGName()
+
+    snapshots = []
+    for disk_idx, src_disk in enumerate(inst.disks):
+      feedback_fn("Creating a snapshot of disk/%s on node %s" %
+                  (disk_idx, node))
+
+      # on success the payload is a snapshot of an lvm leaf of the disk
+      # we passed
+      snap_result = self.rpc.call_blockdev_snapshot(node, src_disk)
+      err = snap_result.fail_msg
+      if err:
+        self.LogWarning("Could not snapshot disk/%s on node %s: %s",
+                        disk_idx, node, err)
+        snapshots.append(False)
+        continue
+
+      snap_id = (vg_name, snap_result.payload)
+      snapshots.append(objects.Disk(dev_type=constants.LD_LV,
+                                    size=src_disk.size,
+                                    logical_id=snap_id,
+                                    physical_id=snap_id,
+                                    iv_name=src_disk.iv_name))
+
+    return snapshots
+
+  def _RemoveSnapshot(self, feedback_fn, snap_disks, disk_index):
+    """Remove one snapshot from the instance's primary node.
+
+    Nothing is done for entries of C{snap_disks} that evaluate to false
+    (snapshots which could not be created).
+
+    @param feedback_fn: function used to report progress
+    @type snap_disks: list
+    @param snap_disks: The list of all snapshots as returned by
+                       L{_CreateSnapshots}
+    @type disk_index: number
+    @param disk_index: Index of the snapshot to be removed
+    @rtype: bool
+    @return: C{True} if the snapshot was removed, C{False} if there was
+        nothing to remove or the removal failed
+
+    """
+    snapshot = snap_disks[disk_index]
+    if not snapshot:
+      return False
+
+    node = self.instance.primary_node
+    feedback_fn("Removing snapshot of disk/%s on node %s" %
+                (disk_index, node))
+
+    removal = self.rpc.call_blockdev_remove(node, snapshot)
+    if removal.fail_msg:
+      self.LogWarning("Could not remove snapshot for disk/%d from node"
+                      " %s: %s", disk_index, node, removal.fail_msg)
+      return False
+
+    return True
+
+  def _CleanupExports(self, feedback_fn):
+    """Delete exports of this instance from every node but the target.
+
+    For example, in a cluster with nodes A..D and an export made to node
+    C, exports of the instance found on A, B and D are removed. Nodes
+    whose export list could not be retrieved are skipped, and failed
+    removals are only logged as warnings.
+
+    @param feedback_fn: function used to report progress
+
+    """
+    other_nodes = self.cfg.GetNodeList()
+    other_nodes.remove(self.dst_node.name)
+
+    iname = self.instance.name
+
+    # on one-node clusters the list is empty after the removal above; if
+    # we proceeded the backup would be removed because OpQueryExports
+    # substitutes an empty list with the full cluster node list.
+    if not other_nodes:
+      return
+
+    feedback_fn("Removing old exports for instance %s" % iname)
+    exports = self.rpc.call_export_list(other_nodes)
+    for node in exports:
+      node_result = exports[node]
+      if node_result.fail_msg:
+        continue
+      if iname not in node_result.payload:
+        continue
+      err = self.rpc.call_export_remove(node, iname).fail_msg
+      if err:
+        self.LogWarning("Could not remove older export for instance %s"
+                        " on node %s: %s", iname, node, err)
+
   def Exec(self, feedback_fn):
     """Export an instance to an image in the cluster.
 
   def Exec(self, feedback_fn):
     """Export an instance to an image in the cluster.
 
@@ -8230,13 +8987,10 @@ class LUExportInstance(LogicalUnit):
       feedback_fn("Shutting down instance %s" % instance.name)
       result = self.rpc.call_instance_shutdown(src_node, instance,
                                                self.shutdown_timeout)
       feedback_fn("Shutting down instance %s" % instance.name)
       result = self.rpc.call_instance_shutdown(src_node, instance,
                                                self.shutdown_timeout)
+      # TODO: Maybe ignore failures if ignore_remove_failures is set
       result.Raise("Could not shutdown instance %s on"
                    " node %s" % (instance.name, src_node))
 
       result.Raise("Could not shutdown instance %s on"
                    " node %s" % (instance.name, src_node))
 
-    vgname = self.cfg.GetVGName()
-
-    snap_disks = []
-
     # set the disks ID correctly since call_instance_start needs the
     # correct drbd minor to create the symlinks
     for disk in instance.disks:
     # set the disks ID correctly since call_instance_start needs the
     # correct drbd minor to create the symlinks
     for disk in instance.disks:
@@ -8251,94 +9005,93 @@ class LUExportInstance(LogicalUnit):
 
     try:
       # per-disk results
 
     try:
       # per-disk results
-      dresults = []
+      removed_snaps = [False] * len(instance.disks)
+
+      snap_disks = None
       try:
       try:
-        for idx, disk in enumerate(instance.disks):
-          feedback_fn("Creating a snapshot of disk/%s on node %s" %
-                      (idx, src_node))
-
-          # result.payload will be a snapshot of an lvm leaf of the one we
-          # passed
-          result = self.rpc.call_blockdev_snapshot(src_node, disk)
-          msg = result.fail_msg
-          if msg:
-            self.LogWarning("Could not snapshot disk/%s on node %s: %s",
-                            idx, src_node, msg)
-            snap_disks.append(False)
-          else:
-            disk_id = (vgname, result.payload)
-            new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
-                                   logical_id=disk_id, physical_id=disk_id,
-                                   iv_name=disk.iv_name)
-            snap_disks.append(new_dev)
+        try:
+          snap_disks = self._CreateSnapshots(feedback_fn)
+        finally:
+          if (self.op.shutdown and instance.admin_up and
+              not self.remove_instance):
+            feedback_fn("Starting instance %s" % instance.name)
+            result = self.rpc.call_instance_start(src_node, instance,
+                                                  None, None)
+            msg = result.fail_msg
+            if msg:
+              _ShutdownInstanceDisks(self, instance)
+              raise errors.OpExecError("Could not start instance: %s" % msg)
+
+        assert len(snap_disks) == len(instance.disks)
+        assert len(removed_snaps) == len(instance.disks)
+
+        # TODO: check for size
+
+        def _TransferFinished(idx):
+          logging.debug("Transfer %s finished", idx)
+          if self._RemoveSnapshot(feedback_fn, snap_disks, idx):
+            removed_snaps[idx] = True
+
+        transfers = []
+
+        for idx, dev in enumerate(snap_disks):
+          if not dev:
+            transfers.append(None)
+            continue
 
 
-      finally:
-        if self.op.shutdown and instance.admin_up:
-          feedback_fn("Starting instance %s" % instance.name)
-          result = self.rpc.call_instance_start(src_node, instance, None, None)
-          msg = result.fail_msg
-          if msg:
-            _ShutdownInstanceDisks(self, instance)
-            raise errors.OpExecError("Could not start instance: %s" % msg)
-
-      # TODO: check for size
-
-      cluster_name = self.cfg.GetClusterName()
-      for idx, dev in enumerate(snap_disks):
-        feedback_fn("Exporting snapshot %s from %s to %s" %
-                    (idx, src_node, dst_node.name))
-        if dev:
-          # FIXME: pass debug from opcode to backend
-          result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
-                                                 instance, cluster_name,
-                                                 idx, self.op.debug_level)
-          msg = result.fail_msg
-          if msg:
-            self.LogWarning("Could not export disk/%s from node %s to"
-                            " node %s: %s", idx, src_node, dst_node.name, msg)
-            dresults.append(False)
-          else:
-            dresults.append(True)
-          msg = self.rpc.call_blockdev_remove(src_node, dev).fail_msg
-          if msg:
-            self.LogWarning("Could not remove snapshot for disk/%d from node"
-                            " %s: %s", idx, src_node, msg)
-        else:
-          dresults.append(False)
+          path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
+                                dev.physical_id[1])
 
 
-      feedback_fn("Finalizing export on %s" % dst_node.name)
-      result = self.rpc.call_finalize_export(dst_node.name, instance,
-                                             snap_disks)
-      fin_resu = True
-      msg = result.fail_msg
-      if msg:
-        self.LogWarning("Could not finalize export for instance %s"
-                        " on node %s: %s", instance.name, dst_node.name, msg)
-        fin_resu = False
+          finished_fn = compat.partial(_TransferFinished, idx)
+
+          # FIXME: pass debug option from opcode to backend
+          dt = masterd.instance.DiskTransfer("snapshot/%s" % idx,
+                                             constants.IEIO_SCRIPT, (dev, idx),
+                                             constants.IEIO_FILE, (path, ),
+                                             finished_fn)
+          transfers.append(dt)
+
+        # Actually export data
+        dresults = \
+          masterd.instance.TransferInstanceData(self, feedback_fn,
+                                                src_node, dst_node.name,
+                                                dst_node.secondary_ip,
+                                                instance, transfers)
+
+        assert len(dresults) == len(instance.disks)
+
+        # Check for backwards compatibility
+        assert compat.all(isinstance(i, bool) for i in dresults), \
+               "Not all results are boolean: %r" % dresults
+
+        feedback_fn("Finalizing export on %s" % dst_node.name)
+        result = self.rpc.call_finalize_export(dst_node.name, instance,
+                                               snap_disks)
+        msg = result.fail_msg
+        fin_resu = not msg
+        if msg:
+          self.LogWarning("Could not finalize export for instance %s"
+                          " on node %s: %s", instance.name, dst_node.name, msg)
+
+      finally:
+        # Remove all snapshots
+        assert len(removed_snaps) == len(instance.disks)
+        for idx, removed in enumerate(removed_snaps):
+          if not removed:
+            self._RemoveSnapshot(feedback_fn, snap_disks, idx)
 
     finally:
       if activate_disks:
         feedback_fn("Deactivating disks for %s" % instance.name)
         _ShutdownInstanceDisks(self, instance)
 
 
     finally:
       if activate_disks:
         feedback_fn("Deactivating disks for %s" % instance.name)
         _ShutdownInstanceDisks(self, instance)
 
-    nodelist = self.cfg.GetNodeList()
-    nodelist.remove(dst_node.name)
+    # Remove instance if requested
+    if self.remove_instance:
+      feedback_fn("Removing instance %s" % instance.name)
+      _RemoveInstance(self, feedback_fn, instance, self.ignore_remove_failures)
+
+    self._CleanupExports(feedback_fn)
 
 
-    # on one-node clusters nodelist will be empty after the removal
-    # if we proceed the backup would be removed because OpQueryExports
-    # substitutes an empty list with the full cluster node list.
-    iname = instance.name
-    if nodelist:
-      feedback_fn("Removing old exports for instance %s" % iname)
-      exportlist = self.rpc.call_export_list(nodelist)
-      for node in exportlist:
-        if exportlist[node].fail_msg:
-          continue
-        if iname in exportlist[node].payload:
-          msg = self.rpc.call_export_remove(node, iname).fail_msg
-          if msg:
-            self.LogWarning("Could not remove older export for instance %s"
-                            " on node %s: %s", iname, node, msg)
     return fin_resu, dresults
 
 
     return fin_resu, dresults