Revision aaae9bc0

b/lib/locking.py
259 259
    finally:
260 260
      self.__lock.release()
261 261

  
262

  
263
class LockSet:
264
  """Implements a set of locks.
265

  
266
  This abstraction implements a set of shared locks for the same resource type,
267
  distinguished by name. The user can lock a subset of the resources and the
268
  LockSet will take care of acquiring the locks always in the same order, thus
269
  preventing deadlock.
270

  
271
  All the locks needed in the same set must be acquired together, though.
272

  
273
  """
274
  def __init__(self, members=None):
275
    """Constructs a new LockSet.
276

  
277
    Args:
278
      members: initial members of the set
279

  
280
    """
281
    # Used internally to guarantee coherency.
282
    self.__lock = SharedLock()
283

  
284
    # The lockdict indexes the relationship name -> lock
285
    # The order-of-locking is implied by the alphabetical order of names
286
    self.__lockdict = {}
287

  
288
    if members is not None:
289
      for name in members:
290
        self.__lockdict[name] = SharedLock()
291

  
292
    # The owner dict contains the set of locks each thread owns. For
293
    # performance each thread can access its own key without a global lock on
294
    # this structure. It is paramount though that *no* other type of access is
295
    # done to this structure (eg. no looping over its keys). *_owner helper
296
    # function are defined to guarantee access is correct, but in general never
297
    # do anything different than __owners[threading.currentThread()], or there
298
    # will be trouble.
299
    self.__owners = {}
300

  
301
  def _is_owned(self):
302
    """Is the current thread a current level owner?"""
303
    return threading.currentThread() in self.__owners
304

  
305
  def _add_owned(self, name):
306
    """Note the current thread owns the given lock"""
307
    if self._is_owned():
308
      self.__owners[threading.currentThread()].add(name)
309
    else:
310
       self.__owners[threading.currentThread()] = set([name])
311

  
312
  def _del_owned(self, name):
313
    """Note the current thread owns the given lock"""
314
    self.__owners[threading.currentThread()].remove(name)
315

  
316
    if not self.__owners[threading.currentThread()]:
317
      del self.__owners[threading.currentThread()]
318

  
319
  def _list_owned(self):
320
    """Get the set of resource names owned by the current thread"""
321
    if self._is_owned():
322
      return self.__owners[threading.currentThread()].copy()
323
    else:
324
      return set()
325

  
326
  def __names(self):
327
    """Return the current set of names.
328

  
329
    Only call this function while holding __lock and don't iterate on the
330
    result after releasing the lock.
331

  
332
    """
333
    return set(self.__lockdict.keys())
334

  
335
  def _names(self):
336
    """Return a copy of the current set of elements.
337

  
338
    Used only for debugging purposes.
339
    """
340
    self.__lock.acquire(shared=1)
341
    try:
342
      result = self.__names()
343
    finally:
344
      self.__lock.release()
345
    return result
346

  
347
  def acquire(self, names, blocking=1, shared=0):
348
    """Acquire a set of resource locks.
349

  
350
    Args:
351
      names: the names of the locks which shall be acquired.
352
             (special lock names, or instance/node names)
353
      shared: whether to acquire in shared mode. By default an exclusive lock
354
              will be acquired.
355
      blocking: whether to block while trying to acquire or to operate in try-lock mode.
356
                this locking mode is not supported yet.
357

  
358
    Returns:
359
      True: when all the locks are successfully acquired
360

  
361
    Raises:
362
      errors.LockError: when any lock we try to acquire has been deleted
363
      before we succeed. In this case none of the locks requested will be
364
      acquired.
365

  
366
    """
367
    if not blocking:
368
      # We don't have non-blocking mode for now
369
      raise NotImplementedError
370

  
371
    # Check we don't already own locks at this level
372
    assert not self._is_owned(), "Cannot acquire locks in the same set twice"
373

  
374
    # Support passing in a single resource to acquire rather than many
375
    if isinstance(names, basestring):
376
      names = [names]
377
    else:
378
      names.sort()
379

  
380
    # Now names contains a sorted list of resources whose lock we want to
381
    # acquire. In order to get them we loop on this (private) list and look
382
    # them up in __lockdict. Since we have no lock held on lockdict we have no
383
    # guarantees on their presence, and they may even disappear after we looked
384
    # them up. This is fine though as .acquire() itself is safe and will alert
385
    # us if the lock gets deleted.
386

  
387
    try:
388
      for lname in names:
389
        lock = self.__lockdict[lname] # raises KeyError if the lock is not there
390
        lock.acquire(shared=shared) # raises LockError if the lock is deleted
391
        try:
392
          # now the lock cannot be deleted, we have it!
393
          self._add_owned(lname)
394
        except:
395
          # We shouldn't have problems adding the lock to the owners list, but
396
          # if we did we'll try to release this lock and re-raise exception.
397
          # Of course something is going to be really wrong, after this.
398
          lock.release()
399
          raise
400

  
401
    except (KeyError, errors.LockError):
402
      name_fail = lname
403
      for lname in self._list_owned():
404
        self.__lockdict[lname].release()
405
        self._del_owned(lname)
406
      raise errors.LockError('non-existing lock in set (%s)' % name_fail)
407

  
408
    return True
409

  
410
  def release(self, names=None):
411
    """Release a set of resource locks, at the same level.
412

  
413
    You must have acquired the locks, either in shared or in exclusive mode,
414
    before releasing them.
415

  
416
    Args:
417
      names: the names of the locks which shall be released.
418
             (defaults to all the locks acquired at that level).
419

  
420
    """
421

  
422
    assert self._is_owned(), "release() on lock set while not owner"
423

  
424
    # Support passing in a single resource to release rather than many
425
    if isinstance(names, basestring):
426
      names = [names]
427

  
428
    if names is None:
429
      names = self._list_owned()
430
    else:
431
      names = set(names)
432
      assert self._list_owned().issuperset(names), (
433
               "release() on unheld resources %s" %
434
               names.difference(self._list_owned()))
435

  
436
    for lockname in names:
437
      # If we are sure the lock doesn't leave __lockdict without being
438
      # exclusively held we can do this...
439
      self.__lockdict[lockname].release()
440
      self._del_owned(lockname)
441

  
442
  def add(self, names, acquired=0, shared=0):
443
    """Add a new set of elements to the set
444

  
445
    Args:
446
      names: names of the new elements to add
447
      acquired: pre-acquire the new resource?
448
      shared: is the pre-acquisition shared?
449

  
450
    """
451
    # Support passing in a single resource to add rather than many
452
    if isinstance(names, basestring):
453
      names = [names]
454

  
455
    # Acquire the internal lock in an exclusive way, so there cannot be a
456
    # conflicting add()
457
    self.__lock.acquire()
458
    try:
459
      invalid_names = self.__names().intersection(names)
460
      if invalid_names:
461
        # This must be an explicit raise, not an assert, because assert is
462
        # turned off when using optimization, and this can happen because of
463
        # concurrency even if the user doesn't want it.
464
        raise errors.LockError("duplicate add() (%s)" % invalid_names)
465

  
466
      for lockname in names:
467
        lock = SharedLock()
468

  
469
        if acquired:
470
          lock.acquire(shared=shared)
471
          # now the lock cannot be deleted, we have it!
472
          try:
473
            self._add_owned(lockname)
474
          except:
475
            # We shouldn't have problems adding the lock to the owners list,
476
            # but if we did we'll try to release this lock and re-raise
477
            # exception.  Of course something is going to be really wrong,
478
            # after this.  On the other hand the lock hasn't been added to the
479
            # __lockdict yet so no other threads should be pending on it. This
480
            # release is just a safety measure.
481
            lock.release()
482
            raise
483

  
484
        self.__lockdict[lockname] = lock
485

  
486
    finally:
487
      self.__lock.release()
488

  
489
    return True
490

  
491
  def remove(self, names, blocking=1):
492
    """Remove elements from the lock set.
493

  
494
    You can either not hold anything in the lockset or already hold a superset
495
    of the elements you want to delete, exclusively.
496

  
497
    Args:
498
      names: names of the resource to remove.
499
      blocking: whether to block while trying to acquire or to operate in
500
                try-lock mode.  this locking mode is not supported yet unless
501
                you are already holding exclusively the locks.
502

  
503
    Returns:
504
      A list of lock which we failed to delete. The list is always empty if we
505
      were holding all the locks exclusively.
506

  
507
    """
508
    if not blocking and not self._is_owned():
509
      # We don't have non-blocking mode for now
510
      raise NotImplementedError
511

  
512
    # Support passing in a single resource to remove rather than many
513
    if isinstance(names, basestring):
514
      names = [names]
515

  
516
    # If we own any subset of this lock it must be a superset of what we want
517
    # to delete. The ownership must also be exclusive, but that will be checked
518
    # by the lock itself.
519
    assert not self._is_owned() or self._list_owned().issuperset(names), (
520
      "remove() on acquired lockset while not owning all elements")
521

  
522
    delete_failed=[]
523

  
524
    for lname in names:
525
      # Calling delete() acquires the lock exclusively if we don't already own
526
      # it, and causes all pending and subsequent lock acquires to fail. It's
527
      # fine to call it out of order because delete() also implies release(),
528
      # and the assertion above guarantees that if we either already hold
529
      # everything we want to delete, or we hold none.
530
      try:
531
        self.__lockdict[lname].delete()
532
      except (KeyError, errors.LockError):
533
        delete_failed.append(lname)
534
        # This cannot happen if we were already holding it, verify:
535
        assert not self._is_owned(), "remove failed while holding lockset"
536
      else:
537
        # If no LockError was raised we are the ones who deleted the lock.
538
        # This means we can safely remove it from lockdict, as any further or
539
        # pending delete() or acquire() will fail (and nobody can have the lock
540
        # since before our call to delete()).
541
        #
542
        # This is done in an else clause because if the exception was thrown
543
        # it's the job of the one who actually deleted it.
544
        del self.__lockdict[lname]
545
        # And let's remove it from our private list if we owned it.
546
        if self._is_owned():
547
          self._del_owned(lname)
548

  
549
    return delete_failed
550

  
b/test/ganeti.locking_unittest.py
230 230
    self.assertEqual(self.done.get(True, 1), 'ERR')
231 231

  
232 232

  
233
class TestLockSet(unittest.TestCase):
234
  """LockSet tests"""
235

  
236
  def setUp(self):
237
    self.resources = ['one', 'two', 'three']
238
    self.ls = locking.LockSet(self.resources)
239
    # helper threads use the 'done' queue to tell the master they finished.
240
    self.done = Queue.Queue(0)
241

  
242
  def testResources(self):
243
    self.assertEquals(self.ls._names(), set(self.resources))
244
    newls = locking.LockSet()
245
    self.assertEquals(newls._names(), set())
246

  
247
  def testAcquireRelease(self):
248
    self.ls.acquire('one')
249
    self.assertEquals(self.ls._list_owned(), set(['one']))
250
    self.ls.release()
251
    self.assertEquals(self.ls._list_owned(), set())
252
    self.ls.acquire(['one'])
253
    self.assertEquals(self.ls._list_owned(), set(['one']))
254
    self.ls.release()
255
    self.assertEquals(self.ls._list_owned(), set())
256
    self.ls.acquire(['one', 'two', 'three'])
257
    self.assertEquals(self.ls._list_owned(), set(['one', 'two', 'three']))
258
    self.ls.release('one')
259
    self.assertEquals(self.ls._list_owned(), set(['two', 'three']))
260
    self.ls.release(['three'])
261
    self.assertEquals(self.ls._list_owned(), set(['two']))
262
    self.ls.release()
263
    self.assertEquals(self.ls._list_owned(), set())
264
    self.ls.acquire(['one', 'three'])
265
    self.assertEquals(self.ls._list_owned(), set(['one', 'three']))
266
    self.ls.release()
267
    self.assertEquals(self.ls._list_owned(), set())
268

  
269
  def testNoDoubleAcquire(self):
270
    self.ls.acquire('one')
271
    self.assertRaises(AssertionError, self.ls.acquire, 'one')
272
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
273
    self.assertRaises(AssertionError, self.ls.acquire, ['two', 'three'])
274
    self.ls.release()
275
    self.ls.acquire(['one', 'three'])
276
    self.ls.release('one')
277
    self.assertRaises(AssertionError, self.ls.acquire, ['two'])
278
    self.ls.release('three')
279

  
280
  def testNoWrongRelease(self):
281
    self.assertRaises(AssertionError, self.ls.release)
282
    self.ls.acquire('one')
283
    self.assertRaises(AssertionError, self.ls.release, 'two')
284

  
285
  def testAddRemove(self):
286
    self.ls.add('four')
287
    self.assertEquals(self.ls._list_owned(), set())
288
    self.assert_('four' in self.ls._names())
289
    self.ls.add(['five', 'six', 'seven'], acquired=1)
290
    self.assert_('five' in self.ls._names())
291
    self.assert_('six' in self.ls._names())
292
    self.assert_('seven' in self.ls._names())
293
    self.assertEquals(self.ls._list_owned(), set(['five', 'six', 'seven']))
294
    self.ls.remove(['five', 'six'])
295
    self.assert_('five' not in self.ls._names())
296
    self.assert_('six' not in self.ls._names())
297
    self.assertEquals(self.ls._list_owned(), set(['seven']))
298
    self.ls.add('eight', acquired=1, shared=1)
299
    self.assert_('eight' in self.ls._names())
300
    self.assertEquals(self.ls._list_owned(), set(['seven', 'eight']))
301
    self.ls.remove('seven')
302
    self.assert_('seven' not in self.ls._names())
303
    self.assertEquals(self.ls._list_owned(), set(['eight']))
304
    self.ls.release()
305
    self.ls.remove(['two'])
306
    self.assert_('two' not in self.ls._names())
307
    self.ls.acquire('three')
308
    self.ls.remove(['three'])
309
    self.assert_('three' not in self.ls._names())
310
    self.assertEquals(self.ls.remove('three'), ['three'])
311
    self.assertEquals(self.ls.remove(['one', 'three', 'six']), ['three', 'six'])
312
    self.assert_('one' not in self.ls._names())
313

  
314
  def testRemoveNonBlocking(self):
315
    self.assertRaises(NotImplementedError, self.ls.remove, 'one', blocking=0)
316
    self.ls.acquire('one')
317
    self.assertEquals(self.ls.remove('one', blocking=0), [])
318
    self.ls.acquire(['two', 'three'])
319
    self.assertEquals(self.ls.remove(['two', 'three'], blocking=0), [])
320

  
321
  def testNoDoubleAdd(self):
322
    self.assertRaises(errors.LockError, self.ls.add, 'two')
323
    self.ls.add('four')
324
    self.assertRaises(errors.LockError, self.ls.add, 'four')
325

  
326
  def testNoWrongRemoves(self):
327
    self.ls.acquire(['one', 'three'], shared=1)
328
    # Cannot remove 'two' while holding something which is not a superset
329
    self.assertRaises(AssertionError, self.ls.remove, 'two')
330
    # Cannot remove 'three' as we are sharing it
331
    self.assertRaises(AssertionError, self.ls.remove, 'three')
332

  
333
  def _doLockSet(self, set, shared):
334
    try:
335
      self.ls.acquire(set, shared=shared)
336
      self.done.put('DONE')
337
      self.ls.release()
338
    except errors.LockError:
339
      self.done.put('ERR')
340

  
341
  def _doRemoveSet(self, set):
342
    self.done.put(self.ls.remove(set))
343

  
344
  def testConcurrentSharedAcquire(self):
345
    self.ls.acquire(['one', 'two'], shared=1)
346
    Thread(target=self._doLockSet, args=(['one', 'two'], 1)).start()
347
    self.assertEqual(self.done.get(True, 1), 'DONE')
348
    Thread(target=self._doLockSet, args=(['one', 'two', 'three'], 1)).start()
349
    self.assertEqual(self.done.get(True, 1), 'DONE')
350
    Thread(target=self._doLockSet, args=('three', 1)).start()
351
    self.assertEqual(self.done.get(True, 1), 'DONE')
352
    Thread(target=self._doLockSet, args=(['one', 'two'], 0)).start()
353
    Thread(target=self._doLockSet, args=(['two', 'three'], 0)).start()
354
    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
355
    self.ls.release()
356
    self.assertEqual(self.done.get(True, 1), 'DONE')
357
    self.assertEqual(self.done.get(True, 1), 'DONE')
358

  
359
  def testConcurrentExclusiveAcquire(self):
360
    self.ls.acquire(['one', 'two'])
361
    Thread(target=self._doLockSet, args=('three', 1)).start()
362
    self.assertEqual(self.done.get(True, 1), 'DONE')
363
    Thread(target=self._doLockSet, args=('three', 0)).start()
364
    self.assertEqual(self.done.get(True, 1), 'DONE')
365
    Thread(target=self._doLockSet, args=(['one', 'two'], 0)).start()
366
    Thread(target=self._doLockSet, args=(['one', 'two'], 1)).start()
367
    Thread(target=self._doLockSet, args=('one', 0)).start()
368
    Thread(target=self._doLockSet, args=('one', 1)).start()
369
    Thread(target=self._doLockSet, args=(['two', 'three'], 0)).start()
370
    Thread(target=self._doLockSet, args=(['two', 'three'], 1)).start()
371
    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
372
    self.ls.release()
373
    self.assertEqual(self.done.get(True, 1), 'DONE')
374
    self.assertEqual(self.done.get(True, 1), 'DONE')
375
    self.assertEqual(self.done.get(True, 1), 'DONE')
376
    self.assertEqual(self.done.get(True, 1), 'DONE')
377
    self.assertEqual(self.done.get(True, 1), 'DONE')
378
    self.assertEqual(self.done.get(True, 1), 'DONE')
379

  
380
  def testConcurrentRemove(self):
381
    self.ls.add('four')
382
    self.ls.acquire(['one', 'two', 'four'])
383
    Thread(target=self._doLockSet, args=(['one', 'four'], 0)).start()
384
    Thread(target=self._doLockSet, args=(['one', 'four'], 1)).start()
385
    Thread(target=self._doLockSet, args=(['one', 'two'], 0)).start()
386
    Thread(target=self._doLockSet, args=(['one', 'two'], 1)).start()
387
    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
388
    self.ls.remove('one')
389
    self.ls.release()
390
    self.assertEqual(self.done.get(True, 1), 'ERR')
391
    self.assertEqual(self.done.get(True, 1), 'ERR')
392
    self.assertEqual(self.done.get(True, 1), 'ERR')
393
    self.assertEqual(self.done.get(True, 1), 'ERR')
394
    self.ls.add(['five', 'six'], acquired=1)
395
    Thread(target=self._doLockSet, args=(['three', 'six'], 1)).start()
396
    Thread(target=self._doLockSet, args=(['three', 'six'], 0)).start()
397
    Thread(target=self._doLockSet, args=(['four', 'six'], 1)).start()
398
    Thread(target=self._doLockSet, args=(['four', 'six'], 0)).start()
399
    self.ls.remove('five')
400
    self.ls.release()
401
    self.assertEqual(self.done.get(True, 1), 'DONE')
402
    self.assertEqual(self.done.get(True, 1), 'DONE')
403
    self.assertEqual(self.done.get(True, 1), 'DONE')
404
    self.assertEqual(self.done.get(True, 1), 'DONE')
405
    self.ls.acquire(['three', 'four'])
406
    Thread(target=self._doRemoveSet, args=(['four', 'six'], )).start()
407
    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
408
    self.ls.remove('four')
409
    self.assertEqual(self.done.get(True, 1), ['four'])
410
    Thread(target=self._doRemoveSet, args=(['two'])).start()
411
    self.assertEqual(self.done.get(True, 1), [])
412
    self.ls.release()
413

  
414

  
233 415
if __name__ == '__main__':
234 416
  unittest.main()
235 417
  #suite = unittest.TestLoader().loadTestsFromTestCase(TestSharedLock)

Also available in: Unified diff