Convert rpc results to a custom type
[ganeti-local] / lib / mcpu.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26   - logical units, which know how to deal with their specific opcode only
27   - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31 import logging
32
33 from ganeti import opcodes
34 from ganeti import constants
35 from ganeti import errors
36 from ganeti import rpc
37 from ganeti import cmdlib
38 from ganeti import locking
39
40
41 class Processor(object):
42   """Object which runs OpCodes"""
43   DISPATCH_TABLE = {
44     # Cluster
45     opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
46     opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
47     opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
48     opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
49     opcodes.OpRenameCluster: cmdlib.LURenameCluster,
50     opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
51     opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
52     # node lu
53     opcodes.OpAddNode: cmdlib.LUAddNode,
54     opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
55     opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
56     opcodes.OpRemoveNode: cmdlib.LURemoveNode,
57     opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams,
58     # instance lu
59     opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
60     opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
61     opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
62     opcodes.OpRenameInstance: cmdlib.LURenameInstance,
63     opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
64     opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
65     opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
66     opcodes.OpRebootInstance: cmdlib.LURebootInstance,
67     opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
68     opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
69     opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
70     opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
71     opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
72     opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
73     opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
74     opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
75     # os lu
76     opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
77     # exports lu
78     opcodes.OpQueryExports: cmdlib.LUQueryExports,
79     opcodes.OpExportInstance: cmdlib.LUExportInstance,
80     opcodes.OpRemoveExport: cmdlib.LURemoveExport,
81     # tags lu
82     opcodes.OpGetTags: cmdlib.LUGetTags,
83     opcodes.OpSearchTags: cmdlib.LUSearchTags,
84     opcodes.OpAddTags: cmdlib.LUAddTags,
85     opcodes.OpDelTags: cmdlib.LUDelTags,
86     # test lu
87     opcodes.OpTestDelay: cmdlib.LUTestDelay,
88     opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
89     }
90
91   def __init__(self, context):
92     """Constructor for Processor
93
94     Args:
95      - feedback_fn: the feedback function (taking one string) to be run when
96                     interesting events are happening
97     """
98     self.context = context
99     self._feedback_fn = None
100     self.exclusive_BGL = False
101     self.rpc = rpc.RpcRunner(context.cfg)
102
103   def _ExecLU(self, lu):
104     """Logical Unit execution sequence.
105
106     """
107     write_count = self.context.cfg.write_count
108     lu.CheckPrereq()
109     hm = HooksMaster(self.rpc.call_hooks_runner, self, lu)
110     h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
111     lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
112                      self._feedback_fn, None)
113     try:
114       result = lu.Exec(self._feedback_fn)
115       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
116       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
117                                 self._feedback_fn, result)
118     finally:
119       # FIXME: This needs locks if not lu_class.REQ_BGL
120       if write_count != self.context.cfg.write_count:
121         hm.RunConfigUpdate()
122
123     return result
124
125   def _LockAndExecLU(self, lu, level):
126     """Execute a Logical Unit, with the needed locks.
127
128     This is a recursive function that starts locking the given level, and
129     proceeds up, till there are no more locks to acquire. Then it executes the
130     given LU and its opcodes.
131
132     """
133     adding_locks = level in lu.add_locks
134     acquiring_locks = level in lu.needed_locks
135     if level not in locking.LEVELS:
136       if callable(self._run_notifier):
137         self._run_notifier()
138       result = self._ExecLU(lu)
139     elif adding_locks and acquiring_locks:
140       # We could both acquire and add locks at the same level, but for now we
141       # don't need this, so we'll avoid the complicated code needed.
142       raise NotImplementedError(
143         "Can't declare locks to acquire when adding others")
144     elif adding_locks or acquiring_locks:
145       lu.DeclareLocks(level)
146       share = lu.share_locks[level]
147       if acquiring_locks:
148         needed_locks = lu.needed_locks[level]
149         lu.acquired_locks[level] = self.context.glm.acquire(level,
150                                                             needed_locks,
151                                                             shared=share)
152       else: # adding_locks
153         add_locks = lu.add_locks[level]
154         lu.remove_locks[level] = add_locks
155         try:
156           self.context.glm.add(level, add_locks, acquired=1, shared=share)
157         except errors.LockError:
158           raise errors.OpPrereqError(
159             "Coudn't add locks (%s), probably because of a race condition"
160             " with another job, who added them first" % add_locks)
161       try:
162         try:
163           if adding_locks:
164             lu.acquired_locks[level] = add_locks
165           result = self._LockAndExecLU(lu, level + 1)
166         finally:
167           if level in lu.remove_locks:
168             self.context.glm.remove(level, lu.remove_locks[level])
169       finally:
170         if self.context.glm.is_owned(level):
171           self.context.glm.release(level)
172     else:
173       result = self._LockAndExecLU(lu, level + 1)
174
175     return result
176
177   def ExecOpCode(self, op, feedback_fn, run_notifier):
178     """Execute an opcode.
179
180     @type op: an OpCode instance
181     @param op: the opcode to be executed
182     @type feedback_fn: a function that takes a single argument
183     @param feedback_fn: this function will be used as feedback from the LU
184                         code to the end-user
185     @type run_notifier: callable (no arguments) or None
186     @param run_notifier:  this function (if callable) will be called when
187                           we are about to call the lu's Exec() method, that
188                           is, after we have aquired all locks
189
190     """
191     if not isinstance(op, opcodes.OpCode):
192       raise errors.ProgrammerError("Non-opcode instance passed"
193                                    " to ExecOpcode")
194
195     self._feedback_fn = feedback_fn
196     self._run_notifier = run_notifier
197     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
198     if lu_class is None:
199       raise errors.OpCodeUnknown("Unknown opcode")
200
201     # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
202     # shared fashion otherwise (to prevent concurrent run with an exclusive LU.
203     self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
204                              shared=not lu_class.REQ_BGL)
205     try:
206       self.exclusive_BGL = lu_class.REQ_BGL
207       lu = lu_class(self, op, self.context, self.rpc)
208       lu.ExpandNames()
209       assert lu.needed_locks is not None, "needed_locks not set by LU"
210       result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
211     finally:
212       self.context.glm.release(locking.LEVEL_CLUSTER)
213       self.exclusive_BGL = False
214
215     return result
216
217   def LogStep(self, current, total, message):
218     """Log a change in LU execution progress.
219
220     """
221     logging.debug("Step %d/%d %s", current, total, message)
222     self._feedback_fn("STEP %d/%d %s" % (current, total, message))
223
224   def LogWarning(self, message, *args, **kwargs):
225     """Log a warning to the logs and the user.
226
227     The optional keyword argument is 'hint' and can be used to show a
228     hint to the user (presumably related to the warning). If the
229     message is empty, it will not be printed at all, allowing one to
230     show only a hint.
231
232     """
233     assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
234            "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
235     if args:
236       message = message % tuple(args)
237     if message:
238       logging.warning(message)
239       self._feedback_fn(" - WARNING: %s" % message)
240     if "hint" in kwargs:
241       self._feedback_fn("      Hint: %s" % kwargs["hint"])
242
243   def LogInfo(self, message, *args):
244     """Log an informational message to the logs and the user.
245
246     """
247     if args:
248       message = message % tuple(args)
249     logging.info(message)
250     self._feedback_fn(" - INFO: %s" % message)
251
252
253 class HooksMaster(object):
254   """Hooks master.
255
256   This class distributes the run commands to the nodes based on the
257   specific LU class.
258
259   In order to remove the direct dependency on the rpc module, the
260   constructor needs a function which actually does the remote
261   call. This will usually be rpc.call_hooks_runner, but any function
262   which behaves the same works.
263
264   """
265   def __init__(self, callfn, proc, lu):
266     self.callfn = callfn
267     self.proc = proc
268     self.lu = lu
269     self.op = lu.op
270     self.env, node_list_pre, node_list_post = self._BuildEnv()
271     self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
272                       constants.HOOKS_PHASE_POST: node_list_post}
273
274   def _BuildEnv(self):
275     """Compute the environment and the target nodes.
276
277     Based on the opcode and the current node list, this builds the
278     environment for the hooks and the target node list for the run.
279
280     """
281     env = {
282       "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
283       "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
284       "GANETI_OP_CODE": self.op.OP_ID,
285       "GANETI_OBJECT_TYPE": self.lu.HTYPE,
286       "GANETI_DATA_DIR": constants.DATA_DIR,
287       }
288
289     if self.lu.HPATH is not None:
290       lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
291       if lu_env:
292         for key in lu_env:
293           env["GANETI_" + key] = lu_env[key]
294     else:
295       lu_nodes_pre = lu_nodes_post = []
296
297     return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
298
299   def _RunWrapper(self, node_list, hpath, phase):
300     """Simple wrapper over self.callfn.
301
302     This method fixes the environment before doing the rpc call.
303
304     """
305     env = self.env.copy()
306     env["GANETI_HOOKS_PHASE"] = phase
307     env["GANETI_HOOKS_PATH"] = hpath
308     if self.lu.cfg is not None:
309       env["GANETI_CLUSTER"] = self.lu.cfg.GetClusterName()
310       env["GANETI_MASTER"] = self.lu.cfg.GetMasterNode()
311
312     env = dict([(str(key), str(val)) for key, val in env.iteritems()])
313
314     return self.callfn(node_list, hpath, phase, env)
315
316   def RunPhase(self, phase):
317     """Run all the scripts for a phase.
318
319     This is the main function of the HookMaster.
320
321     @param phase: one of L{constants.HOOKS_PHASE_POST} or
322         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
323     @return: the processed results of the hooks multi-node rpc call
324     @raise errors.HooksFailure: on communication failure to the nodes
325
326     """
327     if not self.node_list[phase]:
328       # empty node list, we should not attempt to run this as either
329       # we're in the cluster init phase and the rpc client part can't
330       # even attempt to run, or this LU doesn't do hooks at all
331       return
332     hpath = self.lu.HPATH
333     results = self._RunWrapper(self.node_list[phase], hpath, phase)
334     if phase == constants.HOOKS_PHASE_PRE:
335       errs = []
336       if not results:
337         raise errors.HooksFailure("Communication failure")
338       for node_name in results:
339         res = results[node_name]
340         if res.failed or res.data is False or not isinstance(res.data, list):
341           self.proc.LogWarning("Communication failure to node %s" % node_name)
342           continue
343         for script, hkr, output in res.data:
344           if hkr == constants.HKR_FAIL:
345             output = output.strip().encode("string_escape")
346             errs.append((node_name, script, output))
347       if errs:
348         raise errors.HooksAbort(errs)
349     return results
350
351   def RunConfigUpdate(self):
352     """Run the special configuration update hook
353
354     This is a special hook that runs only on the master after each
355     top-level LI if the configuration has been updated.
356
357     """
358     phase = constants.HOOKS_PHASE_POST
359     hpath = constants.HOOKS_NAME_CFGUPDATE
360     nodes = [self.lu.cfg.GetMasterNode()]
361     results = self._RunWrapper(nodes, hpath, phase)