Revision 008b92fa test/ganeti.locking_unittest.py

b/test/ganeti.locking_unittest.py
450 450

  
451 451
  @_Repeat
452 452
  def testExclusiveAcquireTimeout(self):
453
    def _LockExclusive(wait):
454
      self.sl.acquire(shared=0)
455
      self.done.put("A: start sleep")
456
      time.sleep(wait)
457
      self.done.put("A: end sleep")
458
      self.sl.release()
459

  
460 453
    for shared in [0, 1]:
461
      # Start thread to hold lock for 20 ms
462
      self._addThread(target=_LockExclusive, args=(20.0 / 1000.0, ))
454
      on_queue = threading.Event()
455
      release_exclusive = threading.Event()
456

  
457
      def _LockExclusive():
458
        self.sl.acquire(shared=0, test_notify=on_queue.set)
459
        self.done.put("A: start wait")
460
        release_exclusive.wait()
461
        self.done.put("A: end wait")
462
        self.sl.release()
463

  
464
      # Start thread to hold lock in exclusive mode
465
      self._addThread(target=_LockExclusive)
463 466

  
464
      # Wait for sleep to begin
465
      self.assertEqual(self.done.get(), "A: start sleep")
467
      # Wait for wait to begin
468
      self.assertEqual(self.done.get(timeout=60), "A: start wait")
469

  
470
      # Wait up to 60s to get lock, but release exclusive lock as soon as we're
471
      # on the queue
472
      self.failUnless(self.sl.acquire(shared=shared, timeout=60,
473
                                      test_notify=release_exclusive.set))
466 474

  
467
      # Wait up to 100 ms to get lock
468
      self.failUnless(self.sl.acquire(shared=shared, timeout=0.1))
469 475
      self.done.put("got 2nd")
470 476
      self.sl.release()
471 477

  
472 478
      self._waitThreads()
473 479

  
474
      self.assertEqual(self.done.get_nowait(), "A: end sleep")
480
      self.assertEqual(self.done.get_nowait(), "A: end wait")
475 481
      self.assertEqual(self.done.get_nowait(), "got 2nd")
476 482
      self.assertRaises(Queue.Empty, self.done.get_nowait)
477 483

  
......
507 513
    # Tests whether shared acquires jump in front of exclusive acquires in the
508 514
    # queue.
509 515

  
510
    # Get exclusive lock while we fill the queue
511
    self.sl.acquire()
516
    def _Acquire(shared, name, notify_ev, wait_ev):
517
      if notify_ev:
518
        notify_fn = notify_ev.set
519
      else:
520
        notify_fn = None
512 521

  
513
    def _Acquire(shared, name):
514
      if not self.sl.acquire(shared=shared):
522
      if wait_ev:
523
        wait_ev.wait()
524

  
525
      if not self.sl.acquire(shared=shared, test_notify=notify_fn):
515 526
        return
516 527

  
517 528
      self.done.put(name)
518 529
      self.sl.release()
519 530

  
520
    # Start shared acquires
521
    for _ in xrange(5):
522
      self._addThread(target=_Acquire, args=(1, "shared A"))
531
    # Get exclusive lock while we fill the queue
532
    self.sl.acquire()
523 533

  
524
    # Start exclusive acquires
525
    for _ in xrange(3):
526
      self._addThread(target=_Acquire, args=(0, "exclusive B"))
534
    shrcnt1 = 5
535
    shrcnt2 = 7
536
    shrcnt3 = 9
537
    shrcnt4 = 2
527 538

  
528
    # More shared acquires
529
    for _ in xrange(5):
530
      self._addThread(target=_Acquire, args=(1, "shared C"))
539
    # Add acquires using threading.Event for synchronization. They'll be
540
    # acquired exactly in the order defined in this list.
541
    acquires = (shrcnt1 * [(1, "shared 1")] +
542
                3 * [(0, "exclusive 1")] +
543
                shrcnt2 * [(1, "shared 2")] +
544
                shrcnt3 * [(1, "shared 3")] +
545
                shrcnt4 * [(1, "shared 4")] +
546
                3 * [(0, "exclusive 2")])
531 547

  
532
    # More exclusive acquires
533
    for _ in xrange(3):
534
      self._addThread(target=_Acquire, args=(0, "exclusive D"))
548
    ev_cur = None
549
    ev_prev = None
550

  
551
    for args in acquires:
552
      ev_cur = threading.Event()
553
      self._addThread(target=_Acquire, args=args + (ev_cur, ev_prev))
554
      ev_prev = ev_cur
555

  
556
    # Wait for last acquire to start
557
    ev_prev.wait()
535 558

  
536 559
    # Expect 6 pending exclusive acquires and 1 for all shared acquires
537
    # together. There's no way to wait for SharedLock.acquire to start
538
    # its work. Hence the timeout of 2 seconds.
539
    pending = 0
540
    end_time = time.time() + 2.0
541
    while time.time() < end_time:
542
      pending = self.sl._count_pending()
543
      self.assert_(pending >= 0 and pending <= 7)
544
      if pending == 7:
545
        break
546
      time.sleep(0.05)
547
    self.assertEqual(pending, 7)
560
    # together
561
    self.assertEqual(self.sl._count_pending(), 7)
548 562

  
549 563
    # Release exclusive lock and wait
550 564
    self.sl.release()
......
552 566
    self._waitThreads()
553 567

  
554 568
    # Check sequence
555
    shr_a = 0
556
    shr_c = 0
557
    for _ in xrange(10):
569
    for _ in range(shrcnt1 + shrcnt2 + shrcnt3 + shrcnt4):
558 570
      # Shared locks aren't guaranteed to be notified in order, but they'll be
559 571
      # first
560 572
      tmp = self.done.get_nowait()
561
      if tmp == "shared A":
562
        shr_a += 1
563
      elif tmp == "shared C":
564
        shr_c += 1
565
    self.assertEqual(shr_a, 5)
566
    self.assertEqual(shr_c, 5)
573
      if tmp == "shared 1":
574
        shrcnt1 -= 1
575
      elif tmp == "shared 2":
576
        shrcnt2 -= 1
577
      elif tmp == "shared 3":
578
        shrcnt3 -= 1
579
      elif tmp == "shared 4":
580
        shrcnt4 -= 1
581
    self.assertEqual(shrcnt1, 0)
582
    self.assertEqual(shrcnt2, 0)
583
    self.assertEqual(shrcnt3, 0)
584
    self.assertEqual(shrcnt3, 0)
567 585

  
568 586
    for _ in xrange(3):
569
      self.assertEqual(self.done.get_nowait(), "exclusive B")
587
      self.assertEqual(self.done.get_nowait(), "exclusive 1")
570 588

  
571 589
    for _ in xrange(3):
572
      self.assertEqual(self.done.get_nowait(), "exclusive D")
590
      self.assertEqual(self.done.get_nowait(), "exclusive 2")
573 591

  
574 592
    self.assertRaises(Queue.Empty, self.done.get_nowait)
575 593

  

Also available in: Unified diff