X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/7dcf333ded48a7dcf71a1e51f93f1ecef4a20c94..80ec9f96841dadc4e1e2e6cba422372423a0d547:/lib/mcpu.py diff --git a/lib/mcpu.py b/lib/mcpu.py index 929ab1f..a7ea80c 100644 --- a/lib/mcpu.py +++ b/lib/mcpu.py @@ -1,7 +1,7 @@ # # -# Copyright (C) 2006, 2007, 2011 Google Inc. +# Copyright (C) 2006, 2007, 2011, 2012 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -31,6 +31,7 @@ are two kinds of classes defined: import logging import random import time +import itertools from ganeti import opcodes from ganeti import constants @@ -39,6 +40,7 @@ from ganeti import cmdlib from ganeti import locking from ganeti import utils from ganeti import compat +from ganeti import pathutils _OP_PREFIX = "Op" @@ -171,11 +173,81 @@ def _ComputeDispatchTable(): if op.WITH_LU) +def _SetBaseOpParams(src, defcomment, dst): + """Copies basic opcode parameters. + + @type src: L{opcodes.OpCode} + @param src: Source opcode + @type defcomment: string + @param defcomment: Comment to specify if not already given + @type dst: L{opcodes.OpCode} + @param dst: Destination opcode + + """ + if hasattr(src, "debug_level"): + dst.debug_level = src.debug_level + + if (getattr(dst, "priority", None) is None and + hasattr(src, "priority")): + dst.priority = src.priority + + if not getattr(dst, opcodes.COMMENT_ATTR, None): + dst.comment = defcomment + + +def _ProcessResult(submit_fn, op, result): + """Examines opcode result. + + If necessary, additional processing on the result is done. + + """ + if isinstance(result, cmdlib.ResultWithJobs): + # Copy basic parameters (e.g. 
priority) + map(compat.partial(_SetBaseOpParams, op, + "Submitted by %s" % op.OP_ID), + itertools.chain(*result.jobs)) + + # Submit jobs + job_submission = submit_fn(result.jobs) + + # Build dictionary + result = result.other + + assert constants.JOB_IDS_KEY not in result, \ + "Key '%s' found in additional return values" % constants.JOB_IDS_KEY + + result[constants.JOB_IDS_KEY] = job_submission + + return result + + +def _FailingSubmitManyJobs(_): + """Implementation of L{OpExecCbBase.SubmitManyJobs} to raise an exception. + + """ + raise errors.ProgrammerError("Opcodes processed without callbacks (e.g." + " queries) can not submit jobs") + + +def _RpcResultsToHooksResults(rpc_results): + """Function to convert RPC results to the format expected by HooksMaster. + + @type rpc_results: dict(node: L{rpc.RpcResult}) + @param rpc_results: RPC results + @rtype: dict(node: (fail_msg, offline, hooks_results)) + @return: RPC results unpacked according to the format expected by + L{mcpu.HooksMaster} + + """ + return dict((node, (rpc_res.fail_msg, rpc_res.offline, rpc_res.payload)) + for (node, rpc_res) in rpc_results.items()) + + class Processor(object): """Object which runs OpCodes""" DISPATCH_TABLE = _ComputeDispatchTable() - def __init__(self, context, ec_id): + def __init__(self, context, ec_id, enable_locks=True): """Constructor for Processor @type context: GanetiContext @@ -189,6 +261,16 @@ class Processor(object): self._cbs = None self.rpc = context.rpc self.hmclass = HooksMaster + self._enable_locks = enable_locks + + def _CheckLocksEnabled(self): + """Checks if locking is enabled. + + @raise errors.ProgrammerError: In case locking is not enabled + + """ + if not self._enable_locks: + raise errors.ProgrammerError("Attempted to use disabled locks") def _AcquireLocks(self, level, names, shared, timeout, priority): """Acquires locks via the Ganeti lock manager. 
@@ -205,6 +287,8 @@ class Processor(object): amount of time """ + self._CheckLocksEnabled() + if self._cbs: self._cbs.CheckCancel() @@ -216,33 +300,14 @@ class Processor(object): return acquired - def _ProcessResult(self, result): - """Examines opcode result. - - If necessary, additional processing on the result is done. - - """ - if isinstance(result, cmdlib.ResultWithJobs): - # Submit jobs - job_submission = self._cbs.SubmitManyJobs(result.jobs) - - # Build dictionary - result = result.other - - assert constants.JOB_IDS_KEY not in result, \ - "Key '%s' found in additional return values" % constants.JOB_IDS_KEY - - result[constants.JOB_IDS_KEY] = job_submission - - return result - def _ExecLU(self, lu): """Logical Unit execution sequence. """ write_count = self.context.cfg.write_count lu.CheckPrereq() - hm = HooksMaster(self.rpc.call_hooks_runner, lu) + + hm = self.BuildHooksManager(lu) h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE) lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results, self.Log, None) @@ -255,8 +320,13 @@ class Processor(object): " the operation") return lu.dry_run_result + if self._cbs: + submit_mj_fn = self._cbs.SubmitManyJobs + else: + submit_mj_fn = _FailingSubmitManyJobs + try: - result = self._ProcessResult(lu.Exec(self.Log)) + result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log)) h_results = hm.RunPhase(constants.HOOKS_PHASE_POST) result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results, self.Log, result) @@ -267,6 +337,9 @@ class Processor(object): return result + def BuildHooksManager(self, lu): + return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu) + def _LockAndExecLU(self, lu, level, calc_timeout, priority): """Execute a Logical Unit, with the needed locks. 
@@ -290,6 +363,8 @@ class Processor(object): " others") elif adding_locks or acquiring_locks: + self._CheckLocksEnabled() + lu.DeclareLocks(level) share = lu.share_locks[level] @@ -360,12 +435,17 @@ class Processor(object): self._cbs = cbs try: - # Acquire the Big Ganeti Lock exclusively if this LU requires it, - # and in a shared fashion otherwise (to prevent concurrent run with - # an exclusive LU. - self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL, - not lu_class.REQ_BGL, calc_timeout(), - priority) + if self._enable_locks: + # Acquire the Big Ganeti Lock exclusively if this LU requires it, + # and in a shared fashion otherwise (to prevent concurrent run with + # an exclusive LU. + self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL, + not lu_class.REQ_BGL, calc_timeout(), + priority) + elif lu_class.REQ_BGL: + raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are" + " disabled" % op.OP_ID) + try: lu = lu_class(self, op, self.context, self.rpc) lu.ExpandNames() @@ -378,7 +458,10 @@ class Processor(object): if self._ec_id: self.context.cfg.DropECReservations(self._ec_id) finally: - self.context.glm.release(locking.LEVEL_CLUSTER) + # Release BGL if owned + if self.context.glm.is_owned(locking.LEVEL_CLUSTER): + assert self._enable_locks + self.context.glm.release(locking.LEVEL_CLUSTER) finally: self._cbs = None @@ -386,8 +469,8 @@ class Processor(object): if not (resultcheck_fn is None or resultcheck_fn(result)): logging.error("Expected opcode result matching %s, got %s", resultcheck_fn, result) - raise errors.OpResultError("Opcode result does not match %s" % - resultcheck_fn) + raise errors.OpResultError("Opcode result does not match %s: %s" % + (resultcheck_fn, utils.Truncate(result, 80))) return result @@ -444,28 +527,53 @@ class Processor(object): class HooksMaster(object): - """Hooks master. 
+ def __init__(self, opcode, hooks_path, nodes, hooks_execution_fn, + hooks_results_adapt_fn, build_env_fn, log_fn, htype=None, + cluster_name=None, master_name=None): + """Base class for hooks masters. + + This class invokes the execution of hooks according to the behaviour + specified by its parameters. + + @type opcode: string + @param opcode: opcode of the operation to which the hooks are tied + @type hooks_path: string + @param hooks_path: prefix of the hooks directories + @type nodes: 2-tuple of lists + @param nodes: 2-tuple of lists containing nodes on which pre-hooks must be + run and nodes on which post-hooks must be run + @type hooks_execution_fn: function that accepts the following parameters: + (node_list, hooks_path, phase, environment) + @param hooks_execution_fn: function that will execute the hooks + @type hooks_results_adapt_fn: function + @param hooks_results_adapt_fn: function that will adapt the return value of + hooks_execution_fn to the format expected by RunPhase; can be + None, indicating that no conversion is necessary + @type build_env_fn: function that returns a dictionary having strings as + keys + @param build_env_fn: function that builds the environment for the hooks + @type log_fn: function that accepts a string + @param log_fn: logging function + @type htype: string or None + @param htype: None or one of L{constants.HTYPE_CLUSTER}, + L{constants.HTYPE_NODE}, L{constants.HTYPE_INSTANCE} + @type cluster_name: string + @param cluster_name: name of the cluster + @type master_name: string + @param master_name: name of the master - This class distributes the run commands to the nodes based on the - specific LU class. - - In order to remove the direct dependency on the rpc module, the - constructor needs a function which actually does the remote - call. This will usually be rpc.call_hooks_runner, but any function - which behaves the same works. 
+ """ + self.opcode = opcode + self.hooks_path = hooks_path + self.hooks_execution_fn = hooks_execution_fn + self.hooks_results_adapt_fn = hooks_results_adapt_fn + self.build_env_fn = build_env_fn + self.log_fn = log_fn + self.htype = htype + self.cluster_name = cluster_name + self.master_name = master_name - """ - def __init__(self, callfn, lu): - self.callfn = callfn - self.lu = lu - self.op = lu.op self.pre_env = self._BuildEnv(constants.HOOKS_PHASE_PRE) - - if self.lu.HPATH is None: - nodes = (None, None) - else: - nodes = map(frozenset, self.lu.BuildHooksNodes()) - (self.pre_nodes, self.post_nodes) = nodes def _BuildEnv(self, phase): @@ -484,12 +592,13 @@ class HooksMaster(object): env = {} - if self.lu.HPATH is not None: - lu_env = self.lu.BuildHooksEnv() - if lu_env: - assert not compat.any(key.upper().startswith(prefix) for key in lu_env) + if self.hooks_path is not None: + phase_env = self.build_env_fn() + if phase_env: + assert not compat.any(key.upper().startswith(prefix) + for key in phase_env) env.update(("%s%s" % (prefix, key), value) - for (key, value) in lu_env.items()) + for (key, value) in phase_env.items()) if phase == constants.HOOKS_PHASE_PRE: assert compat.all((key.startswith("GANETI_") and @@ -512,26 +621,26 @@ class HooksMaster(object): def _RunWrapper(self, node_list, hpath, phase, phase_env): """Simple wrapper over self.callfn. - This method fixes the environment before doing the rpc call. + This method fixes the environment before executing the hooks. 
""" - cfg = self.lu.cfg - env = { - "PATH": "/sbin:/bin:/usr/sbin:/usr/bin", + "PATH": constants.HOOKS_PATH, "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION, - "GANETI_OP_CODE": self.op.OP_ID, - "GANETI_DATA_DIR": constants.DATA_DIR, + "GANETI_OP_CODE": self.opcode, + "GANETI_DATA_DIR": pathutils.DATA_DIR, "GANETI_HOOKS_PHASE": phase, "GANETI_HOOKS_PATH": hpath, } - if self.lu.HTYPE: - env["GANETI_OBJECT_TYPE"] = self.lu.HTYPE + if self.htype: + env["GANETI_OBJECT_TYPE"] = self.htype - if cfg is not None: - env["GANETI_CLUSTER"] = cfg.GetClusterName() - env["GANETI_MASTER"] = cfg.GetMasterNode() + if self.cluster_name is not None: + env["GANETI_CLUSTER"] = self.cluster_name + + if self.master_name is not None: + env["GANETI_MASTER"] = self.master_name if phase_env: env = utils.algo.JoinDisjointDicts(env, phase_env) @@ -542,12 +651,15 @@ class HooksMaster(object): assert compat.all(key == "PATH" or key.startswith("GANETI_") for key in env) - return self.callfn(node_list, hpath, phase, env) + return self.hooks_execution_fn(node_list, hpath, phase, env) def RunPhase(self, phase, nodes=None): """Run all the scripts for a phase. This is the main function of the HookMaster. + It executes self.hooks_execution_fn, and after running + self.hooks_results_adapt_fn on its results it expects them to be in the form + {node_name: (fail_msg, offline, [(script, result, output), ...])}. 
@param phase: one of L{constants.HOOKS_PHASE_POST} or L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase @@ -574,36 +686,37 @@ class HooksMaster(object): # even attempt to run, or this LU doesn't do hooks at all return - results = self._RunWrapper(nodes, self.lu.HPATH, phase, env) + results = self._RunWrapper(nodes, self.hooks_path, phase, env) if not results: msg = "Communication Failure" if phase == constants.HOOKS_PHASE_PRE: raise errors.HooksFailure(msg) else: - self.lu.LogWarning(msg) + self.log_fn(msg) return results + converted_res = results + if self.hooks_results_adapt_fn: + converted_res = self.hooks_results_adapt_fn(results) + errs = [] - for node_name in results: - res = results[node_name] - if res.offline: + for node_name, (fail_msg, offline, hooks_results) in converted_res.items(): + if offline: continue - msg = res.fail_msg - if msg: - self.lu.LogWarning("Communication failure to node %s: %s", - node_name, msg) + if fail_msg: + self.log_fn("Communication failure to node %s: %s", node_name, fail_msg) continue - for script, hkr, output in res.payload: + for script, hkr, output in hooks_results: if hkr == constants.HKR_FAIL: if phase == constants.HOOKS_PHASE_PRE: errs.append((node_name, script, output)) else: if not output: output = "(no output)" - self.lu.LogWarning("On %s script %s failed, output: %s" % - (node_name, script, output)) + self.log_fn("On %s script %s failed, output: %s" % + (node_name, script, output)) if errs and phase == constants.HOOKS_PHASE_PRE: raise errors.HooksAbort(errs) @@ -619,5 +732,21 @@ class HooksMaster(object): """ phase = constants.HOOKS_PHASE_POST hpath = constants.HOOKS_NAME_CFGUPDATE - nodes = [self.lu.cfg.GetMasterNode()] + nodes = [self.master_name] self._RunWrapper(nodes, hpath, phase, self.pre_env) + + @staticmethod + def BuildFromLu(hooks_execution_fn, lu): + if lu.HPATH is None: + nodes = (None, None) + else: + nodes = map(frozenset, lu.BuildHooksNodes()) + + master_name = cluster_name = None + if lu.cfg: + 
master_name = lu.cfg.GetMasterNode() + cluster_name = lu.cfg.GetClusterName() + + return HooksMaster(lu.op.OP_ID, lu.HPATH, nodes, hooks_execution_fn, + _RpcResultsToHooksResults, lu.BuildHooksEnv, + lu.LogWarning, lu.HTYPE, cluster_name, master_name)