4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
"""Module implementing the logic behind the cluster operations.

This module implements the logic for doing operations in the cluster. There
are two kinds of classes involved:
  - logical units, which know how to deal with their specific opcode only
    (they are defined in the cmdlib module)
  - the processor, which dispatches the opcodes to their logical units

This module also defines the hooks master, which runs the hook scripts
around the execution of a logical unit.

"""
import logging

from ganeti import opcodes
from ganeti import constants
from ganeti import errors
from ganeti import rpc
from ganeti import cmdlib
from ganeti import locking
class Processor(object):
    """Object which runs OpCodes.

    The processor looks up the logical unit (LU) class registered for an
    opcode in DISPATCH_TABLE, acquires the locks the LU declares, runs the
    LU together with its hooks and releases the locks afterwards.

    """
    # Maps each opcode class to the logical unit class that executes it.
    DISPATCH_TABLE = {
        # Cluster
        opcodes.OpPostInitCluster: cmdlib.LUPostInitCluster,
        opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
        opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
        opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
        opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
        opcodes.OpRenameCluster: cmdlib.LURenameCluster,
        opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
        opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
        opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig,
        opcodes.OpRepairDiskSizes: cmdlib.LURepairDiskSizes,
        # Nodes
        opcodes.OpAddNode: cmdlib.LUAddNode,
        opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
        opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
        opcodes.OpQueryNodeStorage: cmdlib.LUQueryNodeStorage,
        opcodes.OpModifyNodeStorage: cmdlib.LUModifyNodeStorage,
        opcodes.OpRemoveNode: cmdlib.LURemoveNode,
        opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams,
        opcodes.OpPowercycleNode: cmdlib.LUPowercycleNode,
        opcodes.OpEvacuateNode: cmdlib.LUEvacuateNode,
        opcodes.OpMigrateNode: cmdlib.LUMigrateNode,
        # Instances
        opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
        opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
        opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
        opcodes.OpRenameInstance: cmdlib.LURenameInstance,
        opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
        opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
        opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
        opcodes.OpRebootInstance: cmdlib.LURebootInstance,
        opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
        opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
        opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
        opcodes.OpMigrateInstance: cmdlib.LUMigrateInstance,
        opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
        opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
        opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
        opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
        opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
        # Operating systems
        opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
        # Exports
        opcodes.OpQueryExports: cmdlib.LUQueryExports,
        opcodes.OpExportInstance: cmdlib.LUExportInstance,
        opcodes.OpRemoveExport: cmdlib.LURemoveExport,
        # Tags
        opcodes.OpGetTags: cmdlib.LUGetTags,
        opcodes.OpSearchTags: cmdlib.LUSearchTags,
        opcodes.OpAddTags: cmdlib.LUAddTags,
        opcodes.OpDelTags: cmdlib.LUDelTags,
        # Testing
        opcodes.OpTestDelay: cmdlib.LUTestDelay,
        opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
    }

    def __init__(self, context):
        """Constructor for Processor.

        @param context: the shared context object; the processor uses its
            C{cfg} attribute (configuration, also handed to the RPC runner)
            and its C{glm} attribute (lock manager)

        """
        self.context = context
        # Both callbacks are only known per opcode; ExecOpCode sets them.
        # Initialising _run_notifier here means _LockAndExecLU can never
        # hit an AttributeError on it.
        self._feedback_fn = None
        self._run_notifier = None
        self.exclusive_BGL = False
        self.rpc = rpc.RpcRunner(context.cfg)

    def _ExecLU(self, lu):
        """Logical Unit execution sequence.

        Checks the LU's prerequisites and runs the pre-phase hooks. Unless in
        dry-run mode, it then executes the LU and runs the post-phase hooks;
        if the configuration write counter changed meanwhile, the
        configuration-update hook is run as well.

        @param lu: the logical unit instance to execute
        @return: the LU result as post-processed by C{lu.HooksCallBack},
            or C{lu.dry_run_result} in dry-run mode

        """
        write_count = self.context.cfg.write_count
        lu.CheckPrereq()
        hm = HooksMaster(self.rpc.call_hooks_runner, self, lu)
        h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
        lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                         self._feedback_fn, None)

        if getattr(lu.op, "dry_run", False):
            # In this mode no post-hooks are run and the config is not
            # written (it might have been modified by another LU, and we
            # shouldn't do writeout on behalf of other threads).
            self.LogInfo("dry-run mode requested, not actually executing"
                         " the operation")
            return lu.dry_run_result

        try:
            result = lu.Exec(self._feedback_fn)
            h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
            result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                      self._feedback_fn, result)
        finally:
            # FIXME: This needs locks if not lu_class.REQ_BGL
            # A changed write counter means the configuration was written
            # while the LU ran, even if Exec() failed part-way.
            if write_count != self.context.cfg.write_count:
                hm.RunConfigUpdate()

        return result

    def _LockAndExecLU(self, lu, level):
        """Execute a Logical Unit, with the needed locks.

        This is a recursive function that starts locking the given level, and
        proceeds up, till there are no more locks to acquire. Then it executes
        the given LU and its opcodes.

        @param lu: the logical unit instance to execute
        @param level: the locking level handled by this call; once it is no
            longer in C{locking.LEVELS}, all levels have been processed and
            the LU itself is run
        @return: the result of L{_ExecLU}
        @raise errors.OpPrereqError: if the locks the LU wants to add
            cannot be added

        """
        adding_locks = level in lu.add_locks
        acquiring_locks = level in lu.needed_locks
        if level not in locking.LEVELS:
            # All lock levels handled: notify the caller, then run the LU.
            if callable(self._run_notifier):
                self._run_notifier()
            result = self._ExecLU(lu)
        elif adding_locks and acquiring_locks:
            # We could both acquire and add locks at the same level, but for
            # now we don't need this, so we'll avoid the complicated code
            # needed.
            raise NotImplementedError(
                "Can't declare locks to acquire when adding others")
        elif adding_locks or acquiring_locks:
            lu.DeclareLocks(level)
            share = lu.share_locks[level]
            if acquiring_locks:
                needed_locks = lu.needed_locks[level]
                lu.acquired_locks[level] = self.context.glm.acquire(
                    level, needed_locks, shared=share)
            else:  # adding_locks
                add_locks = lu.add_locks[level]
                lu.remove_locks[level] = add_locks
                try:
                    self.context.glm.add(level, add_locks, acquired=1,
                                         shared=share)
                except errors.LockError:
                    raise errors.OpPrereqError(
                        "Couldn't add locks (%s), probably because of a race condition"
                        " with another job, who added them first" % add_locks)
            # Nested try/finally (rather than one try with two finally
            # actions) keeps this compatible with old Python 2 versions and
            # guarantees the release runs even if the removal fails.
            try:
                try:
                    if adding_locks:
                        lu.acquired_locks[level] = add_locks
                    result = self._LockAndExecLU(lu, level + 1)
                finally:
                    if level in lu.remove_locks:
                        self.context.glm.remove(level, lu.remove_locks[level])
            finally:
                if self.context.glm.is_owned(level):
                    self.context.glm.release(level)
        else:
            # Nothing to lock at this level; go straight to the next one.
            result = self._LockAndExecLU(lu, level + 1)

        return result

    def ExecOpCode(self, op, feedback_fn, run_notifier):
        """Execute an opcode.

        @type op: an OpCode instance
        @param op: the opcode to be executed
        @type feedback_fn: a function that takes a single argument
        @param feedback_fn: this function will be used as feedback from the
            LU code (e.g., for progress reporting)
        @type run_notifier: callable (no arguments) or None
        @param run_notifier: this function (if callable) will be called when
            we are about to call the lu's Exec() method, that is, after we
            have acquired all locks
        @return: the result of the LU execution
        @raise errors.ProgrammerError: if C{op} is not an OpCode instance
        @raise errors.OpCodeUnknown: if no LU is registered for the opcode

        """
        if not isinstance(op, opcodes.OpCode):
            raise errors.ProgrammerError("Non-opcode instance passed"
                                         " to ExecOpcode")

        self._feedback_fn = feedback_fn
        self._run_notifier = run_notifier
        lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
        if lu_class is None:
            raise errors.OpCodeUnknown("Unknown opcode")

        # Acquire the Big Ganeti Lock exclusively if this LU requires it, and
        # in a shared fashion otherwise (to prevent concurrent runs with an
        # exclusive LU).
        self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
                                 shared=not lu_class.REQ_BGL)
        try:
            self.exclusive_BGL = lu_class.REQ_BGL
            lu = lu_class(self, op, self.context, self.rpc)
            # ExpandNames is expected to populate lu.needed_locks.
            lu.ExpandNames()
            assert lu.needed_locks is not None, "needed_locks not set by LU"
            result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
        finally:
            self.context.glm.release(locking.LEVEL_CLUSTER)
            self.exclusive_BGL = False

        return result

    def LogStep(self, current, total, message):
        """Log a change in LU execution progress.

        @param current: the number of the current step
        @param total: the total number of steps
        @param message: description of the step

        """
        logging.debug("Step %d/%d %s", current, total, message)
        self._feedback_fn("STEP %d/%d %s" % (current, total, message))

    def LogWarning(self, message, *args, **kwargs):
        """Log a warning to the logs and the user.

        The optional keyword argument is 'hint' and can be used to show a
        hint to the user (presumably related to the warning). If the
        message is empty, it will not be printed at all, allowing one to
        show only a hint.

        @param message: the warning text; when C{args} are given it is used
            as a %-format string
        @param args: values interpolated into C{message}
        @keyword hint: optional hint shown to the user after the warning

        """
        assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
            "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
        if args:
            message = message % tuple(args)
        if message:
            logging.warning(message)
            self._feedback_fn(" - WARNING: %s" % message)
        if "hint" in kwargs:
            self._feedback_fn(" Hint: %s" % kwargs["hint"])

    def LogInfo(self, message, *args):
        """Log an informational message to the logs and the user.

        @param message: the message text; when C{args} are given it is used
            as a %-format string
        @param args: values interpolated into C{message}

        """
        if args:
            message = message % tuple(args)
        logging.info(message)
        self._feedback_fn(" - INFO: %s" % message)
class HooksMaster(object):
    """Hooks master.

    This class distributes the run commands to the nodes based on the
    specific LU class.

    In order to remove the direct dependency on the rpc module, the
    constructor needs a function which actually does the remote
    call. This will usually be rpc.call_hooks_runner, but any function
    which behaves the same works.

    """

    def __init__(self, callfn, proc, lu):
        """Build the hooks environment and the per-phase target node lists.

        @param callfn: function doing the remote hooks call; it is invoked
            as C{callfn(node_list, hpath, phase, env)}
        @param proc: the processor, used to report warnings to the user
        @param lu: the logical unit whose hooks are to be run

        """
        self.callfn = callfn
        self.proc = proc
        self.lu = lu
        self.op = lu.op
        self.env, node_list_pre, node_list_post = self._BuildEnv()
        self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
                          constants.HOOKS_PHASE_POST: node_list_post}

    def _BuildEnv(self):
        """Compute the environment and the target nodes.

        Based on the opcode and the current node list, this builds the
        environment for the hooks and the target node list for the run.

        @return: a tuple (env, pre_nodes, post_nodes); every key coming from
            the LU's BuildHooksEnv is prefixed with C{GANETI_}, and both node
            lists are frozensets (empty when the LU has no hooks path)

        """
        env = {
            "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
            "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
            "GANETI_OP_CODE": self.op.OP_ID,
            "GANETI_OBJECT_TYPE": self.lu.HTYPE,
            "GANETI_DATA_DIR": constants.DATA_DIR,
        }

        if self.lu.HPATH is not None:
            lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
            if lu_env:
                for key in lu_env:
                    env["GANETI_" + key] = lu_env[key]
        else:
            # LUs without a hooks path run no hooks at all.
            lu_nodes_pre = lu_nodes_post = []

        return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)

    def _RunWrapper(self, node_list, hpath, phase):
        """Simple wrapper over self.callfn.

        This method fixes the environment before doing the rpc call: it adds
        the phase and hooks path (and, when a configuration is available,
        the cluster and master names) to a copy of the base environment.

        @param node_list: the nodes to run the hooks on
        @param hpath: the hooks path to run
        @param phase: the hooks phase
        @return: whatever C{self.callfn} returns

        """
        env = self.env.copy()
        env["GANETI_HOOKS_PHASE"] = phase
        env["GANETI_HOOKS_PATH"] = hpath
        if self.lu.cfg is not None:
            env["GANETI_CLUSTER"] = self.lu.cfg.GetClusterName()
            env["GANETI_MASTER"] = self.lu.cfg.GetMasterNode()

        # Convert every key and value to str. items() (unlike the
        # Python-2-only iteritems()) works on both Python 2 and 3.
        env = dict((str(key), str(val)) for key, val in env.items())

        return self.callfn(node_list, hpath, phase, env)

    def RunPhase(self, phase):
        """Run all the scripts for a phase.

        This is the main function of the HookMaster.

        @param phase: one of L{constants.HOOKS_PHASE_POST} or
            L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
        @return: the results of the hooks multi-node rpc call, or None if
            there are no target nodes for this phase
        @raise errors.HooksFailure: on communication failure to the nodes
        @raise errors.HooksAbort: if any pre-phase hook script failed

        """
        if not self.node_list[phase]:
            # empty node list, we should not attempt to run this as either
            # we're in the cluster init phase and the rpc client part can't
            # even attempt to run, or this LU doesn't do hooks at all
            return None
        hpath = self.lu.HPATH
        results = self._RunWrapper(self.node_list[phase], hpath, phase)
        if phase == constants.HOOKS_PHASE_PRE:
            # Only pre-phase failures can abort the operation.
            errs = []
            if not results:
                raise errors.HooksFailure("Communication failure")
            for node_name in results:
                res = results[node_name]
                if res.offline:
                    continue
                msg = res.RemoteFailMsg()
                if msg:
                    self.proc.LogWarning("Communication failure to node %s: %s",
                                         node_name, msg)
                    continue
                for script, hkr, output in res.payload:
                    if hkr == constants.HKR_FAIL:
                        errs.append((node_name, script, output))
            if errs:
                raise errors.HooksAbort(errs)
        return results

    def RunConfigUpdate(self):
        """Run the special configuration update hook.

        This is a special hook that runs only on the master after each
        top-level LU if the configuration has been updated.

        """
        phase = constants.HOOKS_PHASE_POST
        hpath = constants.HOOKS_NAME_CFGUPDATE
        nodes = [self.lu.cfg.GetMasterNode()]
        self._RunWrapper(nodes, hpath, phase)