Xen: remove two end-of-line semicolons
[ganeti-local] / lib / locking.py
index 3004621..2144735 100644 (file)
@@ -29,6 +29,25 @@ from ganeti import errors
 from ganeti import utils
 
 
+def ssynchronized(lock, shared=0):
+  """Shared Synchronization decorator.
+
+  Calls the function holding the given lock, either in exclusive or shared
+  mode. It requires the passed lock to be a SharedLock (or support its
+  semantics).
+
+  """
+  def wrap(fn):
+    def sync_function(*args, **kwargs):
+      lock.acquire(shared=shared)
+      try:
+        return fn(*args, **kwargs)
+      finally:
+        lock.release()
+    return sync_function
+  return wrap
+
+
 class SharedLock:
   """Implements a shared lock.
 
@@ -55,6 +74,7 @@ class SharedLock:
     # lock waiters
     self.__nwait_exc = 0
     self.__nwait_shr = 0
+    self.__npass_shr = 0
 
     # is this lock in the deleted state?
     self.__deleted = False
@@ -93,13 +113,13 @@ class SharedLock:
     """
     self.__lock.acquire()
     try:
-      result = self.__is_owned(shared)
+      result = self.__is_owned(shared=shared)
     finally:
       self.__lock.release()
 
     return result
 
-  def __wait(self,c):
+  def __wait(self, c):
     """Wait on the given condition, and raise an exception if the current lock
     is declared deleted in the meantime.
 
@@ -133,6 +153,7 @@ class SharedLock:
     finally:
       self.__nwait_exc -= 1
 
+    assert self.__npass_shr == 0, "SharedLock: internal fairness violation"
 
   def acquire(self, blocking=1, shared=0):
     """Acquire a shared lock.
@@ -140,8 +161,8 @@ class SharedLock:
     Args:
       shared: whether to acquire in shared mode. By default an exclusive lock
               will be acquired.
-      blocking: whether to block while trying to acquire or to operate in try-lock mode.
-                this locking mode is not supported yet.
+      blocking: whether to block while trying to acquire or to operate in
+                try-lock mode. this locking mode is not supported yet.
 
     """
     if not blocking:
@@ -155,25 +176,35 @@ class SharedLock:
 
       # We cannot acquire the lock if we already have it
       assert not self.__is_owned(), "double acquire() on a non-recursive lock"
+      assert self.__npass_shr >= 0, "Internal fairness condition weirdness"
 
       if shared:
         self.__nwait_shr += 1
         try:
+          wait = False
           # If there is an exclusive holder waiting we have to wait.  We'll
           # only do this once, though, when we start waiting for the lock. Then
           # we'll just wait while there are no exclusive holders.
           if self.__nwait_exc > 0:
             # TODO: if !blocking...
+            wait = True
             self.__wait(self.__turn_shr)
 
           while self.__exc is not None:
+            wait = True
             # TODO: if !blocking...
             self.__wait(self.__turn_shr)
 
           self.__shr.add(threading.currentThread())
+
+          # If we were waiting note that we passed
+          if wait:
+            self.__npass_shr -= 1
+
         finally:
           self.__nwait_shr -= 1
 
+        assert self.__npass_shr >= 0, "Internal fairness condition weirdness"
       else:
         # TODO: if !blocking...
         # (or modify __exclusive_acquire for non-blocking mode)
@@ -193,6 +224,7 @@ class SharedLock:
     """
     self.__lock.acquire()
     try:
+      assert self.__npass_shr >= 0, "Internal fairness condition weirdness"
       # Autodetect release type
       if self.__is_exclusive():
         self.__exc = None
@@ -201,21 +233,26 @@ class SharedLock:
         # mode if there are shared holders waiting. Otherwise wake up the next
         # exclusive holder.
         if self.__nwait_shr > 0:
+          # Make sure at least the ones which were blocked pass.
+          self.__npass_shr = self.__nwait_shr
           self.__turn_shr.notifyAll()
         elif self.__nwait_exc > 0:
-         self.__turn_exc.notify()
+          self.__turn_exc.notify()
 
       elif self.__is_sharer():
         self.__shr.remove(threading.currentThread())
 
-        # If there are shared holders waiting there *must* be an exclusive holder
-        # waiting as well; otherwise what were they waiting for?
-        assert (self.__nwait_shr == 0 or self.__nwait_exc > 0,
-                "Lock sharers waiting while no exclusive is queueing")
-
-        # If there are no more shared holders and some exclusive holders are
-        # waiting let's wake one up.
-        if len(self.__shr) == 0 and self.__nwait_exc > 0:
+        # If there are shared holders waiting (and not just scheduled to pass)
+        # there *must* be an exclusive holder waiting as well; otherwise what
+        # were they waiting for?
+        assert (self.__nwait_exc > 0 or self.__npass_shr == self.__nwait_shr), \
+               "Lock sharers waiting while no exclusive is queueing"
+
+        # If there are no more shared holders either in or scheduled to pass,
+        # and some exclusive holders are waiting let's wake one up.
+        if (len(self.__shr) == 0 and
+            self.__nwait_exc > 0 and
+            not self.__npass_shr > 0):
           self.__turn_exc.notify()
 
       else:
@@ -303,18 +340,27 @@ class LockSet:
     """Is the current thread a current level owner?"""
     return threading.currentThread() in self.__owners
 
-  def _add_owned(self, name):
+  def _add_owned(self, name=None):
     """Note the current thread owns the given lock"""
-    if self._is_owned():
-      self.__owners[threading.currentThread()].add(name)
+    if name is None:
+      if not self._is_owned():
+        self.__owners[threading.currentThread()] = set()
     else:
-       self.__owners[threading.currentThread()] = set([name])
+      if self._is_owned():
+        self.__owners[threading.currentThread()].add(name)
+      else:
+        self.__owners[threading.currentThread()] = set([name])
+
 
-  def _del_owned(self, name):
+  def _del_owned(self, name=None):
     """Note the current thread owns the given lock"""
-    self.__owners[threading.currentThread()].remove(name)
 
-    if not self.__owners[threading.currentThread()]:
+    if name is not None:
+      self.__owners[threading.currentThread()].remove(name)
+
+    # Only remove the key if we don't hold the set-lock as well
+    if (not self.__lock._is_owned() and
+        not self.__owners[threading.currentThread()]):
       del self.__owners[threading.currentThread()]
 
   def _list_owned(self):
@@ -331,19 +377,20 @@ class LockSet:
     result after releasing the lock.
 
     """
-    return set(self.__lockdict.keys())
+    return self.__lockdict.keys()
 
   def _names(self):
     """Return a copy of the current set of elements.
 
     Used only for debugging purposes.
+
     """
     self.__lock.acquire(shared=1)
     try:
       result = self.__names()
     finally:
       self.__lock.release()
-    return result
+    return set(result)
 
   def acquire(self, names, blocking=1, shared=0):
     """Acquire a set of resource locks.
@@ -353,8 +400,8 @@ class LockSet:
              (special lock names, or instance/node names)
       shared: whether to acquire in shared mode. By default an exclusive lock
               will be acquired.
-      blocking: whether to block while trying to acquire or to operate in try-lock mode.
-                this locking mode is not supported yet.
+      blocking: whether to block while trying to acquire or to operate in
+                try-lock mode.  this locking mode is not supported yet.
 
     Returns:
       True: when all the locks are successfully acquired
@@ -372,49 +419,87 @@ class LockSet:
     # Check we don't already own locks at this level
     assert not self._is_owned(), "Cannot acquire locks in the same set twice"
 
-    # Support passing in a single resource to acquire rather than many
-    if isinstance(names, basestring):
-      names = [names]
-    else:
-      names.sort()
-
-    acquire_list = []
-    # First we look the locks up on __lockdict. We have no way of being sure
-    # they will still be there after, but this makes it a lot faster should
-    # just one of them be the already wrong
-    for lname in names:
-      try:
-        lock = self.__lockdict[lname] # raises KeyError if the lock is not there
-        acquire_list.append((lname, lock))
-      except (KeyError):
-        raise errors.LockError('non-existing lock in set (%s)' % lname)
-
-    # This will hold the locknames we effectively acquired.
-    acquired = set()
-    # Now acquire_list contains a sorted list of resources and locks we want.
-    # In order to get them we loop on this (private) list and acquire() them.
-    # We gave no real guarantee they will still exist till this is done but
-    # .acquire() itself is safe and will alert us if the lock gets deleted.
-    for (lname, lock) in acquire_list:
+    if names is None:
+      # If no names are given acquire the whole set by not letting new names
+      # being added before we release, and getting the current list of names.
+      # Some of them may then be deleted later, but we'll cope with this.
+      #
+      # We'd like to acquire this lock in a shared way, as it's nice if
+      # everybody else can use the instances at the same time. If are acquiring
+      # them exclusively though they won't be able to do this anyway, though,
+      # so we'll get the list lock exclusively as well in order to be able to
+      # do add() on the set while owning it.
+      self.__lock.acquire(shared=shared)
       try:
-        lock.acquire(shared=shared) # raises LockError if the lock is deleted
+        # note we own the set-lock
+        self._add_owned()
+        names = self.__names()
+      except:
+        # We shouldn't have problems adding the lock to the owners list, but
+        # if we did we'll try to release this lock and re-raise exception.
+        # Of course something is going to be really wrong, after this.
+        self.__lock.release()
+        raise
+
+    try:
+      # Support passing in a single resource to acquire rather than many
+      if isinstance(names, basestring):
+        names = [names]
+      else:
+        names.sort()
+
+      acquire_list = []
+      # First we look the locks up on __lockdict. We have no way of being sure
+      # they will still be there after, but this makes it a lot faster should
+      # just one of them be the already wrong
+      for lname in names:
+        try:
+          lock = self.__lockdict[lname] # raises KeyError if lock is not there
+          acquire_list.append((lname, lock))
+        except (KeyError):
+          if self.__lock._is_owned():
+            # We are acquiring all the set, it doesn't matter if this particular
+            # element is not there anymore.
+            continue
+          else:
+            raise errors.LockError('non-existing lock in set (%s)' % lname)
+
+      # This will hold the locknames we effectively acquired.
+      acquired = set()
+      # Now acquire_list contains a sorted list of resources and locks we want.
+      # In order to get them we loop on this (private) list and acquire() them.
+      # We gave no real guarantee they will still exist till this is done but
+      # .acquire() itself is safe and will alert us if the lock gets deleted.
+      for (lname, lock) in acquire_list:
         try:
+          lock.acquire(shared=shared) # raises LockError if the lock is deleted
           # now the lock cannot be deleted, we have it!
-          self._add_owned(lname)
+          self._add_owned(name=lname)
           acquired.add(lname)
+        except (errors.LockError):
+          if self.__lock._is_owned():
+            # We are acquiring all the set, it doesn't matter if this particular
+            # element is not there anymore.
+            continue
+          else:
+            name_fail = lname
+            for lname in self._list_owned():
+              self.__lockdict[lname].release()
+              self._del_owned(name=lname)
+            raise errors.LockError('non-existing lock in set (%s)' % name_fail)
         except:
           # We shouldn't have problems adding the lock to the owners list, but
           # if we did we'll try to release this lock and re-raise exception.
           # Of course something is going to be really wrong, after this.
-          lock.release()
-          raise
+          if lock._is_owned():
+            lock.release()
+            raise
 
-      except (errors.LockError):
-        name_fail = lname
-        for lname in self._list_owned():
-          self.__lockdict[lname].release()
-          self._del_owned(lname)
-        raise errors.LockError('non-existing lock in set (%s)' % name_fail)
+    except:
+      # If something went wrong and we had the set-lock let's release it...
+      if self.__lock._is_owned():
+        self.__lock.release()
+      raise
 
     return acquired
 
@@ -429,7 +514,6 @@ class LockSet:
              (defaults to all the locks acquired at that level).
 
     """
-
     assert self._is_owned(), "release() on lock set while not owner"
 
     # Support passing in a single resource to release rather than many
@@ -444,11 +528,17 @@ class LockSet:
                "release() on unheld resources %s" %
                names.difference(self._list_owned()))
 
+    # First of all let's release the "all elements" lock, if set.
+    # After this 'add' can work again
+    if self.__lock._is_owned():
+      self.__lock.release()
+      self._del_owned()
+
     for lockname in names:
       # If we are sure the lock doesn't leave __lockdict without being
       # exclusively held we can do this...
       self.__lockdict[lockname].release()
-      self._del_owned(lockname)
+      self._del_owned(name=lockname)
 
   def add(self, names, acquired=0, shared=0):
     """Add a new set of elements to the set
@@ -459,15 +549,23 @@ class LockSet:
       shared: is the pre-acquisition shared?
 
     """
+
+    assert not self.__lock._is_owned(shared=1), (
+           "Cannot add new elements while sharing the set-lock")
+
     # Support passing in a single resource to add rather than many
     if isinstance(names, basestring):
       names = [names]
 
-    # Acquire the internal lock in an exclusive way, so there cannot be a
-    # conflicting add()
-    self.__lock.acquire()
+    # If we don't already own the set-level lock acquire it in an exclusive way
+    # we'll get it and note we need to release it later.
+    release_lock = False
+    if not self.__lock._is_owned():
+      release_lock = True
+      self.__lock.acquire()
+
     try:
-      invalid_names = self.__names().intersection(names)
+      invalid_names = set(self.__names()).intersection(names)
       if invalid_names:
         # This must be an explicit raise, not an assert, because assert is
         # turned off when using optimization, and this can happen because of
@@ -481,7 +579,7 @@ class LockSet:
           lock.acquire(shared=shared)
           # now the lock cannot be deleted, we have it!
           try:
-            self._add_owned(lockname)
+            self._add_owned(name=lockname)
           except:
             # We shouldn't have problems adding the lock to the owners list,
             # but if we did we'll try to release this lock and re-raise
@@ -495,7 +593,9 @@ class LockSet:
         self.__lockdict[lockname] = lock
 
     finally:
-      self.__lock.release()
+      # Only release __lock if we were not holding it previously.
+      if release_lock:
+        self.__lock.release()
 
     return True
 
@@ -555,7 +655,7 @@ class LockSet:
         del self.__lockdict[lname]
         # And let's remove it from our private list if we owned it.
         if self._is_owned():
-          self._del_owned(lname)
+          self._del_owned(name=lname)
 
     return removed
 
@@ -569,24 +669,19 @@ class LockSet:
 #   - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
 #   If you need more than one node, or more than one instance, acquire them at
 #   the same time.
-#  - level LEVEL_CONFIG contains the configuration lock, which you must acquire
-#  before reading or changing the config file.
 LEVEL_CLUSTER = 0
-LEVEL_NODE = 1
-LEVEL_INSTANCE = 2
-LEVEL_CONFIG = 3
+LEVEL_INSTANCE = 1
+LEVEL_NODE = 2
 
 LEVELS = [LEVEL_CLUSTER,
-          LEVEL_NODE,
           LEVEL_INSTANCE,
-          LEVEL_CONFIG]
+          LEVEL_NODE]
 
 # Lock levels which are modifiable
 LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE]
 
-# Constant for the big ganeti lock and config lock
+# Constant for the big ganeti lock
 BGL = 'BGL'
-CONFIG = 'config'
 
 
 class GanetiLockManager:
@@ -603,9 +698,8 @@ class GanetiLockManager:
   def __init__(self, nodes=None, instances=None):
     """Constructs a new GanetiLockManager object.
 
-    There should be only a
-    GanetiLockManager object at any time, so this function raises an error if this
-    is not the case.
+    There should be only a GanetiLockManager object at any time, so this
+    function raises an error if this is not the case.
 
     Args:
       nodes: list of node names
@@ -621,7 +715,6 @@ class GanetiLockManager:
       LEVEL_CLUSTER: LockSet([BGL]),
       LEVEL_NODE: LockSet(nodes),
       LEVEL_INSTANCE: LockSet(instances),
-      LEVEL_CONFIG: LockSet([CONFIG]),
     }
 
   def _names(self, level):
@@ -680,8 +773,8 @@ class GanetiLockManager:
              (special lock names, or instance/node names)
       shared: whether to acquire in shared mode. By default an exclusive lock
               will be acquired.
-      blocking: whether to block while trying to acquire or to operate in try-lock mode.
-                this locking mode is not supported yet.
+      blocking: whether to block while trying to acquire or to operate in
+                try-lock mode.  this locking mode is not supported yet.
 
     """
     assert level in LEVELS, "Invalid locking level %s" % level
@@ -696,7 +789,7 @@ class GanetiLockManager:
             "You must own the Big Ganeti Lock before acquiring any other")
 
     # Check we don't own locks at the same or upper levels.
-    assert not self._upper_owned(level), ("Cannot acquire locks at a level" 
+    assert not self._upper_owned(level), ("Cannot acquire locks at a level"
            " while owning some at a greater one")
 
     # Acquire the locks in the set.
@@ -766,5 +859,4 @@ class GanetiLockManager:
     assert self._is_owned(level) or not self._upper_owned(level), (
            "Cannot remove locks at a level while not owning it or"
            " owning some at a greater one")
-    return self.__keyring[level].remove(names, blocking)
-
+    return self.__keyring[level].remove(names, blocking=blocking)