4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the logic behind the cluster operations
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26 - logical units, which know how to deal with their specific opcode only
27 - the processor, which dispatches the opcodes to their logical units
32 from ganeti import opcodes
33 from ganeti import constants
34 from ganeti import errors
35 from ganeti import rpc
36 from ganeti import cmdlib
37 from ganeti import config
38 from ganeti import ssconf
39 from ganeti import logger
40 from ganeti import locking
class Processor(object):
    """Object which runs OpCodes.

    The processor looks up the logical unit (LU) class registered for an
    opcode in DISPATCH_TABLE, instantiates it, acquires the locks the LU
    asks for and executes it together with its hooks.

    """
    # Maps each supported opcode class to the logical unit implementing it.
    DISPATCH_TABLE = {
        # Cluster
        opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
        opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
        opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
        opcodes.OpDumpClusterConfig: cmdlib.LUDumpClusterConfig,
        opcodes.OpRenameCluster: cmdlib.LURenameCluster,
        opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
        opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
        # Nodes
        opcodes.OpAddNode: cmdlib.LUAddNode,
        opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
        opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
        opcodes.OpRemoveNode: cmdlib.LURemoveNode,
        # Instances
        opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
        opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
        opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
        opcodes.OpRenameInstance: cmdlib.LURenameInstance,
        opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
        opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
        opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
        opcodes.OpRebootInstance: cmdlib.LURebootInstance,
        opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
        opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
        opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
        opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
        opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
        opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
        opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
        opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
        # Operating systems
        opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
        # Exports
        opcodes.OpQueryExports: cmdlib.LUQueryExports,
        opcodes.OpExportInstance: cmdlib.LUExportInstance,
        opcodes.OpRemoveExport: cmdlib.LURemoveExport,
        # Tags
        opcodes.OpGetTags: cmdlib.LUGetTags,
        opcodes.OpSearchTags: cmdlib.LUSearchTags,
        opcodes.OpAddTags: cmdlib.LUAddTags,
        opcodes.OpDelTags: cmdlib.LUDelTags,
        # Test LUs
        opcodes.OpTestDelay: cmdlib.LUTestDelay,
        opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
    }

    def __init__(self, context):
        """Constructor for Processor.

        Args:
         - context: object stored as self.context; its "cfg" and "glm"
                    attributes are used while executing opcodes

        The feedback function (taking one string, run when interesting
        events are happening) is not a constructor argument: it is passed
        to ExecOpCode, which stores it on the processor.

        """
        self.context = context
        self._feedback_fn = None
        # True only while ExecOpCode runs an LU that holds the BGL
        # exclusively; ChainOpCode checks it before chaining such LUs.
        self.exclusive_BGL = False

    def _ExecLU(self, lu):
        """Logical Unit execution sequence.

        Runs the pre-phase hooks, executes the LU and runs the post-phase
        hooks. If the configuration write counter changed in the meantime,
        the configuration update hook is run as well, even when the
        execution raised an exception.

        Args:
         - lu: the logical unit instance to execute

        Returns:
          the LU result, as passed through the post-phase hooks callback

        """
        write_count = self.context.cfg.write_count
        # NOTE(review): the reviewed listing skips one line at this point;
        # confirm against the repository that no statement on the LU
        # belongs here before the hooks are run.
        hm = HooksMaster(rpc.call_hooks_runner, self, lu)
        h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
        lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                         self._feedback_fn, None)
        try:
            result = lu.Exec(self._feedback_fn)
            h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
            result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                      self._feedback_fn, result)
        finally:
            # FIXME: This needs locks if not lu_class.REQ_BGL
            if write_count != self.context.cfg.write_count:
                hm.RunConfigUpdate()

        return result

    def _LockAndExecLU(self, lu, level):
        """Execute a Logical Unit, with the needed locks.

        This is a recursive function that starts locking the given level,
        and proceeds up, till there are no more locks to acquire. Then it
        executes the given LU and its opcodes.

        Args:
         - lu: the logical unit instance to execute
         - level: the locking level to start acquiring locks at

        Returns:
          the result of executing the LU

        """
        if level in lu.needed_locks:
            # This is always safe to do, as we can't acquire more/less locks
            # than what was requested.
            lu.needed_locks[level] = self.context.glm.acquire(
                level, lu.needed_locks[level])
            try:
                result = self._LockAndExecLU(lu, level + 1)
            finally:
                # Release in "finally" so a failing LU cannot leak locks.
                if lu.needed_locks[level]:
                    self.context.glm.release(level)
        else:
            result = self._ExecLU(lu)

        return result

    def ExecOpCode(self, op, feedback_fn):
        """Execute an opcode.

        Args:
         - op: the opcode to be executed
         - feedback_fn: the feedback function (taking one string), stored
                        on the processor and handed to the LU

        Returns:
          the result of the logical unit registered for the opcode

        Raises:
          errors.ProgrammerError: if op is not an opcodes.OpCode instance
          errors.OpCodeUnknown: if no LU is registered for the opcode class

        """
        if not isinstance(op, opcodes.OpCode):
            raise errors.ProgrammerError("Non-opcode instance passed"
                                         " to ExecOpCode")

        self._feedback_fn = feedback_fn
        lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
        if lu_class is None:
            raise errors.OpCodeUnknown("Unknown opcode")

        if lu_class.REQ_WSSTORE:
            sstore = ssconf.WritableSimpleStore()
        else:
            sstore = ssconf.SimpleStore()

        # Acquire the Big Ganeti Lock exclusively if this LU requires it,
        # and in a shared fashion otherwise (to prevent concurrent run with
        # an exclusive LU).
        self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
                                 shared=not lu_class.REQ_BGL)
        try:
            self.exclusive_BGL = lu_class.REQ_BGL
            lu = lu_class(self, op, self.context, sstore)
            # NOTE(review): the reviewed listing skips one line at this
            # point; confirm against the repository whether a call on the
            # LU that fills in needed_locks belongs here.
            assert lu.needed_locks is not None, "needed_locks not set by LU"
            result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
        finally:
            self.context.glm.release(locking.LEVEL_CLUSTER)
            self.exclusive_BGL = False

        return result

    def ChainOpCode(self, op):
        """Chain and execute an opcode.

        This is used by LUs when they need to execute a child LU.

        Args:
         - op: the opcode to be executed

        Returns:
          the result of the child LU's Exec method

        Raises:
          errors.ProgrammerError: if op is not an opcodes.OpCode instance,
            or if the child LU requires the BGL while the running LU does
            not hold it exclusively
          errors.OpCodeUnknown: if no LU is registered for the opcode class

        """
        if not isinstance(op, opcodes.OpCode):
            raise errors.ProgrammerError("Non-opcode instance passed"
                                         " to ChainOpCode")

        lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
        if lu_class is None:
            raise errors.OpCodeUnknown("Unknown opcode")

        if lu_class.REQ_BGL and not self.exclusive_BGL:
            raise errors.ProgrammerError("LUs which require the BGL cannot"
                                         " be chained to granular ones.")

        if lu_class.REQ_WSSTORE:
            sstore = ssconf.WritableSimpleStore()
        else:
            sstore = ssconf.SimpleStore()

        lu = lu_class(self, op, self.context, sstore)
        # NOTE(review): the reviewed listing skips lines at this point;
        # confirm against the repository that no statement on the LU
        # belongs here before Exec.
        # Hooks are currently not run for chained LUs: the pre/post hook
        # invocation around Exec is disabled in this method.
        result = lu.Exec(self._feedback_fn)

        return result

    def LogStep(self, current, total, message):
        """Log a change in LU execution progress.

        Args:
         - current: the number of the current step
         - total: the total number of steps
         - message: the description of the current step

        """
        logger.Debug("Step %d/%d %s" % (current, total, message))
        self._feedback_fn("STEP %d/%d %s" % (current, total, message))

    def LogWarning(self, message, hint=None):
        """Log a warning to the logs and the user.

        Args:
         - message: the warning text
         - hint: optional extra text, reported to the user only

        """
        logger.Error(message)
        self._feedback_fn(" - WARNING: %s" % message)
        if hint:
            self._feedback_fn(" Hint: %s" % hint)

    def LogInfo(self, message):
        """Log an informational message to the logs and the user.

        Args:
         - message: the informational text

        """
        # NOTE(review): the logging call was dropped from the reviewed
        # listing; logger.Info is assumed by analogy with the sibling
        # methods - confirm the function name against the logger module.
        logger.Info(message)
        self._feedback_fn(" - INFO: %s" % message)
class HooksMaster(object):
    """Hooks master.

    This class distributes the run commands to the nodes based on the
    environment and the per-phase node lists built from the logical unit.

    In order to remove the direct dependency on the rpc module, the
    constructor needs a function which actually does the remote
    call. This will usually be rpc.call_hooks_runner, but any function
    which behaves the same works.

    """
    def __init__(self, callfn, proc, lu):
        """Store the collaborators and precompute the hooks environment.

        Args:
         - callfn: function doing the remote call; it is invoked as
                   callfn(node_list, hpath, phase, env)
         - proc: the processor, used for reporting warnings
         - lu: the logical unit the hooks are run for

        """
        self.callfn = callfn
        self.proc = proc
        self.lu = lu
        # NOTE(review): the assignments above and below were dropped from
        # the reviewed listing; self.op is read in _BuildEnv and is assumed
        # to be the LU's "op" attribute - confirm against the LU base class.
        self.op = lu.op
        self.env, node_list_pre, node_list_post = self._BuildEnv()
        self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
                          constants.HOOKS_PHASE_POST: node_list_post}

    def _BuildEnv(self):
        """Compute the environment and the target nodes.

        Based on the opcode and the current node list, this builds the
        environment for the hooks and the target node list for the run.

        Returns:
          a tuple (env, pre_nodes, post_nodes): the environment dict and
          the frozensets of nodes for the pre and post phases

        """
        env = {
            "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
            "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
            "GANETI_OP_CODE": self.op.OP_ID,
            "GANETI_OBJECT_TYPE": self.lu.HTYPE,
            "GANETI_DATA_DIR": constants.DATA_DIR,
        }

        if self.lu.HPATH is not None:
            lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
            # LU-provided variables are exported with a GANETI_ prefix.
            if lu_env:
                for key in lu_env:
                    env["GANETI_" + key] = lu_env[key]
        else:
            # An LU without a hooks path runs hooks on no node at all.
            lu_nodes_pre = lu_nodes_post = []

        return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)

    def _RunWrapper(self, node_list, hpath, phase):
        """Simple wrapper over self.callfn.

        This method fixes the environment before doing the rpc call.

        Args:
         - node_list: the nodes to run the hooks on
         - hpath: the hooks path
         - phase: the hooks phase

        Returns:
          whatever self.callfn returns

        """
        # Work on a copy so per-call variables never leak into self.env.
        env = self.env.copy()
        env["GANETI_HOOKS_PHASE"] = phase
        env["GANETI_HOOKS_PATH"] = hpath
        if self.lu.sstore is not None:
            env["GANETI_CLUSTER"] = self.lu.sstore.GetClusterName()
            env["GANETI_MASTER"] = self.lu.sstore.GetMasterNode()

        # Every key and value is converted to a string before the call;
        # items() is used as it exists on both Python 2 and 3 dicts.
        env = dict((str(key), str(val)) for key, val in env.items())

        return self.callfn(node_list, hpath, phase, env)

    def RunPhase(self, phase):
        """Run all the scripts for a phase.

        This is the main function of the HookMaster.

        Args:
         - phase: the hooks phase to run

        Returns:
          the result of the hooks multi-node rpc call, or None when there
          are no nodes to run the phase on

        Raises:
          errors.HooksFailure: in the pre phase, if the call gave no results
          errors.HooksAbort: in the pre phase, if any script reported
            failure; it carries a list of (node, script, output) tuples

        """
        if not self.node_list[phase]:
            # empty node list, we should not attempt to run this as either
            # we're in the cluster init phase and the rpc client part can't
            # even attempt to run, or this LU doesn't do hooks at all
            return

        hpath = self.lu.HPATH
        results = self._RunWrapper(self.node_list[phase], hpath, phase)
        if phase == constants.HOOKS_PHASE_PRE:
            errs = []
            if not results:
                raise errors.HooksFailure("Communication failure")
            for node_name in results:
                res = results[node_name]
                if res is False or not isinstance(res, list):
                    # An unreachable node is only warned about; it does not
                    # abort the operation by itself.
                    self.proc.LogWarning("Communication failure to node %s" %
                                         node_name)
                    continue
                for script, hkr, output in res:
                    if hkr == constants.HKR_FAIL:
                        # "string_escape" is a Python 2 only codec.
                        output = output.strip().encode("string_escape")
                        errs.append((node_name, script, output))
            if errs:
                raise errors.HooksAbort(errs)

        return results

    def RunConfigUpdate(self):
        """Run the special configuration update hook.

        This is a special hook that runs only on the master after each
        top-level LU if the configuration has been updated.

        Raises:
          errors.ProgrammerError: if the LU has no simple store

        """
        phase = constants.HOOKS_PHASE_POST
        hpath = constants.HOOKS_NAME_CFGUPDATE
        if self.lu.sstore is None:
            raise errors.ProgrammerError("Null sstore on config update hook")
        nodes = [self.lu.sstore.GetMasterNode()]
        results = self._RunWrapper(nodes, hpath, phase)