#
#
-# Copyright (C) 2006, 2007, 2011 Google Inc.
+# Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
"""
+import sys
import logging
import random
import time
+import itertools
+import traceback
from ganeti import opcodes
from ganeti import constants
from ganeti import locking
from ganeti import utils
from ganeti import compat
+from ganeti import pathutils
_OP_PREFIX = "Op"
"""
- def CheckCancel(self):
- """Check whether job has been cancelled.
+ def CurrentPriority(self): # pylint: disable=R0201
+ """Returns current priority or C{None}.
+
+ This default implementation has no priority information and always
+ returns C{None}; the returned value is passed as the priority when the
+ processor acquires locks.
+
"""
+ return None
def SubmitManyJobs(self, jobs):
"""Submits jobs for processing.
if op.WITH_LU)
+def _SetBaseOpParams(src, defcomment, dst):
+  """Copies basic opcode parameters.
+
+  The debug level is always copied from C{src} when C{src} has one. The
+  priority is only copied when C{dst} has none yet (missing or C{None}).
+  The comment is set to C{defcomment} only when C{dst} has no comment or an
+  empty one.
+
+  @type src: L{opcodes.OpCode}
+  @param src: Source opcode
+  @type defcomment: string
+  @param defcomment: Comment to specify if not already given
+  @type dst: L{opcodes.OpCode}
+  @param dst: Destination opcode
+
+  """
+  if hasattr(src, "debug_level"):
+    dst.debug_level = src.debug_level
+
+  # Do not override a priority explicitly set on the destination opcode
+  if (getattr(dst, "priority", None) is None and
+      hasattr(src, "priority")):
+    dst.priority = src.priority
+
+  # Read and write the comment through the same attribute name, so the check
+  # and the assignment can never refer to different attributes
+  if not getattr(dst, opcodes.COMMENT_ATTR, None):
+    setattr(dst, opcodes.COMMENT_ATTR, defcomment)
+
+
+def _ProcessResult(submit_fn, op, result):
+  """Examines opcode result.
+
+  If necessary, additional processing on the result is done. When the
+  logical unit returned a L{cmdlib.ResultWithJobs}:
+    - basic parameters of C{op} are copied to every opcode of the new jobs;
+    - the jobs are submitted through C{submit_fn};
+    - the submission result is stored under L{constants.JOB_IDS_KEY} in
+      C{result.other}, which is modified in place and returned.
+
+  @type submit_fn: callable
+  @param submit_fn: Function called with the list of jobs to submit
+  @type op: L{opcodes.OpCode}
+  @param op: Opcode which produced C{result}
+  @param result: Value returned by the logical unit's C{Exec}
+  @return: C{result} unchanged, or, for L{cmdlib.ResultWithJobs}, its
+    C{other} dictionary extended with the job submission result
+
+  """
+  if isinstance(result, cmdlib.ResultWithJobs):
+    # Copy basic parameters (e.g. priority) to all opcodes of all new jobs.
+    # An explicit loop is used because this is done only for its side
+    # effects; map() would silently do nothing where it is lazy (Python 3).
+    defcomment = "Submitted by %s" % op.OP_ID
+    for job_op in itertools.chain(*result.jobs):
+      _SetBaseOpParams(op, defcomment, job_op)
+
+    # Submit jobs
+    job_submission = submit_fn(result.jobs)
+
+    # Build dictionary
+    result = result.other
+
+    assert constants.JOB_IDS_KEY not in result, \
+      "Key '%s' found in additional return values" % constants.JOB_IDS_KEY
+
+    result[constants.JOB_IDS_KEY] = job_submission
+
+  return result
+
+
+def _FailingSubmitManyJobs(_):
+  """Stand-in for L{OpExecCbBase.SubmitManyJobs} that always fails.
+
+  Used by the processor when no callbacks were given, in which case jobs
+  cannot be submitted; the argument (the list of jobs) is ignored.
+
+  @raise errors.ProgrammerError: always
+
+  """
+  msg = ("Opcodes processed without callbacks (e.g."
+         " queries) can not submit jobs")
+  raise errors.ProgrammerError(msg)
+
+
def _RpcResultsToHooksResults(rpc_results):
"""Function to convert RPC results to the format expected by HooksMaster.
"""Object which runs OpCodes"""
DISPATCH_TABLE = _ComputeDispatchTable()
- def __init__(self, context, ec_id):
+ def __init__(self, context, ec_id, enable_locks=True):
"""Constructor for Processor
@type context: GanetiContext
self._cbs = None
self.rpc = context.rpc
self.hmclass = HooksMaster
+ self._enable_locks = enable_locks
- def _AcquireLocks(self, level, names, shared, timeout, priority):
+  def _CheckLocksEnabled(self):
+    """Ensures this processor was constructed with locking enabled.
+
+    @raise errors.ProgrammerError: if C{enable_locks} was false in the
+      constructor
+
+    """
+    if self._enable_locks:
+      return
+
+    raise errors.ProgrammerError("Attempted to use disabled locks")
+
+ def _AcquireLocks(self, level, names, shared, timeout):
"""Acquires locks via the Ganeti lock manager.
@type level: int
amount of time
"""
+ self._CheckLocksEnabled()
+
if self._cbs:
- self._cbs.CheckCancel()
+ priority = self._cbs.CurrentPriority()
+ else:
+ priority = None
acquired = self.context.glm.acquire(level, names, shared=shared,
timeout=timeout, priority=priority)
return acquired
- def _ProcessResult(self, result):
- """Examines opcode result.
-
- If necessary, additional processing on the result is done.
-
- """
- if isinstance(result, cmdlib.ResultWithJobs):
- # Submit jobs
- job_submission = self._cbs.SubmitManyJobs(result.jobs)
-
- # Build dictionary
- result = result.other
-
- assert constants.JOB_IDS_KEY not in result, \
- "Key '%s' found in additional return values" % constants.JOB_IDS_KEY
-
- result[constants.JOB_IDS_KEY] = job_submission
-
- return result
-
def _ExecLU(self, lu):
"""Logical Unit execution sequence.
" the operation")
return lu.dry_run_result
+ if self._cbs:
+ submit_mj_fn = self._cbs.SubmitManyJobs
+ else:
+ submit_mj_fn = _FailingSubmitManyJobs
+
try:
- result = self._ProcessResult(lu.Exec(self.Log))
+ result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
self.Log, result)
def BuildHooksManager(self, lu):
return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu)
- def _LockAndExecLU(self, lu, level, calc_timeout, priority):
+ def _LockAndExecLU(self, lu, level, calc_timeout):
"""Execute a Logical Unit, with the needed locks.
This is a recursive function that starts locking the given level, and
if self._cbs:
self._cbs.NotifyStart()
- result = self._ExecLU(lu)
+ try:
+ result = self._ExecLU(lu)
+ except AssertionError, err:
+ # this is a bit ugly, as we don't know from which phase
+ # (prereq, exec) this comes; but it's better than an exception
+ # with no information
+ (_, _, tb) = sys.exc_info()
+ err_info = traceback.format_tb(tb)
+ del tb
+ logging.exception("Detected AssertionError")
+ raise errors.OpExecError("Internal assertion error: please report"
+ " this as a bug.\nError message: '%s';"
+ " location:\n%s" % (str(err), err_info[-1]))
elif adding_locks and acquiring_locks:
# We could both acquire and add locks at the same level, but for now we
" others")
elif adding_locks or acquiring_locks:
+ self._CheckLocksEnabled()
+
lu.DeclareLocks(level)
share = lu.share_locks[level]
needed_locks = lu.needed_locks[level]
self._AcquireLocks(level, needed_locks, share,
- calc_timeout(), priority)
+ calc_timeout())
else:
# Adding locks
add_locks = lu.add_locks[level]
try:
self.context.glm.add(level, add_locks, acquired=1, shared=share)
except errors.LockError:
+ logging.exception("Detected lock error in level %s for locks"
+ " %s, shared=%s", level, add_locks, share)
raise errors.OpPrereqError(
- "Couldn't add locks (%s), probably because of a race condition"
- " with another job, who added them first" % add_locks,
- errors.ECODE_FAULT)
+ "Couldn't add locks (%s), most likely because of another"
+ " job who added them first" % add_locks,
+ errors.ECODE_NOTUNIQUE)
try:
- result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
+ result = self._LockAndExecLU(lu, level + 1, calc_timeout)
finally:
if level in lu.remove_locks:
self.context.glm.remove(level, lu.remove_locks[level])
self.context.glm.release(level)
else:
- result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
+ result = self._LockAndExecLU(lu, level + 1, calc_timeout)
return result
- def ExecOpCode(self, op, cbs, timeout=None, priority=None):
+ def ExecOpCode(self, op, cbs, timeout=None):
"""Execute an opcode.
@type op: an OpCode instance
@param cbs: Runtime callbacks
@type timeout: float or None
@param timeout: Maximum time to acquire all locks, None for no timeout
- @type priority: number or None
- @param priority: Priority for acquiring lock(s)
@raise LockAcquireTimeout: In case locks couldn't be acquired in specified
amount of time
self._cbs = cbs
try:
- # Acquire the Big Ganeti Lock exclusively if this LU requires it,
- # and in a shared fashion otherwise (to prevent concurrent run with
- # an exclusive LU.
- self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
- not lu_class.REQ_BGL, calc_timeout(),
- priority)
+ if self._enable_locks:
+ # Acquire the Big Ganeti Lock exclusively if this LU requires it,
+ # and in a shared fashion otherwise (to prevent concurrent run with
+ # an exclusive LU.
+ self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
+ not lu_class.REQ_BGL, calc_timeout())
+ elif lu_class.REQ_BGL:
+ raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
+ " disabled" % op.OP_ID)
+
try:
lu = lu_class(self, op, self.context, self.rpc)
lu.ExpandNames()
assert lu.needed_locks is not None, "needed_locks not set by LU"
try:
- result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout,
- priority)
+ result = self._LockAndExecLU(lu, locking.LEVEL_CLUSTER + 1,
+ calc_timeout)
finally:
if self._ec_id:
self.context.cfg.DropECReservations(self._ec_id)
finally:
- self.context.glm.release(locking.LEVEL_CLUSTER)
+ # Release BGL if owned
+ if self.context.glm.is_owned(locking.LEVEL_CLUSTER):
+ assert self._enable_locks
+ self.context.glm.release(locking.LEVEL_CLUSTER)
finally:
self._cbs = None
if not (resultcheck_fn is None or resultcheck_fn(result)):
logging.error("Expected opcode result matching %s, got %s",
resultcheck_fn, result)
- raise errors.OpResultError("Opcode result does not match %s, got %s" %
- (resultcheck_fn, result[:80]))
+ raise errors.OpResultError("Opcode result does not match %s: %s" %
+ (resultcheck_fn, utils.Truncate(result, 80)))
return result
class HooksMaster(object):
def __init__(self, opcode, hooks_path, nodes, hooks_execution_fn,
- hooks_results_adapt_fn, build_env_fn, log_fn, htype=None, cluster_name=None,
- master_name=None):
+ hooks_results_adapt_fn, build_env_fn, log_fn, htype=None,
+ cluster_name=None, master_name=None):
"""Base class for hooks masters.
This class invokes the execution of hooks according to the behaviour
"PATH": constants.HOOKS_PATH,
"GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
"GANETI_OP_CODE": self.opcode,
- "GANETI_DATA_DIR": constants.DATA_DIR,
+ "GANETI_DATA_DIR": pathutils.DATA_DIR,
"GANETI_HOOKS_PHASE": phase,
"GANETI_HOOKS_PATH": hpath,
}