from ganeti import mcpu
from ganeti import opcodes
+from ganeti.constants import \
+ LOCK_ATTEMPTS_TIMEOUT, \
+ LOCK_ATTEMPTS_MAXWAIT, \
+ LOCK_ATTEMPTS_MINWAIT
import testutils
class TestLockAttemptTimeoutStrategy(unittest.TestCase):
def testConstants(self):
tpa = mcpu.LockAttemptTimeoutStrategy._TIMEOUT_PER_ATTEMPT
- self.assert_(len(tpa) > 10)
- self.assert_(sum(tpa) >= 150.0)
+ self.assert_(len(tpa) > LOCK_ATTEMPTS_TIMEOUT / LOCK_ATTEMPTS_MAXWAIT)
+ self.assert_(sum(tpa) >= LOCK_ATTEMPTS_TIMEOUT)
def testSimple(self):
strat = mcpu.LockAttemptTimeoutStrategy(_random_fn=lambda: 0.5,
timeout = strat.NextAttempt()
self.assert_(timeout is not None)
- self.assert_(timeout <= 10.0)
- self.assert_(timeout >= 0.0)
+ self.assert_(timeout <= LOCK_ATTEMPTS_MAXWAIT)
+ self.assert_(timeout >= LOCK_ATTEMPTS_MINWAIT)
self.assert_(prev is None or timeout >= prev)
prev = timeout