self._write_fd = None
self._poller = None
- def wait(self, timeout=None):
+ def wait(self, timeout):
"""Wait for a notification.
@type timeout: float or None
self._waiters = set()
self._single_condition = self._single_condition_class(self._lock)
- def wait(self, timeout=None):
+ def wait(self, timeout):
"""Wait for a notification.
@type timeout: float or None
def _testAcquireRelease(self):
self.assertFalse(self.cond._is_owned())
- self.assertRaises(RuntimeError, self.cond.wait)
+ self.assertRaises(RuntimeError, self.cond.wait, None)
self.assertRaises(RuntimeError, self.cond.notifyAll)
self.cond.acquire()
self.cond.release()
self.assertFalse(self.cond._is_owned())
- self.assertRaises(RuntimeError, self.cond.wait)
+ self.assertRaises(RuntimeError, self.cond.wait, None)
self.assertRaises(RuntimeError, self.cond.notifyAll)
def _testNotification(self):
self._addThread(target=_NotifyAll)
self.assertEqual(self.done.get(True, 1), "NE")
self.assertRaises(Queue.Empty, self.done.get_nowait)
- self.cond.wait()
+ self.cond.wait(None)
self.assertEqual(self.done.get(True, 1), "NA")
self.assertEqual(self.done.get(True, 1), "NN")
self.assert_(self.cond._is_owned())
def testNoNotifyReuse(self):
self.cond.acquire()
self.cond.notifyAll()
- self.assertRaises(RuntimeError, self.cond.wait)
+ self.assertRaises(RuntimeError, self.cond.wait, None)
self.assertRaises(RuntimeError, self.cond.notifyAll)
self.cond.release()
def _BlockingWait():
self.cond.acquire()
self.done.put("A")
- self.cond.wait()
+ self.cond.wait(None)
self.cond.release()
self.done.put("W")