opcodes: document OP_DSC_FIELD in OpCode and OpCode.Summary()
[ganeti-local] / lib / mcpu.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26   - logical units, which know how to deal with their specific opcode only
27   - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31 import logging
32 import random
33 import time
34
35 from ganeti import opcodes
36 from ganeti import constants
37 from ganeti import errors
38 from ganeti import rpc
39 from ganeti import cmdlib
40 from ganeti import locking
41 from ganeti import utils
42
43
44 class LockAcquireTimeout(Exception):
45   """Exception to report timeouts on acquiring locks.
46
47   """
48
49
50 def _CalculateLockAttemptTimeouts():
51   """Calculate timeouts for lock attempts.
52
53   """
54   result = [1.0]
55
56   # Wait for a total of at least 150s before doing a blocking acquire
57   while sum(result) < 150.0:
58     timeout = (result[-1] * 1.05) ** 1.25
59
60     # Cap timeout at 10 seconds. This gives other jobs a chance to run
61     # even if we're still trying to get our locks, before finally moving
62     # to a blocking acquire.
63     if timeout > 10.0:
64       timeout = 10.0
65
66     elif timeout < 0.1:
67       # Lower boundary for safety
68       timeout = 0.1
69
70     result.append(timeout)
71
72   return result
73
74
75 class LockAttemptTimeoutStrategy(object):
76   """Class with lock acquire timeout strategy.
77
78   """
79   __slots__ = [
80     "_timeouts",
81     "_random_fn",
82     "_time_fn",
83     ]
84
85   _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
86
87   def __init__(self, _time_fn=time.time, _random_fn=random.random):
88     """Initializes this class.
89
90     @param _time_fn: Time function for unittests
91     @param _random_fn: Random number generator for unittests
92
93     """
94     object.__init__(self)
95
96     self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
97     self._time_fn = _time_fn
98     self._random_fn = _random_fn
99
100   def NextAttempt(self):
101     """Returns the timeout for the next attempt.
102
103     """
104     try:
105       timeout = self._timeouts.next()
106     except StopIteration:
107       # No more timeouts, do blocking acquire
108       timeout = None
109
110     if timeout is not None:
111       # Add a small variation (-/+ 5%) to timeout. This helps in situations
112       # where two or more jobs are fighting for the same lock(s).
113       variation_range = timeout * 0.1
114       timeout += ((self._random_fn() * variation_range) -
115                   (variation_range * 0.5))
116
117     return timeout
118
119
120 class OpExecCbBase: # pylint: disable-msg=W0232
121   """Base class for OpCode execution callbacks.
122
123   """
124   def NotifyStart(self):
125     """Called when we are about to execute the LU.
126
127     This function is called when we're about to start the lu's Exec() method,
128     that is, after we have acquired all locks.
129
130     """
131
132   def Feedback(self, *args):
133     """Sends feedback from the LU code to the end-user.
134
135     """
136
137   def CheckCancel(self):
138     """Check whether job has been cancelled.
139
140     """
141
142
143 class Processor(object):
144   """Object which runs OpCodes"""
145   DISPATCH_TABLE = {
146     # Cluster
147     opcodes.OpPostInitCluster: cmdlib.LUPostInitCluster,
148     opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
149     opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
150     opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
151     opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
152     opcodes.OpRenameCluster: cmdlib.LURenameCluster,
153     opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
154     opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
155     opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig,
156     opcodes.OpRepairDiskSizes: cmdlib.LURepairDiskSizes,
157     opcodes.OpQuery: cmdlib.LUQuery,
158     opcodes.OpQueryFields: cmdlib.LUQueryFields,
159     # node lu
160     opcodes.OpAddNode: cmdlib.LUAddNode,
161     opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
162     opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
163     opcodes.OpQueryNodeStorage: cmdlib.LUQueryNodeStorage,
164     opcodes.OpModifyNodeStorage: cmdlib.LUModifyNodeStorage,
165     opcodes.OpRepairNodeStorage: cmdlib.LURepairNodeStorage,
166     opcodes.OpRemoveNode: cmdlib.LURemoveNode,
167     opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams,
168     opcodes.OpPowercycleNode: cmdlib.LUPowercycleNode,
169     opcodes.OpMigrateNode: cmdlib.LUMigrateNode,
170     opcodes.OpNodeEvacuationStrategy: cmdlib.LUNodeEvacuationStrategy,
171     # instance lu
172     opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
173     opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
174     opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
175     opcodes.OpRenameInstance: cmdlib.LURenameInstance,
176     opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
177     opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
178     opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
179     opcodes.OpRebootInstance: cmdlib.LURebootInstance,
180     opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
181     opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
182     opcodes.OpRecreateInstanceDisks: cmdlib.LURecreateInstanceDisks,
183     opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
184     opcodes.OpMigrateInstance: cmdlib.LUMigrateInstance,
185     opcodes.OpMoveInstance: cmdlib.LUMoveInstance,
186     opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
187     opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
188     opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
189     opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
190     opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
191     # node group lu
192     opcodes.OpQueryGroups: cmdlib.LUQueryGroups,
193     # os lu
194     opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
195     # exports lu
196     opcodes.OpQueryExports: cmdlib.LUQueryExports,
197     opcodes.OpPrepareExport: cmdlib.LUPrepareExport,
198     opcodes.OpExportInstance: cmdlib.LUExportInstance,
199     opcodes.OpRemoveExport: cmdlib.LURemoveExport,
200     # tags lu
201     opcodes.OpGetTags: cmdlib.LUGetTags,
202     opcodes.OpSearchTags: cmdlib.LUSearchTags,
203     opcodes.OpAddTags: cmdlib.LUAddTags,
204     opcodes.OpDelTags: cmdlib.LUDelTags,
205     # test lu
206     opcodes.OpTestDelay: cmdlib.LUTestDelay,
207     opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
208     opcodes.OpTestJobqueue: cmdlib.LUTestJobqueue,
209     }
210
211   def __init__(self, context, ec_id):
212     """Constructor for Processor
213
214     @type context: GanetiContext
215     @param context: global Ganeti context
216     @type ec_id: string
217     @param ec_id: execution context identifier
218
219     """
220     self.context = context
221     self._ec_id = ec_id
222     self._cbs = None
223     self.rpc = rpc.RpcRunner(context.cfg)
224     self.hmclass = HooksMaster
225
226   def _AcquireLocks(self, level, names, shared, timeout, priority):
227     """Acquires locks via the Ganeti lock manager.
228
229     @type level: int
230     @param level: Lock level
231     @type names: list or string
232     @param names: Lock names
233     @type shared: bool
234     @param shared: Whether the locks should be acquired in shared mode
235     @type timeout: None or float
236     @param timeout: Timeout for acquiring the locks
237     @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
238         amount of time
239
240     """
241     if self._cbs:
242       self._cbs.CheckCancel()
243
244     acquired = self.context.glm.acquire(level, names, shared=shared,
245                                         timeout=timeout, priority=priority)
246
247     if acquired is None:
248       raise LockAcquireTimeout()
249
250     return acquired
251
252   def _ExecLU(self, lu):
253     """Logical Unit execution sequence.
254
255     """
256     write_count = self.context.cfg.write_count
257     lu.CheckPrereq()
258     hm = HooksMaster(self.rpc.call_hooks_runner, lu)
259     h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
260     lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
261                      self.Log, None)
262
263     if getattr(lu.op, "dry_run", False):
264       # in this mode, no post-hooks are run, and the config is not
265       # written (as it might have been modified by another LU, and we
266       # shouldn't do writeout on behalf of other threads
267       self.LogInfo("dry-run mode requested, not actually executing"
268                    " the operation")
269       return lu.dry_run_result
270
271     try:
272       result = lu.Exec(self.Log)
273       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
274       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
275                                 self.Log, result)
276     finally:
277       # FIXME: This needs locks if not lu_class.REQ_BGL
278       if write_count != self.context.cfg.write_count:
279         hm.RunConfigUpdate()
280
281     return result
282
283   def _LockAndExecLU(self, lu, level, calc_timeout, priority):
284     """Execute a Logical Unit, with the needed locks.
285
286     This is a recursive function that starts locking the given level, and
287     proceeds up, till there are no more locks to acquire. Then it executes the
288     given LU and its opcodes.
289
290     """
291     adding_locks = level in lu.add_locks
292     acquiring_locks = level in lu.needed_locks
293     if level not in locking.LEVELS:
294       if self._cbs:
295         self._cbs.NotifyStart()
296
297       result = self._ExecLU(lu)
298
299     elif adding_locks and acquiring_locks:
300       # We could both acquire and add locks at the same level, but for now we
301       # don't need this, so we'll avoid the complicated code needed.
302       raise NotImplementedError("Can't declare locks to acquire when adding"
303                                 " others")
304
305     elif adding_locks or acquiring_locks:
306       lu.DeclareLocks(level)
307       share = lu.share_locks[level]
308
309       try:
310         assert adding_locks ^ acquiring_locks, \
311           "Locks must be either added or acquired"
312
313         if acquiring_locks:
314           # Acquiring locks
315           needed_locks = lu.needed_locks[level]
316
317           acquired = self._AcquireLocks(level, needed_locks, share,
318                                         calc_timeout(), priority)
319         else:
320           # Adding locks
321           add_locks = lu.add_locks[level]
322           lu.remove_locks[level] = add_locks
323
324           try:
325             self.context.glm.add(level, add_locks, acquired=1, shared=share)
326           except errors.LockError:
327             raise errors.OpPrereqError(
328               "Couldn't add locks (%s), probably because of a race condition"
329               " with another job, who added them first" % add_locks,
330               errors.ECODE_FAULT)
331
332           acquired = add_locks
333
334         try:
335           lu.acquired_locks[level] = acquired
336
337           result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
338         finally:
339           if level in lu.remove_locks:
340             self.context.glm.remove(level, lu.remove_locks[level])
341       finally:
342         if self.context.glm.is_owned(level):
343           self.context.glm.release(level)
344
345     else:
346       result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
347
348     return result
349
350   def ExecOpCode(self, op, cbs, timeout=None, priority=None):
351     """Execute an opcode.
352
353     @type op: an OpCode instance
354     @param op: the opcode to be executed
355     @type cbs: L{OpExecCbBase}
356     @param cbs: Runtime callbacks
357     @type timeout: float or None
358     @param timeout: Maximum time to acquire all locks, None for no timeout
359     @type priority: number or None
360     @param priority: Priority for acquiring lock(s)
361     @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
362         amount of time
363
364     """
365     if not isinstance(op, opcodes.OpCode):
366       raise errors.ProgrammerError("Non-opcode instance passed"
367                                    " to ExecOpcode")
368
369     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
370     if lu_class is None:
371       raise errors.OpCodeUnknown("Unknown opcode")
372
373     if timeout is None:
374       calc_timeout = lambda: None
375     else:
376       calc_timeout = utils.RunningTimeout(timeout, False).Remaining
377
378     self._cbs = cbs
379     try:
380       # Acquire the Big Ganeti Lock exclusively if this LU requires it,
381       # and in a shared fashion otherwise (to prevent concurrent run with
382       # an exclusive LU.
383       self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
384                           not lu_class.REQ_BGL, calc_timeout(),
385                           priority)
386       try:
387         lu = lu_class(self, op, self.context, self.rpc)
388         lu.ExpandNames()
389         assert lu.needed_locks is not None, "needed_locks not set by LU"
390
391         try:
392           return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout,
393                                      priority)
394         finally:
395           if self._ec_id:
396             self.context.cfg.DropECReservations(self._ec_id)
397       finally:
398         self.context.glm.release(locking.LEVEL_CLUSTER)
399     finally:
400       self._cbs = None
401
402   def Log(self, *args):
403     """Forward call to feedback callback function.
404
405     """
406     if self._cbs:
407       self._cbs.Feedback(*args)
408
409   def LogStep(self, current, total, message):
410     """Log a change in LU execution progress.
411
412     """
413     logging.debug("Step %d/%d %s", current, total, message)
414     self.Log("STEP %d/%d %s" % (current, total, message))
415
416   def LogWarning(self, message, *args, **kwargs):
417     """Log a warning to the logs and the user.
418
419     The optional keyword argument is 'hint' and can be used to show a
420     hint to the user (presumably related to the warning). If the
421     message is empty, it will not be printed at all, allowing one to
422     show only a hint.
423
424     """
425     assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
426            "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
427     if args:
428       message = message % tuple(args)
429     if message:
430       logging.warning(message)
431       self.Log(" - WARNING: %s" % message)
432     if "hint" in kwargs:
433       self.Log("      Hint: %s" % kwargs["hint"])
434
435   def LogInfo(self, message, *args):
436     """Log an informational message to the logs and the user.
437
438     """
439     if args:
440       message = message % tuple(args)
441     logging.info(message)
442     self.Log(" - INFO: %s" % message)
443
444   def GetECId(self):
445     if not self._ec_id:
446       errors.ProgrammerError("Tried to use execution context id when not set")
447     return self._ec_id
448
449
450 class HooksMaster(object):
451   """Hooks master.
452
453   This class distributes the run commands to the nodes based on the
454   specific LU class.
455
456   In order to remove the direct dependency on the rpc module, the
457   constructor needs a function which actually does the remote
458   call. This will usually be rpc.call_hooks_runner, but any function
459   which behaves the same works.
460
461   """
462   def __init__(self, callfn, lu):
463     self.callfn = callfn
464     self.lu = lu
465     self.op = lu.op
466     self.env, node_list_pre, node_list_post = self._BuildEnv()
467     self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
468                       constants.HOOKS_PHASE_POST: node_list_post}
469
470   def _BuildEnv(self):
471     """Compute the environment and the target nodes.
472
473     Based on the opcode and the current node list, this builds the
474     environment for the hooks and the target node list for the run.
475
476     """
477     env = {
478       "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
479       "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
480       "GANETI_OP_CODE": self.op.OP_ID,
481       "GANETI_OBJECT_TYPE": self.lu.HTYPE,
482       "GANETI_DATA_DIR": constants.DATA_DIR,
483       }
484
485     if self.lu.HPATH is not None:
486       lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
487       if lu_env:
488         for key in lu_env:
489           env["GANETI_" + key] = lu_env[key]
490     else:
491       lu_nodes_pre = lu_nodes_post = []
492
493     return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
494
495   def _RunWrapper(self, node_list, hpath, phase):
496     """Simple wrapper over self.callfn.
497
498     This method fixes the environment before doing the rpc call.
499
500     """
501     env = self.env.copy()
502     env["GANETI_HOOKS_PHASE"] = phase
503     env["GANETI_HOOKS_PATH"] = hpath
504     if self.lu.cfg is not None:
505       env["GANETI_CLUSTER"] = self.lu.cfg.GetClusterName()
506       env["GANETI_MASTER"] = self.lu.cfg.GetMasterNode()
507
508     env = dict([(str(key), str(val)) for key, val in env.iteritems()])
509
510     return self.callfn(node_list, hpath, phase, env)
511
512   def RunPhase(self, phase, nodes=None):
513     """Run all the scripts for a phase.
514
515     This is the main function of the HookMaster.
516
517     @param phase: one of L{constants.HOOKS_PHASE_POST} or
518         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
519     @param nodes: overrides the predefined list of nodes for the given phase
520     @return: the processed results of the hooks multi-node rpc call
521     @raise errors.HooksFailure: on communication failure to the nodes
522     @raise errors.HooksAbort: on failure of one of the hooks
523
524     """
525     if not self.node_list[phase] and not nodes:
526       # empty node list, we should not attempt to run this as either
527       # we're in the cluster init phase and the rpc client part can't
528       # even attempt to run, or this LU doesn't do hooks at all
529       return
530     hpath = self.lu.HPATH
531     if nodes is not None:
532       results = self._RunWrapper(nodes, hpath, phase)
533     else:
534       results = self._RunWrapper(self.node_list[phase], hpath, phase)
535     errs = []
536     if not results:
537       msg = "Communication Failure"
538       if phase == constants.HOOKS_PHASE_PRE:
539         raise errors.HooksFailure(msg)
540       else:
541         self.lu.LogWarning(msg)
542         return results
543     for node_name in results:
544       res = results[node_name]
545       if res.offline:
546         continue
547       msg = res.fail_msg
548       if msg:
549         self.lu.LogWarning("Communication failure to node %s: %s",
550                            node_name, msg)
551         continue
552       for script, hkr, output in res.payload:
553         if hkr == constants.HKR_FAIL:
554           if phase == constants.HOOKS_PHASE_PRE:
555             errs.append((node_name, script, output))
556           else:
557             if not output:
558               output = "(no output)"
559             self.lu.LogWarning("On %s script %s failed, output: %s" %
560                                (node_name, script, output))
561     if errs and phase == constants.HOOKS_PHASE_PRE:
562       raise errors.HooksAbort(errs)
563     return results
564
565   def RunConfigUpdate(self):
566     """Run the special configuration update hook
567
568     This is a special hook that runs only on the master after each
569     top-level LI if the configuration has been updated.
570
571     """
572     phase = constants.HOOKS_PHASE_POST
573     hpath = constants.HOOKS_NAME_CFGUPDATE
574     nodes = [self.lu.cfg.GetMasterNode()]
575     self._RunWrapper(nodes, hpath, phase)