Merge branch 'stable-2.9' into stable-2.10
[ganeti-local] / lib / mcpu.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26   - logical units, which know how to deal with their specific opcode only
27   - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31 import sys
32 import logging
33 import random
34 import time
35 import itertools
36 import traceback
37
38 from ganeti import opcodes
39 from ganeti import opcodes_base
40 from ganeti import constants
41 from ganeti import errors
42 from ganeti import hooksmaster
43 from ganeti import cmdlib
44 from ganeti import locking
45 from ganeti import utils
46 from ganeti import compat
47
48
49 _OP_PREFIX = "Op"
50 _LU_PREFIX = "LU"
51
52 #: LU classes which don't need to acquire the node allocation lock
53 #: (L{locking.NAL}) when they acquire all node or node resource locks
54 _NODE_ALLOC_WHITELIST = frozenset([])
55
56 #: LU classes which don't need to acquire the node allocation lock
57 #: (L{locking.NAL}) in the same mode (shared/exclusive) as the node
58 #: or node resource locks
59 _NODE_ALLOC_MODE_WHITELIST = compat.UniqueFrozenset([
60   cmdlib.LUBackupExport,
61   cmdlib.LUBackupRemove,
62   cmdlib.LUOobCommand,
63   ])
64
65
66 class LockAcquireTimeout(Exception):
67   """Exception to report timeouts on acquiring locks.
68
69   """
70
71
72 def _CalculateLockAttemptTimeouts():
73   """Calculate timeouts for lock attempts.
74
75   """
76   result = [constants.LOCK_ATTEMPTS_MINWAIT]
77   running_sum = result[0]
78
79   # Wait for a total of at least LOCK_ATTEMPTS_TIMEOUT before doing a
80   # blocking acquire
81   while running_sum < constants.LOCK_ATTEMPTS_TIMEOUT:
82     timeout = (result[-1] * 1.05) ** 1.25
83
84     # Cap max timeout. This gives other jobs a chance to run even if
85     # we're still trying to get our locks, before finally moving to a
86     # blocking acquire.
87     timeout = min(timeout, constants.LOCK_ATTEMPTS_MAXWAIT)
88     # And also cap the lower boundary for safety
89     timeout = max(timeout, constants.LOCK_ATTEMPTS_MINWAIT)
90
91     result.append(timeout)
92     running_sum += timeout
93
94   return result
95
96
97 class LockAttemptTimeoutStrategy(object):
98   """Class with lock acquire timeout strategy.
99
100   """
101   __slots__ = [
102     "_timeouts",
103     "_random_fn",
104     "_time_fn",
105     ]
106
107   _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
108
109   def __init__(self, _time_fn=time.time, _random_fn=random.random):
110     """Initializes this class.
111
112     @param _time_fn: Time function for unittests
113     @param _random_fn: Random number generator for unittests
114
115     """
116     object.__init__(self)
117
118     self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
119     self._time_fn = _time_fn
120     self._random_fn = _random_fn
121
122   def NextAttempt(self):
123     """Returns the timeout for the next attempt.
124
125     """
126     try:
127       timeout = self._timeouts.next()
128     except StopIteration:
129       # No more timeouts, do blocking acquire
130       timeout = None
131
132     if timeout is not None:
133       # Add a small variation (-/+ 5%) to timeout. This helps in situations
134       # where two or more jobs are fighting for the same lock(s).
135       variation_range = timeout * 0.1
136       timeout += ((self._random_fn() * variation_range) -
137                   (variation_range * 0.5))
138
139     return timeout
140
141
142 class OpExecCbBase: # pylint: disable=W0232
143   """Base class for OpCode execution callbacks.
144
145   """
146   def NotifyStart(self):
147     """Called when we are about to execute the LU.
148
149     This function is called when we're about to start the lu's Exec() method,
150     that is, after we have acquired all locks.
151
152     """
153
154   def Feedback(self, *args):
155     """Sends feedback from the LU code to the end-user.
156
157     """
158
159   def CurrentPriority(self): # pylint: disable=R0201
160     """Returns current priority or C{None}.
161
162     """
163     return None
164
165   def SubmitManyJobs(self, jobs):
166     """Submits jobs for processing.
167
168     See L{jqueue.JobQueue.SubmitManyJobs}.
169
170     """
171     raise NotImplementedError
172
173
174 def _LUNameForOpName(opname):
175   """Computes the LU name for a given OpCode name.
176
177   """
178   assert opname.startswith(_OP_PREFIX), \
179       "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)
180
181   return _LU_PREFIX + opname[len(_OP_PREFIX):]
182
183
184 def _ComputeDispatchTable():
185   """Computes the opcode-to-lu dispatch table.
186
187   """
188   return dict((op, getattr(cmdlib, _LUNameForOpName(op.__name__)))
189               for op in opcodes.OP_MAPPING.values()
190               if op.WITH_LU)
191
192
193 def _SetBaseOpParams(src, defcomment, dst):
194   """Copies basic opcode parameters.
195
196   @type src: L{opcodes.OpCode}
197   @param src: Source opcode
198   @type defcomment: string
199   @param defcomment: Comment to specify if not already given
200   @type dst: L{opcodes.OpCode}
201   @param dst: Destination opcode
202
203   """
204   if hasattr(src, "debug_level"):
205     dst.debug_level = src.debug_level
206
207   if (getattr(dst, "priority", None) is None and
208       hasattr(src, "priority")):
209     dst.priority = src.priority
210
211   if not getattr(dst, opcodes_base.COMMENT_ATTR, None):
212     dst.comment = defcomment
213
214
215 def _ProcessResult(submit_fn, op, result):
216   """Examines opcode result.
217
218   If necessary, additional processing on the result is done.
219
220   """
221   if isinstance(result, cmdlib.ResultWithJobs):
222     # Copy basic parameters (e.g. priority)
223     map(compat.partial(_SetBaseOpParams, op,
224                        "Submitted by %s" % op.OP_ID),
225         itertools.chain(*result.jobs))
226
227     # Submit jobs
228     job_submission = submit_fn(result.jobs)
229
230     # Build dictionary
231     result = result.other
232
233     assert constants.JOB_IDS_KEY not in result, \
234       "Key '%s' found in additional return values" % constants.JOB_IDS_KEY
235
236     result[constants.JOB_IDS_KEY] = job_submission
237
238   return result
239
240
241 def _FailingSubmitManyJobs(_):
242   """Implementation of L{OpExecCbBase.SubmitManyJobs} to raise an exception.
243
244   """
245   raise errors.ProgrammerError("Opcodes processed without callbacks (e.g."
246                                " queries) can not submit jobs")
247
248
249 def _VerifyLocks(lu, glm, _mode_whitelist=_NODE_ALLOC_MODE_WHITELIST,
250                  _nal_whitelist=_NODE_ALLOC_WHITELIST):
251   """Performs consistency checks on locks acquired by a logical unit.
252
253   @type lu: L{cmdlib.LogicalUnit}
254   @param lu: Logical unit instance
255   @type glm: L{locking.GanetiLockManager}
256   @param glm: Lock manager
257
258   """
259   if not __debug__:
260     return
261
262   have_nal = glm.check_owned(locking.LEVEL_NODE_ALLOC, locking.NAL)
263
264   for level in [locking.LEVEL_NODE, locking.LEVEL_NODE_RES]:
265     # TODO: Verify using actual lock mode, not using LU variables
266     if level in lu.needed_locks:
267       share_node_alloc = lu.share_locks[locking.LEVEL_NODE_ALLOC]
268       share_level = lu.share_locks[level]
269
270       if lu.__class__ in _mode_whitelist:
271         assert share_node_alloc != share_level, \
272           "LU is whitelisted to use different modes for node allocation lock"
273       else:
274         assert bool(share_node_alloc) == bool(share_level), \
275           ("Node allocation lock must be acquired using the same mode as nodes"
276            " and node resources")
277
278       if lu.__class__ in _nal_whitelist:
279         assert not have_nal, \
280           "LU is whitelisted for not acquiring the node allocation lock"
281       elif lu.needed_locks[level] == locking.ALL_SET or glm.owning_all(level):
282         assert have_nal, \
283           ("Node allocation lock must be used if an LU acquires all nodes"
284            " or node resources")
285
286
287 class Processor(object):
288   """Object which runs OpCodes"""
289   DISPATCH_TABLE = _ComputeDispatchTable()
290
291   def __init__(self, context, ec_id, enable_locks=True):
292     """Constructor for Processor
293
294     @type context: GanetiContext
295     @param context: global Ganeti context
296     @type ec_id: string
297     @param ec_id: execution context identifier
298
299     """
300     self.context = context
301     self._ec_id = ec_id
302     self._cbs = None
303     self.rpc = context.rpc
304     self.hmclass = hooksmaster.HooksMaster
305     self._enable_locks = enable_locks
306
307   def _CheckLocksEnabled(self):
308     """Checks if locking is enabled.
309
310     @raise errors.ProgrammerError: In case locking is not enabled
311
312     """
313     if not self._enable_locks:
314       raise errors.ProgrammerError("Attempted to use disabled locks")
315
316   def _AcquireLocks(self, level, names, shared, opportunistic, timeout):
317     """Acquires locks via the Ganeti lock manager.
318
319     @type level: int
320     @param level: Lock level
321     @type names: list or string
322     @param names: Lock names
323     @type shared: bool
324     @param shared: Whether the locks should be acquired in shared mode
325     @type opportunistic: bool
326     @param opportunistic: Whether to acquire opportunistically
327     @type timeout: None or float
328     @param timeout: Timeout for acquiring the locks
329     @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
330         amount of time
331
332     """
333     self._CheckLocksEnabled()
334
335     if self._cbs:
336       priority = self._cbs.CurrentPriority()
337     else:
338       priority = None
339
340     acquired = self.context.glm.acquire(level, names, shared=shared,
341                                         timeout=timeout, priority=priority,
342                                         opportunistic=opportunistic)
343
344     if acquired is None:
345       raise LockAcquireTimeout()
346
347     return acquired
348
349   def _ExecLU(self, lu):
350     """Logical Unit execution sequence.
351
352     """
353     write_count = self.context.cfg.write_count
354     lu.CheckPrereq()
355
356     hm = self.BuildHooksManager(lu)
357     h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
358     lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
359                      self.Log, None)
360
361     if getattr(lu.op, "dry_run", False):
362       # in this mode, no post-hooks are run, and the config is not
363       # written (as it might have been modified by another LU, and we
364       # shouldn't do writeout on behalf of other threads
365       self.LogInfo("dry-run mode requested, not actually executing"
366                    " the operation")
367       return lu.dry_run_result
368
369     if self._cbs:
370       submit_mj_fn = self._cbs.SubmitManyJobs
371     else:
372       submit_mj_fn = _FailingSubmitManyJobs
373
374     try:
375       result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
376       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
377       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
378                                 self.Log, result)
379     finally:
380       # FIXME: This needs locks if not lu_class.REQ_BGL
381       if write_count != self.context.cfg.write_count:
382         hm.RunConfigUpdate()
383
384     return result
385
386   def BuildHooksManager(self, lu):
387     return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu)
388
389   def _LockAndExecLU(self, lu, level, calc_timeout):
390     """Execute a Logical Unit, with the needed locks.
391
392     This is a recursive function that starts locking the given level, and
393     proceeds up, till there are no more locks to acquire. Then it executes the
394     given LU and its opcodes.
395
396     """
397     glm = self.context.glm
398     adding_locks = level in lu.add_locks
399     acquiring_locks = level in lu.needed_locks
400
401     if level not in locking.LEVELS:
402       _VerifyLocks(lu, glm)
403
404       if self._cbs:
405         self._cbs.NotifyStart()
406
407       try:
408         result = self._ExecLU(lu)
409       except AssertionError, err:
410         # this is a bit ugly, as we don't know from which phase
411         # (prereq, exec) this comes; but it's better than an exception
412         # with no information
413         (_, _, tb) = sys.exc_info()
414         err_info = traceback.format_tb(tb)
415         del tb
416         logging.exception("Detected AssertionError")
417         raise errors.OpExecError("Internal assertion error: please report"
418                                  " this as a bug.\nError message: '%s';"
419                                  " location:\n%s" % (str(err), err_info[-1]))
420
421     elif adding_locks and acquiring_locks:
422       # We could both acquire and add locks at the same level, but for now we
423       # don't need this, so we'll avoid the complicated code needed.
424       raise NotImplementedError("Can't declare locks to acquire when adding"
425                                 " others")
426
427     elif adding_locks or acquiring_locks:
428       self._CheckLocksEnabled()
429
430       lu.DeclareLocks(level)
431       share = lu.share_locks[level]
432       opportunistic = lu.opportunistic_locks[level]
433
434       try:
435         assert adding_locks ^ acquiring_locks, \
436           "Locks must be either added or acquired"
437
438         if acquiring_locks:
439           # Acquiring locks
440           needed_locks = lu.needed_locks[level]
441
442           self._AcquireLocks(level, needed_locks, share, opportunistic,
443                              calc_timeout())
444         else:
445           # Adding locks
446           add_locks = lu.add_locks[level]
447           lu.remove_locks[level] = add_locks
448
449           try:
450             glm.add(level, add_locks, acquired=1, shared=share)
451           except errors.LockError:
452             logging.exception("Detected lock error in level %s for locks"
453                               " %s, shared=%s", level, add_locks, share)
454             raise errors.OpPrereqError(
455               "Couldn't add locks (%s), most likely because of another"
456               " job who added them first" % add_locks,
457               errors.ECODE_NOTUNIQUE)
458
459         try:
460           result = self._LockAndExecLU(lu, level + 1, calc_timeout)
461         finally:
462           if level in lu.remove_locks:
463             glm.remove(level, lu.remove_locks[level])
464       finally:
465         if glm.is_owned(level):
466           glm.release(level)
467
468     else:
469       result = self._LockAndExecLU(lu, level + 1, calc_timeout)
470
471     return result
472
473   # pylint: disable=R0201
474   def _CheckLUResult(self, op, result):
475     """Check the LU result against the contract in the opcode.
476
477     """
478     resultcheck_fn = op.OP_RESULT
479     if not (resultcheck_fn is None or resultcheck_fn(result)):
480       logging.error("Expected opcode result matching %s, got %s",
481                     resultcheck_fn, result)
482       if not getattr(op, "dry_run", False):
483         # FIXME: LUs should still behave in dry_run mode, or
484         # alternately we should have OP_DRYRUN_RESULT; in the
485         # meantime, we simply skip the OP_RESULT check in dry-run mode
486         raise errors.OpResultError("Opcode result does not match %s: %s" %
487                                    (resultcheck_fn, utils.Truncate(result, 80)))
488
489   def ExecOpCode(self, op, cbs, timeout=None):
490     """Execute an opcode.
491
492     @type op: an OpCode instance
493     @param op: the opcode to be executed
494     @type cbs: L{OpExecCbBase}
495     @param cbs: Runtime callbacks
496     @type timeout: float or None
497     @param timeout: Maximum time to acquire all locks, None for no timeout
498     @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
499         amount of time
500
501     """
502     if not isinstance(op, opcodes.OpCode):
503       raise errors.ProgrammerError("Non-opcode instance passed"
504                                    " to ExecOpcode (%s)" % type(op))
505
506     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
507     if lu_class is None:
508       raise errors.OpCodeUnknown("Unknown opcode")
509
510     if timeout is None:
511       calc_timeout = lambda: None
512     else:
513       calc_timeout = utils.RunningTimeout(timeout, False).Remaining
514
515     self._cbs = cbs
516     try:
517       if self._enable_locks:
518         # Acquire the Big Ganeti Lock exclusively if this LU requires it,
519         # and in a shared fashion otherwise (to prevent concurrent run with
520         # an exclusive LU.
521         self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
522                             not lu_class.REQ_BGL, False, calc_timeout())
523       elif lu_class.REQ_BGL:
524         raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
525                                      " disabled" % op.OP_ID)
526
527       try:
528         lu = lu_class(self, op, self.context, self.rpc)
529         lu.ExpandNames()
530         assert lu.needed_locks is not None, "needed_locks not set by LU"
531
532         try:
533           result = self._LockAndExecLU(lu, locking.LEVEL_CLUSTER + 1,
534                                        calc_timeout)
535         finally:
536           if self._ec_id:
537             self.context.cfg.DropECReservations(self._ec_id)
538       finally:
539         # Release BGL if owned
540         if self.context.glm.is_owned(locking.LEVEL_CLUSTER):
541           assert self._enable_locks
542           self.context.glm.release(locking.LEVEL_CLUSTER)
543     finally:
544       self._cbs = None
545
546     self._CheckLUResult(op, result)
547
548     return result
549
550   def Log(self, *args):
551     """Forward call to feedback callback function.
552
553     """
554     if self._cbs:
555       self._cbs.Feedback(*args)
556
557   def LogStep(self, current, total, message):
558     """Log a change in LU execution progress.
559
560     """
561     logging.debug("Step %d/%d %s", current, total, message)
562     self.Log("STEP %d/%d %s" % (current, total, message))
563
564   def LogWarning(self, message, *args, **kwargs):
565     """Log a warning to the logs and the user.
566
567     The optional keyword argument is 'hint' and can be used to show a
568     hint to the user (presumably related to the warning). If the
569     message is empty, it will not be printed at all, allowing one to
570     show only a hint.
571
572     """
573     assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
574            "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
575     if args:
576       message = message % tuple(args)
577     if message:
578       logging.warning(message)
579       self.Log(" - WARNING: %s" % message)
580     if "hint" in kwargs:
581       self.Log("      Hint: %s" % kwargs["hint"])
582
583   def LogInfo(self, message, *args):
584     """Log an informational message to the logs and the user.
585
586     """
587     if args:
588       message = message % tuple(args)
589     logging.info(message)
590     self.Log(" - INFO: %s" % message)
591
592   def GetECId(self):
593     """Returns the current execution context ID.
594
595     """
596     if not self._ec_id:
597       raise errors.ProgrammerError("Tried to use execution context id when"
598                                    " not set")
599     return self._ec_id