KVM: allow netboot
[ganeti-local] / lib / mcpu.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26   - logical units, which know how to deal with their specific opcode only
27   - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31 import logging
32
33 from ganeti import opcodes
34 from ganeti import constants
35 from ganeti import errors
36 from ganeti import rpc
37 from ganeti import cmdlib
38 from ganeti import locking
39
40
41 class Processor(object):
42   """Object which runs OpCodes"""
43   DISPATCH_TABLE = {
44     # Cluster
45     opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
46     opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
47     opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
48     opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
49     opcodes.OpRenameCluster: cmdlib.LURenameCluster,
50     opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
51     opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
52     opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig,
53     # node lu
54     opcodes.OpAddNode: cmdlib.LUAddNode,
55     opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
56     opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
57     opcodes.OpRemoveNode: cmdlib.LURemoveNode,
58     opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams,
59     # instance lu
60     opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
61     opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
62     opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
63     opcodes.OpRenameInstance: cmdlib.LURenameInstance,
64     opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
65     opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
66     opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
67     opcodes.OpRebootInstance: cmdlib.LURebootInstance,
68     opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
69     opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
70     opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
71     opcodes.OpMigrateInstance: cmdlib.LUMigrateInstance,
72     opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
73     opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
74     opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
75     opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
76     opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
77     # os lu
78     opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
79     # exports lu
80     opcodes.OpQueryExports: cmdlib.LUQueryExports,
81     opcodes.OpExportInstance: cmdlib.LUExportInstance,
82     opcodes.OpRemoveExport: cmdlib.LURemoveExport,
83     # tags lu
84     opcodes.OpGetTags: cmdlib.LUGetTags,
85     opcodes.OpSearchTags: cmdlib.LUSearchTags,
86     opcodes.OpAddTags: cmdlib.LUAddTags,
87     opcodes.OpDelTags: cmdlib.LUDelTags,
88     # test lu
89     opcodes.OpTestDelay: cmdlib.LUTestDelay,
90     opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
91     }
92
93   def __init__(self, context):
94     """Constructor for Processor
95
96     Args:
97      - feedback_fn: the feedback function (taking one string) to be run when
98                     interesting events are happening
99     """
100     self.context = context
101     self._feedback_fn = None
102     self.exclusive_BGL = False
103     self.rpc = rpc.RpcRunner(context.cfg)
104
105   def _ExecLU(self, lu):
106     """Logical Unit execution sequence.
107
108     """
109     write_count = self.context.cfg.write_count
110     lu.CheckPrereq()
111     hm = HooksMaster(self.rpc.call_hooks_runner, self, lu)
112     h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
113     lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
114                      self._feedback_fn, None)
115     try:
116       result = lu.Exec(self._feedback_fn)
117       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
118       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
119                                 self._feedback_fn, result)
120     finally:
121       # FIXME: This needs locks if not lu_class.REQ_BGL
122       if write_count != self.context.cfg.write_count:
123         hm.RunConfigUpdate()
124
125     return result
126
127   def _LockAndExecLU(self, lu, level):
128     """Execute a Logical Unit, with the needed locks.
129
130     This is a recursive function that starts locking the given level, and
131     proceeds up, till there are no more locks to acquire. Then it executes the
132     given LU and its opcodes.
133
134     """
135     adding_locks = level in lu.add_locks
136     acquiring_locks = level in lu.needed_locks
137     if level not in locking.LEVELS:
138       if callable(self._run_notifier):
139         self._run_notifier()
140       result = self._ExecLU(lu)
141     elif adding_locks and acquiring_locks:
142       # We could both acquire and add locks at the same level, but for now we
143       # don't need this, so we'll avoid the complicated code needed.
144       raise NotImplementedError(
145         "Can't declare locks to acquire when adding others")
146     elif adding_locks or acquiring_locks:
147       lu.DeclareLocks(level)
148       share = lu.share_locks[level]
149       if acquiring_locks:
150         needed_locks = lu.needed_locks[level]
151         lu.acquired_locks[level] = self.context.glm.acquire(level,
152                                                             needed_locks,
153                                                             shared=share)
154       else: # adding_locks
155         add_locks = lu.add_locks[level]
156         lu.remove_locks[level] = add_locks
157         try:
158           self.context.glm.add(level, add_locks, acquired=1, shared=share)
159         except errors.LockError:
160           raise errors.OpPrereqError(
161             "Coudn't add locks (%s), probably because of a race condition"
162             " with another job, who added them first" % add_locks)
163       try:
164         try:
165           if adding_locks:
166             lu.acquired_locks[level] = add_locks
167           result = self._LockAndExecLU(lu, level + 1)
168         finally:
169           if level in lu.remove_locks:
170             self.context.glm.remove(level, lu.remove_locks[level])
171       finally:
172         if self.context.glm.is_owned(level):
173           self.context.glm.release(level)
174     else:
175       result = self._LockAndExecLU(lu, level + 1)
176
177     return result
178
179   def ExecOpCode(self, op, feedback_fn, run_notifier):
180     """Execute an opcode.
181
182     @type op: an OpCode instance
183     @param op: the opcode to be executed
184     @type feedback_fn: a function that takes a single argument
185     @param feedback_fn: this function will be used as feedback from the LU
186                         code to the end-user
187     @type run_notifier: callable (no arguments) or None
188     @param run_notifier:  this function (if callable) will be called when
189                           we are about to call the lu's Exec() method, that
190                           is, after we have aquired all locks
191
192     """
193     if not isinstance(op, opcodes.OpCode):
194       raise errors.ProgrammerError("Non-opcode instance passed"
195                                    " to ExecOpcode")
196
197     self._feedback_fn = feedback_fn
198     self._run_notifier = run_notifier
199     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
200     if lu_class is None:
201       raise errors.OpCodeUnknown("Unknown opcode")
202
203     # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
204     # shared fashion otherwise (to prevent concurrent run with an exclusive LU.
205     self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
206                              shared=not lu_class.REQ_BGL)
207     try:
208       self.exclusive_BGL = lu_class.REQ_BGL
209       lu = lu_class(self, op, self.context, self.rpc)
210       lu.ExpandNames()
211       assert lu.needed_locks is not None, "needed_locks not set by LU"
212       result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
213     finally:
214       self.context.glm.release(locking.LEVEL_CLUSTER)
215       self.exclusive_BGL = False
216
217     return result
218
219   def LogStep(self, current, total, message):
220     """Log a change in LU execution progress.
221
222     """
223     logging.debug("Step %d/%d %s", current, total, message)
224     self._feedback_fn("STEP %d/%d %s" % (current, total, message))
225
226   def LogWarning(self, message, *args, **kwargs):
227     """Log a warning to the logs and the user.
228
229     The optional keyword argument is 'hint' and can be used to show a
230     hint to the user (presumably related to the warning). If the
231     message is empty, it will not be printed at all, allowing one to
232     show only a hint.
233
234     """
235     assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
236            "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
237     if args:
238       message = message % tuple(args)
239     if message:
240       logging.warning(message)
241       self._feedback_fn(" - WARNING: %s" % message)
242     if "hint" in kwargs:
243       self._feedback_fn("      Hint: %s" % kwargs["hint"])
244
245   def LogInfo(self, message, *args):
246     """Log an informational message to the logs and the user.
247
248     """
249     if args:
250       message = message % tuple(args)
251     logging.info(message)
252     self._feedback_fn(" - INFO: %s" % message)
253
254
255 class HooksMaster(object):
256   """Hooks master.
257
258   This class distributes the run commands to the nodes based on the
259   specific LU class.
260
261   In order to remove the direct dependency on the rpc module, the
262   constructor needs a function which actually does the remote
263   call. This will usually be rpc.call_hooks_runner, but any function
264   which behaves the same works.
265
266   """
267   def __init__(self, callfn, proc, lu):
268     self.callfn = callfn
269     self.proc = proc
270     self.lu = lu
271     self.op = lu.op
272     self.env, node_list_pre, node_list_post = self._BuildEnv()
273     self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
274                       constants.HOOKS_PHASE_POST: node_list_post}
275
276   def _BuildEnv(self):
277     """Compute the environment and the target nodes.
278
279     Based on the opcode and the current node list, this builds the
280     environment for the hooks and the target node list for the run.
281
282     """
283     env = {
284       "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
285       "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
286       "GANETI_OP_CODE": self.op.OP_ID,
287       "GANETI_OBJECT_TYPE": self.lu.HTYPE,
288       "GANETI_DATA_DIR": constants.DATA_DIR,
289       }
290
291     if self.lu.HPATH is not None:
292       lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
293       if lu_env:
294         for key in lu_env:
295           env["GANETI_" + key] = lu_env[key]
296     else:
297       lu_nodes_pre = lu_nodes_post = []
298
299     return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
300
301   def _RunWrapper(self, node_list, hpath, phase):
302     """Simple wrapper over self.callfn.
303
304     This method fixes the environment before doing the rpc call.
305
306     """
307     env = self.env.copy()
308     env["GANETI_HOOKS_PHASE"] = phase
309     env["GANETI_HOOKS_PATH"] = hpath
310     if self.lu.cfg is not None:
311       env["GANETI_CLUSTER"] = self.lu.cfg.GetClusterName()
312       env["GANETI_MASTER"] = self.lu.cfg.GetMasterNode()
313
314     env = dict([(str(key), str(val)) for key, val in env.iteritems()])
315
316     return self.callfn(node_list, hpath, phase, env)
317
318   def RunPhase(self, phase):
319     """Run all the scripts for a phase.
320
321     This is the main function of the HookMaster.
322
323     @param phase: one of L{constants.HOOKS_PHASE_POST} or
324         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
325     @return: the processed results of the hooks multi-node rpc call
326     @raise errors.HooksFailure: on communication failure to the nodes
327
328     """
329     if not self.node_list[phase]:
330       # empty node list, we should not attempt to run this as either
331       # we're in the cluster init phase and the rpc client part can't
332       # even attempt to run, or this LU doesn't do hooks at all
333       return
334     hpath = self.lu.HPATH
335     results = self._RunWrapper(self.node_list[phase], hpath, phase)
336     if phase == constants.HOOKS_PHASE_PRE:
337       errs = []
338       if not results:
339         raise errors.HooksFailure("Communication failure")
340       for node_name in results:
341         res = results[node_name]
342         if res.failed or res.data is False or not isinstance(res.data, list):
343           if not res.offline:
344             self.proc.LogWarning("Communication failure to node %s" %
345                                  node_name)
346           continue
347         for script, hkr, output in res.data:
348           if hkr == constants.HKR_FAIL:
349             errs.append((node_name, script, output))
350       if errs:
351         raise errors.HooksAbort(errs)
352     return results
353
354   def RunConfigUpdate(self):
355     """Run the special configuration update hook
356
357     This is a special hook that runs only on the master after each
358     top-level LI if the configuration has been updated.
359
360     """
361     phase = constants.HOOKS_PHASE_POST
362     hpath = constants.HOOKS_NAME_CFGUPDATE
363     nodes = [self.lu.cfg.GetMasterNode()]
364     results = self._RunWrapper(nodes, hpath, phase)