49 |
49 |
return wrap
|
50 |
50 |
|
51 |
51 |
|
52 |
|
class _SingleActionPipeConditionWaiter(object):
|
53 |
|
"""Callable helper class for _SingleActionPipeCondition.
|
|
52 |
class _SingleNotifyPipeConditionWaiter(object):
|
|
53 |
"""Helper class for SingleNotifyPipeCondition
|
54 |
54 |
|
55 |
55 |
"""
|
56 |
56 |
__slots__ = [
|
57 |
|
"_cond",
|
58 |
57 |
"_fd",
|
59 |
58 |
"_poller",
|
60 |
59 |
]
|
61 |
60 |
|
62 |
|
def __init__(self, cond, poller, fd):
|
63 |
|
"""Initializes this class.
|
|
61 |
def __init__(self, poller, fd):
|
|
62 |
"""Constructor for _SingleNotifyPipeConditionWaiter
|
64 |
63 |
|
65 |
|
@type cond: L{_SingleActionPipeCondition}
|
66 |
|
@param cond: Parent condition
|
67 |
64 |
@type poller: select.poll
|
68 |
65 |
@param poller: Poller object
|
69 |
66 |
@type fd: int
|
... | ... | |
71 |
68 |
|
72 |
69 |
"""
|
73 |
70 |
object.__init__(self)
|
74 |
|
|
75 |
|
self._cond = cond
|
76 |
71 |
self._poller = poller
|
77 |
72 |
self._fd = fd
|
78 |
73 |
|
... | ... | |
152 |
147 |
raise RuntimeError("cannot work with un-aquired lock")
|
153 |
148 |
|
154 |
149 |
|
155 |
|
class _SingleActionPipeCondition(object):
|
156 |
|
"""Wrapper around a pipe for usage inside conditions.
|
157 |
|
|
158 |
|
This class contains a POSIX pipe(2) and a poller to poll it. The pipe is
|
159 |
|
always allocated when constructing the class. Extra care is taken to always
|
160 |
|
close the file descriptors.
|
161 |
|
|
162 |
|
An additional class, L{_SingleActionPipeConditionWaiter}, is used to wait for
|
163 |
|
notifications.
|
|
150 |
class SingleNotifyPipeCondition(_BaseCondition):
|
|
151 |
"""Condition which can only be notified once.
|
164 |
152 |
|
165 |
|
Warning: This class is designed to be used as the underlying component of a
|
166 |
|
locking condition, but is not by itself thread safe, and needs to be
|
167 |
|
protected by an external lock.
|
|
153 |
This condition class uses pipes and poll, internally, to be able to wait for
|
|
154 |
notification with a timeout, without resorting to polling. It is almost
|
|
155 |
compatible with Python's threading.Condition, with the following differences:
|
|
156 |
- notifyAll can only be called once, and no wait can happen after that
|
|
157 |
- notify is not supported, only notifyAll
|
168 |
158 |
|
169 |
159 |
"""
|
170 |
|
__slots__ = [
|
|
160 |
|
|
161 |
__slots__ = _BaseCondition.__slots__ + [
|
171 |
162 |
"_poller",
|
172 |
163 |
"_read_fd",
|
173 |
164 |
"_write_fd",
|
174 |
165 |
"_nwaiters",
|
|
166 |
"_notified",
|
175 |
167 |
]
|
176 |
168 |
|
177 |
|
_waiter_class = _SingleActionPipeConditionWaiter
|
|
169 |
_waiter_class = _SingleNotifyPipeConditionWaiter
|
178 |
170 |
|
179 |
|
def __init__(self):
|
180 |
|
"""Initializes this class.
|
|
171 |
def __init__(self, lock):
|
|
172 |
"""Constructor for SingleNotifyPipeCondition
|
181 |
173 |
|
182 |
174 |
"""
|
183 |
|
object.__init__(self)
|
184 |
|
|
|
175 |
_BaseCondition.__init__(self, lock)
|
185 |
176 |
self._nwaiters = 0
|
|
177 |
self._notified = False
|
|
178 |
self._read_fd = None
|
|
179 |
self._write_fd = None
|
|
180 |
self._poller = None
|
186 |
181 |
|
187 |
|
# Just assume the unpacking is successful, otherwise error handling gets
|
188 |
|
# very complicated.
|
189 |
|
(self._read_fd, self._write_fd) = os.pipe()
|
190 |
|
try:
|
191 |
|
# The poller looks for closure of the write side
|
192 |
|
poller = select.poll()
|
193 |
|
poller.register(self._read_fd, select.POLLHUP)
|
194 |
|
|
195 |
|
self._poller = poller
|
196 |
|
except:
|
197 |
|
if self._read_fd is not None:
|
198 |
|
os.close(self._read_fd)
|
199 |
|
if self._write_fd is not None:
|
200 |
|
os.close(self._write_fd)
|
201 |
|
raise
|
202 |
|
|
203 |
|
# There should be no code here anymore, otherwise the pipe file descriptors
|
204 |
|
# may be not be cleaned up properly in case of errors.
|
205 |
|
|
206 |
|
def StartWaiting(self):
|
207 |
|
"""Return function to wait for notification.
|
|
182 |
def _check_unnotified(self):
|
|
183 |
if self._notified:
|
|
184 |
raise RuntimeError("cannot use already notified condition")
|
208 |
185 |
|
209 |
|
@rtype: L{_SingleActionPipeConditionWaiter}
|
210 |
|
@return: Function to wait for notification
|
|
186 |
def _Cleanup(self):
|
|
187 |
"""Cleanup open file descriptors, if any.
|
211 |
188 |
|
212 |
189 |
"""
|
213 |
|
assert self._nwaiters >= 0
|
214 |
|
|
215 |
|
if self._poller is None:
|
216 |
|
raise RuntimeError("Already cleaned up")
|
217 |
|
|
218 |
|
# Create waiter function and increase number of waiters
|
219 |
|
wait_fn = self._waiter_class(self, self._poller, self._read_fd)
|
220 |
|
self._nwaiters += 1
|
221 |
|
return wait_fn
|
|
190 |
if self._read_fd is not None:
|
|
191 |
os.close(self._read_fd)
|
|
192 |
self._read_fd = None
|
222 |
193 |
|
223 |
|
def DoneWaiting(self):
|
224 |
|
"""Decrement number of waiters and automatic cleanup.
|
|
194 |
if self._write_fd is not None:
|
|
195 |
os.close(self._write_fd)
|
|
196 |
self._write_fd = None
|
|
197 |
self._poller = None
|
225 |
198 |
|
226 |
|
Must be called after waiting for a notification.
|
|
199 |
def wait(self, timeout=None):
|
|
200 |
"""Wait for a notification.
|
227 |
201 |
|
228 |
|
@rtype: bool
|
229 |
|
@return: Whether this was the last waiter
|
|
202 |
@type timeout: float or None
|
|
203 |
@param timeout: Waiting timeout (can be None)
|
230 |
204 |
|
231 |
205 |
"""
|
232 |
|
assert self._nwaiters > 0
|
233 |
|
|
234 |
|
self._nwaiters -= 1
|
|
206 |
self._check_owned()
|
|
207 |
self._check_unnotified()
|
235 |
208 |
|
236 |
|
if self._nwaiters == 0:
|
237 |
|
self._Cleanup()
|
238 |
|
return True
|
|
209 |
self._nwaiters += 1
|
|
210 |
try:
|
|
211 |
if self._poller is None:
|
|
212 |
(self._read_fd, self._write_fd) = os.pipe()
|
|
213 |
self._poller = select.poll()
|
|
214 |
self._poller.register(self._read_fd, select.POLLHUP)
|
239 |
215 |
|
240 |
|
return False
|
|
216 |
wait_fn = self._waiter_class(self._poller, self._read_fd)
|
|
217 |
self.release()
|
|
218 |
try:
|
|
219 |
# Wait for notification
|
|
220 |
wait_fn(timeout)
|
|
221 |
finally:
|
|
222 |
# Re-acquire lock
|
|
223 |
self.acquire()
|
|
224 |
finally:
|
|
225 |
self._nwaiters -= 1
|
|
226 |
if self._nwaiters == 0:
|
|
227 |
self._Cleanup()
|
241 |
228 |
|
242 |
229 |
def notifyAll(self):
|
243 |
230 |
"""Close the writing side of the pipe to notify all waiters.
|
244 |
231 |
|
245 |
232 |
"""
|
246 |
|
if self._write_fd is None:
|
247 |
|
raise RuntimeError("Can only notify once")
|
248 |
|
|
249 |
|
os.close(self._write_fd)
|
250 |
|
self._write_fd = None
|
251 |
|
|
252 |
|
def _Cleanup(self):
|
253 |
|
"""Close all file descriptors.
|
254 |
|
|
255 |
|
"""
|
256 |
|
if self._read_fd is not None:
|
257 |
|
os.close(self._read_fd)
|
258 |
|
self._read_fd = None
|
259 |
|
|
|
233 |
self._check_owned()
|
|
234 |
self._check_unnotified()
|
|
235 |
self._notified = True
|
260 |
236 |
if self._write_fd is not None:
|
261 |
237 |
os.close(self._write_fd)
|
262 |
238 |
self._write_fd = None
|
263 |
239 |
|
264 |
|
self._poller = None
|
265 |
|
|
266 |
|
def __del__(self):
|
267 |
|
"""Called on object deletion.
|
268 |
|
|
269 |
|
Ensure no file descriptors are left open.
|
270 |
240 |
|
271 |
|
"""
|
272 |
|
self._Cleanup()
|
273 |
|
|
274 |
|
|
275 |
|
class _PipeCondition(_BaseCondition):
|
|
241 |
class PipeCondition(_BaseCondition):
|
276 |
242 |
"""Group-only non-polling condition with counters.
|
277 |
243 |
|
278 |
244 |
This condition class uses pipes and poll, internally, to be able to wait for
|
... | ... | |
284 |
250 |
"""
|
285 |
251 |
__slots__ = _BaseCondition.__slots__ + [
|
286 |
252 |
"_nwaiters",
|
287 |
|
"_pipe",
|
|
253 |
"_single_condition",
|
288 |
254 |
]
|
289 |
255 |
|
290 |
|
_pipe_class = _SingleActionPipeCondition
|
|
256 |
_single_condition_class = SingleNotifyPipeCondition
|
291 |
257 |
|
292 |
258 |
def __init__(self, lock):
|
293 |
259 |
"""Initializes this class.
|
... | ... | |
295 |
261 |
"""
|
296 |
262 |
_BaseCondition.__init__(self, lock)
|
297 |
263 |
self._nwaiters = 0
|
298 |
|
self._pipe = None
|
|
264 |
self._single_condition = self._single_condition_class(self._lock)
|
299 |
265 |
|
300 |
266 |
def wait(self, timeout=None):
|
301 |
267 |
"""Wait for a notification.
|
... | ... | |
306 |
272 |
"""
|
307 |
273 |
self._check_owned()
|
308 |
274 |
|
309 |
|
if not self._pipe:
|
310 |
|
self._pipe = self._pipe_class()
|
311 |
|
|
312 |
275 |
# Keep local reference to the pipe. It could be replaced by another thread
|
313 |
276 |
# notifying while we're waiting.
|
314 |
|
pipe = self._pipe
|
|
277 |
my_condition = self._single_condition
|
315 |
278 |
|
316 |
279 |
assert self._nwaiters >= 0
|
317 |
280 |
self._nwaiters += 1
|
318 |
281 |
try:
|
319 |
|
# Get function to wait on the pipe
|
320 |
|
wait_fn = pipe.StartWaiting()
|
321 |
|
try:
|
322 |
|
# Release lock while waiting
|
323 |
|
self.release()
|
324 |
|
try:
|
325 |
|
# Wait for notification
|
326 |
|
wait_fn(timeout)
|
327 |
|
finally:
|
328 |
|
# Re-acquire lock
|
329 |
|
self.acquire()
|
330 |
|
finally:
|
331 |
|
# Destroy pipe if this was the last waiter and the current pipe is
|
332 |
|
# still the same. The same pipe cannot be reused after cleanup.
|
333 |
|
if pipe.DoneWaiting() and pipe == self._pipe:
|
334 |
|
self._pipe = None
|
|
282 |
my_condition.wait(timeout)
|
335 |
283 |
finally:
|
336 |
284 |
assert self._nwaiters > 0
|
337 |
285 |
self._nwaiters -= 1
|
... | ... | |
341 |
289 |
|
342 |
290 |
"""
|
343 |
291 |
self._check_owned()
|
344 |
|
|
345 |
|
# Notify and forget pipe. A new one will be created on the next call to
|
346 |
|
# wait.
|
347 |
|
if self._pipe is not None:
|
348 |
|
self._pipe.notifyAll()
|
349 |
|
self._pipe = None
|
|
292 |
self._single_condition.notifyAll()
|
|
293 |
self._single_condition = self._single_condition_class(self._lock)
|
350 |
294 |
|
351 |
295 |
def has_waiting(self):
|
352 |
296 |
"""Returns whether there are active waiters.
|
... | ... | |
387 |
331 |
"""Waits for the condition to be notified.
|
388 |
332 |
|
389 |
333 |
@type timeout: float or None
|
390 |
|
@param timeout: Timeout in seconds
|
|
334 |
@param timeout: Waiting timeout (can be None)
|
391 |
335 |
|
392 |
336 |
"""
|
393 |
337 |
assert self._nwaiters >= 0
|
... | ... | |
427 |
371 |
"__shr",
|
428 |
372 |
]
|
429 |
373 |
|
430 |
|
__condition_class = _PipeCondition
|
|
374 |
__condition_class = PipeCondition
|
431 |
375 |
|
432 |
376 |
def __init__(self):
|
433 |
377 |
"""Construct a new SharedLock.
|