Revision 48dabc6a

b/lib/locking.py
223 223
    self._Cleanup()
224 224

  
225 225

  
226
class _PipeCondition(object):
227
  """Group-only non-polling condition with counters.
228

  
229
  This condition class uses pipes and poll, internally, to be able to wait for
230
  notification with a timeout, without resorting to polling. It is almost
231
  compatible with Python's threading.Condition, but only supports notifyAll and
232
  non-recursive locks. As an additional features it's able to report whether
233
  there are any waiting threads.
234

  
235
  """
236
  __slots__ = [
237
    "_lock",
238
    "_nwaiters",
239
    "_pipe",
240
    "acquire",
241
    "release",
242
    ]
243

  
244
  _pipe_class = _SingleActionPipeCondition
245

  
246
  def __init__(self, lock):
247
    """Initializes this class.
248

  
249
    """
250
    object.__init__(self)
251

  
252
    # Recursive locks are not supported
253
    assert not hasattr(lock, "_acquire_restore")
254
    assert not hasattr(lock, "_release_save")
255

  
256
    self._lock = lock
257

  
258
    # Export the lock's acquire() and release() methods
259
    self.acquire = lock.acquire
260
    self.release = lock.release
261

  
262
    self._nwaiters = 0
263
    self._pipe = None
264

  
265
  def _is_owned(self):
266
    """Check whether lock is owned by current thread.
267

  
268
    """
269
    if self._lock.acquire(0):
270
      self._lock.release()
271
      return False
272

  
273
    return True
274

  
275
  def _check_owned(self):
276
    """Raise an exception if the current thread doesn't own the lock.
277

  
278
    """
279
    if not self._is_owned():
280
      raise RuntimeError("cannot work with un-aquired lock")
281

  
282
  def wait(self, timeout=None):
283
    """Wait for a notification.
284

  
285
    @type timeout: float or None
286
    @param timeout: Waiting timeout (can be None)
287

  
288
    """
289
    self._check_owned()
290

  
291
    if not self._pipe:
292
      self._pipe = self._pipe_class()
293

  
294
    # Keep local reference to the pipe. It could be replaced by another thread
295
    # notifying while we're waiting.
296
    pipe = self._pipe
297

  
298
    assert self._nwaiters >= 0
299
    self._nwaiters += 1
300
    try:
301
      # Get function to wait on the pipe
302
      wait_fn = pipe.StartWaiting()
303
      try:
304
        # Release lock while waiting
305
        self.release()
306
        try:
307
          # Wait for notification
308
          wait_fn(timeout)
309
        finally:
310
          # Re-acquire lock
311
          self.acquire()
312
      finally:
313
        # Destroy pipe if this was the last waiter and the current pipe is
314
        # still the same. The same pipe cannot be reused after cleanup.
315
        if pipe.DoneWaiting() and pipe == self._pipe:
316
          self._pipe = None
317
    finally:
318
      assert self._nwaiters > 0
319
      self._nwaiters -= 1
320

  
321
  def notifyAll(self):
322
    """Notify all currently waiting threads.
323

  
324
    """
325
    self._check_owned()
326

  
327
    # Notify and forget pipe. A new one will be created on the next call to
328
    # wait.
329
    if self._pipe is not None:
330
      self._pipe.notifyAll()
331
      self._pipe = None
332

  
333
  def has_waiting(self):
334
    """Returns whether there are active waiters.
335

  
336
    """
337
    self._check_owned()
338

  
339
    return bool(self._nwaiters)
340

  
341

  
226 342
class _CountingCondition(object):
227 343
  """Wrapper for Python's built-in threading.Condition class.
228 344

  
b/test/ganeti.locking_unittest.py
69 69
    self.threads = []
70 70

  
71 71

  
72
class TestPipeCondition(_ThreadedTestCase):
73
  """_PipeCondition tests"""
74

  
75
  def setUp(self):
76
    _ThreadedTestCase.setUp(self)
77
    self.lock = threading.Lock()
78
    self.cond = locking._PipeCondition(self.lock)
79
    self.done = Queue.Queue(0)
80

  
81
  def testAcquireRelease(self):
82
    self.assert_(not self.cond._is_owned())
83
    self.assertRaises(RuntimeError, self.cond.wait)
84
    self.assertRaises(RuntimeError, self.cond.notifyAll)
85

  
86
    self.cond.acquire()
87
    self.assert_(self.cond._is_owned())
88
    self.cond.notifyAll()
89
    self.assert_(self.cond._is_owned())
90
    self.cond.release()
91

  
92
    self.assert_(not self.cond._is_owned())
93
    self.assertRaises(RuntimeError, self.cond.wait)
94
    self.assertRaises(RuntimeError, self.cond.notifyAll)
95

  
96
  def testNotification(self):
97
    def _NotifyAll():
98
      self.cond.acquire()
99
      self.cond.notifyAll()
100
      self.cond.release()
101

  
102
    self.cond.acquire()
103
    self._addThread(target=_NotifyAll)
104
    self.cond.wait()
105
    self.assert_(self.cond._is_owned())
106
    self.cond.release()
107
    self.assert_(not self.cond._is_owned())
108

  
109
  def _TestWait(self, fn):
110
    self._addThread(target=fn)
111
    self._addThread(target=fn)
112
    self._addThread(target=fn)
113

  
114
    # Wait for threads to be waiting
115
    self.assertEqual(self.done.get(True, 1), "A")
116
    self.assertEqual(self.done.get(True, 1), "A")
117
    self.assertEqual(self.done.get(True, 1), "A")
118

  
119
    self.assertRaises(Queue.Empty, self.done.get_nowait)
120

  
121
    self.cond.acquire()
122
    self.assertEqual(self.cond._nwaiters, 3)
123
    # This new thread can"t acquire the lock, and thus call wait, before we
124
    # release it
125
    self._addThread(target=fn)
126
    self.cond.notifyAll()
127
    self.assertRaises(Queue.Empty, self.done.get_nowait)
128
    self.cond.release()
129

  
130
    # We should now get 3 W and 1 A (for the new thread) in whatever order
131
    w = 0
132
    a = 0
133
    for i in range(4):
134
      got = self.done.get(True, 1)
135
      if got == "W":
136
        w += 1
137
      elif got == "A":
138
        a += 1
139
      else:
140
        self.fail("Got %s on the done queue" % got)
141

  
142
    self.assertEqual(w, 3)
143
    self.assertEqual(a, 1)
144

  
145
    self.cond.acquire()
146
    self.cond.notifyAll()
147
    self.cond.release()
148
    self._waitThreads()
149
    self.assertEqual(self.done.get_nowait(), "W")
150
    self.assertRaises(Queue.Empty, self.done.get_nowait)
151

  
152
  def testBlockingWait(self):
153
    def _BlockingWait():
154
      self.cond.acquire()
155
      self.done.put("A")
156
      self.cond.wait()
157
      self.cond.release()
158
      self.done.put("W")
159

  
160
    self._TestWait(_BlockingWait)
161

  
162
  def testLongTimeoutWait(self):
163
    def _Helper():
164
      self.cond.acquire()
165
      self.done.put("A")
166
      self.cond.wait(15.0)
167
      self.cond.release()
168
      self.done.put("W")
169

  
170
    self._TestWait(_Helper)
171

  
172
  def _TimeoutWait(self, timeout, check):
173
    self.cond.acquire()
174
    self.cond.wait(timeout)
175
    self.cond.release()
176
    self.done.put(check)
177

  
178
  def testShortTimeoutWait(self):
179
    self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
180
    self._addThread(target=self._TimeoutWait, args=(0.1, "T1"))
181
    self._waitThreads()
182
    self.assertEqual(self.done.get_nowait(), "T1")
183
    self.assertEqual(self.done.get_nowait(), "T1")
184
    self.assertRaises(Queue.Empty, self.done.get_nowait)
185

  
186
  def testZeroTimeoutWait(self):
187
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
188
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
189
    self._addThread(target=self._TimeoutWait, args=(0, "T0"))
190
    self._waitThreads()
191
    self.assertEqual(self.done.get_nowait(), "T0")
192
    self.assertEqual(self.done.get_nowait(), "T0")
193
    self.assertEqual(self.done.get_nowait(), "T0")
194
    self.assertRaises(Queue.Empty, self.done.get_nowait)
195

  
196

  
72 197
class TestSingleActionPipeCondition(unittest.TestCase):
73 198
  """_SingleActionPipeCondition tests"""
74 199

  

Also available in: Unified diff