4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the logic behind the cluster operations
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26 - logical units, which know how to deal with their specific opcode only
27 - the processor, which dispatches the opcodes to their logical units
32 from ganeti import opcodes
33 from ganeti import constants
34 from ganeti import errors
35 from ganeti import rpc
36 from ganeti import cmdlib
37 from ganeti import config
38 from ganeti import ssconf
39 from ganeti import logger
42 class Processor(object):
43 """Object which runs OpCodes"""
46 opcodes.OpInitCluster: cmdlib.LUInitCluster,
47 opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
48 opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
49 opcodes.OpClusterCopyFile: cmdlib.LUClusterCopyFile,
50 opcodes.OpRunClusterCommand: cmdlib.LURunClusterCommand,
51 opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
52 opcodes.OpMasterFailover: cmdlib.LUMasterFailover,
53 opcodes.OpDumpClusterConfig: cmdlib.LUDumpClusterConfig,
54 opcodes.OpRenameCluster: cmdlib.LURenameCluster,
55 opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
56 opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
58 opcodes.OpAddNode: cmdlib.LUAddNode,
59 opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
60 opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
61 opcodes.OpRemoveNode: cmdlib.LURemoveNode,
63 opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
64 opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
65 opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
66 opcodes.OpRenameInstance: cmdlib.LURenameInstance,
67 opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
68 opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
69 opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
70 opcodes.OpRebootInstance: cmdlib.LURebootInstance,
71 opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
72 opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
73 opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
74 opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
75 opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
76 opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
77 opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
79 opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
81 opcodes.OpQueryExports: cmdlib.LUQueryExports,
82 opcodes.OpExportInstance: cmdlib.LUExportInstance,
83 opcodes.OpRemoveExport: cmdlib.LURemoveExport,
85 opcodes.OpGetTags: cmdlib.LUGetTags,
86 opcodes.OpSearchTags: cmdlib.LUSearchTags,
87 opcodes.OpAddTags: cmdlib.LUAddTags,
88 opcodes.OpDelTags: cmdlib.LUDelTags,
90 opcodes.OpTestDelay: cmdlib.LUTestDelay,
91 opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
94 def __init__(self, feedback=None):
95 """Constructor for Processor
98 - feedback_fn: the feedback function (taking one string) to be run when
99 interesting events are happening
103 self._feedback_fn = feedback
105 def ExecOpCode(self, op):
106 """Execute an opcode.
109 op: the opcode to be executed
112 if not isinstance(op, opcodes.OpCode):
113 raise errors.ProgrammerError("Non-opcode instance passed"
116 lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
118 raise errors.OpCodeUnknown("Unknown opcode")
120 if lu_class.REQ_CLUSTER and self.cfg is None:
121 self.cfg = config.ConfigWriter()
122 self.sstore = ssconf.SimpleStore()
123 if self.cfg is not None:
124 write_count = self.cfg.write_count
127 lu = lu_class(self, op, self.cfg, self.sstore)
129 hm = HooksMaster(rpc.call_hooks_runner, self, lu)
130 h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
131 lu.HooksCallBack(constants.HOOKS_PHASE_PRE,
132 h_results, self._feedback_fn, None)
134 result = lu.Exec(self._feedback_fn)
135 h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
136 result = lu.HooksCallBack(constants.HOOKS_PHASE_POST,
137 h_results, self._feedback_fn, result)
139 if lu.cfg is not None:
140 # we use lu.cfg and not self.cfg as for init cluster, self.cfg
141 # is None but lu.cfg has been recently initialized in the
143 if write_count != lu.cfg.write_count:
148 def ChainOpCode(self, op):
149 """Chain and execute an opcode.
151 This is used by LUs when they need to execute a child LU.
154 - opcode: the opcode to be executed
157 if not isinstance(op, opcodes.OpCode):
158 raise errors.ProgrammerError("Non-opcode instance passed"
161 lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
163 raise errors.OpCodeUnknown("Unknown opcode")
165 if lu_class.REQ_CLUSTER and self.cfg is None:
166 self.cfg = config.ConfigWriter()
167 self.sstore = ssconf.SimpleStore()
168 #do_hooks = lu_class.HPATH is not None
169 lu = lu_class(self, op, self.cfg, self.sstore)
172 # hm = HooksMaster(rpc.call_hooks_runner, self, lu)
173 # h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
174 # lu.HooksCallBack(constants.HOOKS_PHASE_PRE,
175 # h_results, self._feedback_fn, None)
176 result = lu.Exec(self._feedback_fn)
178 # h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
179 # result = lu.HooksCallBack(constants.HOOKS_PHASE_POST,
180 # h_results, self._feedback_fn, result)
def LogStep(self, current, total, message):
    """Report a change in LU execution progress.

    The same three values are written to the debug log and sent to the
    feedback function given to the processor.

    Args:
      current: number of the current step (formatted with %d)
      total: total number of steps (formatted with %d)
      message: text appended after the step counter

    """
    progress = (current, total, message)
    logger.Debug("Step %d/%d %s" % progress)
    # The user-facing line differs only by its upper-case "STEP" prefix.
    self._feedback_fn("STEP %d/%d %s" % progress)
190 def LogWarning(self, message, hint=None):
191 """Log a warning to the logs and the user.
194 logger.Error(message)
195 self._feedback_fn(" - WARNING: %s" % message)
197 self._feedback_fn(" Hint: %s" % hint)
199 def LogInfo(self, message):
200 """Log an informational message to the logs and the user.
204 self._feedback_fn(" - INFO: %s" % message)
207 class HooksMaster(object):
210 This class distributes the run commands to the nodes based on the
213 In order to remove the direct dependency on the rpc module, the
214 constructor needs a function which actually does the remote
215 call. This will usually be rpc.call_hooks_runner, but any function
216 which behaves the same works.
219 def __init__(self, callfn, proc, lu):
224 self.env, node_list_pre, node_list_post = self._BuildEnv()
225 self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
226 constants.HOOKS_PHASE_POST: node_list_post}
229 """Compute the environment and the target nodes.
231 Based on the opcode and the current node list, this builds the
232 environment for the hooks and the target node list for the run.
236 "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
237 "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
238 "GANETI_OP_CODE": self.op.OP_ID,
239 "GANETI_OBJECT_TYPE": self.lu.HTYPE,
240 "GANETI_DATA_DIR": constants.DATA_DIR,
243 if self.lu.HPATH is not None:
244 lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
247 env["GANETI_" + key] = lu_env[key]
249 lu_nodes_pre = lu_nodes_post = []
251 return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
def _RunWrapper(self, node_list, hpath, phase):
    """Complete the hooks environment and invoke self.callfn.

    A copy of self.env is extended with the hooks phase and hooks path
    and, when the LU has a simple store, with the cluster name and the
    master node read from it. Every key and value is then converted to
    str before the call is made; self.env itself is left untouched.

    Args:
      node_list: passed unchanged as the first argument of self.callfn
      hpath: hooks path, exported as GANETI_HOOKS_PATH
      phase: hooks phase, exported as GANETI_HOOKS_PHASE

    Returns:
      whatever self.callfn returns

    """
    env = self.env.copy()
    env["GANETI_HOOKS_PHASE"] = phase
    env["GANETI_HOOKS_PATH"] = hpath

    sstore = self.lu.sstore
    if sstore is not None:
        env["GANETI_CLUSTER"] = sstore.GetClusterName()
        env["GANETI_MASTER"] = sstore.GetMasterNode()

    # items() instead of iteritems(): the latter does not exist on
    # Python 3 dictionaries, while items() is available on both major
    # versions and yields the same pairs.
    env = dict((str(key), str(val)) for key, val in env.items())

    return self.callfn(node_list, hpath, phase, env)
270 def RunPhase(self, phase):
271 """Run all the scripts for a phase.
273 This is the main function of the HooksMaster.
276 phase: the hooks phase to run
279 the result of the hooks multi-node rpc call
282 if not self.node_list[phase]:
283 # empty node list, we should not attempt to run this as either
284 # we're in the cluster init phase and the rpc client part can't
285 # even attempt to run, or this LU doesn't do hooks at all
287 hpath = self.lu.HPATH
288 results = self._RunWrapper(self.node_list[phase], hpath, phase)
289 if phase == constants.HOOKS_PHASE_PRE:
292 raise errors.HooksFailure("Communication failure")
293 for node_name in results:
294 res = results[node_name]
295 if res is False or not isinstance(res, list):
296 self.proc.LogWarning("Communication failure to node %s" % node_name)
298 for script, hkr, output in res:
299 if hkr == constants.HKR_FAIL:
300 output = output.strip().encode("string_escape")
301 errs.append((node_name, script, output))
303 raise errors.HooksAbort(errs)
def RunConfigUpdate(self):
    """Run the special configuration update hook.

    This special hook runs only on the master, after a top-level LU
    has updated the configuration. It goes through self._RunWrapper
    in the post phase, under the constants.HOOKS_NAME_CFGUPDATE path,
    and targets the single node reported by the LU's simple store.

    Raises:
      errors.ProgrammerError: if the LU has no simple store

    """
    hook_path = constants.HOOKS_NAME_CFGUPDATE
    hook_phase = constants.HOOKS_PHASE_POST

    # Without a simple store there is no way to find the master node.
    if self.lu.sstore is None:
        raise errors.ProgrammerError("Null sstore on config update hook")

    master_only = [self.lu.sstore.GetMasterNode()]
    # NOTE(review): the result is not used in the visible part of this
    # method; the binding is kept as found.
    results = self._RunWrapper(master_only, hook_path, hook_phase)