4 # Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the logic behind the cluster operations
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26 - logical units, which know how to deal with their specific opcode only
27 - the processor, which dispatches the opcodes to their logical units
38 from ganeti import opcodes
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import hooksmaster
42 from ganeti import cmdlib
43 from ganeti import locking
44 from ganeti import utils
45 from ganeti import compat
51 #: LU classes which don't need to acquire the node allocation lock
52 #: (L{locking.NAL}) when they acquire all node or node resource locks
53 _NODE_ALLOC_WHITELIST = frozenset([])
55 #: LU classes which don't need to acquire the node allocation lock
56 #: (L{locking.NAL}) in the same mode (shared/exclusive) as the node
57 #: or node resource locks
58 _NODE_ALLOC_MODE_WHITELIST = compat.UniqueFrozenset([
59 cmdlib.LUBackupExport,
60 cmdlib.LUBackupRemove,
65 class LockAcquireTimeout(Exception):
66 """Exception to report timeouts on acquiring locks.
71 def _CalculateLockAttemptTimeouts():
72 """Calculate timeouts for lock attempts.
75 result = [constants.LOCK_ATTEMPTS_MINWAIT]
76 running_sum = result[0]
78 # Wait for a total of at least LOCK_ATTEMPTS_TIMEOUT before doing a
80 while running_sum < constants.LOCK_ATTEMPTS_TIMEOUT:
81 timeout = (result[-1] * 1.05) ** 1.25
83 # Cap max timeout. This gives other jobs a chance to run even if
84 # we're still trying to get our locks, before finally moving to a
86 timeout = min(timeout, constants.LOCK_ATTEMPTS_MAXWAIT)
87 # And also cap the lower boundary for safety
88 timeout = max(timeout, constants.LOCK_ATTEMPTS_MINWAIT)
90 result.append(timeout)
91 running_sum += timeout
96 class LockAttemptTimeoutStrategy(object):
97 """Class with lock acquire timeout strategy.
106 _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
108 def __init__(self, _time_fn=time.time, _random_fn=random.random):
109 """Initializes this class.
111 @param _time_fn: Time function for unittests
112 @param _random_fn: Random number generator for unittests
115 object.__init__(self)
117 self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
118 self._time_fn = _time_fn
119 self._random_fn = _random_fn
121 def NextAttempt(self):
122 """Returns the timeout for the next attempt.
126 timeout = self._timeouts.next()
127 except StopIteration:
128 # No more timeouts, do blocking acquire
131 if timeout is not None:
132 # Add a small variation (-/+ 5%) to timeout. This helps in situations
133 # where two or more jobs are fighting for the same lock(s).
134 variation_range = timeout * 0.1
135 timeout += ((self._random_fn() * variation_range) -
136 (variation_range * 0.5))
141 class OpExecCbBase: # pylint: disable=W0232
142 """Base class for OpCode execution callbacks.
145 def NotifyStart(self):
146 """Called when we are about to execute the LU.
148 This function is called when we're about to start the lu's Exec() method,
149 that is, after we have acquired all locks.
153 def Feedback(self, *args):
154 """Sends feedback from the LU code to the end-user.
158 def CurrentPriority(self): # pylint: disable=R0201
159 """Returns current priority or C{None}.
164 def SubmitManyJobs(self, jobs):
165 """Submits jobs for processing.
167 See L{jqueue.JobQueue.SubmitManyJobs}.
170 raise NotImplementedError
173 def _LUNameForOpName(opname):
174 """Computes the LU name for a given OpCode name.
177 assert opname.startswith(_OP_PREFIX), \
178 "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)
180 return _LU_PREFIX + opname[len(_OP_PREFIX):]
183 def _ComputeDispatchTable():
184 """Computes the opcode-to-lu dispatch table.
187 return dict((op, getattr(cmdlib, _LUNameForOpName(op.__name__)))
188 for op in opcodes.OP_MAPPING.values()
192 def _SetBaseOpParams(src, defcomment, dst):
193 """Copies basic opcode parameters.
195 @type src: L{opcodes.OpCode}
196 @param src: Source opcode
197 @type defcomment: string
198 @param defcomment: Comment to specify if not already given
199 @type dst: L{opcodes.OpCode}
200 @param dst: Destination opcode
203 if hasattr(src, "debug_level"):
204 dst.debug_level = src.debug_level
206 if (getattr(dst, "priority", None) is None and
207 hasattr(src, "priority")):
208 dst.priority = src.priority
210 if not getattr(dst, opcodes.COMMENT_ATTR, None):
211 dst.comment = defcomment
214 def _ProcessResult(submit_fn, op, result):
215 """Examines opcode result.
217 If necessary, additional processing on the result is done.
220 if isinstance(result, cmdlib.ResultWithJobs):
221 # Copy basic parameters (e.g. priority)
222 map(compat.partial(_SetBaseOpParams, op,
223 "Submitted by %s" % op.OP_ID),
224 itertools.chain(*result.jobs))
227 job_submission = submit_fn(result.jobs)
230 result = result.other
232 assert constants.JOB_IDS_KEY not in result, \
233 "Key '%s' found in additional return values" % constants.JOB_IDS_KEY
235 result[constants.JOB_IDS_KEY] = job_submission
240 def _FailingSubmitManyJobs(_):
241 """Implementation of L{OpExecCbBase.SubmitManyJobs} to raise an exception.
244 raise errors.ProgrammerError("Opcodes processed without callbacks (e.g."
245 " queries) can not submit jobs")
248 def _VerifyLocks(lu, glm, _mode_whitelist=_NODE_ALLOC_MODE_WHITELIST,
249 _nal_whitelist=_NODE_ALLOC_WHITELIST):
250 """Performs consistency checks on locks acquired by a logical unit.
252 @type lu: L{cmdlib.LogicalUnit}
253 @param lu: Logical unit instance
254 @type glm: L{locking.GanetiLockManager}
255 @param glm: Lock manager
261 have_nal = glm.check_owned(locking.LEVEL_NODE_ALLOC, locking.NAL)
263 for level in [locking.LEVEL_NODE, locking.LEVEL_NODE_RES]:
264 # TODO: Verify using actual lock mode, not using LU variables
265 if level in lu.needed_locks:
266 share_node_alloc = lu.share_locks[locking.LEVEL_NODE_ALLOC]
267 share_level = lu.share_locks[level]
269 if lu.__class__ in _mode_whitelist:
270 assert share_node_alloc != share_level, \
271 "LU is whitelisted to use different modes for node allocation lock"
273 assert bool(share_node_alloc) == bool(share_level), \
274 ("Node allocation lock must be acquired using the same mode as nodes"
275 " and node resources")
277 if lu.__class__ in _nal_whitelist:
278 assert not have_nal, \
279 "LU is whitelisted for not acquiring the node allocation lock"
280 elif lu.needed_locks[level] == locking.ALL_SET or glm.owning_all(level):
282 ("Node allocation lock must be used if an LU acquires all nodes"
283 " or node resources")
286 class Processor(object):
287 """Object which runs OpCodes"""
288 DISPATCH_TABLE = _ComputeDispatchTable()
290 def __init__(self, context, ec_id, enable_locks=True):
291 """Constructor for Processor
293 @type context: GanetiContext
294 @param context: global Ganeti context
296 @param ec_id: execution context identifier
299 self.context = context
302 self.rpc = context.rpc
303 self.hmclass = hooksmaster.HooksMaster
304 self._enable_locks = enable_locks
306 def _CheckLocksEnabled(self):
307 """Checks if locking is enabled.
309 @raise errors.ProgrammerError: In case locking is not enabled
312 if not self._enable_locks:
313 raise errors.ProgrammerError("Attempted to use disabled locks")
315 def _AcquireLocks(self, level, names, shared, opportunistic, timeout):
316 """Acquires locks via the Ganeti lock manager.
319 @param level: Lock level
320 @type names: list or string
321 @param names: Lock names
323 @param shared: Whether the locks should be acquired in shared mode
324 @type opportunistic: bool
325 @param opportunistic: Whether to acquire opportunistically
326 @type timeout: None or float
327 @param timeout: Timeout for acquiring the locks
328 @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
332 self._CheckLocksEnabled()
335 priority = self._cbs.CurrentPriority()
339 acquired = self.context.glm.acquire(level, names, shared=shared,
340 timeout=timeout, priority=priority,
341 opportunistic=opportunistic)
344 raise LockAcquireTimeout()
348 def _ExecLU(self, lu):
349 """Logical Unit execution sequence.
352 write_count = self.context.cfg.write_count
355 hm = self.BuildHooksManager(lu)
356 h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
357 lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
360 if getattr(lu.op, "dry_run", False):
361 # in this mode, no post-hooks are run, and the config is not
362 # written (as it might have been modified by another LU, and we
363 # shouldn't do writeout on behalf of other threads
364 self.LogInfo("dry-run mode requested, not actually executing"
366 return lu.dry_run_result
369 submit_mj_fn = self._cbs.SubmitManyJobs
371 submit_mj_fn = _FailingSubmitManyJobs
374 result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
375 h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
376 result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
379 # FIXME: This needs locks if not lu_class.REQ_BGL
380 if write_count != self.context.cfg.write_count:
385 def BuildHooksManager(self, lu):
386 return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu)
388 def _LockAndExecLU(self, lu, level, calc_timeout):
389 """Execute a Logical Unit, with the needed locks.
391 This is a recursive function that starts locking the given level, and
392 proceeds up, till there are no more locks to acquire. Then it executes the
393 given LU and its opcodes.
396 glm = self.context.glm
397 adding_locks = level in lu.add_locks
398 acquiring_locks = level in lu.needed_locks
400 if level not in locking.LEVELS:
401 _VerifyLocks(lu, glm)
404 self._cbs.NotifyStart()
407 result = self._ExecLU(lu)
408 except AssertionError, err:
409 # this is a bit ugly, as we don't know from which phase
410 # (prereq, exec) this comes; but it's better than an exception
411 # with no information
412 (_, _, tb) = sys.exc_info()
413 err_info = traceback.format_tb(tb)
415 logging.exception("Detected AssertionError")
416 raise errors.OpExecError("Internal assertion error: please report"
417 " this as a bug.\nError message: '%s';"
418 " location:\n%s" % (str(err), err_info[-1]))
420 elif adding_locks and acquiring_locks:
421 # We could both acquire and add locks at the same level, but for now we
422 # don't need this, so we'll avoid the complicated code needed.
423 raise NotImplementedError("Can't declare locks to acquire when adding"
426 elif adding_locks or acquiring_locks:
427 self._CheckLocksEnabled()
429 lu.DeclareLocks(level)
430 share = lu.share_locks[level]
431 opportunistic = lu.opportunistic_locks[level]
434 assert adding_locks ^ acquiring_locks, \
435 "Locks must be either added or acquired"
439 needed_locks = lu.needed_locks[level]
441 self._AcquireLocks(level, needed_locks, share, opportunistic,
445 add_locks = lu.add_locks[level]
446 lu.remove_locks[level] = add_locks
449 glm.add(level, add_locks, acquired=1, shared=share)
450 except errors.LockError:
451 logging.exception("Detected lock error in level %s for locks"
452 " %s, shared=%s", level, add_locks, share)
453 raise errors.OpPrereqError(
454 "Couldn't add locks (%s), most likely because of another"
455 " job who added them first" % add_locks,
456 errors.ECODE_NOTUNIQUE)
459 result = self._LockAndExecLU(lu, level + 1, calc_timeout)
461 if level in lu.remove_locks:
462 glm.remove(level, lu.remove_locks[level])
464 if glm.is_owned(level):
468 result = self._LockAndExecLU(lu, level + 1, calc_timeout)
472 def ExecOpCode(self, op, cbs, timeout=None):
473 """Execute an opcode.
475 @type op: an OpCode instance
476 @param op: the opcode to be executed
477 @type cbs: L{OpExecCbBase}
478 @param cbs: Runtime callbacks
479 @type timeout: float or None
480 @param timeout: Maximum time to acquire all locks, None for no timeout
481 @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
485 if not isinstance(op, opcodes.OpCode):
486 raise errors.ProgrammerError("Non-opcode instance passed"
487 " to ExecOpcode (%s)" % type(op))
489 lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
491 raise errors.OpCodeUnknown("Unknown opcode")
494 calc_timeout = lambda: None
496 calc_timeout = utils.RunningTimeout(timeout, False).Remaining
500 if self._enable_locks:
501 # Acquire the Big Ganeti Lock exclusively if this LU requires it,
502 # and in a shared fashion otherwise (to prevent concurrent run with
504 self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
505 not lu_class.REQ_BGL, False, calc_timeout())
506 elif lu_class.REQ_BGL:
507 raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
508 " disabled" % op.OP_ID)
511 lu = lu_class(self, op, self.context, self.rpc)
513 assert lu.needed_locks is not None, "needed_locks not set by LU"
516 result = self._LockAndExecLU(lu, locking.LEVEL_CLUSTER + 1,
520 self.context.cfg.DropECReservations(self._ec_id)
522 # Release BGL if owned
523 if self.context.glm.is_owned(locking.LEVEL_CLUSTER):
524 assert self._enable_locks
525 self.context.glm.release(locking.LEVEL_CLUSTER)
529 resultcheck_fn = op.OP_RESULT
530 if not (resultcheck_fn is None or resultcheck_fn(result)):
531 logging.error("Expected opcode result matching %s, got %s",
532 resultcheck_fn, result)
533 if not getattr(op, "dry_run", False):
534 # FIXME: LUs should still behave in dry_run mode, or
535 # alternately we should have OP_DRYRUN_RESULT; in the
536 # meantime, we simply skip the OP_RESULT check in dry-run mode
537 raise errors.OpResultError("Opcode result does not match %s: %s" %
538 (resultcheck_fn, utils.Truncate(result, 80)))
542 def Log(self, *args):
543 """Forward call to feedback callback function.
547 self._cbs.Feedback(*args)
549 def LogStep(self, current, total, message):
550 """Log a change in LU execution progress.
553 logging.debug("Step %d/%d %s", current, total, message)
554 self.Log("STEP %d/%d %s" % (current, total, message))
556 def LogWarning(self, message, *args, **kwargs):
557 """Log a warning to the logs and the user.
559 The optional keyword argument is 'hint' and can be used to show a
560 hint to the user (presumably related to the warning). If the
561 message is empty, it will not be printed at all, allowing one to
565 assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
566 "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
568 message = message % tuple(args)
570 logging.warning(message)
571 self.Log(" - WARNING: %s" % message)
573 self.Log(" Hint: %s" % kwargs["hint"])
575 def LogInfo(self, message, *args):
576 """Log an informational message to the logs and the user.
580 message = message % tuple(args)
581 logging.info(message)
582 self.Log(" - INFO: %s" % message)
585 """Returns the current execution context ID.
589 raise errors.ProgrammerError("Tried to use execution context id when"