gnt-*: Print better error message for uninitialized cluster
lib/mcpu.py
index f9d5ab7..f8434dc 100644
@@ -29,6 +29,8 @@ are two kinds of classes defined:
 """
 
 import logging
+import random
+import time
 
 from ganeti import opcodes
 from ganeti import constants
@@ -36,7 +38,103 @@ from ganeti import errors
 from ganeti import rpc
 from ganeti import cmdlib
 from ganeti import locking
-from ganeti import utils
+
+
+class _LockAcquireTimeout(Exception):
+  """Internal exception to report timeouts on acquiring locks.
+
+  """
+
+
+def _CalculateLockAttemptTimeouts():
+  """Calculate timeouts for lock attempts.
+
+  """
+  result = [1.0]
+
+  # Wait for a total of at least 150s before doing a blocking acquire
+  while sum(result) < 150.0:
+    timeout = (result[-1] * 1.05) ** 1.25
+
+    # Cap timeout at 10 seconds. This gives other jobs a chance to run
+    # even if we're still trying to get our locks, before finally moving
+    # to a blocking acquire.
+    if timeout > 10.0:
+      timeout = 10.0
+
+    elif timeout < 0.1:
+      # Lower boundary for safety
+      timeout = 0.1
+
+    result.append(timeout)
+
+  return result
+
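For reference, the schedule this loop generates can be reproduced with a
standalone sketch, using the same formula and clamping as the function above:

  result = [1.0]
  while sum(result) < 150.0:
    timeout = (result[-1] * 1.05) ** 1.25
    # Clamp to [0.1, 10.0], as in the function above
    result.append(max(0.1, min(timeout, 10.0)))
  print(len(result), round(sum(result), 1))

It yields roughly two dozen per-attempt timeouts, starting at 1.0s and growing
towards the 10.0s cap, for a total just over 150s before the strategy falls
back to a blocking acquire.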
+
+class _LockAttemptTimeoutStrategy(object):
+  """Class with lock acquire timeout strategy.
+
+  """
+  __slots__ = [
+    "_attempt",
+    "_random_fn",
+    "_start_time",
+    "_time_fn",
+    "_running_timeout",
+    ]
+
+  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
+
+  def __init__(self, attempt=0, _time_fn=time.time, _random_fn=random.random):
+    """Initializes this class.
+
+    @type attempt: int
+    @param attempt: Current attempt number
+    @param _time_fn: Time function for unittests
+    @param _random_fn: Random number generator for unittests
+
+    """
+    object.__init__(self)
+
+    if attempt < 0:
+      raise ValueError("Attempt must be zero or positive")
+
+    self._attempt = attempt
+    self._time_fn = _time_fn
+    self._random_fn = _random_fn
+
+    try:
+      timeout = self._TIMEOUT_PER_ATTEMPT[attempt]
+    except IndexError:
+      # No more timeouts, do blocking acquire
+      timeout = None
+
+    self._running_timeout = locking.RunningTimeout(timeout, False,
+                                                   _time_fn=_time_fn)
+
+  def NextAttempt(self):
+    """Returns the strategy for the next attempt.
+
+    """
+    return _LockAttemptTimeoutStrategy(attempt=self._attempt + 1,
+                                       _time_fn=self._time_fn,
+                                       _random_fn=self._random_fn)
+
+  def CalcRemainingTimeout(self):
+    """Returns the remaining timeout.
+
+    """
+    timeout = self._running_timeout.Remaining()
+
+    if timeout is not None:
+      # Add a small variation (-/+ 5%) to timeout. This helps in situations
+      # where two or more jobs are fighting for the same lock(s).
+      variation_range = timeout * 0.1
+      timeout += ((self._random_fn() * variation_range) -
+                  (variation_range * 0.5))
+
+    return timeout
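Since random.random() is uniform over [0, 1), the expression above shifts the
timeout by a uniform offset in [-5%, +5%); a minimal standalone sketch of the
same jitter:

  import random

  def _jitter(timeout, _random_fn=random.random):
    # Uniform offset in [-0.05 * timeout, +0.05 * timeout)
    variation_range = timeout * 0.1
    return timeout + (_random_fn() * variation_range) - (variation_range * 0.5)

The jitter keeps jobs that lost the same lock from waking up and retrying in
lockstep.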
 
 
 class OpExecCbBase:
@@ -130,11 +228,10 @@ class Processor(object):
     """
     self.context = context
     self._cbs = None
-    self.exclusive_BGL = False
     self.rpc = rpc.RpcRunner(context.cfg)
     self.hmclass = HooksMaster
 
-  def _ReportLocks(self, level, names, shared, acquired):
+  def _ReportLocks(self, level, names, shared, timeout, acquired, result):
     """Reports lock operations.
 
     @type level: int
@@ -142,18 +239,29 @@ class Processor(object):
     @type names: list or string
     @param names: Lock names
     @type shared: bool
-    @param shared: Whether the lock should be acquired in shared mode
+    @param shared: Whether the locks should be acquired in shared mode
+    @type timeout: None or float
+    @param timeout: Timeout for acquiring the locks
     @type acquired: bool
-    @param acquired: Whether the lock has already been acquired
+    @param acquired: Whether the locks have already been acquired
+    @type result: None or set
+    @param result: Result from L{locking.GanetiLockManager.acquire}
 
     """
     parts = []
 
     # Build message
     if acquired:
-      parts.append("acquired")
+      if result is None:
+        parts.append("timeout")
+      else:
+        parts.append("acquired")
     else:
       parts.append("waiting")
+      if timeout is None:
+        parts.append("blocking")
+      else:
+        parts.append("timeout=%0.6fs" % timeout)
 
     parts.append(locking.LEVEL_NAMES[level])
 
@@ -176,6 +284,28 @@ class Processor(object):
     if self._cbs:
       self._cbs.ReportLocks(msg)
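With the two new parameters, the leading message parts encode both the phase
and the timeout. A small self-check of the possible cases (a sketch; the
unchanged tail of the method still appends level, lock names and the
shared/exclusive flag):

  def _prefix(acquired, result, timeout):
    if acquired:
      return ["timeout"] if result is None else ["acquired"]
    return ["waiting", "blocking" if timeout is None
            else "timeout=%0.6fs" % timeout]

  assert _prefix(False, None, 0.75) == ["waiting", "timeout=0.750000s"]
  assert _prefix(False, None, None) == ["waiting", "blocking"]
  assert _prefix(True, None, 0.75) == ["timeout"]
  assert _prefix(True, set(["lock1"]), None) == ["acquired"]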
 
+  def _AcquireLocks(self, level, names, shared, timeout):
+    """Acquires locks via the Ganeti lock manager.
+
+    @type level: int
+    @param level: Lock level
+    @type names: list or string
+    @param names: Lock names
+    @type shared: bool
+    @param shared: Whether the locks should be acquired in shared mode
+    @type timeout: None or float
+    @param timeout: Timeout for acquiring the locks
+
+    """
+    self._ReportLocks(level, names, shared, timeout, False, None)
+
+    acquired = self.context.glm.acquire(level, names, shared=shared,
+                                        timeout=timeout)
+
+    self._ReportLocks(level, names, shared, timeout, True, acquired)
+
+    return acquired
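The return value follows the lock manager's timeout contract relied on
throughout this patch: with a timeout set, acquire() returns the acquired
names, or None once the timeout expires. A hedged usage sketch (caller names
hypothetical):

  acquired = proc._AcquireLocks(level, needed_locks, shared, timeout=1.5)
  if acquired is None:
    # Timed out; unwind so the opcode-level loop can retry with a
    # fresh, slightly longer timeout
    raise _LockAcquireTimeout()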
+
   def _ExecLU(self, lu):
     """Logical Unit execution sequence.
 
@@ -207,7 +337,7 @@ class Processor(object):
 
     return result
 
-  def _LockAndExecLU(self, lu, level):
+  def _LockAndExecLU(self, lu, level, calc_timeout):
     """Execute a Logical Unit, with the needed locks.
 
     This is a recursive function that starts locking the given level, and
@@ -222,45 +352,59 @@ class Processor(object):
         self._cbs.NotifyStart()
 
       result = self._ExecLU(lu)
+
     elif adding_locks and acquiring_locks:
       # We could both acquire and add locks at the same level, but for now we
       # don't need this, so we'll avoid the complicated code needed.
-      raise NotImplementedError(
-        "Can't declare locks to acquire when adding others")
+      raise NotImplementedError("Can't declare locks to acquire when adding"
+                                " others")
+
     elif adding_locks or acquiring_locks:
       lu.DeclareLocks(level)
       share = lu.share_locks[level]
-      if acquiring_locks:
-        needed_locks = lu.needed_locks[level]
-
-        self._ReportLocks(level, needed_locks, share, False)
-        lu.acquired_locks[level] = self.context.glm.acquire(level,
-                                                            needed_locks,
-                                                            shared=share)
-        self._ReportLocks(level, needed_locks, share, True)
-
-      else: # adding_locks
-        add_locks = lu.add_locks[level]
-        lu.remove_locks[level] = add_locks
-        try:
-          self.context.glm.add(level, add_locks, acquired=1, shared=share)
-        except errors.LockError:
-          raise errors.OpPrereqError(
-            "Couldn't add locks (%s), probably because of a race condition"
-            " with another job, who added them first" % add_locks)
+
       try:
+        assert adding_locks ^ acquiring_locks, \
+          "Locks must be either added or acquired"
+
+        if acquiring_locks:
+          # Acquiring locks
+          needed_locks = lu.needed_locks[level]
+
+          acquired = self._AcquireLocks(level, needed_locks, share,
+                                        calc_timeout())
+
+          if acquired is None:
+            raise _LockAcquireTimeout()
+
+        else:
+          # Adding locks
+          add_locks = lu.add_locks[level]
+          lu.remove_locks[level] = add_locks
+
+          try:
+            self.context.glm.add(level, add_locks, acquired=1, shared=share)
+          except errors.LockError:
+            raise errors.OpPrereqError(
+              "Couldn't add locks (%s), probably because of a race condition"
+              " with another job, who added them first" % add_locks,
+              errors.ECODE_FAULT)
+
+          acquired = add_locks
+
         try:
-          if adding_locks:
-            lu.acquired_locks[level] = add_locks
-          result = self._LockAndExecLU(lu, level + 1)
+          lu.acquired_locks[level] = acquired
+
+          result = self._LockAndExecLU(lu, level + 1, calc_timeout)
         finally:
           if level in lu.remove_locks:
             self.context.glm.remove(level, lu.remove_locks[level])
       finally:
         if self.context.glm.is_owned(level):
           self.context.glm.release(level)
+
     else:
-      result = self._LockAndExecLU(lu, level + 1)
+      result = self._LockAndExecLU(lu, level + 1, calc_timeout)
 
     return result
 
@@ -283,31 +427,38 @@ class Processor(object):
       if lu_class is None:
         raise errors.OpCodeUnknown("Unknown opcode")
 
-      # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
-      # shared fashion otherwise (to prevent concurrent run with an exclusive
-      # LU.
-      self._ReportLocks(locking.LEVEL_CLUSTER, [locking.BGL],
-                        not lu_class.REQ_BGL, False)
-      try:
-        self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
-                                 shared=not lu_class.REQ_BGL)
-      finally:
-        self._ReportLocks(locking.LEVEL_CLUSTER, [locking.BGL],
-                          not lu_class.REQ_BGL, True)
-      try:
-        self.exclusive_BGL = lu_class.REQ_BGL
-        lu = lu_class(self, op, self.context, self.rpc)
-        lu.ExpandNames()
-        assert lu.needed_locks is not None, "needed_locks not set by LU"
-        result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
-      finally:
-        self.context.glm.release(locking.LEVEL_CLUSTER)
-        self.exclusive_BGL = False
+      timeout_strategy = _LockAttemptTimeoutStrategy()
+
+      while True:
+        try:
+          acquire_timeout = timeout_strategy.CalcRemainingTimeout()
+
+          # Acquire the Big Ganeti Lock exclusively if this LU requires it,
+          # and in a shared fashion otherwise (to prevent concurrent run with
+          # an exclusive LU).
+          if self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
+                                not lu_class.REQ_BGL, acquire_timeout) is None:
+            raise _LockAcquireTimeout()
+
+          try:
+            lu = lu_class(self, op, self.context, self.rpc)
+            lu.ExpandNames()
+            assert lu.needed_locks is not None, "needed_locks not set by LU"
+
+            return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE,
+                                       timeout_strategy.CalcRemainingTimeout)
+          finally:
+            self.context.glm.release(locking.LEVEL_CLUSTER)
+
+        except _LockAcquireTimeout:
+          # Timeout while waiting for lock, try again
+          pass
+
+        timeout_strategy = timeout_strategy.NextAttempt()
+
     finally:
       self._cbs = None
 
-    return result
-
   def _Feedback(self, *args):
     """Forward call to feedback callback function.