4 # Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the logic behind the cluster operations
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26 - logical units, which know how to deal with their specific opcode only
27 - the processor, which dispatches the opcodes to their logical units
38 from ganeti import opcodes
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import cmdlib
42 from ganeti import locking
43 from ganeti import utils
44 from ganeti import compat
45 from ganeti import pathutils
#: LU classes which don't need to acquire the node allocation lock
#: (L{locking.NAL}) when they acquire all node or node resource locks
_NODE_ALLOC_WHITELIST = frozenset([])

#: LU classes which don't need to acquire the node allocation lock
#: (L{locking.NAL}) in the same mode (shared/exclusive) as the node
#: or node resource locks
_NODE_ALLOC_MODE_WHITELIST = compat.UniqueFrozenset([
  cmdlib.LUBackupExport,
  cmdlib.LUBackupRemove,
  cmdlib.LUOobCommand,
  ])
class LockAcquireTimeout(Exception):
  """Exception to report timeouts on acquiring locks.

  Raised by L{Processor._AcquireLocks} when the lock manager could not
  acquire the requested locks within the allotted time.

  """
def _CalculateLockAttemptTimeouts():
  """Calculate timeouts for lock attempts.

  Builds a list of increasing per-attempt timeouts, starting at
  L{constants.LOCK_ATTEMPTS_MINWAIT} and growing geometrically until the
  accumulated waiting time reaches L{constants.LOCK_ATTEMPTS_TIMEOUT}.

  @rtype: list of float
  @return: list of timeouts (seconds), one per lock attempt

  """
  result = [constants.LOCK_ATTEMPTS_MINWAIT]
  running_sum = result[0]

  # Wait for a total of at least LOCK_ATTEMPTS_TIMEOUT before doing a
  # blocking acquire
  while running_sum < constants.LOCK_ATTEMPTS_TIMEOUT:
    timeout = (result[-1] * 1.05) ** 1.25

    # Cap max timeout. This gives other jobs a chance to run even if
    # we're still trying to get our locks, before finally moving to a
    # blocking acquire.
    timeout = min(timeout, constants.LOCK_ATTEMPTS_MAXWAIT)
    # And also cap the lower boundary for safety
    timeout = max(timeout, constants.LOCK_ATTEMPTS_MINWAIT)

    result.append(timeout)
    running_sum += timeout

  return result
class LockAttemptTimeoutStrategy(object):
  """Class with lock acquire timeout strategy.

  Hands out one timeout per acquire attempt (with a small random jitter),
  then C{None} once the precomputed list is exhausted, signalling that a
  blocking acquire should be done.

  """
  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()

  def __init__(self, _time_fn=time.time, _random_fn=random.random):
    """Initializes this class.

    @param _time_fn: Time function for unittests
    @param _random_fn: Random number generator for unittests

    """
    object.__init__(self)

    self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
    self._time_fn = _time_fn
    self._random_fn = _random_fn

  def NextAttempt(self):
    """Returns the timeout for the next attempt.

    @rtype: float or None
    @return: timeout in seconds for the next lock attempt, or C{None} once
      all attempts are used up (meaning a blocking acquire should follow)

    """
    try:
      timeout = self._timeouts.next()
    except StopIteration:
      # No more timeouts, do blocking acquire
      timeout = None

    if timeout is not None:
      # Add a small variation (-/+ 5%) to timeout. This helps in situations
      # where two or more jobs are fighting for the same lock(s).
      variation_range = timeout * 0.1
      timeout += ((self._random_fn() * variation_range) -
                  (variation_range * 0.5))

    return timeout
class OpExecCbBase: # pylint: disable=W0232
  """Base class for OpCode execution callbacks.

  """
  def NotifyStart(self):
    """Called when we are about to execute the LU.

    This function is called when we're about to start the lu's Exec() method,
    that is, after we have acquired all locks.

    """

  def Feedback(self, *args):
    """Sends feedback from the LU code to the end-user.

    """

  def CurrentPriority(self): # pylint: disable=R0201
    """Returns current priority or C{None}.

    """
    return None

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{jqueue.JobQueue.SubmitManyJobs}.

    """
    raise NotImplementedError
def _LUNameForOpName(opname):
  """Computes the LU name for a given OpCode name.

  @type opname: string
  @param opname: OpCode class name; must start with the module-level
    C{_OP_PREFIX}
  @rtype: string
  @return: corresponding LU class name, built by swapping the OpCode
    prefix for the LU prefix (C{_LU_PREFIX})

  """
  assert opname.startswith(_OP_PREFIX), \
    "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)

  return _LU_PREFIX + opname[len(_OP_PREFIX):]
def _ComputeDispatchTable():
  """Computes the opcode-to-lu dispatch table.

  @rtype: dict
  @return: dictionary mapping each opcode class in L{opcodes.OP_MAPPING}
    that declares a logical unit (C{WITH_LU}) to the LU class in
    L{cmdlib} implementing it

  """
  return dict((op, getattr(cmdlib, _LUNameForOpName(op.__name__)))
              for op in opcodes.OP_MAPPING.values()
              if op.WITH_LU)
def _SetBaseOpParams(src, defcomment, dst):
  """Copies basic opcode parameters.

  @type src: L{opcodes.OpCode}
  @param src: Source opcode
  @type defcomment: string
  @param defcomment: Comment to specify if not already given
  @type dst: L{opcodes.OpCode}
  @param dst: Destination opcode

  """
  if hasattr(src, "debug_level"):
    # Debug level is always propagated from the source opcode
    dst.debug_level = src.debug_level

  if (getattr(dst, "priority", None) is None and
      hasattr(src, "priority")):
    # Priority is only inherited when the destination doesn't set its own
    dst.priority = src.priority

  if not getattr(dst, opcodes.COMMENT_ATTR, None):
    # Fall back to the default comment when none was given
    dst.comment = defcomment
def _ProcessResult(submit_fn, op, result):
  """Examines opcode result.

  If necessary, additional processing on the result is done.

  @param submit_fn: Function for submitting jobs, see
    L{OpExecCbBase.SubmitManyJobs}
  @type op: L{opcodes.OpCode}
  @param op: Opcode that produced C{result}
  @param result: Opcode result; when it is a L{cmdlib.ResultWithJobs}, the
    contained jobs are submitted and the job IDs merged into the remaining
    result values
  @return: the (possibly rewritten) result

  """
  if isinstance(result, cmdlib.ResultWithJobs):
    # Copy basic parameters (e.g. priority)
    map(compat.partial(_SetBaseOpParams, op,
                       "Submitted by %s" % op.OP_ID),
        itertools.chain(*result.jobs))

    # Job submission can only be done once
    job_submission = submit_fn(result.jobs)

    # Build dictionary from the remaining return values
    result = result.other

    assert constants.JOB_IDS_KEY not in result, \
      "Key '%s' found in additional return values" % constants.JOB_IDS_KEY

    result[constants.JOB_IDS_KEY] = job_submission

  return result
def _FailingSubmitManyJobs(_):
  """Implementation of L{OpExecCbBase.SubmitManyJobs} to raise an exception.

  Used as placeholder job-submission function when no runtime callbacks are
  available (e.g. for queries), so that any attempt to submit jobs fails
  loudly instead of silently.

  @raise errors.ProgrammerError: always

  """
  raise errors.ProgrammerError("Opcodes processed without callbacks (e.g."
                               " queries) can not submit jobs")
def _RpcResultsToHooksResults(rpc_results):
  """Function to convert RPC results to the format expected by HooksMaster.

  @type rpc_results: dict(node: L{rpc.RpcResult})
  @param rpc_results: RPC results
  @rtype: dict(node: (fail_msg, offline, hooks_results))
  @return: RPC results unpacked according to the format expected by
    L{HooksMaster.RunPhase}

  """
  return dict((node, (rpc_res.fail_msg, rpc_res.offline, rpc_res.payload))
              for (node, rpc_res) in rpc_results.items())
def _VerifyLocks(lu, glm, _mode_whitelist=_NODE_ALLOC_MODE_WHITELIST,
                 _nal_whitelist=_NODE_ALLOC_WHITELIST):
  """Performs consistency checks on locks acquired by a logical unit.

  @type lu: L{cmdlib.LogicalUnit}
  @param lu: Logical unit instance
  @type glm: L{locking.GanetiLockManager}
  @param glm: Lock manager
  @param _mode_whitelist: LU classes allowed to use a different mode for the
    node allocation lock (for tests)
  @param _nal_whitelist: LU classes allowed to not acquire the node
    allocation lock (for tests)

  """
  have_nal = glm.check_owned(locking.LEVEL_NODE_ALLOC, locking.NAL)

  for level in [locking.LEVEL_NODE, locking.LEVEL_NODE_RES]:
    # TODO: Verify using actual lock mode, not using LU variables
    if level in lu.needed_locks:
      share_node_alloc = lu.share_locks[locking.LEVEL_NODE_ALLOC]
      share_level = lu.share_locks[level]

      if lu.__class__ in _mode_whitelist:
        assert share_node_alloc != share_level, \
          "LU is whitelisted to use different modes for node allocation lock"
      else:
        assert bool(share_node_alloc) == bool(share_level), \
          ("Node allocation lock must be acquired using the same mode as nodes"
           " and node resources")

      if lu.__class__ in _nal_whitelist:
        assert not have_nal, \
          "LU is whitelisted for not acquiring the node allocation lock"
      elif lu.needed_locks[level] == locking.ALL_SET or glm.owning_all(level):
        assert have_nal, \
          ("Node allocation lock must be used if an LU acquires all nodes"
           " or node resources")
class Processor(object):
  """Object which runs OpCodes"""
  # Mapping of opcode classes to the LU classes implementing them,
  # computed once at class-definition time
  DISPATCH_TABLE = _ComputeDispatchTable()

  def __init__(self, context, ec_id, enable_locks=True):
    """Constructor for Processor

    @type context: GanetiContext
    @param context: global Ganeti context
    @param ec_id: execution context identifier
    @type enable_locks: bool
    @param enable_locks: whether this processor may acquire locks; when
      disabled, lock attempts raise C{errors.ProgrammerError}

    """
    self.context = context
    # NOTE(review): other methods read self._ec_id and self._cbs; their
    # initialization is not visible in this excerpt -- confirm upstream
    self.rpc = context.rpc
    self.hmclass = HooksMaster
    self._enable_locks = enable_locks

  def _CheckLocksEnabled(self):
    """Checks if locking is enabled.

    @raise errors.ProgrammerError: In case locking is not enabled

    """
    if not self._enable_locks:
      raise errors.ProgrammerError("Attempted to use disabled locks")

  def _AcquireLocks(self, level, names, shared, opportunistic, timeout):
    """Acquires locks via the Ganeti lock manager.

    @param level: Lock level
    @type names: list or string
    @param names: Lock names
    @param shared: Whether the locks should be acquired in shared mode
    @type opportunistic: bool
    @param opportunistic: Whether to acquire opportunistically
    @type timeout: None or float
    @param timeout: Timeout for acquiring the locks
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
      amount of time

    """
    self._CheckLocksEnabled()

    # Priority for the acquire comes from the runtime callbacks
    priority = self._cbs.CurrentPriority()

    acquired = self.context.glm.acquire(level, names, shared=shared,
                                        timeout=timeout, priority=priority,
                                        opportunistic=opportunistic)

    # NOTE(review): this raise is presumably guarded by an
    # "if acquired is None:" (timed-out acquire) with a "return acquired"
    # afterwards; the guard and return are not visible in this excerpt
    raise LockAcquireTimeout()

  def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    Runs the pre-phase hooks, the LU itself (or returns its dry-run result),
    then the post-phase hooks, tracking config writes via write_count.

    """
    write_count = self.context.cfg.write_count
    hm = self.BuildHooksManager(lu)
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
    # Give the LU a chance to examine the pre-hook results
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,

    if getattr(lu.op, "dry_run", False):
      # in this mode, no post-hooks are run, and the config is not
      # written (as it might have been modified by another LU, and we
      # shouldn't do writeout on behalf of other threads
      self.LogInfo("dry-run mode requested, not actually executing"
      return lu.dry_run_result

    # Use the callbacks' job submission when available; the fallback raises
    # if an LU without callbacks tries to submit jobs
    submit_mj_fn = self._cbs.SubmitManyJobs
      submit_mj_fn = _FailingSubmitManyJobs

    result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
    h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
    result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,

    # FIXME: This needs locks if not lu_class.REQ_BGL
    if write_count != self.context.cfg.write_count:

  def BuildHooksManager(self, lu):
    # Build the HooksMaster using the LU's own hooks-runner RPC call
    return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu)

  def _LockAndExecLU(self, lu, level, calc_timeout):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking the given level, and
    proceeds up, till there are no more locks to acquire. Then it executes the
    given LU and its opcodes.

    """
    glm = self.context.glm
    adding_locks = level in lu.add_locks
    acquiring_locks = level in lu.needed_locks

    if level not in locking.LEVELS:
      # Past the last lock level: all locks are held, run the LU itself
      _VerifyLocks(lu, glm)

        self._cbs.NotifyStart()

        result = self._ExecLU(lu)
      except AssertionError, err:
        # this is a bit ugly, as we don't know from which phase
        # (prereq, exec) this comes; but it's better than an exception
        # with no information
        (_, _, tb) = sys.exc_info()
        err_info = traceback.format_tb(tb)
        logging.exception("Detected AssertionError")
        raise errors.OpExecError("Internal assertion error: please report"
                                 " this as a bug.\nError message: '%s';"
                                 " location:\n%s" % (str(err), err_info[-1]))

    elif adding_locks and acquiring_locks:
      # We could both acquire and add locks at the same level, but for now we
      # don't need this, so we'll avoid the complicated code needed.
      raise NotImplementedError("Can't declare locks to acquire when adding"

    elif adding_locks or acquiring_locks:
      self._CheckLocksEnabled()

      lu.DeclareLocks(level)
      share = lu.share_locks[level]
      opportunistic = lu.opportunistic_locks[level]

      assert adding_locks ^ acquiring_locks, \
        "Locks must be either added or acquired"

        needed_locks = lu.needed_locks[level]

        self._AcquireLocks(level, needed_locks, share, opportunistic,

        # Adding locks: register them for removal after execution
        add_locks = lu.add_locks[level]
        lu.remove_locks[level] = add_locks

          glm.add(level, add_locks, acquired=1, shared=share)
        except errors.LockError:
          logging.exception("Detected lock error in level %s for locks"
                            " %s, shared=%s", level, add_locks, share)
          raise errors.OpPrereqError(
            "Couldn't add locks (%s), most likely because of another"
            " job who added them first" % add_locks,
            errors.ECODE_NOTUNIQUE)

        # Recurse into the next lock level
        result = self._LockAndExecLU(lu, level + 1, calc_timeout)

        # NOTE(review): lock cleanup -- presumably inside a "finally:" block
        if level in lu.remove_locks:
          glm.remove(level, lu.remove_locks[level])

        if glm.is_owned(level):
      result = self._LockAndExecLU(lu, level + 1, calc_timeout)

  def ExecOpCode(self, op, cbs, timeout=None):
    """Execute an opcode.

    @type op: an OpCode instance
    @param op: the opcode to be executed
    @type cbs: L{OpExecCbBase}
    @param cbs: Runtime callbacks
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks, None for no timeout
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
      amount of time

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpcode (%s)" % type(op))

    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
      raise errors.OpCodeUnknown("Unknown opcode")

      calc_timeout = lambda: None
      calc_timeout = utils.RunningTimeout(timeout, False).Remaining

      if self._enable_locks:
        # Acquire the Big Ganeti Lock exclusively if this LU requires it,
        # and in a shared fashion otherwise (to prevent concurrent run with
        # an exclusive LU)
        self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
                           not lu_class.REQ_BGL, False, calc_timeout())
      elif lu_class.REQ_BGL:
        raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
                                     " disabled" % op.OP_ID)

        lu = lu_class(self, op, self.context, self.rpc)

        assert lu.needed_locks is not None, "needed_locks not set by LU"

          result = self._LockAndExecLU(lu, locking.LEVEL_CLUSTER + 1,
          self.context.cfg.DropECReservations(self._ec_id)

        # Release BGL if owned
        if self.context.glm.is_owned(locking.LEVEL_CLUSTER):
          assert self._enable_locks
          self.context.glm.release(locking.LEVEL_CLUSTER)

    # Validate the opcode result against the declared result check
    resultcheck_fn = op.OP_RESULT
    if not (resultcheck_fn is None or resultcheck_fn(result)):
      logging.error("Expected opcode result matching %s, got %s",
                    resultcheck_fn, result)
      if not getattr(op, "dry_run", False):
        # FIXME: LUs should still behave in dry_run mode, or
        # alternately we should have OP_DRYRUN_RESULT; in the
        # meantime, we simply skip the OP_RESULT check in dry-run mode
        raise errors.OpResultError("Opcode result does not match %s: %s" %
                                   (resultcheck_fn, utils.Truncate(result, 80)))

  def Log(self, *args):
    """Forward call to feedback callback function.

    """
    self._cbs.Feedback(*args)

  def LogStep(self, current, total, message):
    """Log a change in LU execution progress.

    @param current: current step number
    @param total: total number of steps
    @param message: step description

    """
    logging.debug("Step %d/%d %s", current, total, message)
    self.Log("STEP %d/%d %s" % (current, total, message))

  def LogWarning(self, message, *args, **kwargs):
    """Log a warning to the logs and the user.

    The optional keyword argument is 'hint' and can be used to show a
    hint to the user (presumably related to the warning). If the
    message is empty, it will not be printed at all, allowing one to
    show only a hint.

    """
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
      "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)

    message = message % tuple(args)

    logging.warning(message)
    self.Log(" - WARNING: %s" % message)

    self.Log(" Hint: %s" % kwargs["hint"])

  def LogInfo(self, message, *args):
    """Log an informational message to the logs and the user.

    """
    message = message % tuple(args)
    logging.info(message)
    self.Log(" - INFO: %s" % message)

  # NOTE(review): the enclosing definition (presumably the execution-context
  # ID accessor) is not visible in this excerpt
    """Returns the current execution context ID.

    """
    raise errors.ProgrammerError("Tried to use execution context id when"
class HooksMaster(object):
  def __init__(self, opcode, hooks_path, nodes, hooks_execution_fn,
               hooks_results_adapt_fn, build_env_fn, log_fn, htype=None,
               cluster_name=None, master_name=None):
    """Base class for hooks masters.

    This class invokes the execution of hooks according to the behaviour
    specified by its parameters.

    @param opcode: opcode of the operation to which the hooks are tied
    @type hooks_path: string
    @param hooks_path: prefix of the hooks directories
    @type nodes: 2-tuple of lists
    @param nodes: 2-tuple of lists containing nodes on which pre-hooks must be
      run and nodes on which post-hooks must be run
    @type hooks_execution_fn: function that accepts the following parameters:
      (node_list, hooks_path, phase, environment)
    @param hooks_execution_fn: function that will execute the hooks; can be
      None, indicating that no conversion is necessary.
    @type hooks_results_adapt_fn: function
    @param hooks_results_adapt_fn: function that will adapt the return value of
      hooks_execution_fn to the format expected by RunPhase
    @type build_env_fn: function that returns a dictionary having strings as
      keys
    @param build_env_fn: function that builds the environment for the hooks
    @type log_fn: function that accepts a string
    @param log_fn: logging function
    @type htype: string or None
    @param htype: None or one of L{constants.HTYPE_CLUSTER},
      L{constants.HTYPE_NODE}, L{constants.HTYPE_INSTANCE}
    @type cluster_name: string
    @param cluster_name: name of the cluster
    @type master_name: string
    @param master_name: name of the master

    """
    # NOTE(review): later code reads self.opcode, self.log_fn and self.htype;
    # their assignment is not visible in this excerpt -- confirm upstream
    self.hooks_path = hooks_path
    self.hooks_execution_fn = hooks_execution_fn
    self.hooks_results_adapt_fn = hooks_results_adapt_fn
    self.build_env_fn = build_env_fn
    self.cluster_name = cluster_name
    self.master_name = master_name

    # The pre-phase environment is built once and later merged into the
    # post-phase environment by _BuildEnv
    self.pre_env = self._BuildEnv(constants.HOOKS_PHASE_PRE)
    (self.pre_nodes, self.post_nodes) = nodes

  def _BuildEnv(self, phase):
    """Compute the environment and the target nodes.

    Based on the opcode and the current node list, this builds the
    environment for the hooks and the target node list for the run.

    """
    if phase == constants.HOOKS_PHASE_PRE:
      # NOTE(review): the pre-phase prefix assignment is not visible in
      # this excerpt
    elif phase == constants.HOOKS_PHASE_POST:
      prefix = "GANETI_POST_"
      raise AssertionError("Unknown phase '%s'" % phase)

    if self.hooks_path is not None:
      phase_env = self.build_env_fn()
      # The LU-provided environment must not already carry the phase prefix
      assert not compat.any(key.upper().startswith(prefix)
                            for key in phase_env)
      env.update(("%s%s" % (prefix, key), value)
                 for (key, value) in phase_env.items())

    if phase == constants.HOOKS_PHASE_PRE:
      assert compat.all((key.startswith("GANETI_") and
                         not key.startswith("GANETI_POST_"))
    elif phase == constants.HOOKS_PHASE_POST:
      assert compat.all(key.startswith("GANETI_POST_") for key in env)
      assert isinstance(self.pre_env, dict)

      # Merge with pre-phase environment
      assert not compat.any(key.startswith("GANETI_POST_")
                            for key in self.pre_env)
      env.update(self.pre_env)
      raise AssertionError("Unknown phase '%s'" % phase)

  def _RunWrapper(self, node_list, hpath, phase, phase_env):
    """Simple wrapper over self.callfn.

    This method fixes the environment before executing the hooks.

    """
      "PATH": constants.HOOKS_PATH,
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
      "GANETI_OP_CODE": self.opcode,
      "GANETI_DATA_DIR": pathutils.DATA_DIR,
      "GANETI_HOOKS_PHASE": phase,
      "GANETI_HOOKS_PATH": hpath,

      env["GANETI_OBJECT_TYPE"] = self.htype

    if self.cluster_name is not None:
      env["GANETI_CLUSTER"] = self.cluster_name

    if self.master_name is not None:
      env["GANETI_MASTER"] = self.master_name

    # Merge in the phase-specific variables built by _BuildEnv
    env = utils.algo.JoinDisjointDicts(env, phase_env)

    # Convert everything to strings
    env = dict([(str(key), str(val)) for key, val in env.iteritems()])

    assert compat.all(key == "PATH" or key.startswith("GANETI_")

    return self.hooks_execution_fn(node_list, hpath, phase, env)

  def RunPhase(self, phase, nodes=None):
    """Run all the scripts for a phase.

    This is the main function of the HookMaster.
    It executes self.hooks_execution_fn, and after running
    self.hooks_results_adapt_fn on its results it expects them to be in the form
    {node_name: (fail_msg, [(script, result, output), ...]}).

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
      L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param nodes: overrides the predefined list of nodes for the given phase
    @return: the processed results of the hooks multi-node rpc call
    @raise errors.HooksFailure: on communication failure to the nodes
    @raise errors.HooksAbort: on failure of one of the hooks

    """
    if phase == constants.HOOKS_PHASE_PRE:
        nodes = self.pre_nodes
    elif phase == constants.HOOKS_PHASE_POST:
        nodes = self.post_nodes
      env = self._BuildEnv(phase)
      raise AssertionError("Unknown phase '%s'" % phase)

      # empty node list, we should not attempt to run this as either
      # we're in the cluster init phase and the rpc client part can't
      # even attempt to run, or this LU doesn't do hooks at all

    results = self._RunWrapper(nodes, self.hooks_path, phase, env)
      msg = "Communication Failure"
      if phase == constants.HOOKS_PHASE_PRE:
        raise errors.HooksFailure(msg)

    # Adapt raw results to the (fail_msg, offline, hooks_results) format
    converted_res = results
    if self.hooks_results_adapt_fn:
      converted_res = self.hooks_results_adapt_fn(results)

    for node_name, (fail_msg, offline, hooks_results) in converted_res.items():
        self.log_fn("Communication failure to node %s: %s", node_name, fail_msg)

      for script, hkr, output in hooks_results:
        if hkr == constants.HKR_FAIL:
          if phase == constants.HOOKS_PHASE_PRE:
            # Pre-phase failures are collected and abort the operation
            errs.append((node_name, script, output))
            output = "(no output)"
          self.log_fn("On %s script %s failed, output: %s" %
                      (node_name, script, output))

    if errs and phase == constants.HOOKS_PHASE_PRE:
      raise errors.HooksAbort(errs)

  def RunConfigUpdate(self):
    """Run the special configuration update hook

    This is a special hook that runs only on the master after each
    top-level LI if the configuration has been updated.

    """
    phase = constants.HOOKS_PHASE_POST
    hpath = constants.HOOKS_NAME_CFGUPDATE
    nodes = [self.master_name]
    self._RunWrapper(nodes, hpath, phase, self.pre_env)

  # NOTE(review): presumably decorated with @staticmethod upstream (it takes
  # no self/cls parameter); the decorator line is not visible in this excerpt
  def BuildFromLu(hooks_execution_fn, lu):
    """Builds a HooksMaster from a logical unit.

    @param hooks_execution_fn: function that will execute the hooks
    @type lu: L{cmdlib.LogicalUnit}
    @param lu: logical unit providing nodes, environment and logging

    """
    nodes = map(frozenset, lu.BuildHooksNodes())

    master_name = cluster_name = None
      master_name = lu.cfg.GetMasterNode()
      cluster_name = lu.cfg.GetClusterName()

    return HooksMaster(lu.op.OP_ID, lu.HPATH, nodes, hooks_execution_fn,
                       _RpcResultsToHooksResults, lu.BuildHooksEnv,
                       lu.LogWarning, lu.HTYPE, cluster_name, master_name)