Revision 34cb5617

b/lib/locking.py
49 49
  return wrap
50 50

  
51 51

  
52
class _SingleActionPipeConditionWaiter(object):
53
  """Callable helper class for _SingleActionPipeCondition.
52
class _SingleNotifyPipeConditionWaiter(object):
53
  """Helper class for SingleNotifyPipeCondition
54 54

  
55 55
  """
56 56
  __slots__ = [
57
    "_cond",
58 57
    "_fd",
59 58
    "_poller",
60 59
    ]
61 60

  
62
  def __init__(self, cond, poller, fd):
63
    """Initializes this class.
61
  def __init__(self, poller, fd):
62
    """Constructor for _SingleNotifyPipeConditionWaiter
64 63

  
65
    @type cond: L{_SingleActionPipeCondition}
66
    @param cond: Parent condition
67 64
    @type poller: select.poll
68 65
    @param poller: Poller object
69 66
    @type fd: int
......
71 68

  
72 69
    """
73 70
    object.__init__(self)
74

  
75
    self._cond = cond
76 71
    self._poller = poller
77 72
    self._fd = fd
78 73

  
......
152 147
      raise RuntimeError("cannot work with un-aquired lock")
153 148

  
154 149

  
155
class _SingleActionPipeCondition(object):
156
  """Wrapper around a pipe for usage inside conditions.
157

  
158
  This class contains a POSIX pipe(2) and a poller to poll it. The pipe is
159
  always allocated when constructing the class. Extra care is taken to always
160
  close the file descriptors.
161

  
162
  An additional class, L{_SingleActionPipeConditionWaiter}, is used to wait for
163
  notifications.
150
class SingleNotifyPipeCondition(_BaseCondition):
151
  """Condition which can only be notified once.
164 152

  
165
  Warning: This class is designed to be used as the underlying component of a
166
  locking condition, but is not by itself thread safe, and needs to be
167
  protected by an external lock.
153
  This condition class uses pipes and poll, internally, to be able to wait for
154
  notification with a timeout, without resorting to polling. It is almost
155
  compatible with Python's threading.Condition, with the following differences:
156
    - notifyAll can only be called once, and no wait can happen after that
157
    - notify is not supported, only notifyAll
168 158

  
169 159
  """
170
  __slots__ = [
160

  
161
  __slots__ = _BaseCondition.__slots__ + [
171 162
    "_poller",
172 163
    "_read_fd",
173 164
    "_write_fd",
174 165
    "_nwaiters",
166
    "_notified",
175 167
    ]
176 168

  
177
  _waiter_class = _SingleActionPipeConditionWaiter
169
  _waiter_class = _SingleNotifyPipeConditionWaiter
178 170

  
179
  def __init__(self):
180
    """Initializes this class.
171
  def __init__(self, lock):
172
    """Constructor for SingleNotifyPipeCondition
181 173

  
182 174
    """
183
    object.__init__(self)
184

  
175
    _BaseCondition.__init__(self, lock)
185 176
    self._nwaiters = 0
177
    self._notified = False
178
    self._read_fd = None
179
    self._write_fd = None
180
    self._poller = None
186 181

  
187
    # Just assume the unpacking is successful, otherwise error handling gets
188
    # very complicated.
189
    (self._read_fd, self._write_fd) = os.pipe()
190
    try:
191
      # The poller looks for closure of the write side
192
      poller = select.poll()
193
      poller.register(self._read_fd, select.POLLHUP)
194

  
195
      self._poller = poller
196
    except:
197
      if self._read_fd is not None:
198
        os.close(self._read_fd)
199
      if self._write_fd is not None:
200
        os.close(self._write_fd)
201
      raise
202

  
203
    # There should be no code here anymore, otherwise the pipe file descriptors
204
    # may be not be cleaned up properly in case of errors.
205

  
206
  def StartWaiting(self):
207
    """Return function to wait for notification.
182
  def _check_unnotified(self):
183
    if self._notified:
184
      raise RuntimeError("cannot use already notified condition")
208 185

  
209
    @rtype: L{_SingleActionPipeConditionWaiter}
210
    @return: Function to wait for notification
186
  def _Cleanup(self):
187
    """Cleanup open file descriptors, if any.
211 188

  
212 189
    """
213
    assert self._nwaiters >= 0
214

  
215
    if self._poller is None:
216
      raise RuntimeError("Already cleaned up")
217

  
218
    # Create waiter function and increase number of waiters
219
    wait_fn = self._waiter_class(self, self._poller, self._read_fd)
220
    self._nwaiters += 1
221
    return wait_fn
190
    if self._read_fd is not None:
191
      os.close(self._read_fd)
192
      self._read_fd = None
222 193

  
223
  def DoneWaiting(self):
224
    """Decrement number of waiters and automatic cleanup.
194
    if self._write_fd is not None:
195
      os.close(self._write_fd)
196
      self._write_fd = None
197
    self._poller = None
225 198

  
226
    Must be called after waiting for a notification.
199
  def wait(self, timeout=None):
200
    """Wait for a notification.
227 201

  
228
    @rtype: bool
229
    @return: Whether this was the last waiter
202
    @type timeout: float or None
203
    @param timeout: Waiting timeout (can be None)
230 204

  
231 205
    """
232
    assert self._nwaiters > 0
233

  
234
    self._nwaiters -= 1
206
    self._check_owned()
207
    self._check_unnotified()
235 208

  
236
    if self._nwaiters == 0:
237
      self._Cleanup()
238
      return True
209
    self._nwaiters += 1
210
    try:
211
      if self._poller is None:
212
        (self._read_fd, self._write_fd) = os.pipe()
213
        self._poller = select.poll()
214
        self._poller.register(self._read_fd, select.POLLHUP)
239 215

  
240
    return False
216
      wait_fn = self._waiter_class(self._poller, self._read_fd)
217
      self.release()
218
      try:
219
        # Wait for notification
220
        wait_fn(timeout)
221
      finally:
222
        # Re-acquire lock
223
        self.acquire()
224
    finally:
225
      self._nwaiters -= 1
226
      if self._nwaiters == 0:
227
        self._Cleanup()
241 228

  
242 229
  def notifyAll(self):
243 230
    """Close the writing side of the pipe to notify all waiters.
244 231

  
245 232
    """
246
    if self._write_fd is None:
247
      raise RuntimeError("Can only notify once")
248

  
249
    os.close(self._write_fd)
250
    self._write_fd = None
251

  
252
  def _Cleanup(self):
253
    """Close all file descriptors.
254

  
255
    """
256
    if self._read_fd is not None:
257
      os.close(self._read_fd)
258
      self._read_fd = None
259

  
233
    self._check_owned()
234
    self._check_unnotified()
235
    self._notified = True
260 236
    if self._write_fd is not None:
261 237
      os.close(self._write_fd)
262 238
      self._write_fd = None
263 239

  
264
    self._poller = None
265

  
266
  def __del__(self):
267
    """Called on object deletion.
268

  
269
    Ensure no file descriptors are left open.
270 240

  
271
    """
272
    self._Cleanup()
273

  
274

  
275
class _PipeCondition(_BaseCondition):
241
class PipeCondition(_BaseCondition):
276 242
  """Group-only non-polling condition with counters.
277 243

  
278 244
  This condition class uses pipes and poll, internally, to be able to wait for
......
284 250
  """
285 251
  __slots__ = _BaseCondition.__slots__ + [
286 252
    "_nwaiters",
287
    "_pipe",
253
    "_single_condition",
288 254
    ]
289 255

  
290
  _pipe_class = _SingleActionPipeCondition
256
  _single_condition_class = SingleNotifyPipeCondition
291 257

  
292 258
  def __init__(self, lock):
293 259
    """Initializes this class.
......
295 261
    """
296 262
    _BaseCondition.__init__(self, lock)
297 263
    self._nwaiters = 0
298
    self._pipe = None
264
    self._single_condition = self._single_condition_class(self._lock)
299 265

  
300 266
  def wait(self, timeout=None):
301 267
    """Wait for a notification.
......
306 272
    """
307 273
    self._check_owned()
308 274

  
309
    if not self._pipe:
310
      self._pipe = self._pipe_class()
311

  
312 275
    # Keep local reference to the pipe. It could be replaced by another thread
313 276
    # notifying while we're waiting.
314
    pipe = self._pipe
277
    my_condition = self._single_condition
315 278

  
316 279
    assert self._nwaiters >= 0
317 280
    self._nwaiters += 1
318 281
    try:
319
      # Get function to wait on the pipe
320
      wait_fn = pipe.StartWaiting()
321
      try:
322
        # Release lock while waiting
323
        self.release()
324
        try:
325
          # Wait for notification
326
          wait_fn(timeout)
327
        finally:
328
          # Re-acquire lock
329
          self.acquire()
330
      finally:
331
        # Destroy pipe if this was the last waiter and the current pipe is
332
        # still the same. The same pipe cannot be reused after cleanup.
333
        if pipe.DoneWaiting() and pipe == self._pipe:
334
          self._pipe = None
282
      my_condition.wait(timeout)
335 283
    finally:
336 284
      assert self._nwaiters > 0
337 285
      self._nwaiters -= 1
......
341 289

  
342 290
    """
343 291
    self._check_owned()
344

  
345
    # Notify and forget pipe. A new one will be created on the next call to
346
    # wait.
347
    if self._pipe is not None:
348
      self._pipe.notifyAll()
349
      self._pipe = None
292
    self._single_condition.notifyAll()
293
    self._single_condition = self._single_condition_class(self._lock)
350 294

  
351 295
  def has_waiting(self):
352 296
    """Returns whether there are active waiters.
......
387 331
    """Waits for the condition to be notified.
388 332

  
389 333
    @type timeout: float or None
390
    @param timeout: Timeout in seconds
334
    @param timeout: Waiting timeout (can be None)
391 335

  
392 336
    """
393 337
    assert self._nwaiters >= 0
......
427 371
    "__shr",
428 372
    ]
429 373

  
430
  __condition_class = _PipeCondition
374
  __condition_class = PipeCondition
431 375

  
432 376
  def __init__(self):
433 377
    """Construct a new SharedLock.
b/test/ganeti.locking_unittest.py
114 114
    self.assert_(not self.cond._is_owned())
115 115

  
116 116

  
117
class TestSingleNotifyPipeCondition(_ConditionTestCase):
118
  """SingleNotifyPipeCondition tests"""
119

  
120
  def setUp(self):
121
    _ConditionTestCase.setUp(self, locking.SingleNotifyPipeCondition)
122

  
123
  def testAcquireRelease(self):
124
    self._testAcquireRelease()
125

  
126
  def testNotification(self):
127
    self._testNotification()
128

  
129
  def testWaitReuse(self):
130
    self.cond.acquire()
131
    self.cond.wait(0)
132
    self.cond.wait(0.1)
133
    self.cond.release()
134

  
135
  def testNoNotifyReuse(self):
136
    self.cond.acquire()
137
    self.cond.notifyAll()
138
    self.assertRaises(RuntimeError, self.cond.wait)
139
    self.assertRaises(RuntimeError, self.cond.notifyAll)
140
    self.cond.release()
141

  
142

  
117 143
class TestPipeCondition(_ConditionTestCase):
118
  """_PipeCondition tests"""
144
  """PipeCondition tests"""
119 145

  
120 146
  def setUp(self):
121
    _ConditionTestCase.setUp(self, locking._PipeCondition)
147
    _ConditionTestCase.setUp(self, locking.PipeCondition)
122 148

  
123 149
  def testAcquireRelease(self):
124 150
    self._testAcquireRelease()
......
214 240
    self.assertRaises(Queue.Empty, self.done.get_nowait)
215 241

  
216 242

  
217
class TestSingleActionPipeCondition(unittest.TestCase):
218
  """_SingleActionPipeCondition tests"""
219

  
220
  def setUp(self):
221
    self.cond = locking._SingleActionPipeCondition()
222

  
223
  def testInitialization(self):
224
    self.assert_(self.cond._read_fd is not None)
225
    self.assert_(self.cond._write_fd is not None)
226
    self.assert_(self.cond._poller is not None)
227
    self.assertEqual(self.cond._nwaiters, 0)
228

  
229
  def testUsageCount(self):
230
    self.cond.StartWaiting()
231
    self.assert_(self.cond._read_fd is not None)
232
    self.assert_(self.cond._write_fd is not None)
233
    self.assert_(self.cond._poller is not None)
234
    self.assertEqual(self.cond._nwaiters, 1)
235

  
236
    # use again
237
    self.cond.StartWaiting()
238
    self.assertEqual(self.cond._nwaiters, 2)
239

  
240
    # there is more than one user
241
    self.assert_(not self.cond.DoneWaiting())
242
    self.assert_(self.cond._read_fd is not None)
243
    self.assert_(self.cond._write_fd is not None)
244
    self.assert_(self.cond._poller is not None)
245
    self.assertEqual(self.cond._nwaiters, 1)
246

  
247
    self.assert_(self.cond.DoneWaiting())
248
    self.assertEqual(self.cond._nwaiters, 0)
249
    self.assert_(self.cond._read_fd is None)
250
    self.assert_(self.cond._write_fd is None)
251
    self.assert_(self.cond._poller is None)
252

  
253
  def testNotify(self):
254
    wait1 = self.cond.StartWaiting()
255
    wait2 = self.cond.StartWaiting()
256

  
257
    self.assert_(self.cond._read_fd is not None)
258
    self.assert_(self.cond._write_fd is not None)
259
    self.assert_(self.cond._poller is not None)
260

  
261
    self.cond.notifyAll()
262

  
263
    self.assert_(self.cond._read_fd is not None)
264
    self.assert_(self.cond._write_fd is None)
265
    self.assert_(self.cond._poller is not None)
266

  
267
    self.assert_(not self.cond.DoneWaiting())
268

  
269
    self.assert_(self.cond._read_fd is not None)
270
    self.assert_(self.cond._write_fd is None)
271
    self.assert_(self.cond._poller is not None)
272

  
273
    self.assert_(self.cond.DoneWaiting())
274

  
275
    self.assert_(self.cond._read_fd is None)
276
    self.assert_(self.cond._write_fd is None)
277
    self.assert_(self.cond._poller is None)
278

  
279
  def testReusage(self):
280
    self.cond.StartWaiting()
281
    self.assert_(self.cond._read_fd is not None)
282
    self.assert_(self.cond._write_fd is not None)
283
    self.assert_(self.cond._poller is not None)
284

  
285
    self.assert_(self.cond.DoneWaiting())
286

  
287
    self.assertRaises(RuntimeError, self.cond.StartWaiting)
288
    self.assert_(self.cond._read_fd is None)
289
    self.assert_(self.cond._write_fd is None)
290
    self.assert_(self.cond._poller is None)
291

  
292
  def testNotifyTwice(self):
293
    self.cond.notifyAll()
294
    self.assertRaises(RuntimeError, self.cond.notifyAll)
295

  
296

  
297 243
class TestSharedLock(_ThreadedTestCase):
298 244
  """SharedLock tests"""
299 245

  

Also available in: Unified diff