Fix cli.PollJob
[ganeti-local] / lib / mcpu.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26   - logical units, which know how to deal with their specific opcode only
27   - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31
32 from ganeti import opcodes
33 from ganeti import constants
34 from ganeti import errors
35 from ganeti import rpc
36 from ganeti import cmdlib
37 from ganeti import ssconf
38 from ganeti import logger
39 from ganeti import locking
40
41
42 class Processor(object):
43   """Object which runs OpCodes"""
44   DISPATCH_TABLE = {
45     # Cluster
46     opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
47     opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
48     opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
49     opcodes.OpDumpClusterConfig: cmdlib.LUDumpClusterConfig,
50     opcodes.OpRenameCluster: cmdlib.LURenameCluster,
51     opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
52     opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
53     # node lu
54     opcodes.OpAddNode: cmdlib.LUAddNode,
55     opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
56     opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
57     opcodes.OpRemoveNode: cmdlib.LURemoveNode,
58     # instance lu
59     opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
60     opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
61     opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
62     opcodes.OpRenameInstance: cmdlib.LURenameInstance,
63     opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
64     opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
65     opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
66     opcodes.OpRebootInstance: cmdlib.LURebootInstance,
67     opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
68     opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
69     opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
70     opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
71     opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
72     opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
73     opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
74     opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
75     # os lu
76     opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
77     # exports lu
78     opcodes.OpQueryExports: cmdlib.LUQueryExports,
79     opcodes.OpExportInstance: cmdlib.LUExportInstance,
80     opcodes.OpRemoveExport: cmdlib.LURemoveExport,
81     # tags lu
82     opcodes.OpGetTags: cmdlib.LUGetTags,
83     opcodes.OpSearchTags: cmdlib.LUSearchTags,
84     opcodes.OpAddTags: cmdlib.LUAddTags,
85     opcodes.OpDelTags: cmdlib.LUDelTags,
86     # test lu
87     opcodes.OpTestDelay: cmdlib.LUTestDelay,
88     opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
89     }
90
91   def __init__(self, context):
92     """Constructor for Processor
93
94     Args:
95      - feedback_fn: the feedback function (taking one string) to be run when
96                     interesting events are happening
97     """
98     self.context = context
99     self._feedback_fn = None
100     self.exclusive_BGL = False
101
102   def _ExecLU(self, lu):
103     """Logical Unit execution sequence.
104
105     """
106     write_count = self.context.cfg.write_count
107     lu.CheckPrereq()
108     hm = HooksMaster(rpc.call_hooks_runner, self, lu)
109     h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
110     lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
111                      self._feedback_fn, None)
112     try:
113       result = lu.Exec(self._feedback_fn)
114       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
115       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
116                                 self._feedback_fn, result)
117     finally:
118       # FIXME: This needs locks if not lu_class.REQ_BGL
119       if write_count != self.context.cfg.write_count:
120         hm.RunConfigUpdate()
121
122     return result
123
124   def _LockAndExecLU(self, lu, level):
125     """Execute a Logical Unit, with the needed locks.
126
127     This is a recursive function that starts locking the given level, and
128     proceeds up, till there are no more locks to acquire. Then it executes the
129     given LU and its opcodes.
130
131     """
132     if level in lu.needed_locks:
133       # This gives a chance to LUs to make last-minute changes after acquiring
134       # locks at any preceding level.
135       lu.DeclareLocks(level)
136       needed_locks = lu.needed_locks[level]
137       share = lu.share_locks[level]
138       # This is always safe to do, as we can't acquire more/less locks than
139       # what was requested.
140       lu.needed_locks[level] = self.context.glm.acquire(level,
141                                                         needed_locks,
142                                                         shared=share)
143       try:
144         result = self._LockAndExecLU(lu, level + 1)
145       finally:
146         if lu.needed_locks[level]:
147           self.context.glm.release(level)
148     else:
149       result = self._ExecLU(lu)
150
151     return result
152
153   def ExecOpCode(self, op, feedback_fn):
154     """Execute an opcode.
155
156     Args:
157       op: the opcode to be executed
158
159     """
160     if not isinstance(op, opcodes.OpCode):
161       raise errors.ProgrammerError("Non-opcode instance passed"
162                                    " to ExecOpcode")
163
164     self._feedback_fn = feedback_fn
165     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
166     if lu_class is None:
167       raise errors.OpCodeUnknown("Unknown opcode")
168
169     if lu_class.REQ_WSSTORE:
170       sstore = ssconf.WritableSimpleStore()
171     else:
172       sstore = ssconf.SimpleStore()
173
174     # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
175     # shared fashion otherwise (to prevent concurrent run with an exclusive LU.
176     self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
177                              shared=not lu_class.REQ_BGL)
178     try:
179       self.exclusive_BGL = lu_class.REQ_BGL
180       lu = lu_class(self, op, self.context, sstore)
181       lu.ExpandNames()
182       assert lu.needed_locks is not None, "needed_locks not set by LU"
183       result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
184     finally:
185       self.context.glm.release(locking.LEVEL_CLUSTER)
186       self.exclusive_BGL = False
187
188     return result
189
190   def ChainOpCode(self, op):
191     """Chain and execute an opcode.
192
193     This is used by LUs when they need to execute a child LU.
194
195     Args:
196      - opcode: the opcode to be executed
197
198     """
199     if not isinstance(op, opcodes.OpCode):
200       raise errors.ProgrammerError("Non-opcode instance passed"
201                                    " to ExecOpcode")
202
203     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
204     if lu_class is None:
205       raise errors.OpCodeUnknown("Unknown opcode")
206
207     if lu_class.REQ_BGL and not self.exclusive_BGL:
208       raise errors.ProgrammerError("LUs which require the BGL cannot"
209                                    " be chained to granular ones.")
210
211     assert lu_class.REQ_BGL, "ChainOpCode is still BGL-only"
212
213     if lu_class.REQ_WSSTORE:
214       sstore = ssconf.WritableSimpleStore()
215     else:
216       sstore = ssconf.SimpleStore()
217
218     #do_hooks = lu_class.HPATH is not None
219     lu = lu_class(self, op, self.context, sstore)
220     lu.CheckPrereq()
221     #if do_hooks:
222     #  hm = HooksMaster(rpc.call_hooks_runner, self, lu)
223     #  h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
224     #  lu.HooksCallBack(constants.HOOKS_PHASE_PRE,
225     #                   h_results, self._feedback_fn, None)
226     result = lu.Exec(self._feedback_fn)
227     #if do_hooks:
228     #  h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
229     #  result = lu.HooksCallBack(constants.HOOKS_PHASE_POST,
230     #                   h_results, self._feedback_fn, result)
231     return result
232
233   def LogStep(self, current, total, message):
234     """Log a change in LU execution progress.
235
236     """
237     logger.Debug("Step %d/%d %s" % (current, total, message))
238     self._feedback_fn("STEP %d/%d %s" % (current, total, message))
239
240   def LogWarning(self, message, hint=None):
241     """Log a warning to the logs and the user.
242
243     """
244     logger.Error(message)
245     self._feedback_fn(" - WARNING: %s" % message)
246     if hint:
247       self._feedback_fn("      Hint: %s" % hint)
248
249   def LogInfo(self, message):
250     """Log an informational message to the logs and the user.
251
252     """
253     logger.Info(message)
254     self._feedback_fn(" - INFO: %s" % message)
255
256
257 class HooksMaster(object):
258   """Hooks master.
259
260   This class distributes the run commands to the nodes based on the
261   specific LU class.
262
263   In order to remove the direct dependency on the rpc module, the
264   constructor needs a function which actually does the remote
265   call. This will usually be rpc.call_hooks_runner, but any function
266   which behaves the same works.
267
268   """
269   def __init__(self, callfn, proc, lu):
270     self.callfn = callfn
271     self.proc = proc
272     self.lu = lu
273     self.op = lu.op
274     self.env, node_list_pre, node_list_post = self._BuildEnv()
275     self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
276                       constants.HOOKS_PHASE_POST: node_list_post}
277
278   def _BuildEnv(self):
279     """Compute the environment and the target nodes.
280
281     Based on the opcode and the current node list, this builds the
282     environment for the hooks and the target node list for the run.
283
284     """
285     env = {
286       "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
287       "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
288       "GANETI_OP_CODE": self.op.OP_ID,
289       "GANETI_OBJECT_TYPE": self.lu.HTYPE,
290       "GANETI_DATA_DIR": constants.DATA_DIR,
291       }
292
293     if self.lu.HPATH is not None:
294       lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
295       if lu_env:
296         for key in lu_env:
297           env["GANETI_" + key] = lu_env[key]
298     else:
299       lu_nodes_pre = lu_nodes_post = []
300
301     return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
302
303   def _RunWrapper(self, node_list, hpath, phase):
304     """Simple wrapper over self.callfn.
305
306     This method fixes the environment before doing the rpc call.
307
308     """
309     env = self.env.copy()
310     env["GANETI_HOOKS_PHASE"] = phase
311     env["GANETI_HOOKS_PATH"] = hpath
312     if self.lu.sstore is not None:
313       env["GANETI_CLUSTER"] = self.lu.sstore.GetClusterName()
314       env["GANETI_MASTER"] = self.lu.sstore.GetMasterNode()
315
316     env = dict([(str(key), str(val)) for key, val in env.iteritems()])
317
318     return self.callfn(node_list, hpath, phase, env)
319
320   def RunPhase(self, phase):
321     """Run all the scripts for a phase.
322
323     This is the main function of the HookMaster.
324
325     Args:
326       phase: the hooks phase to run
327
328     Returns:
329       the result of the hooks multi-node rpc call
330
331     """
332     if not self.node_list[phase]:
333       # empty node list, we should not attempt to run this as either
334       # we're in the cluster init phase and the rpc client part can't
335       # even attempt to run, or this LU doesn't do hooks at all
336       return
337     hpath = self.lu.HPATH
338     results = self._RunWrapper(self.node_list[phase], hpath, phase)
339     if phase == constants.HOOKS_PHASE_PRE:
340       errs = []
341       if not results:
342         raise errors.HooksFailure("Communication failure")
343       for node_name in results:
344         res = results[node_name]
345         if res is False or not isinstance(res, list):
346           self.proc.LogWarning("Communication failure to node %s" % node_name)
347           continue
348         for script, hkr, output in res:
349           if hkr == constants.HKR_FAIL:
350             output = output.strip().encode("string_escape")
351             errs.append((node_name, script, output))
352       if errs:
353         raise errors.HooksAbort(errs)
354     return results
355
356   def RunConfigUpdate(self):
357     """Run the special configuration update hook
358
359     This is a special hook that runs only on the master after each
360     top-level LI if the configuration has been updated.
361
362     """
363     phase = constants.HOOKS_PHASE_POST
364     hpath = constants.HOOKS_NAME_CFGUPDATE
365     if self.lu.sstore is None:
366       raise errors.ProgrammerError("Null sstore on config update hook")
367     nodes = [self.lu.sstore.GetMasterNode()]
368     results = self._RunWrapper(nodes, hpath, phase)