Revision d76167a5 test/ganeti.locking_unittest.py

b/test/ganeti.locking_unittest.py
69 69
    self.threads = []
70 70

  
71 71

  
72
class TestSingleActionPipeCondition(unittest.TestCase):
73
  """_SingleActionPipeCondition tests"""
74

  
75
  def setUp(self):
76
    self.cond = locking._SingleActionPipeCondition()
77

  
78
  def testInitialization(self):
79
    self.assert_(self.cond._read_fd is not None)
80
    self.assert_(self.cond._write_fd is not None)
81
    self.assert_(self.cond._poller is not None)
82
    self.assertEqual(self.cond._nwaiters, 0)
83

  
84
  def testUsageCount(self):
85
    self.cond.StartWaiting()
86
    self.assert_(self.cond._read_fd is not None)
87
    self.assert_(self.cond._write_fd is not None)
88
    self.assert_(self.cond._poller is not None)
89
    self.assertEqual(self.cond._nwaiters, 1)
90

  
91
    # use again
92
    self.cond.StartWaiting()
93
    self.assertEqual(self.cond._nwaiters, 2)
94

  
95
    # there is more than one user
96
    self.assert_(not self.cond.DoneWaiting())
97
    self.assert_(self.cond._read_fd is not None)
98
    self.assert_(self.cond._write_fd is not None)
99
    self.assert_(self.cond._poller is not None)
100
    self.assertEqual(self.cond._nwaiters, 1)
101

  
102
    self.assert_(self.cond.DoneWaiting())
103
    self.assertEqual(self.cond._nwaiters, 0)
104
    self.assert_(self.cond._read_fd is None)
105
    self.assert_(self.cond._write_fd is None)
106
    self.assert_(self.cond._poller is None)
107

  
108
  def testNotify(self):
109
    wait1 = self.cond.StartWaiting()
110
    wait2 = self.cond.StartWaiting()
111

  
112
    self.assert_(self.cond._read_fd is not None)
113
    self.assert_(self.cond._write_fd is not None)
114
    self.assert_(self.cond._poller is not None)
115

  
116
    self.cond.notifyAll()
117

  
118
    self.assert_(self.cond._read_fd is not None)
119
    self.assert_(self.cond._write_fd is None)
120
    self.assert_(self.cond._poller is not None)
121

  
122
    self.assert_(not self.cond.DoneWaiting())
123

  
124
    self.assert_(self.cond._read_fd is not None)
125
    self.assert_(self.cond._write_fd is None)
126
    self.assert_(self.cond._poller is not None)
127

  
128
    self.assert_(self.cond.DoneWaiting())
129

  
130
    self.assert_(self.cond._read_fd is None)
131
    self.assert_(self.cond._write_fd is None)
132
    self.assert_(self.cond._poller is None)
133

  
134
  def testReusage(self):
135
    self.cond.StartWaiting()
136
    self.assert_(self.cond._read_fd is not None)
137
    self.assert_(self.cond._write_fd is not None)
138
    self.assert_(self.cond._poller is not None)
139

  
140
    self.assert_(self.cond.DoneWaiting())
141

  
142
    self.assertRaises(RuntimeError, self.cond.StartWaiting)
143
    self.assert_(self.cond._read_fd is None)
144
    self.assert_(self.cond._write_fd is None)
145
    self.assert_(self.cond._poller is None)
146

  
147
  def testNotifyTwice(self):
148
    self.cond.notifyAll()
149
    self.assertRaises(RuntimeError, self.cond.notifyAll)
150

  
151

  
72 152
class TestSharedLock(_ThreadedTestCase):
73 153
  """SharedLock tests"""
74 154

  

Also available in: Unified diff