Statistics
| Branch: | Tag: | Revision:

root / lib / mcpu.py @ 355d1f32

History | View | Annotate | Download (18.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the logic behind the cluster operations
23

24
This module implements the logic for doing operations in the cluster. There
25
are two kinds of classes defined:
26
  - logical units, which know how to deal with their specific opcode only
27
  - the processor, which dispatches the opcodes to their logical units
28

29
"""
30

    
31
import sys
32
import logging
33
import random
34
import time
35
import itertools
36
import traceback
37

    
38
from ganeti import opcodes
39
from ganeti import opcodes_base
40
from ganeti import constants
41
from ganeti import errors
42
from ganeti import hooksmaster
43
from ganeti import cmdlib
44
from ganeti import locking
45
from ganeti import utils
46
from ganeti import compat
47

    
48

    
49
#: Prefix shared by all opcode class names (e.g. "OpClusterVerify")
_OP_PREFIX = "Op"
#: Prefix shared by all logical unit class names (e.g. "LUClusterVerify")
_LU_PREFIX = "LU"

#: LU classes which don't need to acquire the node allocation lock
#: (L{locking.NAL}) when they acquire all node or node resource locks
_NODE_ALLOC_WHITELIST = frozenset([])

#: LU classes which don't need to acquire the node allocation lock
#: (L{locking.NAL}) in the same mode (shared/exclusive) as the node
#: or node resource locks
_NODE_ALLOC_MODE_WHITELIST = compat.UniqueFrozenset([
  cmdlib.LUBackupExport,
  cmdlib.LUBackupRemove,
  cmdlib.LUOobCommand,
  ])
64

    
65

    
66
class LockAcquireTimeout(Exception):
  """Raised when locks could not be acquired within the allowed time.

  """
70

    
71

    
72
def _CalculateLockAttemptTimeouts():
  """Calculate timeouts for lock attempts.

  Produces a growing sequence of per-attempt timeouts, starting at
  L{constants.LOCK_ATTEMPTS_MINWAIT}, whose sum is at least
  L{constants.LOCK_ATTEMPTS_TIMEOUT}.

  """
  timeouts = [constants.LOCK_ATTEMPTS_MINWAIT]
  total = timeouts[0]

  # Keep adding attempts until their combined waiting time reaches
  # LOCK_ATTEMPTS_TIMEOUT; only then is a blocking acquire done
  while total < constants.LOCK_ATTEMPTS_TIMEOUT:
    nxt = (timeouts[-1] * 1.05) ** 1.25

    # Clamp into [MINWAIT, MAXWAIT]: the upper bound gives other jobs a
    # chance to run while we keep retrying, the lower bound is for safety
    nxt = min(nxt, constants.LOCK_ATTEMPTS_MAXWAIT)
    nxt = max(nxt, constants.LOCK_ATTEMPTS_MINWAIT)

    timeouts.append(nxt)
    total += nxt

  return timeouts
95

    
96

    
97
class LockAttemptTimeoutStrategy(object):
  """Class with lock acquire timeout strategy.

  Yields per-attempt timeouts (with a small random variation) from the
  precomputed schedule; once the schedule is exhausted, C{None} is
  returned to signal a blocking acquire.

  """
  __slots__ = [
    "_timeouts",
    "_random_fn",
    "_time_fn",
    ]

  #: Precomputed, shared timeout schedule for all instances
  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()

  def __init__(self, _time_fn=time.time, _random_fn=random.random):
    """Initializes this class.

    @param _time_fn: Time function for unittests
    @param _random_fn: Random number generator for unittests

    """
    object.__init__(self)

    self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
    self._time_fn = _time_fn
    self._random_fn = _random_fn

  def NextAttempt(self):
    """Returns the timeout for the next attempt.

    @return: timeout in seconds, or C{None} once all timed attempts have
        been used up (callers should then do a blocking acquire)

    """
    try:
      # Use the builtin next() rather than the Python 2-only ".next()"
      # iterator method; behaviour is identical but works on Python 2.6+
      # and Python 3 alike
      timeout = next(self._timeouts)
    except StopIteration:
      # No more timeouts, do blocking acquire
      timeout = None

    if timeout is not None:
      # Add a small variation (-/+ 5%) to timeout. This helps in situations
      # where two or more jobs are fighting for the same lock(s).
      variation_range = timeout * 0.1
      timeout += ((self._random_fn() * variation_range) -
                  (variation_range * 0.5))

    return timeout
140

    
141

    
142
class OpExecCbBase: # pylint: disable=W0232
  """Base class for OpCode execution callbacks.

  """
  def NotifyStart(self):
    """Called just before the LU's Exec() method runs.

    By the time this is invoked, all required locks have been acquired.

    """

  def Feedback(self, *args):
    """Forwards a feedback message from LU code to the end-user.

    """

  def CurrentPriority(self): # pylint: disable=R0201
    """Returns current priority or C{None}.

    """
    return None

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{jqueue.JobQueue.SubmitManyJobs}.

    """
    raise NotImplementedError
172

    
173

    
174
def _LUNameForOpName(opname):
  """Derives the logical unit class name from an opcode class name.

  @type opname: string
  @param opname: OpCode class name, must start with L{_OP_PREFIX}

  """
  prefix_len = len(_OP_PREFIX)
  assert opname[:prefix_len] == _OP_PREFIX, \
      "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)

  # Swap the "Op" prefix for "LU", keeping the rest of the name
  return "%s%s" % (_LU_PREFIX, opname[prefix_len:])
182

    
183

    
184
def _ComputeDispatchTable():
  """Computes the opcode-to-lu dispatch table.

  @return: dictionary mapping opcode classes (with an LU) to their
      corresponding logical unit classes from L{cmdlib}

  """
  table = {}
  for op in opcodes.OP_MAPPING.values():
    # Only opcodes flagged as having a logical unit are dispatchable
    if op.WITH_LU:
      table[op] = getattr(cmdlib, _LUNameForOpName(op.__name__))
  return table
191

    
192

    
193
def _SetBaseOpParams(src, defcomment, dst):
  """Copies basic opcode parameters.

  @type src: L{opcodes.OpCode}
  @param src: Source opcode
  @type defcomment: string
  @param defcomment: Comment to specify if not already given
  @type dst: L{opcodes.OpCode}
  @param dst: Destination opcode

  """
  # Debug level is always propagated when the source carries one
  if hasattr(src, "debug_level"):
    dst.debug_level = src.debug_level

  # Priority is only inherited if the destination has none of its own
  if getattr(dst, "priority", None) is None and hasattr(src, "priority"):
    dst.priority = src.priority

  # Fall back to the default comment if none was set on the destination
  if not getattr(dst, opcodes_base.COMMENT_ATTR, None):
    dst.comment = defcomment
213

    
214

    
215
def _ProcessResult(submit_fn, op, result):
  """Examines opcode result.

  If necessary, additional processing on the result is done.

  @param submit_fn: Function for submitting jobs, see
      L{OpExecCbBase.SubmitManyJobs}
  @type op: L{opcodes.OpCode}
  @param op: Opcode that produced the result
  @param result: Raw result of the LU's Exec method

  """
  if isinstance(result, cmdlib.ResultWithJobs):
    # Copy basic parameters (e.g. priority) to every opcode of the jobs
    # about to be submitted; an explicit loop is used instead of map()
    # since this is done purely for the side effect (map() would also be
    # a silent no-op on Python 3, where it is lazy)
    comment = "Submitted by %s" % op.OP_ID
    for job_op in itertools.chain(*result.jobs):
      _SetBaseOpParams(op, comment, job_op)

    # Submit jobs
    job_submission = submit_fn(result.jobs)

    # Build dictionary
    result = result.other

    assert constants.JOB_IDS_KEY not in result, \
      "Key '%s' found in additional return values" % constants.JOB_IDS_KEY

    result[constants.JOB_IDS_KEY] = job_submission

  return result
239

    
240

    
241
def _FailingSubmitManyJobs(_):
  """Implementation of L{OpExecCbBase.SubmitManyJobs} to raise an exception.

  Used as the submit callback when an opcode is processed without runtime
  callbacks, in which case submitting jobs is a programming error.

  """
  msg = ("Opcodes processed without callbacks (e.g."
         " queries) can not submit jobs")
  raise errors.ProgrammerError(msg)
247

    
248

    
249
def _VerifyLocks(lu, glm, _mode_whitelist=_NODE_ALLOC_MODE_WHITELIST,
250
                 _nal_whitelist=_NODE_ALLOC_WHITELIST):
251
  """Performs consistency checks on locks acquired by a logical unit.
252

253
  @type lu: L{cmdlib.LogicalUnit}
254
  @param lu: Logical unit instance
255
  @type glm: L{locking.GanetiLockManager}
256
  @param glm: Lock manager
257

258
  """
259
  if not __debug__:
260
    return
261

    
262
  have_nal = glm.check_owned(locking.LEVEL_NODE_ALLOC, locking.NAL)
263

    
264
  for level in [locking.LEVEL_NODE, locking.LEVEL_NODE_RES]:
265
    # TODO: Verify using actual lock mode, not using LU variables
266
    if level in lu.needed_locks:
267
      share_node_alloc = lu.share_locks[locking.LEVEL_NODE_ALLOC]
268
      share_level = lu.share_locks[level]
269

    
270
      if lu.__class__ in _mode_whitelist:
271
        assert share_node_alloc != share_level, \
272
          "LU is whitelisted to use different modes for node allocation lock"
273
      else:
274
        assert bool(share_node_alloc) == bool(share_level), \
275
          ("Node allocation lock must be acquired using the same mode as nodes"
276
           " and node resources")
277

    
278
      if lu.__class__ in _nal_whitelist:
279
        assert not have_nal, \
280
          "LU is whitelisted for not acquiring the node allocation lock"
281
      elif lu.needed_locks[level] == locking.ALL_SET or glm.owning_all(level):
282
        assert have_nal, \
283
          ("Node allocation lock must be used if an LU acquires all nodes"
284
           " or node resources")
285

    
286

    
287
class Processor(object):
  """Object which runs OpCodes"""
  #: Maps opcode classes to the logical unit classes implementing them
  DISPATCH_TABLE = _ComputeDispatchTable()

  def __init__(self, context, ec_id, enable_locks=True):
    """Constructor for Processor

    @type context: GanetiContext
    @param context: global Ganeti context
    @type ec_id: string
    @param ec_id: execution context identifier
    @type enable_locks: bool
    @param enable_locks: whether opcodes may acquire locks; when disabled,
        opcodes requiring the Big Ganeti Lock cannot be executed

    """
    self.context = context
    self._ec_id = ec_id
    # Runtime callbacks (L{OpExecCbBase}); only set while ExecOpCode runs
    self._cbs = None
    self.rpc = context.rpc
    self.hmclass = hooksmaster.HooksMaster
    self._enable_locks = enable_locks

  def _CheckLocksEnabled(self):
    """Checks if locking is enabled.

    @raise errors.ProgrammerError: In case locking is not enabled

    """
    if not self._enable_locks:
      raise errors.ProgrammerError("Attempted to use disabled locks")

  def _AcquireLocks(self, level, names, shared, opportunistic, timeout):
    """Acquires locks via the Ganeti lock manager.

    @type level: int
    @param level: Lock level
    @type names: list or string
    @param names: Lock names
    @type shared: bool
    @param shared: Whether the locks should be acquired in shared mode
    @type opportunistic: bool
    @param opportunistic: Whether to acquire opportunistically
    @type timeout: None or float
    @param timeout: Timeout for acquiring the locks
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time

    """
    self._CheckLocksEnabled()

    # Lock requests inherit the priority of the running job, if known
    if self._cbs:
      priority = self._cbs.CurrentPriority()
    else:
      priority = None

    acquired = self.context.glm.acquire(level, names, shared=shared,
                                        timeout=timeout, priority=priority,
                                        opportunistic=opportunistic)

    # The lock manager signals a timeout by returning None
    if acquired is None:
      raise LockAcquireTimeout()

    return acquired

  def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    Runs prerequisite checks, pre/post hooks and the LU's Exec method,
    triggering a config-update hook if the configuration was written.

    """
    # Remember the config write count to detect whether this LU modified
    # the configuration (checked again in the finally block below)
    write_count = self.context.cfg.write_count
    lu.CheckPrereq()

    hm = self.BuildHooksManager(lu)
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                     self.Log, None)

    if getattr(lu.op, "dry_run", False):
      # in this mode, no post-hooks are run, and the config is not
      # written (as it might have been modified by another LU, and we
      # shouldn't do writeout on behalf of other threads
      self.LogInfo("dry-run mode requested, not actually executing"
                   " the operation")
      return lu.dry_run_result

    # Without runtime callbacks, job submission from LUs is a
    # programmer error and raises immediately
    if self._cbs:
      submit_mj_fn = self._cbs.SubmitManyJobs
    else:
      submit_mj_fn = _FailingSubmitManyJobs

    try:
      result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                self.Log, result)
    finally:
      # FIXME: This needs locks if not lu_class.REQ_BGL
      if write_count != self.context.cfg.write_count:
        hm.RunConfigUpdate()

    return result

  def BuildHooksManager(self, lu):
    """Builds a hooks manager for the given logical unit.

    """
    return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu)

  def _LockAndExecLU(self, lu, level, calc_timeout):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking the given level, and
    proceeds up, till there are no more locks to acquire. Then it executes the
    given LU and its opcodes.

    @param lu: Logical unit instance to execute
    @param level: Current lock level (recursion proceeds to level + 1)
    @param calc_timeout: Zero-argument callable returning the remaining
        lock acquisition timeout, or C{None} for no timeout

    """
    glm = self.context.glm
    adding_locks = level in lu.add_locks
    acquiring_locks = level in lu.needed_locks

    if level not in locking.LEVELS:
      # Recursion base case: all lock levels have been processed, so the
      # LU can now actually be executed
      _VerifyLocks(lu, glm)

      if self._cbs:
        self._cbs.NotifyStart()

      try:
        result = self._ExecLU(lu)
      except AssertionError, err:
        # this is a bit ugly, as we don't know from which phase
        # (prereq, exec) this comes; but it's better than an exception
        # with no information
        (_, _, tb) = sys.exc_info()
        err_info = traceback.format_tb(tb)
        del tb
        logging.exception("Detected AssertionError")
        raise errors.OpExecError("Internal assertion error: please report"
                                 " this as a bug.\nError message: '%s';"
                                 " location:\n%s" % (str(err), err_info[-1]))

    elif adding_locks and acquiring_locks:
      # We could both acquire and add locks at the same level, but for now we
      # don't need this, so we'll avoid the complicated code needed.
      raise NotImplementedError("Can't declare locks to acquire when adding"
                                " others")

    elif adding_locks or acquiring_locks:
      self._CheckLocksEnabled()

      lu.DeclareLocks(level)
      share = lu.share_locks[level]
      opportunistic = lu.opportunistic_locks[level]

      try:
        assert adding_locks ^ acquiring_locks, \
          "Locks must be either added or acquired"

        if acquiring_locks:
          # Acquiring locks
          needed_locks = lu.needed_locks[level]

          self._AcquireLocks(level, needed_locks, share, opportunistic,
                             calc_timeout())
        else:
          # Adding locks
          add_locks = lu.add_locks[level]
          # Locks added here are automatically removed again after execution
          lu.remove_locks[level] = add_locks

          try:
            glm.add(level, add_locks, acquired=1, shared=share)
          except errors.LockError:
            logging.exception("Detected lock error in level %s for locks"
                              " %s, shared=%s", level, add_locks, share)
            raise errors.OpPrereqError(
              "Couldn't add locks (%s), most likely because of another"
              " job who added them first" % add_locks,
              errors.ECODE_NOTUNIQUE)

        try:
          # Recurse into the next lock level
          result = self._LockAndExecLU(lu, level + 1, calc_timeout)
        finally:
          if level in lu.remove_locks:
            glm.remove(level, lu.remove_locks[level])
      finally:
        # Always release whatever is still owned at this level
        if glm.is_owned(level):
          glm.release(level)

    else:
      # Nothing to lock at this level, move on to the next one
      result = self._LockAndExecLU(lu, level + 1, calc_timeout)

    return result

  # pylint: disable=R0201
  def _CheckLUResult(self, op, result):
    """Check the LU result against the contract in the opcode.

    @raise errors.OpResultError: When the result does not match the
        opcode's declared result check (except in dry-run mode)

    """
    resultcheck_fn = op.OP_RESULT
    if not (resultcheck_fn is None or resultcheck_fn(result)):
      logging.error("Expected opcode result matching %s, got %s",
                    resultcheck_fn, result)
      if not getattr(op, "dry_run", False):
        # FIXME: LUs should still behave in dry_run mode, or
        # alternately we should have OP_DRYRUN_RESULT; in the
        # meantime, we simply skip the OP_RESULT check in dry-run mode
        raise errors.OpResultError("Opcode result does not match %s: %s" %
                                   (resultcheck_fn, utils.Truncate(result, 80)))

  def ExecOpCode(self, op, cbs, timeout=None):
    """Execute an opcode.

    @type op: an OpCode instance
    @param op: the opcode to be executed
    @type cbs: L{OpExecCbBase}
    @param cbs: Runtime callbacks
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks, None for no timeout
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpcode (%s)" % type(op))

    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
    if lu_class is None:
      raise errors.OpCodeUnknown("Unknown opcode")

    # calc_timeout is called before each lock acquisition; with a timeout
    # it returns the remaining time, otherwise always None (no timeout)
    if timeout is None:
      calc_timeout = lambda: None
    else:
      calc_timeout = utils.RunningTimeout(timeout, False).Remaining

    self._cbs = cbs
    try:
      if self._enable_locks:
        # Acquire the Big Ganeti Lock exclusively if this LU requires it,
        # and in a shared fashion otherwise (to prevent concurrent run with
        # an exclusive LU.
        self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
                            not lu_class.REQ_BGL, False, calc_timeout())
      elif lu_class.REQ_BGL:
        raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
                                     " disabled" % op.OP_ID)

      try:
        lu = lu_class(self, op, self.context, self.rpc)
        lu.ExpandNames()
        assert lu.needed_locks is not None, "needed_locks not set by LU"

        try:
          # Start the recursive lock/execute sequence above the cluster level
          result = self._LockAndExecLU(lu, locking.LEVEL_CLUSTER + 1,
                                       calc_timeout)
        finally:
          if self._ec_id:
            self.context.cfg.DropECReservations(self._ec_id)
      finally:
        # Release BGL if owned
        if self.context.glm.is_owned(locking.LEVEL_CLUSTER):
          assert self._enable_locks
          self.context.glm.release(locking.LEVEL_CLUSTER)
    finally:
      self._cbs = None

    self._CheckLUResult(op, result)

    return result

  def Log(self, *args):
    """Forward call to feedback callback function.

    """
    if self._cbs:
      self._cbs.Feedback(*args)

  def LogStep(self, current, total, message):
    """Log a change in LU execution progress.

    """
    logging.debug("Step %d/%d %s", current, total, message)
    self.Log("STEP %d/%d %s" % (current, total, message))

  def LogWarning(self, message, *args, **kwargs):
    """Log a warning to the logs and the user.

    The optional keyword argument is 'hint' and can be used to show a
    hint to the user (presumably related to the warning). If the
    message is empty, it will not be printed at all, allowing one to
    show only a hint.

    """
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
           "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
    if args:
      message = message % tuple(args)
    if message:
      logging.warning(message)
      self.Log(" - WARNING: %s" % message)
    if "hint" in kwargs:
      self.Log("      Hint: %s" % kwargs["hint"])

  def LogInfo(self, message, *args):
    """Log an informational message to the logs and the user.

    """
    if args:
      message = message % tuple(args)
    logging.info(message)
    self.Log(" - INFO: %s" % message)

  def GetECId(self):
    """Returns the current execution context ID.

    @raise errors.ProgrammerError: If no execution context ID was set

    """
    if not self._ec_id:
      raise errors.ProgrammerError("Tried to use execution context id when"
                                   " not set")
    return self._ec_id