4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the logic behind the cluster operations
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26 - logical units, which know how to deal with their specific opcode only
27 - the processor, which dispatches the opcodes to their logical units
32 from ganeti import opcodes
33 from ganeti import constants
34 from ganeti import errors
35 from ganeti import rpc
36 from ganeti import cmdlib
37 from ganeti import config
38 from ganeti import ssconf
39 from ganeti import logger
40 from ganeti import locking
43 class Processor(object):
44 """Object which runs OpCodes"""
47 opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
48 opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
49 opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
50 opcodes.OpMasterFailover: cmdlib.LUMasterFailover,
51 opcodes.OpDumpClusterConfig: cmdlib.LUDumpClusterConfig,
52 opcodes.OpRenameCluster: cmdlib.LURenameCluster,
53 opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
54 opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
56 opcodes.OpAddNode: cmdlib.LUAddNode,
57 opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
58 opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
59 opcodes.OpRemoveNode: cmdlib.LURemoveNode,
61 opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
62 opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
63 opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
64 opcodes.OpRenameInstance: cmdlib.LURenameInstance,
65 opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
66 opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
67 opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
68 opcodes.OpRebootInstance: cmdlib.LURebootInstance,
69 opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
70 opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
71 opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
72 opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
73 opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
74 opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
75 opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
76 opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
78 opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
80 opcodes.OpQueryExports: cmdlib.LUQueryExports,
81 opcodes.OpExportInstance: cmdlib.LUExportInstance,
82 opcodes.OpRemoveExport: cmdlib.LURemoveExport,
84 opcodes.OpGetTags: cmdlib.LUGetTags,
85 opcodes.OpSearchTags: cmdlib.LUSearchTags,
86 opcodes.OpAddTags: cmdlib.LUAddTags,
87 opcodes.OpDelTags: cmdlib.LUDelTags,
89 opcodes.OpTestDelay: cmdlib.LUTestDelay,
90 opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
93 def __init__(self, context):
94 """Constructor for Processor
97 - feedback_fn: the feedback function (taking one string) to be run when
98 interesting events are happening
100 self.context = context
101 self._feedback_fn = None
102 self.exclusive_BGL = False
104 def _ExecLU(self, lu):
105 """Logical Unit execution sequence.
108 write_count = self.context.cfg.write_count
110 hm = HooksMaster(rpc.call_hooks_runner, self, lu)
111 h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
112 lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
113 self._feedback_fn, None)
115 result = lu.Exec(self._feedback_fn)
116 h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
117 result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
118 self._feedback_fn, result)
120 # FIXME: This needs locks if not lu_class.REQ_BGL
121 if write_count != self.context.cfg.write_count:
126 def _LockAndExecLU(self, lu, level):
127 """Execute a Logical Unit, with the needed locks.
129 This is a recursive function that starts locking the given level, and
130 proceeds up, till there are no more locks to acquire. Then it executes the
131 given LU and its opcodes.
134 if level in lu.needed_locks:
135 # This is always safe to do, as we can't acquire more/less locks than
136 # what was requested.
137 lu.needed_locks[level] = self.context.glm.acquire(level,
138 lu.needed_locks[level])
140 result = self._LockAndExecLU(lu, level + 1)
142 if lu.needed_locks[level]:
143 self.context.glm.release(level)
145 result = self._ExecLU(lu)
149 def ExecOpCode(self, op, feedback_fn):
150 """Execute an opcode.
153 op: the opcode to be executed
156 if not isinstance(op, opcodes.OpCode):
157 raise errors.ProgrammerError("Non-opcode instance passed"
160 self._feedback_fn = feedback_fn
161 lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
163 raise errors.OpCodeUnknown("Unknown opcode")
165 if lu_class.REQ_WSSTORE:
166 sstore = ssconf.WritableSimpleStore()
168 sstore = ssconf.SimpleStore()
170 # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
171 # shared fashion otherwise (to prevent concurrent run with an exclusive LU.
172 self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
173 shared=not lu_class.REQ_BGL)
175 self.exclusive_BGL = lu_class.REQ_BGL
176 lu = lu_class(self, op, self.context, sstore)
178 assert lu.needed_locks is not None, "needed_locks not set by LU"
179 result = self._LockAndExecLU(lu, locking.LEVEL_NODE)
181 self.context.glm.release(locking.LEVEL_CLUSTER)
182 self.exclusive_BGL = False
186 def ChainOpCode(self, op):
187 """Chain and execute an opcode.
189 This is used by LUs when they need to execute a child LU.
192 - opcode: the opcode to be executed
195 if not isinstance(op, opcodes.OpCode):
196 raise errors.ProgrammerError("Non-opcode instance passed"
199 lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
201 raise errors.OpCodeUnknown("Unknown opcode")
203 if lu_class.REQ_BGL and not self.exclusive_BGL:
204 raise errors.ProgrammerError("LUs which require the BGL cannot"
205 " be chained to granular ones.")
207 if lu_class.REQ_WSSTORE:
208 sstore = ssconf.WritableSimpleStore()
210 sstore = ssconf.SimpleStore()
212 #do_hooks = lu_class.HPATH is not None
213 lu = lu_class(self, op, self.context, sstore)
216 # hm = HooksMaster(rpc.call_hooks_runner, self, lu)
217 # h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
218 # lu.HooksCallBack(constants.HOOKS_PHASE_PRE,
219 # h_results, self._feedback_fn, None)
220 result = lu.Exec(self._feedback_fn)
222 # h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
223 # result = lu.HooksCallBack(constants.HOOKS_PHASE_POST,
224 # h_results, self._feedback_fn, result)
227 def LogStep(self, current, total, message):
228 """Log a change in LU execution progress.
231 logger.Debug("Step %d/%d %s" % (current, total, message))
232 self._feedback_fn("STEP %d/%d %s" % (current, total, message))
234 def LogWarning(self, message, hint=None):
235 """Log a warning to the logs and the user.
238 logger.Error(message)
239 self._feedback_fn(" - WARNING: %s" % message)
241 self._feedback_fn(" Hint: %s" % hint)
243 def LogInfo(self, message):
244 """Log an informational message to the logs and the user.
248 self._feedback_fn(" - INFO: %s" % message)
251 class HooksMaster(object):
254 This class distributes the run commands to the nodes based on the
257 In order to remove the direct dependency on the rpc module, the
258 constructor needs a function which actually does the remote
259 call. This will usually be rpc.call_hooks_runner, but any function
260 which behaves the same works.
263 def __init__(self, callfn, proc, lu):
268 self.env, node_list_pre, node_list_post = self._BuildEnv()
269 self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
270 constants.HOOKS_PHASE_POST: node_list_post}
273 """Compute the environment and the target nodes.
275 Based on the opcode and the current node list, this builds the
276 environment for the hooks and the target node list for the run.
280 "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
281 "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
282 "GANETI_OP_CODE": self.op.OP_ID,
283 "GANETI_OBJECT_TYPE": self.lu.HTYPE,
284 "GANETI_DATA_DIR": constants.DATA_DIR,
287 if self.lu.HPATH is not None:
288 lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
291 env["GANETI_" + key] = lu_env[key]
293 lu_nodes_pre = lu_nodes_post = []
295 return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
297 def _RunWrapper(self, node_list, hpath, phase):
298 """Simple wrapper over self.callfn.
300 This method fixes the environment before doing the rpc call.
303 env = self.env.copy()
304 env["GANETI_HOOKS_PHASE"] = phase
305 env["GANETI_HOOKS_PATH"] = hpath
306 if self.lu.sstore is not None:
307 env["GANETI_CLUSTER"] = self.lu.sstore.GetClusterName()
308 env["GANETI_MASTER"] = self.lu.sstore.GetMasterNode()
310 env = dict([(str(key), str(val)) for key, val in env.iteritems()])
312 return self.callfn(node_list, hpath, phase, env)
314 def RunPhase(self, phase):
315 """Run all the scripts for a phase.
317 This is the main function of the HookMaster.
320 phase: the hooks phase to run
323 the result of the hooks multi-node rpc call
326 if not self.node_list[phase]:
327 # empty node list, we should not attempt to run this as either
328 # we're in the cluster init phase and the rpc client part can't
329 # even attempt to run, or this LU doesn't do hooks at all
331 hpath = self.lu.HPATH
332 results = self._RunWrapper(self.node_list[phase], hpath, phase)
333 if phase == constants.HOOKS_PHASE_PRE:
336 raise errors.HooksFailure("Communication failure")
337 for node_name in results:
338 res = results[node_name]
339 if res is False or not isinstance(res, list):
340 self.proc.LogWarning("Communication failure to node %s" % node_name)
342 for script, hkr, output in res:
343 if hkr == constants.HKR_FAIL:
344 output = output.strip().encode("string_escape")
345 errs.append((node_name, script, output))
347 raise errors.HooksAbort(errs)
350 def RunConfigUpdate(self):
351 """Run the special configuration update hook
353 This is a special hook that runs only on the master after each
354 top-level LI if the configuration has been updated.
357 phase = constants.HOOKS_PHASE_POST
358 hpath = constants.HOOKS_NAME_CFGUPDATE
359 if self.lu.sstore is None:
360 raise errors.ProgrammerError("Null sstore on config update hook")
361 nodes = [self.lu.sstore.GetMasterNode()]
362 results = self._RunWrapper(nodes, hpath, phase)