Revision 5aab242c test/ganeti.locking_unittest.py

b/test/ganeti.locking_unittest.py
48 48
  return wrapper
49 49

  
50 50

  
51
def SafeSleep(duration):
52
  start = time.time()
53
  while True:
54
    delay = start + duration - time.time()
55
    if delay <= 0.0:
56
      break
57
    time.sleep(delay)
58

  
59

  
51 60
class _ThreadedTestCase(unittest.TestCase):
52 61
  """Test class that supports adding/waiting on threads"""
53 62
  def setUp(self):
......
966 975
      self.failUnlessEqual(self.done.get_nowait(), 'DONE')
967 976

  
968 977
  @_Repeat
978
  def testSimpleAcquireTimeoutExpiring(self):
979
    names = sorted(self.ls._names())
980
    self.assert_(len(names) >= 3)
981

  
982
    # Get name of first lock
983
    first = names[0]
984

  
985
    # Get name of last lock
986
    last = names.pop()
987

  
988
    checks = [
989
      # Block first and try to lock it again
990
      (first, first),
991

  
992
      # Block last and try to lock all locks
993
      (None, first),
994

  
995
      # Block last and try to lock it again
996
      (last, last),
997
      ]
998

  
999
    for (wanted, block) in checks:
1000
      # Lock in exclusive mode
1001
      self.assert_(self.ls.acquire(block, shared=0))
1002

  
1003
      def _AcquireOne():
1004
        # Try to get the same lock again with a timeout (should never succeed)
1005
        if self.ls.acquire(wanted, timeout=0.1, shared=0):
1006
          self.done.put("acquired")
1007
          self.ls.release()
1008
        else:
1009
          self.assert_(not self.ls._list_owned())
1010
          self.assert_(not self.ls._is_owned())
1011
          self.done.put("not acquired")
1012

  
1013
      self._addThread(target=_AcquireOne)
1014

  
1015
      # Wait for timeout in thread to expire
1016
      self._waitThreads()
1017

  
1018
      # Release exclusive lock again
1019
      self.ls.release()
1020

  
1021
      self.assertEqual(self.done.get_nowait(), "not acquired")
1022
      self.assertRaises(Queue.Empty, self.done.get_nowait)
1023

  
1024
  @_Repeat
1025
  def testDelayedAndExpiringLockAcquire(self):
1026
    self._setUpLS()
1027
    self.ls.add(['five', 'six', 'seven', 'eight', 'nine'])
1028

  
1029
    for expire in (False, True):
1030
      names = sorted(self.ls._names())
1031
      self.assertEqual(len(names), 8)
1032

  
1033
      lock_ev = dict([(i, threading.Event()) for i in names])
1034

  
1035
      # Lock all in exclusive mode
1036
      self.assert_(self.ls.acquire(names, shared=0))
1037

  
1038
      if expire:
1039
        # We'll wait at least 300ms per lock
1040
        lockwait = len(names) * [0.3]
1041

  
1042
        # Fail if we can't acquire all locks in 400ms. There are 8 locks, so
1043
        # this gives us up to 2.4s to fail.
1044
        lockall_timeout = 0.4
1045
      else:
1046
        # This should finish rather quickly
1047
        lockwait = None
1048
        lockall_timeout = len(names) * 5.0
1049

  
1050
      def _LockAll():
1051
        def acquire_notification(name):
1052
          if not expire:
1053
            self.done.put("getting %s" % name)
1054

  
1055
          # Kick next lock
1056
          lock_ev[name].set()
1057

  
1058
        if self.ls.acquire(names, shared=0, timeout=lockall_timeout,
1059
                           test_notify=acquire_notification):
1060
          self.done.put("got all")
1061
          self.ls.release()
1062
        else:
1063
          self.done.put("timeout on all")
1064

  
1065
        # Notify all locks
1066
        for ev in lock_ev.values():
1067
          ev.set()
1068

  
1069
      t = self._addThread(target=_LockAll)
1070

  
1071
      for idx, name in enumerate(names):
1072
        # Wait for actual acquire on this lock to start
1073
        lock_ev[name].wait(10.0)
1074

  
1075
        if expire and t.isAlive():
1076
          # Wait some time after getting the notification to make sure the lock
1077
          # acquire will expire
1078
          SafeSleep(lockwait[idx])
1079

  
1080
        self.ls.release(names=name)
1081

  
1082
      self.assert_(not self.ls._list_owned())
1083

  
1084
      self._waitThreads()
1085

  
1086
      if expire:
1087
        # Not checking which locks were actually acquired. Doing so would be
1088
        # too timing-dependant.
1089
        self.assertEqual(self.done.get_nowait(), "timeout on all")
1090
      else:
1091
        for i in names:
1092
          self.assertEqual(self.done.get_nowait(), "getting %s" % i)
1093
        self.assertEqual(self.done.get_nowait(), "got all")
1094
      self.assertRaises(Queue.Empty, self.done.get_nowait)
1095

  
1096
  @_Repeat
969 1097
  def testConcurrentRemove(self):
970 1098
    self.ls.add('four')
971 1099
    self.ls.acquire(['one', 'two', 'four'])

Also available in: Unified diff