X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/aa7b59acb860751297223036fbddbc6277882a45..2a6f6ef7ce23bdf1b55627785320d8086dcb77aa:/lib/mcpu.py diff --git a/lib/mcpu.py b/lib/mcpu.py index ba07b83..4498747 100644 --- a/lib/mcpu.py +++ b/lib/mcpu.py @@ -1,7 +1,7 @@ # # -# Copyright (C) 2006, 2007, 2011 Google Inc. +# Copyright (C) 2006, 2007, 2011, 2012 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -28,14 +28,17 @@ are two kinds of classes defined: """ +import sys import logging import random import time +import itertools +import traceback from ganeti import opcodes from ganeti import constants from ganeti import errors -from ganeti import rpc +from ganeti import hooksmaster from ganeti import cmdlib from ganeti import locking from ganeti import utils @@ -45,6 +48,19 @@ from ganeti import compat _OP_PREFIX = "Op" _LU_PREFIX = "LU" +#: LU classes which don't need to acquire the node allocation lock +#: (L{locking.NAL}) when they acquire all node or node resource locks +_NODE_ALLOC_WHITELIST = frozenset([]) + +#: LU classes which don't need to acquire the node allocation lock +#: (L{locking.NAL}) in the same mode (shared/exclusive) as the node +#: or node resource locks +_NODE_ALLOC_MODE_WHITELIST = compat.UniqueFrozenset([ + cmdlib.LUBackupExport, + cmdlib.LUBackupRemove, + cmdlib.LUOobCommand, + ]) + class LockAcquireTimeout(Exception): """Exception to report timeouts on acquiring locks. @@ -139,10 +155,11 @@ class OpExecCbBase: # pylint: disable=W0232 """ - def CheckCancel(self): - """Check whether job has been cancelled. + def CurrentPriority(self): # pylint: disable=R0201 + """Returns current priority or C{None}. """ + return None def SubmitManyJobs(self, jobs): """Submits jobs for processing. @@ -172,11 +189,105 @@ def _ComputeDispatchTable(): if op.WITH_LU) +def _SetBaseOpParams(src, defcomment, dst): + """Copies basic opcode parameters. + + @type src: L{opcodes.OpCode} + @param src: Source opcode + @type defcomment: string + @param defcomment: Comment to specify if not already given + @type dst: L{opcodes.OpCode} + @param dst: Destination opcode + + """ + if hasattr(src, "debug_level"): + dst.debug_level = src.debug_level + + if (getattr(dst, "priority", None) is None and + hasattr(src, "priority")): + dst.priority = src.priority + + if not getattr(dst, opcodes.COMMENT_ATTR, None): + dst.comment = defcomment + + +def _ProcessResult(submit_fn, op, result): + """Examines opcode result. + + If necessary, additional processing on the result is done. + + """ + if isinstance(result, cmdlib.ResultWithJobs): + # Copy basic parameters (e.g. priority) + map(compat.partial(_SetBaseOpParams, op, + "Submitted by %s" % op.OP_ID), + itertools.chain(*result.jobs)) + + # Submit jobs + job_submission = submit_fn(result.jobs) + + # Build dictionary + result = result.other + + assert constants.JOB_IDS_KEY not in result, \ + "Key '%s' found in additional return values" % constants.JOB_IDS_KEY + + result[constants.JOB_IDS_KEY] = job_submission + + return result + + +def _FailingSubmitManyJobs(_): + """Implementation of L{OpExecCbBase.SubmitManyJobs} to raise an exception. + + """ + raise errors.ProgrammerError("Opcodes processed without callbacks (e.g." + " queries) can not submit jobs") + + +def _VerifyLocks(lu, glm, _mode_whitelist=_NODE_ALLOC_MODE_WHITELIST, + _nal_whitelist=_NODE_ALLOC_WHITELIST): + """Performs consistency checks on locks acquired by a logical unit. 
+ + @type lu: L{cmdlib.LogicalUnit} + @param lu: Logical unit instance + @type glm: L{locking.GanetiLockManager} + @param glm: Lock manager + + """ + if not __debug__: + return + + have_nal = glm.check_owned(locking.LEVEL_NODE_ALLOC, locking.NAL) + + for level in [locking.LEVEL_NODE, locking.LEVEL_NODE_RES]: + # TODO: Verify using actual lock mode, not using LU variables + if level in lu.needed_locks: + share_node_alloc = lu.share_locks[locking.LEVEL_NODE_ALLOC] + share_level = lu.share_locks[level] + + if lu.__class__ in _mode_whitelist: + assert share_node_alloc != share_level, \ + "LU is whitelisted to use different modes for node allocation lock" + else: + assert bool(share_node_alloc) == bool(share_level), \ + ("Node allocation lock must be acquired using the same mode as nodes" + " and node resources") + + if lu.__class__ in _nal_whitelist: + assert not have_nal, \ + "LU is whitelisted for not acquiring the node allocation lock" + elif lu.needed_locks[level] == locking.ALL_SET or glm.owning_all(level): + assert have_nal, \ + ("Node allocation lock must be used if an LU acquires all nodes" + " or node resources") + + class Processor(object): """Object which runs OpCodes""" DISPATCH_TABLE = _ComputeDispatchTable() - def __init__(self, context, ec_id): + def __init__(self, context, ec_id, enable_locks=True): """Constructor for Processor @type context: GanetiContext @@ -188,10 +299,20 @@ class Processor(object): self.context = context self._ec_id = ec_id self._cbs = None - self.rpc = rpc.RpcRunner(context.cfg) - self.hmclass = HooksMaster + self.rpc = context.rpc + self.hmclass = hooksmaster.HooksMaster + self._enable_locks = enable_locks + + def _CheckLocksEnabled(self): + """Checks if locking is enabled. + + @raise errors.ProgrammerError: In case locking is not enabled - def _AcquireLocks(self, level, names, shared, timeout, priority): + """ + if not self._enable_locks: + raise errors.ProgrammerError("Attempted to use disabled locks") + + def _AcquireLocks(self, level, names, shared, opportunistic, timeout): """Acquires locks via the Ganeti lock manager. @type level: int @@ -200,50 +321,38 @@ class Processor(object): @param names: Lock names @type shared: bool @param shared: Whether the locks should be acquired in shared mode + @type opportunistic: bool + @param opportunistic: Whether to acquire opportunistically @type timeout: None or float @param timeout: Timeout for acquiring the locks @raise LockAcquireTimeout: In case locks couldn't be acquired in specified amount of time """ + self._CheckLocksEnabled() + if self._cbs: - self._cbs.CheckCancel() + priority = self._cbs.CurrentPriority() + else: + priority = None acquired = self.context.glm.acquire(level, names, shared=shared, - timeout=timeout, priority=priority) + timeout=timeout, priority=priority, + opportunistic=opportunistic) if acquired is None: raise LockAcquireTimeout() return acquired - def _ProcessResult(self, result): - """Examines opcode result. - - If necessary, additional processing on the result is done. - - """ - if isinstance(result, cmdlib.ResultWithJobs): - # Submit jobs - job_submission = self._cbs.SubmitManyJobs(result.jobs) - - # Build dictionary - result = result.other - - assert constants.JOB_IDS_KEY not in result, \ - "Key '%s' found in additional return values" % constants.JOB_IDS_KEY - - result[constants.JOB_IDS_KEY] = job_submission - - return result - def _ExecLU(self, lu): """Logical Unit execution sequence. 
""" write_count = self.context.cfg.write_count lu.CheckPrereq() - hm = HooksMaster(self.rpc.call_hooks_runner, lu) + + hm = self.BuildHooksManager(lu) h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE) lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results, self.Log, None) @@ -256,8 +365,13 @@ class Processor(object): " the operation") return lu.dry_run_result + if self._cbs: + submit_mj_fn = self._cbs.SubmitManyJobs + else: + submit_mj_fn = _FailingSubmitManyJobs + try: - result = self._ProcessResult(lu.Exec(self.Log)) + result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log)) h_results = hm.RunPhase(constants.HOOKS_PHASE_POST) result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results, self.Log, result) @@ -268,7 +382,10 @@ class Processor(object): return result - def _LockAndExecLU(self, lu, level, calc_timeout, priority): + def BuildHooksManager(self, lu): + return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu) + + def _LockAndExecLU(self, lu, level, calc_timeout): """Execute a Logical Unit, with the needed locks. This is a recursive function that starts locking the given level, and @@ -276,13 +393,29 @@ class Processor(object): given LU and its opcodes. """ + glm = self.context.glm adding_locks = level in lu.add_locks acquiring_locks = level in lu.needed_locks + if level not in locking.LEVELS: + _VerifyLocks(lu, glm) + if self._cbs: self._cbs.NotifyStart() - result = self._ExecLU(lu) + try: + result = self._ExecLU(lu) + except AssertionError, err: + # this is a bit ugly, as we don't know from which phase + # (prereq, exec) this comes; but it's better than an exception + # with no information + (_, _, tb) = sys.exc_info() + err_info = traceback.format_tb(tb) + del tb + logging.exception("Detected AssertionError") + raise errors.OpExecError("Internal assertion error: please report" + " this as a bug.\nError message: '%s';" + " location:\n%s" % (str(err), err_info[-1])) elif adding_locks and acquiring_locks: # We could both acquire and add locks at the same level, but for now we @@ -291,8 +424,11 @@ class Processor(object): " others") elif adding_locks or acquiring_locks: + self._CheckLocksEnabled() + lu.DeclareLocks(level) share = lu.share_locks[level] + opportunistic = lu.opportunistic_locks[level] try: assert adding_locks ^ acquiring_locks, \ @@ -302,36 +438,38 @@ class Processor(object): # Acquiring locks needed_locks = lu.needed_locks[level] - self._AcquireLocks(level, needed_locks, share, - calc_timeout(), priority) + self._AcquireLocks(level, needed_locks, share, opportunistic, + calc_timeout()) else: # Adding locks add_locks = lu.add_locks[level] lu.remove_locks[level] = add_locks try: - self.context.glm.add(level, add_locks, acquired=1, shared=share) + glm.add(level, add_locks, acquired=1, shared=share) except errors.LockError: + logging.exception("Detected lock error in level %s for locks" + " %s, shared=%s", level, add_locks, share) raise errors.OpPrereqError( - "Couldn't add locks (%s), probably because of a race condition" - " with another job, who added them first" % add_locks, - errors.ECODE_FAULT) + "Couldn't add locks (%s), most likely because of another" + " job who added them first" % add_locks, + errors.ECODE_NOTUNIQUE) try: - result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority) + result = self._LockAndExecLU(lu, level + 1, calc_timeout) finally: if level in lu.remove_locks: - self.context.glm.remove(level, lu.remove_locks[level]) + glm.remove(level, lu.remove_locks[level]) finally: - if self.context.glm.is_owned(level): - 
self.context.glm.release(level) + if glm.is_owned(level): + glm.release(level) else: - result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority) + result = self._LockAndExecLU(lu, level + 1, calc_timeout) return result - def ExecOpCode(self, op, cbs, timeout=None, priority=None): + def ExecOpCode(self, op, cbs, timeout=None): """Execute an opcode. @type op: an OpCode instance @@ -340,8 +478,6 @@ class Processor(object): @param cbs: Runtime callbacks @type timeout: float or None @param timeout: Maximum time to acquire all locks, None for no timeout - @type priority: number or None - @param priority: Priority for acquiring lock(s) @raise LockAcquireTimeout: In case locks couldn't be acquired in specified amount of time @@ -361,25 +497,32 @@ class Processor(object): self._cbs = cbs try: - # Acquire the Big Ganeti Lock exclusively if this LU requires it, - # and in a shared fashion otherwise (to prevent concurrent run with - # an exclusive LU. - self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL, - not lu_class.REQ_BGL, calc_timeout(), - priority) + if self._enable_locks: + # Acquire the Big Ganeti Lock exclusively if this LU requires it, + # and in a shared fashion otherwise (to prevent concurrent run with + # an exclusive LU. + self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL, + not lu_class.REQ_BGL, False, calc_timeout()) + elif lu_class.REQ_BGL: + raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are" + " disabled" % op.OP_ID) + try: lu = lu_class(self, op, self.context, self.rpc) lu.ExpandNames() assert lu.needed_locks is not None, "needed_locks not set by LU" try: - result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout, - priority) + result = self._LockAndExecLU(lu, locking.LEVEL_CLUSTER + 1, + calc_timeout) finally: if self._ec_id: self.context.cfg.DropECReservations(self._ec_id) finally: - self.context.glm.release(locking.LEVEL_CLUSTER) + # Release BGL if owned + if self.context.glm.is_owned(locking.LEVEL_CLUSTER): + assert self._enable_locks + self.context.glm.release(locking.LEVEL_CLUSTER) finally: self._cbs = None @@ -387,8 +530,12 @@ class Processor(object): if not (resultcheck_fn is None or resultcheck_fn(result)): logging.error("Expected opcode result matching %s, got %s", resultcheck_fn, result) - raise errors.OpResultError("Opcode result does not match %s" % - resultcheck_fn) + if not getattr(op, "dry_run", False): + # FIXME: LUs should still behave in dry_run mode, or + # alternately we should have OP_DRYRUN_RESULT; in the + # meantime, we simply skip the OP_RESULT check in dry-run mode + raise errors.OpResultError("Opcode result does not match %s: %s" % + (resultcheck_fn, utils.Truncate(result, 80))) return result @@ -442,184 +589,3 @@ class Processor(object): raise errors.ProgrammerError("Tried to use execution context id when" " not set") return self._ec_id - - -class HooksMaster(object): - """Hooks master. - - This class distributes the run commands to the nodes based on the - specific LU class. - - In order to remove the direct dependency on the rpc module, the - constructor needs a function which actually does the remote - call. This will usually be rpc.call_hooks_runner, but any function - which behaves the same works. 
- - """ - def __init__(self, callfn, lu): - self.callfn = callfn - self.lu = lu - self.op = lu.op - self.pre_env = self._BuildEnv(constants.HOOKS_PHASE_PRE) - - if self.lu.HPATH is None: - nodes = (None, None) - else: - nodes = map(frozenset, self.lu.BuildHooksNodes()) - - (self.pre_nodes, self.post_nodes) = nodes - - def _BuildEnv(self, phase): - """Compute the environment and the target nodes. - - Based on the opcode and the current node list, this builds the - environment for the hooks and the target node list for the run. - - """ - if phase == constants.HOOKS_PHASE_PRE: - prefix = "GANETI_" - elif phase == constants.HOOKS_PHASE_POST: - prefix = "GANETI_POST_" - else: - raise AssertionError("Unknown phase '%s'" % phase) - - env = {} - - if self.lu.HPATH is not None: - lu_env = self.lu.BuildHooksEnv() - if lu_env: - assert not compat.any(key.upper().startswith(prefix) for key in lu_env) - env.update(("%s%s" % (prefix, key), value) - for (key, value) in lu_env.items()) - - if phase == constants.HOOKS_PHASE_PRE: - assert compat.all((key.startswith("GANETI_") and - not key.startswith("GANETI_POST_")) - for key in env) - - elif phase == constants.HOOKS_PHASE_POST: - assert compat.all(key.startswith("GANETI_POST_") for key in env) - assert isinstance(self.pre_env, dict) - - # Merge with pre-phase environment - assert not compat.any(key.startswith("GANETI_POST_") - for key in self.pre_env) - env.update(self.pre_env) - else: - raise AssertionError("Unknown phase '%s'" % phase) - - return env - - def _RunWrapper(self, node_list, hpath, phase, phase_env): - """Simple wrapper over self.callfn. - - This method fixes the environment before doing the rpc call. - - """ - cfg = self.lu.cfg - - env = { - "PATH": constants.HOOKS_PATH, - "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION, - "GANETI_OP_CODE": self.op.OP_ID, - "GANETI_DATA_DIR": constants.DATA_DIR, - "GANETI_HOOKS_PHASE": phase, - "GANETI_HOOKS_PATH": hpath, - } - - if self.lu.HTYPE: - env["GANETI_OBJECT_TYPE"] = self.lu.HTYPE - - if cfg is not None: - env["GANETI_CLUSTER"] = cfg.GetClusterName() - env["GANETI_MASTER"] = cfg.GetMasterNode() - - if phase_env: - assert not (set(env) & set(phase_env)), "Environment variables conflict" - env.update(phase_env) - - # Convert everything to strings - env = dict([(str(key), str(val)) for key, val in env.iteritems()]) - - assert compat.all(key == "PATH" or key.startswith("GANETI_") - for key in env) - - return self.callfn(node_list, hpath, phase, env) - - def RunPhase(self, phase, nodes=None): - """Run all the scripts for a phase. - - This is the main function of the HookMaster. 
- - @param phase: one of L{constants.HOOKS_PHASE_POST} or - L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase - @param nodes: overrides the predefined list of nodes for the given phase - @return: the processed results of the hooks multi-node rpc call - @raise errors.HooksFailure: on communication failure to the nodes - @raise errors.HooksAbort: on failure of one of the hooks - - """ - if phase == constants.HOOKS_PHASE_PRE: - if nodes is None: - nodes = self.pre_nodes - env = self.pre_env - elif phase == constants.HOOKS_PHASE_POST: - if nodes is None: - nodes = self.post_nodes - env = self._BuildEnv(phase) - else: - raise AssertionError("Unknown phase '%s'" % phase) - - if not nodes: - # empty node list, we should not attempt to run this as either - # we're in the cluster init phase and the rpc client part can't - # even attempt to run, or this LU doesn't do hooks at all - return - - results = self._RunWrapper(nodes, self.lu.HPATH, phase, env) - if not results: - msg = "Communication Failure" - if phase == constants.HOOKS_PHASE_PRE: - raise errors.HooksFailure(msg) - else: - self.lu.LogWarning(msg) - return results - - errs = [] - for node_name in results: - res = results[node_name] - if res.offline: - continue - - msg = res.fail_msg - if msg: - self.lu.LogWarning("Communication failure to node %s: %s", - node_name, msg) - continue - - for script, hkr, output in res.payload: - if hkr == constants.HKR_FAIL: - if phase == constants.HOOKS_PHASE_PRE: - errs.append((node_name, script, output)) - else: - if not output: - output = "(no output)" - self.lu.LogWarning("On %s script %s failed, output: %s" % - (node_name, script, output)) - - if errs and phase == constants.HOOKS_PHASE_PRE: - raise errors.HooksAbort(errs) - - return results - - def RunConfigUpdate(self): - """Run the special configuration update hook - - This is a special hook that runs only on the master after each - top-level LI if the configuration has been updated. - - """ - phase = constants.HOOKS_PHASE_POST - hpath = constants.HOOKS_NAME_CFGUPDATE - nodes = [self.lu.cfg.GetMasterNode()] - self._RunWrapper(nodes, hpath, phase, self.pre_env)
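
The most visible behavioural change in this patch is how opcode results that carry follow-up jobs are handled: _ProcessResult is now a module-level helper that receives the submit callback explicitly, stamps every sub-opcode with the parent opcode's priority and a default comment via _SetBaseOpParams, submits the jobs, and merges the returned job IDs into the result dictionary. The standalone sketch below shows that pattern in isolation; it is illustrative only, and FakeOp, FakeResultWithJobs, submit_many_jobs and the "jobs" key are stand-ins for opcodes.OpCode, cmdlib.ResultWithJobs, OpExecCbBase.SubmitManyJobs and constants.JOB_IDS_KEY, not the real Ganeti APIs.

# Standalone illustration of the result-processing pattern introduced above.
# All names here are hypothetical stand-ins, not Ganeti classes or constants.
import itertools


class FakeOp(object):
  """Minimal stand-in for an opcode carrying priority and comment."""
  def __init__(self, priority=None, comment=None):
    self.priority = priority
    self.comment = comment


class FakeResultWithJobs(object):
  """Minimal stand-in for cmdlib.ResultWithJobs."""
  def __init__(self, jobs, **other):
    self.jobs = jobs    # list of job definitions, each a list of opcodes
    self.other = other  # additional return values


def submit_many_jobs(jobs):
  # Stand-in for the job queue callback: pretend every job got a numeric ID.
  return list(range(len(jobs)))


def process_result(submit_fn, parent_op, result):
  """Mirrors the _SetBaseOpParams/_ProcessResult flow on toy objects."""
  if not isinstance(result, FakeResultWithJobs):
    return result
  # Propagate basic parameters from the parent opcode to every sub-opcode.
  for op in itertools.chain(*result.jobs):
    if op.priority is None:
      op.priority = parent_op.priority
    if not op.comment:
      op.comment = "Submitted by parent opcode"
  # Submit the jobs and merge their IDs into the returned dictionary.
  merged = dict(result.other)
  merged["jobs"] = submit_fn(result.jobs)
  return merged


parent = FakeOp(priority=10, comment="manual run")
res = FakeResultWithJobs([[FakeOp()], [FakeOp(), FakeOp()]], instances=3)
print(process_result(submit_many_jobs, parent, res))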