X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/dd7f6776235601c60ce4257c710a2bc0be80a206..596f913e8955cd75ed82de3a802cdbb4488b0615:/lib/mcpu.py diff --git a/lib/mcpu.py b/lib/mcpu.py index 7ab7c18..594e16e 100644 --- a/lib/mcpu.py +++ b/lib/mcpu.py @@ -1,7 +1,7 @@ # # -# Copyright (C) 2006, 2007, 2011 Google Inc. +# Copyright (C) 2006, 2007, 2011, 2012 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -28,23 +28,39 @@ are two kinds of classes defined: """ +import sys import logging import random import time +import itertools +import traceback from ganeti import opcodes from ganeti import constants from ganeti import errors -from ganeti import rpc from ganeti import cmdlib from ganeti import locking from ganeti import utils from ganeti import compat +from ganeti import pathutils _OP_PREFIX = "Op" _LU_PREFIX = "LU" +#: LU classes which don't need to acquire the node allocation lock +#: (L{locking.NAL}) when they acquire all node or node resource locks +_NODE_ALLOC_WHITELIST = frozenset([]) + +#: LU classes which don't need to acquire the node allocation lock +#: (L{locking.NAL}) in the same mode (shared/exclusive) as the node +#: or node resource locks +_NODE_ALLOC_MODE_WHITELIST = compat.UniqueFrozenset([ + cmdlib.LUBackupExport, + cmdlib.LUBackupRemove, + cmdlib.LUOobCommand, + ]) + class LockAcquireTimeout(Exception): """Exception to report timeouts on acquiring locks. @@ -56,23 +72,23 @@ def _CalculateLockAttemptTimeouts(): """Calculate timeouts for lock attempts. """ - result = [1.0] + result = [constants.LOCK_ATTEMPTS_MINWAIT] + running_sum = result[0] - # Wait for a total of at least 150s before doing a blocking acquire - while sum(result) < 150.0: + # Wait for a total of at least LOCK_ATTEMPTS_TIMEOUT before doing a + # blocking acquire + while running_sum < constants.LOCK_ATTEMPTS_TIMEOUT: timeout = (result[-1] * 1.05) ** 1.25 - # Cap timeout at 10 seconds. This gives other jobs a chance to run - # even if we're still trying to get our locks, before finally moving - # to a blocking acquire. - if timeout > 10.0: - timeout = 10.0 - - elif timeout < 0.1: - # Lower boundary for safety - timeout = 0.1 + # Cap max timeout. This gives other jobs a chance to run even if + # we're still trying to get our locks, before finally moving to a + # blocking acquire. + timeout = min(timeout, constants.LOCK_ATTEMPTS_MAXWAIT) + # And also cap the lower boundary for safety + timeout = max(timeout, constants.LOCK_ATTEMPTS_MINWAIT) result.append(timeout) + running_sum += timeout return result @@ -122,7 +138,7 @@ class LockAttemptTimeoutStrategy(object): return timeout -class OpExecCbBase: # pylint: disable-msg=W0232 +class OpExecCbBase: # pylint: disable=W0232 """Base class for OpCode execution callbacks. """ @@ -139,10 +155,19 @@ class OpExecCbBase: # pylint: disable-msg=W0232 """ - def CheckCancel(self): - """Check whether job has been cancelled. + def CurrentPriority(self): # pylint: disable=R0201 + """Returns current priority or C{None}. """ + return None + + def SubmitManyJobs(self, jobs): + """Submits jobs for processing. + + See L{jqueue.JobQueue.SubmitManyJobs}. + + """ + raise NotImplementedError def _LUNameForOpName(opname): @@ -164,11 +189,119 @@ def _ComputeDispatchTable(): if op.WITH_LU) +def _SetBaseOpParams(src, defcomment, dst): + """Copies basic opcode parameters. 
+
+  @type src: L{opcodes.OpCode}
+  @param src: Source opcode
+  @type defcomment: string
+  @param defcomment: Comment to specify if not already given
+  @type dst: L{opcodes.OpCode}
+  @param dst: Destination opcode
+
+  """
+  if hasattr(src, "debug_level"):
+    dst.debug_level = src.debug_level
+
+  if (getattr(dst, "priority", None) is None and
+      hasattr(src, "priority")):
+    dst.priority = src.priority
+
+  if not getattr(dst, opcodes.COMMENT_ATTR, None):
+    dst.comment = defcomment
+
+
+def _ProcessResult(submit_fn, op, result):
+  """Examines opcode result.
+
+  If necessary, additional processing on the result is done.
+
+  """
+  if isinstance(result, cmdlib.ResultWithJobs):
+    # Copy basic parameters (e.g. priority)
+    map(compat.partial(_SetBaseOpParams, op,
+                       "Submitted by %s" % op.OP_ID),
+        itertools.chain(*result.jobs))
+
+    # Submit jobs
+    job_submission = submit_fn(result.jobs)
+
+    # Build dictionary
+    result = result.other
+
+    assert constants.JOB_IDS_KEY not in result, \
+      "Key '%s' found in additional return values" % constants.JOB_IDS_KEY
+
+    result[constants.JOB_IDS_KEY] = job_submission
+
+  return result
+
+
+def _FailingSubmitManyJobs(_):
+  """Implementation of L{OpExecCbBase.SubmitManyJobs} to raise an exception.
+
+  """
+  raise errors.ProgrammerError("Opcodes processed without callbacks (e.g."
+                               " queries) cannot submit jobs")
+
+
+def _RpcResultsToHooksResults(rpc_results):
+  """Function to convert RPC results to the format expected by HooksMaster.
+
+  @type rpc_results: dict(node: L{rpc.RpcResult})
+  @param rpc_results: RPC results
+  @rtype: dict(node: (fail_msg, offline, hooks_results))
+  @return: RPC results unpacked according to the format expected by
+    L{mcpu.HooksMaster}
+
+  """
+  return dict((node, (rpc_res.fail_msg, rpc_res.offline, rpc_res.payload))
+              for (node, rpc_res) in rpc_results.items())
+
+
+def _VerifyLocks(lu, glm, _mode_whitelist=_NODE_ALLOC_MODE_WHITELIST,
+                 _nal_whitelist=_NODE_ALLOC_WHITELIST):
+  """Performs consistency checks on locks acquired by a logical unit. 
+ + @type lu: L{cmdlib.LogicalUnit} + @param lu: Logical unit instance + @type glm: L{locking.GanetiLockManager} + @param glm: Lock manager + + """ + if not __debug__: + return + + have_nal = glm.check_owned(locking.LEVEL_NODE_ALLOC, locking.NAL) + + for level in [locking.LEVEL_NODE, locking.LEVEL_NODE_RES]: + # TODO: Verify using actual lock mode, not using LU variables + if level in lu.needed_locks: + share_node_alloc = lu.share_locks[locking.LEVEL_NODE_ALLOC] + share_level = lu.share_locks[level] + + if lu.__class__ in _mode_whitelist: + assert share_node_alloc != share_level, \ + "LU is whitelisted to use different modes for node allocation lock" + else: + assert bool(share_node_alloc) == bool(share_level), \ + ("Node allocation lock must be acquired using the same mode as nodes" + " and node resources") + + if lu.__class__ in _nal_whitelist: + assert not have_nal, \ + "LU is whitelisted for not acquiring the node allocation lock" + elif lu.needed_locks[level] == locking.ALL_SET or glm.owning_all(level): + assert have_nal, \ + ("Node allocation lock must be used if an LU acquires all nodes" + " or node resources") + + class Processor(object): """Object which runs OpCodes""" DISPATCH_TABLE = _ComputeDispatchTable() - def __init__(self, context, ec_id): + def __init__(self, context, ec_id, enable_locks=True): """Constructor for Processor @type context: GanetiContext @@ -180,10 +313,20 @@ class Processor(object): self.context = context self._ec_id = ec_id self._cbs = None - self.rpc = rpc.RpcRunner(context.cfg) + self.rpc = context.rpc self.hmclass = HooksMaster + self._enable_locks = enable_locks + + def _CheckLocksEnabled(self): + """Checks if locking is enabled. + + @raise errors.ProgrammerError: In case locking is not enabled - def _AcquireLocks(self, level, names, shared, timeout, priority): + """ + if not self._enable_locks: + raise errors.ProgrammerError("Attempted to use disabled locks") + + def _AcquireLocks(self, level, names, shared, opportunistic, timeout): """Acquires locks via the Ganeti lock manager. 
@type level: int @@ -192,17 +335,24 @@ class Processor(object): @param names: Lock names @type shared: bool @param shared: Whether the locks should be acquired in shared mode + @type opportunistic: bool + @param opportunistic: Whether to acquire opportunistically @type timeout: None or float @param timeout: Timeout for acquiring the locks @raise LockAcquireTimeout: In case locks couldn't be acquired in specified amount of time """ + self._CheckLocksEnabled() + if self._cbs: - self._cbs.CheckCancel() + priority = self._cbs.CurrentPriority() + else: + priority = None acquired = self.context.glm.acquire(level, names, shared=shared, - timeout=timeout, priority=priority) + timeout=timeout, priority=priority, + opportunistic=opportunistic) if acquired is None: raise LockAcquireTimeout() @@ -215,7 +365,8 @@ class Processor(object): """ write_count = self.context.cfg.write_count lu.CheckPrereq() - hm = HooksMaster(self.rpc.call_hooks_runner, lu) + + hm = self.BuildHooksManager(lu) h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE) lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results, self.Log, None) @@ -228,8 +379,13 @@ class Processor(object): " the operation") return lu.dry_run_result + if self._cbs: + submit_mj_fn = self._cbs.SubmitManyJobs + else: + submit_mj_fn = _FailingSubmitManyJobs + try: - result = lu.Exec(self.Log) + result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log)) h_results = hm.RunPhase(constants.HOOKS_PHASE_POST) result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results, self.Log, result) @@ -240,7 +396,10 @@ class Processor(object): return result - def _LockAndExecLU(self, lu, level, calc_timeout, priority): + def BuildHooksManager(self, lu): + return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu) + + def _LockAndExecLU(self, lu, level, calc_timeout): """Execute a Logical Unit, with the needed locks. This is a recursive function that starts locking the given level, and @@ -248,13 +407,29 @@ class Processor(object): given LU and its opcodes. 
 
     """
+    glm = self.context.glm
     adding_locks = level in lu.add_locks
     acquiring_locks = level in lu.needed_locks
+
     if level not in locking.LEVELS:
+      _VerifyLocks(lu, glm)
+
       if self._cbs:
         self._cbs.NotifyStart()
 
-      result = self._ExecLU(lu)
+      try:
+        result = self._ExecLU(lu)
+      except AssertionError, err:
+        # this is a bit ugly, as we don't know from which phase
+        # (prereq, exec) this comes; but it's better than an exception
+        # with no information
+        (_, _, tb) = sys.exc_info()
+        err_info = traceback.format_tb(tb)
+        del tb
+        logging.exception("Detected AssertionError")
+        raise errors.OpExecError("Internal assertion error: please report"
+                                 " this as a bug.\nError message: '%s';"
+                                 " location:\n%s" % (str(err), err_info[-1]))
 
     elif adding_locks and acquiring_locks:
       # We could both acquire and add locks at the same level, but for now we
@@ -263,8 +438,11 @@ class Processor(object):
                                 " others")
 
     elif adding_locks or acquiring_locks:
+      self._CheckLocksEnabled()
+
       lu.DeclareLocks(level)
       share = lu.share_locks[level]
+      opportunistic = lu.opportunistic_locks[level]
 
       try:
         assert adding_locks ^ acquiring_locks, \
@@ -274,40 +452,38 @@ class Processor(object):
           # Acquiring locks
          needed_locks = lu.needed_locks[level]
 
-          acquired = self._AcquireLocks(level, needed_locks, share,
-                                        calc_timeout(), priority)
+          self._AcquireLocks(level, needed_locks, share, opportunistic,
+                             calc_timeout())
        else:
           # Adding locks
           add_locks = lu.add_locks[level]
           lu.remove_locks[level] = add_locks
 
           try:
-            self.context.glm.add(level, add_locks, acquired=1, shared=share)
+            glm.add(level, add_locks, acquired=1, shared=share)
           except errors.LockError:
+            logging.exception("Detected lock error in level %s for locks"
+                              " %s, shared=%s", level, add_locks, share)
            raise errors.OpPrereqError(
-              "Couldn't add locks (%s), probably because of a race condition"
-              " with another job, who added them first" % add_locks,
-              errors.ECODE_FAULT)
-
-          acquired = add_locks
+              "Couldn't add locks (%s), most likely because another"
+              " job added them first" % add_locks,
+              errors.ECODE_NOTUNIQUE)
 
         try:
-          lu.acquired_locks[level] = acquired
-
-          result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
+          result = self._LockAndExecLU(lu, level + 1, calc_timeout)
         finally:
           if level in lu.remove_locks:
-            self.context.glm.remove(level, lu.remove_locks[level])
+            glm.remove(level, lu.remove_locks[level])
       finally:
-        if self.context.glm.is_owned(level):
-          self.context.glm.release(level)
+        if glm.is_owned(level):
+          glm.release(level)
 
     else:
-      result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
+      result = self._LockAndExecLU(lu, level + 1, calc_timeout)
 
     return result
 
-  def ExecOpCode(self, op, cbs, timeout=None, priority=None):
+  def ExecOpCode(self, op, cbs, timeout=None):
     """Execute an opcode. 
 
     @type op: an OpCode instance
@@ -316,15 +492,13 @@
     @param cbs: Runtime callbacks
     @type timeout: float or None
     @param timeout: Maximum time to acquire all locks, None for no timeout
-    @type priority: number or None
-    @param priority: Priority for acquiring lock(s)
     @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
       amount of time
 
     """
     if not isinstance(op, opcodes.OpCode):
       raise errors.ProgrammerError("Non-opcode instance passed"
-                                   " to ExecOpcode")
+                                   " to ExecOpCode (%s)" % type(op))
 
     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
     if lu_class is None:
@@ -337,28 +511,48 @@ class Processor(object):
     self._cbs = cbs
     try:
-      # Acquire the Big Ganeti Lock exclusively if this LU requires it,
-      # and in a shared fashion otherwise (to prevent concurrent run with
-      # an exclusive LU.
-      self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
-                         not lu_class.REQ_BGL, calc_timeout(),
-                         priority)
+      if self._enable_locks:
+        # Acquire the Big Ganeti Lock exclusively if this LU requires it,
+        # and in a shared fashion otherwise (to prevent a concurrent run
+        # with an exclusive LU).
+        self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
+                           not lu_class.REQ_BGL, False, calc_timeout())
+      elif lu_class.REQ_BGL:
+        raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
+                                     " disabled" % op.OP_ID)
+
       try:
         lu = lu_class(self, op, self.context, self.rpc)
         lu.ExpandNames()
         assert lu.needed_locks is not None, "needed_locks not set by LU"
 
         try:
-          return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout,
-                                     priority)
+          result = self._LockAndExecLU(lu, locking.LEVEL_CLUSTER + 1,
+                                       calc_timeout)
         finally:
           if self._ec_id:
             self.context.cfg.DropECReservations(self._ec_id)
       finally:
-        self.context.glm.release(locking.LEVEL_CLUSTER)
+        # Release BGL if owned
+        if self.context.glm.is_owned(locking.LEVEL_CLUSTER):
+          assert self._enable_locks
+          self.context.glm.release(locking.LEVEL_CLUSTER)
     finally:
       self._cbs = None
 
+    resultcheck_fn = op.OP_RESULT
+    if not (resultcheck_fn is None or resultcheck_fn(result)):
+      logging.error("Expected opcode result matching %s, got %s",
+                    resultcheck_fn, result)
+      if not getattr(op, "dry_run", False):
+        # FIXME: LUs should still behave in dry_run mode, or
+        # alternatively we should have OP_DRYRUN_RESULT; in the
+        # meantime, we simply skip the OP_RESULT check in dry-run mode
+        raise errors.OpResultError("Opcode result does not match %s: %s" %
+                                   (resultcheck_fn, utils.Truncate(result, 80)))
+
+    return result
+
   def Log(self, *args):
     """Forward call to feedback callback function.
 
@@ -412,23 +606,54 @@ class Processor(object):
 
 class HooksMaster(object):
-  """Hooks master.
+  def __init__(self, opcode, hooks_path, nodes, hooks_execution_fn,
+               hooks_results_adapt_fn, build_env_fn, log_fn, htype=None,
+               cluster_name=None, master_name=None):
+    """Constructor for a hooks master.
+
+    This class invokes the execution of hooks according to the behaviour
+    specified by its parameters. 
+
+    @type opcode: string
+    @param opcode: opcode of the operation to which the hooks are tied
+    @type hooks_path: string
+    @param hooks_path: prefix of the hooks directories
+    @type nodes: 2-tuple of lists
+    @param nodes: 2-tuple of lists containing nodes on which pre-hooks must be
+      run and nodes on which post-hooks must be run
+    @type hooks_execution_fn: function that accepts the following parameters:
+      (node_list, hooks_path, phase, environment)
+    @param hooks_execution_fn: function that will execute the hooks
+    @type hooks_results_adapt_fn: function or None
+    @param hooks_results_adapt_fn: function that will adapt the return value of
+      hooks_execution_fn to the format expected by RunPhase; can be None,
+      indicating that no conversion is necessary
+    @type build_env_fn: function that returns a dictionary having strings as
+      keys
+    @param build_env_fn: function that builds the environment for the hooks
+    @type log_fn: function that accepts a string
+    @param log_fn: logging function
+    @type htype: string or None
+    @param htype: None or one of L{constants.HTYPE_CLUSTER},
+      L{constants.HTYPE_NODE}, L{constants.HTYPE_INSTANCE}
+    @type cluster_name: string
+    @param cluster_name: name of the cluster
+    @type master_name: string
+    @param master_name: name of the master
 
-  This class distributes the run commands to the nodes based on the
-  specific LU class.
-
-  In order to remove the direct dependency on the rpc module, the
-  constructor needs a function which actually does the remote
-  call. This will usually be rpc.call_hooks_runner, but any function
-  which behaves the same works.
-
-  """
-  def __init__(self, callfn, lu):
-    self.callfn = callfn
-    self.lu = lu
-    self.op = lu.op
-    self.pre_env = None
-    self.pre_nodes = None
+    """
+    self.opcode = opcode
+    self.hooks_path = hooks_path
+    self.hooks_execution_fn = hooks_execution_fn
+    self.hooks_results_adapt_fn = hooks_results_adapt_fn
+    self.build_env_fn = build_env_fn
+    self.log_fn = log_fn
+    self.htype = htype
+    self.cluster_name = cluster_name
+    self.master_name = master_name
+
+    self.pre_env = self._BuildEnv(constants.HOOKS_PHASE_PRE)
+    (self.pre_nodes, self.post_nodes) = nodes
 
   def _BuildEnv(self, phase):
-    """Compute the environment and the target nodes.
+    """Compute the environment for a hooks phase. 
@@ -446,61 +671,58 @@ class HooksMaster(object):
 
     env = {}
 
-    if self.lu.HPATH is not None:
-      (lu_env, lu_nodes_pre, lu_nodes_post) = self.lu.BuildHooksEnv()
-      if lu_env:
+    if self.hooks_path is not None:
+      phase_env = self.build_env_fn()
+      if phase_env:
         assert not compat.any(key.upper().startswith(prefix)
-                              for key in lu_env)
+                              for key in phase_env)
         env.update(("%s%s" % (prefix, key), value)
-                   for (key, value) in lu_env.items())
-    else:
-      lu_nodes_pre = lu_nodes_post = []
+                   for (key, value) in phase_env.items())
 
     if phase == constants.HOOKS_PHASE_PRE:
       assert compat.all((key.startswith("GANETI_") and
                          not key.startswith("GANETI_POST_"))
                        for key in env)
 
-      # Record environment for any post-phase hooks
-      self.pre_env = env
-
     elif phase == constants.HOOKS_PHASE_POST:
       assert compat.all(key.startswith("GANETI_POST_") for key in env)
+      assert isinstance(self.pre_env, dict)
 
-      if self.pre_env:
-        assert not compat.any(key.startswith("GANETI_POST_")
-                              for key in self.pre_env)
-        env.update(self.pre_env)
+      # Merge with pre-phase environment
+      assert not compat.any(key.startswith("GANETI_POST_")
+                            for key in self.pre_env)
+      env.update(self.pre_env)
     else:
       raise AssertionError("Unknown phase '%s'" % phase)
 
-    return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
+    return env
 
   def _RunWrapper(self, node_list, hpath, phase, phase_env):
-    """Simple wrapper over self.callfn.
+    """Simple wrapper over self.hooks_execution_fn.
 
-    This method fixes the environment before doing the rpc call.
+    This method fixes the environment before executing the hooks.
 
     """
-    cfg = self.lu.cfg
-
     env = {
-      "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
+      "PATH": constants.HOOKS_PATH,
       "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
-      "GANETI_OP_CODE": self.op.OP_ID,
-      "GANETI_OBJECT_TYPE": self.lu.HTYPE,
-      "GANETI_DATA_DIR": constants.DATA_DIR,
+      "GANETI_OP_CODE": self.opcode,
+      "GANETI_DATA_DIR": pathutils.DATA_DIR,
       "GANETI_HOOKS_PHASE": phase,
      "GANETI_HOOKS_PATH": hpath,
      }
 
-    if cfg is not None:
-      env["GANETI_CLUSTER"] = cfg.GetClusterName()
-      env["GANETI_MASTER"] = cfg.GetMasterNode()
+    if self.htype:
+      env["GANETI_OBJECT_TYPE"] = self.htype
+
+    if self.cluster_name is not None:
+      env["GANETI_CLUSTER"] = self.cluster_name
+
+    if self.master_name is not None:
+      env["GANETI_MASTER"] = self.master_name
 
     if phase_env:
-      assert not (set(env) & set(phase_env)), "Environment variables conflict"
-      env.update(phase_env)
+      env = utils.algo.JoinDisjointDicts(env, phase_env)
 
     # Convert everything to strings
     env = dict([(str(key), str(val)) for key, val in env.iteritems()])
 
@@ -508,12 +730,15 @@ class HooksMaster(object):
     assert compat.all(key == "PATH" or key.startswith("GANETI_")
                       for key in env)
 
-    return self.callfn(node_list, hpath, phase, env)
+    return self.hooks_execution_fn(node_list, hpath, phase, env)
 
   def RunPhase(self, phase, nodes=None):
     """Run all the scripts for a phase.
 
-    This is the main function of the HookMaster.
+    This is the main function of the HooksMaster. It executes
+    self.hooks_execution_fn and, after running self.hooks_results_adapt_fn on
+    its results, expects them to be in the form
+    {node_name: (fail_msg, offline, [(script, result, output), ...])}. 
 
     @param phase: one of L{constants.HOOKS_PHASE_POST} or
       L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
@@ -523,19 +748,16 @@ class HooksMaster(object):
     @raise errors.HooksAbort: on failure of one of the hooks
 
     """
-    (env, node_list_pre, node_list_post) = self._BuildEnv(phase)
-    if nodes is None:
-      if phase == constants.HOOKS_PHASE_PRE:
-        self.pre_nodes = (node_list_pre, node_list_post)
-        nodes = node_list_pre
-      elif phase == constants.HOOKS_PHASE_POST:
-        post_nodes = (node_list_pre, node_list_post)
-        assert self.pre_nodes == post_nodes, \
-          ("Node lists returned for post-phase hook don't match pre-phase"
-           " lists (pre %s, post %s)" % (self.pre_nodes, post_nodes))
-        nodes = node_list_post
-      else:
-        raise AssertionError("Unknown phase '%s'" % phase)
+    if phase == constants.HOOKS_PHASE_PRE:
+      if nodes is None:
+        nodes = self.pre_nodes
+      env = self.pre_env
+    elif phase == constants.HOOKS_PHASE_POST:
+      if nodes is None:
+        nodes = self.post_nodes
+      env = self._BuildEnv(phase)
+    else:
+      raise AssertionError("Unknown phase '%s'" % phase)
 
     if not nodes:
       # empty node list, we should not attempt to run this as either
@@ -543,36 +765,37 @@ class HooksMaster(object):
       # even attempt to run, or this LU doesn't do hooks at all
       return
 
-    results = self._RunWrapper(nodes, self.lu.HPATH, phase, env)
+    results = self._RunWrapper(nodes, self.hooks_path, phase, env)
     if not results:
       msg = "Communication Failure"
       if phase == constants.HOOKS_PHASE_PRE:
         raise errors.HooksFailure(msg)
       else:
-        self.lu.LogWarning(msg)
+        self.log_fn(msg)
         return results
 
+    converted_res = results
+    if self.hooks_results_adapt_fn:
+      converted_res = self.hooks_results_adapt_fn(results)
+
     errs = []
-    for node_name in results:
-      res = results[node_name]
-      if res.offline:
+    for node_name, (fail_msg, offline, hooks_results) in converted_res.items():
+      if offline:
         continue
 
-      msg = res.fail_msg
-      if msg:
-        self.lu.LogWarning("Communication failure to node %s: %s",
-                           node_name, msg)
+      if fail_msg:
+        self.log_fn("Communication failure to node %s: %s",
+                    node_name, fail_msg)
         continue
 
-      for script, hkr, output in res.payload:
+      for script, hkr, output in hooks_results:
        if hkr == constants.HKR_FAIL:
          if phase == constants.HOOKS_PHASE_PRE:
            errs.append((node_name, script, output))
          else:
            if not output:
              output = "(no output)"
-            self.lu.LogWarning("On %s script %s failed, output: %s" %
-                               (node_name, script, output))
+            self.log_fn("On %s script %s failed, output: %s" %
+                        (node_name, script, output))
 
     if errs and phase == constants.HOOKS_PHASE_PRE:
       raise errors.HooksAbort(errs)
@@ -586,10 +809,23 @@ class HooksMaster(object):
-    top-level LU if the configuration has been updated.
+    top-level LU if the configuration has been updated.
 
     """
-    if self.pre_env is None:
-      raise AssertionError("Pre-phase must be run before configuration update")
-
     phase = constants.HOOKS_PHASE_POST
     hpath = constants.HOOKS_NAME_CFGUPDATE
-    nodes = [self.lu.cfg.GetMasterNode()]
+    nodes = [self.master_name]
     self._RunWrapper(nodes, hpath, phase, self.pre_env)
+
+  @staticmethod
+  def BuildFromLu(hooks_execution_fn, lu):
+    """Builds a L{HooksMaster} for a logical unit.
+
+    """
+    if lu.HPATH is None:
+      nodes = (None, None)
+    else:
+      nodes = map(frozenset, lu.BuildHooksNodes())
+
+    master_name = cluster_name = None
+    if lu.cfg:
+      master_name = lu.cfg.GetMasterNode()
+      cluster_name = lu.cfg.GetClusterName()
+
+    return HooksMaster(lu.op.OP_ID, lu.HPATH, nodes, hooks_execution_fn,
+                       _RpcResultsToHooksResults, lu.BuildHooksEnv,
+                       lu.LogWarning, lu.HTYPE, cluster_name, master_name)
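
For illustration, the retry back-off introduced in _CalculateLockAttemptTimeouts
above can be reproduced standalone. The three constants below are stand-ins
echoing the numbers visible in the removed code (1s starting wait, 10s cap,
150s total budget); the authoritative values live in ganeti.constants and may
differ:

    # Stand-in values; the real ones come from ganeti.constants (assumption).
    LOCK_ATTEMPTS_MINWAIT = 1.0
    LOCK_ATTEMPTS_MAXWAIT = 10.0
    LOCK_ATTEMPTS_TIMEOUT = 150.0

    def CalcLockAttemptTimeouts():
      """Returns the per-attempt lock timeouts, mirroring the patched code."""
      result = [LOCK_ATTEMPTS_MINWAIT]
      running_sum = result[0]

      while running_sum < LOCK_ATTEMPTS_TIMEOUT:
        # Grow the previous timeout slightly faster than exponentially ...
        timeout = (result[-1] * 1.05) ** 1.25
        # ... but keep it between the lower and upper bound
        timeout = max(min(timeout, LOCK_ATTEMPTS_MAXWAIT),
                      LOCK_ATTEMPTS_MINWAIT)
        result.append(timeout)
        running_sum += timeout

      return result

With these stand-in values the sequence grows from 1s towards the 10s cap and
is cut off once its sum reaches 150s; after the list is exhausted the
processor moves to a blocking acquire, as the comment in the patched code
notes.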
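The result post-processing in _ProcessResult can be sketched with
self-contained stand-ins for cmdlib.ResultWithJobs and constants.JOB_IDS_KEY;
the priority/comment propagation done via _SetBaseOpParams is omitted here:

    JOB_IDS_KEY = "jobs"  # stand-in for constants.JOB_IDS_KEY

    class ResultWithJobs(object):
      def __init__(self, jobs, **kwargs):
        self.jobs = jobs     # list of jobs, each a list of opcodes
        self.other = kwargs  # additional return values

    def ProcessResult(submit_fn, result):
      """Submits any jobs carried by the result, mirroring _ProcessResult."""
      if isinstance(result, ResultWithJobs):
        job_submission = submit_fn(result.jobs)
        result = dict(result.other)
        assert JOB_IDS_KEY not in result
        result[JOB_IDS_KEY] = job_submission
      return result

    # A logical unit returning ResultWithJobs([[op_a], [op_b, op_c]], n=2)
    # has both jobs submitted and yields {"n": 2, "jobs": <submission data>};
    # any other result passes through unchanged.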
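The prefix discipline that the asserts in _BuildEnv enforce comes down to the
following (variable names and values invented for the example): pre-phase
hooks see only GANETI_* variables, post-phase hooks additionally see
GANETI_POST_* ones, and the two namespaces never overlap:

    pre_env = {"GANETI_INSTANCE_NAME": "inst1.example.com"}  # recorded once
    post_env = {"GANETI_POST_STATUS": "success"}             # built per run

    post_env.update(pre_env)  # disjoint by construction, per the asserts
    # Post-phase hooks therefore see both GANETI_INSTANCE_NAME and
    # GANETI_POST_STATUS, while pre-phase hooks only ever see GANETI_* keys.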
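The dictionary RunPhase iterates over, i.e. the output of
hooks_results_adapt_fn (or of hooks_execution_fn directly when no adapter is
set), has the shape below; node names and outputs are invented, and
HKR_SUCCESS is a stand-in for the corresponding ganeti.constants value:

    HKR_SUCCESS = 2  # stand-in; the real value comes from ganeti.constants

    converted_res = {
      # node: (fail_msg, offline, [(script, status, output), ...])
      "node1.example.com": (None, False, [("01-example", HKR_SUCCESS, "")]),
      "node2.example.com": ("connection timed out", False, []),  # warned
      "node3.example.com": (None, True, []),                     # skipped
      }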
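Because the refactoring decouples HooksMaster from LogicalUnit (BuildFromLu is
merely one adapter), an instance can also be wired up by hand. A minimal
sketch with a dummy runner follows; every name below is invented, and since
the dummy already returns the adapted triple format, hooks_results_adapt_fn is
passed as None:

    def DummyHooksRunner(node_list, hpath, phase, env):
      # Pretend every node ran its hooks successfully
      return dict((node, (None, False, [])) for node in node_list)

    hm = HooksMaster("OP_EXAMPLE",        # opcode ID the hooks are tied to
                     "example",           # hooks path prefix
                     (frozenset(["node1"]), frozenset(["node1"])),
                     DummyHooksRunner,
                     None,                # results are already adapted
                     lambda: {},          # no LU-specific environment
                     logging.warning)

    hm.RunPhase(constants.HOOKS_PHASE_PRE)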