# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Module implementing the logic behind the cluster operations
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26 - logical units, which know how to deal with their specific opcode only
27 - the processor, which dispatches the opcodes to their logical units
32 from ganeti import opcodes
33 from ganeti import constants
34 from ganeti import errors
35 from ganeti import rpc
36 from ganeti import cmdlib
37 from ganeti import ssconf
38 from ganeti import logger
39 from ganeti import locking
class Processor(object):
  """Object which runs OpCodes.

  The processor maps each opcode to the logical unit (LU) implementing it,
  acquires the locks the LU declares level by level, runs the PRE/POST
  hooks around the LU's execution, and fires the config-update hook when
  the configuration write count changed during execution.

  """
  # Dispatch table: opcode class -> logical unit class implementing it.
  DISPATCH_TABLE = {
    # Cluster
    opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
    opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
    opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
    opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
    opcodes.OpRenameCluster: cmdlib.LURenameCluster,
    opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
    opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
    # node lu
    opcodes.OpAddNode: cmdlib.LUAddNode,
    opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
    opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
    opcodes.OpRemoveNode: cmdlib.LURemoveNode,
    # instance lu
    opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
    opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
    opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
    opcodes.OpRenameInstance: cmdlib.LURenameInstance,
    opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
    opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
    opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
    opcodes.OpRebootInstance: cmdlib.LURebootInstance,
    opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
    opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
    opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
    opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
    opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
    opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
    opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
    opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
    # os lu
    opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
    # exports lu
    opcodes.OpQueryExports: cmdlib.LUQueryExports,
    opcodes.OpExportInstance: cmdlib.LUExportInstance,
    opcodes.OpRemoveExport: cmdlib.LURemoveExport,
    # tags lu
    opcodes.OpGetTags: cmdlib.LUGetTags,
    opcodes.OpSearchTags: cmdlib.LUSearchTags,
    opcodes.OpAddTags: cmdlib.LUAddTags,
    opcodes.OpDelTags: cmdlib.LUDelTags,
    # test lu
    opcodes.OpTestDelay: cmdlib.LUTestDelay,
    opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
    }

  def __init__(self, context):
    """Constructor for Processor

    Args:
      context: the context object, holding the cluster configuration
        (context.cfg) and the Ganeti lock manager (context.glm)

    The feedback function (taking one string) used to report interesting
    events is not passed here; it is supplied per-opcode to ExecOpCode().

    """
    self.context = context
    self._feedback_fn = None
    self.exclusive_BGL = False

  def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    Checks the LU's prerequisites, runs the PRE-phase hooks, executes the
    LU, runs the POST-phase hooks, and finally triggers the config-update
    hook if the configuration was written during the execution.

    """
    write_count = self.context.cfg.write_count
    lu.CheckPrereq()
    hm = HooksMaster(rpc.call_hooks_runner, self, lu)
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                     self._feedback_fn, None)
    try:
      result = lu.Exec(self._feedback_fn)
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                self._feedback_fn, result)
    finally:
      # FIXME: This needs locks if not lu_class.REQ_BGL
      if write_count != self.context.cfg.write_count:
        hm.RunConfigUpdate()

    return result

  def _LockAndExecLU(self, lu, level):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking the given level, and
    proceeds up, till there are no more locks to acquire. Then it executes the
    given LU and its opcodes.

    """
    adding_locks = level in lu.add_locks
    acquiring_locks = level in lu.needed_locks
    if level not in locking.LEVELS:
      # past the last locking level: all declared locks are held, run the LU
      result = self._ExecLU(lu)
    elif adding_locks and acquiring_locks:
      # We could both acquire and add locks at the same level, but for now we
      # don't need this, so we'll avoid the complicated code needed.
      raise NotImplementedError(
        "Can't declare locks to acquire when adding others")
    elif adding_locks or acquiring_locks:
      lu.DeclareLocks(level)
      share = lu.share_locks[level]
      if acquiring_locks:
        needed_locks = lu.needed_locks[level]
        lu.acquired_locks[level] = self.context.glm.acquire(level,
                                                            needed_locks,
                                                            shared=share)
      else: # adding_locks
        add_locks = lu.add_locks[level]
        lu.remove_locks[level] = add_locks
        try:
          self.context.glm.add(level, add_locks, acquired=1, shared=share)
        except errors.LockError:
          raise errors.OpPrereqError(
            "Couldn't add locks (%s), probably because of a race condition"
            " with another job, who added them first" % add_locks)
      try:
        try:
          if adding_locks:
            lu.acquired_locks[level] = add_locks
          result = self._LockAndExecLU(lu, level + 1)
        finally:
          # drop locks scheduled for removal even if execution failed
          if level in lu.remove_locks:
            self.context.glm.remove(level, lu.remove_locks[level])
      finally:
        if self.context.glm.is_owned(level):
          self.context.glm.release(level)
    else:
      # this level is not interesting for this LU: recurse to the next one
      result = self._LockAndExecLU(lu, level + 1)

    return result

  def ExecOpCode(self, op, feedback_fn):
    """Execute an opcode.

    Args:
      op: the opcode to be executed
      feedback_fn: the feedback function (taking one string) to be run when
        interesting events are happening

    Raises:
      errors.ProgrammerError: if op is not an opcodes.OpCode instance
      errors.OpCodeUnknown: if no LU is registered for the opcode

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpcode")

    self._feedback_fn = feedback_fn
    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
    if lu_class is None:
      raise errors.OpCodeUnknown("Unknown opcode")

    if lu_class.REQ_WSSTORE:
      sstore = ssconf.WritableSimpleStore()
    else:
      sstore = ssconf.SimpleStore()

    # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
    # shared fashion otherwise (to prevent concurrent run with an exclusive
    # LU).
    self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
                             shared=not lu_class.REQ_BGL)
    try:
      self.exclusive_BGL = lu_class.REQ_BGL
      lu = lu_class(self, op, self.context, sstore)
      lu.ExpandNames()
      assert lu.needed_locks is not None, "needed_locks not set by LU"
      result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
    finally:
      self.context.glm.release(locking.LEVEL_CLUSTER)
      self.exclusive_BGL = False

    return result

  def LogStep(self, current, total, message):
    """Log a change in LU execution progress.

    """
    logger.Debug("Step %d/%d %s" % (current, total, message))
    self._feedback_fn("STEP %d/%d %s" % (current, total, message))

  def LogWarning(self, message, hint=None):
    """Log a warning to the logs and the user.

    """
    logger.Error(message)
    self._feedback_fn(" - WARNING: %s" % message)
    if hint:
      self._feedback_fn("      Hint: %s" % hint)

  def LogInfo(self, message):
    """Log an informational message to the logs and the user.

    """
    logger.Info(message)
    self._feedback_fn(" - INFO: %s" % message)
class HooksMaster(object):
  """Hooks master.

  This class distributes the run commands to the nodes based on the
  specific LU class.

  In order to remove the direct dependency on the rpc module, the
  constructor needs a function which actually does the remote
  call. This will usually be rpc.call_hooks_runner, but any function
  which behaves the same works.

  """
  def __init__(self, callfn, proc, lu):
    """Constructor for HooksMaster

    Args:
      callfn: the function doing the actual (usually remote) hooks call;
        usually rpc.call_hooks_runner
      proc: the Processor instance, used to report warnings
      lu: the LogicalUnit whose hooks are being run

    """
    self.callfn = callfn
    self.proc = proc
    self.lu = lu
    self.op = lu.op
    # pre-compute the hooks environment and the per-phase target node lists
    self.env, node_list_pre, node_list_post = self._BuildEnv()
    self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
                      constants.HOOKS_PHASE_POST: node_list_post}

  def _BuildEnv(self):
    """Compute the environment and the target nodes.

    Based on the opcode and the current node list, this builds the
    environment for the hooks and the target node list for the run.

    """
    env = {
      "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
      "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
      "GANETI_OP_CODE": self.op.OP_ID,
      "GANETI_OBJECT_TYPE": self.lu.HTYPE,
      "GANETI_DATA_DIR": constants.DATA_DIR,
      }

    if self.lu.HPATH is not None:
      lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
      if lu_env:
        for key in lu_env:
          env["GANETI_" + key] = lu_env[key]
    else:
      # no hooks path: this LU doesn't run hooks on any node
      lu_nodes_pre = lu_nodes_post = []

    return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)

  def _RunWrapper(self, node_list, hpath, phase):
    """Simple wrapper over self.callfn.

    This method fixes the environment before doing the rpc call.

    """
    env = self.env.copy()
    env["GANETI_HOOKS_PHASE"] = phase
    env["GANETI_HOOKS_PATH"] = hpath
    if self.lu.sstore is not None:
      env["GANETI_CLUSTER"] = self.lu.sstore.GetClusterName()
      env["GANETI_MASTER"] = self.lu.sstore.GetMasterNode()

    # hooks are shell scripts, so every key and value must be a string
    env = dict([(str(key), str(val)) for key, val in env.iteritems()])

    return self.callfn(node_list, hpath, phase, env)

  def RunPhase(self, phase):
    """Run all the scripts for a phase.

    This is the main function of the HookMaster.

    Args:
      phase: the hooks phase to run

    Returns:
      the result of the hooks multi-node rpc call

    Raises:
      errors.HooksFailure: on total communication failure in the PRE phase
      errors.HooksAbort: when at least one PRE-phase hook script failed

    """
    if not self.node_list[phase]:
      # empty node list, we should not attempt to run this as either
      # we're in the cluster init phase and the rpc client part can't
      # even attempt to run, or this LU doesn't do hooks at all
      return
    hpath = self.lu.HPATH
    results = self._RunWrapper(self.node_list[phase], hpath, phase)
    if phase == constants.HOOKS_PHASE_PRE:
      errs = []
      if not results:
        raise errors.HooksFailure("Communication failure")
      for node_name in results:
        res = results[node_name]
        if res is False or not isinstance(res, list):
          # per-node failures are only warnings; other nodes may have run
          self.proc.LogWarning("Communication failure to node %s" % node_name)
          continue
        for script, hkr, output in res:
          if hkr == constants.HKR_FAIL:
            # escape the script output so the error report stays one line
            output = output.strip().encode("string_escape")
            errs.append((node_name, script, output))
      if errs:
        raise errors.HooksAbort(errs)
    return results

  def RunConfigUpdate(self):
    """Run the special configuration update hook

    This is a special hook that runs only on the master after each
    top-level LU if the configuration has been updated.

    """
    phase = constants.HOOKS_PHASE_POST
    hpath = constants.HOOKS_NAME_CFGUPDATE
    if self.lu.sstore is None:
      raise errors.ProgrammerError("Null sstore on config update hook")
    nodes = [self.lu.sstore.GetMasterNode()]
    results = self._RunWrapper(nodes, hpath, phase)