HooksMaster: fix RunPhase logging
[ganeti-local] / lib / mcpu.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26   - logical units, which know how to deal with their specific opcode only
27   - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31 import logging
32
33 from ganeti import opcodes
34 from ganeti import constants
35 from ganeti import errors
36 from ganeti import rpc
37 from ganeti import cmdlib
38 from ganeti import locking
39
40
41 class Processor(object):
42   """Object which runs OpCodes"""
43   DISPATCH_TABLE = {
44     # Cluster
45     opcodes.OpPostInitCluster: cmdlib.LUPostInitCluster,
46     opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
47     opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
48     opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
49     opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
50     opcodes.OpRenameCluster: cmdlib.LURenameCluster,
51     opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
52     opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
53     opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig,
54     opcodes.OpRepairDiskSizes: cmdlib.LURepairDiskSizes,
55     # node lu
56     opcodes.OpAddNode: cmdlib.LUAddNode,
57     opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
58     opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
59     opcodes.OpQueryNodeStorage: cmdlib.LUQueryNodeStorage,
60     opcodes.OpModifyNodeStorage: cmdlib.LUModifyNodeStorage,
61     opcodes.OpRepairNodeStorage: cmdlib.LURepairNodeStorage,
62     opcodes.OpRemoveNode: cmdlib.LURemoveNode,
63     opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams,
64     opcodes.OpPowercycleNode: cmdlib.LUPowercycleNode,
65     opcodes.OpEvacuateNode: cmdlib.LUEvacuateNode,
66     opcodes.OpMigrateNode: cmdlib.LUMigrateNode,
67     # instance lu
68     opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
69     opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
70     opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
71     opcodes.OpRenameInstance: cmdlib.LURenameInstance,
72     opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
73     opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
74     opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
75     opcodes.OpRebootInstance: cmdlib.LURebootInstance,
76     opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
77     opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
78     opcodes.OpRecreateInstanceDisks: cmdlib.LURecreateInstanceDisks,
79     opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
80     opcodes.OpMigrateInstance: cmdlib.LUMigrateInstance,
81     opcodes.OpMoveInstance: cmdlib.LUMoveInstance,
82     opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
83     opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
84     opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
85     opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
86     opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
87     # os lu
88     opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
89     # exports lu
90     opcodes.OpQueryExports: cmdlib.LUQueryExports,
91     opcodes.OpExportInstance: cmdlib.LUExportInstance,
92     opcodes.OpRemoveExport: cmdlib.LURemoveExport,
93     # tags lu
94     opcodes.OpGetTags: cmdlib.LUGetTags,
95     opcodes.OpSearchTags: cmdlib.LUSearchTags,
96     opcodes.OpAddTags: cmdlib.LUAddTags,
97     opcodes.OpDelTags: cmdlib.LUDelTags,
98     # test lu
99     opcodes.OpTestDelay: cmdlib.LUTestDelay,
100     opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
101     }
102
103   def __init__(self, context):
104     """Constructor for Processor
105
106     Args:
107      - feedback_fn: the feedback function (taking one string) to be run when
108                     interesting events are happening
109     """
110     self.context = context
111     self._feedback_fn = None
112     self.exclusive_BGL = False
113     self.rpc = rpc.RpcRunner(context.cfg)
114     self.hmclass = HooksMaster
115
116   def _ExecLU(self, lu):
117     """Logical Unit execution sequence.
118
119     """
120     write_count = self.context.cfg.write_count
121     lu.CheckPrereq()
122     hm = HooksMaster(self.rpc.call_hooks_runner, lu)
123     h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
124     lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
125                      self._feedback_fn, None)
126
127     if getattr(lu.op, "dry_run", False):
128       # in this mode, no post-hooks are run, and the config is not
129       # written (as it might have been modified by another LU, and we
130       # shouldn't do writeout on behalf of other threads
131       self.LogInfo("dry-run mode requested, not actually executing"
132                    " the operation")
133       return lu.dry_run_result
134
135     try:
136       result = lu.Exec(self._feedback_fn)
137       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
138       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
139                                 self._feedback_fn, result)
140     finally:
141       # FIXME: This needs locks if not lu_class.REQ_BGL
142       if write_count != self.context.cfg.write_count:
143         hm.RunConfigUpdate()
144
145     return result
146
147   def _LockAndExecLU(self, lu, level):
148     """Execute a Logical Unit, with the needed locks.
149
150     This is a recursive function that starts locking the given level, and
151     proceeds up, till there are no more locks to acquire. Then it executes the
152     given LU and its opcodes.
153
154     """
155     adding_locks = level in lu.add_locks
156     acquiring_locks = level in lu.needed_locks
157     if level not in locking.LEVELS:
158       if callable(self._run_notifier):
159         self._run_notifier()
160       result = self._ExecLU(lu)
161     elif adding_locks and acquiring_locks:
162       # We could both acquire and add locks at the same level, but for now we
163       # don't need this, so we'll avoid the complicated code needed.
164       raise NotImplementedError(
165         "Can't declare locks to acquire when adding others")
166     elif adding_locks or acquiring_locks:
167       lu.DeclareLocks(level)
168       share = lu.share_locks[level]
169       if acquiring_locks:
170         needed_locks = lu.needed_locks[level]
171         lu.acquired_locks[level] = self.context.glm.acquire(level,
172                                                             needed_locks,
173                                                             shared=share)
174       else: # adding_locks
175         add_locks = lu.add_locks[level]
176         lu.remove_locks[level] = add_locks
177         try:
178           self.context.glm.add(level, add_locks, acquired=1, shared=share)
179         except errors.LockError:
180           raise errors.OpPrereqError(
181             "Couldn't add locks (%s), probably because of a race condition"
182             " with another job, who added them first" % add_locks)
183       try:
184         try:
185           if adding_locks:
186             lu.acquired_locks[level] = add_locks
187           result = self._LockAndExecLU(lu, level + 1)
188         finally:
189           if level in lu.remove_locks:
190             self.context.glm.remove(level, lu.remove_locks[level])
191       finally:
192         if self.context.glm.is_owned(level):
193           self.context.glm.release(level)
194     else:
195       result = self._LockAndExecLU(lu, level + 1)
196
197     return result
198
199   def ExecOpCode(self, op, feedback_fn, run_notifier):
200     """Execute an opcode.
201
202     @type op: an OpCode instance
203     @param op: the opcode to be executed
204     @type feedback_fn: a function that takes a single argument
205     @param feedback_fn: this function will be used as feedback from the LU
206                         code to the end-user
207     @type run_notifier: callable (no arguments) or None
208     @param run_notifier:  this function (if callable) will be called when
209                           we are about to call the lu's Exec() method, that
210                           is, after we have acquired all locks
211
212     """
213     if not isinstance(op, opcodes.OpCode):
214       raise errors.ProgrammerError("Non-opcode instance passed"
215                                    " to ExecOpcode")
216
217     self._feedback_fn = feedback_fn
218     self._run_notifier = run_notifier
219     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
220     if lu_class is None:
221       raise errors.OpCodeUnknown("Unknown opcode")
222
223     # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
224     # shared fashion otherwise (to prevent concurrent run with an exclusive LU.
225     self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
226                              shared=not lu_class.REQ_BGL)
227     try:
228       self.exclusive_BGL = lu_class.REQ_BGL
229       lu = lu_class(self, op, self.context, self.rpc)
230       lu.ExpandNames()
231       assert lu.needed_locks is not None, "needed_locks not set by LU"
232       result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
233     finally:
234       self.context.glm.release(locking.LEVEL_CLUSTER)
235       self.exclusive_BGL = False
236
237     return result
238
239   def LogStep(self, current, total, message):
240     """Log a change in LU execution progress.
241
242     """
243     logging.debug("Step %d/%d %s", current, total, message)
244     self._feedback_fn("STEP %d/%d %s" % (current, total, message))
245
246   def LogWarning(self, message, *args, **kwargs):
247     """Log a warning to the logs and the user.
248
249     The optional keyword argument is 'hint' and can be used to show a
250     hint to the user (presumably related to the warning). If the
251     message is empty, it will not be printed at all, allowing one to
252     show only a hint.
253
254     """
255     assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
256            "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
257     if args:
258       message = message % tuple(args)
259     if message:
260       logging.warning(message)
261       self._feedback_fn(" - WARNING: %s" % message)
262     if "hint" in kwargs:
263       self._feedback_fn("      Hint: %s" % kwargs["hint"])
264
265   def LogInfo(self, message, *args):
266     """Log an informational message to the logs and the user.
267
268     """
269     if args:
270       message = message % tuple(args)
271     logging.info(message)
272     self._feedback_fn(" - INFO: %s" % message)
273
274
275 class HooksMaster(object):
276   """Hooks master.
277
278   This class distributes the run commands to the nodes based on the
279   specific LU class.
280
281   In order to remove the direct dependency on the rpc module, the
282   constructor needs a function which actually does the remote
283   call. This will usually be rpc.call_hooks_runner, but any function
284   which behaves the same works.
285
286   """
287   def __init__(self, callfn, lu):
288     self.callfn = callfn
289     self.lu = lu
290     self.op = lu.op
291     self.env, node_list_pre, node_list_post = self._BuildEnv()
292     self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
293                       constants.HOOKS_PHASE_POST: node_list_post}
294
295   def _BuildEnv(self):
296     """Compute the environment and the target nodes.
297
298     Based on the opcode and the current node list, this builds the
299     environment for the hooks and the target node list for the run.
300
301     """
302     env = {
303       "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
304       "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
305       "GANETI_OP_CODE": self.op.OP_ID,
306       "GANETI_OBJECT_TYPE": self.lu.HTYPE,
307       "GANETI_DATA_DIR": constants.DATA_DIR,
308       }
309
310     if self.lu.HPATH is not None:
311       lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
312       if lu_env:
313         for key in lu_env:
314           env["GANETI_" + key] = lu_env[key]
315     else:
316       lu_nodes_pre = lu_nodes_post = []
317
318     return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
319
320   def _RunWrapper(self, node_list, hpath, phase):
321     """Simple wrapper over self.callfn.
322
323     This method fixes the environment before doing the rpc call.
324
325     """
326     env = self.env.copy()
327     env["GANETI_HOOKS_PHASE"] = phase
328     env["GANETI_HOOKS_PATH"] = hpath
329     if self.lu.cfg is not None:
330       env["GANETI_CLUSTER"] = self.lu.cfg.GetClusterName()
331       env["GANETI_MASTER"] = self.lu.cfg.GetMasterNode()
332
333     env = dict([(str(key), str(val)) for key, val in env.iteritems()])
334
335     return self.callfn(node_list, hpath, phase, env)
336
337   def RunPhase(self, phase, nodes=None):
338     """Run all the scripts for a phase.
339
340     This is the main function of the HookMaster.
341
342     @param phase: one of L{constants.HOOKS_PHASE_POST} or
343         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
344     @param nodes: overrides the predefined list of nodes for the given phase
345     @return: the processed results of the hooks multi-node rpc call
346     @raise errors.HooksFailure: on communication failure to the nodes
347     @raise errors.HooksAbort: on failure of one of the hooks
348
349     """
350     if not self.node_list[phase] and not nodes:
351       # empty node list, we should not attempt to run this as either
352       # we're in the cluster init phase and the rpc client part can't
353       # even attempt to run, or this LU doesn't do hooks at all
354       return
355     hpath = self.lu.HPATH
356     if nodes is not None:
357       results = self._RunWrapper(nodes, hpath, phase)
358     else:
359       results = self._RunWrapper(self.node_list[phase], hpath, phase)
360     errs = []
361     if not results:
362       msg = "Communication Failure"
363       if phase == constants.HOOKS_PHASE_PRE:
364         raise errors.HooksFailure(msg)
365       else:
366         self.lu.LogWarning(msg)
367         return results
368     for node_name in results:
369       res = results[node_name]
370       if res.offline:
371         continue
372       msg = res.RemoteFailMsg()
373       if msg:
374         self.lu.LogWarning("Communication failure to node %s: %s",
375                            node_name, msg)
376         continue
377       for script, hkr, output in res.payload:
378         if hkr == constants.HKR_FAIL:
379           if phase == constants.HOOKS_PHASE_PRE:
380             errs.append((node_name, script, output))
381           else:
382             if not output:
383               output = "(no output)"
384             self.lu.LogWarning("On %s script %s failed, output: %s" %
385                                (node_name, script, output))
386     if errs and phase == constants.HOOKS_PHASE_PRE:
387       raise errors.HooksAbort(errs)
388     return results
389
390   def RunConfigUpdate(self):
391     """Run the special configuration update hook
392
393     This is a special hook that runs only on the master after each
394     top-level LI if the configuration has been updated.
395
396     """
397     phase = constants.HOOKS_PHASE_POST
398     hpath = constants.HOOKS_NAME_CFGUPDATE
399     nodes = [self.lu.cfg.GetMasterNode()]
400     self._RunWrapper(nodes, hpath, phase)