Revision 19b9ba9a lib/locking.py

b/lib/locking.py
30 30
import threading
31 31
import time
32 32
import errno
33
import weakref
34
import logging
33 35

  
34 36
from ganeti import errors
35 37
from ganeti import utils
......
409 411

  
410 412
  """
411 413
  __slots__ = [
414
    "__weakref__",
412 415
    "__active_shr_c",
413 416
    "__inactive_shr_c",
414 417
    "__deleted",
......
421 424

  
422 425
  __condition_class = PipeCondition
423 426

  
424
  def __init__(self, name):
427
  def __init__(self, name, monitor=None):
425 428
    """Construct a new SharedLock.
426 429

  
427 430
    @param name: the name of the lock
431
    @type monitor: L{LockMonitor}
432
    @param monitor: Lock monitor with which to register
428 433

  
429 434
    """
430 435
    object.__init__(self)
......
448 453
    # is this lock in the deleted state?
449 454
    self.__deleted = False
450 455

  
456
    # Register with lock monitor
457
    if monitor:
458
      monitor.RegisterLock(self)
459

  
460
  def GetInfo(self, fields):
461
    """Retrieves information for querying locks.
462

  
463
    @type fields: list of strings
464
    @param fields: List of fields to return
465

  
466
    """
467
    self.__lock.acquire()
468
    try:
469
      info = []
470

  
471
      # Note: to avoid unintentional race conditions, no references to
472
      # modifiable objects should be returned unless they were created in this
473
      # function.
474
      for fname in fields:
475
        if fname == "name":
476
          info.append(self.name)
477
        elif fname == "mode":
478
          if self.__deleted:
479
            info.append("deleted")
480
            assert not (self.__exc or self.__shr)
481
          elif self.__exc:
482
            info.append("exclusive")
483
          elif self.__shr:
484
            info.append("shared")
485
          else:
486
            info.append(None)
487
        elif fname == "owner":
488
          if self.__exc:
489
            owner = [self.__exc]
490
          else:
491
            owner = self.__shr
492

  
493
          if owner:
494
            assert not self.__deleted
495
            info.append([i.getName() for i in owner])
496
          else:
497
            info.append(None)
498
        else:
499
          raise errors.OpExecError("Invalid query field '%s'" % fname)
500

  
501
      return info
502
    finally:
503
      self.__lock.release()
504

  
451 505
  def __check_deleted(self):
452 506
    """Raises an exception if the lock has been deleted.
453 507

  
......
671 725
        self.__deleted = True
672 726
        self.__exc = None
673 727

  
728
        assert not (self.__exc or self.__shr), "Found owner during deletion"
729

  
674 730
        # Notify all acquires. They'll throw an error.
675 731
        while self.__pending:
676 732
          self.__pending.pop().notifyAll()
......
713 769
  @ivar name: the name of the lockset
714 770

  
715 771
  """
716
  def __init__(self, members, name):
772
  def __init__(self, members, name, monitor=None):
717 773
    """Constructs a new LockSet.
718 774

  
719 775
    @type members: list of strings
720 776
    @param members: initial members of the set
777
    @type monitor: L{LockMonitor}
778
    @param monitor: Lock monitor with which to register member locks
721 779

  
722 780
    """
723 781
    assert members is not None, "members parameter is not a list"
724 782
    self.name = name
725 783

  
784
    # Lock monitor
785
    self.__monitor = monitor
786

  
726 787
    # Used internally to guarantee coherency.
727 788
    self.__lock = SharedLock(name)
728 789

  
......
731 792
    self.__lockdict = {}
732 793

  
733 794
    for mname in members:
734
      self.__lockdict[mname] = SharedLock(self._GetLockName(mname))
795
      self.__lockdict[mname] = SharedLock(self._GetLockName(mname),
796
                                          monitor=monitor)
735 797

  
736 798
    # The owner dict contains the set of locks each thread owns. For
737 799
    # performance each thread can access its own key without a global lock on
......
1055 1117
                               (invalid_names, self.name))
1056 1118

  
1057 1119
      for lockname in names:
1058
        lock = SharedLock(self._GetLockName(lockname))
1120
        lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor)
1059 1121

  
1060 1122
        if acquired:
1061 1123
          lock.acquire(shared=shared)
......
1193 1255

  
1194 1256
    self.__class__._instance = self
1195 1257

  
1258
    self._monitor = LockMonitor()
1259

  
1196 1260
    # The keyring contains all the locks, at their level and in the correct
1197 1261
    # locking order.
1198 1262
    self.__keyring = {
1199
      LEVEL_CLUSTER: LockSet([BGL], "bgl lockset"),
1200
      LEVEL_NODE: LockSet(nodes, "nodes lockset"),
1201
      LEVEL_INSTANCE: LockSet(instances, "instances lockset"),
1202
    }
1263
      LEVEL_CLUSTER: LockSet([BGL], "BGL", monitor=self._monitor),
1264
      LEVEL_NODE: LockSet(nodes, "nodes", monitor=self._monitor),
1265
      LEVEL_INSTANCE: LockSet(instances, "instances",
1266
                              monitor=self._monitor),
1267
      }
1268

  
1269
  def QueryLocks(self, fields, sync):
1270
    """Queries information from all locks.
1271

  
1272
    See L{LockMonitor.QueryLocks}.
1273

  
1274
    """
1275
    return self._monitor.QueryLocks(fields, sync)
1203 1276

  
1204 1277
  def _names(self, level):
1205 1278
    """List the lock names at the given level.
......
1352 1425
           "Cannot remove locks at a level while not owning it or"
1353 1426
           " owning some at a greater one")
1354 1427
    return self.__keyring[level].remove(names)
1428

  
1429

  
1430
class LockMonitor(object):
1431
  _LOCK_ATTR = "_lock"
1432

  
1433
  def __init__(self):
1434
    """Initializes this class.
1435

  
1436
    """
1437
    self._lock = SharedLock("LockMonitor")
1438

  
1439
    # Tracked locks. Weak references are used to avoid issues with circular
1440
    # references and deletion.
1441
    self._locks = weakref.WeakKeyDictionary()
1442

  
1443
  @ssynchronized(_LOCK_ATTR)
1444
  def RegisterLock(self, lock):
1445
    """Registers a new lock.
1446

  
1447
    """
1448
    logging.debug("Registering lock %s", lock.name)
1449
    assert lock not in self._locks, "Duplicate lock registration"
1450
    assert not compat.any(lock.name == i.name for i in self._locks.keys()), \
1451
           "Found duplicate lock name"
1452
    self._locks[lock] = None
1453

  
1454
  @ssynchronized(_LOCK_ATTR)
1455
  def _GetLockInfo(self, fields):
1456
    """Get information from all locks while the monitor lock is held.
1457

  
1458
    """
1459
    result = {}
1460

  
1461
    for lock in self._locks.keys():
1462
      assert lock.name not in result, "Found duplicate lock name"
1463
      result[lock.name] = lock.GetInfo(fields)
1464

  
1465
    return result
1466

  
1467
  def QueryLocks(self, fields, sync):
1468
    """Queries information from all locks.
1469

  
1470
    @type fields: list of strings
1471
    @param fields: List of fields to return
1472
    @type sync: boolean
1473
    @param sync: Whether to operate in synchronous mode
1474

  
1475
    """
1476
    if sync:
1477
      raise NotImplementedError("Synchronous queries are not implemented")
1478

  
1479
    # Get all data without sorting
1480
    result = self._GetLockInfo(fields)
1481

  
1482
    # Sort by name
1483
    return [result[name] for name in utils.NiceSort(result.keys())]

Also available in: Unified diff