Add a LU Hooks notification function
[ganeti-local] / lib / mcpu.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26   - logical units, which know how to deal with their specific opcode only
27   - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31
32 from ganeti import opcodes
33 from ganeti import constants
34 from ganeti import errors
35 from ganeti import rpc
36 from ganeti import cmdlib
37 from ganeti import config
38 from ganeti import ssconf
39 from ganeti import logger
40
41
42 class Processor(object):
43   """Object which runs OpCodes"""
44   DISPATCH_TABLE = {
45     # Cluster
46     opcodes.OpInitCluster: cmdlib.LUInitCluster,
47     opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
48     opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
49     opcodes.OpClusterCopyFile: cmdlib.LUClusterCopyFile,
50     opcodes.OpRunClusterCommand: cmdlib.LURunClusterCommand,
51     opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
52     opcodes.OpMasterFailover: cmdlib.LUMasterFailover,
53     opcodes.OpDumpClusterConfig: cmdlib.LUDumpClusterConfig,
54     opcodes.OpRenameCluster: cmdlib.LURenameCluster,
55     opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
56     opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
57     # node lu
58     opcodes.OpAddNode: cmdlib.LUAddNode,
59     opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
60     opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
61     opcodes.OpRemoveNode: cmdlib.LURemoveNode,
62     # instance lu
63     opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
64     opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
65     opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
66     opcodes.OpRenameInstance: cmdlib.LURenameInstance,
67     opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
68     opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
69     opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
70     opcodes.OpRebootInstance: cmdlib.LURebootInstance,
71     opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
72     opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
73     opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
74     opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
75     opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
76     opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
77     opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
78     # os lu
79     opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
80     # exports lu
81     opcodes.OpQueryExports: cmdlib.LUQueryExports,
82     opcodes.OpExportInstance: cmdlib.LUExportInstance,
83     opcodes.OpRemoveExport: cmdlib.LURemoveExport,
84     # tags lu
85     opcodes.OpGetTags: cmdlib.LUGetTags,
86     opcodes.OpSearchTags: cmdlib.LUSearchTags,
87     opcodes.OpAddTags: cmdlib.LUAddTags,
88     opcodes.OpDelTags: cmdlib.LUDelTags,
89     # test lu
90     opcodes.OpTestDelay: cmdlib.LUTestDelay,
91     opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
92     }
93
94   def __init__(self, feedback=None):
95     """Constructor for Processor
96
97     Args:
98      - feedback_fn: the feedback function (taking one string) to be run when
99                     interesting events are happening
100     """
101     self.cfg = None
102     self.sstore = None
103     self._feedback_fn = feedback
104
105   def ExecOpCode(self, op):
106     """Execute an opcode.
107
108     Args:
109       op: the opcode to be executed
110
111     """
112     if not isinstance(op, opcodes.OpCode):
113       raise errors.ProgrammerError("Non-opcode instance passed"
114                                    " to ExecOpcode")
115
116     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
117     if lu_class is None:
118       raise errors.OpCodeUnknown("Unknown opcode")
119
120     if lu_class.REQ_CLUSTER and self.cfg is None:
121       self.cfg = config.ConfigWriter()
122       self.sstore = ssconf.SimpleStore()
123     if self.cfg is not None:
124       write_count = self.cfg.write_count
125     else:
126       write_count = 0
127     lu = lu_class(self, op, self.cfg, self.sstore)
128     lu.CheckPrereq()
129     hm = HooksMaster(rpc.call_hooks_runner, self, lu)
130     h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
131     lu.HooksCallBack(constants.HOOKS_PHASE_PRE,
132                      h_results, self._feedback_fn, None)
133     try:
134       result = lu.Exec(self._feedback_fn)
135       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
136       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST,
137                        h_results, self._feedback_fn, result)
138     finally:
139       if lu.cfg is not None:
140         # we use lu.cfg and not self.cfg as for init cluster, self.cfg
141         # is None but lu.cfg has been recently initialized in the
142         # lu.Exec method
143         if write_count != lu.cfg.write_count:
144           hm.RunConfigUpdate()
145
146     return result
147
148   def ChainOpCode(self, op):
149     """Chain and execute an opcode.
150
151     This is used by LUs when they need to execute a child LU.
152
153     Args:
154      - opcode: the opcode to be executed
155
156     """
157     if not isinstance(op, opcodes.OpCode):
158       raise errors.ProgrammerError("Non-opcode instance passed"
159                                    " to ExecOpcode")
160
161     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
162     if lu_class is None:
163       raise errors.OpCodeUnknown("Unknown opcode")
164
165     if lu_class.REQ_CLUSTER and self.cfg is None:
166       self.cfg = config.ConfigWriter()
167       self.sstore = ssconf.SimpleStore()
168     #do_hooks = lu_class.HPATH is not None
169     lu = lu_class(self, op, self.cfg, self.sstore)
170     lu.CheckPrereq()
171     #if do_hooks:
172     #  hm = HooksMaster(rpc.call_hooks_runner, self, lu)
173     #  h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
174     #  lu.HooksCallBack(constants.HOOKS_PHASE_PRE,
175     #                   h_results, self._feedback_fn, None)
176     result = lu.Exec(self._feedback_fn)
177     #if do_hooks:
178     #  h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
179     #  result = lu.HooksCallBack(constants.HOOKS_PHASE_POST,
180     #                   h_results, self._feedback_fn, result)
181     return result
182
183   def LogStep(self, current, total, message):
184     """Log a change in LU execution progress.
185
186     """
187     logger.Debug("Step %d/%d %s" % (current, total, message))
188     self._feedback_fn("STEP %d/%d %s" % (current, total, message))
189
190   def LogWarning(self, message, hint=None):
191     """Log a warning to the logs and the user.
192
193     """
194     logger.Error(message)
195     self._feedback_fn(" - WARNING: %s" % message)
196     if hint:
197       self._feedback_fn("      Hint: %s" % hint)
198
199   def LogInfo(self, message):
200     """Log an informational message to the logs and the user.
201
202     """
203     logger.Info(message)
204     self._feedback_fn(" - INFO: %s" % message)
205
206
207 class HooksMaster(object):
208   """Hooks master.
209
210   This class distributes the run commands to the nodes based on the
211   specific LU class.
212
213   In order to remove the direct dependency on the rpc module, the
214   constructor needs a function which actually does the remote
215   call. This will usually be rpc.call_hooks_runner, but any function
216   which behaves the same works.
217
218   """
219   def __init__(self, callfn, proc, lu):
220     self.callfn = callfn
221     self.proc = proc
222     self.lu = lu
223     self.op = lu.op
224     self.env, node_list_pre, node_list_post = self._BuildEnv()
225     self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
226                       constants.HOOKS_PHASE_POST: node_list_post}
227
228   def _BuildEnv(self):
229     """Compute the environment and the target nodes.
230
231     Based on the opcode and the current node list, this builds the
232     environment for the hooks and the target node list for the run.
233
234     """
235     env = {
236       "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
237       "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
238       "GANETI_OP_CODE": self.op.OP_ID,
239       "GANETI_OBJECT_TYPE": self.lu.HTYPE,
240       "GANETI_DATA_DIR": constants.DATA_DIR,
241       }
242
243     if self.lu.HPATH is not None:
244       lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
245       if lu_env:
246         for key in lu_env:
247           env["GANETI_" + key] = lu_env[key]
248     else:
249       lu_nodes_pre = lu_nodes_post = []
250
251     return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
252
253   def _RunWrapper(self, node_list, hpath, phase):
254     """Simple wrapper over self.callfn.
255
256     This method fixes the environment before doing the rpc call.
257
258     """
259     env = self.env.copy()
260     env["GANETI_HOOKS_PHASE"] = phase
261     env["GANETI_HOOKS_PATH"] = hpath
262     if self.lu.sstore is not None:
263       env["GANETI_CLUSTER"] = self.lu.sstore.GetClusterName()
264       env["GANETI_MASTER"] = self.lu.sstore.GetMasterNode()
265
266     env = dict([(str(key), str(val)) for key, val in env.iteritems()])
267
268     return self.callfn(node_list, hpath, phase, env)
269
270   def RunPhase(self, phase):
271     """Run all the scripts for a phase.
272
273     This is the main function of the HookMaster.
274
275     Args:
276       phase: the hooks phase to run
277
278     Returns:
279       the result of the hooks multi-node rpc call
280
281     """
282     if not self.node_list[phase]:
283       # empty node list, we should not attempt to run this as either
284       # we're in the cluster init phase and the rpc client part can't
285       # even attempt to run, or this LU doesn't do hooks at all
286       return
287     hpath = self.lu.HPATH
288     results = self._RunWrapper(self.node_list[phase], hpath, phase)
289     if phase == constants.HOOKS_PHASE_PRE:
290       errs = []
291       if not results:
292         raise errors.HooksFailure("Communication failure")
293       for node_name in results:
294         res = results[node_name]
295         if res is False or not isinstance(res, list):
296           self.proc.LogWarning("Communication failure to node %s" % node_name)
297           continue
298         for script, hkr, output in res:
299           if hkr == constants.HKR_FAIL:
300             output = output.strip().encode("string_escape")
301             errs.append((node_name, script, output))
302       if errs:
303         raise errors.HooksAbort(errs)
304     return results
305
306   def RunConfigUpdate(self):
307     """Run the special configuration update hook
308
309     This is a special hook that runs only on the master after each
310     top-level LI if the configuration has been updated.
311
312     """
313     phase = constants.HOOKS_PHASE_POST
314     hpath = constants.HOOKS_NAME_CFGUPDATE
315     if self.lu.sstore is None:
316       raise errors.ProgrammerError("Null sstore on config update hook")
317     nodes = [self.lu.sstore.GetMasterNode()]
318     results = self._RunWrapper(nodes, hpath, phase)