Adding new OpCode for OOB
[ganeti-local] / lib / mcpu.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26   - logical units, which know how to deal with their specific opcode only
27   - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31 import logging
32 import random
33 import time
34
35 from ganeti import opcodes
36 from ganeti import constants
37 from ganeti import errors
38 from ganeti import rpc
39 from ganeti import cmdlib
40 from ganeti import locking
41 from ganeti import utils
42
43
44 class LockAcquireTimeout(Exception):
45   """Exception to report timeouts on acquiring locks.
46
47   """
48
49
50 def _CalculateLockAttemptTimeouts():
51   """Calculate timeouts for lock attempts.
52
53   """
54   result = [1.0]
55
56   # Wait for a total of at least 150s before doing a blocking acquire
57   while sum(result) < 150.0:
58     timeout = (result[-1] * 1.05) ** 1.25
59
60     # Cap timeout at 10 seconds. This gives other jobs a chance to run
61     # even if we're still trying to get our locks, before finally moving
62     # to a blocking acquire.
63     if timeout > 10.0:
64       timeout = 10.0
65
66     elif timeout < 0.1:
67       # Lower boundary for safety
68       timeout = 0.1
69
70     result.append(timeout)
71
72   return result
73
74
75 class LockAttemptTimeoutStrategy(object):
76   """Class with lock acquire timeout strategy.
77
78   """
79   __slots__ = [
80     "_timeouts",
81     "_random_fn",
82     "_time_fn",
83     ]
84
85   _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
86
87   def __init__(self, _time_fn=time.time, _random_fn=random.random):
88     """Initializes this class.
89
90     @param _time_fn: Time function for unittests
91     @param _random_fn: Random number generator for unittests
92
93     """
94     object.__init__(self)
95
96     self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
97     self._time_fn = _time_fn
98     self._random_fn = _random_fn
99
100   def NextAttempt(self):
101     """Returns the timeout for the next attempt.
102
103     """
104     try:
105       timeout = self._timeouts.next()
106     except StopIteration:
107       # No more timeouts, do blocking acquire
108       timeout = None
109
110     if timeout is not None:
111       # Add a small variation (-/+ 5%) to timeout. This helps in situations
112       # where two or more jobs are fighting for the same lock(s).
113       variation_range = timeout * 0.1
114       timeout += ((self._random_fn() * variation_range) -
115                   (variation_range * 0.5))
116
117     return timeout
118
119
120 class OpExecCbBase: # pylint: disable-msg=W0232
121   """Base class for OpCode execution callbacks.
122
123   """
124   def NotifyStart(self):
125     """Called when we are about to execute the LU.
126
127     This function is called when we're about to start the lu's Exec() method,
128     that is, after we have acquired all locks.
129
130     """
131
132   def Feedback(self, *args):
133     """Sends feedback from the LU code to the end-user.
134
135     """
136
137   def CheckCancel(self):
138     """Check whether job has been cancelled.
139
140     """
141
142
143 class Processor(object):
144   """Object which runs OpCodes"""
145   DISPATCH_TABLE = {
146     # Cluster
147     opcodes.OpPostInitCluster: cmdlib.LUPostInitCluster,
148     opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
149     opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
150     opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
151     opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
152     opcodes.OpRenameCluster: cmdlib.LURenameCluster,
153     opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
154     opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
155     opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig,
156     opcodes.OpRepairDiskSizes: cmdlib.LURepairDiskSizes,
157     opcodes.OpQuery: cmdlib.LUQuery,
158     opcodes.OpQueryFields: cmdlib.LUQueryFields,
159     # node lu
160     opcodes.OpAddNode: cmdlib.LUAddNode,
161     opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
162     opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
163     opcodes.OpQueryNodeStorage: cmdlib.LUQueryNodeStorage,
164     opcodes.OpModifyNodeStorage: cmdlib.LUModifyNodeStorage,
165     opcodes.OpRepairNodeStorage: cmdlib.LURepairNodeStorage,
166     opcodes.OpRemoveNode: cmdlib.LURemoveNode,
167     opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams,
168     opcodes.OpPowercycleNode: cmdlib.LUPowercycleNode,
169     opcodes.OpMigrateNode: cmdlib.LUMigrateNode,
170     opcodes.OpNodeEvacuationStrategy: cmdlib.LUNodeEvacuationStrategy,
171     # instance lu
172     opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
173     opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
174     opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
175     opcodes.OpRenameInstance: cmdlib.LURenameInstance,
176     opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
177     opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
178     opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
179     opcodes.OpRebootInstance: cmdlib.LURebootInstance,
180     opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
181     opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
182     opcodes.OpRecreateInstanceDisks: cmdlib.LURecreateInstanceDisks,
183     opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
184     opcodes.OpMigrateInstance: cmdlib.LUMigrateInstance,
185     opcodes.OpMoveInstance: cmdlib.LUMoveInstance,
186     opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
187     opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
188     opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
189     opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
190     opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
191     # node group lu
192     opcodes.OpQueryGroups: cmdlib.LUQueryGroups,
193     # os lu
194     opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
195     # exports lu
196     opcodes.OpQueryExports: cmdlib.LUQueryExports,
197     opcodes.OpPrepareExport: cmdlib.LUPrepareExport,
198     opcodes.OpExportInstance: cmdlib.LUExportInstance,
199     opcodes.OpRemoveExport: cmdlib.LURemoveExport,
200     # tags lu
201     opcodes.OpGetTags: cmdlib.LUGetTags,
202     opcodes.OpSearchTags: cmdlib.LUSearchTags,
203     opcodes.OpAddTags: cmdlib.LUAddTags,
204     opcodes.OpDelTags: cmdlib.LUDelTags,
205     # test lu
206     opcodes.OpTestDelay: cmdlib.LUTestDelay,
207     opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
208     opcodes.OpTestJobqueue: cmdlib.LUTestJobqueue,
209     # OOB lu
210     opcodes.OpOutOfBand: cmdlib.LUOutOfBand,
211     }
212
213   def __init__(self, context, ec_id):
214     """Constructor for Processor
215
216     @type context: GanetiContext
217     @param context: global Ganeti context
218     @type ec_id: string
219     @param ec_id: execution context identifier
220
221     """
222     self.context = context
223     self._ec_id = ec_id
224     self._cbs = None
225     self.rpc = rpc.RpcRunner(context.cfg)
226     self.hmclass = HooksMaster
227
228   def _AcquireLocks(self, level, names, shared, timeout, priority):
229     """Acquires locks via the Ganeti lock manager.
230
231     @type level: int
232     @param level: Lock level
233     @type names: list or string
234     @param names: Lock names
235     @type shared: bool
236     @param shared: Whether the locks should be acquired in shared mode
237     @type timeout: None or float
238     @param timeout: Timeout for acquiring the locks
239     @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
240         amount of time
241
242     """
243     if self._cbs:
244       self._cbs.CheckCancel()
245
246     acquired = self.context.glm.acquire(level, names, shared=shared,
247                                         timeout=timeout, priority=priority)
248
249     if acquired is None:
250       raise LockAcquireTimeout()
251
252     return acquired
253
254   def _ExecLU(self, lu):
255     """Logical Unit execution sequence.
256
257     """
258     write_count = self.context.cfg.write_count
259     lu.CheckPrereq()
260     hm = HooksMaster(self.rpc.call_hooks_runner, lu)
261     h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
262     lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
263                      self.Log, None)
264
265     if getattr(lu.op, "dry_run", False):
266       # in this mode, no post-hooks are run, and the config is not
267       # written (as it might have been modified by another LU, and we
268       # shouldn't do writeout on behalf of other threads
269       self.LogInfo("dry-run mode requested, not actually executing"
270                    " the operation")
271       return lu.dry_run_result
272
273     try:
274       result = lu.Exec(self.Log)
275       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
276       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
277                                 self.Log, result)
278     finally:
279       # FIXME: This needs locks if not lu_class.REQ_BGL
280       if write_count != self.context.cfg.write_count:
281         hm.RunConfigUpdate()
282
283     return result
284
285   def _LockAndExecLU(self, lu, level, calc_timeout, priority):
286     """Execute a Logical Unit, with the needed locks.
287
288     This is a recursive function that starts locking the given level, and
289     proceeds up, till there are no more locks to acquire. Then it executes the
290     given LU and its opcodes.
291
292     """
293     adding_locks = level in lu.add_locks
294     acquiring_locks = level in lu.needed_locks
295     if level not in locking.LEVELS:
296       if self._cbs:
297         self._cbs.NotifyStart()
298
299       result = self._ExecLU(lu)
300
301     elif adding_locks and acquiring_locks:
302       # We could both acquire and add locks at the same level, but for now we
303       # don't need this, so we'll avoid the complicated code needed.
304       raise NotImplementedError("Can't declare locks to acquire when adding"
305                                 " others")
306
307     elif adding_locks or acquiring_locks:
308       lu.DeclareLocks(level)
309       share = lu.share_locks[level]
310
311       try:
312         assert adding_locks ^ acquiring_locks, \
313           "Locks must be either added or acquired"
314
315         if acquiring_locks:
316           # Acquiring locks
317           needed_locks = lu.needed_locks[level]
318
319           acquired = self._AcquireLocks(level, needed_locks, share,
320                                         calc_timeout(), priority)
321         else:
322           # Adding locks
323           add_locks = lu.add_locks[level]
324           lu.remove_locks[level] = add_locks
325
326           try:
327             self.context.glm.add(level, add_locks, acquired=1, shared=share)
328           except errors.LockError:
329             raise errors.OpPrereqError(
330               "Couldn't add locks (%s), probably because of a race condition"
331               " with another job, who added them first" % add_locks,
332               errors.ECODE_FAULT)
333
334           acquired = add_locks
335
336         try:
337           lu.acquired_locks[level] = acquired
338
339           result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
340         finally:
341           if level in lu.remove_locks:
342             self.context.glm.remove(level, lu.remove_locks[level])
343       finally:
344         if self.context.glm.is_owned(level):
345           self.context.glm.release(level)
346
347     else:
348       result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
349
350     return result
351
352   def ExecOpCode(self, op, cbs, timeout=None, priority=None):
353     """Execute an opcode.
354
355     @type op: an OpCode instance
356     @param op: the opcode to be executed
357     @type cbs: L{OpExecCbBase}
358     @param cbs: Runtime callbacks
359     @type timeout: float or None
360     @param timeout: Maximum time to acquire all locks, None for no timeout
361     @type priority: number or None
362     @param priority: Priority for acquiring lock(s)
363     @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
364         amount of time
365
366     """
367     if not isinstance(op, opcodes.OpCode):
368       raise errors.ProgrammerError("Non-opcode instance passed"
369                                    " to ExecOpcode")
370
371     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
372     if lu_class is None:
373       raise errors.OpCodeUnknown("Unknown opcode")
374
375     if timeout is None:
376       calc_timeout = lambda: None
377     else:
378       calc_timeout = utils.RunningTimeout(timeout, False).Remaining
379
380     self._cbs = cbs
381     try:
382       # Acquire the Big Ganeti Lock exclusively if this LU requires it,
383       # and in a shared fashion otherwise (to prevent concurrent run with
384       # an exclusive LU.
385       self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
386                           not lu_class.REQ_BGL, calc_timeout(),
387                           priority)
388       try:
389         lu = lu_class(self, op, self.context, self.rpc)
390         lu.ExpandNames()
391         assert lu.needed_locks is not None, "needed_locks not set by LU"
392
393         try:
394           return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout,
395                                      priority)
396         finally:
397           if self._ec_id:
398             self.context.cfg.DropECReservations(self._ec_id)
399       finally:
400         self.context.glm.release(locking.LEVEL_CLUSTER)
401     finally:
402       self._cbs = None
403
404   def Log(self, *args):
405     """Forward call to feedback callback function.
406
407     """
408     if self._cbs:
409       self._cbs.Feedback(*args)
410
411   def LogStep(self, current, total, message):
412     """Log a change in LU execution progress.
413
414     """
415     logging.debug("Step %d/%d %s", current, total, message)
416     self.Log("STEP %d/%d %s" % (current, total, message))
417
418   def LogWarning(self, message, *args, **kwargs):
419     """Log a warning to the logs and the user.
420
421     The optional keyword argument is 'hint' and can be used to show a
422     hint to the user (presumably related to the warning). If the
423     message is empty, it will not be printed at all, allowing one to
424     show only a hint.
425
426     """
427     assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
428            "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
429     if args:
430       message = message % tuple(args)
431     if message:
432       logging.warning(message)
433       self.Log(" - WARNING: %s" % message)
434     if "hint" in kwargs:
435       self.Log("      Hint: %s" % kwargs["hint"])
436
437   def LogInfo(self, message, *args):
438     """Log an informational message to the logs and the user.
439
440     """
441     if args:
442       message = message % tuple(args)
443     logging.info(message)
444     self.Log(" - INFO: %s" % message)
445
446   def GetECId(self):
447     if not self._ec_id:
448       errors.ProgrammerError("Tried to use execution context id when not set")
449     return self._ec_id
450
451
452 class HooksMaster(object):
453   """Hooks master.
454
455   This class distributes the run commands to the nodes based on the
456   specific LU class.
457
458   In order to remove the direct dependency on the rpc module, the
459   constructor needs a function which actually does the remote
460   call. This will usually be rpc.call_hooks_runner, but any function
461   which behaves the same works.
462
463   """
464   def __init__(self, callfn, lu):
465     self.callfn = callfn
466     self.lu = lu
467     self.op = lu.op
468     self.env, node_list_pre, node_list_post = self._BuildEnv()
469     self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
470                       constants.HOOKS_PHASE_POST: node_list_post}
471
472   def _BuildEnv(self):
473     """Compute the environment and the target nodes.
474
475     Based on the opcode and the current node list, this builds the
476     environment for the hooks and the target node list for the run.
477
478     """
479     env = {
480       "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
481       "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
482       "GANETI_OP_CODE": self.op.OP_ID,
483       "GANETI_OBJECT_TYPE": self.lu.HTYPE,
484       "GANETI_DATA_DIR": constants.DATA_DIR,
485       }
486
487     if self.lu.HPATH is not None:
488       lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
489       if lu_env:
490         for key in lu_env:
491           env["GANETI_" + key] = lu_env[key]
492     else:
493       lu_nodes_pre = lu_nodes_post = []
494
495     return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
496
497   def _RunWrapper(self, node_list, hpath, phase):
498     """Simple wrapper over self.callfn.
499
500     This method fixes the environment before doing the rpc call.
501
502     """
503     env = self.env.copy()
504     env["GANETI_HOOKS_PHASE"] = phase
505     env["GANETI_HOOKS_PATH"] = hpath
506     if self.lu.cfg is not None:
507       env["GANETI_CLUSTER"] = self.lu.cfg.GetClusterName()
508       env["GANETI_MASTER"] = self.lu.cfg.GetMasterNode()
509
510     env = dict([(str(key), str(val)) for key, val in env.iteritems()])
511
512     return self.callfn(node_list, hpath, phase, env)
513
514   def RunPhase(self, phase, nodes=None):
515     """Run all the scripts for a phase.
516
517     This is the main function of the HookMaster.
518
519     @param phase: one of L{constants.HOOKS_PHASE_POST} or
520         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
521     @param nodes: overrides the predefined list of nodes for the given phase
522     @return: the processed results of the hooks multi-node rpc call
523     @raise errors.HooksFailure: on communication failure to the nodes
524     @raise errors.HooksAbort: on failure of one of the hooks
525
526     """
527     if not self.node_list[phase] and not nodes:
528       # empty node list, we should not attempt to run this as either
529       # we're in the cluster init phase and the rpc client part can't
530       # even attempt to run, or this LU doesn't do hooks at all
531       return
532     hpath = self.lu.HPATH
533     if nodes is not None:
534       results = self._RunWrapper(nodes, hpath, phase)
535     else:
536       results = self._RunWrapper(self.node_list[phase], hpath, phase)
537     errs = []
538     if not results:
539       msg = "Communication Failure"
540       if phase == constants.HOOKS_PHASE_PRE:
541         raise errors.HooksFailure(msg)
542       else:
543         self.lu.LogWarning(msg)
544         return results
545     for node_name in results:
546       res = results[node_name]
547       if res.offline:
548         continue
549       msg = res.fail_msg
550       if msg:
551         self.lu.LogWarning("Communication failure to node %s: %s",
552                            node_name, msg)
553         continue
554       for script, hkr, output in res.payload:
555         if hkr == constants.HKR_FAIL:
556           if phase == constants.HOOKS_PHASE_PRE:
557             errs.append((node_name, script, output))
558           else:
559             if not output:
560               output = "(no output)"
561             self.lu.LogWarning("On %s script %s failed, output: %s" %
562                                (node_name, script, output))
563     if errs and phase == constants.HOOKS_PHASE_PRE:
564       raise errors.HooksAbort(errs)
565     return results
566
567   def RunConfigUpdate(self):
568     """Run the special configuration update hook
569
570     This is a special hook that runs only on the master after each
571     top-level LI if the configuration has been updated.
572
573     """
574     phase = constants.HOOKS_PHASE_POST
575     hpath = constants.HOOKS_NAME_CFGUPDATE
576     nodes = [self.lu.cfg.GetMasterNode()]
577     self._RunWrapper(nodes, hpath, phase)