Add a new NodeGroup config object
[ganeti-local] lib/mcpu.py
index 647a69a..4039d75 100644
@@ -50,7 +50,6 @@ def _CalculateLockAttemptTimeouts():
   """Calculate timeouts for lock attempts.
 
   """
-  running_sum = 0
   result = [1.0]
 
   # Wait for a total of at least 150s before doing a blocking acquire
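
The removed running_sum accumulator implies the total is now derived directly
from the result list. A minimal sketch of such a schedule builder; the growth
factor and increment are assumptions for illustration, the invariant being
only that the per-attempt timeouts sum to at least 150s:

    def _CalculateLockAttemptTimeouts():
      result = [1.0]
      # Keep adding attempts until the schedule covers at least 150s;
      # after that the caller falls back to a blocking acquire.
      while sum(result) < 150.0:
        result.append(result[-1] * 1.05 + 0.1)
      return result
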
@@ -81,6 +80,7 @@ class _LockAttemptTimeoutStrategy(object):
     "_random_fn",
     "_start_time",
     "_time_fn",
+    "_running_timeout",
     ]
 
   _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
@@ -103,7 +103,14 @@ class _LockAttemptTimeoutStrategy(object):
     self._time_fn = _time_fn
     self._random_fn = _random_fn
 
-    self._start_time = None
+    try:
+      timeout = self._TIMEOUT_PER_ATTEMPT[attempt]
+    except IndexError:
+      # No more timeouts, do blocking acquire
+      timeout = None
+
+    self._running_timeout = locking.RunningTimeout(timeout, False,
+                                                   _time_fn=_time_fn)
 
   def NextAttempt(self):
     """Returns the strategy for the next attempt.
@@ -117,31 +124,19 @@ class _LockAttemptTimeoutStrategy(object):
     """Returns the remaining timeout.
 
     """
-    try:
-      timeout = self._TIMEOUT_PER_ATTEMPT[self._attempt]
-    except IndexError:
-      # No more timeouts, do blocking acquire
-      return None
+    timeout = self._running_timeout.Remaining()
 
-    # Get start time on first calculation
-    if self._start_time is None:
-      self._start_time = self._time_fn()
+    if timeout is not None:
+      # Add a small variation (-/+ 5%) to timeout. This helps in situations
+      # where two or more jobs are fighting for the same lock(s).
+      variation_range = timeout * 0.1
+      timeout += ((self._random_fn() * variation_range) -
+                  (variation_range * 0.5))
 
-    # Calculate remaining time for this attempt
-    remaining_timeout = self._start_time + timeout - self._time_fn()
+    return timeout
 
-    # Add a small variation (-/+ 5%) to timeouts. This helps in situations
-    # where two or more jobs are fighting for the same lock(s).
-    variation_range = remaining_timeout * 0.1
-    remaining_timeout += ((self._random_fn() * variation_range) -
-                          (variation_range * 0.5))
 
-    assert remaining_timeout >= 0.0, "Timeout must be positive"
-
-    return remaining_timeout
-
-
-class OpExecCbBase:
+class OpExecCbBase: # pylint: disable-msg=W0232
   """Base class for OpCode execution callbacks.
 
   """
@@ -188,8 +183,8 @@ class Processor(object):
     opcodes.OpRemoveNode: cmdlib.LURemoveNode,
     opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams,
     opcodes.OpPowercycleNode: cmdlib.LUPowercycleNode,
-    opcodes.OpEvacuateNode: cmdlib.LUEvacuateNode,
     opcodes.OpMigrateNode: cmdlib.LUMigrateNode,
+    opcodes.OpNodeEvacuationStrategy: cmdlib.LUNodeEvacuationStrategy,
     # instance lu
     opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
     opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
@@ -214,6 +209,7 @@ class Processor(object):
     opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
     # exports lu
     opcodes.OpQueryExports: cmdlib.LUQueryExports,
+    opcodes.OpPrepareExport: cmdlib.LUPrepareExport,
     opcodes.OpExportInstance: cmdlib.LUExportInstance,
     opcodes.OpRemoveExport: cmdlib.LURemoveExport,
     # tags lu
@@ -224,13 +220,20 @@ class Processor(object):
     # test lu
     opcodes.OpTestDelay: cmdlib.LUTestDelay,
     opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
+    opcodes.OpTestJobqueue: cmdlib.LUTestJobqueue,
     }
 
-  def __init__(self, context):
+  def __init__(self, context, ec_id):
     """Constructor for Processor
 
+    @type context: GanetiContext
+    @param context: global Ganeti context
+    @type ec_id: string
+    @param ec_id: execution context identifier
+
     """
     self.context = context
+    self._ec_id = ec_id
     self._cbs = None
     self.rpc = rpc.RpcRunner(context.cfg)
     self.hmclass = HooksMaster
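
Callers constructing a Processor must now supply the execution context id
along with the global context. A hedged usage sketch; the id format and the
caller shown are assumptions for illustration:

    # e.g. from job-queue code, tagging the processor with a per-job id
    proc = mcpu.Processor(queue.context, "job-42")
    result = proc.ExecOpCode(op, callbacks)
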
@@ -274,7 +277,7 @@ class Processor(object):
     elif isinstance(names, basestring):
       parts.append(names)
     else:
-      parts.append(",".join(names))
+      parts.append(",".join(sorted(names)))
 
     if shared:
       parts.append("shared")
@@ -319,7 +322,7 @@ class Processor(object):
     hm = HooksMaster(self.rpc.call_hooks_runner, lu)
     h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
     lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
-                     self._Feedback, None)
+                     self.Log, None)
 
     if getattr(lu.op, "dry_run", False):
       # in this mode, no post-hooks are run, and the config is not
@@ -330,10 +333,10 @@ class Processor(object):
       return lu.dry_run_result
 
     try:
-      result = lu.Exec(self._Feedback)
+      result = lu.Exec(self.Log)
       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
-                                self._Feedback, result)
+                                self.Log, result)
     finally:
       # FIXME: This needs locks if not lu_class.REQ_BGL
       if write_count != self.context.cfg.write_count:
@@ -391,7 +394,8 @@ class Processor(object):
           except errors.LockError:
             raise errors.OpPrereqError(
               "Couldn't add locks (%s), probably because of a race condition"
-              " with another job, who added them first" % add_locks)
+              " with another job, who added them first" % add_locks,
+              errors.ECODE_FAULT)
 
           acquired = add_locks
 
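
OpPrereqError is now raised with an error code as its second argument, so
clients can classify the failure. The same two-argument form with another
code from the errors module, shown purely for illustration:

    raise errors.OpPrereqError("Instance 'inst1' is not known",
                               errors.ECODE_NOENT)
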
@@ -448,8 +452,13 @@ class Processor(object):
             lu.ExpandNames()
             assert lu.needed_locks is not None, "needed_locks not set by LU"
 
-            return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE,
-                                       timeout_strategy.CalcRemainingTimeout)
+            try:
+              return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE,
+                                         timeout_strategy.CalcRemainingTimeout)
+            finally:
+              if self._ec_id:
+                self.context.cfg.DropECReservations(self._ec_id)
+
           finally:
             self.context.glm.release(locking.LEVEL_CLUSTER)
 
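
Dropping the execution context's reservations in a finally block guarantees
that temporary reservations do not leak when an LU fails. A minimal sketch of
the bookkeeping a DropECReservations implementation implies; the class and
method names besides DropECReservations are hypothetical:

    class _ReservationTracker(object):
      def __init__(self):
        self._reserved = {}  # resource -> owning ec_id

      def Reserve(self, ec_id, resource):
        if resource in self._reserved:
          raise ValueError("Already reserved: %s" % (resource, ))
        self._reserved[resource] = ec_id

      def DropECReservations(self, ec_id):
        # Forget everything this execution context reserved
        for resource, owner in list(self._reserved.items()):
          if owner == ec_id:
            del self._reserved[resource]
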
@@ -462,7 +471,7 @@ class Processor(object):
     finally:
       self._cbs = None
 
-  def _Feedback(self, *args):
+  def Log(self, *args):
     """Forward call to feedback callback function.
 
     """
@@ -474,7 +483,7 @@ class Processor(object):
 
     """
     logging.debug("Step %d/%d %s", current, total, message)
-    self._Feedback("STEP %d/%d %s" % (current, total, message))
+    self.Log("STEP %d/%d %s" % (current, total, message))
 
   def LogWarning(self, message, *args, **kwargs):
     """Log a warning to the logs and the user.
@@ -491,9 +500,9 @@ class Processor(object):
       message = message % tuple(args)
     if message:
       logging.warning(message)
-      self._Feedback(" - WARNING: %s" % message)
+      self.Log(" - WARNING: %s" % message)
     if "hint" in kwargs:
-      self._Feedback("      Hint: %s" % kwargs["hint"])
+      self.Log("      Hint: %s" % kwargs["hint"])
 
   def LogInfo(self, message, *args):
     """Log an informational message to the logs and the user.
@@ -502,7 +511,12 @@ class Processor(object):
     if args:
       message = message % tuple(args)
     logging.info(message)
-    self._Feedback(" - INFO: %s" % message)
+    self.Log(" - INFO: %s" % message)
+
+  def GetECId(self):
+    if not self._ec_id:
+      raise errors.ProgrammerError("Tried to use execution context id when not set")
+    return self._ec_id
 
 
 class HooksMaster(object):
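
A hedged sketch of how an LU might consume GetECId; the GenerateUniqueID call
taking an execution context id is an assumption for illustration:

    # Inside a logical unit: reserve a unique ID against this execution
    # context, so it is dropped automatically if the job fails.
    unique_id = self.cfg.GenerateUniqueID(self.proc.GetECId())
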