Statistics
| Branch: | Tag: | Revision:

root / lib / mcpu.py @ 7af7da68

History | View | Annotate | Download (17.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the logic behind the cluster operations
23

24
This module implements the logic for doing operations in the cluster. There
25
are two kinds of classes defined:
26
  - logical units, which know how to deal with their specific opcode only
27
  - the processor, which dispatches the opcodes to their logical units
28

29
"""
30

    
31
import sys
32
import logging
33
import random
34
import time
35
import itertools
36
import traceback
37

    
38
from ganeti import opcodes
39
from ganeti import constants
40
from ganeti import errors
41
from ganeti import hooksmaster
42
from ganeti import cmdlib
43
from ganeti import locking
44
from ganeti import utils
45
from ganeti import compat
46

    
47

    
48
# Naming convention: opcode classes are "OpXyz", their logical units "LUXyz"
_OP_PREFIX = "Op"
_LU_PREFIX = "LU"

#: LU classes which don't need to acquire the node allocation lock
#: (L{locking.NAL}) when they acquire all node or node resource locks
_NODE_ALLOC_WHITELIST = frozenset([])

#: LU classes which don't need to acquire the node allocation lock
#: (L{locking.NAL}) in the same mode (shared/exclusive) as the node
#: or node resource locks
_NODE_ALLOC_MODE_WHITELIST = compat.UniqueFrozenset([
  cmdlib.LUBackupExport,
  cmdlib.LUBackupRemove,
  cmdlib.LUOobCommand,
  ])
63

    
64

    
65
class LockAcquireTimeout(Exception):
  """Exception to report timeouts on acquiring locks.

  Raised by L{Processor._AcquireLocks} when the lock manager could not
  grant the requested locks within the given timeout.

  """
69

    
70

    
71
def _CalculateLockAttemptTimeouts():
  """Calculate timeouts for lock attempts.

  Builds a growing schedule of per-attempt timeouts, starting at
  C{constants.LOCK_ATTEMPTS_MINWAIT} and increasing until the sum of all
  entries reaches C{constants.LOCK_ATTEMPTS_TIMEOUT}.

  @rtype: list of float
  @return: timeout (in seconds) to use for each successive lock attempt

  """
  timeouts = [constants.LOCK_ATTEMPTS_MINWAIT]
  total = timeouts[0]

  # Wait for a total of at least LOCK_ATTEMPTS_TIMEOUT before doing a
  # blocking acquire
  while total < constants.LOCK_ATTEMPTS_TIMEOUT:
    # Grow the previous timeout, then clamp it into the allowed range.
    # The upper cap gives other jobs a chance to run even while we're
    # still trying to get our locks, before finally moving to a blocking
    # acquire; the lower cap is for safety.
    grown = (timeouts[-1] * 1.05) ** 1.25
    attempt = max(constants.LOCK_ATTEMPTS_MINWAIT,
                  min(grown, constants.LOCK_ATTEMPTS_MAXWAIT))

    timeouts.append(attempt)
    total += attempt

  return timeouts
94

    
95

    
96
class LockAttemptTimeoutStrategy(object):
  """Strategy deciding how long each lock acquisition attempt may take.

  Hands out timeouts from a precomputed, growing schedule; once the
  schedule is exhausted, C{None} is returned to request a blocking
  acquire.

  """
  __slots__ = [
    "_timeouts",
    "_random_fn",
    "_time_fn",
    ]

  # Shared schedule, computed once at class-definition time
  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()

  def __init__(self, _time_fn=time.time, _random_fn=random.random):
    """Initializes this class.

    @param _time_fn: Time function for unittests
    @param _random_fn: Random number generator for unittests

    """
    object.__init__(self)

    # Each instance walks its own iterator over the shared schedule
    self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
    self._time_fn = _time_fn
    self._random_fn = _random_fn

  def NextAttempt(self):
    """Returns the timeout for the next attempt.

    @rtype: float or None
    @return: timeout in seconds, or C{None} once the schedule is
        exhausted (callers should then acquire blockingly)

    """
    try:
      base = next(self._timeouts)
    except StopIteration:
      # No more timeouts, do blocking acquire
      return None

    # Add a small variation (-/+ 5%) to the timeout. This helps in
    # situations where two or more jobs are fighting for the same
    # lock(s), by preventing them from retrying in lock-step.
    spread = base * 0.1
    return base + ((self._random_fn() * spread) - (spread * 0.5))
139

    
140

    
141
class OpExecCbBase: # pylint: disable=W0232
  """Base class defining the callbacks used during OpCode execution.

  Subclasses override the hooks they are interested in; the defaults are
  no-ops except for L{SubmitManyJobs}, which must be implemented.

  """
  def NotifyStart(self):
    """Called just before the LU's Exec() method runs.

    At this point all required locks have already been acquired.

    """

  def Feedback(self, *args):
    """Forwards feedback messages from the LU code to the end-user.

    """

  def CurrentPriority(self): # pylint: disable=R0201
    """Returns current priority or C{None}.

    """
    return None

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{jqueue.JobQueue.SubmitManyJobs}.

    """
    raise NotImplementedError
171

    
172

    
173
def _LUNameForOpName(opname):
  """Derives the logical unit class name from an opcode class name.

  @type opname: string
  @param opname: OpCode class name, must start with L{_OP_PREFIX}
  @rtype: string
  @return: corresponding LU class name

  """
  assert opname.startswith(_OP_PREFIX), \
      "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)

  # Replace the "Op" prefix with "LU", keeping the rest of the name
  suffix = opname[len(_OP_PREFIX):]
  return _LU_PREFIX + suffix
181

    
182

    
183
def _ComputeDispatchTable():
  """Computes the opcode-to-lu dispatch table.

  @rtype: dict
  @return: mapping from opcode class to the L{cmdlib} LU class
      implementing it (opcodes without an LU are skipped)

  """
  table = {}
  for op in opcodes.OP_MAPPING.values():
    if op.WITH_LU:
      table[op] = getattr(cmdlib, _LUNameForOpName(op.__name__))
  return table
190

    
191

    
192
def _SetBaseOpParams(src, defcomment, dst):
  """Propagates basic opcode parameters from one opcode to another.

  @type src: L{opcodes.OpCode}
  @param src: Source opcode
  @type defcomment: string
  @param defcomment: Comment to specify if not already given
  @type dst: L{opcodes.OpCode}
  @param dst: Destination opcode

  """
  # The debug level is copied whenever the source defines one
  if hasattr(src, "debug_level"):
    dst.debug_level = src.debug_level

  # The priority is only inherited if the destination has none of its own
  if hasattr(src, "priority") and getattr(dst, "priority", None) is None:
    dst.priority = src.priority

  # Fall back to the default comment if none was set explicitly
  if not getattr(dst, opcodes.COMMENT_ATTR, None):
    dst.comment = defcomment
212

    
213

    
214
def _ProcessResult(submit_fn, op, result):
  """Examines opcode result.

  If the result carries jobs (L{cmdlib.ResultWithJobs}), the jobs are
  submitted via C{submit_fn} and the returned job IDs are merged into the
  remaining result values.

  """
  if isinstance(result, cmdlib.ResultWithJobs):
    # Copy basic parameters (e.g. priority) from the parent opcode to
    # every opcode in the jobs about to be submitted
    comment = "Submitted by %s" % op.OP_ID
    for job_op in itertools.chain(*result.jobs):
      _SetBaseOpParams(op, comment, job_op)

    # Submit jobs
    job_submission = submit_fn(result.jobs)

    # The remaining values form the actual result dictionary
    result = result.other

    assert constants.JOB_IDS_KEY not in result, \
      "Key '%s' found in additional return values" % constants.JOB_IDS_KEY

    result[constants.JOB_IDS_KEY] = job_submission

  return result
238

    
239

    
240
def _FailingSubmitManyJobs(_):
  """Implementation of L{OpExecCbBase.SubmitManyJobs} to raise an exception.

  Used as the job-submission callback when opcodes are processed without
  callbacks, where submitting jobs would be a programming error.

  @raise errors.ProgrammerError: always

  """
  raise errors.ProgrammerError("Opcodes processed without callbacks (e.g."
                               " queries) can not submit jobs")
246

    
247

    
248
def _VerifyLocks(lu, glm, _mode_whitelist=_NODE_ALLOC_MODE_WHITELIST,
                 _nal_whitelist=_NODE_ALLOC_WHITELIST):
  """Performs consistency checks on locks acquired by a logical unit.

  Checks that the node allocation lock (L{locking.NAL}) is held in the
  same mode as the node/node-resource locks, and that it is held at all
  when an LU acquires all nodes or node resources, unless the LU class is
  whitelisted.

  @type lu: L{cmdlib.LogicalUnit}
  @param lu: Logical unit instance
  @type glm: L{locking.GanetiLockManager}
  @param glm: Lock manager
  @param _mode_whitelist: LU classes exempt from the same-mode check
      (parameter exists for unittests)
  @param _nal_whitelist: LU classes exempt from the must-hold-NAL check
      (parameter exists for unittests)

  """
  # All checks below are assertions, so they're no-ops when assertions
  # are disabled
  if not __debug__:
    return

  have_nal = glm.check_owned(locking.LEVEL_NODE_ALLOC, locking.NAL)

  for level in [locking.LEVEL_NODE, locking.LEVEL_NODE_RES]:
    # TODO: Verify using actual lock mode, not using LU variables
    if level in lu.needed_locks:
      share_node_alloc = lu.share_locks[locking.LEVEL_NODE_ALLOC]
      share_level = lu.share_locks[level]

      if lu.__class__ in _mode_whitelist:
        # Whitelisted LUs are expected to actually use differing modes
        assert share_node_alloc != share_level, \
          "LU is whitelisted to use different modes for node allocation lock"
      else:
        assert bool(share_node_alloc) == bool(share_level), \
          ("Node allocation lock must be acquired using the same mode as nodes"
           " and node resources")

      if lu.__class__ in _nal_whitelist:
        # Whitelisted LUs are expected not to hold the NAL at all
        assert not have_nal, \
          "LU is whitelisted for not acquiring the node allocation lock"
      elif lu.needed_locks[level] == locking.ALL_SET or glm.owning_all(level):
        assert have_nal, \
          ("Node allocation lock must be used if an LU acquires all nodes"
           " or node resources")
284

    
285

    
286
class Processor(object):
  """Object which runs OpCodes"""
  # Opcode class -> LU class mapping, computed once at class-definition time
  DISPATCH_TABLE = _ComputeDispatchTable()

  def __init__(self, context, ec_id, enable_locks=True):
    """Constructor for Processor

    @type context: GanetiContext
    @param context: global Ganeti context
    @type ec_id: string
    @param ec_id: execution context identifier
    @type enable_locks: bool
    @param enable_locks: whether this processor may use the lock manager;
        when C{False}, any attempt to acquire locks raises
        L{errors.ProgrammerError}

    """
    self.context = context
    self._ec_id = ec_id
    # Runtime callbacks (L{OpExecCbBase}); only set while an opcode is
    # being executed, see L{ExecOpCode}
    self._cbs = None
    self.rpc = context.rpc
    self.hmclass = hooksmaster.HooksMaster
    self._enable_locks = enable_locks

  def _CheckLocksEnabled(self):
    """Checks if locking is enabled.

    @raise errors.ProgrammerError: In case locking is not enabled

    """
    if not self._enable_locks:
      raise errors.ProgrammerError("Attempted to use disabled locks")

  def _AcquireLocks(self, level, names, shared, opportunistic, timeout):
    """Acquires locks via the Ganeti lock manager.

    @type level: int
    @param level: Lock level
    @type names: list or string
    @param names: Lock names
    @type shared: bool
    @param shared: Whether the locks should be acquired in shared mode
    @type opportunistic: bool
    @param opportunistic: Whether to acquire opportunistically
    @type timeout: None or float
    @param timeout: Timeout for acquiring the locks
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time

    """
    self._CheckLocksEnabled()

    # Use the job's current priority for the acquisition, if available
    if self._cbs:
      priority = self._cbs.CurrentPriority()
    else:
      priority = None

    acquired = self.context.glm.acquire(level, names, shared=shared,
                                        timeout=timeout, priority=priority,
                                        opportunistic=opportunistic)

    # The lock manager signals a timeout by returning None
    if acquired is None:
      raise LockAcquireTimeout()

    return acquired

  def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    Runs prerequisite checks, pre-hooks, the LU's Exec() method and
    post-hooks, updating the configuration hooks if the config was
    written during execution.

    """
    # Remember the config write count to detect configuration changes
    # made by this LU (see the finally block below)
    write_count = self.context.cfg.write_count
    lu.CheckPrereq()

    hm = self.BuildHooksManager(lu)
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                     self.Log, None)

    if getattr(lu.op, "dry_run", False):
      # in this mode, no post-hooks are run, and the config is not
      # written (as it might have been modified by another LU, and we
      # shouldn't do writeout on behalf of other threads)
      self.LogInfo("dry-run mode requested, not actually executing"
                   " the operation")
      return lu.dry_run_result

    # Without callbacks, job submission from LU results is a programming
    # error, hence the failing stand-in
    if self._cbs:
      submit_mj_fn = self._cbs.SubmitManyJobs
    else:
      submit_mj_fn = _FailingSubmitManyJobs

    try:
      result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                self.Log, result)
    finally:
      # FIXME: This needs locks if not lu_class.REQ_BGL
      if write_count != self.context.cfg.write_count:
        hm.RunConfigUpdate()

    return result

  def BuildHooksManager(self, lu):
    """Builds the hooks manager for the given logical unit.

    """
    return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu)

  def _LockAndExecLU(self, lu, level, calc_timeout):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking the given level, and
    proceeds up, till there are no more locks to acquire. Then it executes the
    given LU and its opcodes.

    """
    glm = self.context.glm
    adding_locks = level in lu.add_locks
    acquiring_locks = level in lu.needed_locks

    if level not in locking.LEVELS:
      # Past the last lock level: all locks are held, run the LU
      _VerifyLocks(lu, glm)

      if self._cbs:
        self._cbs.NotifyStart()

      try:
        result = self._ExecLU(lu)
      except AssertionError, err:
        # this is a bit ugly, as we don't know from which phase
        # (prereq, exec) this comes; but it's better than an exception
        # with no information
        (_, _, tb) = sys.exc_info()
        err_info = traceback.format_tb(tb)
        del tb
        logging.exception("Detected AssertionError")
        raise errors.OpExecError("Internal assertion error: please report"
                                 " this as a bug.\nError message: '%s';"
                                 " location:\n%s" % (str(err), err_info[-1]))

    elif adding_locks and acquiring_locks:
      # We could both acquire and add locks at the same level, but for now we
      # don't need this, so we'll avoid the complicated code needed.
      raise NotImplementedError("Can't declare locks to acquire when adding"
                                " others")

    elif adding_locks or acquiring_locks:
      self._CheckLocksEnabled()

      lu.DeclareLocks(level)
      share = lu.share_locks[level]
      opportunistic = lu.opportunistic_locks[level]

      try:
        assert adding_locks ^ acquiring_locks, \
          "Locks must be either added or acquired"

        if acquiring_locks:
          # Acquiring locks
          needed_locks = lu.needed_locks[level]

          self._AcquireLocks(level, needed_locks, share, opportunistic,
                             calc_timeout())
        else:
          # Adding locks
          add_locks = lu.add_locks[level]
          # Newly added locks are removed again after execution, see the
          # inner finally block below
          lu.remove_locks[level] = add_locks

          try:
            glm.add(level, add_locks, acquired=1, shared=share)
          except errors.LockError:
            logging.exception("Detected lock error in level %s for locks"
                              " %s, shared=%s", level, add_locks, share)
            raise errors.OpPrereqError(
              "Couldn't add locks (%s), most likely because of another"
              " job who added them first" % add_locks,
              errors.ECODE_NOTUNIQUE)

        try:
          # Recurse into the next lock level
          result = self._LockAndExecLU(lu, level + 1, calc_timeout)
        finally:
          if level in lu.remove_locks:
            glm.remove(level, lu.remove_locks[level])
      finally:
        # Always release locks held at this level before returning
        if glm.is_owned(level):
          glm.release(level)

    else:
      # Nothing to lock at this level, continue with the next one
      result = self._LockAndExecLU(lu, level + 1, calc_timeout)

    return result

  def ExecOpCode(self, op, cbs, timeout=None):
    """Execute an opcode.

    @type op: an OpCode instance
    @param op: the opcode to be executed
    @type cbs: L{OpExecCbBase}
    @param cbs: Runtime callbacks
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks, None for no timeout
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpcode (%s)" % type(op))

    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
    if lu_class is None:
      raise errors.OpCodeUnknown("Unknown opcode")

    # calc_timeout returns the remaining time budget for each lock
    # acquisition, or None for "no timeout"
    if timeout is None:
      calc_timeout = lambda: None
    else:
      calc_timeout = utils.RunningTimeout(timeout, False).Remaining

    self._cbs = cbs
    try:
      if self._enable_locks:
        # Acquire the Big Ganeti Lock exclusively if this LU requires it,
        # and in a shared fashion otherwise (to prevent concurrent run with
        # an exclusive LU.
        self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
                            not lu_class.REQ_BGL, False, calc_timeout())
      elif lu_class.REQ_BGL:
        raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
                                     " disabled" % op.OP_ID)

      try:
        lu = lu_class(self, op, self.context, self.rpc)
        lu.ExpandNames()
        assert lu.needed_locks is not None, "needed_locks not set by LU"

        try:
          # Start recursive locking at the first level above the BGL
          result = self._LockAndExecLU(lu, locking.LEVEL_CLUSTER + 1,
                                       calc_timeout)
        finally:
          if self._ec_id:
            self.context.cfg.DropECReservations(self._ec_id)
      finally:
        # Release BGL if owned
        if self.context.glm.is_owned(locking.LEVEL_CLUSTER):
          assert self._enable_locks
          self.context.glm.release(locking.LEVEL_CLUSTER)
    finally:
      self._cbs = None

    # Verify the result against the opcode's declared result check, if any
    resultcheck_fn = op.OP_RESULT
    if not (resultcheck_fn is None or resultcheck_fn(result)):
      logging.error("Expected opcode result matching %s, got %s",
                    resultcheck_fn, result)
      if not getattr(op, "dry_run", False):
        # FIXME: LUs should still behave in dry_run mode, or
        # alternately we should have OP_DRYRUN_RESULT; in the
        # meantime, we simply skip the OP_RESULT check in dry-run mode
        raise errors.OpResultError("Opcode result does not match %s: %s" %
                                   (resultcheck_fn, utils.Truncate(result, 80)))

    return result

  def Log(self, *args):
    """Forward call to feedback callback function.

    Silently does nothing when no callbacks are registered.

    """
    if self._cbs:
      self._cbs.Feedback(*args)

  def LogStep(self, current, total, message):
    """Log a change in LU execution progress.

    """
    logging.debug("Step %d/%d %s", current, total, message)
    self.Log("STEP %d/%d %s" % (current, total, message))

  def LogWarning(self, message, *args, **kwargs):
    """Log a warning to the logs and the user.

    The optional keyword argument is 'hint' and can be used to show a
    hint to the user (presumably related to the warning). If the
    message is empty, it will not be printed at all, allowing one to
    show only a hint.

    """
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
           "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
    if args:
      message = message % tuple(args)
    if message:
      logging.warning(message)
      self.Log(" - WARNING: %s" % message)
    if "hint" in kwargs:
      self.Log("      Hint: %s" % kwargs["hint"])

  def LogInfo(self, message, *args):
    """Log an informational message to the logs and the user.

    """
    if args:
      message = message % tuple(args)
    logging.info(message)
    self.Log(" - INFO: %s" % message)

  def GetECId(self):
    """Returns the current execution context ID.

    @raise errors.ProgrammerError: if no execution context ID was set

    """
    if not self._ec_id:
      raise errors.ProgrammerError("Tried to use execution context id when"
                                   " not set")
    return self._ec_id
    return self._ec_id