Revision c5fe2a67 test/ganeti.locking_unittest.py

b/test/ganeti.locking_unittest.py
70 70
    self.threads = []
71 71

  
72 72

  
73
class TestPipeCondition(_ThreadedTestCase):
74
  """_PipeCondition tests"""
73
class _ConditionTestCase(_ThreadedTestCase):
74
  """Common test case for conditions"""
75 75

  
76
  def setUp(self):
76
  def setUp(self, cls):
77 77
    _ThreadedTestCase.setUp(self)
78 78
    self.lock = threading.Lock()
79
    self.cond = locking._PipeCondition(self.lock)
79
    self.cond = cls(self.lock)
80 80

  
81
  def testAcquireRelease(self):
81
  def _testAcquireRelease(self):
82 82
    self.assert_(not self.cond._is_owned())
83 83
    self.assertRaises(RuntimeError, self.cond.wait)
84 84
    self.assertRaises(RuntimeError, self.cond.notifyAll)
......
93 93
    self.assertRaises(RuntimeError, self.cond.wait)
94 94
    self.assertRaises(RuntimeError, self.cond.notifyAll)
95 95

  
96
  def testNotification(self):
96
  def _testNotification(self):
97 97
    def _NotifyAll():
98 98
      self.cond.acquire()
99 99
      self.cond.notifyAll()
......
106 106
    self.cond.release()
107 107
    self.assert_(not self.cond._is_owned())
108 108

  
109

  
110
class TestPipeCondition(_ConditionTestCase):
111
  """_PipeCondition tests"""
112

  
113
  def setUp(self):
114
    _ConditionTestCase.setUp(self, locking._PipeCondition)
115

  
116
  def testAcquireRelease(self):
117
    self._testAcquireRelease()
118

  
119
  def testNotification(self):
120
    self._testNotification()
121

  
109 122
  def _TestWait(self, fn):
110 123
    self._addThread(target=fn)
111 124
    self._addThread(target=fn)

Also available in: Unified diff