from ganeti import opcodes
from ganeti import constants
from ganeti import errors
-from ganeti import rpc
from ganeti import cmdlib
from ganeti import locking
from ganeti import utils
"""Calculate timeouts for lock attempts.
"""
- result = [1.0]
+ result = [constants.LOCK_ATTEMPTS_MINWAIT]
+ running_sum = result[0]
- # Wait for a total of at least 150s before doing a blocking acquire
- while sum(result) < 150.0:
+ # Wait for a total of at least LOCK_ATTEMPTS_TIMEOUT before doing a
+ # blocking acquire
+ while running_sum < constants.LOCK_ATTEMPTS_TIMEOUT:
timeout = (result[-1] * 1.05) ** 1.25
- # Cap timeout at 10 seconds. This gives other jobs a chance to run
- # even if we're still trying to get our locks, before finally moving
- # to a blocking acquire.
- if timeout > 10.0:
- timeout = 10.0
-
- elif timeout < 0.1:
- # Lower boundary for safety
- timeout = 0.1
+ # Cap max timeout. This gives other jobs a chance to run even if
+ # we're still trying to get our locks, before finally moving to a
+ # blocking acquire.
+ timeout = min(timeout, constants.LOCK_ATTEMPTS_MAXWAIT)
+ # And also cap the lower boundary for safety
+ timeout = max(timeout, constants.LOCK_ATTEMPTS_MINWAIT)
result.append(timeout)
+ running_sum += timeout
return result
return timeout
-class OpExecCbBase: # pylint: disable-msg=W0232
+class OpExecCbBase: # pylint: disable=W0232
"""Base class for OpCode execution callbacks.
"""
"""
+ def SubmitManyJobs(self, jobs):
+ """Submits jobs for processing.
+
+ See L{jqueue.JobQueue.SubmitManyJobs}.
+
+ """
+ raise NotImplementedError
+
def _LUNameForOpName(opname):
"""Computes the LU name for a given OpCode name.
if op.WITH_LU)
+def _RpcResultsToHooksResults(rpc_results):
+ """Function to convert RPC results to the format expected by HooksMaster.
+
+ @type rpc_results: dict(node: L{rpc.RpcResult})
+ @param rpc_results: RPC results
+ @rtype: dict(node: (fail_msg, offline, hooks_results))
+ @return: RPC results unpacked according to the format expected by
+ L{mcpu.HooksMaster}
+
+ """
+ return dict((node, (rpc_res.fail_msg, rpc_res.offline, rpc_res.payload))
+ for (node, rpc_res) in rpc_results.items())
+
+
class Processor(object):
"""Object which runs OpCodes"""
DISPATCH_TABLE = _ComputeDispatchTable()
self.context = context
self._ec_id = ec_id
self._cbs = None
- self.rpc = rpc.RpcRunner(context.cfg)
+ self.rpc = context.rpc
self.hmclass = HooksMaster
def _AcquireLocks(self, level, names, shared, timeout, priority):
return acquired
+ def _ProcessResult(self, result):
+ """Examines opcode result.
+
+ If necessary, additional processing on the result is done.
+
+ """
+ if isinstance(result, cmdlib.ResultWithJobs):
+ # Submit jobs
+ job_submission = self._cbs.SubmitManyJobs(result.jobs)
+
+ # Build dictionary
+ result = result.other
+
+ assert constants.JOB_IDS_KEY not in result, \
+ "Key '%s' found in additional return values" % constants.JOB_IDS_KEY
+
+ result[constants.JOB_IDS_KEY] = job_submission
+
+ return result
+
def _ExecLU(self, lu):
"""Logical Unit execution sequence.
"""
write_count = self.context.cfg.write_count
lu.CheckPrereq()
- hm = HooksMaster(self.rpc.call_hooks_runner, lu)
+
+ hm = self.BuildHooksManager(lu)
h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
self.Log, None)
return lu.dry_run_result
try:
- result = lu.Exec(self.Log)
+ result = self._ProcessResult(lu.Exec(self.Log))
h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
self.Log, result)
return result
+ def BuildHooksManager(self, lu):
+ return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu)
+
def _LockAndExecLU(self, lu, level, calc_timeout, priority):
"""Execute a Logical Unit, with the needed locks.
# Acquiring locks
needed_locks = lu.needed_locks[level]
- acquired = self._AcquireLocks(level, needed_locks, share,
- calc_timeout(), priority)
+ self._AcquireLocks(level, needed_locks, share,
+ calc_timeout(), priority)
else:
# Adding locks
add_locks = lu.add_locks[level]
" with another job, who added them first" % add_locks,
errors.ECODE_FAULT)
- acquired = add_locks
-
try:
- lu.acquired_locks[level] = acquired
-
result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
finally:
if level in lu.remove_locks:
"""
if not isinstance(op, opcodes.OpCode):
raise errors.ProgrammerError("Non-opcode instance passed"
- " to ExecOpcode")
+ " to ExecOpcode (%s)" % type(op))
lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
if lu_class is None:
assert lu.needed_locks is not None, "needed_locks not set by LU"
try:
- return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout,
- priority)
+ result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout,
+ priority)
finally:
if self._ec_id:
self.context.cfg.DropECReservations(self._ec_id)
finally:
self._cbs = None
+ resultcheck_fn = op.OP_RESULT
+ if not (resultcheck_fn is None or resultcheck_fn(result)):
+ logging.error("Expected opcode result matching %s, got %s",
+ resultcheck_fn, result)
+ raise errors.OpResultError("Opcode result does not match %s" %
+ resultcheck_fn)
+
+ return result
+
def Log(self, *args):
"""Forward call to feedback callback function.
class HooksMaster(object):
- """Hooks master.
-
- This class distributes the run commands to the nodes based on the
- specific LU class.
+ def __init__(self, opcode, hooks_path, nodes, hooks_execution_fn,
+ hooks_results_adapt_fn, build_env_fn, log_fn, htype=None, cluster_name=None,
+ master_name=None):
+ """Base class for hooks masters.
+
+ This class invokes the execution of hooks according to the behaviour
+ specified by its parameters.
+
+ @type opcode: string
+ @param opcode: opcode of the operation to which the hooks are tied
+ @type hooks_path: string
+ @param hooks_path: prefix of the hooks directories
+ @type nodes: 2-tuple of lists
+ @param nodes: 2-tuple of lists containing nodes on which pre-hooks must be
+ run and nodes on which post-hooks must be run
+ @type hooks_execution_fn: function that accepts the following parameters:
+ (node_list, hooks_path, phase, environment)
+ @param hooks_execution_fn: function that will execute the hooks
+ @type hooks_results_adapt_fn: function or None
+ @param hooks_results_adapt_fn: function that will adapt the return value of
+ hooks_execution_fn to the format expected by RunPhase; can be None,
+ indicating that no conversion is necessary
+ @type build_env_fn: function that returns a dictionary having strings as
+ keys
+ @param build_env_fn: function that builds the environment for the hooks
+ @type log_fn: function that accepts a string
+ @param log_fn: logging function
+ @type htype: string or None
+ @param htype: None or one of L{constants.HTYPE_CLUSTER},
+ L{constants.HTYPE_NODE}, L{constants.HTYPE_INSTANCE}
+ @type cluster_name: string
+ @param cluster_name: name of the cluster
+ @type master_name: string
+ @param master_name: name of the master
- In order to remove the direct dependency on the rpc module, the
- constructor needs a function which actually does the remote
- call. This will usually be rpc.call_hooks_runner, but any function
- which behaves the same works.
-
- """
- def __init__(self, callfn, lu):
- self.callfn = callfn
- self.lu = lu
- self.op = lu.op
- self.pre_env = None
- self.pre_nodes = None
+ """
+ self.opcode = opcode
+ self.hooks_path = hooks_path
+ self.hooks_execution_fn = hooks_execution_fn
+ self.hooks_results_adapt_fn = hooks_results_adapt_fn
+ self.build_env_fn = build_env_fn
+ self.log_fn = log_fn
+ self.htype = htype
+ self.cluster_name = cluster_name
+ self.master_name = master_name
+
+ self.pre_env = self._BuildEnv(constants.HOOKS_PHASE_PRE)
+ (self.pre_nodes, self.post_nodes) = nodes
def _BuildEnv(self, phase):
"""Compute the environment and the target nodes.
env = {}
- if self.lu.HPATH is not None:
- (lu_env, lu_nodes_pre, lu_nodes_post) = self.lu.BuildHooksEnv()
- if lu_env:
+ if self.hooks_path is not None:
+ phase_env = self.build_env_fn()
+ if phase_env:
assert not compat.any(key.upper().startswith(prefix)
- for key in lu_env)
+ for key in phase_env)
env.update(("%s%s" % (prefix, key), value)
- for (key, value) in lu_env.items())
- else:
- lu_nodes_pre = lu_nodes_post = []
+ for (key, value) in phase_env.items())
if phase == constants.HOOKS_PHASE_PRE:
assert compat.all((key.startswith("GANETI_") and
not key.startswith("GANETI_POST_"))
for key in env)
- # Record environment for any post-phase hooks
- self.pre_env = env
-
elif phase == constants.HOOKS_PHASE_POST:
assert compat.all(key.startswith("GANETI_POST_") for key in env)
+ assert isinstance(self.pre_env, dict)
- if self.pre_env:
- assert not compat.any(key.startswith("GANETI_POST_")
- for key in self.pre_env)
- env.update(self.pre_env)
+ # Merge with pre-phase environment
+ assert not compat.any(key.startswith("GANETI_POST_")
+ for key in self.pre_env)
+ env.update(self.pre_env)
else:
raise AssertionError("Unknown phase '%s'" % phase)
- return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
+ return env
def _RunWrapper(self, node_list, hpath, phase, phase_env):
"""Simple wrapper over self.callfn.
- This method fixes the environment before doing the rpc call.
+ This method fixes the environment before executing the hooks.
"""
- cfg = self.lu.cfg
-
env = {
- "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
+ "PATH": constants.HOOKS_PATH,
"GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
- "GANETI_OP_CODE": self.op.OP_ID,
- "GANETI_OBJECT_TYPE": self.lu.HTYPE,
+ "GANETI_OP_CODE": self.opcode,
"GANETI_DATA_DIR": constants.DATA_DIR,
"GANETI_HOOKS_PHASE": phase,
"GANETI_HOOKS_PATH": hpath,
}
- if cfg is not None:
- env["GANETI_CLUSTER"] = cfg.GetClusterName()
- env["GANETI_MASTER"] = cfg.GetMasterNode()
+ if self.htype:
+ env["GANETI_OBJECT_TYPE"] = self.htype
+
+ if self.cluster_name is not None:
+ env["GANETI_CLUSTER"] = self.cluster_name
+
+ if self.master_name is not None:
+ env["GANETI_MASTER"] = self.master_name
if phase_env:
- assert not (set(env) & set(phase_env)), "Environment variables conflict"
- env.update(phase_env)
+ env = utils.algo.JoinDisjointDicts(env, phase_env)
# Convert everything to strings
env = dict([(str(key), str(val)) for key, val in env.iteritems()])
assert compat.all(key == "PATH" or key.startswith("GANETI_")
for key in env)
- return self.callfn(node_list, hpath, phase, env)
+ return self.hooks_execution_fn(node_list, hpath, phase, env)
def RunPhase(self, phase, nodes=None):
"""Run all the scripts for a phase.
This is the main function of the HookMaster.
+ It executes self.hooks_execution_fn, and after running
+ self.hooks_results_adapt_fn on its results it expects them to be in the form
+ {node_name: (fail_msg, offline, [(script, result, output), ...])}.
@param phase: one of L{constants.HOOKS_PHASE_POST} or
L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
@raise errors.HooksAbort: on failure of one of the hooks
"""
- (env, node_list_pre, node_list_post) = self._BuildEnv(phase)
- if nodes is None:
- if phase == constants.HOOKS_PHASE_PRE:
- self.pre_nodes = (node_list_pre, node_list_post)
- nodes = node_list_pre
- elif phase == constants.HOOKS_PHASE_POST:
- post_nodes = (node_list_pre, node_list_post)
- assert self.pre_nodes == post_nodes, \
- ("Node lists returned for post-phase hook don't match pre-phase"
- " lists (pre %s, post %s)" % (self.pre_nodes, post_nodes))
- nodes = node_list_post
- else:
- raise AssertionError("Unknown phase '%s'" % phase)
+ if phase == constants.HOOKS_PHASE_PRE:
+ if nodes is None:
+ nodes = self.pre_nodes
+ env = self.pre_env
+ elif phase == constants.HOOKS_PHASE_POST:
+ if nodes is None:
+ nodes = self.post_nodes
+ env = self._BuildEnv(phase)
+ else:
+ raise AssertionError("Unknown phase '%s'" % phase)
if not nodes:
# empty node list, we should not attempt to run this as either
# even attempt to run, or this LU doesn't do hooks at all
return
- results = self._RunWrapper(nodes, self.lu.HPATH, phase, env)
+ results = self._RunWrapper(nodes, self.hooks_path, phase, env)
if not results:
msg = "Communication Failure"
if phase == constants.HOOKS_PHASE_PRE:
raise errors.HooksFailure(msg)
else:
- self.lu.LogWarning(msg)
+ self.log_fn(msg)
return results
+ converted_res = results
+ if self.hooks_results_adapt_fn:
+ converted_res = self.hooks_results_adapt_fn(results)
+
errs = []
- for node_name in results:
- res = results[node_name]
- if res.offline:
+ for node_name, (fail_msg, offline, hooks_results) in converted_res.items():
+ if offline:
continue
- msg = res.fail_msg
- if msg:
- self.lu.LogWarning("Communication failure to node %s: %s",
- node_name, msg)
+ if fail_msg:
+ self.log_fn("Communication failure to node %s: %s", node_name, fail_msg)
continue
- for script, hkr, output in res.payload:
+ for script, hkr, output in hooks_results:
if hkr == constants.HKR_FAIL:
if phase == constants.HOOKS_PHASE_PRE:
errs.append((node_name, script, output))
else:
if not output:
output = "(no output)"
- self.lu.LogWarning("On %s script %s failed, output: %s" %
- (node_name, script, output))
+ self.log_fn("On %s script %s failed, output: %s" %
+ (node_name, script, output))
if errs and phase == constants.HOOKS_PHASE_PRE:
raise errors.HooksAbort(errs)
top-level LI if the configuration has been updated.
"""
- if self.pre_env is None:
- raise AssertionError("Pre-phase must be run before configuration update")
-
phase = constants.HOOKS_PHASE_POST
hpath = constants.HOOKS_NAME_CFGUPDATE
- nodes = [self.lu.cfg.GetMasterNode()]
+ nodes = [self.master_name]
self._RunWrapper(nodes, hpath, phase, self.pre_env)
+
+ @staticmethod
+ def BuildFromLu(hooks_execution_fn, lu):
+ if lu.HPATH is None:
+ nodes = (None, None)
+ else:
+ nodes = map(frozenset, lu.BuildHooksNodes())
+
+ master_name = cluster_name = None
+ if lu.cfg:
+ master_name = lu.cfg.GetMasterNode()
+ cluster_name = lu.cfg.GetClusterName()
+
+ return HooksMaster(lu.op.OP_ID, lu.HPATH, nodes, hooks_execution_fn,
+ _RpcResultsToHooksResults, lu.BuildHooksEnv,
+ lu.LogWarning, lu.HTYPE, cluster_name, master_name)