"""Test class that supports adding/waiting on threads"""
# Per-test fixture: runs unittest.TestCase.setUp, then gives the test
# instance a 'done' queue and an empty 'threads' list.
def setUp(self):
unittest.TestCase.setUp(self)
# Line added by this diff ('+'). Queue.Queue(0) has maxsize 0, i.e. no upper
# bound on queued items. The same assignment is removed ('-' lines) from
# several other setUp methods in this diff.
# NOTE(review): the removed comments say helper threads use this queue to
# tell the master they finished -- presumably that still holds here.
+ self.done = Queue.Queue(0)
# Starts out empty for every test.
self.threads = []
def _addThread(self, *args, **kwargs):
_ThreadedTestCase.setUp(self)
self.lock = threading.Lock()
self.cond = locking._PipeCondition(self.lock)
- self.done = Queue.Queue(0)
def testAcquireRelease(self):
self.assert_(not self.cond._is_owned())
# Per-test fixture: runs _ThreadedTestCase.setUp, then creates the
# SharedLock under test as self.sl.
def setUp(self):
_ThreadedTestCase.setUp(self)
self.sl = locking.SharedLock()
# The two '-' lines below are removed by this diff, so this setUp no longer
# creates 'done' itself.
# NOTE(review): a '+' line elsewhere in this diff adds the same assignment
# to another setUp -- presumably _ThreadedTestCase.setUp; confirm.
- # helper threads use the 'done' queue to tell the master they finished.
- self.done = Queue.Queue(0)
def testSequenceAndOwnership(self):
self.assert_(not self.sl._is_owned())
# Per-test fixture: after this diff it only runs _ThreadedTestCase.setUp.
def setUp(self):
_ThreadedTestCase.setUp(self)
# The two '-' lines below are removed by this diff, so this setUp no longer
# creates 'done' itself.
# NOTE(review): a '+' line elsewhere in this diff adds the same assignment
# to another setUp -- presumably _ThreadedTestCase.setUp; confirm.
- # helper threads use the 'done' queue to tell the master they finished.
- self.done = Queue.Queue(0)
@locking.ssynchronized(_decoratorlock)
def _doItExclusive(self):
# Per-test fixture: runs _ThreadedTestCase.setUp, then calls the
# _setUpLS helper on this instance.
def setUp(self):
_ThreadedTestCase.setUp(self)
self._setUpLS()
# The two '-' lines below are removed by this diff, so this setUp no longer
# creates 'done' itself.
# NOTE(review): a '+' line elsewhere in this diff adds the same assignment
# to another setUp -- presumably _ThreadedTestCase.setUp; confirm.
- # helper threads use the 'done' queue to tell the master they finished.
- self.done = Queue.Queue(0)
def _setUpLS(self):
"""Helper to (re)initialize the lock set"""
self.instances=['i1', 'i2', 'i3']
self.GL = locking.GanetiLockManager(nodes=self.nodes,
instances=self.instances)
- self.done = Queue.Queue(0)
def tearDown(self):
# Don't try this at home...