Merge branch 'master' into next
[ganeti-local] / lib / mcpu.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26   - logical units, which know how to deal with their specific opcode only
27   - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31 import logging
32
33 from ganeti import opcodes
34 from ganeti import constants
35 from ganeti import errors
36 from ganeti import rpc
37 from ganeti import cmdlib
38 from ganeti import locking
39
40
41 class Processor(object):
42   """Object which runs OpCodes"""
43   DISPATCH_TABLE = {
44     # Cluster
45     opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
46     opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
47     opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
48     opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
49     opcodes.OpRenameCluster: cmdlib.LURenameCluster,
50     opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
51     opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
52     opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig,
53     opcodes.OpRepairDiskSizes: cmdlib.LURepairDiskSizes,
54     # node lu
55     opcodes.OpAddNode: cmdlib.LUAddNode,
56     opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
57     opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
58     opcodes.OpRemoveNode: cmdlib.LURemoveNode,
59     opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams,
60     # instance lu
61     opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
62     opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
63     opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
64     opcodes.OpRenameInstance: cmdlib.LURenameInstance,
65     opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
66     opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
67     opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
68     opcodes.OpRebootInstance: cmdlib.LURebootInstance,
69     opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
70     opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
71     opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
72     opcodes.OpMigrateInstance: cmdlib.LUMigrateInstance,
73     opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
74     opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
75     opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
76     opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
77     opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
78     # os lu
79     opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
80     # exports lu
81     opcodes.OpQueryExports: cmdlib.LUQueryExports,
82     opcodes.OpExportInstance: cmdlib.LUExportInstance,
83     opcodes.OpRemoveExport: cmdlib.LURemoveExport,
84     # tags lu
85     opcodes.OpGetTags: cmdlib.LUGetTags,
86     opcodes.OpSearchTags: cmdlib.LUSearchTags,
87     opcodes.OpAddTags: cmdlib.LUAddTags,
88     opcodes.OpDelTags: cmdlib.LUDelTags,
89     # test lu
90     opcodes.OpTestDelay: cmdlib.LUTestDelay,
91     opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
92     }
93
94   def __init__(self, context):
95     """Constructor for Processor
96
97     Args:
98      - feedback_fn: the feedback function (taking one string) to be run when
99                     interesting events are happening
100     """
101     self.context = context
102     self._feedback_fn = None
103     self.exclusive_BGL = False
104     self.rpc = rpc.RpcRunner(context.cfg)
105
106   def _ExecLU(self, lu):
107     """Logical Unit execution sequence.
108
109     """
110     write_count = self.context.cfg.write_count
111     lu.CheckPrereq()
112     hm = HooksMaster(self.rpc.call_hooks_runner, self, lu)
113     h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
114     lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
115                      self._feedback_fn, None)
116     try:
117       result = lu.Exec(self._feedback_fn)
118       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
119       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
120                                 self._feedback_fn, result)
121     finally:
122       # FIXME: This needs locks if not lu_class.REQ_BGL
123       if write_count != self.context.cfg.write_count:
124         hm.RunConfigUpdate()
125
126     return result
127
128   def _LockAndExecLU(self, lu, level):
129     """Execute a Logical Unit, with the needed locks.
130
131     This is a recursive function that starts locking the given level, and
132     proceeds up, till there are no more locks to acquire. Then it executes the
133     given LU and its opcodes.
134
135     """
136     adding_locks = level in lu.add_locks
137     acquiring_locks = level in lu.needed_locks
138     if level not in locking.LEVELS:
139       if callable(self._run_notifier):
140         self._run_notifier()
141       result = self._ExecLU(lu)
142     elif adding_locks and acquiring_locks:
143       # We could both acquire and add locks at the same level, but for now we
144       # don't need this, so we'll avoid the complicated code needed.
145       raise NotImplementedError(
146         "Can't declare locks to acquire when adding others")
147     elif adding_locks or acquiring_locks:
148       lu.DeclareLocks(level)
149       share = lu.share_locks[level]
150       if acquiring_locks:
151         needed_locks = lu.needed_locks[level]
152         lu.acquired_locks[level] = self.context.glm.acquire(level,
153                                                             needed_locks,
154                                                             shared=share)
155       else: # adding_locks
156         add_locks = lu.add_locks[level]
157         lu.remove_locks[level] = add_locks
158         try:
159           self.context.glm.add(level, add_locks, acquired=1, shared=share)
160         except errors.LockError:
161           raise errors.OpPrereqError(
162             "Couldn't add locks (%s), probably because of a race condition"
163             " with another job, who added them first" % add_locks)
164       try:
165         try:
166           if adding_locks:
167             lu.acquired_locks[level] = add_locks
168           result = self._LockAndExecLU(lu, level + 1)
169         finally:
170           if level in lu.remove_locks:
171             self.context.glm.remove(level, lu.remove_locks[level])
172       finally:
173         if self.context.glm.is_owned(level):
174           self.context.glm.release(level)
175     else:
176       result = self._LockAndExecLU(lu, level + 1)
177
178     return result
179
180   def ExecOpCode(self, op, feedback_fn, run_notifier):
181     """Execute an opcode.
182
183     @type op: an OpCode instance
184     @param op: the opcode to be executed
185     @type feedback_fn: a function that takes a single argument
186     @param feedback_fn: this function will be used as feedback from the LU
187                         code to the end-user
188     @type run_notifier: callable (no arguments) or None
189     @param run_notifier:  this function (if callable) will be called when
190                           we are about to call the lu's Exec() method, that
191                           is, after we have acquired all locks
192
193     """
194     if not isinstance(op, opcodes.OpCode):
195       raise errors.ProgrammerError("Non-opcode instance passed"
196                                    " to ExecOpcode")
197
198     self._feedback_fn = feedback_fn
199     self._run_notifier = run_notifier
200     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
201     if lu_class is None:
202       raise errors.OpCodeUnknown("Unknown opcode")
203
204     # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
205     # shared fashion otherwise (to prevent concurrent run with an exclusive LU.
206     self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
207                              shared=not lu_class.REQ_BGL)
208     try:
209       self.exclusive_BGL = lu_class.REQ_BGL
210       lu = lu_class(self, op, self.context, self.rpc)
211       lu.ExpandNames()
212       assert lu.needed_locks is not None, "needed_locks not set by LU"
213       result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
214     finally:
215       self.context.glm.release(locking.LEVEL_CLUSTER)
216       self.exclusive_BGL = False
217
218     return result
219
220   def LogStep(self, current, total, message):
221     """Log a change in LU execution progress.
222
223     """
224     logging.debug("Step %d/%d %s", current, total, message)
225     self._feedback_fn("STEP %d/%d %s" % (current, total, message))
226
227   def LogWarning(self, message, *args, **kwargs):
228     """Log a warning to the logs and the user.
229
230     The optional keyword argument is 'hint' and can be used to show a
231     hint to the user (presumably related to the warning). If the
232     message is empty, it will not be printed at all, allowing one to
233     show only a hint.
234
235     """
236     assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
237            "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
238     if args:
239       message = message % tuple(args)
240     if message:
241       logging.warning(message)
242       self._feedback_fn(" - WARNING: %s" % message)
243     if "hint" in kwargs:
244       self._feedback_fn("      Hint: %s" % kwargs["hint"])
245
246   def LogInfo(self, message, *args):
247     """Log an informational message to the logs and the user.
248
249     """
250     if args:
251       message = message % tuple(args)
252     logging.info(message)
253     self._feedback_fn(" - INFO: %s" % message)
254
255
256 class HooksMaster(object):
257   """Hooks master.
258
259   This class distributes the run commands to the nodes based on the
260   specific LU class.
261
262   In order to remove the direct dependency on the rpc module, the
263   constructor needs a function which actually does the remote
264   call. This will usually be rpc.call_hooks_runner, but any function
265   which behaves the same works.
266
267   """
268   def __init__(self, callfn, proc, lu):
269     self.callfn = callfn
270     self.proc = proc
271     self.lu = lu
272     self.op = lu.op
273     self.env, node_list_pre, node_list_post = self._BuildEnv()
274     self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
275                       constants.HOOKS_PHASE_POST: node_list_post}
276
277   def _BuildEnv(self):
278     """Compute the environment and the target nodes.
279
280     Based on the opcode and the current node list, this builds the
281     environment for the hooks and the target node list for the run.
282
283     """
284     env = {
285       "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
286       "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
287       "GANETI_OP_CODE": self.op.OP_ID,
288       "GANETI_OBJECT_TYPE": self.lu.HTYPE,
289       "GANETI_DATA_DIR": constants.DATA_DIR,
290       }
291
292     if self.lu.HPATH is not None:
293       lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
294       if lu_env:
295         for key in lu_env:
296           env["GANETI_" + key] = lu_env[key]
297     else:
298       lu_nodes_pre = lu_nodes_post = []
299
300     return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
301
302   def _RunWrapper(self, node_list, hpath, phase):
303     """Simple wrapper over self.callfn.
304
305     This method fixes the environment before doing the rpc call.
306
307     """
308     env = self.env.copy()
309     env["GANETI_HOOKS_PHASE"] = phase
310     env["GANETI_HOOKS_PATH"] = hpath
311     if self.lu.cfg is not None:
312       env["GANETI_CLUSTER"] = self.lu.cfg.GetClusterName()
313       env["GANETI_MASTER"] = self.lu.cfg.GetMasterNode()
314
315     env = dict([(str(key), str(val)) for key, val in env.iteritems()])
316
317     return self.callfn(node_list, hpath, phase, env)
318
319   def RunPhase(self, phase):
320     """Run all the scripts for a phase.
321
322     This is the main function of the HookMaster.
323
324     @param phase: one of L{constants.HOOKS_PHASE_POST} or
325         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
326     @return: the processed results of the hooks multi-node rpc call
327     @raise errors.HooksFailure: on communication failure to the nodes
328
329     """
330     if not self.node_list[phase]:
331       # empty node list, we should not attempt to run this as either
332       # we're in the cluster init phase and the rpc client part can't
333       # even attempt to run, or this LU doesn't do hooks at all
334       return
335     hpath = self.lu.HPATH
336     results = self._RunWrapper(self.node_list[phase], hpath, phase)
337     if phase == constants.HOOKS_PHASE_PRE:
338       errs = []
339       if not results:
340         raise errors.HooksFailure("Communication failure")
341       for node_name in results:
342         res = results[node_name]
343         if res.failed or res.data is False or not isinstance(res.data, list):
344           if not res.offline:
345             self.proc.LogWarning("Communication failure to node %s" %
346                                  node_name)
347           continue
348         for script, hkr, output in res.data:
349           if hkr == constants.HKR_FAIL:
350             errs.append((node_name, script, output))
351       if errs:
352         raise errors.HooksAbort(errs)
353     return results
354
355   def RunConfigUpdate(self):
356     """Run the special configuration update hook
357
358     This is a special hook that runs only on the master after each
359     top-level LI if the configuration has been updated.
360
361     """
362     phase = constants.HOOKS_PHASE_POST
363     hpath = constants.HOOKS_NAME_CFGUPDATE
364     nodes = [self.lu.cfg.GetMasterNode()]
365     self._RunWrapper(nodes, hpath, phase)