Processor: lock all levels even if one is missing
[ganeti-local] / lib / mcpu.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26   - logical units, which know how to deal with their specific opcode only
27   - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31
32 from ganeti import opcodes
33 from ganeti import constants
34 from ganeti import errors
35 from ganeti import rpc
36 from ganeti import cmdlib
37 from ganeti import ssconf
38 from ganeti import logger
39 from ganeti import locking
40
41
42 class Processor(object):
43   """Object which runs OpCodes"""
44   DISPATCH_TABLE = {
45     # Cluster
46     opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
47     opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
48     opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
49     opcodes.OpDumpClusterConfig: cmdlib.LUDumpClusterConfig,
50     opcodes.OpRenameCluster: cmdlib.LURenameCluster,
51     opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
52     opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
53     # node lu
54     opcodes.OpAddNode: cmdlib.LUAddNode,
55     opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
56     opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
57     opcodes.OpRemoveNode: cmdlib.LURemoveNode,
58     # instance lu
59     opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
60     opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
61     opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
62     opcodes.OpRenameInstance: cmdlib.LURenameInstance,
63     opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
64     opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
65     opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
66     opcodes.OpRebootInstance: cmdlib.LURebootInstance,
67     opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
68     opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
69     opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
70     opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
71     opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
72     opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
73     opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
74     opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
75     # os lu
76     opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
77     # exports lu
78     opcodes.OpQueryExports: cmdlib.LUQueryExports,
79     opcodes.OpExportInstance: cmdlib.LUExportInstance,
80     opcodes.OpRemoveExport: cmdlib.LURemoveExport,
81     # tags lu
82     opcodes.OpGetTags: cmdlib.LUGetTags,
83     opcodes.OpSearchTags: cmdlib.LUSearchTags,
84     opcodes.OpAddTags: cmdlib.LUAddTags,
85     opcodes.OpDelTags: cmdlib.LUDelTags,
86     # test lu
87     opcodes.OpTestDelay: cmdlib.LUTestDelay,
88     opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
89     }
90
91   def __init__(self, context):
92     """Constructor for Processor
93
94     Args:
95      - feedback_fn: the feedback function (taking one string) to be run when
96                     interesting events are happening
97     """
98     self.context = context
99     self._feedback_fn = None
100     self.exclusive_BGL = False
101
102   def _ExecLU(self, lu):
103     """Logical Unit execution sequence.
104
105     """
106     write_count = self.context.cfg.write_count
107     lu.CheckPrereq()
108     hm = HooksMaster(rpc.call_hooks_runner, self, lu)
109     h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
110     lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
111                      self._feedback_fn, None)
112     try:
113       result = lu.Exec(self._feedback_fn)
114       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
115       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
116                                 self._feedback_fn, result)
117     finally:
118       # FIXME: This needs locks if not lu_class.REQ_BGL
119       if write_count != self.context.cfg.write_count:
120         hm.RunConfigUpdate()
121
122     return result
123
124   def _LockAndExecLU(self, lu, level):
125     """Execute a Logical Unit, with the needed locks.
126
127     This is a recursive function that starts locking the given level, and
128     proceeds up, till there are no more locks to acquire. Then it executes the
129     given LU and its opcodes.
130
131     """
132     if level not in locking.LEVELS:
133       result = self._ExecLU(lu)
134     elif level in lu.needed_locks:
135       # This gives a chance to LUs to make last-minute changes after acquiring
136       # locks at any preceding level.
137       lu.DeclareLocks(level)
138       needed_locks = lu.needed_locks[level]
139       share = lu.share_locks[level]
140       # This is always safe to do, as we can't acquire more/less locks than
141       # what was requested.
142       lu.needed_locks[level] = self.context.glm.acquire(level,
143                                                         needed_locks,
144                                                         shared=share)
145       try:
146         result = self._LockAndExecLU(lu, level + 1)
147       finally:
148         if lu.needed_locks[level]:
149           self.context.glm.release(level)
150     else:
151       result = self._LockAndExecLU(lu, level + 1)
152
153     return result
154
155   def ExecOpCode(self, op, feedback_fn):
156     """Execute an opcode.
157
158     Args:
159       op: the opcode to be executed
160
161     """
162     if not isinstance(op, opcodes.OpCode):
163       raise errors.ProgrammerError("Non-opcode instance passed"
164                                    " to ExecOpcode")
165
166     self._feedback_fn = feedback_fn
167     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
168     if lu_class is None:
169       raise errors.OpCodeUnknown("Unknown opcode")
170
171     if lu_class.REQ_WSSTORE:
172       sstore = ssconf.WritableSimpleStore()
173     else:
174       sstore = ssconf.SimpleStore()
175
176     # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
177     # shared fashion otherwise (to prevent concurrent run with an exclusive LU.
178     self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
179                              shared=not lu_class.REQ_BGL)
180     try:
181       self.exclusive_BGL = lu_class.REQ_BGL
182       lu = lu_class(self, op, self.context, sstore)
183       lu.ExpandNames()
184       assert lu.needed_locks is not None, "needed_locks not set by LU"
185       result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
186     finally:
187       self.context.glm.release(locking.LEVEL_CLUSTER)
188       self.exclusive_BGL = False
189
190     return result
191
192   def ChainOpCode(self, op):
193     """Chain and execute an opcode.
194
195     This is used by LUs when they need to execute a child LU.
196
197     Args:
198      - opcode: the opcode to be executed
199
200     """
201     if not isinstance(op, opcodes.OpCode):
202       raise errors.ProgrammerError("Non-opcode instance passed"
203                                    " to ExecOpcode")
204
205     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
206     if lu_class is None:
207       raise errors.OpCodeUnknown("Unknown opcode")
208
209     if lu_class.REQ_BGL and not self.exclusive_BGL:
210       raise errors.ProgrammerError("LUs which require the BGL cannot"
211                                    " be chained to granular ones.")
212
213     assert lu_class.REQ_BGL, "ChainOpCode is still BGL-only"
214
215     if lu_class.REQ_WSSTORE:
216       sstore = ssconf.WritableSimpleStore()
217     else:
218       sstore = ssconf.SimpleStore()
219
220     #do_hooks = lu_class.HPATH is not None
221     lu = lu_class(self, op, self.context, sstore)
222     lu.CheckPrereq()
223     #if do_hooks:
224     #  hm = HooksMaster(rpc.call_hooks_runner, self, lu)
225     #  h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
226     #  lu.HooksCallBack(constants.HOOKS_PHASE_PRE,
227     #                   h_results, self._feedback_fn, None)
228     result = lu.Exec(self._feedback_fn)
229     #if do_hooks:
230     #  h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
231     #  result = lu.HooksCallBack(constants.HOOKS_PHASE_POST,
232     #                   h_results, self._feedback_fn, result)
233     return result
234
235   def LogStep(self, current, total, message):
236     """Log a change in LU execution progress.
237
238     """
239     logger.Debug("Step %d/%d %s" % (current, total, message))
240     self._feedback_fn("STEP %d/%d %s" % (current, total, message))
241
242   def LogWarning(self, message, hint=None):
243     """Log a warning to the logs and the user.
244
245     """
246     logger.Error(message)
247     self._feedback_fn(" - WARNING: %s" % message)
248     if hint:
249       self._feedback_fn("      Hint: %s" % hint)
250
251   def LogInfo(self, message):
252     """Log an informational message to the logs and the user.
253
254     """
255     logger.Info(message)
256     self._feedback_fn(" - INFO: %s" % message)
257
258
259 class HooksMaster(object):
260   """Hooks master.
261
262   This class distributes the run commands to the nodes based on the
263   specific LU class.
264
265   In order to remove the direct dependency on the rpc module, the
266   constructor needs a function which actually does the remote
267   call. This will usually be rpc.call_hooks_runner, but any function
268   which behaves the same works.
269
270   """
271   def __init__(self, callfn, proc, lu):
272     self.callfn = callfn
273     self.proc = proc
274     self.lu = lu
275     self.op = lu.op
276     self.env, node_list_pre, node_list_post = self._BuildEnv()
277     self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
278                       constants.HOOKS_PHASE_POST: node_list_post}
279
280   def _BuildEnv(self):
281     """Compute the environment and the target nodes.
282
283     Based on the opcode and the current node list, this builds the
284     environment for the hooks and the target node list for the run.
285
286     """
287     env = {
288       "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
289       "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
290       "GANETI_OP_CODE": self.op.OP_ID,
291       "GANETI_OBJECT_TYPE": self.lu.HTYPE,
292       "GANETI_DATA_DIR": constants.DATA_DIR,
293       }
294
295     if self.lu.HPATH is not None:
296       lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
297       if lu_env:
298         for key in lu_env:
299           env["GANETI_" + key] = lu_env[key]
300     else:
301       lu_nodes_pre = lu_nodes_post = []
302
303     return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
304
305   def _RunWrapper(self, node_list, hpath, phase):
306     """Simple wrapper over self.callfn.
307
308     This method fixes the environment before doing the rpc call.
309
310     """
311     env = self.env.copy()
312     env["GANETI_HOOKS_PHASE"] = phase
313     env["GANETI_HOOKS_PATH"] = hpath
314     if self.lu.sstore is not None:
315       env["GANETI_CLUSTER"] = self.lu.sstore.GetClusterName()
316       env["GANETI_MASTER"] = self.lu.sstore.GetMasterNode()
317
318     env = dict([(str(key), str(val)) for key, val in env.iteritems()])
319
320     return self.callfn(node_list, hpath, phase, env)
321
322   def RunPhase(self, phase):
323     """Run all the scripts for a phase.
324
325     This is the main function of the HookMaster.
326
327     Args:
328       phase: the hooks phase to run
329
330     Returns:
331       the result of the hooks multi-node rpc call
332
333     """
334     if not self.node_list[phase]:
335       # empty node list, we should not attempt to run this as either
336       # we're in the cluster init phase and the rpc client part can't
337       # even attempt to run, or this LU doesn't do hooks at all
338       return
339     hpath = self.lu.HPATH
340     results = self._RunWrapper(self.node_list[phase], hpath, phase)
341     if phase == constants.HOOKS_PHASE_PRE:
342       errs = []
343       if not results:
344         raise errors.HooksFailure("Communication failure")
345       for node_name in results:
346         res = results[node_name]
347         if res is False or not isinstance(res, list):
348           self.proc.LogWarning("Communication failure to node %s" % node_name)
349           continue
350         for script, hkr, output in res:
351           if hkr == constants.HKR_FAIL:
352             output = output.strip().encode("string_escape")
353             errs.append((node_name, script, output))
354       if errs:
355         raise errors.HooksAbort(errs)
356     return results
357
358   def RunConfigUpdate(self):
359     """Run the special configuration update hook
360
361     This is a special hook that runs only on the master after each
362     top-level LI if the configuration has been updated.
363
364     """
365     phase = constants.HOOKS_PHASE_POST
366     hpath = constants.HOOKS_NAME_CFGUPDATE
367     if self.lu.sstore is None:
368       raise errors.ProgrammerError("Null sstore on config update hook")
369     nodes = [self.lu.sstore.GetMasterNode()]
370     results = self._RunWrapper(nodes, hpath, phase)