Statistics
| Branch: | Tag: | Revision:

root / lib / mcpu.py @ 31d3b918

History | View | Annotate | Download (18.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the logic behind the cluster operations
23

24
This module implements the logic for doing operations in the cluster. There
25
are two kinds of classes defined:
26
  - logical units, which know how to deal with their specific opcode only
27
  - the processor, which dispatches the opcodes to their logical units
28

29
"""
30

    
31
import sys
32
import logging
33
import random
34
import time
35
import itertools
36
import traceback
37

    
38
from ganeti import opcodes
39
from ganeti import opcodes_base
40
from ganeti import constants
41
from ganeti import errors
42
from ganeti import hooksmaster
43
from ganeti import cmdlib
44
from ganeti import locking
45
from ganeti import utils
46
from ganeti import compat
47

    
48

    
49
_OP_PREFIX = "Op"
50
_LU_PREFIX = "LU"
51

    
52
#: LU classes which don't need to acquire the node allocation lock
53
#: (L{locking.NAL}) when they acquire all node or node resource locks
54
_NODE_ALLOC_WHITELIST = frozenset([])
55

    
56
#: LU classes which don't need to acquire the node allocation lock
57
#: (L{locking.NAL}) in the same mode (shared/exclusive) as the node
58
#: or node resource locks
59
_NODE_ALLOC_MODE_WHITELIST = compat.UniqueFrozenset([
60
  cmdlib.LUBackupExport,
61
  cmdlib.LUBackupRemove,
62
  cmdlib.LUOobCommand,
63
  ])
64

    
65

    
66
class LockAcquireTimeout(Exception):
  """Exception to report timeouts on acquiring locks.

  Raised when the lock manager returns C{None} for an acquisition,
  i.e. the requested locks could not be acquired in time.

  """
70

    
71

    
72
def _CalculateLockAttemptTimeouts():
  """Calculate timeouts for lock attempts.

  @rtype: list of float
  @return: Increasing per-attempt timeouts whose sum is at least
    L{constants.LOCK_ATTEMPTS_TIMEOUT}

  """
  min_wait = constants.LOCK_ATTEMPTS_MINWAIT
  max_wait = constants.LOCK_ATTEMPTS_MAXWAIT

  timeouts = [min_wait]
  total = timeouts[0]

  # Keep appending attempts until their sum covers at least
  # LOCK_ATTEMPTS_TIMEOUT; after that a blocking acquire will be used
  while total < constants.LOCK_ATTEMPTS_TIMEOUT:
    # Grow the previous timeout, then clamp it into [min_wait, max_wait];
    # the upper cap gives other jobs a chance to run while we're still
    # trying to get our locks, the lower cap is for safety
    grown = (timeouts[-1] * 1.05) ** 1.25
    bounded = max(min_wait, min(grown, max_wait))

    timeouts.append(bounded)
    total += bounded

  return timeouts
95

    
96

    
97
class LockAttemptTimeoutStrategy(object):
  """Class with lock acquire timeout strategy.

  Yields a series of increasing timeouts (computed once per process in
  L{_CalculateLockAttemptTimeouts}) and, once those are exhausted,
  C{None} to signal a blocking acquire.

  """
  __slots__ = [
    "_timeouts",
    "_random_fn",
    "_time_fn",
    ]

  #: Shared, per-process list of per-attempt timeouts
  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()

  def __init__(self, _time_fn=time.time, _random_fn=random.random):
    """Initializes this class.

    @param _time_fn: Time function for unittests
    @param _random_fn: Random number generator for unittests

    """
    object.__init__(self)

    self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
    self._time_fn = _time_fn
    self._random_fn = _random_fn

  def NextAttempt(self):
    """Returns the timeout for the next attempt.

    @rtype: float or None
    @return: Timeout in seconds, or C{None} once the attempt list is
      exhausted (meaning a blocking acquire should be done)

    """
    try:
      # Use the next() builtin instead of the iterator's .next() method;
      # both work on Python 2.6+, but only the builtin works on Python 3
      timeout = next(self._timeouts)
    except StopIteration:
      # No more timeouts, do blocking acquire
      timeout = None

    if timeout is not None:
      # Add a small variation (-/+ 5%) to timeout. This helps in situations
      # where two or more jobs are fighting for the same lock(s).
      variation_range = timeout * 0.1
      timeout += ((self._random_fn() * variation_range) -
                  (variation_range * 0.5))

    return timeout
140

    
141

    
142
class OpExecCbBase: # pylint: disable=W0232
  """Base class for OpCode execution callbacks.

  Subclasses override the hooks below; all defaults are no-ops except
  L{SubmitManyJobs}, which must be provided by the subclass.

  """
  def NotifyStart(self):
    """Called when we are about to execute the LU.

    Invoked right before the LU's Exec() method runs, i.e. once all
    locks have been acquired.

    """
    return None

  def Feedback(self, *args):
    """Sends feedback from the LU code to the end-user.

    """
    return None

  def CurrentPriority(self): # pylint: disable=R0201
    """Returns current priority or C{None}.

    """
    return None

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{jqueue.JobQueue.SubmitManyJobs}.

    """
    raise NotImplementedError
172

    
173

    
174
def _LUNameForOpName(opname):
  """Computes the LU name for a given OpCode name.

  @type opname: string
  @param opname: OpCode class name, must start with L{_OP_PREFIX}
  @rtype: string
  @return: Name of the LogicalUnit class handling the opcode

  """
  assert opname.startswith(_OP_PREFIX), \
      "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)

  # Swap the "Op" prefix for "LU"
  suffix = opname[len(_OP_PREFIX):]
  return _LU_PREFIX + suffix
182

    
183

    
184
def _ComputeDispatchTable():
  """Computes the opcode-to-lu dispatch table.

  @rtype: dict
  @return: Mapping from opcode class to the LogicalUnit class handling it

  """
  table = {}
  for op in opcodes.OP_MAPPING.values():
    # Only opcodes with a corresponding LU can be dispatched
    if op.WITH_LU:
      table[op] = getattr(cmdlib, _LUNameForOpName(op.__name__))
  return table
191

    
192

    
193
def _SetBaseOpParams(src, defcomment, dst):
  """Copies basic opcode parameters.

  @type src: L{opcodes.OpCode}
  @param src: Source opcode
  @type defcomment: string
  @param defcomment: Comment to specify if not already given
  @type dst: L{opcodes.OpCode}
  @param dst: Destination opcode

  """
  # Debug level is propagated unconditionally when the source has one
  if hasattr(src, "debug_level"):
    dst.debug_level = src.debug_level

  # Priority is only inherited when the destination doesn't set its own
  if getattr(dst, "priority", None) is None and hasattr(src, "priority"):
    dst.priority = src.priority

  # Fall back to the default comment when none was given
  if not getattr(dst, opcodes_base.COMMENT_ATTR, None):
    dst.comment = defcomment

  # FIXME: extend reason trail, showing the derivedness
  if not getattr(dst, constants.OPCODE_REASON, None):
    dst.reason = getattr(src, constants.OPCODE_REASON, [])
217

    
218

    
219
def _ProcessResult(submit_fn, op, result):
  """Examines opcode result.

  If necessary, additional processing on the result is done.

  @param submit_fn: Function for submitting jobs, see
    L{OpExecCbBase.SubmitManyJobs}
  @type op: L{opcodes.OpCode}
  @param op: Opcode that produced the result
  @param result: Result as returned by the LU's Exec method
  @return: The result, with job IDs merged in when jobs were submitted

  """
  if isinstance(result, cmdlib.ResultWithJobs):
    # Copy basic parameters (e.g. priority) to every opcode of every job.
    # Use an explicit loop instead of map(): map() is evaluated lazily on
    # Python 3, which would silently skip these side effects
    for job_op in itertools.chain(*result.jobs):
      _SetBaseOpParams(op, "Submitted by %s" % op.OP_ID, job_op)

    # Submit jobs
    job_submission = submit_fn(result.jobs)

    # Build dictionary
    result = result.other

    assert constants.JOB_IDS_KEY not in result, \
      "Key '%s' found in additional return values" % constants.JOB_IDS_KEY

    result[constants.JOB_IDS_KEY] = job_submission

  return result
243

    
244

    
245
def _FailingSubmitManyJobs(_):
  """Implementation of L{OpExecCbBase.SubmitManyJobs} to raise an exception.

  Used when opcodes are processed without callbacks, where submitting
  jobs is not possible.

  @raise errors.ProgrammerError: Always

  """
  raise errors.ProgrammerError("Opcodes processed without callbacks (e.g."
                               " queries) can not submit jobs")
251

    
252

    
253
def _VerifyLocks(lu, glm, _mode_whitelist=_NODE_ALLOC_MODE_WHITELIST,
                 _nal_whitelist=_NODE_ALLOC_WHITELIST):
  """Performs consistency checks on locks acquired by a logical unit.

  All checks are assertions, so this function is a no-op when run
  without C{__debug__} (i.e. with C{-O}).

  @type lu: L{cmdlib.LogicalUnit}
  @param lu: Logical unit instance
  @type glm: L{locking.GanetiLockManager}
  @param glm: Lock manager
  @param _mode_whitelist: LU classes allowed to acquire the node
    allocation lock in a different mode (for unittests)
  @param _nal_whitelist: LU classes allowed to not acquire the node
    allocation lock at all (for unittests)

  """
  if not __debug__:
    return

  # Whether the node allocation lock is currently held
  have_nal = glm.check_owned(locking.LEVEL_NODE_ALLOC, locking.NAL)

  for level in [locking.LEVEL_NODE, locking.LEVEL_NODE_RES]:
    # TODO: Verify using actual lock mode, not using LU variables
    if level in lu.needed_locks:
      share_node_alloc = lu.share_locks[locking.LEVEL_NODE_ALLOC]
      share_level = lu.share_locks[level]

      if lu.__class__ in _mode_whitelist:
        # Whitelisted LUs must use a *different* mode for the NAL
        assert share_node_alloc != share_level, \
          "LU is whitelisted to use different modes for node allocation lock"
      else:
        # Everyone else must use the same mode (shared/exclusive) as for
        # the node/node-resource locks themselves
        assert bool(share_node_alloc) == bool(share_level), \
          ("Node allocation lock must be acquired using the same mode as nodes"
           " and node resources")

      if lu.__class__ in _nal_whitelist:
        assert not have_nal, \
          "LU is whitelisted for not acquiring the node allocation lock"
      elif lu.needed_locks[level] == locking.ALL_SET or glm.owning_all(level):
        # Acquiring all nodes (or node resources) requires holding the NAL
        assert have_nal, \
          ("Node allocation lock must be used if an LU acquires all nodes"
           " or node resources")
289

    
290

    
291
class Processor(object):
  """Object which runs OpCodes"""
  #: Mapping from opcode class to LU class, computed once at import time
  DISPATCH_TABLE = _ComputeDispatchTable()

  def __init__(self, context, ec_id, enable_locks=True):
    """Constructor for Processor

    @type context: GanetiContext
    @param context: global Ganeti context
    @type ec_id: string
    @param ec_id: execution context identifier
    @type enable_locks: bool
    @param enable_locks: whether this processor is allowed to acquire locks

    """
    self.context = context
    self._ec_id = ec_id
    self._cbs = None
    self.rpc = context.rpc
    self.hmclass = hooksmaster.HooksMaster
    self._enable_locks = enable_locks

  def _CheckLocksEnabled(self):
    """Checks if locking is enabled.

    @raise errors.ProgrammerError: In case locking is not enabled

    """
    if not self._enable_locks:
      raise errors.ProgrammerError("Attempted to use disabled locks")

  def _AcquireLocks(self, level, names, shared, opportunistic, timeout):
    """Acquires locks via the Ganeti lock manager.

    @type level: int
    @param level: Lock level
    @type names: list or string
    @param names: Lock names
    @type shared: bool
    @param shared: Whether the locks should be acquired in shared mode
    @type opportunistic: bool
    @param opportunistic: Whether to acquire opportunistically
    @type timeout: None or float
    @param timeout: Timeout for acquiring the locks
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time

    """
    self._CheckLocksEnabled()

    # Use the job's current priority when callbacks are available
    if self._cbs:
      priority = self._cbs.CurrentPriority()
    else:
      priority = None

    acquired = self.context.glm.acquire(level, names, shared=shared,
                                        timeout=timeout, priority=priority,
                                        opportunistic=opportunistic)

    if acquired is None:
      raise LockAcquireTimeout()

    return acquired

  def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    Runs prerequisite checks, pre/post hooks and the LU's Exec method.

    """
    # Remembered to detect configuration changes made by the LU
    write_count = self.context.cfg.write_count
    lu.CheckPrereq()

    hm = self.BuildHooksManager(lu)
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                     self.Log, None)

    if getattr(lu.op, "dry_run", False):
      # in this mode, no post-hooks are run, and the config is not
      # written (as it might have been modified by another LU, and we
      # shouldn't do writeout on behalf of other threads
      self.LogInfo("dry-run mode requested, not actually executing"
                   " the operation")
      return lu.dry_run_result

    if self._cbs:
      submit_mj_fn = self._cbs.SubmitManyJobs
    else:
      submit_mj_fn = _FailingSubmitManyJobs

    try:
      result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                self.Log, result)
    finally:
      # FIXME: This needs locks if not lu_class.REQ_BGL
      if write_count != self.context.cfg.write_count:
        hm.RunConfigUpdate()

    return result

  def BuildHooksManager(self, lu):
    """Builds the hooks manager used for the given LU.

    """
    return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu)

  def _LockAndExecLU(self, lu, level, calc_timeout):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking the given level, and
    proceeds up, till there are no more locks to acquire. Then it executes the
    given LU and its opcodes.

    """
    glm = self.context.glm
    adding_locks = level in lu.add_locks
    acquiring_locks = level in lu.needed_locks

    if level not in locking.LEVELS:
      # Past the last level: all locks are held, run the LU
      _VerifyLocks(lu, glm)

      if self._cbs:
        self._cbs.NotifyStart()

      try:
        result = self._ExecLU(lu)
      # "except X, err" is a syntax error on Python 3; "as" works on 2.6+ too
      except AssertionError as err:
        # this is a bit ugly, as we don't know from which phase
        # (prereq, exec) this comes; but it's better than an exception
        # with no information
        (_, _, tb) = sys.exc_info()
        err_info = traceback.format_tb(tb)
        del tb
        logging.exception("Detected AssertionError")
        raise errors.OpExecError("Internal assertion error: please report"
                                 " this as a bug.\nError message: '%s';"
                                 " location:\n%s" % (str(err), err_info[-1]))

    elif adding_locks and acquiring_locks:
      # We could both acquire and add locks at the same level, but for now we
      # don't need this, so we'll avoid the complicated code needed.
      raise NotImplementedError("Can't declare locks to acquire when adding"
                                " others")

    elif adding_locks or acquiring_locks:
      self._CheckLocksEnabled()

      lu.DeclareLocks(level)
      share = lu.share_locks[level]
      opportunistic = lu.opportunistic_locks[level]

      try:
        assert adding_locks ^ acquiring_locks, \
          "Locks must be either added or acquired"

        if acquiring_locks:
          # Acquiring locks
          needed_locks = lu.needed_locks[level]

          self._AcquireLocks(level, needed_locks, share, opportunistic,
                             calc_timeout())
        else:
          # Adding locks
          add_locks = lu.add_locks[level]
          lu.remove_locks[level] = add_locks

          try:
            glm.add(level, add_locks, acquired=1, shared=share)
          except errors.LockError:
            logging.exception("Detected lock error in level %s for locks"
                              " %s, shared=%s", level, add_locks, share)
            raise errors.OpPrereqError(
              "Couldn't add locks (%s), most likely because of another"
              " job who added them first" % add_locks,
              errors.ECODE_NOTUNIQUE)

        try:
          result = self._LockAndExecLU(lu, level + 1, calc_timeout)
        finally:
          # Locks added by this LU are removed again after execution
          if level in lu.remove_locks:
            glm.remove(level, lu.remove_locks[level])
      finally:
        if glm.is_owned(level):
          glm.release(level)

    else:
      # Nothing to do at this level, recurse to the next one
      result = self._LockAndExecLU(lu, level + 1, calc_timeout)

    return result

  # pylint: disable=R0201
  def _CheckLUResult(self, op, result):
    """Check the LU result against the contract in the opcode.

    @raise errors.OpResultError: When the result doesn't match the
      opcode's C{OP_RESULT} check (except in dry-run mode)

    """
    resultcheck_fn = op.OP_RESULT
    if not (resultcheck_fn is None or resultcheck_fn(result)):
      logging.error("Expected opcode result matching %s, got %s",
                    resultcheck_fn, result)
      if not getattr(op, "dry_run", False):
        # FIXME: LUs should still behave in dry_run mode, or
        # alternately we should have OP_DRYRUN_RESULT; in the
        # meantime, we simply skip the OP_RESULT check in dry-run mode
        raise errors.OpResultError("Opcode result does not match %s: %s" %
                                   (resultcheck_fn, utils.Truncate(result, 80)))

  def ExecOpCode(self, op, cbs, timeout=None):
    """Execute an opcode.

    @type op: an OpCode instance
    @param op: the opcode to be executed
    @type cbs: L{OpExecCbBase}
    @param cbs: Runtime callbacks
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks, None for no timeout
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpcode (%s)" % type(op))

    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
    if lu_class is None:
      raise errors.OpCodeUnknown("Unknown opcode")

    if timeout is None:
      calc_timeout = lambda: None
    else:
      calc_timeout = utils.RunningTimeout(timeout, False).Remaining

    self._cbs = cbs
    try:
      if self._enable_locks:
        # Acquire the Big Ganeti Lock exclusively if this LU requires it,
        # and in a shared fashion otherwise (to prevent concurrent run with
        # an exclusive LU.
        self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
                           not lu_class.REQ_BGL, False, calc_timeout())
      elif lu_class.REQ_BGL:
        raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
                                     " disabled" % op.OP_ID)

      try:
        lu = lu_class(self, op, self.context, self.rpc)
        lu.ExpandNames()
        assert lu.needed_locks is not None, "needed_locks not set by LU"

        try:
          result = self._LockAndExecLU(lu, locking.LEVEL_CLUSTER + 1,
                                       calc_timeout)
        finally:
          if self._ec_id:
            self.context.cfg.DropECReservations(self._ec_id)
      finally:
        # Release BGL if owned
        if self.context.glm.is_owned(locking.LEVEL_CLUSTER):
          assert self._enable_locks
          self.context.glm.release(locking.LEVEL_CLUSTER)
    finally:
      self._cbs = None

    self._CheckLUResult(op, result)

    return result

  def Log(self, *args):
    """Forward call to feedback callback function.

    """
    if self._cbs:
      self._cbs.Feedback(*args)

  def LogStep(self, current, total, message):
    """Log a change in LU execution progress.

    """
    logging.debug("Step %d/%d %s", current, total, message)
    self.Log("STEP %d/%d %s" % (current, total, message))

  def LogWarning(self, message, *args, **kwargs):
    """Log a warning to the logs and the user.

    The optional keyword argument is 'hint' and can be used to show a
    hint to the user (presumably related to the warning). If the
    message is empty, it will not be printed at all, allowing one to
    show only a hint.

    """
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
           "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
    if args:
      message = message % tuple(args)
    if message:
      logging.warning(message)
      self.Log(" - WARNING: %s" % message)
    if "hint" in kwargs:
      self.Log("      Hint: %s" % kwargs["hint"])

  def LogInfo(self, message, *args):
    """Log an informational message to the logs and the user.

    """
    if args:
      message = message % tuple(args)
    logging.info(message)
    self.Log(" - INFO: %s" % message)

  def GetECId(self):
    """Returns the current execution context ID.

    @raise errors.ProgrammerError: If no execution context ID was set

    """
    if not self._ec_id:
      raise errors.ProgrammerError("Tried to use execution context id when"
                                   " not set")
    return self._ec_id