#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

"""Module implementing the logic behind the cluster operations

This module implements the logic for doing operations in the cluster. There
are two kinds of classes defined:
  - logical units, which know how to deal with their specific opcode only
  - the processor, which dispatches the opcodes to their logical units

"""

import logging

from ganeti import opcodes
from ganeti import constants
from ganeti import errors
from ganeti import rpc
from ganeti import cmdlib
from ganeti import locking
from ganeti import utils


class OpExecCbBase(object):
    """Base class for OpCode execution callbacks.

    Every callback defined here does nothing and returns None.

    """
    def NotifyStart(self):
        """Called when we are about to execute the LU.

        This function is called when we're about to start the lu's Exec()
        method, that is, after we have acquired all locks.

        """

    def Feedback(self, *args):
        """Sends feedback from the LU code to the end-user.

        @param args: the feedback items, passed through unchanged

        """

    def ReportLocks(self, msg):
        """Report lock operations.

        @type msg: string
        @param msg: description of the lock operation

        """


class Processor(object):
    """Object which runs OpCodes"""
    # Maps every known opcode class to the logical unit class implementing it.
    DISPATCH_TABLE = {
        # Cluster
        opcodes.OpPostInitCluster: cmdlib.LUPostInitCluster,
        opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
        opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
        opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
        opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
        opcodes.OpRenameCluster: cmdlib.LURenameCluster,
        opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
        opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
        opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig,
        opcodes.OpRepairDiskSizes: cmdlib.LURepairDiskSizes,
        # Nodes
        opcodes.OpAddNode: cmdlib.LUAddNode,
        opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
        opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
        opcodes.OpQueryNodeStorage: cmdlib.LUQueryNodeStorage,
        opcodes.OpModifyNodeStorage: cmdlib.LUModifyNodeStorage,
        opcodes.OpRepairNodeStorage: cmdlib.LURepairNodeStorage,
        opcodes.OpRemoveNode: cmdlib.LURemoveNode,
        opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams,
        opcodes.OpPowercycleNode: cmdlib.LUPowercycleNode,
        opcodes.OpEvacuateNode: cmdlib.LUEvacuateNode,
        opcodes.OpMigrateNode: cmdlib.LUMigrateNode,
        # Instances
        opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
        opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
        opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
        opcodes.OpRenameInstance: cmdlib.LURenameInstance,
        opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
        opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
        opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
        opcodes.OpRebootInstance: cmdlib.LURebootInstance,
        opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
        opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
        opcodes.OpRecreateInstanceDisks: cmdlib.LURecreateInstanceDisks,
        opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
        opcodes.OpMigrateInstance: cmdlib.LUMigrateInstance,
        opcodes.OpMoveInstance: cmdlib.LUMoveInstance,
        opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
        opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
        opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
        opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
        opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
        # OS
        opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
        # Exports
        opcodes.OpQueryExports: cmdlib.LUQueryExports,
        opcodes.OpExportInstance: cmdlib.LUExportInstance,
        opcodes.OpRemoveExport: cmdlib.LURemoveExport,
        # Tags
        opcodes.OpGetTags: cmdlib.LUGetTags,
        opcodes.OpSearchTags: cmdlib.LUSearchTags,
        opcodes.OpAddTags: cmdlib.LUAddTags,
        opcodes.OpDelTags: cmdlib.LUDelTags,
        # Test
        opcodes.OpTestDelay: cmdlib.LUTestDelay,
        opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
        }

    def __init__(self, context):
        """Constructor for Processor

        @param context: object whose C{cfg} and C{glm} attributes are used
            by this class for configuration and lock handling

        """
        self.context = context
        # Callbacks are only set for the duration of ExecOpCode().
        self._cbs = None
        # Mirrors the REQ_BGL flag of the LU currently being executed.
        self.exclusive_BGL = False
        self.rpc = rpc.RpcRunner(context.cfg)
        self.hmclass = HooksMaster

    def _ReportLocks(self, level, names, shared, acquired):
        """Reports lock operations.

        The report has the form "<state>/<level name>/<names>/<mode>"; it
        is written to the debug log and, when callbacks are set, handed to
        their ReportLocks method.

        @type level: int
        @param level: Lock level
        @type names: list or string
        @param names: Lock names
        @type shared: bool
        @param shared: Whether the lock should be acquired in shared mode
        @type acquired: bool
        @param acquired: Whether the lock has already been acquired

        """
        parts = []

        # Build message
        if acquired:
            parts.append("acquired")
        else:
            parts.append("waiting")

        parts.append(locking.LEVEL_NAMES[level])

        if names == locking.ALL_SET:
            # NOTE(review): literal restored during cleanup; confirm
            parts.append("ALL")
        elif isinstance(names, basestring):
            parts.append(names)
        else:
            parts.append(",".join(names))

        if shared:
            parts.append("shared")
        else:
            parts.append("exclusive")

        msg = "/".join(parts)

        logging.debug("LU locks %s", msg)

        if self._cbs:
            self._cbs.ReportLocks(msg)

    def _ExecLU(self, lu):
        """Logical Unit execution sequence.

        Runs the pre-phase hooks, executes the LU and runs the post-phase
        hooks; each hook result is passed to the LU's HooksCallBack. If
        the configuration's write counter changed in the meantime, the
        configuration update hook is run as well.

        @param lu: the logical unit to execute
        @return: C{lu.dry_run_result} if the opcode has a true C{dry_run}
            attribute, otherwise the value returned by the post-phase
            HooksCallBack

        """
        write_count = self.context.cfg.write_count
        # NOTE(review): call restored during cleanup; confirm
        lu.CheckPrereq()
        hm = HooksMaster(self.rpc.call_hooks_runner, lu)
        h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
        lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                         self._Feedback, None)

        if getattr(lu.op, "dry_run", False):
            # in this mode, no post-hooks are run, and the config is not
            # written (as it might have been modified by another LU, and we
            # shouldn't do writeout on behalf of other threads
            # NOTE(review): second half of the message restored; confirm
            self.LogInfo("dry-run mode requested, not actually executing"
                         " the operation")
            return lu.dry_run_result

        try:
            result = lu.Exec(self._Feedback)
            h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
            result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                      self._Feedback, result)
        finally:
            # FIXME: This needs locks if not lu_class.REQ_BGL
            if write_count != self.context.cfg.write_count:
                hm.RunConfigUpdate()

        return result

    def _LockAndExecLU(self, lu, level):
        """Execute a Logical Unit, with the needed locks.

        This is a recursive function that starts locking the given level,
        and proceeds up, till there are no more locks to acquire. Then it
        executes the given LU and its opcodes.

        @param lu: the logical unit to execute
        @param level: the lock level to handle in this call
        @return: the result of the LU's execution
        @raise NotImplementedError: if the LU wants to both add and
            acquire locks at the same level
        @raise errors.OpPrereqError: if the locks to add could not be added

        """
        adding_locks = level in lu.add_locks
        acquiring_locks = level in lu.needed_locks
        if level not in locking.LEVELS:
            # Past the last level: every lock is held, run the LU itself.
            if self._cbs:
                self._cbs.NotifyStart()

            result = self._ExecLU(lu)
        elif adding_locks and acquiring_locks:
            # We could both acquire and add locks at the same level, but for
            # now we don't need this, so we'll avoid the complicated code
            # needed.
            raise NotImplementedError(
                "Can't declare locks to acquire when adding others")
        elif adding_locks or acquiring_locks:
            lu.DeclareLocks(level)
            share = lu.share_locks[level]
            if acquiring_locks:
                needed_locks = lu.needed_locks[level]

                self._ReportLocks(level, needed_locks, share, False)
                lu.acquired_locks[level] = self.context.glm.acquire(
                    level, needed_locks, shared=share)
                self._ReportLocks(level, needed_locks, share, True)
            else:
                # adding_locks
                add_locks = lu.add_locks[level]
                # Locks added here are removed again once the LU is done.
                lu.remove_locks[level] = add_locks
                try:
                    self.context.glm.add(level, add_locks, acquired=1,
                                         shared=share)
                except errors.LockError:
                    raise errors.OpPrereqError(
                        "Couldn't add locks (%s), probably because of a race"
                        " condition with another job, who added them first"
                        % add_locks)
            try:
                try:
                    if adding_locks:
                        lu.acquired_locks[level] = add_locks
                    result = self._LockAndExecLU(lu, level + 1)
                finally:
                    if level in lu.remove_locks:
                        self.context.glm.remove(level,
                                                lu.remove_locks[level])
            finally:
                if self.context.glm.is_owned(level):
                    self.context.glm.release(level)
        else:
            # Nothing to lock at this level, continue with the next one.
            result = self._LockAndExecLU(lu, level + 1)

        return result

    def ExecOpCode(self, op, cbs):
        """Execute an opcode.

        @type op: an OpCode instance
        @param op: the opcode to be executed
        @type cbs: L{OpExecCbBase}
        @param cbs: Runtime callbacks
        @return: the result of the logical unit's execution
        @raise errors.ProgrammerError: if op is not an opcode instance
        @raise errors.OpCodeUnknown: if op's class is not in DISPATCH_TABLE

        """
        if not isinstance(op, opcodes.OpCode):
            # NOTE(review): second half of the message restored; confirm
            raise errors.ProgrammerError("Non-opcode instance passed"
                                         " to ExecOpcode")

        self._cbs = cbs
        try:
            lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
            if lu_class is None:
                raise errors.OpCodeUnknown("Unknown opcode")

            # Acquire the Big Ganeti Lock exclusively if this LU requires
            # it, and in a shared fashion otherwise (to prevent concurrent
            # run with an exclusive LU).
            self._ReportLocks(locking.LEVEL_CLUSTER, [locking.BGL],
                              not lu_class.REQ_BGL, False)
            # NOTE(review): "acquired" is reported only once acquire()
            # has returned, as _LockAndExecLU does; confirm
            self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
                                     shared=not lu_class.REQ_BGL)
            self._ReportLocks(locking.LEVEL_CLUSTER, [locking.BGL],
                              not lu_class.REQ_BGL, True)
            try:
                self.exclusive_BGL = lu_class.REQ_BGL
                lu = lu_class(self, op, self.context, self.rpc)
                # NOTE(review): call restored during cleanup; confirm
                lu.ExpandNames()
                assert lu.needed_locks is not None, \
                    "needed_locks not set by LU"
                result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
            finally:
                self.context.glm.release(locking.LEVEL_CLUSTER)
                self.exclusive_BGL = False
        finally:
            # Callbacks are only valid while this opcode is being executed.
            self._cbs = None

        return result

    def _Feedback(self, *args):
        """Forward call to feedback callback function.

        Does nothing when no callbacks are set.

        """
        if self._cbs:
            self._cbs.Feedback(*args)

    def LogStep(self, current, total, message):
        """Log a change in LU execution progress.

        @param current: number of the current step
        @param total: total number of steps
        @param message: description of the step

        """
        logging.debug("Step %d/%d %s", current, total, message)
        self._Feedback("STEP %d/%d %s" % (current, total, message))

    def LogWarning(self, message, *args, **kwargs):
        """Log a warning to the logs and the user.

        The optional keyword argument is 'hint' and can be used to show a
        hint to the user (presumably related to the warning). If the
        message is empty, it will not be printed at all, allowing one to
        show only a hint.

        @param message: the warning; used as a format string for args if
            any are given
        @param args: values interpolated into message

        """
        assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
            "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
        # Only interpolate when there are arguments, so that pre-formatted
        # messages may contain literal percent signs.
        if args:
            message = message % tuple(args)
        if message:
            logging.warning(message)
            self._Feedback(" - WARNING: %s" % message)
        if "hint" in kwargs:
            self._Feedback(" Hint: %s" % kwargs["hint"])

    def LogInfo(self, message, *args):
        """Log an informational message to the logs and the user.

        @param message: the message; used as a format string for args if
            any are given
        @param args: values interpolated into message

        """
        if args:
            message = message % tuple(args)
        logging.info(message)
        self._Feedback(" - INFO: %s" % message)


class HooksMaster(object):
    """Hooks master.

    This class distributes the run commands to the nodes based on the
    logical unit it was created for.

    In order to remove the direct dependency on the rpc module, the
    constructor needs a function which actually does the remote
    call. This will usually be rpc.call_hooks_runner, but any function
    which behaves the same works.

    """
    def __init__(self, callfn, lu):
        """Store the call function and LU and precompute the hooks data.

        @param callfn: function doing the remote call; invoked as
            C{callfn(node_list, hpath, phase, env)}
        @param lu: the logical unit the hooks are run for

        """
        self.callfn = callfn
        self.lu = lu
        self.op = lu.op
        self.env, node_list_pre, node_list_post = self._BuildEnv()
        self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
                          constants.HOOKS_PHASE_POST: node_list_post}

    def _BuildEnv(self):
        """Compute the environment and the target nodes.

        Based on the opcode and the current node list, this builds the
        environment for the hooks and the target node list for the run.

        @return: tuple of (environment dict, frozenset of pre-phase
            nodes, frozenset of post-phase nodes)

        """
        env = {
            "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
            "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
            "GANETI_OP_CODE": self.op.OP_ID,
            "GANETI_OBJECT_TYPE": self.lu.HTYPE,
            "GANETI_DATA_DIR": constants.DATA_DIR,
            }

        if self.lu.HPATH is not None:
            lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
            if lu_env:
                # LU-provided variables are exported with a common prefix.
                for key in lu_env:
                    env["GANETI_" + key] = lu_env[key]
        else:
            lu_nodes_pre = lu_nodes_post = []

        return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)

    def _RunWrapper(self, node_list, hpath, phase):
        """Simple wrapper over self.callfn.

        This method fixes the environment before doing the rpc call.

        @param node_list: the nodes to run the hooks on
        @param hpath: the hooks path, exported as GANETI_HOOKS_PATH
        @param phase: the hooks phase, exported as GANETI_HOOKS_PHASE
        @return: whatever self.callfn returns

        """
        # Work on a copy so the per-call additions don't leak into self.env.
        env = self.env.copy()
        env["GANETI_HOOKS_PHASE"] = phase
        env["GANETI_HOOKS_PATH"] = hpath
        if self.lu.cfg is not None:
            env["GANETI_CLUSTER"] = self.lu.cfg.GetClusterName()
            env["GANETI_MASTER"] = self.lu.cfg.GetMasterNode()

        # All keys and values are passed on as plain strings.
        env = dict([(str(key), str(val)) for key, val in env.items()])

        return self.callfn(node_list, hpath, phase, env)

    def RunPhase(self, phase, nodes=None):
        """Run all the scripts for a phase.

        This is the main function of the HookMaster.

        @param phase: one of L{constants.HOOKS_PHASE_POST} or
            L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
        @param nodes: overrides the predefined list of nodes for the given
            phase
        @return: the processed results of the hooks multi-node rpc call,
            or None if there are no nodes to run on
        @raise errors.HooksFailure: on communication failure to the nodes
        @raise errors.HooksAbort: on failure of one of the hooks

        """
        if not self.node_list[phase] and not nodes:
            # empty node list, we should not attempt to run this as either
            # we're in the cluster init phase and the rpc client part can't
            # even attempt to run, or this LU doesn't do hooks at all
            return
        hpath = self.lu.HPATH
        if nodes is not None:
            results = self._RunWrapper(nodes, hpath, phase)
        else:
            results = self._RunWrapper(self.node_list[phase], hpath, phase)
        errs = []
        if not results:
            msg = "Communication Failure"
            if phase == constants.HOOKS_PHASE_PRE:
                raise errors.HooksFailure(msg)
            else:
                # Failures in the post phase only produce warnings.
                self.lu.LogWarning(msg)
                return results
        for node_name in results:
            res = results[node_name]
            # NOTE(review): the offline and fail_msg checks below were
            # restored during cleanup; confirm the result attributes
            if res.offline:
                continue
            msg = res.fail_msg
            if msg:
                self.lu.LogWarning("Communication failure to node %s: %s",
                                   node_name, msg)
                continue
            for script, hkr, output in res.payload:
                if hkr == constants.HKR_FAIL:
                    if phase == constants.HOOKS_PHASE_PRE:
                        errs.append((node_name, script, output))
                    else:
                        if not output:
                            output = "(no output)"
                        self.lu.LogWarning("On %s script %s failed,"
                                           " output: %s" %
                                           (node_name, script, output))
        if errs and phase == constants.HOOKS_PHASE_PRE:
            raise errors.HooksAbort(errs)
        return results

    def RunConfigUpdate(self):
        """Run the special configuration update hook

        This is a special hook that runs only on the master after each
        top-level LI if the configuration has been updated.

        """
        phase = constants.HOOKS_PHASE_POST
        hpath = constants.HOOKS_NAME_CFGUPDATE
        nodes = [self.lu.cfg.GetMasterNode()]
        self._RunWrapper(nodes, hpath, phase)