Documentation updates for mcpu.py
[ganeti-local] / lib / mcpu.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26   - logical units, which know how to deal with their specific opcode only
27   - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31 import logging
32
33 from ganeti import opcodes
34 from ganeti import constants
35 from ganeti import errors
36 from ganeti import rpc
37 from ganeti import cmdlib
38 from ganeti import locking
39
40
41 class Processor(object):
42   """Object which runs OpCodes"""
43   DISPATCH_TABLE = {
44     # Cluster
45     opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
46     opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
47     opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
48     opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
49     opcodes.OpRenameCluster: cmdlib.LURenameCluster,
50     opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
51     opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
52     # node lu
53     opcodes.OpAddNode: cmdlib.LUAddNode,
54     opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
55     opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
56     opcodes.OpRemoveNode: cmdlib.LURemoveNode,
57     # instance lu
58     opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
59     opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
60     opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
61     opcodes.OpRenameInstance: cmdlib.LURenameInstance,
62     opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
63     opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
64     opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
65     opcodes.OpRebootInstance: cmdlib.LURebootInstance,
66     opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
67     opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
68     opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
69     opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
70     opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
71     opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
72     opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
73     opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
74     # os lu
75     opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
76     # exports lu
77     opcodes.OpQueryExports: cmdlib.LUQueryExports,
78     opcodes.OpExportInstance: cmdlib.LUExportInstance,
79     opcodes.OpRemoveExport: cmdlib.LURemoveExport,
80     # tags lu
81     opcodes.OpGetTags: cmdlib.LUGetTags,
82     opcodes.OpSearchTags: cmdlib.LUSearchTags,
83     opcodes.OpAddTags: cmdlib.LUAddTags,
84     opcodes.OpDelTags: cmdlib.LUDelTags,
85     # test lu
86     opcodes.OpTestDelay: cmdlib.LUTestDelay,
87     opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
88     }
89
90   def __init__(self, context):
91     """Constructor for Processor
92
93     Args:
94      - feedback_fn: the feedback function (taking one string) to be run when
95                     interesting events are happening
96     """
97     self.context = context
98     self._feedback_fn = None
99     self.exclusive_BGL = False
100     self.rpc = rpc.RpcRunner(context.cfg)
101
102   def _ExecLU(self, lu):
103     """Logical Unit execution sequence.
104
105     """
106     write_count = self.context.cfg.write_count
107     lu.CheckPrereq()
108     hm = HooksMaster(self.rpc.call_hooks_runner, self, lu)
109     h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
110     lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
111                      self._feedback_fn, None)
112     try:
113       result = lu.Exec(self._feedback_fn)
114       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
115       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
116                                 self._feedback_fn, result)
117     finally:
118       # FIXME: This needs locks if not lu_class.REQ_BGL
119       if write_count != self.context.cfg.write_count:
120         hm.RunConfigUpdate()
121
122     return result
123
124   def _LockAndExecLU(self, lu, level):
125     """Execute a Logical Unit, with the needed locks.
126
127     This is a recursive function that starts locking the given level, and
128     proceeds up, till there are no more locks to acquire. Then it executes the
129     given LU and its opcodes.
130
131     """
132     adding_locks = level in lu.add_locks
133     acquiring_locks = level in lu.needed_locks
134     if level not in locking.LEVELS:
135       if callable(self._run_notifier):
136         self._run_notifier()
137       result = self._ExecLU(lu)
138     elif adding_locks and acquiring_locks:
139       # We could both acquire and add locks at the same level, but for now we
140       # don't need this, so we'll avoid the complicated code needed.
141       raise NotImplementedError(
142         "Can't declare locks to acquire when adding others")
143     elif adding_locks or acquiring_locks:
144       lu.DeclareLocks(level)
145       share = lu.share_locks[level]
146       if acquiring_locks:
147         needed_locks = lu.needed_locks[level]
148         lu.acquired_locks[level] = self.context.glm.acquire(level,
149                                                             needed_locks,
150                                                             shared=share)
151       else: # adding_locks
152         add_locks = lu.add_locks[level]
153         lu.remove_locks[level] = add_locks
154         try:
155           self.context.glm.add(level, add_locks, acquired=1, shared=share)
156         except errors.LockError:
157           raise errors.OpPrereqError(
158             "Coudn't add locks (%s), probably because of a race condition"
159             " with another job, who added them first" % add_locks)
160       try:
161         try:
162           if adding_locks:
163             lu.acquired_locks[level] = add_locks
164           result = self._LockAndExecLU(lu, level + 1)
165         finally:
166           if level in lu.remove_locks:
167             self.context.glm.remove(level, lu.remove_locks[level])
168       finally:
169         if self.context.glm.is_owned(level):
170           self.context.glm.release(level)
171     else:
172       result = self._LockAndExecLU(lu, level + 1)
173
174     return result
175
176   def ExecOpCode(self, op, feedback_fn, run_notifier):
177     """Execute an opcode.
178
179     @type op: an OpCode instance
180     @param op: the opcode to be executed
181     @type feedback_fn: a function that takes a single argument
182     @param feedback_fn: this function will be used as feedback from the LU
183                         code to the end-user
184     @type run_notifier: callable (no arguments) or None
185     @param run_notifier:  this function (if callable) will be called when
186                           we are about to call the lu's Exec() method, that
187                           is, after we have aquired all locks
188
189     """
190     if not isinstance(op, opcodes.OpCode):
191       raise errors.ProgrammerError("Non-opcode instance passed"
192                                    " to ExecOpcode")
193
194     self._feedback_fn = feedback_fn
195     self._run_notifier = run_notifier
196     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
197     if lu_class is None:
198       raise errors.OpCodeUnknown("Unknown opcode")
199
200     # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
201     # shared fashion otherwise (to prevent concurrent run with an exclusive LU.
202     self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
203                              shared=not lu_class.REQ_BGL)
204     try:
205       self.exclusive_BGL = lu_class.REQ_BGL
206       lu = lu_class(self, op, self.context, self.rpc)
207       lu.ExpandNames()
208       assert lu.needed_locks is not None, "needed_locks not set by LU"
209       result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
210     finally:
211       self.context.glm.release(locking.LEVEL_CLUSTER)
212       self.exclusive_BGL = False
213
214     return result
215
216   def LogStep(self, current, total, message):
217     """Log a change in LU execution progress.
218
219     """
220     logging.debug("Step %d/%d %s", current, total, message)
221     self._feedback_fn("STEP %d/%d %s" % (current, total, message))
222
223   def LogWarning(self, message, *args, **kwargs):
224     """Log a warning to the logs and the user.
225
226     The optional keyword argument is 'hint' and can be used to show a
227     hint to the user (presumably related to the warning). If the
228     message is empty, it will not be printed at all, allowing one to
229     show only a hint.
230
231     """
232     assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
233            "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
234     if args:
235       message = message % tuple(args)
236     if message:
237       logging.warning(message)
238       self._feedback_fn(" - WARNING: %s" % message)
239     if "hint" in kwargs:
240       self._feedback_fn("      Hint: %s" % kwargs["hint"])
241
242   def LogInfo(self, message, *args):
243     """Log an informational message to the logs and the user.
244
245     """
246     if args:
247       message = message % tuple(args)
248     logging.info(message)
249     self._feedback_fn(" - INFO: %s" % message)
250
251
252 class HooksMaster(object):
253   """Hooks master.
254
255   This class distributes the run commands to the nodes based on the
256   specific LU class.
257
258   In order to remove the direct dependency on the rpc module, the
259   constructor needs a function which actually does the remote
260   call. This will usually be rpc.call_hooks_runner, but any function
261   which behaves the same works.
262
263   """
264   def __init__(self, callfn, proc, lu):
265     self.callfn = callfn
266     self.proc = proc
267     self.lu = lu
268     self.op = lu.op
269     self.env, node_list_pre, node_list_post = self._BuildEnv()
270     self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
271                       constants.HOOKS_PHASE_POST: node_list_post}
272
273   def _BuildEnv(self):
274     """Compute the environment and the target nodes.
275
276     Based on the opcode and the current node list, this builds the
277     environment for the hooks and the target node list for the run.
278
279     """
280     env = {
281       "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
282       "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
283       "GANETI_OP_CODE": self.op.OP_ID,
284       "GANETI_OBJECT_TYPE": self.lu.HTYPE,
285       "GANETI_DATA_DIR": constants.DATA_DIR,
286       }
287
288     if self.lu.HPATH is not None:
289       lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
290       if lu_env:
291         for key in lu_env:
292           env["GANETI_" + key] = lu_env[key]
293     else:
294       lu_nodes_pre = lu_nodes_post = []
295
296     return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
297
298   def _RunWrapper(self, node_list, hpath, phase):
299     """Simple wrapper over self.callfn.
300
301     This method fixes the environment before doing the rpc call.
302
303     """
304     env = self.env.copy()
305     env["GANETI_HOOKS_PHASE"] = phase
306     env["GANETI_HOOKS_PATH"] = hpath
307     if self.lu.cfg is not None:
308       env["GANETI_CLUSTER"] = self.lu.cfg.GetClusterName()
309       env["GANETI_MASTER"] = self.lu.cfg.GetMasterNode()
310
311     env = dict([(str(key), str(val)) for key, val in env.iteritems()])
312
313     return self.callfn(node_list, hpath, phase, env)
314
315   def RunPhase(self, phase):
316     """Run all the scripts for a phase.
317
318     This is the main function of the HookMaster.
319
320     @param phase: one of L{constants.HOOKS_PHASE_POST} or
321         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
322     @return: the processed results of the hooks multi-node rpc call
323     @raise errors.HooksFailure: on communication failure to the nodes
324
325     """
326     if not self.node_list[phase]:
327       # empty node list, we should not attempt to run this as either
328       # we're in the cluster init phase and the rpc client part can't
329       # even attempt to run, or this LU doesn't do hooks at all
330       return
331     hpath = self.lu.HPATH
332     results = self._RunWrapper(self.node_list[phase], hpath, phase)
333     if phase == constants.HOOKS_PHASE_PRE:
334       errs = []
335       if not results:
336         raise errors.HooksFailure("Communication failure")
337       for node_name in results:
338         res = results[node_name]
339         if res is False or not isinstance(res, list):
340           self.proc.LogWarning("Communication failure to node %s" % node_name)
341           continue
342         for script, hkr, output in res:
343           if hkr == constants.HKR_FAIL:
344             output = output.strip().encode("string_escape")
345             errs.append((node_name, script, output))
346       if errs:
347         raise errors.HooksAbort(errs)
348     return results
349
350   def RunConfigUpdate(self):
351     """Run the special configuration update hook
352
353     This is a special hook that runs only on the master after each
354     top-level LI if the configuration has been updated.
355
356     """
357     phase = constants.HOOKS_PHASE_POST
358     hpath = constants.HOOKS_NAME_CFGUPDATE
359     nodes = [self.lu.cfg.GetMasterNode()]
360     results = self._RunWrapper(nodes, hpath, phase)