4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the logic behind the cluster operations
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26 - logical units, which know how to deal with their specific opcode only
27 - the processor, which dispatches the opcodes to their logical units
33 from ganeti import opcodes
34 from ganeti import constants
35 from ganeti import errors
36 from ganeti import rpc
37 from ganeti import cmdlib
38 from ganeti import locking
# Processor looks up the logical unit (LU) class registered for an opcode in
# DISPATCH_TABLE, takes the Big Ganeti Lock (BGL) plus the locks the LU
# declares, and runs the LU together with its pre/post hooks.
# NOTE(review): this excerpt is lossy -- the original line numbers embedded
# at the start of each line skip values (e.g. 42 -> 45, 123 -> 128,
# 213 -> 215), so some lines of this class are not shown, among them the
# `DISPATCH_TABLE = {` header that ExecOpCode reads as self.DISPATCH_TABLE.
# Verify against the complete file before relying on the flow described here.
41 class Processor(object):
42 """Object which runs OpCodes"""
# Opcode class -> LU class. ExecOpCode looks entries up by the opcode's exact
# class (op.__class__), so subclasses of these opcodes are not matched.
# Cluster-level opcodes:
45 opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
46 opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
47 opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
48 opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
49 opcodes.OpRenameCluster: cmdlib.LURenameCluster,
50 opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
51 opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
52 opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig,
53 opcodes.OpRepairDiskSizes: cmdlib.LURepairDiskSizes,
# Node opcodes:
55 opcodes.OpAddNode: cmdlib.LUAddNode,
56 opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
57 opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
58 opcodes.OpRemoveNode: cmdlib.LURemoveNode,
59 opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams,
# Instance opcodes:
61 opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
62 opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
63 opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
64 opcodes.OpRenameInstance: cmdlib.LURenameInstance,
65 opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
66 opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
67 opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
68 opcodes.OpRebootInstance: cmdlib.LURebootInstance,
69 opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
70 opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
71 opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
72 opcodes.OpMigrateInstance: cmdlib.LUMigrateInstance,
73 opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
74 opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
75 opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
76 opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
77 opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
# OS opcodes:
79 opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
# Export opcodes:
81 opcodes.OpQueryExports: cmdlib.LUQueryExports,
82 opcodes.OpExportInstance: cmdlib.LUExportInstance,
83 opcodes.OpRemoveExport: cmdlib.LURemoveExport,
# Tag opcodes:
85 opcodes.OpGetTags: cmdlib.LUGetTags,
86 opcodes.OpSearchTags: cmdlib.LUSearchTags,
87 opcodes.OpAddTags: cmdlib.LUAddTags,
88 opcodes.OpDelTags: cmdlib.LUDelTags,
# Test opcodes:
90 opcodes.OpTestDelay: cmdlib.LUTestDelay,
91 opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
# Keeps the context and builds the RPC runner from the context's config; the
# feedback function is only supplied later, per call, by ExecOpCode.
94 def __init__(self, context):
95 """Constructor for Processor
98 - context: the context object; its cfg is used to build the RPC runner
99 and its glm for all lock operations done by this class
101 self.context = context
102 self._feedback_fn = None
103 self.exclusive_BGL = False
104 self.rpc = rpc.RpcRunner(context.cfg)
# NOTE(review): self._run_notifier is not initialised here; it is first set
# in ExecOpCode, so _LockAndExecLU must not be reached before ExecOpCode.
# Runs one LU: pre-phase hooks, lu.Exec(), then post-phase hooks, passing
# each phase's hook results to lu.HooksCallBack. The config write counter is
# sampled up front so a config change made while the LU ran can be detected.
# NOTE(review): numbering gaps (111, 116, 121 and 124-127) hide lines here,
# including the body of the final write_count check and the return
# statement; that body presumably triggers HooksMaster.RunConfigUpdate --
# verify against the complete file.
106 def _ExecLU(self, lu):
107 """Logical Unit execution sequence.
110 write_count = self.context.cfg.write_count
112 hm = HooksMaster(self.rpc.call_hooks_runner, self, lu)
113 h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
114 lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
115 self._feedback_fn, None)
117 result = lu.Exec(self._feedback_fn)
118 h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
# The post-phase callback receives Exec's result, and whatever it returns
# becomes the LU's final result.
119 result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
120 self._feedback_fn, result)
122 # FIXME: This needs locks if not lu_class.REQ_BGL
123 if write_count != self.context.cfg.write_count:
# Recursively walks the lock levels upwards from `level` (level + 1 on each
# call). Once `level` is no longer in locking.LEVELS, all locks are held:
# the run notifier (if callable) is invoked and the LU is executed.
# NOTE(review): numbering gaps (140, 150, 153-155, 158, 164-166, 169, 172,
# 175 and 177-179) hide lines here -- e.g. the notifier call itself, the
# branch choosing acquire vs. add, the `try:` matching the `except` below,
# and the return statement; the cleanup after the recursive call is
# presumably in `finally:` clauses. Verify against the complete file.
128 def _LockAndExecLU(self, lu, level):
129 """Execute a Logical Unit, with the needed locks.
131 This is a recursive function that starts locking the given level, and
132 proceeds up, till there are no more locks to acquire. Then it executes the
133 given LU and its opcodes.
136 adding_locks = level in lu.add_locks
137 acquiring_locks = level in lu.needed_locks
138 if level not in locking.LEVELS:
139 if callable(self._run_notifier):
141 result = self._ExecLU(lu)
142 elif adding_locks and acquiring_locks:
143 # We could both acquire and add locks at the same level, but for now we
144 # don't need this, so we'll avoid the complicated code needed.
145 raise NotImplementedError(
146 "Can't declare locks to acquire when adding others")
# The LU has locks at this level: let it finalise the declaration, then
# either acquire the listed locks or create (add) new ones, which are
# created already held (acquired=1).
147 elif adding_locks or acquiring_locks:
148 lu.DeclareLocks(level)
149 share = lu.share_locks[level]
151 needed_locks = lu.needed_locks[level]
152 lu.acquired_locks[level] = self.context.glm.acquire(level,
# Adding path: remember the new locks so they are removed again afterwards.
156 add_locks = lu.add_locks[level]
157 lu.remove_locks[level] = add_locks
159 self.context.glm.add(level, add_locks, acquired=1, shared=share)
160 except errors.LockError:
161 raise errors.OpPrereqError(
162 "Couldn't add locks (%s), probably because of a race condition"
163 " with another job, who added them first" % add_locks)
# Added locks are also reported as acquired (add_locks is only bound on the
# adding path, so this line is presumably guarded); then recurse upwards.
167 lu.acquired_locks[level] = add_locks
168 result = self._LockAndExecLU(lu, level + 1)
# Cleanup: delete the locks this LU added, then release whatever is still
# owned at this level.
170 if level in lu.remove_locks:
171 self.context.glm.remove(level, lu.remove_locks[level])
173 if self.context.glm.is_owned(level):
174 self.context.glm.release(level)
# No locks requested at this level: go straight to the next one.
176 result = self._LockAndExecLU(lu, level + 1)
# Entry point: validates the opcode, finds its LU class, holds the BGL
# (exclusively iff the LU class sets REQ_BGL) while the LU's own locks are
# acquired level by level, and runs the LU.
# NOTE(review): numbering gaps (196, 201, 208, 211, 214 and 217-219) hide
# lines here, including the rest of the ProgrammerError message, the check
# guarding OpCodeUnknown (presumably `lu_class is None`), and the return.
180 def ExecOpCode(self, op, feedback_fn, run_notifier):
181 """Execute an opcode.
183 @type op: an OpCode instance
184 @param op: the opcode to be executed
185 @type feedback_fn: a function that takes a single argument
186 @param feedback_fn: this function will be used as feedback from the LU
188 @type run_notifier: callable (no arguments) or None
189 @param run_notifier: this function (if callable) will be called when
190 we are about to call the lu's Exec() method, that
191 is, after we have acquired all locks
@raise errors.ProgrammerError: if op is not an OpCode instance
@raise errors.OpCodeUnknown: if no LU class is registered for op's class
194 if not isinstance(op, opcodes.OpCode):
195 raise errors.ProgrammerError("Non-opcode instance passed"
198 self._feedback_fn = feedback_fn
199 self._run_notifier = run_notifier
200 lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
202 raise errors.OpCodeUnknown("Unknown opcode")
204 # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
205 # shared fashion otherwise (to prevent concurrent runs with an exclusive LU).
206 self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
207 shared=not lu_class.REQ_BGL)
209 self.exclusive_BGL = lu_class.REQ_BGL
210 lu = lu_class(self, op, self.context, self.rpc)
# The LU must have declared its needed locks by now (None means it did not);
# the line between constructing the LU and this check is not shown.
212 assert lu.needed_locks is not None, "needed_locks not set by LU"
# Per-LU locking starts at LEVEL_INSTANCE, presumably because the cluster
# level is already covered by the BGL acquired above.
213 result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
# Drop the BGL and clear the exclusive flag (presumably in a `finally:`
# clause, given the gap before these lines).
215 self.context.glm.release(locking.LEVEL_CLUSTER)
216 self.exclusive_BGL = False
# Logs "Step current/total message" at debug level and sends the same text
# as a "STEP ..." line to the feedback function set by ExecOpCode.
220 def LogStep(self, current, total, message):
221 """Log a change in LU execution progress.
224 logging.debug("Step %d/%d %s", current, total, message)
225 self._feedback_fn("STEP %d/%d %s" % (current, total, message))
# Interpolates *args into message, logs it as a warning and forwards it to
# the feedback function; an optional hint= keyword is sent as an extra line.
227 def LogWarning(self, message, *args, **kwargs):
228 """Log a warning to the logs and the user.
230 The optional keyword argument is 'hint' and can be used to show a
231 hint to the user (presumably related to the warning). If the
232 message is empty, it will not be printed at all, allowing one to
# NOTE(review): the kwargs validation below is an assert, so it is skipped
# entirely when Python runs with -O.
236 assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
237 "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
239 message = message % tuple(args)
241 logging.warning(message)
242 self._feedback_fn(" - WARNING: %s" % message)
244 self._feedback_fn(" Hint: %s" % kwargs["hint"])
# Interpolates *args into message, logs it at info level and forwards it to
# the feedback function.
246 def LogInfo(self, message, *args):
247 """Log an informational message to the logs and the user.
251 message = message % tuple(args)
252 logging.info(message)
253 self._feedback_fn(" - INFO: %s" % message)
# HooksMaster computes the hooks environment and target nodes for one LU and
# runs that LU's hook scripts for a given phase through the injected callfn.
# NOTE(review): this excerpt is lossy -- the original line numbers embedded
# at the start of each line skip values (e.g. 256 -> 259, 268 -> 273,
# 275 -> 278), so some lines are not shown, among them the attribute
# assignments in __init__ (self.callfn, self.proc, self.lu and self.op are
# all used below) and the `def _BuildEnv(self):` line that the "Compute the
# environment and the target nodes" docstring belongs to.
256 class HooksMaster(object):
259 This class distributes the run commands to the nodes based on the
262 In order to remove the direct dependency on the rpc module, the
263 constructor needs a function which actually does the remote
264 call. This will usually be rpc.call_hooks_runner, but any function
265 which behaves the same works.
# Stores callfn (the hooks RPC function), proc (used for warnings) and the
# LU, then precomputes the base environment and the per-phase node lists.
268 def __init__(self, callfn, proc, lu):
273 self.env, node_list_pre, node_list_post = self._BuildEnv()
274 self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
275 constants.HOOKS_PHASE_POST: node_list_post}
278 """Compute the environment and the target nodes.
280 Based on the opcode and the current node list, this builds the
281 environment for the hooks and the target node list for the run.
285 "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
286 "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
287 "GANETI_OP_CODE": self.op.OP_ID,
288 "GANETI_OBJECT_TYPE": self.lu.HTYPE,
289 "GANETI_DATA_DIR": constants.DATA_DIR,
# LUs with an HPATH add their own variables (each key prefixed with
# "GANETI_") and node lists; LUs without one get empty node lists, so
# RunPhase does not run any hooks for them.
292 if self.lu.HPATH is not None:
293 lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
296 env["GANETI_" + key] = lu_env[key]
298 lu_nodes_pre = lu_nodes_post = []
# frozenset removes duplicate node names and makes the lists immutable.
300 return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
302 def _RunWrapper(self, node_list, hpath, phase):
303 """Simple wrapper over self.callfn.
305 This method fixes the environment before doing the rpc call.
# Work on a copy so the per-call variables do not leak into self.env.
308 env = self.env.copy()
309 env["GANETI_HOOKS_PHASE"] = phase
310 env["GANETI_HOOKS_PATH"] = hpath
# Cluster name and master node are only added when the LU has a config.
311 if self.lu.cfg is not None:
312 env["GANETI_CLUSTER"] = self.lu.cfg.GetClusterName()
313 env["GANETI_MASTER"] = self.lu.cfg.GetMasterNode()
# All keys and values are stringified before being handed to callfn.
315 env = dict([(str(key), str(val)) for key, val in env.iteritems()])
317 return self.callfn(node_list, hpath, phase, env)
319 def RunPhase(self, phase):
320 """Run all the scripts for a phase.
322 This is the main function of the HooksMaster.
324 @param phase: one of L{constants.HOOKS_PHASE_POST} or
325 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
326 @return: the processed results of the hooks multi-node rpc call
327 @raise errors.HooksFailure: on communication failure to the nodes
@raise errors.HooksAbort: in the pre phase, if any hook script reported
L{constants.HKR_FAIL}; it carries (node, script, output) tuples
330 if not self.node_list[phase]:
331 # empty node list, we should not attempt to run this as either
332 # we're in the cluster init phase and the rpc client part can't
333 # even attempt to run, or this LU doesn't do hooks at all
# NOTE(review): the early-exit statement for this branch (original line 334)
# is not shown in this excerpt.
335 hpath = self.lu.HPATH
336 results = self._RunWrapper(self.node_list[phase], hpath, phase)
# Only the pre phase can abort the operation: failing hook scripts are
# collected over all nodes and raised together, while nodes whose RPC failed
# or returned no list at most produce a warning.
337 if phase == constants.HOOKS_PHASE_PRE:
340 raise errors.HooksFailure("Communication failure")
341 for node_name in results:
342 res = results[node_name]
343 if res.failed or res.data is False or not isinstance(res.data, list):
345 self.proc.LogWarning("Communication failure to node %s" %
348 for script, hkr, output in res.data:
349 if hkr == constants.HKR_FAIL:
350 errs.append((node_name, script, output))
352 raise errors.HooksAbort(errs)
355 def RunConfigUpdate(self):
356 """Run the special configuration update hook
358 This is a special hook that runs only on the master after each
359 top-level LU if the configuration has been updated.
# Runs the config-update hooks path in the post phase on the master node
# only; the RPC result is not used here.
362 phase = constants.HOOKS_PHASE_POST
363 hpath = constants.HOOKS_NAME_CFGUPDATE
364 nodes = [self.lu.cfg.GetMasterNode()]
365 self._RunWrapper(nodes, hpath, phase)