Add new opcode to list physical volumes
[ganeti-local] / lib / mcpu.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26   - logical units, which know how to deal with their specific opcode only
27   - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31 import logging
32
33 from ganeti import opcodes
34 from ganeti import constants
35 from ganeti import errors
36 from ganeti import rpc
37 from ganeti import cmdlib
38 from ganeti import locking
39
40
41 class Processor(object):
42   """Object which runs OpCodes"""
43   DISPATCH_TABLE = {
44     # Cluster
45     opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
46     opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
47     opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
48     opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
49     opcodes.OpRenameCluster: cmdlib.LURenameCluster,
50     opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
51     opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
52     opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig,
53     # node lu
54     opcodes.OpAddNode: cmdlib.LUAddNode,
55     opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
56     opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
57     opcodes.OpQueryNodeStorage: cmdlib.LUQueryNodeStorage,
58     opcodes.OpRemoveNode: cmdlib.LURemoveNode,
59     opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams,
60     opcodes.OpPowercycleNode: cmdlib.LUPowercycleNode,
61     opcodes.OpEvacuateNode: cmdlib.LUEvacuateNode,
62     opcodes.OpMigrateNode: cmdlib.LUMigrateNode,
63     # instance lu
64     opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
65     opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
66     opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
67     opcodes.OpRenameInstance: cmdlib.LURenameInstance,
68     opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
69     opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
70     opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
71     opcodes.OpRebootInstance: cmdlib.LURebootInstance,
72     opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
73     opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
74     opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
75     opcodes.OpMigrateInstance: cmdlib.LUMigrateInstance,
76     opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
77     opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
78     opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
79     opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
80     opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
81     # os lu
82     opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
83     # exports lu
84     opcodes.OpQueryExports: cmdlib.LUQueryExports,
85     opcodes.OpExportInstance: cmdlib.LUExportInstance,
86     opcodes.OpRemoveExport: cmdlib.LURemoveExport,
87     # tags lu
88     opcodes.OpGetTags: cmdlib.LUGetTags,
89     opcodes.OpSearchTags: cmdlib.LUSearchTags,
90     opcodes.OpAddTags: cmdlib.LUAddTags,
91     opcodes.OpDelTags: cmdlib.LUDelTags,
92     # test lu
93     opcodes.OpTestDelay: cmdlib.LUTestDelay,
94     opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
95     }
96
97   def __init__(self, context):
98     """Constructor for Processor
99
100     Args:
101      - feedback_fn: the feedback function (taking one string) to be run when
102                     interesting events are happening
103     """
104     self.context = context
105     self._feedback_fn = None
106     self.exclusive_BGL = False
107     self.rpc = rpc.RpcRunner(context.cfg)
108
109   def _ExecLU(self, lu):
110     """Logical Unit execution sequence.
111
112     """
113     write_count = self.context.cfg.write_count
114     lu.CheckPrereq()
115     hm = HooksMaster(self.rpc.call_hooks_runner, self, lu)
116     h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
117     lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
118                      self._feedback_fn, None)
119
120     if getattr(lu.op, "dry_run", False):
121       # in this mode, no post-hooks are run, and the config is not
122       # written (as it might have been modified by another LU, and we
123       # shouldn't do writeout on behalf of other threads
124       self.LogInfo("dry-run mode requested, not actually executing"
125                    " the operation")
126       return lu.dry_run_result
127
128     try:
129       result = lu.Exec(self._feedback_fn)
130       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
131       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
132                                 self._feedback_fn, result)
133     finally:
134       # FIXME: This needs locks if not lu_class.REQ_BGL
135       if write_count != self.context.cfg.write_count:
136         hm.RunConfigUpdate()
137
138     return result
139
140   def _LockAndExecLU(self, lu, level):
141     """Execute a Logical Unit, with the needed locks.
142
143     This is a recursive function that starts locking the given level, and
144     proceeds up, till there are no more locks to acquire. Then it executes the
145     given LU and its opcodes.
146
147     """
148     adding_locks = level in lu.add_locks
149     acquiring_locks = level in lu.needed_locks
150     if level not in locking.LEVELS:
151       if callable(self._run_notifier):
152         self._run_notifier()
153       result = self._ExecLU(lu)
154     elif adding_locks and acquiring_locks:
155       # We could both acquire and add locks at the same level, but for now we
156       # don't need this, so we'll avoid the complicated code needed.
157       raise NotImplementedError(
158         "Can't declare locks to acquire when adding others")
159     elif adding_locks or acquiring_locks:
160       lu.DeclareLocks(level)
161       share = lu.share_locks[level]
162       if acquiring_locks:
163         needed_locks = lu.needed_locks[level]
164         lu.acquired_locks[level] = self.context.glm.acquire(level,
165                                                             needed_locks,
166                                                             shared=share)
167       else: # adding_locks
168         add_locks = lu.add_locks[level]
169         lu.remove_locks[level] = add_locks
170         try:
171           self.context.glm.add(level, add_locks, acquired=1, shared=share)
172         except errors.LockError:
173           raise errors.OpPrereqError(
174             "Couldn't add locks (%s), probably because of a race condition"
175             " with another job, who added them first" % add_locks)
176       try:
177         try:
178           if adding_locks:
179             lu.acquired_locks[level] = add_locks
180           result = self._LockAndExecLU(lu, level + 1)
181         finally:
182           if level in lu.remove_locks:
183             self.context.glm.remove(level, lu.remove_locks[level])
184       finally:
185         if self.context.glm.is_owned(level):
186           self.context.glm.release(level)
187     else:
188       result = self._LockAndExecLU(lu, level + 1)
189
190     return result
191
192   def ExecOpCode(self, op, feedback_fn, run_notifier):
193     """Execute an opcode.
194
195     @type op: an OpCode instance
196     @param op: the opcode to be executed
197     @type feedback_fn: a function that takes a single argument
198     @param feedback_fn: this function will be used as feedback from the LU
199                         code to the end-user
200     @type run_notifier: callable (no arguments) or None
201     @param run_notifier:  this function (if callable) will be called when
202                           we are about to call the lu's Exec() method, that
203                           is, after we have acquired all locks
204
205     """
206     if not isinstance(op, opcodes.OpCode):
207       raise errors.ProgrammerError("Non-opcode instance passed"
208                                    " to ExecOpcode")
209
210     self._feedback_fn = feedback_fn
211     self._run_notifier = run_notifier
212     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
213     if lu_class is None:
214       raise errors.OpCodeUnknown("Unknown opcode")
215
216     # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
217     # shared fashion otherwise (to prevent concurrent run with an exclusive LU.
218     self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
219                              shared=not lu_class.REQ_BGL)
220     try:
221       self.exclusive_BGL = lu_class.REQ_BGL
222       lu = lu_class(self, op, self.context, self.rpc)
223       lu.ExpandNames()
224       assert lu.needed_locks is not None, "needed_locks not set by LU"
225       result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
226     finally:
227       self.context.glm.release(locking.LEVEL_CLUSTER)
228       self.exclusive_BGL = False
229
230     return result
231
232   def LogStep(self, current, total, message):
233     """Log a change in LU execution progress.
234
235     """
236     logging.debug("Step %d/%d %s", current, total, message)
237     self._feedback_fn("STEP %d/%d %s" % (current, total, message))
238
239   def LogWarning(self, message, *args, **kwargs):
240     """Log a warning to the logs and the user.
241
242     The optional keyword argument is 'hint' and can be used to show a
243     hint to the user (presumably related to the warning). If the
244     message is empty, it will not be printed at all, allowing one to
245     show only a hint.
246
247     """
248     assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
249            "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
250     if args:
251       message = message % tuple(args)
252     if message:
253       logging.warning(message)
254       self._feedback_fn(" - WARNING: %s" % message)
255     if "hint" in kwargs:
256       self._feedback_fn("      Hint: %s" % kwargs["hint"])
257
258   def LogInfo(self, message, *args):
259     """Log an informational message to the logs and the user.
260
261     """
262     if args:
263       message = message % tuple(args)
264     logging.info(message)
265     self._feedback_fn(" - INFO: %s" % message)
266
267
268 class HooksMaster(object):
269   """Hooks master.
270
271   This class distributes the run commands to the nodes based on the
272   specific LU class.
273
274   In order to remove the direct dependency on the rpc module, the
275   constructor needs a function which actually does the remote
276   call. This will usually be rpc.call_hooks_runner, but any function
277   which behaves the same works.
278
279   """
280   def __init__(self, callfn, proc, lu):
281     self.callfn = callfn
282     self.proc = proc
283     self.lu = lu
284     self.op = lu.op
285     self.env, node_list_pre, node_list_post = self._BuildEnv()
286     self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
287                       constants.HOOKS_PHASE_POST: node_list_post}
288
289   def _BuildEnv(self):
290     """Compute the environment and the target nodes.
291
292     Based on the opcode and the current node list, this builds the
293     environment for the hooks and the target node list for the run.
294
295     """
296     env = {
297       "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
298       "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
299       "GANETI_OP_CODE": self.op.OP_ID,
300       "GANETI_OBJECT_TYPE": self.lu.HTYPE,
301       "GANETI_DATA_DIR": constants.DATA_DIR,
302       }
303
304     if self.lu.HPATH is not None:
305       lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
306       if lu_env:
307         for key in lu_env:
308           env["GANETI_" + key] = lu_env[key]
309     else:
310       lu_nodes_pre = lu_nodes_post = []
311
312     return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
313
314   def _RunWrapper(self, node_list, hpath, phase):
315     """Simple wrapper over self.callfn.
316
317     This method fixes the environment before doing the rpc call.
318
319     """
320     env = self.env.copy()
321     env["GANETI_HOOKS_PHASE"] = phase
322     env["GANETI_HOOKS_PATH"] = hpath
323     if self.lu.cfg is not None:
324       env["GANETI_CLUSTER"] = self.lu.cfg.GetClusterName()
325       env["GANETI_MASTER"] = self.lu.cfg.GetMasterNode()
326
327     env = dict([(str(key), str(val)) for key, val in env.iteritems()])
328
329     return self.callfn(node_list, hpath, phase, env)
330
331   def RunPhase(self, phase):
332     """Run all the scripts for a phase.
333
334     This is the main function of the HookMaster.
335
336     @param phase: one of L{constants.HOOKS_PHASE_POST} or
337         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
338     @return: the processed results of the hooks multi-node rpc call
339     @raise errors.HooksFailure: on communication failure to the nodes
340
341     """
342     if not self.node_list[phase]:
343       # empty node list, we should not attempt to run this as either
344       # we're in the cluster init phase and the rpc client part can't
345       # even attempt to run, or this LU doesn't do hooks at all
346       return
347     hpath = self.lu.HPATH
348     results = self._RunWrapper(self.node_list[phase], hpath, phase)
349     if phase == constants.HOOKS_PHASE_PRE:
350       errs = []
351       if not results:
352         raise errors.HooksFailure("Communication failure")
353       for node_name in results:
354         res = results[node_name]
355         if res.offline:
356           continue
357         msg = res.RemoteFailMsg()
358         if msg:
359           self.proc.LogWarning("Communication failure to node %s: %s",
360                                node_name, msg)
361           continue
362         for script, hkr, output in res.payload:
363           if hkr == constants.HKR_FAIL:
364             errs.append((node_name, script, output))
365       if errs:
366         raise errors.HooksAbort(errs)
367     return results
368
369   def RunConfigUpdate(self):
370     """Run the special configuration update hook
371
372     This is a special hook that runs only on the master after each
373     top-level LI if the configuration has been updated.
374
375     """
376     phase = constants.HOOKS_PHASE_POST
377     hpath = constants.HOOKS_NAME_CFGUPDATE
378     nodes = [self.lu.cfg.GetMasterNode()]
379     self._RunWrapper(nodes, hpath, phase)