+class TestFirstFree(unittest.TestCase):
+ """Test case for the FirstFree function"""
+
+ def test(self):
+ """Test FirstFree"""
+ self.failUnlessEqual(FirstFree([0, 1, 3]), 2)
+ self.failUnlessEqual(FirstFree([]), None)
+ self.failUnlessEqual(FirstFree([3, 4, 6]), 0)
+ self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
+ self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
+
+
+class TestTailFile(testutils.GanetiTestCase):
+ """Test case for the TailFile function"""
+
+ def testEmpty(self):
+ fname = self._CreateTempFile()
+ self.failUnlessEqual(TailFile(fname), [])
+ self.failUnlessEqual(TailFile(fname, lines=25), [])
+
+ def testAllLines(self):
+ data = ["test %d" % i for i in range(30)]
+ for i in range(30):
+ fname = self._CreateTempFile()
+ fd = open(fname, "w")
+ fd.write("\n".join(data[:i]))
+ if i > 0:
+ fd.write("\n")
+ fd.close()
+ self.failUnlessEqual(TailFile(fname, lines=i), data[:i])
+
+ def testPartialLines(self):
+ data = ["test %d" % i for i in range(30)]
+ fname = self._CreateTempFile()
+ fd = open(fname, "w")
+ fd.write("\n".join(data))
+ fd.write("\n")
+ fd.close()
+ for i in range(1, 30):
+ self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
+
+ def testBigFile(self):
+ data = ["test %d" % i for i in range(30)]
+ fname = self._CreateTempFile()
+ fd = open(fname, "w")
+ fd.write("X" * 1048576)
+ fd.write("\n")
+ fd.write("\n".join(data))
+ fd.write("\n")
+ fd.close()
+ for i in range(1, 30):
+ self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
+
+
+class TestFileLock(unittest.TestCase):
+ """Test case for the FileLock class"""
+
+ def setUp(self):
+ self.tmpfile = tempfile.NamedTemporaryFile()
+ self.lock = utils.FileLock(self.tmpfile.name)
+
+ def testSharedNonblocking(self):
+ self.lock.Shared(blocking=False)
+ self.lock.Close()
+
+ def testExclusiveNonblocking(self):
+ self.lock.Exclusive(blocking=False)
+ self.lock.Close()
+
+ def testUnlockNonblocking(self):
+ self.lock.Unlock(blocking=False)
+ self.lock.Close()
+
+ def testSharedBlocking(self):
+ self.lock.Shared(blocking=True)
+ self.lock.Close()
+
+ def testExclusiveBlocking(self):
+ self.lock.Exclusive(blocking=True)
+ self.lock.Close()
+
+ def testUnlockBlocking(self):
+ self.lock.Unlock(blocking=True)
+ self.lock.Close()
+
+ def testSharedExclusiveUnlock(self):
+ self.lock.Shared(blocking=False)
+ self.lock.Exclusive(blocking=False)
+ self.lock.Unlock(blocking=False)
+ self.lock.Close()
+
+ def testExclusiveSharedUnlock(self):
+ self.lock.Exclusive(blocking=False)
+ self.lock.Shared(blocking=False)
+ self.lock.Unlock(blocking=False)
+ self.lock.Close()
+
+ def testCloseShared(self):
+ self.lock.Close()
+ self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
+
+ def testCloseExclusive(self):
+ self.lock.Close()
+ self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
+
+ def testCloseUnlock(self):
+ self.lock.Close()
+ self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
+
+
+class TestTimeFunctions(unittest.TestCase):
+ """Test case for time functions"""
+
+ def runTest(self):
+ self.assertEqual(utils.SplitTime(1), (1, 0))
+ self.assertEqual(utils.SplitTime(1.5), (1, 500000))
+ self.assertEqual(utils.SplitTime(1218448917.4809151), (1218448917, 480915))
+ self.assertEqual(utils.SplitTime(123.48012), (123, 480120))
+ self.assertEqual(utils.SplitTime(123.9996), (123, 999600))
+ self.assertEqual(utils.SplitTime(123.9995), (123, 999500))
+ self.assertEqual(utils.SplitTime(123.9994), (123, 999400))
+ self.assertEqual(utils.SplitTime(123.999999999), (123, 999999))
+
+ self.assertRaises(AssertionError, utils.SplitTime, -1)
+
+ self.assertEqual(utils.MergeTime((1, 0)), 1.0)
+ self.assertEqual(utils.MergeTime((1, 500000)), 1.5)
+ self.assertEqual(utils.MergeTime((1218448917, 500000)), 1218448917.5)
+
+ self.assertEqual(round(utils.MergeTime((1218448917, 481000)), 3), 1218448917.481)
+ self.assertEqual(round(utils.MergeTime((1, 801000)), 3), 1.801)
+
+ self.assertRaises(AssertionError, utils.MergeTime, (0, -1))
+ self.assertRaises(AssertionError, utils.MergeTime, (0, 1000000))
+ self.assertRaises(AssertionError, utils.MergeTime, (0, 9999999))
+ self.assertRaises(AssertionError, utils.MergeTime, (-1, 0))
+ self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
+
+
+class FieldSetTestCase(unittest.TestCase):
+ """Test case for FieldSets"""
+
+ def testSimpleMatch(self):
+ f = utils.FieldSet("a", "b", "c", "def")
+ self.failUnless(f.Matches("a"))
+ self.failIf(f.Matches("d"), "Substring matched")
+ self.failIf(f.Matches("defghi"), "Prefix string matched")
+ self.failIf(f.NonMatching(["b", "c"]))
+ self.failIf(f.NonMatching(["a", "b", "c", "def"]))
+ self.failUnless(f.NonMatching(["a", "d"]))
+
+ def testRegexMatch(self):
+ f = utils.FieldSet("a", "b([0-9]+)", "c")
+ self.failUnless(f.Matches("b1"))
+ self.failUnless(f.Matches("b99"))
+ self.failIf(f.Matches("b/1"))
+ self.failIf(f.NonMatching(["b12", "c"]))
+ self.failUnless(f.NonMatching(["a", "1"]))
+
+class TestForceDictType(unittest.TestCase):
+ """Test case for ForceDictType"""
+
+ def setUp(self):
+ self.key_types = {
+ 'a': constants.VTYPE_INT,
+ 'b': constants.VTYPE_BOOL,
+ 'c': constants.VTYPE_STRING,
+ 'd': constants.VTYPE_SIZE,
+ }
+
+ def _fdt(self, dict, allowed_values=None):
+ if allowed_values is None:
+ ForceDictType(dict, self.key_types)
+ else:
+ ForceDictType(dict, self.key_types, allowed_values=allowed_values)
+
+ return dict
+
+ def testSimpleDict(self):
+ self.assertEqual(self._fdt({}), {})
+ self.assertEqual(self._fdt({'a': 1}), {'a': 1})
+ self.assertEqual(self._fdt({'a': '1'}), {'a': 1})
+ self.assertEqual(self._fdt({'a': 1, 'b': 1}), {'a':1, 'b': True})
+ self.assertEqual(self._fdt({'b': 1, 'c': 'foo'}), {'b': True, 'c': 'foo'})
+ self.assertEqual(self._fdt({'b': 1, 'c': False}), {'b': True, 'c': ''})
+ self.assertEqual(self._fdt({'b': 'false'}), {'b': False})
+ self.assertEqual(self._fdt({'b': 'False'}), {'b': False})
+ self.assertEqual(self._fdt({'b': 'true'}), {'b': True})
+ self.assertEqual(self._fdt({'b': 'True'}), {'b': True})
+ self.assertEqual(self._fdt({'d': '4'}), {'d': 4})
+ self.assertEqual(self._fdt({'d': '4M'}), {'d': 4})
+
+ def testErrors(self):
+ self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'})
+ self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True})
+ self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'})
+ self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'})
+
+
+class TestIsAbsNormPath(unittest.TestCase):
+ """Testing case for IsProcessAlive"""
+
+ def _pathTestHelper(self, path, result):
+ if result:
+ self.assert_(IsNormAbsPath(path),
+ "Path %s should result absolute and normalized" % path)
+ else:
+ self.assert_(not IsNormAbsPath(path),
+ "Path %s should not result absolute and normalized" % path)
+
+ def testBase(self):
+ self._pathTestHelper('/etc', True)
+ self._pathTestHelper('/srv', True)
+ self._pathTestHelper('etc', False)
+ self._pathTestHelper('/etc/../root', False)
+ self._pathTestHelper('/etc/', False)
+