Use is_owned to determine whether to unlock
[ganeti-local] / lib / mcpu.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26   - logical units, which know how to deal with their specific opcode only
27   - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31
32 from ganeti import opcodes
33 from ganeti import constants
34 from ganeti import errors
35 from ganeti import rpc
36 from ganeti import cmdlib
37 from ganeti import ssconf
38 from ganeti import logger
39 from ganeti import locking
40
41
42 class Processor(object):
43   """Object which runs OpCodes"""
44   DISPATCH_TABLE = {
45     # Cluster
46     opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
47     opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
48     opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
49     opcodes.OpDumpClusterConfig: cmdlib.LUDumpClusterConfig,
50     opcodes.OpRenameCluster: cmdlib.LURenameCluster,
51     opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
52     opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
53     # node lu
54     opcodes.OpAddNode: cmdlib.LUAddNode,
55     opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
56     opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
57     opcodes.OpRemoveNode: cmdlib.LURemoveNode,
58     # instance lu
59     opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
60     opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
61     opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
62     opcodes.OpRenameInstance: cmdlib.LURenameInstance,
63     opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
64     opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
65     opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
66     opcodes.OpRebootInstance: cmdlib.LURebootInstance,
67     opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
68     opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
69     opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
70     opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
71     opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
72     opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
73     opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
74     opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
75     # os lu
76     opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
77     # exports lu
78     opcodes.OpQueryExports: cmdlib.LUQueryExports,
79     opcodes.OpExportInstance: cmdlib.LUExportInstance,
80     opcodes.OpRemoveExport: cmdlib.LURemoveExport,
81     # tags lu
82     opcodes.OpGetTags: cmdlib.LUGetTags,
83     opcodes.OpSearchTags: cmdlib.LUSearchTags,
84     opcodes.OpAddTags: cmdlib.LUAddTags,
85     opcodes.OpDelTags: cmdlib.LUDelTags,
86     # test lu
87     opcodes.OpTestDelay: cmdlib.LUTestDelay,
88     opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
89     }
90
91   def __init__(self, context):
92     """Constructor for Processor
93
94     Args:
95      - feedback_fn: the feedback function (taking one string) to be run when
96                     interesting events are happening
97     """
98     self.context = context
99     self._feedback_fn = None
100     self.exclusive_BGL = False
101
102   def _ExecLU(self, lu):
103     """Logical Unit execution sequence.
104
105     """
106     write_count = self.context.cfg.write_count
107     lu.CheckPrereq()
108     hm = HooksMaster(rpc.call_hooks_runner, self, lu)
109     h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
110     lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
111                      self._feedback_fn, None)
112     try:
113       result = lu.Exec(self._feedback_fn)
114       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
115       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
116                                 self._feedback_fn, result)
117     finally:
118       # FIXME: This needs locks if not lu_class.REQ_BGL
119       if write_count != self.context.cfg.write_count:
120         hm.RunConfigUpdate()
121
122     return result
123
124   def _LockAndExecLU(self, lu, level):
125     """Execute a Logical Unit, with the needed locks.
126
127     This is a recursive function that starts locking the given level, and
128     proceeds up, till there are no more locks to acquire. Then it executes the
129     given LU and its opcodes.
130
131     """
132     if level not in locking.LEVELS:
133       result = self._ExecLU(lu)
134     elif level in lu.needed_locks:
135       # This gives a chance to LUs to make last-minute changes after acquiring
136       # locks at any preceding level.
137       lu.DeclareLocks(level)
138       needed_locks = lu.needed_locks[level]
139       share = lu.share_locks[level]
140       # This is always safe to do, as we can't acquire more/less locks than
141       # what was requested.
142       lu.acquired_locks[level] = self.context.glm.acquire(level,
143                                                           needed_locks,
144                                                           shared=share)
145       try:
146         result = self._LockAndExecLU(lu, level + 1)
147       finally:
148         if self.context.glm.is_owned(level):
149           self.context.glm.release(level)
150     else:
151       result = self._LockAndExecLU(lu, level + 1)
152
153     return result
154
155   def ExecOpCode(self, op, feedback_fn):
156     """Execute an opcode.
157
158     Args:
159       op: the opcode to be executed
160
161     """
162     if not isinstance(op, opcodes.OpCode):
163       raise errors.ProgrammerError("Non-opcode instance passed"
164                                    " to ExecOpcode")
165
166     self._feedback_fn = feedback_fn
167     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
168     if lu_class is None:
169       raise errors.OpCodeUnknown("Unknown opcode")
170
171     if lu_class.REQ_WSSTORE:
172       sstore = ssconf.WritableSimpleStore()
173     else:
174       sstore = ssconf.SimpleStore()
175
176     # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
177     # shared fashion otherwise (to prevent concurrent run with an exclusive LU.
178     self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
179                              shared=not lu_class.REQ_BGL)
180     try:
181       self.exclusive_BGL = lu_class.REQ_BGL
182       lu = lu_class(self, op, self.context, sstore)
183       lu.ExpandNames()
184       assert lu.needed_locks is not None, "needed_locks not set by LU"
185       result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
186     finally:
187       self.context.glm.release(locking.LEVEL_CLUSTER)
188       self.exclusive_BGL = False
189
190     return result
191
192   def LogStep(self, current, total, message):
193     """Log a change in LU execution progress.
194
195     """
196     logger.Debug("Step %d/%d %s" % (current, total, message))
197     self._feedback_fn("STEP %d/%d %s" % (current, total, message))
198
199   def LogWarning(self, message, hint=None):
200     """Log a warning to the logs and the user.
201
202     """
203     logger.Error(message)
204     self._feedback_fn(" - WARNING: %s" % message)
205     if hint:
206       self._feedback_fn("      Hint: %s" % hint)
207
208   def LogInfo(self, message):
209     """Log an informational message to the logs and the user.
210
211     """
212     logger.Info(message)
213     self._feedback_fn(" - INFO: %s" % message)
214
215
216 class HooksMaster(object):
217   """Hooks master.
218
219   This class distributes the run commands to the nodes based on the
220   specific LU class.
221
222   In order to remove the direct dependency on the rpc module, the
223   constructor needs a function which actually does the remote
224   call. This will usually be rpc.call_hooks_runner, but any function
225   which behaves the same works.
226
227   """
228   def __init__(self, callfn, proc, lu):
229     self.callfn = callfn
230     self.proc = proc
231     self.lu = lu
232     self.op = lu.op
233     self.env, node_list_pre, node_list_post = self._BuildEnv()
234     self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
235                       constants.HOOKS_PHASE_POST: node_list_post}
236
237   def _BuildEnv(self):
238     """Compute the environment and the target nodes.
239
240     Based on the opcode and the current node list, this builds the
241     environment for the hooks and the target node list for the run.
242
243     """
244     env = {
245       "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
246       "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
247       "GANETI_OP_CODE": self.op.OP_ID,
248       "GANETI_OBJECT_TYPE": self.lu.HTYPE,
249       "GANETI_DATA_DIR": constants.DATA_DIR,
250       }
251
252     if self.lu.HPATH is not None:
253       lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
254       if lu_env:
255         for key in lu_env:
256           env["GANETI_" + key] = lu_env[key]
257     else:
258       lu_nodes_pre = lu_nodes_post = []
259
260     return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
261
262   def _RunWrapper(self, node_list, hpath, phase):
263     """Simple wrapper over self.callfn.
264
265     This method fixes the environment before doing the rpc call.
266
267     """
268     env = self.env.copy()
269     env["GANETI_HOOKS_PHASE"] = phase
270     env["GANETI_HOOKS_PATH"] = hpath
271     if self.lu.sstore is not None:
272       env["GANETI_CLUSTER"] = self.lu.sstore.GetClusterName()
273       env["GANETI_MASTER"] = self.lu.sstore.GetMasterNode()
274
275     env = dict([(str(key), str(val)) for key, val in env.iteritems()])
276
277     return self.callfn(node_list, hpath, phase, env)
278
279   def RunPhase(self, phase):
280     """Run all the scripts for a phase.
281
282     This is the main function of the HookMaster.
283
284     Args:
285       phase: the hooks phase to run
286
287     Returns:
288       the result of the hooks multi-node rpc call
289
290     """
291     if not self.node_list[phase]:
292       # empty node list, we should not attempt to run this as either
293       # we're in the cluster init phase and the rpc client part can't
294       # even attempt to run, or this LU doesn't do hooks at all
295       return
296     hpath = self.lu.HPATH
297     results = self._RunWrapper(self.node_list[phase], hpath, phase)
298     if phase == constants.HOOKS_PHASE_PRE:
299       errs = []
300       if not results:
301         raise errors.HooksFailure("Communication failure")
302       for node_name in results:
303         res = results[node_name]
304         if res is False or not isinstance(res, list):
305           self.proc.LogWarning("Communication failure to node %s" % node_name)
306           continue
307         for script, hkr, output in res:
308           if hkr == constants.HKR_FAIL:
309             output = output.strip().encode("string_escape")
310             errs.append((node_name, script, output))
311       if errs:
312         raise errors.HooksAbort(errs)
313     return results
314
315   def RunConfigUpdate(self):
316     """Run the special configuration update hook
317
318     This is a special hook that runs only on the master after each
319     top-level LI if the configuration has been updated.
320
321     """
322     phase = constants.HOOKS_PHASE_POST
323     hpath = constants.HOOKS_NAME_CFGUPDATE
324     if self.lu.sstore is None:
325       raise errors.ProgrammerError("Null sstore on config update hook")
326     nodes = [self.lu.sstore.GetMasterNode()]
327     results = self._RunWrapper(nodes, hpath, phase)