Revision 887c7aa6

b/doc/design-2.3.rst
205 205
a restart or crash of the master daemon.
206 206

  
207 207
Priorities also need to be considered inside the locking library to
208
ensure opcodes with higher priorities get locks first, but the design
209
changes for this will be discussed in a separate section.
208
ensure opcodes with higher priorities get locks first. See
209
:ref:`locking priorities <locking-priorities>` for more details.
210 210

  
211 211
Worker pool
212 212
+++++++++++
......
243 243
With these changes, the job queue will be able to implement per-job
244 244
priorities.
245 245

  
246
.. _locking-priorities:
247

  
248
Locking
249
+++++++
250

  
251
In order to support priorities in Ganeti's own lock classes,
252
``locking.SharedLock`` and ``locking.LockSet``, the internal structure
253
of the former class needs to be changed. The last major change in this
254
area was done for Ganeti 2.1 and can be found in the respective
255
:doc:`design document <design-2.1>`.
256

  
257
The plain list (``[]``) used as a queue is replaced by a heap queue,
258
similar to the `worker pool`_. The heap or priority queue does automatic
259
sorting, thereby automatically taking care of priorities. For each
260
priority there's a plain list with pending acquires, like the single
261
queue of pending acquires before this change.
262

  
263
When the lock is released, the code locates the list of pending acquires
264
for the highest priority waiting. The first condition (index 0) is
265
notified. Once all waiting threads received the notification, the
266
condition is removed from the list. If the list of conditions is empty
267
it's removed from the heap queue.
268

  
269
Like before, shared acquires are grouped and skip ahead of exclusive
270
acquires if there's already an existing shared acquire for a priority.
271
To accomplish this, a separate dictionary of shared acquires per
272
priority is maintained.
273

  
274
To simplify the code and reduce memory consumption, the concept of the
275
"active" and "inactive" condition for shared acquires is abolished. The
276
lock can't predict what priorities the next acquires will use and even
277
keeping a cache can become computationally expensive for arguable
278
benefit (the underlying POSIX pipe, see ``pipe(2)``, needs to be
279
re-created for each notification anyway).
280

  
281
The following diagram shows a possible state of the internal queue from
282
a high-level view. Conditions are shown as (waiting) threads. Assuming
283
no modifications are made to the queue (e.g. more acquires or timeouts),
284
the lock would be acquired by the threads in this order (concurrent
285
acquires in parentheses): ``threadE1``, ``threadE2``, (``threadS1``,
286
``threadS2``, ``threadS3``), (``threadS4``, ``threadS5``), ``threadE3``,
287
``threadS6``, ``threadE4``, ``threadE5``.
288

  
289
::
290

  
291
  [
292
    (0, [exc/threadE1, exc/threadE2, shr/threadS1/threadS2/threadS3]),
293
    (2, [shr/threadS4/threadS5]),
294
    (10, [exc/threadE3]),
295
    (33, [shr/threadS6, exc/threadE4, exc/threadE5]),
296
  ]
297

  
298

  
246 299
IPv6 support
247 300
------------
248 301

  
b/lib/locking.py
32 32
import errno
33 33
import weakref
34 34
import logging
35
import heapq
35 36

  
36 37
from ganeti import errors
37 38
from ganeti import utils
......
41 42
_EXCLUSIVE_TEXT = "exclusive"
42 43
_SHARED_TEXT = "shared"
43 44

  
45
_DEFAULT_PRIORITY = 0
46

  
44 47

  
45 48
def ssynchronized(mylock, shared=0):
46 49
  """Shared Synchronization decorator.
......
406 409
    return bool(self._waiters)
407 410

  
408 411

  
412
class _PipeConditionWithMode(PipeCondition):
413
  __slots__ = [
414
    "shared",
415
    ]
416

  
417
  def __init__(self, lock, shared):
418
    """Initializes this class.
419

  
420
    """
421
    self.shared = shared
422
    PipeCondition.__init__(self, lock)
423

  
424

  
409 425
class SharedLock(object):
410 426
  """Implements a shared lock.
411 427

  
......
413 429
  acquire_shared().  In order to acquire the lock in an exclusive way threads
414 430
  can call acquire_exclusive().
415 431

  
416
  The lock prevents starvation but does not guarantee that threads will acquire
417
  the shared lock in the order they queued for it, just that they will
418
  eventually do so.
432
  Notes on data structures: C{__pending} contains a priority queue (heapq) of
433
  all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2),
434
  ...]}. Each per-priority queue contains a normal in-order list of conditions
435
  to be notified when the lock can be acquired. Shared locks are grouped
436
  together by priority and the condition for them is stored in
437
  C{__pending_shared} if it already exists. C{__pending_by_prio} keeps
438
  references for the per-priority queues indexed by priority for faster access.
419 439

  
420 440
  @type name: string
421 441
  @ivar name: the name of the lock
......
423 443
  """
424 444
  __slots__ = [
425 445
    "__weakref__",
426
    "__active_shr_c",
427
    "__inactive_shr_c",
428 446
    "__deleted",
429 447
    "__exc",
430 448
    "__lock",
431 449
    "__pending",
450
    "__pending_by_prio",
451
    "__pending_shared",
432 452
    "__shr",
433 453
    "name",
434 454
    ]
435 455

  
436
  __condition_class = PipeCondition
456
  __condition_class = _PipeConditionWithMode
437 457

  
438 458
  def __init__(self, name, monitor=None):
439 459
    """Construct a new SharedLock.
......
452 472

  
453 473
    # Queue containing waiting acquires
454 474
    self.__pending = []
455

  
456
    # Active and inactive conditions for shared locks
457
    self.__active_shr_c = self.__condition_class(self.__lock)
458
    self.__inactive_shr_c = self.__condition_class(self.__lock)
475
    self.__pending_by_prio = {}
476
    self.__pending_shared = {}
459 477

  
460 478
    # Current lock holders
461 479
    self.__shr = set()
......
509 527
        elif fname == "pending":
510 528
          data = []
511 529

  
512
          for cond in self.__pending:
513
            if cond in (self.__active_shr_c, self.__inactive_shr_c):
514
              mode = _SHARED_TEXT
515
            else:
516
              mode = _EXCLUSIVE_TEXT
530
          # Sorting instead of copying and using heaq functions for simplicity
531
          for (_, prioqueue) in sorted(self.__pending):
532
            for cond in prioqueue:
533
              if cond.shared:
534
                mode = _SHARED_TEXT
535
              else:
536
                mode = _EXCLUSIVE_TEXT
517 537

  
518
            # This function should be fast as it runs with the lock held. Hence
519
            # not using utils.NiceSort.
520
            data.append((mode, sorted([i.getName()
521
                                       for i in cond.get_waiting()])))
538
              # This function should be fast as it runs with the lock held.
539
              # Hence not using utils.NiceSort.
540
              data.append((mode, sorted(i.getName()
541
                                        for i in cond.get_waiting())))
522 542

  
523 543
          info.append(data)
524 544
        else:
......
584 604
    """
585 605
    self.__lock.acquire()
586 606
    try:
587
      return len(self.__pending)
607
      return sum(len(prioqueue) for (_, prioqueue) in self.__pending)
608
    finally:
609
      self.__lock.release()
610

  
611
  def _check_empty(self):
612
    """Checks whether there are any pending acquires.
613

  
614
    @rtype: bool
615

  
616
    """
617
    self.__lock.acquire()
618
    try:
619
      # Order is important: __find_first_pending_queue modifies __pending
620
      return not (self.__find_first_pending_queue() or
621
                  self.__pending or
622
                  self.__pending_by_prio or
623
                  self.__pending_shared)
588 624
    finally:
589 625
      self.__lock.release()
590 626

  
......
606 642
    else:
607 643
      return len(self.__shr) == 0 and self.__exc is None
608 644

  
645
  def __find_first_pending_queue(self):
646
    """Tries to find the topmost queued entry with pending acquires.
647

  
648
    Removes empty entries while going through the list.
649

  
650
    """
651
    while self.__pending:
652
      (priority, prioqueue) = self.__pending[0]
653

  
654
      if not prioqueue:
655
        heapq.heappop(self.__pending)
656
        del self.__pending_by_prio[priority]
657
        assert priority not in self.__pending_shared
658
        continue
659

  
660
      if prioqueue:
661
        return prioqueue
662

  
663
    return None
664

  
609 665
  def __is_on_top(self, cond):
610 666
    """Checks whether the passed condition is on top of the queue.
611 667

  
612 668
    The caller must make sure the queue isn't empty.
613 669

  
614 670
    """
615
    return self.__pending[0] == cond
671
    return cond == self.__find_first_pending_queue()[0]
616 672

  
617
  def __acquire_unlocked(self, shared, timeout):
673
  def __acquire_unlocked(self, shared, timeout, priority):
618 674
    """Acquire a shared lock.
619 675

  
620 676
    @param shared: whether to acquire in shared mode; by default an
621 677
        exclusive lock will be acquired
622 678
    @param timeout: maximum waiting time before giving up
679
    @type priority: integer
680
    @param priority: Priority for acquiring lock
623 681

  
624 682
    """
625 683
    self.__check_deleted()
......
628 686
    assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
629 687
                                   " %s" % self.name)
630 688

  
689
    # Remove empty entries from queue
690
    self.__find_first_pending_queue()
691

  
631 692
    # Check whether someone else holds the lock or there are pending acquires.
632 693
    if not self.__pending and self.__can_acquire(shared):
633 694
      # Apparently not, can acquire lock directly.
634 695
      self.__do_acquire(shared)
635 696
      return True
636 697

  
637
    if shared:
638
      wait_condition = self.__active_shr_c
698
    prioqueue = self.__pending_by_prio.get(priority, None)
639 699

  
640
      # Check if we're not yet in the queue
641
      if wait_condition not in self.__pending:
642
        self.__pending.append(wait_condition)
700
    if shared:
701
      # Try to re-use condition for shared acquire
702
      wait_condition = self.__pending_shared.get(priority, None)
703
      assert (wait_condition is None or
704
              (wait_condition.shared and wait_condition in prioqueue))
643 705
    else:
644
      wait_condition = self.__condition_class(self.__lock)
645
      # Always add to queue
646
      self.__pending.append(wait_condition)
706
      wait_condition = None
707

  
708
    if wait_condition is None:
709
      if prioqueue is None:
710
        assert priority not in self.__pending_by_prio
711

  
712
        prioqueue = []
713
        heapq.heappush(self.__pending, (priority, prioqueue))
714
        self.__pending_by_prio[priority] = prioqueue
715

  
716
      wait_condition = self.__condition_class(self.__lock, shared)
717
      prioqueue.append(wait_condition)
718

  
719
      if shared:
720
        # Keep reference for further shared acquires on same priority. This is
721
        # better than trying to find it in the list of pending acquires.
722
        assert priority not in self.__pending_shared
723
        self.__pending_shared[priority] = wait_condition
647 724

  
648 725
    try:
649 726
      # Wait until we become the topmost acquire in the queue or the timeout
650 727
      # expires.
728
      # TODO: Decrease timeout with spurious notifications
651 729
      while not (self.__is_on_top(wait_condition) and
652 730
                 self.__can_acquire(shared)):
653 731
        # Wait for notification
......
664 742
        return True
665 743
    finally:
666 744
      # Remove condition from queue if there are no more waiters
667
      if not wait_condition.has_waiting() and not self.__deleted:
668
        self.__pending.remove(wait_condition)
745
      if not wait_condition.has_waiting():
746
        prioqueue.remove(wait_condition)
747
        if wait_condition.shared:
748
          del self.__pending_shared[priority]
669 749

  
670 750
    return False
671 751

  
672
  def acquire(self, shared=0, timeout=None, test_notify=None):
752
  def acquire(self, shared=0, timeout=None, priority=_DEFAULT_PRIORITY,
753
              test_notify=None):
673 754
    """Acquire a shared lock.
674 755

  
675 756
    @type shared: integer (0/1) used as a boolean
......
677 758
        exclusive lock will be acquired
678 759
    @type timeout: float
679 760
    @param timeout: maximum waiting time before giving up
761
    @type priority: integer
762
    @param priority: Priority for acquiring lock
680 763
    @type test_notify: callable or None
681 764
    @param test_notify: Special callback function for unittesting
682 765

  
......
687 770
      if __debug__ and callable(test_notify):
688 771
        test_notify()
689 772

  
690
      return self.__acquire_unlocked(shared, timeout)
773
      return self.__acquire_unlocked(shared, timeout, priority)
691 774
    finally:
692 775
      self.__lock.release()
693 776

  
......
710 793
        self.__shr.remove(threading.currentThread())
711 794

  
712 795
      # Notify topmost condition in queue
713
      if self.__pending:
714
        first_condition = self.__pending[0]
715
        first_condition.notifyAll()
716

  
717
        if first_condition == self.__active_shr_c:
718
          self.__active_shr_c = self.__inactive_shr_c
719
          self.__inactive_shr_c = first_condition
796
      prioqueue = self.__find_first_pending_queue()
797
      if prioqueue:
798
        prioqueue[0].notifyAll()
720 799

  
721 800
    finally:
722 801
      self.__lock.release()
723 802

  
724
  def delete(self, timeout=None):
803
  def delete(self, timeout=None, priority=_DEFAULT_PRIORITY):
725 804
    """Delete a Shared Lock.
726 805

  
727 806
    This operation will declare the lock for removal. First the lock will be
......
730 809

  
731 810
    @type timeout: float
732 811
    @param timeout: maximum waiting time before giving up
812
    @type priority: integer
813
    @param priority: Priority for acquiring lock
733 814

  
734 815
    """
735 816
    self.__lock.acquire()
......
742 823
      acquired = self.__is_exclusive()
743 824

  
744 825
      if not acquired:
745
        acquired = self.__acquire_unlocked(0, timeout)
826
        acquired = self.__acquire_unlocked(0, timeout, priority)
746 827

  
747 828
        assert self.__is_exclusive() and not self.__is_sharer(), \
748 829
          "Lock wasn't acquired in exclusive mode"
......
754 835
        assert not (self.__exc or self.__shr), "Found owner during deletion"
755 836

  
756 837
        # Notify all acquires. They'll throw an error.
757
        while self.__pending:
758
          self.__pending.pop().notifyAll()
838
        for (_, prioqueue) in self.__pending:
839
          for cond in prioqueue:
840
            cond.notifyAll()
841

  
842
        assert self.__deleted
759 843

  
760 844
      return acquired
761 845
    finally:
......
908 992
        self.__lock.release()
909 993
    return set(result)
910 994

  
911
  def acquire(self, names, timeout=None, shared=0, test_notify=None):
995
  def acquire(self, names, timeout=None, shared=0, priority=_DEFAULT_PRIORITY,
996
              test_notify=None):
912 997
    """Acquire a set of resource locks.
913 998

  
914 999
    @type names: list of strings (or string)
......
919 1004
        exclusive lock will be acquired
920 1005
    @type timeout: float or None
921 1006
    @param timeout: Maximum time to acquire all locks
1007
    @type priority: integer
1008
    @param priority: Priority for acquiring locks
922 1009
    @type test_notify: callable or None
923 1010
    @param test_notify: Special callback function for unittesting
924 1011

  
......
945 1032
        if isinstance(names, basestring):
946 1033
          names = [names]
947 1034

  
948
        return self.__acquire_inner(names, False, shared,
1035
        return self.__acquire_inner(names, False, shared, priority,
949 1036
                                    running_timeout.Remaining, test_notify)
950 1037

  
951 1038
      else:
......
954 1041
        # Some of them may then be deleted later, but we'll cope with this.
955 1042
        #
956 1043
        # We'd like to acquire this lock in a shared way, as it's nice if
957
        # everybody else can use the instances at the same time. If are
1044
        # everybody else can use the instances at the same time. If we are
958 1045
        # acquiring them exclusively though they won't be able to do this
959 1046
        # anyway, though, so we'll get the list lock exclusively as well in
960 1047
        # order to be able to do add() on the set while owning it.
961
        if not self.__lock.acquire(shared=shared,
1048
        if not self.__lock.acquire(shared=shared, priority=priority,
962 1049
                                   timeout=running_timeout.Remaining()):
963 1050
          raise _AcquireTimeout()
964 1051
        try:
965 1052
          # note we own the set-lock
966 1053
          self._add_owned()
967 1054

  
968
          return self.__acquire_inner(self.__names(), True, shared,
1055
          return self.__acquire_inner(self.__names(), True, shared, priority,
969 1056
                                      running_timeout.Remaining, test_notify)
970 1057
        except:
971 1058
          # We shouldn't have problems adding the lock to the owners list, but
......
978 1065
    except _AcquireTimeout:
979 1066
      return None
980 1067

  
981
  def __acquire_inner(self, names, want_all, shared, timeout_fn, test_notify):
1068
  def __acquire_inner(self, names, want_all, shared, priority,
1069
                      timeout_fn, test_notify):
982 1070
    """Inner logic for acquiring a number of locks.
983 1071

  
984 1072
    @param names: Names of the locks to be acquired
985 1073
    @param want_all: Whether all locks in the set should be acquired
986 1074
    @param shared: Whether to acquire in shared mode
987 1075
    @param timeout_fn: Function returning remaining timeout
1076
    @param priority: Priority for acquiring locks
988 1077
    @param test_notify: Special callback function for unittesting
989 1078

  
990 1079
    """
......
1028 1117
        try:
1029 1118
          # raises LockError if the lock was deleted
1030 1119
          acq_success = lock.acquire(shared=shared, timeout=timeout,
1120
                                     priority=priority,
1031 1121
                                     test_notify=test_notify_fn)
1032 1122
        except errors.LockError:
1033 1123
          if want_all:
......
1146 1236
        lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor)
1147 1237

  
1148 1238
        if acquired:
1239
          # No need for priority or timeout here as this lock has just been
1240
          # created
1149 1241
          lock.acquire(shared=shared)
1150 1242
          # now the lock cannot be deleted, we have it!
1151 1243
          try:
b/test/ganeti.locking_unittest.py
28 28
import Queue
29 29
import threading
30 30
import random
31
import itertools
31 32

  
32 33
from ganeti import locking
33 34
from ganeti import errors
34 35
from ganeti import utils
36
from ganeti import compat
35 37

  
36 38
import testutils
37 39

  
......
701 703

  
702 704
    self.assertRaises(Queue.Empty, self.done.get_nowait)
703 705

  
706
  def testPriority(self):
707
    # Acquire in exclusive mode
708
    self.assert_(self.sl.acquire(shared=0))
709

  
710
    # Queue acquires
711
    def _Acquire(prev, next, shared, priority, result):
712
      prev.wait()
713
      self.sl.acquire(shared=shared, priority=priority, test_notify=next.set)
714
      try:
715
        self.done.put(result)
716
      finally:
717
        self.sl.release()
718

  
719
    counter = itertools.count(0)
720
    priorities = range(-20, 30)
721
    first = threading.Event()
722
    prev = first
723

  
724
    # Data structure:
725
    # {
726
    #   priority:
727
    #     [(shared/exclusive, set(acquire names), set(pending threads)),
728
    #      (shared/exclusive, ...),
729
    #      ...,
730
    #     ],
731
    # }
732
    perprio = {}
733

  
734
    # References shared acquire per priority in L{perprio}. Data structure:
735
    # {
736
    #   priority: (shared=1, set(acquire names), set(pending threads)),
737
    # }
738
    prioshared = {}
739

  
740
    for seed in [4979, 9523, 14902, 32440]:
741
      # Use a deterministic random generator
742
      rnd = random.Random(seed)
743
      for priority in [rnd.choice(priorities) for _ in range(30)]:
744
        modes = [0, 1]
745
        rnd.shuffle(modes)
746
        for shared in modes:
747
          # Unique name
748
          acqname = "%s/shr=%s/prio=%s" % (counter.next(), shared, priority)
749

  
750
          ev = threading.Event()
751
          thread = self._addThread(target=_Acquire,
752
                                   args=(prev, ev, shared, priority, acqname))
753
          prev = ev
754

  
755
          # Record expected aqcuire, see above for structure
756
          data = (shared, set([acqname]), set([thread]))
757
          priolist = perprio.setdefault(priority, [])
758
          if shared:
759
            priosh = prioshared.get(priority, None)
760
            if priosh:
761
              # Shared acquires are merged
762
              for i, j in zip(priosh[1:], data[1:]):
763
                i.update(j)
764
              assert data[0] == priosh[0]
765
            else:
766
              prioshared[priority] = data
767
              priolist.append(data)
768
          else:
769
            priolist.append(data)
770

  
771
    # Start all acquires and wait for them
772
    first.set()
773
    prev.wait()
774

  
775
    # Check lock information
776
    self.assertEqual(self.sl.GetInfo(["name"]), [self.sl.name])
777
    self.assertEqual(self.sl.GetInfo(["mode", "owner"]),
778
                     ["exclusive", [threading.currentThread().getName()]])
779
    self.assertEqual(self.sl.GetInfo(["name", "pending"]),
780
                     [self.sl.name,
781
                      [(["exclusive", "shared"][int(bool(shared))],
782
                        sorted([t.getName() for t in threads]))
783
                       for acquires in [perprio[i]
784
                                        for i in sorted(perprio.keys())]
785
                       for (shared, _, threads) in acquires]])
786

  
787
    # Let threads acquire the lock
788
    self.sl.release()
789

  
790
    # Wait for everything to finish
791
    self._waitThreads()
792

  
793
    self.assert_(self.sl._check_empty())
794

  
795
    # Check acquires by priority
796
    for acquires in [perprio[i] for i in sorted(perprio.keys())]:
797
      for (_, names, _) in acquires:
798
        # For shared acquires, the set will contain 1..n entries. For exclusive
799
        # acquires only one.
800
        while names:
801
          names.remove(self.done.get_nowait())
802
      self.assertFalse(compat.any(names for (_, names, _) in acquires))
803

  
804
    self.assertRaises(Queue.Empty, self.done.get_nowait)
805

  
704 806

  
705 807
class TestSharedLockInCondition(_ThreadedTestCase):
706 808
  """SharedLock as a condition lock tests"""
......
1259 1361
    self.assertEqual(self.done.get_nowait(), 'DONE')
1260 1362
    self._setUpLS()
1261 1363

  
1364
  def testPriority(self):
1365
    def _Acquire(prev, next, name, priority, success_fn):
1366
      prev.wait()
1367
      self.assert_(self.ls.acquire(name, shared=0,
1368
                                   priority=priority,
1369
                                   test_notify=lambda _: next.set()))
1370
      try:
1371
        success_fn()
1372
      finally:
1373
        self.ls.release()
1374

  
1375
    # Get all in exclusive mode
1376
    self.assert_(self.ls.acquire(locking.ALL_SET, shared=0))
1377

  
1378
    done_two = Queue.Queue(0)
1379

  
1380
    first = threading.Event()
1381
    prev = first
1382

  
1383
    acquires = [("one", prio, self.done) for prio in range(1, 33)]
1384
    acquires.extend([("two", prio, done_two) for prio in range(1, 33)])
1385

  
1386
    # Use a deterministic random generator
1387
    random.Random(741).shuffle(acquires)
1388

  
1389
    for (name, prio, done) in acquires:
1390
      ev = threading.Event()
1391
      self._addThread(target=_Acquire,
1392
                      args=(prev, ev, name, prio,
1393
                            compat.partial(done.put, "Prio%s" % prio)))
1394
      prev = ev
1395

  
1396
    # Start acquires
1397
    first.set()
1398

  
1399
    # Wait for last acquire to start
1400
    prev.wait()
1401

  
1402
    # Let threads acquire locks
1403
    self.ls.release()
1404

  
1405
    # Wait for threads to finish
1406
    self._waitThreads()
1407

  
1408
    for i in range(1, 33):
1409
      self.assertEqual(self.done.get_nowait(), "Prio%s" % i)
1410
      self.assertEqual(done_two.get_nowait(), "Prio%s" % i)
1411

  
1412
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1413
    self.assertRaises(Queue.Empty, done_two.get_nowait)
1414

  
1262 1415

  
1263 1416
class TestGanetiLockManager(_ThreadedTestCase):
1264 1417

  

Also available in: Unified diff