Move SetKey to WritableSimpleStore and use it
[ganeti-local] / lib / mcpu.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26   - logical units, which know how to deal with their specific opcode only
27   - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31
32 from ganeti import opcodes
33 from ganeti import constants
34 from ganeti import errors
35 from ganeti import rpc
36 from ganeti import cmdlib
37 from ganeti import config
38 from ganeti import ssconf
39 from ganeti import logger
40
41
42 class Processor(object):
43   """Object which runs OpCodes"""
44   DISPATCH_TABLE = {
45     # Cluster
46     opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
47     opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
48     opcodes.OpClusterCopyFile: cmdlib.LUClusterCopyFile,
49     opcodes.OpRunClusterCommand: cmdlib.LURunClusterCommand,
50     opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
51     opcodes.OpMasterFailover: cmdlib.LUMasterFailover,
52     opcodes.OpDumpClusterConfig: cmdlib.LUDumpClusterConfig,
53     opcodes.OpRenameCluster: cmdlib.LURenameCluster,
54     opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
55     opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
56     # node lu
57     opcodes.OpAddNode: cmdlib.LUAddNode,
58     opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
59     opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
60     opcodes.OpRemoveNode: cmdlib.LURemoveNode,
61     # instance lu
62     opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
63     opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
64     opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
65     opcodes.OpRenameInstance: cmdlib.LURenameInstance,
66     opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
67     opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
68     opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
69     opcodes.OpRebootInstance: cmdlib.LURebootInstance,
70     opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
71     opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
72     opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
73     opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
74     opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
75     opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
76     opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
77     # os lu
78     opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
79     # exports lu
80     opcodes.OpQueryExports: cmdlib.LUQueryExports,
81     opcodes.OpExportInstance: cmdlib.LUExportInstance,
82     opcodes.OpRemoveExport: cmdlib.LURemoveExport,
83     # tags lu
84     opcodes.OpGetTags: cmdlib.LUGetTags,
85     opcodes.OpSearchTags: cmdlib.LUSearchTags,
86     opcodes.OpAddTags: cmdlib.LUAddTags,
87     opcodes.OpDelTags: cmdlib.LUDelTags,
88     # test lu
89     opcodes.OpTestDelay: cmdlib.LUTestDelay,
90     opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
91     }
92
93   def __init__(self, feedback=None):
94     """Constructor for Processor
95
96     Args:
97      - feedback_fn: the feedback function (taking one string) to be run when
98                     interesting events are happening
99     """
100     self.cfg = None
101     self.sstore = None
102     self._feedback_fn = feedback
103
104   def ExecOpCode(self, op):
105     """Execute an opcode.
106
107     Args:
108       op: the opcode to be executed
109
110     """
111     if not isinstance(op, opcodes.OpCode):
112       raise errors.ProgrammerError("Non-opcode instance passed"
113                                    " to ExecOpcode")
114
115     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
116     if lu_class is None:
117       raise errors.OpCodeUnknown("Unknown opcode")
118
119     if self.cfg is None:
120       self.cfg = config.ConfigWriter()
121       if lu_class.REQ_WSSTORE:
122         self.sstore = ssconf.WritableSimpleStore()
123       else:
124         self.sstore = ssconf.SimpleStore()
125     if self.cfg is not None:
126       write_count = self.cfg.write_count
127     else:
128       write_count = 0
129     lu = lu_class(self, op, self.cfg, self.sstore)
130     lu.CheckPrereq()
131     hm = HooksMaster(rpc.call_hooks_runner, self, lu)
132     h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
133     lu.HooksCallBack(constants.HOOKS_PHASE_PRE,
134                      h_results, self._feedback_fn, None)
135     try:
136       result = lu.Exec(self._feedback_fn)
137       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
138       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST,
139                        h_results, self._feedback_fn, result)
140     finally:
141       if lu.cfg is not None:
142         # we use lu.cfg and not self.cfg as for init cluster, self.cfg
143         # is None but lu.cfg has been recently initialized in the
144         # lu.Exec method
145         if write_count != lu.cfg.write_count:
146           hm.RunConfigUpdate()
147
148     return result
149
150   def ChainOpCode(self, op):
151     """Chain and execute an opcode.
152
153     This is used by LUs when they need to execute a child LU.
154
155     Args:
156      - opcode: the opcode to be executed
157
158     """
159     if not isinstance(op, opcodes.OpCode):
160       raise errors.ProgrammerError("Non-opcode instance passed"
161                                    " to ExecOpcode")
162
163     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
164     if lu_class is None:
165       raise errors.OpCodeUnknown("Unknown opcode")
166
167     if self.cfg is None:
168       self.cfg = config.ConfigWriter()
169       self.sstore = ssconf.SimpleStore()
170     #do_hooks = lu_class.HPATH is not None
171     lu = lu_class(self, op, self.cfg, self.sstore)
172     lu.CheckPrereq()
173     #if do_hooks:
174     #  hm = HooksMaster(rpc.call_hooks_runner, self, lu)
175     #  h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
176     #  lu.HooksCallBack(constants.HOOKS_PHASE_PRE,
177     #                   h_results, self._feedback_fn, None)
178     result = lu.Exec(self._feedback_fn)
179     #if do_hooks:
180     #  h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
181     #  result = lu.HooksCallBack(constants.HOOKS_PHASE_POST,
182     #                   h_results, self._feedback_fn, result)
183     return result
184
185   def LogStep(self, current, total, message):
186     """Log a change in LU execution progress.
187
188     """
189     logger.Debug("Step %d/%d %s" % (current, total, message))
190     self._feedback_fn("STEP %d/%d %s" % (current, total, message))
191
192   def LogWarning(self, message, hint=None):
193     """Log a warning to the logs and the user.
194
195     """
196     logger.Error(message)
197     self._feedback_fn(" - WARNING: %s" % message)
198     if hint:
199       self._feedback_fn("      Hint: %s" % hint)
200
201   def LogInfo(self, message):
202     """Log an informational message to the logs and the user.
203
204     """
205     logger.Info(message)
206     self._feedback_fn(" - INFO: %s" % message)
207
208
209 class HooksMaster(object):
210   """Hooks master.
211
212   This class distributes the run commands to the nodes based on the
213   specific LU class.
214
215   In order to remove the direct dependency on the rpc module, the
216   constructor needs a function which actually does the remote
217   call. This will usually be rpc.call_hooks_runner, but any function
218   which behaves the same works.
219
220   """
221   def __init__(self, callfn, proc, lu):
222     self.callfn = callfn
223     self.proc = proc
224     self.lu = lu
225     self.op = lu.op
226     self.env, node_list_pre, node_list_post = self._BuildEnv()
227     self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
228                       constants.HOOKS_PHASE_POST: node_list_post}
229
230   def _BuildEnv(self):
231     """Compute the environment and the target nodes.
232
233     Based on the opcode and the current node list, this builds the
234     environment for the hooks and the target node list for the run.
235
236     """
237     env = {
238       "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
239       "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
240       "GANETI_OP_CODE": self.op.OP_ID,
241       "GANETI_OBJECT_TYPE": self.lu.HTYPE,
242       "GANETI_DATA_DIR": constants.DATA_DIR,
243       }
244
245     if self.lu.HPATH is not None:
246       lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
247       if lu_env:
248         for key in lu_env:
249           env["GANETI_" + key] = lu_env[key]
250     else:
251       lu_nodes_pre = lu_nodes_post = []
252
253     return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
254
255   def _RunWrapper(self, node_list, hpath, phase):
256     """Simple wrapper over self.callfn.
257
258     This method fixes the environment before doing the rpc call.
259
260     """
261     env = self.env.copy()
262     env["GANETI_HOOKS_PHASE"] = phase
263     env["GANETI_HOOKS_PATH"] = hpath
264     if self.lu.sstore is not None:
265       env["GANETI_CLUSTER"] = self.lu.sstore.GetClusterName()
266       env["GANETI_MASTER"] = self.lu.sstore.GetMasterNode()
267
268     env = dict([(str(key), str(val)) for key, val in env.iteritems()])
269
270     return self.callfn(node_list, hpath, phase, env)
271
272   def RunPhase(self, phase):
273     """Run all the scripts for a phase.
274
275     This is the main function of the HookMaster.
276
277     Args:
278       phase: the hooks phase to run
279
280     Returns:
281       the result of the hooks multi-node rpc call
282
283     """
284     if not self.node_list[phase]:
285       # empty node list, we should not attempt to run this as either
286       # we're in the cluster init phase and the rpc client part can't
287       # even attempt to run, or this LU doesn't do hooks at all
288       return
289     hpath = self.lu.HPATH
290     results = self._RunWrapper(self.node_list[phase], hpath, phase)
291     if phase == constants.HOOKS_PHASE_PRE:
292       errs = []
293       if not results:
294         raise errors.HooksFailure("Communication failure")
295       for node_name in results:
296         res = results[node_name]
297         if res is False or not isinstance(res, list):
298           self.proc.LogWarning("Communication failure to node %s" % node_name)
299           continue
300         for script, hkr, output in res:
301           if hkr == constants.HKR_FAIL:
302             output = output.strip().encode("string_escape")
303             errs.append((node_name, script, output))
304       if errs:
305         raise errors.HooksAbort(errs)
306     return results
307
308   def RunConfigUpdate(self):
309     """Run the special configuration update hook
310
311     This is a special hook that runs only on the master after each
312     top-level LI if the configuration has been updated.
313
314     """
315     phase = constants.HOOKS_PHASE_POST
316     hpath = constants.HOOKS_NAME_CFGUPDATE
317     if self.lu.sstore is None:
318       raise errors.ProgrammerError("Null sstore on config update hook")
319     nodes = [self.lu.sstore.GetMasterNode()]
320     results = self._RunWrapper(nodes, hpath, phase)