4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
"""Module implementing the logic behind the cluster operations.

This module implements the logic for doing operations in the cluster. There
are two kinds of classes defined:
  - logical units, which know how to deal with their specific opcode only
  - the processor, which dispatches the opcodes to their logical units

"""
32 from ganeti import opcodes
33 from ganeti import constants
34 from ganeti import errors
35 from ganeti import rpc
36 from ganeti import cmdlib
37 from ganeti import config
38 from ganeti import ssconf
39 from ganeti import logger
class Processor(object):
    """Object which runs OpCodes.

    DISPATCH_TABLE maps every supported opcode class to the logical unit
    (LU) class that implements it; ExecOpCode and ChainOpCode look the
    LU class up there, instantiate it and run it.

    """
    DISPATCH_TABLE = {
        # Cluster
        opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
        opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
        opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
        opcodes.OpMasterFailover: cmdlib.LUMasterFailover,
        opcodes.OpDumpClusterConfig: cmdlib.LUDumpClusterConfig,
        opcodes.OpRenameCluster: cmdlib.LURenameCluster,
        opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
        opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
        # Node
        opcodes.OpAddNode: cmdlib.LUAddNode,
        opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
        opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
        opcodes.OpRemoveNode: cmdlib.LURemoveNode,
        # Instance
        opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
        opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
        opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
        opcodes.OpRenameInstance: cmdlib.LURenameInstance,
        opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
        opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
        opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
        opcodes.OpRebootInstance: cmdlib.LURebootInstance,
        opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
        opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
        opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
        opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
        opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
        opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
        opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
        opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
        # OS
        opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
        # Export
        opcodes.OpQueryExports: cmdlib.LUQueryExports,
        opcodes.OpExportInstance: cmdlib.LUExportInstance,
        opcodes.OpRemoveExport: cmdlib.LURemoveExport,
        # Tags
        opcodes.OpGetTags: cmdlib.LUGetTags,
        opcodes.OpSearchTags: cmdlib.LUSearchTags,
        opcodes.OpAddTags: cmdlib.LUAddTags,
        opcodes.OpDelTags: cmdlib.LUDelTags,
        # Test LUs
        opcodes.OpTestDelay: cmdlib.LUTestDelay,
        opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
    }

    def __init__(self, feedback=None):
        """Constructor for Processor.

        Args:
          - feedback: the feedback function (taking one string) to be
                      run when interesting events are happening; may be
                      None, in which case the Log* methods only write
                      to the log

        """
        # The configuration and the simple store are created lazily by
        # the first ExecOpCode/ChainOpCode call.
        self.cfg = None
        self.sstore = None
        self._feedback_fn = feedback

    def _Feedback(self, message):
        """Send a message to the feedback function, if one was given."""
        if self._feedback_fn is not None:
            self._feedback_fn(message)

    def ExecOpCode(self, op):
        """Execute an opcode.

        The matching LU is instantiated, its pre-phase hooks are run,
        the LU is executed and its post-phase hooks are run. If the
        configuration write counter changed meanwhile, the
        configuration update hook is run as well, even when the LU
        execution raised an exception.

        Args:
          op: the opcode to be executed

        Returns:
          the value returned by the LU's post-phase HooksCallBack

        Raises:
          errors.ProgrammerError: if op is not an opcodes.OpCode instance
          errors.OpCodeUnknown: if op's class is not in DISPATCH_TABLE

        """
        if not isinstance(op, opcodes.OpCode):
            # NOTE(review): the tail of this message was lost in the
            # available source; confirm the exact wording.
            raise errors.ProgrammerError("Non-opcode instance passed"
                                         " to ExecOpcode")

        lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
        if lu_class is None:
            raise errors.OpCodeUnknown("Unknown opcode")

        if self.cfg is None:
            self.cfg = config.ConfigWriter()
            if lu_class.REQ_WSSTORE:
                self.sstore = ssconf.WritableSimpleStore()
            else:
                self.sstore = ssconf.SimpleStore()
        if self.cfg is not None:
            write_count = self.cfg.write_count
        else:
            write_count = 0
        lu = lu_class(self, op, self.cfg, self.sstore)
        # NOTE(review): prerequisite check restored from a one-line gap
        # in the available source; confirm against the upstream file.
        lu.CheckPrereq()
        hm = HooksMaster(rpc.call_hooks_runner, self, lu)
        h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
        lu.HooksCallBack(constants.HOOKS_PHASE_PRE,
                         h_results, self._feedback_fn, None)
        try:
            result = lu.Exec(self._feedback_fn)
            h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
            result = lu.HooksCallBack(constants.HOOKS_PHASE_POST,
                                      h_results, self._feedback_fn, result)
        finally:
            if lu.cfg is not None:
                # we use lu.cfg and not self.cfg as for init cluster,
                # self.cfg is None but lu.cfg has been recently
                # initialized in the lu.Exec method
                if write_count != lu.cfg.write_count:
                    hm.RunConfigUpdate()

        return result

    def ChainOpCode(self, op):
        """Chain and execute an opcode.

        This is used by LUs when they need to execute a child LU.

        Args:
          - op: the opcode to be executed

        Returns:
          the value returned by the child LU's Exec method

        Raises:
          errors.ProgrammerError: if op is not an opcodes.OpCode instance
          errors.OpCodeUnknown: if op's class is not in DISPATCH_TABLE

        """
        if not isinstance(op, opcodes.OpCode):
            # NOTE(review): the tail of this message was lost in the
            # available source; confirm the exact wording.
            raise errors.ProgrammerError("Non-opcode instance passed"
                                         " to ChainOpCode")

        lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
        if lu_class is None:
            raise errors.OpCodeUnknown("Unknown opcode")

        if self.cfg is None:
            self.cfg = config.ConfigWriter()
            self.sstore = ssconf.SimpleStore()
        lu = lu_class(self, op, self.cfg, self.sstore)
        # NOTE(review): prerequisite check restored from a one-line gap
        # in the available source; confirm against the upstream file.
        lu.CheckPrereq()
        # Unlike ExecOpCode, no hooks are run for a chained opcode.
        result = lu.Exec(self._feedback_fn)
        return result

    def LogStep(self, current, total, message):
        """Log a change in LU execution progress.

        Args:
          current: number of the step being started
          total: total number of steps
          message: description of the step

        """
        logger.Debug("Step %d/%d %s" % (current, total, message))
        self._Feedback("STEP %d/%d %s" % (current, total, message))

    def LogWarning(self, message, hint=None):
        """Log a warning to the logs and the user.

        Args:
          message: the warning text
          hint: optional extra text, sent to the user only

        """
        logger.Error(message)
        self._Feedback(" - WARNING: %s" % message)
        if hint:
            self._Feedback(" Hint: %s" % hint)

    def LogInfo(self, message):
        """Log an informational message to the logs and the user.

        Args:
          message: the informational text

        """
        logger.Info(message)
        self._Feedback(" - INFO: %s" % message)
class HooksMaster(object):
    """Hooks master.

    This class distributes the run commands to the nodes based on the
    specific LU class.

    In order to remove the direct dependency on the rpc module, the
    constructor needs a function which actually does the remote
    call. This will usually be rpc.call_hooks_runner, but any function
    which behaves the same works.

    """
    def __init__(self, callfn, proc, lu):
        """Store the collaborators and precompute the hooks environment.

        Args:
          callfn: function doing the remote call; it is invoked as
                  callfn(node_list, hpath, phase, env)
          proc: the processor, used for reporting warnings
          lu: the logical unit whose hooks are to be run

        """
        self.callfn = callfn
        self.proc = proc
        # NOTE(review): assumes the LU exposes its opcode as `op`;
        # _BuildEnv reads self.op.OP_ID -- confirm.
        self.op = lu.op
        self.lu = lu
        self.env, node_list_pre, node_list_post = self._BuildEnv()
        self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
                          constants.HOOKS_PHASE_POST: node_list_post}

    def _BuildEnv(self):
        """Compute the environment and the target nodes.

        Based on the opcode and the current node list, this builds the
        environment for the hooks and the target node list for the run.

        Returns:
          a tuple (env, pre_nodes, post_nodes) where env is a dict and
          the two node collections are frozensets

        """
        env = {
            "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
            "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
            "GANETI_OP_CODE": self.op.OP_ID,
            "GANETI_OBJECT_TYPE": self.lu.HTYPE,
            "GANETI_DATA_DIR": constants.DATA_DIR,
        }

        if self.lu.HPATH is not None:
            lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
            if lu_env:
                # LU-provided variables are namespaced with GANETI_
                for key in lu_env:
                    env["GANETI_" + key] = lu_env[key]
        else:
            # this LU does not use hooks: no target nodes at all
            lu_nodes_pre = lu_nodes_post = []

        return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)

    def _RunWrapper(self, node_list, hpath, phase):
        """Simple wrapper over self.callfn.

        This method fixes the environment before doing the rpc call: it
        adds the phase, the hooks path and (when the LU has a simple
        store) the cluster name and master node to a copy of the
        precomputed environment, and converts all keys and values to
        strings.

        Args:
          node_list: the nodes on which to run the hooks
          hpath: the hooks path
          phase: the hooks phase

        Returns:
          whatever self.callfn returns

        """
        env = self.env.copy()
        env["GANETI_HOOKS_PHASE"] = phase
        env["GANETI_HOOKS_PATH"] = hpath
        if self.lu.sstore is not None:
            env["GANETI_CLUSTER"] = self.lu.sstore.GetClusterName()
            env["GANETI_MASTER"] = self.lu.sstore.GetMasterNode()

        env = dict((str(key), str(val)) for key, val in env.items())

        return self.callfn(node_list, hpath, phase, env)

    def RunPhase(self, phase):
        """Run all the scripts for a phase.

        This is the main function of the HooksMaster.

        Args:
          phase: the hooks phase to run

        Returns:
          the result of the hooks multi-node rpc call, or None if the
          node list for this phase is empty

        Raises:
          errors.HooksFailure: in the pre phase, if the rpc call
            returned no results
          errors.HooksAbort: in the pre phase, if at least one script
            failed; the argument is a list of
            (node_name, script, output) tuples

        """
        if not self.node_list[phase]:
            # empty node list, we should not attempt to run this as either
            # we're in the cluster init phase and the rpc client part can't
            # even attempt to run, or this LU doesn't do hooks at all
            return None
        hpath = self.lu.HPATH
        results = self._RunWrapper(self.node_list[phase], hpath, phase)
        if phase == constants.HOOKS_PHASE_PRE:
            errs = []
            if not results:
                raise errors.HooksFailure("Communication failure")
            for node_name in results:
                res = results[node_name]
                if res is False or not isinstance(res, list):
                    # an unreachable node is only warned about; it does
                    # not abort the operation
                    self.proc.LogWarning("Communication failure to node %s"
                                         % node_name)
                    continue
                for script, hkr, output in res:
                    if hkr == constants.HKR_FAIL:
                        # "string_escape" is a Python 2 only codec
                        output = output.strip().encode("string_escape")
                        errs.append((node_name, script, output))
            if errs:
                raise errors.HooksAbort(errs)
        return results

    def RunConfigUpdate(self):
        """Run the special configuration update hook.

        This is a special hook that runs only on the master after each
        top-level LU if the configuration has been updated.

        Raises:
          errors.ProgrammerError: if the LU has no simple store

        """
        phase = constants.HOOKS_PHASE_POST
        hpath = constants.HOOKS_NAME_CFGUPDATE
        if self.lu.sstore is None:
            raise errors.ProgrammerError("Null sstore on config update hook")
        nodes = [self.lu.sstore.GetMasterNode()]
        # the result of this hook run is not inspected
        self._RunWrapper(nodes, hpath, phase)