Signed-off-by: Manuel Franceschini <livewire@google.com>
Reviewed-by: Guido Trotter <ultrotter@google.com>
utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
self.mainloop.Run()
self.assert_(self.notified[self.NOTIFIER_TERM])
utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
self.mainloop.Run()
self.assert_(self.notified[self.NOTIFIER_TERM])
- self.assert_(not self.notified[self.NOTIFIER_NORM])
+ self.assertFalse(self.notified[self.NOTIFIER_NORM])
self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
self.mainloop.Run()
self.assert_(self.notified[self.NOTIFIER_TERM])
utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
self.mainloop.Run()
self.assert_(self.notified[self.NOTIFIER_TERM])
- self.assert_(not self.notified[self.NOTIFIER_NORM])
+ self.assertFalse(self.notified[self.NOTIFIER_NORM])
self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
self.mainloop.Run()
self.assert_(self.notified[self.NOTIFIER_TERM])
utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
self.mainloop.Run()
self.assert_(self.notified[self.NOTIFIER_TERM])
- self.assert_(not self.notified[self.NOTIFIER_NORM])
+ self.assertFalse(self.notified[self.NOTIFIER_NORM])
self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
self.mainloop.Run()
self.assert_(self.notified[self.NOTIFIER_TERM])
# NORM notifier is disabled by default
self.mainloop.Run()
self.assert_(self.notified[self.NOTIFIER_TERM])
# NORM notifier is disabled by default
- self.assert_(not self.notified[self.NOTIFIER_NORM])
+ self.assertFalse(self.notified[self.NOTIFIER_NORM])
self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
self.cond = cls(self.lock)
def _testAcquireRelease(self):
self.cond = cls(self.lock)
def _testAcquireRelease(self):
- self.assert_(not self.cond._is_owned())
+ self.assertFalse(self.cond._is_owned())
self.assertRaises(RuntimeError, self.cond.wait)
self.assertRaises(RuntimeError, self.cond.notifyAll)
self.assertRaises(RuntimeError, self.cond.wait)
self.assertRaises(RuntimeError, self.cond.notifyAll)
self.assert_(self.cond._is_owned())
self.cond.release()
self.assert_(self.cond._is_owned())
self.cond.release()
- self.assert_(not self.cond._is_owned())
+ self.assertFalse(self.cond._is_owned())
self.assertRaises(RuntimeError, self.cond.wait)
self.assertRaises(RuntimeError, self.cond.notifyAll)
self.assertRaises(RuntimeError, self.cond.wait)
self.assertRaises(RuntimeError, self.cond.notifyAll)
self.assertEqual(self.done.get(True, 1), "NN")
self.assert_(self.cond._is_owned())
self.cond.release()
self.assertEqual(self.done.get(True, 1), "NN")
self.assert_(self.cond._is_owned())
self.cond.release()
- self.assert_(not self.cond._is_owned())
+ self.assertFalse(self.cond._is_owned())
class TestSingleNotifyPipeCondition(_ConditionTestCase):
class TestSingleNotifyPipeCondition(_ConditionTestCase):
self.sl = locking.SharedLock()
def testSequenceAndOwnership(self):
self.sl = locking.SharedLock()
def testSequenceAndOwnership(self):
- self.assert_(not self.sl._is_owned())
+ self.assertFalse(self.sl._is_owned())
self.sl.acquire(shared=1)
self.assert_(self.sl._is_owned())
self.assert_(self.sl._is_owned(shared=1))
self.sl.acquire(shared=1)
self.assert_(self.sl._is_owned())
self.assert_(self.sl._is_owned(shared=1))
- self.assert_(not self.sl._is_owned(shared=0))
+ self.assertFalse(self.sl._is_owned(shared=0))
- self.assert_(not self.sl._is_owned())
+ self.assertFalse(self.sl._is_owned())
self.sl.acquire()
self.assert_(self.sl._is_owned())
self.sl.acquire()
self.assert_(self.sl._is_owned())
- self.assert_(not self.sl._is_owned(shared=1))
+ self.assertFalse(self.sl._is_owned(shared=1))
self.assert_(self.sl._is_owned(shared=0))
self.sl.release()
self.assert_(self.sl._is_owned(shared=0))
self.sl.release()
- self.assert_(not self.sl._is_owned())
+ self.assertFalse(self.sl._is_owned())
self.sl.acquire(shared=1)
self.assert_(self.sl._is_owned())
self.assert_(self.sl._is_owned(shared=1))
self.sl.acquire(shared=1)
self.assert_(self.sl._is_owned())
self.assert_(self.sl._is_owned(shared=1))
- self.assert_(not self.sl._is_owned(shared=0))
+ self.assertFalse(self.sl._is_owned(shared=0))
- self.assert_(not self.sl._is_owned())
+ self.assertFalse(self.sl._is_owned())
def testBooleanValue(self):
# semaphores are supposed to return a true value on a successful acquire
def testBooleanValue(self):
# semaphores are supposed to return a true value on a successful acquire
def testDecoratedFunctions(self):
self._doItExclusive()
def testDecoratedFunctions(self):
self._doItExclusive()
- self.assert_(not _decoratorlock._is_owned())
+ self.assertFalse(_decoratorlock._is_owned())
- self.assert_(not _decoratorlock._is_owned())
+ self.assertFalse(_decoratorlock._is_owned())
def testSharersCanCoexist(self):
_decoratorlock.acquire(shared=1)
def testSharersCanCoexist(self):
_decoratorlock.acquire(shared=1)
self.ls.release()
else:
self.assert_(acquired is None)
self.ls.release()
else:
self.assert_(acquired is None)
- self.assert_(not self.ls._list_owned())
- self.assert_(not self.ls._is_owned())
+ self.assertFalse(self.ls._list_owned())
+ self.assertFalse(self.ls._is_owned())
self.done.put("not acquired")
self._addThread(target=_AcquireOne)
self.done.put("not acquired")
self._addThread(target=_AcquireOne)
self.ls.release(names=name)
self.ls.release(names=name)
- self.assert_(not self.ls._list_owned())
+ self.assertFalse(self.ls._list_owned())
elif pid_non_existing < 0:
raise SystemError("can't fork")
os.waitpid(pid_non_existing, 0)
elif pid_non_existing < 0:
raise SystemError("can't fork")
os.waitpid(pid_non_existing, 0)
- self.assert_(not IsProcessAlive(pid_non_existing),
- "nonexisting process detected")
+ self.assertFalse(IsProcessAlive(pid_non_existing),
+ "nonexisting process detected")
class TestGetProcStatusPath(unittest.TestCase):
class TestGetProcStatusPath(unittest.TestCase):
self.assert_(IsNormAbsPath(path),
"Path %s should result absolute and normalized" % path)
else:
self.assert_(IsNormAbsPath(path),
"Path %s should result absolute and normalized" % path)
else:
- self.assert_(not IsNormAbsPath(path),
+ self.assertFalse(IsNormAbsPath(path),
"Path %s should not result absolute and normalized" % path)
def testBase(self):
"Path %s should not result absolute and normalized" % path)
def testBase(self):
def testRecursiveExisting(self):
path = utils.PathJoin(self.tmpdir, "B/moo/xyz")
def testRecursiveExisting(self):
path = utils.PathJoin(self.tmpdir, "B/moo/xyz")
- self.assert_(not os.path.exists(path))
+ self.assertFalse(os.path.exists(path))
os.mkdir(utils.PathJoin(self.tmpdir, "B"))
utils.Makedirs(path)
self.assert_(os.path.isdir(path))
os.mkdir(utils.PathJoin(self.tmpdir, "B"))
utils.Makedirs(path)
self.assert_(os.path.isdir(path))