Revision 887c7aa6 test/ganeti.locking_unittest.py

b/test/ganeti.locking_unittest.py
28 28
import Queue
29 29
import threading
30 30
import random
31
import itertools
31 32

  
32 33
from ganeti import locking
33 34
from ganeti import errors
34 35
from ganeti import utils
36
from ganeti import compat
35 37

  
36 38
import testutils
37 39

  
......
701 703

  
702 704
    self.assertRaises(Queue.Empty, self.done.get_nowait)
703 705

  
706
  def testPriority(self):
707
    # Acquire in exclusive mode
708
    self.assert_(self.sl.acquire(shared=0))
709

  
710
    # Queue acquires
711
    def _Acquire(prev, next, shared, priority, result):
712
      prev.wait()
713
      self.sl.acquire(shared=shared, priority=priority, test_notify=next.set)
714
      try:
715
        self.done.put(result)
716
      finally:
717
        self.sl.release()
718

  
719
    counter = itertools.count(0)
720
    priorities = range(-20, 30)
721
    first = threading.Event()
722
    prev = first
723

  
724
    # Data structure:
725
    # {
726
    #   priority:
727
    #     [(shared/exclusive, set(acquire names), set(pending threads)),
728
    #      (shared/exclusive, ...),
729
    #      ...,
730
    #     ],
731
    # }
732
    perprio = {}
733

  
734
    # References shared acquire per priority in L{perprio}. Data structure:
735
    # {
736
    #   priority: (shared=1, set(acquire names), set(pending threads)),
737
    # }
738
    prioshared = {}
739

  
740
    for seed in [4979, 9523, 14902, 32440]:
741
      # Use a deterministic random generator
742
      rnd = random.Random(seed)
743
      for priority in [rnd.choice(priorities) for _ in range(30)]:
744
        modes = [0, 1]
745
        rnd.shuffle(modes)
746
        for shared in modes:
747
          # Unique name
748
          acqname = "%s/shr=%s/prio=%s" % (counter.next(), shared, priority)
749

  
750
          ev = threading.Event()
751
          thread = self._addThread(target=_Acquire,
752
                                   args=(prev, ev, shared, priority, acqname))
753
          prev = ev
754

  
755
          # Record expected aqcuire, see above for structure
756
          data = (shared, set([acqname]), set([thread]))
757
          priolist = perprio.setdefault(priority, [])
758
          if shared:
759
            priosh = prioshared.get(priority, None)
760
            if priosh:
761
              # Shared acquires are merged
762
              for i, j in zip(priosh[1:], data[1:]):
763
                i.update(j)
764
              assert data[0] == priosh[0]
765
            else:
766
              prioshared[priority] = data
767
              priolist.append(data)
768
          else:
769
            priolist.append(data)
770

  
771
    # Start all acquires and wait for them
772
    first.set()
773
    prev.wait()
774

  
775
    # Check lock information
776
    self.assertEqual(self.sl.GetInfo(["name"]), [self.sl.name])
777
    self.assertEqual(self.sl.GetInfo(["mode", "owner"]),
778
                     ["exclusive", [threading.currentThread().getName()]])
779
    self.assertEqual(self.sl.GetInfo(["name", "pending"]),
780
                     [self.sl.name,
781
                      [(["exclusive", "shared"][int(bool(shared))],
782
                        sorted([t.getName() for t in threads]))
783
                       for acquires in [perprio[i]
784
                                        for i in sorted(perprio.keys())]
785
                       for (shared, _, threads) in acquires]])
786

  
787
    # Let threads acquire the lock
788
    self.sl.release()
789

  
790
    # Wait for everything to finish
791
    self._waitThreads()
792

  
793
    self.assert_(self.sl._check_empty())
794

  
795
    # Check acquires by priority
796
    for acquires in [perprio[i] for i in sorted(perprio.keys())]:
797
      for (_, names, _) in acquires:
798
        # For shared acquires, the set will contain 1..n entries. For exclusive
799
        # acquires only one.
800
        while names:
801
          names.remove(self.done.get_nowait())
802
      self.assertFalse(compat.any(names for (_, names, _) in acquires))
803

  
804
    self.assertRaises(Queue.Empty, self.done.get_nowait)
805

  
704 806

  
705 807
class TestSharedLockInCondition(_ThreadedTestCase):
706 808
  """SharedLock as a condition lock tests"""
......
1259 1361
    self.assertEqual(self.done.get_nowait(), 'DONE')
1260 1362
    self._setUpLS()
1261 1363

  
1364
  def testPriority(self):
1365
    def _Acquire(prev, next, name, priority, success_fn):
1366
      prev.wait()
1367
      self.assert_(self.ls.acquire(name, shared=0,
1368
                                   priority=priority,
1369
                                   test_notify=lambda _: next.set()))
1370
      try:
1371
        success_fn()
1372
      finally:
1373
        self.ls.release()
1374

  
1375
    # Get all in exclusive mode
1376
    self.assert_(self.ls.acquire(locking.ALL_SET, shared=0))
1377

  
1378
    done_two = Queue.Queue(0)
1379

  
1380
    first = threading.Event()
1381
    prev = first
1382

  
1383
    acquires = [("one", prio, self.done) for prio in range(1, 33)]
1384
    acquires.extend([("two", prio, done_two) for prio in range(1, 33)])
1385

  
1386
    # Use a deterministic random generator
1387
    random.Random(741).shuffle(acquires)
1388

  
1389
    for (name, prio, done) in acquires:
1390
      ev = threading.Event()
1391
      self._addThread(target=_Acquire,
1392
                      args=(prev, ev, name, prio,
1393
                            compat.partial(done.put, "Prio%s" % prio)))
1394
      prev = ev
1395

  
1396
    # Start acquires
1397
    first.set()
1398

  
1399
    # Wait for last acquire to start
1400
    prev.wait()
1401

  
1402
    # Let threads acquire locks
1403
    self.ls.release()
1404

  
1405
    # Wait for threads to finish
1406
    self._waitThreads()
1407

  
1408
    for i in range(1, 33):
1409
      self.assertEqual(self.done.get_nowait(), "Prio%s" % i)
1410
      self.assertEqual(done_two.get_nowait(), "Prio%s" % i)
1411

  
1412
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1413
    self.assertRaises(Queue.Empty, done_two.get_nowait)
1414

  
1262 1415

  
1263 1416
class TestGanetiLockManager(_ThreadedTestCase):
1264 1417

  

Also available in: Unified diff