LockSet: make acquire() fail faster on wrong locks
[ganeti-local] / lib / locking.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Module implementing the Ganeti locking code."""
22
23 # pylint: disable-msg=W0613,W0201
24
25 import threading
26 # Wouldn't it be better to define LockingError in the locking module?
27 # Well, for now that's how the rest of the code does it...
28 from ganeti import errors
29
30
31 class SharedLock:
32   """Implements a shared lock.
33
34   Multiple threads can acquire the lock in a shared way, calling
35   acquire_shared().  In order to acquire the lock in an exclusive way threads
36   can call acquire_exclusive().
37
38   The lock prevents starvation but does not guarantee that threads will acquire
39   the shared lock in the order they queued for it, just that they will
40   eventually do so.
41
42   """
43   def __init__(self):
44     """Construct a new SharedLock"""
45     # we have two conditions, c_shr and c_exc, sharing the same lock.
46     self.__lock = threading.Lock()
47     self.__turn_shr = threading.Condition(self.__lock)
48     self.__turn_exc = threading.Condition(self.__lock)
49
50     # current lock holders
51     self.__shr = set()
52     self.__exc = None
53
54     # lock waiters
55     self.__nwait_exc = 0
56     self.__nwait_shr = 0
57
58     # is this lock in the deleted state?
59     self.__deleted = False
60
61   def __is_sharer(self):
62     """Is the current thread sharing the lock at this time?"""
63     return threading.currentThread() in self.__shr
64
65   def __is_exclusive(self):
66     """Is the current thread holding the lock exclusively at this time?"""
67     return threading.currentThread() == self.__exc
68
69   def __is_owned(self, shared=-1):
70     """Is the current thread somehow owning the lock at this time?
71
72     This is a private version of the function, which presumes you're holding
73     the internal lock.
74
75     """
76     if shared < 0:
77       return self.__is_sharer() or self.__is_exclusive()
78     elif shared:
79       return self.__is_sharer()
80     else:
81       return self.__is_exclusive()
82
83   def _is_owned(self, shared=-1):
84     """Is the current thread somehow owning the lock at this time?
85
86     Args:
87       shared:
88         < 0: check for any type of ownership (default)
89         0: check for exclusive ownership
90         > 0: check for shared ownership
91
92     """
93     self.__lock.acquire()
94     try:
95       result = self.__is_owned(shared)
96     finally:
97       self.__lock.release()
98
99     return result
100
101   def __wait(self,c):
102     """Wait on the given condition, and raise an exception if the current lock
103     is declared deleted in the meantime.
104
105     Args:
106       c: condition to wait on
107
108     """
109     c.wait()
110     if self.__deleted:
111       raise errors.LockError('deleted lock')
112
113   def __exclusive_acquire(self):
114     """Acquire the lock exclusively.
115
116     This is a private function that presumes you are already holding the
117     internal lock. It's defined separately to avoid code duplication between
118     acquire() and delete()
119
120     """
121     self.__nwait_exc += 1
122     try:
123       # This is to save ourselves from a nasty race condition that could
124       # theoretically make the sharers starve.
125       if self.__nwait_shr > 0 or self.__nwait_exc > 1:
126         self.__wait(self.__turn_exc)
127
128       while len(self.__shr) > 0 or self.__exc is not None:
129         self.__wait(self.__turn_exc)
130
131       self.__exc = threading.currentThread()
132     finally:
133       self.__nwait_exc -= 1
134
135
136   def acquire(self, blocking=1, shared=0):
137     """Acquire a shared lock.
138
139     Args:
140       shared: whether to acquire in shared mode. By default an exclusive lock
141               will be acquired.
142       blocking: whether to block while trying to acquire or to operate in try-lock mode.
143                 this locking mode is not supported yet.
144
145     """
146     if not blocking:
147       # We don't have non-blocking mode for now
148       raise NotImplementedError
149
150     self.__lock.acquire()
151     try:
152       if self.__deleted:
153         raise errors.LockError('deleted lock')
154
155       # We cannot acquire the lock if we already have it
156       assert not self.__is_owned(), "double acquire() on a non-recursive lock"
157
158       if shared:
159         self.__nwait_shr += 1
160         try:
161           # If there is an exclusive holder waiting we have to wait.  We'll
162           # only do this once, though, when we start waiting for the lock. Then
163           # we'll just wait while there are no exclusive holders.
164           if self.__nwait_exc > 0:
165             # TODO: if !blocking...
166             self.__wait(self.__turn_shr)
167
168           while self.__exc is not None:
169             # TODO: if !blocking...
170             self.__wait(self.__turn_shr)
171
172           self.__shr.add(threading.currentThread())
173         finally:
174           self.__nwait_shr -= 1
175
176       else:
177         # TODO: if !blocking...
178         # (or modify __exclusive_acquire for non-blocking mode)
179         self.__exclusive_acquire()
180
181     finally:
182       self.__lock.release()
183
184     return True
185
186   def release(self):
187     """Release a Shared Lock.
188
189     You must have acquired the lock, either in shared or in exclusive mode,
190     before calling this function.
191
192     """
193     self.__lock.acquire()
194     try:
195       # Autodetect release type
196       if self.__is_exclusive():
197         self.__exc = None
198
199         # An exclusive holder has just had the lock, time to put it in shared
200         # mode if there are shared holders waiting. Otherwise wake up the next
201         # exclusive holder.
202         if self.__nwait_shr > 0:
203           self.__turn_shr.notifyAll()
204         elif self.__nwait_exc > 0:
205          self.__turn_exc.notify()
206
207       elif self.__is_sharer():
208         self.__shr.remove(threading.currentThread())
209
210         # If there are shared holders waiting there *must* be an exclusive holder
211         # waiting as well; otherwise what were they waiting for?
212         assert (self.__nwait_shr == 0 or self.__nwait_exc > 0,
213                 "Lock sharers waiting while no exclusive is queueing")
214
215         # If there are no more shared holders and some exclusive holders are
216         # waiting let's wake one up.
217         if len(self.__shr) == 0 and self.__nwait_exc > 0:
218           self.__turn_exc.notify()
219
220       else:
221         assert False, "Cannot release non-owned lock"
222
223     finally:
224       self.__lock.release()
225
226   def delete(self, blocking=1):
227     """Delete a Shared Lock.
228
229     This operation will declare the lock for removal. First the lock will be
230     acquired in exclusive mode if you don't already own it, then the lock
231     will be put in a state where any future and pending acquire() fail.
232
233     Args:
234       blocking: whether to block while trying to acquire or to operate in
235                 try-lock mode.  this locking mode is not supported yet unless
236                 you are already holding exclusively the lock.
237
238     """
239     self.__lock.acquire()
240     try:
241       assert not self.__is_sharer(), "cannot delete() a lock while sharing it"
242
243       if self.__deleted:
244         raise errors.LockError('deleted lock')
245
246       if not self.__is_exclusive():
247         if not blocking:
248           # We don't have non-blocking mode for now
249           raise NotImplementedError
250         self.__exclusive_acquire()
251
252       self.__deleted = True
253       self.__exc = None
254       # Wake up everybody, they will fail acquiring the lock and
255       # raise an exception instead.
256       self.__turn_exc.notifyAll()
257       self.__turn_shr.notifyAll()
258
259     finally:
260       self.__lock.release()
261
262
263 class LockSet:
264   """Implements a set of locks.
265
266   This abstraction implements a set of shared locks for the same resource type,
267   distinguished by name. The user can lock a subset of the resources and the
268   LockSet will take care of acquiring the locks always in the same order, thus
269   preventing deadlock.
270
271   All the locks needed in the same set must be acquired together, though.
272
273   """
274   def __init__(self, members=None):
275     """Constructs a new LockSet.
276
277     Args:
278       members: initial members of the set
279
280     """
281     # Used internally to guarantee coherency.
282     self.__lock = SharedLock()
283
284     # The lockdict indexes the relationship name -> lock
285     # The order-of-locking is implied by the alphabetical order of names
286     self.__lockdict = {}
287
288     if members is not None:
289       for name in members:
290         self.__lockdict[name] = SharedLock()
291
292     # The owner dict contains the set of locks each thread owns. For
293     # performance each thread can access its own key without a global lock on
294     # this structure. It is paramount though that *no* other type of access is
295     # done to this structure (eg. no looping over its keys). *_owner helper
296     # function are defined to guarantee access is correct, but in general never
297     # do anything different than __owners[threading.currentThread()], or there
298     # will be trouble.
299     self.__owners = {}
300
301   def _is_owned(self):
302     """Is the current thread a current level owner?"""
303     return threading.currentThread() in self.__owners
304
305   def _add_owned(self, name):
306     """Note the current thread owns the given lock"""
307     if self._is_owned():
308       self.__owners[threading.currentThread()].add(name)
309     else:
310        self.__owners[threading.currentThread()] = set([name])
311
312   def _del_owned(self, name):
313     """Note the current thread owns the given lock"""
314     self.__owners[threading.currentThread()].remove(name)
315
316     if not self.__owners[threading.currentThread()]:
317       del self.__owners[threading.currentThread()]
318
319   def _list_owned(self):
320     """Get the set of resource names owned by the current thread"""
321     if self._is_owned():
322       return self.__owners[threading.currentThread()].copy()
323     else:
324       return set()
325
326   def __names(self):
327     """Return the current set of names.
328
329     Only call this function while holding __lock and don't iterate on the
330     result after releasing the lock.
331
332     """
333     return set(self.__lockdict.keys())
334
335   def _names(self):
336     """Return a copy of the current set of elements.
337
338     Used only for debugging purposes.
339     """
340     self.__lock.acquire(shared=1)
341     try:
342       result = self.__names()
343     finally:
344       self.__lock.release()
345     return result
346
347   def acquire(self, names, blocking=1, shared=0):
348     """Acquire a set of resource locks.
349
350     Args:
351       names: the names of the locks which shall be acquired.
352              (special lock names, or instance/node names)
353       shared: whether to acquire in shared mode. By default an exclusive lock
354               will be acquired.
355       blocking: whether to block while trying to acquire or to operate in try-lock mode.
356                 this locking mode is not supported yet.
357
358     Returns:
359       True: when all the locks are successfully acquired
360
361     Raises:
362       errors.LockError: when any lock we try to acquire has been deleted
363       before we succeed. In this case none of the locks requested will be
364       acquired.
365
366     """
367     if not blocking:
368       # We don't have non-blocking mode for now
369       raise NotImplementedError
370
371     # Check we don't already own locks at this level
372     assert not self._is_owned(), "Cannot acquire locks in the same set twice"
373
374     # Support passing in a single resource to acquire rather than many
375     if isinstance(names, basestring):
376       names = [names]
377     else:
378       names.sort()
379
380     acquire_list = []
381     # First we look the locks up on __lockdict. We have no way of being sure
382     # they will still be there after, but this makes it a lot faster should
383     # just one of them be the already wrong
384     try:
385       for lname in names:
386         lock = self.__lockdict[lname] # raises KeyError if the lock is not there
387         acquire_list.append((lname, lock))
388     except (KeyError):
389       raise errors.LockError('non-existing lock in set (%s)' % lname)
390
391     # Now acquire_list contains a sorted list of resources and locks we want.
392     # In order to get them we loop on this (private) list and acquire() them.
393     # We gave no real guarantee they will still exist till this is done but
394     # .acquire() itself is safe and will alert us if the lock gets deleted.
395     try:
396       for (lname, lock) in acquire_list:
397         lock.acquire(shared=shared) # raises LockError if the lock is deleted
398         try:
399           # now the lock cannot be deleted, we have it!
400           self._add_owned(lname)
401         except:
402           # We shouldn't have problems adding the lock to the owners list, but
403           # if we did we'll try to release this lock and re-raise exception.
404           # Of course something is going to be really wrong, after this.
405           lock.release()
406           raise
407
408     except (errors.LockError):
409       name_fail = lname
410       for lname in self._list_owned():
411         self.__lockdict[lname].release()
412         self._del_owned(lname)
413       raise errors.LockError('non-existing lock in set (%s)' % name_fail)
414
415     return True
416
417   def release(self, names=None):
418     """Release a set of resource locks, at the same level.
419
420     You must have acquired the locks, either in shared or in exclusive mode,
421     before releasing them.
422
423     Args:
424       names: the names of the locks which shall be released.
425              (defaults to all the locks acquired at that level).
426
427     """
428
429     assert self._is_owned(), "release() on lock set while not owner"
430
431     # Support passing in a single resource to release rather than many
432     if isinstance(names, basestring):
433       names = [names]
434
435     if names is None:
436       names = self._list_owned()
437     else:
438       names = set(names)
439       assert self._list_owned().issuperset(names), (
440                "release() on unheld resources %s" %
441                names.difference(self._list_owned()))
442
443     for lockname in names:
444       # If we are sure the lock doesn't leave __lockdict without being
445       # exclusively held we can do this...
446       self.__lockdict[lockname].release()
447       self._del_owned(lockname)
448
449   def add(self, names, acquired=0, shared=0):
450     """Add a new set of elements to the set
451
452     Args:
453       names: names of the new elements to add
454       acquired: pre-acquire the new resource?
455       shared: is the pre-acquisition shared?
456
457     """
458     # Support passing in a single resource to add rather than many
459     if isinstance(names, basestring):
460       names = [names]
461
462     # Acquire the internal lock in an exclusive way, so there cannot be a
463     # conflicting add()
464     self.__lock.acquire()
465     try:
466       invalid_names = self.__names().intersection(names)
467       if invalid_names:
468         # This must be an explicit raise, not an assert, because assert is
469         # turned off when using optimization, and this can happen because of
470         # concurrency even if the user doesn't want it.
471         raise errors.LockError("duplicate add() (%s)" % invalid_names)
472
473       for lockname in names:
474         lock = SharedLock()
475
476         if acquired:
477           lock.acquire(shared=shared)
478           # now the lock cannot be deleted, we have it!
479           try:
480             self._add_owned(lockname)
481           except:
482             # We shouldn't have problems adding the lock to the owners list,
483             # but if we did we'll try to release this lock and re-raise
484             # exception.  Of course something is going to be really wrong,
485             # after this.  On the other hand the lock hasn't been added to the
486             # __lockdict yet so no other threads should be pending on it. This
487             # release is just a safety measure.
488             lock.release()
489             raise
490
491         self.__lockdict[lockname] = lock
492
493     finally:
494       self.__lock.release()
495
496     return True
497
498   def remove(self, names, blocking=1):
499     """Remove elements from the lock set.
500
501     You can either not hold anything in the lockset or already hold a superset
502     of the elements you want to delete, exclusively.
503
504     Args:
505       names: names of the resource to remove.
506       blocking: whether to block while trying to acquire or to operate in
507                 try-lock mode.  this locking mode is not supported yet unless
508                 you are already holding exclusively the locks.
509
510     Returns:
511       A list of lock which we failed to delete. The list is always empty if we
512       were holding all the locks exclusively.
513
514     """
515     if not blocking and not self._is_owned():
516       # We don't have non-blocking mode for now
517       raise NotImplementedError
518
519     # Support passing in a single resource to remove rather than many
520     if isinstance(names, basestring):
521       names = [names]
522
523     # If we own any subset of this lock it must be a superset of what we want
524     # to delete. The ownership must also be exclusive, but that will be checked
525     # by the lock itself.
526     assert not self._is_owned() or self._list_owned().issuperset(names), (
527       "remove() on acquired lockset while not owning all elements")
528
529     delete_failed=[]
530
531     for lname in names:
532       # Calling delete() acquires the lock exclusively if we don't already own
533       # it, and causes all pending and subsequent lock acquires to fail. It's
534       # fine to call it out of order because delete() also implies release(),
535       # and the assertion above guarantees that if we either already hold
536       # everything we want to delete, or we hold none.
537       try:
538         self.__lockdict[lname].delete()
539       except (KeyError, errors.LockError):
540         delete_failed.append(lname)
541         # This cannot happen if we were already holding it, verify:
542         assert not self._is_owned(), "remove failed while holding lockset"
543       else:
544         # If no LockError was raised we are the ones who deleted the lock.
545         # This means we can safely remove it from lockdict, as any further or
546         # pending delete() or acquire() will fail (and nobody can have the lock
547         # since before our call to delete()).
548         #
549         # This is done in an else clause because if the exception was thrown
550         # it's the job of the one who actually deleted it.
551         del self.__lockdict[lname]
552         # And let's remove it from our private list if we owned it.
553         if self._is_owned():
554           self._del_owned(lname)
555
556     return delete_failed
557