Revision 831bbbc1

b/lib/mcpu.py
40 40
from ganeti import locking
41 41

  
42 42

  
43
class _LockAcquireTimeout(Exception):
44
  """Internal exception to report timeouts on acquiring locks.
43
class LockAcquireTimeout(Exception):
44
  """Exception to report timeouts on acquiring locks.
45 45

  
46 46
  """
47 47

  
......
328 328
                                        calc_timeout())
329 329

  
330 330
          if acquired is None:
331
            raise _LockAcquireTimeout()
331
            raise LockAcquireTimeout()
332 332

  
333 333
        else:
334 334
          # Adding locks
......
361 361

  
362 362
    return result
363 363

  
364
  def ExecOpCode(self, op, cbs):
364
  def ExecOpCode(self, op, cbs, timeout=None):
365 365
    """Execute an opcode.
366 366

  
367 367
    @type op: an OpCode instance
368 368
    @param op: the opcode to be executed
369 369
    @type cbs: L{OpExecCbBase}
370 370
    @param cbs: Runtime callbacks
371
    @type timeout: float or None
372
    @param timeout: Maximum time to acquire all locks, None for no timeout
373
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
374
        amount of time
371 375

  
372 376
    """
373 377
    if not isinstance(op, opcodes.OpCode):
374 378
      raise errors.ProgrammerError("Non-opcode instance passed"
375 379
                                   " to ExecOpcode")
376 380

  
381
    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
382
    if lu_class is None:
383
      raise errors.OpCodeUnknown("Unknown opcode")
384

  
385
    if timeout is None:
386
      calc_timeout = lambda: None
387
    else:
388
      calc_timeout = locking.RunningTimeout(timeout, False).Remaining
389

  
377 390
    self._cbs = cbs
378 391
    try:
379
      lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
380
      if lu_class is None:
381
        raise errors.OpCodeUnknown("Unknown opcode")
392
      # Acquire the Big Ganeti Lock exclusively if this LU requires it,
393
      # and in a shared fashion otherwise (to prevent concurrent run with
394
      # an exclusive LU.
395
      if self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
396
                            not lu_class.REQ_BGL, calc_timeout()) is None:
397
        raise LockAcquireTimeout()
382 398

  
383
      timeout_strategy = _LockAttemptTimeoutStrategy()
399
      try:
400
        lu = lu_class(self, op, self.context, self.rpc)
401
        lu.ExpandNames()
402
        assert lu.needed_locks is not None, "needed_locks not set by LU"
384 403

  
385
      while True:
386 404
        try:
387
          acquire_timeout = timeout_strategy.CalcRemainingTimeout()
388

  
389
          # Acquire the Big Ganeti Lock exclusively if this LU requires it,
390
          # and in a shared fashion otherwise (to prevent concurrent run with
391
          # an exclusive LU.
392
          if self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
393
                                not lu_class.REQ_BGL, acquire_timeout) is None:
394
            raise _LockAcquireTimeout()
395

  
396
          try:
397
            lu = lu_class(self, op, self.context, self.rpc)
398
            lu.ExpandNames()
399
            assert lu.needed_locks is not None, "needed_locks not set by LU"
400

  
401
            try:
402
              return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE,
403
                                         timeout_strategy.CalcRemainingTimeout)
404
            finally:
405
              if self._ec_id:
406
                self.context.cfg.DropECReservations(self._ec_id)
407

  
408
          finally:
409
            self.context.glm.release(locking.LEVEL_CLUSTER)
410

  
411
        except _LockAcquireTimeout:
412
          # Timeout while waiting for lock, try again
413
          pass
414

  
415
        timeout_strategy = timeout_strategy.NextAttempt()
416

  
405
          return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout)
406
        finally:
407
          if self._ec_id:
408
            self.context.cfg.DropECReservations(self._ec_id)
409
      finally:
410
        self.context.glm.release(locking.LEVEL_CLUSTER)
417 411
    finally:
418 412
      self._cbs = None
419 413

  

Also available in: Unified diff