Enable disk growth with exclusive storage
[ganeti-local] / lib / locking.py
index 5a02fd5..7a23af6 100644 (file)
@@ -53,7 +53,14 @@ _LOCK_ACQUIRE_MIN_TIMEOUT = (1.0 / 1000)
 
 # Internal lock acquisition modes for L{LockSet}
 (_LS_ACQUIRE_EXACT,
- _LS_ACQUIRE_ALL) = range(1, 3)
+ _LS_ACQUIRE_ALL,
+ _LS_ACQUIRE_OPPORTUNISTIC) = range(1, 4)
+
+_LS_ACQUIRE_MODES = compat.UniqueFrozenset([
+  _LS_ACQUIRE_EXACT,
+  _LS_ACQUIRE_ALL,
+  _LS_ACQUIRE_OPPORTUNISTIC,
+  ])
 
 
 def ssynchronized(mylock, shared=0):
@@ -917,6 +924,52 @@ class SharedLock(object):
 ALL_SET = None
 
 
+def _TimeoutZero():
+  """Returns the number zero.
+
+  """
+  return 0
+
+
+def _GetLsAcquireModeAndTimeouts(want_all, timeout, opportunistic):
+  """Determines modes and timeouts for L{LockSet.acquire}.
+
+  @type want_all: boolean
+  @param want_all: Whether all locks in set should be acquired
+  @param timeout: Timeout in seconds or C{None}
+  @param opportunistic: Whther locks should be acquired opportunistically
+  @rtype: tuple
+  @return: Tuple containing mode to be passed to L{LockSet.__acquire_inner}
+    (one of L{_LS_ACQUIRE_MODES}), a function to calculate timeout for
+    acquiring the lockset-internal lock (might be C{None}) and a function to
+    calculate the timeout for acquiring individual locks
+
+  """
+  # Short circuit when no running timeout is needed
+  if opportunistic and not want_all:
+    assert timeout is None, "Got timeout for an opportunistic acquisition"
+    return (_LS_ACQUIRE_OPPORTUNISTIC, None, _TimeoutZero)
+
+  # We need to keep track of how long we spent waiting for a lock. The
+  # timeout passed to this function is over all lock acquisitions.
+  running_timeout = utils.RunningTimeout(timeout, False)
+
+  if want_all:
+    mode = _LS_ACQUIRE_ALL
+    ls_timeout_fn = running_timeout.Remaining
+  else:
+    mode = _LS_ACQUIRE_EXACT
+    ls_timeout_fn = None
+
+  if opportunistic:
+    mode = _LS_ACQUIRE_OPPORTUNISTIC
+    timeout_fn = _TimeoutZero
+  else:
+    timeout_fn = running_timeout.Remaining
+
+  return (mode, ls_timeout_fn, timeout_fn)
+
+
 class _AcquireTimeout(Exception):
   """Internal exception to abort an acquire on a timeout.
 
@@ -1033,6 +1086,18 @@ class LockSet:
     else:
       return False
 
+  def owning_all(self):
+    """Checks whether current thread owns internal lock.
+
+    Holding the internal lock is equivalent with holding all locks in the set
+    (the opposite does not necessarily hold as it can not be easily
+    determined). L{add} and L{remove} require the internal lock.
+
+    @rtype: boolean
+
+    """
+    return self.__lock.is_owned()
+
   def _add_owned(self, name=None):
     """Note the current thread owns the given lock"""
     if name is None:
@@ -1102,9 +1167,12 @@ class LockSet:
     return set(result)
 
   def acquire(self, names, timeout=None, shared=0, priority=None,
-              test_notify=None):
+              opportunistic=False, test_notify=None):
     """Acquire a set of resource locks.
 
+    @note: When acquiring locks opportunistically, any number of locks might
+      actually be acquired, even zero.
+
     @type names: list of strings (or string)
     @param names: the names of the locks which shall be acquired
         (special lock names, or instance/node names)
@@ -1112,9 +1180,16 @@ class LockSet:
     @param shared: whether to acquire in shared mode; by default an
         exclusive lock will be acquired
     @type timeout: float or None
-    @param timeout: Maximum time to acquire all locks
+    @param timeout: Maximum time to acquire all locks; for opportunistic
+      acquisitions, a timeout can only be given when C{names} is C{None}, in
+      which case it is exclusively used for acquiring the L{LockSet}-internal
+      lock; opportunistic acquisitions don't use a timeout for acquiring
+      individual locks
     @type priority: integer
     @param priority: Priority for acquiring locks
+    @type opportunistic: boolean
+    @param opportunistic: Acquire locks opportunistically; use the return value
+      to determine which locks were actually acquired
     @type test_notify: callable or None
     @param test_notify: Special callback function for unittesting
 
@@ -1134,20 +1209,26 @@ class LockSet:
     if priority is None:
       priority = _DEFAULT_PRIORITY
 
-    # We need to keep track of how long we spent waiting for a lock. The
-    # timeout passed to this function is over all lock acquires.
-    running_timeout = utils.RunningTimeout(timeout, False)
-
     try:
       if names is not None:
+        assert timeout is None or not opportunistic, \
+          ("Opportunistic acquisitions can only use a timeout if no"
+           " names are given; see docstring for details")
+
         # Support passing in a single resource to acquire rather than many
         if isinstance(names, basestring):
           names = [names]
 
-        return self.__acquire_inner(names, _LS_ACQUIRE_EXACT, shared, priority,
-                                    running_timeout.Remaining, test_notify)
+        (mode, _, timeout_fn) = \
+          _GetLsAcquireModeAndTimeouts(False, timeout, opportunistic)
+
+        return self.__acquire_inner(names, mode, shared, priority,
+                                    timeout_fn, test_notify)
 
       else:
+        (mode, ls_timeout_fn, timeout_fn) = \
+          _GetLsAcquireModeAndTimeouts(True, timeout, opportunistic)
+
         # If no names are given acquire the whole set by not letting new names
         # being added before we release, and getting the current list of names.
         # Some of them may then be deleted later, but we'll cope with this.
@@ -1158,15 +1239,15 @@ class LockSet:
         # anyway, though, so we'll get the list lock exclusively as well in
         # order to be able to do add() on the set while owning it.
         if not self.__lock.acquire(shared=shared, priority=priority,
-                                   timeout=running_timeout.Remaining()):
+                                   timeout=ls_timeout_fn()):
           raise _AcquireTimeout()
+
         try:
           # note we own the set-lock
           self._add_owned()
 
-          return self.__acquire_inner(self.__names(), _LS_ACQUIRE_ALL, shared,
-                                      priority, running_timeout.Remaining,
-                                      test_notify)
+          return self.__acquire_inner(self.__names(), mode, shared,
+                                      priority, timeout_fn, test_notify)
         except:
           # We shouldn't have problems adding the lock to the owners list, but
           # if we did we'll try to release this lock and re-raise exception.
@@ -1182,15 +1263,27 @@ class LockSet:
                       timeout_fn, test_notify):
     """Inner logic for acquiring a number of locks.
 
+    Acquisition modes:
+
+      - C{_LS_ACQUIRE_ALL}: C{names} contains names of all locks in set, but
+        deleted locks can be ignored as the whole set is being acquired with
+        its internal lock held
+      - C{_LS_ACQUIRE_EXACT}: The names listed in C{names} must be acquired;
+        timeouts and deleted locks are fatal
+      - C{_LS_ACQUIRE_OPPORTUNISTIC}: C{names} lists names of locks (potentially
+        all within the set) which should be acquired opportunistically, that is
+        failures are ignored
+
     @param names: Names of the locks to be acquired
-    @param mode: Lock acquisition mode
+    @param mode: Lock acquisition mode (one of L{_LS_ACQUIRE_MODES})
     @param shared: Whether to acquire in shared mode
-    @param timeout_fn: Function returning remaining timeout
+    @param timeout_fn: Function returning remaining timeout (C{None} for
+      opportunistic acquisitions)
     @param priority: Priority for acquiring locks
     @param test_notify: Special callback function for unittesting
 
     """
-    assert mode in (_LS_ACQUIRE_EXACT, _LS_ACQUIRE_ALL)
+    assert mode in _LS_ACQUIRE_MODES
 
     acquire_list = []
 
@@ -1234,7 +1327,7 @@ class LockSet:
                                      priority=priority,
                                      test_notify=test_notify_fn)
         except errors.LockError:
-          if mode == _LS_ACQUIRE_ALL:
+          if mode in (_LS_ACQUIRE_ALL, _LS_ACQUIRE_OPPORTUNISTIC):
             # We are acquiring the whole set, it doesn't matter if this
             # particular element is not there anymore.
             continue
@@ -1244,6 +1337,10 @@ class LockSet:
 
         if not acq_success:
           # Couldn't get lock or timeout occurred
+          if mode == _LS_ACQUIRE_OPPORTUNISTIC:
+            # Ignore timeouts on opportunistic acquisitions
+            continue
+
           if timeout is None:
             # This shouldn't happen as SharedLock.acquire(timeout=None) is
             # blocking.
@@ -1467,27 +1564,34 @@ class LockSet:
     return removed
 
 
-# Locking levels, must be acquired in increasing order.
-# Current rules are:
-#   - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
-#   acquired before performing any operation, either in shared or in exclusive
-#   mode. acquiring the BGL in exclusive mode is discouraged and should be
-#   avoided.
-#   - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
-#   If you need more than one node, or more than one instance, acquire them at
-#   the same time.
-LEVEL_CLUSTER = 0
-LEVEL_INSTANCE = 1
-LEVEL_NODEGROUP = 2
-LEVEL_NODE = 3
-#: Level for node resources, used for operations with possibly high impact on
-#: the node's disks.
-LEVEL_NODE_RES = 4
-LEVEL_NETWORK = 5
+# Locking levels, must be acquired in increasing order. Current rules are:
+# - At level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
+#   acquired before performing any operation, either in shared or exclusive
+#   mode. Acquiring the BGL in exclusive mode is discouraged and should be
+#   avoided..
+# - At levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks. If
+#   you need more than one node, or more than one instance, acquire them at the
+#   same time.
+# - LEVEL_NODE_RES is for node resources and should be used by operations with
+#   possibly high impact on the node's disks.
+# - LEVEL_NODE_ALLOC blocks instance allocations for the whole cluster
+#   ("NAL" is the only lock at this level). It should be acquired in shared
+#   mode when an opcode blocks all or a significant amount of a cluster's
+#   locks. Opcodes doing instance allocations should acquire in exclusive mode.
+#   Once the set of acquired locks for an opcode has been reduced to the working
+#   set, the NAL should be released as well to allow allocations to proceed.
+(LEVEL_CLUSTER,
+ LEVEL_INSTANCE,
+ LEVEL_NODE_ALLOC,
+ LEVEL_NODEGROUP,
+ LEVEL_NODE,
+ LEVEL_NODE_RES,
+ LEVEL_NETWORK) = range(0, 7)
 
 LEVELS = [
   LEVEL_CLUSTER,
   LEVEL_INSTANCE,
+  LEVEL_NODE_ALLOC,
   LEVEL_NODEGROUP,
   LEVEL_NODE,
   LEVEL_NODE_RES,
@@ -1495,7 +1599,7 @@ LEVELS = [
   ]
 
 # Lock levels which are modifiable
-LEVELS_MOD = frozenset([
+LEVELS_MOD = compat.UniqueFrozenset([
   LEVEL_NODE_RES,
   LEVEL_NODE,
   LEVEL_NODEGROUP,
@@ -1507,6 +1611,7 @@ LEVELS_MOD = frozenset([
 LEVEL_NAMES = {
   LEVEL_CLUSTER: "cluster",
   LEVEL_INSTANCE: "instance",
+  LEVEL_NODE_ALLOC: "node-alloc",
   LEVEL_NODEGROUP: "nodegroup",
   LEVEL_NODE: "node",
   LEVEL_NODE_RES: "node-res",
@@ -1516,6 +1621,9 @@ LEVEL_NAMES = {
 # Constant for the big ganeti lock
 BGL = "BGL"
 
+#: Node allocation lock
+NAL = "NAL"
+
 
 class GanetiLockManager:
   """The Ganeti Locking Library
@@ -1528,15 +1636,15 @@ class GanetiLockManager:
   """
   _instance = None
 
-  def __init__(self, nodes, nodegroups, instances, networks):
+  def __init__(self, node_uuids, nodegroups, instance_names, networks):
     """Constructs a new GanetiLockManager object.
 
     There should be only a GanetiLockManager object at any time, so this
     function raises an error if this is not the case.
 
-    @param nodes: list of node names
+    @param node_uuids: list of node UUIDs
     @param nodegroups: list of nodegroup uuids
-    @param instances: list of instance names
+    @param instance_names: list of instance names
 
     """
     assert self.__class__._instance is None, \
@@ -1550,15 +1658,18 @@ class GanetiLockManager:
     # locking order.
     self.__keyring = {
       LEVEL_CLUSTER: LockSet([BGL], "cluster", monitor=self._monitor),
-      LEVEL_NODE: LockSet(nodes, "node", monitor=self._monitor),
-      LEVEL_NODE_RES: LockSet(nodes, "node-res", monitor=self._monitor),
+      LEVEL_NODE: LockSet(node_uuids, "node", monitor=self._monitor),
+      LEVEL_NODE_RES: LockSet(node_uuids, "node-res", monitor=self._monitor),
       LEVEL_NODEGROUP: LockSet(nodegroups, "nodegroup", monitor=self._monitor),
-      LEVEL_INSTANCE: LockSet(instances, "instance", monitor=self._monitor),
+      LEVEL_INSTANCE: LockSet(instance_names, "instance",
+                              monitor=self._monitor),
       LEVEL_NETWORK: LockSet(networks, "network", monitor=self._monitor),
+      LEVEL_NODE_ALLOC: LockSet([NAL], "node-alloc", monitor=self._monitor),
       }
 
     assert compat.all(ls.name == LEVEL_NAMES[level]
-                      for (level, ls) in self.__keyring.items())
+                      for (level, ls) in self.__keyring.items()), \
+      "Keyring name mismatch"
 
   def AddToLockMonitor(self, provider):
     """Registers a new lock with the monitor.
@@ -1607,6 +1718,14 @@ class GanetiLockManager:
     """
     return self.__keyring[level].check_owned(names, shared=shared)
 
+  def owning_all(self, level):
+    """Checks whether current thread owns all locks at a certain level.
+
+    @see: L{LockSet.owning_all}
+
+    """
+    return self.__keyring[level].owning_all()
+
   def _upper_owned(self, level):
     """Check that we don't own any lock at a level greater than the given one.
 
@@ -1633,7 +1752,8 @@ class GanetiLockManager:
     """
     return level == LEVEL_CLUSTER and (names is None or BGL in names)
 
-  def acquire(self, level, names, timeout=None, shared=0, priority=None):
+  def acquire(self, level, names, timeout=None, shared=0, priority=None,
+              opportunistic=False):
     """Acquire a set of resource locks, at the same level.
 
     @type level: member of locking.LEVELS
@@ -1648,6 +1768,9 @@ class GanetiLockManager:
     @param timeout: Maximum time to acquire all locks
     @type priority: integer
     @param priority: Priority for acquiring lock
+    @type opportunistic: boolean
+    @param opportunistic: Acquire locks opportunistically; use the return value
+      to determine which locks were actually acquired
 
     """
     assert level in LEVELS, "Invalid locking level %s" % level
@@ -1667,7 +1790,8 @@ class GanetiLockManager:
 
     # Acquire the locks in the set.
     return self.__keyring[level].acquire(names, shared=shared, timeout=timeout,
-                                         priority=priority)
+                                         priority=priority,
+                                         opportunistic=opportunistic)
 
   def downgrade(self, level, names=None):
     """Downgrade a set of resource locks from exclusive to shared mode.