X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/07e0896f6bce624453a983b2d44d557181c25b35..a85f23fa4fdb4118c5a36155103a1ba573b1340f:/lib/mcpu.py

diff --git a/lib/mcpu.py b/lib/mcpu.py
index 863cf9a..95f04e0 100644
--- a/lib/mcpu.py
+++ b/lib/mcpu.py
@@ -35,7 +35,6 @@ import time
 from ganeti import opcodes
 from ganeti import constants
 from ganeti import errors
-from ganeti import rpc
 from ganeti import cmdlib
 from ganeti import locking
 from ganeti import utils
@@ -56,23 +55,23 @@ def _CalculateLockAttemptTimeouts():
   """Calculate timeouts for lock attempts.
 
   """
-  result = [1.0]
+  result = [constants.LOCK_ATTEMPTS_MINWAIT]
+  running_sum = result[0]
 
-  # Wait for a total of at least 150s before doing a blocking acquire
-  while sum(result) < 150.0:
+  # Wait for a total of at least LOCK_ATTEMPTS_TIMEOUT before doing a
+  # blocking acquire
+  while running_sum < constants.LOCK_ATTEMPTS_TIMEOUT:
     timeout = (result[-1] * 1.05) ** 1.25
 
-    # Cap timeout at 10 seconds. This gives other jobs a chance to run
-    # even if we're still trying to get our locks, before finally moving
-    # to a blocking acquire.
-    if timeout > 10.0:
-      timeout = 10.0
-
-    elif timeout < 0.1:
-      # Lower boundary for safety
-      timeout = 0.1
+    # Cap max timeout. This gives other jobs a chance to run even if
+    # we're still trying to get our locks, before finally moving to a
+    # blocking acquire.
+    timeout = min(timeout, constants.LOCK_ATTEMPTS_MAXWAIT)
+
+    # And also cap the lower boundary for safety
+    timeout = max(timeout, constants.LOCK_ATTEMPTS_MINWAIT)
 
     result.append(timeout)
+    running_sum += timeout
 
   return result
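For illustration only, not part of the patch: a standalone sketch of the back-off sequence the rewritten helper produces. The constant names mirror the hunk above; the numeric values below are stand-ins chosen to match what the removed literals describe (1.0s seed, 10.0s cap, 150s total), not necessarily the values defined in constants.py.

LOCK_ATTEMPTS_MINWAIT = 1.0     # assumed stand-in: seed and lower bound
LOCK_ATTEMPTS_MAXWAIT = 10.0    # assumed stand-in: per-attempt cap
LOCK_ATTEMPTS_TIMEOUT = 150.0   # assumed stand-in: total before blocking


def calculate_lock_attempt_timeouts():
  """Mirrors the patched helper, using the stand-in constants above."""
  result = [LOCK_ATTEMPTS_MINWAIT]
  running_sum = result[0]

  while running_sum < LOCK_ATTEMPTS_TIMEOUT:
    # Grow the previous timeout slightly, then clamp it into
    # [MINWAIT, MAXWAIT], exactly as the new code does.
    timeout = (result[-1] * 1.05) ** 1.25
    timeout = min(timeout, LOCK_ATTEMPTS_MAXWAIT)
    timeout = max(timeout, LOCK_ATTEMPTS_MINWAIT)
    result.append(timeout)
    running_sum += timeout

  return result


print(calculate_lock_attempt_timeouts())
# Roughly [1.0, 1.06, 1.15, 1.26, ...], growing until capped at the maximum.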
@@ -122,7 +121,7 @@
     return timeout
 
 
-class OpExecCbBase: # pylint: disable-msg=W0232
+class OpExecCbBase: # pylint: disable=W0232
   """Base class for OpCode execution callbacks.
 
   """
@@ -144,6 +143,14 @@
 
     """
 
+  def SubmitManyJobs(self, jobs):
+    """Submits jobs for processing.
+
+    See L{jqueue.JobQueue.SubmitManyJobs}.
+
+    """
+    raise NotImplementedError
+
 
 def _LUNameForOpName(opname):
   """Computes the LU name for a given OpCode name.
@@ -164,6 +171,20 @@
             if op.WITH_LU)
 
 
+def _RpcResultsToHooksResults(rpc_results):
+  """Function to convert RPC results to the format expected by HooksMaster.
+
+  @type rpc_results: dict(node: L{rpc.RpcResult})
+  @param rpc_results: RPC results
+  @rtype: dict(node: (fail_msg, offline, hooks_results))
+  @return: RPC results unpacked according to the format expected by
+    L{mcpu.HooksMaster}
+
+  """
+  return dict((node, (rpc_res.fail_msg, rpc_res.offline, rpc_res.payload))
+              for (node, rpc_res) in rpc_results.items())
+
+
 class Processor(object):
   """Object which runs OpCodes"""
   DISPATCH_TABLE = _ComputeDispatchTable()
@@ -180,7 +201,7 @@
     self.context = context
     self._ec_id = ec_id
     self._cbs = None
-    self.rpc = rpc.RpcRunner(context.cfg)
+    self.rpc = context.rpc
     self.hmclass = HooksMaster
 
   def _AcquireLocks(self, level, names, shared, timeout, priority):
@@ -209,13 +230,34 @@
 
     return acquired
 
+  def _ProcessResult(self, result):
+    """Examines opcode result.
+
+    If necessary, additional processing on the result is done.
+
+    """
+    if isinstance(result, cmdlib.ResultWithJobs):
+      # Submit jobs
+      job_submission = self._cbs.SubmitManyJobs(result.jobs)
+
+      # Build dictionary
+      result = result.other
+
+      assert constants.JOB_IDS_KEY not in result, \
+        "Key '%s' found in additional return values" % constants.JOB_IDS_KEY
+
+      result[constants.JOB_IDS_KEY] = job_submission
+
+    return result
+
   def _ExecLU(self, lu):
     """Logical Unit execution sequence.
 
     """
     write_count = self.context.cfg.write_count
     lu.CheckPrereq()
-    hm = HooksMaster(self.rpc.call_hooks_runner, lu)
+
+    hm = self.BuildHooksManager(lu)
     h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
     lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                      self.Log, None)
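A toy illustration, not Ganeti code, of what the new _ProcessResult step does with a ResultWithJobs-style value: the attached jobs are handed to the callback and the submission result is merged back into a plain result dictionary. All names below are made-up stand-ins for cmdlib.ResultWithJobs, constants.JOB_IDS_KEY and the SubmitManyJobs callback.

JOB_IDS_KEY = "jobs"  # stand-in for constants.JOB_IDS_KEY


class FakeResultWithJobs(object):
  """Minimal stand-in for cmdlib.ResultWithJobs."""
  def __init__(self, jobs, **other):
    self.jobs = jobs    # list of jobs, each a list of opcodes
    self.other = other  # additional return values


def fake_submit_many_jobs(jobs):
  # Stand-in for the job queue callback; just hands out fake job IDs.
  return list(range(len(jobs)))


def process_result(result):
  """Mirrors the control flow of the new Processor._ProcessResult."""
  if isinstance(result, FakeResultWithJobs):
    job_submission = fake_submit_many_jobs(result.jobs)
    result = result.other
    assert JOB_IDS_KEY not in result
    result[JOB_IDS_KEY] = job_submission
  return result


print(process_result(FakeResultWithJobs([["op-a"], ["op-b"]], instances=2)))
# {'instances': 2, 'jobs': [0, 1]}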
@@ -229,7 +271,7 @@
       return lu.dry_run_result
 
     try:
-      result = lu.Exec(self.Log)
+      result = self._ProcessResult(lu.Exec(self.Log))
       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                 self.Log, result)
@@ -240,6 +282,9 @@
 
     return result
 
+  def BuildHooksManager(self, lu):
+    return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu)
+
   def _LockAndExecLU(self, lu, level, calc_timeout, priority):
     """Execute a Logical Unit, with the needed locks.
 
@@ -274,8 +319,8 @@
         # Acquiring locks
         needed_locks = lu.needed_locks[level]
 
-        acquired = self._AcquireLocks(level, needed_locks, share,
-                                      calc_timeout(), priority)
+        self._AcquireLocks(level, needed_locks, share,
+                           calc_timeout(), priority)
       else:
         # Adding locks
         add_locks = lu.add_locks[level]
@@ -289,11 +334,7 @@
                                      " with another job, who added them first"
                                      % add_locks,
                                      errors.ECODE_FAULT)
-        acquired = add_locks
-
       try:
-        lu.acquired_locks[level] = acquired
-
         result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
       finally:
         if level in lu.remove_locks:
@@ -324,7 +365,7 @@
     """
     if not isinstance(op, opcodes.OpCode):
       raise errors.ProgrammerError("Non-opcode instance passed"
-                                   " to ExecOpcode")
+                                   " to ExecOpcode (%s)" % type(op))
 
     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
     if lu_class is None:
@@ -349,8 +390,8 @@
           assert lu.needed_locks is not None, "needed_locks not set by LU"
 
           try:
-            return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout,
-                                       priority)
+            result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout,
+                                         priority)
           finally:
             if self._ec_id:
               self.context.cfg.DropECReservations(self._ec_id)
@@ -359,6 +400,15 @@
     finally:
       self._cbs = None
 
+    resultcheck_fn = op.OP_RESULT
+    if not (resultcheck_fn is None or resultcheck_fn(result)):
+      logging.error("Expected opcode result matching %s, got %s",
+                    resultcheck_fn, result)
+      raise errors.OpResultError("Opcode result does not match %s" %
+                                 resultcheck_fn)
+
+    return result
+
   def Log(self, *args):
     """Forward call to feedback callback function.
 
@@ -412,28 +462,53 @@
 
 
 class HooksMaster(object):
-  """Hooks master.
+  def __init__(self, opcode, hooks_path, nodes, hooks_execution_fn,
+               hooks_results_adapt_fn, build_env_fn, log_fn, htype=None,
+               cluster_name=None, master_name=None):
+    """Base class for hooks masters.
+
+    This class invokes the execution of hooks according to the behaviour
+    specified by its parameters.
+
+    @type opcode: string
+    @param opcode: opcode of the operation to which the hooks are tied
+    @type hooks_path: string
+    @param hooks_path: prefix of the hooks directories
+    @type nodes: 2-tuple of lists
+    @param nodes: 2-tuple of lists containing nodes on which pre-hooks must be
+      run and nodes on which post-hooks must be run
+    @type hooks_execution_fn: function that accepts the following parameters:
+      (node_list, hooks_path, phase, environment)
+    @param hooks_execution_fn: function that will execute the hooks
+    @type hooks_results_adapt_fn: function
+    @param hooks_results_adapt_fn: function that will adapt the return value of
+      hooks_execution_fn to the format expected by RunPhase; can be None,
+      indicating that no conversion is necessary
+    @type build_env_fn: function that returns a dictionary having strings as
+      keys
+    @param build_env_fn: function that builds the environment for the hooks
+    @type log_fn: function that accepts a string
+    @param log_fn: logging function
+    @type htype: string or None
+    @param htype: None or one of L{constants.HTYPE_CLUSTER},
+      L{constants.HTYPE_NODE}, L{constants.HTYPE_INSTANCE}
+    @type cluster_name: string
+    @param cluster_name: name of the cluster
+    @type master_name: string
+    @param master_name: name of the master
 
-  This class distributes the run commands to the nodes based on the
-  specific LU class.
-
-  In order to remove the direct dependency on the rpc module, the
-  constructor needs a function which actually does the remote
-  call. This will usually be rpc.call_hooks_runner, but any function
-  which behaves the same works.
+    """
+    self.opcode = opcode
+    self.hooks_path = hooks_path
+    self.hooks_execution_fn = hooks_execution_fn
+    self.hooks_results_adapt_fn = hooks_results_adapt_fn
+    self.build_env_fn = build_env_fn
+    self.log_fn = log_fn
+    self.htype = htype
+    self.cluster_name = cluster_name
+    self.master_name = master_name
 
-  """
-  def __init__(self, callfn, lu):
-    self.callfn = callfn
-    self.lu = lu
-    self.op = lu.op
     self.pre_env = self._BuildEnv(constants.HOOKS_PHASE_PRE)
-
-    if self.lu.HPATH is None:
-      nodes = (None, None)
-    else:
-      nodes = map(frozenset, self.lu.BuildHooksNodes())
-
     (self.pre_nodes, self.post_nodes) = nodes
 
   def _BuildEnv(self, phase):
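An illustrative sketch, not Ganeti code, of what this decoupling buys: because the constructor now takes plain callables instead of an LU and the rpc module, a HooksMaster-style object can be driven entirely by test stubs. Every name below (the fake callables, the hooks path, the node and cluster names) is hypothetical; the commented call only mirrors the argument order of the signature added above.

HOOKS_PHASE_PRE = "pre"  # stand-in for constants.HOOKS_PHASE_PRE


def fake_hooks_execution_fn(node_list, hooks_path, phase, env):
  # Pretend every node ran a single hook script successfully; the return
  # value is already in the adapted (fail_msg, offline, results) form, so
  # no hooks_results_adapt_fn would be needed.
  return dict((node, ("", False, [("10-example", "SUCCESS", "ok")]))
              for node in node_list)


def fake_build_env_fn():
  # Extra environment for the hooks; keys get the GANETI_ prefix later.
  return {"INSTANCE_NAME": "inst1.example.com"}


def fake_log_fn(*args):
  print(args)


# A HooksMaster could then be built roughly like this (hypothetical values):
#   hm = HooksMaster("OP_EXAMPLE", "instance-add", (["node1"], ["node1"]),
#                    fake_hooks_execution_fn, None, fake_build_env_fn,
#                    fake_log_fn, htype=None, cluster_name="cluster1",
#                    master_name="node1")
#   hm.RunPhase(HOOKS_PHASE_PRE)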
""" - cfg = self.lu.cfg - env = { - "PATH": "/sbin:/bin:/usr/sbin:/usr/bin", + "PATH": constants.HOOKS_PATH, "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION, - "GANETI_OP_CODE": self.op.OP_ID, + "GANETI_OP_CODE": self.opcode, "GANETI_DATA_DIR": constants.DATA_DIR, "GANETI_HOOKS_PHASE": phase, "GANETI_HOOKS_PATH": hpath, } - if self.lu.HTYPE: - env["GANETI_OBJECT_TYPE"] = self.lu.HTYPE + if self.htype: + env["GANETI_OBJECT_TYPE"] = self.htype - if cfg is not None: - env["GANETI_CLUSTER"] = cfg.GetClusterName() - env["GANETI_MASTER"] = cfg.GetMasterNode() + if self.cluster_name is not None: + env["GANETI_CLUSTER"] = self.cluster_name + + if self.master_name is not None: + env["GANETI_MASTER"] = self.master_name if phase_env: - assert not (set(env) & set(phase_env)), "Environment variables conflict" - env.update(phase_env) + env = utils.algo.JoinDisjointDicts(env, phase_env) # Convert everything to strings env = dict([(str(key), str(val)) for key, val in env.iteritems()]) @@ -511,12 +586,15 @@ class HooksMaster(object): assert compat.all(key == "PATH" or key.startswith("GANETI_") for key in env) - return self.callfn(node_list, hpath, phase, env) + return self.hooks_execution_fn(node_list, hpath, phase, env) def RunPhase(self, phase, nodes=None): """Run all the scripts for a phase. This is the main function of the HookMaster. + It executes self.hooks_execution_fn, and after running + self.hooks_results_adapt_fn on its results it expects them to be in the form + {node_name: (fail_msg, [(script, result, output), ...]}). @param phase: one of L{constants.HOOKS_PHASE_POST} or L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase @@ -543,36 +621,37 @@ class HooksMaster(object): # even attempt to run, or this LU doesn't do hooks at all return - results = self._RunWrapper(nodes, self.lu.HPATH, phase, env) + results = self._RunWrapper(nodes, self.hooks_path, phase, env) if not results: msg = "Communication Failure" if phase == constants.HOOKS_PHASE_PRE: raise errors.HooksFailure(msg) else: - self.lu.LogWarning(msg) + self.log_fn(msg) return results + converted_res = results + if self.hooks_results_adapt_fn: + converted_res = self.hooks_results_adapt_fn(results) + errs = [] - for node_name in results: - res = results[node_name] - if res.offline: + for node_name, (fail_msg, offline, hooks_results) in converted_res.items(): + if offline: continue - msg = res.fail_msg - if msg: - self.lu.LogWarning("Communication failure to node %s: %s", - node_name, msg) + if fail_msg: + self.log_fn("Communication failure to node %s: %s", node_name, fail_msg) continue - for script, hkr, output in res.payload: + for script, hkr, output in hooks_results: if hkr == constants.HKR_FAIL: if phase == constants.HOOKS_PHASE_PRE: errs.append((node_name, script, output)) else: if not output: output = "(no output)" - self.lu.LogWarning("On %s script %s failed, output: %s" % - (node_name, script, output)) + self.log_fn("On %s script %s failed, output: %s" % + (node_name, script, output)) if errs and phase == constants.HOOKS_PHASE_PRE: raise errors.HooksAbort(errs) @@ -588,5 +667,21 @@ class HooksMaster(object): """ phase = constants.HOOKS_PHASE_POST hpath = constants.HOOKS_NAME_CFGUPDATE - nodes = [self.lu.cfg.GetMasterNode()] + nodes = [self.master_name] self._RunWrapper(nodes, hpath, phase, self.pre_env) + + @staticmethod + def BuildFromLu(hooks_execution_fn, lu): + if lu.HPATH is None: + nodes = (None, None) + else: + nodes = map(frozenset, lu.BuildHooksNodes()) + + master_name = cluster_name = None + if lu.cfg: + master_name = 
@@ -588,5 +667,21 @@
     """
     phase = constants.HOOKS_PHASE_POST
     hpath = constants.HOOKS_NAME_CFGUPDATE
-    nodes = [self.lu.cfg.GetMasterNode()]
+    nodes = [self.master_name]
     self._RunWrapper(nodes, hpath, phase, self.pre_env)
+
+  @staticmethod
+  def BuildFromLu(hooks_execution_fn, lu):
+    if lu.HPATH is None:
+      nodes = (None, None)
+    else:
+      nodes = map(frozenset, lu.BuildHooksNodes())
+
+    master_name = cluster_name = None
+    if lu.cfg:
+      master_name = lu.cfg.GetMasterNode()
+      cluster_name = lu.cfg.GetClusterName()
+
+    return HooksMaster(lu.op.OP_ID, lu.HPATH, nodes, hooks_execution_fn,
+                       _RpcResultsToHooksResults, lu.BuildHooksEnv,
+                       lu.LogWarning, lu.HTYPE, cluster_name, master_name)
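Finally, a standalone sketch, not Ganeti code, of what the _RpcResultsToHooksResults adapter wired in by BuildFromLu does: it flattens RpcResult-like objects into plain (fail_msg, offline, payload) tuples so HooksMaster never touches the rpc module. FakeRpcResult is a made-up stand-in.

class FakeRpcResult(object):
  """Made-up stand-in exposing the three attributes the adapter reads."""
  def __init__(self, fail_msg, offline, payload):
    self.fail_msg = fail_msg
    self.offline = offline
    self.payload = payload


def rpc_results_to_hooks_results(rpc_results):
  # Same expression as the helper added near the top of the patch.
  return dict((node, (res.fail_msg, res.offline, res.payload))
              for (node, res) in rpc_results.items())


raw = {"node1": FakeRpcResult("", False, [("10-hook", "SUCCESS", "")])}
print(rpc_results_to_hooks_results(raw))
# {'node1': ('', False, [('10-hook', 'SUCCESS', '')])}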