4 # Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
"""Module implementing the logic behind the cluster operations

This module implements the logic for doing operations in the cluster. There
are two kinds of classes defined:
  - logical units, which know how to deal with their specific opcode only
  - the processor, which dispatches the opcodes to their logical units

"""
36 from ganeti import opcodes
37 from ganeti import constants
38 from ganeti import errors
39 from ganeti import cmdlib
40 from ganeti import locking
41 from ganeti import utils
42 from ganeti import compat
43 from ganeti import pathutils
class LockAcquireTimeout(Exception):
  """Exception to report timeouts on acquiring locks.

  """
def _CalculateLockAttemptTimeouts():
  """Calculate timeouts for lock attempts.

  Builds the per-attempt back-off schedule used by
  L{LockAttemptTimeoutStrategy}: starting at
  L{constants.LOCK_ATTEMPTS_MINWAIT}, each timeout grows slightly faster
  than linearly and is clamped to the
  [LOCK_ATTEMPTS_MINWAIT, LOCK_ATTEMPTS_MAXWAIT] range, until the total
  wait reaches L{constants.LOCK_ATTEMPTS_TIMEOUT}.

  @rtype: list of float
  @return: timeout in seconds for each successive lock attempt

  """
  result = [constants.LOCK_ATTEMPTS_MINWAIT]
  running_sum = result[0]

  # Wait for a total of at least LOCK_ATTEMPTS_TIMEOUT before doing a
  # blocking acquire
  while running_sum < constants.LOCK_ATTEMPTS_TIMEOUT:
    timeout = (result[-1] * 1.05) ** 1.25

    # Cap max timeout. This gives other jobs a chance to run even if
    # we're still trying to get our locks, before finally moving to a
    # blocking acquire
    timeout = min(timeout, constants.LOCK_ATTEMPTS_MAXWAIT)
    # And also cap the lower boundary for safety
    timeout = max(timeout, constants.LOCK_ATTEMPTS_MINWAIT)

    result.append(timeout)
    running_sum += timeout

  # NOTE(review): restored the missing "return"; without it the function
  # returns None and iterating _TIMEOUT_PER_ATTEMPT would fail
  return result
class LockAttemptTimeoutStrategy(object):
  """Class with lock acquire timeout strategy.

  Hands out the precomputed per-attempt timeouts one by one; once the
  schedule is exhausted it returns C{None}, meaning the caller should do
  a blocking acquire.

  """
  # Schedule shared by all instances; computed once at class-creation time
  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()

  def __init__(self, _time_fn=time.time, _random_fn=random.random):
    """Initializes this class.

    @param _time_fn: Time function for unittests
    @param _random_fn: Random number generator for unittests

    """
    object.__init__(self)

    self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
    self._time_fn = _time_fn
    self._random_fn = _random_fn

  def NextAttempt(self):
    """Returns the timeout for the next attempt.

    @return: timeout in seconds for the next acquisition attempt, or
        C{None} once the schedule is exhausted (do a blocking acquire)

    """
    try:
      # NOTE(review): restored the missing "try:"/fallback lines; the
      # next() builtin is used instead of .next() so this also works on
      # Python 3
      timeout = next(self._timeouts)
    except StopIteration:
      # No more timeouts, do blocking acquire
      timeout = None

    if timeout is not None:
      # Add a small variation (-/+ 5%) to timeout. This helps in situations
      # where two or more jobs are fighting for the same lock(s).
      variation_range = timeout * 0.1
      timeout += ((self._random_fn() * variation_range) -
                  (variation_range * 0.5))

    return timeout
class OpExecCbBase: # pylint: disable=W0232
  """Base class for OpCode execution callbacks.

  """
  def NotifyStart(self):
    """Called when we are about to execute the LU.

    This function is called when we're about to start the lu's Exec() method,
    that is, after we have acquired all locks.

    """

  def Feedback(self, *args):
    """Sends feedback from the LU code to the end-user.

    """

  def CheckCancel(self):
    """Check whether job has been cancelled.

    """

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{jqueue.JobQueue.SubmitManyJobs}.

    """
    raise NotImplementedError
157 def _LUNameForOpName(opname):
158 """Computes the LU name for a given OpCode name.
161 assert opname.startswith(_OP_PREFIX), \
162 "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)
164 return _LU_PREFIX + opname[len(_OP_PREFIX):]
def _ComputeDispatchTable():
  """Computes the opcode-to-lu dispatch table.

  @rtype: dict
  @return: mapping from opcode class to the LogicalUnit class implementing it

  """
  # NOTE(review): the generator expression was left unterminated in the
  # reviewed text; restored the conventional WITH_LU filter so opcodes
  # without a logical unit are skipped — verify against VCS history.
  return dict((op, getattr(cmdlib, _LUNameForOpName(op.__name__)))
              for op in opcodes.OP_MAPPING.values()
              if op.WITH_LU)
def _SetBaseOpParams(src, defcomment, dst):
  """Copies basic opcode parameters.

  @type src: L{opcodes.OpCode}
  @param src: Source opcode
  @type defcomment: string
  @param defcomment: Comment to specify if not already given
  @type dst: L{opcodes.OpCode}
  @param dst: Destination opcode

  """
  if hasattr(src, "debug_level"):
    dst.debug_level = src.debug_level

  # Only inherit the priority if the destination doesn't set its own
  if (getattr(dst, "priority", None) is None and
      hasattr(src, "priority")):
    dst.priority = src.priority

  if not getattr(dst, opcodes.COMMENT_ATTR, None):
    dst.comment = defcomment
def _ProcessResult(submit_fn, op, result):
  """Examines opcode result.

  If necessary, additional processing on the result is done.

  @param submit_fn: callable used to submit generated jobs
  @param op: opcode that produced C{result}
  @param result: return value of the LU's Exec method, possibly a
      L{cmdlib.ResultWithJobs}
  @return: the (possibly adjusted) result

  """
  if isinstance(result, cmdlib.ResultWithJobs):
    # Copy basic parameters (e.g. priority) to every generated opcode; an
    # explicit loop is used instead of map() so the side effects also
    # happen under Python 3, where map() is lazy
    set_params_fn = compat.partial(_SetBaseOpParams, op,
                                   "Submitted by %s" % op.OP_ID)
    for job_op in itertools.chain(*result.jobs):
      set_params_fn(job_op)

    # Job submission can only be done once
    job_submission = submit_fn(result.jobs)

    # Build dictionary
    result = result.other

    assert constants.JOB_IDS_KEY not in result, \
      "Key '%s' found in additional return values" % constants.JOB_IDS_KEY

    result[constants.JOB_IDS_KEY] = job_submission

  # NOTE(review): restored the missing "return result"; without it the
  # caller would always receive None
  return result
def _FailingSubmitManyJobs(_):
  """Implementation of L{OpExecCbBase.SubmitManyJobs} to raise an exception.

  Used as the job-submission callback when no callbacks are available.

  @raise errors.ProgrammerError: always

  """
  raise errors.ProgrammerError("Opcodes processed without callbacks (e.g."
                               " queries) can not submit jobs")
232 def _RpcResultsToHooksResults(rpc_results):
233 """Function to convert RPC results to the format expected by HooksMaster.
235 @type rpc_results: dict(node: L{rpc.RpcResult})
236 @param rpc_results: RPC results
237 @rtype: dict(node: (fail_msg, offline, hooks_results))
238 @return: RPC results unpacked according to the format expected by
242 return dict((node, (rpc_res.fail_msg, rpc_res.offline, rpc_res.payload))
243 for (node, rpc_res) in rpc_results.items())
class Processor(object):
  """Object which runs OpCodes"""
  # NOTE(review): several control-flow lines (try/finally, if/else,
  # returns) in this class were missing from the reviewed text and have
  # been restored — verify against VCS history.
  DISPATCH_TABLE = _ComputeDispatchTable()

  def __init__(self, context, ec_id, enable_locks=True):
    """Constructor for Processor

    @type context: GanetiContext
    @param context: global Ganeti context
    @type ec_id: string
    @param ec_id: execution context identifier
    @type enable_locks: bool
    @param enable_locks: whether the lock manager may be used

    """
    self.context = context
    self._ec_id = ec_id
    self._cbs = None
    self.rpc = context.rpc
    self.hmclass = HooksMaster
    self._enable_locks = enable_locks

  def _CheckLocksEnabled(self):
    """Checks if locking is enabled.

    @raise errors.ProgrammerError: In case locking is not enabled

    """
    if not self._enable_locks:
      raise errors.ProgrammerError("Attempted to use disabled locks")

  def _AcquireLocks(self, level, names, shared, timeout, priority):
    """Acquires locks via the Ganeti lock manager.

    @type level: int
    @param level: Lock level
    @type names: list or string
    @param names: Lock names
    @type shared: bool
    @param shared: Whether the locks should be acquired in shared mode
    @type timeout: None or float
    @param timeout: Timeout for acquiring the locks
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time

    """
    self._CheckLocksEnabled()

    # Give the job queue a chance to cancel us before blocking on locks
    if self._cbs:
      self._cbs.CheckCancel()

    acquired = self.context.glm.acquire(level, names, shared=shared,
                                        timeout=timeout, priority=priority)

    if acquired is None:
      raise LockAcquireTimeout()

    return acquired

  def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    Runs prerequisite checks, pre-hooks, the LU itself (or returns the
    dry-run result) and post-hooks; triggers the config-update hook if
    the configuration was modified during execution.

    """
    write_count = self.context.cfg.write_count
    lu.CheckPrereq()

    hm = self.BuildHooksManager(lu)
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                     self.Log, None)

    if getattr(lu.op, "dry_run", False):
      # in this mode, no post-hooks are run, and the config is not
      # written (as it might have been modified by another LU, and we
      # shouldn't do writeout on behalf of other threads
      self.LogInfo("dry-run mode requested, not actually executing"
                   " the operation")
      return lu.dry_run_result

    if self._cbs:
      submit_mj_fn = self._cbs.SubmitManyJobs
    else:
      submit_mj_fn = _FailingSubmitManyJobs

    try:
      result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                self.Log, result)
    finally:
      # FIXME: This needs locks if not lu_class.REQ_BGL
      if write_count != self.context.cfg.write_count:
        hm.RunConfigUpdate()

    return result

  def BuildHooksManager(self, lu):
    """Returns a hooks master bound to the given LU."""
    return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu)

  def _LockAndExecLU(self, lu, level, calc_timeout, priority):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking the given level, and
    proceeds up, till there are no more locks to acquire. Then it executes the
    given LU and its opcodes.

    """
    adding_locks = level in lu.add_locks
    acquiring_locks = level in lu.needed_locks
    if level not in locking.LEVELS:
      # Past the last lock level: all locks are held, run the LU
      if self._cbs:
        self._cbs.NotifyStart()

      result = self._ExecLU(lu)

    elif adding_locks and acquiring_locks:
      # We could both acquire and add locks at the same level, but for now we
      # don't need this, so we'll avoid the complicated code needed.
      raise NotImplementedError("Can't declare locks to acquire when adding"
                                " others")

    elif adding_locks or acquiring_locks:
      self._CheckLocksEnabled()

      lu.DeclareLocks(level)
      share = lu.share_locks[level]

      try:
        assert adding_locks ^ acquiring_locks, \
          "Locks must be either added or acquired"

        if acquiring_locks:
          # Acquiring locks
          needed_locks = lu.needed_locks[level]

          self._AcquireLocks(level, needed_locks, share,
                             calc_timeout(), priority)
        else:
          # Adding locks
          add_locks = lu.add_locks[level]
          lu.remove_locks[level] = add_locks

          try:
            self.context.glm.add(level, add_locks, acquired=1, shared=share)
          except errors.LockError:
            raise errors.OpPrereqError(
              "Couldn't add locks (%s), probably because of a race condition"
              " with another job, who added them first" % add_locks,
              errors.ECODE_FAULT)

        try:
          result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
        finally:
          if level in lu.remove_locks:
            self.context.glm.remove(level, lu.remove_locks[level])
      finally:
        if self.context.glm.is_owned(level):
          self.context.glm.release(level)

    else:
      result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)

    return result

  def ExecOpCode(self, op, cbs, timeout=None, priority=None):
    """Execute an opcode.

    @type op: an OpCode instance
    @param op: the opcode to be executed
    @type cbs: L{OpExecCbBase}
    @param cbs: Runtime callbacks
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks, None for no timeout
    @type priority: number or None
    @param priority: Priority for acquiring lock(s)
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpcode (%s)" % type(op))

    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
    if lu_class is None:
      raise errors.OpCodeUnknown("Unknown opcode")

    if timeout is None:
      calc_timeout = lambda: None
    else:
      calc_timeout = utils.RunningTimeout(timeout, False).Remaining

    self._cbs = cbs
    try:
      if self._enable_locks:
        # Acquire the Big Ganeti Lock exclusively if this LU requires it,
        # and in a shared fashion otherwise (to prevent concurrent run with
        # an exclusive LU.
        self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
                           not lu_class.REQ_BGL, calc_timeout(),
                           priority)
      elif lu_class.REQ_BGL:
        raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
                                     " disabled" % op.OP_ID)

      try:
        lu = lu_class(self, op, self.context, self.rpc)
        lu.ExpandNames()
        assert lu.needed_locks is not None, "needed_locks not set by LU"

        try:
          result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE,
                                       calc_timeout, priority)
        finally:
          if self._ec_id:
            self.context.cfg.DropECReservations(self._ec_id)
      finally:
        # Release BGL if owned
        if self.context.glm.is_owned(locking.LEVEL_CLUSTER):
          assert self._enable_locks
          self.context.glm.release(locking.LEVEL_CLUSTER)
    finally:
      self._cbs = None

    resultcheck_fn = op.OP_RESULT
    if not (resultcheck_fn is None or resultcheck_fn(result)):
      logging.error("Expected opcode result matching %s, got %s",
                    resultcheck_fn, result)
      raise errors.OpResultError("Opcode result does not match %s: %s" %
                                 (resultcheck_fn, utils.Truncate(result, 80)))

    return result

  def Log(self, *args):
    """Forward call to feedback callback function.

    """
    if self._cbs:
      self._cbs.Feedback(*args)

  def LogStep(self, current, total, message):
    """Log a change in LU execution progress.

    """
    logging.debug("Step %d/%d %s", current, total, message)
    self.Log("STEP %d/%d %s" % (current, total, message))

  def LogWarning(self, message, *args, **kwargs):
    """Log a warning to the logs and the user.

    The optional keyword argument is 'hint' and can be used to show a
    hint to the user (presumably related to the warning). If the
    message is empty, it will not be printed at all, allowing one to
    show only a hint.

    """
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
      "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
    if args:
      message = message % tuple(args)
    if message:
      logging.warning(message)
      self.Log(" - WARNING: %s" % message)
    if "hint" in kwargs:
      self.Log("      Hint: %s" % kwargs["hint"])

  def LogInfo(self, message, *args):
    """Log an informational message to the logs and the user.

    """
    if args:
      message = message % tuple(args)
    logging.info(message)
    self.Log(" - INFO: %s" % message)

  def GetECId(self):
    """Returns the current execution context ID.

    @raise errors.ProgrammerError: if no execution context ID was set

    """
    if not self._ec_id:
      raise errors.ProgrammerError("Tried to use execution context id when"
                                   " not set")
    return self._ec_id
class HooksMaster(object):
  # NOTE(review): a number of assignments and control-flow lines in this
  # class were missing from the reviewed text and have been restored —
  # verify against VCS history.
  def __init__(self, opcode, hooks_path, nodes, hooks_execution_fn,
               hooks_results_adapt_fn, build_env_fn, log_fn, htype=None,
               cluster_name=None, master_name=None):
    """Base class for hooks masters.

    This class invokes the execution of hooks according to the behaviour
    specified by its parameters.

    @type opcode: string
    @param opcode: opcode of the operation to which the hooks are tied
    @type hooks_path: string
    @param hooks_path: prefix of the hooks directories
    @type nodes: 2-tuple of lists
    @param nodes: 2-tuple of lists containing nodes on which pre-hooks must be
      run and nodes on which post-hooks must be run
    @type hooks_execution_fn: function that accepts the following parameters:
      (node_list, hooks_path, phase, environment)
    @param hooks_execution_fn: function that will execute the hooks
    @type hooks_results_adapt_fn: function
    @param hooks_results_adapt_fn: function that will adapt the return value of
      hooks_execution_fn to the format expected by RunPhase; can be None,
      indicating that no conversion is necessary
    @type build_env_fn: function that returns a dictionary having strings as
      keys
    @param build_env_fn: function that builds the environment for the hooks
    @type log_fn: function that accepts a string
    @param log_fn: logging function
    @type htype: string or None
    @param htype: None or one of L{constants.HTYPE_CLUSTER},
      L{constants.HTYPE_NODE}, L{constants.HTYPE_INSTANCE}
    @type cluster_name: string
    @param cluster_name: name of the cluster
    @type master_name: string
    @param master_name: name of the master

    """
    self.opcode = opcode
    self.hooks_path = hooks_path
    self.hooks_execution_fn = hooks_execution_fn
    self.hooks_results_adapt_fn = hooks_results_adapt_fn
    self.build_env_fn = build_env_fn
    self.log_fn = log_fn
    self.htype = htype
    self.cluster_name = cluster_name
    self.master_name = master_name

    # The pre-phase environment is built once and reused by the post phase
    self.pre_env = self._BuildEnv(constants.HOOKS_PHASE_PRE)
    (self.pre_nodes, self.post_nodes) = nodes

  def _BuildEnv(self, phase):
    """Compute the environment and the target nodes.

    Based on the opcode and the current node list, this builds the
    environment for the hooks and the target node list for the run.

    """
    if phase == constants.HOOKS_PHASE_PRE:
      prefix = "GANETI_"
    elif phase == constants.HOOKS_PHASE_POST:
      prefix = "GANETI_POST_"
    else:
      raise AssertionError("Unknown phase '%s'" % phase)

    env = {}

    if self.hooks_path is not None:
      phase_env = self.build_env_fn()
      if phase_env:
        assert not compat.any(key.upper().startswith(prefix)
                              for key in phase_env)
        env.update(("%s%s" % (prefix, key), value)
                   for (key, value) in phase_env.items())

    if phase == constants.HOOKS_PHASE_PRE:
      assert compat.all((key.startswith("GANETI_") and
                         not key.startswith("GANETI_POST_"))
                        for key in env)

    elif phase == constants.HOOKS_PHASE_POST:
      assert compat.all(key.startswith("GANETI_POST_") for key in env)
      assert isinstance(self.pre_env, dict)

      # Merge with pre-phase environment
      assert not compat.any(key.startswith("GANETI_POST_")
                            for key in self.pre_env)
      env.update(self.pre_env)
    else:
      raise AssertionError("Unknown phase '%s'" % phase)

    return env

  def _RunWrapper(self, node_list, hpath, phase, phase_env):
    """Simple wrapper over self.callfn.

    This method fixes the environment before executing the hooks.

    """
    env = {
      "PATH": constants.HOOKS_PATH,
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
      "GANETI_OP_CODE": self.opcode,
      "GANETI_DATA_DIR": pathutils.DATA_DIR,
      "GANETI_HOOKS_PHASE": phase,
      "GANETI_HOOKS_PATH": hpath,
      }

    if self.htype:
      env["GANETI_OBJECT_TYPE"] = self.htype

    if self.cluster_name is not None:
      env["GANETI_CLUSTER"] = self.cluster_name

    if self.master_name is not None:
      env["GANETI_MASTER"] = self.master_name

    env = utils.algo.JoinDisjointDicts(env, phase_env)

    # Convert everything to strings; items() instead of iteritems() so the
    # code also works on Python 3
    env = dict([(str(key), str(val)) for key, val in env.items()])

    assert compat.all(key == "PATH" or key.startswith("GANETI_")
                      for key in env)

    return self.hooks_execution_fn(node_list, hpath, phase, env)

  def RunPhase(self, phase, nodes=None):
    """Run all the scripts for a phase.

    This is the main function of the HookMaster.
    It executes self.hooks_execution_fn, and after running
    self.hooks_results_adapt_fn on its results it expects them to be in the
    form {node_name: (fail_msg, [(script, result, output), ...]}).

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param nodes: overrides the predefined list of nodes for the given phase
    @return: the processed results of the hooks multi-node rpc call
    @raise errors.HooksFailure: on communication failure to the nodes
    @raise errors.HooksAbort: on failure of one of the hooks

    """
    if phase == constants.HOOKS_PHASE_PRE:
      if nodes is None:
        nodes = self.pre_nodes
      env = self.pre_env
    elif phase == constants.HOOKS_PHASE_POST:
      if nodes is None:
        nodes = self.post_nodes
      env = self._BuildEnv(phase)
    else:
      raise AssertionError("Unknown phase '%s'" % phase)

    if not nodes:
      # empty node list, we should not attempt to run this as either
      # we're in the cluster init phase and the rpc client part can't
      # even attempt to run, or this LU doesn't do hooks at all
      return

    results = self._RunWrapper(nodes, self.hooks_path, phase, env)
    if not results:
      msg = "Communication Failure"
      if phase == constants.HOOKS_PHASE_PRE:
        raise errors.HooksFailure(msg)
      else:
        self.log_fn(msg)
        return results

    converted_res = results
    if self.hooks_results_adapt_fn:
      converted_res = self.hooks_results_adapt_fn(results)

    errs = []
    for node_name, (fail_msg, offline, hooks_results) in converted_res.items():
      if offline:
        continue

      if fail_msg:
        self.log_fn("Communication failure to node %s: %s", node_name,
                    fail_msg)
        continue

      for script, hkr, output in hooks_results:
        if hkr == constants.HKR_FAIL:
          if phase == constants.HOOKS_PHASE_PRE:
            errs.append((node_name, script, output))
          else:
            if not output:
              output = "(no output)"
            self.log_fn("On %s script %s failed, output: %s" %
                        (node_name, script, output))

    if errs and phase == constants.HOOKS_PHASE_PRE:
      raise errors.HooksAbort(errs)

    return results

  def RunConfigUpdate(self):
    """Run the special configuration update hook

    This is a special hook that runs only on the master after each
    top-level LU if the configuration has been updated.

    """
    phase = constants.HOOKS_PHASE_POST
    hpath = constants.HOOKS_NAME_CFGUPDATE
    nodes = [self.master_name]
    self._RunWrapper(nodes, hpath, phase, self.pre_env)

  @staticmethod
  def BuildFromLu(hooks_execution_fn, lu):
    """Builds a hooks master suited for the given LogicalUnit.

    """
    if lu.HPATH is None:
      nodes = (None, None)
    else:
      nodes = map(frozenset, lu.BuildHooksNodes())

    master_name = cluster_name = None
    if lu.cfg:
      master_name = lu.cfg.GetMasterNode()
      cluster_name = lu.cfg.GetClusterName()

    return HooksMaster(lu.op.OP_ID, lu.HPATH, nodes, hooks_execution_fn,
                       _RpcResultsToHooksResults, lu.BuildHooksEnv,
                       lu.LogWarning, lu.HTYPE, cluster_name, master_name)