Initial GanetiLockManager implementation
[ganeti-local] / lib / locking.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Module implementing the Ganeti locking code."""
22
23 # pylint: disable-msg=W0613,W0201
24
25 import threading
26 # Wouldn't it be better to define LockingError in the locking module?
27 # Well, for now that's how the rest of the code does it...
28 from ganeti import errors
29 from ganeti import utils
30
31
32 class SharedLock:
33   """Implements a shared lock.
34
35   Multiple threads can acquire the lock in a shared way, calling
36   acquire_shared().  In order to acquire the lock in an exclusive way threads
37   can call acquire_exclusive().
38
39   The lock prevents starvation but does not guarantee that threads will acquire
40   the shared lock in the order they queued for it, just that they will
41   eventually do so.
42
43   """
44   def __init__(self):
45     """Construct a new SharedLock"""
46     # we have two conditions, c_shr and c_exc, sharing the same lock.
47     self.__lock = threading.Lock()
48     self.__turn_shr = threading.Condition(self.__lock)
49     self.__turn_exc = threading.Condition(self.__lock)
50
51     # current lock holders
52     self.__shr = set()
53     self.__exc = None
54
55     # lock waiters
56     self.__nwait_exc = 0
57     self.__nwait_shr = 0
58
59     # is this lock in the deleted state?
60     self.__deleted = False
61
62   def __is_sharer(self):
63     """Is the current thread sharing the lock at this time?"""
64     return threading.currentThread() in self.__shr
65
66   def __is_exclusive(self):
67     """Is the current thread holding the lock exclusively at this time?"""
68     return threading.currentThread() == self.__exc
69
70   def __is_owned(self, shared=-1):
71     """Is the current thread somehow owning the lock at this time?
72
73     This is a private version of the function, which presumes you're holding
74     the internal lock.
75
76     """
77     if shared < 0:
78       return self.__is_sharer() or self.__is_exclusive()
79     elif shared:
80       return self.__is_sharer()
81     else:
82       return self.__is_exclusive()
83
84   def _is_owned(self, shared=-1):
85     """Is the current thread somehow owning the lock at this time?
86
87     Args:
88       shared:
89         < 0: check for any type of ownership (default)
90         0: check for exclusive ownership
91         > 0: check for shared ownership
92
93     """
94     self.__lock.acquire()
95     try:
96       result = self.__is_owned(shared)
97     finally:
98       self.__lock.release()
99
100     return result
101
102   def __wait(self,c):
103     """Wait on the given condition, and raise an exception if the current lock
104     is declared deleted in the meantime.
105
106     Args:
107       c: condition to wait on
108
109     """
110     c.wait()
111     if self.__deleted:
112       raise errors.LockError('deleted lock')
113
114   def __exclusive_acquire(self):
115     """Acquire the lock exclusively.
116
117     This is a private function that presumes you are already holding the
118     internal lock. It's defined separately to avoid code duplication between
119     acquire() and delete()
120
121     """
122     self.__nwait_exc += 1
123     try:
124       # This is to save ourselves from a nasty race condition that could
125       # theoretically make the sharers starve.
126       if self.__nwait_shr > 0 or self.__nwait_exc > 1:
127         self.__wait(self.__turn_exc)
128
129       while len(self.__shr) > 0 or self.__exc is not None:
130         self.__wait(self.__turn_exc)
131
132       self.__exc = threading.currentThread()
133     finally:
134       self.__nwait_exc -= 1
135
136
137   def acquire(self, blocking=1, shared=0):
138     """Acquire a shared lock.
139
140     Args:
141       shared: whether to acquire in shared mode. By default an exclusive lock
142               will be acquired.
143       blocking: whether to block while trying to acquire or to operate in try-lock mode.
144                 this locking mode is not supported yet.
145
146     """
147     if not blocking:
148       # We don't have non-blocking mode for now
149       raise NotImplementedError
150
151     self.__lock.acquire()
152     try:
153       if self.__deleted:
154         raise errors.LockError('deleted lock')
155
156       # We cannot acquire the lock if we already have it
157       assert not self.__is_owned(), "double acquire() on a non-recursive lock"
158
159       if shared:
160         self.__nwait_shr += 1
161         try:
162           # If there is an exclusive holder waiting we have to wait.  We'll
163           # only do this once, though, when we start waiting for the lock. Then
164           # we'll just wait while there are no exclusive holders.
165           if self.__nwait_exc > 0:
166             # TODO: if !blocking...
167             self.__wait(self.__turn_shr)
168
169           while self.__exc is not None:
170             # TODO: if !blocking...
171             self.__wait(self.__turn_shr)
172
173           self.__shr.add(threading.currentThread())
174         finally:
175           self.__nwait_shr -= 1
176
177       else:
178         # TODO: if !blocking...
179         # (or modify __exclusive_acquire for non-blocking mode)
180         self.__exclusive_acquire()
181
182     finally:
183       self.__lock.release()
184
185     return True
186
187   def release(self):
188     """Release a Shared Lock.
189
190     You must have acquired the lock, either in shared or in exclusive mode,
191     before calling this function.
192
193     """
194     self.__lock.acquire()
195     try:
196       # Autodetect release type
197       if self.__is_exclusive():
198         self.__exc = None
199
200         # An exclusive holder has just had the lock, time to put it in shared
201         # mode if there are shared holders waiting. Otherwise wake up the next
202         # exclusive holder.
203         if self.__nwait_shr > 0:
204           self.__turn_shr.notifyAll()
205         elif self.__nwait_exc > 0:
206          self.__turn_exc.notify()
207
208       elif self.__is_sharer():
209         self.__shr.remove(threading.currentThread())
210
211         # If there are shared holders waiting there *must* be an exclusive holder
212         # waiting as well; otherwise what were they waiting for?
213         assert (self.__nwait_shr == 0 or self.__nwait_exc > 0,
214                 "Lock sharers waiting while no exclusive is queueing")
215
216         # If there are no more shared holders and some exclusive holders are
217         # waiting let's wake one up.
218         if len(self.__shr) == 0 and self.__nwait_exc > 0:
219           self.__turn_exc.notify()
220
221       else:
222         assert False, "Cannot release non-owned lock"
223
224     finally:
225       self.__lock.release()
226
227   def delete(self, blocking=1):
228     """Delete a Shared Lock.
229
230     This operation will declare the lock for removal. First the lock will be
231     acquired in exclusive mode if you don't already own it, then the lock
232     will be put in a state where any future and pending acquire() fail.
233
234     Args:
235       blocking: whether to block while trying to acquire or to operate in
236                 try-lock mode.  this locking mode is not supported yet unless
237                 you are already holding exclusively the lock.
238
239     """
240     self.__lock.acquire()
241     try:
242       assert not self.__is_sharer(), "cannot delete() a lock while sharing it"
243
244       if self.__deleted:
245         raise errors.LockError('deleted lock')
246
247       if not self.__is_exclusive():
248         if not blocking:
249           # We don't have non-blocking mode for now
250           raise NotImplementedError
251         self.__exclusive_acquire()
252
253       self.__deleted = True
254       self.__exc = None
255       # Wake up everybody, they will fail acquiring the lock and
256       # raise an exception instead.
257       self.__turn_exc.notifyAll()
258       self.__turn_shr.notifyAll()
259
260     finally:
261       self.__lock.release()
262
263
264 class LockSet:
265   """Implements a set of locks.
266
267   This abstraction implements a set of shared locks for the same resource type,
268   distinguished by name. The user can lock a subset of the resources and the
269   LockSet will take care of acquiring the locks always in the same order, thus
270   preventing deadlock.
271
272   All the locks needed in the same set must be acquired together, though.
273
274   """
275   def __init__(self, members=None):
276     """Constructs a new LockSet.
277
278     Args:
279       members: initial members of the set
280
281     """
282     # Used internally to guarantee coherency.
283     self.__lock = SharedLock()
284
285     # The lockdict indexes the relationship name -> lock
286     # The order-of-locking is implied by the alphabetical order of names
287     self.__lockdict = {}
288
289     if members is not None:
290       for name in members:
291         self.__lockdict[name] = SharedLock()
292
293     # The owner dict contains the set of locks each thread owns. For
294     # performance each thread can access its own key without a global lock on
295     # this structure. It is paramount though that *no* other type of access is
296     # done to this structure (eg. no looping over its keys). *_owner helper
297     # function are defined to guarantee access is correct, but in general never
298     # do anything different than __owners[threading.currentThread()], or there
299     # will be trouble.
300     self.__owners = {}
301
302   def _is_owned(self):
303     """Is the current thread a current level owner?"""
304     return threading.currentThread() in self.__owners
305
306   def _add_owned(self, name):
307     """Note the current thread owns the given lock"""
308     if self._is_owned():
309       self.__owners[threading.currentThread()].add(name)
310     else:
311        self.__owners[threading.currentThread()] = set([name])
312
313   def _del_owned(self, name):
314     """Note the current thread owns the given lock"""
315     self.__owners[threading.currentThread()].remove(name)
316
317     if not self.__owners[threading.currentThread()]:
318       del self.__owners[threading.currentThread()]
319
320   def _list_owned(self):
321     """Get the set of resource names owned by the current thread"""
322     if self._is_owned():
323       return self.__owners[threading.currentThread()].copy()
324     else:
325       return set()
326
327   def __names(self):
328     """Return the current set of names.
329
330     Only call this function while holding __lock and don't iterate on the
331     result after releasing the lock.
332
333     """
334     return set(self.__lockdict.keys())
335
336   def _names(self):
337     """Return a copy of the current set of elements.
338
339     Used only for debugging purposes.
340     """
341     self.__lock.acquire(shared=1)
342     try:
343       result = self.__names()
344     finally:
345       self.__lock.release()
346     return result
347
348   def acquire(self, names, blocking=1, shared=0):
349     """Acquire a set of resource locks.
350
351     Args:
352       names: the names of the locks which shall be acquired.
353              (special lock names, or instance/node names)
354       shared: whether to acquire in shared mode. By default an exclusive lock
355               will be acquired.
356       blocking: whether to block while trying to acquire or to operate in try-lock mode.
357                 this locking mode is not supported yet.
358
359     Returns:
360       True: when all the locks are successfully acquired
361
362     Raises:
363       errors.LockError: when any lock we try to acquire has been deleted
364       before we succeed. In this case none of the locks requested will be
365       acquired.
366
367     """
368     if not blocking:
369       # We don't have non-blocking mode for now
370       raise NotImplementedError
371
372     # Check we don't already own locks at this level
373     assert not self._is_owned(), "Cannot acquire locks in the same set twice"
374
375     # Support passing in a single resource to acquire rather than many
376     if isinstance(names, basestring):
377       names = [names]
378     else:
379       names.sort()
380
381     acquire_list = []
382     # First we look the locks up on __lockdict. We have no way of being sure
383     # they will still be there after, but this makes it a lot faster should
384     # just one of them be the already wrong
385     try:
386       for lname in names:
387         lock = self.__lockdict[lname] # raises KeyError if the lock is not there
388         acquire_list.append((lname, lock))
389     except (KeyError):
390       raise errors.LockError('non-existing lock in set (%s)' % lname)
391
392     # Now acquire_list contains a sorted list of resources and locks we want.
393     # In order to get them we loop on this (private) list and acquire() them.
394     # We gave no real guarantee they will still exist till this is done but
395     # .acquire() itself is safe and will alert us if the lock gets deleted.
396     try:
397       for (lname, lock) in acquire_list:
398         lock.acquire(shared=shared) # raises LockError if the lock is deleted
399         try:
400           # now the lock cannot be deleted, we have it!
401           self._add_owned(lname)
402         except:
403           # We shouldn't have problems adding the lock to the owners list, but
404           # if we did we'll try to release this lock and re-raise exception.
405           # Of course something is going to be really wrong, after this.
406           lock.release()
407           raise
408
409     except (errors.LockError):
410       name_fail = lname
411       for lname in self._list_owned():
412         self.__lockdict[lname].release()
413         self._del_owned(lname)
414       raise errors.LockError('non-existing lock in set (%s)' % name_fail)
415
416     return True
417
418   def release(self, names=None):
419     """Release a set of resource locks, at the same level.
420
421     You must have acquired the locks, either in shared or in exclusive mode,
422     before releasing them.
423
424     Args:
425       names: the names of the locks which shall be released.
426              (defaults to all the locks acquired at that level).
427
428     """
429
430     assert self._is_owned(), "release() on lock set while not owner"
431
432     # Support passing in a single resource to release rather than many
433     if isinstance(names, basestring):
434       names = [names]
435
436     if names is None:
437       names = self._list_owned()
438     else:
439       names = set(names)
440       assert self._list_owned().issuperset(names), (
441                "release() on unheld resources %s" %
442                names.difference(self._list_owned()))
443
444     for lockname in names:
445       # If we are sure the lock doesn't leave __lockdict without being
446       # exclusively held we can do this...
447       self.__lockdict[lockname].release()
448       self._del_owned(lockname)
449
450   def add(self, names, acquired=0, shared=0):
451     """Add a new set of elements to the set
452
453     Args:
454       names: names of the new elements to add
455       acquired: pre-acquire the new resource?
456       shared: is the pre-acquisition shared?
457
458     """
459     # Support passing in a single resource to add rather than many
460     if isinstance(names, basestring):
461       names = [names]
462
463     # Acquire the internal lock in an exclusive way, so there cannot be a
464     # conflicting add()
465     self.__lock.acquire()
466     try:
467       invalid_names = self.__names().intersection(names)
468       if invalid_names:
469         # This must be an explicit raise, not an assert, because assert is
470         # turned off when using optimization, and this can happen because of
471         # concurrency even if the user doesn't want it.
472         raise errors.LockError("duplicate add() (%s)" % invalid_names)
473
474       for lockname in names:
475         lock = SharedLock()
476
477         if acquired:
478           lock.acquire(shared=shared)
479           # now the lock cannot be deleted, we have it!
480           try:
481             self._add_owned(lockname)
482           except:
483             # We shouldn't have problems adding the lock to the owners list,
484             # but if we did we'll try to release this lock and re-raise
485             # exception.  Of course something is going to be really wrong,
486             # after this.  On the other hand the lock hasn't been added to the
487             # __lockdict yet so no other threads should be pending on it. This
488             # release is just a safety measure.
489             lock.release()
490             raise
491
492         self.__lockdict[lockname] = lock
493
494     finally:
495       self.__lock.release()
496
497     return True
498
499   def remove(self, names, blocking=1):
500     """Remove elements from the lock set.
501
502     You can either not hold anything in the lockset or already hold a superset
503     of the elements you want to delete, exclusively.
504
505     Args:
506       names: names of the resource to remove.
507       blocking: whether to block while trying to acquire or to operate in
508                 try-lock mode.  this locking mode is not supported yet unless
509                 you are already holding exclusively the locks.
510
511     Returns:
512       A list of lock which we failed to delete. The list is always empty if we
513       were holding all the locks exclusively.
514
515     """
516     if not blocking and not self._is_owned():
517       # We don't have non-blocking mode for now
518       raise NotImplementedError
519
520     # Support passing in a single resource to remove rather than many
521     if isinstance(names, basestring):
522       names = [names]
523
524     # If we own any subset of this lock it must be a superset of what we want
525     # to delete. The ownership must also be exclusive, but that will be checked
526     # by the lock itself.
527     assert not self._is_owned() or self._list_owned().issuperset(names), (
528       "remove() on acquired lockset while not owning all elements")
529
530     delete_failed=[]
531
532     for lname in names:
533       # Calling delete() acquires the lock exclusively if we don't already own
534       # it, and causes all pending and subsequent lock acquires to fail. It's
535       # fine to call it out of order because delete() also implies release(),
536       # and the assertion above guarantees that if we either already hold
537       # everything we want to delete, or we hold none.
538       try:
539         self.__lockdict[lname].delete()
540       except (KeyError, errors.LockError):
541         delete_failed.append(lname)
542         # This cannot happen if we were already holding it, verify:
543         assert not self._is_owned(), "remove failed while holding lockset"
544       else:
545         # If no LockError was raised we are the ones who deleted the lock.
546         # This means we can safely remove it from lockdict, as any further or
547         # pending delete() or acquire() will fail (and nobody can have the lock
548         # since before our call to delete()).
549         #
550         # This is done in an else clause because if the exception was thrown
551         # it's the job of the one who actually deleted it.
552         del self.__lockdict[lname]
553         # And let's remove it from our private list if we owned it.
554         if self._is_owned():
555           self._del_owned(lname)
556
557     return delete_failed
558
559
560 # Locking levels, must be acquired in increasing order.
561 # Current rules are:
562 #   - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
563 #   acquired before performing any operation, either in shared or in exclusive
564 #   mode. acquiring the BGL in exclusive mode is discouraged and should be
565 #   avoided.
566 #   - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
567 #   If you need more than one node, or more than one instance, acquire them at
568 #   the same time.
569 #  - level LEVEL_CONFIG contains the configuration lock, which you must acquire
570 #  before reading or changing the config file.
571 LEVEL_CLUSTER = 0
572 LEVEL_NODE = 1
573 LEVEL_INSTANCE = 2
574 LEVEL_CONFIG = 3
575
576 LEVELS = [LEVEL_CLUSTER,
577           LEVEL_NODE,
578           LEVEL_INSTANCE,
579           LEVEL_CONFIG]
580
581 # Lock levels which are modifiable
582 LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE]
583
584 # Constant for the big ganeti lock and config lock
585 BGL = 'BGL'
586 CONFIG = 'config'
587
588
589 class GanetiLockManager:
590   """The Ganeti Locking Library
591
592   The purpouse of this small library is to manage locking for ganeti clusters
593   in a central place, while at the same time doing dynamic checks against
594   possible deadlocks. It will also make it easier to transition to a different
595   lock type should we migrate away from python threads.
596
597   """
598   _instance = None
599
600   def __init__(self, nodes=None, instances=None):
601     """Constructs a new GanetiLockManager object.
602
603     There should be only a
604     GanetiLockManager object at any time, so this function raises an error if this
605     is not the case.
606
607     Args:
608       nodes: list of node names
609       instances: list of instance names
610
611     """
612     assert self.__class__._instance is None, "double GanetiLockManager instance"
613     self.__class__._instance = self
614
615     # The keyring contains all the locks, at their level and in the correct
616     # locking order.
617     self.__keyring = {
618       LEVEL_CLUSTER: LockSet([BGL]),
619       LEVEL_NODE: LockSet(nodes),
620       LEVEL_INSTANCE: LockSet(instances),
621       LEVEL_CONFIG: LockSet([CONFIG]),
622     }
623
624   def _names(self, level):
625     """List the lock names at the given level.
626     Used for debugging/testing purposes.
627
628     Args:
629       level: the level whose list of locks to get
630
631     """
632     assert level in LEVELS, "Invalid locking level %s" % level
633     return self.__keyring[level]._names()
634
635   def _is_owned(self, level):
636     """Check whether we are owning locks at the given level
637
638     """
639     return self.__keyring[level]._is_owned()
640
641   def _list_owned(self, level):
642     """Get the set of owned locks at the given level
643
644     """
645     return self.__keyring[level]._list_owned()
646
647   def _upper_owned(self, level):
648     """Check that we don't own any lock at a level greater than the given one.
649
650     """
651     # This way of checking only works if LEVELS[i] = i, which we check for in
652     # the test cases.
653     return utils.any((self._is_owned(l) for l in LEVELS[level + 1:]))
654
655   def _BGL_owned(self):
656     """Check if the current thread owns the BGL.
657
658     Both an exclusive or a shared acquisition work.
659
660     """
661     return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
662
663   def _contains_BGL(self, level, names):
664     """Check if acting on the given level and set of names will change the
665     status of the Big Ganeti Lock.
666
667     """
668     return level == LEVEL_CLUSTER and (names is None or BGL in names)
669
670   def acquire(self, level, names, blocking=1, shared=0):
671     """Acquire a set of resource locks, at the same level.
672
673     Args:
674       level: the level at which the locks shall be acquired.
675              It must be a memmber of LEVELS.
676       names: the names of the locks which shall be acquired.
677              (special lock names, or instance/node names)
678       shared: whether to acquire in shared mode. By default an exclusive lock
679               will be acquired.
680       blocking: whether to block while trying to acquire or to operate in try-lock mode.
681                 this locking mode is not supported yet.
682
683     """
684     assert level in LEVELS, "Invalid locking level %s" % level
685
686     # Check that we are either acquiring the Big Ganeti Lock or we already own
687     # it. Some "legacy" opcodes need to be sure they are run non-concurrently
688     # so even if we've migrated we need to at least share the BGL to be
689     # compatible with them. Of course if we own the BGL exclusively there's no
690     # point in acquiring any other lock, unless perhaps we are half way through
691     # the migration of the current opcode.
692     assert (self._contains_BGL(level, names) or self._BGL_owned()), (
693             "You must own the Big Ganeti Lock before acquiring any other")
694
695     # Check we don't own locks at the same or upper levels.
696     assert not self._upper_owned(level), ("Cannot acquire locks at a level" 
697            " while owning some at a greater one")
698
699     # Acquire the locks in the set.
700     return self.__keyring[level].acquire(names, shared=shared,
701                                          blocking=blocking)
702
703   def release(self, level, names=None):
704     """Release a set of resource locks, at the same level.
705
706     You must have acquired the locks, either in shared or in exclusive mode,
707     before releasing them.
708
709     Args:
710       level: the level at which the locks shall be released.
711              It must be a memmber of LEVELS.
712       names: the names of the locks which shall be released.
713              (defaults to all the locks acquired at that level).
714
715     """
716     assert level in LEVELS, "Invalid locking level %s" % level
717     assert (not self._contains_BGL(level, names) or
718             not self._upper_owned(LEVEL_CLUSTER)), (
719             "Cannot release the Big Ganeti Lock while holding something"
720             " at upper levels")
721
722     # Release will complain if we don't own the locks already
723     return self.__keyring[level].release(names)
724
725   def add(self, level, names, acquired=0, shared=0):
726     """Add locks at the specified level.
727
728     Args:
729       level: the level at which the locks shall be added.
730              It must be a memmber of LEVELS_MOD.
731       names: names of the locks to acquire
732       acquired: whether to acquire the newly added locks
733       shared: whether the acquisition will be shared
734     """
735     assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
736     assert self._BGL_owned(), ("You must own the BGL before performing other"
737            " operations")
738     assert not self._upper_owned(level), ("Cannot add locks at a level"
739            " while owning some at a greater one")
740     return self.__keyring[level].add(names, acquired=acquired, shared=shared)
741
742   def remove(self, level, names, blocking=1):
743     """Remove locks from the specified level.
744
745     You must either already own the locks you are trying to remove exclusively
746     or not own any lock at an upper level.
747
748     Args:
749       level: the level at which the locks shall be removed.
750              It must be a memmber of LEVELS_MOD.
751       names: the names of the locks which shall be removed.
752              (special lock names, or instance/node names)
753       blocking: whether to block while trying to operate in try-lock mode.
754                 this locking mode is not supported yet.
755
756     """
757     assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
758     assert self._BGL_owned(), ("You must own the BGL before performing other"
759            " operations")
760     # Check we either own the level or don't own anything from here up.
761     # LockSet.remove() will check the case in which we don't own all the needed
762     # resources, or we have a shared ownership.
763     assert self._is_owned(level) or not self._upper_owned(level), (
764            "Cannot remove locks at a level while not owning it or"
765            " owning some at a greater one")
766     return self.__keyring[level].remove(names, blocking)
767