HooksMaster: list of nodes override
[ganeti-local] / lib / mcpu.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26   - logical units, which know how to deal with their specific opcode only
27   - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31 import logging
32
33 from ganeti import opcodes
34 from ganeti import constants
35 from ganeti import errors
36 from ganeti import rpc
37 from ganeti import cmdlib
38 from ganeti import locking
39
40
41 class Processor(object):
42   """Object which runs OpCodes"""
43   DISPATCH_TABLE = {
44     # Cluster
45     opcodes.OpPostInitCluster: cmdlib.LUPostInitCluster,
46     opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
47     opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
48     opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
49     opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
50     opcodes.OpRenameCluster: cmdlib.LURenameCluster,
51     opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
52     opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
53     opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig,
54     opcodes.OpRepairDiskSizes: cmdlib.LURepairDiskSizes,
55     # node lu
56     opcodes.OpAddNode: cmdlib.LUAddNode,
57     opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
58     opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
59     opcodes.OpQueryNodeStorage: cmdlib.LUQueryNodeStorage,
60     opcodes.OpModifyNodeStorage: cmdlib.LUModifyNodeStorage,
61     opcodes.OpRepairNodeStorage: cmdlib.LURepairNodeStorage,
62     opcodes.OpRemoveNode: cmdlib.LURemoveNode,
63     opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams,
64     opcodes.OpPowercycleNode: cmdlib.LUPowercycleNode,
65     opcodes.OpEvacuateNode: cmdlib.LUEvacuateNode,
66     opcodes.OpMigrateNode: cmdlib.LUMigrateNode,
67     # instance lu
68     opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
69     opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
70     opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
71     opcodes.OpRenameInstance: cmdlib.LURenameInstance,
72     opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
73     opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
74     opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
75     opcodes.OpRebootInstance: cmdlib.LURebootInstance,
76     opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
77     opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
78     opcodes.OpRecreateInstanceDisks: cmdlib.LURecreateInstanceDisks,
79     opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
80     opcodes.OpMigrateInstance: cmdlib.LUMigrateInstance,
81     opcodes.OpMoveInstance: cmdlib.LUMoveInstance,
82     opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
83     opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
84     opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
85     opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
86     opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
87     # os lu
88     opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
89     # exports lu
90     opcodes.OpQueryExports: cmdlib.LUQueryExports,
91     opcodes.OpExportInstance: cmdlib.LUExportInstance,
92     opcodes.OpRemoveExport: cmdlib.LURemoveExport,
93     # tags lu
94     opcodes.OpGetTags: cmdlib.LUGetTags,
95     opcodes.OpSearchTags: cmdlib.LUSearchTags,
96     opcodes.OpAddTags: cmdlib.LUAddTags,
97     opcodes.OpDelTags: cmdlib.LUDelTags,
98     # test lu
99     opcodes.OpTestDelay: cmdlib.LUTestDelay,
100     opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
101     }
102
103   def __init__(self, context):
104     """Constructor for Processor
105
106     Args:
107      - feedback_fn: the feedback function (taking one string) to be run when
108                     interesting events are happening
109     """
110     self.context = context
111     self._feedback_fn = None
112     self.exclusive_BGL = False
113     self.rpc = rpc.RpcRunner(context.cfg)
114
115   def _ExecLU(self, lu):
116     """Logical Unit execution sequence.
117
118     """
119     write_count = self.context.cfg.write_count
120     lu.CheckPrereq()
121     hm = HooksMaster(self.rpc.call_hooks_runner, lu)
122     h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
123     lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
124                      self._feedback_fn, None)
125
126     if getattr(lu.op, "dry_run", False):
127       # in this mode, no post-hooks are run, and the config is not
128       # written (as it might have been modified by another LU, and we
129       # shouldn't do writeout on behalf of other threads
130       self.LogInfo("dry-run mode requested, not actually executing"
131                    " the operation")
132       return lu.dry_run_result
133
134     try:
135       result = lu.Exec(self._feedback_fn)
136       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
137       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
138                                 self._feedback_fn, result)
139     finally:
140       # FIXME: This needs locks if not lu_class.REQ_BGL
141       if write_count != self.context.cfg.write_count:
142         hm.RunConfigUpdate()
143
144     return result
145
146   def _LockAndExecLU(self, lu, level):
147     """Execute a Logical Unit, with the needed locks.
148
149     This is a recursive function that starts locking the given level, and
150     proceeds up, till there are no more locks to acquire. Then it executes the
151     given LU and its opcodes.
152
153     """
154     adding_locks = level in lu.add_locks
155     acquiring_locks = level in lu.needed_locks
156     if level not in locking.LEVELS:
157       if callable(self._run_notifier):
158         self._run_notifier()
159       result = self._ExecLU(lu)
160     elif adding_locks and acquiring_locks:
161       # We could both acquire and add locks at the same level, but for now we
162       # don't need this, so we'll avoid the complicated code needed.
163       raise NotImplementedError(
164         "Can't declare locks to acquire when adding others")
165     elif adding_locks or acquiring_locks:
166       lu.DeclareLocks(level)
167       share = lu.share_locks[level]
168       if acquiring_locks:
169         needed_locks = lu.needed_locks[level]
170         lu.acquired_locks[level] = self.context.glm.acquire(level,
171                                                             needed_locks,
172                                                             shared=share)
173       else: # adding_locks
174         add_locks = lu.add_locks[level]
175         lu.remove_locks[level] = add_locks
176         try:
177           self.context.glm.add(level, add_locks, acquired=1, shared=share)
178         except errors.LockError:
179           raise errors.OpPrereqError(
180             "Couldn't add locks (%s), probably because of a race condition"
181             " with another job, who added them first" % add_locks)
182       try:
183         try:
184           if adding_locks:
185             lu.acquired_locks[level] = add_locks
186           result = self._LockAndExecLU(lu, level + 1)
187         finally:
188           if level in lu.remove_locks:
189             self.context.glm.remove(level, lu.remove_locks[level])
190       finally:
191         if self.context.glm.is_owned(level):
192           self.context.glm.release(level)
193     else:
194       result = self._LockAndExecLU(lu, level + 1)
195
196     return result
197
198   def ExecOpCode(self, op, feedback_fn, run_notifier):
199     """Execute an opcode.
200
201     @type op: an OpCode instance
202     @param op: the opcode to be executed
203     @type feedback_fn: a function that takes a single argument
204     @param feedback_fn: this function will be used as feedback from the LU
205                         code to the end-user
206     @type run_notifier: callable (no arguments) or None
207     @param run_notifier:  this function (if callable) will be called when
208                           we are about to call the lu's Exec() method, that
209                           is, after we have acquired all locks
210
211     """
212     if not isinstance(op, opcodes.OpCode):
213       raise errors.ProgrammerError("Non-opcode instance passed"
214                                    " to ExecOpcode")
215
216     self._feedback_fn = feedback_fn
217     self._run_notifier = run_notifier
218     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
219     if lu_class is None:
220       raise errors.OpCodeUnknown("Unknown opcode")
221
222     # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
223     # shared fashion otherwise (to prevent concurrent run with an exclusive LU.
224     self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
225                              shared=not lu_class.REQ_BGL)
226     try:
227       self.exclusive_BGL = lu_class.REQ_BGL
228       lu = lu_class(self, op, self.context, self.rpc)
229       lu.ExpandNames()
230       assert lu.needed_locks is not None, "needed_locks not set by LU"
231       result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
232     finally:
233       self.context.glm.release(locking.LEVEL_CLUSTER)
234       self.exclusive_BGL = False
235
236     return result
237
238   def LogStep(self, current, total, message):
239     """Log a change in LU execution progress.
240
241     """
242     logging.debug("Step %d/%d %s", current, total, message)
243     self._feedback_fn("STEP %d/%d %s" % (current, total, message))
244
245   def LogWarning(self, message, *args, **kwargs):
246     """Log a warning to the logs and the user.
247
248     The optional keyword argument is 'hint' and can be used to show a
249     hint to the user (presumably related to the warning). If the
250     message is empty, it will not be printed at all, allowing one to
251     show only a hint.
252
253     """
254     assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
255            "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
256     if args:
257       message = message % tuple(args)
258     if message:
259       logging.warning(message)
260       self._feedback_fn(" - WARNING: %s" % message)
261     if "hint" in kwargs:
262       self._feedback_fn("      Hint: %s" % kwargs["hint"])
263
264   def LogInfo(self, message, *args):
265     """Log an informational message to the logs and the user.
266
267     """
268     if args:
269       message = message % tuple(args)
270     logging.info(message)
271     self._feedback_fn(" - INFO: %s" % message)
272
273
274 class HooksMaster(object):
275   """Hooks master.
276
277   This class distributes the run commands to the nodes based on the
278   specific LU class.
279
280   In order to remove the direct dependency on the rpc module, the
281   constructor needs a function which actually does the remote
282   call. This will usually be rpc.call_hooks_runner, but any function
283   which behaves the same works.
284
285   """
286   def __init__(self, callfn, lu):
287     self.callfn = callfn
288     self.lu = lu
289     self.op = lu.op
290     self.env, node_list_pre, node_list_post = self._BuildEnv()
291     self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
292                       constants.HOOKS_PHASE_POST: node_list_post}
293
294   def _BuildEnv(self):
295     """Compute the environment and the target nodes.
296
297     Based on the opcode and the current node list, this builds the
298     environment for the hooks and the target node list for the run.
299
300     """
301     env = {
302       "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
303       "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
304       "GANETI_OP_CODE": self.op.OP_ID,
305       "GANETI_OBJECT_TYPE": self.lu.HTYPE,
306       "GANETI_DATA_DIR": constants.DATA_DIR,
307       }
308
309     if self.lu.HPATH is not None:
310       lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
311       if lu_env:
312         for key in lu_env:
313           env["GANETI_" + key] = lu_env[key]
314     else:
315       lu_nodes_pre = lu_nodes_post = []
316
317     return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
318
319   def _RunWrapper(self, node_list, hpath, phase):
320     """Simple wrapper over self.callfn.
321
322     This method fixes the environment before doing the rpc call.
323
324     """
325     env = self.env.copy()
326     env["GANETI_HOOKS_PHASE"] = phase
327     env["GANETI_HOOKS_PATH"] = hpath
328     if self.lu.cfg is not None:
329       env["GANETI_CLUSTER"] = self.lu.cfg.GetClusterName()
330       env["GANETI_MASTER"] = self.lu.cfg.GetMasterNode()
331
332     env = dict([(str(key), str(val)) for key, val in env.iteritems()])
333
334     return self.callfn(node_list, hpath, phase, env)
335
336   def RunPhase(self, phase, nodes=None):
337     """Run all the scripts for a phase.
338
339     This is the main function of the HookMaster.
340
341     @param phase: one of L{constants.HOOKS_PHASE_POST} or
342         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
343     @param nodes: overrides the predefined list of nodes for the given phase
344     @return: the processed results of the hooks multi-node rpc call
345     @raise errors.HooksFailure: on communication failure to the nodes
346
347     """
348     if not self.node_list[phase] and not nodes:
349       # empty node list, we should not attempt to run this as either
350       # we're in the cluster init phase and the rpc client part can't
351       # even attempt to run, or this LU doesn't do hooks at all
352       return
353     hpath = self.lu.HPATH
354     if nodes is not None:
355       results = self._RunWrapper(nodes, hpath, phase)
356     else:
357       results = self._RunWrapper(self.node_list[phase], hpath, phase)
358     if phase == constants.HOOKS_PHASE_PRE:
359       errs = []
360       if not results:
361         raise errors.HooksFailure("Communication failure")
362       for node_name in results:
363         res = results[node_name]
364         if res.offline:
365           continue
366         msg = res.RemoteFailMsg()
367         if msg:
368           self.lu.LogWarning("Communication failure to node %s: %s",
369                              node_name, msg)
370           continue
371         for script, hkr, output in res.payload:
372           if hkr == constants.HKR_FAIL:
373             errs.append((node_name, script, output))
374       if errs:
375         raise errors.HooksAbort(errs)
376     return results
377
378   def RunConfigUpdate(self):
379     """Run the special configuration update hook
380
381     This is a special hook that runs only on the master after each
382     top-level LI if the configuration has been updated.
383
384     """
385     phase = constants.HOOKS_PHASE_POST
386     hpath = constants.HOOKS_NAME_CFGUPDATE
387     nodes = [self.lu.cfg.GetMasterNode()]
388     self._RunWrapper(nodes, hpath, phase)