_GetUpdatedParams: enhance value removal options
[ganeti-local] / lib / mcpu.py
index f67114c..3198780 100644 (file)
@@ -50,7 +50,6 @@ def _CalculateLockAttemptTimeouts():
   """Calculate timeouts for lock attempts.
 
   """
-  running_sum = 0
   result = [1.0]
 
   # Wait for a total of at least 150s before doing a blocking acquire
@@ -81,6 +80,7 @@ class _LockAttemptTimeoutStrategy(object):
     "_random_fn",
     "_start_time",
     "_time_fn",
+    "_running_timeout",
     ]
 
   _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
@@ -103,7 +103,14 @@ class _LockAttemptTimeoutStrategy(object):
     self._time_fn = _time_fn
     self._random_fn = _random_fn
 
-    self._start_time = None
+    try:
+      timeout = self._TIMEOUT_PER_ATTEMPT[attempt]
+    except IndexError:
+      # No more timeouts, do blocking acquire
+      timeout = None
+
+    self._running_timeout = locking.RunningTimeout(timeout, False,
+                                                   _time_fn=_time_fn)
 
   def NextAttempt(self):
     """Returns the strategy for the next attempt.
@@ -117,31 +124,19 @@ class _LockAttemptTimeoutStrategy(object):
     """Returns the remaining timeout.
 
     """
-    try:
-      timeout = self._TIMEOUT_PER_ATTEMPT[self._attempt]
-    except IndexError:
-      # No more timeouts, do blocking acquire
-      return None
-
-    # Get start time on first calculation
-    if self._start_time is None:
-      self._start_time = self._time_fn()
-
-    # Calculate remaining time for this attempt
-    remaining_timeout = self._start_time + timeout - self._time_fn()
+    timeout = self._running_timeout.Remaining()
 
-    # Add a small variation (-/+ 5%) to timeouts. This helps in situations
-    # where two or more jobs are fighting for the same lock(s).
-    variation_range = remaining_timeout * 0.1
-    remaining_timeout += ((self._random_fn() * variation_range) -
-                          (variation_range * 0.5))
+    if timeout is not None:
+      # Add a small variation (-/+ 5%) to timeout. This helps in situations
+      # where two or more jobs are fighting for the same lock(s).
+      variation_range = timeout * 0.1
+      timeout += ((self._random_fn() * variation_range) -
+                  (variation_range * 0.5))
 
-    assert remaining_timeout >= 0.0, "Timeout must be positive"
+    return timeout
 
-    return remaining_timeout
 
-
-class OpExecCbBase:
+class OpExecCbBase: # pylint: disable-msg=W0232
   """Base class for OpCode execution callbacks.
 
   """
@@ -190,6 +185,7 @@ class Processor(object):
     opcodes.OpPowercycleNode: cmdlib.LUPowercycleNode,
     opcodes.OpEvacuateNode: cmdlib.LUEvacuateNode,
     opcodes.OpMigrateNode: cmdlib.LUMigrateNode,
+    opcodes.OpNodeEvacuationStrategy: cmdlib.LUNodeEvacuationStrategy,
     # instance lu
     opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
     opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
@@ -214,6 +210,7 @@ class Processor(object):
     opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
     # exports lu
     opcodes.OpQueryExports: cmdlib.LUQueryExports,
+    opcodes.OpPrepareExport: cmdlib.LUPrepareExport,
     opcodes.OpExportInstance: cmdlib.LUExportInstance,
     opcodes.OpRemoveExport: cmdlib.LURemoveExport,
     # tags lu
@@ -226,11 +223,17 @@ class Processor(object):
     opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
     }
 
-  def __init__(self, context):
+  def __init__(self, context, ec_id):
     """Constructor for Processor
 
+    @type context: GanetiContext
+    @param context: global Ganeti context
+    @type ec_id: string
+    @param ec_id: execution context identifier
+
     """
     self.context = context
+    self._ec_id = ec_id
     self._cbs = None
     self.rpc = rpc.RpcRunner(context.cfg)
     self.hmclass = HooksMaster
@@ -274,7 +277,7 @@ class Processor(object):
     elif isinstance(names, basestring):
       parts.append(names)
     else:
-      parts.append(",".join(names))
+      parts.append(",".join(sorted(names)))
 
     if shared:
       parts.append("shared")
@@ -381,8 +384,6 @@ class Processor(object):
           if acquired is None:
             raise _LockAcquireTimeout()
 
-          lu.acquired_locks[level] = acquired
-
         else:
           # Adding locks
           add_locks = lu.add_locks[level]
@@ -393,10 +394,14 @@ class Processor(object):
           except errors.LockError:
             raise errors.OpPrereqError(
               "Couldn't add locks (%s), probably because of a race condition"
-              " with another job, who added them first" % add_locks)
+              " with another job, who added them first" % add_locks,
+              errors.ECODE_FAULT)
+
+          acquired = add_locks
 
-          lu.acquired_locks[level] = add_locks
         try:
+          lu.acquired_locks[level] = acquired
+
           result = self._LockAndExecLU(lu, level + 1, calc_timeout)
         finally:
           if level in lu.remove_locks:
@@ -447,8 +452,13 @@ class Processor(object):
             lu.ExpandNames()
             assert lu.needed_locks is not None, "needed_locks not set by LU"
 
-            return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE,
-                                       timeout_strategy.CalcRemainingTimeout)
+            try:
+              return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE,
+                                         timeout_strategy.CalcRemainingTimeout)
+            finally:
+              if self._ec_id:
+                self.context.cfg.DropECReservations(self._ec_id)
+
           finally:
             self.context.glm.release(locking.LEVEL_CLUSTER)
 
@@ -503,6 +513,11 @@ class Processor(object):
     logging.info(message)
     self._Feedback(" - INFO: %s" % message)
 
+  def GetECId(self):  # returns the execution context id given to __init__
+    if not self._ec_id:  # unset/empty id: asking for it is a caller bug
+      raise errors.ProgrammerError("Tried to use execution context id when not set")
+    return self._ec_id
+
 
 class HooksMaster(object):
   """Hooks master.