Add the options attribute to cli.JobExecutor
[ganeti-local] / lib / mcpu.py
index 7b910fd..933d596 100644 (file)
@@ -38,7 +38,6 @@ from ganeti import errors
 from ganeti import rpc
 from ganeti import cmdlib
 from ganeti import locking
-from ganeti import utils
 
 
 class _LockAcquireTimeout(Exception):
@@ -47,87 +46,97 @@ class _LockAcquireTimeout(Exception):
   """
 
 
-class _LockTimeoutStrategy(object):
+def _CalculateLockAttemptTimeouts():
+  """Build the list of timeouts used for successive lock attempts.
+
+  The first entry is one second. Every following entry is computed from
+  its predecessor and clamped to the range from 0.1 to 10 seconds. Entries
+  are appended until they add up to at least 150 seconds.
+
+  @rtype: list of float
+  @return: Timeout in seconds for each attempt, in attempt order
+
+  """
+  timeouts = [1.0]
+
+  # Keep using timeouts for a total of at least 150s before a blocking
+  # acquire is done
+  while sum(timeouts) < 150.0:
+    grown = (timeouts[-1] * 1.05) ** 1.25
+
+    # The upper bound of 10 seconds gives other jobs a chance to run even
+    # if we're still trying to get our locks; the lower bound of 0.1
+    # seconds is only there for safety.
+    timeouts.append(max(0.1, min(10.0, grown)))
+
+  return timeouts
+
+
+class _LockAttemptTimeoutStrategy(object):
   """Class with lock acquire timeout strategy.
 
   """
   __slots__ = [
-    "_attempts",
+    "_attempt",
     "_random_fn",
+    # NOTE(review): "_start_time" is not assigned anywhere in the new code
+    # of this class; the slot looks unused -- confirm before removing
     "_start_time",
+    "_time_fn",
+    "_running_timeout",
     ]
 
-  _MAX_ATTEMPTS = 10
-  """How many retries before going into blocking mode"""
-
-  _ATTEMPT_FACTOR = 1.75
-  """Factor between attempts"""
+  # Table of timeouts indexed by attempt number; computed once, when the
+  # class body is executed
+  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
 
-  def __init__(self, _random_fn=None):
+  def __init__(self, attempt=0, _time_fn=time.time, _random_fn=random.random):
     """Initializes this class.
 
+    @type attempt: int
+    @param attempt: Current attempt number
+    @param _time_fn: Time function for unittests
     @param _random_fn: Random number generator for unittests
+    @raise ValueError: If C{attempt} is negative
 
     """
     object.__init__(self)
 
-    self._start_time = None
-    self._attempts = 0
+    if attempt < 0:
+      raise ValueError("Attempt must be zero or positive")
 
-    if _random_fn is None:
-      self._random_fn = random.random
-    else:
-      self._random_fn = _random_fn
+    self._attempt = attempt
+    self._time_fn = _time_fn
+    self._random_fn = _random_fn
+
+    try:
+      timeout = self._TIMEOUT_PER_ATTEMPT[attempt]
+    except IndexError:
+      # No more timeouts, do blocking acquire
+      timeout = None
+
+    # NOTE(review): the meaning of the positional False depends on
+    # locking.RunningTimeout, which is not visible here -- confirm
+    self._running_timeout = locking.RunningTimeout(timeout, False,
+                                                   _time_fn=_time_fn)
 
   def NextAttempt(self):
-    """Advances to the next attempt.
+    """Returns the strategy for the next attempt.
 
+    @rtype: L{_LockAttemptTimeoutStrategy}
+    @return: New strategy object for the attempt number following this
+      one, using the same time and random functions as this object
+
     """
-    assert self._attempts >= 0
-    self._attempts += 1
+    return _LockAttemptTimeoutStrategy(attempt=self._attempt + 1,
+                                       _time_fn=self._time_fn,
+                                       _random_fn=self._random_fn)
 
   def CalcRemainingTimeout(self):
     """Returns the remaining timeout.
 
+    @rtype: None or float
+    @return: None when the running timeout's C{Remaining} returns None;
+      otherwise that value with a random variation applied
+
     """
-    assert self._attempts >= 0
-
-    if self._attempts == self._MAX_ATTEMPTS:
-      # Only blocking acquires after 10 retries
-      return None
-
-    if self._attempts > self._MAX_ATTEMPTS:
-      raise RuntimeError("Blocking acquire ran into timeout")
-
-    # Get start time on first calculation
-    if self._start_time is None:
-      self._start_time = time.time()
+    timeout = self._running_timeout.Remaining()
 
-    # Calculate remaining time for this attempt
-    timeout = (self._start_time + (self._ATTEMPT_FACTOR ** self._attempts) -
-               time.time())
-
-    if timeout > 10.0:
-      # Cap timeout at 10 seconds. This gives other jobs a chance to run
-      # even if we're still trying to get our locks, before finally moving
-      # to a blocking acquire.
-      timeout = 10.0
-
-    elif timeout < 0.1:
-      # Lower boundary
-      timeout = 0.1
-
-    # Add a small variation (-/+ 5%) to timeouts. This helps in situations
-    # where two or more jobs are fighting for the same lock(s).
-    variation_range = timeout * 0.1
-    timeout += (self._random_fn() * variation_range) - (variation_range * 0.5)
-
-    assert timeout >= 0.0, "Timeout must be positive"
+    if timeout is not None:
+      # Add a small variation (-/+ 5%) to timeout. This helps in situations
+      # where two or more jobs are fighting for the same lock(s).
+      variation_range = timeout * 0.1
+      timeout += ((self._random_fn() * variation_range) -
+                  (variation_range * 0.5))
 
     return timeout
 
 
-class OpExecCbBase:
+class OpExecCbBase: # pylint: disable-msg=W0232
   """Base class for OpCode execution callbacks.
 
   """
@@ -212,16 +221,22 @@ class Processor(object):
     opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
     }
 
-  def __init__(self, context):
+  def __init__(self, context, ec_id):
     """Constructor for Processor
 
+    @type context: GanetiContext
+    @param context: global Ganeti context
+    @type ec_id: string
+    @param ec_id: execution context identifier
+
     """
     self.context = context
+    self._ec_id = ec_id
+    # Callbacks object; only consulted while it is set (see _ReportLocks)
     self._cbs = None
+    # RPC runner built from the configuration held by the context
     self.rpc = rpc.RpcRunner(context.cfg)
     self.hmclass = HooksMaster
 
-  def _ReportLocks(self, level, names, shared, acquired):
+  def _ReportLocks(self, level, names, shared, timeout, acquired, result):
     """Reports lock operations.
 
     @type level: int
@@ -229,18 +244,29 @@ class Processor(object):
     @type names: list or string
     @param names: Lock names
     @type shared: bool
-    @param shared: Whether the lock should be acquired in shared mode
+    @param shared: Whether the locks should be acquired in shared mode
+    @type timeout: None or float
+    @param timeout: Timeout for acquiring the locks
     @type acquired: bool
-    @param acquired: Whether the lock has already been acquired
+    @param acquired: Whether the locks have already been acquired
+    @type result: None or set
+    @param result: Result from L{locking.GanetiLockManager.acquire}
 
     """
     parts = []
 
     # Build message
     if acquired:
-      parts.append("acquired")
+      if result is None:
+        parts.append("timeout")
+      else:
+        parts.append("acquired")
     else:
       parts.append("waiting")
+      if timeout is None:
+        parts.append("blocking")
+      else:
+        parts.append("timeout=%0.6fs" % timeout)
 
     parts.append(locking.LEVEL_NAMES[level])
 
@@ -249,7 +275,7 @@ class Processor(object):
     elif isinstance(names, basestring):
       parts.append(names)
     else:
-      parts.append(",".join(names))
+      parts.append(",".join(sorted(names)))
 
     if shared:
       parts.append("shared")
@@ -263,6 +289,28 @@ class Processor(object):
     if self._cbs:
       self._cbs.ReportLocks(msg)
 
+  def _AcquireLocks(self, level, names, shared, timeout):
+    """Acquires locks through the lock manager, reporting before and after.
+
+    @type level: int
+    @param level: Lock level
+    @type names: list or string
+    @param names: Lock names
+    @type shared: bool
+    @param shared: Whether the locks should be acquired in shared mode
+    @type timeout: None or float
+    @param timeout: Timeout for acquiring the locks
+    @return: Whatever the lock manager's C{acquire} returned, unchanged
+
+    """
+    # Announce that we are about to wait for the locks
+    self._ReportLocks(level, names, shared, timeout, False, None)
+
+    lock_manager = self.context.glm
+    result = lock_manager.acquire(level, names, shared=shared, timeout=timeout)
+
+    # Report the outcome, passing on the result of the acquire call
+    self._ReportLocks(level, names, shared, timeout, True, result)
+
+    return result
+
+
   def _ExecLU(self, lu):
     """Logical Unit execution sequence.
 
@@ -328,19 +376,12 @@ class Processor(object):
           # Acquiring locks
           needed_locks = lu.needed_locks[level]
 
-          self._ReportLocks(level, needed_locks, share, False)
-          acquired = self.context.glm.acquire(level,
-                                              needed_locks,
-                                              shared=share,
-                                              timeout=calc_timeout())
-          # TODO: Report timeout
-          self._ReportLocks(level, needed_locks, share, True)
+          acquired = self._AcquireLocks(level, needed_locks, share,
+                                        calc_timeout())
 
           if acquired is None:
             raise _LockAcquireTimeout()
 
-          lu.acquired_locks[level] = acquired
-
         else:
           # Adding locks
           add_locks = lu.add_locks[level]
@@ -351,10 +392,14 @@ class Processor(object):
           except errors.LockError:
             raise errors.OpPrereqError(
               "Couldn't add locks (%s), probably because of a race condition"
-              " with another job, who added them first" % add_locks)
+              " with another job, who added them first" % add_locks,
+              errors.ECODE_FAULT)
+
+          acquired = add_locks
 
-          lu.acquired_locks[level] = add_locks
         try:
+          lu.acquired_locks[level] = acquired
+
           result = self._LockAndExecLU(lu, level + 1, calc_timeout)
         finally:
           if level in lu.remove_locks:
@@ -387,27 +432,17 @@ class Processor(object):
       if lu_class is None:
         raise errors.OpCodeUnknown("Unknown opcode")
 
-      timeout_strategy = _LockTimeoutStrategy()
-      calc_timeout = timeout_strategy.CalcRemainingTimeout
+      timeout_strategy = _LockAttemptTimeoutStrategy()
 
       while True:
         try:
-          self._ReportLocks(locking.LEVEL_CLUSTER, [locking.BGL],
-                            not lu_class.REQ_BGL, False)
-          try:
-            # Acquire the Big Ganeti Lock exclusively if this LU requires it,
-            # and in a shared fashion otherwise (to prevent concurrent run with
-            # an exclusive LU.
-            acquired_bgl = self.context.glm.acquire(locking.LEVEL_CLUSTER,
-                                                    [locking.BGL],
-                                                    shared=not lu_class.REQ_BGL,
-                                                    timeout=calc_timeout())
-          finally:
-            # TODO: Report timeout
-            self._ReportLocks(locking.LEVEL_CLUSTER, [locking.BGL],
-                              not lu_class.REQ_BGL, True)
+          acquire_timeout = timeout_strategy.CalcRemainingTimeout()
 
-          if acquired_bgl is None:
+          # Acquire the Big Ganeti Lock exclusively if this LU requires it,
+          # and in a shared fashion otherwise (to prevent a concurrent run
+          # with an exclusive LU).
+          if self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
+                                not lu_class.REQ_BGL, acquire_timeout) is None:
             raise _LockAcquireTimeout()
 
           try:
@@ -415,7 +450,13 @@ class Processor(object):
             lu.ExpandNames()
             assert lu.needed_locks is not None, "needed_locks not set by LU"
 
-            return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout)
+            try:
+              return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE,
+                                         timeout_strategy.CalcRemainingTimeout)
+            finally:
+              if self._ec_id:
+                self.context.cfg.DropECReservations(self._ec_id)
+
           finally:
             self.context.glm.release(locking.LEVEL_CLUSTER)
 
@@ -423,7 +464,7 @@ class Processor(object):
           # Timeout while waiting for lock, try again
           pass
 
-        timeout_strategy.NextAttempt()
+        timeout_strategy = timeout_strategy.NextAttempt()
 
     finally:
       self._cbs = None
@@ -470,6 +511,11 @@ class Processor(object):
     logging.info(message)
     self._Feedback(" - INFO: %s" % message)
 
+  def GetECId(self):
+    """Returns the execution context identifier of this processor.
+
+    @rtype: string
+    @return: Execution context identifier given to the constructor
+    @raise errors.ProgrammerError: If no execution context identifier is set
+
+    """
+    if not self._ec_id:
+      # The exception has to be raised; merely instantiating it would fall
+      # through and hand the unset identifier back to the caller
+      raise errors.ProgrammerError("Tried to use execution context id when"
+                                   " not set")
+    return self._ec_id
+
 
 class HooksMaster(object):
   """Hooks master.