serializer.DumpJson: Control indentation by parameter
[ganeti-local] / lib / mcpu.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26   - logical units, which know how to deal with their specific opcode only
27   - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31
32 from ganeti import opcodes
33 from ganeti import constants
34 from ganeti import errors
35 from ganeti import rpc
36 from ganeti import cmdlib
37 from ganeti import config
38 from ganeti import ssconf
39 from ganeti import logger
40 from ganeti import locking
41
42
43 class Processor(object):
44   """Object which runs OpCodes"""
45   DISPATCH_TABLE = {
46     # Cluster
47     opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
48     opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
49     opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
50     opcodes.OpMasterFailover: cmdlib.LUMasterFailover,
51     opcodes.OpDumpClusterConfig: cmdlib.LUDumpClusterConfig,
52     opcodes.OpRenameCluster: cmdlib.LURenameCluster,
53     opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
54     opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
55     # node lu
56     opcodes.OpAddNode: cmdlib.LUAddNode,
57     opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
58     opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
59     opcodes.OpRemoveNode: cmdlib.LURemoveNode,
60     # instance lu
61     opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
62     opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
63     opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
64     opcodes.OpRenameInstance: cmdlib.LURenameInstance,
65     opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
66     opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
67     opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
68     opcodes.OpRebootInstance: cmdlib.LURebootInstance,
69     opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
70     opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
71     opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
72     opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
73     opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
74     opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
75     opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
76     opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
77     # os lu
78     opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
79     # exports lu
80     opcodes.OpQueryExports: cmdlib.LUQueryExports,
81     opcodes.OpExportInstance: cmdlib.LUExportInstance,
82     opcodes.OpRemoveExport: cmdlib.LURemoveExport,
83     # tags lu
84     opcodes.OpGetTags: cmdlib.LUGetTags,
85     opcodes.OpSearchTags: cmdlib.LUSearchTags,
86     opcodes.OpAddTags: cmdlib.LUAddTags,
87     opcodes.OpDelTags: cmdlib.LUDelTags,
88     # test lu
89     opcodes.OpTestDelay: cmdlib.LUTestDelay,
90     opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
91     }
92
93   def __init__(self, context, feedback=None):
94     """Constructor for Processor
95
96     Args:
97      - feedback_fn: the feedback function (taking one string) to be run when
98                     interesting events are happening
99     """
100     self.context = context
101     self._feedback_fn = feedback
102     self.exclusive_BGL = False
103
104   def ExecOpCode(self, op):
105     """Execute an opcode.
106
107     Args:
108       op: the opcode to be executed
109
110     """
111     if not isinstance(op, opcodes.OpCode):
112       raise errors.ProgrammerError("Non-opcode instance passed"
113                                    " to ExecOpcode")
114
115     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
116     if lu_class is None:
117       raise errors.OpCodeUnknown("Unknown opcode")
118
119     if lu_class.REQ_WSSTORE:
120       sstore = ssconf.WritableSimpleStore()
121     else:
122       sstore = ssconf.SimpleStore()
123
124     write_count = self.context.cfg.write_count
125
126     # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
127     # shared fashion otherwise (to prevent concurrent run with an exclusive LU.
128     self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
129                              shared=not lu_class.REQ_BGL)
130     try:
131       self.exclusive_BGL = lu_class.REQ_BGL
132       lu = lu_class(self, op, self.context, sstore)
133       lu.CheckPrereq()
134       hm = HooksMaster(rpc.call_hooks_runner, self, lu)
135       h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
136       lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
137                        self._feedback_fn, None)
138       try:
139         result = lu.Exec(self._feedback_fn)
140         h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
141         result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
142                                   self._feedback_fn, result)
143       finally:
144         # FIXME: This needs locks if not lu_class.REQ_BGL
145         if write_count != self.context.cfg.write_count:
146           hm.RunConfigUpdate()
147     finally:
148       self.context.glm.release(locking.LEVEL_CLUSTER)
149       self.exclusive_BGL = False
150
151     return result
152
153   def ChainOpCode(self, op):
154     """Chain and execute an opcode.
155
156     This is used by LUs when they need to execute a child LU.
157
158     Args:
159      - opcode: the opcode to be executed
160
161     """
162     if not isinstance(op, opcodes.OpCode):
163       raise errors.ProgrammerError("Non-opcode instance passed"
164                                    " to ExecOpcode")
165
166     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
167     if lu_class is None:
168       raise errors.OpCodeUnknown("Unknown opcode")
169
170     if lu_class.REQ_BGL and not self.exclusive_BGL:
171       raise errors.ProgrammerError("LUs which require the BGL cannot"
172                                    " be chained to granular ones.")
173
174     if lu_class.REQ_WSSTORE:
175       sstore = ssconf.WritableSimpleStore()
176     else:
177       sstore = ssconf.SimpleStore()
178
179     #do_hooks = lu_class.HPATH is not None
180     lu = lu_class(self, op, self.context, sstore)
181     lu.CheckPrereq()
182     #if do_hooks:
183     #  hm = HooksMaster(rpc.call_hooks_runner, self, lu)
184     #  h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
185     #  lu.HooksCallBack(constants.HOOKS_PHASE_PRE,
186     #                   h_results, self._feedback_fn, None)
187     result = lu.Exec(self._feedback_fn)
188     #if do_hooks:
189     #  h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
190     #  result = lu.HooksCallBack(constants.HOOKS_PHASE_POST,
191     #                   h_results, self._feedback_fn, result)
192     return result
193
194   def LogStep(self, current, total, message):
195     """Log a change in LU execution progress.
196
197     """
198     logger.Debug("Step %d/%d %s" % (current, total, message))
199     self._feedback_fn("STEP %d/%d %s" % (current, total, message))
200
201   def LogWarning(self, message, hint=None):
202     """Log a warning to the logs and the user.
203
204     """
205     logger.Error(message)
206     self._feedback_fn(" - WARNING: %s" % message)
207     if hint:
208       self._feedback_fn("      Hint: %s" % hint)
209
210   def LogInfo(self, message):
211     """Log an informational message to the logs and the user.
212
213     """
214     logger.Info(message)
215     self._feedback_fn(" - INFO: %s" % message)
216
217
218 class HooksMaster(object):
219   """Hooks master.
220
221   This class distributes the run commands to the nodes based on the
222   specific LU class.
223
224   In order to remove the direct dependency on the rpc module, the
225   constructor needs a function which actually does the remote
226   call. This will usually be rpc.call_hooks_runner, but any function
227   which behaves the same works.
228
229   """
230   def __init__(self, callfn, proc, lu):
231     self.callfn = callfn
232     self.proc = proc
233     self.lu = lu
234     self.op = lu.op
235     self.env, node_list_pre, node_list_post = self._BuildEnv()
236     self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
237                       constants.HOOKS_PHASE_POST: node_list_post}
238
239   def _BuildEnv(self):
240     """Compute the environment and the target nodes.
241
242     Based on the opcode and the current node list, this builds the
243     environment for the hooks and the target node list for the run.
244
245     """
246     env = {
247       "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
248       "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
249       "GANETI_OP_CODE": self.op.OP_ID,
250       "GANETI_OBJECT_TYPE": self.lu.HTYPE,
251       "GANETI_DATA_DIR": constants.DATA_DIR,
252       }
253
254     if self.lu.HPATH is not None:
255       lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
256       if lu_env:
257         for key in lu_env:
258           env["GANETI_" + key] = lu_env[key]
259     else:
260       lu_nodes_pre = lu_nodes_post = []
261
262     return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
263
264   def _RunWrapper(self, node_list, hpath, phase):
265     """Simple wrapper over self.callfn.
266
267     This method fixes the environment before doing the rpc call.
268
269     """
270     env = self.env.copy()
271     env["GANETI_HOOKS_PHASE"] = phase
272     env["GANETI_HOOKS_PATH"] = hpath
273     if self.lu.sstore is not None:
274       env["GANETI_CLUSTER"] = self.lu.sstore.GetClusterName()
275       env["GANETI_MASTER"] = self.lu.sstore.GetMasterNode()
276
277     env = dict([(str(key), str(val)) for key, val in env.iteritems()])
278
279     return self.callfn(node_list, hpath, phase, env)
280
281   def RunPhase(self, phase):
282     """Run all the scripts for a phase.
283
284     This is the main function of the HookMaster.
285
286     Args:
287       phase: the hooks phase to run
288
289     Returns:
290       the result of the hooks multi-node rpc call
291
292     """
293     if not self.node_list[phase]:
294       # empty node list, we should not attempt to run this as either
295       # we're in the cluster init phase and the rpc client part can't
296       # even attempt to run, or this LU doesn't do hooks at all
297       return
298     hpath = self.lu.HPATH
299     results = self._RunWrapper(self.node_list[phase], hpath, phase)
300     if phase == constants.HOOKS_PHASE_PRE:
301       errs = []
302       if not results:
303         raise errors.HooksFailure("Communication failure")
304       for node_name in results:
305         res = results[node_name]
306         if res is False or not isinstance(res, list):
307           self.proc.LogWarning("Communication failure to node %s" % node_name)
308           continue
309         for script, hkr, output in res:
310           if hkr == constants.HKR_FAIL:
311             output = output.strip().encode("string_escape")
312             errs.append((node_name, script, output))
313       if errs:
314         raise errors.HooksAbort(errs)
315     return results
316
317   def RunConfigUpdate(self):
318     """Run the special configuration update hook
319
320     This is a special hook that runs only on the master after each
321     top-level LI if the configuration has been updated.
322
323     """
324     phase = constants.HOOKS_PHASE_POST
325     hpath = constants.HOOKS_NAME_CFGUPDATE
326     if self.lu.sstore is None:
327       raise errors.ProgrammerError("Null sstore on config update hook")
328     nodes = [self.lu.sstore.GetMasterNode()]
329     results = self._RunWrapper(nodes, hpath, phase)