Revision 162c1c1f

b/lib/locking.py
1
#
2
#
3

  
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

  
21
"""Module implementing the Ganeti locking code."""
22

  
23
# pylint: disable-msg=W0613,W0201
24

  
25
import threading
26

  
27

  
28
class SharedLock:
29
  """Implements a shared lock.
30

  
31
  Multiple threads can acquire the lock in a shared way, calling
32
  acquire_shared().  In order to acquire the lock in an exclusive way threads
33
  can call acquire_exclusive().
34

  
35
  The lock prevents starvation but does not guarantee that threads will acquire
36
  the shared lock in the order they queued for it, just that they will
37
  eventually do so.
38

  
39
  """
40
  def __init__(self):
41
    """Construct a new Shared Lock"""
42
    # we have two conditions, c_shr and c_exc, sharing the same lock.
43
    self.__lock = threading.Lock()
44
    self.__turn_shr = threading.Condition(self.__lock)
45
    self.__turn_exc = threading.Condition(self.__lock)
46

  
47
    # current lock holders
48
    self.__shr = set()
49
    self.__exc = None
50

  
51
    # lock waiters
52
    self.__nwait_exc = 0
53
    self.__nwait_shr = 0
54

  
55
  def __is_sharer(self):
56
    """Is the current thread sharing the lock at this time?"""
57
    return threading.currentThread() in self.__shr
58

  
59
  def __is_exclusive(self):
60
    """Is the current thread holding the lock exclusively at this time?"""
61
    return threading.currentThread() == self.__exc
62

  
63
  def __is_owned(self, shared=-1):
64
    """Is the current thread somehow owning the lock at this time?
65

  
66
    This is a private version of the function, which presumes you're holding
67
    the internal lock.
68

  
69
    """
70
    if shared < 0:
71
      return self.__is_sharer() or self.__is_exclusive()
72
    elif shared:
73
      return self.__is_sharer()
74
    else:
75
      return self.__is_exclusive()
76

  
77
  def _is_owned(self, shared=-1):
78
    """Is the current thread somehow owning the lock at this time?
79

  
80
    Args:
81
      shared:
82
        < 0: check for any type of ownership (default)
83
        0: check for exclusive ownership
84
        > 0: check for shared ownership
85

  
86
    """
87
    self.__lock.acquire()
88
    try:
89
      result = self.__is_owned(shared)
90
    finally:
91
      self.__lock.release()
92

  
93
    return result
94

  
95
  def acquire(self, blocking=1, shared=0):
96
    """Acquire a shared lock.
97

  
98
    Args:
99
      shared: whether to acquire in shared mode. By default an exclusive lock
100
              will be acquired.
101
      blocking: whether to block while trying to acquire or to operate in try-lock mode.
102
                this locking mode is not supported yet.
103

  
104
    """
105
    if not blocking:
106
      # We don't have non-blocking mode for now
107
      raise NotImplementedError
108

  
109
    self.__lock.acquire()
110
    try:
111
      # We cannot acquire the lock if we already have it
112
      assert not self.__is_owned(), "double acquire() on a non-recursive lock"
113

  
114
      if shared:
115
        self.__nwait_shr += 1
116
        try:
117
          # If there is an exclusive holder waiting we have to wait.  We'll
118
          # only do this once, though, when we start waiting for the lock. Then
119
          # we'll just wait while there are no exclusive holders.
120
          if self.__nwait_exc > 0:
121
            # TODO: if !blocking...
122
            self.__turn_shr.wait()
123

  
124
          while self.__exc is not None:
125
            # TODO: if !blocking...
126
            self.__turn_shr.wait()
127

  
128
          self.__shr.add(threading.currentThread())
129
        finally:
130
          self.__nwait_shr -= 1
131

  
132
      else:
133
        self.__nwait_exc += 1
134
        try:
135
          # This is to save ourselves from a nasty race condition that could
136
          # theoretically make the sharers starve.
137
          if self.__nwait_shr > 0 or self.__nwait_exc > 1:
138
            # TODO: if !blocking...
139
              self.__turn_exc.wait()
140

  
141
          while len(self.__shr) > 0 or self.__exc is not None:
142
            # TODO: if !blocking...
143
            self.__turn_exc.wait()
144

  
145
          self.__exc = threading.currentThread()
146
        finally:
147
          self.__nwait_exc -= 1
148

  
149
    finally:
150
      self.__lock.release()
151

  
152
    return True
153

  
154
  def release(self):
155
    """Release a Shared Lock.
156

  
157
    You must have acquired the lock, either in shared or in exclusive mode,
158
    before calling this function.
159

  
160
    """
161
    self.__lock.acquire()
162
    try:
163
      # Autodetect release type
164
      if self.__is_exclusive():
165
        self.__exc = None
166

  
167
        # An exclusive holder has just had the lock, time to put it in shared
168
        # mode if there are shared holders waiting. Otherwise wake up the next
169
        # exclusive holder.
170
        if self.__nwait_shr > 0:
171
          self.__turn_shr.notifyAll()
172
        elif self.__nwait_exc > 0:
173
         self.__turn_exc.notify()
174

  
175
      elif self.__is_sharer():
176
        self.__shr.remove(threading.currentThread())
177

  
178
        # If there are shared holders waiting there *must* be an exclusive holder
179
        # waiting as well; otherwise what were they waiting for?
180
        assert (self.__nwait_shr == 0 or self.__nwait_exc > 0,
181
                "Lock sharers waiting while no exclusive is queueing")
182

  
183
        # If there are no more shared holders and some exclusive holders are
184
        # waiting let's wake one up.
185
        if len(self.__shr) == 0 and self.__nwait_exc > 0:
186
          self.__turn_exc.notify()
187

  
188
      else:
189
        assert False, "Cannot release non-owned lock"
190

  
191
    finally:
192
      self.__lock.release()
193

  
b/test/Makefile.am
2 2
  ganeti.config_unittest.py \
3 3
  ganeti.hooks_unittest.py \
4 4
  ganeti.utils_unittest.py \
5
  ganeti.bdev_unittest.py
5
  ganeti.bdev_unittest.py \
6
  ganeti.locking_unittest.py
6 7

  
7 8
TESTS_ENVIRONMENT = PYTHONPATH=.:$(top_builddir)
8 9

  
b/test/ganeti.locking_unittest.py
1
#!/usr/bin/python
2
#
3

  
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 0.0510-1301, USA.
20

  
21

  
22
"""Script for unittesting the locking module"""
23

  
24

  
25
import os
26
import unittest
27
import time
28
import Queue
29

  
30
from ganeti import locking
31
from threading import Thread
32

  
33

  
34
class TestSharedLock(unittest.TestCase):
35
  """Shared lock tests"""
36

  
37
  def setUp(self):
38
    self.sl = locking.SharedLock()
39
    # helper threads use the 'done' queue to tell the master they finished.
40
    self.done = Queue.Queue(0)
41

  
42
  def testSequenceAndOwnership(self):
43
    self.assert_(not self.sl._is_owned())
44
    self.sl.acquire(shared=1)
45
    self.assert_(self.sl._is_owned())
46
    self.assert_(self.sl._is_owned(shared=1))
47
    self.assert_(not self.sl._is_owned(shared=0))
48
    self.sl.release()
49
    self.assert_(not self.sl._is_owned())
50
    self.sl.acquire()
51
    self.assert_(self.sl._is_owned())
52
    self.assert_(not self.sl._is_owned(shared=1))
53
    self.assert_(self.sl._is_owned(shared=0))
54
    self.sl.release()
55
    self.assert_(not self.sl._is_owned())
56
    self.sl.acquire(shared=1)
57
    self.assert_(self.sl._is_owned())
58
    self.assert_(self.sl._is_owned(shared=1))
59
    self.assert_(not self.sl._is_owned(shared=0))
60
    self.sl.release()
61
    self.assert_(not self.sl._is_owned())
62

  
63
  def testBooleanValue(self):
64
    # semaphores are supposed to return a true value on a successful acquire
65
    self.assert_(self.sl.acquire(shared=1))
66
    self.sl.release()
67
    self.assert_(self.sl.acquire())
68
    self.sl.release()
69

  
70
  def testDoubleLockingStoE(self):
71
    self.sl.acquire(shared=1)
72
    self.assertRaises(AssertionError, self.sl.acquire)
73

  
74
  def testDoubleLockingEtoS(self):
75
    self.sl.acquire()
76
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
77

  
78
  def testDoubleLockingStoS(self):
79
    self.sl.acquire(shared=1)
80
    self.assertRaises(AssertionError, self.sl.acquire, shared=1)
81

  
82
  def testDoubleLockingEtoE(self):
83
    self.sl.acquire()
84
    self.assertRaises(AssertionError, self.sl.acquire)
85

  
86
  # helper functions: called in a separate thread they acquire the lock, send
87
  # their identifier on the done queue, then release it.
88
  def _doItSharer(self):
89
    self.sl.acquire(shared=1)
90
    self.done.put('SHR')
91
    self.sl.release()
92

  
93
  def _doItExclusive(self):
94
    self.sl.acquire()
95
    self.done.put('EXC')
96
    self.sl.release()
97

  
98
  def testSharersCanCoexist(self):
99
    self.sl.acquire(shared=1)
100
    Thread(target=self._doItSharer).start()
101
    self.assert_(self.done.get(True, 1))
102
    self.sl.release()
103

  
104
  def testExclusiveBlocksExclusive(self):
105
    self.sl.acquire()
106
    Thread(target=self._doItExclusive).start()
107
    # give it a bit of time to check that it's not actually doing anything
108
    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
109
    self.sl.release()
110
    self.assert_(self.done.get(True, 1))
111

  
112
  def testExclusiveBlocksSharer(self):
113
    self.sl.acquire()
114
    Thread(target=self._doItSharer).start()
115
    time.sleep(0.05)
116
    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
117
    self.sl.release()
118
    self.assert_(self.done.get(True, 1))
119

  
120
  def testSharerBlocksExclusive(self):
121
    self.sl.acquire(shared=1)
122
    Thread(target=self._doItExclusive).start()
123
    time.sleep(0.05)
124
    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
125
    self.sl.release()
126
    self.assert_(self.done.get(True, 1))
127

  
128
  def testWaitingExclusiveBlocksSharer(self):
129
    self.sl.acquire(shared=1)
130
    # the lock is acquired in shared mode...
131
    Thread(target=self._doItExclusive).start()
132
    # ...but now an exclusive is waiting...
133
    time.sleep(0.05)
134
    Thread(target=self._doItSharer).start()
135
    # ...so the sharer should be blocked as well
136
    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
137
    self.sl.release()
138
    # The exclusive passed before
139
    self.assertEqual(self.done.get(True, 1), 'EXC')
140
    self.assertEqual(self.done.get(True, 1), 'SHR')
141

  
142
  def testWaitingSharerBlocksExclusive(self):
143
    self.sl.acquire()
144
    # the lock is acquired in exclusive mode...
145
    Thread(target=self._doItSharer).start()
146
    # ...but now a sharer is waiting...
147
    time.sleep(0.05)
148
    Thread(target=self._doItExclusive).start()
149
    # ...the exclusive is waiting too...
150
    self.assertRaises(Queue.Empty, self.done.get, True, 0.2)
151
    self.sl.release()
152
    # The sharer passed before
153
    self.assertEqual(self.done.get(True, 1), 'SHR')
154
    self.assertEqual(self.done.get(True, 1), 'EXC')
155

  
156

  
157
if __name__ == '__main__':
158
  unittest.main()
159
  #suite = unittest.TestLoader().loadTestsFromTestCase(TestSharedLock)
160
  #unittest.TextTestRunner(verbosity=2).run(suite)

Also available in: Unified diff