Processor: remove ChainOpCode
[ganeti-local] / lib / mcpu.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26   - logical units, which know how to deal with their specific opcode only
27   - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31
32 from ganeti import opcodes
33 from ganeti import constants
34 from ganeti import errors
35 from ganeti import rpc
36 from ganeti import cmdlib
37 from ganeti import ssconf
38 from ganeti import logger
39 from ganeti import locking
40
41
42 class Processor(object):
43   """Object which runs OpCodes"""
44   DISPATCH_TABLE = {
45     # Cluster
46     opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
47     opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
48     opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
49     opcodes.OpDumpClusterConfig: cmdlib.LUDumpClusterConfig,
50     opcodes.OpRenameCluster: cmdlib.LURenameCluster,
51     opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
52     opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
53     # node lu
54     opcodes.OpAddNode: cmdlib.LUAddNode,
55     opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
56     opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
57     opcodes.OpRemoveNode: cmdlib.LURemoveNode,
58     # instance lu
59     opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
60     opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
61     opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
62     opcodes.OpRenameInstance: cmdlib.LURenameInstance,
63     opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
64     opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
65     opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
66     opcodes.OpRebootInstance: cmdlib.LURebootInstance,
67     opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
68     opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
69     opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
70     opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
71     opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
72     opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
73     opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
74     opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
75     # os lu
76     opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
77     # exports lu
78     opcodes.OpQueryExports: cmdlib.LUQueryExports,
79     opcodes.OpExportInstance: cmdlib.LUExportInstance,
80     opcodes.OpRemoveExport: cmdlib.LURemoveExport,
81     # tags lu
82     opcodes.OpGetTags: cmdlib.LUGetTags,
83     opcodes.OpSearchTags: cmdlib.LUSearchTags,
84     opcodes.OpAddTags: cmdlib.LUAddTags,
85     opcodes.OpDelTags: cmdlib.LUDelTags,
86     # test lu
87     opcodes.OpTestDelay: cmdlib.LUTestDelay,
88     opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
89     }
90
91   def __init__(self, context):
92     """Constructor for Processor
93
94     Args:
95      - feedback_fn: the feedback function (taking one string) to be run when
96                     interesting events are happening
97     """
98     self.context = context
99     self._feedback_fn = None
100     self.exclusive_BGL = False
101
102   def _ExecLU(self, lu):
103     """Logical Unit execution sequence.
104
105     """
106     write_count = self.context.cfg.write_count
107     lu.CheckPrereq()
108     hm = HooksMaster(rpc.call_hooks_runner, self, lu)
109     h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
110     lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
111                      self._feedback_fn, None)
112     try:
113       result = lu.Exec(self._feedback_fn)
114       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
115       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
116                                 self._feedback_fn, result)
117     finally:
118       # FIXME: This needs locks if not lu_class.REQ_BGL
119       if write_count != self.context.cfg.write_count:
120         hm.RunConfigUpdate()
121
122     return result
123
124   def _LockAndExecLU(self, lu, level):
125     """Execute a Logical Unit, with the needed locks.
126
127     This is a recursive function that starts locking the given level, and
128     proceeds up, till there are no more locks to acquire. Then it executes the
129     given LU and its opcodes.
130
131     """
132     if level not in locking.LEVELS:
133       result = self._ExecLU(lu)
134     elif level in lu.needed_locks:
135       # This gives a chance to LUs to make last-minute changes after acquiring
136       # locks at any preceding level.
137       lu.DeclareLocks(level)
138       needed_locks = lu.needed_locks[level]
139       share = lu.share_locks[level]
140       # This is always safe to do, as we can't acquire more/less locks than
141       # what was requested.
142       lu.acquired_locks[level] = self.context.glm.acquire(level,
143                                                           needed_locks,
144                                                           shared=share)
145       try:
146         result = self._LockAndExecLU(lu, level + 1)
147       finally:
148         # We need to release the current level if we acquired any lock, or if
149         # we acquired the set-lock (needed_locks is None)
150         if lu.needed_locks[level] is None or lu.acquired_locks[level]:
151           self.context.glm.release(level)
152     else:
153       result = self._LockAndExecLU(lu, level + 1)
154
155     return result
156
157   def ExecOpCode(self, op, feedback_fn):
158     """Execute an opcode.
159
160     Args:
161       op: the opcode to be executed
162
163     """
164     if not isinstance(op, opcodes.OpCode):
165       raise errors.ProgrammerError("Non-opcode instance passed"
166                                    " to ExecOpcode")
167
168     self._feedback_fn = feedback_fn
169     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
170     if lu_class is None:
171       raise errors.OpCodeUnknown("Unknown opcode")
172
173     if lu_class.REQ_WSSTORE:
174       sstore = ssconf.WritableSimpleStore()
175     else:
176       sstore = ssconf.SimpleStore()
177
178     # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
179     # shared fashion otherwise (to prevent concurrent run with an exclusive LU.
180     self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
181                              shared=not lu_class.REQ_BGL)
182     try:
183       self.exclusive_BGL = lu_class.REQ_BGL
184       lu = lu_class(self, op, self.context, sstore)
185       lu.ExpandNames()
186       assert lu.needed_locks is not None, "needed_locks not set by LU"
187       result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
188     finally:
189       self.context.glm.release(locking.LEVEL_CLUSTER)
190       self.exclusive_BGL = False
191
192     return result
193
194   def LogStep(self, current, total, message):
195     """Log a change in LU execution progress.
196
197     """
198     logger.Debug("Step %d/%d %s" % (current, total, message))
199     self._feedback_fn("STEP %d/%d %s" % (current, total, message))
200
201   def LogWarning(self, message, hint=None):
202     """Log a warning to the logs and the user.
203
204     """
205     logger.Error(message)
206     self._feedback_fn(" - WARNING: %s" % message)
207     if hint:
208       self._feedback_fn("      Hint: %s" % hint)
209
210   def LogInfo(self, message):
211     """Log an informational message to the logs and the user.
212
213     """
214     logger.Info(message)
215     self._feedback_fn(" - INFO: %s" % message)
216
217
218 class HooksMaster(object):
219   """Hooks master.
220
221   This class distributes the run commands to the nodes based on the
222   specific LU class.
223
224   In order to remove the direct dependency on the rpc module, the
225   constructor needs a function which actually does the remote
226   call. This will usually be rpc.call_hooks_runner, but any function
227   which behaves the same works.
228
229   """
230   def __init__(self, callfn, proc, lu):
231     self.callfn = callfn
232     self.proc = proc
233     self.lu = lu
234     self.op = lu.op
235     self.env, node_list_pre, node_list_post = self._BuildEnv()
236     self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
237                       constants.HOOKS_PHASE_POST: node_list_post}
238
239   def _BuildEnv(self):
240     """Compute the environment and the target nodes.
241
242     Based on the opcode and the current node list, this builds the
243     environment for the hooks and the target node list for the run.
244
245     """
246     env = {
247       "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
248       "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
249       "GANETI_OP_CODE": self.op.OP_ID,
250       "GANETI_OBJECT_TYPE": self.lu.HTYPE,
251       "GANETI_DATA_DIR": constants.DATA_DIR,
252       }
253
254     if self.lu.HPATH is not None:
255       lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
256       if lu_env:
257         for key in lu_env:
258           env["GANETI_" + key] = lu_env[key]
259     else:
260       lu_nodes_pre = lu_nodes_post = []
261
262     return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
263
264   def _RunWrapper(self, node_list, hpath, phase):
265     """Simple wrapper over self.callfn.
266
267     This method fixes the environment before doing the rpc call.
268
269     """
270     env = self.env.copy()
271     env["GANETI_HOOKS_PHASE"] = phase
272     env["GANETI_HOOKS_PATH"] = hpath
273     if self.lu.sstore is not None:
274       env["GANETI_CLUSTER"] = self.lu.sstore.GetClusterName()
275       env["GANETI_MASTER"] = self.lu.sstore.GetMasterNode()
276
277     env = dict([(str(key), str(val)) for key, val in env.iteritems()])
278
279     return self.callfn(node_list, hpath, phase, env)
280
281   def RunPhase(self, phase):
282     """Run all the scripts for a phase.
283
284     This is the main function of the HookMaster.
285
286     Args:
287       phase: the hooks phase to run
288
289     Returns:
290       the result of the hooks multi-node rpc call
291
292     """
293     if not self.node_list[phase]:
294       # empty node list, we should not attempt to run this as either
295       # we're in the cluster init phase and the rpc client part can't
296       # even attempt to run, or this LU doesn't do hooks at all
297       return
298     hpath = self.lu.HPATH
299     results = self._RunWrapper(self.node_list[phase], hpath, phase)
300     if phase == constants.HOOKS_PHASE_PRE:
301       errs = []
302       if not results:
303         raise errors.HooksFailure("Communication failure")
304       for node_name in results:
305         res = results[node_name]
306         if res is False or not isinstance(res, list):
307           self.proc.LogWarning("Communication failure to node %s" % node_name)
308           continue
309         for script, hkr, output in res:
310           if hkr == constants.HKR_FAIL:
311             output = output.strip().encode("string_escape")
312             errs.append((node_name, script, output))
313       if errs:
314         raise errors.HooksAbort(errs)
315     return results
316
317   def RunConfigUpdate(self):
318     """Run the special configuration update hook
319
320     This is a special hook that runs only on the master after each
321     top-level LI if the configuration has been updated.
322
323     """
324     phase = constants.HOOKS_PHASE_POST
325     hpath = constants.HOOKS_NAME_CFGUPDATE
326     if self.lu.sstore is None:
327       raise errors.ProgrammerError("Null sstore on config update hook")
328     nodes = [self.lu.sstore.GetMasterNode()]
329     results = self._RunWrapper(nodes, hpath, phase)