Add new query to get cluster config values
[ganeti-local] / lib / mcpu.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26   - logical units, which know how to deal with their specific opcode only
27   - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31
32 from ganeti import opcodes
33 from ganeti import constants
34 from ganeti import errors
35 from ganeti import rpc
36 from ganeti import cmdlib
37 from ganeti import ssconf
38 from ganeti import logger
39 from ganeti import locking
40
41
42 class Processor(object):
43   """Object which runs OpCodes"""
44   DISPATCH_TABLE = {
45     # Cluster
46     opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
47     opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
48     opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
49     opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
50     opcodes.OpRenameCluster: cmdlib.LURenameCluster,
51     opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
52     opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
53     # node lu
54     opcodes.OpAddNode: cmdlib.LUAddNode,
55     opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
56     opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
57     opcodes.OpRemoveNode: cmdlib.LURemoveNode,
58     # instance lu
59     opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
60     opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
61     opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
62     opcodes.OpRenameInstance: cmdlib.LURenameInstance,
63     opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
64     opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
65     opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
66     opcodes.OpRebootInstance: cmdlib.LURebootInstance,
67     opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
68     opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
69     opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
70     opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
71     opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
72     opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
73     opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
74     opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
75     # os lu
76     opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
77     # exports lu
78     opcodes.OpQueryExports: cmdlib.LUQueryExports,
79     opcodes.OpExportInstance: cmdlib.LUExportInstance,
80     opcodes.OpRemoveExport: cmdlib.LURemoveExport,
81     # tags lu
82     opcodes.OpGetTags: cmdlib.LUGetTags,
83     opcodes.OpSearchTags: cmdlib.LUSearchTags,
84     opcodes.OpAddTags: cmdlib.LUAddTags,
85     opcodes.OpDelTags: cmdlib.LUDelTags,
86     # test lu
87     opcodes.OpTestDelay: cmdlib.LUTestDelay,
88     opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
89     }
90
91   def __init__(self, context):
92     """Constructor for Processor
93
94     Args:
95      - feedback_fn: the feedback function (taking one string) to be run when
96                     interesting events are happening
97     """
98     self.context = context
99     self._feedback_fn = None
100     self.exclusive_BGL = False
101
102   def _ExecLU(self, lu):
103     """Logical Unit execution sequence.
104
105     """
106     write_count = self.context.cfg.write_count
107     lu.CheckPrereq()
108     hm = HooksMaster(rpc.call_hooks_runner, self, lu)
109     h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
110     lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
111                      self._feedback_fn, None)
112     try:
113       result = lu.Exec(self._feedback_fn)
114       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
115       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
116                                 self._feedback_fn, result)
117     finally:
118       # FIXME: This needs locks if not lu_class.REQ_BGL
119       if write_count != self.context.cfg.write_count:
120         hm.RunConfigUpdate()
121
122     return result
123
124   def _LockAndExecLU(self, lu, level):
125     """Execute a Logical Unit, with the needed locks.
126
127     This is a recursive function that starts locking the given level, and
128     proceeds up, till there are no more locks to acquire. Then it executes the
129     given LU and its opcodes.
130
131     """
132     adding_locks = level in lu.add_locks
133     acquiring_locks = level in lu.needed_locks
134     if level not in locking.LEVELS:
135       result = self._ExecLU(lu)
136     elif adding_locks and acquiring_locks:
137       # We could both acquire and add locks at the same level, but for now we
138       # don't need this, so we'll avoid the complicated code needed.
139       raise NotImplementedError(
140         "Can't declare locks to acquire when adding others")
141     elif adding_locks or acquiring_locks:
142       lu.DeclareLocks(level)
143       share = lu.share_locks[level]
144       if acquiring_locks:
145         needed_locks = lu.needed_locks[level]
146         lu.acquired_locks[level] = self.context.glm.acquire(level,
147                                                             needed_locks,
148                                                             shared=share)
149       else: # adding_locks
150         add_locks = lu.add_locks[level]
151         lu.remove_locks[level] = add_locks
152         try:
153           self.context.glm.add(level, add_locks, acquired=1, shared=share)
154         except errors.LockError:
155           raise errors.OpPrereqError(
156             "Coudn't add locks (%s), probably because of a race condition"
157             " with another job, who added them first" % add_locks)
158       try:
159         try:
160           if adding_locks:
161             lu.acquired_locks[level] = add_locks
162           result = self._LockAndExecLU(lu, level + 1)
163         finally:
164           if level in lu.remove_locks:
165             self.context.glm.remove(level, lu.remove_locks[level])
166       finally:
167         if self.context.glm.is_owned(level):
168           self.context.glm.release(level)
169     else:
170       result = self._LockAndExecLU(lu, level + 1)
171
172     return result
173
174   def ExecOpCode(self, op, feedback_fn):
175     """Execute an opcode.
176
177     Args:
178       op: the opcode to be executed
179
180     """
181     if not isinstance(op, opcodes.OpCode):
182       raise errors.ProgrammerError("Non-opcode instance passed"
183                                    " to ExecOpcode")
184
185     self._feedback_fn = feedback_fn
186     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
187     if lu_class is None:
188       raise errors.OpCodeUnknown("Unknown opcode")
189
190     if lu_class.REQ_WSSTORE:
191       sstore = ssconf.WritableSimpleStore()
192     else:
193       sstore = ssconf.SimpleStore()
194
195     # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
196     # shared fashion otherwise (to prevent concurrent run with an exclusive LU.
197     self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
198                              shared=not lu_class.REQ_BGL)
199     try:
200       self.exclusive_BGL = lu_class.REQ_BGL
201       lu = lu_class(self, op, self.context, sstore)
202       lu.ExpandNames()
203       assert lu.needed_locks is not None, "needed_locks not set by LU"
204       result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
205     finally:
206       self.context.glm.release(locking.LEVEL_CLUSTER)
207       self.exclusive_BGL = False
208
209     return result
210
211   def LogStep(self, current, total, message):
212     """Log a change in LU execution progress.
213
214     """
215     logger.Debug("Step %d/%d %s" % (current, total, message))
216     self._feedback_fn("STEP %d/%d %s" % (current, total, message))
217
218   def LogWarning(self, message, hint=None):
219     """Log a warning to the logs and the user.
220
221     """
222     logger.Error(message)
223     self._feedback_fn(" - WARNING: %s" % message)
224     if hint:
225       self._feedback_fn("      Hint: %s" % hint)
226
227   def LogInfo(self, message):
228     """Log an informational message to the logs and the user.
229
230     """
231     logger.Info(message)
232     self._feedback_fn(" - INFO: %s" % message)
233
234
235 class HooksMaster(object):
236   """Hooks master.
237
238   This class distributes the run commands to the nodes based on the
239   specific LU class.
240
241   In order to remove the direct dependency on the rpc module, the
242   constructor needs a function which actually does the remote
243   call. This will usually be rpc.call_hooks_runner, but any function
244   which behaves the same works.
245
246   """
247   def __init__(self, callfn, proc, lu):
248     self.callfn = callfn
249     self.proc = proc
250     self.lu = lu
251     self.op = lu.op
252     self.env, node_list_pre, node_list_post = self._BuildEnv()
253     self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
254                       constants.HOOKS_PHASE_POST: node_list_post}
255
256   def _BuildEnv(self):
257     """Compute the environment and the target nodes.
258
259     Based on the opcode and the current node list, this builds the
260     environment for the hooks and the target node list for the run.
261
262     """
263     env = {
264       "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
265       "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
266       "GANETI_OP_CODE": self.op.OP_ID,
267       "GANETI_OBJECT_TYPE": self.lu.HTYPE,
268       "GANETI_DATA_DIR": constants.DATA_DIR,
269       }
270
271     if self.lu.HPATH is not None:
272       lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
273       if lu_env:
274         for key in lu_env:
275           env["GANETI_" + key] = lu_env[key]
276     else:
277       lu_nodes_pre = lu_nodes_post = []
278
279     return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
280
281   def _RunWrapper(self, node_list, hpath, phase):
282     """Simple wrapper over self.callfn.
283
284     This method fixes the environment before doing the rpc call.
285
286     """
287     env = self.env.copy()
288     env["GANETI_HOOKS_PHASE"] = phase
289     env["GANETI_HOOKS_PATH"] = hpath
290     if self.lu.sstore is not None:
291       env["GANETI_CLUSTER"] = self.lu.sstore.GetClusterName()
292       env["GANETI_MASTER"] = self.lu.sstore.GetMasterNode()
293
294     env = dict([(str(key), str(val)) for key, val in env.iteritems()])
295
296     return self.callfn(node_list, hpath, phase, env)
297
298   def RunPhase(self, phase):
299     """Run all the scripts for a phase.
300
301     This is the main function of the HookMaster.
302
303     Args:
304       phase: the hooks phase to run
305
306     Returns:
307       the result of the hooks multi-node rpc call
308
309     """
310     if not self.node_list[phase]:
311       # empty node list, we should not attempt to run this as either
312       # we're in the cluster init phase and the rpc client part can't
313       # even attempt to run, or this LU doesn't do hooks at all
314       return
315     hpath = self.lu.HPATH
316     results = self._RunWrapper(self.node_list[phase], hpath, phase)
317     if phase == constants.HOOKS_PHASE_PRE:
318       errs = []
319       if not results:
320         raise errors.HooksFailure("Communication failure")
321       for node_name in results:
322         res = results[node_name]
323         if res is False or not isinstance(res, list):
324           self.proc.LogWarning("Communication failure to node %s" % node_name)
325           continue
326         for script, hkr, output in res:
327           if hkr == constants.HKR_FAIL:
328             output = output.strip().encode("string_escape")
329             errs.append((node_name, script, output))
330       if errs:
331         raise errors.HooksAbort(errs)
332     return results
333
334   def RunConfigUpdate(self):
335     """Run the special configuration update hook
336
337     This is a special hook that runs only on the master after each
338     top-level LI if the configuration has been updated.
339
340     """
341     phase = constants.HOOKS_PHASE_POST
342     hpath = constants.HOOKS_NAME_CFGUPDATE
343     if self.lu.sstore is None:
344       raise errors.ProgrammerError("Null sstore on config update hook")
345     nodes = [self.lu.sstore.GetMasterNode()]
346     results = self._RunWrapper(nodes, hpath, phase)