4 # Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the logic behind the cluster operations
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26 - logical units, which know how to deal with their specific opcode only
27 - the processor, which dispatches the opcodes to their logical units
38 from ganeti import opcodes
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import cmdlib
42 from ganeti import locking
43 from ganeti import utils
44 from ganeti import compat
45 from ganeti import pathutils
#: LU classes which don't need to acquire the node allocation lock
#: (L{locking.NAL}) when they acquire all node or node resource locks
_NODE_ALLOC_WHITELIST = frozenset([])

#: LU classes which don't need to acquire the node allocation lock
#: (L{locking.NAL}) in the same mode (shared/exclusive) as the node
#: or node resource locks
_NODE_ALLOC_MODE_WHITELIST = compat.UniqueFrozenset([
  cmdlib.LUBackupExport,
  cmdlib.LUBackupRemove,
  # NOTE(review): additional whitelist entries and the closing "])" are not
  # visible in this chunk -- verify against the full file
class LockAcquireTimeout(Exception):
  """Exception to report timeouts on acquiring locks.

  Raised by L{Processor._AcquireLocks} and propagated out of
  L{Processor.ExecOpCode} when locks cannot be obtained in time.

  """
def _CalculateLockAttemptTimeouts():
  """Calculate timeouts for lock attempts.

  Builds a list of per-attempt timeouts: the first entry is
  L{constants.LOCK_ATTEMPTS_MINWAIT}; each following entry grows the
  previous one (factor 1.05, exponent 1.25) and is clamped between
  LOCK_ATTEMPTS_MINWAIT and LOCK_ATTEMPTS_MAXWAIT.  Entries are added
  until their sum reaches L{constants.LOCK_ATTEMPTS_TIMEOUT}.

  @rtype: list of float
  @return: per-attempt timeouts, in seconds

  """
  result = [constants.LOCK_ATTEMPTS_MINWAIT]
  running_sum = result[0]

  # Wait for a total of at least LOCK_ATTEMPTS_TIMEOUT before doing a
  # blocking acquire
  while running_sum < constants.LOCK_ATTEMPTS_TIMEOUT:
    timeout = (result[-1] * 1.05) ** 1.25

    # Cap max timeout. This gives other jobs a chance to run even if
    # we're still trying to get our locks, before finally moving to a
    # blocking acquire
    timeout = min(timeout, constants.LOCK_ATTEMPTS_MAXWAIT)
    # And also cap the lower boundary for safety
    timeout = max(timeout, constants.LOCK_ATTEMPTS_MINWAIT)

    result.append(timeout)
    running_sum += timeout

  # Without this return the caller (class attribute _TIMEOUT_PER_ATTEMPT)
  # would receive None and iter() over it would fail
  return result
class LockAttemptTimeoutStrategy(object):
  """Class with lock acquire timeout strategy.

  Hands out the pre-computed per-attempt timeouts one by one, each with a
  small random variation, and signals a blocking acquire (C{None}) once
  all timed attempts are used up.

  """
  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()

  def __init__(self, _time_fn=time.time, _random_fn=random.random):
    """Initializes this class.

    @param _time_fn: Time function for unittests
    @param _random_fn: Random number generator for unittests

    """
    object.__init__(self)

    self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
    self._time_fn = _time_fn
    self._random_fn = _random_fn

  def NextAttempt(self):
    """Returns the timeout for the next attempt.

    @rtype: float or None
    @return: timeout (in seconds) for the next lock acquisition attempt;
      C{None} once all timed attempts are exhausted, requesting a
      blocking acquire

    """
    # The iterator raising StopIteration is the normal end-of-attempts
    # signal; without the try/except this method would propagate it
    try:
      timeout = self._timeouts.next()
    except StopIteration:
      # No more timeouts, do blocking acquire
      timeout = None

    if timeout is not None:
      # Add a small variation (-/+ 5%) to timeout. This helps in situations
      # where two or more jobs are fighting for the same lock(s).
      variation_range = timeout * 0.1
      timeout += ((self._random_fn() * variation_range) -
                  (variation_range * 0.5))

    return timeout
class OpExecCbBase: # pylint: disable=W0232
  """Base class for OpCode execution callbacks.

  The default implementations are inert: all callbacks do nothing and
  return C{None}, except L{SubmitManyJobs} which must be overridden by
  callers that support job submission.

  """
  def NotifyStart(self):
    """Called when we are about to execute the LU.

    This function is called when we're about to start the lu's Exec() method,
    that is, after we have acquired all locks.

    """

  def Feedback(self, *args):
    """Sends feedback from the LU code to the end-user.

    """

  def CurrentPriority(self): # pylint: disable=R0201
    """Returns current priority or C{None}.

    """
    return None

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{jqueue.JobQueue.SubmitManyJobs}.

    """
    raise NotImplementedError
def _LUNameForOpName(opname):
  """Computes the LU name for a given OpCode name.

  @type opname: string
  @param opname: opcode class name, which must start with the opcode prefix
  @rtype: string
  @return: the corresponding logical unit class name

  """
  assert opname.startswith(_OP_PREFIX), \
    "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)

  # Swap the opcode prefix for the LU prefix, keeping the remainder
  suffix = opname[len(_OP_PREFIX):]
  return "%s%s" % (_LU_PREFIX, suffix)
def _ComputeDispatchTable():
  """Computes the opcode-to-lu dispatch table.

  @rtype: dict
  @return: mapping from opcode class to the L{cmdlib} logical unit class
    implementing it

  """
  # Only opcodes flagged as having a logical unit are looked up in cmdlib;
  # without the filter getattr would fail for LU-less opcodes
  return dict((op, getattr(cmdlib, _LUNameForOpName(op.__name__)))
              for op in opcodes.OP_MAPPING.values()
              if op.WITH_LU)
def _SetBaseOpParams(src, defcomment, dst):
  """Copies basic opcode parameters.

  @type src: L{opcodes.OpCode}
  @param src: Source opcode
  @type defcomment: string
  @param defcomment: Comment to specify if not already given
  @type dst: L{opcodes.OpCode}
  @param dst: Destination opcode

  """
  # Propagate the debug level whenever the source carries one
  if hasattr(src, "debug_level"):
    dst.debug_level = src.debug_level

  # Inherit the priority only when the destination has none of its own
  dst_has_priority = getattr(dst, "priority", None) is not None
  if not dst_has_priority and hasattr(src, "priority"):
    dst.priority = src.priority

  # Fall back to the default comment when none was given explicitly
  if not getattr(dst, opcodes.COMMENT_ATTR, None):
    dst.comment = defcomment
def _ProcessResult(submit_fn, op, result):
  """Examines opcode result.

  If necessary, additional processing on the result is done: for
  L{cmdlib.ResultWithJobs} results, the contained jobs are submitted via
  C{submit_fn} and the result is replaced by the remaining return values
  with the submitted job IDs added under L{constants.JOB_IDS_KEY}.

  @param submit_fn: callable submitting a list of jobs (see
    L{OpExecCbBase.SubmitManyJobs})
  @param op: opcode that produced C{result}
  @param result: value returned by the LU's C{Exec} method
  @return: the (possibly rewritten) result

  """
  if isinstance(result, cmdlib.ResultWithJobs):
    # Copy basic parameters (e.g. priority); map() is eager under Python 2,
    # so the parameters are applied to every job opcode immediately
    map(compat.partial(_SetBaseOpParams, op,
                       "Submitted by %s" % op.OP_ID),
        itertools.chain(*result.jobs))

    job_submission = submit_fn(result.jobs)

    result = result.other

    assert constants.JOB_IDS_KEY not in result, \
      "Key '%s' found in additional return values" % constants.JOB_IDS_KEY

    result[constants.JOB_IDS_KEY] = job_submission

  # Without this return the caller would always receive None instead of
  # the LU's result
  return result
def _FailingSubmitManyJobs(_):
  """Implementation of L{OpExecCbBase.SubmitManyJobs} to raise an exception.

  Used as the job-submission callback when no real callbacks are
  available, so any attempt to submit jobs is flagged as a programming
  error.

  @raise errors.ProgrammerError: always

  """
  msg = ("Opcodes processed without callbacks (e.g."
         " queries) can not submit jobs")
  raise errors.ProgrammerError(msg)
248 def _RpcResultsToHooksResults(rpc_results):
249 """Function to convert RPC results to the format expected by HooksMaster.
251 @type rpc_results: dict(node: L{rpc.RpcResult})
252 @param rpc_results: RPC results
253 @rtype: dict(node: (fail_msg, offline, hooks_results))
254 @return: RPC results unpacked according to the format expected by
258 return dict((node, (rpc_res.fail_msg, rpc_res.offline, rpc_res.payload))
259 for (node, rpc_res) in rpc_results.items())
def _VerifyLocks(lu, glm, _mode_whitelist=_NODE_ALLOC_MODE_WHITELIST,
                 _nal_whitelist=_NODE_ALLOC_WHITELIST):
  """Performs consistency checks on locks acquired by a logical unit.

  @type lu: L{cmdlib.LogicalUnit}
  @param lu: Logical unit instance
  @type glm: L{locking.GanetiLockManager}
  @param glm: Lock manager
  @param _mode_whitelist: LU classes allowed to use a different mode for
    the node allocation lock (for unittests)
  @param _nal_whitelist: LU classes allowed to skip the node allocation
    lock (for unittests)

  """
  # Whether the node allocation lock is currently held
  have_nal = glm.check_owned(locking.LEVEL_NODE_ALLOC, locking.NAL)

  for level in [locking.LEVEL_NODE, locking.LEVEL_NODE_RES]:
    # TODO: Verify using actual lock mode, not using LU variables
    if level in lu.needed_locks:
      share_node_alloc = lu.share_locks[locking.LEVEL_NODE_ALLOC]
      share_level = lu.share_locks[level]

      if lu.__class__ in _mode_whitelist:
        assert share_node_alloc != share_level, \
          "LU is whitelisted to use different modes for node allocation lock"
        # NOTE(review): an "else:" branch appears to be elided in this chunk;
        # the following assert presumably belongs to it -- verify against the
        # full file
        assert bool(share_node_alloc) == bool(share_level), \
          ("Node allocation lock must be acquired using the same mode as nodes"
           " and node resources")

      if lu.__class__ in _nal_whitelist:
        assert not have_nal, \
          "LU is whitelisted for not acquiring the node allocation lock"
      elif lu.needed_locks[level] == locking.ALL_SET or glm.owning_all(level):
        # NOTE(review): an "assert have_nal, \" line appears to be elided
        # before this message tuple -- verify against the full file
        ("Node allocation lock must be used if an LU acquires all nodes"
         " or node resources")
class Processor(object):
  """Object which runs OpCodes"""
  # NOTE(review): this chunk is elided -- several guard "if"/"try"/"else"
  # lines and statement continuations are missing below, so some statements
  # appear at the wrong nesting level or with unbalanced parentheses.
  # Verify every flagged spot against the full file.

  # Maps each opcode class to the LU class implementing it
  DISPATCH_TABLE = _ComputeDispatchTable()

  def __init__(self, context, ec_id, enable_locks=True):
    """Constructor for Processor

    @type context: GanetiContext
    @param context: global Ganeti context
    @param ec_id: execution context identifier

    """
    self.context = context
    # NOTE(review): assignments of the execution-context id and the callback
    # holder (self._cbs) are not visible here, although later methods read
    # them -- verify
    self.rpc = context.rpc
    self.hmclass = HooksMaster
    self._enable_locks = enable_locks

  def _CheckLocksEnabled(self):
    """Checks if locking is enabled.

    @raise errors.ProgrammerError: In case locking is not enabled

    """
    if not self._enable_locks:
      raise errors.ProgrammerError("Attempted to use disabled locks")

  def _AcquireLocks(self, level, names, shared, opportunistic, timeout):
    """Acquires locks via the Ganeti lock manager.

    @param level: Lock level
    @type names: list or string
    @param names: Lock names
    @param shared: Whether the locks should be acquired in shared mode
    @type opportunistic: bool
    @param opportunistic: Whether to acquire opportunistically
    @type timeout: None or float
    @param timeout: Timeout for acquiring the locks
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
      amount of time

    """
    self._CheckLocksEnabled()

    priority = self._cbs.CurrentPriority()

    acquired = self.context.glm.acquire(level, names, shared=shared,
                                        timeout=timeout, priority=priority,
                                        opportunistic=opportunistic)

    # NOTE(review): presumably only raised when "acquired" indicates a
    # timeout; the guarding condition and a final "return acquired" are not
    # visible in this chunk -- verify
    raise LockAcquireTimeout()

  def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    Runs pre-hooks, the LU itself (honouring dry-run mode), then the
    post-hooks and, if the configuration changed, the config-update hook.

    """
    write_count = self.context.cfg.write_count
    hm = self.BuildHooksManager(lu)
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
    # NOTE(review): the continuation of this call is elided
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
    if getattr(lu.op, "dry_run", False):
      # in this mode, no post-hooks are run, and the config is not
      # written (as it might have been modified by another LU, and we
      # shouldn't do writeout on behalf of other threads
      self.LogInfo("dry-run mode requested, not actually executing"
      return lu.dry_run_result

    # NOTE(review): these two assignments were presumably an if/else on
    # whether callbacks are available -- verify
    submit_mj_fn = self._cbs.SubmitManyJobs
    submit_mj_fn = _FailingSubmitManyJobs

    result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
    h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
    result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
    # FIXME: This needs locks if not lu_class.REQ_BGL
    if write_count != self.context.cfg.write_count:

  def BuildHooksManager(self, lu):
    """Builds the hooks manager for the given logical unit."""
    return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu)

  def _LockAndExecLU(self, lu, level, calc_timeout):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking the given level, and
    proceeds up, till there are no more locks to acquire. Then it executes the
    given LU and its opcodes.

    """
    glm = self.context.glm
    adding_locks = level in lu.add_locks
    acquiring_locks = level in lu.needed_locks

    if level not in locking.LEVELS:
      # Past the highest lock level: verify held locks and run the LU
      _VerifyLocks(lu, glm)

      self._cbs.NotifyStart()

      # NOTE(review): the opening "try:" for this handler is elided
      result = self._ExecLU(lu)
    except AssertionError, err:
      # this is a bit ugly, as we don't know from which phase
      # (prereq, exec) this comes; but it's better than an exception
      # with no information
      (_, _, tb) = sys.exc_info()
      err_info = traceback.format_tb(tb)
      logging.exception("Detected AssertionError")
      raise errors.OpExecError("Internal assertion error: please report"
                               " this as a bug.\nError message: '%s';"
                               " location:\n%s" % (str(err), err_info[-1]))

    elif adding_locks and acquiring_locks:
      # We could both acquire and add locks at the same level, but for now we
      # don't need this, so we'll avoid the complicated code needed.
      raise NotImplementedError("Can't declare locks to acquire when adding"

    elif adding_locks or acquiring_locks:
      self._CheckLocksEnabled()

      lu.DeclareLocks(level)
      share = lu.share_locks[level]
      opportunistic = lu.opportunistic_locks[level]

      assert adding_locks ^ acquiring_locks, \
        "Locks must be either added or acquired"

      needed_locks = lu.needed_locks[level]

      # NOTE(review): the continuation of this call (timeout argument) is
      # elided
      self._AcquireLocks(level, needed_locks, share, opportunistic,

      add_locks = lu.add_locks[level]
      lu.remove_locks[level] = add_locks

      # NOTE(review): the opening "try:" for this handler is elided
        glm.add(level, add_locks, acquired=1, shared=share)
      except errors.LockError:
        logging.exception("Detected lock error in level %s for locks"
                          " %s, shared=%s", level, add_locks, share)
        raise errors.OpPrereqError(
          "Couldn't add locks (%s), most likely because of another"
          " job who added them first" % add_locks,
          errors.ECODE_NOTUNIQUE)

      # Recurse into the next lock level
      result = self._LockAndExecLU(lu, level + 1, calc_timeout)

      if level in lu.remove_locks:
        glm.remove(level, lu.remove_locks[level])

      if glm.is_owned(level):
    else:
      # No locks needed at this level; recurse directly
      result = self._LockAndExecLU(lu, level + 1, calc_timeout)

  def ExecOpCode(self, op, cbs, timeout=None):
    """Execute an opcode.

    @type op: an OpCode instance
    @param op: the opcode to be executed
    @type cbs: L{OpExecCbBase}
    @param cbs: Runtime callbacks
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks, None for no timeout
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
      amount of time

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpcode (%s)" % type(op))

    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
    # NOTE(review): presumably guarded by "if lu_class is None:" -- verify
      raise errors.OpCodeUnknown("Unknown opcode")

    # NOTE(review): these two assignments were presumably an if/else on
    # whether a timeout was given -- verify
    calc_timeout = lambda: None
      calc_timeout = utils.RunningTimeout(timeout, False).Remaining

    if self._enable_locks:
      # Acquire the Big Ganeti Lock exclusively if this LU requires it,
      # and in a shared fashion otherwise (to prevent concurrent run with
      # an exclusive LU.
      self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
                         not lu_class.REQ_BGL, False, calc_timeout())
    elif lu_class.REQ_BGL:
      raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
                                   " disabled" % op.OP_ID)

    lu = lu_class(self, op, self.context, self.rpc)
    assert lu.needed_locks is not None, "needed_locks not set by LU"

    # NOTE(review): the continuation of this call and the surrounding
    # try/finally are elided
    result = self._LockAndExecLU(lu, locking.LEVEL_CLUSTER + 1,
    self.context.cfg.DropECReservations(self._ec_id)

    # Release BGL if owned
    if self.context.glm.is_owned(locking.LEVEL_CLUSTER):
      assert self._enable_locks
      self.context.glm.release(locking.LEVEL_CLUSTER)

    # Sanity-check the result against the opcode's declared result type
    resultcheck_fn = op.OP_RESULT
    if not (resultcheck_fn is None or resultcheck_fn(result)):
      logging.error("Expected opcode result matching %s, got %s",
                    resultcheck_fn, result)
      raise errors.OpResultError("Opcode result does not match %s: %s" %
                                 (resultcheck_fn, utils.Truncate(result, 80)))

  def Log(self, *args):
    """Forward call to feedback callback function.

    """
    self._cbs.Feedback(*args)

  def LogStep(self, current, total, message):
    """Log a change in LU execution progress.

    """
    logging.debug("Step %d/%d %s", current, total, message)
    self.Log("STEP %d/%d %s" % (current, total, message))

  def LogWarning(self, message, *args, **kwargs):
    """Log a warning to the logs and the user.

    The optional keyword argument is 'hint' and can be used to show a
    hint to the user (presumably related to the warning). If the
    message is empty, it will not be printed at all, allowing one to

    """
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
      "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
    message = message % tuple(args)
    # NOTE(review): the "if message:" / "if hint" guards around the
    # following statements appear to be elided -- verify
      logging.warning(message)
      self.Log(" - WARNING: %s" % message)
      self.Log(" Hint: %s" % kwargs["hint"])

  def LogInfo(self, message, *args):
    """Log an informational message to the logs and the user.

    """
    message = message % tuple(args)
    logging.info(message)
    self.Log(" - INFO: %s" % message)

    # NOTE(review): the "def" line of this method (execution-context id
    # accessor) and its guard are elided -- verify against the full file
    """Returns the current execution context ID.

    """
      raise errors.ProgrammerError("Tried to use execution context id when"
class HooksMaster(object):
  """Invokes hook scripts around logical unit execution.

  NOTE(review): this chunk is elided -- several guard "if"/"else" lines and
  statement continuations are missing below, so some statements appear at
  the wrong nesting level or with unbalanced brackets.  Verify every flagged
  spot against the full file.
  """
  def __init__(self, opcode, hooks_path, nodes, hooks_execution_fn,
               hooks_results_adapt_fn, build_env_fn, log_fn, htype=None,
               cluster_name=None, master_name=None):
    """Base class for hooks masters.

    This class invokes the execution of hooks according to the behaviour
    specified by its parameters.

    @param opcode: opcode of the operation to which the hooks are tied
    @type hooks_path: string
    @param hooks_path: prefix of the hooks directories
    @type nodes: 2-tuple of lists
    @param nodes: 2-tuple of lists containing nodes on which pre-hooks must be
      run and nodes on which post-hooks must be run
    @type hooks_execution_fn: function that accepts the following parameters:
      (node_list, hooks_path, phase, environment)
    @param hooks_execution_fn: function that will execute the hooks; can be
      None, indicating that no conversion is necessary.
    @type hooks_results_adapt_fn: function
    @param hooks_results_adapt_fn: function that will adapt the return value of
      hooks_execution_fn to the format expected by RunPhase
    @type build_env_fn: function that returns a dictionary having strings as
      keys
    @param build_env_fn: function that builds the environment for the hooks
    @type log_fn: function that accepts a string
    @param log_fn: logging function
    @type htype: string or None
    @param htype: None or one of L{constants.HTYPE_CLUSTER},
      L{constants.HTYPE_NODE}, L{constants.HTYPE_INSTANCE}
    @type cluster_name: string
    @param cluster_name: name of the cluster
    @type master_name: string
    @param master_name: name of the master

    """
    # NOTE(review): assignments for opcode, log_fn and htype are not visible
    # here, although other methods read self.opcode/self.log_fn/self.htype
    # -- verify
    self.hooks_path = hooks_path
    self.hooks_execution_fn = hooks_execution_fn
    self.hooks_results_adapt_fn = hooks_results_adapt_fn
    self.build_env_fn = build_env_fn
    self.cluster_name = cluster_name
    self.master_name = master_name

    # Pre-phase environment is kept so post-phase can merge with it
    self.pre_env = self._BuildEnv(constants.HOOKS_PHASE_PRE)
    (self.pre_nodes, self.post_nodes) = nodes

  def _BuildEnv(self, phase):
    """Compute the environment and the target nodes.

    Based on the opcode and the current node list, this builds the
    environment for the hooks and the target node list for the run.

    """
    # NOTE(review): the pre-phase prefix assignment, the "else:" line and
    # the initial "env = {}" appear to be elided in this branch chain
    if phase == constants.HOOKS_PHASE_PRE:
    elif phase == constants.HOOKS_PHASE_POST:
      prefix = "GANETI_POST_"
      raise AssertionError("Unknown phase '%s'" % phase)

    if self.hooks_path is not None:
      phase_env = self.build_env_fn()
      assert not compat.any(key.upper().startswith(prefix)
                            for key in phase_env)
      env.update(("%s%s" % (prefix, key), value)
                 for (key, value) in phase_env.items())

    if phase == constants.HOOKS_PHASE_PRE:
      # NOTE(review): the "for key in env)" continuation of this assert is
      # elided
      assert compat.all((key.startswith("GANETI_") and
                         not key.startswith("GANETI_POST_"))
    elif phase == constants.HOOKS_PHASE_POST:
      assert compat.all(key.startswith("GANETI_POST_") for key in env)
      assert isinstance(self.pre_env, dict)

      # Merge with pre-phase environment
      assert not compat.any(key.startswith("GANETI_POST_")
                            for key in self.pre_env)
      env.update(self.pre_env)
      # NOTE(review): an "else:" line and a final "return env" appear to be
      # elided here
      raise AssertionError("Unknown phase '%s'" % phase)

  def _RunWrapper(self, node_list, hpath, phase, phase_env):
    """Simple wrapper over self.callfn.

    This method fixes the environment before executing the hooks.

    """
    # NOTE(review): the "env = {" opener of this dict literal is elided
      "PATH": constants.HOOKS_PATH,
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
      "GANETI_OP_CODE": self.opcode,
      "GANETI_DATA_DIR": pathutils.DATA_DIR,
      "GANETI_HOOKS_PHASE": phase,
      "GANETI_HOOKS_PATH": hpath,

    env["GANETI_OBJECT_TYPE"] = self.htype

    if self.cluster_name is not None:
      env["GANETI_CLUSTER"] = self.cluster_name

    if self.master_name is not None:
      env["GANETI_MASTER"] = self.master_name

    env = utils.algo.JoinDisjointDicts(env, phase_env)

    # Convert everything to strings
    env = dict([(str(key), str(val)) for key, val in env.iteritems()])

    # NOTE(review): the "for key in env)" continuation of this assert is
    # elided
    assert compat.all(key == "PATH" or key.startswith("GANETI_")

    return self.hooks_execution_fn(node_list, hpath, phase, env)

  def RunPhase(self, phase, nodes=None):
    """Run all the scripts for a phase.

    This is the main function of the HookMaster.
    It executes self.hooks_execution_fn, and after running
    self.hooks_results_adapt_fn on its results it expects them to be in the form
    {node_name: (fail_msg, [(script, result, output), ...]}).

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
      L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param nodes: overrides the predefined list of nodes for the given phase
    @return: the processed results of the hooks multi-node rpc call
    @raise errors.HooksFailure: on communication failure to the nodes
    @raise errors.HooksAbort: on failure of one of the hooks

    """
    # NOTE(review): the "if nodes is None:" guards inside these branches
    # are elided
    if phase == constants.HOOKS_PHASE_PRE:
      nodes = self.pre_nodes
    elif phase == constants.HOOKS_PHASE_POST:
      nodes = self.post_nodes
    env = self._BuildEnv(phase)
      raise AssertionError("Unknown phase '%s'" % phase)

    # empty node list, we should not attempt to run this as either
    # we're in the cluster init phase and the rpc client part can't
    # even attempt to run, or this LU doesn't do hooks at all
    results = self._RunWrapper(nodes, self.hooks_path, phase, env)
    # NOTE(review): the "if not results:" guard around this failure branch
    # is elided
    msg = "Communication Failure"
    if phase == constants.HOOKS_PHASE_PRE:
      raise errors.HooksFailure(msg)

    converted_res = results
    if self.hooks_results_adapt_fn:
      converted_res = self.hooks_results_adapt_fn(results)

    # NOTE(review): the "errs = []" initialisation and several per-node
    # guards (fail_msg/offline checks, "if not output:") are elided in the
    # loop below
    for node_name, (fail_msg, offline, hooks_results) in converted_res.items():
      self.log_fn("Communication failure to node %s: %s", node_name, fail_msg)
      for script, hkr, output in hooks_results:
        if hkr == constants.HKR_FAIL:
          if phase == constants.HOOKS_PHASE_PRE:
            errs.append((node_name, script, output))
            output = "(no output)"
          self.log_fn("On %s script %s failed, output: %s" %
                      (node_name, script, output))

    if errs and phase == constants.HOOKS_PHASE_PRE:
      raise errors.HooksAbort(errs)

  def RunConfigUpdate(self):
    """Run the special configuration update hook

    This is a special hook that runs only on the master after each
    top-level LI if the configuration has been updated.

    """
    phase = constants.HOOKS_PHASE_POST
    hpath = constants.HOOKS_NAME_CFGUPDATE
    nodes = [self.master_name]
    self._RunWrapper(nodes, hpath, phase, self.pre_env)

  # NOTE(review): a "@staticmethod" decorator appears to be elided here
  # (this function takes no "self") -- verify
  def BuildFromLu(hooks_execution_fn, lu):
    """Builds a HooksMaster instance from a logical unit."""
    nodes = map(frozenset, lu.BuildHooksNodes())

    master_name = cluster_name = None
    # NOTE(review): the guard around these config lookups (presumably
    # "if lu.cfg is not None:") is elided
      master_name = lu.cfg.GetMasterNode()
      cluster_name = lu.cfg.GetClusterName()

    return HooksMaster(lu.op.OP_ID, lu.HPATH, nodes, hooks_execution_fn,
                       _RpcResultsToHooksResults, lu.BuildHooksEnv,
                       lu.LogWarning, lu.HTYPE, cluster_name, master_name)