Xen: remove two end-of-line semicolons
[ganeti-local] / lib / cmdlib.py
index cf028bc..dee811d 100644 (file)
@@ -37,11 +37,10 @@ from ganeti import logger
 from ganeti import utils
 from ganeti import errors
 from ganeti import hypervisor
-from ganeti import config
+from ganeti import locking
 from ganeti import constants
 from ganeti import objects
 from ganeti import opcodes
-from ganeti import ssconf
 from ganeti import serializer
 
 
@@ -49,14 +48,15 @@ class LogicalUnit(object):
   """Logical Unit base class.
 
   Subclasses must follow these rules:
-    - implement CheckPrereq which also fills in the opcode instance
-      with all the fields (even if as None)
+    - implement ExpandNames
+    - implement CheckPrereq
     - implement Exec
     - implement BuildHooksEnv
     - redefine HPATH and HTYPE
     - optionally redefine their run requirements:
         REQ_MASTER: the LU needs to run on the master node
         REQ_WSSTORE: the LU needs a writable SimpleStore
+        REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
 
   Note that all commands require root permissions.
 
@@ -66,8 +66,9 @@ class LogicalUnit(object):
   _OP_REQP = []
   REQ_MASTER = True
   REQ_WSSTORE = False
+  REQ_BGL = True
 
-  def __init__(self, processor, op, cfg, sstore):
+  def __init__(self, processor, op, context, sstore):
     """Constructor for LogicalUnit.
 
     This needs to be overriden in derived classes in order to check op
@@ -76,8 +77,13 @@ class LogicalUnit(object):
     """
     self.proc = processor
     self.op = op
-    self.cfg = cfg
+    self.cfg = context.cfg
     self.sstore = sstore
+    self.context = context
+    self.needed_locks = None
+    self.share_locks = dict(((i, 0) for i in locking.LEVELS))
+    # Used to force good behavior when calling helper functions
+    self.recalculate_locks = {}
     self.__ssh = None
 
     for attr_name in self._OP_REQP:
@@ -86,7 +92,7 @@ class LogicalUnit(object):
         raise errors.OpPrereqError("Required parameter '%s' missing" %
                                    attr_name)
 
-    if not cfg.IsCluster():
+    if not self.cfg.IsCluster():
       raise errors.OpPrereqError("Cluster not initialized yet,"
                                  " use 'gnt-cluster init' first.")
     if self.REQ_MASTER:
@@ -105,6 +111,68 @@ class LogicalUnit(object):
 
   ssh = property(fget=__GetSSH)
 
+  def ExpandNames(self):
+    """Expand names for this LU.
+
+    This method is called before starting to execute the opcode, and it should
+    update all the parameters of the opcode to their canonical form (e.g. a
+    short node name must be fully expanded after this method has successfully
+    completed). This way locking, hooks, logging, ecc. can work correctly.
+
+    LUs which implement this method must also populate the self.needed_locks
+    member, as a dict with lock levels as keys, and a list of needed lock names
+    as values. Rules:
+      - Use an empty dict if you don't need any lock
+      - If you don't need any lock at a particular level omit that level
+      - Don't put anything for the BGL level
+      - If you want all locks at a level use None as a value
+        (this reflects what LockSet does, and will be replaced before
+        CheckPrereq with the full list of nodes that have been locked)
+
+    If you need to share locks (rather than acquire them exclusively) at one
+    level you can modify self.share_locks, setting a true value (usually 1) for
+    that level. By default locks are not shared.
+
+    Examples:
+    # Acquire all nodes and one instance
+    self.needed_locks = {
+      locking.LEVEL_NODE: None,
+      locking.LEVEL_INSTANCES: ['instance1.example.tld'],
+    }
+    # Acquire just two nodes
+    self.needed_locks = {
+      locking.LEVEL_NODE: ['node1.example.tld', 'node2.example.tld'],
+    }
+    # Acquire no locks
+    self.needed_locks = {} # No, you can't leave it to the default value None
+
+    """
+    # The implementation of this method is mandatory only if the new LU is
+    # concurrent, so that old LUs don't need to be changed all at the same
+    # time.
+    if self.REQ_BGL:
+      self.needed_locks = {} # Exclusive LUs don't need locks.
+    else:
+      raise NotImplementedError
+
+  def DeclareLocks(self, level):
+    """Declare LU locking needs for a level
+
+    While most LUs can just declare their locking needs at ExpandNames time,
+    sometimes there's the need to calculate some locks after having acquired
+    the ones before. This function is called just before acquiring locks at a
+    particular level, but after acquiring the ones at lower levels, and permits
+    such calculations. It can be used to modify self.needed_locks, and by
+    default it does nothing.
+
+    This function is only called if you have something already set in
+    self.needed_locks for the level.
+
+    @param level: Locking level which is going to be locked
+    @type level: member of ganeti.locking.LEVELS
+
+    """
+
   def CheckPrereq(self):
     """Check prerequisites for this LU.
 
@@ -117,9 +185,7 @@ class LogicalUnit(object):
     not fulfilled. Its return value is ignored.
 
     This method should also update all the parameters of the opcode to
-    their canonical form; e.g. a short node name must be fully
-    expanded after this method has successfully completed (so that
-    hooks, logging, etc. work correctly).
+    their canonical form if it hasn't been done by ExpandNames before.
 
     """
     raise NotImplementedError
@@ -174,6 +240,65 @@ class LogicalUnit(object):
     """
     return lu_result
 
+  def _ExpandAndLockInstance(self):
+    """Helper function to expand and lock an instance.
+
+    Many LUs that work on an instance take its name in self.op.instance_name
+    and need to expand it and then declare the expanded name for locking. This
+    function does it, and then updates self.op.instance_name to the expanded
+    name. It also initializes needed_locks as a dict, if this hasn't been done
+    before.
+
+    """
+    if self.needed_locks is None:
+      self.needed_locks = {}
+    else:
+      assert locking.LEVEL_INSTANCE not in self.needed_locks, \
+        "_ExpandAndLockInstance called with instance-level locks set"
+    expanded_name = self.cfg.ExpandInstanceName(self.op.instance_name)
+    if expanded_name is None:
+      raise errors.OpPrereqError("Instance '%s' not known" %
+                                  self.op.instance_name)
+    self.needed_locks[locking.LEVEL_INSTANCE] = expanded_name
+    self.op.instance_name = expanded_name
+
+  def _LockInstancesNodes(self):
+    """Helper function to declare instances' nodes for locking.
+
+    This function should be called after locking one or more instances to lock
+    their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
+    with all primary or secondary nodes for instances already locked and
+    present in self.needed_locks[locking.LEVEL_INSTANCE].
+
+    It should be called from DeclareLocks, and for safety only works if
+    self.recalculate_locks[locking.LEVEL_NODE] is set.
+
+    In the future it may grow parameters to just lock some instance's nodes, or
+    to just lock primaries or secondary nodes, if needed.
+
+    If should be called in DeclareLocks in a way similar to:
+
+    if level == locking.LEVEL_NODE:
+      self._LockInstancesNodes()
+
+    """
+    assert locking.LEVEL_NODE in self.recalculate_locks, \
+      "_LockInstancesNodes helper function called with no nodes to recalculate"
+
+    # TODO: check if we're really been called with the instance locks held
+
+    # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
+    # future we might want to have different behaviors depending on the value
+    # of self.recalculate_locks[locking.LEVEL_NODE]
+    wanted_nodes = []
+    for instance_name in self.needed_locks[locking.LEVEL_INSTANCE]:
+      instance = self.context.cfg.GetInstanceInfo(instance_name)
+      wanted_nodes.append(instance.primary_node)
+      wanted_nodes.extend(instance.secondary_nodes)
+    self.needed_locks[locking.LEVEL_NODE] = wanted_nodes
+
+    del self.recalculate_locks[locking.LEVEL_NODE]
+
 
 class NoHooksLU(LogicalUnit):
   """Simple LU which runs no hooks.
@@ -351,7 +476,7 @@ class LUDestroyCluster(NoHooksLU):
 
     """
     master = self.sstore.GetMasterNode()
-    if not rpc.call_node_stop_master(master):
+    if not rpc.call_node_stop_master(master, False):
       raise errors.OpExecError("Could not disable the master role")
     priv_key, pub_key, _ = ssh.GetUserFiles(constants.GANETI_RUNAS)
     utils.CreateBackup(priv_key)
@@ -743,7 +868,8 @@ class LUVerifyCluster(LogicalUnit):
       lu_result: previous Exec result
 
     """
-    # We only really run POST phase hooks, and are only interested in their results
+    # We only really run POST phase hooks, and are only interested in
+    # their results
     if phase == constants.HOOKS_PHASE_POST:
       # Used to change hooks' output to proper indentation
       indent_re = re.compile('^', re.M)
@@ -880,8 +1006,7 @@ class LURenameCluster(LogicalUnit):
       raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                  " cluster has changed")
     if new_ip != old_ip:
-      result = utils.RunCmd(["fping", "-q", new_ip])
-      if not result.failed:
+      if utils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
         raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                    " reachable on the network. Aborting." %
                                    new_ip)
@@ -898,7 +1023,7 @@ class LURenameCluster(LogicalUnit):
 
     # shutdown the master IP
     master = ss.GetMasterNode()
-    if not rpc.call_node_stop_master(master):
+    if not rpc.call_node_stop_master(master, False):
       raise errors.OpExecError("Could not disable the master role")
 
     try:
@@ -921,7 +1046,7 @@ class LURenameCluster(LogicalUnit):
             logger.Error("copy of file %s to node %s failed" %
                          (fname, to_node))
     finally:
-      if not rpc.call_node_start_master(master):
+      if not rpc.call_node_start_master(master, False):
         logger.Error("Could not re-enable the master role on the master,"
                      " please restart manually.")
 
@@ -1051,15 +1176,7 @@ def _WaitForSync(cfgw, instance, proc, oneshot=False, unlock=False):
     if done or oneshot:
       break
 
-    if unlock:
-      #utils.Unlock('cmd')
-      pass
-    try:
-      time.sleep(min(60, max_time))
-    finally:
-      if unlock:
-        #utils.Lock('cmd')
-        pass
+    time.sleep(min(60, max_time))
 
   if done:
     proc.LogInfo("Instance %s's disks are in sync." % instance.name)
@@ -1187,7 +1304,7 @@ class LURemoveNode(LogicalUnit):
     """Build hooks env.
 
     This doesn't run on the target node in the pre phase as a failed
-    node would not allows itself to run.
+    node would then be impossible to remove.
 
     """
     env = {
@@ -1241,11 +1358,11 @@ class LURemoveNode(LogicalUnit):
 
     rpc.call_node_leave_cluster(node.name)
 
-    self.ssh.Run(node.name, 'root', "%s stop" % constants.NODE_INITD_SCRIPT)
-
     logger.Info("Removing node %s from config" % node.name)
 
     self.cfg.RemoveNode(node.name)
+    # Remove the node from the Ganeti Lock Manager
+    self.context.glm.remove(locking.LEVEL_NODE, node.name)
 
     utils.RemoveHostFromEtcHosts(node.name)
 
@@ -1525,11 +1642,6 @@ class LUAddNode(LogicalUnit):
                                  primary_ip=primary_ip,
                                  secondary_ip=secondary_ip)
 
-    if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
-      if not os.path.exists(constants.VNC_PASSWORD_FILE):
-        raise errors.OpPrereqError("Cluster VNC password file %s missing" %
-                                   constants.VNC_PASSWORD_FILE)
-
   def Exec(self, feedback_fn):
     """Adds the new node to the cluster.
 
@@ -1537,46 +1649,7 @@ class LUAddNode(LogicalUnit):
     new_node = self.new_node
     node = new_node.name
 
-    # set up inter-node password and certificate and restarts the node daemon
-    gntpass = self.sstore.GetNodeDaemonPassword()
-    if not re.match('^[a-zA-Z0-9.]{1,64}$', gntpass):
-      raise errors.OpExecError("ganeti password corruption detected")
-    f = open(constants.SSL_CERT_FILE)
-    try:
-      gntpem = f.read(8192)
-    finally:
-      f.close()
-    # in the base64 pem encoding, neither '!' nor '.' are valid chars,
-    # so we use this to detect an invalid certificate; as long as the
-    # cert doesn't contain this, the here-document will be correctly
-    # parsed by the shell sequence below
-    if re.search('^!EOF\.', gntpem, re.MULTILINE):
-      raise errors.OpExecError("invalid PEM encoding in the SSL certificate")
-    if not gntpem.endswith("\n"):
-      raise errors.OpExecError("PEM must end with newline")
-    logger.Info("copy cluster pass to %s and starting the node daemon" % node)
-
-    # and then connect with ssh to set password and start ganeti-noded
-    # note that all the below variables are sanitized at this point,
-    # either by being constants or by the checks above
-    ss = self.sstore
-    mycommand = ("umask 077 && "
-                 "echo '%s' > '%s' && "
-                 "cat > '%s' << '!EOF.' && \n"
-                 "%s!EOF.\n%s restart" %
-                 (gntpass, ss.KeyToFilename(ss.SS_NODED_PASS),
-                  constants.SSL_CERT_FILE, gntpem,
-                  constants.NODE_INITD_SCRIPT))
-
-    result = self.ssh.Run(node, 'root', mycommand, batch=False, ask_key=True)
-    if result.failed:
-      raise errors.OpExecError("Remote command on node %s, error: %s,"
-                               " output: %s" %
-                               (node, result.fail_reason, result.output))
-
     # check connectivity
-    time.sleep(4)
-
     result = rpc.call_version([node])[node]
     if result:
       if constants.PROTOCOL_VERSION == result:
@@ -1623,12 +1696,22 @@ class LUAddNode(LogicalUnit):
                                  " you gave (%s). Please fix and re-run this"
                                  " command." % new_node.secondary_ip)
 
-    success, msg = self.ssh.VerifyNodeHostname(node)
-    if not success:
-      raise errors.OpExecError("Node '%s' claims it has a different hostname"
-                               " than the one the resolver gives: %s."
-                               " Please fix and re-run this command." %
-                               (node, msg))
+    node_verify_list = [self.sstore.GetMasterNode()]
+    node_verify_param = {
+      'nodelist': [node],
+      # TODO: do a node-net-test as well?
+    }
+
+    result = rpc.call_node_verify(node_verify_list, node_verify_param)
+    for verifier in node_verify_list:
+      if not result[verifier]:
+        raise errors.OpExecError("Cannot communicate with %s's node daemon"
+                                 " for remote verification" % verifier)
+      if result[verifier]['nodelist']:
+        for failed in result[verifier]['nodelist']:
+          feedback_fn("ssh/hostname verification failed %s -> %s" %
+                      (verifier, result[verifier]['nodelist'][failed]))
+        raise errors.OpExecError("ssh/hostname verification failed.")
 
     # Distribute updated /etc/hosts and known_hosts to all nodes,
     # including the node just added
@@ -1647,88 +1730,19 @@ class LUAddNode(LogicalUnit):
           logger.Error("copy of file %s to node %s failed" %
                        (fname, to_node))
 
-    to_copy = ss.GetFileList()
+    to_copy = self.sstore.GetFileList()
     if self.sstore.GetHypervisorType() == constants.HT_XEN_HVM31:
       to_copy.append(constants.VNC_PASSWORD_FILE)
     for fname in to_copy:
-      if not self.ssh.CopyFileToNode(node, fname):
+      result = rpc.call_upload_file([node], fname)
+      if not result[node]:
         logger.Error("could not copy file %s to node %s" % (fname, node))
 
     if not self.op.readd:
       logger.Info("adding node %s to cluster.conf" % node)
       self.cfg.AddNode(new_node)
-
-
-class LUMasterFailover(LogicalUnit):
-  """Failover the master node to the current node.
-
-  This is a special LU in that it must run on a non-master node.
-
-  """
-  HPATH = "master-failover"
-  HTYPE = constants.HTYPE_CLUSTER
-  REQ_MASTER = False
-  REQ_WSSTORE = True
-  _OP_REQP = []
-
-  def BuildHooksEnv(self):
-    """Build hooks env.
-
-    This will run on the new master only in the pre phase, and on all
-    the nodes in the post phase.
-
-    """
-    env = {
-      "OP_TARGET": self.new_master,
-      "NEW_MASTER": self.new_master,
-      "OLD_MASTER": self.old_master,
-      }
-    return env, [self.new_master], self.cfg.GetNodeList()
-
-  def CheckPrereq(self):
-    """Check prerequisites.
-
-    This checks that we are not already the master.
-
-    """
-    self.new_master = utils.HostInfo().name
-    self.old_master = self.sstore.GetMasterNode()
-
-    if self.old_master == self.new_master:
-      raise errors.OpPrereqError("This commands must be run on the node"
-                                 " where you want the new master to be."
-                                 " %s is already the master" %
-                                 self.old_master)
-
-  def Exec(self, feedback_fn):
-    """Failover the master node.
-
-    This command, when run on a non-master node, will cause the current
-    master to cease being master, and the non-master to become new
-    master.
-
-    """
-    #TODO: do not rely on gethostname returning the FQDN
-    logger.Info("setting master to %s, old master: %s" %
-                (self.new_master, self.old_master))
-
-    if not rpc.call_node_stop_master(self.old_master):
-      logger.Error("could disable the master role on the old master"
-                   " %s, please disable manually" % self.old_master)
-
-    ss = self.sstore
-    ss.SetKey(ss.SS_MASTER_NODE, self.new_master)
-    if not rpc.call_upload_file(self.cfg.GetNodeList(),
-                                ss.KeyToFilename(ss.SS_MASTER_NODE)):
-      logger.Error("could not distribute the new simple store master file"
-                   " to the other nodes, please check.")
-
-    if not rpc.call_node_start_master(self.new_master):
-      logger.Error("could not start the master role on the new master"
-                   " %s, please check" % self.new_master)
-      feedback_fn("Error in activating the master IP on the new master,"
-                  " please fix manually.")
-
+      # Add the new node to the Ganeti Lock Manager
+      self.context.glm.add(locking.LEVEL_NODE, node)
 
 
 class LUQueryClusterInfo(NoHooksLU):
@@ -1737,6 +1751,10 @@ class LUQueryClusterInfo(NoHooksLU):
   """
   _OP_REQP = []
   REQ_MASTER = False
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self.needed_locks = {}
 
   def CheckPrereq(self):
     """No prerequsites needed for this LU.
@@ -1763,50 +1781,15 @@ class LUQueryClusterInfo(NoHooksLU):
     return result
 
 
-class LUClusterCopyFile(NoHooksLU):
-  """Copy file to cluster.
-
-  """
-  _OP_REQP = ["nodes", "filename"]
-
-  def CheckPrereq(self):
-    """Check prerequisites.
-
-    It should check that the named file exists and that the given list
-    of nodes is valid.
-
-    """
-    if not os.path.exists(self.op.filename):
-      raise errors.OpPrereqError("No such filename '%s'" % self.op.filename)
-
-    self.nodes = _GetWantedNodes(self, self.op.nodes)
-
-  def Exec(self, feedback_fn):
-    """Copy a file from master to some nodes.
-
-    Args:
-      opts - class with options as members
-      args - list containing a single element, the file name
-    Opts used:
-      nodes - list containing the name of target nodes; if empty, all nodes
-
-    """
-    filename = self.op.filename
-
-    myname = utils.HostInfo().name
-
-    for node in self.nodes:
-      if node == myname:
-        continue
-      if not self.ssh.CopyFileToNode(node, filename):
-        logger.Error("Copy of file %s to node %s failed" % (filename, node))
-
-
 class LUDumpClusterConfig(NoHooksLU):
   """Return a text-representation of the cluster-config.
 
   """
   _OP_REQP = []
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self.needed_locks = {}
 
   def CheckPrereq(self):
     """No prerequisites.
@@ -1821,38 +1804,6 @@ class LUDumpClusterConfig(NoHooksLU):
     return self.cfg.DumpConfig()
 
 
-class LURunClusterCommand(NoHooksLU):
-  """Run a command on some nodes.
-
-  """
-  _OP_REQP = ["command", "nodes"]
-
-  def CheckPrereq(self):
-    """Check prerequisites.
-
-    It checks that the given list of nodes is valid.
-
-    """
-    self.nodes = _GetWantedNodes(self, self.op.nodes)
-
-  def Exec(self, feedback_fn):
-    """Run a command on some nodes.
-
-    """
-    # put the master at the end of the nodes list
-    master_node = self.sstore.GetMasterNode()
-    if master_node in self.nodes:
-      self.nodes.remove(master_node)
-      self.nodes.append(master_node)
-
-    data = []
-    for node in self.nodes:
-      result = self.ssh.Run(node, "root", self.op.command)
-      data.append((node, result.output, result.exit_code))
-
-    return data
-
-
 class LUActivateInstanceDisks(NoHooksLU):
   """Bring up an instance's disks.
 
@@ -2055,6 +2006,16 @@ class LUStartupInstance(LogicalUnit):
   HPATH = "instance-start"
   HTYPE = constants.HTYPE_INSTANCE
   _OP_REQP = ["instance_name", "force"]
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self._ExpandAndLockInstance()
+    self.needed_locks[locking.LEVEL_NODE] = []
+    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
+
+  def DeclareLocks(self, level):
+    if level == locking.LEVEL_NODE:
+      self._LockInstancesNodes()
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -2076,11 +2037,9 @@ class LUStartupInstance(LogicalUnit):
     This checks that the instance is in the cluster.
 
     """
-    instance = self.cfg.GetInstanceInfo(
-      self.cfg.ExpandInstanceName(self.op.instance_name))
-    if instance is None:
-      raise errors.OpPrereqError("Instance '%s' not known" %
-                                 self.op.instance_name)
+    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
+    assert self.instance is not None, \
+      "Cannot retrieve locked instance %s" % self.op.instance_name
 
     # check bridges existance
     _CheckInstanceBridgesExist(instance)
@@ -2089,9 +2048,6 @@ class LUStartupInstance(LogicalUnit):
                          "starting instance %s" % instance.name,
                          instance.memory)
 
-    self.instance = instance
-    self.op.instance_name = instance.name
-
   def Exec(self, feedback_fn):
     """Start the instance.
 
@@ -2118,6 +2074,16 @@ class LURebootInstance(LogicalUnit):
   HPATH = "instance-reboot"
   HTYPE = constants.HTYPE_INSTANCE
   _OP_REQP = ["instance_name", "ignore_secondaries", "reboot_type"]
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self._ExpandAndLockInstance()
+    self.needed_locks[locking.LEVEL_NODE] = []
+    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
+
+  def DeclareLocks(self, level):
+    if level == locking.LEVEL_NODE:
+      self._LockInstancesNodes()
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -2139,18 +2105,13 @@ class LURebootInstance(LogicalUnit):
     This checks that the instance is in the cluster.
 
     """
-    instance = self.cfg.GetInstanceInfo(
-      self.cfg.ExpandInstanceName(self.op.instance_name))
-    if instance is None:
-      raise errors.OpPrereqError("Instance '%s' not known" %
-                                 self.op.instance_name)
+    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
+    assert self.instance is not None, \
+      "Cannot retrieve locked instance %s" % self.op.instance_name
 
     # check bridges existance
     _CheckInstanceBridgesExist(instance)
 
-    self.instance = instance
-    self.op.instance_name = instance.name
-
   def Exec(self, feedback_fn):
     """Reboot the instance.
 
@@ -2194,6 +2155,16 @@ class LUShutdownInstance(LogicalUnit):
   HPATH = "instance-stop"
   HTYPE = constants.HTYPE_INSTANCE
   _OP_REQP = ["instance_name"]
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self._ExpandAndLockInstance()
+    self.needed_locks[locking.LEVEL_NODE] = []
+    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
+
+  def DeclareLocks(self, level):
+    if level == locking.LEVEL_NODE:
+      self._LockInstancesNodes()
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -2212,12 +2183,9 @@ class LUShutdownInstance(LogicalUnit):
     This checks that the instance is in the cluster.
 
     """
-    instance = self.cfg.GetInstanceInfo(
-      self.cfg.ExpandInstanceName(self.op.instance_name))
-    if instance is None:
-      raise errors.OpPrereqError("Instance '%s' not known" %
-                                 self.op.instance_name)
-    self.instance = instance
+    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
+    assert self.instance is not None, \
+      "Cannot retrieve locked instance %s" % self.op.instance_name
 
   def Exec(self, feedback_fn):
     """Shutdown the instance.
@@ -2239,6 +2207,16 @@ class LUReinstallInstance(LogicalUnit):
   HPATH = "instance-reinstall"
   HTYPE = constants.HTYPE_INSTANCE
   _OP_REQP = ["instance_name"]
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self._ExpandAndLockInstance()
+    self.needed_locks[locking.LEVEL_NODE] = []
+    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
+
+  def DeclareLocks(self, level):
+    if level == locking.LEVEL_NODE:
+      self._LockInstancesNodes()
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -2257,11 +2235,10 @@ class LUReinstallInstance(LogicalUnit):
     This checks that the instance is in the cluster and is not running.
 
     """
-    instance = self.cfg.GetInstanceInfo(
-      self.cfg.ExpandInstanceName(self.op.instance_name))
-    if instance is None:
-      raise errors.OpPrereqError("Instance '%s' not known" %
-                                 self.op.instance_name)
+    instance = self.cfg.GetInstanceInfo(self.op.instance_name)
+    assert instance is not None, \
+      "Cannot retrieve locked instance %s" % self.op.instance_name
+
     if instance.disk_template == constants.DT_DISKLESS:
       raise errors.OpPrereqError("Instance '%s' has no disks" %
                                  self.op.instance_name)
@@ -2362,9 +2339,7 @@ class LURenameInstance(LogicalUnit):
                                  new_name)
 
     if not getattr(self.op, "ignore_ip", False):
-      command = ["fping", "-q", name_info.ip]
-      result = utils.RunCmd(command)
-      if not result.failed:
+      if utils.TcpPing(name_info.ip, constants.DEFAULT_NODED_PORT):
         raise errors.OpPrereqError("IP %s of instance %s already in use" %
                                    (name_info.ip, new_name))
 
@@ -2380,6 +2355,9 @@ class LURenameInstance(LogicalUnit):
       old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
 
     self.cfg.RenameInstance(inst.name, self.op.new_name)
+    # Change the instance lock. This is definitely safe while we hold the BGL
+    self.context.glm.remove(locking.LEVEL_INSTANCE, inst.name)
+    self.context.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
 
     # re-read the instance from the configuration after rename
     inst = self.cfg.GetInstanceInfo(self.op.new_name)
@@ -2472,6 +2450,8 @@ class LURemoveInstance(LogicalUnit):
     logger.Info("removing instance %s out of cluster config" % instance.name)
 
     self.cfg.RemoveInstance(instance.name)
+    # Remove the new instance from the Ganeti Lock Manager
+    self.context.glm.remove(locking.LEVEL_INSTANCE, instance.name)
 
 
 class LUQueryInstances(NoHooksLU):
@@ -2600,6 +2580,16 @@ class LUFailoverInstance(LogicalUnit):
   HPATH = "instance-failover"
   HTYPE = constants.HTYPE_INSTANCE
   _OP_REQP = ["instance_name", "ignore_consistency"]
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self._ExpandAndLockInstance()
+    self.needed_locks[locking.LEVEL_NODE] = []
+    self.recalculate_locks[locking.LEVEL_NODE] = 'replace'
+
+  def DeclareLocks(self, level):
+    if level == locking.LEVEL_NODE:
+      self._LockInstancesNodes()
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -2620,11 +2610,9 @@ class LUFailoverInstance(LogicalUnit):
     This checks that the instance is in the cluster.
 
     """
-    instance = self.cfg.GetInstanceInfo(
-      self.cfg.ExpandInstanceName(self.op.instance_name))
-    if instance is None:
-      raise errors.OpPrereqError("Instance '%s' not known" %
-                                 self.op.instance_name)
+    self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
+    assert self.instance is not None, \
+      "Cannot retrieve locked instance %s" % self.op.instance_name
 
     if instance.disk_template not in constants.DTS_NET_MIRROR:
       raise errors.OpPrereqError("Instance's disk layout is not"
@@ -2647,8 +2635,6 @@ class LUFailoverInstance(LogicalUnit):
                                  " exist on destination node '%s'" %
                                  (brlist, target_node))
 
-    self.instance = instance
-
   def Exec(self, feedback_fn):
     """Failover an instance.
 
@@ -2772,22 +2758,6 @@ def _GenerateUniqueNames(cfg, exts):
   return results
 
 
-def _GenerateMDDRBDBranch(cfg, primary, secondary, size, names):
-  """Generate a drbd device complete with its children.
-
-  """
-  port = cfg.AllocatePort()
-  vgname = cfg.GetVGName()
-  dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
-                          logical_id=(vgname, names[0]))
-  dev_meta = objects.Disk(dev_type=constants.LD_LV, size=128,
-                          logical_id=(vgname, names[1]))
-  drbd_dev = objects.Disk(dev_type=constants.LD_DRBD7, size=size,
-                          logical_id = (primary, secondary, port),
-                          children = [dev_data, dev_meta])
-  return drbd_dev
-
-
 def _GenerateDRBD8Branch(cfg, primary, secondary, size, names, iv_name):
   """Generate a drbd8 device complete with its children.
 
@@ -3346,6 +3316,8 @@ class LUCreateInstance(LogicalUnit):
     feedback_fn("adding instance %s to cluster config" % instance)
 
     self.cfg.AddInstance(iobj)
+    # Add the new instance to the Ganeti Lock Manager
+    self.context.glm.add(locking.LEVEL_INSTANCE, instance)
 
     if self.op.wait_for_sync:
       disk_abort = not _WaitForSync(self.cfg, iobj, self.proc)
@@ -3360,6 +3332,8 @@ class LUCreateInstance(LogicalUnit):
     if disk_abort:
       _RemoveDisks(iobj, self.cfg)
       self.cfg.RemoveInstance(iobj.name)
+      # Remove the new instance from the Ganeti Lock Manager
+      self.context.glm.remove(locking.LEVEL_INSTANCE, iobj.name)
       raise errors.OpExecError("There are some degraded disks for"
                                " this instance")
 
@@ -3404,6 +3378,10 @@ class LUConnectConsole(NoHooksLU):
 
   """
   _OP_REQP = ["instance_name"]
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self._ExpandAndLockInstance()
 
   def CheckPrereq(self):
     """Check prerequisites.
@@ -3411,12 +3389,9 @@ class LUConnectConsole(NoHooksLU):
     This checks that the instance is in the cluster.
 
     """
-    instance = self.cfg.GetInstanceInfo(
-      self.cfg.ExpandInstanceName(self.op.instance_name))
-    if instance is None:
-      raise errors.OpPrereqError("Instance '%s' not known" %
-                                 self.op.instance_name)
-    self.instance = instance
+    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
+    assert self.instance is not None, \
+      "Cannot retrieve locked instance %s" % self.op.instance_name
 
   def Exec(self, feedback_fn):
     """Connect to the console of an instance
@@ -4146,6 +4121,10 @@ class LUSetInstanceParams(LogicalUnit):
   HPATH = "instance-modify"
   HTYPE = constants.HTYPE_INSTANCE
   _OP_REQP = ["instance_name"]
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self._ExpandAndLockInstance()
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -4183,6 +4162,9 @@ class LUSetInstanceParams(LogicalUnit):
     This only checks the instance list against the existing names.
 
     """
+    # FIXME: all the parameters could be checked before, in ExpandNames, or in
+    # a separate CheckArguments function, if we implement one, so the operation
+    # can be aborted without waiting for any lock, should it have an error...
     self.mem = getattr(self.op, "mem", None)
     self.vcpus = getattr(self.op, "vcpus", None)
     self.ip = getattr(self.op, "ip", None)
@@ -4277,13 +4259,9 @@ class LUSetInstanceParams(LogicalUnit):
                                    " like a valid IP address" %
                                    self.op.vnc_bind_address)
 
-    instance = self.cfg.GetInstanceInfo(
-      self.cfg.ExpandInstanceName(self.op.instance_name))
-    if instance is None:
-      raise errors.OpPrereqError("No such instance name '%s'" %
-                                 self.op.instance_name)
-    self.op.instance_name = instance.name
-    self.instance = instance
+    self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
+    assert self.instance is not None, \
+      "Cannot retrieve locked instance %s" % self.op.instance_name
     return
 
   def Exec(self, feedback_fn):
@@ -4333,7 +4311,7 @@ class LUSetInstanceParams(LogicalUnit):
       instance.vnc_bind_address = self.vnc_bind_address
       result.append(("vnc_bind_address", self.vnc_bind_address))
 
-    self.cfg.AddInstance(instance)
+    self.cfg.Update(instance)
 
     return result
 
@@ -4422,8 +4400,8 @@ class LUExportInstance(LogicalUnit):
     if self.op.shutdown:
       # shutdown the instance, but not the disks
       if not rpc.call_instance_shutdown(src_node, instance):
-         raise errors.OpExecError("Could not shutdown instance %s on node %s" %
-                                  (instance.name, src_node))
+        raise errors.OpExecError("Could not shutdown instance %s on node %s" %
+                                 (instance.name, src_node))
 
     vgname = self.cfg.GetVGName()
 
@@ -4561,7 +4539,7 @@ class LUGetTags(TagsLU):
     """Returns the tag list.
 
     """
-    return self.target.GetTags()
+    return list(self.target.GetTags())
 
 
 class LUSearchTags(NoHooksLU):
@@ -4670,25 +4648,35 @@ class LUDelTags(TagsLU):
                                 " config file and the operation has been"
                                 " aborted. Please retry.")
 
+
 class LUTestDelay(NoHooksLU):
   """Sleep for a specified amount of time.
 
-  This LU sleeps on the master and/or nodes for a specified amoutn of
+  This LU sleeps on the master and/or nodes for a specified amount of
   time.
 
   """
   _OP_REQP = ["duration", "on_master", "on_nodes"]
+  REQ_BGL = False
 
-  def CheckPrereq(self):
-    """Check prerequisites.
+  def ExpandNames(self):
+    """Expand names and set required locks.
 
-    This checks that we have a good list of nodes and/or the duration
-    is valid.
+    This expands the node list, if any.
 
     """
-
+    self.needed_locks = {}
     if self.op.on_nodes:
+      # _GetWantedNodes can be used here, but is not always appropriate to use
+      # this way in ExpandNames. Check LogicalUnit.ExpandNames docstring for
+      # more information.
       self.op.on_nodes = _GetWantedNodes(self, self.op.on_nodes)
+      self.needed_locks[locking.LEVEL_NODE] = self.op.on_nodes
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    """
 
   def Exec(self, feedback_fn):
     """Do the actual sleep.
@@ -4946,9 +4934,8 @@ class IAllocator(object):
     if rcode == constants.IARUN_NOTFOUND:
       raise errors.OpExecError("Can't find allocator '%s'" % name)
     elif rcode == constants.IARUN_FAILURE:
-        raise errors.OpExecError("Instance allocator call failed: %s,"
-                                 " output: %s" %
-                                 (fail, stdout+stderr))
+      raise errors.OpExecError("Instance allocator call failed: %s,"
+                               " output: %s" % (fail, stdout+stderr))
     self.out_text = stdout
     if validate:
       self._ValidateResult()