Move InitCluster opcode into a single function
[ganeti-local] / lib / mcpu.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26   - logical units, which know how to deal with their specific opcode only
27   - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31
32 from ganeti import opcodes
33 from ganeti import constants
34 from ganeti import errors
35 from ganeti import rpc
36 from ganeti import cmdlib
37 from ganeti import config
38 from ganeti import ssconf
39 from ganeti import logger
40
41
42 class Processor(object):
43   """Object which runs OpCodes"""
44   DISPATCH_TABLE = {
45     # Cluster
46     opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
47     opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
48     opcodes.OpClusterCopyFile: cmdlib.LUClusterCopyFile,
49     opcodes.OpRunClusterCommand: cmdlib.LURunClusterCommand,
50     opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
51     opcodes.OpMasterFailover: cmdlib.LUMasterFailover,
52     opcodes.OpDumpClusterConfig: cmdlib.LUDumpClusterConfig,
53     opcodes.OpRenameCluster: cmdlib.LURenameCluster,
54     opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
55     opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
56     # node lu
57     opcodes.OpAddNode: cmdlib.LUAddNode,
58     opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
59     opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
60     opcodes.OpRemoveNode: cmdlib.LURemoveNode,
61     # instance lu
62     opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
63     opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
64     opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
65     opcodes.OpRenameInstance: cmdlib.LURenameInstance,
66     opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
67     opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
68     opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
69     opcodes.OpRebootInstance: cmdlib.LURebootInstance,
70     opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
71     opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
72     opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
73     opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
74     opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
75     opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
76     opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
77     # os lu
78     opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
79     # exports lu
80     opcodes.OpQueryExports: cmdlib.LUQueryExports,
81     opcodes.OpExportInstance: cmdlib.LUExportInstance,
82     opcodes.OpRemoveExport: cmdlib.LURemoveExport,
83     # tags lu
84     opcodes.OpGetTags: cmdlib.LUGetTags,
85     opcodes.OpSearchTags: cmdlib.LUSearchTags,
86     opcodes.OpAddTags: cmdlib.LUAddTags,
87     opcodes.OpDelTags: cmdlib.LUDelTags,
88     # test lu
89     opcodes.OpTestDelay: cmdlib.LUTestDelay,
90     opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
91     }
92
93   def __init__(self, feedback=None):
94     """Constructor for Processor
95
96     Args:
97      - feedback_fn: the feedback function (taking one string) to be run when
98                     interesting events are happening
99     """
100     self.cfg = None
101     self.sstore = None
102     self._feedback_fn = feedback
103
104   def ExecOpCode(self, op):
105     """Execute an opcode.
106
107     Args:
108       op: the opcode to be executed
109
110     """
111     if not isinstance(op, opcodes.OpCode):
112       raise errors.ProgrammerError("Non-opcode instance passed"
113                                    " to ExecOpcode")
114
115     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
116     if lu_class is None:
117       raise errors.OpCodeUnknown("Unknown opcode")
118
119     if self.cfg is None:
120       self.cfg = config.ConfigWriter()
121       self.sstore = ssconf.SimpleStore()
122     if self.cfg is not None:
123       write_count = self.cfg.write_count
124     else:
125       write_count = 0
126     lu = lu_class(self, op, self.cfg, self.sstore)
127     lu.CheckPrereq()
128     hm = HooksMaster(rpc.call_hooks_runner, self, lu)
129     h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
130     lu.HooksCallBack(constants.HOOKS_PHASE_PRE,
131                      h_results, self._feedback_fn, None)
132     try:
133       result = lu.Exec(self._feedback_fn)
134       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
135       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST,
136                        h_results, self._feedback_fn, result)
137     finally:
138       if lu.cfg is not None:
139         # we use lu.cfg and not self.cfg as for init cluster, self.cfg
140         # is None but lu.cfg has been recently initialized in the
141         # lu.Exec method
142         if write_count != lu.cfg.write_count:
143           hm.RunConfigUpdate()
144
145     return result
146
147   def ChainOpCode(self, op):
148     """Chain and execute an opcode.
149
150     This is used by LUs when they need to execute a child LU.
151
152     Args:
153      - opcode: the opcode to be executed
154
155     """
156     if not isinstance(op, opcodes.OpCode):
157       raise errors.ProgrammerError("Non-opcode instance passed"
158                                    " to ExecOpcode")
159
160     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
161     if lu_class is None:
162       raise errors.OpCodeUnknown("Unknown opcode")
163
164     if self.cfg is None:
165       self.cfg = config.ConfigWriter()
166       self.sstore = ssconf.SimpleStore()
167     #do_hooks = lu_class.HPATH is not None
168     lu = lu_class(self, op, self.cfg, self.sstore)
169     lu.CheckPrereq()
170     #if do_hooks:
171     #  hm = HooksMaster(rpc.call_hooks_runner, self, lu)
172     #  h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
173     #  lu.HooksCallBack(constants.HOOKS_PHASE_PRE,
174     #                   h_results, self._feedback_fn, None)
175     result = lu.Exec(self._feedback_fn)
176     #if do_hooks:
177     #  h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
178     #  result = lu.HooksCallBack(constants.HOOKS_PHASE_POST,
179     #                   h_results, self._feedback_fn, result)
180     return result
181
182   def LogStep(self, current, total, message):
183     """Log a change in LU execution progress.
184
185     """
186     logger.Debug("Step %d/%d %s" % (current, total, message))
187     self._feedback_fn("STEP %d/%d %s" % (current, total, message))
188
189   def LogWarning(self, message, hint=None):
190     """Log a warning to the logs and the user.
191
192     """
193     logger.Error(message)
194     self._feedback_fn(" - WARNING: %s" % message)
195     if hint:
196       self._feedback_fn("      Hint: %s" % hint)
197
198   def LogInfo(self, message):
199     """Log an informational message to the logs and the user.
200
201     """
202     logger.Info(message)
203     self._feedback_fn(" - INFO: %s" % message)
204
205
206 class HooksMaster(object):
207   """Hooks master.
208
209   This class distributes the run commands to the nodes based on the
210   specific LU class.
211
212   In order to remove the direct dependency on the rpc module, the
213   constructor needs a function which actually does the remote
214   call. This will usually be rpc.call_hooks_runner, but any function
215   which behaves the same works.
216
217   """
218   def __init__(self, callfn, proc, lu):
219     self.callfn = callfn
220     self.proc = proc
221     self.lu = lu
222     self.op = lu.op
223     self.env, node_list_pre, node_list_post = self._BuildEnv()
224     self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
225                       constants.HOOKS_PHASE_POST: node_list_post}
226
227   def _BuildEnv(self):
228     """Compute the environment and the target nodes.
229
230     Based on the opcode and the current node list, this builds the
231     environment for the hooks and the target node list for the run.
232
233     """
234     env = {
235       "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
236       "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
237       "GANETI_OP_CODE": self.op.OP_ID,
238       "GANETI_OBJECT_TYPE": self.lu.HTYPE,
239       "GANETI_DATA_DIR": constants.DATA_DIR,
240       }
241
242     if self.lu.HPATH is not None:
243       lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
244       if lu_env:
245         for key in lu_env:
246           env["GANETI_" + key] = lu_env[key]
247     else:
248       lu_nodes_pre = lu_nodes_post = []
249
250     return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
251
252   def _RunWrapper(self, node_list, hpath, phase):
253     """Simple wrapper over self.callfn.
254
255     This method fixes the environment before doing the rpc call.
256
257     """
258     env = self.env.copy()
259     env["GANETI_HOOKS_PHASE"] = phase
260     env["GANETI_HOOKS_PATH"] = hpath
261     if self.lu.sstore is not None:
262       env["GANETI_CLUSTER"] = self.lu.sstore.GetClusterName()
263       env["GANETI_MASTER"] = self.lu.sstore.GetMasterNode()
264
265     env = dict([(str(key), str(val)) for key, val in env.iteritems()])
266
267     return self.callfn(node_list, hpath, phase, env)
268
269   def RunPhase(self, phase):
270     """Run all the scripts for a phase.
271
272     This is the main function of the HookMaster.
273
274     Args:
275       phase: the hooks phase to run
276
277     Returns:
278       the result of the hooks multi-node rpc call
279
280     """
281     if not self.node_list[phase]:
282       # empty node list, we should not attempt to run this as either
283       # we're in the cluster init phase and the rpc client part can't
284       # even attempt to run, or this LU doesn't do hooks at all
285       return
286     hpath = self.lu.HPATH
287     results = self._RunWrapper(self.node_list[phase], hpath, phase)
288     if phase == constants.HOOKS_PHASE_PRE:
289       errs = []
290       if not results:
291         raise errors.HooksFailure("Communication failure")
292       for node_name in results:
293         res = results[node_name]
294         if res is False or not isinstance(res, list):
295           self.proc.LogWarning("Communication failure to node %s" % node_name)
296           continue
297         for script, hkr, output in res:
298           if hkr == constants.HKR_FAIL:
299             output = output.strip().encode("string_escape")
300             errs.append((node_name, script, output))
301       if errs:
302         raise errors.HooksAbort(errs)
303     return results
304
305   def RunConfigUpdate(self):
306     """Run the special configuration update hook
307
308     This is a special hook that runs only on the master after each
309     top-level LI if the configuration has been updated.
310
311     """
312     phase = constants.HOOKS_PHASE_POST
313     hpath = constants.HOOKS_NAME_CFGUPDATE
314     if self.lu.sstore is None:
315       raise errors.ProgrammerError("Null sstore on config update hook")
316     nodes = [self.lu.sstore.GetMasterNode()]
317     results = self._RunWrapper(nodes, hpath, phase)