cli: Move parsing of --net option to separate function
[ganeti-local] / lib / locking.py
index 0b419c7..f901490 100644 (file)
@@ -1,7 +1,7 @@
 #
 #
 
 #
 #
 
-# Copyright (C) 2006, 2007 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -30,12 +30,18 @@ import select
 import threading
 import time
 import errno
 import threading
 import time
 import errno
+import weakref
+import logging
 
 from ganeti import errors
 from ganeti import utils
 from ganeti import compat
 
 
 
 from ganeti import errors
 from ganeti import utils
 from ganeti import compat
 
 
+_EXCLUSIVE_TEXT = "exclusive"
+_SHARED_TEXT = "shared"
+
+
 def ssynchronized(mylock, shared=0):
   """Shared Synchronization decorator.
 
 def ssynchronized(mylock, shared=0):
   """Shared Synchronization decorator.
 
@@ -341,7 +347,7 @@ class PipeCondition(_BaseCondition):
 
   """
   __slots__ = [
 
   """
   __slots__ = [
-    "_nwaiters",
+    "_waiters",
     "_single_condition",
     ]
 
     "_single_condition",
     ]
 
@@ -352,7 +358,7 @@ class PipeCondition(_BaseCondition):
 
     """
     _BaseCondition.__init__(self, lock)
 
     """
     _BaseCondition.__init__(self, lock)
-    self._nwaiters = 0
+    self._waiters = set()
     self._single_condition = self._single_condition_class(self._lock)
 
   def wait(self, timeout=None):
     self._single_condition = self._single_condition_class(self._lock)
 
   def wait(self, timeout=None):
@@ -366,15 +372,14 @@ class PipeCondition(_BaseCondition):
 
     # Keep local reference to the pipe. It could be replaced by another thread
     # notifying while we're waiting.
 
     # Keep local reference to the pipe. It could be replaced by another thread
     # notifying while we're waiting.
-    my_condition = self._single_condition
+    cond = self._single_condition
 
 
-    assert self._nwaiters >= 0
-    self._nwaiters += 1
+    self._waiters.add(threading.currentThread())
     try:
     try:
-      my_condition.wait(timeout)
+      cond.wait(timeout)
     finally:
     finally:
-      assert self._nwaiters > 0
-      self._nwaiters -= 1
+      self._check_owned()
+      self._waiters.remove(threading.currentThread())
 
   def notifyAll(self): # pylint: disable-msg=C0103
     """Notify all currently waiting threads.
 
   def notifyAll(self): # pylint: disable-msg=C0103
     """Notify all currently waiting threads.
@@ -384,28 +389,40 @@ class PipeCondition(_BaseCondition):
     self._single_condition.notifyAll()
     self._single_condition = self._single_condition_class(self._lock)
 
     self._single_condition.notifyAll()
     self._single_condition = self._single_condition_class(self._lock)
 
+  def get_waiting(self):
+    """Returns a list of all waiting threads.
+
+    """
+    self._check_owned()
+
+    return self._waiters
+
   def has_waiting(self):
     """Returns whether there are active waiters.
 
     """
     self._check_owned()
 
   def has_waiting(self):
     """Returns whether there are active waiters.
 
     """
     self._check_owned()
 
-    return bool(self._nwaiters)
+    return bool(self._waiters)
 
 
 class SharedLock(object):
   """Implements a shared lock.
 
 
 
 class SharedLock(object):
   """Implements a shared lock.
 
-  Multiple threads can acquire the lock in a shared way, calling
-  acquire_shared().  In order to acquire the lock in an exclusive way threads
-  can call acquire_exclusive().
+  Multiple threads can acquire the lock in a shared way by calling
+  C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
+  threads can call C{acquire(shared=0)}.
 
   The lock prevents starvation but does not guarantee that threads will acquire
   the shared lock in the order they queued for it, just that they will
   eventually do so.
 
 
   The lock prevents starvation but does not guarantee that threads will acquire
   the shared lock in the order they queued for it, just that they will
   eventually do so.
 
+  @type name: string
+  @ivar name: the name of the lock
+
   """
   __slots__ = [
   """
   __slots__ = [
+    "__weakref__",
     "__active_shr_c",
     "__inactive_shr_c",
     "__deleted",
     "__active_shr_c",
     "__inactive_shr_c",
     "__deleted",
@@ -413,16 +430,23 @@ class SharedLock(object):
     "__lock",
     "__pending",
     "__shr",
     "__lock",
     "__pending",
     "__shr",
+    "name",
     ]
 
   __condition_class = PipeCondition
 
     ]
 
   __condition_class = PipeCondition
 
-  def __init__(self):
+  def __init__(self, name, monitor=None):
     """Construct a new SharedLock.
 
     """Construct a new SharedLock.
 
+    @param name: the name of the lock
+    @type monitor: L{LockMonitor}
+    @param monitor: Lock monitor with which to register
+
     """
     object.__init__(self)
 
     """
     object.__init__(self)
 
+    self.name = name
+
     # Internal lock
     self.__lock = threading.Lock()
 
     # Internal lock
     self.__lock = threading.Lock()
 
@@ -440,12 +464,76 @@ class SharedLock(object):
     # is this lock in the deleted state?
     self.__deleted = False
 
     # is this lock in the deleted state?
     self.__deleted = False
 
+    # Register with lock monitor
+    if monitor:
+      monitor.RegisterLock(self)
+
+  def GetInfo(self, fields):
+    """Retrieves information for querying locks.
+
+    @type fields: list of strings
+    @param fields: List of fields to return
+
+    """
+    self.__lock.acquire()
+    try:
+      info = []
+
+      # Note: to avoid unintentional race conditions, no references to
+      # modifiable objects should be returned unless they were created in this
+      # function.
+      for fname in fields:
+        if fname == "name":
+          info.append(self.name)
+        elif fname == "mode":
+          if self.__deleted:
+            info.append("deleted")
+            assert not (self.__exc or self.__shr)
+          elif self.__exc:
+            info.append(_EXCLUSIVE_TEXT)
+          elif self.__shr:
+            info.append(_SHARED_TEXT)
+          else:
+            info.append(None)
+        elif fname == "owner":
+          if self.__exc:
+            owner = [self.__exc]
+          else:
+            owner = self.__shr
+
+          if owner:
+            assert not self.__deleted
+            info.append([i.getName() for i in owner])
+          else:
+            info.append(None)
+        elif fname == "pending":
+          data = []
+
+          for cond in self.__pending:
+            if cond in (self.__active_shr_c, self.__inactive_shr_c):
+              mode = _SHARED_TEXT
+            else:
+              mode = _EXCLUSIVE_TEXT
+
+            # This function should be fast as it runs with the lock held. Hence
+            # not using utils.NiceSort.
+            data.append((mode, sorted([i.getName()
+                                       for i in cond.get_waiting()])))
+
+          info.append(data)
+        else:
+          raise errors.OpExecError("Invalid query field '%s'" % fname)
+
+      return info
+    finally:
+      self.__lock.release()
+
   def __check_deleted(self):
     """Raises an exception if the lock has been deleted.
 
     """
     if self.__deleted:
   def __check_deleted(self):
     """Raises an exception if the lock has been deleted.
 
     """
     if self.__deleted:
-      raise errors.LockError("Deleted lock")
+      raise errors.LockError("Deleted lock %s" % self.name)
 
   def __is_sharer(self):
     """Is the current thread sharing the lock at this time?
 
   def __is_sharer(self):
     """Is the current thread sharing the lock at this time?
@@ -537,7 +625,8 @@ class SharedLock(object):
     self.__check_deleted()
 
     # We cannot acquire the lock if we already have it
     self.__check_deleted()
 
     # We cannot acquire the lock if we already have it
-    assert not self.__is_owned(), "double acquire() on a non-recursive lock"
+    assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
+                                   " %s" % self.name)
 
     # Check whether someone else holds the lock or there are pending acquires.
     if not self.__pending and self.__can_acquire(shared):
 
     # Check whether someone else holds the lock or there are pending acquires.
     if not self.__pending and self.__can_acquire(shared):
@@ -662,6 +751,8 @@ class SharedLock(object):
         self.__deleted = True
         self.__exc = None
 
         self.__deleted = True
         self.__exc = None
 
+        assert not (self.__exc or self.__shr), "Found owner during deletion"
+
         # Notify all acquires. They'll throw an error.
         while self.__pending:
           self.__pending.pop().notifyAll()
         # Notify all acquires. They'll throw an error.
         while self.__pending:
           self.__pending.pop().notifyAll()
@@ -700,24 +791,35 @@ class LockSet:
 
   All the locks needed in the same set must be acquired together, though.
 
 
   All the locks needed in the same set must be acquired together, though.
 
+  @type name: string
+  @ivar name: the name of the lockset
+
   """
   """
-  def __init__(self, members=None):
+  def __init__(self, members, name, monitor=None):
     """Constructs a new LockSet.
 
     @type members: list of strings
     @param members: initial members of the set
     """Constructs a new LockSet.
 
     @type members: list of strings
     @param members: initial members of the set
+    @type monitor: L{LockMonitor}
+    @param monitor: Lock monitor with which to register member locks
 
     """
 
     """
+    assert members is not None, "members parameter is not a list"
+    self.name = name
+
+    # Lock monitor
+    self.__monitor = monitor
+
     # Used internally to guarantee coherency.
     # Used internally to guarantee coherency.
-    self.__lock = SharedLock()
+    self.__lock = SharedLock(name)
 
     # The lockdict indexes the relationship name -> lock
     # The order-of-locking is implied by the alphabetical order of names
     self.__lockdict = {}
 
 
     # The lockdict indexes the relationship name -> lock
     # The order-of-locking is implied by the alphabetical order of names
     self.__lockdict = {}
 
-    if members is not None:
-      for name in members:
-        self.__lockdict[name] = SharedLock()
+    for mname in members:
+      self.__lockdict[mname] = SharedLock(self._GetLockName(mname),
+                                          monitor=monitor)
 
     # The owner dict contains the set of locks each thread owns. For
     # performance each thread can access its own key without a global lock on
 
     # The owner dict contains the set of locks each thread owns. For
     # performance each thread can access its own key without a global lock on
@@ -728,6 +830,12 @@ class LockSet:
     # will be trouble.
     self.__owners = {}
 
     # will be trouble.
     self.__owners = {}
 
+  def _GetLockName(self, mname):
+    """Returns the name for a member lock.
+
+    """
+    return "%s/%s" % (self.name, mname)
+
   def _is_owned(self):
     """Is the current thread a current level owner?"""
     return threading.currentThread() in self.__owners
   def _is_owned(self):
     """Is the current thread a current level owner?"""
     return threading.currentThread() in self.__owners
@@ -824,7 +932,8 @@ class LockSet:
     assert timeout is None or timeout >= 0.0
 
     # Check we don't already own locks at this level
     assert timeout is None or timeout >= 0.0
 
     # Check we don't already own locks at this level
-    assert not self._is_owned(), "Cannot acquire locks in the same set twice"
+    assert not self._is_owned(), ("Cannot acquire locks in the same set twice"
+                                  " (lockset %s)" % self.name)
 
     # We need to keep track of how long we spent waiting for a lock. The
     # timeout passed to this function is over all lock acquires.
 
     # We need to keep track of how long we spent waiting for a lock. The
     # timeout passed to this function is over all lock acquires.
@@ -894,7 +1003,8 @@ class LockSet:
           # element is not there anymore.
           continue
 
           # element is not there anymore.
           continue
 
-        raise errors.LockError("Non-existing lock in set (%s)" % lname)
+        raise errors.LockError("Non-existing lock %s in set %s" %
+                               (lname, self.name))
 
       acquire_list.append((lname, lock))
 
 
       acquire_list.append((lname, lock))
 
@@ -925,14 +1035,16 @@ class LockSet:
             # particular element is not there anymore.
             continue
 
             # particular element is not there anymore.
             continue
 
-          raise errors.LockError("Non-existing lock in set (%s)" % lname)
+          raise errors.LockError("Non-existing lock %s in set %s" %
+                                 (lname, self.name))
 
         if not acq_success:
           # Couldn't get lock or timeout occurred
           if timeout is None:
             # This shouldn't happen as SharedLock.acquire(timeout=None) is
             # blocking.
 
         if not acq_success:
           # Couldn't get lock or timeout occurred
           if timeout is None:
             # This shouldn't happen as SharedLock.acquire(timeout=None) is
             # blocking.
-            raise errors.LockError("Failed to get lock %s" % lname)
+            raise errors.LockError("Failed to get lock %s (set %s)" %
+                                   (lname, self.name))
 
           raise _AcquireTimeout()
 
 
           raise _AcquireTimeout()
 
@@ -967,7 +1079,8 @@ class LockSet:
         (defaults to all the locks acquired at that level).
 
     """
         (defaults to all the locks acquired at that level).
 
     """
-    assert self._is_owned(), "release() on lock set while not owner"
+    assert self._is_owned(), ("release() on lock set %s while not owner" %
+                              self.name)
 
     # Support passing in a single resource to release rather than many
     if isinstance(names, basestring):
 
     # Support passing in a single resource to release rather than many
     if isinstance(names, basestring):
@@ -978,8 +1091,8 @@ class LockSet:
     else:
       names = set(names)
       assert self._list_owned().issuperset(names), (
     else:
       names = set(names)
       assert self._list_owned().issuperset(names), (
-               "release() on unheld resources %s" %
-               names.difference(self._list_owned()))
+               "release() on unheld resources %s (set %s)" %
+               (names.difference(self._list_owned()), self.name))
 
     # First of all let's release the "all elements" lock, if set.
     # After this 'add' can work again
 
     # First of all let's release the "all elements" lock, if set.
     # After this 'add' can work again
@@ -1006,7 +1119,8 @@ class LockSet:
     """
     # Check we don't already own locks at this level
     assert not self._is_owned() or self.__lock._is_owned(shared=0), \
     """
     # Check we don't already own locks at this level
     assert not self._is_owned() or self.__lock._is_owned(shared=0), \
-      "Cannot add locks if the set is only partially owned, or shared"
+      ("Cannot add locks if the set %s is only partially owned, or shared" %
+       self.name)
 
     # Support passing in a single resource to add rather than many
     if isinstance(names, basestring):
 
     # Support passing in a single resource to add rather than many
     if isinstance(names, basestring):
@@ -1025,10 +1139,11 @@ class LockSet:
         # This must be an explicit raise, not an assert, because assert is
         # turned off when using optimization, and this can happen because of
         # concurrency even if the user doesn't want it.
         # This must be an explicit raise, not an assert, because assert is
         # turned off when using optimization, and this can happen because of
         # concurrency even if the user doesn't want it.
-        raise errors.LockError("duplicate add() (%s)" % invalid_names)
+        raise errors.LockError("duplicate add(%s) on lockset %s" %
+                               (invalid_names, self.name))
 
       for lockname in names:
 
       for lockname in names:
-        lock = SharedLock()
+        lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor)
 
         if acquired:
           lock.acquire(shared=shared)
 
         if acquired:
           lock.acquire(shared=shared)
@@ -1076,7 +1191,8 @@ class LockSet:
     # to delete. The ownership must also be exclusive, but that will be checked
     # by the lock itself.
     assert not self._is_owned() or self._list_owned().issuperset(names), (
     # to delete. The ownership must also be exclusive, but that will be checked
     # by the lock itself.
     assert not self._is_owned() or self._list_owned().issuperset(names), (
-      "remove() on acquired lockset while not owning all elements")
+      "remove() on acquired lockset %s while not owning all elements" %
+      self.name)
 
     removed = []
 
 
     removed = []
 
@@ -1091,7 +1207,8 @@ class LockSet:
         removed.append(lname)
       except (KeyError, errors.LockError):
         # This cannot happen if we were already holding it, verify:
         removed.append(lname)
       except (KeyError, errors.LockError):
         # This cannot happen if we were already holding it, verify:
-        assert not self._is_owned(), "remove failed while holding lockset"
+        assert not self._is_owned(), ("remove failed while holding lockset %s"
+                                      % self.name)
       else:
         # If no LockError was raised we are the ones who deleted the lock.
         # This means we can safely remove it from lockdict, as any further or
       else:
         # If no LockError was raised we are the ones who deleted the lock.
         # This means we can safely remove it from lockdict, as any further or
@@ -1164,13 +1281,24 @@ class GanetiLockManager:
 
     self.__class__._instance = self
 
 
     self.__class__._instance = self
 
+    self._monitor = LockMonitor()
+
     # The keyring contains all the locks, at their level and in the correct
     # locking order.
     self.__keyring = {
     # The keyring contains all the locks, at their level and in the correct
     # locking order.
     self.__keyring = {
-      LEVEL_CLUSTER: LockSet([BGL]),
-      LEVEL_NODE: LockSet(nodes),
-      LEVEL_INSTANCE: LockSet(instances),
-    }
+      LEVEL_CLUSTER: LockSet([BGL], "BGL", monitor=self._monitor),
+      LEVEL_NODE: LockSet(nodes, "nodes", monitor=self._monitor),
+      LEVEL_INSTANCE: LockSet(instances, "instances",
+                              monitor=self._monitor),
+      }
+
+  def QueryLocks(self, fields, sync):
+    """Queries information from all locks.
+
+    See L{LockMonitor.QueryLocks}.
+
+    """
+    return self._monitor.QueryLocks(fields, sync)
 
   def _names(self, level):
     """List the lock names at the given level.
 
   def _names(self, level):
     """List the lock names at the given level.
@@ -1323,3 +1451,59 @@ class GanetiLockManager:
            "Cannot remove locks at a level while not owning it or"
            " owning some at a greater one")
     return self.__keyring[level].remove(names)
            "Cannot remove locks at a level while not owning it or"
            " owning some at a greater one")
     return self.__keyring[level].remove(names)
+
+
+class LockMonitor(object):
+  _LOCK_ATTR = "_lock"
+
+  def __init__(self):
+    """Initializes this class.
+
+    """
+    self._lock = SharedLock("LockMonitor")
+
+    # Tracked locks. Weak references are used to avoid issues with circular
+    # references and deletion.
+    self._locks = weakref.WeakKeyDictionary()
+
+  @ssynchronized(_LOCK_ATTR)
+  def RegisterLock(self, lock):
+    """Registers a new lock.
+
+    """
+    logging.debug("Registering lock %s", lock.name)
+    assert lock not in self._locks, "Duplicate lock registration"
+    assert not compat.any(lock.name == i.name for i in self._locks.keys()), \
+           "Found duplicate lock name"
+    self._locks[lock] = None
+
+  @ssynchronized(_LOCK_ATTR)
+  def _GetLockInfo(self, fields):
+    """Get information from all locks while the monitor lock is held.
+
+    """
+    result = {}
+
+    for lock in self._locks.keys():
+      assert lock.name not in result, "Found duplicate lock name"
+      result[lock.name] = lock.GetInfo(fields)
+
+    return result
+
+  def QueryLocks(self, fields, sync):
+    """Queries information from all locks.
+
+    @type fields: list of strings
+    @param fields: List of fields to return
+    @type sync: boolean
+    @param sync: Whether to operate in synchronous mode
+
+    """
+    if sync:
+      raise NotImplementedError("Synchronous queries are not implemented")
+
+    # Get all data without sorting
+    result = self._GetLockInfo(fields)
+
+    # Sort by name
+    return [result[name] for name in utils.NiceSort(result.keys())]