Export extractExTags and updateExclTags
[ganeti-local] / lib / mcpu.py
index f737f7e..4498747 100644 (file)
@@ -38,16 +38,29 @@ import traceback
 from ganeti import opcodes
 from ganeti import constants
 from ganeti import errors
+from ganeti import hooksmaster
 from ganeti import cmdlib
 from ganeti import locking
 from ganeti import utils
 from ganeti import compat
-from ganeti import pathutils
 
 
 _OP_PREFIX = "Op"
 _LU_PREFIX = "LU"
 
+#: LU classes which don't need to acquire the node allocation lock
+#: (L{locking.NAL}) when they acquire all node or node resource locks
+_NODE_ALLOC_WHITELIST = frozenset([])
+
+#: LU classes which don't need to acquire the node allocation lock
+#: (L{locking.NAL}) in the same mode (shared/exclusive) as the node
+#: or node resource locks
+_NODE_ALLOC_MODE_WHITELIST = compat.UniqueFrozenset([
+  cmdlib.LUBackupExport,
+  cmdlib.LUBackupRemove,
+  cmdlib.LUOobCommand,
+  ])
+
 
 class LockAcquireTimeout(Exception):
   """Exception to report timeouts on acquiring locks.
@@ -232,18 +245,42 @@ def _FailingSubmitManyJobs(_):
                                " queries) can not submit jobs")
 
 
-def _RpcResultsToHooksResults(rpc_results):
-  """Function to convert RPC results to the format expected by HooksMaster.
+def _VerifyLocks(lu, glm, _mode_whitelist=_NODE_ALLOC_MODE_WHITELIST,
+                 _nal_whitelist=_NODE_ALLOC_WHITELIST):
+  """Performs consistency checks on locks acquired by a logical unit.
 
-  @type rpc_results: dict(node: L{rpc.RpcResult})
-  @param rpc_results: RPC results
-  @rtype: dict(node: (fail_msg, offline, hooks_results))
-  @return: RPC results unpacked according to the format expected by
-    L({mcpu.HooksMaster}
+  @type lu: L{cmdlib.LogicalUnit}
+  @param lu: Logical unit instance
+  @type glm: L{locking.GanetiLockManager}
+  @param glm: Lock manager
 
   """
-  return dict((node, (rpc_res.fail_msg, rpc_res.offline, rpc_res.payload))
-              for (node, rpc_res) in rpc_results.items())
+  if not __debug__:
+    return
+
+  have_nal = glm.check_owned(locking.LEVEL_NODE_ALLOC, locking.NAL)
+
+  for level in [locking.LEVEL_NODE, locking.LEVEL_NODE_RES]:
+    # TODO: Verify using actual lock mode, not using LU variables
+    if level in lu.needed_locks:
+      share_node_alloc = lu.share_locks[locking.LEVEL_NODE_ALLOC]
+      share_level = lu.share_locks[level]
+
+      if lu.__class__ in _mode_whitelist:
+        assert share_node_alloc != share_level, \
+          "LU is whitelisted to use different modes for node allocation lock"
+      else:
+        assert bool(share_node_alloc) == bool(share_level), \
+          ("Node allocation lock must be acquired using the same mode as nodes"
+           " and node resources")
+
+      if lu.__class__ in _nal_whitelist:
+        assert not have_nal, \
+          "LU is whitelisted for not acquiring the node allocation lock"
+      elif lu.needed_locks[level] == locking.ALL_SET or glm.owning_all(level):
+        assert have_nal, \
+          ("Node allocation lock must be used if an LU acquires all nodes"
+           " or node resources")
 
 
 class Processor(object):
@@ -263,7 +300,7 @@ class Processor(object):
     self._ec_id = ec_id
     self._cbs = None
     self.rpc = context.rpc
-    self.hmclass = HooksMaster
+    self.hmclass = hooksmaster.HooksMaster
     self._enable_locks = enable_locks
 
   def _CheckLocksEnabled(self):
@@ -275,7 +312,7 @@ class Processor(object):
     if not self._enable_locks:
       raise errors.ProgrammerError("Attempted to use disabled locks")
 
-  def _AcquireLocks(self, level, names, shared, timeout):
+  def _AcquireLocks(self, level, names, shared, opportunistic, timeout):
     """Acquires locks via the Ganeti lock manager.
 
     @type level: int
@@ -284,6 +321,8 @@ class Processor(object):
     @param names: Lock names
     @type shared: bool
     @param shared: Whether the locks should be acquired in shared mode
+    @type opportunistic: bool
+    @param opportunistic: Whether to acquire opportunistically
     @type timeout: None or float
     @param timeout: Timeout for acquiring the locks
     @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
@@ -298,7 +337,8 @@ class Processor(object):
       priority = None
 
     acquired = self.context.glm.acquire(level, names, shared=shared,
-                                        timeout=timeout, priority=priority)
+                                        timeout=timeout, priority=priority,
+                                        opportunistic=opportunistic)
 
     if acquired is None:
       raise LockAcquireTimeout()
@@ -353,9 +393,13 @@ class Processor(object):
     given LU and its opcodes.
 
     """
+    glm = self.context.glm
     adding_locks = level in lu.add_locks
     acquiring_locks = level in lu.needed_locks
+
     if level not in locking.LEVELS:
+      _VerifyLocks(lu, glm)
+
       if self._cbs:
         self._cbs.NotifyStart()
 
@@ -384,6 +428,7 @@ class Processor(object):
 
       lu.DeclareLocks(level)
       share = lu.share_locks[level]
+      opportunistic = lu.opportunistic_locks[level]
 
       try:
         assert adding_locks ^ acquiring_locks, \
@@ -393,7 +438,7 @@ class Processor(object):
           # Acquiring locks
           needed_locks = lu.needed_locks[level]
 
-          self._AcquireLocks(level, needed_locks, share,
+          self._AcquireLocks(level, needed_locks, share, opportunistic,
                              calc_timeout())
         else:
           # Adding locks
@@ -401,7 +446,7 @@ class Processor(object):
           lu.remove_locks[level] = add_locks
 
           try:
-            self.context.glm.add(level, add_locks, acquired=1, shared=share)
+            glm.add(level, add_locks, acquired=1, shared=share)
           except errors.LockError:
             logging.exception("Detected lock error in level %s for locks"
                               " %s, shared=%s", level, add_locks, share)
@@ -414,10 +459,10 @@ class Processor(object):
           result = self._LockAndExecLU(lu, level + 1, calc_timeout)
         finally:
           if level in lu.remove_locks:
-            self.context.glm.remove(level, lu.remove_locks[level])
+            glm.remove(level, lu.remove_locks[level])
       finally:
-        if self.context.glm.is_owned(level):
-          self.context.glm.release(level)
+        if glm.is_owned(level):
+          glm.release(level)
 
     else:
       result = self._LockAndExecLU(lu, level + 1, calc_timeout)
@@ -457,7 +502,7 @@ class Processor(object):
         # and in a shared fashion otherwise (to prevent concurrent run with
         # an exclusive LU.
         self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
-                            not lu_class.REQ_BGL, calc_timeout())
+                            not lu_class.REQ_BGL, False, calc_timeout())
       elif lu_class.REQ_BGL:
         raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
                                      " disabled" % op.OP_ID)
@@ -485,8 +530,12 @@ class Processor(object):
     if not (resultcheck_fn is None or resultcheck_fn(result)):
       logging.error("Expected opcode result matching %s, got %s",
                     resultcheck_fn, result)
-      raise errors.OpResultError("Opcode result does not match %s: %s" %
-                                 (resultcheck_fn, utils.Truncate(result, 80)))
+      if not getattr(op, "dry_run", False):
+        # FIXME: LUs should still behave in dry_run mode, or
+        # alternately we should have OP_DRYRUN_RESULT; in the
+        # meantime, we simply skip the OP_RESULT check in dry-run mode
+        raise errors.OpResultError("Opcode result does not match %s: %s" %
+                                   (resultcheck_fn, utils.Truncate(result, 80)))
 
     return result
 
@@ -540,229 +589,3 @@ class Processor(object):
       raise errors.ProgrammerError("Tried to use execution context id when"
                                    " not set")
     return self._ec_id
-
-
-class HooksMaster(object):
-  def __init__(self, opcode, hooks_path, nodes, hooks_execution_fn,
-               hooks_results_adapt_fn, build_env_fn, log_fn, htype=None,
-               cluster_name=None, master_name=None):
-    """Base class for hooks masters.
-
-    This class invokes the execution of hooks according to the behaviour
-    specified by its parameters.
-
-    @type opcode: string
-    @param opcode: opcode of the operation to which the hooks are tied
-    @type hooks_path: string
-    @param hooks_path: prefix of the hooks directories
-    @type nodes: 2-tuple of lists
-    @param nodes: 2-tuple of lists containing nodes on which pre-hooks must be
-      run and nodes on which post-hooks must be run
-    @type hooks_execution_fn: function that accepts the following parameters:
-      (node_list, hooks_path, phase, environment)
-    @param hooks_execution_fn: function that will execute the hooks; can be
-      None, indicating that no conversion is necessary.
-    @type hooks_results_adapt_fn: function
-    @param hooks_results_adapt_fn: function that will adapt the return value of
-      hooks_execution_fn to the format expected by RunPhase
-    @type build_env_fn: function that returns a dictionary having strings as
-      keys
-    @param build_env_fn: function that builds the environment for the hooks
-    @type log_fn: function that accepts a string
-    @param log_fn: logging function
-    @type htype: string or None
-    @param htype: None or one of L{constants.HTYPE_CLUSTER},
-     L{constants.HTYPE_NODE}, L{constants.HTYPE_INSTANCE}
-    @type cluster_name: string
-    @param cluster_name: name of the cluster
-    @type master_name: string
-    @param master_name: name of the master
-
-    """
-    self.opcode = opcode
-    self.hooks_path = hooks_path
-    self.hooks_execution_fn = hooks_execution_fn
-    self.hooks_results_adapt_fn = hooks_results_adapt_fn
-    self.build_env_fn = build_env_fn
-    self.log_fn = log_fn
-    self.htype = htype
-    self.cluster_name = cluster_name
-    self.master_name = master_name
-
-    self.pre_env = self._BuildEnv(constants.HOOKS_PHASE_PRE)
-    (self.pre_nodes, self.post_nodes) = nodes
-
-  def _BuildEnv(self, phase):
-    """Compute the environment and the target nodes.
-
-    Based on the opcode and the current node list, this builds the
-    environment for the hooks and the target node list for the run.
-
-    """
-    if phase == constants.HOOKS_PHASE_PRE:
-      prefix = "GANETI_"
-    elif phase == constants.HOOKS_PHASE_POST:
-      prefix = "GANETI_POST_"
-    else:
-      raise AssertionError("Unknown phase '%s'" % phase)
-
-    env = {}
-
-    if self.hooks_path is not None:
-      phase_env = self.build_env_fn()
-      if phase_env:
-        assert not compat.any(key.upper().startswith(prefix)
-                              for key in phase_env)
-        env.update(("%s%s" % (prefix, key), value)
-                   for (key, value) in phase_env.items())
-
-    if phase == constants.HOOKS_PHASE_PRE:
-      assert compat.all((key.startswith("GANETI_") and
-                         not key.startswith("GANETI_POST_"))
-                        for key in env)
-
-    elif phase == constants.HOOKS_PHASE_POST:
-      assert compat.all(key.startswith("GANETI_POST_") for key in env)
-      assert isinstance(self.pre_env, dict)
-
-      # Merge with pre-phase environment
-      assert not compat.any(key.startswith("GANETI_POST_")
-                            for key in self.pre_env)
-      env.update(self.pre_env)
-    else:
-      raise AssertionError("Unknown phase '%s'" % phase)
-
-    return env
-
-  def _RunWrapper(self, node_list, hpath, phase, phase_env):
-    """Simple wrapper over self.callfn.
-
-    This method fixes the environment before executing the hooks.
-
-    """
-    env = {
-      "PATH": constants.HOOKS_PATH,
-      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
-      "GANETI_OP_CODE": self.opcode,
-      "GANETI_DATA_DIR": pathutils.DATA_DIR,
-      "GANETI_HOOKS_PHASE": phase,
-      "GANETI_HOOKS_PATH": hpath,
-      }
-
-    if self.htype:
-      env["GANETI_OBJECT_TYPE"] = self.htype
-
-    if self.cluster_name is not None:
-      env["GANETI_CLUSTER"] = self.cluster_name
-
-    if self.master_name is not None:
-      env["GANETI_MASTER"] = self.master_name
-
-    if phase_env:
-      env = utils.algo.JoinDisjointDicts(env, phase_env)
-
-    # Convert everything to strings
-    env = dict([(str(key), str(val)) for key, val in env.iteritems()])
-
-    assert compat.all(key == "PATH" or key.startswith("GANETI_")
-                      for key in env)
-
-    return self.hooks_execution_fn(node_list, hpath, phase, env)
-
-  def RunPhase(self, phase, nodes=None):
-    """Run all the scripts for a phase.
-
-    This is the main function of the HookMaster.
-    It executes self.hooks_execution_fn, and after running
-    self.hooks_results_adapt_fn on its results it expects them to be in the form
-    {node_name: (fail_msg, [(script, result, output), ...]}).
-
-    @param phase: one of L{constants.HOOKS_PHASE_POST} or
-        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
-    @param nodes: overrides the predefined list of nodes for the given phase
-    @return: the processed results of the hooks multi-node rpc call
-    @raise errors.HooksFailure: on communication failure to the nodes
-    @raise errors.HooksAbort: on failure of one of the hooks
-
-    """
-    if phase == constants.HOOKS_PHASE_PRE:
-      if nodes is None:
-        nodes = self.pre_nodes
-      env = self.pre_env
-    elif phase == constants.HOOKS_PHASE_POST:
-      if nodes is None:
-        nodes = self.post_nodes
-      env = self._BuildEnv(phase)
-    else:
-      raise AssertionError("Unknown phase '%s'" % phase)
-
-    if not nodes:
-      # empty node list, we should not attempt to run this as either
-      # we're in the cluster init phase and the rpc client part can't
-      # even attempt to run, or this LU doesn't do hooks at all
-      return
-
-    results = self._RunWrapper(nodes, self.hooks_path, phase, env)
-    if not results:
-      msg = "Communication Failure"
-      if phase == constants.HOOKS_PHASE_PRE:
-        raise errors.HooksFailure(msg)
-      else:
-        self.log_fn(msg)
-        return results
-
-    converted_res = results
-    if self.hooks_results_adapt_fn:
-      converted_res = self.hooks_results_adapt_fn(results)
-
-    errs = []
-    for node_name, (fail_msg, offline, hooks_results) in converted_res.items():
-      if offline:
-        continue
-
-      if fail_msg:
-        self.log_fn("Communication failure to node %s: %s", node_name, fail_msg)
-        continue
-
-      for script, hkr, output in hooks_results:
-        if hkr == constants.HKR_FAIL:
-          if phase == constants.HOOKS_PHASE_PRE:
-            errs.append((node_name, script, output))
-          else:
-            if not output:
-              output = "(no output)"
-            self.log_fn("On %s script %s failed, output: %s" %
-                        (node_name, script, output))
-
-    if errs and phase == constants.HOOKS_PHASE_PRE:
-      raise errors.HooksAbort(errs)
-
-    return results
-
-  def RunConfigUpdate(self):
-    """Run the special configuration update hook
-
-    This is a special hook that runs only on the master after each
-    top-level LI if the configuration has been updated.
-
-    """
-    phase = constants.HOOKS_PHASE_POST
-    hpath = constants.HOOKS_NAME_CFGUPDATE
-    nodes = [self.master_name]
-    self._RunWrapper(nodes, hpath, phase, self.pre_env)
-
-  @staticmethod
-  def BuildFromLu(hooks_execution_fn, lu):
-    if lu.HPATH is None:
-      nodes = (None, None)
-    else:
-      nodes = map(frozenset, lu.BuildHooksNodes())
-
-    master_name = cluster_name = None
-    if lu.cfg:
-      master_name = lu.cfg.GetMasterNode()
-      cluster_name = lu.cfg.GetClusterName()
-
-    return HooksMaster(lu.op.OP_ID, lu.HPATH, nodes, hooks_execution_fn,
-                       _RpcResultsToHooksResults, lu.BuildHooksEnv,
-                       lu.LogWarning, lu.HTYPE, cluster_name, master_name)