Implement job 'waiting' status
[ganeti-local] / lib / mcpu.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26   - logical units, which know how to deal with their specific opcode only
27   - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31
32 from ganeti import opcodes
33 from ganeti import constants
34 from ganeti import errors
35 from ganeti import rpc
36 from ganeti import cmdlib
37 from ganeti import logger
38 from ganeti import locking
39
40
41 class Processor(object):
42   """Object which runs OpCodes"""
43   DISPATCH_TABLE = {
44     # Cluster
45     opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
46     opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
47     opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
48     opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
49     opcodes.OpRenameCluster: cmdlib.LURenameCluster,
50     opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
51     opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
52     # node lu
53     opcodes.OpAddNode: cmdlib.LUAddNode,
54     opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
55     opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
56     opcodes.OpRemoveNode: cmdlib.LURemoveNode,
57     # instance lu
58     opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
59     opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
60     opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
61     opcodes.OpRenameInstance: cmdlib.LURenameInstance,
62     opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
63     opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
64     opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
65     opcodes.OpRebootInstance: cmdlib.LURebootInstance,
66     opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
67     opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
68     opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
69     opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
70     opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
71     opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
72     opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
73     opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
74     # os lu
75     opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
76     # exports lu
77     opcodes.OpQueryExports: cmdlib.LUQueryExports,
78     opcodes.OpExportInstance: cmdlib.LUExportInstance,
79     opcodes.OpRemoveExport: cmdlib.LURemoveExport,
80     # tags lu
81     opcodes.OpGetTags: cmdlib.LUGetTags,
82     opcodes.OpSearchTags: cmdlib.LUSearchTags,
83     opcodes.OpAddTags: cmdlib.LUAddTags,
84     opcodes.OpDelTags: cmdlib.LUDelTags,
85     # test lu
86     opcodes.OpTestDelay: cmdlib.LUTestDelay,
87     opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
88     }
89
90   def __init__(self, context):
91     """Constructor for Processor
92
93     Args:
94      - feedback_fn: the feedback function (taking one string) to be run when
95                     interesting events are happening
96     """
97     self.context = context
98     self._feedback_fn = None
99     self.exclusive_BGL = False
100
101   def _ExecLU(self, lu):
102     """Logical Unit execution sequence.
103
104     """
105     write_count = self.context.cfg.write_count
106     lu.CheckPrereq()
107     hm = HooksMaster(rpc.call_hooks_runner, self, lu)
108     h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
109     lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
110                      self._feedback_fn, None)
111     try:
112       result = lu.Exec(self._feedback_fn)
113       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
114       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
115                                 self._feedback_fn, result)
116     finally:
117       # FIXME: This needs locks if not lu_class.REQ_BGL
118       if write_count != self.context.cfg.write_count:
119         hm.RunConfigUpdate()
120
121     return result
122
123   def _LockAndExecLU(self, lu, level):
124     """Execute a Logical Unit, with the needed locks.
125
126     This is a recursive function that starts locking the given level, and
127     proceeds up, till there are no more locks to acquire. Then it executes the
128     given LU and its opcodes.
129
130     """
131     adding_locks = level in lu.add_locks
132     acquiring_locks = level in lu.needed_locks
133     if level not in locking.LEVELS:
134       if callable(self._run_notifier):
135         self._run_notifier()
136       result = self._ExecLU(lu)
137     elif adding_locks and acquiring_locks:
138       # We could both acquire and add locks at the same level, but for now we
139       # don't need this, so we'll avoid the complicated code needed.
140       raise NotImplementedError(
141         "Can't declare locks to acquire when adding others")
142     elif adding_locks or acquiring_locks:
143       lu.DeclareLocks(level)
144       share = lu.share_locks[level]
145       if acquiring_locks:
146         needed_locks = lu.needed_locks[level]
147         lu.acquired_locks[level] = self.context.glm.acquire(level,
148                                                             needed_locks,
149                                                             shared=share)
150       else: # adding_locks
151         add_locks = lu.add_locks[level]
152         lu.remove_locks[level] = add_locks
153         try:
154           self.context.glm.add(level, add_locks, acquired=1, shared=share)
155         except errors.LockError:
156           raise errors.OpPrereqError(
157             "Coudn't add locks (%s), probably because of a race condition"
158             " with another job, who added them first" % add_locks)
159       try:
160         try:
161           if adding_locks:
162             lu.acquired_locks[level] = add_locks
163           result = self._LockAndExecLU(lu, level + 1)
164         finally:
165           if level in lu.remove_locks:
166             self.context.glm.remove(level, lu.remove_locks[level])
167       finally:
168         if self.context.glm.is_owned(level):
169           self.context.glm.release(level)
170     else:
171       result = self._LockAndExecLU(lu, level + 1)
172
173     return result
174
175   def ExecOpCode(self, op, feedback_fn, run_notifier):
176     """Execute an opcode.
177
178     @type op: an OpCode instance
179     @param op: the opcode to be executed
180     @type feedback_fn: a function that takes a single argument
181     @param feedback_fn: this function will be used as feedback from the LU
182                         code to the end-user
183     @type run_notifier: callable (no arguments) or None
184     @param run_notifier:  this function (if callable) will be called when
185                           we are about to call the lu's Exec() method, that
186                           is, after we have aquired all locks
187
188     """
189     if not isinstance(op, opcodes.OpCode):
190       raise errors.ProgrammerError("Non-opcode instance passed"
191                                    " to ExecOpcode")
192
193     self._feedback_fn = feedback_fn
194     self._run_notifier = run_notifier
195     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
196     if lu_class is None:
197       raise errors.OpCodeUnknown("Unknown opcode")
198
199     # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
200     # shared fashion otherwise (to prevent concurrent run with an exclusive LU.
201     self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
202                              shared=not lu_class.REQ_BGL)
203     try:
204       self.exclusive_BGL = lu_class.REQ_BGL
205       lu = lu_class(self, op, self.context)
206       lu.ExpandNames()
207       assert lu.needed_locks is not None, "needed_locks not set by LU"
208       result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
209     finally:
210       self.context.glm.release(locking.LEVEL_CLUSTER)
211       self.exclusive_BGL = False
212
213     return result
214
215   def LogStep(self, current, total, message):
216     """Log a change in LU execution progress.
217
218     """
219     logger.Debug("Step %d/%d %s" % (current, total, message))
220     self._feedback_fn("STEP %d/%d %s" % (current, total, message))
221
222   def LogWarning(self, message, hint=None):
223     """Log a warning to the logs and the user.
224
225     """
226     logger.Error(message)
227     self._feedback_fn(" - WARNING: %s" % message)
228     if hint:
229       self._feedback_fn("      Hint: %s" % hint)
230
231   def LogInfo(self, message):
232     """Log an informational message to the logs and the user.
233
234     """
235     logger.Info(message)
236     self._feedback_fn(" - INFO: %s" % message)
237
238
239 class HooksMaster(object):
240   """Hooks master.
241
242   This class distributes the run commands to the nodes based on the
243   specific LU class.
244
245   In order to remove the direct dependency on the rpc module, the
246   constructor needs a function which actually does the remote
247   call. This will usually be rpc.call_hooks_runner, but any function
248   which behaves the same works.
249
250   """
251   def __init__(self, callfn, proc, lu):
252     self.callfn = callfn
253     self.proc = proc
254     self.lu = lu
255     self.op = lu.op
256     self.env, node_list_pre, node_list_post = self._BuildEnv()
257     self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
258                       constants.HOOKS_PHASE_POST: node_list_post}
259
260   def _BuildEnv(self):
261     """Compute the environment and the target nodes.
262
263     Based on the opcode and the current node list, this builds the
264     environment for the hooks and the target node list for the run.
265
266     """
267     env = {
268       "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
269       "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
270       "GANETI_OP_CODE": self.op.OP_ID,
271       "GANETI_OBJECT_TYPE": self.lu.HTYPE,
272       "GANETI_DATA_DIR": constants.DATA_DIR,
273       }
274
275     if self.lu.HPATH is not None:
276       lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
277       if lu_env:
278         for key in lu_env:
279           env["GANETI_" + key] = lu_env[key]
280     else:
281       lu_nodes_pre = lu_nodes_post = []
282
283     return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
284
285   def _RunWrapper(self, node_list, hpath, phase):
286     """Simple wrapper over self.callfn.
287
288     This method fixes the environment before doing the rpc call.
289
290     """
291     env = self.env.copy()
292     env["GANETI_HOOKS_PHASE"] = phase
293     env["GANETI_HOOKS_PATH"] = hpath
294     if self.lu.cfg is not None:
295       env["GANETI_CLUSTER"] = self.lu.cfg.GetClusterName()
296       env["GANETI_MASTER"] = self.lu.cfg.GetMasterNode()
297
298     env = dict([(str(key), str(val)) for key, val in env.iteritems()])
299
300     return self.callfn(node_list, hpath, phase, env)
301
302   def RunPhase(self, phase):
303     """Run all the scripts for a phase.
304
305     This is the main function of the HookMaster.
306
307     Args:
308       phase: the hooks phase to run
309
310     Returns:
311       the result of the hooks multi-node rpc call
312
313     """
314     if not self.node_list[phase]:
315       # empty node list, we should not attempt to run this as either
316       # we're in the cluster init phase and the rpc client part can't
317       # even attempt to run, or this LU doesn't do hooks at all
318       return
319     hpath = self.lu.HPATH
320     results = self._RunWrapper(self.node_list[phase], hpath, phase)
321     if phase == constants.HOOKS_PHASE_PRE:
322       errs = []
323       if not results:
324         raise errors.HooksFailure("Communication failure")
325       for node_name in results:
326         res = results[node_name]
327         if res is False or not isinstance(res, list):
328           self.proc.LogWarning("Communication failure to node %s" % node_name)
329           continue
330         for script, hkr, output in res:
331           if hkr == constants.HKR_FAIL:
332             output = output.strip().encode("string_escape")
333             errs.append((node_name, script, output))
334       if errs:
335         raise errors.HooksAbort(errs)
336     return results
337
338   def RunConfigUpdate(self):
339     """Run the special configuration update hook
340
341     This is a special hook that runs only on the master after each
342     top-level LI if the configuration has been updated.
343
344     """
345     phase = constants.HOOKS_PHASE_POST
346     hpath = constants.HOOKS_NAME_CFGUPDATE
347     nodes = [self.lu.cfg.GetMasterNode()]
348     results = self._RunWrapper(nodes, hpath, phase)