mcpu: Make sure added locks are released on errors
[ganeti-local] / lib / mcpu.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26   - logical units, which know how to deal with their specific opcode only
27   - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31 import logging
32 import random
33 import time
34
35 from ganeti import opcodes
36 from ganeti import constants
37 from ganeti import errors
38 from ganeti import rpc
39 from ganeti import cmdlib
40 from ganeti import locking
41
42
43 class _LockAcquireTimeout(Exception):
44   """Internal exception to report timeouts on acquiring locks.
45
46   """
47
48
49 def _CalculateLockAttemptTimeouts():
50   """Calculate timeouts for lock attempts.
51
52   """
53   running_sum = 0
54   result = [1.0]
55
56   # Wait for a total of at least 150s before doing a blocking acquire
57   while sum(result) < 150.0:
58     timeout = (result[-1] * 1.05) ** 1.25
59
60     # Cap timeout at 10 seconds. This gives other jobs a chance to run
61     # even if we're still trying to get our locks, before finally moving
62     # to a blocking acquire.
63     if timeout > 10.0:
64       timeout = 10.0
65
66     elif timeout < 0.1:
67       # Lower boundary for safety
68       timeout = 0.1
69
70     result.append(timeout)
71
72   return result
73
74
75 class _LockAttemptTimeoutStrategy(object):
76   """Class with lock acquire timeout strategy.
77
78   """
79   __slots__ = [
80     "_attempt",
81     "_random_fn",
82     "_start_time",
83     "_time_fn",
84     ]
85
86   _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
87
88   def __init__(self, attempt=0, _time_fn=time.time, _random_fn=random.random):
89     """Initializes this class.
90
91     @type attempt: int
92     @param attempt: Current attempt number
93     @param _time_fn: Time function for unittests
94     @param _random_fn: Random number generator for unittests
95
96     """
97     object.__init__(self)
98
99     if attempt < 0:
100       raise ValueError("Attempt must be zero or positive")
101
102     self._attempt = attempt
103     self._time_fn = _time_fn
104     self._random_fn = _random_fn
105
106     self._start_time = None
107
108   def NextAttempt(self):
109     """Returns the strategy for the next attempt.
110
111     """
112     return _LockAttemptTimeoutStrategy(attempt=self._attempt + 1,
113                                        _time_fn=self._time_fn,
114                                        _random_fn=self._random_fn)
115
116   def CalcRemainingTimeout(self):
117     """Returns the remaining timeout.
118
119     """
120     try:
121       timeout = self._TIMEOUT_PER_ATTEMPT[self._attempt]
122     except IndexError:
123       # No more timeouts, do blocking acquire
124       return None
125
126     # Get start time on first calculation
127     if self._start_time is None:
128       self._start_time = self._time_fn()
129
130     # Calculate remaining time for this attempt
131     remaining_timeout = self._start_time + timeout - self._time_fn()
132
133     # Add a small variation (-/+ 5%) to timeouts. This helps in situations
134     # where two or more jobs are fighting for the same lock(s).
135     variation_range = remaining_timeout * 0.1
136     remaining_timeout += ((self._random_fn() * variation_range) -
137                           (variation_range * 0.5))
138
139     assert remaining_timeout >= 0.0, "Timeout must be positive"
140
141     return remaining_timeout
142
143
144 class OpExecCbBase:
145   """Base class for OpCode execution callbacks.
146
147   """
148   def NotifyStart(self):
149     """Called when we are about to execute the LU.
150
151     This function is called when we're about to start the lu's Exec() method,
152     that is, after we have acquired all locks.
153
154     """
155
156   def Feedback(self, *args):
157     """Sends feedback from the LU code to the end-user.
158
159     """
160
161   def ReportLocks(self, msg):
162     """Report lock operations.
163
164     """
165
166
167 class Processor(object):
168   """Object which runs OpCodes"""
169   DISPATCH_TABLE = {
170     # Cluster
171     opcodes.OpPostInitCluster: cmdlib.LUPostInitCluster,
172     opcodes.OpDestroyCluster: cmdlib.LUDestroyCluster,
173     opcodes.OpQueryClusterInfo: cmdlib.LUQueryClusterInfo,
174     opcodes.OpVerifyCluster: cmdlib.LUVerifyCluster,
175     opcodes.OpQueryConfigValues: cmdlib.LUQueryConfigValues,
176     opcodes.OpRenameCluster: cmdlib.LURenameCluster,
177     opcodes.OpVerifyDisks: cmdlib.LUVerifyDisks,
178     opcodes.OpSetClusterParams: cmdlib.LUSetClusterParams,
179     opcodes.OpRedistributeConfig: cmdlib.LURedistributeConfig,
180     opcodes.OpRepairDiskSizes: cmdlib.LURepairDiskSizes,
181     # node lu
182     opcodes.OpAddNode: cmdlib.LUAddNode,
183     opcodes.OpQueryNodes: cmdlib.LUQueryNodes,
184     opcodes.OpQueryNodeVolumes: cmdlib.LUQueryNodeVolumes,
185     opcodes.OpQueryNodeStorage: cmdlib.LUQueryNodeStorage,
186     opcodes.OpModifyNodeStorage: cmdlib.LUModifyNodeStorage,
187     opcodes.OpRepairNodeStorage: cmdlib.LURepairNodeStorage,
188     opcodes.OpRemoveNode: cmdlib.LURemoveNode,
189     opcodes.OpSetNodeParams: cmdlib.LUSetNodeParams,
190     opcodes.OpPowercycleNode: cmdlib.LUPowercycleNode,
191     opcodes.OpEvacuateNode: cmdlib.LUEvacuateNode,
192     opcodes.OpMigrateNode: cmdlib.LUMigrateNode,
193     # instance lu
194     opcodes.OpCreateInstance: cmdlib.LUCreateInstance,
195     opcodes.OpReinstallInstance: cmdlib.LUReinstallInstance,
196     opcodes.OpRemoveInstance: cmdlib.LURemoveInstance,
197     opcodes.OpRenameInstance: cmdlib.LURenameInstance,
198     opcodes.OpActivateInstanceDisks: cmdlib.LUActivateInstanceDisks,
199     opcodes.OpShutdownInstance: cmdlib.LUShutdownInstance,
200     opcodes.OpStartupInstance: cmdlib.LUStartupInstance,
201     opcodes.OpRebootInstance: cmdlib.LURebootInstance,
202     opcodes.OpDeactivateInstanceDisks: cmdlib.LUDeactivateInstanceDisks,
203     opcodes.OpReplaceDisks: cmdlib.LUReplaceDisks,
204     opcodes.OpRecreateInstanceDisks: cmdlib.LURecreateInstanceDisks,
205     opcodes.OpFailoverInstance: cmdlib.LUFailoverInstance,
206     opcodes.OpMigrateInstance: cmdlib.LUMigrateInstance,
207     opcodes.OpMoveInstance: cmdlib.LUMoveInstance,
208     opcodes.OpConnectConsole: cmdlib.LUConnectConsole,
209     opcodes.OpQueryInstances: cmdlib.LUQueryInstances,
210     opcodes.OpQueryInstanceData: cmdlib.LUQueryInstanceData,
211     opcodes.OpSetInstanceParams: cmdlib.LUSetInstanceParams,
212     opcodes.OpGrowDisk: cmdlib.LUGrowDisk,
213     # os lu
214     opcodes.OpDiagnoseOS: cmdlib.LUDiagnoseOS,
215     # exports lu
216     opcodes.OpQueryExports: cmdlib.LUQueryExports,
217     opcodes.OpExportInstance: cmdlib.LUExportInstance,
218     opcodes.OpRemoveExport: cmdlib.LURemoveExport,
219     # tags lu
220     opcodes.OpGetTags: cmdlib.LUGetTags,
221     opcodes.OpSearchTags: cmdlib.LUSearchTags,
222     opcodes.OpAddTags: cmdlib.LUAddTags,
223     opcodes.OpDelTags: cmdlib.LUDelTags,
224     # test lu
225     opcodes.OpTestDelay: cmdlib.LUTestDelay,
226     opcodes.OpTestAllocator: cmdlib.LUTestAllocator,
227     }
228
229   def __init__(self, context):
230     """Constructor for Processor
231
232     """
233     self.context = context
234     self._cbs = None
235     self.rpc = rpc.RpcRunner(context.cfg)
236     self.hmclass = HooksMaster
237
238   def _ReportLocks(self, level, names, shared, timeout, acquired, result):
239     """Reports lock operations.
240
241     @type level: int
242     @param level: Lock level
243     @type names: list or string
244     @param names: Lock names
245     @type shared: bool
246     @param shared: Whether the locks should be acquired in shared mode
247     @type timeout: None or float
248     @param timeout: Timeout for acquiring the locks
249     @type acquired: bool
250     @param acquired: Whether the locks have already been acquired
251     @type result: None or set
252     @param result: Result from L{locking.GanetiLockManager.acquire}
253
254     """
255     parts = []
256
257     # Build message
258     if acquired:
259       if result is None:
260         parts.append("timeout")
261       else:
262         parts.append("acquired")
263     else:
264       parts.append("waiting")
265       if timeout is None:
266         parts.append("blocking")
267       else:
268         parts.append("timeout=%0.6fs" % timeout)
269
270     parts.append(locking.LEVEL_NAMES[level])
271
272     if names == locking.ALL_SET:
273       parts.append("ALL")
274     elif isinstance(names, basestring):
275       parts.append(names)
276     else:
277       parts.append(",".join(names))
278
279     if shared:
280       parts.append("shared")
281     else:
282       parts.append("exclusive")
283
284     msg = "/".join(parts)
285
286     logging.debug("LU locks %s", msg)
287
288     if self._cbs:
289       self._cbs.ReportLocks(msg)
290
291   def _AcquireLocks(self, level, names, shared, timeout):
292     """Acquires locks via the Ganeti lock manager.
293
294     @type level: int
295     @param level: Lock level
296     @type names: list or string
297     @param names: Lock names
298     @type shared: bool
299     @param shared: Whether the locks should be acquired in shared mode
300     @type timeout: None or float
301     @param timeout: Timeout for acquiring the locks
302
303     """
304     self._ReportLocks(level, names, shared, timeout, False, None)
305
306     acquired = self.context.glm.acquire(level, names, shared=shared,
307                                         timeout=timeout)
308
309     self._ReportLocks(level, names, shared, timeout, True, acquired)
310
311     return acquired
312
313   def _ExecLU(self, lu):
314     """Logical Unit execution sequence.
315
316     """
317     write_count = self.context.cfg.write_count
318     lu.CheckPrereq()
319     hm = HooksMaster(self.rpc.call_hooks_runner, lu)
320     h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
321     lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
322                      self._Feedback, None)
323
324     if getattr(lu.op, "dry_run", False):
325       # in this mode, no post-hooks are run, and the config is not
326       # written (as it might have been modified by another LU, and we
327       # shouldn't do writeout on behalf of other threads
328       self.LogInfo("dry-run mode requested, not actually executing"
329                    " the operation")
330       return lu.dry_run_result
331
332     try:
333       result = lu.Exec(self._Feedback)
334       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
335       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
336                                 self._Feedback, result)
337     finally:
338       # FIXME: This needs locks if not lu_class.REQ_BGL
339       if write_count != self.context.cfg.write_count:
340         hm.RunConfigUpdate()
341
342     return result
343
344   def _LockAndExecLU(self, lu, level, calc_timeout):
345     """Execute a Logical Unit, with the needed locks.
346
347     This is a recursive function that starts locking the given level, and
348     proceeds up, till there are no more locks to acquire. Then it executes the
349     given LU and its opcodes.
350
351     """
352     adding_locks = level in lu.add_locks
353     acquiring_locks = level in lu.needed_locks
354     if level not in locking.LEVELS:
355       if self._cbs:
356         self._cbs.NotifyStart()
357
358       result = self._ExecLU(lu)
359
360     elif adding_locks and acquiring_locks:
361       # We could both acquire and add locks at the same level, but for now we
362       # don't need this, so we'll avoid the complicated code needed.
363       raise NotImplementedError("Can't declare locks to acquire when adding"
364                                 " others")
365
366     elif adding_locks or acquiring_locks:
367       lu.DeclareLocks(level)
368       share = lu.share_locks[level]
369
370       try:
371         assert adding_locks ^ acquiring_locks, \
372           "Locks must be either added or acquired"
373
374         if acquiring_locks:
375           # Acquiring locks
376           needed_locks = lu.needed_locks[level]
377
378           acquired = self._AcquireLocks(level, needed_locks, share,
379                                         calc_timeout())
380
381           if acquired is None:
382             raise _LockAcquireTimeout()
383
384         else:
385           # Adding locks
386           add_locks = lu.add_locks[level]
387           lu.remove_locks[level] = add_locks
388
389           try:
390             self.context.glm.add(level, add_locks, acquired=1, shared=share)
391           except errors.LockError:
392             raise errors.OpPrereqError(
393               "Couldn't add locks (%s), probably because of a race condition"
394               " with another job, who added them first" % add_locks)
395
396           acquired = add_locks
397
398         try:
399           lu.acquired_locks[level] = acquired
400
401           result = self._LockAndExecLU(lu, level + 1, calc_timeout)
402         finally:
403           if level in lu.remove_locks:
404             self.context.glm.remove(level, lu.remove_locks[level])
405       finally:
406         if self.context.glm.is_owned(level):
407           self.context.glm.release(level)
408
409     else:
410       result = self._LockAndExecLU(lu, level + 1, calc_timeout)
411
412     return result
413
414   def ExecOpCode(self, op, cbs):
415     """Execute an opcode.
416
417     @type op: an OpCode instance
418     @param op: the opcode to be executed
419     @type cbs: L{OpExecCbBase}
420     @param cbs: Runtime callbacks
421
422     """
423     if not isinstance(op, opcodes.OpCode):
424       raise errors.ProgrammerError("Non-opcode instance passed"
425                                    " to ExecOpcode")
426
427     self._cbs = cbs
428     try:
429       lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
430       if lu_class is None:
431         raise errors.OpCodeUnknown("Unknown opcode")
432
433       timeout_strategy = _LockAttemptTimeoutStrategy()
434
435       while True:
436         try:
437           acquire_timeout = timeout_strategy.CalcRemainingTimeout()
438
439           # Acquire the Big Ganeti Lock exclusively if this LU requires it,
440           # and in a shared fashion otherwise (to prevent concurrent run with
441           # an exclusive LU.
442           if self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
443                                 not lu_class.REQ_BGL, acquire_timeout) is None:
444             raise _LockAcquireTimeout()
445
446           try:
447             lu = lu_class(self, op, self.context, self.rpc)
448             lu.ExpandNames()
449             assert lu.needed_locks is not None, "needed_locks not set by LU"
450
451             return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE,
452                                        timeout_strategy.CalcRemainingTimeout)
453           finally:
454             self.context.glm.release(locking.LEVEL_CLUSTER)
455
456         except _LockAcquireTimeout:
457           # Timeout while waiting for lock, try again
458           pass
459
460         timeout_strategy = timeout_strategy.NextAttempt()
461
462     finally:
463       self._cbs = None
464
465   def _Feedback(self, *args):
466     """Forward call to feedback callback function.
467
468     """
469     if self._cbs:
470       self._cbs.Feedback(*args)
471
472   def LogStep(self, current, total, message):
473     """Log a change in LU execution progress.
474
475     """
476     logging.debug("Step %d/%d %s", current, total, message)
477     self._Feedback("STEP %d/%d %s" % (current, total, message))
478
479   def LogWarning(self, message, *args, **kwargs):
480     """Log a warning to the logs and the user.
481
482     The optional keyword argument is 'hint' and can be used to show a
483     hint to the user (presumably related to the warning). If the
484     message is empty, it will not be printed at all, allowing one to
485     show only a hint.
486
487     """
488     assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
489            "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
490     if args:
491       message = message % tuple(args)
492     if message:
493       logging.warning(message)
494       self._Feedback(" - WARNING: %s" % message)
495     if "hint" in kwargs:
496       self._Feedback("      Hint: %s" % kwargs["hint"])
497
498   def LogInfo(self, message, *args):
499     """Log an informational message to the logs and the user.
500
501     """
502     if args:
503       message = message % tuple(args)
504     logging.info(message)
505     self._Feedback(" - INFO: %s" % message)
506
507
508 class HooksMaster(object):
509   """Hooks master.
510
511   This class distributes the run commands to the nodes based on the
512   specific LU class.
513
514   In order to remove the direct dependency on the rpc module, the
515   constructor needs a function which actually does the remote
516   call. This will usually be rpc.call_hooks_runner, but any function
517   which behaves the same works.
518
519   """
520   def __init__(self, callfn, lu):
521     self.callfn = callfn
522     self.lu = lu
523     self.op = lu.op
524     self.env, node_list_pre, node_list_post = self._BuildEnv()
525     self.node_list = {constants.HOOKS_PHASE_PRE: node_list_pre,
526                       constants.HOOKS_PHASE_POST: node_list_post}
527
528   def _BuildEnv(self):
529     """Compute the environment and the target nodes.
530
531     Based on the opcode and the current node list, this builds the
532     environment for the hooks and the target node list for the run.
533
534     """
535     env = {
536       "PATH": "/sbin:/bin:/usr/sbin:/usr/bin",
537       "GANETI_HOOKS_VERSION": constants.HOOKS_VERSION,
538       "GANETI_OP_CODE": self.op.OP_ID,
539       "GANETI_OBJECT_TYPE": self.lu.HTYPE,
540       "GANETI_DATA_DIR": constants.DATA_DIR,
541       }
542
543     if self.lu.HPATH is not None:
544       lu_env, lu_nodes_pre, lu_nodes_post = self.lu.BuildHooksEnv()
545       if lu_env:
546         for key in lu_env:
547           env["GANETI_" + key] = lu_env[key]
548     else:
549       lu_nodes_pre = lu_nodes_post = []
550
551     return env, frozenset(lu_nodes_pre), frozenset(lu_nodes_post)
552
553   def _RunWrapper(self, node_list, hpath, phase):
554     """Simple wrapper over self.callfn.
555
556     This method fixes the environment before doing the rpc call.
557
558     """
559     env = self.env.copy()
560     env["GANETI_HOOKS_PHASE"] = phase
561     env["GANETI_HOOKS_PATH"] = hpath
562     if self.lu.cfg is not None:
563       env["GANETI_CLUSTER"] = self.lu.cfg.GetClusterName()
564       env["GANETI_MASTER"] = self.lu.cfg.GetMasterNode()
565
566     env = dict([(str(key), str(val)) for key, val in env.iteritems()])
567
568     return self.callfn(node_list, hpath, phase, env)
569
570   def RunPhase(self, phase, nodes=None):
571     """Run all the scripts for a phase.
572
573     This is the main function of the HookMaster.
574
575     @param phase: one of L{constants.HOOKS_PHASE_POST} or
576         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
577     @param nodes: overrides the predefined list of nodes for the given phase
578     @return: the processed results of the hooks multi-node rpc call
579     @raise errors.HooksFailure: on communication failure to the nodes
580     @raise errors.HooksAbort: on failure of one of the hooks
581
582     """
583     if not self.node_list[phase] and not nodes:
584       # empty node list, we should not attempt to run this as either
585       # we're in the cluster init phase and the rpc client part can't
586       # even attempt to run, or this LU doesn't do hooks at all
587       return
588     hpath = self.lu.HPATH
589     if nodes is not None:
590       results = self._RunWrapper(nodes, hpath, phase)
591     else:
592       results = self._RunWrapper(self.node_list[phase], hpath, phase)
593     errs = []
594     if not results:
595       msg = "Communication Failure"
596       if phase == constants.HOOKS_PHASE_PRE:
597         raise errors.HooksFailure(msg)
598       else:
599         self.lu.LogWarning(msg)
600         return results
601     for node_name in results:
602       res = results[node_name]
603       if res.offline:
604         continue
605       msg = res.fail_msg
606       if msg:
607         self.lu.LogWarning("Communication failure to node %s: %s",
608                            node_name, msg)
609         continue
610       for script, hkr, output in res.payload:
611         if hkr == constants.HKR_FAIL:
612           if phase == constants.HOOKS_PHASE_PRE:
613             errs.append((node_name, script, output))
614           else:
615             if not output:
616               output = "(no output)"
617             self.lu.LogWarning("On %s script %s failed, output: %s" %
618                                (node_name, script, output))
619     if errs and phase == constants.HOOKS_PHASE_PRE:
620       raise errors.HooksAbort(errs)
621     return results
622
623   def RunConfigUpdate(self):
624     """Run the special configuration update hook
625
626     This is a special hook that runs only on the master after each
627     top-level LI if the configuration has been updated.
628
629     """
630     phase = constants.HOOKS_PHASE_POST
631     hpath = constants.HOOKS_NAME_CFGUPDATE
632     nodes = [self.lu.cfg.GetMasterNode()]
633     self._RunWrapper(nodes, hpath, phase)