Processor: Acquire locks before executing an LU
[ganeti-local] / lib / mcpu.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26   - logical units, which know how to deal with their specific opcode only
27   - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31
32 from ganeti import opcodes
33 from ganeti import constants
34 from ganeti import errors
35 from ganeti import rpc
36 from ganeti import cmdlib
37 from ganeti import config
38 from ganeti import ssconf
39 from ganeti import logger
40 from ganeti import locking
41
42
43 class Processor(object):
44   """Object which runs OpCodes"""
45   DISPATCH_TABLE = {
46     # Cluster
47     opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
48     opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
49     opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
50     opcodes.OpMasterFailover: cmdlib.LUMasterFailover,
51     opcodes.OpDumpClusterConfig: cmdlib.LUDumpClusterConfig,
52     opcodes.OpRenameCluster: cmdlib.LURenameCluster,
53     opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
54     opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
55     # node lu
56     opcodes.OpAddNode: cmdlib.LUAddNode,
57     opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
58     opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
59     opcodes.OpRemoveNode: cmdlib.LURemoveNode,
60     # instance lu
61     opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
62     opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
63     opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
64     opcodes.OpRenameInstance: cmdlib.LURenameInstance,
65     opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
66     opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
67     opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
68     opcodes.OpRebootInstance: cmdlib.LURebootInstance,
69     opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
70     opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
71     opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
72     opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
73     opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
74     opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
75     opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
76     opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
77     # os lu
78     opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
79     # exports lu
80     opcodes.OpQueryExports: cmdlib.LUQueryExports,
81     opcodes.OpExportInstance: cmdlib.LUExportInstance,
82     opcodes.OpRemoveExport: cmdlib.LURemoveExport,
83     # tags lu
84     opcodes.OpGetTags: cmdlib.LUGetTags,
85     opcodes.OpSearchTags: cmdlib.LUSearchTags,
86     opcodes.OpAddTags: cmdlib.LUAddTags,
87     opcodes.OpDelTags: cmdlib.LUDelTags,
88     # test lu
89     opcodes.OpTestDelay: cmdlib.LUTestDelay,
90     opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
91     }
92
93   def __init__(self, context, feedback=None):
94     """Constructor for Processor
95
96     Args:
97      - feedback_fn: the feedback function (taking one string) to be run when
98                     interesting events are happening
99     """
100     self.context = context
101     self._feedback_fn = feedback
102     self.exclusive_BGL = False
103
104   def _ExecLU(self, lu):
105     """Logical Unit execution sequence.
106
107     """
108     write_count = self.context.cfg.write_count
109     lu.CheckPrereq()
110     hm = HooksMaster(rpc.call_hooks_runner, self, lu)
111     h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
112     lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
113                      self._feedback_fn, None)
114     try:
115       result = lu.Exec(self._feedback_fn)
116       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
117       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
118                                 self._feedback_fn, result)
119     finally:
120       # FIXME: This needs locks if not lu_class.REQ_BGL
121       if write_count != self.context.cfg.write_count:
122         hm.RunConfigUpdate()
123
124     return result
125
126   def _LockAndExecLU(self, lu, level):
127     """Execute a Logical Unit, with the needed locks.
128
129     This is a recursive function that starts locking the given level, and
130     proceeds up, till there are no more locks to acquire. Then it executes the
131     given LU and its opcodes.
132
133     """
134     if level in lu.needed_locks:
135       # This is always safe to do, as we can't acquire more/less locks than
136       # what was requested.
137       lu.needed_locks[level] = self.context.glm.acquire(level,
138                                                         lu.needed_locks[level])
139       try:
140         result = self._LockAndExecLU(lu, level + 1)
141       finally:
142         if lu.needed_locks[level]:
143           self.context.glm.release(level)
144     else:
145       result = self._ExecLU(lu)
146
147     return result
148
149   def ExecOpCode(self, op):
150     """Execute an opcode.
151
152     Args:
153       op: the opcode to be executed
154
155     """
156     if not isinstance(op, opcodes.OpCode):
157       raise errors.ProgrammerError("Non-opcode instance passed"
158                                    " to ExecOpcode")
159
160     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
161     if lu_class is None:
162       raise errors.OpCodeUnknown("Unknown opcode")
163
164     if lu_class.REQ_WSSTORE:
165       sstore = ssconf.WritableSimpleStore()
166     else:
167       sstore = ssconf.SimpleStore()
168
169     # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
170     # shared fashion otherwise (to prevent concurrent run with an exclusive LU.
171     self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
172                              shared=not lu_class.REQ_BGL)
173     try:
174       self.exclusive_BGL = lu_class.REQ_BGL
175       lu = lu_class(self, op, self.context, sstore)
176       lu.ExpandNames()
177       assert lu.needed_locks is not None, "needed_locks not set by LU"
178       result = self._LockAndExecLU(lu, locking.LEVEL_NODE)
179     finally:
180       self.context.glm.release(locking.LEVEL_CLUSTER)
181       self.exclusive_BGL = False
182
183     return result
184
185   def ChainOpCode(self, op):
186     """Chain and execute an opcode.
187
188     This is used by LUs when they need to execute a child LU.
189
190     Args:
191      - opcode: the opcode to be executed
192
193     """
194     if not isinstance(op, opcodes.OpCode):
195       raise errors.ProgrammerError("Non-opcode instance passed"
196                                    " to ExecOpcode")
197
198     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
199     if lu_class is None:
200       raise errors.OpCodeUnknown("Unknown opcode")
201
202     if lu_class.REQ_BGL and not self.exclusive_BGL:
203       raise errors.ProgrammerError("LUs which require the BGL cannot"
204                                    " be chained to granular ones.")
205
206     if lu_class.REQ_WSSTORE:
207       sstore = ssconf.WritableSimpleStore()
208     else:
209       sstore = ssconf.SimpleStore()
210
211     #do_hooks = lu_class.HPATH is not None
212     lu = lu_class(self, op, self.context, sstore)
213     lu.CheckPrereq()
214     #if do_hooks:
215     #  hm = HooksMaster(rpc.call_hooks_runner, self, lu)
216     #  h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
217     #  lu.HooksCallBack(constants.HOOKS_PHASE_PRE,
218     #                   h_results, self._feedback_fn, None)
219     result = lu.Exec(self._feedback_fn)
220     #if do_hooks:
221     #  h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
222     #  result = lu.HooksCallBack(constants.HOOKS_PHASE_POST,
223     #                   h_results, self._feedback_fn, result)
224     return result
225
226   def LogStep(self, current, total, message):
227     """Log a change in LU execution progress.
228
229     """
230     logger.Debug("Step %d/%d %s" % (current, total, message))
231     self._feedback_fn("STEP %d/%d %s" % (current, total, message))
232
233   def LogWarning(self, message, hint=None):
234     """Log a warning to the logs and the user.
235
236     """
237     logger.Error(message)
238     self._feedback_fn(" - WARNING: %s" % message)
239     if hint:
240       self._feedback_fn("      Hint: %s" % hint)
241
242   def LogInfo(self, message):
243     """Log an informational message to the logs and the user.
244
245     """
246     logger.Info(message)
247     self._feedback_fn(" - INFO: %s" % message)
248
249
250 class HooksMaster(object):
251   """Hooks master.
252
253   This class distributes the run commands to the nodes based on the
254   specific LU class.
255
256   In order to remove the direct dependency on the rpc module, the
257   constructor needs a function which actually does the remote
258   call. This will usually be rpc.call_hooks_runner, but any function
259   which behaves the same works.
260
261   """
262   def __init__(self, callfn, proc, lu):
263     self.callfn = callfn
264     self.proc = proc
265     self.lu = lu
266     self.op = lu.op
267     self.env, node_list_pre, node_list_post = self._BuildEnv()
268     self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
269                       constants.HOOKS_PHASE_POST: node_list_post}
270
271   def _BuildEnv(self):
272     """Compute the environment and the target nodes.
273
274     Based on the opcode and the current node list, this builds the
275     environment for the hooks and the target node list for the run.
276
277     """
278     env = {
279       "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
280       "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
281       "GANETI_OP_CODE": self.op.OP_ID,
282       "GANETI_OBJECT_TYPE": self.lu.HTYPE,
283       "GANETI_DATA_DIR": constants.DATA_DIR,
284       }
285
286     if self.lu.HPATH is not None:
287       lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
288       if lu_env:
289         for key in lu_env:
290           env["GANETI_" + key] = lu_env[key]
291     else:
292       lu_nodes_pre = lu_nodes_post = []
293
294     return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
295
296   def _RunWrapper(self, node_list, hpath, phase):
297     """Simple wrapper over self.callfn.
298
299     This method fixes the environment before doing the rpc call.
300
301     """
302     env = self.env.copy()
303     env["GANETI_HOOKS_PHASE"] = phase
304     env["GANETI_HOOKS_PATH"] = hpath
305     if self.lu.sstore is not None:
306       env["GANETI_CLUSTER"] = self.lu.sstore.GetClusterName()
307       env["GANETI_MASTER"] = self.lu.sstore.GetMasterNode()
308
309     env = dict([(str(key), str(val)) for key, val in env.iteritems()])
310
311     return self.callfn(node_list, hpath, phase, env)
312
313   def RunPhase(self, phase):
314     """Run all the scripts for a phase.
315
316     This is the main function of the HookMaster.
317
318     Args:
319       phase: the hooks phase to run
320
321     Returns:
322       the result of the hooks multi-node rpc call
323
324     """
325     if not self.node_list[phase]:
326       # empty node list, we should not attempt to run this as either
327       # we're in the cluster init phase and the rpc client part can't
328       # even attempt to run, or this LU doesn't do hooks at all
329       return
330     hpath = self.lu.HPATH
331     results = self._RunWrapper(self.node_list[phase], hpath, phase)
332     if phase == constants.HOOKS_PHASE_PRE:
333       errs = []
334       if not results:
335         raise errors.HooksFailure("Communication failure")
336       for node_name in results:
337         res = results[node_name]
338         if res is False or not isinstance(res, list):
339           self.proc.LogWarning("Communication failure to node %s" % node_name)
340           continue
341         for script, hkr, output in res:
342           if hkr == constants.HKR_FAIL:
343             output = output.strip().encode("string_escape")
344             errs.append((node_name, script, output))
345       if errs:
346         raise errors.HooksAbort(errs)
347     return results
348
349   def RunConfigUpdate(self):
350     """Run the special configuration update hook
351
352     This is a special hook that runs only on the master after each
353     top-level LI if the configuration has been updated.
354
355     """
356     phase = constants.HOOKS_PHASE_POST
357     hpath = constants.HOOKS_NAME_CFGUPDATE
358     if self.lu.sstore is None:
359       raise errors.ProgrammerError("Null sstore on config update hook")
360     nodes = [self.lu.sstore.GetMasterNode()]
361     results = self._RunWrapper(nodes, hpath, phase)