Fix issue when acquiring empty lock sets
[ganeti-local] / lib / mcpu.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26   - logical units, which know how to deal with their specific opcode only
27   - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31
32 from ganeti import opcodes
33 from ganeti import constants
34 from ganeti import errors
35 from ganeti import rpc
36 from ganeti import cmdlib
37 from ganeti import ssconf
38 from ganeti import logger
39 from ganeti import locking
40
41
42 class Processor(object):
43   """Object which runs OpCodes"""
44   DISPATCH_TABLE = {
45     # Cluster
46     opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
47     opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
48     opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
49     opcodes.OpDumpClusterConfig: cmdlib.LUDumpClusterConfig,
50     opcodes.OpRenameCluster: cmdlib.LURenameCluster,
51     opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
52     opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
53     # node lu
54     opcodes.OpAddNode: cmdlib.LUAddNode,
55     opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
56     opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
57     opcodes.OpRemoveNode: cmdlib.LURemoveNode,
58     # instance lu
59     opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
60     opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
61     opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
62     opcodes.OpRenameInstance: cmdlib.LURenameInstance,
63     opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
64     opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
65     opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
66     opcodes.OpRebootInstance: cmdlib.LURebootInstance,
67     opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
68     opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
69     opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
70     opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
71     opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
72     opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
73     opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
74     opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
75     # os lu
76     opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
77     # exports lu
78     opcodes.OpQueryExports: cmdlib.LUQueryExports,
79     opcodes.OpExportInstance: cmdlib.LUExportInstance,
80     opcodes.OpRemoveExport: cmdlib.LURemoveExport,
81     # tags lu
82     opcodes.OpGetTags: cmdlib.LUGetTags,
83     opcodes.OpSearchTags: cmdlib.LUSearchTags,
84     opcodes.OpAddTags: cmdlib.LUAddTags,
85     opcodes.OpDelTags: cmdlib.LUDelTags,
86     # test lu
87     opcodes.OpTestDelay: cmdlib.LUTestDelay,
88     opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
89     }
90
91   def __init__(self, context):
92     """Constructor for Processor
93
94     Args:
95      - feedback_fn: the feedback function (taking one string) to be run when
96                     interesting events are happening
97     """
98     self.context = context
99     self._feedback_fn = None
100     self.exclusive_BGL = False
101
102   def _ExecLU(self, lu):
103     """Logical Unit execution sequence.
104
105     """
106     write_count = self.context.cfg.write_count
107     lu.CheckPrereq()
108     hm = HooksMaster(rpc.call_hooks_runner, self, lu)
109     h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
110     lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
111                      self._feedback_fn, None)
112     try:
113       result = lu.Exec(self._feedback_fn)
114       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
115       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
116                                 self._feedback_fn, result)
117     finally:
118       # FIXME: This needs locks if not lu_class.REQ_BGL
119       if write_count != self.context.cfg.write_count:
120         hm.RunConfigUpdate()
121
122     return result
123
124   def _LockAndExecLU(self, lu, level):
125     """Execute a Logical Unit, with the needed locks.
126
127     This is a recursive function that starts locking the given level, and
128     proceeds up, till there are no more locks to acquire. Then it executes the
129     given LU and its opcodes.
130
131     """
132     if level not in locking.LEVELS:
133       result = self._ExecLU(lu)
134     elif level in lu.needed_locks:
135       # This gives a chance to LUs to make last-minute changes after acquiring
136       # locks at any preceding level.
137       lu.DeclareLocks(level)
138       needed_locks = lu.needed_locks[level]
139       share = lu.share_locks[level]
140       # This is always safe to do, as we can't acquire more/less locks than
141       # what was requested.
142       lu.acquired_locks[level] = self.context.glm.acquire(level,
143                                                           needed_locks,
144                                                           shared=share)
145       try:
146         result = self._LockAndExecLU(lu, level + 1)
147       finally:
148         # We need to release the current level if we acquired any lock, or if
149         # we acquired the set-lock (needed_locks is None)
150         if lu.needed_locks[level] is None or lu.acquired_locks[level]:
151           self.context.glm.release(level)
152     else:
153       result = self._LockAndExecLU(lu, level + 1)
154
155     return result
156
157   def ExecOpCode(self, op, feedback_fn):
158     """Execute an opcode.
159
160     Args:
161       op: the opcode to be executed
162
163     """
164     if not isinstance(op, opcodes.OpCode):
165       raise errors.ProgrammerError("Non-opcode instance passed"
166                                    " to ExecOpcode")
167
168     self._feedback_fn = feedback_fn
169     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
170     if lu_class is None:
171       raise errors.OpCodeUnknown("Unknown opcode")
172
173     if lu_class.REQ_WSSTORE:
174       sstore = ssconf.WritableSimpleStore()
175     else:
176       sstore = ssconf.SimpleStore()
177
178     # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
179     # shared fashion otherwise (to prevent concurrent run with an exclusive LU.
180     self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
181                              shared=not lu_class.REQ_BGL)
182     try:
183       self.exclusive_BGL = lu_class.REQ_BGL
184       lu = lu_class(self, op, self.context, sstore)
185       lu.ExpandNames()
186       assert lu.needed_locks is not None, "needed_locks not set by LU"
187       result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
188     finally:
189       self.context.glm.release(locking.LEVEL_CLUSTER)
190       self.exclusive_BGL = False
191
192     return result
193
194   def ChainOpCode(self, op):
195     """Chain and execute an opcode.
196
197     This is used by LUs when they need to execute a child LU.
198
199     Args:
200      - opcode: the opcode to be executed
201
202     """
203     if not isinstance(op, opcodes.OpCode):
204       raise errors.ProgrammerError("Non-opcode instance passed"
205                                    " to ExecOpcode")
206
207     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
208     if lu_class is None:
209       raise errors.OpCodeUnknown("Unknown opcode")
210
211     if lu_class.REQ_BGL and not self.exclusive_BGL:
212       raise errors.ProgrammerError("LUs which require the BGL cannot"
213                                    " be chained to granular ones.")
214
215     assert lu_class.REQ_BGL, "ChainOpCode is still BGL-only"
216
217     if lu_class.REQ_WSSTORE:
218       sstore = ssconf.WritableSimpleStore()
219     else:
220       sstore = ssconf.SimpleStore()
221
222     #do_hooks = lu_class.HPATH is not None
223     lu = lu_class(self, op, self.context, sstore)
224     lu.CheckPrereq()
225     #if do_hooks:
226     #  hm = HooksMaster(rpc.call_hooks_runner, self, lu)
227     #  h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
228     #  lu.HooksCallBack(constants.HOOKS_PHASE_PRE,
229     #                   h_results, self._feedback_fn, None)
230     result = lu.Exec(self._feedback_fn)
231     #if do_hooks:
232     #  h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
233     #  result = lu.HooksCallBack(constants.HOOKS_PHASE_POST,
234     #                   h_results, self._feedback_fn, result)
235     return result
236
237   def LogStep(self, current, total, message):
238     """Log a change in LU execution progress.
239
240     """
241     logger.Debug("Step %d/%d %s" % (current, total, message))
242     self._feedback_fn("STEP %d/%d %s" % (current, total, message))
243
244   def LogWarning(self, message, hint=None):
245     """Log a warning to the logs and the user.
246
247     """
248     logger.Error(message)
249     self._feedback_fn(" - WARNING: %s" % message)
250     if hint:
251       self._feedback_fn("      Hint: %s" % hint)
252
253   def LogInfo(self, message):
254     """Log an informational message to the logs and the user.
255
256     """
257     logger.Info(message)
258     self._feedback_fn(" - INFO: %s" % message)
259
260
261 class HooksMaster(object):
262   """Hooks master.
263
264   This class distributes the run commands to the nodes based on the
265   specific LU class.
266
267   In order to remove the direct dependency on the rpc module, the
268   constructor needs a function which actually does the remote
269   call. This will usually be rpc.call_hooks_runner, but any function
270   which behaves the same works.
271
272   """
273   def __init__(self, callfn, proc, lu):
274     self.callfn = callfn
275     self.proc = proc
276     self.lu = lu
277     self.op = lu.op
278     self.env, node_list_pre, node_list_post = self._BuildEnv()
279     self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
280                       constants.HOOKS_PHASE_POST: node_list_post}
281
282   def _BuildEnv(self):
283     """Compute the environment and the target nodes.
284
285     Based on the opcode and the current node list, this builds the
286     environment for the hooks and the target node list for the run.
287
288     """
289     env = {
290       "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
291       "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
292       "GANETI_OP_CODE": self.op.OP_ID,
293       "GANETI_OBJECT_TYPE": self.lu.HTYPE,
294       "GANETI_DATA_DIR": constants.DATA_DIR,
295       }
296
297     if self.lu.HPATH is not None:
298       lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
299       if lu_env:
300         for key in lu_env:
301           env["GANETI_" + key] = lu_env[key]
302     else:
303       lu_nodes_pre = lu_nodes_post = []
304
305     return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
306
307   def _RunWrapper(self, node_list, hpath, phase):
308     """Simple wrapper over self.callfn.
309
310     This method fixes the environment before doing the rpc call.
311
312     """
313     env = self.env.copy()
314     env["GANETI_HOOKS_PHASE"] = phase
315     env["GANETI_HOOKS_PATH"] = hpath
316     if self.lu.sstore is not None:
317       env["GANETI_CLUSTER"] = self.lu.sstore.GetClusterName()
318       env["GANETI_MASTER"] = self.lu.sstore.GetMasterNode()
319
320     env = dict([(str(key), str(val)) for key, val in env.iteritems()])
321
322     return self.callfn(node_list, hpath, phase, env)
323
324   def RunPhase(self, phase):
325     """Run all the scripts for a phase.
326
327     This is the main function of the HookMaster.
328
329     Args:
330       phase: the hooks phase to run
331
332     Returns:
333       the result of the hooks multi-node rpc call
334
335     """
336     if not self.node_list[phase]:
337       # empty node list, we should not attempt to run this as either
338       # we're in the cluster init phase and the rpc client part can't
339       # even attempt to run, or this LU doesn't do hooks at all
340       return
341     hpath = self.lu.HPATH
342     results = self._RunWrapper(self.node_list[phase], hpath, phase)
343     if phase == constants.HOOKS_PHASE_PRE:
344       errs = []
345       if not results:
346         raise errors.HooksFailure("Communication failure")
347       for node_name in results:
348         res = results[node_name]
349         if res is False or not isinstance(res, list):
350           self.proc.LogWarning("Communication failure to node %s" % node_name)
351           continue
352         for script, hkr, output in res:
353           if hkr == constants.HKR_FAIL:
354             output = output.strip().encode("string_escape")
355             errs.append((node_name, script, output))
356       if errs:
357         raise errors.HooksAbort(errs)
358     return results
359
360   def RunConfigUpdate(self):
361     """Run the special configuration update hook
362
363     This is a special hook that runs only on the master after each
364     top-level LI if the configuration has been updated.
365
366     """
367     phase = constants.HOOKS_PHASE_POST
368     hpath = constants.HOOKS_NAME_CFGUPDATE
369     if self.lu.sstore is None:
370       raise errors.ProgrammerError("Null sstore on config update hook")
371     nodes = [self.lu.sstore.GetMasterNode()]
372     results = self._RunWrapper(nodes, hpath, phase)