Add a test opcode that sleeps for a given duration
[ganeti-local] / lib / mcpu.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26   - logical units, which know how to deal with their specific opcode only
27   - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31
32 from ganeti import opcodes
33 from ganeti import constants
34 from ganeti import errors
35 from ganeti import rpc
36 from ganeti import cmdlib
37 from ganeti import config
38 from ganeti import ssconf
39 from ganeti import logger
40
41 class Processor(object):
42   """Object which runs OpCodes"""
43   DISPATCH_TABLE = {
44     # Cluster
45     opcodes.OpInitCluster: cmdlib.LUInitCluster,
46     opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
47     opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
48     opcodes.OpClusterCopyFile: cmdlib.LUClusterCopyFile,
49     opcodes.OpRunClusterCommand: cmdlib.LURunClusterCommand,
50     opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
51     opcodes.OpMasterFailover: cmdlib.LUMasterFailover,
52     opcodes.OpDumpClusterConfig: cmdlib.LUDumpClusterConfig,
53     opcodes.OpRenameCluster: cmdlib.LURenameCluster,
54     opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
55     # node lu
56     opcodes.OpAddNode: cmdlib.LUAddNode,
57     opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
58     opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
59     opcodes.OpRemoveNode: cmdlib.LURemoveNode,
60     # instance lu
61     opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
62     opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
63     opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
64     opcodes.OpRenameInstance: cmdlib.LURenameInstance,
65     opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
66     opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
67     opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
68     opcodes.OpRebootInstance: cmdlib.LURebootInstance,
69     opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
70     opcodes.OpAddMDDRBDComponent: cmdlib.LUAddMDDRBDComponent,
71     opcodes.OpRemoveMDDRBDComponent: cmdlib.LURemoveMDDRBDComponent,
72     opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
73     opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
74     opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
75     opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
76     opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
77     opcodes.OpSetInstanceParms: cmdlib.LUSetInstanceParms,
78     # os lu
79     opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
80     # exports lu
81     opcodes.OpQueryExports: cmdlib.LUQueryExports,
82     opcodes.OpExportInstance: cmdlib.LUExportInstance,
83     # tags lu
84     opcodes.OpGetTags: cmdlib.LUGetTags,
85     opcodes.OpSearchTags: cmdlib.LUSearchTags,
86     opcodes.OpAddTags: cmdlib.LUAddTags,
87     opcodes.OpDelTags: cmdlib.LUDelTags,
88     # test lu
89     opcodes.OpTestDelay: cmdlib.LUTestDelay,
90     }
91
92   def __init__(self, feedback=None):
93     """Constructor for Processor
94
95     Args:
96      - feedback_fn: the feedback function (taking one string) to be run when
97                     interesting events are happening
98     """
99     self.cfg = None
100     self.sstore = None
101     self._feedback_fn = feedback
102
103   def ExecOpCode(self, op):
104     """Execute an opcode.
105
106     Args:
107      - cfg: the configuration in which we execute this opcode
108      - opcode: the opcode to be executed
109
110     """
111     if not isinstance(op, opcodes.OpCode):
112       raise errors.ProgrammerError("Non-opcode instance passed"
113                                    " to ExecOpcode")
114
115     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
116     if lu_class is None:
117       raise errors.OpCodeUnknown("Unknown opcode")
118
119     if lu_class.REQ_CLUSTER and self.cfg is None:
120       self.cfg = config.ConfigWriter()
121       self.sstore = ssconf.SimpleStore()
122     if self.cfg is not None:
123       write_count = self.cfg.write_count
124     else:
125       write_count = 0
126     lu = lu_class(self, op, self.cfg, self.sstore)
127     lu.CheckPrereq()
128     hm = HooksMaster(rpc.call_hooks_runner, self, lu)
129     hm.RunPhase(constants.HOOKS_PHASE_PRE)
130     result = lu.Exec(self._feedback_fn)
131     hm.RunPhase(constants.HOOKS_PHASE_POST)
132     if lu.cfg is not None:
133       # we use lu.cfg and not self.cfg as for init cluster, self.cfg
134       # is None but lu.cfg has been recently initialized in the
135       # lu.Exec method
136       if write_count != lu.cfg.write_count:
137         hm.RunConfigUpdate()
138
139     return result
140
141   def ChainOpCode(self, op):
142     """Chain and execute an opcode.
143
144     This is used by LUs when they need to execute a child LU.
145
146     Args:
147      - opcode: the opcode to be executed
148
149     """
150     if not isinstance(op, opcodes.OpCode):
151       raise errors.ProgrammerError("Non-opcode instance passed"
152                                    " to ExecOpcode")
153
154     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
155     if lu_class is None:
156       raise errors.OpCodeUnknown("Unknown opcode")
157
158     if lu_class.REQ_CLUSTER and self.cfg is None:
159       self.cfg = config.ConfigWriter()
160       self.sstore = ssconf.SimpleStore()
161     #do_hooks = lu_class.HPATH is not None
162     lu = lu_class(self, op, self.cfg, self.sstore)
163     lu.CheckPrereq()
164     #if do_hooks:
165     #  hm = HooksMaster(rpc.call_hooks_runner, self, lu)
166     #  hm.RunPhase(constants.HOOKS_PHASE_PRE)
167     result = lu.Exec(self._feedback_fn)
168     #if do_hooks:
169     #  hm.RunPhase(constants.HOOKS_PHASE_POST)
170     return result
171
172   def LogStep(self, current, total, message):
173     """Log a change in LU execution progress.
174
175     """
176     logger.Debug("Step %d/%d %s" % (current, total, message))
177     self._feedback_fn("STEP %d/%d %s" % (current, total, message))
178
179   def LogWarning(self, message, hint=None):
180     """Log a warning to the logs and the user.
181
182     """
183     logger.Error(message)
184     self._feedback_fn(" - WARNING: %s" % message)
185     if hint:
186       self._feedback_fn("      Hint: %s" % hint)
187
188   def LogInfo(self, message):
189     """Log an informational message to the logs and the user.
190
191     """
192     logger.Info(message)
193     self._feedback_fn(" - INFO: %s" % message)
194
195
196 class HooksMaster(object):
197   """Hooks master.
198
199   This class distributes the run commands to the nodes based on the
200   specific LU class.
201
202   In order to remove the direct dependency on the rpc module, the
203   constructor needs a function which actually does the remote
204   call. This will usually be rpc.call_hooks_runner, but any function
205   which behaves the same works.
206
207   """
208   def __init__(self, callfn, proc, lu):
209     self.callfn = callfn
210     self.proc = proc
211     self.lu = lu
212     self.op = lu.op
213     self.env, node_list_pre, node_list_post = self._BuildEnv()
214     self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
215                       constants.HOOKS_PHASE_POST: node_list_post}
216
217   def _BuildEnv(self):
218     """Compute the environment and the target nodes.
219
220     Based on the opcode and the current node list, this builds the
221     environment for the hooks and the target node list for the run.
222
223     """
224     env = {
225       "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
226       "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
227       "GANETI_OP_CODE": self.op.OP_ID,
228       "GANETI_OBJECT_TYPE": self.lu.HTYPE,
229       "GANETI_DATA_DIR": constants.DATA_DIR,
230       }
231
232     if self.lu.HPATH is not None:
233       lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
234       if lu_env:
235         for key in lu_env:
236           env["GANETI_" + key] = lu_env[key]
237     else:
238       lu_nodes_pre = lu_nodes_post = []
239
240     return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
241
242   def _RunWrapper(self, node_list, hpath, phase):
243     """Simple wrapper over self.callfn.
244
245     This method fixes the environment before doing the rpc call.
246
247     """
248     env = self.env.copy()
249     env["GANETI_HOOKS_PHASE"] = phase
250     env["GANETI_HOOKS_PATH"] = hpath
251     if self.lu.sstore is not None:
252       env["GANETI_CLUSTER"] = self.lu.sstore.GetClusterName()
253       env["GANETI_MASTER"] = self.lu.sstore.GetMasterNode()
254
255     env = dict([(str(key), str(val)) for key, val in env.iteritems()])
256
257     return self.callfn(node_list, hpath, phase, env)
258
259   def RunPhase(self, phase):
260     """Run all the scripts for a phase.
261
262     This is the main function of the HookMaster.
263
264     """
265     if not self.node_list[phase]:
266       # empty node list, we should not attempt to run this as either
267       # we're in the cluster init phase and the rpc client part can't
268       # even attempt to run, or this LU doesn't do hooks at all
269       return
270     hpath = self.lu.HPATH
271     results = self._RunWrapper(self.node_list[phase], hpath, phase)
272     if phase == constants.HOOKS_PHASE_PRE:
273       errs = []
274       if not results:
275         raise errors.HooksFailure("Communication failure")
276       for node_name in results:
277         res = results[node_name]
278         if res is False or not isinstance(res, list):
279           self.proc.LogWarning("Communication failure to node %s" % node_name)
280           continue
281         for script, hkr, output in res:
282           if hkr == constants.HKR_FAIL:
283             output = output.strip().encode("string_escape")
284             errs.append((node_name, script, output))
285       if errs:
286         raise errors.HooksAbort(errs)
287
288   def RunConfigUpdate(self):
289     """Run the special configuration update hook
290
291     This is a special hook that runs only on the master after each
292     top-level LI if the configuration has been updated.
293
294     """
295     phase = constants.HOOKS_PHASE_POST
296     hpath = constants.HOOKS_NAME_CFGUPDATE
297     if self.lu.sstore is None:
298       raise errors.ProgrammerError("Null sstore on config update hook")
299     nodes = [self.lu.sstore.GetMasterNode()]
300     results = self._RunWrapper(nodes, hpath, phase)