jqueue: Improve inotify error reporting
diff --git a/lib/mcpu.py b/lib/mcpu.py
index 96c589b..2851f29 100644
--- a/lib/mcpu.py
+++ b/lib/mcpu.py
@@ -1,7 +1,7 @@
 #
 #
 
-# Copyright (C) 2006, 2007, 2011 Google Inc.
+# Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -28,9 +28,12 @@ are two kinds of classes defined:
 
 """
 
 
 """
 
+import sys
 import logging
 import random
 import time
+import itertools
+import traceback
 
 from ganeti import opcodes
 from ganeti import constants
@@ -39,11 +42,25 @@ from ganeti import cmdlib
 from ganeti import locking
 from ganeti import utils
 from ganeti import compat
+from ganeti import pathutils
 
 
 _OP_PREFIX = "Op"
 _LU_PREFIX = "LU"
 
 
 
 _OP_PREFIX = "Op"
 _LU_PREFIX = "LU"
 
+#: LU classes which don't need to acquire the node allocation lock
+#: (L{locking.NAL}) when they acquire all node or node resource locks
+_NODE_ALLOC_WHITELIST = frozenset()
+
+#: LU classes which don't need to acquire the node allocation lock
+#: (L{locking.NAL}) in the same mode (shared/exclusive) as the node
+#: or node resource locks
+_NODE_ALLOC_MODE_WHITELIST = frozenset([
+  cmdlib.LUBackupExport,
+  cmdlib.LUBackupRemove,
+  cmdlib.LUOobCommand,
+  ])
+
 
 class LockAcquireTimeout(Exception):
   """Exception to report timeouts on acquiring locks.
 
 class LockAcquireTimeout(Exception):
   """Exception to report timeouts on acquiring locks.
@@ -138,10 +155,11 @@ class OpExecCbBase: # pylint: disable=W0232
 
     """
 
 
     """
 
-  def CheckCancel(self):
-    """Check whether job has been cancelled.
+  def CurrentPriority(self): # pylint: disable=R0201
+    """Returns current priority or C{None}.
 
     """
 
     """
+    return None
 
   def SubmitManyJobs(self, jobs):
     """Submits jobs for processing.
 
   def SubmitManyJobs(self, jobs):
     """Submits jobs for processing.
@@ -171,6 +189,62 @@ def _ComputeDispatchTable():
               if op.WITH_LU)
 
 
+def _SetBaseOpParams(src, defcomment, dst):
+  """Copies basic opcode parameters.
+
+  @type src: L{opcodes.OpCode}
+  @param src: Source opcode
+  @type defcomment: string
+  @param defcomment: Comment to specify if not already given
+  @type dst: L{opcodes.OpCode}
+  @param dst: Destination opcode
+
+  """
+  if hasattr(src, "debug_level"):
+    dst.debug_level = src.debug_level
+
+  if (getattr(dst, "priority", None) is None and
+      hasattr(src, "priority")):
+    dst.priority = src.priority
+
+  if not getattr(dst, opcodes.COMMENT_ATTR, None):
+    dst.comment = defcomment
+
+
+def _ProcessResult(submit_fn, op, result):
+  """Examines opcode result.
+
+  If necessary, additional processing on the result is done.
+
+  """
+  if isinstance(result, cmdlib.ResultWithJobs):
+    # Copy basic parameters (e.g. priority)
+    map(compat.partial(_SetBaseOpParams, op,
+                       "Submitted by %s" % op.OP_ID),
+        itertools.chain(*result.jobs))
+
+    # Submit jobs
+    job_submission = submit_fn(result.jobs)
+
+    # Build dictionary
+    result = result.other
+
+    assert constants.JOB_IDS_KEY not in result, \
+      "Key '%s' found in additional return values" % constants.JOB_IDS_KEY
+
+    result[constants.JOB_IDS_KEY] = job_submission
+
+  return result
+
+
+def _FailingSubmitManyJobs(_):
+  """Implementation of L{OpExecCbBase.SubmitManyJobs} to raise an exception.
+
+  """
+  raise errors.ProgrammerError("Opcodes processed without callbacks (e.g."
+                               " queries) can not submit jobs")
+
+
 def _RpcResultsToHooksResults(rpc_results):
   """Function to convert RPC results to the format expected by HooksMaster.
 
@@ -185,11 +259,49 @@ def _RpcResultsToHooksResults(rpc_results):
               for (node, rpc_res) in rpc_results.items())
 
 
+def _VerifyLocks(lu, glm, _mode_whitelist=_NODE_ALLOC_MODE_WHITELIST,
+                 _nal_whitelist=_NODE_ALLOC_WHITELIST):
+  """Performs consistency checks on locks acquired by a logical unit.
+
+  @type lu: L{cmdlib.LogicalUnit}
+  @param lu: Logical unit instance
+  @type glm: L{locking.GanetiLockManager}
+  @param glm: Lock manager
+
+  """
+  if not __debug__:
+    return
+
+  have_nal = glm.check_owned(locking.LEVEL_NODE_ALLOC, locking.NAL)
+
+  for level in [locking.LEVEL_NODE, locking.LEVEL_NODE_RES]:
+    # TODO: Verify using actual lock mode, not using LU variables
+    if level in lu.needed_locks:
+      share_node_alloc = lu.share_locks[locking.LEVEL_NODE_ALLOC]
+      share_level = lu.share_locks[level]
+
+      if lu.__class__ in _mode_whitelist:
+        assert share_node_alloc != share_level, \
+          "LU is whitelisted to use different modes for node allocation lock"
+      else:
+        assert bool(share_node_alloc) == bool(share_level), \
+          ("Node allocation lock must be acquired using the same mode as nodes"
+           " and node resources")
+
+      if lu.__class__ in _nal_whitelist:
+        assert not have_nal, \
+          "LU is whitelisted for not acquiring the node allocation lock"
+      elif lu.needed_locks[level] == locking.ALL_SET or glm.owning_all(level):
+        assert have_nal, \
+          ("Node allocation lock must be used if an LU acquires all nodes"
+           " or node resources")
+
+
 class Processor(object):
   """Object which runs OpCodes"""
   DISPATCH_TABLE = _ComputeDispatchTable()
 
-  def __init__(self, context, ec_id):
+  def __init__(self, context, ec_id, enable_locks=True):
     """Constructor for Processor
 
     @type context: GanetiContext
     """Constructor for Processor
 
     @type context: GanetiContext
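
Note: _VerifyLocks above only runs under __debug__ and enforces two
invariants. A simplified standalone model (the level names, dictionaries
and whitelist flags are stand-ins for the real LU attributes):

    ALL_SET = None  # Ganeti uses a sentinel to mean "all locks"

    def verify_locks(needed, share, have_nal,
                     mode_whitelisted=False, nal_whitelisted=False):
      for level in ("node", "node-res"):
        if level not in needed:
          continue
        # 1. The node allocation lock must be shared exactly when the
        #    node (resource) locks are shared
        if not mode_whitelisted:
          assert bool(share["node-alloc"]) == bool(share[level])
        # 2. Acquiring all locks at a level requires holding the node
        #    allocation lock as well
        if not nal_whitelisted and needed[level] is ALL_SET:
          assert have_nal
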
@@ -203,8 +315,18 @@ class Processor(object):
     self._cbs = None
     self.rpc = context.rpc
     self.hmclass = HooksMaster
+    self._enable_locks = enable_locks
+
+  def _CheckLocksEnabled(self):
+    """Checks if locking is enabled.
+
+    @raise errors.ProgrammerError: In case locking is not enabled

-  def _AcquireLocks(self, level, names, shared, timeout, priority):
+    """
+    if not self._enable_locks:
+      raise errors.ProgrammerError("Attempted to use disabled locks")
+
+  def _AcquireLocks(self, level, names, shared, opportunistic, timeout):
     """Acquires locks via the Ganeti lock manager.
 
     @type level: int
     """Acquires locks via the Ganeti lock manager.
 
     @type level: int
@@ -213,43 +335,30 @@ class Processor(object):
     @param names: Lock names
     @type shared: bool
     @param shared: Whether the locks should be acquired in shared mode
+    @type opportunistic: bool
+    @param opportunistic: Whether to acquire opportunistically
     @type timeout: None or float
     @param timeout: Timeout for acquiring the locks
     @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
         amount of time
 
     """
+    self._CheckLocksEnabled()
+
     if self._cbs:
-      self._cbs.CheckCancel()
+      priority = self._cbs.CurrentPriority()
+    else:
+      priority = None
 
     acquired = self.context.glm.acquire(level, names, shared=shared,
-                                        timeout=timeout, priority=priority)
+                                        timeout=timeout, priority=priority,
+                                        opportunistic=opportunistic)
 
     if acquired is None:
       raise LockAcquireTimeout()
 
     return acquired

-  def _ProcessResult(self, result):
-    """Examines opcode result.
-
-    If necessary, additional processing on the result is done.
-
-    """
-    if isinstance(result, cmdlib.ResultWithJobs):
-      # Submit jobs
-      job_submission = self._cbs.SubmitManyJobs(result.jobs)
-
-      # Build dictionary
-      result = result.other
-
-      assert constants.JOB_IDS_KEY not in result, \
-        "Key '%s' found in additional return values" % constants.JOB_IDS_KEY
-
-      result[constants.JOB_IDS_KEY] = job_submission
-
-    return result
-
   def _ExecLU(self, lu):
     """Logical Unit execution sequence.
 
@@ -270,8 +379,13 @@ class Processor(object):
                    " the operation")
       return lu.dry_run_result
 
                    " the operation")
       return lu.dry_run_result
 
+    if self._cbs:
+      submit_mj_fn = self._cbs.SubmitManyJobs
+    else:
+      submit_mj_fn = _FailingSubmitManyJobs
+
     try:
-      result = self._ProcessResult(lu.Exec(self.Log))
+      result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                 self.Log, result)
@@ -285,7 +399,7 @@ class Processor(object):
   def BuildHooksManager(self, lu):
     return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu)
 
-  def _LockAndExecLU(self, lu, level, calc_timeout, priority):
+  def _LockAndExecLU(self, lu, level, calc_timeout):
     """Execute a Logical Unit, with the needed locks.
 
     This is a recursive function that starts locking the given level, and
     """Execute a Logical Unit, with the needed locks.
 
     This is a recursive function that starts locking the given level, and
@@ -293,13 +407,29 @@ class Processor(object):
     given LU and its opcodes.
 
     """
+    glm = self.context.glm
     adding_locks = level in lu.add_locks
     acquiring_locks = level in lu.needed_locks
+
     if level not in locking.LEVELS:
+      _VerifyLocks(lu, glm)
+
       if self._cbs:
         self._cbs.NotifyStart()
 
-      result = self._ExecLU(lu)
+      try:
+        result = self._ExecLU(lu)
+      except AssertionError, err:
+        # this is a bit ugly, as we don't know from which phase
+        # (prereq, exec) this comes; but it's better than an exception
+        # with no information
+        (_, _, tb) = sys.exc_info()
+        err_info = traceback.format_tb(tb)
+        del tb
+        logging.exception("Detected AssertionError")
+        raise errors.OpExecError("Internal assertion error: please report"
+                                 " this as a bug.\nError message: '%s';"
+                                 " location:\n%s" % (str(err), err_info[-1]))
 
     elif adding_locks and acquiring_locks:
       # We could both acquire and add locks at the same level, but for now we
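
Note: the AssertionError handling added above can be exercised in
isolation. In this sketch, run_guarded is a hypothetical helper, not
part of the patch; it keeps the assertion message plus the innermost
stack frame and re-raises a regular error:

    import sys
    import traceback

    def run_guarded(fn):
      try:
        return fn()
      except AssertionError, err:
        # Keep only the innermost frame; in the patch the full traceback
        # also goes to the log via logging.exception
        (_, _, tb) = sys.exc_info()
        err_info = traceback.format_tb(tb)
        del tb
        raise RuntimeError("Internal assertion error: '%s'; location:\n%s"
                           % (err, err_info[-1]))
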
@@ -308,8 +438,11 @@ class Processor(object):
                                 " others")
 
     elif adding_locks or acquiring_locks:
                                 " others")
 
     elif adding_locks or acquiring_locks:
+      self._CheckLocksEnabled()
+
       lu.DeclareLocks(level)
       share = lu.share_locks[level]
+      opportunistic = lu.opportunistic_locks[level]
 
       try:
         assert adding_locks ^ acquiring_locks, \
@@ -319,36 +452,38 @@ class Processor(object):
           # Acquiring locks
           needed_locks = lu.needed_locks[level]
 
-          self._AcquireLocks(level, needed_locks, share,
-                             calc_timeout(), priority)
+          self._AcquireLocks(level, needed_locks, share, opportunistic,
+                             calc_timeout())
         else:
           # Adding locks
           add_locks = lu.add_locks[level]
           lu.remove_locks[level] = add_locks
 
           try:
-            self.context.glm.add(level, add_locks, acquired=1, shared=share)
+            glm.add(level, add_locks, acquired=1, shared=share)
           except errors.LockError:
+            logging.exception("Detected lock error in level %s for locks"
+                              " %s, shared=%s", level, add_locks, share)
             raise errors.OpPrereqError(
-              "Couldn't add locks (%s), probably because of a race condition"
-              " with another job, who added them first" % add_locks,
-              errors.ECODE_FAULT)
+              "Couldn't add locks (%s), most likely because of another"
+              " job that added them first" % add_locks,
+              errors.ECODE_NOTUNIQUE)
 
         try:
-          result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
+          result = self._LockAndExecLU(lu, level + 1, calc_timeout)
         finally:
           if level in lu.remove_locks:
-            self.context.glm.remove(level, lu.remove_locks[level])
+            glm.remove(level, lu.remove_locks[level])
       finally:
-        if self.context.glm.is_owned(level):
-          self.context.glm.release(level)
+        if glm.is_owned(level):
+          glm.release(level)
 
     else:
-      result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
+      result = self._LockAndExecLU(lu, level + 1, calc_timeout)
 
     return result

-  def ExecOpCode(self, op, cbs, timeout=None, priority=None):
+  def ExecOpCode(self, op, cbs, timeout=None):
     """Execute an opcode.
 
     @type op: an OpCode instance
     """Execute an opcode.
 
     @type op: an OpCode instance
@@ -357,8 +492,6 @@ class Processor(object):
     @param cbs: Runtime callbacks
     @type timeout: float or None
     @param timeout: Maximum time to acquire all locks, None for no timeout
-    @type priority: number or None
-    @param priority: Priority for acquiring lock(s)
     @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
         amount of time
 
@@ -378,25 +511,32 @@ class Processor(object):
 
     self._cbs = cbs
     try:
-      # Acquire the Big Ganeti Lock exclusively if this LU requires it,
-      # and in a shared fashion otherwise (to prevent concurrent run with
-      # an exclusive LU.
-      self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
-                          not lu_class.REQ_BGL, calc_timeout(),
-                          priority)
+      if self._enable_locks:
+        # Acquire the Big Ganeti Lock exclusively if this LU requires it,
+        # and in a shared fashion otherwise (to prevent concurrent run with
+        # an exclusive LU).
+        self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
+                           not lu_class.REQ_BGL, False, calc_timeout())
+      elif lu_class.REQ_BGL:
+        raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
+                                     " disabled" % op.OP_ID)
+
       try:
         lu = lu_class(self, op, self.context, self.rpc)
         lu.ExpandNames()
         assert lu.needed_locks is not None, "needed_locks not set by LU"
 
         try:
-          result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout,
-                                       priority)
+          result = self._LockAndExecLU(lu, locking.LEVEL_CLUSTER + 1,
+                                       calc_timeout)
         finally:
           if self._ec_id:
             self.context.cfg.DropECReservations(self._ec_id)
       finally:
-        self.context.glm.release(locking.LEVEL_CLUSTER)
+        # Release BGL if owned
+        if self.context.glm.is_owned(locking.LEVEL_CLUSTER):
+          assert self._enable_locks
+          self.context.glm.release(locking.LEVEL_CLUSTER)
     finally:
       self._cbs = None
 
@@ -404,8 +544,8 @@ class Processor(object):
     if not (resultcheck_fn is None or resultcheck_fn(result)):
       logging.error("Expected opcode result matching %s, got %s",
                     resultcheck_fn, result)
-      raise errors.OpResultError("Opcode result does not match %s, got %s" %
-                                 (resultcheck_fn, result[:80]))
+      raise errors.OpResultError("Opcode result does not match %s: %s" %
+                                 (resultcheck_fn, utils.Truncate(result, 80)))
 
     return result

@@ -463,8 +603,8 @@ class Processor(object):
 
 class HooksMaster(object):
   def __init__(self, opcode, hooks_path, nodes, hooks_execution_fn,
-    hooks_results_adapt_fn, build_env_fn, log_fn, htype=None, cluster_name=None,
-    master_name=None):
+               hooks_results_adapt_fn, build_env_fn, log_fn, htype=None,
+               cluster_name=None, master_name=None):
     """Base class for hooks masters.
 
     This class invokes the execution of hooks according to the behaviour
     """Base class for hooks masters.
 
     This class invokes the execution of hooks according to the behaviour
@@ -563,7 +703,7 @@ class HooksMaster(object):
       "PATH": constants.HOOKS_PATH,
       "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
       "GANETI_OP_CODE": self.opcode,
       "PATH": constants.HOOKS_PATH,
       "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
       "GANETI_OP_CODE": self.opcode,
-      "GANETI_DATA_DIR": constants.DATA_DIR,
+      "GANETI_DATA_DIR": pathutils.DATA_DIR,
       "GANETI_HOOKS_PHASE": phase,
       "GANETI_HOOKS_PATH": hpath,
       }
       "GANETI_HOOKS_PHASE": phase,
       "GANETI_HOOKS_PATH": hpath,
       }