4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the logic behind the cluster operations
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26 - logical units, which know how to deal with their specific opcode only
27 - the processor, which dispatches the opcodes to their logical units
import logging
import random
import time

from ganeti import opcodes
from ganeti import constants
from ganeti import errors
from ganeti import rpc
from ganeti import cmdlib
from ganeti import locking
43 class _LockAcquireTimeout(Exception):
44 """Internal exception to report timeouts on acquiring locks.
49 def _CalculateLockAttemptTimeouts():
50 """Calculate timeouts for lock attempts.
55 # Wait for a total of at least 150s before doing a blocking acquire
56 while sum(result) < 150.0:
57 timeout = (result[-1] * 1.05) ** 1.25
59 # Cap timeout at 10 seconds. This gives other jobs a chance to run
60 # even if we're still trying to get our locks, before finally moving
61 # to a blocking acquire.
66 # Lower boundary for safety
69 result.append(timeout)
class _LockAttemptTimeoutStrategy(object):
  """Class with lock acquire timeout strategy.

  Tracks the current attempt number and hands out per-attempt timeouts
  from a precomputed table; after the table is exhausted the timeout is
  C{None}, meaning a blocking acquire.

  """
  # Shared, precomputed table of per-attempt timeouts
  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()

  def __init__(self, attempt=0, _time_fn=time.time, _random_fn=random.random):
    """Initializes this class.

    @type attempt: int
    @param attempt: Current attempt number
    @param _time_fn: Time function for unittests
    @param _random_fn: Random number generator for unittests

    """
    object.__init__(self)

    if attempt < 0:
      raise ValueError("Attempt must be zero or positive")

    self._attempt = attempt
    self._time_fn = _time_fn
    self._random_fn = _random_fn

    try:
      timeout = self._TIMEOUT_PER_ATTEMPT[attempt]
    except IndexError:
      # No more timeouts, do blocking acquire
      timeout = None

    # The running timeout starts counting as soon as the strategy object
    # for this attempt is created
    self._running_timeout = locking.RunningTimeout(timeout, False,
                                                   _time_fn=_time_fn)

  def NextAttempt(self):
    """Returns the strategy for the next attempt.

    """
    return _LockAttemptTimeoutStrategy(attempt=self._attempt + 1,
                                       _time_fn=self._time_fn,
                                       _random_fn=self._random_fn)

  def CalcRemainingTimeout(self):
    """Returns the remaining timeout.

    @rtype: float or None
    @return: remaining timeout in seconds, jittered by +/-5%; C{None}
      means block indefinitely

    """
    timeout = self._running_timeout.Remaining()

    if timeout is not None:
      # Add a small variation (-/+ 5%) to timeout. This helps in situations
      # where two or more jobs are fighting for the same lock(s).
      variation_range = timeout * 0.1
      timeout += ((self._random_fn() * variation_range) -
                  (variation_range * 0.5))

    return timeout
class OpExecCbBase: # pylint: disable-msg=W0232
  """Base class for OpCode execution callbacks.

  All callbacks are no-ops by default; subclasses override the ones
  they care about.

  """
  def NotifyStart(self):
    """Called when we are about to execute the LU.

    This function is called when we're about to start the lu's Exec() method,
    that is, after we have acquired all locks.

    """

  def Feedback(self, *args):
    """Sends feedback from the LU code to the end-user.

    """

  def ReportLocks(self, msg):
    """Report lock operations.

    @type msg: string
    @param msg: Formatted lock status message

    """
class Processor(object):
  """Object which runs OpCodes"""
  # Maps each opcode class to the logical unit implementing it
  DISPATCH_TABLE = {
    # Cluster
    opcodes.OpPostInitCluster: cmdlib.LUPostInitCluster,
    opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
    opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
    opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
    opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
    opcodes.OpRenameCluster: cmdlib.LURenameCluster,
    opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
    opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
    opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig,
    opcodes.OpRepairDiskSizes: cmdlib.LURepairDiskSizes,
    # node lu
    opcodes.OpAddNode: cmdlib.LUAddNode,
    opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
    opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
    opcodes.OpQueryNodeStorage: cmdlib.LUQueryNodeStorage,
    opcodes.OpModifyNodeStorage: cmdlib.LUModifyNodeStorage,
    opcodes.OpRepairNodeStorage: cmdlib.LURepairNodeStorage,
    opcodes.OpRemoveNode: cmdlib.LURemoveNode,
    opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams,
    opcodes.OpPowercycleNode: cmdlib.LUPowercycleNode,
    opcodes.OpEvacuateNode: cmdlib.LUEvacuateNode,
    opcodes.OpMigrateNode: cmdlib.LUMigrateNode,
    opcodes.OpNodeEvacuationStrategy: cmdlib.LUNodeEvacuationStrategy,
    # instance lu
    opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
    opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
    opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
    opcodes.OpRenameInstance: cmdlib.LURenameInstance,
    opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
    opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
    opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
    opcodes.OpRebootInstance: cmdlib.LURebootInstance,
    opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
    opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
    opcodes.OpRecreateInstanceDisks: cmdlib.LURecreateInstanceDisks,
    opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
    opcodes.OpMigrateInstance: cmdlib.LUMigrateInstance,
    opcodes.OpMoveInstance: cmdlib.LUMoveInstance,
    opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
    opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
    opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
    opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
    opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
    # os lu
    opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
    # exports lu
    opcodes.OpQueryExports: cmdlib.LUQueryExports,
    opcodes.OpExportInstance: cmdlib.LUExportInstance,
    opcodes.OpRemoveExport: cmdlib.LURemoveExport,
    # tags lu
    opcodes.OpGetTags: cmdlib.LUGetTags,
    opcodes.OpSearchTags: cmdlib.LUSearchTags,
    opcodes.OpAddTags: cmdlib.LUAddTags,
    opcodes.OpDelTags: cmdlib.LUDelTags,
    # test lu
    opcodes.OpTestDelay: cmdlib.LUTestDelay,
    opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
    }

  def __init__(self, context, ec_id):
    """Constructor for Processor

    @type context: GanetiContext
    @param context: global Ganeti context
    @type ec_id: string
    @param ec_id: execution context identifier

    """
    self.context = context
    self._ec_id = ec_id
    # Runtime callbacks; only set for the duration of ExecOpCode
    self._cbs = None
    self.rpc = rpc.RpcRunner(context.cfg)
    self.hmclass = HooksMaster

  def _ReportLocks(self, level, names, shared, timeout, acquired, result):
    """Reports lock operations.

    @type level: int
    @param level: Lock level
    @type names: list or string
    @param names: Lock names
    @type shared: bool
    @param shared: Whether the locks should be acquired in shared mode
    @type timeout: None or float
    @param timeout: Timeout for acquiring the locks
    @type acquired: bool
    @param acquired: Whether the locks have already been acquired
    @type result: None or set
    @param result: Result from L{locking.GanetiLockManager.acquire}

    """
    parts = []

    # Build message
    if acquired:
      if result is None:
        parts.append("timeout")
      else:
        parts.append("acquired")
    else:
      parts.append("waiting")
      if timeout is None:
        parts.append("blocking")
      else:
        parts.append("timeout=%0.6fs" % timeout)

    parts.append(locking.LEVEL_NAMES[level])

    if names == locking.ALL_SET:
      parts.append("ALL")
    elif isinstance(names, basestring):
      parts.append(names)
    else:
      parts.append(",".join(sorted(names)))

    if shared:
      parts.append("shared")
    else:
      parts.append("exclusive")

    msg = "/".join(parts)

    logging.debug("LU locks %s", msg)

    if self._cbs:
      self._cbs.ReportLocks(msg)

  def _AcquireLocks(self, level, names, shared, timeout):
    """Acquires locks via the Ganeti lock manager.

    @type level: int
    @param level: Lock level
    @type names: list or string
    @param names: Lock names
    @type shared: bool
    @param shared: Whether the locks should be acquired in shared mode
    @type timeout: None or float
    @param timeout: Timeout for acquiring the locks

    """
    self._ReportLocks(level, names, shared, timeout, False, None)

    acquired = self.context.glm.acquire(level, names, shared=shared,
                                        timeout=timeout)

    self._ReportLocks(level, names, shared, timeout, True, acquired)

    return acquired

  def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    Runs prerequisites checks, the pre-hooks, the LU itself and the
    post-hooks, writing the configuration update hook if needed.

    """
    write_count = self.context.cfg.write_count
    lu.CheckPrereq()
    hm = HooksMaster(self.rpc.call_hooks_runner, lu)
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                     self._Feedback, None)

    if getattr(lu.op, "dry_run", False):
      # in this mode, no post-hooks are run, and the config is not
      # written (as it might have been modified by another LU, and we
      # shouldn't do writeout on behalf of other threads
      self.LogInfo("dry-run mode requested, not actually executing"
                   " the operation")
      return lu.dry_run_result

    try:
      result = lu.Exec(self._Feedback)
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                self._Feedback, result)
    finally:
      # FIXME: This needs locks if not lu_class.REQ_BGL
      if write_count != self.context.cfg.write_count:
        hm.RunConfigUpdate()

    return result

  def _LockAndExecLU(self, lu, level, calc_timeout):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking the given level, and
    proceeds up, till there are no more locks to acquire. Then it executes the
    given LU and its opcodes.

    """
    adding_locks = level in lu.add_locks
    acquiring_locks = level in lu.needed_locks
    if level not in locking.LEVELS:
      # Past the last lock level: run the LU itself
      if self._cbs:
        self._cbs.NotifyStart()

      result = self._ExecLU(lu)

    elif adding_locks and acquiring_locks:
      # We could both acquire and add locks at the same level, but for now we
      # don't need this, so we'll avoid the complicated code needed.
      raise NotImplementedError("Can't declare locks to acquire when adding"
                                " others")

    elif adding_locks or acquiring_locks:
      lu.DeclareLocks(level)
      share = lu.share_locks[level]

      try:
        assert adding_locks ^ acquiring_locks, \
          "Locks must be either added or acquired"

        if acquiring_locks:
          # Acquiring locks
          needed_locks = lu.needed_locks[level]

          acquired = self._AcquireLocks(level, needed_locks, share,
                                        calc_timeout())

          if acquired is None:
            raise _LockAcquireTimeout()

        else:
          # Adding locks
          add_locks = lu.add_locks[level]
          lu.remove_locks[level] = add_locks

          try:
            self.context.glm.add(level, add_locks, acquired=1, shared=share)
          except errors.LockError:
            raise errors.OpPrereqError(
              "Couldn't add locks (%s), probably because of a race condition"
              " with another job, who added them first" % add_locks,
              errors.ECODE_FAULT)

          acquired = add_locks

        try:
          lu.acquired_locks[level] = acquired

          result = self._LockAndExecLU(lu, level + 1, calc_timeout)
        finally:
          # Locks added at this level are removed again on the way out
          if level in lu.remove_locks:
            self.context.glm.remove(level, lu.remove_locks[level])
      finally:
        if self.context.glm.is_owned(level):
          self.context.glm.release(level)

    else:
      # Nothing to do at this level, recurse to the next one
      result = self._LockAndExecLU(lu, level + 1, calc_timeout)

    return result

  def ExecOpCode(self, op, cbs):
    """Execute an opcode.

    @type op: an OpCode instance
    @param op: the opcode to be executed
    @type cbs: L{OpExecCbBase}
    @param cbs: Runtime callbacks

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpcode")

    self._cbs = cbs
    try:
      lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
      if lu_class is None:
        raise errors.OpCodeUnknown("Unknown opcode")

      timeout_strategy = _LockAttemptTimeoutStrategy()

      while True:
        try:
          acquire_timeout = timeout_strategy.CalcRemainingTimeout()

          # Acquire the Big Ganeti Lock exclusively if this LU requires it,
          # and in a shared fashion otherwise (to prevent concurrent run with
          # an exclusive LU.
          if self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
                                not lu_class.REQ_BGL, acquire_timeout) is None:
            raise _LockAcquireTimeout()

          try:
            lu = lu_class(self, op, self.context, self.rpc)
            lu.ExpandNames()
            assert lu.needed_locks is not None, "needed_locks not set by LU"

            try:
              return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE,
                                         timeout_strategy.CalcRemainingTimeout)
            finally:
              if self._ec_id:
                self.context.cfg.DropECReservations(self._ec_id)
          finally:
            self.context.glm.release(locking.LEVEL_CLUSTER)
        except _LockAcquireTimeout:
          # Timeout while waiting for lock, try again
          pass

        timeout_strategy = timeout_strategy.NextAttempt()
    finally:
      self._cbs = None

  def _Feedback(self, *args):
    """Forward call to feedback callback function.

    """
    if self._cbs:
      self._cbs.Feedback(*args)

  def LogStep(self, current, total, message):
    """Log a change in LU execution progress.

    """
    logging.debug("Step %d/%d %s", current, total, message)
    self._Feedback("STEP %d/%d %s" % (current, total, message))

  def LogWarning(self, message, *args, **kwargs):
    """Log a warning to the logs and the user.

    The optional keyword argument is 'hint' and can be used to show a
    hint to the user (presumably related to the warning). If the
    message is empty, it will not be printed at all, allowing one to
    show only a hint.

    """
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
      "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
    if args:
      message = message % tuple(args)
    if message:
      logging.warning(message)
      self._Feedback(" - WARNING: %s" % message)
    if "hint" in kwargs:
      self._Feedback("      Hint: %s" % kwargs["hint"])

  def LogInfo(self, message, *args):
    """Log an informational message to the logs and the user.

    """
    if args:
      message = message % tuple(args)
    logging.info(message)
    self._Feedback(" - INFO: %s" % message)

  def _GetECId(self):
    """Returns the current execution context ID.

    @raise errors.ProgrammerError: if no execution context ID was set

    """
    if not self._ec_id:
      # Bug fix: the original built this exception without raising it,
      # silently falling through instead of failing loudly
      raise errors.ProgrammerError("Tried to use execution context id when"
                                   " not set")
    return self._ec_id
class HooksMaster(object):
  """Hooks master.

  This class distributes the run commands to the nodes based on the
  specific LU class.

  In order to remove the direct dependency on the rpc module, the
  constructor needs a function which actually does the remote
  call. This will usually be rpc.call_hooks_runner, but any function
  which behaves the same works.

  """
  def __init__(self, callfn, lu):
    self.callfn = callfn
    self.lu = lu
    self.op = lu.op
    # Environment and node lists are computed once, up front
    self.env, node_list_pre, node_list_post = self._BuildEnv()
    self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
                      constants.HOOKS_PHASE_POST: node_list_post}

  def _BuildEnv(self):
    """Compute the environment and the target nodes.

    Based on the opcode and the current node list, this builds the
    environment for the hooks and the target node list for the run.

    """
    env = {
      "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
      "GANETI_OP_CODE": self.op.OP_ID,
      "GANETI_OBJECT_TYPE": self.lu.HTYPE,
      "GANETI_DATA_DIR": constants.DATA_DIR,
      }

    if self.lu.HPATH is not None:
      lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
      if lu_env:
        # LU-provided variables are namespaced under GANETI_
        for key in lu_env:
          env["GANETI_" + key] = lu_env[key]
    else:
      lu_nodes_pre = lu_nodes_post = []

    return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)

  def _RunWrapper(self, node_list, hpath, phase):
    """Simple wrapper over self.callfn.

    This method fixes the environment before doing the rpc call.

    """
    env = self.env.copy()
    env["GANETI_HOOKS_PHASE"] = phase
    env["GANETI_HOOKS_PATH"] = hpath
    if self.lu.cfg is not None:
      env["GANETI_CLUSTER"] = self.lu.cfg.GetClusterName()
      env["GANETI_MASTER"] = self.lu.cfg.GetMasterNode()

    # Hook environments must be string-to-string mappings
    env = dict([(str(key), str(val)) for key, val in env.iteritems()])

    return self.callfn(node_list, hpath, phase, env)

  def RunPhase(self, phase, nodes=None):
    """Run all the scripts for a phase.

    This is the main function of the HookMaster.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param nodes: overrides the predefined list of nodes for the given phase
    @return: the processed results of the hooks multi-node rpc call
    @raise errors.HooksFailure: on communication failure to the nodes
    @raise errors.HooksAbort: on failure of one of the hooks

    """
    if not self.node_list[phase] and not nodes:
      # empty node list, we should not attempt to run this as either
      # we're in the cluster init phase and the rpc client part can't
      # even attempt to run, or this LU doesn't do hooks at all
      return {}
    hpath = self.lu.HPATH
    if nodes is not None:
      results = self._RunWrapper(nodes, hpath, phase)
    else:
      results = self._RunWrapper(self.node_list[phase], hpath, phase)
    errs = []
    if not results:
      msg = "Communication Failure"
      if phase == constants.HOOKS_PHASE_PRE:
        raise errors.HooksFailure(msg)
      else:
        self.lu.LogWarning(msg)
        return results
    for node_name in results:
      res = results[node_name]
      # NOTE(review): offline/fail_msg handling reconstructed from the
      # surrounding warning calls — confirm against the rpc result API
      if res.offline:
        continue
      msg = res.fail_msg
      if msg:
        self.lu.LogWarning("Communication failure to node %s: %s",
                           node_name, msg)
        continue
      for script, hkr, output in res.payload:
        if hkr == constants.HKR_FAIL:
          if phase == constants.HOOKS_PHASE_PRE:
            errs.append((node_name, script, output))
          else:
            if not output:
              output = "(no output)"
            self.lu.LogWarning("On %s script %s failed, output: %s" %
                               (node_name, script, output))
    if errs and phase == constants.HOOKS_PHASE_PRE:
      raise errors.HooksAbort(errs)
    return results

  def RunConfigUpdate(self):
    """Run the special configuration update hook

    This is a special hook that runs only on the master after each
    top-level LU if the configuration has been updated.

    """
    phase = constants.HOOKS_PHASE_POST
    hpath = constants.HOOKS_NAME_CFGUPDATE
    nodes = [self.lu.cfg.GetMasterNode()]
    self._RunWrapper(nodes, hpath, phase)