cli: Move parsing of --net option to separate function
[ganeti-local] / lib / locking.py
index 0b419c7..f901490 100644 (file)
@@ -1,7 +1,7 @@
 #
 #
 
-# Copyright (C) 2006, 2007 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -30,12 +30,18 @@ import select
 import threading
 import time
 import errno
+import weakref
+import logging
 
 from ganeti import errors
 from ganeti import utils
 from ganeti import compat
 
 
+_EXCLUSIVE_TEXT = "exclusive"
+_SHARED_TEXT = "shared"
+
+
 def ssynchronized(mylock, shared=0):
   """Shared Synchronization decorator.
 
@@ -341,7 +347,7 @@ class PipeCondition(_BaseCondition):
 
   """
   __slots__ = [
-    "_nwaiters",
+    "_waiters",
     "_single_condition",
     ]
 
@@ -352,7 +358,7 @@ class PipeCondition(_BaseCondition):
 
     """
     _BaseCondition.__init__(self, lock)
-    self._nwaiters = 0
+    self._waiters = set()
     self._single_condition = self._single_condition_class(self._lock)
 
   def wait(self, timeout=None):
@@ -366,15 +372,14 @@ class PipeCondition(_BaseCondition):
 
     # Keep local reference to the pipe. It could be replaced by another thread
     # notifying while we're waiting.
-    my_condition = self._single_condition
+    cond = self._single_condition
 
-    assert self._nwaiters >= 0
-    self._nwaiters += 1
+    self._waiters.add(threading.currentThread())
     try:
-      my_condition.wait(timeout)
+      cond.wait(timeout)
     finally:
-      assert self._nwaiters > 0
-      self._nwaiters -= 1
+      self._check_owned()
+      self._waiters.remove(threading.currentThread())
 
   def notifyAll(self): # pylint: disable-msg=C0103
     """Notify all currently waiting threads.
@@ -384,28 +389,40 @@ class PipeCondition(_BaseCondition):
     self._single_condition.notifyAll()
     self._single_condition = self._single_condition_class(self._lock)
 
+  def get_waiting(self):
+    """Returns a list of all waiting threads.
+
+    """
+    self._check_owned()
+
+    return self._waiters
+
   def has_waiting(self):
     """Returns whether there are active waiters.
 
     """
     self._check_owned()
 
-    return bool(self._nwaiters)
+    return bool(self._waiters)
 
 
 class SharedLock(object):
   """Implements a shared lock.
 
-  Multiple threads can acquire the lock in a shared way, calling
-  acquire_shared().  In order to acquire the lock in an exclusive way threads
-  can call acquire_exclusive().
+  Multiple threads can acquire the lock in a shared way by calling
+  C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
+  threads can call C{acquire(shared=0)}.
 
   The lock prevents starvation but does not guarantee that threads will acquire
   the shared lock in the order they queued for it, just that they will
   eventually do so.
 
+  @type name: string
+  @ivar name: the name of the lock
+
   """
   __slots__ = [
+    "__weakref__",
     "__active_shr_c",
     "__inactive_shr_c",
     "__deleted",
@@ -413,16 +430,23 @@ class SharedLock(object):
     "__lock",
     "__pending",
     "__shr",
+    "name",
     ]
 
   __condition_class = PipeCondition
 
-  def __init__(self):
+  def __init__(self, name, monitor=None):
     """Construct a new SharedLock.
 
+    @param name: the name of the lock
+    @type monitor: L{LockMonitor}
+    @param monitor: Lock monitor with which to register
+
     """
     object.__init__(self)
 
+    self.name = name
+
     # Internal lock
     self.__lock = threading.Lock()
 
@@ -440,12 +464,76 @@ class SharedLock(object):
     # is this lock in the deleted state?
     self.__deleted = False
 
+    # Register with lock monitor
+    if monitor:
+      monitor.RegisterLock(self)
+
+  def GetInfo(self, fields):
+    """Retrieves information for querying locks.
+
+    @type fields: list of strings
+    @param fields: List of fields to return
+
+    """
+    self.__lock.acquire()
+    try:
+      info = []
+
+      # Note: to avoid unintentional race conditions, no references to
+      # modifiable objects should be returned unless they were created in this
+      # function.
+      for fname in fields:
+        if fname == "name":
+          info.append(self.name)
+        elif fname == "mode":
+          if self.__deleted:
+            info.append("deleted")
+            assert not (self.__exc or self.__shr)
+          elif self.__exc:
+            info.append(_EXCLUSIVE_TEXT)
+          elif self.__shr:
+            info.append(_SHARED_TEXT)
+          else:
+            info.append(None)
+        elif fname == "owner":
+          if self.__exc:
+            owner = [self.__exc]
+          else:
+            owner = self.__shr
+
+          if owner:
+            assert not self.__deleted
+            info.append([i.getName() for i in owner])
+          else:
+            info.append(None)
+        elif fname == "pending":
+          data = []
+
+          for cond in self.__pending:
+            if cond in (self.__active_shr_c, self.__inactive_shr_c):
+              mode = _SHARED_TEXT
+            else:
+              mode = _EXCLUSIVE_TEXT
+
+            # This function should be fast as it runs with the lock held. Hence
+            # not using utils.NiceSort.
+            data.append((mode, sorted([i.getName()
+                                       for i in cond.get_waiting()])))
+
+          info.append(data)
+        else:
+          raise errors.OpExecError("Invalid query field '%s'" % fname)
+
+      return info
+    finally:
+      self.__lock.release()
+
   def __check_deleted(self):
     """Raises an exception if the lock has been deleted.
 
     """
     if self.__deleted:
-      raise errors.LockError("Deleted lock")
+      raise errors.LockError("Deleted lock %s" % self.name)
 
   def __is_sharer(self):
     """Is the current thread sharing the lock at this time?
@@ -537,7 +625,8 @@ class SharedLock(object):
     self.__check_deleted()
 
     # We cannot acquire the lock if we already have it
-    assert not self.__is_owned(), "double acquire() on a non-recursive lock"
+    assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
+                                   " %s" % self.name)
 
     # Check whether someone else holds the lock or there are pending acquires.
     if not self.__pending and self.__can_acquire(shared):
@@ -662,6 +751,8 @@ class SharedLock(object):
         self.__deleted = True
         self.__exc = None
 
+        assert not (self.__exc or self.__shr), "Found owner during deletion"
+
         # Notify all acquires. They'll throw an error.
         while self.__pending:
           self.__pending.pop().notifyAll()
@@ -700,24 +791,35 @@ class LockSet:
 
   All the locks needed in the same set must be acquired together, though.
 
+  @type name: string
+  @ivar name: the name of the lockset
+
   """
-  def __init__(self, members=None):
+  def __init__(self, members, name, monitor=None):
     """Constructs a new LockSet.
 
     @type members: list of strings
     @param members: initial members of the set
+    @type monitor: L{LockMonitor}
+    @param monitor: Lock monitor with which to register member locks
 
     """
+    assert members is not None, "members parameter is not a list"
+    self.name = name
+
+    # Lock monitor
+    self.__monitor = monitor
+
     # Used internally to guarantee coherency.
-    self.__lock = SharedLock()
+    self.__lock = SharedLock(name)
 
     # The lockdict indexes the relationship name -> lock
     # The order-of-locking is implied by the alphabetical order of names
     self.__lockdict = {}
 
-    if members is not None:
-      for name in members:
-        self.__lockdict[name] = SharedLock()
+    for mname in members:
+      self.__lockdict[mname] = SharedLock(self._GetLockName(mname),
+                                          monitor=monitor)
 
     # The owner dict contains the set of locks each thread owns. For
     # performance each thread can access its own key without a global lock on
@@ -728,6 +830,12 @@ class LockSet:
     # will be trouble.
     self.__owners = {}
 
+  def _GetLockName(self, mname):
+    """Returns the name for a member lock.
+
+    """
+    return "%s/%s" % (self.name, mname)
+
   def _is_owned(self):
     """Is the current thread a current level owner?"""
     return threading.currentThread() in self.__owners
@@ -824,7 +932,8 @@ class LockSet:
     assert timeout is None or timeout >= 0.0
 
     # Check we don't already own locks at this level
-    assert not self._is_owned(), "Cannot acquire locks in the same set twice"
+    assert not self._is_owned(), ("Cannot acquire locks in the same set twice"
+                                  " (lockset %s)" % self.name)
 
     # We need to keep track of how long we spent waiting for a lock. The
     # timeout passed to this function is over all lock acquires.
@@ -894,7 +1003,8 @@ class LockSet:
           # element is not there anymore.
           continue
 
-        raise errors.LockError("Non-existing lock in set (%s)" % lname)
+        raise errors.LockError("Non-existing lock %s in set %s" %
+                               (lname, self.name))
 
       acquire_list.append((lname, lock))
 
@@ -925,14 +1035,16 @@ class LockSet:
             # particular element is not there anymore.
             continue
 
-          raise errors.LockError("Non-existing lock in set (%s)" % lname)
+          raise errors.LockError("Non-existing lock %s in set %s" %
+                                 (lname, self.name))
 
         if not acq_success:
           # Couldn't get lock or timeout occurred
           if timeout is None:
             # This shouldn't happen as SharedLock.acquire(timeout=None) is
             # blocking.
-            raise errors.LockError("Failed to get lock %s" % lname)
+            raise errors.LockError("Failed to get lock %s (set %s)" %
+                                   (lname, self.name))
 
           raise _AcquireTimeout()
 
@@ -967,7 +1079,8 @@ class LockSet:
         (defaults to all the locks acquired at that level).
 
     """
-    assert self._is_owned(), "release() on lock set while not owner"
+    assert self._is_owned(), ("release() on lock set %s while not owner" %
+                              self.name)
 
     # Support passing in a single resource to release rather than many
     if isinstance(names, basestring):
@@ -978,8 +1091,8 @@ class LockSet:
     else:
       names = set(names)
       assert self._list_owned().issuperset(names), (
-               "release() on unheld resources %s" %
-               names.difference(self._list_owned()))
+               "release() on unheld resources %s (set %s)" %
+               (names.difference(self._list_owned()), self.name))
 
     # First of all let's release the "all elements" lock, if set.
     # After this 'add' can work again
@@ -1006,7 +1119,8 @@ class LockSet:
     """
     # Check we don't already own locks at this level
     assert not self._is_owned() or self.__lock._is_owned(shared=0), \
-      "Cannot add locks if the set is only partially owned, or shared"
+      ("Cannot add locks if the set %s is only partially owned, or shared" %
+       self.name)
 
     # Support passing in a single resource to add rather than many
     if isinstance(names, basestring):
@@ -1025,10 +1139,11 @@ class LockSet:
         # This must be an explicit raise, not an assert, because assert is
         # turned off when using optimization, and this can happen because of
         # concurrency even if the user doesn't want it.
-        raise errors.LockError("duplicate add() (%s)" % invalid_names)
+        raise errors.LockError("duplicate add(%s) on lockset %s" %
+                               (invalid_names, self.name))
 
       for lockname in names:
-        lock = SharedLock()
+        lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor)
 
         if acquired:
           lock.acquire(shared=shared)
@@ -1076,7 +1191,8 @@ class LockSet:
     # to delete. The ownership must also be exclusive, but that will be checked
     # by the lock itself.
     assert not self._is_owned() or self._list_owned().issuperset(names), (
-      "remove() on acquired lockset while not owning all elements")
+      "remove() on acquired lockset %s while not owning all elements" %
+      self.name)
 
     removed = []
 
@@ -1091,7 +1207,8 @@ class LockSet:
         removed.append(lname)
       except (KeyError, errors.LockError):
         # This cannot happen if we were already holding it, verify:
-        assert not self._is_owned(), "remove failed while holding lockset"
+        assert not self._is_owned(), ("remove failed while holding lockset %s"
+                                      % self.name)
       else:
         # If no LockError was raised we are the ones who deleted the lock.
         # This means we can safely remove it from lockdict, as any further or
@@ -1164,13 +1281,24 @@ class GanetiLockManager:
 
     self.__class__._instance = self
 
+    self._monitor = LockMonitor()
+
     # The keyring contains all the locks, at their level and in the correct
     # locking order.
     self.__keyring = {
-      LEVEL_CLUSTER: LockSet([BGL]),
-      LEVEL_NODE: LockSet(nodes),
-      LEVEL_INSTANCE: LockSet(instances),
-    }
+      LEVEL_CLUSTER: LockSet([BGL], "BGL", monitor=self._monitor),
+      LEVEL_NODE: LockSet(nodes, "nodes", monitor=self._monitor),
+      LEVEL_INSTANCE: LockSet(instances, "instances",
+                              monitor=self._monitor),
+      }
+
+  def QueryLocks(self, fields, sync):
+    """Queries information from all locks.
+
+    See L{LockMonitor.QueryLocks}.
+
+    """
+    return self._monitor.QueryLocks(fields, sync)
 
   def _names(self, level):
     """List the lock names at the given level.
@@ -1323,3 +1451,59 @@ class GanetiLockManager:
            "Cannot remove locks at a level while not owning it or"
            " owning some at a greater one")
     return self.__keyring[level].remove(names)
+
+
+class LockMonitor(object):
+  _LOCK_ATTR = "_lock"
+
+  def __init__(self):
+    """Initializes this class.
+
+    """
+    self._lock = SharedLock("LockMonitor")
+
+    # Tracked locks. Weak references are used to avoid issues with circular
+    # references and deletion.
+    self._locks = weakref.WeakKeyDictionary()
+
+  @ssynchronized(_LOCK_ATTR)
+  def RegisterLock(self, lock):
+    """Registers a new lock.
+
+    """
+    logging.debug("Registering lock %s", lock.name)
+    assert lock not in self._locks, "Duplicate lock registration"
+    assert not compat.any(lock.name == i.name for i in self._locks.keys()), \
+           "Found duplicate lock name"
+    self._locks[lock] = None
+
+  @ssynchronized(_LOCK_ATTR)
+  def _GetLockInfo(self, fields):
+    """Get information from all locks while the monitor lock is held.
+
+    """
+    result = {}
+
+    for lock in self._locks.keys():
+      assert lock.name not in result, "Found duplicate lock name"
+      result[lock.name] = lock.GetInfo(fields)
+
+    return result
+
+  def QueryLocks(self, fields, sync):
+    """Queries information from all locks.
+
+    @type fields: list of strings
+    @param fields: List of fields to return
+    @type sync: boolean
+    @param sync: Whether to operate in synchronous mode
+
+    """
+    if sync:
+      raise NotImplementedError("Synchronous queries are not implemented")
+
+    # Get all data without sorting
+    result = self._GetLockInfo(fields)
+
+    # Sort by name
+    return [result[name] for name in utils.NiceSort(result.keys())]