Revision 887c7aa6 lib/locking.py

b/lib/locking.py
32 32
import errno
33 33
import weakref
34 34
import logging
35
import heapq
35 36

  
36 37
from ganeti import errors
37 38
from ganeti import utils
......
41 42
_EXCLUSIVE_TEXT = "exclusive"
42 43
_SHARED_TEXT = "shared"
43 44

  
45
_DEFAULT_PRIORITY = 0
46

  
44 47

  
45 48
def ssynchronized(mylock, shared=0):
46 49
  """Shared Synchronization decorator.
......
406 409
    return bool(self._waiters)
407 410

  
408 411

  
412
class _PipeConditionWithMode(PipeCondition):
413
  __slots__ = [
414
    "shared",
415
    ]
416

  
417
  def __init__(self, lock, shared):
418
    """Initializes this class.
419

  
420
    """
421
    self.shared = shared
422
    PipeCondition.__init__(self, lock)
423

  
424

  
409 425
class SharedLock(object):
410 426
  """Implements a shared lock.
411 427

  
......
413 429
  acquire_shared().  In order to acquire the lock in an exclusive way threads
414 430
  can call acquire_exclusive().
415 431

  
416
  The lock prevents starvation but does not guarantee that threads will acquire
417
  the shared lock in the order they queued for it, just that they will
418
  eventually do so.
432
  Notes on data structures: C{__pending} contains a priority queue (heapq) of
433
  all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2),
434
  ...]}. Each per-priority queue contains a normal in-order list of conditions
435
  to be notified when the lock can be acquired. Shared locks are grouped
436
  together by priority and the condition for them is stored in
437
  C{__pending_shared} if it already exists. C{__pending_by_prio} keeps
438
  references for the per-priority queues indexed by priority for faster access.
419 439

  
420 440
  @type name: string
421 441
  @ivar name: the name of the lock
......
423 443
  """
424 444
  __slots__ = [
425 445
    "__weakref__",
426
    "__active_shr_c",
427
    "__inactive_shr_c",
428 446
    "__deleted",
429 447
    "__exc",
430 448
    "__lock",
431 449
    "__pending",
450
    "__pending_by_prio",
451
    "__pending_shared",
432 452
    "__shr",
433 453
    "name",
434 454
    ]
435 455

  
436
  __condition_class = PipeCondition
456
  __condition_class = _PipeConditionWithMode
437 457

  
438 458
  def __init__(self, name, monitor=None):
439 459
    """Construct a new SharedLock.
......
452 472

  
453 473
    # Queue containing waiting acquires
454 474
    self.__pending = []
455

  
456
    # Active and inactive conditions for shared locks
457
    self.__active_shr_c = self.__condition_class(self.__lock)
458
    self.__inactive_shr_c = self.__condition_class(self.__lock)
475
    self.__pending_by_prio = {}
476
    self.__pending_shared = {}
459 477

  
460 478
    # Current lock holders
461 479
    self.__shr = set()
......
509 527
        elif fname == "pending":
510 528
          data = []
511 529

  
512
          for cond in self.__pending:
513
            if cond in (self.__active_shr_c, self.__inactive_shr_c):
514
              mode = _SHARED_TEXT
515
            else:
516
              mode = _EXCLUSIVE_TEXT
530
          # Sorting instead of copying and using heaq functions for simplicity
531
          for (_, prioqueue) in sorted(self.__pending):
532
            for cond in prioqueue:
533
              if cond.shared:
534
                mode = _SHARED_TEXT
535
              else:
536
                mode = _EXCLUSIVE_TEXT
517 537

  
518
            # This function should be fast as it runs with the lock held. Hence
519
            # not using utils.NiceSort.
520
            data.append((mode, sorted([i.getName()
521
                                       for i in cond.get_waiting()])))
538
              # This function should be fast as it runs with the lock held.
539
              # Hence not using utils.NiceSort.
540
              data.append((mode, sorted(i.getName()
541
                                        for i in cond.get_waiting())))
522 542

  
523 543
          info.append(data)
524 544
        else:
......
584 604
    """
585 605
    self.__lock.acquire()
586 606
    try:
587
      return len(self.__pending)
607
      return sum(len(prioqueue) for (_, prioqueue) in self.__pending)
608
    finally:
609
      self.__lock.release()
610

  
611
  def _check_empty(self):
612
    """Checks whether there are any pending acquires.
613

  
614
    @rtype: bool
615

  
616
    """
617
    self.__lock.acquire()
618
    try:
619
      # Order is important: __find_first_pending_queue modifies __pending
620
      return not (self.__find_first_pending_queue() or
621
                  self.__pending or
622
                  self.__pending_by_prio or
623
                  self.__pending_shared)
588 624
    finally:
589 625
      self.__lock.release()
590 626

  
......
606 642
    else:
607 643
      return len(self.__shr) == 0 and self.__exc is None
608 644

  
645
  def __find_first_pending_queue(self):
646
    """Tries to find the topmost queued entry with pending acquires.
647

  
648
    Removes empty entries while going through the list.
649

  
650
    """
651
    while self.__pending:
652
      (priority, prioqueue) = self.__pending[0]
653

  
654
      if not prioqueue:
655
        heapq.heappop(self.__pending)
656
        del self.__pending_by_prio[priority]
657
        assert priority not in self.__pending_shared
658
        continue
659

  
660
      if prioqueue:
661
        return prioqueue
662

  
663
    return None
664

  
609 665
  def __is_on_top(self, cond):
610 666
    """Checks whether the passed condition is on top of the queue.
611 667

  
612 668
    The caller must make sure the queue isn't empty.
613 669

  
614 670
    """
615
    return self.__pending[0] == cond
671
    return cond == self.__find_first_pending_queue()[0]
616 672

  
617
  def __acquire_unlocked(self, shared, timeout):
673
  def __acquire_unlocked(self, shared, timeout, priority):
618 674
    """Acquire a shared lock.
619 675

  
620 676
    @param shared: whether to acquire in shared mode; by default an
621 677
        exclusive lock will be acquired
622 678
    @param timeout: maximum waiting time before giving up
679
    @type priority: integer
680
    @param priority: Priority for acquiring lock
623 681

  
624 682
    """
625 683
    self.__check_deleted()
......
628 686
    assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
629 687
                                   " %s" % self.name)
630 688

  
689
    # Remove empty entries from queue
690
    self.__find_first_pending_queue()
691

  
631 692
    # Check whether someone else holds the lock or there are pending acquires.
632 693
    if not self.__pending and self.__can_acquire(shared):
633 694
      # Apparently not, can acquire lock directly.
634 695
      self.__do_acquire(shared)
635 696
      return True
636 697

  
637
    if shared:
638
      wait_condition = self.__active_shr_c
698
    prioqueue = self.__pending_by_prio.get(priority, None)
639 699

  
640
      # Check if we're not yet in the queue
641
      if wait_condition not in self.__pending:
642
        self.__pending.append(wait_condition)
700
    if shared:
701
      # Try to re-use condition for shared acquire
702
      wait_condition = self.__pending_shared.get(priority, None)
703
      assert (wait_condition is None or
704
              (wait_condition.shared and wait_condition in prioqueue))
643 705
    else:
644
      wait_condition = self.__condition_class(self.__lock)
645
      # Always add to queue
646
      self.__pending.append(wait_condition)
706
      wait_condition = None
707

  
708
    if wait_condition is None:
709
      if prioqueue is None:
710
        assert priority not in self.__pending_by_prio
711

  
712
        prioqueue = []
713
        heapq.heappush(self.__pending, (priority, prioqueue))
714
        self.__pending_by_prio[priority] = prioqueue
715

  
716
      wait_condition = self.__condition_class(self.__lock, shared)
717
      prioqueue.append(wait_condition)
718

  
719
      if shared:
720
        # Keep reference for further shared acquires on same priority. This is
721
        # better than trying to find it in the list of pending acquires.
722
        assert priority not in self.__pending_shared
723
        self.__pending_shared[priority] = wait_condition
647 724

  
648 725
    try:
649 726
      # Wait until we become the topmost acquire in the queue or the timeout
650 727
      # expires.
728
      # TODO: Decrease timeout with spurious notifications
651 729
      while not (self.__is_on_top(wait_condition) and
652 730
                 self.__can_acquire(shared)):
653 731
        # Wait for notification
......
664 742
        return True
665 743
    finally:
666 744
      # Remove condition from queue if there are no more waiters
667
      if not wait_condition.has_waiting() and not self.__deleted:
668
        self.__pending.remove(wait_condition)
745
      if not wait_condition.has_waiting():
746
        prioqueue.remove(wait_condition)
747
        if wait_condition.shared:
748
          del self.__pending_shared[priority]
669 749

  
670 750
    return False
671 751

  
672
  def acquire(self, shared=0, timeout=None, test_notify=None):
752
  def acquire(self, shared=0, timeout=None, priority=_DEFAULT_PRIORITY,
753
              test_notify=None):
673 754
    """Acquire a shared lock.
674 755

  
675 756
    @type shared: integer (0/1) used as a boolean
......
677 758
        exclusive lock will be acquired
678 759
    @type timeout: float
679 760
    @param timeout: maximum waiting time before giving up
761
    @type priority: integer
762
    @param priority: Priority for acquiring lock
680 763
    @type test_notify: callable or None
681 764
    @param test_notify: Special callback function for unittesting
682 765

  
......
687 770
      if __debug__ and callable(test_notify):
688 771
        test_notify()
689 772

  
690
      return self.__acquire_unlocked(shared, timeout)
773
      return self.__acquire_unlocked(shared, timeout, priority)
691 774
    finally:
692 775
      self.__lock.release()
693 776

  
......
710 793
        self.__shr.remove(threading.currentThread())
711 794

  
712 795
      # Notify topmost condition in queue
713
      if self.__pending:
714
        first_condition = self.__pending[0]
715
        first_condition.notifyAll()
716

  
717
        if first_condition == self.__active_shr_c:
718
          self.__active_shr_c = self.__inactive_shr_c
719
          self.__inactive_shr_c = first_condition
796
      prioqueue = self.__find_first_pending_queue()
797
      if prioqueue:
798
        prioqueue[0].notifyAll()
720 799

  
721 800
    finally:
722 801
      self.__lock.release()
723 802

  
724
  def delete(self, timeout=None):
803
  def delete(self, timeout=None, priority=_DEFAULT_PRIORITY):
725 804
    """Delete a Shared Lock.
726 805

  
727 806
    This operation will declare the lock for removal. First the lock will be
......
730 809

  
731 810
    @type timeout: float
732 811
    @param timeout: maximum waiting time before giving up
812
    @type priority: integer
813
    @param priority: Priority for acquiring lock
733 814

  
734 815
    """
735 816
    self.__lock.acquire()
......
742 823
      acquired = self.__is_exclusive()
743 824

  
744 825
      if not acquired:
745
        acquired = self.__acquire_unlocked(0, timeout)
826
        acquired = self.__acquire_unlocked(0, timeout, priority)
746 827

  
747 828
        assert self.__is_exclusive() and not self.__is_sharer(), \
748 829
          "Lock wasn't acquired in exclusive mode"
......
754 835
        assert not (self.__exc or self.__shr), "Found owner during deletion"
755 836

  
756 837
        # Notify all acquires. They'll throw an error.
757
        while self.__pending:
758
          self.__pending.pop().notifyAll()
838
        for (_, prioqueue) in self.__pending:
839
          for cond in prioqueue:
840
            cond.notifyAll()
841

  
842
        assert self.__deleted
759 843

  
760 844
      return acquired
761 845
    finally:
......
908 992
        self.__lock.release()
909 993
    return set(result)
910 994

  
911
  def acquire(self, names, timeout=None, shared=0, test_notify=None):
995
  def acquire(self, names, timeout=None, shared=0, priority=_DEFAULT_PRIORITY,
996
              test_notify=None):
912 997
    """Acquire a set of resource locks.
913 998

  
914 999
    @type names: list of strings (or string)
......
919 1004
        exclusive lock will be acquired
920 1005
    @type timeout: float or None
921 1006
    @param timeout: Maximum time to acquire all locks
1007
    @type priority: integer
1008
    @param priority: Priority for acquiring locks
922 1009
    @type test_notify: callable or None
923 1010
    @param test_notify: Special callback function for unittesting
924 1011

  
......
945 1032
        if isinstance(names, basestring):
946 1033
          names = [names]
947 1034

  
948
        return self.__acquire_inner(names, False, shared,
1035
        return self.__acquire_inner(names, False, shared, priority,
949 1036
                                    running_timeout.Remaining, test_notify)
950 1037

  
951 1038
      else:
......
954 1041
        # Some of them may then be deleted later, but we'll cope with this.
955 1042
        #
956 1043
        # We'd like to acquire this lock in a shared way, as it's nice if
957
        # everybody else can use the instances at the same time. If are
1044
        # everybody else can use the instances at the same time. If we are
958 1045
        # acquiring them exclusively though they won't be able to do this
959 1046
        # anyway, though, so we'll get the list lock exclusively as well in
960 1047
        # order to be able to do add() on the set while owning it.
961
        if not self.__lock.acquire(shared=shared,
1048
        if not self.__lock.acquire(shared=shared, priority=priority,
962 1049
                                   timeout=running_timeout.Remaining()):
963 1050
          raise _AcquireTimeout()
964 1051
        try:
965 1052
          # note we own the set-lock
966 1053
          self._add_owned()
967 1054

  
968
          return self.__acquire_inner(self.__names(), True, shared,
1055
          return self.__acquire_inner(self.__names(), True, shared, priority,
969 1056
                                      running_timeout.Remaining, test_notify)
970 1057
        except:
971 1058
          # We shouldn't have problems adding the lock to the owners list, but
......
978 1065
    except _AcquireTimeout:
979 1066
      return None
980 1067

  
981
  def __acquire_inner(self, names, want_all, shared, timeout_fn, test_notify):
1068
  def __acquire_inner(self, names, want_all, shared, priority,
1069
                      timeout_fn, test_notify):
982 1070
    """Inner logic for acquiring a number of locks.
983 1071

  
984 1072
    @param names: Names of the locks to be acquired
985 1073
    @param want_all: Whether all locks in the set should be acquired
986 1074
    @param shared: Whether to acquire in shared mode
987 1075
    @param timeout_fn: Function returning remaining timeout
1076
    @param priority: Priority for acquiring locks
988 1077
    @param test_notify: Special callback function for unittesting
989 1078

  
990 1079
    """
......
1028 1117
        try:
1029 1118
          # raises LockError if the lock was deleted
1030 1119
          acq_success = lock.acquire(shared=shared, timeout=timeout,
1120
                                     priority=priority,
1031 1121
                                     test_notify=test_notify_fn)
1032 1122
        except errors.LockError:
1033 1123
          if want_all:
......
1146 1236
        lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor)
1147 1237

  
1148 1238
        if acquired:
1239
          # No need for priority or timeout here as this lock has just been
1240
          # created
1149 1241
          lock.acquire(shared=shared)
1150 1242
          # now the lock cannot be deleted, we have it!
1151 1243
          try:

Also available in: Unified diff