4 # Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Module implementing the logic behind the cluster operations

This module implements the logic for doing operations in the cluster. There
are two kinds of classes defined:
  - logical units, which know how to deal with their specific opcode only
  - the processor, which dispatches the opcodes to their logical units

"""
38 from ganeti import opcodes
39 from ganeti import opcodes_base
40 from ganeti import constants
41 from ganeti import errors
42 from ganeti import hooksmaster
43 from ganeti import cmdlib
44 from ganeti import locking
45 from ganeti import utils
46 from ganeti import compat
52 #: LU classes which don't need to acquire the node allocation lock
53 #: (L{locking.NAL}) when they acquire all node or node resource locks
54 _NODE_ALLOC_WHITELIST = frozenset([])
56 #: LU classes which don't need to acquire the node allocation lock
57 #: (L{locking.NAL}) in the same mode (shared/exclusive) as the node
58 #: or node resource locks
59 _NODE_ALLOC_MODE_WHITELIST = compat.UniqueFrozenset([
60 cmdlib.LUBackupExport,
61 cmdlib.LUBackupRemove,
class LockAcquireTimeout(Exception):
  """Exception to report timeouts on acquiring locks.

  """
def _CalculateLockAttemptTimeouts():
  """Calculate timeouts for lock attempts.

  @rtype: list of float
  @return: Per-attempt timeouts (seconds), starting at
    L{constants.LOCK_ATTEMPTS_MINWAIT} and growing until their sum reaches
    at least L{constants.LOCK_ATTEMPTS_TIMEOUT}

  """
  result = [constants.LOCK_ATTEMPTS_MINWAIT]
  running_sum = result[0]

  # Wait for a total of at least LOCK_ATTEMPTS_TIMEOUT before doing a
  # blocking acquire
  while running_sum < constants.LOCK_ATTEMPTS_TIMEOUT:
    # Grow the previous timeout slightly faster than linearly
    timeout = (result[-1] * 1.05) ** 1.25

    # Cap max timeout. This gives other jobs a chance to run even if
    # we're still trying to get our locks, before finally moving to a
    # blocking acquire.
    timeout = min(timeout, constants.LOCK_ATTEMPTS_MAXWAIT)
    # And also cap the lower boundary for safety
    timeout = max(timeout, constants.LOCK_ATTEMPTS_MINWAIT)

    result.append(timeout)
    running_sum += timeout

  # Without returning the list, the computed timeouts would be lost and
  # L{LockAttemptTimeoutStrategy._TIMEOUT_PER_ATTEMPT} would be None
  return result
class LockAttemptTimeoutStrategy(object):
  """Class with lock acquire timeout strategy.

  Hands out an increasing sequence of per-attempt timeouts (with a small
  random variation); once all pre-computed timeouts are used up, C{None}
  is returned to request a blocking acquire.

  """
  #: Pre-computed timeout (in seconds) for each successive attempt
  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()

  def __init__(self, _time_fn=time.time, _random_fn=random.random):
    """Initializes this class.

    @param _time_fn: Time function for unittests
    @param _random_fn: Random number generator for unittests

    """
    object.__init__(self)

    self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
    self._time_fn = _time_fn
    self._random_fn = _random_fn

  def NextAttempt(self):
    """Returns the timeout for the next attempt.

    @rtype: float or None
    @return: Timeout in seconds, or C{None} once all pre-computed timeouts
      have been consumed (callers should then block)

    """
    try:
      # Use the next() builtin instead of the iterator's .next() method;
      # it behaves identically on Python 2.6+ but also works on Python 3
      timeout = next(self._timeouts)
    except StopIteration:
      # No more timeouts, do blocking acquire
      timeout = None

    if timeout is not None:
      # Add a small variation (-/+ 5%) to timeout. This helps in situations
      # where two or more jobs are fighting for the same lock(s).
      variation_range = timeout * 0.1
      timeout += ((self._random_fn() * variation_range) -
                  (variation_range * 0.5))

    return timeout
class OpExecCbBase: # pylint: disable=W0232
  """Base class for OpCode execution callbacks.

  """
  def NotifyStart(self):
    """Called when we are about to execute the LU.

    This function is called when we're about to start the lu's Exec() method,
    that is, after we have acquired all locks.

    """

  def Feedback(self, *args):
    """Sends feedback from the LU code to the end-user.

    """

  def CurrentPriority(self): # pylint: disable=R0201
    """Returns current priority or C{None}.

    """
    return None

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{jqueue.JobQueue.SubmitManyJobs}.

    """
    raise NotImplementedError
def _LUNameForOpName(opname):
  """Computes the LU name for a given OpCode name.

  @type opname: string
  @param opname: OpCode name, must start with the opcode prefix
  @rtype: string
  @return: Name of the logical-unit class handling the opcode

  """
  prefix_len = len(_OP_PREFIX)

  assert opname.startswith(_OP_PREFIX), \
    "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)

  # Swap the opcode prefix for the logical-unit prefix
  return _LU_PREFIX + opname[prefix_len:]
def _ComputeDispatchTable():
  """Computes the opcode-to-lu dispatch table.

  @rtype: dict
  @return: Mapping from opcode class to the logical-unit class handling it

  """
  # NOTE(review): the generator's filter clause is reconstructed; only
  # opcodes with an associated LU should receive an entry — confirm against
  # upstream (the flag is C{op.WITH_LU})
  return dict((op, getattr(cmdlib, _LUNameForOpName(op.__name__)))
              for op in opcodes.OP_MAPPING.values()
              if op.WITH_LU)
def _SetBaseOpParams(src, defcomment, dst):
  """Copies basic opcode parameters.

  @type src: L{opcodes.OpCode}
  @param src: Source opcode
  @type defcomment: string
  @param defcomment: Comment to specify if not already given
  @type dst: L{opcodes.OpCode}
  @param dst: Destination opcode

  """
  # Debug level is inherited whenever the source carries one
  if hasattr(src, "debug_level"):
    dst.debug_level = src.debug_level

  # Priority is only inherited when the destination doesn't set its own
  src_has_priority = hasattr(src, "priority")
  if src_has_priority and getattr(dst, "priority", None) is None:
    dst.priority = src.priority

  # The default comment only applies when none was given explicitly
  if not getattr(dst, opcodes_base.COMMENT_ATTR, None):
    dst.comment = defcomment
def _ProcessResult(submit_fn, op, result):
  """Examines opcode result.

  If necessary, additional processing on the result is done.

  @param submit_fn: Callable submitting jobs, see
    L{OpExecCbBase.SubmitManyJobs}
  @type op: L{opcodes.OpCode}
  @param op: Opcode that produced C{result}
  @param result: Result as returned by the LU's C{Exec} method
  @return: The (possibly amended) result

  """
  if isinstance(result, cmdlib.ResultWithJobs):
    # Copy basic parameters (e.g. priority) to every opcode of the submitted
    # jobs. An explicit loop is used instead of map(): map() builds a
    # pointless list on Python 2 and, being lazy on Python 3, would not run
    # the side effects at all.
    for job_op in itertools.chain(*result.jobs):
      _SetBaseOpParams(op, "Submitted by %s" % op.OP_ID, job_op)

    # Queue the jobs and replace the original result with the submission info
    job_submission = submit_fn(result.jobs)

    # Keep only the "other" return values
    result = result.other

    assert constants.JOB_IDS_KEY not in result, \
      "Key '%s' found in additional return values" % constants.JOB_IDS_KEY

    result[constants.JOB_IDS_KEY] = job_submission

  return result
def _FailingSubmitManyJobs(_):
  """Implementation of L{OpExecCbBase.SubmitManyJobs} to raise an exception.

  Used as the job-submission callback when no real callbacks are available.

  @raise errors.ProgrammerError: Always

  """
  raise errors.ProgrammerError("Opcodes processed without callbacks (e.g."
                               " queries) can not submit jobs")
def _VerifyLocks(lu, glm, _mode_whitelist=_NODE_ALLOC_MODE_WHITELIST,
                 _nal_whitelist=_NODE_ALLOC_WHITELIST):
  """Performs consistency checks on locks acquired by a logical unit.

  @type lu: L{cmdlib.LogicalUnit}
  @param lu: Logical unit instance
  @type glm: L{locking.GanetiLockManager}
  @param glm: Lock manager

  """
  have_nal = glm.check_owned(locking.LEVEL_NODE_ALLOC, locking.NAL)

  for level in [locking.LEVEL_NODE, locking.LEVEL_NODE_RES]:
    # TODO: Verify using actual lock mode, not using LU variables
    if level in lu.needed_locks:
      share_node_alloc = lu.share_locks[locking.LEVEL_NODE_ALLOC]
      share_level = lu.share_locks[level]

      if lu.__class__ in _mode_whitelist:
        assert share_node_alloc != share_level, \
          "LU is whitelisted to use different modes for node allocation lock"
      else:
        assert bool(share_node_alloc) == bool(share_level), \
          ("Node allocation lock must be acquired using the same mode as nodes"
           " and node resources")

      if lu.__class__ in _nal_whitelist:
        assert not have_nal, \
          "LU is whitelisted for not acquiring the node allocation lock"
      elif lu.needed_locks[level] == locking.ALL_SET or glm.owning_all(level):
        assert have_nal, \
          ("Node allocation lock must be used if an LU acquires all nodes"
           " or node resources")
287 class Processor(object):
288 """Object which runs OpCodes"""
289 DISPATCH_TABLE = _ComputeDispatchTable()
291 def __init__(self, context, ec_id, enable_locks=True):
292 """Constructor for Processor
294 @type context: GanetiContext
295 @param context: global Ganeti context
297 @param ec_id: execution context identifier
300 self.context = context
303 self.rpc = context.rpc
304 self.hmclass = hooksmaster.HooksMaster
305 self._enable_locks = enable_locks
307 def _CheckLocksEnabled(self):
308 """Checks if locking is enabled.
310 @raise errors.ProgrammerError: In case locking is not enabled
313 if not self._enable_locks:
314 raise errors.ProgrammerError("Attempted to use disabled locks")
316 def _AcquireLocks(self, level, names, shared, opportunistic, timeout):
317 """Acquires locks via the Ganeti lock manager.
320 @param level: Lock level
321 @type names: list or string
322 @param names: Lock names
324 @param shared: Whether the locks should be acquired in shared mode
325 @type opportunistic: bool
326 @param opportunistic: Whether to acquire opportunistically
327 @type timeout: None or float
328 @param timeout: Timeout for acquiring the locks
329 @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
333 self._CheckLocksEnabled()
336 priority = self._cbs.CurrentPriority()
340 acquired = self.context.glm.acquire(level, names, shared=shared,
341 timeout=timeout, priority=priority,
342 opportunistic=opportunistic)
345 raise LockAcquireTimeout()
349 def _ExecLU(self, lu):
350 """Logical Unit execution sequence.
353 write_count = self.context.cfg.write_count
356 hm = self.BuildHooksManager(lu)
357 h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
358 lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
361 if getattr(lu.op, "dry_run", False):
362 # in this mode, no post-hooks are run, and the config is not
363 # written (as it might have been modified by another LU, and we
364 # shouldn't do writeout on behalf of other threads
365 self.LogInfo("dry-run mode requested, not actually executing"
367 return lu.dry_run_result
370 submit_mj_fn = self._cbs.SubmitManyJobs
372 submit_mj_fn = _FailingSubmitManyJobs
375 result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
376 h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
377 result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
380 # FIXME: This needs locks if not lu_class.REQ_BGL
381 if write_count != self.context.cfg.write_count:
  def BuildHooksManager(self, lu):
    """Builds the hooks manager for the given logical unit.

    @param lu: Logical unit the hooks are run for
    @rtype: L{hooksmaster.HooksMaster}

    """
    return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu)
389 def _LockAndExecLU(self, lu, level, calc_timeout):
390 """Execute a Logical Unit, with the needed locks.
392 This is a recursive function that starts locking the given level, and
393 proceeds up, till there are no more locks to acquire. Then it executes the
394 given LU and its opcodes.
397 glm = self.context.glm
398 adding_locks = level in lu.add_locks
399 acquiring_locks = level in lu.needed_locks
401 if level not in locking.LEVELS:
402 _VerifyLocks(lu, glm)
405 self._cbs.NotifyStart()
408 result = self._ExecLU(lu)
409 except AssertionError, err:
410 # this is a bit ugly, as we don't know from which phase
411 # (prereq, exec) this comes; but it's better than an exception
412 # with no information
413 (_, _, tb) = sys.exc_info()
414 err_info = traceback.format_tb(tb)
416 logging.exception("Detected AssertionError")
417 raise errors.OpExecError("Internal assertion error: please report"
418 " this as a bug.\nError message: '%s';"
419 " location:\n%s" % (str(err), err_info[-1]))
421 elif adding_locks and acquiring_locks:
422 # We could both acquire and add locks at the same level, but for now we
423 # don't need this, so we'll avoid the complicated code needed.
424 raise NotImplementedError("Can't declare locks to acquire when adding"
427 elif adding_locks or acquiring_locks:
428 self._CheckLocksEnabled()
430 lu.DeclareLocks(level)
431 share = lu.share_locks[level]
432 opportunistic = lu.opportunistic_locks[level]
435 assert adding_locks ^ acquiring_locks, \
436 "Locks must be either added or acquired"
440 needed_locks = lu.needed_locks[level]
442 self._AcquireLocks(level, needed_locks, share, opportunistic,
446 add_locks = lu.add_locks[level]
447 lu.remove_locks[level] = add_locks
450 glm.add(level, add_locks, acquired=1, shared=share)
451 except errors.LockError:
452 logging.exception("Detected lock error in level %s for locks"
453 " %s, shared=%s", level, add_locks, share)
454 raise errors.OpPrereqError(
455 "Couldn't add locks (%s), most likely because of another"
456 " job who added them first" % add_locks,
457 errors.ECODE_NOTUNIQUE)
460 result = self._LockAndExecLU(lu, level + 1, calc_timeout)
462 if level in lu.remove_locks:
463 glm.remove(level, lu.remove_locks[level])
465 if glm.is_owned(level):
469 result = self._LockAndExecLU(lu, level + 1, calc_timeout)
473 # pylint: disable=R0201
474 def _CheckLUResult(self, op, result):
475 """Check the LU result against the contract in the opcode.
478 resultcheck_fn = op.OP_RESULT
479 if not (resultcheck_fn is None or resultcheck_fn(result)):
480 logging.error("Expected opcode result matching %s, got %s",
481 resultcheck_fn, result)
482 if not getattr(op, "dry_run", False):
483 # FIXME: LUs should still behave in dry_run mode, or
484 # alternately we should have OP_DRYRUN_RESULT; in the
485 # meantime, we simply skip the OP_RESULT check in dry-run mode
486 raise errors.OpResultError("Opcode result does not match %s: %s" %
487 (resultcheck_fn, utils.Truncate(result, 80)))
489 def ExecOpCode(self, op, cbs, timeout=None):
490 """Execute an opcode.
492 @type op: an OpCode instance
493 @param op: the opcode to be executed
494 @type cbs: L{OpExecCbBase}
495 @param cbs: Runtime callbacks
496 @type timeout: float or None
497 @param timeout: Maximum time to acquire all locks, None for no timeout
498 @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
502 if not isinstance(op, opcodes.OpCode):
503 raise errors.ProgrammerError("Non-opcode instance passed"
504 " to ExecOpcode (%s)" % type(op))
506 lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
508 raise errors.OpCodeUnknown("Unknown opcode")
511 calc_timeout = lambda: None
513 calc_timeout = utils.RunningTimeout(timeout, False).Remaining
517 if self._enable_locks:
518 # Acquire the Big Ganeti Lock exclusively if this LU requires it,
519 # and in a shared fashion otherwise (to prevent concurrent run with
521 self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
522 not lu_class.REQ_BGL, False, calc_timeout())
523 elif lu_class.REQ_BGL:
524 raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
525 " disabled" % op.OP_ID)
528 lu = lu_class(self, op, self.context, self.rpc)
530 assert lu.needed_locks is not None, "needed_locks not set by LU"
533 result = self._LockAndExecLU(lu, locking.LEVEL_CLUSTER + 1,
537 self.context.cfg.DropECReservations(self._ec_id)
539 # Release BGL if owned
540 if self.context.glm.is_owned(locking.LEVEL_CLUSTER):
541 assert self._enable_locks
542 self.context.glm.release(locking.LEVEL_CLUSTER)
546 self._CheckLUResult(op, result)
550 def Log(self, *args):
551 """Forward call to feedback callback function.
555 self._cbs.Feedback(*args)
557 def LogStep(self, current, total, message):
558 """Log a change in LU execution progress.
561 logging.debug("Step %d/%d %s", current, total, message)
562 self.Log("STEP %d/%d %s" % (current, total, message))
564 def LogWarning(self, message, *args, **kwargs):
565 """Log a warning to the logs and the user.
567 The optional keyword argument is 'hint' and can be used to show a
568 hint to the user (presumably related to the warning). If the
569 message is empty, it will not be printed at all, allowing one to
573 assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
574 "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
576 message = message % tuple(args)
578 logging.warning(message)
579 self.Log(" - WARNING: %s" % message)
581 self.Log(" Hint: %s" % kwargs["hint"])
583 def LogInfo(self, message, *args):
584 """Log an informational message to the logs and the user.
588 message = message % tuple(args)
589 logging.info(message)
590 self.Log(" - INFO: %s" % message)
593 """Returns the current execution context ID.
597 raise errors.ProgrammerError("Tried to use execution context id when"