Revision a95fd5d7
b/lib/locking.py | ||
---|---|---|
23 | 23 |
# pylint: disable-msg=W0613,W0201 |
24 | 24 |
|
25 | 25 |
import threading |
26 |
# Wouldn't it be better to define LockingError in the locking module? |
|
27 |
# Well, for now that's how the rest of the code does it... |
|
28 |
from ganeti import errors |
|
26 | 29 |
|
27 | 30 |
|
28 | 31 |
class SharedLock: |
... | ... | |
52 | 55 |
self.__nwait_exc = 0 |
53 | 56 |
self.__nwait_shr = 0 |
54 | 57 |
|
58 |
# is this lock in the deleted state? |
|
59 |
self.__deleted = False |
|
60 |
|
|
55 | 61 |
def __is_sharer(self): |
56 | 62 |
"""Is the current thread sharing the lock at this time?""" |
57 | 63 |
return threading.currentThread() in self.__shr |
... | ... | |
92 | 98 |
|
93 | 99 |
return result |
94 | 100 |
|
101 |
def __wait(self,c): |
|
102 |
"""Wait on the given condition, and raise an exception if the current lock |
|
103 |
is declared deleted in the meantime. |
|
104 |
|
|
105 |
Args: |
|
106 |
c: condition to wait on |
|
107 |
|
|
108 |
""" |
|
109 |
c.wait() |
|
110 |
if self.__deleted: |
|
111 |
raise errors.LockError('deleted lock') |
|
112 |
|
|
113 |
def __exclusive_acquire(self): |
|
114 |
"""Acquire the lock exclusively. |
|
115 |
|
|
116 |
This is a private function that presumes you are already holding the |
|
117 |
internal lock. It's defined separately to avoid code duplication between |
|
118 |
acquire() and delete() |
|
119 |
|
|
120 |
""" |
|
121 |
self.__nwait_exc += 1 |
|
122 |
try: |
|
123 |
# This is to save ourselves from a nasty race condition that could |
|
124 |
# theoretically make the sharers starve. |
|
125 |
if self.__nwait_shr > 0 or self.__nwait_exc > 1: |
|
126 |
self.__wait(self.__turn_exc) |
|
127 |
|
|
128 |
while len(self.__shr) > 0 or self.__exc is not None: |
|
129 |
self.__wait(self.__turn_exc) |
|
130 |
|
|
131 |
self.__exc = threading.currentThread() |
|
132 |
finally: |
|
133 |
self.__nwait_exc -= 1 |
|
134 |
|
|
135 |
|
|
95 | 136 |
def acquire(self, blocking=1, shared=0): |
96 | 137 |
"""Acquire a shared lock. |
97 | 138 |
|
... | ... | |
108 | 149 |
|
109 | 150 |
self.__lock.acquire() |
110 | 151 |
try: |
152 |
if self.__deleted: |
|
153 |
raise errors.LockError('deleted lock') |
|
154 |
|
|
111 | 155 |
# We cannot acquire the lock if we already have it |
112 | 156 |
assert not self.__is_owned(), "double acquire() on a non-recursive lock" |
113 | 157 |
|
... | ... | |
119 | 163 |
# we'll just wait while there are no exclusive holders. |
120 | 164 |
if self.__nwait_exc > 0: |
121 | 165 |
# TODO: if !blocking... |
122 |
self.__turn_shr.wait()
|
|
166 |
self.__wait(self.__turn_shr)
|
|
123 | 167 |
|
124 | 168 |
while self.__exc is not None: |
125 | 169 |
# TODO: if !blocking... |
126 |
self.__turn_shr.wait()
|
|
170 |
self.__wait(self.__turn_shr)
|
|
127 | 171 |
|
128 | 172 |
self.__shr.add(threading.currentThread()) |
129 | 173 |
finally: |
130 | 174 |
self.__nwait_shr -= 1 |
131 | 175 |
|
132 | 176 |
else: |
133 |
self.__nwait_exc += 1 |
|
134 |
try: |
|
135 |
# This is to save ourselves from a nasty race condition that could |
|
136 |
# theoretically make the sharers starve. |
|
137 |
if self.__nwait_shr > 0 or self.__nwait_exc > 1: |
|
138 |
# TODO: if !blocking... |
|
139 |
self.__turn_exc.wait() |
|
140 |
|
|
141 |
while len(self.__shr) > 0 or self.__exc is not None: |
|
142 |
# TODO: if !blocking... |
|
143 |
self.__turn_exc.wait() |
|
144 |
|
|
145 |
self.__exc = threading.currentThread() |
|
146 |
finally: |
|
147 |
self.__nwait_exc -= 1 |
|
177 |
# TODO: if !blocking... |
|
178 |
# (or modify __exclusive_acquire for non-blocking mode) |
|
179 |
self.__exclusive_acquire() |
|
148 | 180 |
|
149 | 181 |
finally: |
150 | 182 |
self.__lock.release() |
... | ... | |
191 | 223 |
finally: |
192 | 224 |
self.__lock.release() |
193 | 225 |
|
226 |
def delete(self, blocking=1): |
|
227 |
"""Delete a Shared Lock. |
|
228 |
|
|
229 |
This operation will declare the lock for removal. First the lock will be |
|
230 |
acquired in exclusive mode if you don't already own it, then the lock |
|
231 |
will be put in a state where any future and pending acquire() fail. |
|
232 |
|
|
233 |
Args: |
|
234 |
blocking: whether to block while trying to acquire or to operate in |
|
235 |
try-lock mode. this locking mode is not supported yet unless |
|
236 |
you are already holding exclusively the lock. |
|
237 |
|
|
238 |
""" |
|
239 |
self.__lock.acquire() |
|
240 |
try: |
|
241 |
assert not self.__is_sharer(), "cannot delete() a lock while sharing it" |
|
242 |
|
|
243 |
if self.__deleted: |
|
244 |
raise errors.LockError('deleted lock') |
|
245 |
|
|
246 |
if not self.__is_exclusive(): |
|
247 |
if not blocking: |
|
248 |
# We don't have non-blocking mode for now |
|
249 |
raise NotImplementedError |
|
250 |
self.__exclusive_acquire() |
|
251 |
|
|
252 |
self.__deleted = True |
|
253 |
self.__exc = None |
|
254 |
# Wake up everybody, they will fail acquiring the lock and |
|
255 |
# raise an exception instead. |
|
256 |
self.__turn_exc.notifyAll() |
|
257 |
self.__turn_shr.notifyAll() |
|
258 |
|
|
259 |
finally: |
|
260 |
self.__lock.release() |
|
261 |
|
b/test/ganeti.locking_unittest.py | ||
---|---|---|
28 | 28 |
import Queue |
29 | 29 |
|
30 | 30 |
from ganeti import locking |
31 |
from ganeti import errors |
|
31 | 32 |
from threading import Thread |
32 | 33 |
|
33 | 34 |
|
... | ... | |
86 | 87 |
# helper functions: called in a separate thread they acquire the lock, send |
87 | 88 |
# their identifier on the done queue, then release it. |
88 | 89 |
def _doItSharer(self): |
89 |
self.sl.acquire(shared=1) |
|
90 |
self.done.put('SHR') |
|
91 |
self.sl.release() |
|
90 |
try: |
|
91 |
self.sl.acquire(shared=1) |
|
92 |
self.done.put('SHR') |
|
93 |
self.sl.release() |
|
94 |
except errors.LockError: |
|
95 |
self.done.put('ERR') |
|
92 | 96 |
|
93 | 97 |
def _doItExclusive(self): |
94 |
self.sl.acquire() |
|
95 |
self.done.put('EXC') |
|
96 |
self.sl.release() |
|
98 |
try: |
|
99 |
self.sl.acquire() |
|
100 |
self.done.put('EXC') |
|
101 |
self.sl.release() |
|
102 |
except errors.LockError: |
|
103 |
self.done.put('ERR') |
|
104 |
|
|
105 |
def _doItDelete(self): |
|
106 |
try: |
|
107 |
self.sl.acquire() |
|
108 |
self.done.put('DEL') |
|
109 |
self.sl.release() |
|
110 |
except errors.LockError: |
|
111 |
self.done.put('ERR') |
|
97 | 112 |
|
98 | 113 |
def testSharersCanCoexist(self): |
99 | 114 |
self.sl.acquire(shared=1) |
... | ... | |
109 | 124 |
self.sl.release() |
110 | 125 |
self.assert_(self.done.get(True, 1)) |
111 | 126 |
|
127 |
def testExclusiveBlocksDelete(self): |
|
128 |
self.sl.acquire() |
|
129 |
Thread(target=self._doItDelete).start() |
|
130 |
# give it a bit of time to check that it's not actually doing anything |
|
131 |
self.assertRaises(Queue.Empty, self.done.get, True, 0.2) |
|
132 |
self.sl.release() |
|
133 |
self.assert_(self.done.get(True, 1)) |
|
134 |
|
|
112 | 135 |
def testExclusiveBlocksSharer(self): |
113 | 136 |
self.sl.acquire() |
114 | 137 |
Thread(target=self._doItSharer).start() |
... | ... | |
125 | 148 |
self.sl.release() |
126 | 149 |
self.assert_(self.done.get(True, 1)) |
127 | 150 |
|
151 |
def testSharerBlocksDelete(self): |
|
152 |
self.sl.acquire(shared=1) |
|
153 |
Thread(target=self._doItDelete).start() |
|
154 |
time.sleep(0.05) |
|
155 |
self.assertRaises(Queue.Empty, self.done.get, True, 0.2) |
|
156 |
self.sl.release() |
|
157 |
self.assert_(self.done.get(True, 1)) |
|
158 |
|
|
128 | 159 |
def testWaitingExclusiveBlocksSharer(self): |
129 | 160 |
self.sl.acquire(shared=1) |
130 | 161 |
# the lock is acquired in shared mode... |
... | ... | |
153 | 184 |
self.assertEqual(self.done.get(True, 1), 'SHR') |
154 | 185 |
self.assertEqual(self.done.get(True, 1), 'EXC') |
155 | 186 |
|
187 |
def testNoNonBlocking(self): |
|
188 |
self.assertRaises(NotImplementedError, self.sl.acquire, blocking=0) |
|
189 |
self.assertRaises(NotImplementedError, self.sl.delete, blocking=0) |
|
190 |
self.sl.acquire() |
|
191 |
self.sl.delete(blocking=0) # Fine, because the lock is already acquired |
|
192 |
|
|
193 |
def testDelete(self): |
|
194 |
self.sl.delete() |
|
195 |
self.assertRaises(errors.LockError, self.sl.acquire) |
|
196 |
self.assertRaises(errors.LockError, self.sl.delete) |
|
197 |
|
|
198 |
def testDeletePendingSharersExclusiveDelete(self): |
|
199 |
self.sl.acquire() |
|
200 |
Thread(target=self._doItSharer).start() |
|
201 |
Thread(target=self._doItSharer).start() |
|
202 |
time.sleep(0.05) |
|
203 |
Thread(target=self._doItExclusive).start() |
|
204 |
Thread(target=self._doItDelete).start() |
|
205 |
time.sleep(0.05) |
|
206 |
self.sl.delete() |
|
207 |
# The two threads who were pending return both ERR |
|
208 |
self.assertEqual(self.done.get(True, 1), 'ERR') |
|
209 |
self.assertEqual(self.done.get(True, 1), 'ERR') |
|
210 |
self.assertEqual(self.done.get(True, 1), 'ERR') |
|
211 |
self.assertEqual(self.done.get(True, 1), 'ERR') |
|
212 |
|
|
213 |
def testDeletePendingDeleteExclusiveSharers(self): |
|
214 |
self.sl.acquire() |
|
215 |
Thread(target=self._doItDelete).start() |
|
216 |
Thread(target=self._doItExclusive).start() |
|
217 |
time.sleep(0.05) |
|
218 |
Thread(target=self._doItSharer).start() |
|
219 |
Thread(target=self._doItSharer).start() |
|
220 |
time.sleep(0.05) |
|
221 |
self.sl.delete() |
|
222 |
# The two threads who were pending return both ERR |
|
223 |
self.assertEqual(self.done.get(True, 1), 'ERR') |
|
224 |
self.assertEqual(self.done.get(True, 1), 'ERR') |
|
225 |
self.assertEqual(self.done.get(True, 1), 'ERR') |
|
226 |
self.assertEqual(self.done.get(True, 1), 'ERR') |
|
227 |
|
|
156 | 228 |
|
157 | 229 |
if __name__ == '__main__': |
158 | 230 |
unittest.main() |
Also available in: Unified diff