Improve handling of lock exceptions
[ganeti-local] / lib / mcpu.py
index 96c589b..6ecae61 100644 (file)
@@ -1,7 +1,7 @@
 #
 #
 
-# Copyright (C) 2006, 2007, 2011 Google Inc.
+# Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -31,6 +31,7 @@ are two kinds of classes defined:
 import logging
 import random
 import time
+import itertools
 
 from ganeti import opcodes
 from ganeti import constants
@@ -171,6 +172,62 @@ def _ComputeDispatchTable():
               if op.WITH_LU)
 
 
+def _SetBaseOpParams(src, defcomment, dst):
+  """Propagates basic opcode parameters from one opcode to another.
+
+  The debug level is copied whenever the source has one. The priority is
+  copied only if the destination has none (or C{None}) and the source has
+  one. The default comment is set only if the destination attribute named by
+  L{opcodes.COMMENT_ATTR} is missing or evaluates to false.
+
+  @type src: L{opcodes.OpCode}
+  @param src: Source opcode
+  @type defcomment: string
+  @param defcomment: Comment to specify if not already given
+  @type dst: L{opcodes.OpCode}
+  @param dst: Destination opcode
+
+  """
+  if hasattr(src, "debug_level"):
+    dst.debug_level = src.debug_level
+
+  dst_lacks_priority = getattr(dst, "priority", None) is None
+  if dst_lacks_priority and hasattr(src, "priority"):
+    dst.priority = src.priority
+
+  current_comment = getattr(dst, opcodes.COMMENT_ATTR, None)
+  if not current_comment:
+    dst.comment = defcomment
+
+
+def _ProcessResult(submit_fn, op, result):
+  """Examines opcode result.
+
+  If necessary, additional processing on the result is done: for a
+  L{cmdlib.ResultWithJobs}, the basic parameters of C{op} are copied to every
+  opcode found in C{result.jobs}, the jobs are submitted through C{submit_fn}
+  and the value returned by it is stored in C{result.other}. Any other result
+  is passed through untouched.
+
+  @type submit_fn: callable
+  @param submit_fn: Function called with C{result.jobs}; its return value is
+    stored under L{constants.JOB_IDS_KEY}
+  @type op: L{opcodes.OpCode}
+  @param op: Opcode whose basic parameters are copied to the submitted opcodes
+  @param result: Value returned by executing the opcode
+  @return: C{result.other} including the job submission result in case of a
+    L{cmdlib.ResultWithJobs}, C{result} itself otherwise
+
+  """
+  if isinstance(result, cmdlib.ResultWithJobs):
+    # Copy basic parameters (e.g. priority); an explicit loop is used as the
+    # calls are made for their side effects only, which a lazily evaluated
+    # map() would never trigger
+    defcomment = "Submitted by %s" % op.OP_ID
+    for job_op in itertools.chain(*result.jobs):
+      _SetBaseOpParams(op, defcomment, job_op)
+
+    # Submit jobs
+    job_submission = submit_fn(result.jobs)
+
+    # Build dictionary
+    result = result.other
+
+    assert constants.JOB_IDS_KEY not in result, \
+      "Key '%s' found in additional return values" % constants.JOB_IDS_KEY
+
+    result[constants.JOB_IDS_KEY] = job_submission
+
+  return result
+
+
+def _FailingSubmitManyJobs(_):
+  """Stand-in for L{OpExecCbBase.SubmitManyJobs} which always fails.
+
+  The single argument is ignored.
+
+  @raise errors.ProgrammerError: Unconditionally
+
+  """
+  msg = ("Opcodes processed without callbacks (e.g. queries) can not submit"
+         " jobs")
+  raise errors.ProgrammerError(msg)
+
+
 def _RpcResultsToHooksResults(rpc_results):
   """Function to convert RPC results to the format expected by HooksMaster.
 
@@ -189,7 +246,7 @@ class Processor(object):
   """Object which runs OpCodes"""
   DISPATCH_TABLE = _ComputeDispatchTable()
 
-  def __init__(self, context, ec_id):
+  def __init__(self, context, ec_id, enable_locks=True):
     """Constructor for Processor
 
     @type context: GanetiContext
@@ -203,6 +260,16 @@ class Processor(object):
     self._cbs = None
     self.rpc = context.rpc
     self.hmclass = HooksMaster
+    self._enable_locks = enable_locks
+
+  def _CheckLocksEnabled(self):
+    """Ensures that locks may be used by this processor.
+
+    Returns silently if C{self._enable_locks} is true.
+
+    @raise errors.ProgrammerError: In case locking is not enabled
+
+    """
+    if self._enable_locks:
+      return
+
+    raise errors.ProgrammerError("Attempted to use disabled locks")
 
   def _AcquireLocks(self, level, names, shared, timeout, priority):
     """Acquires locks via the Ganeti lock manager.
@@ -219,6 +286,8 @@ class Processor(object):
         amount of time
 
     """
+    self._CheckLocksEnabled()
+
     if self._cbs:
       self._cbs.CheckCancel()
 
@@ -230,26 +299,6 @@ class Processor(object):
 
     return acquired
 
-  def _ProcessResult(self, result):
-    """Examines opcode result.
-
-    If necessary, additional processing on the result is done.
-
-    """
-    if isinstance(result, cmdlib.ResultWithJobs):
-      # Submit jobs
-      job_submission = self._cbs.SubmitManyJobs(result.jobs)
-
-      # Build dictionary
-      result = result.other
-
-      assert constants.JOB_IDS_KEY not in result, \
-        "Key '%s' found in additional return values" % constants.JOB_IDS_KEY
-
-      result[constants.JOB_IDS_KEY] = job_submission
-
-    return result
-
   def _ExecLU(self, lu):
     """Logical Unit execution sequence.
 
@@ -270,8 +319,13 @@ class Processor(object):
                    " the operation")
       return lu.dry_run_result
 
+    if self._cbs:
+      submit_mj_fn = self._cbs.SubmitManyJobs
+    else:
+      submit_mj_fn = _FailingSubmitManyJobs
+
     try:
-      result = self._ProcessResult(lu.Exec(self.Log))
+      result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                 self.Log, result)
@@ -308,6 +362,8 @@ class Processor(object):
                                 " others")
 
     elif adding_locks or acquiring_locks:
+      self._CheckLocksEnabled()
+
       lu.DeclareLocks(level)
       share = lu.share_locks[level]
 
@@ -329,10 +385,12 @@ class Processor(object):
           try:
             self.context.glm.add(level, add_locks, acquired=1, shared=share)
           except errors.LockError:
+            logging.exception("Detected lock error in level %s for locks"
+                              " %s, shared=%s", level, add_locks, share)
             raise errors.OpPrereqError(
-              "Couldn't add locks (%s), probably because of a race condition"
-              " with another job, who added them first" % add_locks,
-              errors.ECODE_FAULT)
+              "Couldn't add locks (%s), most likely because of another"
+              " job who added them first" % add_locks,
+              errors.ECODE_NOTUNIQUE)
 
         try:
           result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
@@ -378,12 +436,17 @@ class Processor(object):
 
     self._cbs = cbs
     try:
-      # Acquire the Big Ganeti Lock exclusively if this LU requires it,
-      # and in a shared fashion otherwise (to prevent concurrent run with
-      # an exclusive LU.
-      self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
-                          not lu_class.REQ_BGL, calc_timeout(),
-                          priority)
+      if self._enable_locks:
+        # Acquire the Big Ganeti Lock exclusively if this LU requires it,
+        # and in a shared fashion otherwise (to prevent concurrent run with
+        # an exclusive LU.
+        self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
+                            not lu_class.REQ_BGL, calc_timeout(),
+                            priority)
+      elif lu_class.REQ_BGL:
+        raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
+                                     " disabled" % op.OP_ID)
+
       try:
         lu = lu_class(self, op, self.context, self.rpc)
         lu.ExpandNames()
@@ -396,7 +459,10 @@ class Processor(object):
           if self._ec_id:
             self.context.cfg.DropECReservations(self._ec_id)
       finally:
-        self.context.glm.release(locking.LEVEL_CLUSTER)
+        # Release BGL if owned
+        if self.context.glm.is_owned(locking.LEVEL_CLUSTER):
+          assert self._enable_locks
+          self.context.glm.release(locking.LEVEL_CLUSTER)
     finally:
       self._cbs = None
 
@@ -404,8 +470,8 @@ class Processor(object):
     if not (resultcheck_fn is None or resultcheck_fn(result)):
       logging.error("Expected opcode result matching %s, got %s",
                     resultcheck_fn, result)
-      raise errors.OpResultError("Opcode result does not match %s, got %s" %
-                                 (resultcheck_fn, result[:80]))
+      raise errors.OpResultError("Opcode result does not match %s: %s" %
+                                 (resultcheck_fn, utils.Truncate(result, 80)))
 
     return result