Revision a95c53ea

b/lib/locking.py
53 53

  
54 54
# Internal lock acquisition modes for L{LockSet}
55 55
(_LS_ACQUIRE_EXACT,
56
 _LS_ACQUIRE_ALL) = range(1, 3)
56
 _LS_ACQUIRE_ALL,
57
 _LS_ACQUIRE_OPPORTUNISTIC) = range(1, 4)
58

  
59
_LS_ACQUIRE_MODES = frozenset([
60
  _LS_ACQUIRE_EXACT,
61
  _LS_ACQUIRE_ALL,
62
  _LS_ACQUIRE_OPPORTUNISTIC,
63
  ])
57 64

  
58 65

  
59 66
def ssynchronized(mylock, shared=0):
......
917 924
ALL_SET = None
918 925

  
919 926

  
927
def _TimeoutZero():
928
  """Returns the number zero.
929

  
930
  """
931
  return 0
932

  
933

  
934
def _GetLsAcquireModeAndTimeouts(want_all, timeout, opportunistic):
935
  """Determines modes and timeouts for L{LockSet.acquire}.
936

  
937
  @type want_all: boolean
938
  @param want_all: Whether all locks in set should be acquired
939
  @param timeout: Timeout in seconds or C{None}
940
  @param opportunistic: Whther locks should be acquired opportunistically
941
  @rtype: tuple
942
  @return: Tuple containing mode to be passed to L{LockSet.__acquire_inner}
943
    (one of L{_LS_ACQUIRE_MODES}), a function to calculate timeout for
944
    acquiring the lockset-internal lock (might be C{None}) and a function to
945
    calculate the timeout for acquiring individual locks
946

  
947
  """
948
  # Short circuit when no running timeout is needed
949
  if opportunistic and not want_all:
950
    assert timeout is None, "Got timeout for an opportunistic acquisition"
951
    return (_LS_ACQUIRE_OPPORTUNISTIC, None, _TimeoutZero)
952

  
953
  # We need to keep track of how long we spent waiting for a lock. The
954
  # timeout passed to this function is over all lock acquisitions.
955
  running_timeout = utils.RunningTimeout(timeout, False)
956

  
957
  if want_all:
958
    mode = _LS_ACQUIRE_ALL
959
    ls_timeout_fn = running_timeout.Remaining
960
  else:
961
    mode = _LS_ACQUIRE_EXACT
962
    ls_timeout_fn = None
963

  
964
  if opportunistic:
965
    mode = _LS_ACQUIRE_OPPORTUNISTIC
966
    timeout_fn = _TimeoutZero
967
  else:
968
    timeout_fn = running_timeout.Remaining
969

  
970
  return (mode, ls_timeout_fn, timeout_fn)
971

  
972

  
920 973
class _AcquireTimeout(Exception):
921 974
  """Internal exception to abort an acquire on a timeout.
922 975

  
......
1114 1167
    return set(result)
1115 1168

  
1116 1169
  def acquire(self, names, timeout=None, shared=0, priority=None,
1117
              test_notify=None):
1170
              opportunistic=False, test_notify=None):
1118 1171
    """Acquire a set of resource locks.
1119 1172

  
1173
    @note: When acquiring locks opportunistically, any number of locks might
1174
      actually be acquired, even zero.
1175

  
1120 1176
    @type names: list of strings (or string)
1121 1177
    @param names: the names of the locks which shall be acquired
1122 1178
        (special lock names, or instance/node names)
......
1124 1180
    @param shared: whether to acquire in shared mode; by default an
1125 1181
        exclusive lock will be acquired
1126 1182
    @type timeout: float or None
1127
    @param timeout: Maximum time to acquire all locks
1183
    @param timeout: Maximum time to acquire all locks; for opportunistic
1184
      acquisitions, a timeout can only be given when C{names} is C{None}, in
1185
      which case it is exclusively used for acquiring the L{LockSet}-internal
1186
      lock; opportunistic acquisitions don't use a timeout for acquiring
1187
      individual locks
1128 1188
    @type priority: integer
1129 1189
    @param priority: Priority for acquiring locks
1190
    @type opportunistic: boolean
1191
    @param opportunistic: Acquire locks opportunistically; use the return value
1192
      to determine which locks were actually acquired
1130 1193
    @type test_notify: callable or None
1131 1194
    @param test_notify: Special callback function for unittesting
1132 1195

  
......
1146 1209
    if priority is None:
1147 1210
      priority = _DEFAULT_PRIORITY
1148 1211

  
1149
    # We need to keep track of how long we spent waiting for a lock. The
1150
    # timeout passed to this function is over all lock acquires.
1151
    running_timeout = utils.RunningTimeout(timeout, False)
1152

  
1153 1212
    try:
1154 1213
      if names is not None:
1214
        assert timeout is None or not opportunistic, \
1215
          ("Opportunistic acquisitions can only use a timeout if no"
1216
           " names are given; see docstring for details")
1217

  
1155 1218
        # Support passing in a single resource to acquire rather than many
1156 1219
        if isinstance(names, basestring):
1157 1220
          names = [names]
1158 1221

  
1159
        return self.__acquire_inner(names, _LS_ACQUIRE_EXACT, shared, priority,
1160
                                    running_timeout.Remaining, test_notify)
1222
        (mode, _, timeout_fn) = \
1223
          _GetLsAcquireModeAndTimeouts(False, timeout, opportunistic)
1224

  
1225
        return self.__acquire_inner(names, mode, shared, priority,
1226
                                    timeout_fn, test_notify)
1161 1227

  
1162 1228
      else:
1229
        (mode, ls_timeout_fn, timeout_fn) = \
1230
          _GetLsAcquireModeAndTimeouts(True, timeout, opportunistic)
1231

  
1163 1232
        # If no names are given acquire the whole set by not letting new names
1164 1233
        # being added before we release, and getting the current list of names.
1165 1234
        # Some of them may then be deleted later, but we'll cope with this.
......
1170 1239
        # anyway, though, so we'll get the list lock exclusively as well in
1171 1240
        # order to be able to do add() on the set while owning it.
1172 1241
        if not self.__lock.acquire(shared=shared, priority=priority,
1173
                                   timeout=running_timeout.Remaining()):
1242
                                   timeout=ls_timeout_fn()):
1174 1243
          raise _AcquireTimeout()
1244

  
1175 1245
        try:
1176 1246
          # note we own the set-lock
1177 1247
          self._add_owned()
1178 1248

  
1179
          return self.__acquire_inner(self.__names(), _LS_ACQUIRE_ALL, shared,
1180
                                      priority, running_timeout.Remaining,
1181
                                      test_notify)
1249
          return self.__acquire_inner(self.__names(), mode, shared,
1250
                                      priority, timeout_fn, test_notify)
1182 1251
        except:
1183 1252
          # We shouldn't have problems adding the lock to the owners list, but
1184 1253
          # if we did we'll try to release this lock and re-raise exception.
......
1194 1263
                      timeout_fn, test_notify):
1195 1264
    """Inner logic for acquiring a number of locks.
1196 1265

  
1266
    Acquisition modes:
1267

  
1268
      - C{_LS_ACQUIRE_ALL}: C{names} contains names of all locks in set, but
1269
        deleted locks can be ignored as the whole set is being acquired with
1270
        its internal lock held
1271
      - C{_LS_ACQUIRE_EXACT}: The names listed in C{names} must be acquired;
1272
        timeouts and deleted locks are fatal
1273
      - C{_LS_ACQUIRE_OPPORTUNISTIC}: C{names} lists names of locks (potentially
1274
        all within the set) which should be acquired opportunistically, that is
1275
        failures are ignored
1276

  
1197 1277
    @param names: Names of the locks to be acquired
1198
    @param mode: Lock acquisition mode
1278
    @param mode: Lock acquisition mode (one of L{_LS_ACQUIRE_MODES})
1199 1279
    @param shared: Whether to acquire in shared mode
1200
    @param timeout_fn: Function returning remaining timeout
1280
    @param timeout_fn: Function returning remaining timeout (C{None} for
1281
      opportunistic acquisitions)
1201 1282
    @param priority: Priority for acquiring locks
1202 1283
    @param test_notify: Special callback function for unittesting
1203 1284

  
1204 1285
    """
1205
    assert mode in (_LS_ACQUIRE_EXACT, _LS_ACQUIRE_ALL)
1286
    assert mode in _LS_ACQUIRE_MODES
1206 1287

  
1207 1288
    acquire_list = []
1208 1289

  
......
1246 1327
                                     priority=priority,
1247 1328
                                     test_notify=test_notify_fn)
1248 1329
        except errors.LockError:
1249
          if mode == _LS_ACQUIRE_ALL:
1330
          if mode in (_LS_ACQUIRE_ALL, _LS_ACQUIRE_OPPORTUNISTIC):
1250 1331
            # We are acquiring the whole set, it doesn't matter if this
1251 1332
            # particular element is not there anymore.
1252 1333
            continue
......
1256 1337

  
1257 1338
        if not acq_success:
1258 1339
          # Couldn't get lock or timeout occurred
1340
          if mode == _LS_ACQUIRE_OPPORTUNISTIC:
1341
            # Ignore timeouts on opportunistic acquisitions
1342
            continue
1343

  
1259 1344
          if timeout is None:
1260 1345
            # This shouldn't happen as SharedLock.acquire(timeout=None) is
1261 1346
            # blocking.
b/test/ganeti.locking_unittest.py
1780 1780
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1781 1781
    self.assertRaises(Queue.Empty, done_two.get_nowait)
1782 1782

  
1783
  def testNamesWithOpportunisticAndTimeout(self):
1784
    self.assertRaises(AssertionError, self.ls.acquire,
1785
                      ["one", "two"], timeout=1.0, opportunistic=True)
1786

  
1787
  def testOpportunisticWithUnknownName(self):
1788
    name = "unknown"
1789
    self.assertFalse(name in self.ls._names())
1790
    result = self.ls.acquire(name, opportunistic=True)
1791
    self.assertFalse(result)
1792
    self.assertFalse(self.ls.list_owned())
1793

  
1794
    result = self.ls.acquire(["two", name], opportunistic=True)
1795
    self.assertEqual(result, set(["two"]))
1796
    self.assertEqual(self.ls.list_owned(), set(["two"]))
1797

  
1798
    self.ls.release()
1799

  
1800
  def testSimpleOpportunisticAcquisition(self):
1801
    self.assertEquals(self.ls._names(), set(["one", "two", "three"]))
1802

  
1803
    # Hold a lock in main thread
1804
    self.assertEqual(self.ls.acquire("two", shared=0), set(["two"]))
1805

  
1806
    def fn():
1807
      # The lock "two" is held by the main thread
1808
      result = self.ls.acquire(["one", "two"], shared=0, opportunistic=True)
1809
      self.assertEqual(result, set(["one"]))
1810
      self.assertEqual(self.ls.list_owned(), set(["one"]))
1811
      self.assertFalse(self.ls._get_lock().is_owned())
1812

  
1813
      self.ls.release()
1814
      self.assertFalse(self.ls.list_owned())
1815

  
1816
      # Try to acquire the lock held by the main thread
1817
      result = self.ls.acquire(["two"], shared=0, opportunistic=True)
1818
      self.assertFalse(self.ls._get_lock().is_owned())
1819
      self.assertFalse(result)
1820
      self.assertFalse(self.ls.list_owned())
1821

  
1822
      # Try to acquire all locks
1823
      result = self.ls.acquire(locking.ALL_SET, shared=0, opportunistic=True)
1824
      self.assertTrue(self.ls._get_lock().is_owned(),
1825
                      msg="Internal lock is not owned")
1826
      self.assertEqual(result, set(["one", "three"]))
1827
      self.assertEqual(self.ls.list_owned(), set(["one", "three"]))
1828

  
1829
      self.ls.release()
1830

  
1831
      self.assertFalse(self.ls.list_owned())
1832

  
1833
      self.done.put(True)
1834

  
1835
    self._addThread(target=fn)
1836

  
1837
    # Wait for threads to finish
1838
    self._waitThreads()
1839

  
1840
    self.assertEqual(self.ls.list_owned(), set(["two"]))
1841

  
1842
    self.ls.release()
1843
    self.assertFalse(self.ls.list_owned())
1844
    self.assertFalse(self.ls._get_lock().is_owned())
1845

  
1846
    self.assertTrue(self.done.get_nowait())
1847
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1848

  
1849
  def testOpportunisticAcquisitionWithoutNamesExpires(self):
1850
    self.assertEquals(self.ls._names(), set(["one", "two", "three"]))
1851

  
1852
    # Hold all locks in main thread
1853
    self.ls.acquire(locking.ALL_SET, shared=0)
1854
    self.assertTrue(self.ls._get_lock().is_owned())
1855

  
1856
    def fn():
1857
      # Try to acquire all locks in separate thread
1858
      result = self.ls.acquire(locking.ALL_SET, shared=0, opportunistic=True,
1859
                               timeout=0.1)
1860
      self.assertFalse(result)
1861
      self.assertFalse(self.ls._get_lock().is_owned())
1862
      self.assertFalse(self.ls.list_owned())
1863

  
1864
      # Try once more without a timeout
1865
      self.assertFalse(self.ls.acquire("one", shared=0, opportunistic=True))
1866

  
1867
      self.done.put(True)
1868

  
1869
    self._addThread(target=fn)
1870

  
1871
    # Wait for threads to finish
1872
    self._waitThreads()
1873

  
1874
    self.assertEqual(self.ls.list_owned(), set(["one", "two", "three"]))
1875

  
1876
    self.ls.release()
1877
    self.assertFalse(self.ls.list_owned())
1878
    self.assertFalse(self.ls._get_lock().is_owned(shared=0))
1879

  
1880
    self.assertTrue(self.done.get_nowait())
1881
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1882

  
1883
  def testSharedOpportunisticAcquisitionWithoutNames(self):
1884
    self.assertEquals(self.ls._names(), set(["one", "two", "three"]))
1885

  
1886
    # Hold all locks in main thread
1887
    self.ls.acquire(locking.ALL_SET, shared=1)
1888
    self.assertTrue(self.ls._get_lock().is_owned(shared=1))
1889

  
1890
    def fn():
1891
      # Try to acquire all locks in separate thread in shared mode
1892
      result = self.ls.acquire(locking.ALL_SET, shared=1, opportunistic=True,
1893
                               timeout=0.1)
1894
      self.assertEqual(result, set(["one", "two", "three"]))
1895
      self.assertTrue(self.ls._get_lock().is_owned(shared=1))
1896
      self.ls.release()
1897
      self.assertFalse(self.ls._get_lock().is_owned())
1898

  
1899
      # Try one in exclusive mode
1900
      self.assertFalse(self.ls.acquire("one", shared=0, opportunistic=True))
1901

  
1902
      self.done.put(True)
1903

  
1904
    self._addThread(target=fn)
1905

  
1906
    # Wait for threads to finish
1907
    self._waitThreads()
1908

  
1909
    self.assertEqual(self.ls.list_owned(), set(["one", "two", "three"]))
1910

  
1911
    self.ls.release()
1912
    self.assertFalse(self.ls.list_owned())
1913
    self.assertFalse(self.ls._get_lock().is_owned())
1914

  
1915
    self.assertTrue(self.done.get_nowait())
1916
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1917

  
1918
  def testLockDeleteWithOpportunisticAcquisition(self):
1919
    # This test exercises some code handling LockError on acquisition, that is
1920
    # after all lock names have been gathered. This shouldn't happen in reality
1921
    # as removing locks from the set requires the lockset-internal lock, but
1922
    # the code should handle the situation anyway.
1923
    ready = threading.Event()
1924
    finished = threading.Event()
1925

  
1926
    self.assertEquals(self.ls._names(), set(["one", "two", "three"]))
1927

  
1928
    # Thread function to delete lock
1929
    def fn():
1930
      # Wait for notification
1931
      ready.wait()
1932

  
1933
      # Delete lock named "two" by accessing lockset-internal data
1934
      ld = self.ls._get_lockdict()
1935
      self.assertTrue(ld["two"].delete())
1936

  
1937
      self.done.put("deleted.two")
1938

  
1939
      # Notify helper
1940
      finished.set()
1941

  
1942
    self._addThread(target=fn)
1943

  
1944
    # Notification helper, called when lock already holds internal lock.
1945
    # Therefore only one of the locks not yet locked can be deleted.
1946
    def notify(name):
1947
      self.done.put("notify.%s" % name)
1948

  
1949
      if name == "one":
1950
        # Tell helper thread to delete lock "two"
1951
        ready.set()
1952
        finished.wait()
1953

  
1954
    # Hold all locks in main thread
1955
    self.ls.acquire(locking.ALL_SET, shared=0, test_notify=notify)
1956
    self.assertEqual(self.ls.list_owned(), set(["one", "three"]))
1957

  
1958
    # Wait for threads to finish
1959
    self._waitThreads()
1960

  
1961
    # Release all locks
1962
    self.ls.release()
1963
    self.assertFalse(self.ls.list_owned())
1964
    self.assertFalse(self.ls._get_lock().is_owned())
1965

  
1966
    self.assertEqual(self.done.get_nowait(), "notify.one")
1967
    self.assertEqual(self.done.get_nowait(), "deleted.two")
1968
    self.assertEqual(self.done.get_nowait(), "notify.three")
1969
    self.assertEqual(self.done.get_nowait(), "notify.two")
1970
    self.assertRaises(Queue.Empty, self.done.get_nowait)
1971

  
1972

  
1973
class TestGetLsAcquireModeAndTimeouts(unittest.TestCase):
1974
  def setUp(self):
1975
    self.fn = locking._GetLsAcquireModeAndTimeouts
1976

  
1977
  def testOpportunisticWithoutNames(self):
1978
    (mode, ls_timeout_fn, timeout_fn) = self.fn(False, None, True)
1979
    self.assertEqual(mode, locking._LS_ACQUIRE_OPPORTUNISTIC)
1980
    self.assertTrue(ls_timeout_fn is None)
1981
    self.assertEqual(timeout_fn(), 0)
1982

  
1983
  def testAllInputCombinations(self):
1984
    for want_all in [False, True]:
1985
      for timeout in [None, 0, 100]:
1986
        for opportunistic in [False, True]:
1987
          if (opportunistic and
1988
              not want_all and
1989
              timeout is not None):
1990
            # Can't accept a timeout when acquiring opportunistically
1991
            self.assertRaises(AssertionError, self.fn,
1992
                              want_all, timeout, opportunistic)
1993
          else:
1994
            (mode, ls_timeout_fn, timeout_fn) = \
1995
              self.fn(want_all, timeout, opportunistic)
1996

  
1997
            if opportunistic:
1998
              self.assertEqual(mode, locking._LS_ACQUIRE_OPPORTUNISTIC)
1999
              self.assertEqual(timeout_fn(), 0)
2000
            else:
2001
              self.assertTrue(callable(timeout_fn))
2002
              if want_all:
2003
                self.assertEqual(mode, locking._LS_ACQUIRE_ALL)
2004
              else:
2005
                self.assertEqual(mode, locking._LS_ACQUIRE_EXACT)
2006

  
2007
            if want_all:
2008
              self.assertTrue(callable(ls_timeout_fn))
2009
            else:
2010
              self.assertTrue(ls_timeout_fn is None)
2011

  
1783 2012

  
1784 2013
class TestGanetiLockManager(_ThreadedTestCase):
1785 2014
  def setUp(self):

Also available in: Unified diff