Revision aaae9bc0 lib/locking.py

b/lib/locking.py
259 259
    finally:
260 260
      self.__lock.release()
261 261

  
262

  
263
class LockSet:
264
  """Implements a set of locks.
265

  
266
  This abstraction implements a set of shared locks for the same resource type,
267
  distinguished by name. The user can lock a subset of the resources and the
268
  LockSet will take care of acquiring the locks always in the same order, thus
269
  preventing deadlock.
270

  
271
  All the locks needed in the same set must be acquired together, though.
272

  
273
  """
274
  def __init__(self, members=None):
275
    """Constructs a new LockSet.
276

  
277
    Args:
278
      members: initial members of the set
279

  
280
    """
281
    # Used internally to guarantee coherency.
282
    self.__lock = SharedLock()
283

  
284
    # The lockdict indexes the relationship name -> lock
285
    # The order-of-locking is implied by the alphabetical order of names
286
    self.__lockdict = {}
287

  
288
    if members is not None:
289
      for name in members:
290
        self.__lockdict[name] = SharedLock()
291

  
292
    # The owner dict contains the set of locks each thread owns. For
293
    # performance each thread can access its own key without a global lock on
294
    # this structure. It is paramount though that *no* other type of access is
295
    # done to this structure (eg. no looping over its keys). *_owner helper
296
    # function are defined to guarantee access is correct, but in general never
297
    # do anything different than __owners[threading.currentThread()], or there
298
    # will be trouble.
299
    self.__owners = {}
300

  
301
  def _is_owned(self):
302
    """Is the current thread a current level owner?"""
303
    return threading.currentThread() in self.__owners
304

  
305
  def _add_owned(self, name):
306
    """Note the current thread owns the given lock"""
307
    if self._is_owned():
308
      self.__owners[threading.currentThread()].add(name)
309
    else:
310
       self.__owners[threading.currentThread()] = set([name])
311

  
312
  def _del_owned(self, name):
313
    """Note the current thread owns the given lock"""
314
    self.__owners[threading.currentThread()].remove(name)
315

  
316
    if not self.__owners[threading.currentThread()]:
317
      del self.__owners[threading.currentThread()]
318

  
319
  def _list_owned(self):
320
    """Get the set of resource names owned by the current thread"""
321
    if self._is_owned():
322
      return self.__owners[threading.currentThread()].copy()
323
    else:
324
      return set()
325

  
326
  def __names(self):
327
    """Return the current set of names.
328

  
329
    Only call this function while holding __lock and don't iterate on the
330
    result after releasing the lock.
331

  
332
    """
333
    return set(self.__lockdict.keys())
334

  
335
  def _names(self):
336
    """Return a copy of the current set of elements.
337

  
338
    Used only for debugging purposes.
339
    """
340
    self.__lock.acquire(shared=1)
341
    try:
342
      result = self.__names()
343
    finally:
344
      self.__lock.release()
345
    return result
346

  
347
  def acquire(self, names, blocking=1, shared=0):
348
    """Acquire a set of resource locks.
349

  
350
    Args:
351
      names: the names of the locks which shall be acquired.
352
             (special lock names, or instance/node names)
353
      shared: whether to acquire in shared mode. By default an exclusive lock
354
              will be acquired.
355
      blocking: whether to block while trying to acquire or to operate in try-lock mode.
356
                this locking mode is not supported yet.
357

  
358
    Returns:
359
      True: when all the locks are successfully acquired
360

  
361
    Raises:
362
      errors.LockError: when any lock we try to acquire has been deleted
363
      before we succeed. In this case none of the locks requested will be
364
      acquired.
365

  
366
    """
367
    if not blocking:
368
      # We don't have non-blocking mode for now
369
      raise NotImplementedError
370

  
371
    # Check we don't already own locks at this level
372
    assert not self._is_owned(), "Cannot acquire locks in the same set twice"
373

  
374
    # Support passing in a single resource to acquire rather than many
375
    if isinstance(names, basestring):
376
      names = [names]
377
    else:
378
      names.sort()
379

  
380
    # Now names contains a sorted list of resources whose lock we want to
381
    # acquire. In order to get them we loop on this (private) list and look
382
    # them up in __lockdict. Since we have no lock held on lockdict we have no
383
    # guarantees on their presence, and they may even disappear after we looked
384
    # them up. This is fine though as .acquire() itself is safe and will alert
385
    # us if the lock gets deleted.
386

  
387
    try:
388
      for lname in names:
389
        lock = self.__lockdict[lname] # raises KeyError if the lock is not there
390
        lock.acquire(shared=shared) # raises LockError if the lock is deleted
391
        try:
392
          # now the lock cannot be deleted, we have it!
393
          self._add_owned(lname)
394
        except:
395
          # We shouldn't have problems adding the lock to the owners list, but
396
          # if we did we'll try to release this lock and re-raise exception.
397
          # Of course something is going to be really wrong, after this.
398
          lock.release()
399
          raise
400

  
401
    except (KeyError, errors.LockError):
402
      name_fail = lname
403
      for lname in self._list_owned():
404
        self.__lockdict[lname].release()
405
        self._del_owned(lname)
406
      raise errors.LockError('non-existing lock in set (%s)' % name_fail)
407

  
408
    return True
409

  
410
  def release(self, names=None):
411
    """Release a set of resource locks, at the same level.
412

  
413
    You must have acquired the locks, either in shared or in exclusive mode,
414
    before releasing them.
415

  
416
    Args:
417
      names: the names of the locks which shall be released.
418
             (defaults to all the locks acquired at that level).
419

  
420
    """
421

  
422
    assert self._is_owned(), "release() on lock set while not owner"
423

  
424
    # Support passing in a single resource to release rather than many
425
    if isinstance(names, basestring):
426
      names = [names]
427

  
428
    if names is None:
429
      names = self._list_owned()
430
    else:
431
      names = set(names)
432
      assert self._list_owned().issuperset(names), (
433
               "release() on unheld resources %s" %
434
               names.difference(self._list_owned()))
435

  
436
    for lockname in names:
437
      # If we are sure the lock doesn't leave __lockdict without being
438
      # exclusively held we can do this...
439
      self.__lockdict[lockname].release()
440
      self._del_owned(lockname)
441

  
442
  def add(self, names, acquired=0, shared=0):
443
    """Add a new set of elements to the set
444

  
445
    Args:
446
      names: names of the new elements to add
447
      acquired: pre-acquire the new resource?
448
      shared: is the pre-acquisition shared?
449

  
450
    """
451
    # Support passing in a single resource to add rather than many
452
    if isinstance(names, basestring):
453
      names = [names]
454

  
455
    # Acquire the internal lock in an exclusive way, so there cannot be a
456
    # conflicting add()
457
    self.__lock.acquire()
458
    try:
459
      invalid_names = self.__names().intersection(names)
460
      if invalid_names:
461
        # This must be an explicit raise, not an assert, because assert is
462
        # turned off when using optimization, and this can happen because of
463
        # concurrency even if the user doesn't want it.
464
        raise errors.LockError("duplicate add() (%s)" % invalid_names)
465

  
466
      for lockname in names:
467
        lock = SharedLock()
468

  
469
        if acquired:
470
          lock.acquire(shared=shared)
471
          # now the lock cannot be deleted, we have it!
472
          try:
473
            self._add_owned(lockname)
474
          except:
475
            # We shouldn't have problems adding the lock to the owners list,
476
            # but if we did we'll try to release this lock and re-raise
477
            # exception.  Of course something is going to be really wrong,
478
            # after this.  On the other hand the lock hasn't been added to the
479
            # __lockdict yet so no other threads should be pending on it. This
480
            # release is just a safety measure.
481
            lock.release()
482
            raise
483

  
484
        self.__lockdict[lockname] = lock
485

  
486
    finally:
487
      self.__lock.release()
488

  
489
    return True
490

  
491
  def remove(self, names, blocking=1):
492
    """Remove elements from the lock set.
493

  
494
    You can either not hold anything in the lockset or already hold a superset
495
    of the elements you want to delete, exclusively.
496

  
497
    Args:
498
      names: names of the resource to remove.
499
      blocking: whether to block while trying to acquire or to operate in
500
                try-lock mode.  this locking mode is not supported yet unless
501
                you are already holding exclusively the locks.
502

  
503
    Returns:
504
      A list of lock which we failed to delete. The list is always empty if we
505
      were holding all the locks exclusively.
506

  
507
    """
508
    if not blocking and not self._is_owned():
509
      # We don't have non-blocking mode for now
510
      raise NotImplementedError
511

  
512
    # Support passing in a single resource to remove rather than many
513
    if isinstance(names, basestring):
514
      names = [names]
515

  
516
    # If we own any subset of this lock it must be a superset of what we want
517
    # to delete. The ownership must also be exclusive, but that will be checked
518
    # by the lock itself.
519
    assert not self._is_owned() or self._list_owned().issuperset(names), (
520
      "remove() on acquired lockset while not owning all elements")
521

  
522
    delete_failed=[]
523

  
524
    for lname in names:
525
      # Calling delete() acquires the lock exclusively if we don't already own
526
      # it, and causes all pending and subsequent lock acquires to fail. It's
527
      # fine to call it out of order because delete() also implies release(),
528
      # and the assertion above guarantees that if we either already hold
529
      # everything we want to delete, or we hold none.
530
      try:
531
        self.__lockdict[lname].delete()
532
      except (KeyError, errors.LockError):
533
        delete_failed.append(lname)
534
        # This cannot happen if we were already holding it, verify:
535
        assert not self._is_owned(), "remove failed while holding lockset"
536
      else:
537
        # If no LockError was raised we are the ones who deleted the lock.
538
        # This means we can safely remove it from lockdict, as any further or
539
        # pending delete() or acquire() will fail (and nobody can have the lock
540
        # since before our call to delete()).
541
        #
542
        # This is done in an else clause because if the exception was thrown
543
        # it's the job of the one who actually deleted it.
544
        del self.__lockdict[lname]
545
        # And let's remove it from our private list if we owned it.
546
        if self._is_owned():
547
          self._del_owned(lname)
548

  
549
    return delete_failed
550

  

Also available in: Unified diff