Parallelize Tag operations
[ganeti-local] / lib / cmdlib.py
index c241dad..6dbaa92 100644 (file)
@@ -37,11 +37,10 @@ from ganeti import logger
 from ganeti import utils
 from ganeti import errors
 from ganeti import hypervisor
-from ganeti import config
+from ganeti import locking
 from ganeti import constants
 from ganeti import objects
 from ganeti import opcodes
-from ganeti import ssconf
 from ganeti import serializer
 
 
@@ -49,22 +48,27 @@ class LogicalUnit(object):
   """Logical Unit base class.
 
   Subclasses must follow these rules:
-    - implement CheckPrereq which also fills in the opcode instance
-      with all the fields (even if as None)
+    - implement ExpandNames
+    - implement CheckPrereq
     - implement Exec
     - implement BuildHooksEnv
     - redefine HPATH and HTYPE
-    - optionally redefine their run requirements (REQ_CLUSTER,
-      REQ_MASTER); note that all commands require root permissions
+    - optionally redefine their run requirements:
+        REQ_MASTER: the LU needs to run on the master node
+        REQ_WSSTORE: the LU needs a writable SimpleStore
+        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
+
+  Note that all commands require root permissions.
 
   """
   HPATH = None
   HTYPE = None
   _OP_REQP = []
-  REQ_CLUSTER = True
   REQ_MASTER = True
+  REQ_WSSTORE = False
+  REQ_BGL = True
 
-  def __init__(self, processor, op, cfg, sstore):
+  def __init__(self, processor, op, context, sstore):
     """Constructor for LogicalUnit.
 
     This needs to be overriden in derived classes in order to check op
@@ -73,8 +77,17 @@ class LogicalUnit(object):
     """
     self.proc = processor
     self.op = op
-    self.cfg = cfg
+    self.cfg = context.cfg
     self.sstore = sstore
+    self.context = context
+    # Dicts used to declare locking needs to mcpu
+    self.needed_locks = None
+    self.acquired_locks = {}
+    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
+    self.add_locks = {}
+    self.remove_locks = {}
+    # Used to force good behavior when calling helper functions
+    self.recalculate_locks = {}
     self.__ssh = None
 
     for attr_name in self._OP_REQP:
@@ -82,15 +95,15 @@ class LogicalUnit(object):
       if attr_val is None:
         raise errors.OpPrereqError("Required parameter '%s' missing" %
                                    attr_name)
-    if self.REQ_CLUSTER:
-      if not cfg.IsCluster():
-        raise errors.OpPrereqError("Cluster not initialized yet,"
-                                   " use 'gnt-cluster init' first.")
-      if self.REQ_MASTER:
-        master = sstore.GetMasterNode()
-        if master != utils.HostInfo().name:
-          raise errors.OpPrereqError("Commands must be run on the master"
-                                     " node %s" % master)
+
+    if not self.cfg.IsCluster():
+      raise errors.OpPrereqError("Cluster not initialized yet,"
+                                 " use 'gnt-cluster init' first.")
+    if self.REQ_MASTER:
+      master = sstore.GetMasterNode()
+      if master != utils.HostInfo().name:
+        raise errors.OpPrereqError("Commands must be run on the master"
+                                   " node %s" % master)
 
   def __GetSSH(self):
     """Returns the SshRunner object
@@ -102,6 +115,66 @@ class LogicalUnit(object):
 
   ssh = property(fget=__GetSSH)
 
+  def ExpandNames(self):
+    """Expand names for this LU.
+
+    This method is called before starting to execute the opcode, and it should
+    update all the parameters of the opcode to their canonical form (e.g. a
+    short node name must be fully expanded after this method has successfully
+    completed). This way locking, hooks, logging, ecc. can work correctly.
+
+    LUs which implement this method must also populate the self.needed_locks
+    member, as a dict with lock levels as keys, and a list of needed lock names
+    as values. Rules:
+      - Use an empty dict if you don't need any lock
+      - If you don't need any lock at a particular level omit that level
+      - Don't put anything for the BGL level
+      - If you want all locks at a level use locking.ALL_SET as a value
+
+    If you need to share locks (rather than acquire them exclusively) at one
+    level you can modify self.share_locks, setting a true value (usually 1) for
+    that level. By default locks are not shared.
+
+    Examples:
+    # Acquire all nodes and one instance
+    self.needed_locks = {
+      locking.LEVEL_NODE: locking.ALL_SET,
+      locking.LEVEL_INSTANCE: ['instance1.example.tld'],
+    }
+    # Acquire just two nodes
+    self.needed_locks = {
+      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
+    }
+    # Acquire no locks
+    self.needed_locks = {} # No, you can't leave it to the default value None
+
+    """
+    # The implementation of this method is mandatory only if the new LU is
+    # concurrent, so that old LUs don't need to be changed all at the same
+    # time.
+    if self.REQ_BGL:
+      self.needed_locks = {} # Exclusive LUs don't need locks.
+    else:
+      raise NotImplementedError
+
+  def DeclareLocks(self, level):
+    """Declare LU locking needs for a level
+
+    While most LUs can just declare their locking needs at ExpandNames time,
+    sometimes there's the need to calculate some locks after having acquired
+    the ones before. This function is called just before acquiring locks at a
+    particular level, but after acquiring the ones at lower levels, and permits
+    such calculations. It can be used to modify self.needed_locks, and by
+    default it does nothing.
+
+    This function is only called if you have something already set in
+    self.needed_locks for the level.
+
+    @param level: Locking level which is going to be locked
+    @type level: member of ganeti.locking.LEVELS
+
+    """
+
   def CheckPrereq(self):
     """Check prerequisites for this LU.
 
@@ -114,9 +187,7 @@ class LogicalUnit(object):
     not fulfilled. Its return value is ignored.
 
     This method should also update all the parameters of the opcode to
-    their canonical form; e.g. a short node name must be fully
-    expanded after this method has successfully completed (so that
-    hooks, logging, etc. work correctly).
+    their canonical form if it hasn't been done by ExpandNames before.
 
     """
     raise NotImplementedError
@@ -171,33 +242,83 @@ class LogicalUnit(object):
     """
     return lu_result
 
+  def _ExpandAndLockInstance(self):
+    """Helper function to expand and lock an instance.
 
-class NoHooksLU(LogicalUnit):
-  """Simple LU which runs no hooks.
+    Many LUs that work on an instance take its name in self.op.instance_name
+    and need to expand it and then declare the expanded name for locking. This
+    function does it, and then updates self.op.instance_name to the expanded
+    name. It also initializes needed_locks as a dict, if this hasn't been done
+    before.
 
-  This LU is intended as a parent for other LogicalUnits which will
-  run no hooks, in order to reduce duplicate code.
+    """
+    if self.needed_locks is None:
+      self.needed_locks = {}
+    else:
+      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
+        "_ExpandAndLockInstance called with instance-level locks set"
+    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
+    if expanded_name is None:
+      raise errors.OpPrereqError("Instance '%s' not known" %
+                                  self.op.instance_name)
+    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
+    self.op.instance_name = expanded_name
 
-  """
-  HPATH = None
-  HTYPE = None
+  def _LockInstancesNodes(self, primary_only=False):
+    """Helper function to declare instances' nodes for locking.
 
+    This function should be called after locking one or more instances to lock
+    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
+    with all primary or secondary nodes for instances already locked and
+    present in self.needed_locks[locking.LEVEL_INSTANCE].
 
-def _AddHostToEtcHosts(hostname):
-  """Wrapper around utils.SetEtcHostsEntry.
+    It should be called from DeclareLocks, and for safety only works if
+    self.recalculate_locks[locking.LEVEL_NODE] is set.
 
-  """
-  hi = utils.HostInfo(name=hostname)
-  utils.SetEtcHostsEntry(constants.ETC_HOSTS, hi.ip, hi.name, [hi.ShortName()])
+    In the future it may grow parameters to just lock some instance's nodes, or
+    to just lock primaries or secondary nodes, if needed.
+
+    If should be called in DeclareLocks in a way similar to:
+
+    if level == locking.LEVEL_NODE:
+      self._LockInstancesNodes()
+
+    @type primary_only: boolean
+    @param primary_only: only lock primary nodes of locked instances
+
+    """
+    assert locking.LEVEL_NODE in self.recalculate_locks, \
+      "_LockInstancesNodes helper function called with no nodes to recalculate"
+
+    # TODO: check if we're really been called with the instance locks held
+
+    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
+    # future we might want to have different behaviors depending on the value
+    # of self.recalculate_locks[locking.LEVEL_NODE]
+    wanted_nodes = []
+    for instance_name in self.acquired_locks[locking.LEVEL_INSTANCE]:
+      instance = self.context.cfg.GetInstanceInfo(instance_name)
+      wanted_nodes.append(instance.primary_node)
+      if not primary_only:
+        wanted_nodes.extend(instance.secondary_nodes)
+
+    if self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_REPLACE:
+      self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
+    elif self.recalculate_locks[locking.LEVEL_NODE] == constants.LOCKS_APPEND:
+      self.needed_locks[locking.LEVEL_NODE].extend(wanted_nodes)
 
+    del self.recalculate_locks[locking.LEVEL_NODE]
 
-def _RemoveHostFromEtcHosts(hostname):
-  """Wrapper around utils.RemoveEtcHostsEntry.
+
+class NoHooksLU(LogicalUnit):
+  """Simple LU which runs no hooks.
+
+  This LU is intended as a parent for other LogicalUnits which will
+  run no hooks, in order to reduce duplicate code.
 
   """
-  hi = utils.HostInfo(name=hostname)
-  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.name)
-  utils.RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
+  HPATH = None
+  HTYPE = None
 
 
 def _GetWantedNodes(lu, nodes):
@@ -210,17 +331,17 @@ def _GetWantedNodes(lu, nodes):
   if not isinstance(nodes, list):
     raise errors.OpPrereqError("Invalid argument type 'nodes'")
 
-  if nodes:
-    wanted = []
+  if not nodes:
+    raise errors.ProgrammerError("_GetWantedNodes should only be called with a"
+      " non-empty list of nodes whose name is to be expanded.")
 
-    for name in nodes:
-      node = lu.cfg.ExpandNodeName(name)
-      if node is None:
-        raise errors.OpPrereqError("No such node name '%s'" % name)
-      wanted.append(node)
+  wanted = []
+  for name in nodes:
+    node = lu.cfg.ExpandNodeName(name)
+    if node is None:
+      raise errors.OpPrereqError("No such node name '%s'" % name)
+    wanted.append(node)
 
-  else:
-    wanted = lu.cfg.GetNodeList()
   return utils.NiceSort(wanted)
 
 
@@ -323,85 +444,6 @@ def _BuildInstanceHookEnvByObject(instance, override=None):
   return _BuildInstanceHookEnv(**args)
 
 
-def _HasValidVG(vglist, vgname):
-  """Checks if the volume group list is valid.
-
-  A non-None return value means there's an error, and the return value
-  is the error message.
-
-  """
-  vgsize = vglist.get(vgname, None)
-  if vgsize is None:
-    return "volume group '%s' missing" % vgname
-  elif vgsize < 20480:
-    return ("volume group '%s' too small (20480MiB required, %dMib found)" %
-            (vgname, vgsize))
-  return None
-
-
-def _InitSSHSetup(node):
-  """Setup the SSH configuration for the cluster.
-
-
-  This generates a dsa keypair for root, adds the pub key to the
-  permitted hosts and adds the hostkey to its own known hosts.
-
-  Args:
-    node: the name of this host as a fqdn
-
-  """
-  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
-
-  for name in priv_key, pub_key:
-    if os.path.exists(name):
-      utils.CreateBackup(name)
-    utils.RemoveFile(name)
-
-  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
-                         "-f", priv_key,
-                         "-q", "-N", ""])
-  if result.failed:
-    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
-                             result.output)
-
-  f = open(pub_key, 'r')
-  try:
-    utils.AddAuthorizedKey(auth_keys, f.read(8192))
-  finally:
-    f.close()
-
-
-def _InitGanetiServerSetup(ss):
-  """Setup the necessary configuration for the initial node daemon.
-
-  This creates the nodepass file containing the shared password for
-  the cluster and also generates the SSL certificate.
-
-  """
-  # Create pseudo random password
-  randpass = sha.new(os.urandom(64)).hexdigest()
-  # and write it into sstore
-  ss.SetKey(ss.SS_NODED_PASS, randpass)
-
-  result = utils.RunCmd(["openssl", "req", "-new", "-newkey", "rsa:1024",
-                         "-days", str(365*5), "-nodes", "-x509",
-                         "-keyout", constants.SSL_CERT_FILE,
-                         "-out", constants.SSL_CERT_FILE, "-batch"])
-  if result.failed:
-    raise errors.OpExecError("could not generate server ssl cert, command"
-                             " %s had exitcode %s and error message %s" %
-                             (result.cmd, result.exit_code, result.output))
-
-  os.chmod(constants.SSL_CERT_FILE, 0400)
-
-  result = utils.RunCmd([constants.NODE_INITD_SCRIPT, "restart"])
-
-  if result.failed:
-    raise errors.OpExecError("Could not start the node daemon, command %s"
-                             " had exitcode %s and error %s" %
-                             (result.cmd, result.exit_code, result.output))
-
-
 def _CheckInstanceBridgesExist(instance):
   """Check that the brigdes needed by an instance exist.
 
@@ -414,160 +456,6 @@ def _CheckInstanceBridgesExist(instance):
                                (brlist, instance.primary_node))
 
 
-class LUInitCluster(LogicalUnit):
-  """Initialise the cluster.
-
-  """
-  HPATH = "cluster-init"
-  HTYPE = constants.HTYPE_CLUSTER
-  _OP_REQP = ["cluster_name", "hypervisor_type", "mac_prefix",
-              "def_bridge", "master_netdev", "file_storage_dir"]
-  REQ_CLUSTER = False
-
-  def BuildHooksEnv(self):
-    """Build hooks env.
-
-    Notes: Since we don't require a cluster, we must manually add
-    ourselves in the post-run node list.
-
-    """
-    env = {"OP_TARGET": self.op.cluster_name}
-    return env, [], [self.hostname.name]
-
-  def CheckPrereq(self):
-    """Verify that the passed name is a valid one.
-
-    """
-    if config.ConfigWriter.IsCluster():
-      raise errors.OpPrereqError("Cluster is already initialised")
-
-    if self.op.hypervisor_type == constants.HT_XEN_HVM31:
-      if not os.path.exists(constants.VNC_PASSWORD_FILE):
-        raise errors.OpPrereqError("Please prepare the cluster VNC"
-                                   "password file %s" %
-                                   constants.VNC_PASSWORD_FILE)
-
-    self.hostname = hostname = utils.HostInfo()
-
-    if hostname.ip.startswith("127."):
-      raise errors.OpPrereqError("This host's IP resolves to the private"
-                                 " range (%s). Please fix DNS or %s." %
-                                 (hostname.ip, constants.ETC_HOSTS))
-
-    if not utils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT,
-                         source=constants.LOCALHOST_IP_ADDRESS):
-      raise errors.OpPrereqError("Inconsistency: this host's name resolves"
-                                 " to %s,\nbut this ip address does not"
-                                 " belong to this host."
-                                 " Aborting." % hostname.ip)
-
-    self.clustername = clustername = utils.HostInfo(self.op.cluster_name)
-
-    if utils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT,
-                     timeout=5):
-      raise errors.OpPrereqError("Cluster IP already active. Aborting.")
-
-    secondary_ip = getattr(self.op, "secondary_ip", None)
-    if secondary_ip and not utils.IsValidIP(secondary_ip):
-      raise errors.OpPrereqError("Invalid secondary ip given")
-    if (secondary_ip and
-        secondary_ip != hostname.ip and
-        (not utils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
-                           source=constants.LOCALHOST_IP_ADDRESS))):
-      raise errors.OpPrereqError("You gave %s as secondary IP,"
-                                 " but it does not belong to this host." %
-                                 secondary_ip)
-    self.secondary_ip = secondary_ip
-
-    if not hasattr(self.op, "vg_name"):
-      self.op.vg_name = None
-    # if vg_name not None, checks if volume group is valid
-    if self.op.vg_name:
-      vgstatus = _HasValidVG(utils.ListVolumeGroups(), self.op.vg_name)
-      if vgstatus:
-        raise errors.OpPrereqError("Error: %s\nspecify --no-lvm-storage if"
-                                   " you are not using lvm" % vgstatus)
-
-    self.op.file_storage_dir = os.path.normpath(self.op.file_storage_dir)
-
-    if not os.path.isabs(self.op.file_storage_dir):
-      raise errors.OpPrereqError("The file storage directory you have is"
-                                 " not an absolute path.")
-
-    if not os.path.exists(self.op.file_storage_dir):
-      try:
-        os.makedirs(self.op.file_storage_dir, 0750)
-      except OSError, err:
-        raise errors.OpPrereqError("Cannot create file storage directory"
-                                   " '%s': %s" %
-                                   (self.op.file_storage_dir, err))
-
-    if not os.path.isdir(self.op.file_storage_dir):
-      raise errors.OpPrereqError("The file storage directory '%s' is not"
-                                 " a directory." % self.op.file_storage_dir)
-
-    if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$",
-                    self.op.mac_prefix):
-      raise errors.OpPrereqError("Invalid mac prefix given '%s'" %
-                                 self.op.mac_prefix)
-
-    if self.op.hypervisor_type not in constants.HYPER_TYPES:
-      raise errors.OpPrereqError("Invalid hypervisor type given '%s'" %
-                                 self.op.hypervisor_type)
-
-    result = utils.RunCmd(["ip", "link", "show", "dev", self.op.master_netdev])
-    if result.failed:
-      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
-                                 (self.op.master_netdev,
-                                  result.output.strip()))
-
-    if not (os.path.isfile(constants.NODE_INITD_SCRIPT) and
-            os.access(constants.NODE_INITD_SCRIPT, os.X_OK)):
-      raise errors.OpPrereqError("Init.d script '%s' missing or not"
-                                 " executable." % constants.NODE_INITD_SCRIPT)
-
-  def Exec(self, feedback_fn):
-    """Initialize the cluster.
-
-    """
-    clustername = self.clustername
-    hostname = self.hostname
-
-    # set up the simple store
-    self.sstore = ss = ssconf.SimpleStore()
-    ss.SetKey(ss.SS_HYPERVISOR, self.op.hypervisor_type)
-    ss.SetKey(ss.SS_MASTER_NODE, hostname.name)
-    ss.SetKey(ss.SS_MASTER_IP, clustername.ip)
-    ss.SetKey(ss.SS_MASTER_NETDEV, self.op.master_netdev)
-    ss.SetKey(ss.SS_CLUSTER_NAME, clustername.name)
-    ss.SetKey(ss.SS_FILE_STORAGE_DIR, self.op.file_storage_dir)
-
-    # set up the inter-node password and certificate
-    _InitGanetiServerSetup(ss)
-
-    # start the master ip
-    rpc.call_node_start_master(hostname.name)
-
-    # set up ssh config and /etc/hosts
-    f = open(constants.SSH_HOST_RSA_PUB, 'r')
-    try:
-      sshline = f.read()
-    finally:
-      f.close()
-    sshkey = sshline.split(" ")[1]
-
-    _AddHostToEtcHosts(hostname.name)
-    _InitSSHSetup(hostname.name)
-
-    # init of cluster config file
-    self.cfg = cfgw = config.ConfigWriter()
-    cfgw.InitConfig(hostname.name, hostname.ip, self.secondary_ip,
-                    sshkey, self.op.mac_prefix,
-                    self.op.vg_name, self.op.def_bridge)
-
-    ssh.WriteKnownHostsFile(cfgw, ss, constants.SSH_KNOWN_HOSTS_FILE)
-
-
 class LUDestroyCluster(NoHooksLU):
   """Logical unit for destroying the cluster.
 
@@ -598,19 +486,29 @@ class LUDestroyCluster(NoHooksLU):
 
     """
     master = self.sstore.GetMasterNode()
-    if not rpc.call_node_stop_master(master):
+    if not rpc.call_node_stop_master(master, False):
       raise errors.OpExecError("Could not disable the master role")
     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
     utils.CreateBackup(priv_key)
     utils.CreateBackup(pub_key)
-    rpc.call_node_leave_cluster(master)
+    return master
 
 
-class LUVerifyCluster(NoHooksLU):
+class LUVerifyCluster(LogicalUnit):
   """Verifies the cluster status.
 
   """
+  HPATH = "cluster-verify"
+  HTYPE = constants.HTYPE_CLUSTER
   _OP_REQP = ["skip_checks"]
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self.needed_locks = {
+      locking.LEVEL_NODE: locking.ALL_SET,
+      locking.LEVEL_INSTANCE: locking.ALL_SET,
+    }
+    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
 
   def _VerifyNode(self, node, file_list, local_cksum, vglist, node_result,
                   remote_version, feedback_fn):
@@ -647,7 +545,8 @@ class LUVerifyCluster(NoHooksLU):
                       (node,))
       bad = True
     else:
-      vgstatus = _HasValidVG(vglist, self.cfg.GetVGName())
+      vgstatus = utils.CheckVolumeGroupSize(vglist, self.cfg.GetVGName(),
+                                            constants.MIN_VG_SIZE)
       if vgstatus:
         feedback_fn("  - ERROR: %s on node %s" % (vgstatus, node))
         bad = True
@@ -670,13 +569,24 @@ class LUVerifyCluster(NoHooksLU):
 
     if 'nodelist' not in node_result:
       bad = True
-      feedback_fn("  - ERROR: node hasn't returned node connectivity data")
+      feedback_fn("  - ERROR: node hasn't returned node ssh connectivity data")
     else:
       if node_result['nodelist']:
         bad = True
         for node in node_result['nodelist']:
-          feedback_fn("  - ERROR: communication with node '%s': %s" %
+          feedback_fn("  - ERROR: ssh communication with node '%s': %s" %
                           (node, node_result['nodelist'][node]))
+    if 'node-net-test' not in node_result:
+      bad = True
+      feedback_fn("  - ERROR: node hasn't returned node tcp connectivity data")
+    else:
+      if node_result['node-net-test']:
+        bad = True
+        nlist = utils.NiceSort(node_result['node-net-test'].keys())
+        for node in nlist:
+          feedback_fn("  - ERROR: tcp communication with node '%s': %s" %
+                          (node, node_result['node-net-test'][node]))
+
     hyp_result = node_result.get('hypervisor', None)
     if hyp_result is not None:
       feedback_fn("  - ERROR: hypervisor verify failure: '%s'" % hyp_result)
@@ -791,6 +701,18 @@ class LUVerifyCluster(NoHooksLU):
     if not constants.VERIFY_OPTIONAL_CHECKS.issuperset(self.skip_set):
       raise errors.OpPrereqError("Invalid checks to be skipped specified")
 
+  def BuildHooksEnv(self):
+    """Build hooks env.
+
+    Cluster-Verify hooks just rone in the post phase and their failure makes
+    the output be logged in the verify output and the verification to fail.
+
+    """
+    all_nodes = self.cfg.GetNodeList()
+    # TODO: populate the environment with useful information for verify hooks
+    env = {}
+    return env, [], all_nodes
+
   def Exec(self, feedback_fn):
     """Verify integrity of cluster, performing various test on nodes.
 
@@ -802,6 +724,7 @@ class LUVerifyCluster(NoHooksLU):
 
     vg_name = self.cfg.GetVGName()
     nodelist = utils.NiceSort(self.cfg.GetNodeList())
+    nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
     i_non_redundant = [] # Non redundant instances
     node_volume = {}
@@ -824,6 +747,8 @@ class LUVerifyCluster(NoHooksLU):
       'filelist': file_names,
       'nodelist': nodelist,
       'hypervisor': None,
+      'node-net-test': [(node.name, node.primary_ip, node.secondary_ip)
+                        for node in nodeinfo]
       }
     all_nvinfo = rpc.call_node_verify(nodelist, node_verify_param)
     all_rversion = rpc.call_version(nodelist)
@@ -948,7 +873,49 @@ class LUVerifyCluster(NoHooksLU):
       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                   % len(i_non_redundant))
 
-    return int(bad)
+    return not bad
+
+  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
+    """Analize the post-hooks' result, handle it, and send some
+    nicely-formatted feedback back to the user.
+
+    Args:
+      phase: the hooks phase that has just been run
+      hooks_results: the results of the multi-node hooks rpc call
+      feedback_fn: function to send feedback back to the caller
+      lu_result: previous Exec result
+
+    """
+    # We only really run POST phase hooks, and are only interested in
+    # their results
+    if phase == constants.HOOKS_PHASE_POST:
+      # Used to change hooks' output to proper indentation
+      indent_re = re.compile('^', re.M)
+      feedback_fn("* Hooks Results")
+      if not hooks_results:
+        feedback_fn("  - ERROR: general communication failure")
+        lu_result = 1
+      else:
+        for node_name in hooks_results:
+          show_node_header = True
+          res = hooks_results[node_name]
+          if res is False or not isinstance(res, list):
+            feedback_fn("    Communication failure")
+            lu_result = 1
+            continue
+          for script, hkr, output in res:
+            if hkr == constants.HKR_FAIL:
+              # The node header is only shown once, if there are
+              # failing hooks on that node
+              if show_node_header:
+                feedback_fn("  Node %s:" % node_name)
+                show_node_header = False
+              feedback_fn("    ERROR: Script %s failed, output:" % script)
+              output = indent_re.sub('      ', output)
+              feedback_fn("%s" % output)
+              lu_result = 1
+
+      return lu_result
 
 
 class LUVerifyDisks(NoHooksLU):
@@ -956,6 +923,14 @@ class LUVerifyDisks(NoHooksLU):
 
   """
   _OP_REQP = []
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self.needed_locks = {
+      locking.LEVEL_NODE: locking.ALL_SET,
+      locking.LEVEL_INSTANCE: locking.ALL_SET,
+    }
+    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -1030,6 +1005,7 @@ class LURenameCluster(LogicalUnit):
   HPATH = "cluster-rename"
   HTYPE = constants.HTYPE_CLUSTER
   _OP_REQP = ["name"]
+  REQ_WSSTORE = True
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -1056,8 +1032,7 @@ class LURenameCluster(LogicalUnit):
       raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                  " cluster has changed")
     if new_ip != old_ip:
-      result = utils.RunCmd(["fping", "-q", new_ip])
-      if not result.failed:
+      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
         raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                    " reachable on the network. Aborting." %
                                    new_ip)
@@ -1074,7 +1049,7 @@ class LURenameCluster(LogicalUnit):
 
     # shutdown the master IP
     master = ss.GetMasterNode()
-    if not rpc.call_node_stop_master(master):
+    if not rpc.call_node_stop_master(master, False):
       raise errors.OpExecError("Could not disable the master role")
 
     try:
@@ -1097,7 +1072,7 @@ class LURenameCluster(LogicalUnit):
             logger.Error("copy of file %s to node %s failed" %
                          (fname, to_node))
     finally:
-      if not rpc.call_node_start_master(master):
+      if not rpc.call_node_start_master(master, False):
         logger.Error("Could not re-enable the master role on the master,"
                      " please restart manually.")
 
@@ -1126,6 +1101,15 @@ class LUSetClusterParams(LogicalUnit):
   HPATH = "cluster-modify"
   HTYPE = constants.HTYPE_CLUSTER
   _OP_REQP = []
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    # FIXME: in the future maybe other cluster params won't require checking on
+    # all nodes to be modified.
+    self.needed_locks = {
+      locking.LEVEL_NODE: locking.ALL_SET,
+    }
+    self.share_locks[locking.LEVEL_NODE] = 1
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -1145,9 +1129,10 @@ class LUSetClusterParams(LogicalUnit):
     if the given volume group is valid.
 
     """
+    # FIXME: This only works because there is only one parameter that can be
+    # changed or removed.
     if not self.op.vg_name:
-      instances = [self.cfg.GetInstanceInfo(name)
-                   for name in self.cfg.GetInstanceList()]
+      instances = self.cfg.GetAllInstancesInfo().values()
       for inst in instances:
         for disk in inst.disks:
           if _RecursiveCheckIfLVMBased(disk):
@@ -1156,10 +1141,11 @@ class LUSetClusterParams(LogicalUnit):
 
     # if vg_name not None, checks given volume group on all nodes
     if self.op.vg_name:
-      node_list = self.cfg.GetNodeList()
+      node_list = self.acquired_locks[locking.LEVEL_NODE]
       vglist = rpc.call_vg_list(node_list)
       for node in node_list:
-        vgstatus = _HasValidVG(vglist[node], self.op.vg_name)
+        vgstatus = utils.CheckVolumeGroupSize(vglist[node], self.op.vg_name,
+                                              constants.MIN_VG_SIZE)
         if vgstatus:
           raise errors.OpPrereqError("Error on node '%s': %s" %
                                      (node, vgstatus))
@@ -1226,15 +1212,7 @@ def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
     if done or oneshot:
       break
 
-    if unlock:
-      #utils.Unlock('cmd')
-      pass
-    try:
-      time.sleep(min(60, max_time))
-    finally:
-      if unlock:
-        #utils.Lock('cmd')
-        pass
+    time.sleep(min(60, max_time))
 
   if done:
     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
@@ -1275,13 +1253,9 @@ class LUDiagnoseOS(NoHooksLU):
 
   """
   _OP_REQP = ["output_fields", "names"]
+  REQ_BGL = False
 
-  def CheckPrereq(self):
-    """Check prerequisites.
-
-    This always succeeds, since this is a pure query LU.
-
-    """
+  def ExpandNames(self):
     if self.op.names:
       raise errors.OpPrereqError("Selective OS query not supported")
 
@@ -1290,6 +1264,16 @@ class LUDiagnoseOS(NoHooksLU):
                        dynamic=self.dynamic_fields,
                        selected=self.op.output_fields)
 
+    # Lock all nodes, in shared mode
+    self.needed_locks = {}
+    self.share_locks[locking.LEVEL_NODE] = 1
+    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    """
+
   @staticmethod
   def _DiagnoseByOS(node_list, rlist):
     """Remaps a per-node return list into an a per-os per-node dictionary
@@ -1325,7 +1309,7 @@ class LUDiagnoseOS(NoHooksLU):
     """Compute the list of OSes.
 
     """
-    node_list = self.cfg.GetNodeList()
+    node_list = self.acquired_locks[locking.LEVEL_NODE]
     node_data = rpc.call_os_diagnose(node_list)
     if node_data == False:
       raise errors.OpExecError("Can't gather the list of OSes")
@@ -1362,7 +1346,7 @@ class LURemoveNode(LogicalUnit):
     """Build hooks env.
 
     This doesn't run on the target node in the pre phase as a failed
-    node would not allows itself to run.
+    node would then be impossible to remove.
 
     """
     env = {
@@ -1414,15 +1398,9 @@ class LURemoveNode(LogicalUnit):
     logger.Info("stopping the node daemon and removing configs from node %s" %
                 node.name)
 
-    rpc.call_node_leave_cluster(node.name)
-
-    self.ssh.Run(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
-
-    logger.Info("Removing node %s from config" % node.name)
-
-    self.cfg.RemoveNode(node.name)
+    self.context.RemoveNode(node.name)
 
-    _RemoveHostFromEtcHosts(node.name)
+    rpc.call_node_leave_cluster(node.name)
 
 
 class LUQueryNodes(NoHooksLU):
@@ -1430,31 +1408,58 @@ class LUQueryNodes(NoHooksLU):
 
   """
   _OP_REQP = ["output_fields", "names"]
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self.dynamic_fields = frozenset([
+      "dtotal", "dfree",
+      "mtotal", "mnode", "mfree",
+      "bootid",
+      "ctotal",
+      ])
+
+    self.static_fields = frozenset([
+      "name", "pinst_cnt", "sinst_cnt",
+      "pinst_list", "sinst_list",
+      "pip", "sip", "tags",
+      ])
+
+    _CheckOutputFields(static=self.static_fields,
+                       dynamic=self.dynamic_fields,
+                       selected=self.op.output_fields)
 
-  def CheckPrereq(self):
-    """Check prerequisites.
+    self.needed_locks = {}
+    self.share_locks[locking.LEVEL_NODE] = 1
 
-    This checks that the fields required are valid output fields.
+    if self.op.names:
+      self.wanted = _GetWantedNodes(self, self.op.names)
+    else:
+      self.wanted = locking.ALL_SET
 
-    """
-    self.dynamic_fields = frozenset(["dtotal", "dfree",
-                                     "mtotal", "mnode", "mfree",
-                                     "bootid"])
+    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
+    if self.do_locking:
+      # if we don't request only static fields, we need to lock the nodes
+      self.needed_locks[locking.LEVEL_NODE] = self.wanted
 
-    _CheckOutputFields(static=["name", "pinst_cnt", "sinst_cnt",
-                               "pinst_list", "sinst_list",
-                               "pip", "sip"],
-                       dynamic=self.dynamic_fields,
-                       selected=self.op.output_fields)
 
-    self.wanted = _GetWantedNodes(self, self.op.names)
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    """
+    # The validation of the node list is done in the _GetWantedNodes,
+    # if non empty, and if empty, there's no validation to do
+    pass
 
   def Exec(self, feedback_fn):
     """Computes the list of nodes and their attributes.
 
     """
-    nodenames = self.wanted
-    nodelist = [self.cfg.GetNodeInfo(name) for name in nodenames]
+    all_info = self.cfg.GetAllNodesInfo()
+    if self.do_locking:
+      nodenames = self.acquired_locks[locking.LEVEL_NODE]
+    else:
+      nodenames = all_info.keys()
+    nodelist = [all_info[name] for name in nodenames]
 
     # begin data gathering
 
@@ -1470,6 +1475,7 @@ class LUQueryNodes(NoHooksLU):
             "mfree": utils.TryConvert(int, nodeinfo['memory_free']),
             "dtotal": utils.TryConvert(int, nodeinfo['vg_size']),
             "dfree": utils.TryConvert(int, nodeinfo['vg_free']),
+            "ctotal": utils.TryConvert(int, nodeinfo['cpu_total']),
             "bootid": nodeinfo['bootid'],
             }
         else:
@@ -1513,6 +1519,8 @@ class LUQueryNodes(NoHooksLU):
           val = node.primary_ip
         elif field == "sip":
           val = node.secondary_ip
+        elif field == "tags":
+          val = list(node.GetTags())
         elif field in self.dynamic_fields:
           val = live_data[node.name].get(field, None)
         else:
@@ -1528,6 +1536,20 @@ class LUQueryNodeVolumes(NoHooksLU):
 
   """
   _OP_REQP = ["nodes", "output_fields"]
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    _CheckOutputFields(static=["node"],
+                       dynamic=["phys", "vg", "name", "size", "instance"],
+                       selected=self.op.output_fields)
+
+    self.needed_locks = {}
+    self.share_locks[locking.LEVEL_NODE] = 1
+    if not self.op.nodes:
+      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+    else:
+      self.needed_locks[locking.LEVEL_NODE] = \
+        _GetWantedNodes(self, self.op.nodes)
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -1535,12 +1557,7 @@ class LUQueryNodeVolumes(NoHooksLU):
     This checks that the fields required are valid output fields.
 
     """
-    self.nodes = _GetWantedNodes(self, self.op.nodes)
-
-    _CheckOutputFields(static=["node"],
-                       dynamic=["phys", "vg", "name", "size", "instance"],
-                       selected=self.op.output_fields)
-
+    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
 
   def Exec(self, feedback_fn):
     """Computes the list of nodes and their attributes.
@@ -1694,11 +1711,6 @@ class LUAddNode(LogicalUnit):
                                  primary_ip=primary_ip,
                                  secondary_ip=secondary_ip)
 
-    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
-      if not os.path.exists(constants.VNC_PASSWORD_FILE):
-        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
-                                   constants.VNC_PASSWORD_FILE)
-
   def Exec(self, feedback_fn):
     """Adds the new node to the cluster.
 
@@ -1706,46 +1718,7 @@ class LUAddNode(LogicalUnit):
     new_node = self.new_node
     node = new_node.name
 
-    # set up inter-node password and certificate and restarts the node daemon
-    gntpass = self.sstore.GetNodeDaemonPassword()
-    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
-      raise errors.OpExecError("ganeti password corruption detected")
-    f = open(constants.SSL_CERT_FILE)
-    try:
-      gntpem = f.read(8192)
-    finally:
-      f.close()
-    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
-    # so we use this to detect an invalid certificate; as long as the
-    # cert doesn't contain this, the here-document will be correctly
-    # parsed by the shell sequence below
-    if re.search('^!EOF\.', gntpem, re.MULTILINE):
-      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
-    if not gntpem.endswith("\n"):
-      raise errors.OpExecError("PEM must end with newline")
-    logger.Info("copy cluster pass to %s and starting the node daemon" % node)
-
-    # and then connect with ssh to set password and start ganeti-noded
-    # note that all the below variables are sanitized at this point,
-    # either by being constants or by the checks above
-    ss = self.sstore
-    mycommand = ("umask 077 && "
-                 "echo '%s' > '%s' && "
-                 "cat > '%s' << '!EOF.' && \n"
-                 "%s!EOF.\n%s restart" %
-                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
-                  constants.SSL_CERT_FILE, gntpem,
-                  constants.NODE_INITD_SCRIPT))
-
-    result = self.ssh.Run(node, 'root', mycommand, batch=False, ask_key=True)
-    if result.failed:
-      raise errors.OpExecError("Remote command on node %s, error: %s,"
-                               " output: %s" %
-                               (node, result.fail_reason, result.output))
-
     # check connectivity
-    time.sleep(4)
-
     result = rpc.call_version([node])[node]
     if result:
       if constants.PROTOCOL_VERSION == result:
@@ -1780,7 +1753,7 @@ class LUAddNode(LogicalUnit):
       raise errors.OpExecError("Cannot transfer ssh keys to the new node")
 
     # Add node to our /etc/hosts, and add key to known_hosts
-    _AddHostToEtcHosts(new_node.name)
+    utils.AddHostToEtcHosts(new_node.name)
 
     if new_node.secondary_ip != new_node.primary_ip:
       if not rpc.call_node_tcp_ping(new_node.name,
@@ -1792,17 +1765,29 @@ class LUAddNode(LogicalUnit):
                                  " you gave (%s). Please fix and re-run this"
                                  " command." % new_node.secondary_ip)
 
-    success, msg = self.ssh.VerifyNodeHostname(node)
-    if not success:
-      raise errors.OpExecError("Node '%s' claims it has a different hostname"
-                               " than the one the resolver gives: %s."
-                               " Please fix and re-run this command." %
-                               (node, msg))
+    node_verify_list = [self.sstore.GetMasterNode()]
+    node_verify_param = {
+      'nodelist': [node],
+      # TODO: do a node-net-test as well?
+    }
+
+    result = rpc.call_node_verify(node_verify_list, node_verify_param)
+    for verifier in node_verify_list:
+      if not result[verifier]:
+        raise errors.OpExecError("Cannot communicate with %s's node daemon"
+                                 " for remote verification" % verifier)
+      if result[verifier]['nodelist']:
+        for failed in result[verifier]['nodelist']:
+          feedback_fn("ssh/hostname verification failed %s -> %s" %
+                      (verifier, result[verifier]['nodelist'][failed]))
+        raise errors.OpExecError("ssh/hostname verification failed.")
 
     # Distribute updated /etc/hosts and known_hosts to all nodes,
     # including the node just added
     myself = self.cfg.GetNodeInfo(self.sstore.GetMasterNode())
-    dist_nodes = self.cfg.GetNodeList() + [node]
+    dist_nodes = self.cfg.GetNodeList()
+    if not self.op.readd:
+      dist_nodes.append(node)
     if myself.name in dist_nodes:
       dist_nodes.remove(myself.name)
 
@@ -1814,87 +1799,18 @@ class LUAddNode(LogicalUnit):
           logger.Error("copy of file %s to node %s failed" %
                        (fname, to_node))
 
-    to_copy = ss.GetFileList()
+    to_copy = self.sstore.GetFileList()
     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
       to_copy.append(constants.VNC_PASSWORD_FILE)
     for fname in to_copy:
-      if not self.ssh.CopyFileToNode(node, fname):
+      result = rpc.call_upload_file([node], fname)
+      if not result[node]:
         logger.Error("could not copy file %s to node %s" % (fname, node))
 
-    if not self.op.readd:
-      logger.Info("adding node %s to cluster.conf" % node)
-      self.cfg.AddNode(new_node)
-
-
-class LUMasterFailover(LogicalUnit):
-  """Failover the master node to the current node.
-
-  This is a special LU in that it must run on a non-master node.
-
-  """
-  HPATH = "master-failover"
-  HTYPE = constants.HTYPE_CLUSTER
-  REQ_MASTER = False
-  _OP_REQP = []
-
-  def BuildHooksEnv(self):
-    """Build hooks env.
-
-    This will run on the new master only in the pre phase, and on all
-    the nodes in the post phase.
-
-    """
-    env = {
-      "OP_TARGET": self.new_master,
-      "NEW_MASTER": self.new_master,
-      "OLD_MASTER": self.old_master,
-      }
-    return env, [self.new_master], self.cfg.GetNodeList()
-
-  def CheckPrereq(self):
-    """Check prerequisites.
-
-    This checks that we are not already the master.
-
-    """
-    self.new_master = utils.HostInfo().name
-    self.old_master = self.sstore.GetMasterNode()
-
-    if self.old_master == self.new_master:
-      raise errors.OpPrereqError("This commands must be run on the node"
-                                 " where you want the new master to be."
-                                 " %s is already the master" %
-                                 self.old_master)
-
-  def Exec(self, feedback_fn):
-    """Failover the master node.
-
-    This command, when run on a non-master node, will cause the current
-    master to cease being master, and the non-master to become new
-    master.
-
-    """
-    #TODO: do not rely on gethostname returning the FQDN
-    logger.Info("setting master to %s, old master: %s" %
-                (self.new_master, self.old_master))
-
-    if not rpc.call_node_stop_master(self.old_master):
-      logger.Error("could disable the master role on the old master"
-                   " %s, please disable manually" % self.old_master)
-
-    ss = self.sstore
-    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
-    if not rpc.call_upload_file(self.cfg.GetNodeList(),
-                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
-      logger.Error("could not distribute the new simple store master file"
-                   " to the other nodes, please check.")
-
-    if not rpc.call_node_start_master(self.new_master):
-      logger.Error("could not start the master role on the new master"
-                   " %s, please check" % self.new_master)
-      feedback_fn("Error in activating the master IP on the new master,"
-                  " please fix manually.")
-
+    if self.op.readd:
+      self.context.ReaddNode(new_node)
+    else:
+      self.context.AddNode(new_node)
 
 
 class LUQueryClusterInfo(NoHooksLU):
@@ -1903,6 +1819,10 @@ class LUQueryClusterInfo(NoHooksLU):
   """
   _OP_REQP = []
   REQ_MASTER = False
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self.needed_locks = {}
 
   def CheckPrereq(self):
     """No prerequsites needed for this LU.
@@ -1923,55 +1843,21 @@ class LUQueryClusterInfo(NoHooksLU):
       "export_version": constants.EXPORT_VERSION,
       "master": self.sstore.GetMasterNode(),
       "architecture": (platform.architecture()[0], platform.machine()),
+      "hypervisor_type": self.sstore.GetHypervisorType(),
       }
 
     return result
 
 
-class LUClusterCopyFile(NoHooksLU):
-  """Copy file to cluster.
-
-  """
-  _OP_REQP = ["nodes", "filename"]
-
-  def CheckPrereq(self):
-    """Check prerequisites.
-
-    It should check that the named file exists and that the given list
-    of nodes is valid.
-
-    """
-    if not os.path.exists(self.op.filename):
-      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
-
-    self.nodes = _GetWantedNodes(self, self.op.nodes)
-
-  def Exec(self, feedback_fn):
-    """Copy a file from master to some nodes.
-
-    Args:
-      opts - class with options as members
-      args - list containing a single element, the file name
-    Opts used:
-      nodes - list containing the name of target nodes; if empty, all nodes
-
-    """
-    filename = self.op.filename
-
-    myname = utils.HostInfo().name
-
-    for node in self.nodes:
-      if node == myname:
-        continue
-      if not self.ssh.CopyFileToNode(node, filename):
-        logger.Error("Copy of file %s to node %s failed" % (filename, node))
-
-
 class LUDumpClusterConfig(NoHooksLU):
   """Return a text-representation of the cluster-config.
 
   """
   _OP_REQP = []
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self.needed_locks = {}
 
   def CheckPrereq(self):
     """No prerequisites.
@@ -1986,67 +1872,41 @@ class LUDumpClusterConfig(NoHooksLU):
     return self.cfg.DumpConfig()
 
 
-class LURunClusterCommand(NoHooksLU):
-  """Run a command on some nodes.
+class LUActivateInstanceDisks(NoHooksLU):
+  """Bring up an instance's disks.
 
   """
-  _OP_REQP = ["command", "nodes"]
+  _OP_REQP = ["instance_name"]
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self._ExpandAndLockInstance()
+    self.needed_locks[locking.LEVEL_NODE] = []
+    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+
+  def DeclareLocks(self, level):
+    if level == locking.LEVEL_NODE:
+      self._LockInstancesNodes()
 
   def CheckPrereq(self):
     """Check prerequisites.
 
-    It checks that the given list of nodes is valid.
+    This checks that the instance is in the cluster.
 
     """
-    self.nodes = _GetWantedNodes(self, self.op.nodes)
+    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
+    assert self.instance is not None, \
+      "Cannot retrieve locked instance %s" % self.op.instance_name
 
   def Exec(self, feedback_fn):
-    """Run a command on some nodes.
+    """Activate the disks.
 
     """
-    # put the master at the end of the nodes list
-    master_node = self.sstore.GetMasterNode()
-    if master_node in self.nodes:
-      self.nodes.remove(master_node)
-      self.nodes.append(master_node)
-
-    data = []
-    for node in self.nodes:
-      result = self.ssh.Run(node, "root", self.op.command)
-      data.append((node, result.output, result.exit_code))
+    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
+    if not disks_ok:
+      raise errors.OpExecError("Cannot activate block devices")
 
-    return data
-
-
-class LUActivateInstanceDisks(NoHooksLU):
-  """Bring up an instance's disks.
-
-  """
-  _OP_REQP = ["instance_name"]
-
-  def CheckPrereq(self):
-    """Check prerequisites.
-
-    This checks that the instance is in the cluster.
-
-    """
-    instance = self.cfg.GetInstanceInfo(
-      self.cfg.ExpandInstanceName(self.op.instance_name))
-    if instance is None:
-      raise errors.OpPrereqError("Instance '%s' not known" %
-                                 self.op.instance_name)
-    self.instance = instance
-
-
-  def Exec(self, feedback_fn):
-    """Activate the disks.
-
-    """
-    disks_ok, disks_info = _AssembleInstanceDisks(self.instance, self.cfg)
-    if not disks_ok:
-      raise errors.OpExecError("Cannot activate block devices")
-
-    return disks_info
+    return disks_info
 
 
 def _AssembleInstanceDisks(instance, cfg, ignore_secondaries=False):
@@ -2130,6 +1990,16 @@ class LUDeactivateInstanceDisks(NoHooksLU):
 
   """
   _OP_REQP = ["instance_name"]
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self._ExpandAndLockInstance()
+    self.needed_locks[locking.LEVEL_NODE] = []
+    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+
+  def DeclareLocks(self, level):
+    if level == locking.LEVEL_NODE:
+      self._LockInstancesNodes()
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -2137,29 +2007,36 @@ class LUDeactivateInstanceDisks(NoHooksLU):
     This checks that the instance is in the cluster.
 
     """
-    instance = self.cfg.GetInstanceInfo(
-      self.cfg.ExpandInstanceName(self.op.instance_name))
-    if instance is None:
-      raise errors.OpPrereqError("Instance '%s' not known" %
-                                 self.op.instance_name)
-    self.instance = instance
+    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
+    assert self.instance is not None, \
+      "Cannot retrieve locked instance %s" % self.op.instance_name
 
   def Exec(self, feedback_fn):
     """Deactivate the disks
 
     """
     instance = self.instance
-    ins_l = rpc.call_instance_list([instance.primary_node])
-    ins_l = ins_l[instance.primary_node]
-    if not type(ins_l) is list:
-      raise errors.OpExecError("Can't contact node '%s'" %
-                               instance.primary_node)
+    _SafeShutdownInstanceDisks(instance, self.cfg)
 
-    if self.instance.name in ins_l:
-      raise errors.OpExecError("Instance is running, can't shutdown"
-                               " block devices.")
 
-    _ShutdownInstanceDisks(instance, self.cfg)
+def _SafeShutdownInstanceDisks(instance, cfg):
+  """Shutdown block devices of an instance.
+
+  This function checks if an instance is running, before calling
+  _ShutdownInstanceDisks.
+
+  """
+  ins_l = rpc.call_instance_list([instance.primary_node])
+  ins_l = ins_l[instance.primary_node]
+  if not type(ins_l) is list:
+    raise errors.OpExecError("Can't contact node '%s'" %
+                             instance.primary_node)
+
+  if instance.name in ins_l:
+    raise errors.OpExecError("Instance is running, can't shutdown"
+                             " block devices.")
+
+  _ShutdownInstanceDisks(instance, cfg)
 
 
 def _ShutdownInstanceDisks(instance, cfg, ignore_primary=False):
@@ -2220,6 +2097,16 @@ class LUStartupInstance(LogicalUnit):
   HPATH = "instance-start"
   HTYPE = constants.HTYPE_INSTANCE
   _OP_REQP = ["instance_name", "force"]
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self._ExpandAndLockInstance()
+    self.needed_locks[locking.LEVEL_NODE] = []
+    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+
+  def DeclareLocks(self, level):
+    if level == locking.LEVEL_NODE:
+      self._LockInstancesNodes()
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -2241,11 +2128,9 @@ class LUStartupInstance(LogicalUnit):
     This checks that the instance is in the cluster.
 
     """
-    instance = self.cfg.GetInstanceInfo(
-      self.cfg.ExpandInstanceName(self.op.instance_name))
-    if instance is None:
-      raise errors.OpPrereqError("Instance '%s' not known" %
-                                 self.op.instance_name)
+    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
+    assert self.instance is not None, \
+      "Cannot retrieve locked instance %s" % self.op.instance_name
 
     # check bridges existance
     _CheckInstanceBridgesExist(instance)
@@ -2254,9 +2139,6 @@ class LUStartupInstance(LogicalUnit):
                          "starting instance %s" % instance.name,
                          instance.memory)
 
-    self.instance = instance
-    self.op.instance_name = instance.name
-
   def Exec(self, feedback_fn):
     """Start the instance.
 
@@ -2283,6 +2165,24 @@ class LURebootInstance(LogicalUnit):
   HPATH = "instance-reboot"
   HTYPE = constants.HTYPE_INSTANCE
   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    if self.op.reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
+                                   constants.INSTANCE_REBOOT_HARD,
+                                   constants.INSTANCE_REBOOT_FULL]:
+      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
+                                  (constants.INSTANCE_REBOOT_SOFT,
+                                   constants.INSTANCE_REBOOT_HARD,
+                                   constants.INSTANCE_REBOOT_FULL))
+    self._ExpandAndLockInstance()
+    self.needed_locks[locking.LEVEL_NODE] = []
+    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+
+  def DeclareLocks(self, level):
+    if level == locking.LEVEL_NODE:
+      primary_only = not constants.INSTANCE_REBOOT_FULL
+      self._LockInstancesNodes(primary_only=primary_only)
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -2304,18 +2204,13 @@ class LURebootInstance(LogicalUnit):
     This checks that the instance is in the cluster.
 
     """
-    instance = self.cfg.GetInstanceInfo(
-      self.cfg.ExpandInstanceName(self.op.instance_name))
-    if instance is None:
-      raise errors.OpPrereqError("Instance '%s' not known" %
-                                 self.op.instance_name)
+    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
+    assert self.instance is not None, \
+      "Cannot retrieve locked instance %s" % self.op.instance_name
 
     # check bridges existance
     _CheckInstanceBridgesExist(instance)
 
-    self.instance = instance
-    self.op.instance_name = instance.name
-
   def Exec(self, feedback_fn):
     """Reboot the instance.
 
@@ -2327,14 +2222,6 @@ class LURebootInstance(LogicalUnit):
 
     node_current = instance.primary_node
 
-    if reboot_type not in [constants.INSTANCE_REBOOT_SOFT,
-                           constants.INSTANCE_REBOOT_HARD,
-                           constants.INSTANCE_REBOOT_FULL]:
-      raise errors.ParameterError("reboot type not in [%s, %s, %s]" %
-                                  (constants.INSTANCE_REBOOT_SOFT,
-                                   constants.INSTANCE_REBOOT_HARD,
-                                   constants.INSTANCE_REBOOT_FULL))
-
     if reboot_type in [constants.INSTANCE_REBOOT_SOFT,
                        constants.INSTANCE_REBOOT_HARD]:
       if not rpc.call_instance_reboot(node_current, instance,
@@ -2359,6 +2246,16 @@ class LUShutdownInstance(LogicalUnit):
   HPATH = "instance-stop"
   HTYPE = constants.HTYPE_INSTANCE
   _OP_REQP = ["instance_name"]
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self._ExpandAndLockInstance()
+    self.needed_locks[locking.LEVEL_NODE] = []
+    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+
+  def DeclareLocks(self, level):
+    if level == locking.LEVEL_NODE:
+      self._LockInstancesNodes()
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -2377,12 +2274,9 @@ class LUShutdownInstance(LogicalUnit):
     This checks that the instance is in the cluster.
 
     """
-    instance = self.cfg.GetInstanceInfo(
-      self.cfg.ExpandInstanceName(self.op.instance_name))
-    if instance is None:
-      raise errors.OpPrereqError("Instance '%s' not known" %
-                                 self.op.instance_name)
-    self.instance = instance
+    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
+    assert self.instance is not None, \
+      "Cannot retrieve locked instance %s" % self.op.instance_name
 
   def Exec(self, feedback_fn):
     """Shutdown the instance.
@@ -2404,6 +2298,16 @@ class LUReinstallInstance(LogicalUnit):
   HPATH = "instance-reinstall"
   HTYPE = constants.HTYPE_INSTANCE
   _OP_REQP = ["instance_name"]
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self._ExpandAndLockInstance()
+    self.needed_locks[locking.LEVEL_NODE] = []
+    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+
+  def DeclareLocks(self, level):
+    if level == locking.LEVEL_NODE:
+      self._LockInstancesNodes()
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -2422,11 +2326,10 @@ class LUReinstallInstance(LogicalUnit):
     This checks that the instance is in the cluster and is not running.
 
     """
-    instance = self.cfg.GetInstanceInfo(
-      self.cfg.ExpandInstanceName(self.op.instance_name))
-    if instance is None:
-      raise errors.OpPrereqError("Instance '%s' not known" %
-                                 self.op.instance_name)
+    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
+    assert instance is not None, \
+      "Cannot retrieve locked instance %s" % self.op.instance_name
+
     if instance.disk_template == constants.DT_DISKLESS:
       raise errors.OpPrereqError("Instance '%s' has no disks" %
                                  self.op.instance_name)
@@ -2527,9 +2430,7 @@ class LURenameInstance(LogicalUnit):
                                  new_name)
 
     if not getattr(self.op, "ignore_ip", False):
-      command = ["fping", "-q", name_info.ip]
-      result = utils.RunCmd(command)
-      if not result.failed:
+      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
         raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                    (name_info.ip, new_name))
 
@@ -2545,6 +2446,9 @@ class LURenameInstance(LogicalUnit):
       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
 
     self.cfg.RenameInstance(inst.name, self.op.new_name)
+    # Change the instance lock. This is definitely safe while we hold the BGL
+    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
+    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
 
     # re-read the instance from the configuration after rename
     inst = self.cfg.GetInstanceInfo(self.op.new_name)
@@ -2572,8 +2476,8 @@ class LURenameInstance(LogicalUnit):
     try:
       if not rpc.call_instance_run_rename(inst.primary_node, inst, old_name,
                                           "sda", "sdb"):
-        msg = ("Could run OS rename script for instance %s on node %s (but the"
-               " instance has been renamed in Ganeti)" %
+        msg = ("Could not run OS rename script for instance %s on node %s"
+               " (but the instance has been renamed in Ganeti)" %
                (inst.name, inst.primary_node))
         logger.Error(msg)
     finally:
@@ -2586,7 +2490,17 @@ class LURemoveInstance(LogicalUnit):
   """
   HPATH = "instance-remove"
   HTYPE = constants.HTYPE_INSTANCE
-  _OP_REQP = ["instance_name"]
+  _OP_REQP = ["instance_name", "ignore_failures"]
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self._ExpandAndLockInstance()
+    self.needed_locks[locking.LEVEL_NODE] = []
+    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+
+  def DeclareLocks(self, level):
+    if level == locking.LEVEL_NODE:
+      self._LockInstancesNodes()
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -2604,12 +2518,9 @@ class LURemoveInstance(LogicalUnit):
     This checks that the instance is in the cluster.
 
     """
-    instance = self.cfg.GetInstanceInfo(
-      self.cfg.ExpandInstanceName(self.op.instance_name))
-    if instance is None:
-      raise errors.OpPrereqError("Instance '%s' not known" %
-                                 self.op.instance_name)
-    self.instance = instance
+    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
+    assert self.instance is not None, \
+      "Cannot retrieve locked instance %s" % self.op.instance_name
 
   def Exec(self, feedback_fn):
     """Remove the instance.
@@ -2637,6 +2548,7 @@ class LURemoveInstance(LogicalUnit):
     logger.Info("removing instance %s out of cluster config" % instance.name)
 
     self.cfg.RemoveInstance(instance.name)
+    self.remove_locks[locking.LEVEL_INSTANCE] = instance.name
 
 
 class LUQueryInstances(NoHooksLU):
@@ -2644,30 +2556,60 @@ class LUQueryInstances(NoHooksLU):
 
   """
   _OP_REQP = ["output_fields", "names"]
+  REQ_BGL = False
 
-  def CheckPrereq(self):
-    """Check prerequisites.
-
-    This checks that the fields required are valid output fields.
-
-    """
+  def ExpandNames(self):
     self.dynamic_fields = frozenset(["oper_state", "oper_ram", "status"])
-    _CheckOutputFields(static=["name", "os", "pnode", "snodes",
-                               "admin_state", "admin_ram",
-                               "disk_template", "ip", "mac", "bridge",
-                               "sda_size", "sdb_size", "vcpus"],
+    self.static_fields = frozenset([
+      "name", "os", "pnode", "snodes",
+      "admin_state", "admin_ram",
+      "disk_template", "ip", "mac", "bridge",
+      "sda_size", "sdb_size", "vcpus", "tags",
+      "auto_balance",
+      "network_port", "kernel_path", "initrd_path",
+      "hvm_boot_order", "hvm_acpi", "hvm_pae",
+      "hvm_cdrom_image_path", "hvm_nic_type",
+      "hvm_disk_type", "vnc_bind_address",
+      ])
+    _CheckOutputFields(static=self.static_fields,
                        dynamic=self.dynamic_fields,
                        selected=self.op.output_fields)
 
-    self.wanted = _GetWantedInstances(self, self.op.names)
+    self.needed_locks = {}
+    self.share_locks[locking.LEVEL_INSTANCE] = 1
+    self.share_locks[locking.LEVEL_NODE] = 1
+
+    if self.op.names:
+      self.wanted = _GetWantedInstances(self, self.op.names)
+    else:
+      self.wanted = locking.ALL_SET
+
+    self.do_locking = not self.static_fields.issuperset(self.op.output_fields)
+    if self.do_locking:
+      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
+      self.needed_locks[locking.LEVEL_NODE] = []
+      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+
+  def DeclareLocks(self, level):
+    if level == locking.LEVEL_NODE and self.do_locking:
+      self._LockInstancesNodes()
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    """
+    pass
 
   def Exec(self, feedback_fn):
     """Computes the list of nodes and their attributes.
 
     """
-    instance_names = self.wanted
-    instance_list = [self.cfg.GetInstanceInfo(iname) for iname
-                     in instance_names]
+    all_info = self.cfg.GetAllInstancesInfo()
+    if self.do_locking:
+      instance_names = self.acquired_locks[locking.LEVEL_INSTANCE]
+    else:
+      instance_names = all_info.keys()
+    instance_list = [all_info[iname] for iname in instance_names]
 
     # begin data gathering
 
@@ -2748,6 +2690,20 @@ class LUQueryInstances(NoHooksLU):
             val = disk.size
         elif field == "vcpus":
           val = instance.vcpus
+        elif field == "tags":
+          val = list(instance.GetTags())
+        elif field in ("network_port", "kernel_path", "initrd_path",
+                       "hvm_boot_order", "hvm_acpi", "hvm_pae",
+                       "hvm_cdrom_image_path", "hvm_nic_type",
+                       "hvm_disk_type", "vnc_bind_address"):
+          val = getattr(instance, field, None)
+          if val is not None:
+            pass
+          elif field in ("hvm_nic_type", "hvm_disk_type",
+                         "kernel_path", "initrd_path"):
+            val = "default"
+          else:
+            val = "-"
         else:
           raise errors.ParameterError(field)
         iout.append(val)
@@ -2763,6 +2719,16 @@ class LUFailoverInstance(LogicalUnit):
   HPATH = "instance-failover"
   HTYPE = constants.HTYPE_INSTANCE
   _OP_REQP = ["instance_name", "ignore_consistency"]
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self._ExpandAndLockInstance()
+    self.needed_locks[locking.LEVEL_NODE] = []
+    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+
+  def DeclareLocks(self, level):
+    if level == locking.LEVEL_NODE:
+      self._LockInstancesNodes()
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -2783,11 +2749,9 @@ class LUFailoverInstance(LogicalUnit):
     This checks that the instance is in the cluster.
 
     """
-    instance = self.cfg.GetInstanceInfo(
-      self.cfg.ExpandInstanceName(self.op.instance_name))
-    if instance is None:
-      raise errors.OpPrereqError("Instance '%s' not known" %
-                                 self.op.instance_name)
+    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
+    assert self.instance is not None, \
+      "Cannot retrieve locked instance %s" % self.op.instance_name
 
     if instance.disk_template not in constants.DTS_NET_MIRROR:
       raise errors.OpPrereqError("Instance's disk layout is not"
@@ -2796,7 +2760,7 @@ class LUFailoverInstance(LogicalUnit):
     secondary_nodes = instance.secondary_nodes
     if not secondary_nodes:
       raise errors.ProgrammerError("no secondary node but using "
-                                   "DT_REMOTE_RAID1 template")
+                                   "a mirrored disk template")
 
     target_node = secondary_nodes[0]
     # check memory requirements on the secondary node
@@ -2810,8 +2774,6 @@ class LUFailoverInstance(LogicalUnit):
                                  " exist on destination node '%s'" %
                                  (brlist, target_node))
 
-    self.instance = instance
-
   def Exec(self, feedback_fn):
     """Failover an instance.
 
@@ -2826,7 +2788,7 @@ class LUFailoverInstance(LogicalUnit):
 
     feedback_fn("* checking disk consistency between source and target")
     for dev in instance.disks:
-      # for remote_raid1, these are md over drbd
+      # for drbd, these are drbd over lvm
       if not _CheckDiskConsistency(self.cfg, dev, target_node, False):
         if instance.status == "up" and not self.op.ignore_consistency:
           raise errors.OpExecError("Disk %s is degraded on target node,"
@@ -2851,7 +2813,7 @@ class LUFailoverInstance(LogicalUnit):
 
     instance.primary_node = target_node
     # distribute new instance config to the other nodes
-    self.cfg.AddInstance(instance)
+    self.cfg.Update(instance)
 
     # Only start the instance if it's marked as up
     if instance.status == "up":
@@ -2935,22 +2897,6 @@ def _GenerateUniqueNames(cfg, exts):
   return results
 
 
-def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
-  """Generate a drbd device complete with its children.
-
-  """
-  port = cfg.AllocatePort()
-  vgname = cfg.GetVGName()
-  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
-                          logical_id=(vgname, names[0]))
-  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
-                          logical_id=(vgname, names[1]))
-  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
-                          logical_id = (primary, secondary, port),
-                          children = [dev_data, dev_meta])
-  return drbd_dev
-
-
 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
   """Generate a drbd8 device complete with its children.
 
@@ -3141,6 +3087,125 @@ class LUCreateInstance(LogicalUnit):
   _OP_REQP = ["instance_name", "mem_size", "disk_size",
               "disk_template", "swap_size", "mode", "start", "vcpus",
               "wait_for_sync", "ip_check", "mac"]
+  REQ_BGL = False
+
+  def _ExpandNode(self, node):
+    """Expands and checks one node name.
+
+    """
+    node_full = self.cfg.ExpandNodeName(node)
+    if node_full is None:
+      raise errors.OpPrereqError("Unknown node %s" % node)
+    return node_full
+
+  def ExpandNames(self):
+    """ExpandNames for CreateInstance.
+
+    Figure out the right locks for instance creation.
+
+    """
+    self.needed_locks = {}
+
+    # set optional parameters to none if they don't exist
+    for attr in ["kernel_path", "initrd_path", "pnode", "snode",
+                 "iallocator", "hvm_boot_order", "hvm_acpi", "hvm_pae",
+                 "hvm_cdrom_image_path", "hvm_nic_type", "hvm_disk_type",
+                 "vnc_bind_address"]:
+      if not hasattr(self.op, attr):
+        setattr(self.op, attr, None)
+
+    # verify creation mode
+    if self.op.mode not in (constants.INSTANCE_CREATE,
+                            constants.INSTANCE_IMPORT):
+      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
+                                 self.op.mode)
+    # disk template and mirror node verification
+    if self.op.disk_template not in constants.DISK_TEMPLATES:
+      raise errors.OpPrereqError("Invalid disk template name")
+
+    #### instance parameters check
+
+    # instance name verification
+    hostname1 = utils.HostInfo(self.op.instance_name)
+    self.op.instance_name = instance_name = hostname1.name
+
+    # this is just a preventive check, but someone might still add this
+    # instance in the meantime, and creation will fail at lock-add time
+    if instance_name in self.cfg.GetInstanceList():
+      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
+                                 instance_name)
+
+    self.add_locks[locking.LEVEL_INSTANCE] = instance_name
+
+    # ip validity checks
+    ip = getattr(self.op, "ip", None)
+    if ip is None or ip.lower() == "none":
+      inst_ip = None
+    elif ip.lower() == "auto":
+      inst_ip = hostname1.ip
+    else:
+      if not utils.IsValidIP(ip):
+        raise errors.OpPrereqError("given IP address '%s' doesn't look"
+                                   " like a valid IP" % ip)
+      inst_ip = ip
+    self.inst_ip = self.op.ip = inst_ip
+    # used in CheckPrereq for ip ping check
+    self.check_ip = hostname1.ip
+
+    # MAC address verification
+    if self.op.mac != "auto":
+      if not utils.IsValidMac(self.op.mac.lower()):
+        raise errors.OpPrereqError("invalid MAC address specified: %s" %
+                                   self.op.mac)
+
+    # boot order verification
+    if self.op.hvm_boot_order is not None:
+      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
+        raise errors.OpPrereqError("invalid boot order specified,"
+                                   " must be one or more of [acdn]")
+    # file storage checks
+    if (self.op.file_driver and
+        not self.op.file_driver in constants.FILE_DRIVER):
+      raise errors.OpPrereqError("Invalid file driver name '%s'" %
+                                 self.op.file_driver)
+
+    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
+      raise errors.OpPrereqError("File storage directory path not absolute")
+
+    ### Node/iallocator related checks
+    if [self.op.iallocator, self.op.pnode].count(None) != 1:
+      raise errors.OpPrereqError("One and only one of iallocator and primary"
+                                 " node must be given")
+
+    if self.op.iallocator:
+      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+    else:
+      self.op.pnode = self._ExpandNode(self.op.pnode)
+      nodelist = [self.op.pnode]
+      if self.op.snode is not None:
+        self.op.snode = self._ExpandNode(self.op.snode)
+        nodelist.append(self.op.snode)
+      self.needed_locks[locking.LEVEL_NODE] = nodelist
+
+    # in case of import lock the source node too
+    if self.op.mode == constants.INSTANCE_IMPORT:
+      src_node = getattr(self.op, "src_node", None)
+      src_path = getattr(self.op, "src_path", None)
+
+      if src_node is None or src_path is None:
+        raise errors.OpPrereqError("Importing an instance requires source"
+                                   " node and path options")
+
+      if not os.path.isabs(src_path):
+        raise errors.OpPrereqError("The source path must be absolute")
+
+      self.op.src_node = src_node = self._ExpandNode(src_node)
+      if self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET:
+        self.needed_locks[locking.LEVEL_NODE].append(src_node)
+
+    else: # INSTANCE_CREATE
+      if getattr(self.op, "os_type", None) is None:
+        raise errors.OpPrereqError("No guest OS specified")
 
   def _RunAllocator(self):
     """Run the allocator based on input opcode.
@@ -3216,35 +3281,14 @@ class LUCreateInstance(LogicalUnit):
     """Check prerequisites.
 
     """
-    # set optional parameters to none if they don't exist
-    for attr in ["kernel_path", "initrd_path", "hvm_boot_order", "pnode",
-                 "iallocator"]:
-      if not hasattr(self.op, attr):
-        setattr(self.op, attr, None)
-
-    if self.op.mode not in (constants.INSTANCE_CREATE,
-                            constants.INSTANCE_IMPORT):
-      raise errors.OpPrereqError("Invalid instance creation mode '%s'" %
-                                 self.op.mode)
-
     if (not self.cfg.GetVGName() and
         self.op.disk_template not in constants.DTS_NOT_LVM):
       raise errors.OpPrereqError("Cluster does not support lvm-based"
                                  " instances")
 
     if self.op.mode == constants.INSTANCE_IMPORT:
-      src_node = getattr(self.op, "src_node", None)
-      src_path = getattr(self.op, "src_path", None)
-      if src_node is None or src_path is None:
-        raise errors.OpPrereqError("Importing an instance requires source"
-                                   " node and path options")
-      src_node_full = self.cfg.ExpandNodeName(src_node)
-      if src_node_full is None:
-        raise errors.OpPrereqError("Unknown source node '%s'" % src_node)
-      self.op.src_node = src_node = src_node_full
-
-      if not os.path.isabs(src_path):
-        raise errors.OpPrereqError("The source path must be absolute")
+      src_node = self.op.src_node
+      src_path = self.op.src_path
 
       export_info = rpc.call_export_info(src_node, src_path)
 
@@ -3268,52 +3312,17 @@ class LUCreateInstance(LogicalUnit):
       diskimage = os.path.join(src_path, export_info.get(constants.INISECT_INS,
                                                          'disk0_dump'))
       self.src_image = diskimage
-    else: # INSTANCE_CREATE
-      if getattr(self.op, "os_type", None) is None:
-        raise errors.OpPrereqError("No guest OS specified")
-
-    #### instance parameters check
 
-    # disk template and mirror node verification
-    if self.op.disk_template not in constants.DISK_TEMPLATES:
-      raise errors.OpPrereqError("Invalid disk template name")
-
-    # instance name verification
-    hostname1 = utils.HostInfo(self.op.instance_name)
-
-    self.op.instance_name = instance_name = hostname1.name
-    instance_list = self.cfg.GetInstanceList()
-    if instance_name in instance_list:
-      raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
-                                 instance_name)
-
-    # ip validity checks
-    ip = getattr(self.op, "ip", None)
-    if ip is None or ip.lower() == "none":
-      inst_ip = None
-    elif ip.lower() == "auto":
-      inst_ip = hostname1.ip
-    else:
-      if not utils.IsValidIP(ip):
-        raise errors.OpPrereqError("given IP address '%s' doesn't look"
-                                   " like a valid IP" % ip)
-      inst_ip = ip
-    self.inst_ip = self.op.ip = inst_ip
+    # ip ping checks (we use the same ip that was resolved in ExpandNames)
 
     if self.op.start and not self.op.ip_check:
       raise errors.OpPrereqError("Cannot ignore IP address conflicts when"
                                  " adding an instance in start mode")
 
     if self.op.ip_check:
-      if utils.TcpPing(hostname1.ip, constants.DEFAULT_NODED_PORT):
+      if utils.TcpPing(self.check_ip, constants.DEFAULT_NODED_PORT):
         raise errors.OpPrereqError("IP %s of instance %s already in use" %
-                                   (hostname1.ip, instance_name))
-
-    # MAC address verification
-    if self.op.mac != "auto":
-      if not utils.IsValidMac(self.op.mac.lower()):
-        raise errors.OpPrereqError("invalid MAC address specified: %s" %
-                                   self.op.mac)
+                                   (self.check_ip, instance_name))
 
     # bridge verification
     bridge = getattr(self.op, "bridge", None)
@@ -3322,54 +3331,28 @@ class LUCreateInstance(LogicalUnit):
     else:
       self.op.bridge = bridge
 
-    # boot order verification
-    if self.op.hvm_boot_order is not None:
-      if len(self.op.hvm_boot_order.strip("acdn")) != 0:
-        raise errors.OpPrereqError("invalid boot order specified,"
-                                   " must be one or more of [acdn]")
-    # file storage checks
-    if (self.op.file_driver and
-        not self.op.file_driver in constants.FILE_DRIVER):
-      raise errors.OpPrereqError("Invalid file driver name '%s'" %
-                                 self.op.file_driver)
-
-    if self.op.file_storage_dir and os.path.isabs(self.op.file_storage_dir):
-      raise errors.OpPrereqError("File storage directory not a relative"
-                                 " path")
     #### allocator run
 
-    if [self.op.iallocator, self.op.pnode].count(None) != 1:
-      raise errors.OpPrereqError("One and only one of iallocator and primary"
-                                 " node must be given")
-
     if self.op.iallocator is not None:
       self._RunAllocator()
 
     #### node related checks
 
     # check primary node
-    pnode = self.cfg.GetNodeInfo(self.cfg.ExpandNodeName(self.op.pnode))
-    if pnode is None:
-      raise errors.OpPrereqError("Primary node '%s' is unknown" %
-                                 self.op.pnode)
-    self.op.pnode = pnode.name
-    self.pnode = pnode
+    self.pnode = pnode = self.cfg.GetNodeInfo(self.op.pnode)
+    assert self.pnode is not None, \
+      "Cannot retrieve locked node %s" % self.op.pnode
     self.secondaries = []
 
     # mirror node verification
     if self.op.disk_template in constants.DTS_NET_MIRROR:
-      if getattr(self.op, "snode", None) is None:
+      if self.op.snode is None:
         raise errors.OpPrereqError("The networked disk templates need"
                                    " a mirror node")
-
-      snode_name = self.cfg.ExpandNodeName(self.op.snode)
-      if snode_name is None:
-        raise errors.OpPrereqError("Unknown secondary node '%s'" %
-                                   self.op.snode)
-      elif snode_name == pnode.name:
+      if self.op.snode == pnode.name:
         raise errors.OpPrereqError("The secondary node cannot be"
                                    " the primary node.")
-      self.secondaries.append(snode_name)
+      self.secondaries.append(self.op.snode)
 
     req_size = _ComputeDiskSize(self.op.disk_template,
                                 self.op.disk_size, self.op.swap_size)
@@ -3382,7 +3365,7 @@ class LUCreateInstance(LogicalUnit):
         info = nodeinfo.get(node, None)
         if not info:
           raise errors.OpPrereqError("Cannot get current information"
-                                     " from node '%s'" % nodeinfo)
+                                     " from node '%s'" % node)
         vg_free = info.get('vg_free', None)
         if not isinstance(vg_free, int):
           raise errors.OpPrereqError("Can't compute free disk space on"
@@ -3401,13 +3384,47 @@ class LUCreateInstance(LogicalUnit):
     if self.op.kernel_path == constants.VALUE_NONE:
       raise errors.OpPrereqError("Can't set instance kernel to none")
 
-
     # bridge check on primary node
     if not rpc.call_bridges_exist(self.pnode.name, [self.op.bridge]):
       raise errors.OpPrereqError("target bridge '%s' does not exist on"
                                  " destination node '%s'" %
                                  (self.op.bridge, pnode.name))
 
+    # memory check on primary node
+    if self.op.start:
+      _CheckNodeFreeMemory(self.cfg, self.pnode.name,
+                           "creating instance %s" % self.op.instance_name,
+                           self.op.mem_size)
+
+    # hvm_cdrom_image_path verification
+    if self.op.hvm_cdrom_image_path is not None:
+      # FIXME (als): shouldn't these checks happen on the destination node?
+      if not os.path.isabs(self.op.hvm_cdrom_image_path):
+        raise errors.OpPrereqError("The path to the HVM CDROM image must"
+                                   " be an absolute path or None, not %s" %
+                                   self.op.hvm_cdrom_image_path)
+      if not os.path.isfile(self.op.hvm_cdrom_image_path):
+        raise errors.OpPrereqError("The HVM CDROM image must either be a"
+                                   " regular file or a symlink pointing to"
+                                   " an existing regular file, not %s" %
+                                   self.op.hvm_cdrom_image_path)
+
+    # vnc_bind_address verification
+    if self.op.vnc_bind_address is not None:
+      if not utils.IsValidIP(self.op.vnc_bind_address):
+        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
+                                   " like a valid IP address" %
+                                   self.op.vnc_bind_address)
+
+    # Xen HVM device type checks
+    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
+      if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
+        raise errors.OpPrereqError("Invalid NIC type %s specified for Xen HVM"
+                                   " hypervisor" % self.op.hvm_nic_type)
+      if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
+        raise errors.OpPrereqError("Invalid disk type %s specified for Xen HVM"
+                                   " hypervisor" % self.op.hvm_disk_type)
+
     if self.op.start:
       self.instance_status = 'up'
     else:
@@ -3435,6 +3452,9 @@ class LUCreateInstance(LogicalUnit):
     else:
       network_port = None
 
+    if self.op.vnc_bind_address is None:
+      self.op.vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
+
     # this is needed because os.path.join does not accept None arguments
     if self.op.file_storage_dir is None:
       string_file_storage_dir = ""
@@ -3466,6 +3486,12 @@ class LUCreateInstance(LogicalUnit):
                             kernel_path=self.op.kernel_path,
                             initrd_path=self.op.initrd_path,
                             hvm_boot_order=self.op.hvm_boot_order,
+                            hvm_acpi=self.op.hvm_acpi,
+                            hvm_pae=self.op.hvm_pae,
+                            hvm_cdrom_image_path=self.op.hvm_cdrom_image_path,
+                            vnc_bind_address=self.op.vnc_bind_address,
+                            hvm_nic_type=self.op.hvm_nic_type,
+                            hvm_disk_type=self.op.hvm_disk_type,
                             )
 
     feedback_fn("* creating instance disks...")
@@ -3476,6 +3502,9 @@ class LUCreateInstance(LogicalUnit):
     feedback_fn("adding instance %s to cluster config" % instance)
 
     self.cfg.AddInstance(iobj)
+    # Declare that we don't want to remove the instance lock anymore, as we've
+    # added the instance to the config
+    del self.remove_locks[locking.LEVEL_INSTANCE]
 
     if self.op.wait_for_sync:
       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
@@ -3490,6 +3519,8 @@ class LUCreateInstance(LogicalUnit):
     if disk_abort:
       _RemoveDisks(iobj, self.cfg)
       self.cfg.RemoveInstance(iobj.name)
+      # Make sure the instance lock gets removed
+      self.remove_locks[locking.LEVEL_INSTANCE] = iobj.name
       raise errors.OpExecError("There are some degraded disks for"
                                " this instance")
 
@@ -3534,6 +3565,10 @@ class LUConnectConsole(NoHooksLU):
 
   """
   _OP_REQP = ["instance_name"]
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self._ExpandAndLockInstance()
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -3541,12 +3576,9 @@ class LUConnectConsole(NoHooksLU):
     This checks that the instance is in the cluster.
 
     """
-    instance = self.cfg.GetInstanceInfo(
-      self.cfg.ExpandInstanceName(self.op.instance_name))
-    if instance is None:
-      raise errors.OpPrereqError("Instance '%s' not known" %
-                                 self.op.instance_name)
-    self.instance = instance
+    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
+    assert self.instance is not None, \
+      "Cannot retrieve locked instance %s" % self.op.instance_name
 
   def Exec(self, feedback_fn):
     """Connect to the console of an instance
@@ -3578,6 +3610,38 @@ class LUReplaceDisks(LogicalUnit):
   HPATH = "mirrors-replace"
   HTYPE = constants.HTYPE_INSTANCE
   _OP_REQP = ["instance_name", "mode", "disks"]
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self._ExpandAndLockInstance()
+
+    if not hasattr(self.op, "remote_node"):
+      self.op.remote_node = None
+
+    ia_name = getattr(self.op, "iallocator", None)
+    if ia_name is not None:
+      if self.op.remote_node is not None:
+        raise errors.OpPrereqError("Give either the iallocator or the new"
+                                   " secondary, not both")
+      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+    elif self.op.remote_node is not None:
+      remote_node = self.cfg.ExpandNodeName(self.op.remote_node)
+      if remote_node is None:
+        raise errors.OpPrereqError("Node '%s' not known" %
+                                   self.op.remote_node)
+      self.op.remote_node = remote_node
+      self.needed_locks[locking.LEVEL_NODE] = [remote_node]
+      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
+    else:
+      self.needed_locks[locking.LEVEL_NODE] = []
+      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+
+  def DeclareLocks(self, level):
+    # If we're not already locking all nodes in the set we have to declare the
+    # instance's primary/secondary nodes.
+    if (level == locking.LEVEL_NODE and
+        self.needed_locks[locking.LEVEL_NODE] is not locking.ALL_SET):
+      self._LockInstancesNodes()
 
   def _RunAllocator(self):
     """Compute a new secondary node using an IAllocator.
@@ -3628,16 +3692,10 @@ class LUReplaceDisks(LogicalUnit):
     This checks that the instance is in the cluster.
 
     """
-    if not hasattr(self.op, "remote_node"):
-      self.op.remote_node = None
-
-    instance = self.cfg.GetInstanceInfo(
-      self.cfg.ExpandInstanceName(self.op.instance_name))
-    if instance is None:
-      raise errors.OpPrereqError("Instance '%s' not known" %
-                                 self.op.instance_name)
+    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
+    assert instance is not None, \
+      "Cannot retrieve locked instance %s" % self.op.instance_name
     self.instance = instance
-    self.op.instance_name = instance.name
 
     if instance.disk_template not in constants.DTS_NET_MIRROR:
       raise errors.OpPrereqError("Instance's disk layout is not"
@@ -3652,18 +3710,13 @@ class LUReplaceDisks(LogicalUnit):
 
     ia_name = getattr(self.op, "iallocator", None)
     if ia_name is not None:
-      if self.op.remote_node is not None:
-        raise errors.OpPrereqError("Give either the iallocator or the new"
-                                   " secondary, not both")
-      self.op.remote_node = self._RunAllocator()
+      self._RunAllocator()
 
     remote_node = self.op.remote_node
     if remote_node is not None:
-      remote_node = self.cfg.ExpandNodeName(remote_node)
-      if remote_node is None:
-        raise errors.OpPrereqError("Node '%s' not known" %
-                                   self.op.remote_node)
       self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
+      assert self.remote_node_info is not None, \
+        "Cannot retrieve locked node %s" % remote_node
     else:
       self.remote_node_info = None
     if remote_node == instance.primary_node:
@@ -3675,13 +3728,6 @@ class LUReplaceDisks(LogicalUnit):
         # replacement as for drbd7 (no different port allocated)
         raise errors.OpPrereqError("Same secondary given, cannot execute"
                                    " replacement")
-      # the user gave the current secondary, switch to
-      # 'no-replace-secondary' mode for drbd7
-      remote_node = None
-    if (instance.disk_template == constants.DT_REMOTE_RAID1 and
-        self.op.mode != constants.REPLACE_DISK_ALL):
-      raise errors.OpPrereqError("Template 'remote_raid1' only allows all"
-                                 " disks replacement, not individual ones")
     if instance.disk_template == constants.DT_DRBD8:
       if (self.op.mode == constants.REPLACE_DISK_ALL and
           remote_node is not None):
@@ -3711,102 +3757,6 @@ class LUReplaceDisks(LogicalUnit):
       if instance.FindDisk(name) is None:
         raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
                                    (name, instance.name))
-    self.op.remote_node = remote_node
-
-  def _ExecRR1(self, feedback_fn):
-    """Replace the disks of an instance.
-
-    """
-    instance = self.instance
-    iv_names = {}
-    # start of work
-    if self.op.remote_node is None:
-      remote_node = self.sec_node
-    else:
-      remote_node = self.op.remote_node
-    cfg = self.cfg
-    for dev in instance.disks:
-      size = dev.size
-      lv_names = [".%s_%s" % (dev.iv_name, suf) for suf in ["data", "meta"]]
-      names = _GenerateUniqueNames(cfg, lv_names)
-      new_drbd = _GenerateMDDRBDBranch(cfg, instance.primary_node,
-                                       remote_node, size, names)
-      iv_names[dev.iv_name] = (dev, dev.children[0], new_drbd)
-      logger.Info("adding new mirror component on secondary for %s" %
-                  dev.iv_name)
-      #HARDCODE
-      if not _CreateBlockDevOnSecondary(cfg, remote_node, instance,
-                                        new_drbd, False,
-                                        _GetInstanceInfoText(instance)):
-        raise errors.OpExecError("Failed to create new component on secondary"
-                                 " node %s. Full abort, cleanup manually!" %
-                                 remote_node)
-
-      logger.Info("adding new mirror component on primary")
-      #HARDCODE
-      if not _CreateBlockDevOnPrimary(cfg, instance.primary_node,
-                                      instance, new_drbd,
-                                      _GetInstanceInfoText(instance)):
-        # remove secondary dev
-        cfg.SetDiskID(new_drbd, remote_node)
-        rpc.call_blockdev_remove(remote_node, new_drbd)
-        raise errors.OpExecError("Failed to create volume on primary!"
-                                 " Full abort, cleanup manually!!")
-
-      # the device exists now
-      # call the primary node to add the mirror to md
-      logger.Info("adding new mirror component to md")
-      if not rpc.call_blockdev_addchildren(instance.primary_node, dev,
-                                           [new_drbd]):
-        logger.Error("Can't add mirror compoment to md!")
-        cfg.SetDiskID(new_drbd, remote_node)
-        if not rpc.call_blockdev_remove(remote_node, new_drbd):
-          logger.Error("Can't rollback on secondary")
-        cfg.SetDiskID(new_drbd, instance.primary_node)
-        if not rpc.call_blockdev_remove(instance.primary_node, new_drbd):
-          logger.Error("Can't rollback on primary")
-        raise errors.OpExecError("Full abort, cleanup manually!!")
-
-      dev.children.append(new_drbd)
-      cfg.AddInstance(instance)
-
-    # this can fail as the old devices are degraded and _WaitForSync
-    # does a combined result over all disks, so we don't check its
-    # return value
-    _WaitForSync(cfg, instance, self.proc, unlock=True)
-
-    # so check manually all the devices
-    for name in iv_names:
-      dev, child, new_drbd = iv_names[name]
-      cfg.SetDiskID(dev, instance.primary_node)
-      is_degr = rpc.call_blockdev_find(instance.primary_node, dev)[5]
-      if is_degr:
-        raise errors.OpExecError("MD device %s is degraded!" % name)
-      cfg.SetDiskID(new_drbd, instance.primary_node)
-      is_degr = rpc.call_blockdev_find(instance.primary_node, new_drbd)[5]
-      if is_degr:
-        raise errors.OpExecError("New drbd device %s is degraded!" % name)
-
-    for name in iv_names:
-      dev, child, new_drbd = iv_names[name]
-      logger.Info("remove mirror %s component" % name)
-      cfg.SetDiskID(dev, instance.primary_node)
-      if not rpc.call_blockdev_removechildren(instance.primary_node,
-                                              dev, [child]):
-        logger.Error("Can't remove child from mirror, aborting"
-                     " *this device cleanup*.\nYou need to cleanup manually!!")
-        continue
-
-      for node in child.logical_id[:2]:
-        logger.Info("remove child device on %s" % node)
-        cfg.SetDiskID(child, node)
-        if not rpc.call_blockdev_remove(node, child):
-          logger.Error("Warning: failed to remove device from node %s,"
-                       " continuing operation." % node)
-
-      dev.children.remove(child)
-
-      cfg.AddInstance(instance)
 
   def _ExecD8DiskOnly(self, feedback_fn):
     """Replace a disk on the primary or secondary for dbrd8.
@@ -4149,45 +4099,163 @@ class LUReplaceDisks(LogicalUnit):
 
     """
     instance = self.instance
-    if instance.disk_template == constants.DT_REMOTE_RAID1:
-      fn = self._ExecRR1
-    elif instance.disk_template == constants.DT_DRBD8:
+
+    # Activate the instance disks if we're replacing them on a down instance
+    if instance.status == "down":
+      _StartInstanceDisks(self.cfg, instance, True)
+
+    if instance.disk_template == constants.DT_DRBD8:
       if self.op.remote_node is None:
         fn = self._ExecD8DiskOnly
       else:
         fn = self._ExecD8Secondary
     else:
       raise errors.ProgrammerError("Unhandled disk replacement case")
-    return fn(feedback_fn)
 
+    ret = fn(feedback_fn)
 
-class LUQueryInstanceData(NoHooksLU):
-  """Query runtime instance data.
+    # Deactivate the instance disks if we're replacing them on a down instance
+    if instance.status == "down":
+      _SafeShutdownInstanceDisks(instance, self.cfg)
+
+    return ret
+
+
+class LUGrowDisk(LogicalUnit):
+  """Grow a disk of an instance.
 
   """
-  _OP_REQP = ["instances"]
+  HPATH = "disk-grow"
+  HTYPE = constants.HTYPE_INSTANCE
+  _OP_REQP = ["instance_name", "disk", "amount"]
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self._ExpandAndLockInstance()
+    self.needed_locks[locking.LEVEL_NODE] = []
+    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+
+  def DeclareLocks(self, level):
+    if level == locking.LEVEL_NODE:
+      self._LockInstancesNodes()
+
+  def BuildHooksEnv(self):
+    """Build hooks env.
+
+    This runs on the master, the primary and all the secondaries.
+
+    """
+    env = {
+      "DISK": self.op.disk,
+      "AMOUNT": self.op.amount,
+      }
+    env.update(_BuildInstanceHookEnvByObject(self.instance))
+    nl = [
+      self.sstore.GetMasterNode(),
+      self.instance.primary_node,
+      ]
+    return env, nl, nl
 
   def CheckPrereq(self):
     """Check prerequisites.
 
-    This only checks the optional instance list against the existing names.
+    This checks that the instance is in the cluster.
 
     """
+    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
+    assert instance is not None, \
+      "Cannot retrieve locked instance %s" % self.op.instance_name
+
+    self.instance = instance
+
+    if instance.disk_template not in (constants.DT_PLAIN, constants.DT_DRBD8):
+      raise errors.OpPrereqError("Instance's disk layout does not support"
+                                 " growing.")
+
+    if instance.FindDisk(self.op.disk) is None:
+      raise errors.OpPrereqError("Disk '%s' not found for instance '%s'" %
+                                 (self.op.disk, instance.name))
+
+    nodenames = [instance.primary_node] + list(instance.secondary_nodes)
+    nodeinfo = rpc.call_node_info(nodenames, self.cfg.GetVGName())
+    for node in nodenames:
+      info = nodeinfo.get(node, None)
+      if not info:
+        raise errors.OpPrereqError("Cannot get current information"
+                                   " from node '%s'" % node)
+      vg_free = info.get('vg_free', None)
+      if not isinstance(vg_free, int):
+        raise errors.OpPrereqError("Can't compute free disk space on"
+                                   " node %s" % node)
+      if self.op.amount > info['vg_free']:
+        raise errors.OpPrereqError("Not enough disk space on target node %s:"
+                                   " %d MiB available, %d MiB required" %
+                                   (node, info['vg_free'], self.op.amount))
+
+  def Exec(self, feedback_fn):
+    """Execute disk grow.
+
+    """
+    instance = self.instance
+    disk = instance.FindDisk(self.op.disk)
+    for node in (instance.secondary_nodes + (instance.primary_node,)):
+      self.cfg.SetDiskID(disk, node)
+      result = rpc.call_blockdev_grow(node, disk, self.op.amount)
+      if not result or not isinstance(result, (list, tuple)) or len(result) != 2:
+        raise errors.OpExecError("grow request failed to node %s" % node)
+      elif not result[0]:
+        raise errors.OpExecError("grow request failed to node %s: %s" %
+                                 (node, result[1]))
+    disk.RecordGrow(self.op.amount)
+    self.cfg.Update(instance)
+    return
+
+
+class LUQueryInstanceData(NoHooksLU):
+  """Query runtime instance data.
+
+  """
+  _OP_REQP = ["instances"]
+  REQ_BGL = False
+  def ExpandNames(self):
+    self.needed_locks = {}
+    self.share_locks = dict(((i, 1) for i in locking.LEVELS))
+
     if not isinstance(self.op.instances, list):
       raise errors.OpPrereqError("Invalid argument type 'instances'")
+
     if self.op.instances:
-      self.wanted_instances = []
-      names = self.op.instances
-      for name in names:
-        instance = self.cfg.GetInstanceInfo(self.cfg.ExpandInstanceName(name))
-        if instance is None:
-          raise errors.OpPrereqError("No such instance name '%s'" % name)
-        self.wanted_instances.append(instance)
+      self.wanted_names = []
+      for name in self.op.instances:
+        full_name = self.cfg.ExpandInstanceName(name)
+        if full_name is None:
+          raise errors.OpPrereqError("Instance '%s' not known" %
+                                     self.op.instance_name)
+        self.wanted_names.append(full_name)
+      self.needed_locks[locking.LEVEL_INSTANCE] = self.wanted_names
     else:
-      self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
-                               in self.cfg.GetInstanceList()]
-    return
+      self.wanted_names = None
+      self.needed_locks[locking.LEVEL_INSTANCE] = locking.ALL_SET
+
+    self.needed_locks[locking.LEVEL_NODE] = []
+    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+
+  def DeclareLocks(self, level):
+    if level == locking.LEVEL_NODE:
+      self._LockInstancesNodes()
 
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    This only checks the optional instance list against the existing names.
+
+    """
+    if self.wanted_names is None:
+      self.wanted_names = self.acquired_locks[locking.LEVEL_INSTANCE]
+
+    self.wanted_instances = [self.cfg.GetInstanceInfo(name) for name
+                             in self.wanted_names]
+    return
 
   def _ComputeDiskStatus(self, instance, snode, dev):
     """Compute block device status.
@@ -4254,13 +4322,43 @@ class LUQueryInstanceData(NoHooksLU):
         "memory": instance.memory,
         "nics": [(nic.mac, nic.ip, nic.bridge) for nic in instance.nics],
         "disks": disks,
-        "network_port": instance.network_port,
         "vcpus": instance.vcpus,
-        "kernel_path": instance.kernel_path,
-        "initrd_path": instance.initrd_path,
-        "hvm_boot_order": instance.hvm_boot_order,
         }
 
+      htkind = self.sstore.GetHypervisorType()
+      if htkind == constants.HT_XEN_PVM30:
+        idict["kernel_path"] = instance.kernel_path
+        idict["initrd_path"] = instance.initrd_path
+
+      if htkind == constants.HT_XEN_HVM31:
+        idict["hvm_boot_order"] = instance.hvm_boot_order
+        idict["hvm_acpi"] = instance.hvm_acpi
+        idict["hvm_pae"] = instance.hvm_pae
+        idict["hvm_cdrom_image_path"] = instance.hvm_cdrom_image_path
+        idict["hvm_nic_type"] = instance.hvm_nic_type
+        idict["hvm_disk_type"] = instance.hvm_disk_type
+
+      if htkind in constants.HTS_REQ_PORT:
+        if instance.vnc_bind_address is None:
+          vnc_bind_address = constants.VNC_DEFAULT_BIND_ADDRESS
+        else:
+          vnc_bind_address = instance.vnc_bind_address
+        if instance.network_port is None:
+          vnc_console_port = None
+        elif vnc_bind_address == constants.BIND_ADDRESS_GLOBAL:
+          vnc_console_port = "%s:%s" % (instance.primary_node,
+                                       instance.network_port)
+        elif vnc_bind_address == constants.LOCALHOST_IP_ADDRESS:
+          vnc_console_port = "%s:%s on node %s" % (vnc_bind_address,
+                                                   instance.network_port,
+                                                   instance.primary_node)
+        else:
+          vnc_console_port = "%s:%s" % (instance.vnc_bind_address,
+                                        instance.network_port)
+        idict["vnc_console_port"] = vnc_console_port
+        idict["vnc_bind_address"] = vnc_bind_address
+        idict["network_port"] = instance.network_port
+
       result[instance.name] = idict
 
     return result
@@ -4273,6 +4371,10 @@ class LUSetInstanceParams(LogicalUnit):
   HPATH = "instance-modify"
   HTYPE = constants.HTYPE_INSTANCE
   _OP_REQP = ["instance_name"]
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self._ExpandAndLockInstance()
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -4310,6 +4412,9 @@ class LUSetInstanceParams(LogicalUnit):
     This only checks the instance list against the existing names.
 
     """
+    # FIXME: all the parameters could be checked before, in ExpandNames, or in
+    # a separate CheckArguments function, if we implement one, so the operation
+    # can be aborted without waiting for any lock, should it have an error...
     self.mem = getattr(self.op, "mem", None)
     self.vcpus = getattr(self.op, "vcpus", None)
     self.ip = getattr(self.op, "ip", None)
@@ -4318,9 +4423,18 @@ class LUSetInstanceParams(LogicalUnit):
     self.kernel_path = getattr(self.op, "kernel_path", None)
     self.initrd_path = getattr(self.op, "initrd_path", None)
     self.hvm_boot_order = getattr(self.op, "hvm_boot_order", None)
-    all_params = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
-                  self.kernel_path, self.initrd_path, self.hvm_boot_order]
-    if all_params.count(None) == len(all_params):
+    self.hvm_acpi = getattr(self.op, "hvm_acpi", None)
+    self.hvm_pae = getattr(self.op, "hvm_pae", None)
+    self.hvm_nic_type = getattr(self.op, "hvm_nic_type", None)
+    self.hvm_disk_type = getattr(self.op, "hvm_disk_type", None)
+    self.hvm_cdrom_image_path = getattr(self.op, "hvm_cdrom_image_path", None)
+    self.vnc_bind_address = getattr(self.op, "vnc_bind_address", None)
+    self.force = getattr(self.op, "force", None)
+    all_parms = [self.mem, self.vcpus, self.ip, self.bridge, self.mac,
+                 self.kernel_path, self.initrd_path, self.hvm_boot_order,
+                 self.hvm_acpi, self.hvm_pae, self.hvm_cdrom_image_path,
+                 self.vnc_bind_address, self.hvm_nic_type, self.hvm_disk_type]
+    if all_parms.count(None) == len(all_parms):
       raise errors.OpPrereqError("No changes submitted")
     if self.mem is not None:
       try:
@@ -4379,13 +4493,73 @@ class LUSetInstanceParams(LogicalUnit):
                                      " must be one or more of [acdn]"
                                      " or 'default'")
 
-    instance = self.cfg.GetInstanceInfo(
-      self.cfg.ExpandInstanceName(self.op.instance_name))
-    if instance is None:
-      raise errors.OpPrereqError("No such instance name '%s'" %
-                                 self.op.instance_name)
-    self.op.instance_name = instance.name
-    self.instance = instance
+    # hvm_cdrom_image_path verification
+    if self.op.hvm_cdrom_image_path is not None:
+      if not (os.path.isabs(self.op.hvm_cdrom_image_path) or
+              self.op.hvm_cdrom_image_path.lower() == "none"):
+        raise errors.OpPrereqError("The path to the HVM CDROM image must"
+                                   " be an absolute path or None, not %s" %
+                                   self.op.hvm_cdrom_image_path)
+      if not (os.path.isfile(self.op.hvm_cdrom_image_path) or
+              self.op.hvm_cdrom_image_path.lower() == "none"):
+        raise errors.OpPrereqError("The HVM CDROM image must either be a"
+                                   " regular file or a symlink pointing to"
+                                   " an existing regular file, not %s" %
+                                   self.op.hvm_cdrom_image_path)
+
+    # vnc_bind_address verification
+    if self.op.vnc_bind_address is not None:
+      if not utils.IsValidIP(self.op.vnc_bind_address):
+        raise errors.OpPrereqError("given VNC bind address '%s' doesn't look"
+                                   " like a valid IP address" %
+                                   self.op.vnc_bind_address)
+
+    instance = self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
+    assert self.instance is not None, \
+      "Cannot retrieve locked instance %s" % self.op.instance_name
+    self.warn = []
+    if self.mem is not None and not self.force:
+      pnode = self.instance.primary_node
+      nodelist = [pnode]
+      nodelist.extend(instance.secondary_nodes)
+      instance_info = rpc.call_instance_info(pnode, instance.name)
+      nodeinfo = rpc.call_node_info(nodelist, self.cfg.GetVGName())
+
+      if pnode not in nodeinfo or not isinstance(nodeinfo[pnode], dict):
+        # Assume the primary node is unreachable and go ahead
+        self.warn.append("Can't get info from primary node %s" % pnode)
+      else:
+        if instance_info:
+          current_mem = instance_info['memory']
+        else:
+          # Assume instance not running
+          # (there is a slight race condition here, but it's not very probable,
+          # and we have no other way to check)
+          current_mem = 0
+        miss_mem = self.mem - current_mem - nodeinfo[pnode]['memory_free']
+        if miss_mem > 0:
+          raise errors.OpPrereqError("This change will prevent the instance"
+                                     " from starting, due to %d MB of memory"
+                                     " missing on its primary node" % miss_mem)
+
+      for node in instance.secondary_nodes:
+        if node not in nodeinfo or not isinstance(nodeinfo[node], dict):
+          self.warn.append("Can't get info from secondary node %s" % node)
+        elif self.mem > nodeinfo[node]['memory_free']:
+          self.warn.append("Not enough memory to failover instance to secondary"
+                           " node %s" % node)
+
+    # Xen HVM device type checks
+    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
+      if self.op.hvm_nic_type is not None:
+        if self.op.hvm_nic_type not in constants.HT_HVM_VALID_NIC_TYPES:
+          raise errors.OpPrereqError("Invalid NIC type %s specified for Xen"
+                                     " HVM  hypervisor" % self.op.hvm_nic_type)
+      if self.op.hvm_disk_type is not None:
+        if self.op.hvm_disk_type not in constants.HT_HVM_VALID_DISK_TYPES:
+          raise errors.OpPrereqError("Invalid disk type %s specified for Xen"
+                                     " HVM hypervisor" % self.op.hvm_disk_type)
+
     return
 
   def Exec(self, feedback_fn):
@@ -4393,6 +4567,11 @@ class LUSetInstanceParams(LogicalUnit):
 
     All parameters take effect only at the next restart of the instance.
     """
+    # Process here the warnings from CheckPrereq, as we don't have a
+    # feedback_fn there.
+    for warn in self.warn:
+      feedback_fn("WARNING: %s" % warn)
+
     result = []
     instance = self.instance
     if self.mem:
@@ -4422,8 +4601,29 @@ class LUSetInstanceParams(LogicalUnit):
       else:
         instance.hvm_boot_order = self.hvm_boot_order
       result.append(("hvm_boot_order", self.hvm_boot_order))
+    if self.hvm_acpi is not None:
+      instance.hvm_acpi = self.hvm_acpi
+      result.append(("hvm_acpi", self.hvm_acpi))
+    if self.hvm_pae is not None:
+      instance.hvm_pae = self.hvm_pae
+      result.append(("hvm_pae", self.hvm_pae))
+    if self.hvm_nic_type is not None:
+      instance.hvm_nic_type = self.hvm_nic_type
+      result.append(("hvm_nic_type", self.hvm_nic_type))
+    if self.hvm_disk_type is not None:
+      instance.hvm_disk_type = self.hvm_disk_type
+      result.append(("hvm_disk_type", self.hvm_disk_type))
+    if self.hvm_cdrom_image_path:
+      if self.hvm_cdrom_image_path == constants.VALUE_NONE:
+        instance.hvm_cdrom_image_path = None
+      else:
+        instance.hvm_cdrom_image_path = self.hvm_cdrom_image_path
+      result.append(("hvm_cdrom_image_path", self.hvm_cdrom_image_path))
+    if self.vnc_bind_address:
+      instance.vnc_bind_address = self.vnc_bind_address
+      result.append(("vnc_bind_address", self.vnc_bind_address))
 
-    self.cfg.AddInstance(instance)
+    self.cfg.Update(instance)
 
     return result
 
@@ -4432,13 +4632,23 @@ class LUQueryExports(NoHooksLU):
   """Query the exports list
 
   """
-  _OP_REQP = []
+  _OP_REQP = ['nodes']
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self.needed_locks = {}
+    self.share_locks[locking.LEVEL_NODE] = 1
+    if not self.op.nodes:
+      self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+    else:
+      self.needed_locks[locking.LEVEL_NODE] = \
+        _GetWantedNodes(self, self.op.nodes)
 
   def CheckPrereq(self):
-    """Check that the nodelist contains only existing nodes.
+    """Check prerequisites.
 
     """
-    self.nodes = _GetWantedNodes(self, getattr(self.op, "nodes", None))
+    self.nodes = self.acquired_locks[locking.LEVEL_NODE]
 
   def Exec(self, feedback_fn):
     """Compute the list of all the exported system images.
@@ -4459,6 +4669,23 @@ class LUExportInstance(LogicalUnit):
   HPATH = "instance-export"
   HTYPE = constants.HTYPE_INSTANCE
   _OP_REQP = ["instance_name", "target_node", "shutdown"]
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self._ExpandAndLockInstance()
+    # FIXME: lock only instance primary and destination node
+    #
+    # Sad but true, for now we have do lock all nodes, as we don't know where
+    # the previous export might be, and and in this LU we search for it and
+    # remove it from its current node. In the future we could fix this by:
+    #  - making a tasklet to search (share-lock all), then create the new one,
+    #    then one to remove, after
+    #  - removing the removal operation altoghether
+    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+
+  def DeclareLocks(self, level):
+    """Last minute lock declaration."""
+    # All nodes are locked anyway, so nothing to do here.
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -4481,20 +4708,16 @@ class LUExportInstance(LogicalUnit):
     This checks that the instance and node names are valid.
 
     """
-    instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
+    instance_name = self.op.instance_name
     self.instance = self.cfg.GetInstanceInfo(instance_name)
-    if self.instance is None:
-      raise errors.OpPrereqError("Instance '%s' not found" %
-                                 self.op.instance_name)
+    assert self.instance is not None, \
+          "Cannot retrieve locked instance %s" % self.op.instance_name
 
-    # node verification
-    dst_node_short = self.cfg.ExpandNodeName(self.op.target_node)
-    self.dst_node = self.cfg.GetNodeInfo(dst_node_short)
+    self.dst_node = self.cfg.GetNodeInfo(
+      self.cfg.ExpandNodeName(self.op.target_node))
 
-    if self.dst_node is None:
-      raise errors.OpPrereqError("Destination node '%s' is unknown." %
-                                 self.op.target_node)
-    self.op.target_node = self.dst_node.name
+    assert self.dst_node is not None, \
+          "Cannot retrieve locked node %s" % self.op.target_node
 
     # instance disk type verification
     for disk in self.instance.disks:
@@ -4512,8 +4735,8 @@ class LUExportInstance(LogicalUnit):
     if self.op.shutdown:
       # shutdown the instance, but not the disks
       if not rpc.call_instance_shutdown(src_node, instance):
-         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
-                                  (instance.name, src_node))
+        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
+                                 (instance.name, src_node))
 
     vgname = self.cfg.GetVGName()
 
@@ -4562,8 +4785,7 @@ class LUExportInstance(LogicalUnit):
     # if we proceed the backup would be removed because OpQueryExports
     # substitutes an empty list with the full cluster node list.
     if nodelist:
-      op = opcodes.OpQueryExports(nodes=nodelist)
-      exportlist = self.proc.ChainOpCode(op)
+      exportlist = rpc.call_export_list(nodelist)
       for node in exportlist:
         if instance.name in exportlist[node]:
           if not rpc.call_export_remove(node, instance.name):
@@ -4576,6 +4798,14 @@ class LURemoveExport(NoHooksLU):
 
   """
   _OP_REQP = ["instance_name"]
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self.needed_locks = {}
+    # We need all nodes to be locked in order for RemoveExport to work, but we
+    # don't need to lock the instance itself, as nothing will happen to it (and
+    # we can remove exports also for a removed instance)
+    self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -4594,8 +4824,7 @@ class LURemoveExport(NoHooksLU):
       fqdn_warn = True
       instance_name = self.op.instance_name
 
-    op = opcodes.OpQueryExports(nodes=[])
-    exportlist = self.proc.ChainOpCode(op)
+    exportlist = rpc.call_export_list(self.acquired_locks[locking.LEVEL_NODE])
     found = False
     for node in exportlist:
       if instance_name in exportlist[node]:
@@ -4616,26 +4845,34 @@ class TagsLU(NoHooksLU):
   This is an abstract class which is the parent of all the other tags LUs.
 
   """
-  def CheckPrereq(self):
-    """Check prerequisites.
 
-    """
-    if self.op.kind == constants.TAG_CLUSTER:
-      self.target = self.cfg.GetClusterInfo()
-    elif self.op.kind == constants.TAG_NODE:
+  def ExpandNames(self):
+    self.needed_locks = {}
+    if self.op.kind == constants.TAG_NODE:
       name = self.cfg.ExpandNodeName(self.op.name)
       if name is None:
         raise errors.OpPrereqError("Invalid node name (%s)" %
                                    (self.op.name,))
       self.op.name = name
-      self.target = self.cfg.GetNodeInfo(name)
+      self.needed_locks[locking.LEVEL_NODE] = name
     elif self.op.kind == constants.TAG_INSTANCE:
       name = self.cfg.ExpandInstanceName(self.op.name)
       if name is None:
         raise errors.OpPrereqError("Invalid instance name (%s)" %
                                    (self.op.name,))
       self.op.name = name
-      self.target = self.cfg.GetInstanceInfo(name)
+      self.needed_locks[locking.LEVEL_INSTANCE] = name
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    """
+    if self.op.kind == constants.TAG_CLUSTER:
+      self.target = self.cfg.GetClusterInfo()
+    elif self.op.kind == constants.TAG_NODE:
+      self.target = self.cfg.GetNodeInfo(self.op.name)
+    elif self.op.kind == constants.TAG_INSTANCE:
+      self.target = self.cfg.GetInstanceInfo(self.op.name)
     else:
       raise errors.OpPrereqError("Wrong tag type requested (%s)" %
                                  str(self.op.kind))
@@ -4646,12 +4883,13 @@ class LUGetTags(TagsLU):
 
   """
   _OP_REQP = ["kind", "name"]
+  REQ_BGL = False
 
   def Exec(self, feedback_fn):
     """Returns the tag list.
 
     """
-    return self.target.GetTags()
+    return list(self.target.GetTags())
 
 
 class LUSearchTags(NoHooksLU):
@@ -4659,6 +4897,10 @@ class LUSearchTags(NoHooksLU):
 
   """
   _OP_REQP = ["pattern"]
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self.needed_locks = {}
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -4678,9 +4920,9 @@ class LUSearchTags(NoHooksLU):
     """
     cfg = self.cfg
     tgts = [("/cluster", cfg.GetClusterInfo())]
-    ilist = [cfg.GetInstanceInfo(name) for name in cfg.GetInstanceList()]
+    ilist = cfg.GetAllInstancesInfo().values()
     tgts.extend([("/instances/%s" % i.name, i) for i in ilist])
-    nlist = [cfg.GetNodeInfo(name) for name in cfg.GetNodeList()]
+    nlist = cfg.GetAllNodesInfo().values()
     tgts.extend([("/nodes/%s" % n.name, n) for n in nlist])
     results = []
     for path, target in tgts:
@@ -4695,6 +4937,7 @@ class LUAddTags(TagsLU):
 
   """
   _OP_REQP = ["kind", "name", "tags"]
+  REQ_BGL = False
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -4728,6 +4971,7 @@ class LUDelTags(TagsLU):
 
   """
   _OP_REQP = ["kind", "name", "tags"]
+  REQ_BGL = False
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -4760,25 +5004,35 @@ class LUDelTags(TagsLU):
                                 " config file and the operation has been"
                                 " aborted. Please retry.")
 
+
 class LUTestDelay(NoHooksLU):
   """Sleep for a specified amount of time.
 
-  This LU sleeps on the master and/or nodes for a specified amoutn of
+  This LU sleeps on the master and/or nodes for a specified amount of
   time.
 
   """
   _OP_REQP = ["duration", "on_master", "on_nodes"]
+  REQ_BGL = False
 
-  def CheckPrereq(self):
-    """Check prerequisites.
+  def ExpandNames(self):
+    """Expand names and set required locks.
 
-    This checks that we have a good list of nodes and/or the duration
-    is valid.
+    This expands the node list, if any.
 
     """
-
+    self.needed_locks = {}
     if self.op.on_nodes:
+      # _GetWantedNodes can be used here, but is not always appropriate to use
+      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
+      # more information.
       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
+      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    """
 
   def Exec(self, feedback_fn):
     """Do the actual sleep.
@@ -4879,7 +5133,7 @@ class IAllocator(object):
         raise errors.OpExecError("Can't get data for node %s" % nname)
       remote_info = node_data[nname]
       for attr in ['memory_total', 'memory_free', 'memory_dom0',
-                   'vg_size', 'vg_free']:
+                   'vg_size', 'vg_free', 'cpu_total']:
         if attr not in remote_info:
           raise errors.OpExecError("Node '%s' didn't return attribute '%s'" %
                                    (nname, attr))
@@ -4908,6 +5162,7 @@ class IAllocator(object):
         "free_disk": remote_info['vg_free'],
         "primary_ip": ninfo.primary_ip,
         "secondary_ip": ninfo.secondary_ip,
+        "total_cpus": remote_info['cpu_total'],
         }
       node_results[nname] = pnr
     data["nodes"] = node_results
@@ -5027,7 +5282,7 @@ class IAllocator(object):
 
     result = call_fn(self.sstore.GetMasterNode(), name, self.in_text)
 
-    if not isinstance(result, tuple) or len(result) != 4:
+    if not isinstance(result, (list, tuple)) or len(result) != 4:
       raise errors.OpExecError("Invalid result from master iallocator runner")
 
     rcode, stdout, stderr, fail = result
@@ -5035,9 +5290,8 @@ class IAllocator(object):
     if rcode == constants.IARUN_NOTFOUND:
       raise errors.OpExecError("Can't find allocator '%s'" % name)
     elif rcode == constants.IARUN_FAILURE:
-        raise errors.OpExecError("Instance allocator call failed: %s,"
-                                 " output: %s" %
-                                 (fail, stdout+stderr))
+      raise errors.OpExecError("Instance allocator call failed: %s,"
+                               " output: %s" % (fail, stdout+stderr))
     self.out_text = stdout
     if validate:
       self._ValidateResult()