Merge branch 'devel-2.7'
[ganeti-local] / lib / mcpu.py
index 37588e1..594e16e 100644 (file)
@@ -1,7 +1,7 @@
 #
 #
 
-# Copyright (C) 2006, 2007, 2011 Google Inc.
+# Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -28,23 +28,39 @@ are two kinds of classes defined:
 
 """
 
+import sys
 import logging
 import random
 import time
+import itertools
+import traceback
 
 from ganeti import opcodes
 from ganeti import constants
 from ganeti import errors
-from ganeti import rpc
 from ganeti import cmdlib
 from ganeti import locking
 from ganeti import utils
 from ganeti import compat
+from ganeti import pathutils
 
 
 _OP_PREFIX = "Op"
 _LU_PREFIX = "LU"
 
+#: LU classes which don't need to acquire the node allocation lock
+#: (L{locking.NAL}) when they acquire all node or node resource locks
+_NODE_ALLOC_WHITELIST = frozenset([])
+
+#: LU classes which don't need to acquire the node allocation lock
+#: (L{locking.NAL}) in the same mode (shared/exclusive) as the node
+#: or node resource locks
+_NODE_ALLOC_MODE_WHITELIST = compat.UniqueFrozenset([
+  cmdlib.LUBackupExport,
+  cmdlib.LUBackupRemove,
+  cmdlib.LUOobCommand,
+  ])
+
 
 class LockAcquireTimeout(Exception):
   """Exception to report timeouts on acquiring locks.
@@ -56,23 +72,23 @@ def _CalculateLockAttemptTimeouts():
   """Calculate timeouts for lock attempts.
 
   """
-  result = [1.0]
+  result = [constants.LOCK_ATTEMPTS_MINWAIT]
+  running_sum = result[0]
 
-  # Wait for a total of at least 150s before doing a blocking acquire
-  while sum(result) < 150.0:
+  # Wait for a total of at least LOCK_ATTEMPTS_TIMEOUT before doing a
+  # blocking acquire
+  while running_sum < constants.LOCK_ATTEMPTS_TIMEOUT:
     timeout = (result[-1] * 1.05) ** 1.25
 
-    # Cap timeout at 10 seconds. This gives other jobs a chance to run
-    # even if we're still trying to get our locks, before finally moving
-    # to a blocking acquire.
-    if timeout > 10.0:
-      timeout = 10.0
-
-    elif timeout < 0.1:
-      # Lower boundary for safety
-      timeout = 0.1
+    # Cap max timeout. This gives other jobs a chance to run even if
+    # we're still trying to get our locks, before finally moving to a
+    # blocking acquire.
+    timeout = min(timeout, constants.LOCK_ATTEMPTS_MAXWAIT)
+    # And also cap the lower boundary for safety
+    timeout = max(timeout, constants.LOCK_ATTEMPTS_MINWAIT)
 
     result.append(timeout)
+    running_sum += timeout
 
   return result
 
@@ -122,7 +138,7 @@ class LockAttemptTimeoutStrategy(object):
     return timeout
 
 
-class OpExecCbBase: # pylint: disable-msg=W0232
+class OpExecCbBase: # pylint: disable=W0232
   """Base class for OpCode execution callbacks.
 
   """
@@ -139,10 +155,11 @@ class OpExecCbBase: # pylint: disable-msg=W0232
 
     """
 
-  def CheckCancel(self):
-    """Check whether job has been cancelled.
+  def CurrentPriority(self): # pylint: disable=R0201
+    """Returns current priority or C{None}.
 
     """
+    return None
 
   def SubmitManyJobs(self, jobs):
     """Submits jobs for processing.
@@ -172,11 +189,119 @@ def _ComputeDispatchTable():
               if op.WITH_LU)
 
 
+def _SetBaseOpParams(src, defcomment, dst):
+  """Copies basic opcode parameters.
+
+  @type src: L{opcodes.OpCode}
+  @param src: Source opcode
+  @type defcomment: string
+  @param defcomment: Comment to specify if not already given
+  @type dst: L{opcodes.OpCode}
+  @param dst: Destination opcode
+
+  """
+  if hasattr(src, "debug_level"):
+    dst.debug_level = src.debug_level
+
+  if (getattr(dst, "priority", None) is None and
+      hasattr(src, "priority")):
+    dst.priority = src.priority
+
+  if not getattr(dst, opcodes.COMMENT_ATTR, None):
+    dst.comment = defcomment
+
+
+def _ProcessResult(submit_fn, op, result):
+  """Examines opcode result.
+
+  If necessary, additional processing on the result is done.
+
+  """
+  if isinstance(result, cmdlib.ResultWithJobs):
+    # Copy basic parameters (e.g. priority)
+    map(compat.partial(_SetBaseOpParams, op,
+                       "Submitted by %s" % op.OP_ID),
+        itertools.chain(*result.jobs))
+
+    # Submit jobs
+    job_submission = submit_fn(result.jobs)
+
+    # Build dictionary
+    result = result.other
+
+    assert constants.JOB_IDS_KEY not in result, \
+      "Key '%s' found in additional return values" % constants.JOB_IDS_KEY
+
+    result[constants.JOB_IDS_KEY] = job_submission
+
+  return result
+
+
+def _FailingSubmitManyJobs(_):
+  """Implementation of L{OpExecCbBase.SubmitManyJobs} to raise an exception.
+
+  """
+  raise errors.ProgrammerError("Opcodes processed without callbacks (e.g."
+                               " queries) can not submit jobs")
+
+
+def _RpcResultsToHooksResults(rpc_results):
+  """Function to convert RPC results to the format expected by HooksMaster.
+
+  @type rpc_results: dict(node: L{rpc.RpcResult})
+  @param rpc_results: RPC results
+  @rtype: dict(node: (fail_msg, offline, hooks_results))
+  @return: RPC results unpacked according to the format expected by
+    L({mcpu.HooksMaster}
+
+  """
+  return dict((node, (rpc_res.fail_msg, rpc_res.offline, rpc_res.payload))
+              for (node, rpc_res) in rpc_results.items())
+
+
+def _VerifyLocks(lu, glm, _mode_whitelist=_NODE_ALLOC_MODE_WHITELIST,
+                 _nal_whitelist=_NODE_ALLOC_WHITELIST):
+  """Performs consistency checks on locks acquired by a logical unit.
+
+  @type lu: L{cmdlib.LogicalUnit}
+  @param lu: Logical unit instance
+  @type glm: L{locking.GanetiLockManager}
+  @param glm: Lock manager
+
+  """
+  if not __debug__:
+    return
+
+  have_nal = glm.check_owned(locking.LEVEL_NODE_ALLOC, locking.NAL)
+
+  for level in [locking.LEVEL_NODE, locking.LEVEL_NODE_RES]:
+    # TODO: Verify using actual lock mode, not using LU variables
+    if level in lu.needed_locks:
+      share_node_alloc = lu.share_locks[locking.LEVEL_NODE_ALLOC]
+      share_level = lu.share_locks[level]
+
+      if lu.__class__ in _mode_whitelist:
+        assert share_node_alloc != share_level, \
+          "LU is whitelisted to use different modes for node allocation lock"
+      else:
+        assert bool(share_node_alloc) == bool(share_level), \
+          ("Node allocation lock must be acquired using the same mode as nodes"
+           " and node resources")
+
+      if lu.__class__ in _nal_whitelist:
+        assert not have_nal, \
+          "LU is whitelisted for not acquiring the node allocation lock"
+      elif lu.needed_locks[level] == locking.ALL_SET or glm.owning_all(level):
+        assert have_nal, \
+          ("Node allocation lock must be used if an LU acquires all nodes"
+           " or node resources")
+
+
 class Processor(object):
   """Object which runs OpCodes"""
   DISPATCH_TABLE = _ComputeDispatchTable()
 
-  def __init__(self, context, ec_id):
+  def __init__(self, context, ec_id, enable_locks=True):
     """Constructor for Processor
 
     @type context: GanetiContext
@@ -188,10 +313,20 @@ class Processor(object):
     self.context = context
     self._ec_id = ec_id
     self._cbs = None
-    self.rpc = rpc.RpcRunner(context.cfg)
+    self.rpc = context.rpc
     self.hmclass = HooksMaster
+    self._enable_locks = enable_locks
+
+  def _CheckLocksEnabled(self):
+    """Checks if locking is enabled.
 
-  def _AcquireLocks(self, level, names, shared, timeout, priority):
+    @raise errors.ProgrammerError: In case locking is not enabled
+
+    """
+    if not self._enable_locks:
+      raise errors.ProgrammerError("Attempted to use disabled locks")
+
+  def _AcquireLocks(self, level, names, shared, opportunistic, timeout):
     """Acquires locks via the Ganeti lock manager.
 
     @type level: int
@@ -200,48 +335,38 @@ class Processor(object):
     @param names: Lock names
     @type shared: bool
     @param shared: Whether the locks should be acquired in shared mode
+    @type opportunistic: bool
+    @param opportunistic: Whether to acquire opportunistically
     @type timeout: None or float
     @param timeout: Timeout for acquiring the locks
     @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
         amount of time
 
     """
+    self._CheckLocksEnabled()
+
     if self._cbs:
-      self._cbs.CheckCancel()
+      priority = self._cbs.CurrentPriority()
+    else:
+      priority = None
 
     acquired = self.context.glm.acquire(level, names, shared=shared,
-                                        timeout=timeout, priority=priority)
+                                        timeout=timeout, priority=priority,
+                                        opportunistic=opportunistic)
 
     if acquired is None:
       raise LockAcquireTimeout()
 
     return acquired
 
-  def _ProcessResult(self, result):
-    """
-
-    """
-    if isinstance(result, cmdlib.ResultWithJobs):
-      # Submit jobs
-      job_submission = self._cbs.SubmitManyJobs(result.jobs)
-
-      # Build dictionary
-      result = result.other
-
-      assert constants.JOB_IDS_KEY not in result, \
-        "Key '%s' found in additional return values" % constants.JOB_IDS_KEY
-
-      result[constants.JOB_IDS_KEY] = job_submission
-
-    return result
-
   def _ExecLU(self, lu):
     """Logical Unit execution sequence.
 
     """
     write_count = self.context.cfg.write_count
     lu.CheckPrereq()
-    hm = HooksMaster(self.rpc.call_hooks_runner, lu)
+
+    hm = self.BuildHooksManager(lu)
     h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
     lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                      self.Log, None)
@@ -254,8 +379,13 @@ class Processor(object):
                    " the operation")
       return lu.dry_run_result
 
+    if self._cbs:
+      submit_mj_fn = self._cbs.SubmitManyJobs
+    else:
+      submit_mj_fn = _FailingSubmitManyJobs
+
     try:
-      result = self._ProcessResult(lu.Exec(self.Log))
+      result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                 self.Log, result)
@@ -266,7 +396,10 @@ class Processor(object):
 
     return result
 
-  def _LockAndExecLU(self, lu, level, calc_timeout, priority):
+  def BuildHooksManager(self, lu):
+    return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu)
+
+  def _LockAndExecLU(self, lu, level, calc_timeout):
     """Execute a Logical Unit, with the needed locks.
 
     This is a recursive function that starts locking the given level, and
@@ -274,13 +407,29 @@ class Processor(object):
     given LU and its opcodes.
 
     """
+    glm = self.context.glm
     adding_locks = level in lu.add_locks
     acquiring_locks = level in lu.needed_locks
+
     if level not in locking.LEVELS:
+      _VerifyLocks(lu, glm)
+
       if self._cbs:
         self._cbs.NotifyStart()
 
-      result = self._ExecLU(lu)
+      try:
+        result = self._ExecLU(lu)
+      except AssertionError, err:
+        # this is a bit ugly, as we don't know from which phase
+        # (prereq, exec) this comes; but it's better than an exception
+        # with no information
+        (_, _, tb) = sys.exc_info()
+        err_info = traceback.format_tb(tb)
+        del tb
+        logging.exception("Detected AssertionError")
+        raise errors.OpExecError("Internal assertion error: please report"
+                                 " this as a bug.\nError message: '%s';"
+                                 " location:\n%s" % (str(err), err_info[-1]))
 
     elif adding_locks and acquiring_locks:
       # We could both acquire and add locks at the same level, but for now we
@@ -289,8 +438,11 @@ class Processor(object):
                                 " others")
 
     elif adding_locks or acquiring_locks:
+      self._CheckLocksEnabled()
+
       lu.DeclareLocks(level)
       share = lu.share_locks[level]
+      opportunistic = lu.opportunistic_locks[level]
 
       try:
         assert adding_locks ^ acquiring_locks, \
@@ -300,40 +452,38 @@ class Processor(object):
           # Acquiring locks
           needed_locks = lu.needed_locks[level]
 
-          acquired = self._AcquireLocks(level, needed_locks, share,
-                                        calc_timeout(), priority)
+          self._AcquireLocks(level, needed_locks, share, opportunistic,
+                             calc_timeout())
         else:
           # Adding locks
           add_locks = lu.add_locks[level]
           lu.remove_locks[level] = add_locks
 
           try:
-            self.context.glm.add(level, add_locks, acquired=1, shared=share)
+            glm.add(level, add_locks, acquired=1, shared=share)
           except errors.LockError:
+            logging.exception("Detected lock error in level %s for locks"
+                              " %s, shared=%s", level, add_locks, share)
             raise errors.OpPrereqError(
-              "Couldn't add locks (%s), probably because of a race condition"
-              " with another job, who added them first" % add_locks,
-              errors.ECODE_FAULT)
-
-          acquired = add_locks
+              "Couldn't add locks (%s), most likely because of another"
+              " job who added them first" % add_locks,
+              errors.ECODE_NOTUNIQUE)
 
         try:
-          lu.acquired_locks[level] = acquired
-
-          result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
+          result = self._LockAndExecLU(lu, level + 1, calc_timeout)
         finally:
           if level in lu.remove_locks:
-            self.context.glm.remove(level, lu.remove_locks[level])
+            glm.remove(level, lu.remove_locks[level])
       finally:
-        if self.context.glm.is_owned(level):
-          self.context.glm.release(level)
+        if glm.is_owned(level):
+          glm.release(level)
 
     else:
-      result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
+      result = self._LockAndExecLU(lu, level + 1, calc_timeout)
 
     return result
 
-  def ExecOpCode(self, op, cbs, timeout=None, priority=None):
+  def ExecOpCode(self, op, cbs, timeout=None):
     """Execute an opcode.
 
     @type op: an OpCode instance
@@ -342,15 +492,13 @@ class Processor(object):
     @param cbs: Runtime callbacks
     @type timeout: float or None
     @param timeout: Maximum time to acquire all locks, None for no timeout
-    @type priority: number or None
-    @param priority: Priority for acquiring lock(s)
     @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
         amount of time
 
     """
     if not isinstance(op, opcodes.OpCode):
       raise errors.ProgrammerError("Non-opcode instance passed"
-                                   " to ExecOpcode")
+                                   " to ExecOpcode (%s)" % type(op))
 
     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
     if lu_class is None:
@@ -363,28 +511,48 @@ class Processor(object):
 
     self._cbs = cbs
     try:
-      # Acquire the Big Ganeti Lock exclusively if this LU requires it,
-      # and in a shared fashion otherwise (to prevent concurrent run with
-      # an exclusive LU.
-      self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
-                          not lu_class.REQ_BGL, calc_timeout(),
-                          priority)
+      if self._enable_locks:
+        # Acquire the Big Ganeti Lock exclusively if this LU requires it,
+        # and in a shared fashion otherwise (to prevent concurrent run with
+        # an exclusive LU.
+        self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
+                            not lu_class.REQ_BGL, False, calc_timeout())
+      elif lu_class.REQ_BGL:
+        raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
+                                     " disabled" % op.OP_ID)
+
       try:
         lu = lu_class(self, op, self.context, self.rpc)
         lu.ExpandNames()
         assert lu.needed_locks is not None, "needed_locks not set by LU"
 
         try:
-          return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout,
-                                     priority)
+          result = self._LockAndExecLU(lu, locking.LEVEL_CLUSTER + 1,
+                                       calc_timeout)
         finally:
           if self._ec_id:
             self.context.cfg.DropECReservations(self._ec_id)
       finally:
-        self.context.glm.release(locking.LEVEL_CLUSTER)
+        # Release BGL if owned
+        if self.context.glm.is_owned(locking.LEVEL_CLUSTER):
+          assert self._enable_locks
+          self.context.glm.release(locking.LEVEL_CLUSTER)
     finally:
       self._cbs = None
 
+    resultcheck_fn = op.OP_RESULT
+    if not (resultcheck_fn is None or resultcheck_fn(result)):
+      logging.error("Expected opcode result matching %s, got %s",
+                    resultcheck_fn, result)
+      if not getattr(op, "dry_run", False):
+        # FIXME: LUs should still behave in dry_run mode, or
+        # alternately we should have OP_DRYRUN_RESULT; in the
+        # meantime, we simply skip the OP_RESULT check in dry-run mode
+        raise errors.OpResultError("Opcode result does not match %s: %s" %
+                                   (resultcheck_fn, utils.Truncate(result, 80)))
+
+    return result
+
   def Log(self, *args):
     """Forward call to feedback callback function.
 
@@ -438,28 +606,53 @@ class Processor(object):
 
 
 class HooksMaster(object):
-  """Hooks master.
+  def __init__(self, opcode, hooks_path, nodes, hooks_execution_fn,
+               hooks_results_adapt_fn, build_env_fn, log_fn, htype=None,
+               cluster_name=None, master_name=None):
+    """Base class for hooks masters.
+
+    This class invokes the execution of hooks according to the behaviour
+    specified by its parameters.
+
+    @type opcode: string
+    @param opcode: opcode of the operation to which the hooks are tied
+    @type hooks_path: string
+    @param hooks_path: prefix of the hooks directories
+    @type nodes: 2-tuple of lists
+    @param nodes: 2-tuple of lists containing nodes on which pre-hooks must be
+      run and nodes on which post-hooks must be run
+    @type hooks_execution_fn: function that accepts the following parameters:
+      (node_list, hooks_path, phase, environment)
+    @param hooks_execution_fn: function that will execute the hooks; can be
+      None, indicating that no conversion is necessary.
+    @type hooks_results_adapt_fn: function
+    @param hooks_results_adapt_fn: function that will adapt the return value of
+      hooks_execution_fn to the format expected by RunPhase
+    @type build_env_fn: function that returns a dictionary having strings as
+      keys
+    @param build_env_fn: function that builds the environment for the hooks
+    @type log_fn: function that accepts a string
+    @param log_fn: logging function
+    @type htype: string or None
+    @param htype: None or one of L{constants.HTYPE_CLUSTER},
+     L{constants.HTYPE_NODE}, L{constants.HTYPE_INSTANCE}
+    @type cluster_name: string
+    @param cluster_name: name of the cluster
+    @type master_name: string
+    @param master_name: name of the master
 
-  This class distributes the run commands to the nodes based on the
-  specific LU class.
-
-  In order to remove the direct dependency on the rpc module, the
-  constructor needs a function which actually does the remote
-  call. This will usually be rpc.call_hooks_runner, but any function
-  which behaves the same works.
+    """
+    self.opcode = opcode
+    self.hooks_path = hooks_path
+    self.hooks_execution_fn = hooks_execution_fn
+    self.hooks_results_adapt_fn = hooks_results_adapt_fn
+    self.build_env_fn = build_env_fn
+    self.log_fn = log_fn
+    self.htype = htype
+    self.cluster_name = cluster_name
+    self.master_name = master_name
 
-  """
-  def __init__(self, callfn, lu):
-    self.callfn = callfn
-    self.lu = lu
-    self.op = lu.op
     self.pre_env = self._BuildEnv(constants.HOOKS_PHASE_PRE)
-
-    if self.lu.HPATH is None:
-      nodes = (None, None)
-    else:
-      nodes = map(frozenset, self.lu.BuildHooksNodes())
-
     (self.pre_nodes, self.post_nodes) = nodes
 
   def _BuildEnv(self, phase):
@@ -478,12 +671,13 @@ class HooksMaster(object):
 
     env = {}
 
-    if self.lu.HPATH is not None:
-      lu_env = self.lu.BuildHooksEnv()
-      if lu_env:
-        assert not compat.any(key.upper().startswith(prefix) for key in lu_env)
+    if self.hooks_path is not None:
+      phase_env = self.build_env_fn()
+      if phase_env:
+        assert not compat.any(key.upper().startswith(prefix)
+                              for key in phase_env)
         env.update(("%s%s" % (prefix, key), value)
-                   for (key, value) in lu_env.items())
+                   for (key, value) in phase_env.items())
 
     if phase == constants.HOOKS_PHASE_PRE:
       assert compat.all((key.startswith("GANETI_") and
@@ -506,30 +700,29 @@ class HooksMaster(object):
   def _RunWrapper(self, node_list, hpath, phase, phase_env):
     """Simple wrapper over self.callfn.
 
-    This method fixes the environment before doing the rpc call.
+    This method fixes the environment before executing the hooks.
 
     """
-    cfg = self.lu.cfg
-
     env = {
-      "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
+      "PATH": constants.HOOKS_PATH,
       "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
-      "GANETI_OP_CODE": self.op.OP_ID,
-      "GANETI_DATA_DIR": constants.DATA_DIR,
+      "GANETI_OP_CODE": self.opcode,
+      "GANETI_DATA_DIR": pathutils.DATA_DIR,
       "GANETI_HOOKS_PHASE": phase,
       "GANETI_HOOKS_PATH": hpath,
       }
 
-    if self.lu.HTYPE:
-      env["GANETI_OBJECT_TYPE"] = self.lu.HTYPE
+    if self.htype:
+      env["GANETI_OBJECT_TYPE"] = self.htype
 
-    if cfg is not None:
-      env["GANETI_CLUSTER"] = cfg.GetClusterName()
-      env["GANETI_MASTER"] = cfg.GetMasterNode()
+    if self.cluster_name is not None:
+      env["GANETI_CLUSTER"] = self.cluster_name
+
+    if self.master_name is not None:
+      env["GANETI_MASTER"] = self.master_name
 
     if phase_env:
-      assert not (set(env) & set(phase_env)), "Environment variables conflict"
-      env.update(phase_env)
+      env = utils.algo.JoinDisjointDicts(env, phase_env)
 
     # Convert everything to strings
     env = dict([(str(key), str(val)) for key, val in env.iteritems()])
@@ -537,12 +730,15 @@ class HooksMaster(object):
     assert compat.all(key == "PATH" or key.startswith("GANETI_")
                       for key in env)
 
-    return self.callfn(node_list, hpath, phase, env)
+    return self.hooks_execution_fn(node_list, hpath, phase, env)
 
   def RunPhase(self, phase, nodes=None):
     """Run all the scripts for a phase.
 
     This is the main function of the HookMaster.
+    It executes self.hooks_execution_fn, and after running
+    self.hooks_results_adapt_fn on its results it expects them to be in the form
+    {node_name: (fail_msg, [(script, result, output), ...]}).
 
     @param phase: one of L{constants.HOOKS_PHASE_POST} or
         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
@@ -569,36 +765,37 @@ class HooksMaster(object):
       # even attempt to run, or this LU doesn't do hooks at all
       return
 
-    results = self._RunWrapper(nodes, self.lu.HPATH, phase, env)
+    results = self._RunWrapper(nodes, self.hooks_path, phase, env)
     if not results:
       msg = "Communication Failure"
       if phase == constants.HOOKS_PHASE_PRE:
         raise errors.HooksFailure(msg)
       else:
-        self.lu.LogWarning(msg)
+        self.log_fn(msg)
         return results
 
+    converted_res = results
+    if self.hooks_results_adapt_fn:
+      converted_res = self.hooks_results_adapt_fn(results)
+
     errs = []
-    for node_name in results:
-      res = results[node_name]
-      if res.offline:
+    for node_name, (fail_msg, offline, hooks_results) in converted_res.items():
+      if offline:
         continue
 
-      msg = res.fail_msg
-      if msg:
-        self.lu.LogWarning("Communication failure to node %s: %s",
-                           node_name, msg)
+      if fail_msg:
+        self.log_fn("Communication failure to node %s: %s", node_name, fail_msg)
         continue
 
-      for script, hkr, output in res.payload:
+      for script, hkr, output in hooks_results:
         if hkr == constants.HKR_FAIL:
           if phase == constants.HOOKS_PHASE_PRE:
             errs.append((node_name, script, output))
           else:
             if not output:
               output = "(no output)"
-            self.lu.LogWarning("On %s script %s failed, output: %s" %
-                               (node_name, script, output))
+            self.log_fn("On %s script %s failed, output: %s" %
+                        (node_name, script, output))
 
     if errs and phase == constants.HOOKS_PHASE_PRE:
       raise errors.HooksAbort(errs)
@@ -614,5 +811,21 @@ class HooksMaster(object):
     """
     phase = constants.HOOKS_PHASE_POST
     hpath = constants.HOOKS_NAME_CFGUPDATE
-    nodes = [self.lu.cfg.GetMasterNode()]
+    nodes = [self.master_name]
     self._RunWrapper(nodes, hpath, phase, self.pre_env)
+
+  @staticmethod
+  def BuildFromLu(hooks_execution_fn, lu):
+    if lu.HPATH is None:
+      nodes = (None, None)
+    else:
+      nodes = map(frozenset, lu.BuildHooksNodes())
+
+    master_name = cluster_name = None
+    if lu.cfg:
+      master_name = lu.cfg.GetMasterNode()
+      cluster_name = lu.cfg.GetClusterName()
+
+    return HooksMaster(lu.op.OP_ID, lu.HPATH, nodes, hooks_execution_fn,
+                       _RpcResultsToHooksResults, lu.BuildHooksEnv,
+                       lu.LogWarning, lu.HTYPE, cluster_name, master_name)