Revision 34cb5617 lib/locking.py
b/lib/locking.py | ||
---|---|---|
49 | 49 |
return wrap |
50 | 50 |
|
51 | 51 |
|
52 |
class _SingleActionPipeConditionWaiter(object):
|
|
53 |
"""Callable helper class for _SingleActionPipeCondition.
|
|
52 |
class _SingleNotifyPipeConditionWaiter(object):
|
|
53 |
"""Helper class for SingleNotifyPipeCondition
|
|
54 | 54 |
|
55 | 55 |
""" |
56 | 56 |
__slots__ = [ |
57 |
"_cond", |
|
58 | 57 |
"_fd", |
59 | 58 |
"_poller", |
60 | 59 |
] |
61 | 60 |
|
62 |
def __init__(self, cond, poller, fd):
|
|
63 |
"""Initializes this class.
|
|
61 |
def __init__(self, poller, fd): |
|
62 |
"""Constructor for _SingleNotifyPipeConditionWaiter
|
|
64 | 63 |
|
65 |
@type cond: L{_SingleActionPipeCondition} |
|
66 |
@param cond: Parent condition |
|
67 | 64 |
@type poller: select.poll |
68 | 65 |
@param poller: Poller object |
69 | 66 |
@type fd: int |
... | ... | |
71 | 68 |
|
72 | 69 |
""" |
73 | 70 |
object.__init__(self) |
74 |
|
|
75 |
self._cond = cond |
|
76 | 71 |
self._poller = poller |
77 | 72 |
self._fd = fd |
78 | 73 |
|
... | ... | |
152 | 147 |
raise RuntimeError("cannot work with un-aquired lock") |
153 | 148 |
|
154 | 149 |
|
155 |
class _SingleActionPipeCondition(object): |
|
156 |
"""Wrapper around a pipe for usage inside conditions. |
|
157 |
|
|
158 |
This class contains a POSIX pipe(2) and a poller to poll it. The pipe is |
|
159 |
always allocated when constructing the class. Extra care is taken to always |
|
160 |
close the file descriptors. |
|
161 |
|
|
162 |
An additional class, L{_SingleActionPipeConditionWaiter}, is used to wait for |
|
163 |
notifications. |
|
150 |
class SingleNotifyPipeCondition(_BaseCondition): |
|
151 |
"""Condition which can only be notified once. |
|
164 | 152 |
|
165 |
Warning: This class is designed to be used as the underlying component of a |
|
166 |
locking condition, but is not by itself thread safe, and needs to be |
|
167 |
protected by an external lock. |
|
153 |
This condition class uses pipes and poll, internally, to be able to wait for |
|
154 |
notification with a timeout, without resorting to polling. It is almost |
|
155 |
compatible with Python's threading.Condition, with the following differences: |
|
156 |
- notifyAll can only be called once, and no wait can happen after that |
|
157 |
- notify is not supported, only notifyAll |
|
168 | 158 |
|
169 | 159 |
""" |
170 |
__slots__ = [ |
|
160 |
|
|
161 |
__slots__ = _BaseCondition.__slots__ + [ |
|
171 | 162 |
"_poller", |
172 | 163 |
"_read_fd", |
173 | 164 |
"_write_fd", |
174 | 165 |
"_nwaiters", |
166 |
"_notified", |
|
175 | 167 |
] |
176 | 168 |
|
177 |
_waiter_class = _SingleActionPipeConditionWaiter
|
|
169 |
_waiter_class = _SingleNotifyPipeConditionWaiter
|
|
178 | 170 |
|
179 |
def __init__(self): |
|
180 |
"""Initializes this class.
|
|
171 |
def __init__(self, lock):
|
|
172 |
"""Constructor for SingleNotifyPipeCondition
|
|
181 | 173 |
|
182 | 174 |
""" |
183 |
object.__init__(self) |
|
184 |
|
|
175 |
_BaseCondition.__init__(self, lock) |
|
185 | 176 |
self._nwaiters = 0 |
177 |
self._notified = False |
|
178 |
self._read_fd = None |
|
179 |
self._write_fd = None |
|
180 |
self._poller = None |
|
186 | 181 |
|
187 |
# Just assume the unpacking is successful, otherwise error handling gets |
|
188 |
# very complicated. |
|
189 |
(self._read_fd, self._write_fd) = os.pipe() |
|
190 |
try: |
|
191 |
# The poller looks for closure of the write side |
|
192 |
poller = select.poll() |
|
193 |
poller.register(self._read_fd, select.POLLHUP) |
|
194 |
|
|
195 |
self._poller = poller |
|
196 |
except: |
|
197 |
if self._read_fd is not None: |
|
198 |
os.close(self._read_fd) |
|
199 |
if self._write_fd is not None: |
|
200 |
os.close(self._write_fd) |
|
201 |
raise |
|
202 |
|
|
203 |
# There should be no code here anymore, otherwise the pipe file descriptors |
|
204 |
# may be not be cleaned up properly in case of errors. |
|
205 |
|
|
206 |
def StartWaiting(self): |
|
207 |
"""Return function to wait for notification. |
|
182 |
def _check_unnotified(self): |
|
183 |
if self._notified: |
|
184 |
raise RuntimeError("cannot use already notified condition") |
|
208 | 185 |
|
209 |
@rtype: L{_SingleActionPipeConditionWaiter}
|
|
210 |
@return: Function to wait for notification
|
|
186 |
def _Cleanup(self):
|
|
187 |
"""Cleanup open file descriptors, if any.
|
|
211 | 188 |
|
212 | 189 |
""" |
213 |
assert self._nwaiters >= 0 |
|
214 |
|
|
215 |
if self._poller is None: |
|
216 |
raise RuntimeError("Already cleaned up") |
|
217 |
|
|
218 |
# Create waiter function and increase number of waiters |
|
219 |
wait_fn = self._waiter_class(self, self._poller, self._read_fd) |
|
220 |
self._nwaiters += 1 |
|
221 |
return wait_fn |
|
190 |
if self._read_fd is not None: |
|
191 |
os.close(self._read_fd) |
|
192 |
self._read_fd = None |
|
222 | 193 |
|
223 |
def DoneWaiting(self): |
|
224 |
"""Decrement number of waiters and automatic cleanup. |
|
194 |
if self._write_fd is not None: |
|
195 |
os.close(self._write_fd) |
|
196 |
self._write_fd = None |
|
197 |
self._poller = None |
|
225 | 198 |
|
226 |
Must be called after waiting for a notification. |
|
199 |
def wait(self, timeout=None): |
|
200 |
"""Wait for a notification. |
|
227 | 201 |
|
228 |
@rtype: bool
|
|
229 |
@return: Whether this was the last waiter
|
|
202 |
@type timeout: float or None
|
|
203 |
@param timeout: Waiting timeout (can be None)
|
|
230 | 204 |
|
231 | 205 |
""" |
232 |
assert self._nwaiters > 0 |
|
233 |
|
|
234 |
self._nwaiters -= 1 |
|
206 |
self._check_owned() |
|
207 |
self._check_unnotified() |
|
235 | 208 |
|
236 |
if self._nwaiters == 0: |
|
237 |
self._Cleanup() |
|
238 |
return True |
|
209 |
self._nwaiters += 1 |
|
210 |
try: |
|
211 |
if self._poller is None: |
|
212 |
(self._read_fd, self._write_fd) = os.pipe() |
|
213 |
self._poller = select.poll() |
|
214 |
self._poller.register(self._read_fd, select.POLLHUP) |
|
239 | 215 |
|
240 |
return False |
|
216 |
wait_fn = self._waiter_class(self._poller, self._read_fd) |
|
217 |
self.release() |
|
218 |
try: |
|
219 |
# Wait for notification |
|
220 |
wait_fn(timeout) |
|
221 |
finally: |
|
222 |
# Re-acquire lock |
|
223 |
self.acquire() |
|
224 |
finally: |
|
225 |
self._nwaiters -= 1 |
|
226 |
if self._nwaiters == 0: |
|
227 |
self._Cleanup() |
|
241 | 228 |
|
242 | 229 |
def notifyAll(self): |
243 | 230 |
"""Close the writing side of the pipe to notify all waiters. |
244 | 231 |
|
245 | 232 |
""" |
246 |
if self._write_fd is None: |
|
247 |
raise RuntimeError("Can only notify once") |
|
248 |
|
|
249 |
os.close(self._write_fd) |
|
250 |
self._write_fd = None |
|
251 |
|
|
252 |
def _Cleanup(self): |
|
253 |
"""Close all file descriptors. |
|
254 |
|
|
255 |
""" |
|
256 |
if self._read_fd is not None: |
|
257 |
os.close(self._read_fd) |
|
258 |
self._read_fd = None |
|
259 |
|
|
233 |
self._check_owned() |
|
234 |
self._check_unnotified() |
|
235 |
self._notified = True |
|
260 | 236 |
if self._write_fd is not None: |
261 | 237 |
os.close(self._write_fd) |
262 | 238 |
self._write_fd = None |
263 | 239 |
|
264 |
self._poller = None |
|
265 |
|
|
266 |
def __del__(self): |
|
267 |
"""Called on object deletion. |
|
268 |
|
|
269 |
Ensure no file descriptors are left open. |
|
270 | 240 |
|
271 |
""" |
|
272 |
self._Cleanup() |
|
273 |
|
|
274 |
|
|
275 |
class _PipeCondition(_BaseCondition): |
|
241 |
class PipeCondition(_BaseCondition): |
|
276 | 242 |
"""Group-only non-polling condition with counters. |
277 | 243 |
|
278 | 244 |
This condition class uses pipes and poll, internally, to be able to wait for |
... | ... | |
284 | 250 |
""" |
285 | 251 |
__slots__ = _BaseCondition.__slots__ + [ |
286 | 252 |
"_nwaiters", |
287 |
"_pipe",
|
|
253 |
"_single_condition",
|
|
288 | 254 |
] |
289 | 255 |
|
290 |
_pipe_class = _SingleActionPipeCondition
|
|
256 |
_single_condition_class = SingleNotifyPipeCondition
|
|
291 | 257 |
|
292 | 258 |
def __init__(self, lock): |
293 | 259 |
"""Initializes this class. |
... | ... | |
295 | 261 |
""" |
296 | 262 |
_BaseCondition.__init__(self, lock) |
297 | 263 |
self._nwaiters = 0 |
298 |
self._pipe = None
|
|
264 |
self._single_condition = self._single_condition_class(self._lock)
|
|
299 | 265 |
|
300 | 266 |
def wait(self, timeout=None): |
301 | 267 |
"""Wait for a notification. |
... | ... | |
306 | 272 |
""" |
307 | 273 |
self._check_owned() |
308 | 274 |
|
309 |
if not self._pipe: |
|
310 |
self._pipe = self._pipe_class() |
|
311 |
|
|
312 | 275 |
# Keep local reference to the pipe. It could be replaced by another thread |
313 | 276 |
# notifying while we're waiting. |
314 |
pipe = self._pipe
|
|
277 |
my_condition = self._single_condition
|
|
315 | 278 |
|
316 | 279 |
assert self._nwaiters >= 0 |
317 | 280 |
self._nwaiters += 1 |
318 | 281 |
try: |
319 |
# Get function to wait on the pipe |
|
320 |
wait_fn = pipe.StartWaiting() |
|
321 |
try: |
|
322 |
# Release lock while waiting |
|
323 |
self.release() |
|
324 |
try: |
|
325 |
# Wait for notification |
|
326 |
wait_fn(timeout) |
|
327 |
finally: |
|
328 |
# Re-acquire lock |
|
329 |
self.acquire() |
|
330 |
finally: |
|
331 |
# Destroy pipe if this was the last waiter and the current pipe is |
|
332 |
# still the same. The same pipe cannot be reused after cleanup. |
|
333 |
if pipe.DoneWaiting() and pipe == self._pipe: |
|
334 |
self._pipe = None |
|
282 |
my_condition.wait(timeout) |
|
335 | 283 |
finally: |
336 | 284 |
assert self._nwaiters > 0 |
337 | 285 |
self._nwaiters -= 1 |
... | ... | |
341 | 289 |
|
342 | 290 |
""" |
343 | 291 |
self._check_owned() |
344 |
|
|
345 |
# Notify and forget pipe. A new one will be created on the next call to |
|
346 |
# wait. |
|
347 |
if self._pipe is not None: |
|
348 |
self._pipe.notifyAll() |
|
349 |
self._pipe = None |
|
292 |
self._single_condition.notifyAll() |
|
293 |
self._single_condition = self._single_condition_class(self._lock) |
|
350 | 294 |
|
351 | 295 |
def has_waiting(self): |
352 | 296 |
"""Returns whether there are active waiters. |
... | ... | |
387 | 331 |
"""Waits for the condition to be notified. |
388 | 332 |
|
389 | 333 |
@type timeout: float or None |
390 |
@param timeout: Timeout in seconds
|
|
334 |
@param timeout: Waiting timeout (can be None)
|
|
391 | 335 |
|
392 | 336 |
""" |
393 | 337 |
assert self._nwaiters >= 0 |
... | ... | |
427 | 371 |
"__shr", |
428 | 372 |
] |
429 | 373 |
|
430 |
__condition_class = _PipeCondition
|
|
374 |
__condition_class = PipeCondition |
|
431 | 375 |
|
432 | 376 |
def __init__(self): |
433 | 377 |
"""Construct a new SharedLock. |
Also available in: Unified diff