Revision debed9ae test/ganeti.utils_unittest.py

b/test/ganeti.utils_unittest.py
1840 1840
                             "", "x"])
1841 1841

  
1842 1842

  
1843
class TestReadLockedPidFile(unittest.TestCase):
1844
  def setUp(self):
1845
    self.tmpdir = tempfile.mkdtemp()
1846

  
1847
  def tearDown(self):
1848
    shutil.rmtree(self.tmpdir)
1849

  
1850
  def testNonExistent(self):
1851
    path = utils.PathJoin(self.tmpdir, "nonexist")
1852
    self.assert_(utils.ReadLockedPidFile(path) is None)
1853

  
1854
  def testUnlocked(self):
1855
    path = utils.PathJoin(self.tmpdir, "pid")
1856
    utils.WriteFile(path, data="123")
1857
    self.assert_(utils.ReadLockedPidFile(path) is None)
1858

  
1859
  def testLocked(self):
1860
    path = utils.PathJoin(self.tmpdir, "pid")
1861
    utils.WriteFile(path, data="123")
1862

  
1863
    fl = utils.FileLock.Open(path)
1864
    try:
1865
      fl.Exclusive(blocking=True)
1866

  
1867
      self.assertEqual(utils.ReadLockedPidFile(path), 123)
1868
    finally:
1869
      fl.Close()
1870

  
1871
    self.assert_(utils.ReadLockedPidFile(path) is None)
1872

  
1873
  def testError(self):
1874
    path = utils.PathJoin(self.tmpdir, "foobar", "pid")
1875
    utils.WriteFile(utils.PathJoin(self.tmpdir, "foobar"), data="")
1876
    # open(2) should return ENOTDIR
1877
    self.assertRaises(EnvironmentError, utils.ReadLockedPidFile, path)
1878

  
1879

  
1843 1880
if __name__ == '__main__':
1844 1881
  testutils.GanetiTestProgram()

Also available in: Unified diff