Improve LUQueryNodes for lockless case
[ganeti-local] / lib / cmdlib.py
index ab40449..87ce76d 100644
 
 import os
 import os.path
-import sha
 import time
-import tempfile
 import re
 import platform
 import logging
 import copy
-import random
 
 from ganeti import ssh
 from ganeti import utils
@@ -41,7 +38,6 @@ from ganeti import hypervisor
 from ganeti import locking
 from ganeti import constants
 from ganeti import objects
-from ganeti import opcodes
 from ganeti import serializer
 from ganeti import ssconf
 
@@ -69,7 +65,7 @@ class LogicalUnit(object):
   def __init__(self, processor, op, context, rpc):
     """Constructor for LogicalUnit.
 
-    This needs to be overriden in derived classes in order to check op
+    This needs to be overridden in derived classes in order to check op
     validity.
 
     """
@@ -117,7 +113,7 @@ class LogicalUnit(object):
     CheckPrereq, doing these separate is better because:
 
       - ExpandNames is left as purely a lock-related function
-      - CheckPrereq is run after we have aquired locks (and possible
+      - CheckPrereq is run after we have acquired locks (and possible
         waited for them)
 
     The function is allowed to change the self.op attribute so that
@@ -454,7 +450,8 @@ def _CheckNodeNotDrained(lu, node):
 
 
 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
-                          memory, vcpus, nics, disk_template, disks):
+                          memory, vcpus, nics, disk_template, disks,
+                          bep, hvp, hypervisor_name):
   """Builds instance related env variables for hooks
 
   This builds the hook environment from individual variables.
@@ -477,9 +474,15 @@ def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
   @param nics: list of tuples (ip, bridge, mac) representing
       the NICs the instance  has
   @type disk_template: string
-  @param disk_template: the distk template of the instance
+  @param disk_template: the disk template of the instance
   @type disks: list
   @param disks: the list of (size, mode) pairs
+  @type bep: dict
+  @param bep: the backend parameters for the instance
+  @type hvp: dict
+  @param hvp: the hypervisor parameters for the instance
+  @type hypervisor_name: string
+  @param hypervisor_name: the hypervisor for the instance
   @rtype: dict
   @return: the hook environment for this instance
 
@@ -498,6 +501,7 @@ def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
     "INSTANCE_MEMORY": memory,
     "INSTANCE_VCPUS": vcpus,
     "INSTANCE_DISK_TEMPLATE": disk_template,
+    "INSTANCE_HYPERVISOR": hypervisor_name,
   }
 
   if nics:
@@ -523,6 +527,10 @@ def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
 
   env["INSTANCE_DISK_COUNT"] = disk_count
 
+  for source, kind in [(bep, "BE"), (hvp, "HV")]:
+    for key, value in source.items():
+      env["INSTANCE_%s_%s" % (kind, key)] = value
+
   return env
 
 
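The loop added above exports every backend and hypervisor parameter as an extra hook variable. For illustration, a minimal standalone sketch (the parameter names and values below are examples, not taken from this patch):

    env = {}
    bep = {"memory": 128, "auto_balance": True}      # example backend params
    hvp = {"kernel_path": "/boot/vmlinuz-2.6-xenU"}  # example hypervisor params
    for source, kind in [(bep, "BE"), (hvp, "HV")]:
      for key, value in source.items():
        env["INSTANCE_%s_%s" % (kind, key)] = value
    # env now holds INSTANCE_BE_memory, INSTANCE_BE_auto_balance and
    # INSTANCE_HV_kernel_path next to the INSTANCE_* variables built earlier.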
@@ -541,7 +549,9 @@ def _BuildInstanceHookEnvByObject(lu, instance, override=None):
   @return: the hook environment dictionary
 
   """
-  bep = lu.cfg.GetClusterInfo().FillBE(instance)
+  cluster = lu.cfg.GetClusterInfo()
+  bep = cluster.FillBE(instance)
+  hvp = cluster.FillHV(instance)
   args = {
     'name': instance.name,
     'primary_node': instance.primary_node,
@@ -553,6 +563,9 @@ def _BuildInstanceHookEnvByObject(lu, instance, override=None):
     'nics': [(nic.ip, nic.bridge, nic.mac) for nic in instance.nics],
     'disk_template': instance.disk_template,
     'disks': [(disk.size, disk.mode) for disk in instance.disks],
+    'bep': bep,
+    'hvp': hvp,
+    'hypervisor_name': instance.hypervisor,
   }
   if override:
     args.update(override)
@@ -576,10 +589,10 @@ def _AdjustCandidatePool(lu):
 
 
 def _CheckInstanceBridgesExist(lu, instance):
-  """Check that the brigdes needed by an instance exist.
+  """Check that the bridges needed by an instance exist.
 
   """
-  # check bridges existance
+  # check bridges existence
   brlist = [nic.bridge for nic in instance.nics]
   result = lu.rpc.call_bridges_exist(instance.primary_node, brlist)
   result.Raise()
@@ -600,7 +613,7 @@ class LUDestroyCluster(NoHooksLU):
 
     This checks whether the cluster is empty.
 
-    Any errors are signalled by raising errors.OpPrereqError.
+    Any errors are signaled by raising errors.OpPrereqError.
 
     """
     master = self.cfg.GetMasterNode()
@@ -653,7 +666,7 @@ class LUVerifyCluster(LogicalUnit):
     Test list:
 
       - compares ganeti version
-      - checks vg existance and size > 20G
+      - checks vg existence and size > 20G
       - checks config file checksum
       - checks ssh to other nodes
 
@@ -735,8 +748,8 @@ class LUVerifyCluster(LogicalUnit):
           else:
             # not candidate and this is not a must-have file
             bad = True
-            feedback_fn("  - ERROR: non master-candidate has old/wrong file"
-                        " '%s'" % file_name)
+            feedback_fn("  - ERROR: file '%s' should not exist on non master"
+                        " candidates (and the file is outdated)" % file_name)
         else:
           # all good, except non-master/non-must have combination
           if not node_is_mc and not must_have_file:
@@ -892,7 +905,7 @@ class LUVerifyCluster(LogicalUnit):
           if bep[constants.BE_AUTO_BALANCE]:
             needed_mem += bep[constants.BE_MEMORY]
         if nodeinfo['mfree'] < needed_mem:
-          feedback_fn("  - ERROR: not enough memory on node %s to accomodate"
+          feedback_fn("  - ERROR: not enough memory on node %s to accommodate"
                       " failovers should node %s fail" % (node, prinode))
           bad = True
     return bad
@@ -911,7 +924,7 @@ class LUVerifyCluster(LogicalUnit):
   def BuildHooksEnv(self):
     """Build hooks env.
 
-    Cluster-Verify hooks just rone in the post phase and their failure makes
+    Cluster-Verify hooks just run in the post phase and their failure makes
     the output be logged in the verify output and the verification to fail.
 
     """
@@ -1178,7 +1191,7 @@ class LUVerifyCluster(LogicalUnit):
     return not bad
 
   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
-    """Analize the post-hooks' result
+    """Analyze the post-hooks' result
 
     This method analyses the hook result, handles it, and sends some
     nicely-formatted feedback back to the user.
@@ -1277,7 +1290,6 @@ class LUVerifyDisks(NoHooksLU):
 
     node_lvs = self.rpc.call_volume_list(nodes, vg_name)
 
-    to_act = set()
     for node in nodes:
       # node_volume
       lvs = node_lvs[node]
@@ -1313,6 +1325,128 @@ class LUVerifyDisks(NoHooksLU):
     return result
 
 
+class LURepairDiskSizes(NoHooksLU):
+  """Verifies the cluster disk sizes.
+
+  """
+  _OP_REQP = ["instances"]
+  REQ_BGL = False
+
+  def ExpandNames(self):
+
+    if not isinstance(self.op.instances, list):
+      raise errors.OpPrereqError("Invalid argument type 'instances'")
+
+    if self.op.instances:
+      self.wanted_names = []
+      for name in self.op.instances:
+        full_name = self.cfg.ExpandInstanceName(name)
+        if full_name is None:
+          raise errors.OpPrereqError("Instance '%s' not known" % name)
+        self.wanted_names.append(full_name)
+      self.needed_locks = {
+        locking.LEVEL_NODE: [],
+        locking.LEVEL_INSTANCE: self.wanted_names,
+        }
+      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+    else:
+      self.wanted_names = None
+      self.needed_locks = {
+        locking.LEVEL_NODE: locking.ALL_SET,
+        locking.LEVEL_INSTANCE: locking.ALL_SET,
+        }
+    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
+
+  def DeclareLocks(self, level):
+    if level == locking.LEVEL_NODE and self.wanted_names is not None:
+      self._LockInstancesNodes(primary_only=True)
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    This only checks the optional instance list against the existing names.
+
+    """
+    if self.wanted_names is None:
+      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
+
+    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
+                             in self.wanted_names]
+
+  def _EnsureChildSizes(self, disk):
+    """Ensure children of the disk have the needed disk size.
+
+    This is valid mainly for DRBD8 and fixes an issue where the
+    children have smaller disk size.
+
+    @param disk: an L{ganeti.objects.Disk} object
+
+    """
+    if disk.dev_type == constants.LD_DRBD8:
+      assert disk.children, "Empty children for DRBD8?"
+      fchild = disk.children[0]
+      mismatch = fchild.size < disk.size
+      if mismatch:
+        self.LogInfo("Child disk has size %d, parent %d, fixing",
+                     fchild.size, disk.size)
+        fchild.size = disk.size
+
+      # and we recurse on this child only, not on the metadev
+      return self._EnsureChildSizes(fchild) or mismatch
+    else:
+      return False
+
+  def Exec(self, feedback_fn):
+    """Verify the size of cluster disks.
+
+    """
+    # TODO: check child disks too
+    # TODO: check differences in size between primary/secondary nodes
+    per_node_disks = {}
+    for instance in self.wanted_instances:
+      pnode = instance.primary_node
+      if pnode not in per_node_disks:
+        per_node_disks[pnode] = []
+      for idx, disk in enumerate(instance.disks):
+        per_node_disks[pnode].append((instance, idx, disk))
+
+    changed = []
+    for node, dskl in per_node_disks.items():
+      newl = [v[2].Copy() for v in dskl]
+      for dsk in newl:
+        self.cfg.SetDiskID(dsk, node)
+      result = self.rpc.call_blockdev_getsizes(node, newl)
+      if result.failed:
+        self.LogWarning("Failure in blockdev_getsizes call to node"
+                        " %s, ignoring", node)
+        continue
+      if len(result.data) != len(dskl):
+        self.LogWarning("Invalid result from node %s, ignoring node results",
+                        node)
+        continue
+      for ((instance, idx, disk), size) in zip(dskl, result.data):
+        if size is None:
+          self.LogWarning("Disk %d of instance %s did not return size"
+                          " information, ignoring", idx, instance.name)
+          continue
+        if not isinstance(size, (int, long)):
+          self.LogWarning("Disk %d of instance %s did not return valid"
+                          " size information, ignoring", idx, instance.name)
+          continue
+        size = size >> 20
+        if size != disk.size:
+          self.LogInfo("Disk %d of instance %s has mismatched size,"
+                       " correcting: recorded %d, actual %d", idx,
+                       instance.name, disk.size, size)
+          disk.size = size
+          self.cfg.Update(instance)
+          changed.append((instance.name, idx, size))
+        if self._EnsureChildSizes(disk):
+          self.cfg.Update(instance)
+          changed.append((instance.name, idx, disk.size))
+    return changed
+
+
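A standalone illustration of the per-disk decision in Exec above: the configuration stores sizes in MiB while blockdev_getsizes reports bytes, hence the shift by 20 bits (the numbers below are made up):

    recorded_size = 10000                # MiB, as stored in the configuration
    reported_bytes = 10737418240         # as returned by call_blockdev_getsizes
    actual_size = reported_bytes >> 20   # 10240 MiB
    if actual_size != recorded_size:
      # the LU logs the mismatch, sets disk.size = actual_size, calls
      # cfg.Update(instance) and records (instance_name, disk_index,
      # actual_size) in the list returned to the caller
      pass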
 class LURenameCluster(LogicalUnit):
   """Rename the cluster.
 
@@ -1387,7 +1521,7 @@ class LURenameCluster(LogicalUnit):
                         constants.SSH_KNOWN_HOSTS_FILE, to_node)
 
     finally:
-      result = self.rpc.call_node_start_master(master, False)
+      result = self.rpc.call_node_start_master(master, False, False)
       if result.failed or not result.data:
         self.LogWarning("Could not re-enable the master role on"
                         " the master, please restart manually.")
@@ -1398,7 +1532,7 @@ def _RecursiveCheckIfLVMBased(disk):
 
   @type disk: L{objects.Disk}
   @param disk: the disk to check
-  @rtype: booleean
+  @rtype: boolean
   @return: boolean indicating whether a LD_LV dev_type was found or not
 
   """
@@ -1504,6 +1638,13 @@ class LUSetClusterParams(LogicalUnit):
 
     if self.op.enabled_hypervisors is not None:
       self.hv_list = self.op.enabled_hypervisors
+      if not self.hv_list:
+        raise errors.OpPrereqError("Enabled hypervisors list must contain at"
+                                   " least one member")
+      invalid_hvs = set(self.hv_list) - constants.HYPER_TYPES
+      if invalid_hvs:
+        raise errors.OpPrereqError("Enabled hypervisors contains invalid"
+                                   " entries: %s" % invalid_hvs)
     else:
       self.hv_list = cluster.enabled_hypervisors
 
@@ -1524,8 +1665,11 @@ class LUSetClusterParams(LogicalUnit):
 
     """
     if self.op.vg_name is not None:
-      if self.op.vg_name != self.cfg.GetVGName():
-        self.cfg.SetVGName(self.op.vg_name)
+      new_volume = self.op.vg_name
+      if not new_volume:
+        new_volume = None
+      if new_volume != self.cfg.GetVGName():
+        self.cfg.SetVGName(new_volume)
       else:
         feedback_fn("Cluster LVM configuration already in desired"
                     " state, not changing")
@@ -1537,14 +1681,11 @@ class LUSetClusterParams(LogicalUnit):
       self.cluster.beparams[constants.BEGR_DEFAULT] = self.new_beparams
     if self.op.candidate_pool_size is not None:
       self.cluster.candidate_pool_size = self.op.candidate_pool_size
+      # we need to update the pool size here, otherwise the save will fail
+      _AdjustCandidatePool(self)
 
     self.cfg.Update(self.cluster)
 
-    # we want to update nodes after the cluster so that if any errors
-    # happen, we have recorded and saved the cluster info
-    if self.op.candidate_pool_size is not None:
-      _AdjustCandidatePool(self)
-
 
 class LURedistributeConfig(NoHooksLU):
   """Force the redistribution of cluster configuration.
@@ -1589,6 +1730,7 @@ def _WaitForSync(lu, instance, oneshot=False, unlock=False):
     lu.cfg.SetDiskID(dev, node)
 
   retries = 0
+  degr_retries = 10 # in seconds, as we sleep 1 second each time
   while True:
     max_time = 0
     done = True
@@ -1621,6 +1763,16 @@ def _WaitForSync(lu, instance, oneshot=False, unlock=False):
           rem_time = "no time estimate"
         lu.proc.LogInfo("- device %s: %5.2f%% done, %s" %
                         (instance.disks[i].iv_name, perc_done, rem_time))
+
+    # if we're done but degraded, let's do a few small retries, to
+    # make sure we see a stable and not transient situation; therefore
+    # we force a restart of the loop
+    if (done or oneshot) and cumul_degraded and degr_retries > 0:
+      logging.info("Degraded disks found, %d retries left", degr_retries)
+      degr_retries -= 1
+      time.sleep(1)
+      continue
+
     if done or oneshot:
       break
 
@@ -1788,7 +1940,7 @@ class LURemoveNode(LogicalUnit):
      - it does not have primary or secondary instances
      - it's not the master
 
-    Any errors are signalled by raising errors.OpPrereqError.
+    Any errors are signaled by raising errors.OpPrereqError.
 
     """
     node = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.node_name))
@@ -1848,6 +2000,7 @@ class LUQueryNodes(NoHooksLU):
     "master",
     "offline",
     "drained",
+    "role",
     )
 
   def ExpandNames(self):
@@ -1930,10 +2083,9 @@ class LUQueryNodes(NoHooksLU):
     inst_fields = frozenset(("pinst_cnt", "pinst_list",
                              "sinst_cnt", "sinst_list"))
     if inst_fields & frozenset(self.op.output_fields):
-      instancelist = self.cfg.GetInstanceList()
+      inst_data = self.cfg.GetAllInstancesInfo()
 
-      for instance_name in instancelist:
-        inst = self.cfg.GetInstanceInfo(instance_name)
+      for instance_name, inst in inst_data.items():
         if inst.primary_node in node_to_primary:
           node_to_primary[inst.primary_node].add(inst.name)
         for secnode in inst.secondary_nodes:
@@ -1976,6 +2128,17 @@ class LUQueryNodes(NoHooksLU):
           val = node.drained
         elif self._FIELDS_DYNAMIC.Matches(field):
           val = live_data[node.name].get(field, None)
+        elif field == "role":
+          if node.name == master_node:
+            val = "M"
+          elif node.master_candidate:
+            val = "C"
+          elif node.drained:
+            val = "D"
+          elif node.offline:
+            val = "O"
+          else:
+            val = "R"
         else:
           raise errors.ParameterError(field)
         node_output.append(val)
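The letters produced by the new "role" field map to node states roughly as follows; the long names are editorial glosses, and the elif chain above means the first matching state wins (master before candidate, candidate before drained, and so on):

    # Assumed meaning of the role letters (gloss, not part of the patch).
    NODE_ROLE_GLOSS = {
      "M": "cluster master",
      "C": "master candidate",
      "D": "drained",
      "O": "offline",
      "R": "regular node",
    }

Since the field is declared in _FIELDS_STATIC, it should be selectable through the normal query path, e.g. something like gnt-node list -o name,role, assuming the CLI forwards the output fields unchanged.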
@@ -2097,7 +2260,7 @@ class LUAddNode(LogicalUnit):
      - it is resolvable
      - its parameters (single/dual homed) matches the cluster
 
-    Any errors are signalled by raising errors.OpPrereqError.
+    Any errors are signaled by raising errors.OpPrereqError.
 
     """
     node_name = self.op.node_name
@@ -2151,7 +2314,7 @@ class LUAddNode(LogicalUnit):
         raise errors.OpPrereqError("The master has a private ip but the"
                                    " new node doesn't have one")
 
-    # checks reachablity
+    # checks reachability
     if not utils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
       raise errors.OpPrereqError("Node not reachable by ping")
 
@@ -2163,14 +2326,24 @@ class LUAddNode(LogicalUnit):
                                    " based ping to noded port")
 
     cp_size = self.cfg.GetClusterInfo().candidate_pool_size
-    mc_now, _ = self.cfg.GetMasterCandidateStats()
-    master_candidate = mc_now < cp_size
+    if self.op.readd:
+      exceptions = [node]
+    else:
+      exceptions = []
+    mc_now, mc_max = self.cfg.GetMasterCandidateStats(exceptions)
+    # the new node will increase mc_max by one, so:
+    mc_max = min(mc_max + 1, cp_size)
+    self.master_candidate = mc_now < mc_max
 
-    self.new_node = objects.Node(name=node,
-                                 primary_ip=primary_ip,
-                                 secondary_ip=secondary_ip,
-                                 master_candidate=master_candidate,
-                                 offline=False, drained=False)
+    if self.op.readd:
+      self.new_node = self.cfg.GetNodeInfo(node)
+      assert self.new_node is not None, "Can't retrieve locked node %s" % node
+    else:
+      self.new_node = objects.Node(name=node,
+                                   primary_ip=primary_ip,
+                                   secondary_ip=secondary_ip,
+                                   master_candidate=self.master_candidate,
+                                   offline=False, drained=False)
 
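A rough numeric illustration of the promotion decision above, assuming GetMasterCandidateStats(exceptions) returns the current number of candidates and the number of nodes that could currently hold the role, both ignoring the listed nodes (values are hypothetical):

    cp_size = 10   # cluster candidate_pool_size
    mc_now = 3     # candidates configured today (re-added node excluded)
    mc_max = 4     # nodes currently eligible for the role (same exclusion)
    mc_max = min(mc_max + 1, cp_size)    # the new node adds one slot -> 5
    master_candidate = mc_now < mc_max   # True, so the node gets promoted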
   def Exec(self, feedback_fn):
     """Adds the new node to the cluster.
@@ -2179,6 +2352,20 @@ class LUAddNode(LogicalUnit):
     new_node = self.new_node
     node = new_node.name
 
+    # for re-adds, reset the offline/drained/master-candidate flags;
+    # we need to reset here, otherwise offline would prevent RPC calls
+    # later in the procedure; this also means that if the re-add
+    # fails, we are left with a non-offlined, broken node
+    if self.op.readd:
+      new_node.drained = new_node.offline = False
+      self.LogInfo("Readding a node, the offline/drained flags were reset")
+      # if we demote the node, we do cleanup later in the procedure
+      new_node.master_candidate = self.master_candidate
+
+    # notify the user about any possible mc promotion
+    if new_node.master_candidate:
+      self.LogInfo("Node will be a master candidate")
+
     # check connectivity
     result = self.rpc.call_version([node])[node]
     result.Raise()
@@ -2218,7 +2405,8 @@ class LUAddNode(LogicalUnit):
                                " new node: %s" % msg)
 
     # Add node to our /etc/hosts, and add key to known_hosts
-    utils.AddHostToEtcHosts(new_node.name)
+    if self.cfg.GetClusterInfo().modify_etc_hosts:
+      utils.AddHostToEtcHosts(new_node.name)
 
     if new_node.secondary_ip != new_node.primary_ip:
       result = self.rpc.call_node_has_ip_address(new_node.name,
@@ -2242,7 +2430,8 @@ class LUAddNode(LogicalUnit):
                                  " for remote verification" % verifier)
       if result[verifier].data['nodelist']:
         for failed in result[verifier].data['nodelist']:
-          feedback_fn("ssh/hostname verification failed %s -> %s" %
+          feedback_fn("ssh/hostname verification failed"
+                      " (checking from %s): %s" %
                       (verifier, result[verifier].data['nodelist'][failed]))
         raise errors.OpExecError("ssh/hostname verification failed.")
 
@@ -2274,6 +2463,15 @@ class LUAddNode(LogicalUnit):
 
     if self.op.readd:
       self.context.ReaddNode(new_node)
+      # make sure we redistribute the config
+      self.cfg.Update(new_node)
+      # and make sure the new node will not have old files around
+      if not new_node.master_candidate:
+        result = self.rpc.call_node_demote_from_mc(new_node.name)
+        msg = result.RemoteFailMsg()
+        if msg:
+          self.LogWarning("Node failed to demote itself from master"
+                          " candidate status: %s" % msg)
     else:
       self.context.AddNode(new_node)
 
@@ -2329,12 +2527,16 @@ class LUSetNodeParams(LogicalUnit):
     """
     node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
 
+    if (self.op.master_candidate is not None or
+        self.op.drained is not None or
+        self.op.offline is not None):
+      # we can't change the master's node flags
+      if self.op.node_name == self.cfg.GetMasterNode():
+        raise errors.OpPrereqError("The master role can be changed"
+                                   " only via masterfailover")
+
     if ((self.op.master_candidate == False or self.op.offline == True or
          self.op.drained == True) and node.master_candidate):
-      # we will demote the node from master_candidate
-      if self.op.node_name == self.cfg.GetMasterNode():
-        raise errors.OpPrereqError("The master node has to be a"
-                                   " master candidate, online and not drained")
       cp_size = self.cfg.GetClusterInfo().candidate_pool_size
       num_candidates, _ = self.cfg.GetMasterCandidateStats()
       if num_candidates <= cp_size:
@@ -2392,6 +2594,10 @@ class LUSetNodeParams(LogicalUnit):
           node.master_candidate = False
           changed_mc = True
           result.append(("master_candidate", "auto-demotion due to drain"))
+          rrc = self.rpc.call_node_demote_from_mc(node.name)
+          msg = rrc.RemoteFailMsg()
+          if msg:
+            self.LogWarning("Node failed to demote itself: %s" % msg)
         if node.offline:
           node.offline = False
           result.append(("offline", "clear offline status due to drain"))
@@ -2437,10 +2643,15 @@ class LUQueryClusterInfo(NoHooksLU):
       "master": cluster.master_node,
       "default_hypervisor": cluster.default_hypervisor,
       "enabled_hypervisors": cluster.enabled_hypervisors,
-      "hvparams": dict([(hypervisor, cluster.hvparams[hypervisor])
-                        for hypervisor in cluster.enabled_hypervisors]),
+      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
+                        for hypervisor_name in cluster.enabled_hypervisors]),
       "beparams": cluster.beparams,
       "candidate_pool_size": cluster.candidate_pool_size,
+      "default_bridge": cluster.default_bridge,
+      "master_netdev": cluster.master_netdev,
+      "volume_group_name": cluster.volume_group_name,
+      "file_storage_dir": cluster.file_storage_dir,
+      "tags": list(cluster.GetTags()),
       }
 
     return result
@@ -2512,19 +2723,24 @@ class LUActivateInstanceDisks(NoHooksLU):
     assert self.instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
     _CheckNodeOnline(self, self.instance.primary_node)
+    if not hasattr(self.op, "ignore_size"):
+      self.op.ignore_size = False
 
   def Exec(self, feedback_fn):
     """Activate the disks.
 
     """
-    disks_ok, disks_info = _AssembleInstanceDisks(self, self.instance)
+    disks_ok, disks_info = \
+              _AssembleInstanceDisks(self, self.instance,
+                                     ignore_size=self.op.ignore_size)
     if not disks_ok:
       raise errors.OpExecError("Cannot activate block devices")
 
     return disks_info
 
 
-def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
+def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False,
+                           ignore_size=False):
   """Prepare the block devices for an instance.
 
   This sets up the block devices on all nodes.
@@ -2536,6 +2752,10 @@ def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
   @type ignore_secondaries: boolean
   @param ignore_secondaries: if true, errors on secondary nodes
       won't result in an error return from the function
+  @type ignore_size: boolean
+  @param ignore_size: if true, the current known size of the disk
+      will not be used during the disk activation, useful for cases
+      when the size is wrong
   @return: False if the operation failed, otherwise a list of
       (host, instance_visible_name, node_visible_name)
       with the mapping from node devices to instance devices
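The new flag pairs naturally with LURepairDiskSizes above: disks whose recorded size is wrong can first be activated with the size ignored, then repaired. A sketch of driving it, under the assumption that the OpActivateInstanceDisks opcode also grows an optional ignore_size slot (that change is not part of this hunk):

    from ganeti import opcodes

    # Assumes an "ignore_size" slot on the opcode; LUActivateInstanceDisks
    # falls back to False via hasattr() when the attribute is missing.
    op = opcodes.OpActivateInstanceDisks(instance_name="inst1.example.com",
                                         ignore_size=True)
    # Submitting this opcode assembles the disks with UnsetSize() applied,
    # so the (possibly wrong) recorded sizes never reach the block layer.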
@@ -2556,6 +2776,9 @@ def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
   # 1st pass, assemble on all nodes in secondary mode
   for inst_disk in instance.disks:
     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
+      if ignore_size:
+        node_disk = node_disk.Copy()
+        node_disk.UnsetSize()
       lu.cfg.SetDiskID(node_disk, node)
       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, False)
       msg = result.RemoteFailMsg()
@@ -2573,6 +2796,9 @@ def _AssembleInstanceDisks(lu, instance, ignore_secondaries=False):
     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
       if node != instance.primary_node:
         continue
+      if ignore_size:
+        node_disk = node_disk.Copy()
+        node_disk.UnsetSize()
       lu.cfg.SetDiskID(node_disk, node)
       result = lu.rpc.call_blockdev_assemble(node, node_disk, iname, True)
       msg = result.RemoteFailMsg()
@@ -2597,7 +2823,7 @@ def _StartInstanceDisks(lu, instance, force):
   """Start the disks of an instance.
 
   """
-  disks_ok, dummy = _AssembleInstanceDisks(lu, instance,
-                                           ignore_secondaries=force)
+  disks_ok, _ = _AssembleInstanceDisks(lu, instance,
+                                       ignore_secondaries=force)
   if not disks_ok:
     _ShutdownInstanceDisks(lu, instance)
@@ -2755,15 +2981,48 @@ class LUStartupInstance(LogicalUnit):
     assert self.instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
 
+    # extra beparams
+    self.beparams = getattr(self.op, "beparams", {})
+    if self.beparams:
+      if not isinstance(self.beparams, dict):
+        raise errors.OpPrereqError("Invalid beparams passed: %s, expected"
+                                   " dict" % (type(self.beparams), ))
+      # fill the beparams dict
+      utils.ForceDictType(self.beparams, constants.BES_PARAMETER_TYPES)
+      self.op.beparams = self.beparams
+
+    # extra hvparams
+    self.hvparams = getattr(self.op, "hvparams", {})
+    if self.hvparams:
+      if not isinstance(self.hvparams, dict):
+        raise errors.OpPrereqError("Invalid hvparams passed: %s, expected"
+                                   " dict" % (type(self.hvparams), ))
+
+      # check hypervisor parameter syntax (locally)
+      cluster = self.cfg.GetClusterInfo()
+      utils.ForceDictType(self.hvparams, constants.HVS_PARAMETER_TYPES)
+      filled_hvp = cluster.FillDict(cluster.hvparams[instance.hypervisor],
+                                    instance.hvparams)
+      filled_hvp.update(self.hvparams)
+      hv_type = hypervisor.GetHypervisor(instance.hypervisor)
+      hv_type.CheckParameterSyntax(filled_hvp)
+      _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
+      self.op.hvparams = self.hvparams
+
     _CheckNodeOnline(self, instance.primary_node)
 
     bep = self.cfg.GetClusterInfo().FillBE(instance)
-    # check bridges existance
+    # check bridges existence
     _CheckInstanceBridgesExist(self, instance)
 
-    _CheckNodeFreeMemory(self, instance.primary_node,
-                         "starting instance %s" % instance.name,
-                         bep[constants.BE_MEMORY], instance.hypervisor)
+    remote_info = self.rpc.call_instance_info(instance.primary_node,
+                                              instance.name,
+                                              instance.hypervisor)
+    remote_info.Raise()
+    if not remote_info.data:
+      _CheckNodeFreeMemory(self, instance.primary_node,
+                           "starting instance %s" % instance.name,
+                           bep[constants.BE_MEMORY], instance.hypervisor)
 
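CheckPrereq now accepts optional one-shot overrides for the start request; a small sketch of the shapes it expects (the values are examples, and how the opcode gains these attributes is outside this hunk):

    from ganeti import constants

    beparams = {constants.BE_MEMORY: 512}  # forced through BES_PARAMETER_TYPES
    hvparams = {}                          # per-start hypervisor overrides
    # Both dicts are stored on the LU and handed to
    # rpc.call_instance_start(node, instance, hvparams, beparams) in Exec.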
   def Exec(self, feedback_fn):
     """Start the instance.
@@ -2778,7 +3037,8 @@ class LUStartupInstance(LogicalUnit):
 
     _StartInstanceDisks(self, instance, force)
 
-    result = self.rpc.call_instance_start(node_current, instance)
+    result = self.rpc.call_instance_start(node_current, instance,
+                                          self.hvparams, self.beparams)
     msg = result.RemoteFailMsg()
     if msg:
       _ShutdownInstanceDisks(self, instance)
@@ -2830,7 +3090,7 @@ class LURebootInstance(LogicalUnit):
 
     _CheckNodeOnline(self, instance.primary_node)
 
-    # check bridges existance
+    # check bridges existence
     _CheckInstanceBridgesExist(self, instance)
 
   def Exec(self, feedback_fn):
@@ -2860,7 +3120,7 @@ class LURebootInstance(LogicalUnit):
                                  " full reboot: %s" % msg)
       _ShutdownInstanceDisks(self, instance)
       _StartInstanceDisks(self, instance, ignore_secondaries)
-      result = self.rpc.call_instance_start(node_current, instance)
+      result = self.rpc.call_instance_start(node_current, instance, None, None)
       msg = result.RemoteFailMsg()
       if msg:
         _ShutdownInstanceDisks(self, instance)
@@ -2960,7 +3220,8 @@ class LUReinstallInstance(LogicalUnit):
     remote_info = self.rpc.call_instance_info(instance.primary_node,
                                               instance.name,
                                               instance.hypervisor)
-    if remote_info.failed or remote_info.data:
+    remote_info.Raise()
+    if remote_info.data:
       raise errors.OpPrereqError("Instance '%s' is running on the node %s" %
                                  (self.op.instance_name,
                                   instance.primary_node))
@@ -3341,14 +3602,25 @@ class LUQueryInstances(NoHooksLU):
             val = live_data[instance.name].get("memory", "?")
           else:
             val = "-"
+        elif field == "vcpus":
+          val = i_be[constants.BE_VCPUS]
         elif field == "disk_template":
           val = instance.disk_template
         elif field == "ip":
-          val = instance.nics[0].ip
+          if instance.nics:
+            val = instance.nics[0].ip
+          else:
+            val = None
         elif field == "bridge":
-          val = instance.nics[0].bridge
+          if instance.nics:
+            val = instance.nics[0].bridge
+          else:
+            val = None
         elif field == "mac":
-          val = instance.nics[0].mac
+          if instance.nics:
+            val = instance.nics[0].mac
+          else:
+            val = None
         elif field == "sda_size" or field == "sdb_size":
           idx = ord(field[2]) - ord('a')
           try:
@@ -3415,9 +3687,10 @@ class LUQueryInstances(NoHooksLU):
                 else:
                   assert False, "Unhandled NIC parameter"
           else:
-            assert False, "Unhandled variable parameter"
+            assert False, ("Declared but unhandled variable parameter '%s'" %
+                           field)
         else:
-          raise errors.ParameterError(field)
+          assert False, "Declared but unhandled parameter '%s'" % field
         iout.append(val)
       output.append(iout)
 
@@ -3478,12 +3751,17 @@ class LUFailoverInstance(LogicalUnit):
     target_node = secondary_nodes[0]
     _CheckNodeOnline(self, target_node)
     _CheckNodeNotDrained(self, target_node)
-    # check memory requirements on the secondary node
-    _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
-                         instance.name, bep[constants.BE_MEMORY],
-                         instance.hypervisor)
 
-    # check bridge existance
+    if instance.admin_up:
+      # check memory requirements on the secondary node
+      _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
+                           instance.name, bep[constants.BE_MEMORY],
+                           instance.hypervisor)
+    else:
+      self.LogInfo("Not checking memory on the secondary node as"
+                   " instance will not be started")
+
+    # check bridge existence
     brlist = [nic.bridge for nic in instance.nics]
     result = self.rpc.call_bridges_exist(target_node, brlist)
     result.Raise()
@@ -3543,14 +3821,14 @@ class LUFailoverInstance(LogicalUnit):
       logging.info("Starting instance %s on node %s",
                    instance.name, target_node)
 
-      disks_ok, dummy = _AssembleInstanceDisks(self, instance,
-                                               ignore_secondaries=True)
+      disks_ok, _ = _AssembleInstanceDisks(self, instance,
+                                           ignore_secondaries=True)
       if not disks_ok:
         _ShutdownInstanceDisks(self, instance)
         raise errors.OpExecError("Can't activate the instance's disks")
 
       feedback_fn("* starting the instance on the target node")
-      result = self.rpc.call_instance_start(target_node, instance)
+      result = self.rpc.call_instance_start(target_node, instance, None, None)
       msg = result.RemoteFailMsg()
       if msg:
         _ShutdownInstanceDisks(self, instance)
@@ -3621,7 +3899,7 @@ class LUMigrateInstance(LogicalUnit):
                          instance.name, i_be[constants.BE_MEMORY],
                          instance.hypervisor)
 
-    # check bridge existance
+    # check bridge existence
     brlist = [nic.bridge for nic in instance.nics]
     result = self.rpc.call_bridges_exist(target_node, brlist)
     if result.failed or not result.data:
@@ -4057,7 +4335,7 @@ def _GenerateDiskTemplate(lu, template_name,
     if len(secondary_nodes) != 0:
       raise errors.ProgrammerError("Wrong template configuration")
 
-    names = _GenerateUniqueNames(lu, [".disk%d" % i
+    names = _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
                                       for i in range(disk_count)])
     for idx, disk in enumerate(disk_info):
       disk_index = idx + base_index
@@ -4074,7 +4352,7 @@ def _GenerateDiskTemplate(lu, template_name,
       [primary_node, remote_node] * len(disk_info), instance_name)
 
     names = []
-    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % i
+    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
                                                for i in range(disk_count)]):
       names.append(lv_prefix + "_data")
       names.append(lv_prefix + "_meta")
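A worked example of the base_index fix in the two hunks above, for an instance that already has two disks and is getting two more (hypothetical values):

    base_index = 2   # number of pre-existing disks
    disk_count = 2   # disks being added now
    suffixes = [".disk%d" % (base_index + i) for i in range(disk_count)]
    # -> ['.disk2', '.disk3']; before the fix the suffixes restarted at
    # '.disk0', so the generated LV names no longer matched the disks'
    # real indices.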
@@ -4300,6 +4578,7 @@ class LUCreateInstance(LogicalUnit):
                                   self.op.hvparams)
     hv_type = hypervisor.GetHypervisor(self.op.hypervisor)
     hv_type.CheckParameterSyntax(filled_hvp)
+    self.hv_full = filled_hvp
 
     # fill and remember the beparams dict
     utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
@@ -4341,6 +4620,12 @@ class LUCreateInstance(LogicalUnit):
         if not utils.IsValidMac(mac.lower()):
           raise errors.OpPrereqError("Invalid MAC address specified: %s" %
                                      mac)
+        else:
+          # or validate/reserve the current one
+          if self.cfg.IsMacInUse(mac):
+            raise errors.OpPrereqError("MAC address %s already in use"
+                                       " in cluster" % mac)
+
       # bridge verification
       bridge = nic.get("bridge", None)
       if bridge is None:
@@ -4477,6 +4762,9 @@ class LUCreateInstance(LogicalUnit):
       nics=[(n.ip, n.bridge, n.mac) for n in self.nics],
       disk_template=self.op.disk_template,
       disks=[(d["size"], d["mode"]) for d in self.disks],
+      bep=self.be_full,
+      hvp=self.hv_full,
+      hypervisor_name=self.op.hypervisor,
     ))
 
     nl = ([self.cfg.GetMasterNode(), self.op.pnode] +
@@ -4643,7 +4931,7 @@ class LUCreateInstance(LogicalUnit):
     # os verification
     result = self.rpc.call_os_get(pnode.name, self.op.os_type)
     result.Raise()
-    if not isinstance(result.data, objects.OS):
+    if not isinstance(result.data, objects.OS) or not result.data:
       raise errors.OpPrereqError("OS '%s' not in supported os list for"
                                  " primary node"  % self.op.os_type)
 
@@ -4794,7 +5082,7 @@ class LUCreateInstance(LogicalUnit):
       self.cfg.Update(iobj)
       logging.info("Starting instance %s on node %s", instance, pnode_name)
       feedback_fn("* starting instance...")
-      result = self.rpc.call_instance_start(pnode_name, iobj)
+      result = self.rpc.call_instance_start(pnode_name, iobj, None, None)
       msg = result.RemoteFailMsg()
       if msg:
         raise errors.OpExecError("Could not start instance: %s" % msg)
@@ -5295,7 +5583,6 @@ class LUReplaceDisks(LogicalUnit):
     logging.debug("Allocated minors %s" % (minors,))
     self.proc.LogStep(4, steps_total, "changing drbd configuration")
     for idx, (dev, new_minor) in enumerate(zip(instance.disks, minors)):
-      size = dev.size
       info("activating a new drbd on %s for disk/%d" % (new_node, idx))
       # create new devices on new_node; note that we create two IDs:
       # one without port, so the drbd will be activated without
@@ -5315,7 +5602,8 @@ class LUReplaceDisks(LogicalUnit):
                     new_net_id)
       new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
                               logical_id=new_alone_id,
-                              children=dev.children)
+                              children=dev.children,
+                              size=dev.size)
       try:
         _CreateSingleBlockDev(self, new_node, instance, new_drbd,
                               _GetInstanceInfoText(instance), False)
@@ -5615,6 +5903,7 @@ class LUQueryInstanceData(NoHooksLU):
       "sstatus": dev_sstatus,
       "children": dev_children,
       "mode": dev.mode,
+      "size": dev.size,
       }
 
     return data
@@ -5828,7 +6117,7 @@ class LUSetInstanceParams(LogicalUnit):
     This only checks the instance list against the existing names.
 
     """
-    force = self.force = self.op.force
+    self.force = self.op.force
 
     # checking the new params on the primary/secondary nodes
 
@@ -5898,7 +6187,7 @@ class LUSetInstanceParams(LogicalUnit):
         self.warn.append("Can't get info from primary node %s" % pnode)
       else:
         if not instance_info.failed and instance_info.data:
-          current_mem = instance_info.data['memory']
+          current_mem = int(instance_info.data['memory'])
         else:
           # Assume instance not running
           # (there is a slight race condition here, but it's not very probable,
@@ -6151,7 +6440,7 @@ class LUExportInstance(LogicalUnit):
     # remove it from its current node. In the future we could fix this by:
     #  - making a tasklet to search (share-lock all), then create the new one,
     #    then one to remove, after
-    #  - removing the removal operation altoghether
+    #  - removing the removal operation altogether
     self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
 
   def DeclareLocks(self, level):
@@ -6225,13 +6514,15 @@ class LUExportInstance(LogicalUnit):
     for disk in instance.disks:
       self.cfg.SetDiskID(disk, src_node)
 
+    # per-disk results
+    dresults = []
     try:
-      for disk in instance.disks:
+      for idx, disk in enumerate(instance.disks):
         # new_dev_name will be a snapshot of an lvm leaf of the one we passed
         new_dev_name = self.rpc.call_blockdev_snapshot(src_node, disk)
         if new_dev_name.failed or not new_dev_name.data:
-          self.LogWarning("Could not snapshot block device %s on node %s",
-                          disk.logical_id[1], src_node)
+          self.LogWarning("Could not snapshot disk/%d on node %s",
+                          idx, src_node)
           snap_disks.append(False)
         else:
           new_dev = objects.Disk(dev_type=constants.LD_LV, size=disk.size,
@@ -6242,7 +6533,7 @@ class LUExportInstance(LogicalUnit):
 
     finally:
       if self.op.shutdown and instance.admin_up:
-        result = self.rpc.call_instance_start(src_node, instance)
+        result = self.rpc.call_instance_start(src_node, instance, None, None)
         msg = result.RemoteFailMsg()
         if msg:
           _ShutdownInstanceDisks(self, instance)
@@ -6256,18 +6547,24 @@ class LUExportInstance(LogicalUnit):
         result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
                                                instance, cluster_name, idx)
         if result.failed or not result.data:
-          self.LogWarning("Could not export block device %s from node %s to"
-                          " node %s", dev.logical_id[1], src_node,
-                          dst_node.name)
+          self.LogWarning("Could not export disk/%d from node %s to"
+                          " node %s", idx, src_node, dst_node.name)
+          dresults.append(False)
+        else:
+          dresults.append(True)
         msg = self.rpc.call_blockdev_remove(src_node, dev).RemoteFailMsg()
         if msg:
-          self.LogWarning("Could not remove snapshot block device %s from node"
-                          " %s: %s", dev.logical_id[1], src_node, msg)
+          self.LogWarning("Could not remove snapshot for disk/%d from node"
+                          " %s: %s", idx, src_node, msg)
+      else:
+        dresults.append(False)
 
     result = self.rpc.call_finalize_export(dst_node.name, instance, snap_disks)
+    fin_resu = True
     if result.failed or not result.data:
       self.LogWarning("Could not finalize export for instance %s on node %s",
                       instance.name, dst_node.name)
+      fin_resu = False
 
     nodelist = self.cfg.GetNodeList()
     nodelist.remove(dst_node.name)
@@ -6284,6 +6581,7 @@ class LUExportInstance(LogicalUnit):
           if not self.rpc.call_export_remove(node, instance.name):
             self.LogWarning("Could not remove older export for instance %s"
                             " on node %s", instance.name, node)
+    return fin_resu, dresults
 
 
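With these changes LUExportInstance.Exec returns a pair: whether the export could be finalized, plus a per-disk success list. A sketch of how a caller might read it (variable names are illustrative):

    fin_resu, dresults = True, [True, False]   # e.g. disk/1 failed to export
    if not fin_resu:
      print("export could not be finalized on the destination node")
    for idx, ok in enumerate(dresults):
      if not ok:
        print("disk/%d was not exported" % idx)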
 class LURemoveExport(NoHooksLU):
@@ -6648,7 +6946,7 @@ class IAllocator(object):
         "master_candidate": ninfo.master_candidate,
         }
 
-      if not ninfo.offline:
+      if not (ninfo.offline or ninfo.drained):
         nresult.Raise()
         if not isinstance(nresult.data, dict):
           raise errors.OpExecError("Can't get data for node %s" % nname)
@@ -6711,6 +7009,8 @@ class IAllocator(object):
         "disk_template": iinfo.disk_template,
         "hypervisor": iinfo.hypervisor,
         }
+      pir["disk_space_total"] = _ComputeDiskSize(iinfo.disk_template,
+                                                 pir["disks"])
       instance_data[iinfo.name] = pir
 
     data["instances"] = instance_data
@@ -6803,7 +7103,6 @@ class IAllocator(object):
     """
     if call_fn is None:
       call_fn = self.lu.rpc.call_iallocator_runner
-    data = self.in_text
 
     result = call_fn(self.lu.cfg.GetMasterNode(), name, self.in_text)
     result.Raise()