First version of user feedback fixes
[ganeti-local] / lib / mcpu.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26   - logical units, which know how to deal with their specific opcode only
27   - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31
32 from ganeti import opcodes
33 from ganeti import constants
34 from ganeti import errors
35 from ganeti import rpc
36 from ganeti import cmdlib
37 from ganeti import config
38 from ganeti import ssconf
39 from ganeti import logger
40 from ganeti import locking
41
42
43 class Processor(object):
44   """Object which runs OpCodes"""
45   DISPATCH_TABLE = {
46     # Cluster
47     opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
48     opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
49     opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
50     opcodes.OpMasterFailover: cmdlib.LUMasterFailover,
51     opcodes.OpDumpClusterConfig: cmdlib.LUDumpClusterConfig,
52     opcodes.OpRenameCluster: cmdlib.LURenameCluster,
53     opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
54     opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
55     # node lu
56     opcodes.OpAddNode: cmdlib.LUAddNode,
57     opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
58     opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
59     opcodes.OpRemoveNode: cmdlib.LURemoveNode,
60     # instance lu
61     opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
62     opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
63     opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
64     opcodes.OpRenameInstance: cmdlib.LURenameInstance,
65     opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
66     opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
67     opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
68     opcodes.OpRebootInstance: cmdlib.LURebootInstance,
69     opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
70     opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
71     opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
72     opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
73     opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
74     opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
75     opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
76     opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
77     # os lu
78     opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
79     # exports lu
80     opcodes.OpQueryExports: cmdlib.LUQueryExports,
81     opcodes.OpExportInstance: cmdlib.LUExportInstance,
82     opcodes.OpRemoveExport: cmdlib.LURemoveExport,
83     # tags lu
84     opcodes.OpGetTags: cmdlib.LUGetTags,
85     opcodes.OpSearchTags: cmdlib.LUSearchTags,
86     opcodes.OpAddTags: cmdlib.LUAddTags,
87     opcodes.OpDelTags: cmdlib.LUDelTags,
88     # test lu
89     opcodes.OpTestDelay: cmdlib.LUTestDelay,
90     opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
91     }
92
93   def __init__(self, context):
94     """Constructor for Processor
95
96     Args:
97      - feedback_fn: the feedback function (taking one string) to be run when
98                     interesting events are happening
99     """
100     self.context = context
101     self._feedback_fn = None
102     self.exclusive_BGL = False
103
104   def _ExecLU(self, lu):
105     """Logical Unit execution sequence.
106
107     """
108     write_count = self.context.cfg.write_count
109     lu.CheckPrereq()
110     hm = HooksMaster(rpc.call_hooks_runner, self, lu)
111     h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
112     lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
113                      self._feedback_fn, None)
114     try:
115       result = lu.Exec(self._feedback_fn)
116       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
117       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
118                                 self._feedback_fn, result)
119     finally:
120       # FIXME: This needs locks if not lu_class.REQ_BGL
121       if write_count != self.context.cfg.write_count:
122         hm.RunConfigUpdate()
123
124     return result
125
126   def _LockAndExecLU(self, lu, level):
127     """Execute a Logical Unit, with the needed locks.
128
129     This is a recursive function that starts locking the given level, and
130     proceeds up, till there are no more locks to acquire. Then it executes the
131     given LU and its opcodes.
132
133     """
134     if level in lu.needed_locks:
135       # This is always safe to do, as we can't acquire more/less locks than
136       # what was requested.
137       lu.needed_locks[level] = self.context.glm.acquire(level,
138                                                         lu.needed_locks[level])
139       try:
140         result = self._LockAndExecLU(lu, level + 1)
141       finally:
142         if lu.needed_locks[level]:
143           self.context.glm.release(level)
144     else:
145       result = self._ExecLU(lu)
146
147     return result
148
149   def ExecOpCode(self, op, feedback_fn):
150     """Execute an opcode.
151
152     Args:
153       op: the opcode to be executed
154
155     """
156     if not isinstance(op, opcodes.OpCode):
157       raise errors.ProgrammerError("Non-opcode instance passed"
158                                    " to ExecOpcode")
159
160     self._feedback_fn = feedback_fn
161     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
162     if lu_class is None:
163       raise errors.OpCodeUnknown("Unknown opcode")
164
165     if lu_class.REQ_WSSTORE:
166       sstore = ssconf.WritableSimpleStore()
167     else:
168       sstore = ssconf.SimpleStore()
169
170     # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
171     # shared fashion otherwise (to prevent concurrent run with an exclusive LU.
172     self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
173                              shared=not lu_class.REQ_BGL)
174     try:
175       self.exclusive_BGL = lu_class.REQ_BGL
176       lu = lu_class(self, op, self.context, sstore)
177       lu.ExpandNames()
178       assert lu.needed_locks is not None, "needed_locks not set by LU"
179       result = self._LockAndExecLU(lu, locking.LEVEL_NODE)
180     finally:
181       self.context.glm.release(locking.LEVEL_CLUSTER)
182       self.exclusive_BGL = False
183
184     return result
185
186   def ChainOpCode(self, op):
187     """Chain and execute an opcode.
188
189     This is used by LUs when they need to execute a child LU.
190
191     Args:
192      - opcode: the opcode to be executed
193
194     """
195     if not isinstance(op, opcodes.OpCode):
196       raise errors.ProgrammerError("Non-opcode instance passed"
197                                    " to ExecOpcode")
198
199     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
200     if lu_class is None:
201       raise errors.OpCodeUnknown("Unknown opcode")
202
203     if lu_class.REQ_BGL and not self.exclusive_BGL:
204       raise errors.ProgrammerError("LUs which require the BGL cannot"
205                                    " be chained to granular ones.")
206
207     if lu_class.REQ_WSSTORE:
208       sstore = ssconf.WritableSimpleStore()
209     else:
210       sstore = ssconf.SimpleStore()
211
212     #do_hooks = lu_class.HPATH is not None
213     lu = lu_class(self, op, self.context, sstore)
214     lu.CheckPrereq()
215     #if do_hooks:
216     #  hm = HooksMaster(rpc.call_hooks_runner, self, lu)
217     #  h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
218     #  lu.HooksCallBack(constants.HOOKS_PHASE_PRE,
219     #                   h_results, self._feedback_fn, None)
220     result = lu.Exec(self._feedback_fn)
221     #if do_hooks:
222     #  h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
223     #  result = lu.HooksCallBack(constants.HOOKS_PHASE_POST,
224     #                   h_results, self._feedback_fn, result)
225     return result
226
227   def LogStep(self, current, total, message):
228     """Log a change in LU execution progress.
229
230     """
231     logger.Debug("Step %d/%d %s" % (current, total, message))
232     self._feedback_fn("STEP %d/%d %s" % (current, total, message))
233
234   def LogWarning(self, message, hint=None):
235     """Log a warning to the logs and the user.
236
237     """
238     logger.Error(message)
239     self._feedback_fn(" - WARNING: %s" % message)
240     if hint:
241       self._feedback_fn("      Hint: %s" % hint)
242
243   def LogInfo(self, message):
244     """Log an informational message to the logs and the user.
245
246     """
247     logger.Info(message)
248     self._feedback_fn(" - INFO: %s" % message)
249
250
251 class HooksMaster(object):
252   """Hooks master.
253
254   This class distributes the run commands to the nodes based on the
255   specific LU class.
256
257   In order to remove the direct dependency on the rpc module, the
258   constructor needs a function which actually does the remote
259   call. This will usually be rpc.call_hooks_runner, but any function
260   which behaves the same works.
261
262   """
263   def __init__(self, callfn, proc, lu):
264     self.callfn = callfn
265     self.proc = proc
266     self.lu = lu
267     self.op = lu.op
268     self.env, node_list_pre, node_list_post = self._BuildEnv()
269     self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
270                       constants.HOOKS_PHASE_POST: node_list_post}
271
272   def _BuildEnv(self):
273     """Compute the environment and the target nodes.
274
275     Based on the opcode and the current node list, this builds the
276     environment for the hooks and the target node list for the run.
277
278     """
279     env = {
280       "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
281       "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
282       "GANETI_OP_CODE": self.op.OP_ID,
283       "GANETI_OBJECT_TYPE": self.lu.HTYPE,
284       "GANETI_DATA_DIR": constants.DATA_DIR,
285       }
286
287     if self.lu.HPATH is not None:
288       lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
289       if lu_env:
290         for key in lu_env:
291           env["GANETI_" + key] = lu_env[key]
292     else:
293       lu_nodes_pre = lu_nodes_post = []
294
295     return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
296
297   def _RunWrapper(self, node_list, hpath, phase):
298     """Simple wrapper over self.callfn.
299
300     This method fixes the environment before doing the rpc call.
301
302     """
303     env = self.env.copy()
304     env["GANETI_HOOKS_PHASE"] = phase
305     env["GANETI_HOOKS_PATH"] = hpath
306     if self.lu.sstore is not None:
307       env["GANETI_CLUSTER"] = self.lu.sstore.GetClusterName()
308       env["GANETI_MASTER"] = self.lu.sstore.GetMasterNode()
309
310     env = dict([(str(key), str(val)) for key, val in env.iteritems()])
311
312     return self.callfn(node_list, hpath, phase, env)
313
314   def RunPhase(self, phase):
315     """Run all the scripts for a phase.
316
317     This is the main function of the HookMaster.
318
319     Args:
320       phase: the hooks phase to run
321
322     Returns:
323       the result of the hooks multi-node rpc call
324
325     """
326     if not self.node_list[phase]:
327       # empty node list, we should not attempt to run this as either
328       # we're in the cluster init phase and the rpc client part can't
329       # even attempt to run, or this LU doesn't do hooks at all
330       return
331     hpath = self.lu.HPATH
332     results = self._RunWrapper(self.node_list[phase], hpath, phase)
333     if phase == constants.HOOKS_PHASE_PRE:
334       errs = []
335       if not results:
336         raise errors.HooksFailure("Communication failure")
337       for node_name in results:
338         res = results[node_name]
339         if res is False or not isinstance(res, list):
340           self.proc.LogWarning("Communication failure to node %s" % node_name)
341           continue
342         for script, hkr, output in res:
343           if hkr == constants.HKR_FAIL:
344             output = output.strip().encode("string_escape")
345             errs.append((node_name, script, output))
346       if errs:
347         raise errors.HooksAbort(errs)
348     return results
349
350   def RunConfigUpdate(self):
351     """Run the special configuration update hook
352
353     This is a special hook that runs only on the master after each
354     top-level LI if the configuration has been updated.
355
356     """
357     phase = constants.HOOKS_PHASE_POST
358     hpath = constants.HOOKS_NAME_CFGUPDATE
359     if self.lu.sstore is None:
360       raise errors.ProgrammerError("Null sstore on config update hook")
361     nodes = [self.lu.sstore.GetMasterNode()]
362     results = self._RunWrapper(nodes, hpath, phase)