4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
"""Module implementing the logic behind the cluster operations.

This module implements the logic for doing operations in the cluster. There
are two kinds of classes defined:
  - logical units, which know how to deal with their specific opcode only
  - the processor, which dispatches the opcodes to their logical units

"""
32 from ganeti import opcodes
33 from ganeti import constants
34 from ganeti import errors
35 from ganeti import rpc
36 from ganeti import cmdlib
37 from ganeti import config
38 from ganeti import ssconf
39 from ganeti import logger
40 from ganeti import locking
class Processor(object):
    """Object which runs OpCodes.

    The processor looks up the logical unit (LU) class registered for an
    opcode in DISPATCH_TABLE, instantiates it and drives its execution.

    """

    # Maps every supported opcode class to the logical unit implementing it.
    DISPATCH_TABLE = {
        # Cluster
        opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
        opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
        opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
        opcodes.OpMasterFailover: cmdlib.LUMasterFailover,
        opcodes.OpDumpClusterConfig: cmdlib.LUDumpClusterConfig,
        opcodes.OpRenameCluster: cmdlib.LURenameCluster,
        opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
        opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
        # Nodes
        opcodes.OpAddNode: cmdlib.LUAddNode,
        opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
        opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
        opcodes.OpRemoveNode: cmdlib.LURemoveNode,
        # Instances
        opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
        opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
        opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
        opcodes.OpRenameInstance: cmdlib.LURenameInstance,
        opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
        opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
        opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
        opcodes.OpRebootInstance: cmdlib.LURebootInstance,
        opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
        opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
        opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
        opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
        opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
        opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
        opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
        opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
        # OS
        opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
        # Exports
        opcodes.OpQueryExports: cmdlib.LUQueryExports,
        opcodes.OpExportInstance: cmdlib.LUExportInstance,
        opcodes.OpRemoveExport: cmdlib.LURemoveExport,
        # Tags
        opcodes.OpGetTags: cmdlib.LUGetTags,
        opcodes.OpSearchTags: cmdlib.LUSearchTags,
        opcodes.OpAddTags: cmdlib.LUAddTags,
        opcodes.OpDelTags: cmdlib.LUDelTags,
        # Test
        opcodes.OpTestDelay: cmdlib.LUTestDelay,
        opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
    }

    def __init__(self, context, feedback=None):
        """Constructor for Processor.

        Args:
          context: object giving access to the configuration (``cfg``) and
                   the lock manager (``glm``) used while running opcodes
          feedback: the feedback function (taking one string) to be run when
                    interesting events are happening; may be None, in which
                    case the Log* methods only write to the logs

        """
        self.context = context
        self._feedback_fn = feedback
        # True only while an LU holding the BGL exclusively is executing.
        self.exclusive_BGL = False

    def _GetLUClass(self, op, caller):
        """Validate op and return the LU class registered for it.

        Args:
          op: the opcode to look up
          caller: name of the calling method, used in the error message

        Raises:
          errors.ProgrammerError: if op is not an opcodes.OpCode instance
          errors.OpCodeUnknown: if no LU is registered for the opcode class

        """
        if not isinstance(op, opcodes.OpCode):
            raise errors.ProgrammerError("Non-opcode instance passed"
                                         " to %s" % caller)

        lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
        if lu_class is None:
            raise errors.OpCodeUnknown("Unknown opcode")
        return lu_class

    @staticmethod
    def _MakeSStore(lu_class):
        """Return a simple store, writable only if the LU requires it."""
        if lu_class.REQ_WSSTORE:
            return ssconf.WritableSimpleStore()
        return ssconf.SimpleStore()

    def _Feedback(self, message):
        """Pass message to the feedback function, if one was supplied."""
        if self._feedback_fn is not None:
            self._feedback_fn(message)

    def ExecOpCode(self, op):
        """Execute an opcode.

        Args:
          op: the opcode to be executed

        Returns:
          the value returned by the LU's Exec, as post-processed by its
          HooksCallBack for the post phase

        Raises:
          errors.ProgrammerError: if op is not an opcode instance
          errors.OpCodeUnknown: if no LU is registered for the opcode

        """
        lu_class = self._GetLUClass(op, "ExecOpcode")
        sstore = self._MakeSStore(lu_class)

        # Remembered so that a configuration change made by the LU can be
        # detected once it has run.
        write_count = self.context.cfg.write_count

        # Acquire the Big Ganeti Lock exclusively if this LU requires it, and
        # in a shared fashion otherwise (to prevent concurrent run with an
        # exclusive LU).
        self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
                                 shared=not lu_class.REQ_BGL)
        try:
            self.exclusive_BGL = lu_class.REQ_BGL
            lu = lu_class(self, op, self.context, sstore)
            # Prerequisites are verified before any hook or Exec is run.
            lu.CheckPrereq()
            hm = HooksMaster(rpc.call_hooks_runner, self, lu)
            h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
            lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                             self._feedback_fn, None)
            try:
                result = lu.Exec(self._feedback_fn)
                h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
                result = lu.HooksCallBack(constants.HOOKS_PHASE_POST,
                                          h_results, self._feedback_fn,
                                          result)
            finally:
                # Run the config-update hook even if Exec failed half-way,
                # as long as the configuration was actually written.
                # FIXME: This needs locks if not lu_class.REQ_BGL
                if write_count != self.context.cfg.write_count:
                    hm.RunConfigUpdate()
        finally:
            # Always give the lock back and reset the exclusivity marker.
            self.context.glm.release(locking.LEVEL_CLUSTER)
            self.exclusive_BGL = False

        return result

    def ChainOpCode(self, op):
        """Chain and execute an opcode.

        This is used by LUs when they need to execute a child LU.

        Args:
          op: the opcode to be executed

        Returns:
          the value returned by the child LU's Exec

        Raises:
          errors.ProgrammerError: if op is not an opcode instance, or if the
            child LU requires the BGL while the running LU does not hold it
            exclusively
          errors.OpCodeUnknown: if no LU is registered for the opcode

        """
        lu_class = self._GetLUClass(op, "ChainOpcode")

        if lu_class.REQ_BGL and not self.exclusive_BGL:
            raise errors.ProgrammerError("LUs which require the BGL cannot"
                                         " be chained to granular ones.")

        sstore = self._MakeSStore(lu_class)
        lu = lu_class(self, op, self.context, sstore)
        lu.CheckPrereq()
        # NOTE: hooks are not run for chained opcodes; the pre/post hook
        # invocation is disabled here, only the LU itself is executed.
        result = lu.Exec(self._feedback_fn)
        return result

    def LogStep(self, current, total, message):
        """Log a change in LU execution progress.

        Args:
          current: number of the step being started
          total: total number of steps
          message: description of the step

        """
        logger.Debug("Step %d/%d %s" % (current, total, message))
        self._Feedback("STEP %d/%d %s" % (current, total, message))

    def LogWarning(self, message, hint=None):
        """Log a warning to the logs and the user.

        Args:
          message: the warning text
          hint: optional extra advice, sent to the user only

        """
        logger.Error(message)
        self._Feedback(" - WARNING: %s" % message)
        if hint:
            self._Feedback(" Hint: %s" % hint)

    def LogInfo(self, message):
        """Log an informational message to the logs and the user.

        Args:
          message: the informational text

        """
        logger.Info(message)
        self._Feedback(" - INFO: %s" % message)
class HooksMaster(object):
    """Hooks master.

    This class distributes the run commands to the nodes based on the
    logical unit it is built for.

    In order to remove the direct dependency on the rpc module, the
    constructor needs a function which actually does the remote
    call. This will usually be rpc.call_hooks_runner, but any function
    which behaves the same works.

    """

    def __init__(self, callfn, proc, lu):
        """Store the collaborators and precompute environment and nodes.

        Args:
          callfn: function doing the remote call; it is invoked as
                  callfn(node_list, hpath, phase, env)
          proc: the processor, used for reporting warnings to the user
          lu: the logical unit whose hooks are to be run

        """
        self.callfn = callfn
        self.proc = proc
        self.lu = lu
        self.op = lu.op
        self.env, node_list_pre, node_list_post = self._BuildEnv()
        self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
                          constants.HOOKS_PHASE_POST: node_list_post}

    def _BuildEnv(self):
        """Compute the environment and the target nodes.

        Based on the opcode and the current node list, this builds the
        environment for the hooks and the target node list for the run.

        Returns:
          a tuple (env, pre_nodes, post_nodes) where env is a dict and the
          node collections are frozensets

        """
        env = {
            "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
            "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
            "GANETI_OP_CODE": self.op.OP_ID,
            "GANETI_OBJECT_TYPE": self.lu.HTYPE,
            "GANETI_DATA_DIR": constants.DATA_DIR,
        }

        if self.lu.HPATH is not None:
            lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
            if lu_env:
                # LU-provided variables are namespaced with GANETI_.
                for key in lu_env:
                    env["GANETI_" + key] = lu_env[key]
        else:
            # An LU without a hooks path runs hooks nowhere.
            lu_nodes_pre = lu_nodes_post = []

        return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)

    def _RunWrapper(self, node_list, hpath, phase):
        """Simple wrapper over self.callfn.

        This method fixes the environment before doing the rpc call: it
        adds the phase and hooks path (plus cluster name and master node
        when the LU has a simple store) and converts every key and value
        to a string. The stored self.env is left untouched.

        Args:
          node_list: the nodes on which to run the hooks
          hpath: the hooks path
          phase: the hooks phase

        Returns:
          whatever self.callfn returns

        """
        env = self.env.copy()
        env["GANETI_HOOKS_PHASE"] = phase
        env["GANETI_HOOKS_PATH"] = hpath
        if self.lu.sstore is not None:
            env["GANETI_CLUSTER"] = self.lu.sstore.GetClusterName()
            env["GANETI_MASTER"] = self.lu.sstore.GetMasterNode()

        env = dict((str(key), str(val)) for key, val in env.items())

        return self.callfn(node_list, hpath, phase, env)

    def RunPhase(self, phase):
        """Run all the scripts for a phase.

        This is the main function of the HooksMaster.

        Args:
          phase: the hooks phase to run

        Returns:
          the result of the hooks multi-node rpc call, or None when there
          are no nodes to run the hooks on

        Raises:
          errors.HooksFailure: in the pre phase, if the rpc call returned
            no results at all
          errors.HooksAbort: in the pre phase, if any script failed; the
            argument is a list of (node_name, script, output) tuples

        """
        if not self.node_list[phase]:
            # empty node list, we should not attempt to run this as either
            # we're in the cluster init phase and the rpc client part can't
            # even attempt to run, or this LU doesn't do hooks at all
            return

        hpath = self.lu.HPATH
        results = self._RunWrapper(self.node_list[phase], hpath, phase)
        if phase == constants.HOOKS_PHASE_PRE:
            # Only pre-phase failures abort the operation.
            errs = []
            if not results:
                raise errors.HooksFailure("Communication failure")
            for node_name in results:
                res = results[node_name]
                if not isinstance(res, list):
                    # A single unreachable node is reported but not fatal.
                    self.proc.LogWarning("Communication failure to node %s" %
                                         node_name)
                    continue
                for script, hkr, output in res:
                    if hkr == constants.HKR_FAIL:
                        output = output.strip().encode("string_escape")
                        errs.append((node_name, script, output))
            if errs:
                raise errors.HooksAbort(errs)
        return results

    def RunConfigUpdate(self):
        """Run the special configuration update hook.

        This is a special hook that runs only on the master after each
        top-level LU if the configuration has been updated.

        Raises:
          errors.ProgrammerError: if the LU has no simple store

        """
        phase = constants.HOOKS_PHASE_POST
        hpath = constants.HOOKS_NAME_CFGUPDATE
        if self.lu.sstore is None:
            raise errors.ProgrammerError("Null sstore on config update hook")
        nodes = [self.lu.sstore.GetMasterNode()]
        results = self._RunWrapper(nodes, hpath, phase)