Convert mcpu.py to use the logging module
[ganeti-local] / lib / mcpu.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26   - logical units, which know how to deal with their specific opcode only
27   - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31 import logging
32
33 from ganeti import opcodes
34 from ganeti import constants
35 from ganeti import errors
36 from ganeti import rpc
37 from ganeti import cmdlib
38 from ganeti import locking
39
40
41 class Processor(object):
42   """Object which runs OpCodes"""
43   DISPATCH_TABLE = {
44     # Cluster
45     opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
46     opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
47     opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
48     opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
49     opcodes.OpRenameCluster: cmdlib.LURenameCluster,
50     opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
51     opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
52     # node lu
53     opcodes.OpAddNode: cmdlib.LUAddNode,
54     opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
55     opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
56     opcodes.OpRemoveNode: cmdlib.LURemoveNode,
57     # instance lu
58     opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
59     opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
60     opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
61     opcodes.OpRenameInstance: cmdlib.LURenameInstance,
62     opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
63     opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
64     opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
65     opcodes.OpRebootInstance: cmdlib.LURebootInstance,
66     opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
67     opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
68     opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
69     opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
70     opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
71     opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
72     opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
73     opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
74     # os lu
75     opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
76     # exports lu
77     opcodes.OpQueryExports: cmdlib.LUQueryExports,
78     opcodes.OpExportInstance: cmdlib.LUExportInstance,
79     opcodes.OpRemoveExport: cmdlib.LURemoveExport,
80     # tags lu
81     opcodes.OpGetTags: cmdlib.LUGetTags,
82     opcodes.OpSearchTags: cmdlib.LUSearchTags,
83     opcodes.OpAddTags: cmdlib.LUAddTags,
84     opcodes.OpDelTags: cmdlib.LUDelTags,
85     # test lu
86     opcodes.OpTestDelay: cmdlib.LUTestDelay,
87     opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
88     }
89
90   def __init__(self, context):
91     """Constructor for Processor
92
93     Args:
94      - feedback_fn: the feedback function (taking one string) to be run when
95                     interesting events are happening
96     """
97     self.context = context
98     self._feedback_fn = None
99     self.exclusive_BGL = False
100     self.rpc = rpc.RpcRunner(context.cfg)
101
102   def _ExecLU(self, lu):
103     """Logical Unit execution sequence.
104
105     """
106     write_count = self.context.cfg.write_count
107     lu.CheckPrereq()
108     hm = HooksMaster(self.rpc.call_hooks_runner, self, lu)
109     h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
110     lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
111                      self._feedback_fn, None)
112     try:
113       result = lu.Exec(self._feedback_fn)
114       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
115       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
116                                 self._feedback_fn, result)
117     finally:
118       # FIXME: This needs locks if not lu_class.REQ_BGL
119       if write_count != self.context.cfg.write_count:
120         hm.RunConfigUpdate()
121
122     return result
123
124   def _LockAndExecLU(self, lu, level):
125     """Execute a Logical Unit, with the needed locks.
126
127     This is a recursive function that starts locking the given level, and
128     proceeds up, till there are no more locks to acquire. Then it executes the
129     given LU and its opcodes.
130
131     """
132     adding_locks = level in lu.add_locks
133     acquiring_locks = level in lu.needed_locks
134     if level not in locking.LEVELS:
135       if callable(self._run_notifier):
136         self._run_notifier()
137       result = self._ExecLU(lu)
138     elif adding_locks and acquiring_locks:
139       # We could both acquire and add locks at the same level, but for now we
140       # don't need this, so we'll avoid the complicated code needed.
141       raise NotImplementedError(
142         "Can't declare locks to acquire when adding others")
143     elif adding_locks or acquiring_locks:
144       lu.DeclareLocks(level)
145       share = lu.share_locks[level]
146       if acquiring_locks:
147         needed_locks = lu.needed_locks[level]
148         lu.acquired_locks[level] = self.context.glm.acquire(level,
149                                                             needed_locks,
150                                                             shared=share)
151       else: # adding_locks
152         add_locks = lu.add_locks[level]
153         lu.remove_locks[level] = add_locks
154         try:
155           self.context.glm.add(level, add_locks, acquired=1, shared=share)
156         except errors.LockError:
157           raise errors.OpPrereqError(
158             "Coudn't add locks (%s), probably because of a race condition"
159             " with another job, who added them first" % add_locks)
160       try:
161         try:
162           if adding_locks:
163             lu.acquired_locks[level] = add_locks
164           result = self._LockAndExecLU(lu, level + 1)
165         finally:
166           if level in lu.remove_locks:
167             self.context.glm.remove(level, lu.remove_locks[level])
168       finally:
169         if self.context.glm.is_owned(level):
170           self.context.glm.release(level)
171     else:
172       result = self._LockAndExecLU(lu, level + 1)
173
174     return result
175
176   def ExecOpCode(self, op, feedback_fn, run_notifier):
177     """Execute an opcode.
178
179     @type op: an OpCode instance
180     @param op: the opcode to be executed
181     @type feedback_fn: a function that takes a single argument
182     @param feedback_fn: this function will be used as feedback from the LU
183                         code to the end-user
184     @type run_notifier: callable (no arguments) or None
185     @param run_notifier:  this function (if callable) will be called when
186                           we are about to call the lu's Exec() method, that
187                           is, after we have aquired all locks
188
189     """
190     if not isinstance(op, opcodes.OpCode):
191       raise errors.ProgrammerError("Non-opcode instance passed"
192                                    " to ExecOpcode")
193
194     self._feedback_fn = feedback_fn
195     self._run_notifier = run_notifier
196     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
197     if lu_class is None:
198       raise errors.OpCodeUnknown("Unknown opcode")
199
200     # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
201     # shared fashion otherwise (to prevent concurrent run with an exclusive LU.
202     self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
203                              shared=not lu_class.REQ_BGL)
204     try:
205       self.exclusive_BGL = lu_class.REQ_BGL
206       lu = lu_class(self, op, self.context, self.rpc)
207       lu.ExpandNames()
208       assert lu.needed_locks is not None, "needed_locks not set by LU"
209       result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
210     finally:
211       self.context.glm.release(locking.LEVEL_CLUSTER)
212       self.exclusive_BGL = False
213
214     return result
215
216   def LogStep(self, current, total, message):
217     """Log a change in LU execution progress.
218
219     """
220     logging.debug("Step %d/%d %s", current, total, message)
221     self._feedback_fn("STEP %d/%d %s" % (current, total, message))
222
223   def LogWarning(self, message, hint=None):
224     """Log a warning to the logs and the user.
225
226     """
227     logging.warning(message)
228     self._feedback_fn(" - WARNING: %s" % message)
229     if hint:
230       self._feedback_fn("      Hint: %s" % hint)
231
232   def LogInfo(self, message):
233     """Log an informational message to the logs and the user.
234
235     """
236     logging.info(message)
237     self._feedback_fn(" - INFO: %s" % message)
238
239
240 class HooksMaster(object):
241   """Hooks master.
242
243   This class distributes the run commands to the nodes based on the
244   specific LU class.
245
246   In order to remove the direct dependency on the rpc module, the
247   constructor needs a function which actually does the remote
248   call. This will usually be rpc.call_hooks_runner, but any function
249   which behaves the same works.
250
251   """
252   def __init__(self, callfn, proc, lu):
253     self.callfn = callfn
254     self.proc = proc
255     self.lu = lu
256     self.op = lu.op
257     self.env, node_list_pre, node_list_post = self._BuildEnv()
258     self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
259                       constants.HOOKS_PHASE_POST: node_list_post}
260
261   def _BuildEnv(self):
262     """Compute the environment and the target nodes.
263
264     Based on the opcode and the current node list, this builds the
265     environment for the hooks and the target node list for the run.
266
267     """
268     env = {
269       "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
270       "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
271       "GANETI_OP_CODE": self.op.OP_ID,
272       "GANETI_OBJECT_TYPE": self.lu.HTYPE,
273       "GANETI_DATA_DIR": constants.DATA_DIR,
274       }
275
276     if self.lu.HPATH is not None:
277       lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
278       if lu_env:
279         for key in lu_env:
280           env["GANETI_" + key] = lu_env[key]
281     else:
282       lu_nodes_pre = lu_nodes_post = []
283
284     return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
285
286   def _RunWrapper(self, node_list, hpath, phase):
287     """Simple wrapper over self.callfn.
288
289     This method fixes the environment before doing the rpc call.
290
291     """
292     env = self.env.copy()
293     env["GANETI_HOOKS_PHASE"] = phase
294     env["GANETI_HOOKS_PATH"] = hpath
295     if self.lu.cfg is not None:
296       env["GANETI_CLUSTER"] = self.lu.cfg.GetClusterName()
297       env["GANETI_MASTER"] = self.lu.cfg.GetMasterNode()
298
299     env = dict([(str(key), str(val)) for key, val in env.iteritems()])
300
301     return self.callfn(node_list, hpath, phase, env)
302
303   def RunPhase(self, phase):
304     """Run all the scripts for a phase.
305
306     This is the main function of the HookMaster.
307
308     Args:
309       phase: the hooks phase to run
310
311     Returns:
312       the result of the hooks multi-node rpc call
313
314     """
315     if not self.node_list[phase]:
316       # empty node list, we should not attempt to run this as either
317       # we're in the cluster init phase and the rpc client part can't
318       # even attempt to run, or this LU doesn't do hooks at all
319       return
320     hpath = self.lu.HPATH
321     results = self._RunWrapper(self.node_list[phase], hpath, phase)
322     if phase == constants.HOOKS_PHASE_PRE:
323       errs = []
324       if not results:
325         raise errors.HooksFailure("Communication failure")
326       for node_name in results:
327         res = results[node_name]
328         if res is False or not isinstance(res, list):
329           self.proc.LogWarning("Communication failure to node %s" % node_name)
330           continue
331         for script, hkr, output in res:
332           if hkr == constants.HKR_FAIL:
333             output = output.strip().encode("string_escape")
334             errs.append((node_name, script, output))
335       if errs:
336         raise errors.HooksAbort(errs)
337     return results
338
339   def RunConfigUpdate(self):
340     """Run the special configuration update hook
341
342     This is a special hook that runs only on the master after each
343     top-level LI if the configuration has been updated.
344
345     """
346     phase = constants.HOOKS_PHASE_POST
347     hpath = constants.HOOKS_NAME_CFGUPDATE
348     nodes = [self.lu.cfg.GetMasterNode()]
349     results = self._RunWrapper(nodes, hpath, phase)