from ganeti import opcodes
from ganeti import constants
from ganeti import errors
+from ganeti import hooksmaster
from ganeti import cmdlib
from ganeti import locking
from ganeti import utils
from ganeti import compat
-from ganeti import pathutils
_OP_PREFIX = "Op"
_LU_PREFIX = "LU"
+#: LU classes which don't need to acquire the node allocation lock
+#: (L{locking.NAL}) when they acquire all node or node resource locks
+#  No LUs are currently exempt; checked by L{_VerifyLocks}.
+_NODE_ALLOC_WHITELIST = frozenset([])
+
+#: LU classes which don't need to acquire the node allocation lock
+#: (L{locking.NAL}) in the same mode (shared/exclusive) as the node
+#: or node resource locks
+_NODE_ALLOC_MODE_WHITELIST = compat.UniqueFrozenset([
+ cmdlib.LUBackupExport,
+ cmdlib.LUBackupRemove,
+ cmdlib.LUOobCommand,
+ ])
+
class LockAcquireTimeout(Exception):
"""Exception to report timeouts on acquiring locks.
" queries) can not submit jobs")
-def _RpcResultsToHooksResults(rpc_results):
- """Function to convert RPC results to the format expected by HooksMaster.
+def _VerifyLocks(lu, glm, _mode_whitelist=_NODE_ALLOC_MODE_WHITELIST,
+ _nal_whitelist=_NODE_ALLOC_WHITELIST):
+ """Performs consistency checks on locks acquired by a logical unit.
- @type rpc_results: dict(node: L{rpc.RpcResult})
- @param rpc_results: RPC results
- @rtype: dict(node: (fail_msg, offline, hooks_results))
- @return: RPC results unpacked according to the format expected by
- L({mcpu.HooksMaster}
+ @type lu: L{cmdlib.LogicalUnit}
+ @param lu: Logical unit instance
+ @type glm: L{locking.GanetiLockManager}
+ @param glm: Lock manager
+
+ The whitelist parameters exist only for unit testing; production
+ callers use the module-level defaults.
 """
- return dict((node, (rpc_res.fail_msg, rpc_res.offline, rpc_res.payload))
- for (node, rpc_res) in rpc_results.items())
+ # Everything below is assertion-only; under "python -O" (__debug__
+ # false) the asserts would be stripped, so skip the lock queries too
+ if not __debug__:
+ return
+
+ # Whether this LU currently owns the node allocation lock
+ have_nal = glm.check_owned(locking.LEVEL_NODE_ALLOC, locking.NAL)
+
+ for level in [locking.LEVEL_NODE, locking.LEVEL_NODE_RES]:
+ # TODO: Verify using actual lock mode, not using LU variables
+ if level in lu.needed_locks:
+ share_node_alloc = lu.share_locks[locking.LEVEL_NODE_ALLOC]
+ share_level = lu.share_locks[level]
+
+ if lu.__class__ in _mode_whitelist:
+ assert share_node_alloc != share_level, \
+ "LU is whitelisted to use different modes for node allocation lock"
+ else:
+ assert bool(share_node_alloc) == bool(share_level), \
+ ("Node allocation lock must be acquired using the same mode as nodes"
+ " and node resources")
+
+ if lu.__class__ in _nal_whitelist:
+ assert not have_nal, \
+ "LU is whitelisted for not acquiring the node allocation lock"
+ elif lu.needed_locks[level] == locking.ALL_SET or glm.owning_all(level):
+ assert have_nal, \
+ ("Node allocation lock must be used if an LU acquires all nodes"
+ " or node resources")
class Processor(object):
self._ec_id = ec_id
self._cbs = None
self.rpc = context.rpc
- self.hmclass = HooksMaster
+ self.hmclass = hooksmaster.HooksMaster
self._enable_locks = enable_locks
def _CheckLocksEnabled(self):
if not self._enable_locks:
raise errors.ProgrammerError("Attempted to use disabled locks")
- def _AcquireLocks(self, level, names, shared, timeout):
+ def _AcquireLocks(self, level, names, shared, opportunistic, timeout):
"""Acquires locks via the Ganeti lock manager.
@type level: int
@param names: Lock names
@type shared: bool
@param shared: Whether the locks should be acquired in shared mode
+ @type opportunistic: bool
+ @param opportunistic: Whether to acquire opportunistically
@type timeout: None or float
@param timeout: Timeout for acquiring the locks
@raise LockAcquireTimeout: In case locks couldn't be acquired in specified
priority = None
acquired = self.context.glm.acquire(level, names, shared=shared,
- timeout=timeout, priority=priority)
+ timeout=timeout, priority=priority,
+ opportunistic=opportunistic)
if acquired is None:
raise LockAcquireTimeout()
given LU and its opcodes.
"""
+ glm = self.context.glm
adding_locks = level in lu.add_locks
acquiring_locks = level in lu.needed_locks
+
if level not in locking.LEVELS:
+ _VerifyLocks(lu, glm)
+
if self._cbs:
self._cbs.NotifyStart()
lu.DeclareLocks(level)
share = lu.share_locks[level]
+ opportunistic = lu.opportunistic_locks[level]
try:
assert adding_locks ^ acquiring_locks, \
# Acquiring locks
needed_locks = lu.needed_locks[level]
- self._AcquireLocks(level, needed_locks, share,
+ self._AcquireLocks(level, needed_locks, share, opportunistic,
calc_timeout())
else:
# Adding locks
lu.remove_locks[level] = add_locks
try:
- self.context.glm.add(level, add_locks, acquired=1, shared=share)
+ glm.add(level, add_locks, acquired=1, shared=share)
except errors.LockError:
logging.exception("Detected lock error in level %s for locks"
" %s, shared=%s", level, add_locks, share)
result = self._LockAndExecLU(lu, level + 1, calc_timeout)
finally:
if level in lu.remove_locks:
- self.context.glm.remove(level, lu.remove_locks[level])
+ glm.remove(level, lu.remove_locks[level])
finally:
- if self.context.glm.is_owned(level):
- self.context.glm.release(level)
+ if glm.is_owned(level):
+ glm.release(level)
else:
result = self._LockAndExecLU(lu, level + 1, calc_timeout)
# and in a shared fashion otherwise (to prevent concurrent run with
# an exclusive LU.
self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
- not lu_class.REQ_BGL, calc_timeout())
+ not lu_class.REQ_BGL, False, calc_timeout())
elif lu_class.REQ_BGL:
raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
" disabled" % op.OP_ID)
if not (resultcheck_fn is None or resultcheck_fn(result)):
logging.error("Expected opcode result matching %s, got %s",
resultcheck_fn, result)
- raise errors.OpResultError("Opcode result does not match %s: %s" %
- (resultcheck_fn, utils.Truncate(result, 80)))
+ if not getattr(op, "dry_run", False):
+ # FIXME: LUs should still behave in dry_run mode, or
+ # alternately we should have OP_DRYRUN_RESULT; in the
+ # meantime, we simply skip the OP_RESULT check in dry-run mode
+ raise errors.OpResultError("Opcode result does not match %s: %s" %
+ (resultcheck_fn, utils.Truncate(result, 80)))
return result
raise errors.ProgrammerError("Tried to use execution context id when"
" not set")
return self._ec_id
-
-
-class HooksMaster(object):
- def __init__(self, opcode, hooks_path, nodes, hooks_execution_fn,
- hooks_results_adapt_fn, build_env_fn, log_fn, htype=None,
- cluster_name=None, master_name=None):
- """Base class for hooks masters.
-
- This class invokes the execution of hooks according to the behaviour
- specified by its parameters.
-
- @type opcode: string
- @param opcode: opcode of the operation to which the hooks are tied
- @type hooks_path: string
- @param hooks_path: prefix of the hooks directories
- @type nodes: 2-tuple of lists
- @param nodes: 2-tuple of lists containing nodes on which pre-hooks must be
- run and nodes on which post-hooks must be run
- @type hooks_execution_fn: function that accepts the following parameters:
- (node_list, hooks_path, phase, environment)
- @param hooks_execution_fn: function that will execute the hooks; can be
- None, indicating that no conversion is necessary.
- @type hooks_results_adapt_fn: function
- @param hooks_results_adapt_fn: function that will adapt the return value of
- hooks_execution_fn to the format expected by RunPhase
- @type build_env_fn: function that returns a dictionary having strings as
- keys
- @param build_env_fn: function that builds the environment for the hooks
- @type log_fn: function that accepts a string
- @param log_fn: logging function
- @type htype: string or None
- @param htype: None or one of L{constants.HTYPE_CLUSTER},
- L{constants.HTYPE_NODE}, L{constants.HTYPE_INSTANCE}
- @type cluster_name: string
- @param cluster_name: name of the cluster
- @type master_name: string
- @param master_name: name of the master
-
- """
- self.opcode = opcode
- self.hooks_path = hooks_path
- self.hooks_execution_fn = hooks_execution_fn
- self.hooks_results_adapt_fn = hooks_results_adapt_fn
- self.build_env_fn = build_env_fn
- self.log_fn = log_fn
- self.htype = htype
- self.cluster_name = cluster_name
- self.master_name = master_name
-
- self.pre_env = self._BuildEnv(constants.HOOKS_PHASE_PRE)
- (self.pre_nodes, self.post_nodes) = nodes
-
- def _BuildEnv(self, phase):
- """Compute the environment and the target nodes.
-
- Based on the opcode and the current node list, this builds the
- environment for the hooks and the target node list for the run.
-
- """
- if phase == constants.HOOKS_PHASE_PRE:
- prefix = "GANETI_"
- elif phase == constants.HOOKS_PHASE_POST:
- prefix = "GANETI_POST_"
- else:
- raise AssertionError("Unknown phase '%s'" % phase)
-
- env = {}
-
- if self.hooks_path is not None:
- phase_env = self.build_env_fn()
- if phase_env:
- assert not compat.any(key.upper().startswith(prefix)
- for key in phase_env)
- env.update(("%s%s" % (prefix, key), value)
- for (key, value) in phase_env.items())
-
- if phase == constants.HOOKS_PHASE_PRE:
- assert compat.all((key.startswith("GANETI_") and
- not key.startswith("GANETI_POST_"))
- for key in env)
-
- elif phase == constants.HOOKS_PHASE_POST:
- assert compat.all(key.startswith("GANETI_POST_") for key in env)
- assert isinstance(self.pre_env, dict)
-
- # Merge with pre-phase environment
- assert not compat.any(key.startswith("GANETI_POST_")
- for key in self.pre_env)
- env.update(self.pre_env)
- else:
- raise AssertionError("Unknown phase '%s'" % phase)
-
- return env
-
- def _RunWrapper(self, node_list, hpath, phase, phase_env):
- """Simple wrapper over self.callfn.
-
- This method fixes the environment before executing the hooks.
-
- """
- env = {
- "PATH": constants.HOOKS_PATH,
- "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
- "GANETI_OP_CODE": self.opcode,
- "GANETI_DATA_DIR": pathutils.DATA_DIR,
- "GANETI_HOOKS_PHASE": phase,
- "GANETI_HOOKS_PATH": hpath,
- }
-
- if self.htype:
- env["GANETI_OBJECT_TYPE"] = self.htype
-
- if self.cluster_name is not None:
- env["GANETI_CLUSTER"] = self.cluster_name
-
- if self.master_name is not None:
- env["GANETI_MASTER"] = self.master_name
-
- if phase_env:
- env = utils.algo.JoinDisjointDicts(env, phase_env)
-
- # Convert everything to strings
- env = dict([(str(key), str(val)) for key, val in env.iteritems()])
-
- assert compat.all(key == "PATH" or key.startswith("GANETI_")
- for key in env)
-
- return self.hooks_execution_fn(node_list, hpath, phase, env)
-
- def RunPhase(self, phase, nodes=None):
- """Run all the scripts for a phase.
-
- This is the main function of the HookMaster.
- It executes self.hooks_execution_fn, and after running
- self.hooks_results_adapt_fn on its results it expects them to be in the form
- {node_name: (fail_msg, [(script, result, output), ...]}).
-
- @param phase: one of L{constants.HOOKS_PHASE_POST} or
- L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
- @param nodes: overrides the predefined list of nodes for the given phase
- @return: the processed results of the hooks multi-node rpc call
- @raise errors.HooksFailure: on communication failure to the nodes
- @raise errors.HooksAbort: on failure of one of the hooks
-
- """
- if phase == constants.HOOKS_PHASE_PRE:
- if nodes is None:
- nodes = self.pre_nodes
- env = self.pre_env
- elif phase == constants.HOOKS_PHASE_POST:
- if nodes is None:
- nodes = self.post_nodes
- env = self._BuildEnv(phase)
- else:
- raise AssertionError("Unknown phase '%s'" % phase)
-
- if not nodes:
- # empty node list, we should not attempt to run this as either
- # we're in the cluster init phase and the rpc client part can't
- # even attempt to run, or this LU doesn't do hooks at all
- return
-
- results = self._RunWrapper(nodes, self.hooks_path, phase, env)
- if not results:
- msg = "Communication Failure"
- if phase == constants.HOOKS_PHASE_PRE:
- raise errors.HooksFailure(msg)
- else:
- self.log_fn(msg)
- return results
-
- converted_res = results
- if self.hooks_results_adapt_fn:
- converted_res = self.hooks_results_adapt_fn(results)
-
- errs = []
- for node_name, (fail_msg, offline, hooks_results) in converted_res.items():
- if offline:
- continue
-
- if fail_msg:
- self.log_fn("Communication failure to node %s: %s", node_name, fail_msg)
- continue
-
- for script, hkr, output in hooks_results:
- if hkr == constants.HKR_FAIL:
- if phase == constants.HOOKS_PHASE_PRE:
- errs.append((node_name, script, output))
- else:
- if not output:
- output = "(no output)"
- self.log_fn("On %s script %s failed, output: %s" %
- (node_name, script, output))
-
- if errs and phase == constants.HOOKS_PHASE_PRE:
- raise errors.HooksAbort(errs)
-
- return results
-
- def RunConfigUpdate(self):
- """Run the special configuration update hook
-
- This is a special hook that runs only on the master after each
- top-level LI if the configuration has been updated.
-
- """
- phase = constants.HOOKS_PHASE_POST
- hpath = constants.HOOKS_NAME_CFGUPDATE
- nodes = [self.master_name]
- self._RunWrapper(nodes, hpath, phase, self.pre_env)
-
- @staticmethod
- def BuildFromLu(hooks_execution_fn, lu):
- if lu.HPATH is None:
- nodes = (None, None)
- else:
- nodes = map(frozenset, lu.BuildHooksNodes())
-
- master_name = cluster_name = None
- if lu.cfg:
- master_name = lu.cfg.GetMasterNode()
- cluster_name = lu.cfg.GetClusterName()
-
- return HooksMaster(lu.op.OP_ID, lu.HPATH, nodes, hooks_execution_fn,
- _RpcResultsToHooksResults, lu.BuildHooksEnv,
- lu.LogWarning, lu.HTYPE, cluster_name, master_name)