locking: Implement opportunistic locking in LockSet
[ganeti-local] / lib / mcpu.py
index 6e8be77..f737f7e 100644 (file)
@@ -1,7 +1,7 @@
 #
 #
 
-# Copyright (C) 2006, 2007, 2011 Google Inc.
+# Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -28,10 +28,12 @@ are two kinds of classes defined:
 
 """
 
+import sys
 import logging
 import random
 import time
 import itertools
+import traceback
 
 from ganeti import opcodes
 from ganeti import constants
@@ -40,6 +42,7 @@ from ganeti import cmdlib
 from ganeti import locking
 from ganeti import utils
 from ganeti import compat
+from ganeti import pathutils
 
 
 _OP_PREFIX = "Op"
@@ -139,10 +142,11 @@ class OpExecCbBase: # pylint: disable=W0232
 
     """
 
-  def CheckCancel(self):
-    """Check whether job has been cancelled.
+  def CurrentPriority(self): # pylint: disable=R0201
+    """Returns current priority or C{None}.
 
     """
+    return None
 
   def SubmitManyJobs(self, jobs):
     """Submits jobs for processing.
@@ -271,7 +275,7 @@ class Processor(object):
     if not self._enable_locks:
       raise errors.ProgrammerError("Attempted to use disabled locks")
 
-  def _AcquireLocks(self, level, names, shared, timeout, priority):
+  def _AcquireLocks(self, level, names, shared, timeout):
     """Acquires locks via the Ganeti lock manager.
 
     @type level: int
@@ -289,7 +293,9 @@ class Processor(object):
     self._CheckLocksEnabled()
 
     if self._cbs:
-      self._cbs.CheckCancel()
+      priority = self._cbs.CurrentPriority()
+    else:
+      priority = None
 
     acquired = self.context.glm.acquire(level, names, shared=shared,
                                         timeout=timeout, priority=priority)
@@ -339,7 +345,7 @@ class Processor(object):
   def BuildHooksManager(self, lu):
     return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu)
 
-  def _LockAndExecLU(self, lu, level, calc_timeout, priority):
+  def _LockAndExecLU(self, lu, level, calc_timeout):
     """Execute a Logical Unit, with the needed locks.
 
     This is a recursive function that starts locking the given level, and
@@ -353,7 +359,19 @@ class Processor(object):
       if self._cbs:
         self._cbs.NotifyStart()
 
-      result = self._ExecLU(lu)
+      try:
+        result = self._ExecLU(lu)
+      except AssertionError, err:
+        # this is a bit ugly, as we don't know from which phase
+        # (prereq, exec) this comes; but it's better than an exception
+        # with no information
+        (_, _, tb) = sys.exc_info()
+        err_info = traceback.format_tb(tb)
+        del tb
+        logging.exception("Detected AssertionError")
+        raise errors.OpExecError("Internal assertion error: please report"
+                                 " this as a bug.\nError message: '%s';"
+                                 " location:\n%s" % (str(err), err_info[-1]))
 
     elif adding_locks and acquiring_locks:
       # We could both acquire and add locks at the same level, but for now we
@@ -376,7 +394,7 @@ class Processor(object):
           needed_locks = lu.needed_locks[level]
 
           self._AcquireLocks(level, needed_locks, share,
-                             calc_timeout(), priority)
+                             calc_timeout())
         else:
           # Adding locks
           add_locks = lu.add_locks[level]
@@ -385,13 +403,15 @@ class Processor(object):
           try:
             self.context.glm.add(level, add_locks, acquired=1, shared=share)
           except errors.LockError:
+            logging.exception("Detected lock error in level %s for locks"
+                              " %s, shared=%s", level, add_locks, share)
             raise errors.OpPrereqError(
-              "Couldn't add locks (%s), probably because of a race condition"
-              " with another job, who added them first" % add_locks,
-              errors.ECODE_FAULT)
+              "Couldn't add locks (%s), most likely because of another"
+              " job who added them first" % add_locks,
+              errors.ECODE_NOTUNIQUE)
 
         try:
-          result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
+          result = self._LockAndExecLU(lu, level + 1, calc_timeout)
         finally:
           if level in lu.remove_locks:
             self.context.glm.remove(level, lu.remove_locks[level])
@@ -400,11 +420,11 @@ class Processor(object):
           self.context.glm.release(level)
 
     else:
-      result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
+      result = self._LockAndExecLU(lu, level + 1, calc_timeout)
 
     return result
 
-  def ExecOpCode(self, op, cbs, timeout=None, priority=None):
+  def ExecOpCode(self, op, cbs, timeout=None):
     """Execute an opcode.
 
     @type op: an OpCode instance
@@ -413,8 +433,6 @@ class Processor(object):
     @param cbs: Runtime callbacks
     @type timeout: float or None
     @param timeout: Maximum time to acquire all locks, None for no timeout
-    @type priority: number or None
-    @param priority: Priority for acquiring lock(s)
     @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
         amount of time
 
@@ -439,8 +457,7 @@ class Processor(object):
         # and in a shared fashion otherwise (to prevent concurrent run with
         # an exclusive LU.
         self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
-                            not lu_class.REQ_BGL, calc_timeout(),
-                            priority)
+                            not lu_class.REQ_BGL, calc_timeout())
       elif lu_class.REQ_BGL:
         raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
                                      " disabled" % op.OP_ID)
@@ -451,8 +468,8 @@ class Processor(object):
         assert lu.needed_locks is not None, "needed_locks not set by LU"
 
         try:
-          result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout,
-                                       priority)
+          result = self._LockAndExecLU(lu, locking.LEVEL_CLUSTER + 1,
+                                       calc_timeout)
         finally:
           if self._ec_id:
             self.context.cfg.DropECReservations(self._ec_id)
@@ -527,8 +544,8 @@ class Processor(object):
 
 class HooksMaster(object):
   def __init__(self, opcode, hooks_path, nodes, hooks_execution_fn,
-    hooks_results_adapt_fn, build_env_fn, log_fn, htype=None, cluster_name=None,
-    master_name=None):
+               hooks_results_adapt_fn, build_env_fn, log_fn, htype=None,
+               cluster_name=None, master_name=None):
     """Base class for hooks masters.
 
     This class invokes the execution of hooks according to the behaviour
@@ -627,7 +644,7 @@ class HooksMaster(object):
       "PATH": constants.HOOKS_PATH,
       "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
       "GANETI_OP_CODE": self.opcode,
-      "GANETI_DATA_DIR": constants.DATA_DIR,
+      "GANETI_DATA_DIR": pathutils.DATA_DIR,
       "GANETI_HOOKS_PHASE": phase,
       "GANETI_HOOKS_PATH": hpath,
       }