locking: Implement opportunistic locking in LockSet
[ganeti-local] / lib / mcpu.py
index 96c589b..f737f7e 100644 (file)
@@ -1,7 +1,7 @@
 #
 #
 
-# Copyright (C) 2006, 2007, 2011 Google Inc.
+# Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -28,9 +28,12 @@ are two kinds of classes defined:
 
 """
 
+import sys
 import logging
 import random
 import time
+import itertools
+import traceback
 
 from ganeti import opcodes
 from ganeti import constants
@@ -39,6 +42,7 @@ from ganeti import cmdlib
 from ganeti import locking
 from ganeti import utils
 from ganeti import compat
+from ganeti import pathutils
 
 
 _OP_PREFIX = "Op"
@@ -138,10 +142,11 @@ class OpExecCbBase: # pylint: disable=W0232
 
     """
 
-  def CheckCancel(self):
-    """Check whether job has been cancelled.
+  def CurrentPriority(self): # pylint: disable=R0201
+    """Returns current priority or C{None}.
 
     """
+    return None
 
   def SubmitManyJobs(self, jobs):
     """Submits jobs for processing.
@@ -171,6 +176,62 @@ def _ComputeDispatchTable():
               if op.WITH_LU)
 
 
+def _SetBaseOpParams(src, defcomment, dst):
+  """Copies basic opcode parameters from one opcode to another.
+
+  @type src: L{opcodes.OpCode}
+  @param src: Source opcode
+  @type defcomment: string
+  @param defcomment: Comment to specify if not already given
+  @type dst: L{opcodes.OpCode}
+  @param dst: Destination opcode, modified in place
+
+  """
+  if hasattr(src, "debug_level"):
+    dst.debug_level = src.debug_level
+
+  # Priority and comment are only filled in if dst has none of its own
+  if getattr(dst, "priority", None) is None and hasattr(src, "priority"):
+    dst.priority = src.priority
+
+  if not getattr(dst, opcodes.COMMENT_ATTR, None):
+    setattr(dst, opcodes.COMMENT_ATTR, defcomment)
+
+
+def _ProcessResult(submit_fn, op, result):
+  """Examines opcode result and submits any jobs it carries.
+
+  @param submit_fn: Callable invoked with the list of jobs to submit
+  @param op: Opcode whose basic parameters are copied to the new opcodes
+  @param result: Opcode result, possibly a L{cmdlib.ResultWithJobs}
+  @return: C{result}, or its C{other} part plus the submission result
+
+  """
+  if isinstance(result, cmdlib.ResultWithJobs):
+    # Copy basic parameters (e.g. priority) to every opcode being submitted
+    defcomment = "Submitted by %s" % op.OP_ID
+    for job_op in itertools.chain(*result.jobs):
+      _SetBaseOpParams(op, defcomment, job_op)
+
+    job_submission = submit_fn(result.jobs)
+
+    # Build dictionary
+    result = result.other
+    assert constants.JOB_IDS_KEY not in result, \
+      "Key '%s' found in additional return values" % constants.JOB_IDS_KEY
+    result[constants.JOB_IDS_KEY] = job_submission
+
+  return result
+
+
+def _FailingSubmitManyJobs(_):
+  """Stand-in for L{OpExecCbBase.SubmitManyJobs} which always fails.
+
+  """
+  raise errors.ProgrammerError("Opcodes processed without callbacks"
+                               " (e.g. queries) can not submit jobs")
+
+
 def _RpcResultsToHooksResults(rpc_results):
   """Function to convert RPC results to the format expected by HooksMaster.
 
@@ -189,7 +250,7 @@ class Processor(object):
   """Object which runs OpCodes"""
   DISPATCH_TABLE = _ComputeDispatchTable()
 
-  def __init__(self, context, ec_id):
+  def __init__(self, context, ec_id, enable_locks=True):
     """Constructor for Processor
 
     @type context: GanetiContext
@@ -203,8 +264,18 @@ class Processor(object):
     self._cbs = None
     self.rpc = context.rpc
     self.hmclass = HooksMaster
+    self._enable_locks = enable_locks
 
-  def _AcquireLocks(self, level, names, shared, timeout, priority):
+  def _CheckLocksEnabled(self):
+    """Checks that this processor was created with locking enabled.
+
+    @raise errors.ProgrammerError: In case locking is not enabled
+
+    """
+    if not self._enable_locks:
+      raise errors.ProgrammerError("Attempted to use disabled locks")
+
+  def _AcquireLocks(self, level, names, shared, timeout):
     """Acquires locks via the Ganeti lock manager.
 
     @type level: int
@@ -219,8 +290,12 @@ class Processor(object):
         amount of time
 
     """
+    self._CheckLocksEnabled()
+
     if self._cbs:
-      self._cbs.CheckCancel()
+      priority = self._cbs.CurrentPriority()
+    else:
+      priority = None
 
     acquired = self.context.glm.acquire(level, names, shared=shared,
                                         timeout=timeout, priority=priority)
@@ -230,26 +305,6 @@ class Processor(object):
 
     return acquired
 
-  def _ProcessResult(self, result):
-    """Examines opcode result.
-
-    If necessary, additional processing on the result is done.
-
-    """
-    if isinstance(result, cmdlib.ResultWithJobs):
-      # Submit jobs
-      job_submission = self._cbs.SubmitManyJobs(result.jobs)
-
-      # Build dictionary
-      result = result.other
-
-      assert constants.JOB_IDS_KEY not in result, \
-        "Key '%s' found in additional return values" % constants.JOB_IDS_KEY
-
-      result[constants.JOB_IDS_KEY] = job_submission
-
-    return result
-
   def _ExecLU(self, lu):
     """Logical Unit execution sequence.
 
@@ -270,8 +325,13 @@ class Processor(object):
                    " the operation")
       return lu.dry_run_result
 
+    if self._cbs:
+      submit_mj_fn = self._cbs.SubmitManyJobs
+    else:
+      submit_mj_fn = _FailingSubmitManyJobs
+
     try:
-      result = self._ProcessResult(lu.Exec(self.Log))
+      result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                 self.Log, result)
@@ -285,7 +345,7 @@ class Processor(object):
   def BuildHooksManager(self, lu):
     return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu)
 
-  def _LockAndExecLU(self, lu, level, calc_timeout, priority):
+  def _LockAndExecLU(self, lu, level, calc_timeout):
     """Execute a Logical Unit, with the needed locks.
 
     This is a recursive function that starts locking the given level, and
@@ -299,7 +359,19 @@ class Processor(object):
       if self._cbs:
         self._cbs.NotifyStart()
 
-      result = self._ExecLU(lu)
+      try:
+        result = self._ExecLU(lu)
+      except AssertionError, err:
+        # this is a bit ugly, as we don't know from which phase
+        # (prereq, exec) this comes; but it's better than an exception
+        # with no information
+        (_, _, tb) = sys.exc_info()
+        err_info = traceback.format_tb(tb)
+        del tb
+        logging.exception("Detected AssertionError")
+        raise errors.OpExecError("Internal assertion error: please report"
+                                 " this as a bug.\nError message: '%s';"
+                                 " location:\n%s" % (str(err), err_info[-1]))
 
     elif adding_locks and acquiring_locks:
       # We could both acquire and add locks at the same level, but for now we
@@ -308,6 +380,8 @@ class Processor(object):
                                 " others")
 
     elif adding_locks or acquiring_locks:
+      self._CheckLocksEnabled()
+
       lu.DeclareLocks(level)
       share = lu.share_locks[level]
 
@@ -320,7 +394,7 @@ class Processor(object):
           needed_locks = lu.needed_locks[level]
 
           self._AcquireLocks(level, needed_locks, share,
-                             calc_timeout(), priority)
+                             calc_timeout())
         else:
           # Adding locks
           add_locks = lu.add_locks[level]
@@ -329,13 +403,15 @@ class Processor(object):
           try:
             self.context.glm.add(level, add_locks, acquired=1, shared=share)
           except errors.LockError:
+            logging.exception("Detected lock error in level %s for locks"
+                              " %s, shared=%s", level, add_locks, share)
             raise errors.OpPrereqError(
-              "Couldn't add locks (%s), probably because of a race condition"
-              " with another job, who added them first" % add_locks,
-              errors.ECODE_FAULT)
+              "Couldn't add locks (%s), most likely because of another"
+              " job who added them first" % add_locks,
+              errors.ECODE_NOTUNIQUE)
 
         try:
-          result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
+          result = self._LockAndExecLU(lu, level + 1, calc_timeout)
         finally:
           if level in lu.remove_locks:
             self.context.glm.remove(level, lu.remove_locks[level])
@@ -344,11 +420,11 @@ class Processor(object):
           self.context.glm.release(level)
 
     else:
-      result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
+      result = self._LockAndExecLU(lu, level + 1, calc_timeout)
 
     return result
 
-  def ExecOpCode(self, op, cbs, timeout=None, priority=None):
+  def ExecOpCode(self, op, cbs, timeout=None):
     """Execute an opcode.
 
     @type op: an OpCode instance
@@ -357,8 +433,6 @@ class Processor(object):
     @param cbs: Runtime callbacks
     @type timeout: float or None
     @param timeout: Maximum time to acquire all locks, None for no timeout
-    @type priority: number or None
-    @param priority: Priority for acquiring lock(s)
     @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
         amount of time
 
@@ -378,25 +452,32 @@ class Processor(object):
 
     self._cbs = cbs
     try:
-      # Acquire the Big Ganeti Lock exclusively if this LU requires it,
-      # and in a shared fashion otherwise (to prevent concurrent run with
-      # an exclusive LU.
-      self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
-                          not lu_class.REQ_BGL, calc_timeout(),
-                          priority)
+      if self._enable_locks:
+        # Acquire the Big Ganeti Lock exclusively if this LU requires it,
+        # and in a shared fashion otherwise (to prevent concurrent run with
+        # an exclusive LU).
+        self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
+                            not lu_class.REQ_BGL, calc_timeout())
+      elif lu_class.REQ_BGL:
+        raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
+                                     " disabled" % op.OP_ID)
+
       try:
         lu = lu_class(self, op, self.context, self.rpc)
         lu.ExpandNames()
         assert lu.needed_locks is not None, "needed_locks not set by LU"
 
         try:
-          result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout,
-                                       priority)
+          result = self._LockAndExecLU(lu, locking.LEVEL_CLUSTER + 1,
+                                       calc_timeout)
         finally:
           if self._ec_id:
             self.context.cfg.DropECReservations(self._ec_id)
       finally:
-        self.context.glm.release(locking.LEVEL_CLUSTER)
+        # Release BGL if owned
+        if self.context.glm.is_owned(locking.LEVEL_CLUSTER):
+          assert self._enable_locks
+          self.context.glm.release(locking.LEVEL_CLUSTER)
     finally:
       self._cbs = None
 
@@ -404,8 +485,8 @@ class Processor(object):
     if not (resultcheck_fn is None or resultcheck_fn(result)):
       logging.error("Expected opcode result matching %s, got %s",
                     resultcheck_fn, result)
-      raise errors.OpResultError("Opcode result does not match %s, got %s" %
-                                 (resultcheck_fn, result[:80]))
+      raise errors.OpResultError("Opcode result does not match %s: %s" %
+                                 (resultcheck_fn, utils.Truncate(result, 80)))
 
     return result
 
@@ -463,8 +544,8 @@ class Processor(object):
 
 class HooksMaster(object):
   def __init__(self, opcode, hooks_path, nodes, hooks_execution_fn,
-    hooks_results_adapt_fn, build_env_fn, log_fn, htype=None, cluster_name=None,
-    master_name=None):
+               hooks_results_adapt_fn, build_env_fn, log_fn, htype=None,
+               cluster_name=None, master_name=None):
     """Base class for hooks masters.
 
     This class invokes the execution of hooks according to the behaviour
@@ -563,7 +644,7 @@ class HooksMaster(object):
       "PATH": constants.HOOKS_PATH,
       "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
       "GANETI_OP_CODE": self.opcode,
-      "GANETI_DATA_DIR": constants.DATA_DIR,
+      "GANETI_DATA_DIR": pathutils.DATA_DIR,
       "GANETI_HOOKS_PHASE": phase,
       "GANETI_HOOKS_PATH": hpath,
       }