Merge commit 'origin/next' into branch-2.1
[ganeti-local] / lib / mcpu.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26   - logical units, which know how to deal with their specific opcode only
27   - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31 import logging
32
33 from ganeti import opcodes
34 from ganeti import constants
35 from ganeti import errors
36 from ganeti import rpc
37 from ganeti import cmdlib
38 from ganeti import locking
39
40
41 class Processor(object):
42   """Object which runs OpCodes"""
43   DISPATCH_TABLE = {
44     # Cluster
45     opcodes.OpPostInitCluster: cmdlib.LUPostInitCluster,
46     opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
47     opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
48     opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
49     opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
50     opcodes.OpRenameCluster: cmdlib.LURenameCluster,
51     opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
52     opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
53     opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig,
54     opcodes.OpRepairDiskSizes: cmdlib.LURepairDiskSizes,
55     # node lu
56     opcodes.OpAddNode: cmdlib.LUAddNode,
57     opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
58     opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
59     opcodes.OpQueryNodeStorage: cmdlib.LUQueryNodeStorage,
60     opcodes.OpModifyNodeStorage: cmdlib.LUModifyNodeStorage,
61     opcodes.OpRemoveNode: cmdlib.LURemoveNode,
62     opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams,
63     opcodes.OpPowercycleNode: cmdlib.LUPowercycleNode,
64     opcodes.OpEvacuateNode: cmdlib.LUEvacuateNode,
65     opcodes.OpMigrateNode: cmdlib.LUMigrateNode,
66     # instance lu
67     opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
68     opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
69     opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
70     opcodes.OpRenameInstance: cmdlib.LURenameInstance,
71     opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
72     opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
73     opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
74     opcodes.OpRebootInstance: cmdlib.LURebootInstance,
75     opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
76     opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
77     opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
78     opcodes.OpMigrateInstance: cmdlib.LUMigrateInstance,
79     opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
80     opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
81     opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
82     opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
83     opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
84     # os lu
85     opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
86     # exports lu
87     opcodes.OpQueryExports: cmdlib.LUQueryExports,
88     opcodes.OpExportInstance: cmdlib.LUExportInstance,
89     opcodes.OpRemoveExport: cmdlib.LURemoveExport,
90     # tags lu
91     opcodes.OpGetTags: cmdlib.LUGetTags,
92     opcodes.OpSearchTags: cmdlib.LUSearchTags,
93     opcodes.OpAddTags: cmdlib.LUAddTags,
94     opcodes.OpDelTags: cmdlib.LUDelTags,
95     # test lu
96     opcodes.OpTestDelay: cmdlib.LUTestDelay,
97     opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
98     }
99
100   def __init__(self, context):
101     """Constructor for Processor
102
103     Args:
104      - feedback_fn: the feedback function (taking one string) to be run when
105                     interesting events are happening
106     """
107     self.context = context
108     self._feedback_fn = None
109     self.exclusive_BGL = False
110     self.rpc = rpc.RpcRunner(context.cfg)
111
112   def _ExecLU(self, lu):
113     """Logical Unit execution sequence.
114
115     """
116     write_count = self.context.cfg.write_count
117     lu.CheckPrereq()
118     hm = HooksMaster(self.rpc.call_hooks_runner, self, lu)
119     h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
120     lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
121                      self._feedback_fn, None)
122
123     if getattr(lu.op, "dry_run", False):
124       # in this mode, no post-hooks are run, and the config is not
125       # written (as it might have been modified by another LU, and we
126       # shouldn't do writeout on behalf of other threads
127       self.LogInfo("dry-run mode requested, not actually executing"
128                    " the operation")
129       return lu.dry_run_result
130
131     try:
132       result = lu.Exec(self._feedback_fn)
133       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
134       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
135                                 self._feedback_fn, result)
136     finally:
137       # FIXME: This needs locks if not lu_class.REQ_BGL
138       if write_count != self.context.cfg.write_count:
139         hm.RunConfigUpdate()
140
141     return result
142
143   def _LockAndExecLU(self, lu, level):
144     """Execute a Logical Unit, with the needed locks.
145
146     This is a recursive function that starts locking the given level, and
147     proceeds up, till there are no more locks to acquire. Then it executes the
148     given LU and its opcodes.
149
150     """
151     adding_locks = level in lu.add_locks
152     acquiring_locks = level in lu.needed_locks
153     if level not in locking.LEVELS:
154       if callable(self._run_notifier):
155         self._run_notifier()
156       result = self._ExecLU(lu)
157     elif adding_locks and acquiring_locks:
158       # We could both acquire and add locks at the same level, but for now we
159       # don't need this, so we'll avoid the complicated code needed.
160       raise NotImplementedError(
161         "Can't declare locks to acquire when adding others")
162     elif adding_locks or acquiring_locks:
163       lu.DeclareLocks(level)
164       share = lu.share_locks[level]
165       if acquiring_locks:
166         needed_locks = lu.needed_locks[level]
167         lu.acquired_locks[level] = self.context.glm.acquire(level,
168                                                             needed_locks,
169                                                             shared=share)
170       else: # adding_locks
171         add_locks = lu.add_locks[level]
172         lu.remove_locks[level] = add_locks
173         try:
174           self.context.glm.add(level, add_locks, acquired=1, shared=share)
175         except errors.LockError:
176           raise errors.OpPrereqError(
177             "Couldn't add locks (%s), probably because of a race condition"
178             " with another job, who added them first" % add_locks)
179       try:
180         try:
181           if adding_locks:
182             lu.acquired_locks[level] = add_locks
183           result = self._LockAndExecLU(lu, level + 1)
184         finally:
185           if level in lu.remove_locks:
186             self.context.glm.remove(level, lu.remove_locks[level])
187       finally:
188         if self.context.glm.is_owned(level):
189           self.context.glm.release(level)
190     else:
191       result = self._LockAndExecLU(lu, level + 1)
192
193     return result
194
195   def ExecOpCode(self, op, feedback_fn, run_notifier):
196     """Execute an opcode.
197
198     @type op: an OpCode instance
199     @param op: the opcode to be executed
200     @type feedback_fn: a function that takes a single argument
201     @param feedback_fn: this function will be used as feedback from the LU
202                         code to the end-user
203     @type run_notifier: callable (no arguments) or None
204     @param run_notifier:  this function (if callable) will be called when
205                           we are about to call the lu's Exec() method, that
206                           is, after we have acquired all locks
207
208     """
209     if not isinstance(op, opcodes.OpCode):
210       raise errors.ProgrammerError("Non-opcode instance passed"
211                                    " to ExecOpcode")
212
213     self._feedback_fn = feedback_fn
214     self._run_notifier = run_notifier
215     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
216     if lu_class is None:
217       raise errors.OpCodeUnknown("Unknown opcode")
218
219     # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
220     # shared fashion otherwise (to prevent concurrent run with an exclusive LU.
221     self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
222                              shared=not lu_class.REQ_BGL)
223     try:
224       self.exclusive_BGL = lu_class.REQ_BGL
225       lu = lu_class(self, op, self.context, self.rpc)
226       lu.ExpandNames()
227       assert lu.needed_locks is not None, "needed_locks not set by LU"
228       result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
229     finally:
230       self.context.glm.release(locking.LEVEL_CLUSTER)
231       self.exclusive_BGL = False
232
233     return result
234
235   def LogStep(self, current, total, message):
236     """Log a change in LU execution progress.
237
238     """
239     logging.debug("Step %d/%d %s", current, total, message)
240     self._feedback_fn("STEP %d/%d %s" % (current, total, message))
241
242   def LogWarning(self, message, *args, **kwargs):
243     """Log a warning to the logs and the user.
244
245     The optional keyword argument is 'hint' and can be used to show a
246     hint to the user (presumably related to the warning). If the
247     message is empty, it will not be printed at all, allowing one to
248     show only a hint.
249
250     """
251     assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
252            "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
253     if args:
254       message = message % tuple(args)
255     if message:
256       logging.warning(message)
257       self._feedback_fn(" - WARNING: %s" % message)
258     if "hint" in kwargs:
259       self._feedback_fn("      Hint: %s" % kwargs["hint"])
260
261   def LogInfo(self, message, *args):
262     """Log an informational message to the logs and the user.
263
264     """
265     if args:
266       message = message % tuple(args)
267     logging.info(message)
268     self._feedback_fn(" - INFO: %s" % message)
269
270
271 class HooksMaster(object):
272   """Hooks master.
273
274   This class distributes the run commands to the nodes based on the
275   specific LU class.
276
277   In order to remove the direct dependency on the rpc module, the
278   constructor needs a function which actually does the remote
279   call. This will usually be rpc.call_hooks_runner, but any function
280   which behaves the same works.
281
282   """
283   def __init__(self, callfn, proc, lu):
284     self.callfn = callfn
285     self.proc = proc
286     self.lu = lu
287     self.op = lu.op
288     self.env, node_list_pre, node_list_post = self._BuildEnv()
289     self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
290                       constants.HOOKS_PHASE_POST: node_list_post}
291
292   def _BuildEnv(self):
293     """Compute the environment and the target nodes.
294
295     Based on the opcode and the current node list, this builds the
296     environment for the hooks and the target node list for the run.
297
298     """
299     env = {
300       "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
301       "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
302       "GANETI_OP_CODE": self.op.OP_ID,
303       "GANETI_OBJECT_TYPE": self.lu.HTYPE,
304       "GANETI_DATA_DIR": constants.DATA_DIR,
305       }
306
307     if self.lu.HPATH is not None:
308       lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
309       if lu_env:
310         for key in lu_env:
311           env["GANETI_" + key] = lu_env[key]
312     else:
313       lu_nodes_pre = lu_nodes_post = []
314
315     return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
316
317   def _RunWrapper(self, node_list, hpath, phase):
318     """Simple wrapper over self.callfn.
319
320     This method fixes the environment before doing the rpc call.
321
322     """
323     env = self.env.copy()
324     env["GANETI_HOOKS_PHASE"] = phase
325     env["GANETI_HOOKS_PATH"] = hpath
326     if self.lu.cfg is not None:
327       env["GANETI_CLUSTER"] = self.lu.cfg.GetClusterName()
328       env["GANETI_MASTER"] = self.lu.cfg.GetMasterNode()
329
330     env = dict([(str(key), str(val)) for key, val in env.iteritems()])
331
332     return self.callfn(node_list, hpath, phase, env)
333
334   def RunPhase(self, phase):
335     """Run all the scripts for a phase.
336
337     This is the main function of the HookMaster.
338
339     @param phase: one of L{constants.HOOKS_PHASE_POST} or
340         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
341     @return: the processed results of the hooks multi-node rpc call
342     @raise errors.HooksFailure: on communication failure to the nodes
343
344     """
345     if not self.node_list[phase]:
346       # empty node list, we should not attempt to run this as either
347       # we're in the cluster init phase and the rpc client part can't
348       # even attempt to run, or this LU doesn't do hooks at all
349       return
350     hpath = self.lu.HPATH
351     results = self._RunWrapper(self.node_list[phase], hpath, phase)
352     if phase == constants.HOOKS_PHASE_PRE:
353       errs = []
354       if not results:
355         raise errors.HooksFailure("Communication failure")
356       for node_name in results:
357         res = results[node_name]
358         if res.offline:
359           continue
360         msg = res.RemoteFailMsg()
361         if msg:
362           self.proc.LogWarning("Communication failure to node %s: %s",
363                                node_name, msg)
364           continue
365         for script, hkr, output in res.payload:
366           if hkr == constants.HKR_FAIL:
367             errs.append((node_name, script, output))
368       if errs:
369         raise errors.HooksAbort(errs)
370     return results
371
372   def RunConfigUpdate(self):
373     """Run the special configuration update hook
374
375     This is a special hook that runs only on the master after each
376     top-level LI if the configuration has been updated.
377
378     """
379     phase = constants.HOOKS_PHASE_POST
380     hpath = constants.HOOKS_NAME_CFGUPDATE
381     nodes = [self.lu.cfg.GetMasterNode()]
382     self._RunWrapper(nodes, hpath, phase)