Export extractExTags and updateExclTags
[ganeti-local] / lib / mcpu.py
index 78faa37..4498747 100644 (file)
@@ -1,7 +1,7 @@
 #
 #
 
-# Copyright (C) 2006, 2007 Google Inc.
+# Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -28,80 +28,322 @@ are two kinds of classes defined:
 
 """
 
+import sys
 import logging
+import random
+import time
+import itertools
+import traceback
 
 from ganeti import opcodes
 from ganeti import constants
 from ganeti import errors
-from ganeti import rpc
+from ganeti import hooksmaster
 from ganeti import cmdlib
 from ganeti import locking
+from ganeti import utils
+from ganeti import compat
+
+
+_OP_PREFIX = "Op"
+_LU_PREFIX = "LU"
+
+#: LU classes which don't need to acquire the node allocation lock
+#: (L{locking.NAL}) when they acquire all node or node resource locks
+_NODE_ALLOC_WHITELIST = frozenset([])
+
+#: LU classes which don't need to acquire the node allocation lock
+#: (L{locking.NAL}) in the same mode (shared/exclusive) as the node
+#: or node resource locks
+_NODE_ALLOC_MODE_WHITELIST = compat.UniqueFrozenset([
+  cmdlib.LUBackupExport,
+  cmdlib.LUBackupRemove,
+  cmdlib.LUOobCommand,
+  ])
+
+
+class LockAcquireTimeout(Exception):
+  """Exception to report timeouts on acquiring locks.
+
+  """
+
+
+def _CalculateLockAttemptTimeouts():
+  """Calculate timeouts for lock attempts.
+
+  """
+  result = [constants.LOCK_ATTEMPTS_MINWAIT]
+  running_sum = result[0]
+
+  # Wait for a total of at least LOCK_ATTEMPTS_TIMEOUT before doing a
+  # blocking acquire
+  while running_sum < constants.LOCK_ATTEMPTS_TIMEOUT:
+    timeout = (result[-1] * 1.05) ** 1.25
+
+    # Cap max timeout. This gives other jobs a chance to run even if
+    # we're still trying to get our locks, before finally moving to a
+    # blocking acquire.
+    timeout = min(timeout, constants.LOCK_ATTEMPTS_MAXWAIT)
+    # And also cap the lower boundary for safety
+    timeout = max(timeout, constants.LOCK_ATTEMPTS_MINWAIT)
+
+    result.append(timeout)
+    running_sum += timeout
+
+  return result
+
+
+class LockAttemptTimeoutStrategy(object):
+  """Class with lock acquire timeout strategy.
+
+  """
+  __slots__ = [
+    "_timeouts",
+    "_random_fn",
+    "_time_fn",
+    ]
+
+  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
+
+  def __init__(self, _time_fn=time.time, _random_fn=random.random):
+    """Initializes this class.
+
+    @param _time_fn: Time function for unittests
+    @param _random_fn: Random number generator for unittests
+
+    """
+    object.__init__(self)
+
+    self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
+    self._time_fn = _time_fn
+    self._random_fn = _random_fn
+
+  def NextAttempt(self):
+    """Returns the timeout for the next attempt.
+
+    """
+    try:
+      timeout = self._timeouts.next()
+    except StopIteration:
+      # No more timeouts, do blocking acquire
+      timeout = None
+
+    if timeout is not None:
+      # Add a small variation (-/+ 5%) to timeout. This helps in situations
+      # where two or more jobs are fighting for the same lock(s).
+      variation_range = timeout * 0.1
+      timeout += ((self._random_fn() * variation_range) -
+                  (variation_range * 0.5))
+
+    return timeout
+
+
+class OpExecCbBase: # pylint: disable=W0232
+  """Base class for OpCode execution callbacks.
+
+  """
+  def NotifyStart(self):
+    """Called when we are about to execute the LU.
+
+    This function is called when we're about to start the lu's Exec() method,
+    that is, after we have acquired all locks.
+
+    """
+
+  def Feedback(self, *args):
+    """Sends feedback from the LU code to the end-user.
+
+    """
+
+  def CurrentPriority(self): # pylint: disable=R0201
+    """Returns current priority or C{None}.
+
+    """
+    return None
+
+  def SubmitManyJobs(self, jobs):
+    """Submits jobs for processing.
+
+    See L{jqueue.JobQueue.SubmitManyJobs}.
+
+    """
+    raise NotImplementedError
+
+
+def _LUNameForOpName(opname):
+  """Computes the LU name for a given OpCode name.
+
+  """
+  assert opname.startswith(_OP_PREFIX), \
+      "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)
+
+  return _LU_PREFIX + opname[len(_OP_PREFIX):]
+
+
+def _ComputeDispatchTable():
+  """Computes the opcode-to-lu dispatch table.
+
+  """
+  return dict((op, getattr(cmdlib, _LUNameForOpName(op.__name__)))
+              for op in opcodes.OP_MAPPING.values()
+              if op.WITH_LU)
+
+
+def _SetBaseOpParams(src, defcomment, dst):
+  """Copies basic opcode parameters.
+
+  @type src: L{opcodes.OpCode}
+  @param src: Source opcode
+  @type defcomment: string
+  @param defcomment: Comment to specify if not already given
+  @type dst: L{opcodes.OpCode}
+  @param dst: Destination opcode
+
+  """
+  if hasattr(src, "debug_level"):
+    dst.debug_level = src.debug_level
+
+  if (getattr(dst, "priority", None) is None and
+      hasattr(src, "priority")):
+    dst.priority = src.priority
+
+  if not getattr(dst, opcodes.COMMENT_ATTR, None):
+    dst.comment = defcomment
+
+
+def _ProcessResult(submit_fn, op, result):
+  """Examines opcode result.
+
+  If necessary, additional processing on the result is done.
+
+  """
+  if isinstance(result, cmdlib.ResultWithJobs):
+    # Copy basic parameters (e.g. priority)
+    map(compat.partial(_SetBaseOpParams, op,
+                       "Submitted by %s" % op.OP_ID),
+        itertools.chain(*result.jobs))
+
+    # Submit jobs
+    job_submission = submit_fn(result.jobs)
+
+    # Build dictionary
+    result = result.other
+
+    assert constants.JOB_IDS_KEY not in result, \
+      "Key '%s' found in additional return values" % constants.JOB_IDS_KEY
+
+    result[constants.JOB_IDS_KEY] = job_submission
+
+  return result
+
+
+def _FailingSubmitManyJobs(_):
+  """Implementation of L{OpExecCbBase.SubmitManyJobs} to raise an exception.
+
+  """
+  raise errors.ProgrammerError("Opcodes processed without callbacks (e.g."
+                               " queries) can not submit jobs")
+
+
+def _VerifyLocks(lu, glm, _mode_whitelist=_NODE_ALLOC_MODE_WHITELIST,
+                 _nal_whitelist=_NODE_ALLOC_WHITELIST):
+  """Performs consistency checks on locks acquired by a logical unit.
+
+  @type lu: L{cmdlib.LogicalUnit}
+  @param lu: Logical unit instance
+  @type glm: L{locking.GanetiLockManager}
+  @param glm: Lock manager
+
+  """
+  if not __debug__:
+    return
+
+  have_nal = glm.check_owned(locking.LEVEL_NODE_ALLOC, locking.NAL)
+
+  for level in [locking.LEVEL_NODE, locking.LEVEL_NODE_RES]:
+    # TODO: Verify using actual lock mode, not using LU variables
+    if level in lu.needed_locks:
+      share_node_alloc = lu.share_locks[locking.LEVEL_NODE_ALLOC]
+      share_level = lu.share_locks[level]
+
+      if lu.__class__ in _mode_whitelist:
+        assert share_node_alloc != share_level, \
+          "LU is whitelisted to use different modes for node allocation lock"
+      else:
+        assert bool(share_node_alloc) == bool(share_level), \
+          ("Node allocation lock must be acquired using the same mode as nodes"
+           " and node resources")
+
+      if lu.__class__ in _nal_whitelist:
+        assert not have_nal, \
+          "LU is whitelisted for not acquiring the node allocation lock"
+      elif lu.needed_locks[level] == locking.ALL_SET or glm.owning_all(level):
+        assert have_nal, \
+          ("Node allocation lock must be used if an LU acquires all nodes"
+           " or node resources")
 
 
 class Processor(object):
   """Object which runs OpCodes"""
-  DISPATCH_TABLE = {
-    # Cluster
-    opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
-    opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
-    opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
-    opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
-    opcodes.OpRenameCluster: cmdlib.LURenameCluster,
-    opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
-    opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
-    opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig,
-    # node lu
-    opcodes.OpAddNode: cmdlib.LUAddNode,
-    opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
-    opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
-    opcodes.OpRemoveNode: cmdlib.LURemoveNode,
-    opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams,
-    opcodes.OpPowercycleNode: cmdlib.LUPowercycleNode,
-    # instance lu
-    opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
-    opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
-    opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
-    opcodes.OpRenameInstance: cmdlib.LURenameInstance,
-    opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
-    opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
-    opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
-    opcodes.OpRebootInstance: cmdlib.LURebootInstance,
-    opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
-    opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
-    opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
-    opcodes.OpMigrateInstance: cmdlib.LUMigrateInstance,
-    opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
-    opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
-    opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
-    opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
-    opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
-    # os lu
-    opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
-    # exports lu
-    opcodes.OpQueryExports: cmdlib.LUQueryExports,
-    opcodes.OpExportInstance: cmdlib.LUExportInstance,
-    opcodes.OpRemoveExport: cmdlib.LURemoveExport,
-    # tags lu
-    opcodes.OpGetTags: cmdlib.LUGetTags,
-    opcodes.OpSearchTags: cmdlib.LUSearchTags,
-    opcodes.OpAddTags: cmdlib.LUAddTags,
-    opcodes.OpDelTags: cmdlib.LUDelTags,
-    # test lu
-    opcodes.OpTestDelay: cmdlib.LUTestDelay,
-    opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
-    }
-
-  def __init__(self, context):
+  DISPATCH_TABLE = _ComputeDispatchTable()
+
+  def __init__(self, context, ec_id, enable_locks=True):
     """Constructor for Processor
 
-    Args:
-     - feedback_fn: the feedback function (taking one string) to be run when
-                    interesting events are happening
+    @type context: GanetiContext
+    @param context: global Ganeti context
+    @type ec_id: string
+    @param ec_id: execution context identifier
+
     """
     self.context = context
-    self._feedback_fn = None
-    self.exclusive_BGL = False
-    self.rpc = rpc.RpcRunner(context.cfg)
+    self._ec_id = ec_id
+    self._cbs = None
+    self.rpc = context.rpc
+    self.hmclass = hooksmaster.HooksMaster
+    self._enable_locks = enable_locks
+
+  def _CheckLocksEnabled(self):
+    """Checks if locking is enabled.
+
+    @raise errors.ProgrammerError: In case locking is not enabled
+
+    """
+    if not self._enable_locks:
+      raise errors.ProgrammerError("Attempted to use disabled locks")
+
+  def _AcquireLocks(self, level, names, shared, opportunistic, timeout):
+    """Acquires locks via the Ganeti lock manager.
+
+    @type level: int
+    @param level: Lock level
+    @type names: list or string
+    @param names: Lock names
+    @type shared: bool
+    @param shared: Whether the locks should be acquired in shared mode
+    @type opportunistic: bool
+    @param opportunistic: Whether to acquire opportunistically
+    @type timeout: None or float
+    @param timeout: Timeout for acquiring the locks
+    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
+        amount of time
+
+    """
+    self._CheckLocksEnabled()
+
+    if self._cbs:
+      priority = self._cbs.CurrentPriority()
+    else:
+      priority = None
+
+    acquired = self.context.glm.acquire(level, names, shared=shared,
+                                        timeout=timeout, priority=priority,
+                                        opportunistic=opportunistic)
+
+    if acquired is None:
+      raise LockAcquireTimeout()
+
+    return acquired
 
   def _ExecLU(self, lu):
     """Logical Unit execution sequence.
@@ -109,10 +351,11 @@ class Processor(object):
     """
     write_count = self.context.cfg.write_count
     lu.CheckPrereq()
-    hm = HooksMaster(self.rpc.call_hooks_runner, self, lu)
+
+    hm = self.BuildHooksManager(lu)
     h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
     lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
-                     self._feedback_fn, None)
+                     self.Log, None)
 
     if getattr(lu.op, "dry_run", False):
       # in this mode, no post-hooks are run, and the config is not
@@ -122,11 +365,16 @@ class Processor(object):
                    " the operation")
       return lu.dry_run_result
 
+    if self._cbs:
+      submit_mj_fn = self._cbs.SubmitManyJobs
+    else:
+      submit_mj_fn = _FailingSubmitManyJobs
+
     try:
-      result = lu.Exec(self._feedback_fn)
+      result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
-                                self._feedback_fn, result)
+                                self.Log, result)
     finally:
       # FIXME: This needs locks if not lu_class.REQ_BGL
       if write_count != self.context.cfg.write_count:
@@ -134,7 +382,10 @@ class Processor(object):
 
     return result
 
-  def _LockAndExecLU(self, lu, level):
+  def BuildHooksManager(self, lu):
+    return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu)
+
+  def _LockAndExecLU(self, lu, level, calc_timeout):
     """Execute a Logical Unit, with the needed locks.
 
     This is a recursive function that starts locking the given level, and
@@ -142,96 +393,165 @@ class Processor(object):
     given LU and its opcodes.
 
     """
+    glm = self.context.glm
     adding_locks = level in lu.add_locks
     acquiring_locks = level in lu.needed_locks
+
     if level not in locking.LEVELS:
-      if callable(self._run_notifier):
-        self._run_notifier()
-      result = self._ExecLU(lu)
+      _VerifyLocks(lu, glm)
+
+      if self._cbs:
+        self._cbs.NotifyStart()
+
+      try:
+        result = self._ExecLU(lu)
+      except AssertionError, err:
+        # this is a bit ugly, as we don't know from which phase
+        # (prereq, exec) this comes; but it's better than an exception
+        # with no information
+        (_, _, tb) = sys.exc_info()
+        err_info = traceback.format_tb(tb)
+        del tb
+        logging.exception("Detected AssertionError")
+        raise errors.OpExecError("Internal assertion error: please report"
+                                 " this as a bug.\nError message: '%s';"
+                                 " location:\n%s" % (str(err), err_info[-1]))
+
     elif adding_locks and acquiring_locks:
       # We could both acquire and add locks at the same level, but for now we
       # don't need this, so we'll avoid the complicated code needed.
-      raise NotImplementedError(
-        "Can't declare locks to acquire when adding others")
+      raise NotImplementedError("Can't declare locks to acquire when adding"
+                                " others")
+
     elif adding_locks or acquiring_locks:
+      self._CheckLocksEnabled()
+
       lu.DeclareLocks(level)
       share = lu.share_locks[level]
-      if acquiring_locks:
-        needed_locks = lu.needed_locks[level]
-        lu.acquired_locks[level] = self.context.glm.acquire(level,
-                                                            needed_locks,
-                                                            shared=share)
-      else: # adding_locks
-        add_locks = lu.add_locks[level]
-        lu.remove_locks[level] = add_locks
-        try:
-          self.context.glm.add(level, add_locks, acquired=1, shared=share)
-        except errors.LockError:
-          raise errors.OpPrereqError(
-            "Couldn't add locks (%s), probably because of a race condition"
-            " with another job, who added them first" % add_locks)
+      opportunistic = lu.opportunistic_locks[level]
+
       try:
+        assert adding_locks ^ acquiring_locks, \
+          "Locks must be either added or acquired"
+
+        if acquiring_locks:
+          # Acquiring locks
+          needed_locks = lu.needed_locks[level]
+
+          self._AcquireLocks(level, needed_locks, share, opportunistic,
+                             calc_timeout())
+        else:
+          # Adding locks
+          add_locks = lu.add_locks[level]
+          lu.remove_locks[level] = add_locks
+
+          try:
+            glm.add(level, add_locks, acquired=1, shared=share)
+          except errors.LockError:
+            logging.exception("Detected lock error in level %s for locks"
+                              " %s, shared=%s", level, add_locks, share)
+            raise errors.OpPrereqError(
+              "Couldn't add locks (%s), most likely because of another"
+              " job who added them first" % add_locks,
+              errors.ECODE_NOTUNIQUE)
+
         try:
-          if adding_locks:
-            lu.acquired_locks[level] = add_locks
-          result = self._LockAndExecLU(lu, level + 1)
+          result = self._LockAndExecLU(lu, level + 1, calc_timeout)
         finally:
           if level in lu.remove_locks:
-            self.context.glm.remove(level, lu.remove_locks[level])
+            glm.remove(level, lu.remove_locks[level])
       finally:
-        if self.context.glm.is_owned(level):
-          self.context.glm.release(level)
+        if glm.is_owned(level):
+          glm.release(level)
+
     else:
-      result = self._LockAndExecLU(lu, level + 1)
+      result = self._LockAndExecLU(lu, level + 1, calc_timeout)
 
     return result
 
-  def ExecOpCode(self, op, feedback_fn, run_notifier):
+  def ExecOpCode(self, op, cbs, timeout=None):
     """Execute an opcode.
 
     @type op: an OpCode instance
     @param op: the opcode to be executed
-    @type feedback_fn: a function that takes a single argument
-    @param feedback_fn: this function will be used as feedback from the LU
-                        code to the end-user
-    @type run_notifier: callable (no arguments) or None
-    @param run_notifier:  this function (if callable) will be called when
-                          we are about to call the lu's Exec() method, that
-                          is, after we have acquired all locks
+    @type cbs: L{OpExecCbBase}
+    @param cbs: Runtime callbacks
+    @type timeout: float or None
+    @param timeout: Maximum time to acquire all locks, None for no timeout
+    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
+        amount of time
 
     """
     if not isinstance(op, opcodes.OpCode):
       raise errors.ProgrammerError("Non-opcode instance passed"
-                                   " to ExecOpcode")
+                                   " to ExecOpcode (%s)" % type(op))
 
-    self._feedback_fn = feedback_fn
-    self._run_notifier = run_notifier
     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
     if lu_class is None:
       raise errors.OpCodeUnknown("Unknown opcode")
 
-    # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
-    # shared fashion otherwise (to prevent concurrent run with an exclusive LU.
-    self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
-                             shared=not lu_class.REQ_BGL)
+    if timeout is None:
+      calc_timeout = lambda: None
+    else:
+      calc_timeout = utils.RunningTimeout(timeout, False).Remaining
+
+    self._cbs = cbs
     try:
-      self.exclusive_BGL = lu_class.REQ_BGL
-      lu = lu_class(self, op, self.context, self.rpc)
-      lu.ExpandNames()
-      assert lu.needed_locks is not None, "needed_locks not set by LU"
-      result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
+      if self._enable_locks:
+        # Acquire the Big Ganeti Lock exclusively if this LU requires it,
+        # and in a shared fashion otherwise (to prevent concurrent run with
+        # an exclusive LU.
+        self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
+                            not lu_class.REQ_BGL, False, calc_timeout())
+      elif lu_class.REQ_BGL:
+        raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
+                                     " disabled" % op.OP_ID)
+
+      try:
+        lu = lu_class(self, op, self.context, self.rpc)
+        lu.ExpandNames()
+        assert lu.needed_locks is not None, "needed_locks not set by LU"
+
+        try:
+          result = self._LockAndExecLU(lu, locking.LEVEL_CLUSTER + 1,
+                                       calc_timeout)
+        finally:
+          if self._ec_id:
+            self.context.cfg.DropECReservations(self._ec_id)
+      finally:
+        # Release BGL if owned
+        if self.context.glm.is_owned(locking.LEVEL_CLUSTER):
+          assert self._enable_locks
+          self.context.glm.release(locking.LEVEL_CLUSTER)
     finally:
-      self.context.glm.release(locking.LEVEL_CLUSTER)
-      self.exclusive_BGL = False
+      self._cbs = None
+
+    resultcheck_fn = op.OP_RESULT
+    if not (resultcheck_fn is None or resultcheck_fn(result)):
+      logging.error("Expected opcode result matching %s, got %s",
+                    resultcheck_fn, result)
+      if not getattr(op, "dry_run", False):
+        # FIXME: LUs should still behave in dry_run mode, or
+        # alternately we should have OP_DRYRUN_RESULT; in the
+        # meantime, we simply skip the OP_RESULT check in dry-run mode
+        raise errors.OpResultError("Opcode result does not match %s: %s" %
+                                   (resultcheck_fn, utils.Truncate(result, 80)))
 
     return result
 
+  def Log(self, *args):
+    """Forward call to feedback callback function.
+
+    """
+    if self._cbs:
+      self._cbs.Feedback(*args)
+
   def LogStep(self, current, total, message):
     """Log a change in LU execution progress.
 
     """
     logging.debug("Step %d/%d %s", current, total, message)
-    self._feedback_fn("STEP %d/%d %s" % (current, total, message))
+    self.Log("STEP %d/%d %s" % (current, total, message))
 
   def LogWarning(self, message, *args, **kwargs):
     """Log a warning to the logs and the user.
@@ -248,9 +568,9 @@ class Processor(object):
       message = message % tuple(args)
     if message:
       logging.warning(message)
-      self._feedback_fn(" - WARNING: %s" % message)
+      self.Log(" - WARNING: %s" % message)
     if "hint" in kwargs:
-      self._feedback_fn("      Hint: %s" % kwargs["hint"])
+      self.Log("      Hint: %s" % kwargs["hint"])
 
   def LogInfo(self, message, *args):
     """Log an informational message to the logs and the user.
@@ -259,118 +579,13 @@ class Processor(object):
     if args:
       message = message % tuple(args)
     logging.info(message)
-    self._feedback_fn(" - INFO: %s" % message)
-
-
-class HooksMaster(object):
-  """Hooks master.
-
-  This class distributes the run commands to the nodes based on the
-  specific LU class.
-
-  In order to remove the direct dependency on the rpc module, the
-  constructor needs a function which actually does the remote
-  call. This will usually be rpc.call_hooks_runner, but any function
-  which behaves the same works.
+    self.Log(" - INFO: %s" % message)
 
-  """
-  def __init__(self, callfn, proc, lu):
-    self.callfn = callfn
-    self.proc = proc
-    self.lu = lu
-    self.op = lu.op
-    self.env, node_list_pre, node_list_post = self._BuildEnv()
-    self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
-                      constants.HOOKS_PHASE_POST: node_list_post}
-
-  def _BuildEnv(self):
-    """Compute the environment and the target nodes.
-
-    Based on the opcode and the current node list, this builds the
-    environment for the hooks and the target node list for the run.
-
-    """
-    env = {
-      "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
-      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
-      "GANETI_OP_CODE": self.op.OP_ID,
-      "GANETI_OBJECT_TYPE": self.lu.HTYPE,
-      "GANETI_DATA_DIR": constants.DATA_DIR,
-      }
-
-    if self.lu.HPATH is not None:
-      lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
-      if lu_env:
-        for key in lu_env:
-          env["GANETI_" + key] = lu_env[key]
-    else:
-      lu_nodes_pre = lu_nodes_post = []
-
-    return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
-
-  def _RunWrapper(self, node_list, hpath, phase):
-    """Simple wrapper over self.callfn.
-
-    This method fixes the environment before doing the rpc call.
-
-    """
-    env = self.env.copy()
-    env["GANETI_HOOKS_PHASE"] = phase
-    env["GANETI_HOOKS_PATH"] = hpath
-    if self.lu.cfg is not None:
-      env["GANETI_CLUSTER"] = self.lu.cfg.GetClusterName()
-      env["GANETI_MASTER"] = self.lu.cfg.GetMasterNode()
-
-    env = dict([(str(key), str(val)) for key, val in env.iteritems()])
-
-    return self.callfn(node_list, hpath, phase, env)
-
-  def RunPhase(self, phase):
-    """Run all the scripts for a phase.
-
-    This is the main function of the HookMaster.
-
-    @param phase: one of L{constants.HOOKS_PHASE_POST} or
-        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
-    @return: the processed results of the hooks multi-node rpc call
-    @raise errors.HooksFailure: on communication failure to the nodes
-
-    """
-    if not self.node_list[phase]:
-      # empty node list, we should not attempt to run this as either
-      # we're in the cluster init phase and the rpc client part can't
-      # even attempt to run, or this LU doesn't do hooks at all
-      return
-    hpath = self.lu.HPATH
-    results = self._RunWrapper(self.node_list[phase], hpath, phase)
-    if phase == constants.HOOKS_PHASE_PRE:
-      errs = []
-      if not results:
-        raise errors.HooksFailure("Communication failure")
-      for node_name in results:
-        res = results[node_name]
-        if res.offline:
-          continue
-        msg = res.RemoteFailMsg()
-        if msg:
-          self.proc.LogWarning("Communication failure to node %s: %s",
-                               node_name, msg)
-          continue
-        for script, hkr, output in res.payload:
-          if hkr == constants.HKR_FAIL:
-            errs.append((node_name, script, output))
-      if errs:
-        raise errors.HooksAbort(errs)
-    return results
-
-  def RunConfigUpdate(self):
-    """Run the special configuration update hook
-
-    This is a special hook that runs only on the master after each
-    top-level LI if the configuration has been updated.
+  def GetECId(self):
+    """Returns the current execution context ID.
 
     """
-    phase = constants.HOOKS_PHASE_POST
-    hpath = constants.HOOKS_NAME_CFGUPDATE
-    nodes = [self.lu.cfg.GetMasterNode()]
-    self._RunWrapper(nodes, hpath, phase)
+    if not self._ec_id:
+      raise errors.ProgrammerError("Tried to use execution context id when"
+                                   " not set")
+    return self._ec_id