Revision e3200b18 lib/mcpu.py

b/lib/mcpu.py
46 46
  """
47 47

  
48 48

  
49
class _LockTimeoutStrategy(object):
49
def _CalculateLockAttemptTimeouts():
50
  """Calculate timeouts for lock attempts.
51

  
52
  """
53
  running_sum = 0
54
  result = [1.0]
55

  
56
  # Wait for a total of at least 150s before doing a blocking acquire
57
  while sum(result) < 150.0:
58
    timeout = (result[-1] * 1.05) ** 1.25
59

  
60
    # Cap timeout at 10 seconds. This gives other jobs a chance to run
61
    # even if we're still trying to get our locks, before finally moving
62
    # to a blocking acquire.
63
    if timeout > 10.0:
64
      timeout = 10.0
65

  
66
    elif timeout < 0.1:
67
      # Lower boundary for safety
68
      timeout = 0.1
69

  
70
    result.append(timeout)
71

  
72
  return result
73

  
74

  
75
class _LockAttemptTimeoutStrategy(object):
50 76
  """Class with lock acquire timeout strategy.
51 77

  
52 78
  """
53 79
  __slots__ = [
54
    "_attempts",
80
    "_attempt",
55 81
    "_random_fn",
56 82
    "_start_time",
83
    "_time_fn",
57 84
    ]
58 85

  
59
  _MAX_ATTEMPTS = 10
60
  """How many retries before going into blocking mode"""
61

  
62
  _ATTEMPT_FACTOR = 1.75
63
  """Factor between attempts"""
86
  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()
64 87

  
65
  def __init__(self, _random_fn=None):
88
  def __init__(self, attempt=0, _time_fn=time.time, _random_fn=random.random):
66 89
    """Initializes this class.
67 90

  
91
    @type attempt: int
92
    @param attempt: Current attempt number
93
    @param _time_fn: Time function for unittests
68 94
    @param _random_fn: Random number generator for unittests
69 95

  
70 96
    """
71 97
    object.__init__(self)
72 98

  
73
    self._start_time = None
74
    self._attempts = 0
99
    if attempt < 0:
100
      raise ValueError("Attempt must be zero or positive")
75 101

  
76
    if _random_fn is None:
77
      self._random_fn = random.random
78
    else:
79
      self._random_fn = _random_fn
102
    self._attempt = attempt
103
    self._time_fn = _time_fn
104
    self._random_fn = _random_fn
105

  
106
    self._start_time = None
80 107

  
81 108
  def NextAttempt(self):
82
    """Advances to the next attempt.
109
    """Returns the strategy for the next attempt.
83 110

  
84 111
    """
85
    assert self._attempts >= 0
86
    self._attempts += 1
112
    return _LockAttemptTimeoutStrategy(attempt=self._attempt + 1,
113
                                       _time_fn=self._time_fn,
114
                                       _random_fn=self._random_fn)
87 115

  
88 116
  def CalcRemainingTimeout(self):
89 117
    """Returns the remaining timeout.
90 118

  
91 119
    """
92
    assert self._attempts >= 0
93

  
94
    if self._attempts == self._MAX_ATTEMPTS:
95
      # Only blocking acquires after 10 retries
120
    try:
121
      timeout = self._TIMEOUT_PER_ATTEMPT[self._attempt]
122
    except IndexError:
123
      # No more timeouts, do blocking acquire
96 124
      return None
97 125

  
98
    if self._attempts > self._MAX_ATTEMPTS:
99
      raise RuntimeError("Blocking acquire ran into timeout")
100

  
101 126
    # Get start time on first calculation
102 127
    if self._start_time is None:
103
      self._start_time = time.time()
128
      self._start_time = self._time_fn()
104 129

  
105 130
    # Calculate remaining time for this attempt
106
    timeout = (self._start_time + (self._ATTEMPT_FACTOR ** self._attempts) -
107
               time.time())
108

  
109
    if timeout > 10.0:
110
      # Cap timeout at 10 seconds. This gives other jobs a chance to run
111
      # even if we're still trying to get our locks, before finally moving
112
      # to a blocking acquire.
113
      timeout = 10.0
114

  
115
    elif timeout < 0.1:
116
      # Lower boundary
117
      timeout = 0.1
131
    remaining_timeout = self._start_time + timeout - self._time_fn()
118 132

  
119 133
    # Add a small variation (-/+ 5%) to timeouts. This helps in situations
120 134
    # where two or more jobs are fighting for the same lock(s).
121
    variation_range = timeout * 0.1
122
    timeout += (self._random_fn() * variation_range) - (variation_range * 0.5)
135
    variation_range = remaining_timeout * 0.1
136
    remaining_timeout += ((self._random_fn() * variation_range) -
137
                          (variation_range * 0.5))
123 138

  
124
    assert timeout >= 0.0, "Timeout must be positive"
139
    assert remaining_timeout >= 0.0, "Timeout must be positive"
125 140

  
126
    return timeout
141
    return remaining_timeout
127 142

  
128 143

  
129 144
class OpExecCbBase:
......
414 429
      if lu_class is None:
415 430
        raise errors.OpCodeUnknown("Unknown opcode")
416 431

  
417
      timeout_strategy = _LockTimeoutStrategy()
418
      calc_timeout = timeout_strategy.CalcRemainingTimeout
432
      timeout_strategy = _LockAttemptTimeoutStrategy()
419 433

  
420 434
      while True:
421 435
        try:
436
          acquire_timeout = timeout_strategy.CalcRemainingTimeout()
437

  
422 438
          # Acquire the Big Ganeti Lock exclusively if this LU requires it,
423 439
          # and in a shared fashion otherwise (to prevent concurrent run with
424 440
          # an exclusive LU.
425 441
          if self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
426
                                not lu_class.REQ_BGL, calc_timeout()) is None:
442
                                not lu_class.REQ_BGL, acquire_timeout) is None:
427 443
            raise _LockAcquireTimeout()
428 444

  
429 445
          try:
......
431 447
            lu.ExpandNames()
432 448
            assert lu.needed_locks is not None, "needed_locks not set by LU"
433 449

  
434
            return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout)
450
            return self._LockAndExecLU(lu, locking.LEVEL_INSTANCE,
451
                                       timeout_strategy.CalcRemainingTimeout)
435 452
          finally:
436 453
            self.context.glm.release(locking.LEVEL_CLUSTER)
437 454

  
......
439 456
          # Timeout while waiting for lock, try again
440 457
          pass
441 458

  
442
        timeout_strategy.NextAttempt()
459
        timeout_strategy = timeout_strategy.NextAttempt()
443 460

  
444 461
    finally:
445 462
      self._cbs = None

Also available in: Unified diff