Fix handling of ^C in the CLI scripts
diff --git a/lib/mcpu.py b/lib/mcpu.py
index 1ad93ca..7210178 100644
--- a/lib/mcpu.py
+++ b/lib/mcpu.py
@@ -1,7 +1,7 @@
 #
 #
 
-# Copyright (C) 2006, 2007 Google Inc.
+# Copyright (C) 2006, 2007, 2011 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -38,95 +38,90 @@ from ganeti import errors
 from ganeti import rpc
 from ganeti import cmdlib
 from ganeti import locking
+from ganeti import utils
 
 
-class _LockAcquireTimeout(Exception):
-  """Internal exception to report timeouts on acquiring locks.
+_OP_PREFIX = "Op"
+_LU_PREFIX = "LU"
+
+
+class LockAcquireTimeout(Exception):
+  """Exception to report timeouts on acquiring locks.
+
+  """
+
+
+def _CalculateLockAttemptTimeouts():
+  """Calculate timeouts for lock attempts.
 
   """
+  result = [1.0]
 
+  # Wait for a total of at least 150s before doing a blocking acquire
+  while sum(result) < 150.0:
+    timeout = (result[-1] * 1.05) ** 1.25
+
+    # Cap timeout at 10 seconds. This gives other jobs a chance to run
+    # even if we're still trying to get our locks, before finally moving
+    # to a blocking acquire.
+    if timeout > 10.0:
+      timeout = 10.0
+
+    elif timeout < 0.1:
+      # Lower boundary for safety
+      timeout = 0.1
 
-class _LockTimeoutStrategy(object):
+    result.append(timeout)
+
+  return result
+
+
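For reference: the helper above grows each timeout by the recurrence
t_next = (t * 1.05) ** 1.25, clamped to [0.1, 10.0]. A quick way to
inspect the resulting ladder, assuming a Ganeti source tree on the
Python path; the printed values are approximate:

    from ganeti import mcpu

    ladder = mcpu._CalculateLockAttemptTimeouts()
    print(ladder[:5])   # roughly [1.0, 1.06, 1.15, 1.26, 1.42]
    print(len(ladder))  # around two dozen timed attempts
    print(sum(ladder))  # a bit over 150 seconds in total

Once the ladder is exhausted, callers fall back to a blocking acquire;
see LockAttemptTimeoutStrategy below.
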
+class LockAttemptTimeoutStrategy(object):
   """Class with lock acquire timeout strategy.
 
   """
   __slots__ = [
-    "_attempts",
+    "_timeouts",
     "_random_fn",
-    "_start_time",
+    "_time_fn",
     ]
 
-  _MAX_ATTEMPTS = 10
-  """How many retries before going into blocking mode"""
+  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
 
-  _ATTEMPT_FACTOR = 1.75
-  """Factor between attempts"""
-
-  def __init__(self, _random_fn=None):
+  def __init__(self, _time_fn=time.time, _random_fn=random.random):
     """Initializes this class.
 
+    @param _time_fn: Time function for unittests
     @param _random_fn: Random number generator for unittests
 
     """
     object.__init__(self)
 
-    self._start_time = None
-    self._attempts = 0
-
-    if _random_fn is None:
-      self._random_fn = random.random
-    else:
-      self._random_fn = _random_fn
+    self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
+    self._time_fn = _time_fn
+    self._random_fn = _random_fn
 
   def NextAttempt(self):
-    """Advances to the next attempt.
+    """Returns the timeout for the next attempt.
 
     """
-    assert self._attempts >= 0
-    self._attempts += 1
-
-  def CalcRemainingTimeout(self):
-    """Returns the remaining timeout.
-
-    """
-    assert self._attempts >= 0
-
-    if self._attempts == self._MAX_ATTEMPTS:
-      # Only blocking acquires after 10 retries
-      return None
-
-    if self._attempts > self._MAX_ATTEMPTS:
-      raise RuntimeError("Blocking acquire ran into timeout")
-
-    # Get start time on first calculation
-    if self._start_time is None:
-      self._start_time = time.time()
-
-    # Calculate remaining time for this attempt
-    timeout = (self._start_time + (self._ATTEMPT_FACTOR ** self._attempts) -
-               time.time())
-
-    if timeout > 10.0:
-      # Cap timeout at 10 seconds. This gives other jobs a chance to run
-      # even if we're still trying to get our locks, before finally moving
-      # to a blocking acquire.
-      timeout = 10.0
-
-    elif timeout < 0.1:
-      # Lower boundary
-      timeout = 0.1
-
-    # Add a small variation (-/+ 5%) to timeouts. This helps in situations
-    # where two or more jobs are fighting for the same lock(s).
-    variation_range = timeout * 0.1
-    timeout += (self._random_fn() * variation_range) - (variation_range * 0.5)
-
-    assert timeout >= 0.0, "Timeout must be positive"
+    try:
+      timeout = self._timeouts.next()
+    except StopIteration:
+      # No more timeouts, do blocking acquire
+      timeout = None
+
+    if timeout is not None:
+      # Add a small variation (-/+ 5%) to timeout. This helps in situations
+      # where two or more jobs are fighting for the same lock(s).
+      variation_range = timeout * 0.1
+      timeout += ((self._random_fn() * variation_range) -
+                  (variation_range * 0.5))
 
     return timeout
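
NextAttempt() walks the precomputed ladder, adds -/+5% jitter and
finally returns None. Both LockAcquireTimeout and
LockAttemptTimeoutStrategy lose their leading underscore in this patch,
so the retry loop can move out of this module. A sketch of such a
caller (execute_with_retries is an illustrative name, not part of the
patch):

    from ganeti.mcpu import LockAcquireTimeout, LockAttemptTimeoutStrategy

    def execute_with_retries(proc, op, cbs):
      # One strategy instance per opcode: each NextAttempt() yields a
      # slightly longer, jittered timeout, then None (blocking acquire).
      strategy = LockAttemptTimeoutStrategy()
      while True:
        try:
          return proc.ExecOpCode(op, cbs, timeout=strategy.NextAttempt())
        except LockAcquireTimeout:
          # Timed out; all locks were released inside ExecOpCode, so
          # other jobs got a chance to run before we try again.
          pass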
 
 
-class OpExecCbBase:
+class OpExecCbBase: # pylint: disable-msg=W0232
   """Base class for OpCode execution callbacks.
 
   """
@@ -143,137 +138,51 @@ class OpExecCbBase:
 
     """
 
-  def ReportLocks(self, msg):
-    """Report lock operations.
+  def CheckCancel(self):
+    """Check whether job has been cancelled.
 
     """
 
 
-class Processor(object):
-  """Object which runs OpCodes"""
-  DISPATCH_TABLE = {
-    # Cluster
-    opcodes.OpPostInitCluster: cmdlib.LUPostInitCluster,
-    opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
-    opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
-    opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
-    opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
-    opcodes.OpRenameCluster: cmdlib.LURenameCluster,
-    opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
-    opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
-    opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig,
-    opcodes.OpRepairDiskSizes: cmdlib.LURepairDiskSizes,
-    # node lu
-    opcodes.OpAddNode: cmdlib.LUAddNode,
-    opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
-    opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
-    opcodes.OpQueryNodeStorage: cmdlib.LUQueryNodeStorage,
-    opcodes.OpModifyNodeStorage: cmdlib.LUModifyNodeStorage,
-    opcodes.OpRepairNodeStorage: cmdlib.LURepairNodeStorage,
-    opcodes.OpRemoveNode: cmdlib.LURemoveNode,
-    opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams,
-    opcodes.OpPowercycleNode: cmdlib.LUPowercycleNode,
-    opcodes.OpEvacuateNode: cmdlib.LUEvacuateNode,
-    opcodes.OpMigrateNode: cmdlib.LUMigrateNode,
-    # instance lu
-    opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
-    opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
-    opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
-    opcodes.OpRenameInstance: cmdlib.LURenameInstance,
-    opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
-    opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
-    opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
-    opcodes.OpRebootInstance: cmdlib.LURebootInstance,
-    opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
-    opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
-    opcodes.OpRecreateInstanceDisks: cmdlib.LURecreateInstanceDisks,
-    opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
-    opcodes.OpMigrateInstance: cmdlib.LUMigrateInstance,
-    opcodes.OpMoveInstance: cmdlib.LUMoveInstance,
-    opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
-    opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
-    opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
-    opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
-    opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
-    # os lu
-    opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
-    # exports lu
-    opcodes.OpQueryExports: cmdlib.LUQueryExports,
-    opcodes.OpExportInstance: cmdlib.LUExportInstance,
-    opcodes.OpRemoveExport: cmdlib.LURemoveExport,
-    # tags lu
-    opcodes.OpGetTags: cmdlib.LUGetTags,
-    opcodes.OpSearchTags: cmdlib.LUSearchTags,
-    opcodes.OpAddTags: cmdlib.LUAddTags,
-    opcodes.OpDelTags: cmdlib.LUDelTags,
-    # test lu
-    opcodes.OpTestDelay: cmdlib.LUTestDelay,
-    opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
-    }
-
-  def __init__(self, context):
-    """Constructor for Processor
+def _LUNameForOpName(opname):
+  """Computes the LU name for a given OpCode name.
 
-    """
-    self.context = context
-    self._cbs = None
-    self.rpc = rpc.RpcRunner(context.cfg)
-    self.hmclass = HooksMaster
-
-  def _ReportLocks(self, level, names, shared, timeout, acquired, result):
-    """Reports lock operations.
+  """
+  assert opname.startswith(_OP_PREFIX), \
+      "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)
 
-    @type level: int
-    @param level: Lock level
-    @type names: list or string
-    @param names: Lock names
-    @type shared: bool
-    @param shared: Whether the locks should be acquired in shared mode
-    @type timeout: None or float
-    @param timeout: Timeout for acquiring the locks
-    @type acquired: bool
-    @param acquired: Whether the locks have already been acquired
-    @type result: None or set
-    @param result: Result from L{locking.GanetiLockManager.acquire}
+  return _LU_PREFIX + opname[len(_OP_PREFIX):]
 
-    """
-    parts = []
 
-    # Build message
-    if acquired:
-      if result is None:
-        parts.append("timeout")
-      else:
-        parts.append("acquired")
-    else:
-      parts.append("waiting")
-      if timeout is None:
-        parts.append("blocking")
-      else:
-        parts.append("timeout=%0.6fs" % timeout)
+def _ComputeDispatchTable():
+  """Computes the opcode-to-lu dispatch table.
 
-    parts.append(locking.LEVEL_NAMES[level])
+  """
+  return dict((op, getattr(cmdlib, _LUNameForOpName(op.__name__)))
+              for op in opcodes.OP_MAPPING.values()
+              if op.WITH_LU)
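
With these two helpers the hand-maintained DISPATCH_TABLE below goes
away: every opcode registered in opcodes.OP_MAPPING that sets WITH_LU
is mapped to its cmdlib class purely by naming convention, and a
missing or misnamed LU fails loudly at import time via the assertion
and getattr(). A standalone sketch of the convention:

    _OP_PREFIX = "Op"
    _LU_PREFIX = "LU"

    def lu_name_for(opname):
      # "OpStartupInstance" -> "LUStartupInstance"
      assert opname.startswith(_OP_PREFIX), opname
      return _LU_PREFIX + opname[len(_OP_PREFIX):]

    print(lu_name_for("OpQueryNodes"))  # LUQueryNodes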
 
-    if names == locking.ALL_SET:
-      parts.append("ALL")
-    elif isinstance(names, basestring):
-      parts.append(names)
-    else:
-      parts.append(",".join(names))
 
-    if shared:
-      parts.append("shared")
-    else:
-      parts.append("exclusive")
+class Processor(object):
+  """Object which runs OpCodes"""
+  DISPATCH_TABLE = _ComputeDispatchTable()
 
-    msg = "/".join(parts)
+  def __init__(self, context, ec_id):
+    """Constructor for Processor
 
-    logging.debug("LU locks %s", msg)
+    @type context: GanetiContext
+    @param context: global Ganeti context
+    @type ec_id: string
+    @param ec_id: execution context identifier
 
-    if self._cbs:
-      self._cbs.ReportLocks(msg)
+    """
+    self.context = context
+    self._ec_id = ec_id
+    self._cbs = None
+    self.rpc = rpc.RpcRunner(context.cfg)
+    self.hmclass = HooksMaster
 
-  def _AcquireLocks(self, level, names, shared, timeout):
+  def _AcquireLocks(self, level, names, shared, timeout, priority):
     """Acquires locks via the Ganeti lock manager.
 
     @type level: int
@@ -284,14 +193,18 @@ class Processor(object):
     @param shared: Whether the locks should be acquired in shared mode
     @type timeout: None or float
     @param timeout: Timeout for acquiring the locks
+    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
+        amount of time
 
     """
-    self._ReportLocks(level, names, shared, timeout, False, None)
+    if self._cbs:
+      self._cbs.CheckCancel()
 
     acquired = self.context.glm.acquire(level, names, shared=shared,
-                                        timeout=timeout)
+                                        timeout=timeout, priority=priority)
 
-    self._ReportLocks(level, names, shared, timeout, True, acquired)
+    if acquired is None:
+      raise LockAcquireTimeout()
 
     return acquired
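
This CheckCancel() call is what ties ^C handling into locking: before
every lock acquisition the processor asks its callbacks whether the job
was cancelled, so a job stuck waiting for locks is aborted at the next
attempt boundary. A sketch of a callbacks object on the job-queue side
(JobCallbacks and CancelledJob are illustrative names):

    from ganeti.mcpu import OpExecCbBase

    class CancelledJob(Exception):
      """Illustrative exception; the real type lives with the job queue."""

    class JobCallbacks(OpExecCbBase):
      def __init__(self):
        self.cancel_requested = False  # flipped by the cancellation path

      def CheckCancel(self):
        # Called by Processor._AcquireLocks before each lock request, so
        # a cancelled job stops between attempts instead of queueing for
        # more locks.
        if self.cancel_requested:
          raise CancelledJob()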
 
@@ -304,7 +217,7 @@ class Processor(object):
     hm = HooksMaster(self.rpc.call_hooks_runner, lu)
     h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
     lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
-                     self._Feedback, None)
+                     self.Log, None)
 
     if getattr(lu.op, "dry_run", False):
       # in this mode, no post-hooks are run, and the config is not
@@ -315,10 +228,10 @@ class Processor(object):
       return lu.dry_run_result
 
     try:
-      result = lu.Exec(self._Feedback)
+      result = lu.Exec(self.Log)
       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
-                                self._Feedback, result)
+                                self.Log, result)
     finally:
       # FIXME: This needs locks if not lu_class.REQ_BGL
       if write_count != self.context.cfg.write_count:
@@ -326,7 +239,7 @@ class Processor(object):
 
     return result
 
-  def _LockAndExecLU(self, lu, level, calc_timeout):
+  def _LockAndExecLU(self, lu, level, calc_timeout, priority):
     """Execute a Logical Unit, with the needed locks.
 
     This is a recursive function that starts locking the given level, and
@@ -361,13 +274,7 @@ class Processor(object):
           needed_locks = lu.needed_locks[level]
 
           acquired = self._AcquireLocks(level, needed_locks, share,
-                                        calc_timeout())
-
-          if acquired is None:
-            raise _LockAcquireTimeout()
-
-          lu.acquired_locks[level] = acquired
-
+                                        calc_timeout(), priority)
         else:
           # Adding locks
           add_locks = lu.add_locks[level]
@@ -378,11 +285,15 @@ class Processor(object):
           except errors.LockError:
             raise errors.OpPrereqError(
               "Couldn't add locks (%s), probably because of a race condition"
-              " with another job, who added them first" % add_locks)
+              " with another job, who added them first" % add_locks,
+              errors.ECODE_FAULT)
+
+          acquired = add_locks
 
-          lu.acquired_locks[level] = add_locks
         try:
-          result = self._LockAndExecLU(lu, level + 1, calc_timeout)
+          lu.acquired_locks[level] = acquired
+
+          result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
         finally:
           if level in lu.remove_locks:
             self.context.glm.remove(level, lu.remove_locks[level])
@@ -391,60 +302,63 @@ class Processor(object):
           self.context.glm.release(level)
 
     else:
-      result = self._LockAndExecLU(lu, level + 1, calc_timeout)
+      result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
 
     return result
 
-  def ExecOpCode(self, op, cbs):
+  def ExecOpCode(self, op, cbs, timeout=None, priority=None):
     """Execute an opcode.
 
     @type op: an OpCode instance
     @param op: the opcode to be executed
     @type cbs: L{OpExecCbBase}
     @param cbs: Runtime callbacks
+    @type timeout: float or None
+    @param timeout: Maximum time to acquire all locks, None for no timeout
+    @type priority: number or None
+    @param priority: Priority for acquiring lock(s)
+    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
+        amount of time
 
     """
     if not isinstance(op, opcodes.OpCode):
       raise errors.ProgrammerError("Non-opcode instance passed"
                                    " to ExecOpcode")
 
+    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
+    if lu_class is None:
+      raise errors.OpCodeUnknown("Unknown opcode")
+
+    if timeout is None:
+      calc_timeout = lambda: None
+    else:
+      calc_timeout = utils.RunningTimeout(timeout, False).Remaining
+
     self._cbs = cbs
     try:
-      lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
-      if lu_class is None:
-        raise errors.OpCodeUnknown("Unknown opcode")
-
-      timeout_strategy = _LockTimeoutStrategy()
-      calc_timeout = timeout_strategy.CalcRemainingTimeout
+      # Acquire the Big Ganeti Lock exclusively if this LU requires it,
+      # and in a shared fashion otherwise (to prevent concurrent run with
+      # an exclusive LU).
+      self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
+                          not lu_class.REQ_BGL, calc_timeout(),
+                          priority)
+      try:
+        lu = lu_class(self, op, self.context, self.rpc)
+        lu.ExpandNames()
+        assert lu.needed_locks is not None, "needed_locks not set by LU"
 
-      while True:
         try:
-          # Acquire the Big Ganeti Lock exclusively if this LU requires it,
-          # and in a shared fashion otherwise (to prevent concurrent run with
-          # an exclusive LU.
-          if self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
-                                not lu_class.REQ_BGL, calc_timeout()) is None:
-            raise _LockAcquireTimeout()
-
-          try:
-            lu = lu_class(self, op, self.context, self.rpc)
-            lu.ExpandNames()
-            assert lu.needed_locks is not None, "needed_locks not set by LU"
-
-            return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout)
-          finally:
-            self.context.glm.release(locking.LEVEL_CLUSTER)
-
-        except _LockAcquireTimeout:
-          # Timeout while waiting for lock, try again
-          pass
-
-        timeout_strategy.NextAttempt()
-
+          return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout,
+                                     priority)
+        finally:
+          if self._ec_id:
+            self.context.cfg.DropECReservations(self._ec_id)
+      finally:
+        self.context.glm.release(locking.LEVEL_CLUSTER)
     finally:
       self._cbs = None
 
-  def _Feedback(self, *args):
+  def Log(self, *args):
     """Forward call to feedback callback function.
 
     """
@@ -456,7 +370,7 @@ class Processor(object):
 
     """
     logging.debug("Step %d/%d %s", current, total, message)
-    self._Feedback("STEP %d/%d %s" % (current, total, message))
+    self.Log("STEP %d/%d %s" % (current, total, message))
 
   def LogWarning(self, message, *args, **kwargs):
     """Log a warning to the logs and the user.
@@ -473,9 +387,9 @@ class Processor(object):
       message = message % tuple(args)
     if message:
       logging.warning(message)
-      self._Feedback(" - WARNING: %s" % message)
+      self.Log(" - WARNING: %s" % message)
     if "hint" in kwargs:
-      self._Feedback("      Hint: %s" % kwargs["hint"])
+      self.Log("      Hint: %s" % kwargs["hint"])
 
   def LogInfo(self, message, *args):
     """Log an informational message to the logs and the user.
@@ -484,7 +398,12 @@ class Processor(object):
     if args:
       message = message % tuple(args)
     logging.info(message)
-    self._Feedback(" - INFO: %s" % message)
+    self.Log(" - INFO: %s" % message)
+
+  def GetECId(self):
+    """Returns the current execution context ID.
+
+    """
+    if not self._ec_id:
+      raise errors.ProgrammerError("Tried to use execution context id when"
+                                   " not set")
+    return self._ec_id
 
 
 class HooksMaster(object):
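
Taken together, this is what lets the CLI scripts handle ^C cleanly:
the client turns KeyboardInterrupt into a job cancellation, and the
CheckCancel() hook plus the bounded lock timeouts ensure the job
notices promptly. A sketch of the client-side pattern (poll_until_done
is a placeholder; SubmitJob/CancelJob follow the LUXI client API):

    def run_op(client, op):
      job_id = client.SubmitJob([op])
      try:
        return poll_until_done(client, job_id)
      except KeyboardInterrupt:
        # Translate ^C into a proper cancellation; the job stops at its
        # next CheckCancel() call or lock-attempt boundary.
        client.CancelJob(job_id)
        raise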