4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module implementing the logic behind the cluster operations
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26 - logical units, which know how to deal with their specific opcode only
27 - the processor, which dispatches the opcodes to their logical units
32 from ganeti import opcodes
33 from ganeti import constants
34 from ganeti import errors
35 from ganeti import rpc
36 from ganeti import cmdlib
37 from ganeti import ssconf
38 from ganeti import logger
39 from ganeti import locking
# Processor: runs OpCodes by dispatching each one to the cmdlib LogicalUnit
# (LU) class registered for its opcode class, acquiring the Big Ganeti Lock
# (BGL) plus any per-level locks the LU declares, and running the LU's
# pre/post hooks through HooksMaster.
#
# NOTE(review): this copy of the module looks lossy: each line carries what
# appears to be its original line number, indentation is gone, and blank and
# several short lines are missing (docstring terminators, some statements).
# The notes below point out gaps that matter; confirm against the real file.
42 class Processor(object):
43 """Object which runs OpCodes"""
# Dispatch table, read as self.DISPATCH_TABLE by ExecOpCode and ChainOpCode:
# maps opcode class -> LogicalUnit class. The assignment that opens this
# mapping (source lines 44-45) appears to be missing from this copy.
46 opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
47 opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
48 opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
49 opcodes.OpDumpClusterConfig: cmdlib.LUDumpClusterConfig,
50 opcodes.OpRenameCluster: cmdlib.LURenameCluster,
51 opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
52 opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
54 opcodes.OpAddNode: cmdlib.LUAddNode,
55 opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
56 opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
57 opcodes.OpRemoveNode: cmdlib.LURemoveNode,
59 opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
60 opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
61 opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
62 opcodes.OpRenameInstance: cmdlib.LURenameInstance,
63 opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
64 opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
65 opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
66 opcodes.OpRebootInstance: cmdlib.LURebootInstance,
67 opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
68 opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
69 opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
70 opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
71 opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
72 opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
73 opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
74 opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
76 opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
78 opcodes.OpQueryExports: cmdlib.LUQueryExports,
79 opcodes.OpExportInstance: cmdlib.LUExportInstance,
80 opcodes.OpRemoveExport: cmdlib.LURemoveExport,
82 opcodes.OpGetTags: cmdlib.LUGetTags,
83 opcodes.OpSearchTags: cmdlib.LUSearchTags,
84 opcodes.OpAddTags: cmdlib.LUAddTags,
85 opcodes.OpDelTags: cmdlib.LUDelTags,
87 opcodes.OpTestDelay: cmdlib.LUTestDelay,
88 opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
# __init__: keeps the cluster context. _feedback_fn stays None until
# ExecOpCode installs the caller's feedback function. exclusive_BGL records
# whether the running LU holds the BGL exclusively (ChainOpCode checks it).
# NOTE(review): the docstring below documents a `feedback_fn` argument, but
# the constructor only takes `context`; feedback_fn is a parameter of
# ExecOpCode instead. The docstring looks stale.
91 def __init__(self, context):
92 """Constructor for Processor
95 - feedback_fn: the feedback function (taking one string) to be run when
96 interesting events are happening
98 self.context = context
99 self._feedback_fn = None
100 self.exclusive_BGL = False
# _ExecLU: runs one LU together with its hooks.
# - It first samples the config write counter.
# - It runs the pre-phase hooks and passes their results to lu.HooksCallBack.
# - It runs lu.Exec with the current feedback function, then the post-phase
#   hooks. The post-phase HooksCallBack's return value replaces `result`.
# - The final write-counter comparison detects whether the LU changed the
#   cluster configuration.
# NOTE(review): the statements after that comparison (source lines 120-123)
# are missing here. HooksMaster.RunConfigUpdate exists, and _LockAndExecLU
# uses this method's return value, so they presumably run the config-update
# hook and return `result` -- TODO confirm. Lines 107, 112 and 117 are also
# missing and may hide statements (e.g. a try/finally around Exec).
102 def _ExecLU(self, lu):
103 """Logical Unit execution sequence.
106 write_count = self.context.cfg.write_count
108 hm = HooksMaster(rpc.call_hooks_runner, self, lu)
109 h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
110 lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
111 self._feedback_fn, None)
113 result = lu.Exec(self._feedback_fn)
114 h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
115 result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
116 self._feedback_fn, result)
118 # FIXME: This needs locks if not lu_class.REQ_BGL
119 if write_count != self.context.cfg.write_count:
# _LockAndExecLU: acquires locks recursively, one level per call.
# - ExecOpCode starts `level` at locking.LEVEL_INSTANCE, and each recursive
#   call passes level + 1.
# - Once `level` is not in locking.LEVELS, the LU is executed via _ExecLU and
#   its result is passed back up.
# - At a level listed in lu.needed_locks, the LU may first adjust its request
#   (DeclareLocks). The locks are then acquired and recorded in
#   lu.acquired_locks, using the sharing mode read from lu.share_locks.
# - The next level is processed, then this level is released if any lock was
#   acquired, or if the whole set was requested (needed_locks is None).
# NOTE(review): source lines 143-145 are missing: the rest of the acquire()
# call, presumably passing needed_locks and share. Lines 147, 152 and
# 154-156 are also missing:
# - The second recursive call presumably sits in an `else:` for levels the
#   LU did not request.
# - Line 147 may open a try/finally so the release also runs on errors.
# - The method presumably ends with `return result`.
# TODO confirm.
124 def _LockAndExecLU(self, lu, level):
125 """Execute a Logical Unit, with the needed locks.
127 This is a recursive function that starts locking the given level, and
128 proceeds up, till there are no more locks to acquire. Then it executes the
129 given LU and its opcodes.
132 if level not in locking.LEVELS:
133 result = self._ExecLU(lu)
134 elif level in lu.needed_locks:
135 # This gives a chance to LUs to make last-minute changes after acquiring
136 # locks at any preceding level.
137 lu.DeclareLocks(level)
138 needed_locks = lu.needed_locks[level]
139 share = lu.share_locks[level]
140 # This is always safe to do, as we can't acquire more/less locks than
141 # what was requested.
142 lu.acquired_locks[level] = self.context.glm.acquire(level,
146 result = self._LockAndExecLU(lu, level + 1)
148 # We need to release the current level if we acquired any lock, or if
149 # we acquired the set-lock (needed_locks is None)
150 if lu.needed_locks[level] is None or lu.acquired_locks[level]:
151 self.context.glm.release(level)
153 result = self._LockAndExecLU(lu, level + 1)
# ExecOpCode: the public entry point. In order, it:
# - rejects non-OpCode input with ProgrammerError;
# - installs feedback_fn for this run;
# - looks the LU class up in the dispatch table, raising OpCodeUnknown
#   (presumably when the lookup returns None);
# - picks a writable or read-only SimpleStore according to
#   lu_class.REQ_WSSTORE;
# - takes the BGL, exclusively when lu_class.REQ_BGL is set and shared
#   otherwise;
# - builds the LU and runs it through _LockAndExecLU from
#   locking.LEVEL_INSTANCE;
# - releases the cluster-level lock and resets exclusive_BGL.
# NOTE(review): missing source lines hide the `lu_class is None` guard (170),
# the `else:` of the SimpleStore choice (175), and the return of `result`.
# Lines 182, 185 and 188 are also missing. They possibly hold a try/finally
# that releases the BGL when the LU fails, and a call that fills
# lu.needed_locks before the assert. TODO confirm.
# NOTE(review): the needed_locks assert is skipped under `python -O`.
157 def ExecOpCode(self, op, feedback_fn):
158 """Execute an opcode.
161 op: the opcode to be executed
164 if not isinstance(op, opcodes.OpCode):
165 raise errors.ProgrammerError("Non-opcode instance passed"
168 self._feedback_fn = feedback_fn
169 lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
171 raise errors.OpCodeUnknown("Unknown opcode")
173 if lu_class.REQ_WSSTORE:
174 sstore = ssconf.WritableSimpleStore()
176 sstore = ssconf.SimpleStore()
178 # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
179 # shared fashion otherwise (to prevent concurrent run with an exclusive LU.
180 self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
181 shared=not lu_class.REQ_BGL)
183 self.exclusive_BGL = lu_class.REQ_BGL
184 lu = lu_class(self, op, self.context, sstore)
186 assert lu.needed_locks is not None, "needed_locks not set by LU"
187 result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
189 self.context.glm.release(locking.LEVEL_CLUSTER)
190 self.exclusive_BGL = False
# ChainOpCode: lets a running LU execute a child opcode.
# - It only allows BGL-requiring LUs, and only while the BGL is held
#   exclusively (exclusive_BGL).
# - It acquires no locks and runs no hooks; the hook calls below are
#   commented out.
# - It calls lu.Exec directly with the current feedback function.
# NOTE(review): the REQ_BGL restriction on the child LU relies on an
# `assert`, which is skipped under `python -O`. In that mode a non-REQ_BGL LU
# would be chained without error. Missing lines also hide the
# `lu_class is None` guard (208), the SimpleStore `else:` (219) and the
# return of `result` -- TODO confirm.
194 def ChainOpCode(self, op):
195 """Chain and execute an opcode.
197 This is used by LUs when they need to execute a child LU.
200 - opcode: the opcode to be executed
203 if not isinstance(op, opcodes.OpCode):
204 raise errors.ProgrammerError("Non-opcode instance passed"
207 lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
209 raise errors.OpCodeUnknown("Unknown opcode")
211 if lu_class.REQ_BGL and not self.exclusive_BGL:
212 raise errors.ProgrammerError("LUs which require the BGL cannot"
213 " be chained to granular ones.")
215 assert lu_class.REQ_BGL, "ChainOpCode is still BGL-only"
217 if lu_class.REQ_WSSTORE:
218 sstore = ssconf.WritableSimpleStore()
220 sstore = ssconf.SimpleStore()
222 #do_hooks = lu_class.HPATH is not None
223 lu = lu_class(self, op, self.context, sstore)
226 # hm = HooksMaster(rpc.call_hooks_runner, self, lu)
227 # h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
228 # lu.HooksCallBack(constants.HOOKS_PHASE_PRE,
229 # h_results, self._feedback_fn, None)
230 result = lu.Exec(self._feedback_fn)
232 # h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
233 # result = lu.HooksCallBack(constants.HOOKS_PHASE_POST,
234 # h_results, self._feedback_fn, result)
# LogStep: writes "Step current/total message" to the debug log and sends
# "STEP current/total message" to the feedback function.
# NOTE(review): the Log* methods call self._feedback_fn, which is None until
# ExecOpCode sets it. They are only usable while an opcode is running.
237 def LogStep(self, current, total, message):
238 """Log a change in LU execution progress.
241 logger.Debug("Step %d/%d %s" % (current, total, message))
242 self._feedback_fn("STEP %d/%d %s" % (current, total, message))
# LogWarning: logs `message` at error level and sends it to the feedback
# function as a warning line, followed by a hint line.
# NOTE(review): source line 250 is missing. It is presumably an `if hint:`
# guard, so the hint line is only sent when a hint is given -- TODO confirm.
244 def LogWarning(self, message, hint=None):
245 """Log a warning to the logs and the user.
248 logger.Error(message)
249 self._feedback_fn(" - WARNING: %s" % message)
251 self._feedback_fn(" Hint: %s" % hint)
# LogInfo: sends `message` to the feedback function as an info line.
# NOTE(review): source lines 255-257 are missing. The docstring promises a
# log entry as well, but no logger call is visible in this copy.
253 def LogInfo(self, message):
254 """Log an informational message to the logs and the user.
258 self._feedback_fn(" - INFO: %s" % message)
# HooksMaster: runs the hook scripts for one LU.
# - The constructor builds the hooks environment and the per-phase target
#   node sets (via _BuildEnv).
# - RunPhase runs one phase through callfn.
# - RunConfigUpdate runs the config-update hook on the master node.
#
# NOTE(review): this copy of the module looks lossy: each line carries what
# appears to be its original line number, indentation is gone, and blank and
# several short lines are missing. These include the first line of the class
# docstring and the `def` line of _BuildEnv. Confirm against the real file.
261 class HooksMaster(object):
264 This class distributes the run commands to the nodes based on the
267 In order to remove the direct dependency on the rpc module, the
268 constructor needs a function which actually does the remote
269 call. This will usually be rpc.call_hooks_runner, but any function
270 which behaves the same works.
273 def __init__(self, callfn, proc, lu):
# Parameters:
# - callfn: performs the remote hooks call (see the class docstring).
# - proc: the Processor, used here for LogWarning.
# - lu: the LogicalUnit whose hooks are run.
# NOTE(review): source lines 274-277 are missing. They presumably store the
# arguments as self.callfn, self.proc and self.lu, and set self.op. The other
# methods read all four. TODO confirm.
278 self.env, node_list_pre, node_list_post = self._BuildEnv()
279 self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
280 constants.HOOKS_PHASE_POST: node_list_post}
# _BuildEnv (its `def` line, source line 282, appears to be missing): returns
# (env, pre_nodes, post_nodes).
# - env always holds PATH, GANETI_HOOKS_VERSION, GANETI_OP_CODE,
#   GANETI_OBJECT_TYPE and GANETI_DATA_DIR.
# - If the LU has a hooks path (HPATH), lu.BuildHooksEnv() supplies extra
#   variables, copied into env with a "GANETI_" prefix, plus the node lists.
#   The loop header for that copy is among the missing lines.
# - Otherwise both node lists are empty; the `else:` line is missing.
# - The node lists are returned as frozensets.
283 """Compute the environment and the target nodes.
285 Based on the opcode and the current node list, this builds the
286 environment for the hooks and the target node list for the run.
290 "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
291 "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
292 "GANETI_OP_CODE": self.op.OP_ID,
293 "GANETI_OBJECT_TYPE": self.lu.HTYPE,
294 "GANETI_DATA_DIR": constants.DATA_DIR,
297 if self.lu.HPATH is not None:
298 lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
301 env["GANETI_" + key] = lu_env[key]
303 lu_nodes_pre = lu_nodes_post = []
305 return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
# _RunWrapper: builds the per-call environment and calls
# callfn(node_list, hpath, phase, env).
# - The environment is a copy of self.env plus GANETI_HOOKS_PHASE and
#   GANETI_HOOKS_PATH.
# - When the LU has an sstore, GANETI_CLUSTER and GANETI_MASTER are added.
# - Every key and value is converted to str before the call.
# NOTE: dict.iteritems() exists only in Python 2.
307 def _RunWrapper(self, node_list, hpath, phase):
308 """Simple wrapper over self.callfn.
310 This method fixes the environment before doing the rpc call.
313 env = self.env.copy()
314 env["GANETI_HOOKS_PHASE"] = phase
315 env["GANETI_HOOKS_PATH"] = hpath
316 if self.lu.sstore is not None:
317 env["GANETI_CLUSTER"] = self.lu.sstore.GetClusterName()
318 env["GANETI_MASTER"] = self.lu.sstore.GetMasterNode()
320 env = dict([(str(key), str(val)) for key, val in env.iteritems()])
322 return self.callfn(node_list, hpath, phase, env)
# RunPhase: skips the phase when its node set is empty; the `return`
# (source line 340) is missing from this copy. Otherwise it runs the LU's
# HPATH hooks on that node set. Only pre-phase results are checked:
# - An empty result presumably raises HooksFailure (the guard line is
#   missing).
# - An invalid per-node answer only logs a warning via proc.LogWarning.
# - Every script reporting HKR_FAIL is collected as
#   (node, script, escaped output) and reported together via HooksAbort.
# NOTE(review): source lines 344-345, 351, 356 and 358 are also missing.
# They are presumably `errs = []`, `if not results:`, a `continue` after the
# warning, `if errs:` and `return results` -- TODO confirm.
# NOTE: the "string_escape" codec exists only in Python 2.
324 def RunPhase(self, phase):
325 """Run all the scripts for a phase.
327 This is the main function of the HookMaster.
330 phase: the hooks phase to run
333 the result of the hooks multi-node rpc call
336 if not self.node_list[phase]:
337 # empty node list, we should not attempt to run this as either
338 # we're in the cluster init phase and the rpc client part can't
339 # even attempt to run, or this LU doesn't do hooks at all
341 hpath = self.lu.HPATH
342 results = self._RunWrapper(self.node_list[phase], hpath, phase)
343 if phase == constants.HOOKS_PHASE_PRE:
346 raise errors.HooksFailure("Communication failure")
347 for node_name in results:
348 res = results[node_name]
349 if res is False or not isinstance(res, list):
350 self.proc.LogWarning("Communication failure to node %s" % node_name)
352 for script, hkr, output in res:
353 if hkr == constants.HKR_FAIL:
354 output = output.strip().encode("string_escape")
355 errs.append((node_name, script, output))
357 raise errors.HooksAbort(errs)
# RunConfigUpdate: runs the HOOKS_NAME_CFGUPDATE hook, in the post phase, on
# the master node only (taken from lu.sstore). It raises ProgrammerError when
# the LU has no sstore. In the visible code, the call's results are stored in
# `results` but neither checked nor returned. ("LI" in the docstring
# presumably means "LU".)
360 def RunConfigUpdate(self):
361 """Run the special configuration update hook
363 This is a special hook that runs only on the master after each
364 top-level LI if the configuration has been updated.
367 phase = constants.HOOKS_PHASE_POST
368 hpath = constants.HOOKS_NAME_CFGUPDATE
369 if self.lu.sstore is None:
370 raise errors.ProgrammerError("Null sstore on config update hook")
371 nodes = [self.lu.sstore.GetMasterNode()]
372 results = self._RunWrapper(nodes, hpath, phase)