Revision 84e344d4

b/lib/locking.py
20 20

  
21 21
"""Module implementing the Ganeti locking code."""
22 22

  
23
# pylint: disable-msg=W0613,W0201
24

  
25 23
import threading
26
# Wouldn't it be better to define LockingError in the locking module?
27
# Well, for now that's how the rest of the code does it...
24

  
28 25
from ganeti import errors
29 26
from ganeti import utils
30 27

  
......
48 45
  return wrap
49 46

  
50 47

  
51
class SharedLock:
48
class _CountingCondition(object):
49
  """Wrapper for Python's built-in threading.Condition class.
50

  
51
  This wrapper keeps a count of active waiters. We can't access the internal
52
  "__waiters" attribute of threading.Condition because it's not thread-safe.
53

  
54
  """
55
  __slots__ = [
56
    "_cond",
57
    "_nwaiters",
58
    ]
59

  
60
  def __init__(self, lock):
61
    """Initializes this class.
62

  
63
    """
64
    object.__init__(self)
65
    self._cond = threading.Condition(lock=lock)
66
    self._nwaiters = 0
67

  
68
  def notifyAll(self):
69
    """Notifies the condition.
70

  
71
    """
72
    return self._cond.notifyAll()
73

  
74
  def wait(self, timeout=None):
75
    """Waits for the condition to be notified.
76

  
77
    @type timeout: float or None
78
    @param timeout: Timeout in seconds
79

  
80
    """
81
    assert self._nwaiters >= 0
82

  
83
    self._nwaiters += 1
84
    try:
85
      return self._cond.wait(timeout=timeout)
86
    finally:
87
      self._nwaiters -= 1
88

  
89
  def has_waiting(self):
90
    """Returns whether there are active waiters.
91

  
92
    """
93
    return bool(self._nwaiters)
94

  
95

  
96
class SharedLock(object):
52 97
  """Implements a shared lock.
53 98

  
54 99
  Multiple threads can acquire the lock in a shared way, calling
......
60 105
  eventually do so.
61 106

  
62 107
  """
108
  __slots__ = [
109
    "__active_shr_c",
110
    "__inactive_shr_c",
111
    "__deleted",
112
    "__exc",
113
    "__lock",
114
    "__pending",
115
    "__shr",
116
    ]
117

  
118
  __condition_class = _CountingCondition
119

  
63 120
  def __init__(self):
64
    """Construct a new SharedLock"""
65
    # we have two conditions, c_shr and c_exc, sharing the same lock.
121
    """Construct a new SharedLock.
122

  
123
    """
124
    object.__init__(self)
125

  
126
    # Internal lock
66 127
    self.__lock = threading.Lock()
67
    self.__turn_shr = threading.Condition(self.__lock)
68
    self.__turn_exc = threading.Condition(self.__lock)
69 128

  
70
    # current lock holders
129
    # Queue containing waiting acquires
130
    self.__pending = []
131

  
132
    # Active and inactive conditions for shared locks
133
    self.__active_shr_c = self.__condition_class(self.__lock)
134
    self.__inactive_shr_c = self.__condition_class(self.__lock)
135

  
136
    # Current lock holders
71 137
    self.__shr = set()
72 138
    self.__exc = None
73 139

  
74
    # lock waiters
75
    self.__nwait_exc = 0
76
    self.__nwait_shr = 0
77
    self.__npass_shr = 0
78

  
79 140
    # is this lock in the deleted state?
80 141
    self.__deleted = False
81 142

  
143
  def __check_deleted(self):
144
    """Raises an exception if the lock has been deleted.
145

  
146
    """
147
    if self.__deleted:
148
      raise errors.LockError("Deleted lock")
149

  
82 150
  def __is_sharer(self):
83
    """Is the current thread sharing the lock at this time?"""
151
    """Is the current thread sharing the lock at this time?
152

  
153
    """
84 154
    return threading.currentThread() in self.__shr
85 155

  
86 156
  def __is_exclusive(self):
87
    """Is the current thread holding the lock exclusively at this time?"""
157
    """Is the current thread holding the lock exclusively at this time?
158

  
159
    """
88 160
    return threading.currentThread() == self.__exc
89 161

  
90 162
  def __is_owned(self, shared=-1):
......
112 184
    """
113 185
    self.__lock.acquire()
114 186
    try:
115
      result = self.__is_owned(shared=shared)
187
      return self.__is_owned(shared=shared)
116 188
    finally:
117 189
      self.__lock.release()
118 190

  
119
    return result
120

  
121
  def __wait(self, c):
122
    """Wait on the given condition, and raise an exception if the current lock
123
    is declared deleted in the meantime.
191
  def _count_pending(self):
192
    """Returns the number of pending acquires.
124 193

  
125
    @param c: the condition to wait on
194
    @rtype: int
126 195

  
127 196
    """
128
    c.wait()
129
    if self.__deleted:
130
      raise errors.LockError('deleted lock')
197
    self.__lock.acquire()
198
    try:
199
      return len(self.__pending)
200
    finally:
201
      self.__lock.release()
131 202

  
132
  def __exclusive_acquire(self):
133
    """Acquire the lock exclusively.
203
  def __do_acquire(self, shared):
204
    """Actually acquire the lock.
134 205

  
135
    This is a private function that presumes you are already holding the
136
    internal lock. It's defined separately to avoid code duplication between
137
    acquire() and delete()
206
    """
207
    if shared:
208
      self.__shr.add(threading.currentThread())
209
    else:
210
      self.__exc = threading.currentThread()
211

  
212
  def __can_acquire(self, shared):
213
    """Determine whether lock can be acquired.
138 214

  
139 215
    """
140
    self.__nwait_exc += 1
141
    try:
142
      # This is to save ourselves from a nasty race condition that could
143
      # theoretically make the sharers starve.
144
      if self.__nwait_shr > 0 or self.__nwait_exc > 1:
145
        self.__wait(self.__turn_exc)
216
    if shared:
217
      return self.__exc is None
218
    else:
219
      return len(self.__shr) == 0 and self.__exc is None
146 220

  
147
      while len(self.__shr) > 0 or self.__exc is not None:
148
        self.__wait(self.__turn_exc)
221
  def __is_on_top(self, cond):
222
    """Checks whether the passed condition is on top of the queue.
149 223

  
150
      self.__exc = threading.currentThread()
151
    finally:
152
      self.__nwait_exc -= 1
224
    The caller must make sure the queue isn't empty.
153 225

  
154
    assert self.__npass_shr == 0, "SharedLock: internal fairness violation"
226
    """
227
    return self.__pending[0] == cond
155 228

  
156
  def __shared_acquire(self):
157
    """Acquire the lock in shared mode
229
  def __acquire_unlocked(self, shared=0, timeout=None):
230
    """Acquire a shared lock.
158 231

  
159
    This is a private function that presumes you are already holding the
160
    internal lock.
232
    @param shared: whether to acquire in shared mode; by default an
233
        exclusive lock will be acquired
234
    @param timeout: maximum waiting time before giving up
161 235

  
162 236
    """
163
    self.__nwait_shr += 1
164
    try:
165
      wait = False
166
      # If there is an exclusive holder waiting we have to wait.
167
      # We'll only do this once, though, when we start waiting for
168
      # the lock. Then we'll just wait while there are no
169
      # exclusive holders.
170
      if self.__nwait_exc > 0:
171
        # TODO: if !blocking...
172
        wait = True
173
        self.__wait(self.__turn_shr)
174

  
175
      while self.__exc is not None:
176
        wait = True
177
        # TODO: if !blocking...
178
        self.__wait(self.__turn_shr)
237
    self.__check_deleted()
179 238

  
180
      self.__shr.add(threading.currentThread())
239
    # We cannot acquire the lock if we already have it
240
    assert not self.__is_owned(), "double acquire() on a non-recursive lock"
241

  
242
    # Check whether someone else holds the lock or there are pending acquires.
243
    if not self.__pending and self.__can_acquire(shared):
244
      # Apparently not, can acquire lock directly.
245
      self.__do_acquire(shared)
246
      return True
181 247

  
182
      # If we were waiting note that we passed
183
      if wait:
184
        self.__npass_shr -= 1
248
    if shared:
249
      wait_condition = self.__active_shr_c
185 250

  
251
      # Check if we're not yet in the queue
252
      if wait_condition not in self.__pending:
253
        self.__pending.append(wait_condition)
254
    else:
255
      wait_condition = self.__condition_class(self.__lock)
256
      # Always add to queue
257
      self.__pending.append(wait_condition)
258

  
259
    try:
260
      # Wait until we become the topmost acquire in the queue or the timeout
261
      # expires.
262
      while not (self.__is_on_top(wait_condition) and
263
                 self.__can_acquire(shared)):
264
        # Wait for notification
265
        wait_condition.wait(timeout)
266
        self.__check_deleted()
267

  
268
        # A lot of code assumes blocking acquires always succeed. Loop
269
        # internally for that case.
270
        if timeout is not None:
271
          break
272

  
273
      if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
274
        self.__do_acquire(shared)
275
        return True
186 276
    finally:
187
      self.__nwait_shr -= 1
277
      # Remove condition from queue if there are no more waiters
278
      if not wait_condition.has_waiting() and not self.__deleted:
279
        self.__pending.remove(wait_condition)
188 280

  
189
    assert self.__npass_shr >= 0, "Internal fairness condition weirdness"
281
    return False
190 282

  
191
  def acquire(self, blocking=1, shared=0):
283
  def acquire(self, shared=0, timeout=None):
192 284
    """Acquire a shared lock.
193 285

  
286
    @type shared: int
194 287
    @param shared: whether to acquire in shared mode; by default an
195 288
        exclusive lock will be acquired
196
    @param blocking: whether to block while trying to acquire or to
197
        operate in try-lock mode (this locking mode is not supported yet)
289
    @type timeout: float
290
    @param timeout: maximum waiting time before giving up
198 291

  
199 292
    """
200
    if not blocking:
201
      # We don't have non-blocking mode for now
202
      raise NotImplementedError
203

  
204 293
    self.__lock.acquire()
205 294
    try:
206
      if self.__deleted:
207
        raise errors.LockError('deleted lock')
208

  
209
      # We cannot acquire the lock if we already have it
210
      assert not self.__is_owned(), "double acquire() on a non-recursive lock"
211
      assert self.__npass_shr >= 0, "Internal fairness condition weirdness"
212

  
213
      if shared:
214
        self.__shared_acquire()
215
      else:
216
        # TODO: if !blocking...
217
        # (or modify __exclusive_acquire for non-blocking mode)
218
        self.__exclusive_acquire()
219

  
295
      return self.__acquire_unlocked(shared, timeout)
220 296
    finally:
221 297
      self.__lock.release()
222 298

  
223
    return True
224

  
225 299
  def release(self):
226 300
    """Release a Shared Lock.
227 301

  
......
231 305
    """
232 306
    self.__lock.acquire()
233 307
    try:
234
      assert self.__npass_shr >= 0, "Internal fairness condition weirdness"
308
      assert self.__is_exclusive() or self.__is_sharer(), \
309
        "Cannot release non-owned lock"
310

  
235 311
      # Autodetect release type
236 312
      if self.__is_exclusive():
237 313
        self.__exc = None
238

  
239
        # An exclusive holder has just had the lock, time to put it in shared
240
        # mode if there are shared holders waiting. Otherwise wake up the next
241
        # exclusive holder.
242
        if self.__nwait_shr > 0:
243
          # Make sure at least the ones which were blocked pass.
244
          self.__npass_shr = self.__nwait_shr
245
          self.__turn_shr.notifyAll()
246
        elif self.__nwait_exc > 0:
247
          self.__turn_exc.notify()
248

  
249
      elif self.__is_sharer():
314
      else:
250 315
        self.__shr.remove(threading.currentThread())
251 316

  
252
        # If there are shared holders waiting (and not just scheduled to pass)
253
        # there *must* be an exclusive holder waiting as well; otherwise what
254
        # were they waiting for?
255
        assert (self.__nwait_exc > 0 or
256
                self.__npass_shr == self.__nwait_shr), \
257
                "Lock sharers waiting while no exclusive is queueing"
258

  
259
        # If there are no more shared holders either in or scheduled to pass,
260
        # and some exclusive holders are waiting let's wake one up.
261
        if (len(self.__shr) == 0 and
262
            self.__nwait_exc > 0 and
263
            not self.__npass_shr > 0):
264
          self.__turn_exc.notify()
317
      # Notify topmost condition in queue
318
      if self.__pending:
319
        first_condition = self.__pending[0]
320
        first_condition.notifyAll()
265 321

  
266
      else:
267
        assert False, "Cannot release non-owned lock"
322
        if first_condition == self.__active_shr_c:
323
          self.__active_shr_c = self.__inactive_shr_c
324
          self.__inactive_shr_c = first_condition
268 325

  
269 326
    finally:
270 327
      self.__lock.release()
271 328

  
272
  def delete(self, blocking=1):
329
  def delete(self, timeout=None):
273 330
    """Delete a Shared Lock.
274 331

  
275 332
    This operation will declare the lock for removal. First the lock will be
276 333
    acquired in exclusive mode if you don't already own it, then the lock
277 334
    will be put in a state where any future and pending acquire() fail.
278 335

  
279
    @param blocking: whether to block while trying to acquire or to
280
        operate in try-lock mode.  this locking mode is not supported
281
        yet unless you are already holding exclusively the lock.
336
    @type timeout: float
337
    @param timeout: maximum waiting time before giving up
282 338

  
283 339
    """
284 340
    self.__lock.acquire()
285 341
    try:
286
      assert not self.__is_sharer(), "cannot delete() a lock while sharing it"
342
      assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
343

  
344
      self.__check_deleted()
287 345

  
288
      if self.__deleted:
289
        raise errors.LockError('deleted lock')
346
      # The caller is allowed to hold the lock exclusively already.
347
      acquired = self.__is_exclusive()
290 348

  
291
      if not self.__is_exclusive():
292
        if not blocking:
293
          # We don't have non-blocking mode for now
294
          raise NotImplementedError
295
        self.__exclusive_acquire()
349
      if not acquired:
350
        acquired = self.__acquire_unlocked(timeout)
351

  
352
      if acquired:
353
        self.__deleted = True
354
        self.__exc = None
296 355

  
297
      self.__deleted = True
298
      self.__exc = None
299
      # Wake up everybody, they will fail acquiring the lock and
300
      # raise an exception instead.
301
      self.__turn_exc.notifyAll()
302
      self.__turn_shr.notifyAll()
356
        # Notify all acquires. They'll throw an error.
357
        while self.__pending:
358
          self.__pending.pop().notifyAll()
303 359

  
360
      return acquired
304 361
    finally:
305 362
      self.__lock.release()
306 363

  
b/test/ganeti.locking_unittest.py
26 26
import unittest
27 27
import time
28 28
import Queue
29
import threading
29 30

  
30 31
from ganeti import locking
31 32
from ganeti import errors
32
from threading import Thread
33 33

  
34 34

  
35 35
# This is used to test the ssynchronize decorator.
......
39 39
#: List for looping tests
40 40
ITERATIONS = range(8)
41 41

  
42

  
42 43
def _Repeat(fn):
43 44
  """Decorator for executing a function many times"""
44 45
  def wrapper(*args, **kwargs):
......
46 47
      fn(*args, **kwargs)
47 48
  return wrapper
48 49

  
50

  
49 51
class _ThreadedTestCase(unittest.TestCase):
50 52
  """Test class that supports adding/waiting on threads"""
51 53
  def setUp(self):
......
54 56

  
55 57
  def _addThread(self, *args, **kwargs):
56 58
    """Create and remember a new thread"""
57
    t = Thread(*args, **kwargs)
59
    t = threading.Thread(*args, **kwargs)
58 60
    self.threads.append(t)
59 61
    t.start()
60 62
    return t
......
147 149

  
148 150
  def testSharersCanCoexist(self):
149 151
    self.sl.acquire(shared=1)
150
    Thread(target=self._doItSharer).start()
152
    threading.Thread(target=self._doItSharer).start()
151 153
    self.assert_(self.done.get(True, 1))
152 154
    self.sl.release()
153 155

  
......
234 236
    self.assertEqual(self.done.get_nowait(), 'SHR')
235 237
    self.assertEqual(self.done.get_nowait(), 'EXC')
236 238

  
237
  def testNoNonBlocking(self):
238
    self.assertRaises(NotImplementedError, self.sl.acquire, blocking=0)
239
    self.assertRaises(NotImplementedError, self.sl.delete, blocking=0)
240
    self.sl.acquire()
241
    self.sl.delete(blocking=0) # Fine, because the lock is already acquired
242

  
243 239
  def testDelete(self):
244 240
    self.sl.delete()
245 241
    self.assertRaises(errors.LockError, self.sl.acquire)
......
280 276
    self.assertEqual(self.done.get_nowait(), 'ERR')
281 277
    self.sl = locking.SharedLock()
282 278

  
279
  @_Repeat
280
  def testExclusiveAcquireTimeout(self):
281
    def _LockExclusive(wait):
282
      self.sl.acquire(shared=0)
283
      self.done.put("A: start sleep")
284
      time.sleep(wait)
285
      self.done.put("A: end sleep")
286
      self.sl.release()
287

  
288
    for shared in [0, 1]:
289
      # Start thread to hold lock for 20 ms
290
      self._addThread(target=_LockExclusive, args=(20.0 / 1000.0, ))
291

  
292
      # Wait up to 100 ms to get lock
293
      self.failUnless(self.sl.acquire(shared=shared, timeout=0.1))
294
      self.done.put("got 2nd")
295
      self.sl.release()
296

  
297
      self._waitThreads()
298

  
299
      self.assertEqual(self.done.get_nowait(), "A: start sleep")
300
      self.assertEqual(self.done.get_nowait(), "A: end sleep")
301
      self.assertEqual(self.done.get_nowait(), "got 2nd")
302
      self.assertRaises(Queue.Empty, self.done.get_nowait)
303

  
304
  @_Repeat
305
  def testAcquireExpiringTimeout(self):
306
    def _AcquireWithTimeout(shared, timeout):
307
      if not self.sl.acquire(shared=shared, timeout=timeout):
308
        self.done.put("timeout")
309

  
310
    for shared in [0, 1]:
311
      # Lock exclusively
312
      self.sl.acquire()
313

  
314
      # Start shared acquires with timeout between 0 and 20 ms
315
      for i in xrange(11):
316
        self._addThread(target=_AcquireWithTimeout,
317
                        args=(shared, i * 2.0 / 1000.0))
318

  
319
      # Wait for threads to finish (makes sure the acquire timeout expires
320
      # before releasing the lock)
321
      self._waitThreads()
322

  
323
      # Release lock
324
      self.sl.release()
325

  
326
      for _ in xrange(11):
327
        self.assertEqual(self.done.get_nowait(), "timeout")
328

  
329
      self.assertRaises(Queue.Empty, self.done.get_nowait)
330

  
331
  @_Repeat
332
  def testSharedSkipExclusiveAcquires(self):
333
    # Tests whether shared acquires jump in front of exclusive acquires in the
334
    # queue.
335

  
336
    # Get exclusive lock while we fill the queue
337
    self.sl.acquire()
338

  
339
    def _Acquire(shared, name):
340
      if not self.sl.acquire(shared=shared):
341
        return
342

  
343
      self.done.put(name)
344
      self.sl.release()
345

  
346
    # Start shared acquires
347
    for _ in xrange(5):
348
      self._addThread(target=_Acquire, args=(1, "shared A"))
349

  
350
    # Start exclusive acquires
351
    for _ in xrange(3):
352
      self._addThread(target=_Acquire, args=(0, "exclusive B"))
353

  
354
    # More shared acquires
355
    for _ in xrange(5):
356
      self._addThread(target=_Acquire, args=(1, "shared C"))
357

  
358
    # More exclusive acquires
359
    for _ in xrange(3):
360
      self._addThread(target=_Acquire, args=(0, "exclusive D"))
361

  
362
    # Expect 6 pending exclusive acquires and 1 for all shared acquires
363
    # together
364
    self.assertEqual(self.sl._count_pending(), 7)
365

  
366
    # Release exclusive lock and wait
367
    self.sl.release()
368

  
369
    self._waitThreads()
370

  
371
    # Check sequence
372
    for _ in xrange(10):
373
      # Shared locks aren't guaranteed to be notified in order, but they'll be
374
      # first
375
      self.assert_(self.done.get_nowait() in ("shared A", "shared C"))
376

  
377
    for _ in xrange(3):
378
      self.assertEqual(self.done.get_nowait(), "exclusive B")
379

  
380
    for _ in xrange(3):
381
      self.assertEqual(self.done.get_nowait(), "exclusive D")
382

  
383
    self.assertRaises(Queue.Empty, self.done.get_nowait)
384

  
385
  @_Repeat
386
  def testMixedAcquireTimeout(self):
387
    sync = threading.Condition()
388

  
389
    def _AcquireShared(ev):
390
      if not self.sl.acquire(shared=1, timeout=None):
391
        return
392

  
393
      self.done.put("shared")
394

  
395
      # Notify main thread
396
      ev.set()
397

  
398
      # Wait for notification
399
      sync.acquire()
400
      try:
401
        sync.wait()
402
      finally:
403
        sync.release()
404

  
405
      # Release lock
406
      self.sl.release()
407

  
408
    acquires = []
409
    for _ in xrange(3):
410
      ev = threading.Event()
411
      self._addThread(target=_AcquireShared, args=(ev, ))
412
      acquires.append(ev)
413

  
414
    # Wait for all acquires to finish
415
    for i in acquires:
416
      i.wait()
417

  
418
    self.assertEqual(self.sl._count_pending(), 0)
419

  
420
    # Try to get exclusive lock
421
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
422

  
423
    # Acquire exclusive without timeout
424
    exclsync = threading.Condition()
425
    exclev = threading.Event()
426

  
427
    def _AcquireExclusive():
428
      if not self.sl.acquire(shared=0):
429
        return
430

  
431
      self.done.put("exclusive")
432

  
433
      # Notify main thread
434
      exclev.set()
435

  
436
      exclsync.acquire()
437
      try:
438
        exclsync.wait()
439
      finally:
440
        exclsync.release()
441

  
442
      self.sl.release()
443

  
444
    self._addThread(target=_AcquireExclusive)
445

  
446
    # Try to get exclusive lock
447
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
448

  
449
    # Make all shared holders release their locks
450
    sync.acquire()
451
    try:
452
      sync.notifyAll()
453
    finally:
454
      sync.release()
455

  
456
    # Wait for exclusive acquire to succeed
457
    exclev.wait()
458

  
459
    self.assertEqual(self.sl._count_pending(), 0)
460

  
461
    # Try to get exclusive lock
462
    self.failIf(self.sl.acquire(shared=0, timeout=0.02))
463

  
464
    def _AcquireSharedSimple():
465
      if self.sl.acquire(shared=1, timeout=None):
466
        self.done.put("shared2")
467
        self.sl.release()
468

  
469
    for _ in xrange(10):
470
      self._addThread(target=_AcquireSharedSimple)
471

  
472
    # Tell exclusive lock to release
473
    exclsync.acquire()
474
    try:
475
      exclsync.notifyAll()
476
    finally:
477
      exclsync.release()
478

  
479
    # Wait for everything to finish
480
    self._waitThreads()
481

  
482
    self.assertEqual(self.sl._count_pending(), 0)
483

  
484
    # Check sequence
485
    for _ in xrange(3):
486
      self.assertEqual(self.done.get_nowait(), "shared")
487

  
488
    self.assertEqual(self.done.get_nowait(), "exclusive")
489

  
490
    for _ in xrange(10):
491
      self.assertEqual(self.done.get_nowait(), "shared2")
492

  
493
    self.assertRaises(Queue.Empty, self.done.get_nowait)
494

  
283 495

  
284 496
class TestSSynchronizedDecorator(_ThreadedTestCase):
285 497
  """Shared Lock Synchronized decorator test"""
......
307 519

  
308 520
  def testSharersCanCoexist(self):
309 521
    _decoratorlock.acquire(shared=1)
310
    Thread(target=self._doItSharer).start()
522
    threading.Thread(target=self._doItSharer).start()
311 523
    self.assert_(self.done.get(True, 1))
312 524
    _decoratorlock.release()
313 525

  
......
354 566
    self.resources = ['one', 'two', 'three']
355 567
    self.ls = locking.LockSet(members=self.resources)
356 568

  
357

  
358 569
  def testResources(self):
359 570
    self.assertEquals(self.ls._names(), set(self.resources))
360 571
    newls = locking.LockSet()
......
489 700
    # We haven't really acquired anything, so we cannot release
490 701
    self.assertRaises(AssertionError, self.ls.release)
491 702

  
492
  def _doLockSet(self, set, shared):
703
  def _doLockSet(self, names, shared):
493 704
    try:
494
      self.ls.acquire(set, shared=shared)
705
      self.ls.acquire(names, shared=shared)
495 706
      self.done.put('DONE')
496 707
      self.ls.release()
497 708
    except errors.LockError:
498 709
      self.done.put('ERR')
499 710

  
500
  def _doAddSet(self, set):
711
  def _doAddSet(self, names):
501 712
    try:
502
      self.ls.add(set, acquired=1)
713
      self.ls.add(names, acquired=1)
503 714
      self.done.put('DONE')
504 715
      self.ls.release()
505 716
    except errors.LockError:
506 717
      self.done.put('ERR')
507 718

  
508
  def _doRemoveSet(self, set):
509
    self.done.put(self.ls.remove(set))
719
  def _doRemoveSet(self, names):
720
    self.done.put(self.ls.remove(names))
510 721

  
511 722
  @_Repeat
512 723
  def testConcurrentSharedAcquire(self):
......
537 748
    self._addThread(target=self._doLockSet, args=('three', 0))
538 749
    self._waitThreads()
539 750
    self.assertEqual(self.done.get_nowait(), 'DONE')
751
    self.assertRaises(Queue.Empty, self.done.get_nowait)
540 752
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 0))
541 753
    self._addThread(target=self._doLockSet, args=(['one', 'two'], 1))
542 754
    self._addThread(target=self._doLockSet, args=('one', 0))

Also available in: Unified diff