Add case_sensitive keyword to MatchNameComponent
[ganeti-local] / lib / mcpu.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26   - logical units, which know how to deal with their specific opcode only
27   - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31 import logging
32
33 from ganeti import opcodes
34 from ganeti import constants
35 from ganeti import errors
36 from ganeti import rpc
37 from ganeti import cmdlib
38 from ganeti import locking
39 from ganeti import utils
40
41
42 class OpExecCbBase:
43   """Base class for OpCode execution callbacks.
44
45   """
46   def NotifyStart(self):
47     """Called when we are about to execute the LU.
48
49     This function is called when we're about to start the lu's Exec() method,
50     that is, after we have acquired all locks.
51
52     """
53
54   def Feedback(self, *args):
55     """Sends feedback from the LU code to the end-user.
56
57     """
58
59   def ReportLocks(self, msg):
60     """Report lock operations.
61
62     """
63
64
65 class Processor(object):
66   """Object which runs OpCodes"""
67   DISPATCH_TABLE = {
68     # Cluster
69     opcodes.OpPostInitCluster: cmdlib.LUPostInitCluster,
70     opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
71     opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
72     opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
73     opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
74     opcodes.OpRenameCluster: cmdlib.LURenameCluster,
75     opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
76     opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
77     opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig,
78     opcodes.OpRepairDiskSizes: cmdlib.LURepairDiskSizes,
79     # node lu
80     opcodes.OpAddNode: cmdlib.LUAddNode,
81     opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
82     opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
83     opcodes.OpQueryNodeStorage: cmdlib.LUQueryNodeStorage,
84     opcodes.OpModifyNodeStorage: cmdlib.LUModifyNodeStorage,
85     opcodes.OpRepairNodeStorage: cmdlib.LURepairNodeStorage,
86     opcodes.OpRemoveNode: cmdlib.LURemoveNode,
87     opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams,
88     opcodes.OpPowercycleNode: cmdlib.LUPowercycleNode,
89     opcodes.OpEvacuateNode: cmdlib.LUEvacuateNode,
90     opcodes.OpMigrateNode: cmdlib.LUMigrateNode,
91     # instance lu
92     opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
93     opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
94     opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
95     opcodes.OpRenameInstance: cmdlib.LURenameInstance,
96     opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
97     opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
98     opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
99     opcodes.OpRebootInstance: cmdlib.LURebootInstance,
100     opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
101     opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
102     opcodes.OpRecreateInstanceDisks: cmdlib.LURecreateInstanceDisks,
103     opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
104     opcodes.OpMigrateInstance: cmdlib.LUMigrateInstance,
105     opcodes.OpMoveInstance: cmdlib.LUMoveInstance,
106     opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
107     opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
108     opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
109     opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
110     opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
111     # os lu
112     opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
113     # exports lu
114     opcodes.OpQueryExports: cmdlib.LUQueryExports,
115     opcodes.OpExportInstance: cmdlib.LUExportInstance,
116     opcodes.OpRemoveExport: cmdlib.LURemoveExport,
117     # tags lu
118     opcodes.OpGetTags: cmdlib.LUGetTags,
119     opcodes.OpSearchTags: cmdlib.LUSearchTags,
120     opcodes.OpAddTags: cmdlib.LUAddTags,
121     opcodes.OpDelTags: cmdlib.LUDelTags,
122     # test lu
123     opcodes.OpTestDelay: cmdlib.LUTestDelay,
124     opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
125     }
126
127   def __init__(self, context):
128     """Constructor for Processor
129
130     """
131     self.context = context
132     self._cbs = None
133     self.exclusive_BGL = False
134     self.rpc = rpc.RpcRunner(context.cfg)
135     self.hmclass = HooksMaster
136
137   def _ReportLocks(self, level, names, shared, acquired):
138     """Reports lock operations.
139
140     @type level: int
141     @param level: Lock level
142     @type names: list or string
143     @param names: Lock names
144     @type shared: bool
145     @param shared: Whether the lock should be acquired in shared mode
146     @type acquired: bool
147     @param acquired: Whether the lock has already been acquired
148
149     """
150     parts = []
151
152     # Build message
153     if acquired:
154       parts.append("acquired")
155     else:
156       parts.append("waiting")
157
158     parts.append(locking.LEVEL_NAMES[level])
159
160     if names == locking.ALL_SET:
161       parts.append("ALL")
162     elif isinstance(names, basestring):
163       parts.append(names)
164     else:
165       parts.append(",".join(names))
166
167     if shared:
168       parts.append("shared")
169     else:
170       parts.append("exclusive")
171
172     msg = "/".join(parts)
173
174     logging.debug("LU locks %s", msg)
175
176     if self._cbs:
177       self._cbs.ReportLocks(msg)
178
179   def _ExecLU(self, lu):
180     """Logical Unit execution sequence.
181
182     """
183     write_count = self.context.cfg.write_count
184     lu.CheckPrereq()
185     hm = HooksMaster(self.rpc.call_hooks_runner, lu)
186     h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
187     lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
188                      self._Feedback, None)
189
190     if getattr(lu.op, "dry_run", False):
191       # in this mode, no post-hooks are run, and the config is not
192       # written (as it might have been modified by another LU, and we
193       # shouldn't do writeout on behalf of other threads
194       self.LogInfo("dry-run mode requested, not actually executing"
195                    " the operation")
196       return lu.dry_run_result
197
198     try:
199       result = lu.Exec(self._Feedback)
200       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
201       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
202                                 self._Feedback, result)
203     finally:
204       # FIXME: This needs locks if not lu_class.REQ_BGL
205       if write_count != self.context.cfg.write_count:
206         hm.RunConfigUpdate()
207
208     return result
209
210   def _LockAndExecLU(self, lu, level):
211     """Execute a Logical Unit, with the needed locks.
212
213     This is a recursive function that starts locking the given level, and
214     proceeds up, till there are no more locks to acquire. Then it executes the
215     given LU and its opcodes.
216
217     """
218     adding_locks = level in lu.add_locks
219     acquiring_locks = level in lu.needed_locks
220     if level not in locking.LEVELS:
221       if self._cbs:
222         self._cbs.NotifyStart()
223
224       result = self._ExecLU(lu)
225     elif adding_locks and acquiring_locks:
226       # We could both acquire and add locks at the same level, but for now we
227       # don't need this, so we'll avoid the complicated code needed.
228       raise NotImplementedError(
229         "Can't declare locks to acquire when adding others")
230     elif adding_locks or acquiring_locks:
231       lu.DeclareLocks(level)
232       share = lu.share_locks[level]
233       if acquiring_locks:
234         needed_locks = lu.needed_locks[level]
235
236         self._ReportLocks(level, needed_locks, share, False)
237         lu.acquired_locks[level] = self.context.glm.acquire(level,
238                                                             needed_locks,
239                                                             shared=share)
240         self._ReportLocks(level, needed_locks, share, True)
241
242       else: # adding_locks
243         add_locks = lu.add_locks[level]
244         lu.remove_locks[level] = add_locks
245         try:
246           self.context.glm.add(level, add_locks, acquired=1, shared=share)
247         except errors.LockError:
248           raise errors.OpPrereqError(
249             "Couldn't add locks (%s), probably because of a race condition"
250             " with another job, who added them first" % add_locks)
251       try:
252         try:
253           if adding_locks:
254             lu.acquired_locks[level] = add_locks
255           result = self._LockAndExecLU(lu, level + 1)
256         finally:
257           if level in lu.remove_locks:
258             self.context.glm.remove(level, lu.remove_locks[level])
259       finally:
260         if self.context.glm.is_owned(level):
261           self.context.glm.release(level)
262     else:
263       result = self._LockAndExecLU(lu, level + 1)
264
265     return result
266
267   def ExecOpCode(self, op, cbs):
268     """Execute an opcode.
269
270     @type op: an OpCode instance
271     @param op: the opcode to be executed
272     @type cbs: L{OpExecCbBase}
273     @param cbs: Runtime callbacks
274
275     """
276     if not isinstance(op, opcodes.OpCode):
277       raise errors.ProgrammerError("Non-opcode instance passed"
278                                    " to ExecOpcode")
279
280     self._cbs = cbs
281     try:
282       lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
283       if lu_class is None:
284         raise errors.OpCodeUnknown("Unknown opcode")
285
286       # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
287       # shared fashion otherwise (to prevent concurrent run with an exclusive
288       # LU.
289       self._ReportLocks(locking.LEVEL_CLUSTER, [locking.BGL],
290                         not lu_class.REQ_BGL, False)
291       try:
292         self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
293                                  shared=not lu_class.REQ_BGL)
294       finally:
295         self._ReportLocks(locking.LEVEL_CLUSTER, [locking.BGL],
296                           not lu_class.REQ_BGL, True)
297       try:
298         self.exclusive_BGL = lu_class.REQ_BGL
299         lu = lu_class(self, op, self.context, self.rpc)
300         lu.ExpandNames()
301         assert lu.needed_locks is not None, "needed_locks not set by LU"
302         result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
303       finally:
304         self.context.glm.release(locking.LEVEL_CLUSTER)
305         self.exclusive_BGL = False
306     finally:
307       self._cbs = None
308
309     return result
310
311   def _Feedback(self, *args):
312     """Forward call to feedback callback function.
313
314     """
315     if self._cbs:
316       self._cbs.Feedback(*args)
317
318   def LogStep(self, current, total, message):
319     """Log a change in LU execution progress.
320
321     """
322     logging.debug("Step %d/%d %s", current, total, message)
323     self._Feedback("STEP %d/%d %s" % (current, total, message))
324
325   def LogWarning(self, message, *args, **kwargs):
326     """Log a warning to the logs and the user.
327
328     The optional keyword argument is 'hint' and can be used to show a
329     hint to the user (presumably related to the warning). If the
330     message is empty, it will not be printed at all, allowing one to
331     show only a hint.
332
333     """
334     assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
335            "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
336     if args:
337       message = message % tuple(args)
338     if message:
339       logging.warning(message)
340       self._Feedback(" - WARNING: %s" % message)
341     if "hint" in kwargs:
342       self._Feedback("      Hint: %s" % kwargs["hint"])
343
344   def LogInfo(self, message, *args):
345     """Log an informational message to the logs and the user.
346
347     """
348     if args:
349       message = message % tuple(args)
350     logging.info(message)
351     self._Feedback(" - INFO: %s" % message)
352
353
354 class HooksMaster(object):
355   """Hooks master.
356
357   This class distributes the run commands to the nodes based on the
358   specific LU class.
359
360   In order to remove the direct dependency on the rpc module, the
361   constructor needs a function which actually does the remote
362   call. This will usually be rpc.call_hooks_runner, but any function
363   which behaves the same works.
364
365   """
366   def __init__(self, callfn, lu):
367     self.callfn = callfn
368     self.lu = lu
369     self.op = lu.op
370     self.env, node_list_pre, node_list_post = self._BuildEnv()
371     self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
372                       constants.HOOKS_PHASE_POST: node_list_post}
373
374   def _BuildEnv(self):
375     """Compute the environment and the target nodes.
376
377     Based on the opcode and the current node list, this builds the
378     environment for the hooks and the target node list for the run.
379
380     """
381     env = {
382       "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
383       "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
384       "GANETI_OP_CODE": self.op.OP_ID,
385       "GANETI_OBJECT_TYPE": self.lu.HTYPE,
386       "GANETI_DATA_DIR": constants.DATA_DIR,
387       }
388
389     if self.lu.HPATH is not None:
390       lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
391       if lu_env:
392         for key in lu_env:
393           env["GANETI_" + key] = lu_env[key]
394     else:
395       lu_nodes_pre = lu_nodes_post = []
396
397     return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
398
399   def _RunWrapper(self, node_list, hpath, phase):
400     """Simple wrapper over self.callfn.
401
402     This method fixes the environment before doing the rpc call.
403
404     """
405     env = self.env.copy()
406     env["GANETI_HOOKS_PHASE"] = phase
407     env["GANETI_HOOKS_PATH"] = hpath
408     if self.lu.cfg is not None:
409       env["GANETI_CLUSTER"] = self.lu.cfg.GetClusterName()
410       env["GANETI_MASTER"] = self.lu.cfg.GetMasterNode()
411
412     env = dict([(str(key), str(val)) for key, val in env.iteritems()])
413
414     return self.callfn(node_list, hpath, phase, env)
415
416   def RunPhase(self, phase, nodes=None):
417     """Run all the scripts for a phase.
418
419     This is the main function of the HookMaster.
420
421     @param phase: one of L{constants.HOOKS_PHASE_POST} or
422         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
423     @param nodes: overrides the predefined list of nodes for the given phase
424     @return: the processed results of the hooks multi-node rpc call
425     @raise errors.HooksFailure: on communication failure to the nodes
426     @raise errors.HooksAbort: on failure of one of the hooks
427
428     """
429     if not self.node_list[phase] and not nodes:
430       # empty node list, we should not attempt to run this as either
431       # we're in the cluster init phase and the rpc client part can't
432       # even attempt to run, or this LU doesn't do hooks at all
433       return
434     hpath = self.lu.HPATH
435     if nodes is not None:
436       results = self._RunWrapper(nodes, hpath, phase)
437     else:
438       results = self._RunWrapper(self.node_list[phase], hpath, phase)
439     errs = []
440     if not results:
441       msg = "Communication Failure"
442       if phase == constants.HOOKS_PHASE_PRE:
443         raise errors.HooksFailure(msg)
444       else:
445         self.lu.LogWarning(msg)
446         return results
447     for node_name in results:
448       res = results[node_name]
449       if res.offline:
450         continue
451       msg = res.fail_msg
452       if msg:
453         self.lu.LogWarning("Communication failure to node %s: %s",
454                            node_name, msg)
455         continue
456       for script, hkr, output in res.payload:
457         if hkr == constants.HKR_FAIL:
458           if phase == constants.HOOKS_PHASE_PRE:
459             errs.append((node_name, script, output))
460           else:
461             if not output:
462               output = "(no output)"
463             self.lu.LogWarning("On %s script %s failed, output: %s" %
464                                (node_name, script, output))
465     if errs and phase == constants.HOOKS_PHASE_PRE:
466       raise errors.HooksAbort(errs)
467     return results
468
469   def RunConfigUpdate(self):
470     """Run the special configuration update hook
471
472     This is a special hook that runs only on the master after each
473     top-level LI if the configuration has been updated.
474
475     """
476     phase = constants.HOOKS_PHASE_POST
477     hpath = constants.HOOKS_NAME_CFGUPDATE
478     nodes = [self.lu.cfg.GetMasterNode()]
479     self._RunWrapper(nodes, hpath, phase)