Add logrotate example
[ganeti-local] / lib / mcpu.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the logic behind the cluster operations
23
24 This module implements the logic for doing operations in the cluster. There
25 are two kinds of classes defined:
26   - logical units, which know how to deal with their specific opcode only
27   - the processor, which dispatches the opcodes to their logical units
28
29 """
30
31 import sys
32 import logging
33 import random
34 import time
35 import itertools
36 import traceback
37
38 from ganeti import opcodes
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import hooksmaster
42 from ganeti import cmdlib
43 from ganeti import locking
44 from ganeti import utils
45 from ganeti import compat
46
47
48 _OP_PREFIX = "Op"
49 _LU_PREFIX = "LU"
50
51 #: LU classes which don't need to acquire the node allocation lock
52 #: (L{locking.NAL}) when they acquire all node or node resource locks
53 _NODE_ALLOC_WHITELIST = frozenset([])
54
55 #: LU classes which don't need to acquire the node allocation lock
56 #: (L{locking.NAL}) in the same mode (shared/exclusive) as the node
57 #: or node resource locks
58 _NODE_ALLOC_MODE_WHITELIST = compat.UniqueFrozenset([
59   cmdlib.LUBackupExport,
60   cmdlib.LUBackupRemove,
61   cmdlib.LUOobCommand,
62   ])
63
64
65 class LockAcquireTimeout(Exception):
66   """Exception to report timeouts on acquiring locks.
67
68   """
69
70
71 def _CalculateLockAttemptTimeouts():
72   """Calculate timeouts for lock attempts.
73
74   """
75   result = [constants.LOCK_ATTEMPTS_MINWAIT]
76   running_sum = result[0]
77
78   # Wait for a total of at least LOCK_ATTEMPTS_TIMEOUT before doing a
79   # blocking acquire
80   while running_sum < constants.LOCK_ATTEMPTS_TIMEOUT:
81     timeout = (result[-1] * 1.05) ** 1.25
82
83     # Cap max timeout. This gives other jobs a chance to run even if
84     # we're still trying to get our locks, before finally moving to a
85     # blocking acquire.
86     timeout = min(timeout, constants.LOCK_ATTEMPTS_MAXWAIT)
87     # And also cap the lower boundary for safety
88     timeout = max(timeout, constants.LOCK_ATTEMPTS_MINWAIT)
89
90     result.append(timeout)
91     running_sum += timeout
92
93   return result
94
95
96 class LockAttemptTimeoutStrategy(object):
97   """Class with lock acquire timeout strategy.
98
99   """
100   __slots__ = [
101     "_timeouts",
102     "_random_fn",
103     "_time_fn",
104     ]
105
106   _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
107
108   def __init__(self, _time_fn=time.time, _random_fn=random.random):
109     """Initializes this class.
110
111     @param _time_fn: Time function for unittests
112     @param _random_fn: Random number generator for unittests
113
114     """
115     object.__init__(self)
116
117     self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
118     self._time_fn = _time_fn
119     self._random_fn = _random_fn
120
121   def NextAttempt(self):
122     """Returns the timeout for the next attempt.
123
124     """
125     try:
126       timeout = self._timeouts.next()
127     except StopIteration:
128       # No more timeouts, do blocking acquire
129       timeout = None
130
131     if timeout is not None:
132       # Add a small variation (-/+ 5%) to timeout. This helps in situations
133       # where two or more jobs are fighting for the same lock(s).
134       variation_range = timeout * 0.1
135       timeout += ((self._random_fn() * variation_range) -
136                   (variation_range * 0.5))
137
138     return timeout
139
140
141 class OpExecCbBase: # pylint: disable=W0232
142   """Base class for OpCode execution callbacks.
143
144   """
145   def NotifyStart(self):
146     """Called when we are about to execute the LU.
147
148     This function is called when we're about to start the lu's Exec() method,
149     that is, after we have acquired all locks.
150
151     """
152
153   def Feedback(self, *args):
154     """Sends feedback from the LU code to the end-user.
155
156     """
157
158   def CurrentPriority(self): # pylint: disable=R0201
159     """Returns current priority or C{None}.
160
161     """
162     return None
163
164   def SubmitManyJobs(self, jobs):
165     """Submits jobs for processing.
166
167     See L{jqueue.JobQueue.SubmitManyJobs}.
168
169     """
170     raise NotImplementedError
171
172
173 def _LUNameForOpName(opname):
174   """Computes the LU name for a given OpCode name.
175
176   """
177   assert opname.startswith(_OP_PREFIX), \
178       "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)
179
180   return _LU_PREFIX + opname[len(_OP_PREFIX):]
181
182
183 def _ComputeDispatchTable():
184   """Computes the opcode-to-lu dispatch table.
185
186   """
187   return dict((op, getattr(cmdlib, _LUNameForOpName(op.__name__)))
188               for op in opcodes.OP_MAPPING.values()
189               if op.WITH_LU)
190
191
192 def _SetBaseOpParams(src, defcomment, dst):
193   """Copies basic opcode parameters.
194
195   @type src: L{opcodes.OpCode}
196   @param src: Source opcode
197   @type defcomment: string
198   @param defcomment: Comment to specify if not already given
199   @type dst: L{opcodes.OpCode}
200   @param dst: Destination opcode
201
202   """
203   if hasattr(src, "debug_level"):
204     dst.debug_level = src.debug_level
205
206   if (getattr(dst, "priority", None) is None and
207       hasattr(src, "priority")):
208     dst.priority = src.priority
209
210   if not getattr(dst, opcodes.COMMENT_ATTR, None):
211     dst.comment = defcomment
212
213
214 def _ProcessResult(submit_fn, op, result):
215   """Examines opcode result.
216
217   If necessary, additional processing on the result is done.
218
219   """
220   if isinstance(result, cmdlib.ResultWithJobs):
221     # Copy basic parameters (e.g. priority)
222     map(compat.partial(_SetBaseOpParams, op,
223                        "Submitted by %s" % op.OP_ID),
224         itertools.chain(*result.jobs))
225
226     # Submit jobs
227     job_submission = submit_fn(result.jobs)
228
229     # Build dictionary
230     result = result.other
231
232     assert constants.JOB_IDS_KEY not in result, \
233       "Key '%s' found in additional return values" % constants.JOB_IDS_KEY
234
235     result[constants.JOB_IDS_KEY] = job_submission
236
237   return result
238
239
240 def _FailingSubmitManyJobs(_):
241   """Implementation of L{OpExecCbBase.SubmitManyJobs} to raise an exception.
242
243   """
244   raise errors.ProgrammerError("Opcodes processed without callbacks (e.g."
245                                " queries) can not submit jobs")
246
247
248 def _VerifyLocks(lu, glm, _mode_whitelist=_NODE_ALLOC_MODE_WHITELIST,
249                  _nal_whitelist=_NODE_ALLOC_WHITELIST):
250   """Performs consistency checks on locks acquired by a logical unit.
251
252   @type lu: L{cmdlib.LogicalUnit}
253   @param lu: Logical unit instance
254   @type glm: L{locking.GanetiLockManager}
255   @param glm: Lock manager
256
257   """
258   if not __debug__:
259     return
260
261   have_nal = glm.check_owned(locking.LEVEL_NODE_ALLOC, locking.NAL)
262
263   for level in [locking.LEVEL_NODE, locking.LEVEL_NODE_RES]:
264     # TODO: Verify using actual lock mode, not using LU variables
265     if level in lu.needed_locks:
266       share_node_alloc = lu.share_locks[locking.LEVEL_NODE_ALLOC]
267       share_level = lu.share_locks[level]
268
269       if lu.__class__ in _mode_whitelist:
270         assert share_node_alloc != share_level, \
271           "LU is whitelisted to use different modes for node allocation lock"
272       else:
273         assert bool(share_node_alloc) == bool(share_level), \
274           ("Node allocation lock must be acquired using the same mode as nodes"
275            " and node resources")
276
277       if lu.__class__ in _nal_whitelist:
278         assert not have_nal, \
279           "LU is whitelisted for not acquiring the node allocation lock"
280       elif lu.needed_locks[level] == locking.ALL_SET or glm.owning_all(level):
281         assert have_nal, \
282           ("Node allocation lock must be used if an LU acquires all nodes"
283            " or node resources")
284
285
286 class Processor(object):
287   """Object which runs OpCodes"""
288   DISPATCH_TABLE = _ComputeDispatchTable()
289
290   def __init__(self, context, ec_id, enable_locks=True):
291     """Constructor for Processor
292
293     @type context: GanetiContext
294     @param context: global Ganeti context
295     @type ec_id: string
296     @param ec_id: execution context identifier
297
298     """
299     self.context = context
300     self._ec_id = ec_id
301     self._cbs = None
302     self.rpc = context.rpc
303     self.hmclass = hooksmaster.HooksMaster
304     self._enable_locks = enable_locks
305
306   def _CheckLocksEnabled(self):
307     """Checks if locking is enabled.
308
309     @raise errors.ProgrammerError: In case locking is not enabled
310
311     """
312     if not self._enable_locks:
313       raise errors.ProgrammerError("Attempted to use disabled locks")
314
315   def _AcquireLocks(self, level, names, shared, opportunistic, timeout):
316     """Acquires locks via the Ganeti lock manager.
317
318     @type level: int
319     @param level: Lock level
320     @type names: list or string
321     @param names: Lock names
322     @type shared: bool
323     @param shared: Whether the locks should be acquired in shared mode
324     @type opportunistic: bool
325     @param opportunistic: Whether to acquire opportunistically
326     @type timeout: None or float
327     @param timeout: Timeout for acquiring the locks
328     @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
329         amount of time
330
331     """
332     self._CheckLocksEnabled()
333
334     if self._cbs:
335       priority = self._cbs.CurrentPriority()
336     else:
337       priority = None
338
339     acquired = self.context.glm.acquire(level, names, shared=shared,
340                                         timeout=timeout, priority=priority,
341                                         opportunistic=opportunistic)
342
343     if acquired is None:
344       raise LockAcquireTimeout()
345
346     return acquired
347
348   def _ExecLU(self, lu):
349     """Logical Unit execution sequence.
350
351     """
352     write_count = self.context.cfg.write_count
353     lu.CheckPrereq()
354
355     hm = self.BuildHooksManager(lu)
356     h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
357     lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
358                      self.Log, None)
359
360     if getattr(lu.op, "dry_run", False):
361       # in this mode, no post-hooks are run, and the config is not
362       # written (as it might have been modified by another LU, and we
363       # shouldn't do writeout on behalf of other threads
364       self.LogInfo("dry-run mode requested, not actually executing"
365                    " the operation")
366       return lu.dry_run_result
367
368     if self._cbs:
369       submit_mj_fn = self._cbs.SubmitManyJobs
370     else:
371       submit_mj_fn = _FailingSubmitManyJobs
372
373     try:
374       result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
375       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
376       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
377                                 self.Log, result)
378     finally:
379       # FIXME: This needs locks if not lu_class.REQ_BGL
380       if write_count != self.context.cfg.write_count:
381         hm.RunConfigUpdate()
382
383     return result
384
385   def BuildHooksManager(self, lu):
386     return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu)
387
388   def _LockAndExecLU(self, lu, level, calc_timeout):
389     """Execute a Logical Unit, with the needed locks.
390
391     This is a recursive function that starts locking the given level, and
392     proceeds up, till there are no more locks to acquire. Then it executes the
393     given LU and its opcodes.
394
395     """
396     glm = self.context.glm
397     adding_locks = level in lu.add_locks
398     acquiring_locks = level in lu.needed_locks
399
400     if level not in locking.LEVELS:
401       _VerifyLocks(lu, glm)
402
403       if self._cbs:
404         self._cbs.NotifyStart()
405
406       try:
407         result = self._ExecLU(lu)
408       except AssertionError, err:
409         # this is a bit ugly, as we don't know from which phase
410         # (prereq, exec) this comes; but it's better than an exception
411         # with no information
412         (_, _, tb) = sys.exc_info()
413         err_info = traceback.format_tb(tb)
414         del tb
415         logging.exception("Detected AssertionError")
416         raise errors.OpExecError("Internal assertion error: please report"
417                                  " this as a bug.\nError message: '%s';"
418                                  " location:\n%s" % (str(err), err_info[-1]))
419
420     elif adding_locks and acquiring_locks:
421       # We could both acquire and add locks at the same level, but for now we
422       # don't need this, so we'll avoid the complicated code needed.
423       raise NotImplementedError("Can't declare locks to acquire when adding"
424                                 " others")
425
426     elif adding_locks or acquiring_locks:
427       self._CheckLocksEnabled()
428
429       lu.DeclareLocks(level)
430       share = lu.share_locks[level]
431       opportunistic = lu.opportunistic_locks[level]
432
433       try:
434         assert adding_locks ^ acquiring_locks, \
435           "Locks must be either added or acquired"
436
437         if acquiring_locks:
438           # Acquiring locks
439           needed_locks = lu.needed_locks[level]
440
441           self._AcquireLocks(level, needed_locks, share, opportunistic,
442                              calc_timeout())
443         else:
444           # Adding locks
445           add_locks = lu.add_locks[level]
446           lu.remove_locks[level] = add_locks
447
448           try:
449             glm.add(level, add_locks, acquired=1, shared=share)
450           except errors.LockError:
451             logging.exception("Detected lock error in level %s for locks"
452                               " %s, shared=%s", level, add_locks, share)
453             raise errors.OpPrereqError(
454               "Couldn't add locks (%s), most likely because of another"
455               " job who added them first" % add_locks,
456               errors.ECODE_NOTUNIQUE)
457
458         try:
459           result = self._LockAndExecLU(lu, level + 1, calc_timeout)
460         finally:
461           if level in lu.remove_locks:
462             glm.remove(level, lu.remove_locks[level])
463       finally:
464         if glm.is_owned(level):
465           glm.release(level)
466
467     else:
468       result = self._LockAndExecLU(lu, level + 1, calc_timeout)
469
470     return result
471
472   def ExecOpCode(self, op, cbs, timeout=None):
473     """Execute an opcode.
474
475     @type op: an OpCode instance
476     @param op: the opcode to be executed
477     @type cbs: L{OpExecCbBase}
478     @param cbs: Runtime callbacks
479     @type timeout: float or None
480     @param timeout: Maximum time to acquire all locks, None for no timeout
481     @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
482         amount of time
483
484     """
485     if not isinstance(op, opcodes.OpCode):
486       raise errors.ProgrammerError("Non-opcode instance passed"
487                                    " to ExecOpcode (%s)" % type(op))
488
489     lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
490     if lu_class is None:
491       raise errors.OpCodeUnknown("Unknown opcode")
492
493     if timeout is None:
494       calc_timeout = lambda: None
495     else:
496       calc_timeout = utils.RunningTimeout(timeout, False).Remaining
497
498     self._cbs = cbs
499     try:
500       if self._enable_locks:
501         # Acquire the Big Ganeti Lock exclusively if this LU requires it,
502         # and in a shared fashion otherwise (to prevent concurrent run with
503         # an exclusive LU.
504         self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
505                             not lu_class.REQ_BGL, False, calc_timeout())
506       elif lu_class.REQ_BGL:
507         raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
508                                      " disabled" % op.OP_ID)
509
510       try:
511         lu = lu_class(self, op, self.context, self.rpc)
512         lu.ExpandNames()
513         assert lu.needed_locks is not None, "needed_locks not set by LU"
514
515         try:
516           result = self._LockAndExecLU(lu, locking.LEVEL_CLUSTER + 1,
517                                        calc_timeout)
518         finally:
519           if self._ec_id:
520             self.context.cfg.DropECReservations(self._ec_id)
521       finally:
522         # Release BGL if owned
523         if self.context.glm.is_owned(locking.LEVEL_CLUSTER):
524           assert self._enable_locks
525           self.context.glm.release(locking.LEVEL_CLUSTER)
526     finally:
527       self._cbs = None
528
529     resultcheck_fn = op.OP_RESULT
530     if not (resultcheck_fn is None or resultcheck_fn(result)):
531       logging.error("Expected opcode result matching %s, got %s",
532                     resultcheck_fn, result)
533       if not getattr(op, "dry_run", False):
534         # FIXME: LUs should still behave in dry_run mode, or
535         # alternately we should have OP_DRYRUN_RESULT; in the
536         # meantime, we simply skip the OP_RESULT check in dry-run mode
537         raise errors.OpResultError("Opcode result does not match %s: %s" %
538                                    (resultcheck_fn, utils.Truncate(result, 80)))
539
540     return result
541
542   def Log(self, *args):
543     """Forward call to feedback callback function.
544
545     """
546     if self._cbs:
547       self._cbs.Feedback(*args)
548
549   def LogStep(self, current, total, message):
550     """Log a change in LU execution progress.
551
552     """
553     logging.debug("Step %d/%d %s", current, total, message)
554     self.Log("STEP %d/%d %s" % (current, total, message))
555
556   def LogWarning(self, message, *args, **kwargs):
557     """Log a warning to the logs and the user.
558
559     The optional keyword argument is 'hint' and can be used to show a
560     hint to the user (presumably related to the warning). If the
561     message is empty, it will not be printed at all, allowing one to
562     show only a hint.
563
564     """
565     assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
566            "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
567     if args:
568       message = message % tuple(args)
569     if message:
570       logging.warning(message)
571       self.Log(" - WARNING: %s" % message)
572     if "hint" in kwargs:
573       self.Log("      Hint: %s" % kwargs["hint"])
574
575   def LogInfo(self, message, *args):
576     """Log an informational message to the logs and the user.
577
578     """
579     if args:
580       message = message % tuple(args)
581     logging.info(message)
582     self.Log(" - INFO: %s" % message)
583
584   def GetECId(self):
585     """Returns the current execution context ID.
586
587     """
588     if not self._ec_id:
589       raise errors.ProgrammerError("Tried to use execution context id when"
590                                    " not set")
591     return self._ec_id