4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Module implementing the logic behind the cluster operations
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26 - logical units, which know how to deal with their specific opcode only
27 - the processor, which dispatches the opcodes to their logical units
33 from ganeti import opcodes
34 from ganeti import constants
35 from ganeti import errors
36 from ganeti import rpc
37 from ganeti import cmdlib
38 from ganeti import locking
class Processor(object):
    """Object which runs OpCodes.

    ExecOpCode() looks up the logical unit (LU) class registered for an
    opcode in DISPATCH_TABLE, takes the locks the LU declares and then runs
    the LU together with its hooks.

    """

    # Opcode class -> logical unit class implementing it.
    DISPATCH_TABLE = {
        # Cluster
        opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
        opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
        opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
        opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
        opcodes.OpRenameCluster: cmdlib.LURenameCluster,
        opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
        opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
        # Node
        opcodes.OpAddNode: cmdlib.LUAddNode,
        opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
        opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
        opcodes.OpRemoveNode: cmdlib.LURemoveNode,
        # Instance
        opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
        opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
        opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
        opcodes.OpRenameInstance: cmdlib.LURenameInstance,
        opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
        opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
        opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
        opcodes.OpRebootInstance: cmdlib.LURebootInstance,
        opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
        opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
        opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
        opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
        opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
        opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
        opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
        opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
        # OS
        opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
        # Exports
        opcodes.OpQueryExports: cmdlib.LUQueryExports,
        opcodes.OpExportInstance: cmdlib.LUExportInstance,
        opcodes.OpRemoveExport: cmdlib.LURemoveExport,
        # Tags
        opcodes.OpGetTags: cmdlib.LUGetTags,
        opcodes.OpSearchTags: cmdlib.LUSearchTags,
        opcodes.OpAddTags: cmdlib.LUAddTags,
        opcodes.OpDelTags: cmdlib.LUDelTags,
        # Test
        opcodes.OpTestDelay: cmdlib.LUTestDelay,
        opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
    }

    def __init__(self, context):
        """Constructor for Processor.

        @param context: object whose C{cfg} attribute is handed to the rpc
            runner and whose C{glm} attribute is used for all locking

        """
        self.context = context
        # Both callbacks are (re)set by every ExecOpCode() call; they are
        # initialised here so that the attributes always exist.
        self._feedback_fn = None
        self._run_notifier = None
        self.exclusive_BGL = False
        self.rpc = rpc.RpcRunner(context.cfg)

    def _ExecLU(self, lu):
        """Logical Unit execution sequence.

        Checks the LU prerequisites, runs the pre-phase hooks, executes the
        LU and runs the post-phase hooks. If the configuration write
        counter changed in the meantime, the config update hook is run as
        well, even when the LU execution failed.

        @param lu: the logical unit to execute
        @return: the LU result, as post-processed by its hooks callback

        """
        write_count = self.context.cfg.write_count
        lu.CheckPrereq()
        hm = HooksMaster(self.rpc.call_hooks_runner, self, lu)
        h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
        lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                         self._feedback_fn, None)
        try:
            result = lu.Exec(self._feedback_fn)
            h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
            result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                      self._feedback_fn, result)
        finally:
            # FIXME: This needs locks if not lu_class.REQ_BGL
            if write_count != self.context.cfg.write_count:
                hm.RunConfigUpdate()

        return result

    def _LockAndExecLU(self, lu, level):
        """Execute a Logical Unit, with the needed locks.

        This is a recursive function that starts locking the given level,
        and proceeds up, till there are no more locks to acquire. Then it
        executes the given LU and its opcodes.

        @param lu: the logical unit to run
        @param level: the locking level handled by this call
        @return: the result of executing the LU
        @raise NotImplementedError: if the LU wants to both add and acquire
            locks at the same level
        @raise errors.OpPrereqError: if the new locks could not be added

        """
        glm = self.context.glm
        adding_locks = level in lu.add_locks
        acquiring_locks = level in lu.needed_locks

        if level not in locking.LEVELS:
            # Past the last level: nothing left to lock, run the LU.
            if callable(self._run_notifier):
                self._run_notifier()
            return self._ExecLU(lu)

        if adding_locks and acquiring_locks:
            # We could both acquire and add locks at the same level, but
            # for now we don't need this, so we'll avoid the complicated
            # code needed.
            raise NotImplementedError(
                "Can't declare locks to acquire when adding others")

        if not (adding_locks or acquiring_locks):
            # Nothing to do at this level, move on to the next one.
            return self._LockAndExecLU(lu, level + 1)

        lu.DeclareLocks(level)
        share = lu.share_locks[level]
        if acquiring_locks:
            needed_locks = lu.needed_locks[level]
            lu.acquired_locks[level] = glm.acquire(level, needed_locks,
                                                   shared=share)
        else:
            add_locks = lu.add_locks[level]
            # Registered before the add so that the cleanup below removes
            # the locks again once the LU is done.
            lu.remove_locks[level] = add_locks
            try:
                glm.add(level, add_locks, acquired=1, shared=share)
            except errors.LockError:
                raise errors.OpPrereqError(
                    "Coudn't add locks (%s), probably because of a race"
                    " condition with another job, who added them first"
                    % add_locks)

        try:
            try:
                if adding_locks:
                    lu.acquired_locks[level] = add_locks
                result = self._LockAndExecLU(lu, level + 1)
            finally:
                if level in lu.remove_locks:
                    glm.remove(level, lu.remove_locks[level])
        finally:
            # Removing locks may already have given them up, so only
            # release what is still owned.
            if glm.is_owned(level):
                glm.release(level)

        return result

    def ExecOpCode(self, op, feedback_fn, run_notifier):
        """Execute an opcode.

        @type op: an OpCode instance
        @param op: the opcode to be executed
        @type feedback_fn: a function that takes a single argument
        @param feedback_fn: this function will be used as feedback from
            the LU code to the end-user
        @type run_notifier: callable (no arguments) or None
        @param run_notifier: this function (if callable) will be called
            when we are about to call the lu's Exec() method, that is,
            after we have acquired all locks
        @return: the result of the logical unit execution
        @raise errors.ProgrammerError: if C{op} is not an opcode instance
        @raise errors.OpCodeUnknown: if no logical unit is registered for
            the opcode class

        """
        if not isinstance(op, opcodes.OpCode):
            raise errors.ProgrammerError("Non-opcode instance passed"
                                         " to ExecOpcode")

        self._feedback_fn = feedback_fn
        self._run_notifier = run_notifier
        lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
        if lu_class is None:
            raise errors.OpCodeUnknown("Unknown opcode")

        # Acquire the Big Ganeti Lock exclusively if this LU requires it,
        # and in a shared fashion otherwise (to prevent concurrent run with
        # an exclusive LU).
        self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
                                 shared=not lu_class.REQ_BGL)
        try:
            self.exclusive_BGL = lu_class.REQ_BGL
            lu = lu_class(self, op, self.context, self.rpc)
            lu.ExpandNames()
            assert lu.needed_locks is not None, "needed_locks not set by LU"
            result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
        finally:
            self.context.glm.release(locking.LEVEL_CLUSTER)
            self.exclusive_BGL = False

        return result

    def LogStep(self, current, total, message):
        """Log a change in LU execution progress.

        @param current: number of the step being started
        @param total: total number of steps
        @param message: description of the step

        """
        logging.debug("Step %d/%d %s", current, total, message)
        self._feedback_fn("STEP %d/%d %s" % (current, total, message))

    def LogWarning(self, message, *args, **kwargs):
        """Log a warning to the logs and the user.

        The optional keyword argument is 'hint' and can be used to show a
        hint to the user (presumably related to the warning). If the
        message is empty, it will not be printed at all, allowing one to
        show only a hint.

        @param message: the warning text; it is %-formatted with C{args}
            when any are given

        """
        assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
            "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
        if args:
            message = message % tuple(args)
        if message:
            logging.warning(message)
            self._feedback_fn(" - WARNING: %s" % message)
        if "hint" in kwargs:
            self._feedback_fn(" Hint: %s" % kwargs["hint"])

    def LogInfo(self, message, *args):
        """Log an informational message to the logs and the user.

        @param message: the text to log; it is %-formatted with C{args}
            when any are given

        """
        if args:
            message = message % tuple(args)
        logging.info(message)
        self._feedback_fn(" - INFO: %s" % message)
class HooksMaster(object):
    """Hooks master.

    This class distributes the run commands to the nodes based on the
    specific LU class.

    In order to remove the direct dependency on the rpc module, the
    constructor needs a function which actually does the remote
    call. This will usually be rpc.call_hooks_runner, but any function
    which behaves the same works.

    """

    def __init__(self, callfn, proc, lu):
        """Store the collaborators and precompute the hooks data.

        @param callfn: function doing the remote call; it is invoked as
            C{callfn(node_list, hpath, phase, env)}
        @param proc: object whose C{LogWarning} method is used to report
            nodes which could not be contacted
        @param lu: the logical unit the hooks are run for

        """
        self.callfn = callfn
        self.proc = proc
        self.lu = lu
        self.op = lu.op
        self.env, node_list_pre, node_list_post = self._BuildEnv()
        self.node_list = {
            constants.HOOKS_PHASE_PRE: node_list_pre,
            constants.HOOKS_PHASE_POST: node_list_post,
        }

    def _BuildEnv(self):
        """Compute the environment and the target nodes.

        Based on the opcode and the current node list, this builds the
        environment for the hooks and the target node list for the run.

        @return: tuple of (environment dict, frozenset of pre-phase nodes,
            frozenset of post-phase nodes)

        """
        env = {
            "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
            "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
            "GANETI_OP_CODE": self.op.OP_ID,
            "GANETI_OBJECT_TYPE": self.lu.HTYPE,
            "GANETI_DATA_DIR": constants.DATA_DIR,
        }

        if self.lu.HPATH is not None:
            lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
            if lu_env:
                # LU-provided variables are exported under a GANETI_ prefix.
                for key in lu_env:
                    env["GANETI_" + key] = lu_env[key]
        else:
            # An LU without a hooks path runs no hooks at all.
            lu_nodes_pre = lu_nodes_post = []

        return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)

    def _RunWrapper(self, node_list, hpath, phase):
        """Simple wrapper over self.callfn.

        This method fixes the environment before doing the rpc call.

        @param node_list: the nodes on which to run the hooks
        @param hpath: the hooks path to run
        @param phase: the hooks phase
        @return: whatever C{self.callfn} returns

        """
        # Work on a copy so that the per-call additions do not leak into
        # the environment shared between phases.
        env = self.env.copy()
        env["GANETI_HOOKS_PHASE"] = phase
        env["GANETI_HOOKS_PATH"] = hpath
        if self.lu.cfg is not None:
            env["GANETI_CLUSTER"] = self.lu.cfg.GetClusterName()
            env["GANETI_MASTER"] = self.lu.cfg.GetMasterNode()

        # Keys and values are passed on as plain strings; items() is used
        # (rather than iteritems()) as it exists on every Python version.
        env = dict((str(key), str(val)) for key, val in env.items())

        return self.callfn(node_list, hpath, phase, env)

    def RunPhase(self, phase):
        """Run all the scripts for a phase.

        This is the main function of the HooksMaster.

        @param phase: one of L{constants.HOOKS_PHASE_POST} or
            L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
        @return: the processed results of the hooks multi-node rpc call,
            or None if there are no nodes to run the hooks on
        @raise errors.HooksFailure: on communication failure to the nodes
        @raise errors.HooksAbort: if a pre-phase hook script failed

        """
        if not self.node_list[phase]:
            # empty node list, we should not attempt to run this as either
            # we're in the cluster init phase and the rpc client part can't
            # even attempt to run, or this LU doesn't do hooks at all
            return
        hpath = self.lu.HPATH
        results = self._RunWrapper(self.node_list[phase], hpath, phase)
        if phase == constants.HOOKS_PHASE_PRE:
            # Only pre-phase failures are inspected and can abort.
            errs = []
            if not results:
                raise errors.HooksFailure("Communication failure")
            for node_name in results:
                res = results[node_name]
                if res is False or not isinstance(res, list):
                    self.proc.LogWarning("Communication failure to node %s" %
                                         node_name)
                    continue
                for script, hkr, output in res:
                    if hkr == constants.HKR_FAIL:
                        # NOTE(review): "string_escape" is a Python 2-only
                        # codec - needs replacing before any Python 3 port.
                        output = output.strip().encode("string_escape")
                        errs.append((node_name, script, output))
            if errs:
                raise errors.HooksAbort(errs)
        return results

    def RunConfigUpdate(self):
        """Run the special configuration update hook.

        This is a special hook that runs only on the master after each
        top-level LU if the configuration has been updated. The outcome of
        the run is not returned.

        """
        phase = constants.HOOKS_PHASE_POST
        hpath = constants.HOOKS_NAME_CFGUPDATE
        nodes = [self.lu.cfg.GetMasterNode()]
        self._RunWrapper(nodes, hpath, phase)