Add targeted pylint disables
diff --git a/lib/mcpu.py b/lib/mcpu.py
index a6014a1..af4ff1a 100644
--- a/lib/mcpu.py
+++ b/lib/mcpu.py
@@ -29,6 +29,8 @@ are two kinds of classes defined:
 """
 
 import logging
+import random
+import time
 
 from ganeti import opcodes
 from ganeti import constants
@@ -38,10 +40,130 @@ from ganeti import cmdlib
 from ganeti import locking
 
 
+class _LockAcquireTimeout(Exception):
+  """Internal exception to report timeouts on acquiring locks.
+
+  """
+
+
+def _CalculateLockAttemptTimeouts():
+  """Calculate timeouts for lock attempts.
+
+  """
+  result = [1.0]
+
+  # Wait for a total of at least 150s before doing a blocking acquire
+  while sum(result) < 150.0:
+    timeout = (result[-1] * 1.05) ** 1.25
+
+    # Cap timeout at 10 seconds. This gives other jobs a chance to run
+    # even if we're still trying to get our locks, before finally moving
+    # to a blocking acquire.
+    if timeout > 10.0:
+      timeout = 10.0
+
+    elif timeout < 0.1:
+      # Lower boundary for safety
+      timeout = 0.1
+
+    result.append(timeout)
+
+  return result
+
+
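
For reference, a minimal standalone sketch (not part of the patch) that reproduces the schedule this helper builds: per-attempt timeouts start at 1.0s and grow toward the 10s cap until at least 150s have accumulated, after which the strategy below starts returning None and the caller falls back to a blocking acquire.

def calc_timeouts():
  # Same algorithm as _CalculateLockAttemptTimeouts in the hunk above
  result = [1.0]
  while sum(result) < 150.0:
    timeout = (result[-1] * 1.05) ** 1.25
    # Clamp to [0.1s, 10.0s]
    timeout = min(max(timeout, 0.1), 10.0)
    result.append(timeout)
  return result

timeouts = calc_timeouts()
print("Attempts before blocking: %d" % len(timeouts))
print("Total wait before blocking: %.1fs" % sum(timeouts))
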
+class _LockAttemptTimeoutStrategy(object):
+  """Class with lock acquire timeout strategy.
+
+  """
+  __slots__ = [
+    "_attempt",
+    "_random_fn",
+    "_start_time",
+    "_time_fn",
+    "_running_timeout",
+    ]
+
+  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
+
+  def __init__(self, attempt=0, _time_fn=time.time, _random_fn=random.random):
+    """Initializes this class.
+
+    @type attempt: int
+    @param attempt: Current attempt number
+    @param _time_fn: Time function for unittests
+    @param _random_fn: Random number generator for unittests
+
+    """
+    object.__init__(self)
+
+    if attempt < 0:
+      raise ValueError("Attempt must be zero or positive")
+
+    self._attempt = attempt
+    self._time_fn = _time_fn
+    self._random_fn = _random_fn
+
+    try:
+      timeout = self._TIMEOUT_PER_ATTEMPT[attempt]
+    except IndexError:
+      # No more timeouts, do blocking acquire
+      timeout = None
+
+    self._running_timeout = locking.RunningTimeout(timeout, False,
+                                                   _time_fn=_time_fn)
+
+  def NextAttempt(self):
+    """Returns the strategy for the next attempt.
+
+    """
+    return _LockAttemptTimeoutStrategy(attempt=self._attempt + 1,
+                                       _time_fn=self._time_fn,
+                                       _random_fn=self._random_fn)
+
+  def CalcRemainingTimeout(self):
+    """Returns the remaining timeout.
+
+    """
+    timeout = self._running_timeout.Remaining()
+
+    if timeout is not None:
+      # Add a small variation (-/+ 5%) to timeout. This helps in situations
+      # where two or more jobs are fighting for the same lock(s).
+      variation_range = timeout * 0.1
+      timeout += ((self._random_fn() * variation_range) -
+                  (variation_range * 0.5))
+
+    return timeout
+
+
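
A sketch of how the strategy is meant to be consumed (it mirrors the retry loop added to Processor.ExecOpCode further down); try_acquire is a hypothetical stand-in for the real lock manager call:

strategy = _LockAttemptTimeoutStrategy()
while True:
  # Remaining time for this attempt (jittered by +/- 5%), or None once the
  # precomputed schedule is exhausted, meaning "acquire blockingly"
  timeout = strategy.CalcRemainingTimeout()
  locks = try_acquire(timeout)  # hypothetical helper; None means timed out
  if locks is not None:
    break  # locks held, proceed with execution
  strategy = strategy.NextAttempt()  # longer timeout on the next try
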
+class OpExecCbBase: # pylint: disable-msg=W0232
+  """Base class for OpCode execution callbacks.
+
+  """
+  def NotifyStart(self):
+    """Called when we are about to execute the LU.
+
+    This function is called when we're about to start the lu's Exec() method,
+    that is, after we have acquired all locks.
+
+    """
+
+  def Feedback(self, *args):
+    """Sends feedback from the LU code to the end-user.
+
+    """
+
+  def ReportLocks(self, msg):
+    """Report lock operations.
+
+    """
+
+
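
Concrete consumers subclass this and override whichever callbacks they need; a hypothetical minimal implementation, for illustration only:

class LoggingCallbacks(OpExecCbBase):  # hypothetical example, not in the patch
  """Callbacks that simply log every event."""
  def NotifyStart(self):
    logging.info("All locks acquired, about to run Exec()")

  def Feedback(self, *args):
    logging.info("LU feedback: %r", args)

  def ReportLocks(self, msg):
    logging.debug("Lock operation: %s", msg)
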
 class Processor(object):
   """Object which runs OpCodes"""
   DISPATCH_TABLE = {
     # Cluster
+    opcodes.OpPostInitCluster: cmdlib.LUPostInitCluster,
     opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
     opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
     opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
@@ -50,12 +172,19 @@ class Processor(object):
     opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
     opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
     opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig,
+    opcodes.OpRepairDiskSizes: cmdlib.LURepairDiskSizes,
     # node lu
     opcodes.OpAddNode: cmdlib.LUAddNode,
     opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
     opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
+    opcodes.OpQueryNodeStorage: cmdlib.LUQueryNodeStorage,
+    opcodes.OpModifyNodeStorage: cmdlib.LUModifyNodeStorage,
+    opcodes.OpRepairNodeStorage: cmdlib.LURepairNodeStorage,
     opcodes.OpRemoveNode: cmdlib.LURemoveNode,
     opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams,
+    opcodes.OpPowercycleNode: cmdlib.LUPowercycleNode,
+    opcodes.OpEvacuateNode: cmdlib.LUEvacuateNode,
+    opcodes.OpMigrateNode: cmdlib.LUMigrateNode,
     # instance lu
     opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
     opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
@@ -67,8 +196,10 @@ class Processor(object):
     opcodes.OpRebootInstance: cmdlib.LURebootInstance,
     opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
     opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
+    opcodes.OpRecreateInstanceDisks: cmdlib.LURecreateInstanceDisks,
     opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
     opcodes.OpMigrateInstance: cmdlib.LUMigrateInstance,
+    opcodes.OpMoveInstance: cmdlib.LUMoveInstance,
     opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
     opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
     opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
@@ -90,17 +221,95 @@ class Processor(object):
     opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
     }
 
-  def __init__(self, context):
+  def __init__(self, context, ec_id):
     """Constructor for Processor
 
-    Args:
-     - feedback_fn: the feedback function (taking one string) to be run when
-                    interesting events are happening
+    @type context: GanetiContext
+    @param context: global Ganeti context
+    @type ec_id: string
+    @param ec_id: execution context identifier
+
     """
     self.context = context
-    self._feedback_fn = None
-    self.exclusive_BGL = False
+    self._ec_id = ec_id
+    self._cbs = None
     self.rpc = rpc.RpcRunner(context.cfg)
+    self.hmclass = HooksMaster
+
+  def _ReportLocks(self, level, names, shared, timeout, acquired, result):
+    """Reports lock operations.
+
+    @type level: int
+    @param level: Lock level
+    @type names: list or string
+    @param names: Lock names
+    @type shared: bool
+    @param shared: Whether the locks should be acquired in shared mode
+    @type timeout: None or float
+    @param timeout: Timeout for acquiring the locks
+    @type acquired: bool
+    @param acquired: Whether the locks have already been acquired
+    @type result: None or set
+    @param result: Result from L{locking.GanetiLockManager.acquire}
+
+    """
+    parts = []
+
+    # Build message
+    if acquired:
+      if result is None:
+        parts.append("timeout")
+      else:
+        parts.append("acquired")
+    else:
+      parts.append("waiting")
+      if timeout is None:
+        parts.append("blocking")
+      else:
+        parts.append("timeout=%0.6fs" % timeout)
+
+    parts.append(locking.LEVEL_NAMES[level])
+
+    if names == locking.ALL_SET:
+      parts.append("ALL")
+    elif isinstance(names, basestring):
+      parts.append(names)
+    else:
+      parts.append(",".join(names))
+
+    if shared:
+      parts.append("shared")
+    else:
+      parts.append("exclusive")
+
+    msg = "/".join(parts)
+
+    logging.debug("LU locks %s", msg)
+
+    if self._cbs:
+      self._cbs.ReportLocks(msg)
+
+  def _AcquireLocks(self, level, names, shared, timeout):
+    """Acquires locks via the Ganeti lock manager.
+
+    @type level: int
+    @param level: Lock level
+    @type names: list or string
+    @param names: Lock names
+    @type shared: bool
+    @param shared: Whether the locks should be acquired in shared mode
+    @type timeout: None or float
+    @param timeout: Timeout for acquiring the locks
+
+    """
+    self._ReportLocks(level, names, shared, timeout, False, None)
+
+    acquired = self.context.glm.acquire(level, names, shared=shared,
+                                        timeout=timeout)
+
+    self._ReportLocks(level, names, shared, timeout, True, acquired)
+
+    return acquired
 
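
The resulting report is a compact slash-separated string; a few examples of what _ReportLocks produces (lock names here are hypothetical):

#   waiting/timeout=4.280000s/instance/inst1.example.com,inst2.example.com/shared
#   waiting/blocking/node/ALL/exclusive
#   acquired/instance/inst1.example.com/shared
#   timeout/instance/inst1.example.com/shared
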
   def _ExecLU(self, lu):
     """Logical Unit execution sequence.
@@ -108,15 +317,24 @@ class Processor(object):
     """
     write_count = self.context.cfg.write_count
     lu.CheckPrereq()
-    hm = HooksMaster(self.rpc.call_hooks_runner, self, lu)
+    hm = HooksMaster(self.rpc.call_hooks_runner, lu)
     h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
     lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
-                     self._feedback_fn, None)
+                     self._Feedback, None)
+
+    if getattr(lu.op, "dry_run", False):
+      # In this mode no post-hooks are run, and the config is not written
+      # (as it might have been modified by another LU, and we shouldn't do
+      # a writeout on behalf of other threads)
+      self.LogInfo("dry-run mode requested, not actually executing"
+                   " the operation")
+      return lu.dry_run_result
+
     try:
-      result = lu.Exec(self._feedback_fn)
+      result = lu.Exec(self._Feedback)
       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
-                                self._feedback_fn, result)
+                                self._Feedback, result)
     finally:
       # FIXME: This needs locks if not lu_class.REQ_BGL
       if write_count != self.context.cfg.write_count:
@@ -124,7 +342,7 @@ class Processor(object):
 
     return result
 
-  def _LockAndExecLU(self, lu, level):
+  def _LockAndExecLU(self, lu, level, calc_timeout):
     """Execute a Logical Unit, with the needed locks.
 
     This is a recursive function that starts locking the given level, and
@@ -135,93 +353,135 @@ class Processor(object):
     adding_locks = level in lu.add_locks
     acquiring_locks = level in lu.needed_locks
     if level not in locking.LEVELS:
-      if callable(self._run_notifier):
-        self._run_notifier()
+      if self._cbs:
+        self._cbs.NotifyStart()
+
       result = self._ExecLU(lu)
+
     elif adding_locks and acquiring_locks:
       # We could both acquire and add locks at the same level, but for now we
       # don't need this, so we'll avoid the complicated code needed.
-      raise NotImplementedError(
-        "Can't declare locks to acquire when adding others")
+      raise NotImplementedError("Can't declare locks to acquire when adding"
+                                " others")
+
     elif adding_locks or acquiring_locks:
       lu.DeclareLocks(level)
       share = lu.share_locks[level]
-      if acquiring_locks:
-        needed_locks = lu.needed_locks[level]
-        lu.acquired_locks[level] = self.context.glm.acquire(level,
-                                                            needed_locks,
-                                                            shared=share)
-      else: # adding_locks
-        add_locks = lu.add_locks[level]
-        lu.remove_locks[level] = add_locks
-        try:
-          self.context.glm.add(level, add_locks, acquired=1, shared=share)
-        except errors.LockError:
-          raise errors.OpPrereqError(
-            "Couldn't add locks (%s), probably because of a race condition"
-            " with another job, who added them first" % add_locks)
+
       try:
+        assert adding_locks ^ acquiring_locks, \
+          "Locks must be either added or acquired"
+
+        if acquiring_locks:
+          # Acquiring locks
+          needed_locks = lu.needed_locks[level]
+
+          acquired = self._AcquireLocks(level, needed_locks, share,
+                                        calc_timeout())
+
+          if acquired is None:
+            raise _LockAcquireTimeout()
+
+        else:
+          # Adding locks
+          add_locks = lu.add_locks[level]
+          lu.remove_locks[level] = add_locks
+
+          try:
+            self.context.glm.add(level, add_locks, acquired=1, shared=share)
+          except errors.LockError:
+            raise errors.OpPrereqError(
+              "Couldn't add locks (%s), probably because of a race condition"
+              " with another job, who added them first" % add_locks,
+              errors.ECODE_FAULT)
+
+          acquired = add_locks
+
         try:
-          if adding_locks:
-            lu.acquired_locks[level] = add_locks
-          result = self._LockAndExecLU(lu, level + 1)
+          lu.acquired_locks[level] = acquired
+
+          result = self._LockAndExecLU(lu, level + 1, calc_timeout)
         finally:
           if level in lu.remove_locks:
             self.context.glm.remove(level, lu.remove_locks[level])
       finally:
         if self.context.glm.is_owned(level):
           self.context.glm.release(level)
+
     else:
-      result = self._LockAndExecLU(lu, level + 1)
+      result = self._LockAndExecLU(lu, level + 1, calc_timeout)
 
     return result
 
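
Schematically, the recursion walks the lock levels in order, producing a nested acquire/release pattern (a simplified sketch, not the literal call sequence):

# _LockAndExecLU(lu, LEVEL_INSTANCE, calc_timeout):
#   acquire instance locks
#     _LockAndExecLU(lu, LEVEL_NODE, calc_timeout):
#       acquire node locks
#         ... past the last level: NotifyStart() and lu.Exec() run ...
#       release node locks
#   release instance locks
#
# A timeout at any level raises _LockAcquireTimeout, which unwinds this
# stack (releasing everything held so far) before ExecOpCode retries with
# the next, longer timeout.
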
-  def ExecOpCode(self, op, feedback_fn, run_notifier):
+  def ExecOpCode(self, op, cbs):
     """Execute an opcode.
 
     @type op: an OpCode instance
     @param op: the opcode to be executed
-    @type feedback_fn: a function that takes a single argument
-    @param feedback_fn: this function will be used as feedback from the LU
-                        code to the end-user
-    @type run_notifier: callable (no arguments) or None
-    @param run_notifier:  this function (if callable) will be called when
-                          we are about to call the lu's Exec() method, that
-                          is, after we have acquired all locks
+    @type cbs: L{OpExecCbBase}
+    @param cbs: Runtime callbacks
 
     """
     if not isinstance(op, opcodes.OpCode):
       raise errors.ProgrammerError("Non-opcode instance passed"
                                    " to ExecOpcode")
 
-    self._feedback_fn = feedback_fn
-    self._run_notifier = run_notifier
-    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
-    if lu_class is None:
-      raise errors.OpCodeUnknown("Unknown opcode")
-
-    # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
-    # shared fashion otherwise (to prevent concurrent run with an exclusive LU.
-    self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
-                             shared=not lu_class.REQ_BGL)
+    self._cbs = cbs
     try:
-      self.exclusive_BGL = lu_class.REQ_BGL
-      lu = lu_class(self, op, self.context, self.rpc)
-      lu.ExpandNames()
-      assert lu.needed_locks is not None, "needed_locks not set by LU"
-      result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
+      lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
+      if lu_class is None:
+        raise errors.OpCodeUnknown("Unknown opcode")
+
+      timeout_strategy = _LockAttemptTimeoutStrategy()
+
+      while True:
+        try:
+          acquire_timeout = timeout_strategy.CalcRemainingTimeout()
+
+          # Acquire the Big Ganeti Lock exclusively if this LU requires it,
+          # and in a shared fashion otherwise (to prevent concurrent run with
+          # an exclusive LU).
+          if self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
+                                not lu_class.REQ_BGL, acquire_timeout) is None:
+            raise _LockAcquireTimeout()
+
+          try:
+            lu = lu_class(self, op, self.context, self.rpc)
+            lu.ExpandNames()
+            assert lu.needed_locks is not None, "needed_locks not set by LU"
+
+            try:
+              return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE,
+                                         timeout_strategy.CalcRemainingTimeout)
+            finally:
+              if self._ec_id:
+                self.context.cfg.DropECReservations(self._ec_id)
+
+          finally:
+            self.context.glm.release(locking.LEVEL_CLUSTER)
+
+        except _LockAcquireTimeout:
+          # Timeout while waiting for lock, try again
+          pass
+
+        timeout_strategy = timeout_strategy.NextAttempt()
+
     finally:
-      self.context.glm.release(locking.LEVEL_CLUSTER)
-      self.exclusive_BGL = False
+      self._cbs = None
 
-    return result
+  def _Feedback(self, *args):
+    """Forward call to feedback callback function.
+
+    """
+    if self._cbs:
+      self._cbs.Feedback(*args)
 
   def LogStep(self, current, total, message):
     """Log a change in LU execution progress.
 
     """
     logging.debug("Step %d/%d %s", current, total, message)
-    self._feedback_fn("STEP %d/%d %s" % (current, total, message))
+    self._Feedback("STEP %d/%d %s" % (current, total, message))
 
   def LogWarning(self, message, *args, **kwargs):
     """Log a warning to the logs and the user.
@@ -238,9 +498,9 @@ class Processor(object):
       message = message % tuple(args)
     if message:
       logging.warning(message)
-      self._feedback_fn(" - WARNING: %s" % message)
+      self._Feedback(" - WARNING: %s" % message)
     if "hint" in kwargs:
-      self._feedback_fn("      Hint: %s" % kwargs["hint"])
+      self._Feedback("      Hint: %s" % kwargs["hint"])
 
   def LogInfo(self, message, *args):
     """Log an informational message to the logs and the user.
@@ -249,7 +509,12 @@ class Processor(object):
     if args:
       message = message % tuple(args)
     logging.info(message)
-    self._feedback_fn(" - INFO: %s" % message)
+    self._Feedback(" - INFO: %s" % message)
+
+  def GetECId(self):
+    """Returns the current execution context ID.
+
+    """
+    if not self._ec_id:
+      raise errors.ProgrammerError("Tried to use execution context id when"
+                                   " not set")
+    return self._ec_id
 
 
 class HooksMaster(object):
@@ -264,9 +529,8 @@ class HooksMaster(object):
   which behaves the same works.
 
   """
-  def __init__(self, callfn, proc, lu):
+  def __init__(self, callfn, lu):
     self.callfn = callfn
-    self.proc = proc
     self.lu = lu
     self.op = lu.op
     self.env, node_list_pre, node_list_post = self._BuildEnv()
@@ -315,40 +579,57 @@ class HooksMaster(object):
 
     return self.callfn(node_list, hpath, phase, env)
 
-  def RunPhase(self, phase):
+  def RunPhase(self, phase, nodes=None):
     """Run all the scripts for a phase.
 
     This is the main function of the HookMaster.
 
     @param phase: one of L{constants.HOOKS_PHASE_POST} or
         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
+    @param nodes: overrides the predefined list of nodes for the given phase
     @return: the processed results of the hooks multi-node rpc call
     @raise errors.HooksFailure: on communication failure to the nodes
+    @raise errors.HooksAbort: on failure of one of the hooks
 
     """
-    if not self.node_list[phase]:
+    if not self.node_list[phase] and not nodes:
       # empty node list, we should not attempt to run this as either
       # we're in the cluster init phase and the rpc client part can't
       # even attempt to run, or this LU doesn't do hooks at all
       return
     hpath = self.lu.HPATH
-    results = self._RunWrapper(self.node_list[phase], hpath, phase)
-    if phase == constants.HOOKS_PHASE_PRE:
-      errs = []
-      if not results:
-        raise errors.HooksFailure("Communication failure")
-      for node_name in results:
-        res = results[node_name]
-        if res.failed or res.data is False or not isinstance(res.data, list):
-          if not res.offline:
-            self.proc.LogWarning("Communication failure to node %s" %
-                                 node_name)
-          continue
-        for script, hkr, output in res.data:
-          if hkr == constants.HKR_FAIL:
+    if nodes is not None:
+      results = self._RunWrapper(nodes, hpath, phase)
+    else:
+      results = self._RunWrapper(self.node_list[phase], hpath, phase)
+    errs = []
+    if not results:
+      msg = "Communication Failure"
+      if phase == constants.HOOKS_PHASE_PRE:
+        raise errors.HooksFailure(msg)
+      else:
+        self.lu.LogWarning(msg)
+        return results
+    for node_name in results:
+      res = results[node_name]
+      if res.offline:
+        continue
+      msg = res.fail_msg
+      if msg:
+        self.lu.LogWarning("Communication failure to node %s: %s",
+                           node_name, msg)
+        continue
+      for script, hkr, output in res.payload:
+        if hkr == constants.HKR_FAIL:
+          if phase == constants.HOOKS_PHASE_PRE:
             errs.append((node_name, script, output))
-      if errs:
-        raise errors.HooksAbort(errs)
+          else:
+            if not output:
+              output = "(no output)"
+            self.lu.LogWarning("On %s script %s failed, output: %s" %
+                               (node_name, script, output))
+    if errs and phase == constants.HOOKS_PHASE_PRE:
+      raise errors.HooksAbort(errs)
     return results
 
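
The net effect on failure handling, summarized (a description of the code above, not new behaviour):

# RunPhase failure handling after this change:
#   HOOKS_PHASE_PRE:  a total communication failure raises
#                     errors.HooksFailure; any script reporting HKR_FAIL
#                     raises errors.HooksAbort
#   HOOKS_PHASE_POST: failures only produce lu.LogWarning calls; execution
#                     continues and the results are still returned
#   Offline nodes are skipped in both phases.
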
   def RunConfigUpdate(self):
@@ -361,4 +642,4 @@ class HooksMaster(object):
     phase = constants.HOOKS_PHASE_POST
     hpath = constants.HOOKS_NAME_CFGUPDATE
     nodes = [self.lu.cfg.GetMasterNode()]
-    results = self._RunWrapper(nodes, hpath, phase)
+    self._RunWrapper(nodes, hpath, phase)