Merge branch 'devel-2.1'
lib/cmdlib.py
index a632bb7..f15cd14 100644
@@ -33,6 +33,7 @@ import re
 import platform
 import logging
 import copy
+import OpenSSL
 
 from ganeti import ssh
 from ganeti import utils
@@ -95,6 +96,10 @@ class LogicalUnit(object):
     self.LogStep = processor.LogStep # pylint: disable-msg=C0103
     # support for dry-run
     self.dry_run_result = None
+    # support for generic debug attribute
+    if (not hasattr(self.op, "debug_level") or
+        not isinstance(self.op.debug_level, int)):
+      self.op.debug_level = 0
 
     # Tasklets
     self.tasklets = None
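The generic debug attribute added above guarantees that every logical unit can
rely on self.op.debug_level being an integer. A minimal sketch of the
behaviour, using a hypothetical bare opcode object:

  class FakeOp(object):  # hypothetical opcode without debug_level
    pass

  op = FakeOp()
  if (not hasattr(op, "debug_level") or
      not isinstance(op.debug_level, int)):
    op.debug_level = 0
  assert op.debug_level == 0  # missing or non-integer values fall back to 0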
@@ -300,12 +305,9 @@ class LogicalUnit(object):
     else:
       assert locking.LEVEL_INSTANCE not in self.needed_locks, \
         "_ExpandAndLockInstance called with instance-level locks set"
-    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
-    if expanded_name is None:
-      raise errors.OpPrereqError("Instance '%s' not known" %
-                                 self.op.instance_name, errors.ECODE_NOENT)
-    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
-    self.op.instance_name = expanded_name
+    self.op.instance_name = _ExpandInstanceName(self.cfg,
+                                                self.op.instance_name)
+    self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name
 
   def _LockInstancesNodes(self, primary_only=False):
     """Helper function to declare instances' nodes for locking.
@@ -427,7 +429,7 @@ def _GetWantedNodes(lu, nodes):
   @param nodes: list of node names or None for all nodes
   @rtype: list
   @return: the list of nodes, sorted
-  @raise errors.OpProgrammerError: if the nodes parameter is wrong type
+  @raise errors.ProgrammerError: if the nodes parameter is wrong type
 
   """
   if not isinstance(nodes, list):
@@ -438,14 +440,7 @@ def _GetWantedNodes(lu, nodes):
     raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
       " non-empty list of nodes whose name is to be expanded.")
 
-  wanted = []
-  for name in nodes:
-    node = lu.cfg.ExpandNodeName(name)
-    if node is None:
-      raise errors.OpPrereqError("No such node name '%s'" % name,
-                                 errors.ECODE_NOENT)
-    wanted.append(node)
-
+  wanted = [_ExpandNodeName(lu.cfg, name) for name in nodes]
   return utils.NiceSort(wanted)
 
 
@@ -467,15 +462,7 @@ def _GetWantedInstances(lu, instances):
                                errors.ECODE_INVAL)
 
   if instances:
-    wanted = []
-
-    for name in instances:
-      instance = lu.cfg.ExpandInstanceName(name)
-      if instance is None:
-        raise errors.OpPrereqError("No such instance name '%s'" % name,
-                                   errors.ECODE_NOENT)
-      wanted.append(instance)
-
+    wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
   else:
     wanted = utils.NiceSort(lu.cfg.GetInstanceList())
   return wanted
@@ -555,6 +542,77 @@ def _CheckNodeNotDrained(lu, node):
                                errors.ECODE_INVAL)
 
 
+def _CheckNodeHasOS(lu, node, os_name, force_variant):
+  """Ensure that a node supports a given OS.
+
+  @param lu: the LU on behalf of which we make the check
+  @param node: the node to check
+  @param os_name: the OS to query about
+  @param force_variant: whether to ignore variant errors
+  @raise errors.OpPrereqError: if the node does not support the OS
+
+  """
+  result = lu.rpc.call_os_get(node, os_name)
+  result.Raise("OS '%s' not in supported OS list for node %s" %
+               (os_name, node),
+               prereq=True, ecode=errors.ECODE_INVAL)
+  if not force_variant:
+    _CheckOSVariant(result.payload, os_name)
+
+
+def _CheckDiskTemplate(template):
+  """Ensure a given disk template is valid.
+
+  """
+  if template not in constants.DISK_TEMPLATES:
+    msg = ("Invalid disk template name '%s', valid templates are: %s" %
+           (template, utils.CommaJoin(constants.DISK_TEMPLATES)))
+    raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
+
+
+def _CheckInstanceDown(lu, instance, reason):
+  """Ensure that an instance is not running."""
+  if instance.admin_up:
+    raise errors.OpPrereqError("Instance %s is marked to be up, %s" %
+                               (instance.name, reason), errors.ECODE_STATE)
+
+  pnode = instance.primary_node
+  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
+  ins_l.Raise("Can't contact node %s for instance information" % pnode,
+              prereq=True, ecode=errors.ECODE_ENVIRON)
+
+  if instance.name in ins_l.payload:
+    raise errors.OpPrereqError("Instance %s is running, %s" %
+                               (instance.name, reason), errors.ECODE_STATE)
+
+
+def _ExpandItemName(fn, name, kind):
+  """Expand an item name.
+
+  @param fn: the function to use for expansion
+  @param name: requested item name
+  @param kind: text description ('Node' or 'Instance')
+  @return: the resolved (full) name
+  @raise errors.OpPrereqError: if the item is not found
+
+  """
+  full_name = fn(name)
+  if full_name is None:
+    raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
+                               errors.ECODE_NOENT)
+  return full_name
+
+
+def _ExpandNodeName(cfg, name):
+  """Wrapper over L{_ExpandItemName} for nodes."""
+  return _ExpandItemName(cfg.ExpandNodeName, name, "Node")
+
+
+def _ExpandInstanceName(cfg, name):
+  """Wrapper over L{_ExpandItemName} for instance."""
+  return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
+
+
 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
                           memory, vcpus, nics, disk_template, disks,
                           bep, hvp, hypervisor_name):
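The helpers above centralize the expand-or-fail pattern that the rest of this
commit converts call sites to; a sketch of their use, with hypothetical names:

  # returns the fully-qualified name or raises OpPrereqError (ECODE_NOENT)
  node_full = _ExpandNodeName(self.cfg, "node1")    # e.g. "node1.example.com"
  inst_full = _ExpandInstanceName(self.cfg, "web")  # e.g. "web.example.com"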
@@ -835,6 +893,13 @@ def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
   return faulty
 
 
+def _FormatTimestamp(secs):
+  """Formats a Unix timestamp with the local timezone.
+
+  """
+  return time.strftime("%F %T %Z", time.gmtime(secs))
+
+
 class LUPostInitCluster(LogicalUnit):
   """Logical unit for running hooks after cluster initialization.
 
@@ -926,6 +991,66 @@ class LUDestroyCluster(LogicalUnit):
     return master
 
 
+def _VerifyCertificateInner(filename, expired, not_before, not_after, now,
+                            warn_days=constants.SSL_CERT_EXPIRATION_WARN,
+                            error_days=constants.SSL_CERT_EXPIRATION_ERROR):
+  """Verifies certificate details for LUVerifyCluster.
+
+  """
+  if expired:
+    msg = "Certificate %s is expired" % filename
+
+    if not_before is not None and not_after is not None:
+      msg += (" (valid from %s to %s)" %
+              (_FormatTimestamp(not_before),
+               _FormatTimestamp(not_after)))
+    elif not_before is not None:
+      msg += " (valid from %s)" % _FormatTimestamp(not_before)
+    elif not_after is not None:
+      msg += " (valid until %s)" % _FormatTimestamp(not_after)
+
+    return (LUVerifyCluster.ETYPE_ERROR, msg)
+
+  elif not_before is not None and not_before > now:
+    return (LUVerifyCluster.ETYPE_WARNING,
+            "Certificate %s not yet valid (valid from %s)" %
+            (filename, _FormatTimestamp(not_before)))
+
+  elif not_after is not None:
+    remaining_days = int((not_after - now) / (24 * 3600))
+
+    msg = ("Certificate %s expires in %d days" % (filename, remaining_days))
+
+    if remaining_days <= error_days:
+      return (LUVerifyCluster.ETYPE_ERROR, msg)
+
+    if remaining_days <= warn_days:
+      return (LUVerifyCluster.ETYPE_WARNING, msg)
+
+  return (None, None)
+
+
+def _VerifyCertificate(filename):
+  """Verifies a certificate for LUVerifyCluster.
+
+  @type filename: string
+  @param filename: Path to PEM file
+
+  """
+  try:
+    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
+                                           utils.ReadFile(filename))
+  except Exception, err: # pylint: disable-msg=W0703
+    return (LUVerifyCluster.ETYPE_ERROR,
+            "Failed to load X509 certificate %s: %s" % (filename, err))
+
+  # Depending on the pyOpenSSL version, this can just return (None, None)
+  (not_before, not_after) = utils.GetX509CertValidity(cert)
+
+  return _VerifyCertificateInner(filename, cert.has_expired(),
+                                 not_before, not_after, time.time())
+
+
 class LUVerifyCluster(LogicalUnit):
   """Verifies the cluster status.
 
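Since _VerifyCertificateInner only inspects timestamps, its classification is
easy to exercise in isolation; an illustrative sketch with explicit thresholds
(the real values come from constants.SSL_CERT_EXPIRATION_WARN/_ERROR):

  now = 1300000000.0
  day = 24 * 3600
  # expires in 40 days: neither warning nor error
  assert _VerifyCertificateInner("a.pem", False, None, now + 40 * day, now,
                                 warn_days=30, error_days=7) == (None, None)
  # expires in 10 days: only a warning
  (etype, _) = _VerifyCertificateInner("a.pem", False, None, now + 10 * day,
                                       now, warn_days=30, error_days=7)
  assert etype == LUVerifyCluster.ETYPE_WARNING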
@@ -940,6 +1065,7 @@ class LUVerifyCluster(LogicalUnit):
   TINSTANCE = "instance"
 
   ECLUSTERCFG = (TCLUSTER, "ECLUSTERCFG")
+  ECLUSTERCERT = (TCLUSTER, "ECLUSTERCERT")
   EINSTANCEBADNODE = (TINSTANCE, "EINSTANCEBADNODE")
   EINSTANCEDOWN = (TINSTANCE, "EINSTANCEDOWN")
   EINSTANCELAYOUT = (TINSTANCE, "EINSTANCELAYOUT")
@@ -1302,6 +1428,11 @@ class LUVerifyCluster(LogicalUnit):
     for msg in self.cfg.VerifyConfig():
       _ErrorIf(True, self.ECLUSTERCFG, None, msg)
 
+    # Check the cluster certificates
+    for cert_filename in constants.ALL_CERT_FILES:
+      (errcode, msg) = _VerifyCertificate(cert_filename)
+      _ErrorIf(errcode, self.ECLUSTERCERT, None, msg, code=errcode)
+
     vg_name = self.cfg.GetVGName()
     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
     nodelist = utils.NiceSort(self.cfg.GetNodeList())
@@ -1323,8 +1454,7 @@ class LUVerifyCluster(LogicalUnit):
     master_files = [constants.CLUSTER_CONF_FILE]
 
     file_names = ssconf.SimpleStore().GetFileList()
-    file_names.append(constants.SSL_CERT_FILE)
-    file_names.append(constants.RAPI_CERT_FILE)
+    file_names.extend(constants.ALL_CERT_FILES)
     file_names.extend(master_files)
 
     local_checksums = utils.FingerprintFiles(file_names)
@@ -1426,7 +1556,8 @@ class LUVerifyCluster(LogicalUnit):
       idata = nresult.get(constants.NV_INSTANCELIST, None)
       test = not isinstance(idata, list)
       _ErrorIf(test, self.ENODEHV, node,
-               "rpc call to node failed (instancelist)")
+               "rpc call to node failed (instancelist): %s",
+               utils.SafeEncode(str(idata)))
       if test:
         continue
 
@@ -1444,17 +1575,17 @@ class LUVerifyCluster(LogicalUnit):
       try:
         ntime_merged = utils.MergeTime(ntime)
       except (ValueError, TypeError):
-        _ErrorIf(test, self.ENODETIME, node, "Node returned invalid time")
+        _ErrorIf(True, self.ENODETIME, node, "Node returned invalid time")
 
       if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
-        ntime_diff = abs(nvinfo_starttime - ntime_merged)
+        ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
       elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
-        ntime_diff = abs(ntime_merged - nvinfo_endtime)
+        ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
       else:
         ntime_diff = None
 
       _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
-               "Node time diverges by at least %0.1fs from master node time",
+               "Node time diverges by at least %s from master node time",
                ntime_diff)
 
       if ntime_diff is not None:
@@ -1531,7 +1662,7 @@ class LUVerifyCluster(LogicalUnit):
         _ErrorIf(snode not in node_info and snode not in n_offline,
                  self.ENODERPC, snode,
                  "instance %s, connection to secondary node"
-                 "failed", instance)
+                 " failed", instance)
 
         if snode in node_info:
           node_info[snode]['sinst'].append(instance)
@@ -1616,7 +1747,7 @@ class LUVerifyCluster(LogicalUnit):
           if test:
             output = indent_re.sub('      ', output)
             feedback_fn("%s" % output)
-            lu_result = 1
+            lu_result = 0
 
       return lu_result
 
@@ -1719,10 +1850,7 @@ class LURepairDiskSizes(NoHooksLU):
     if self.op.instances:
       self.wanted_names = []
       for name in self.op.instances:
-        full_name = self.cfg.ExpandInstanceName(name)
-        if full_name is None:
-          raise errors.OpPrereqError("Instance '%s' not known" % name,
-                                     errors.ECODE_NOENT)
+        full_name = _ExpandInstanceName(self.cfg, name)
         self.wanted_names.append(full_name)
       self.needed_locks = {
         locking.LEVEL_NODE: [],
@@ -2054,6 +2182,25 @@ class LUSetClusterParams(LogicalUnit):
         else:
           self.new_hvparams[hv_name].update(hv_dict)
 
+    # os hypervisor parameters
+    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
+    if self.op.os_hvp:
+      if not isinstance(self.op.os_hvp, dict):
+        raise errors.OpPrereqError("Invalid 'os_hvp' parameter on input",
+                                   errors.ECODE_INVAL)
+      for os_name, hvs in self.op.os_hvp.items():
+        if not isinstance(hvs, dict):
+          raise errors.OpPrereqError(("Invalid 'os_hvp' parameter on"
+                                      " input"), errors.ECODE_INVAL)
+        if os_name not in self.new_os_hvp:
+          self.new_os_hvp[os_name] = hvs
+        else:
+          for hv_name, hv_dict in hvs.items():
+            if hv_name not in self.new_os_hvp[os_name]:
+              self.new_os_hvp[os_name][hv_name] = hv_dict
+            else:
+              self.new_os_hvp[os_name][hv_name].update(hv_dict)
+
     if self.op.enabled_hypervisors is not None:
       self.hv_list = self.op.enabled_hypervisors
       if not self.hv_list:
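The nested loop above merges the os_hvp overrides one dictionary level at a
time, so settings for unrelated OSes and hypervisors are preserved; an
illustrative sketch of the semantics (names and values hypothetical):

  # existing: {"debian": {"xen-pvm": {"kernel_path": "/boot/vmlinuz"}}}
  # incoming: {"debian": {"xen-pvm": {"root_path": "/dev/xvda1"},
  #                       "kvm": {"acpi": True}}}
  # merged:   {"debian": {"xen-pvm": {"kernel_path": "/boot/vmlinuz",
  #                                   "root_path": "/dev/xvda1"},
  #                       "kvm": {"acpi": True}}}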
@@ -2081,6 +2228,19 @@ class LUSetClusterParams(LogicalUnit):
           hv_class.CheckParameterSyntax(hv_params)
           _CheckHVParams(self, node_list, hv_name, hv_params)
 
+    if self.op.os_hvp:
+      # no need to check any newly-enabled hypervisors, since the
+      # defaults have already been checked in the above code-block
+      for os_name, os_hvp in self.new_os_hvp.items():
+        for hv_name, hv_params in os_hvp.items():
+          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
+          # we need to fill in the new os_hvp on top of the actual hvparams
+          cluster_defaults = self.new_hvparams.get(hv_name, {})
+          new_osp = objects.FillDict(cluster_defaults, hv_params)
+          hv_class = hypervisor.GetHypervisor(hv_name)
+          hv_class.CheckParameterSyntax(new_osp)
+          _CheckHVParams(self, node_list, hv_name, new_osp)
+
   def Exec(self, feedback_fn):
     """Change the parameters of the cluster.
 
@@ -2096,6 +2257,8 @@ class LUSetClusterParams(LogicalUnit):
                     " state, not changing")
     if self.op.hvparams:
       self.cluster.hvparams = self.new_hvparams
+    if self.op.os_hvp:
+      self.cluster.os_hvp = self.new_os_hvp
     if self.op.enabled_hypervisors is not None:
       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
     if self.op.beparams:
@@ -2124,7 +2287,7 @@ def _RedistributeAncillaryFiles(lu, additional_nodes=None):
   """
   # 1. Gather target nodes
   myself = lu.cfg.GetNodeInfo(lu.cfg.GetMasterNode())
-  dist_nodes = lu.cfg.GetNodeList()
+  dist_nodes = lu.cfg.GetOnlineNodeList()
   if additional_nodes is not None:
     dist_nodes.extend(additional_nodes)
   if myself.name in dist_nodes:
@@ -2135,7 +2298,7 @@ def _RedistributeAncillaryFiles(lu, additional_nodes=None):
                     constants.SSH_KNOWN_HOSTS_FILE,
                     constants.RAPI_CERT_FILE,
                     constants.RAPI_USERS_FILE,
-                    constants.HMAC_CLUSTER_KEY,
+                    constants.CONFD_HMAC_KEY,
                    ])
 
   enabled_hypervisors = lu.cfg.GetClusterInfo().enabled_hypervisors
@@ -2425,8 +2588,11 @@ class LURemoveNode(LogicalUnit):
       "NODE_NAME": self.op.node_name,
       }
     all_nodes = self.cfg.GetNodeList()
-    if self.op.node_name in all_nodes:
+    try:
       all_nodes.remove(self.op.node_name)
+    except ValueError:
+      logging.warning("Node %s which is about to be removed not found"
+                      " in the all nodes list", self.op.node_name)
     return env, all_nodes, all_nodes
 
   def CheckPrereq(self):
@@ -2440,10 +2606,9 @@ class LURemoveNode(LogicalUnit):
     Any errors are signaled by raising errors.OpPrereqError.
 
     """
-    node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
-    if node is None:
-      raise errors.OpPrereqError("Node '%s' is unknown." % self.op.node_name,
-                                 errors.ECODE_NOENT)
+    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
+    node = self.cfg.GetNodeInfo(self.op.node_name)
+    assert node is not None
 
     instance_list = self.cfg.GetInstanceList()
 
@@ -2843,12 +3008,7 @@ class LUModifyNodeStorage(NoHooksLU):
   REQ_BGL = False
 
   def CheckArguments(self):
-    node_name = self.cfg.ExpandNodeName(self.op.node_name)
-    if node_name is None:
-      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name,
-                                 errors.ECODE_NOENT)
-
-    self.op.node_name = node_name
+    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
 
     storage_type = self.op.storage_type
     if storage_type not in constants.VALID_STORAGE_TYPES:
@@ -2900,6 +3060,10 @@ class LUAddNode(LogicalUnit):
   HTYPE = constants.HTYPE_NODE
   _OP_REQP = ["node_name"]
 
+  def CheckArguments(self):
+    # validate/normalize the node name
+    self.op.node_name = utils.HostInfo.NormalizeName(self.op.node_name)
+
   def BuildHooksEnv(self):
     """Build hooks env.
 
@@ -3122,14 +3286,11 @@ class LUSetNodeParams(LogicalUnit):
   REQ_BGL = False
 
   def CheckArguments(self):
-    node_name = self.cfg.ExpandNodeName(self.op.node_name)
-    if node_name is None:
-      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name,
-                                 errors.ECODE_INVAL)
-    self.op.node_name = node_name
+    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
     _CheckBooleanOpField(self.op, 'master_candidate')
     _CheckBooleanOpField(self.op, 'offline')
     _CheckBooleanOpField(self.op, 'drained')
+    _CheckBooleanOpField(self.op, 'auto_promote')
     all_mods = [self.op.offline, self.op.master_candidate, self.op.drained]
     if all_mods.count(None) == 3:
       raise errors.OpPrereqError("Please pass at least one modification",
@@ -3139,8 +3300,21 @@ class LUSetNodeParams(LogicalUnit):
                                  " state at the same time",
                                  errors.ECODE_INVAL)
 
+    # Boolean value that tells us whether we're offlining or draining the node
+    self.offline_or_drain = (self.op.offline == True or
+                             self.op.drained == True)
+    self.deoffline_or_drain = (self.op.offline == False or
+                               self.op.drained == False)
+    self.might_demote = (self.op.master_candidate == False or
+                         self.offline_or_drain)
+
+    self.lock_all = self.op.auto_promote and self.might_demote
+
   def ExpandNames(self):
-    self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
+    if self.lock_all:
+      self.needed_locks = {locking.LEVEL_NODE: locking.ALL_SET}
+    else:
+      self.needed_locks = {locking.LEVEL_NODE: self.op.node_name}
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -3175,25 +3350,16 @@ class LUSetNodeParams(LogicalUnit):
                                    " only via masterfailover",
                                    errors.ECODE_INVAL)
 
-    # Boolean value that tells us whether we're offlining or draining the node
-    offline_or_drain = self.op.offline == True or self.op.drained == True
-    deoffline_or_drain = self.op.offline == False or self.op.drained == False
-
-    if (node.master_candidate and
-        (self.op.master_candidate == False or offline_or_drain)):
-      cp_size = self.cfg.GetClusterInfo().candidate_pool_size
-      mc_now, mc_should, mc_max = self.cfg.GetMasterCandidateStats()
-      if mc_now <= cp_size:
-        msg = ("Not enough master candidates (desired"
-               " %d, new value will be %d)" % (cp_size, mc_now-1))
-        # Only allow forcing the operation if it's an offline/drain operation,
-        # and we could not possibly promote more nodes.
-        # FIXME: this can still lead to issues if in any way another node which
-        # could be promoted appears in the meantime.
-        if self.op.force and offline_or_drain and mc_should == mc_max:
-          self.LogWarning(msg)
-        else:
-          raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
+    if node.master_candidate and self.might_demote and not self.lock_all:
+      assert not self.op.auto_promote, "auto-promote set but lock_all not"
+      # check if after removing the current node, we're missing master
+      # candidates
+      (mc_remaining, mc_should, _) = \
+          self.cfg.GetMasterCandidateStats(exceptions=[node.name])
+      if mc_remaining < mc_should:
+        raise errors.OpPrereqError("Not enough master candidates, please"
+                                   " pass auto_promote to allow promotion",
+                                   errors.ECODE_INVAL)
 
     if (self.op.master_candidate == True and
         ((node.offline and not self.op.offline == False) or
@@ -3203,7 +3370,7 @@ class LUSetNodeParams(LogicalUnit):
                                  errors.ECODE_INVAL)
 
     # If we're being deofflined/drained, we'll MC ourself if needed
-    if (deoffline_or_drain and not offline_or_drain and not
+    if (self.deoffline_or_drain and not self.offline_or_drain and not
         self.op.master_candidate == True and not node.master_candidate):
       self.op.master_candidate = _DecideSelfPromotion(self)
       if self.op.master_candidate:
@@ -3258,8 +3425,13 @@ class LUSetNodeParams(LogicalUnit):
           node.offline = False
           result.append(("offline", "clear offline status due to drain"))
 
+    # we locked all nodes, we adjust the CP before updating this node
+    if self.lock_all:
+      _AdjustCandidatePool(self, [node.name])
+
     # this will trigger configuration file update, if needed
     self.cfg.Update(node, feedback_fn)
+
     # this will trigger job queue propagation or cleanup
     if changed_mc:
       self.context.ReaddNode(node)
@@ -3275,12 +3447,8 @@ class LUPowercycleNode(NoHooksLU):
   REQ_BGL = False
 
   def CheckArguments(self):
-    node_name = self.cfg.ExpandNodeName(self.op.node_name)
-    if node_name is None:
-      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name,
-                                 errors.ECODE_NOENT)
-    self.op.node_name = node_name
-    if node_name == self.cfg.GetMasterNode() and not self.op.force:
+    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
+    if self.op.node_name == self.cfg.GetMasterNode() and not self.op.force:
       raise errors.OpPrereqError("The node is the master and the force"
                                  " parameter was not set",
                                  errors.ECODE_INVAL)
@@ -3333,6 +3501,15 @@ class LUQueryClusterInfo(NoHooksLU):
 
     """
     cluster = self.cfg.GetClusterInfo()
+    os_hvp = {}
+
+    # Filter just for enabled hypervisors
+    for os_name, hv_dict in cluster.os_hvp.items():
+      os_hvp[os_name] = {}
+      for hv_name, hv_params in hv_dict.items():
+        if hv_name in cluster.enabled_hypervisors:
+          os_hvp[os_name][hv_name] = hv_params
+
     result = {
       "software_version": constants.RELEASE_VERSION,
       "protocol_version": constants.PROTOCOL_VERSION,
@@ -3346,6 +3523,7 @@ class LUQueryClusterInfo(NoHooksLU):
       "enabled_hypervisors": cluster.enabled_hypervisors,
       "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                         for hypervisor_name in cluster.enabled_hypervisors]),
+      "os_hvp": os_hvp,
       "beparams": cluster.beparams,
       "nicparams": cluster.nicparams,
       "candidate_pool_size": cluster.candidate_pool_size,
@@ -3397,7 +3575,7 @@ class LUQueryConfigValues(NoHooksLU):
       elif field == "drain_flag":
         entry = os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
       elif field == "watcher_pause":
-        return utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
+        entry = utils.ReadWatcherPauseFile(constants.WATCHER_PAUSEFILE)
       else:
         raise errors.ParameterError(field)
       values.append(entry)
@@ -3586,14 +3764,7 @@ def _SafeShutdownInstanceDisks(lu, instance):
   _ShutdownInstanceDisks.
 
   """
-  pnode = instance.primary_node
-  ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
-  ins_l.Raise("Can't contact node %s" % pnode)
-
-  if instance.name in ins_l.payload:
-    raise errors.OpExecError("Instance is running, can't shutdown"
-                             " block devices.")
-
+  _CheckInstanceDown(lu, instance, "cannot shutdown disks")
   _ShutdownInstanceDisks(lu, instance)
 
 
@@ -3657,6 +3828,42 @@ def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
                                errors.ECODE_NORES)
 
 
+def _CheckNodesFreeDisk(lu, nodenames, requested):
+  """Checks if nodes have enough free disk space in the default VG.
+
+  This function checks if all given nodes have the needed amount of
+  free disk. In case any node has less disk or we cannot get the
+  information from the node, this function raises an OpPrereqError
+  exception.
+
+  @type lu: C{LogicalUnit}
+  @param lu: a logical unit from which we get configuration data
+  @type nodenames: C{list}
+  @param nodenames: the list of node names to check
+  @type requested: C{int}
+  @param requested: the amount of disk in MiB to check for
+  @raise errors.OpPrereqError: if any node doesn't have enough disk, or
+      we cannot check the node
+
+  """
+  nodeinfo = lu.rpc.call_node_info(nodenames, lu.cfg.GetVGName(),
+                                   lu.cfg.GetHypervisorType())
+  for node in nodenames:
+    info = nodeinfo[node]
+    info.Raise("Cannot get current information from node %s" % node,
+               prereq=True, ecode=errors.ECODE_ENVIRON)
+    vg_free = info.payload.get("vg_free", None)
+    if not isinstance(vg_free, int):
+      raise errors.OpPrereqError("Can't compute free disk space on node %s,"
+                                 " result was '%s'" % (node, vg_free),
+                                 errors.ECODE_ENVIRON)
+    if requested > vg_free:
+      raise errors.OpPrereqError("Not enough disk space on target node %s:"
+                                 " required %d MiB, available %d MiB" %
+                                 (node, requested, vg_free),
+                                 errors.ECODE_NORES)
+
+
 class LUStartupInstance(LogicalUnit):
   """Starts an instance.
 
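A typical call to the new helper, as the instance-creation path below adopts
it, would look like this sketch (node names and the size are illustrative):

  # raises OpPrereqError unless both nodes have 2048 MiB free in the VG
  _CheckNodesFreeDisk(self, ["node1.example.com", "node2.example.com"], 2048)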
@@ -3943,36 +4150,14 @@ class LUReinstallInstance(LogicalUnit):
       raise errors.OpPrereqError("Instance '%s' has no disks" %
                                  self.op.instance_name,
                                  errors.ECODE_INVAL)
-    if instance.admin_up:
-      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
-                                 self.op.instance_name,
-                                 errors.ECODE_STATE)
-    remote_info = self.rpc.call_instance_info(instance.primary_node,
-                                              instance.name,
-                                              instance.hypervisor)
-    remote_info.Raise("Error checking node %s" % instance.primary_node,
-                      prereq=True, ecode=errors.ECODE_ENVIRON)
-    if remote_info.payload:
-      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
-                                 (self.op.instance_name,
-                                  instance.primary_node),
-                                 errors.ECODE_STATE)
+    _CheckInstanceDown(self, instance, "cannot reinstall")
 
     self.op.os_type = getattr(self.op, "os_type", None)
     self.op.force_variant = getattr(self.op, "force_variant", False)
     if self.op.os_type is not None:
       # OS verification
-      pnode = self.cfg.GetNodeInfo(
-        self.cfg.ExpandNodeName(instance.primary_node))
-      if pnode is None:
-        raise errors.OpPrereqError("Primary node '%s' is unknown" %
-                                   self.op.pnode, errors.ECODE_NOENT)
-      result = self.rpc.call_os_get(pnode.name, self.op.os_type)
-      result.Raise("OS '%s' not in supported OS list for primary node %s" %
-                   (self.op.os_type, pnode.name),
-                   prereq=True, ecode=errors.ECODE_INVAL)
-      if not self.op.force_variant:
-        _CheckOSVariant(result.payload, self.op.os_type)
+      pnode = _ExpandNodeName(self.cfg, instance.primary_node)
+      _CheckNodeHasOS(self, pnode, self.op.os_type, self.op.force_variant)
 
     self.instance = instance
 
@@ -3990,7 +4175,9 @@ class LUReinstallInstance(LogicalUnit):
     _StartInstanceDisks(self, inst, None)
     try:
       feedback_fn("Running the instance OS create scripts...")
-      result = self.rpc.call_instance_os_add(inst.primary_node, inst, True)
+      # the opcode's debug level is forwarded to the backend
+      result = self.rpc.call_instance_os_add(inst.primary_node, inst, True,
+                                             self.op.debug_level)
       result.Raise("Could not install OS for instance %s on node %s" %
                    (inst.name, inst.primary_node))
     finally:
@@ -4045,18 +4232,7 @@ class LURecreateInstanceDisks(LogicalUnit):
     if instance.disk_template == constants.DT_DISKLESS:
       raise errors.OpPrereqError("Instance '%s' has no disks" %
                                  self.op.instance_name, errors.ECODE_INVAL)
-    if instance.admin_up:
-      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
-                                 self.op.instance_name, errors.ECODE_STATE)
-    remote_info = self.rpc.call_instance_info(instance.primary_node,
-                                              instance.name,
-                                              instance.hypervisor)
-    remote_info.Raise("Error checking node %s" % instance.primary_node,
-                      prereq=True, ecode=errors.ECODE_ENVIRON)
-    if remote_info.payload:
-      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
-                                 (self.op.instance_name,
-                                  instance.primary_node), errors.ECODE_STATE)
+    _CheckInstanceDown(self, instance, "cannot recreate disks")
 
     if not self.op.disks:
       self.op.disks = range(len(instance.disks))
@@ -4106,25 +4282,12 @@ class LURenameInstance(LogicalUnit):
     This checks that the instance is in the cluster and is not running.
 
     """
-    instance = self.cfg.GetInstanceInfo(
-      self.cfg.ExpandInstanceName(self.op.instance_name))
-    if instance is None:
-      raise errors.OpPrereqError("Instance '%s' not known" %
-                                 self.op.instance_name, errors.ECODE_NOENT)
+    self.op.instance_name = _ExpandInstanceName(self.cfg,
+                                                self.op.instance_name)
+    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
+    assert instance is not None
     _CheckNodeOnline(self, instance.primary_node)
-
-    if instance.admin_up:
-      raise errors.OpPrereqError("Instance '%s' is marked to be up" %
-                                 self.op.instance_name, errors.ECODE_STATE)
-    remote_info = self.rpc.call_instance_info(instance.primary_node,
-                                              instance.name,
-                                              instance.hypervisor)
-    remote_info.Raise("Error checking node %s" % instance.primary_node,
-                      prereq=True, ecode=errors.ECODE_ENVIRON)
-    if remote_info.payload:
-      raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
-                                 (self.op.instance_name,
-                                  instance.primary_node), errors.ECODE_STATE)
+    _CheckInstanceDown(self, instance, "cannot rename")
     self.instance = instance
 
     # new name verification
@@ -4174,7 +4337,7 @@ class LURenameInstance(LogicalUnit):
     _StartInstanceDisks(self, inst, None)
     try:
       result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
-                                                 old_name)
+                                                 old_name, self.op.debug_level)
       msg = result.fail_msg
       if msg:
         msg = ("Could not run OS rename script for instance %s on node %s"
@@ -4219,7 +4382,8 @@ class LURemoveInstance(LogicalUnit):
     env = _BuildInstanceHookEnvByObject(self, self.instance)
     env["SHUTDOWN_TIMEOUT"] = self.shutdown_timeout
     nl = [self.cfg.GetMasterNode()]
-    return env, nl, nl
+    nl_post = list(self.instance.all_nodes) + nl
+    return env, nl, nl_post
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -4250,18 +4414,29 @@ class LURemoveInstance(LogicalUnit):
                                  " node %s: %s" %
                                  (instance.name, instance.primary_node, msg))
 
-    logging.info("Removing block devices for instance %s", instance.name)
+    _RemoveInstance(self, feedback_fn, instance, self.op.ignore_failures)
 
-    if not _RemoveDisks(self, instance):
-      if self.op.ignore_failures:
-        feedback_fn("Warning: can't remove instance's disks")
-      else:
-        raise errors.OpExecError("Can't remove instance's disks")
 
-    logging.info("Removing instance %s out of cluster config", instance.name)
+def _RemoveInstance(lu, feedback_fn, instance, ignore_failures):
+  """Utility function to remove an instance.
+
+  """
+  logging.info("Removing block devices for instance %s", instance.name)
+
+  if not _RemoveDisks(lu, instance):
+    if not ignore_failures:
+      raise errors.OpExecError("Can't remove instance's disks")
+    feedback_fn("Warning: can't remove instance's disks")
 
-    self.cfg.RemoveInstance(instance.name)
-    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
+  logging.info("Removing instance %s out of cluster config", instance.name)
+
+  lu.cfg.RemoveInstance(instance.name)
+
+  assert not lu.remove_locks.get(locking.LEVEL_INSTANCE), \
+    "Instance lock removal conflict"
+
+  # Remove lock for the instance
+  lu.remove_locks[locking.LEVEL_INSTANCE] = instance.name
 
 
 class LUQueryInstances(NoHooksLU):
@@ -4578,13 +4753,22 @@ class LUFailoverInstance(LogicalUnit):
     This runs on master, primary and secondary nodes of the instance.
 
     """
+    instance = self.instance
+    source_node = instance.primary_node
+    target_node = instance.secondary_nodes[0]
     env = {
       "IGNORE_CONSISTENCY": self.op.ignore_consistency,
       "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
+      "OLD_PRIMARY": source_node,
+      "OLD_SECONDARY": target_node,
+      "NEW_PRIMARY": target_node,
+      "NEW_SECONDARY": source_node,
       }
-    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
-    nl = [self.cfg.GetMasterNode()] + list(self.instance.secondary_nodes)
-    return env, nl, nl
+    env.update(_BuildInstanceHookEnvByObject(self, instance))
+    nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
+    nl_post = list(nl)
+    nl_post.append(source_node)
+    return env, nl, nl_post
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -4726,11 +4910,21 @@ class LUMigrateInstance(LogicalUnit):
 
     """
     instance = self._migrater.instance
+    source_node = instance.primary_node
+    target_node = instance.secondary_nodes[0]
     env = _BuildInstanceHookEnvByObject(self, instance)
     env["MIGRATE_LIVE"] = self.op.live
     env["MIGRATE_CLEANUP"] = self.op.cleanup
+    env.update({
+        "OLD_PRIMARY": source_node,
+        "OLD_SECONDARY": target_node,
+        "NEW_PRIMARY": target_node,
+        "NEW_SECONDARY": source_node,
+        })
     nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
-    return env, nl, nl
+    nl_post = list(nl)
+    nl_post.append(source_node)
+    return env, nl, nl_post
 
 
 class LUMoveInstance(LogicalUnit):
@@ -4751,10 +4945,7 @@ class LUMoveInstance(LogicalUnit):
 
   def ExpandNames(self):
     self._ExpandAndLockInstance()
-    target_node = self.cfg.ExpandNodeName(self.op.target_node)
-    if target_node is None:
-      raise errors.OpPrereqError("Node '%s' not known" %
-                                  self.op.target_node, errors.ECODE_NOENT)
+    target_node = _ExpandNodeName(self.cfg, self.op.target_node)
     self.op.target_node = target_node
     self.needed_locks[locking.LEVEL_NODE] = [target_node]
     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
@@ -4928,10 +5119,7 @@ class LUMigrateNode(LogicalUnit):
   REQ_BGL = False
 
   def ExpandNames(self):
-    self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
-    if self.op.node_name is None:
-      raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name,
-                                 errors.ECODE_NOENT)
+    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
 
     self.needed_locks = {
       locking.LEVEL_NODE: [self.op.node_name],
@@ -4991,11 +5179,9 @@ class TLMigrateInstance(Tasklet):
     This checks that the instance is in the cluster.
 
     """
-    instance = self.cfg.GetInstanceInfo(
-      self.cfg.ExpandInstanceName(self.instance_name))
-    if instance is None:
-      raise errors.OpPrereqError("Instance '%s' not known" %
-                                 self.instance_name, errors.ECODE_NOENT)
+    instance_name = _ExpandInstanceName(self.lu.cfg, self.instance_name)
+    instance = self.cfg.GetInstanceInfo(instance_name)
+    assert instance is not None
 
     if instance.disk_template != constants.DT_DRBD8:
       raise errors.OpPrereqError("Instance's disk layout is not"
@@ -5642,23 +5828,53 @@ class LUCreateInstance(LogicalUnit):
     """Check arguments.
 
     """
+    # set optional parameters to None if they don't exist
+    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
+      if not hasattr(self.op, attr):
+        setattr(self.op, attr, None)
+
     # do not require name_check to ease forward/backward compatibility
     # for tools
     if not hasattr(self.op, "name_check"):
       self.op.name_check = True
+    if not hasattr(self.op, "no_install"):
+      self.op.no_install = False
+    if self.op.no_install and self.op.start:
+      self.LogInfo("No-installation mode selected, disabling startup")
+      self.op.start = False
+    # validate/normalize the instance name
+    self.op.instance_name = utils.HostInfo.NormalizeName(self.op.instance_name)
     if self.op.ip_check and not self.op.name_check:
       # TODO: make the ip check more flexible and not depend on the name check
       raise errors.OpPrereqError("Cannot do ip checks without a name check",
                                  errors.ECODE_INVAL)
+    if (self.op.disk_template == constants.DT_FILE and
+        not constants.ENABLE_FILE_STORAGE):
+      raise errors.OpPrereqError("File storage disabled at configure time",
+                                 errors.ECODE_INVAL)
+    # check disk information: either all adopt, or no adopt
+    has_adopt = has_no_adopt = False
+    for disk in self.op.disks:
+      if "adopt" in disk:
+        has_adopt = True
+      else:
+        has_no_adopt = True
+    if has_adopt and has_no_adopt:
+      raise errors.OpPrereqError("Either all disks have are adoped or none is",
+                                 errors.ECODE_INVAL)
+    if has_adopt:
+      if self.op.disk_template != constants.DT_PLAIN:
+        raise errors.OpPrereqError("Disk adoption is only supported for the"
+                                   " 'plain' disk template",
+                                   errors.ECODE_INVAL)
+      if self.op.iallocator is not None:
+        raise errors.OpPrereqError("Disk adoption not allowed with an"
+                                   " iallocator script", errors.ECODE_INVAL)
+      if self.op.mode == constants.INSTANCE_IMPORT:
+        raise errors.OpPrereqError("Disk adoption not allowed for"
+                                   " instance import", errors.ECODE_INVAL)
 
-  def _ExpandNode(self, node):
-    """Expands and checks one node name.
-
-    """
-    node_full = self.cfg.ExpandNodeName(node)
-    if node_full is None:
-      raise errors.OpPrereqError("Unknown node %s" % node, errors.ECODE_NOENT)
-    return node_full
+    self.adopt_disks = has_adopt
 
   def ExpandNames(self):
     """ExpandNames for CreateInstance.
@@ -5668,11 +5884,6 @@ class LUCreateInstance(LogicalUnit):
     """
     self.needed_locks = {}
 
-    # set optional parameters to none if they don't exist
-    for attr in ["pnode", "snode", "iallocator", "hypervisor"]:
-      if not hasattr(self.op, attr):
-        setattr(self.op, attr, None)
-
     # cheap checks, mostly valid constants given
 
     # verify creation mode
@@ -5682,9 +5893,7 @@ class LUCreateInstance(LogicalUnit):
                                  self.op.mode, errors.ECODE_INVAL)
 
     # disk template and mirror node verification
-    if self.op.disk_template not in constants.DISK_TEMPLATES:
-      raise errors.OpPrereqError("Invalid disk template name",
-                                 errors.ECODE_INVAL)
+    _CheckDiskTemplate(self.op.disk_template)
 
     if self.op.hypervisor is None:
       self.op.hypervisor = self.cfg.GetHypervisorType()
@@ -5815,10 +6024,13 @@ class LUCreateInstance(LogicalUnit):
         raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
       try:
         size = int(size)
-      except ValueError:
+      except (TypeError, ValueError):
         raise errors.OpPrereqError("Invalid disk size '%s'" % size,
                                    errors.ECODE_INVAL)
-      self.disks.append({"size": size, "mode": mode})
+      new_disk = {"size": size, "mode": mode}
+      if "adopt" in disk:
+        new_disk["adopt"] = disk["adopt"]
+      self.disks.append(new_disk)
 
     # file storage checks
     if (self.op.file_driver and
@@ -5839,10 +6051,10 @@ class LUCreateInstance(LogicalUnit):
     if self.op.iallocator:
       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
     else:
-      self.op.pnode = self._ExpandNode(self.op.pnode)
+      self.op.pnode = _ExpandNodeName(self.cfg, self.op.pnode)
       nodelist = [self.op.pnode]
       if self.op.snode is not None:
-        self.op.snode = self._ExpandNode(self.op.snode)
+        self.op.snode = _ExpandNodeName(self.cfg, self.op.snode)
         nodelist.append(self.op.snode)
       self.needed_locks[locking.LEVEL_NODE] = nodelist
 
@@ -5862,18 +6074,21 @@ class LUCreateInstance(LogicalUnit):
                                      " path requires a source node option.",
                                      errors.ECODE_INVAL)
       else:
-        self.op.src_node = src_node = self._ExpandNode(src_node)
+        self.op.src_node = src_node = _ExpandNodeName(self.cfg, src_node)
         if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
           self.needed_locks[locking.LEVEL_NODE].append(src_node)
         if not os.path.isabs(src_path):
           self.op.src_path = src_path = \
-            os.path.join(constants.EXPORT_DIR, src_path)
+            utils.PathJoin(constants.EXPORT_DIR, src_path)
 
       # On import force_variant must be True, because if we forced it at
       # initial install, our only chance when importing it back is that it
       # works again!
       self.op.force_variant = True
 
+      if self.op.no_install:
+        self.LogInfo("No-installation mode has no effect during import")
+
     else: # INSTANCE_CREATE
       if getattr(self.op, "os_type", None) is None:
         raise errors.OpPrereqError("No guest OS specified",
@@ -5905,17 +6120,17 @@ class LUCreateInstance(LogicalUnit):
                                  " iallocator '%s': %s" %
                                  (self.op.iallocator, ial.info),
                                  errors.ECODE_NORES)
-    if len(ial.nodes) != ial.required_nodes:
+    if len(ial.result) != ial.required_nodes:
       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                  " of nodes (%s), required %s" %
-                                 (self.op.iallocator, len(ial.nodes),
+                                 (self.op.iallocator, len(ial.result),
                                   ial.required_nodes), errors.ECODE_FAULT)
-    self.op.pnode = ial.nodes[0]
+    self.op.pnode = ial.result[0]
     self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                  self.op.instance_name, self.op.iallocator,
-                 utils.CommaJoin(ial.nodes))
+                 utils.CommaJoin(ial.result))
     if ial.required_nodes == 2:
-      self.op.snode = ial.nodes[1]
+      self.op.snode = ial.result[1]
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -5951,7 +6166,6 @@ class LUCreateInstance(LogicalUnit):
           self.secondaries)
     return env, nl, nl
 
-
   def CheckPrereq(self):
     """Check prerequisites.
 
@@ -5975,8 +6189,8 @@ class LUCreateInstance(LogicalUnit):
           if src_path in exp_list[node].payload:
             found = True
             self.op.src_node = src_node = node
-            self.op.src_path = src_path = os.path.join(constants.EXPORT_DIR,
-                                                       src_path)
+            self.op.src_path = src_path = utils.PathJoin(constants.EXPORT_DIR,
+                                                         src_path)
             break
         if not found:
           raise errors.OpPrereqError("No export found for relative path %s" %
@@ -6013,7 +6227,7 @@ class LUCreateInstance(LogicalUnit):
         if export_info.has_option(constants.INISECT_INS, option):
           # FIXME: are the old os-es, disk sizes, etc. useful?
           export_name = export_info.get(constants.INISECT_INS, option)
-          image = os.path.join(src_path, export_name)
+          image = utils.PathJoin(src_path, export_name)
           disk_images.append(image)
         else:
           disk_images.append(False)
@@ -6087,33 +6301,43 @@ class LUCreateInstance(LogicalUnit):
     req_size = _ComputeDiskSize(self.op.disk_template,
                                 self.disks)
 
-    # Check lv size requirements
-    if req_size is not None:
-      nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
-                                         self.op.hypervisor)
-      for node in nodenames:
-        info = nodeinfo[node]
-        info.Raise("Cannot get current information from node %s" % node)
-        info = info.payload
-        vg_free = info.get('vg_free', None)
-        if not isinstance(vg_free, int):
-          raise errors.OpPrereqError("Can't compute free disk space on"
-                                     " node %s" % node, errors.ECODE_ENVIRON)
-        if req_size > vg_free:
-          raise errors.OpPrereqError("Not enough disk space on target node %s."
-                                     " %d MB available, %d MB required" %
-                                     (node, vg_free, req_size),
-                                     errors.ECODE_NORES)
+    # Check lv size requirements, if not adopting
+    if req_size is not None and not self.adopt_disks:
+      _CheckNodesFreeDisk(self, nodenames, req_size)
+
+    if self.adopt_disks: # instead, we must check the adoption data
+      all_lvs = set([i["adopt"] for i in self.disks])
+      if len(all_lvs) != len(self.disks):
+        raise errors.OpPrereqError("Duplicate volume names given for adoption",
+                                   errors.ECODE_INVAL)
+      for lv_name in all_lvs:
+        try:
+          self.cfg.ReserveLV(lv_name, self.proc.GetECId())
+        except errors.ReservationError:
+          raise errors.OpPrereqError("LV named %s used by another instance" %
+                                     lv_name, errors.ECODE_NOTUNIQUE)
+
+      node_lvs = self.rpc.call_lv_list([pnode.name],
+                                       self.cfg.GetVGName())[pnode.name]
+      node_lvs.Raise("Cannot get LV information from node %s" % pnode.name)
+      node_lvs = node_lvs.payload
+      delta = all_lvs.difference(node_lvs.keys())
+      if delta:
+        raise errors.OpPrereqError("Missing logical volume(s): %s" %
+                                   utils.CommaJoin(delta),
+                                   errors.ECODE_INVAL)
+      online_lvs = [lv for lv in all_lvs if node_lvs[lv][2]]
+      if online_lvs:
+        raise errors.OpPrereqError("Online logical volumes found, cannot"
+                                   " adopt: %s" % utils.CommaJoin(online_lvs),
+                                   errors.ECODE_STATE)
+      # update the size of each disk based on what was found
+      for dsk in self.disks:
+        dsk["size"] = int(float(node_lvs[dsk["adopt"]][0]))
 
     _CheckHVParams(self, nodenames, self.op.hypervisor, self.op.hvparams)
 
-    # os verification
-    result = self.rpc.call_os_get(pnode.name, self.op.os_type)
-    result.Raise("OS '%s' not in supported os list for primary node %s" %
-                 (self.op.os_type, pnode.name),
-                 prereq=True, ecode=errors.ECODE_INVAL)
-    if not self.op.force_variant:
-      _CheckOSVariant(result.payload, self.op.os_type)
+    _CheckNodeHasOS(self, pnode.name, self.op.os_type, self.op.force_variant)
 
     _CheckNicsBridgesExist(self, self.nics, self.pnode.name)
 
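The adoption checks above assume an lv_list payload that maps each LV name to
a tuple whose first field is the size in MiB and whose third field is an
in-use flag; illustratively (values hypothetical):

  node_lvs = {"existing-lv-1": ("10240.00", False, False),
              "existing-lv-2": ("10240.00", False, False)}
  # both LVs exist and are offline, so adoption proceeds and each disk
  # size becomes int(float("10240.00")) == 10240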
@@ -6139,9 +6363,6 @@ class LUCreateInstance(LogicalUnit):
     else:
       network_port = None
 
-    ##if self.op.vnc_bind_address is None:
-    ##  self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
-
     # this is needed because os.path.join does not accept None arguments
     if self.op.file_storage_dir is None:
       string_file_storage_dir = ""
@@ -6149,10 +6370,8 @@ class LUCreateInstance(LogicalUnit):
       string_file_storage_dir = self.op.file_storage_dir
 
     # build the full file storage dir path
-    file_storage_dir = os.path.normpath(os.path.join(
-                                        self.cfg.GetFileStorageDir(),
-                                        string_file_storage_dir, instance))
-
+    file_storage_dir = utils.PathJoin(self.cfg.GetFileStorageDir(),
+                                      string_file_storage_dir, instance)
 
     disks = _GenerateDiskTemplate(self,
                                   self.op.disk_template,
@@ -6174,16 +6393,29 @@ class LUCreateInstance(LogicalUnit):
                             hypervisor=self.op.hypervisor,
                             )
 
-    feedback_fn("* creating instance disks...")
-    try:
-      _CreateDisks(self, iobj)
-    except errors.OpExecError:
-      self.LogWarning("Device creation failed, reverting...")
+    if self.adopt_disks:
+      # rename LVs to the newly-generated names; we need to construct
+      # 'fake' LV disks with the old data, plus the new unique_id
+      tmp_disks = [objects.Disk.FromDict(v.ToDict()) for v in disks]
+      rename_to = []
+      for t_dsk, a_dsk in zip(tmp_disks, self.disks):
+        rename_to.append(t_dsk.logical_id)
+        t_dsk.logical_id = (t_dsk.logical_id[0], a_dsk["adopt"])
+        self.cfg.SetDiskID(t_dsk, pnode_name)
+      result = self.rpc.call_blockdev_rename(pnode_name,
+                                             zip(tmp_disks, rename_to))
+      result.Raise("Failed to rename adoped LVs")
+    else:
+      feedback_fn("* creating instance disks...")
       try:
-        _RemoveDisks(self, iobj)
-      finally:
-        self.cfg.ReleaseDRBDMinors(instance)
-        raise
+        _CreateDisks(self, iobj)
+      except errors.OpExecError:
+        self.LogWarning("Device creation failed, reverting...")
+        try:
+          _RemoveDisks(self, iobj)
+        finally:
+          self.cfg.ReleaseDRBDMinors(instance)
+          raise
 
     feedback_fn("adding instance %s to cluster config" % instance)
 
@@ -6221,24 +6453,26 @@ class LUCreateInstance(LogicalUnit):
       raise errors.OpExecError("There are some degraded disks for"
                                " this instance")
 
-    feedback_fn("creating os for instance %s on node %s" %
-                (instance, pnode_name))
-
-    if iobj.disk_template != constants.DT_DISKLESS:
+    if iobj.disk_template != constants.DT_DISKLESS and not self.adopt_disks:
       if self.op.mode == constants.INSTANCE_CREATE:
-        feedback_fn("* running the instance OS create scripts...")
-        result = self.rpc.call_instance_os_add(pnode_name, iobj, False)
-        result.Raise("Could not add os for instance %s"
-                     " on node %s" % (instance, pnode_name))
+        if not self.op.no_install:
+          feedback_fn("* running the instance OS create scripts...")
+        # the opcode's debug level is forwarded to the backend
+          result = self.rpc.call_instance_os_add(pnode_name, iobj, False,
+                                                 self.op.debug_level)
+          result.Raise("Could not add os for instance %s"
+                       " on node %s" % (instance, pnode_name))
 
       elif self.op.mode == constants.INSTANCE_IMPORT:
         feedback_fn("* running the instance OS import scripts...")
         src_node = self.op.src_node
         src_images = self.src_images
         cluster_name = self.cfg.GetClusterName()
+        # the opcode's debug level is forwarded to the backend
         import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
                                                          src_node, src_images,
-                                                         cluster_name)
+                                                         cluster_name,
+                                                         self.op.debug_level)
         msg = import_result.fail_msg
         if msg:
           self.LogWarning("Error while importing the disk images for instance"
@@ -6326,6 +6560,8 @@ class LUReplaceDisks(LogicalUnit):
       self.op.remote_node = None
     if not hasattr(self.op, "iallocator"):
       self.op.iallocator = None
+    if not hasattr(self.op, "early_release"):
+      self.op.early_release = False
 
     TLReplaceDisks.CheckArguments(self.op.mode, self.op.remote_node,
                                   self.op.iallocator)
@@ -6337,11 +6573,7 @@ class LUReplaceDisks(LogicalUnit):
       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
 
     elif self.op.remote_node is not None:
-      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
-      if remote_node is None:
-        raise errors.OpPrereqError("Node '%s' not known" %
-                                   self.op.remote_node, errors.ECODE_NOENT)
-
+      remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
       self.op.remote_node = remote_node
 
       # Warning: do not remove the locking of the new secondary here
@@ -6357,7 +6589,7 @@ class LUReplaceDisks(LogicalUnit):
 
     self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
                                    self.op.iallocator, self.op.remote_node,
-                                   self.op.disks)
+                                   self.op.disks, False, self.op.early_release)
 
     self.tasklets = [self.replacer]
 
@@ -6404,16 +6636,15 @@ class LUEvacuateNode(LogicalUnit):
       self.op.remote_node = None
     if not hasattr(self.op, "iallocator"):
       self.op.iallocator = None
+    if not hasattr(self.op, "early_release"):
+      self.op.early_release = False
 
     TLReplaceDisks.CheckArguments(constants.REPLACE_DISK_CHG,
                                   self.op.remote_node,
                                   self.op.iallocator)
 
   def ExpandNames(self):
-    self.op.node_name = self.cfg.ExpandNodeName(self.op.node_name)
-    if self.op.node_name is None:
-      raise errors.OpPrereqError("Node '%s' not known" % self.op.node_name,
-                                 errors.ECODE_NOENT)
+    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
 
     self.needed_locks = {}
 
@@ -6422,18 +6653,13 @@ class LUEvacuateNode(LogicalUnit):
       self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
 
     elif self.op.remote_node is not None:
-      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
-      if remote_node is None:
-        raise errors.OpPrereqError("Node '%s' not known" %
-                                   self.op.remote_node, errors.ECODE_NOENT)
-
-      self.op.remote_node = remote_node
+      self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
 
       # Warning: do not remove the locking of the new secondary here
       # unless DRBD8.AddChildren is changed to work in parallel;
       # currently it doesn't since parallel invocations of
       # FindUnusedMinor will conflict
-      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
+      self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node]
       self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
 
     else:
@@ -6449,7 +6675,8 @@ class LUEvacuateNode(LogicalUnit):
       names.append(inst.name)
 
       replacer = TLReplaceDisks(self, inst.name, constants.REPLACE_DISK_CHG,
-                                self.op.iallocator, self.op.remote_node, [])
+                                self.op.iallocator, self.op.remote_node, [],
+                                True, self.op.early_release)
       tasklets.append(replacer)
 
     self.tasklets = tasklets
@@ -6491,7 +6718,7 @@ class TLReplaceDisks(Tasklet):
 
   """
   def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
-               disks):
+               disks, delay_iallocator, early_release):
     """Initializes this class.
 
     """
@@ -6503,6 +6730,8 @@ class TLReplaceDisks(Tasklet):
     self.iallocator_name = iallocator_name
     self.remote_node = remote_node
     self.disks = disks
+    self.delay_iallocator = delay_iallocator
+    self.early_release = early_release
 
     # Runtime data
     self.instance = None
@@ -6551,14 +6780,14 @@ class TLReplaceDisks(Tasklet):
                                  " %s" % (iallocator_name, ial.info),
                                  errors.ECODE_NORES)
 
-    if len(ial.nodes) != ial.required_nodes:
+    if len(ial.result) != ial.required_nodes:
       raise errors.OpPrereqError("iallocator '%s' returned invalid number"
                                  " of nodes (%s), required %s" %
                                  (iallocator_name,
-                                  len(ial.nodes), ial.required_nodes),
+                                  len(ial.result), ial.required_nodes),
                                  errors.ECODE_FAULT)
 
-    remote_node_name = ial.nodes[0]
+    remote_node_name = ial.result[0]
 
     lu.LogInfo("Selected new secondary for instance '%s': %s",
                instance_name, remote_node_name)
@@ -6589,6 +6818,19 @@ class TLReplaceDisks(Tasklet):
                                  len(instance.secondary_nodes),
                                  errors.ECODE_FAULT)
 
+    if not self.delay_iallocator:
+      self._CheckPrereq2()
+
+  def _CheckPrereq2(self):
+    """Check prerequisites, second part.
+
+    Conceptually this belongs in CheckPrereq. It is split out and, when
+    delay_iallocator is set, invoked from Exec instead: during node
+    evacuation the iallocator would otherwise see only the unmodified
+    cluster model, not taking the changes planned by earlier tasklets
+    into account.
+
+    """
+    instance = self.instance
     secondary_node = instance.secondary_nodes[0]
 
     if self.iallocator_name is None:
@@ -6662,6 +6904,14 @@ class TLReplaceDisks(Tasklet):
 
         _CheckNodeNotDrained(self.lu, remote_node)
 
+        old_node_info = self.cfg.GetNodeInfo(secondary_node)
+        assert old_node_info is not None
+        if old_node_info.offline and not self.early_release:
+          # doesn't make sense to delay the release
+          self.early_release = True
+          self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
+                          " early-release mode", secondary_node)
+
       else:
         raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
                                      self.mode)
@@ -6692,6 +6942,9 @@ class TLReplaceDisks(Tasklet):
     This dispatches the disk replacement to the appropriate handler.
 
     """
+    if self.delay_iallocator:
+      self._CheckPrereq2()
+
     if not self.disks:
       feedback_fn("No disks need replacement")
       return
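
The delayed check gives TLReplaceDisks two entry points for the second half
of its prerequisites: normally _CheckPrereq2 runs from CheckPrereq, but with
delay_iallocator set it runs here at the start of Exec, after earlier
tasklets have already updated the cluster model. A condensed sketch of the
control flow (_CheckBasics and _DispatchReplacement are placeholders for the
elided bodies):

    class TLReplaceDisks(Tasklet):
      def CheckPrereq(self):
        self._CheckBasics()             # instance/disk checks (elided)
        if not self.delay_iallocator:
          self._CheckPrereq2()          # pick the new node right away

      def Exec(self, feedback_fn):
        if self.delay_iallocator:
          self._CheckPrereq2()          # pick it against the updated model
        self._DispatchReplacement(feedback_fn)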
@@ -6829,6 +7082,10 @@ class TLReplaceDisks(Tasklet):
           self.lu.LogWarning("Can't remove old LV: %s" % msg,
                              hint="remove unused LVs manually")
 
+  def _ReleaseNodeLock(self, node_name):
+    """Releases the lock for a given node."""
+    self.lu.context.glm.release(locking.LEVEL_NODE, node_name)
+
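
The callers below hand _ReleaseNodeLock a list of node names rather than a
single name; this works because the lock manager's release() accepts either
a single name or a sequence of names at the given level. Both forms in one
sketch:

    self._ReleaseNodeLock(node_name)                             # one node
    self._ReleaseNodeLock([self.target_node, self.other_node])   # several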
   def _ExecDrbd8DiskOnly(self, feedback_fn):
     """Replace a disk on the primary or secondary for DRBD 8.
 
@@ -6939,18 +7196,30 @@ class TLReplaceDisks(Tasklet):
 
       self.cfg.Update(self.instance, feedback_fn)
 
+    cstep = 5
+    if self.early_release:
+      self.lu.LogStep(cstep, steps_total, "Removing old storage")
+      cstep += 1
+      self._RemoveOldStorage(self.target_node, iv_names)
+      # WARNING: we release both node locks here, do not do other RPCs
+      # than WaitForSync to the primary node
+      self._ReleaseNodeLock([self.target_node, self.other_node])
+
     # Wait for sync
     # This can fail as the old devices are degraded and _WaitForSync
     # does a combined result over all disks, so we don't check its return value
-    self.lu.LogStep(5, steps_total, "Sync devices")
+    self.lu.LogStep(cstep, steps_total, "Sync devices")
+    cstep += 1
     _WaitForSync(self.lu, self.instance)
 
     # Check all devices manually
     self._CheckDevices(self.instance.primary_node, iv_names)
 
     # Step: remove old storage
-    self.lu.LogStep(6, steps_total, "Removing old storage")
-    self._RemoveOldStorage(self.target_node, iv_names)
+    if not self.early_release:
+      self.lu.LogStep(cstep, steps_total, "Removing old storage")
+      cstep += 1
+      self._RemoveOldStorage(self.target_node, iv_names)
 
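Because early release moves the old-storage removal ahead of the sync wait,
the step numbers can no longer be hard-coded to 5 and 6; the cstep counter
keeps the progress messages sequential on both paths. Reduced to just the
LogStep calls:

    cstep = 5
    if self.early_release:
      self.lu.LogStep(cstep, steps_total, "Removing old storage")  # step 5
      cstep += 1
    self.lu.LogStep(cstep, steps_total, "Sync devices")       # step 5 or 6
    cstep += 1
    if not self.early_release:
      self.lu.LogStep(cstep, steps_total, "Removing old storage")  # step 6
      cstep += 1
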
   def _ExecDrbd8Secondary(self, feedback_fn):
     """Replace the secondary node for DRBD 8.
@@ -7084,19 +7353,31 @@ class TLReplaceDisks(Tasklet):
                            to_node, msg,
                            hint=("please do a gnt-instance info to see the"
                                  " status of disks"))
+    cstep = 5
+    if self.early_release:
+      self.lu.LogStep(cstep, steps_total, "Removing old storage")
+      cstep += 1
+      self._RemoveOldStorage(self.target_node, iv_names)
+      # WARNING: we release all node locks here, do not do other RPCs
+      # than WaitForSync to the primary node
+      self._ReleaseNodeLock([self.instance.primary_node,
+                             self.target_node,
+                             self.new_node])
 
     # Wait for sync
     # This can fail as the old devices are degraded and _WaitForSync
     # does a combined result over all disks, so we don't check its return value
-    self.lu.LogStep(5, steps_total, "Sync devices")
+    self.lu.LogStep(cstep, steps_total, "Sync devices")
+    cstep += 1
     _WaitForSync(self.lu, self.instance)
 
     # Check all devices manually
     self._CheckDevices(self.instance.primary_node, iv_names)
 
     # Step: remove old storage
-    self.lu.LogStep(6, steps_total, "Removing old storage")
-    self._RemoveOldStorage(self.target_node, iv_names)
+    if not self.early_release:
+      self.lu.LogStep(cstep, steps_total, "Removing old storage")
+      cstep += 1
+      self._RemoveOldStorage(self.target_node, iv_names)
 
 
 class LURepairNodeStorage(NoHooksLU):
@@ -7107,12 +7388,7 @@ class LURepairNodeStorage(NoHooksLU):
   REQ_BGL = False
 
   def CheckArguments(self):
-    node_name = self.cfg.ExpandNodeName(self.op.node_name)
-    if node_name is None:
-      raise errors.OpPrereqError("Invalid node name '%s'" % self.op.node_name,
-                                 errors.ECODE_NOENT)
-
-    self.op.node_name = node_name
+    self.op.node_name = _ExpandNodeName(self.cfg, self.op.node_name)
 
   def ExpandNames(self):
     self.needed_locks = {
@@ -7167,6 +7443,60 @@ class LURepairNodeStorage(NoHooksLU):
                  (self.op.name, self.op.node_name))
 
 
+class LUNodeEvacuationStrategy(NoHooksLU):
+  """Computes the node evacuation strategy.
+
+  """
+  _OP_REQP = ["nodes"]
+  REQ_BGL = False
+
+  def CheckArguments(self):
+    if not hasattr(self.op, "remote_node"):
+      self.op.remote_node = None
+    if not hasattr(self.op, "iallocator"):
+      self.op.iallocator = None
+    if self.op.remote_node is not None and self.op.iallocator is not None:
+      raise errors.OpPrereqError("Give either the iallocator or the new"
+                                 " secondary, not both", errors.ECODE_INVAL)
+
+  def ExpandNames(self):
+    self.op.nodes = _GetWantedNodes(self, self.op.nodes)
+    self.needed_locks = locks = {}
+    if self.op.remote_node is None:
+      locks[locking.LEVEL_NODE] = locking.ALL_SET
+    else:
+      self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
+      locks[locking.LEVEL_NODE] = self.op.nodes + [self.op.remote_node]
+
+  def CheckPrereq(self):
+    pass
+
+  def Exec(self, feedback_fn):
+    if self.op.remote_node is not None:
+      instances = []
+      for node in self.op.nodes:
+        instances.extend(_GetNodeSecondaryInstances(self.cfg, node))
+      result = []
+      for i in instances:
+        if i.primary_node == self.op.remote_node:
+          raise errors.OpPrereqError("Node %s is the primary node of"
+                                     " instance %s, cannot use it as"
+                                     " secondary" %
+                                     (self.op.remote_node, i.name),
+                                     errors.ECODE_INVAL)
+        result.append([i.name, self.op.remote_node])
+    else:
+      ial = IAllocator(self.cfg, self.rpc,
+                       mode=constants.IALLOCATOR_MODE_MEVAC,
+                       evac_nodes=self.op.nodes)
+      ial.Run(self.op.iallocator, validate=True)
+      if not ial.success:
+        raise errors.OpPrereqError("No valid evacuation solution: %s" %
+                                   ial.info, errors.ECODE_NORES)
+      result = ial.result
+    return result
+
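
Both branches return the same shape: a list of [instance_name, new_node]
pairs that the caller can turn into individual replace-disks jobs. With a
fixed remote node the result is built locally; for example, evacuating a
node holding two instance secondaries might return (hypothetical names):

    [["inst1.example.com", "node3.example.com"],
     ["inst2.example.com", "node3.example.com"]]

In the iallocator branch the list is taken verbatim from ial.result, so the
multi-evacuate allocator script is expected to produce the same structure.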
+
 class LUGrowDisk(LogicalUnit):
   """Grow a disk of an instance.
 
@@ -7196,10 +7526,7 @@ class LUGrowDisk(LogicalUnit):
       "AMOUNT": self.op.amount,
       }
     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
-    nl = [
-      self.cfg.GetMasterNode(),
-      self.instance.primary_node,
-      ]
+    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
     return env, nl, nl
 
   def CheckPrereq(self):
@@ -7224,20 +7551,7 @@ class LUGrowDisk(LogicalUnit):
 
     self.disk = instance.FindDisk(self.op.disk)
 
-    nodeinfo = self.rpc.call_node_info(nodenames, self.cfg.GetVGName(),
-                                       instance.hypervisor)
-    for node in nodenames:
-      info = nodeinfo[node]
-      info.Raise("Cannot get current information from node %s" % node)
-      vg_free = info.payload.get('vg_free', None)
-      if not isinstance(vg_free, int):
-        raise errors.OpPrereqError("Can't compute free disk space on"
-                                   " node %s" % node, errors.ECODE_ENVIRON)
-      if self.op.amount > vg_free:
-        raise errors.OpPrereqError("Not enough disk space on target node %s:"
-                                   " %d MiB available, %d MiB required" %
-                                   (node, vg_free, self.op.amount),
-                                   errors.ECODE_NORES)
+    _CheckNodesFreeDisk(self, nodenames, self.op.amount)
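
The open-coded free-space loop removed above now lives in the shared
_CheckNodesFreeDisk helper (defined earlier in this file). A sketch of what
it does, reconstructed from the removed lines; the hypervisor used for the
node query is an assumption here (the removed code used instance.hypervisor):

    def _CheckNodesFreeDisk(lu, nodenames, requested):
      # ask each node for its volume-group statistics
      hv = lu.cfg.GetClusterInfo().enabled_hypervisors[0]  # assumption
      nodeinfo = lu.rpc.call_node_info(nodenames, lu.cfg.GetVGName(), hv)
      for node in nodenames:
        info = nodeinfo[node]
        info.Raise("Cannot get current information from node %s" % node)
        vg_free = info.payload.get("vg_free", None)
        if not isinstance(vg_free, int):
          raise errors.OpPrereqError("Can't compute free disk space on"
                                     " node %s" % node, errors.ECODE_ENVIRON)
        if requested > vg_free:
          raise errors.OpPrereqError("Not enough disk space on target node"
                                     " %s: %d MiB available, %d MiB required"
                                     % (node, vg_free, requested),
                                     errors.ECODE_NORES)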
 
   def Exec(self, feedback_fn):
     """Execute disk grow.
@@ -7284,10 +7598,7 @@ class LUQueryInstanceData(NoHooksLU):
     if self.op.instances:
       self.wanted_names = []
       for name in self.op.instances:
-        full_name = self.cfg.ExpandInstanceName(name)
-        if full_name is None:
-          raise errors.OpPrereqError("Instance '%s' not known" % name,
-                                     errors.ECODE_NOENT)
+        full_name = _ExpandInstanceName(self.cfg, name)
         self.wanted_names.append(full_name)
       self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
     else:
@@ -7444,9 +7755,17 @@ class LUSetInstanceParams(LogicalUnit):
       self.op.beparams = {}
     if not hasattr(self.op, 'hvparams'):
       self.op.hvparams = {}
+    if not hasattr(self.op, "disk_template"):
+      self.op.disk_template = None
+    if not hasattr(self.op, "remote_node"):
+      self.op.remote_node = None
+    if not hasattr(self.op, "os_name"):
+      self.op.os_name = None
+    if not hasattr(self.op, "force_variant"):
+      self.op.force_variant = False
     self.op.force = getattr(self.op, "force", False)
-    if not (self.op.nics or self.op.disks or
-            self.op.hvparams or self.op.beparams):
+    if not (self.op.nics or self.op.disks or self.op.disk_template or
+            self.op.hvparams or self.op.beparams or self.op.os_name):
       raise errors.OpPrereqError("No changes submitted", errors.ECODE_INVAL)
 
     if self.op.hvparams:
@@ -7478,7 +7797,7 @@ class LUSetInstanceParams(LogicalUnit):
                                      errors.ECODE_INVAL)
         try:
           size = int(size)
-        except ValueError, err:
+        except (TypeError, ValueError), err:
           raise errors.OpPrereqError("Invalid disk size parameter: %s" %
                                      str(err), errors.ECODE_INVAL)
         disk_dict['size'] = size
@@ -7492,6 +7811,19 @@ class LUSetInstanceParams(LogicalUnit):
       raise errors.OpPrereqError("Only one disk add or remove operation"
                                  " supported at a time", errors.ECODE_INVAL)
 
+    if self.op.disks and self.op.disk_template is not None:
+      raise errors.OpPrereqError("Disk template conversion and other disk"
+                                 " changes not supported at the same time",
+                                 errors.ECODE_INVAL)
+
+    if self.op.disk_template:
+      _CheckDiskTemplate(self.op.disk_template)
+      if (self.op.disk_template in constants.DTS_NET_MIRROR and
+          self.op.remote_node is None):
+        raise errors.OpPrereqError("Changing the disk template to a mirrored"
+                                   " one requires specifying a secondary node",
+                                   errors.ECODE_INVAL)
+
     # NIC validation
     nic_addremove = 0
     for nic_op, nic_dict in self.op.nics:
@@ -7554,6 +7886,9 @@ class LUSetInstanceParams(LogicalUnit):
   def DeclareLocks(self, level):
     if level == locking.LEVEL_NODE:
       self._LockInstancesNodes()
+      if self.op.disk_template and self.op.remote_node:
+        self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
+        self.needed_locks[locking.LEVEL_NODE].append(self.op.remote_node)
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -7603,6 +7938,8 @@ class LUSetInstanceParams(LogicalUnit):
         del args['nics'][-1]
 
     env = _BuildInstanceHookEnvByObject(self, self.instance, override=args)
+    if self.op.disk_template:
+      env["NEW_DISK_TEMPLATE"] = self.op.disk_template
     nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
     return env, nl, nl
 
@@ -7656,6 +7993,25 @@ class LUSetInstanceParams(LogicalUnit):
     pnode = instance.primary_node
     nodelist = list(instance.all_nodes)
 
+    if self.op.disk_template:
+      if instance.disk_template == self.op.disk_template:
+        raise errors.OpPrereqError("Instance already has disk template %s" %
+                                   instance.disk_template, errors.ECODE_INVAL)
+
+      if (instance.disk_template,
+          self.op.disk_template) not in self._DISK_CONVERSIONS:
+        raise errors.OpPrereqError("Unsupported disk template conversion from"
+                                   " %s to %s" % (instance.disk_template,
+                                                  self.op.disk_template),
+                                   errors.ECODE_INVAL)
+      if self.op.disk_template in constants.DTS_NET_MIRROR:
+        _CheckNodeOnline(self, self.op.remote_node)
+        _CheckNodeNotDrained(self, self.op.remote_node)
+        disks = [{"size": d.size} for d in instance.disks]
+        required = _ComputeDiskSize(self.op.disk_template, disks)
+        _CheckNodesFreeDisk(self, [self.op.remote_node], required)
+        _CheckInstanceDown(self, instance, "cannot change disk template")
+
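
Taken together these checks require the conversion pair to be in
_DISK_CONVERSIONS, the instance to be stopped, and (for mirrored templates)
an online, undrained secondary with enough free VG space. A hypothetical
submission of such a conversion, using the opcode fields referenced above
(instance and node names are made up):

    op = opcodes.OpSetInstanceParams(instance_name="inst1.example.com",
                                     disk_template=constants.DT_DRBD8,
                                     remote_node="node3.example.com")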
     # hvparams processing
     if self.op.hvparams:
       i_hvdict, hv_new = self._GetUpdatedParams(
@@ -7821,17 +8177,8 @@ class LUSetInstanceParams(LogicalUnit):
       if disk_op == constants.DDM_REMOVE:
         if len(instance.disks) == 1:
           raise errors.OpPrereqError("Cannot remove the last disk of"
-                                     " an instance",
-                                     errors.ECODE_INVAL)
-        ins_l = self.rpc.call_instance_list([pnode], [instance.hypervisor])
-        ins_l = ins_l[pnode]
-        msg = ins_l.fail_msg
-        if msg:
-          raise errors.OpPrereqError("Can't contact node %s: %s" %
-                                     (pnode, msg), errors.ECODE_ENVIRON)
-        if instance.name in ins_l.payload:
-          raise errors.OpPrereqError("Instance is running, can't remove"
-                                     " disks.", errors.ECODE_STATE)
+                                     " an instance", errors.ECODE_INVAL)
+        _CheckInstanceDown(self, instance, "cannot remove disks")
 
       if (disk_op == constants.DDM_ADD and
           len(instance.disks) >= constants.MAX_DISKS):
@@ -7846,8 +8193,103 @@ class LUSetInstanceParams(LogicalUnit):
                                      (disk_op, len(instance.disks)),
                                      errors.ECODE_INVAL)
 
+    # OS change
+    if self.op.os_name and not self.op.force:
+      _CheckNodeHasOS(self, instance.primary_node, self.op.os_name,
+                      self.op.force_variant)
+
     return
 
+  def _ConvertPlainToDrbd(self, feedback_fn):
+    """Converts an instance from plain to drbd.
+
+    """
+    feedback_fn("Converting template to drbd")
+    instance = self.instance
+    pnode = instance.primary_node
+    snode = self.op.remote_node
+
+    # create a fake disk info for _GenerateDiskTemplate
+    disk_info = [{"size": d.size, "mode": d.mode} for d in instance.disks]
+    new_disks = _GenerateDiskTemplate(self, self.op.disk_template,
+                                      instance.name, pnode, [snode],
+                                      disk_info, None, None, 0)
+    info = _GetInstanceInfoText(instance)
+    feedback_fn("Creating aditional volumes...")
+    # first, create the missing data and meta devices
+    for disk in new_disks:
+      # unfortunately this is... not too nice
+      _CreateSingleBlockDev(self, pnode, instance, disk.children[1],
+                            info, True)
+      for child in disk.children:
+        _CreateSingleBlockDev(self, snode, instance, child, info, True)
+    # at this stage, all new LVs have been created, we can rename the
+    # old ones
+    feedback_fn("Renaming original volumes...")
+    rename_list = [(o, n.children[0].logical_id)
+                   for (o, n) in zip(instance.disks, new_disks)]
+    result = self.rpc.call_blockdev_rename(pnode, rename_list)
+    result.Raise("Failed to rename original LVs")
+
+    feedback_fn("Initializing DRBD devices...")
+    # all child devices are in place, we can now create the DRBD devices
+    for disk in new_disks:
+      for node in [pnode, snode]:
+        f_create = node == pnode
+        _CreateSingleBlockDev(self, node, instance, disk, info, f_create)
+
+    # at this point, the instance has been modified
+    instance.disk_template = constants.DT_DRBD8
+    instance.disks = new_disks
+    self.cfg.Update(instance, feedback_fn)
+
+    # disks are created, waiting for sync
+    disk_abort = not _WaitForSync(self, instance)
+    if disk_abort:
+      raise errors.OpExecError("There are some degraded disks for"
+                               " this instance, please cleanup manually")
+
+  def _ConvertDrbdToPlain(self, feedback_fn):
+    """Converts an instance from drbd to plain.
+
+    """
+    instance = self.instance
+    assert len(instance.secondary_nodes) == 1
+    pnode = instance.primary_node
+    snode = instance.secondary_nodes[0]
+    feedback_fn("Converting template to plain")
+
+    old_disks = instance.disks
+    new_disks = [d.children[0] for d in old_disks]
+
+    # copy over size and mode
+    for parent, child in zip(old_disks, new_disks):
+      child.size = parent.size
+      child.mode = parent.mode
+
+    # update instance structure
+    instance.disks = new_disks
+    instance.disk_template = constants.DT_PLAIN
+    self.cfg.Update(instance, feedback_fn)
+
+    feedback_fn("Removing volumes on the secondary node...")
+    for disk in old_disks:
+      self.cfg.SetDiskID(disk, snode)
+      msg = self.rpc.call_blockdev_remove(snode, disk).fail_msg
+      if msg:
+        self.LogWarning("Could not remove block device %s on node %s,"
+                        " continuing anyway: %s", disk.iv_name, snode, msg)
+
+    feedback_fn("Removing unneeded volumes on the primary node...")
+    for idx, disk in enumerate(old_disks):
+      meta = disk.children[1]
+      self.cfg.SetDiskID(meta, pnode)
+      msg = self.rpc.call_blockdev_remove(pnode, meta).fail_msg
+      if msg:
+        self.LogWarning("Could not remove metadata for disk %d on node %s,"
+                        " continuing anyway: %s", idx, pnode, msg)
+
+
   def Exec(self, feedback_fn):
     """Modifies an instance.
 
@@ -7912,6 +8354,20 @@ class LUSetInstanceParams(LogicalUnit):
         # change a given disk
         instance.disks[disk_op].mode = disk_dict['mode']
         result.append(("disk.mode/%d" % disk_op, disk_dict['mode']))
+
+    if self.op.disk_template:
+      r_shut = _ShutdownInstanceDisks(self, instance)
+      if not r_shut:
+        raise errors.OpExecError("Cannot shutdow instance disks, unable to"
+                                 " proceed with disk template conversion")
+      mode = (instance.disk_template, self.op.disk_template)
+      try:
+        self._DISK_CONVERSIONS[mode](self, feedback_fn)
+      except:
+        self.cfg.ReleaseDRBDMinors(instance.name)
+        raise
+      result.append(("disk_template", self.op.disk_template))
+
     # NIC changes
     for nic_op, nic_dict in self.op.nics:
       if nic_op == constants.DDM_REMOVE:
@@ -7952,10 +8408,18 @@ class LUSetInstanceParams(LogicalUnit):
       for key, val in self.op.beparams.iteritems():
         result.append(("be/%s" % key, val))
 
+    # OS change
+    if self.op.os_name:
+      instance.os = self.op.os_name
+
     self.cfg.Update(instance, feedback_fn)
 
     return result
 
+  _DISK_CONVERSIONS = {
+    (constants.DT_PLAIN, constants.DT_DRBD8): _ConvertPlainToDrbd,
+    (constants.DT_DRBD8, constants.DT_PLAIN): _ConvertDrbdToPlain,
+    }
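
Keying the dispatch table on (old_template, new_template) tuples keeps the
supported conversions explicit: CheckPrereq rejects any pair missing from
the table, and Exec simply looks the handler up and calls it. Since the
values are the plain functions from the class body (a dict lookup bypasses
the method descriptor), the handler is invoked with self passed explicitly,
as Exec does above. The same pattern in miniature:

    class _Converter(object):
      def _AToB(self, feedback_fn):
        feedback_fn("converting a -> b")

      _CONVERSIONS = {
        ("a", "b"): _AToB,
        }

      def Convert(self, old, new, feedback_fn):
        try:
          fn = self._CONVERSIONS[(old, new)]
        except KeyError:
          raise ValueError("unsupported conversion %s -> %s" % (old, new))
        fn(self, feedback_fn)  # explicit self: the value is a plain function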
 
 class LUQueryExports(NoHooksLU):
   """Query the exports list
@@ -8012,11 +8476,22 @@ class LUExportInstance(LogicalUnit):
     """Check the arguments.
 
     """
+    _CheckBooleanOpField(self.op, "remove_instance")
+    _CheckBooleanOpField(self.op, "ignore_remove_failures")
+
     self.shutdown_timeout = getattr(self.op, "shutdown_timeout",
                                     constants.DEFAULT_SHUTDOWN_TIMEOUT)
+    self.remove_instance = getattr(self.op, "remove_instance", False)
+    self.ignore_remove_failures = getattr(self.op, "ignore_remove_failures",
+                                          False)
+
+    if self.remove_instance and not self.op.shutdown:
+      raise errors.OpPrereqError("Can not remove instance without shutting it"
+                                 " down before")
 
   def ExpandNames(self):
     self._ExpandAndLockInstance()
+
     # FIXME: lock only instance primary and destination node
     #
     # Sad but true, for now we have do lock all nodes, as we don't know where
@@ -8041,6 +8516,8 @@ class LUExportInstance(LogicalUnit):
       "EXPORT_NODE": self.op.target_node,
       "EXPORT_DO_SHUTDOWN": self.op.shutdown,
       "SHUTDOWN_TIMEOUT": self.shutdown_timeout,
+      # TODO: Generic function for boolean env variables
+      "REMOVE_INSTANCE": str(bool(self.remove_instance)),
       }
     env.update(_BuildInstanceHookEnvByObject(self, self.instance))
     nl = [self.cfg.GetMasterNode(), self.instance.primary_node,
@@ -8059,17 +8536,15 @@ class LUExportInstance(LogicalUnit):
           "Cannot retrieve locked instance %s" % self.op.instance_name
     _CheckNodeOnline(self, self.instance.primary_node)
 
-    self.dst_node = self.cfg.GetNodeInfo(
-      self.cfg.ExpandNodeName(self.op.target_node))
+    self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node)
+    self.dst_node = self.cfg.GetNodeInfo(self.op.target_node)
+    assert self.dst_node is not None
 
-    if self.dst_node is None:
-      # This is wrong node name, not a non-locked node
-      raise errors.OpPrereqError("Wrong node name %s" % self.op.target_node,
-                                 errors.ECODE_NOENT)
     _CheckNodeOnline(self, self.dst_node.name)
     _CheckNodeNotDrained(self, self.dst_node.name)
 
     # instance disk type verification
+    # TODO: Implement export support for file-based disks
     for disk in self.instance.disks:
       if disk.dev_type == constants.LD_FILE:
         raise errors.OpPrereqError("Export not supported for instances with"
@@ -8088,6 +8563,7 @@ class LUExportInstance(LogicalUnit):
       feedback_fn("Shutting down instance %s" % instance.name)
       result = self.rpc.call_instance_shutdown(src_node, instance,
                                                self.shutdown_timeout)
+      # TODO: Maybe ignore failures if ignore_remove_failures is set
       result.Raise("Could not shutdown instance %s on"
                    " node %s" % (instance.name, src_node))
 
@@ -8131,7 +8607,7 @@ class LUExportInstance(LogicalUnit):
             snap_disks.append(new_dev)
 
       finally:
-        if self.op.shutdown and instance.admin_up:
+        if self.op.shutdown and instance.admin_up and not self.remove_instance:
           feedback_fn("Starting instance %s" % instance.name)
           result = self.rpc.call_instance_start(src_node, instance, None, None)
           msg = result.fail_msg
@@ -8146,8 +8622,10 @@ class LUExportInstance(LogicalUnit):
         feedback_fn("Exporting snapshot %s from %s to %s" %
                     (idx, src_node, dst_node.name))
         if dev:
+          # FIXME: pass debug from opcode to backend
           result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
-                                                 instance, cluster_name, idx)
+                                                 instance, cluster_name,
+                                                 idx, self.op.debug_level)
           msg = result.fail_msg
           if msg:
             self.LogWarning("Could not export disk/%s from node %s to"
@@ -8177,6 +8655,11 @@ class LUExportInstance(LogicalUnit):
         feedback_fn("Deactivating disks for %s" % instance.name)
         _ShutdownInstanceDisks(self, instance)
 
+    # Remove instance if requested
+    if self.remove_instance:
+      feedback_fn("Removing instance %s" % instance.name)
+      _RemoveInstance(self, feedback_fn, instance, self.ignore_remove_failures)
+
     nodelist = self.cfg.GetNodeList()
     nodelist.remove(dst_node.name)
 
@@ -8195,6 +8678,7 @@ class LUExportInstance(LogicalUnit):
           if msg:
             self.LogWarning("Could not remove older export for instance %s"
                             " on node %s: %s", iname, node, msg)
+
     return fin_resu, dresults
 
 
@@ -8261,19 +8745,11 @@ class TagsLU(NoHooksLU): # pylint: disable-msg=W0223
   def ExpandNames(self):
     self.needed_locks = {}
     if self.op.kind == constants.TAG_NODE:
-      name = self.cfg.ExpandNodeName(self.op.name)
-      if name is None:
-        raise errors.OpPrereqError("Invalid node name (%s)" %
-                                   (self.op.name,), errors.ECODE_NOENT)
-      self.op.name = name
-      self.needed_locks[locking.LEVEL_NODE] = name
+      self.op.name = _ExpandNodeName(self.cfg, self.op.name)
+      self.needed_locks[locking.LEVEL_NODE] = self.op.name
     elif self.op.kind == constants.TAG_INSTANCE:
-      name = self.cfg.ExpandInstanceName(self.op.name)
-      if name is None:
-        raise errors.OpPrereqError("Invalid instance name (%s)" %
-                                   (self.op.name,), errors.ECODE_NOENT)
-      self.op.name = name
-      self.needed_locks[locking.LEVEL_INSTANCE] = name
+      self.op.name = _ExpandInstanceName(self.cfg, self.op.name)
+      self.needed_locks[locking.LEVEL_INSTANCE] = self.op.name
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -8465,33 +8941,42 @@ class IAllocator(object):
   # pylint: disable-msg=R0902
   # lots of instance attributes
   _ALLO_KEYS = [
-    "mem_size", "disks", "disk_template",
+    "name", "mem_size", "disks", "disk_template",
     "os", "tags", "nics", "vcpus", "hypervisor",
     ]
   _RELO_KEYS = [
-    "relocate_from",
+    "name", "relocate_from",
+    ]
+  _EVAC_KEYS = [
+    "evac_nodes",
     ]
 
-  def __init__(self, cfg, rpc, mode, name, **kwargs):
+  def __init__(self, cfg, rpc, mode, **kwargs):
     self.cfg = cfg
     self.rpc = rpc
     # init buffer variables
     self.in_text = self.out_text = self.in_data = self.out_data = None
     # init all input fields so that pylint is happy
     self.mode = mode
-    self.name = name
     self.mem_size = self.disks = self.disk_template = None
     self.os = self.tags = self.nics = self.vcpus = None
     self.hypervisor = None
     self.relocate_from = None
+    self.name = None
+    self.evac_nodes = None
     # computed fields
     self.required_nodes = None
     # init result fields
-    self.success = self.info = self.nodes = None
+    self.success = self.info = self.result = None
     if self.mode == constants.IALLOCATOR_MODE_ALLOC:
       keyset = self._ALLO_KEYS
+      fn = self._AddNewInstance
     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
       keyset = self._RELO_KEYS
+      fn = self._AddRelocateInstance
+    elif self.mode == constants.IALLOCATOR_MODE_MEVAC:
+      keyset = self._EVAC_KEYS
+      fn = self._AddEvacuateNodes
     else:
       raise errors.ProgrammerError("Unknown mode '%s' passed to the"
                                    " IAllocator" % self.mode)
@@ -8500,11 +8985,12 @@ class IAllocator(object):
         raise errors.ProgrammerError("Invalid input parameter '%s' to"
                                      " IAllocator" % key)
       setattr(self, key, kwargs[key])
+
     for key in keyset:
       if key not in kwargs:
         raise errors.ProgrammerError("Missing input parameter '%s' to"
                                      " IAllocator" % key)
-    self._BuildInputData()
+    self._BuildInputData(fn)
 
   def _ComputeClusterData(self):
     """Compute the generic allocator input data.
@@ -8533,6 +9019,8 @@ class IAllocator(object):
       hypervisor_name = self.hypervisor
     elif self.mode == constants.IALLOCATOR_MODE_RELOC:
       hypervisor_name = cfg.GetInstanceInfo(self.name).hypervisor
+    elif self.mode == constants.IALLOCATOR_MODE_MEVAC:
+      hypervisor_name = cluster_info.enabled_hypervisors[0]
 
     node_data = self.rpc.call_node_info(node_list, cfg.GetVGName(),
                                         hypervisor_name)
@@ -8643,8 +9131,6 @@ class IAllocator(object):
     done.
 
     """
-    data = self.in_data
-
     disk_space = _ComputeDiskSize(self.disk_template, self.disks)
 
     if self.disk_template in constants.DTS_NET_MIRROR:
@@ -8652,7 +9138,6 @@ class IAllocator(object):
     else:
       self.required_nodes = 1
     request = {
-      "type": "allocate",
       "name": self.name,
       "disk_template": self.disk_template,
       "tags": self.tags,
@@ -8664,7 +9149,7 @@ class IAllocator(object):
       "nics": self.nics,
       "required_nodes": self.required_nodes,
       }
-    data["request"] = request
+    return request
 
   def _AddRelocateInstance(self):
     """Add relocate instance data to allocator structure.
@@ -8694,24 +9179,31 @@ class IAllocator(object):
     disk_space = _ComputeDiskSize(instance.disk_template, disk_sizes)
 
     request = {
-      "type": "relocate",
       "name": self.name,
       "disk_space_total": disk_space,
       "required_nodes": self.required_nodes,
       "relocate_from": self.relocate_from,
       }
-    self.in_data["request"] = request
+    return request
+
+  def _AddEvacuateNodes(self):
+    """Add evacuate nodes data to allocator structure.
+
+    """
+    request = {
+      "evac_nodes": self.evac_nodes
+      }
+    return request
 
-  def _BuildInputData(self):
+  def _BuildInputData(self, fn):
     """Build input data structures.
 
     """
     self._ComputeClusterData()
 
-    if self.mode == constants.IALLOCATOR_MODE_ALLOC:
-      self._AddNewInstance()
-    else:
-      self._AddRelocateInstance()
+    request = fn()
+    request["type"] = self.mode
+    self.in_data["request"] = request
 
     self.in_text = serializer.Dump(self.in_data)
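
With this change every mode shares one serialization path: the per-mode fn
returns only the request body, and the "type" field is filled in from the
mode constant itself, which is why the hard-coded "type": "allocate" and
"type": "relocate" entries disappear above. For the new multi-evacuate mode
the request part of the input would look roughly like this (node name
hypothetical, and assuming the mode constant's string value is
"multi-evacuate"):

    # self.in_data["request"] for IALLOCATOR_MODE_MEVAC
    {"type": "multi-evacuate",
     "evac_nodes": ["node2.example.com"]}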
 
@@ -8744,14 +9236,19 @@ class IAllocator(object):
     if not isinstance(rdict, dict):
       raise errors.OpExecError("Can't parse iallocator results: not a dict")
 
-    for key in "success", "info", "nodes":
+    # TODO: remove backwards compatibility in later versions
+    if "nodes" in rdict and "result" not in rdict:
+      rdict["result"] = rdict["nodes"]
+      del rdict["nodes"]
+
+    for key in "success", "info", "result":
       if key not in rdict:
         raise errors.OpExecError("Can't parse iallocator results:"
                                  " missing key '%s'" % key)
       setattr(self, key, rdict[key])
 
-    if not isinstance(rdict["nodes"], list):
-      raise errors.OpExecError("Can't parse iallocator results: 'nodes' key"
+    if not isinstance(rdict["result"], list):
+      raise errors.OpExecError("Can't parse iallocator results: 'result' key"
                                " is not a list")
     self.out_data = rdict
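
The shim above keeps old iallocator scripts working: plugins that still emit
a "nodes" key are silently normalized to the new "result" key before
validation, so both of the following (hypothetical) payloads parse
identically:

    old_style = {"success": True, "info": "", "nodes": ["node3"]}
    new_style = {"success": True, "info": "", "result": ["node3"]}
    # after the compatibility rewrite, rdict["result"] == ["node3"] in both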
 
@@ -8807,12 +9304,13 @@ class LUTestAllocator(NoHooksLU):
       if not hasattr(self.op, "name"):
         raise errors.OpPrereqError("Missing attribute 'name' on opcode input",
                                    errors.ECODE_INVAL)
-      fname = self.cfg.ExpandInstanceName(self.op.name)
-      if fname is None:
-        raise errors.OpPrereqError("Instance '%s' not found for relocation" %
-                                   self.op.name, errors.ECODE_NOENT)
+      fname = _ExpandInstanceName(self.cfg, self.op.name)
       self.op.name = fname
       self.relocate_from = self.cfg.GetInstanceInfo(fname).secondary_nodes
+    elif self.op.mode == constants.IALLOCATOR_MODE_MEVAC:
+      if not hasattr(self.op, "evac_nodes"):
+        raise errors.OpPrereqError("Missing attribute 'evac_nodes' on"
+                                   " opcode input", errors.ECODE_INVAL)
     else:
       raise errors.OpPrereqError("Invalid test allocator mode '%s'" %
                                  self.op.mode, errors.ECODE_INVAL)
@@ -8842,12 +9340,19 @@ class LUTestAllocator(NoHooksLU):
                        vcpus=self.op.vcpus,
                        hypervisor=self.op.hypervisor,
                        )
-    else:
+    elif self.op.mode == constants.IALLOCATOR_MODE_RELOC:
       ial = IAllocator(self.cfg, self.rpc,
                        mode=self.op.mode,
                        name=self.op.name,
                        relocate_from=list(self.relocate_from),
                        )
+    elif self.op.mode == constants.IALLOCATOR_MODE_MEVAC:
+      ial = IAllocator(self.cfg, self.rpc,
+                       mode=self.op.mode,
+                       evac_nodes=self.op.evac_nodes)
+    else:
+      raise errors.ProgrammerError("Uncatched mode %s in"
+                                   " LUTestAllocator.Exec", self.op.mode)
 
     if self.op.direction == constants.IALLOCATOR_DIR_IN:
       result = ial.in_text