def testPidFileFunctions(self):
pid_file = self.f_dpn('test')
- utils.WritePidFile('test')
+ fd = utils.WritePidFile(self.f_dpn('test'))
self.failUnless(os.path.exists(pid_file),
"PID file should have been created")
read_pid = utils.ReadPidFile(pid_file)
self.failUnlessEqual(read_pid, os.getpid())
self.failUnless(utils.IsProcessAlive(read_pid))
- self.failUnlessRaises(errors.GenericError, utils.WritePidFile, 'test')
+ self.failUnlessRaises(errors.LockError, utils.WritePidFile,
+ self.f_dpn('test'))
+ os.close(fd)
utils.RemovePidFile('test')
self.failIf(os.path.exists(pid_file),
"PID file should not exist anymore")
fh.close()
self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
"ReadPidFile should return 0 for invalid pid file")
+ # but now, even with the file existing, we should be able to lock it
+ fd = utils.WritePidFile(self.f_dpn('test'))
+ os.close(fd)
utils.RemovePidFile('test')
self.failIf(os.path.exists(pid_file),
"PID file should not exist anymore")
r_fd, w_fd = os.pipe()
new_pid = os.fork()
if new_pid == 0: #child
- utils.WritePidFile('child')
+ utils.WritePidFile(self.f_dpn('child'))
os.write(w_fd, 'a')
signal.pause()
os._exit(0)
self.assertRaises(errors.UnitParseError, ParseUnit, '1,3' + suffix)
+class TestParseCpuMask(unittest.TestCase):
+ """Test case for the ParseCpuMask function."""
+
+ def testWellFormed(self):
+ self.assertEqual(utils.ParseCpuMask(""), [])
+ self.assertEqual(utils.ParseCpuMask("1"), [1])
+ self.assertEqual(utils.ParseCpuMask("0-2,4,5-5"), [0,1,2,4,5])
+
+ def testInvalidInput(self):
+ self.assertRaises(errors.ParseError,
+ utils.ParseCpuMask,
+ "garbage")
+ self.assertRaises(errors.ParseError,
+ utils.ParseCpuMask,
+ "0,")
+ self.assertRaises(errors.ParseError,
+ utils.ParseCpuMask,
+ "0-1-2")
+ self.assertRaises(errors.ParseError,
+ utils.ParseCpuMask,
+ "2-1")
+
class TestSshKeys(testutils.GanetiTestCase):
"""Test case for the AddAuthorizedKey function"""
class TestNewUUID(unittest.TestCase):
"""Test case for NewUUID"""
- _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
- '[a-f0-9]{4}-[a-f0-9]{12}$')
-
def runTest(self):
- self.failUnless(self._re_uuid.match(utils.NewUUID()))
+ self.failUnless(utils.UUID_RE.match(utils.NewUUID()))
class TestUniqueSequence(unittest.TestCase):
'b': constants.VTYPE_BOOL,
'c': constants.VTYPE_STRING,
'd': constants.VTYPE_SIZE,
+ "e": constants.VTYPE_MAYBE_STRING,
}
def _fdt(self, dict, allowed_values=None):
self.assertEqual(self._fdt({'b': 'True'}), {'b': True})
self.assertEqual(self._fdt({'d': '4'}), {'d': 4})
self.assertEqual(self._fdt({'d': '4M'}), {'d': 4})
+ self.assertEqual(self._fdt({"e": None, }), {"e": None, })
+ self.assertEqual(self._fdt({"e": "Hello World", }), {"e": "Hello World", })
+ self.assertEqual(self._fdt({"e": False, }), {"e": '', })
def testErrors(self):
self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'})
self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True})
self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'})
self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'})
+ self.assertRaises(errors.TypeEnforcementError, self._fdt, {"e": object(), })
+ self.assertRaises(errors.TypeEnforcementError, self._fdt, {"e": [], })
class TestIsNormAbsPath(unittest.TestCase):
self.assertEqual(buf.getvalue(), "")
+class TestCommaJoin(unittest.TestCase):
+ def test(self):
+ self.assertEqual(utils.CommaJoin([]), "")
+ self.assertEqual(utils.CommaJoin([1, 2, 3]), "1, 2, 3")
+ self.assertEqual(utils.CommaJoin(["Hello"]), "Hello")
+ self.assertEqual(utils.CommaJoin(["Hello", "World"]), "Hello, World")
+ self.assertEqual(utils.CommaJoin(["Hello", "World", 99]),
+ "Hello, World, 99")
+
+
+class TestFindMatch(unittest.TestCase):
+ def test(self):
+ data = {
+ "aaaa": "Four A",
+ "bb": {"Two B": True},
+ re.compile(r"^x(foo|bar|bazX)([0-9]+)$"): (1, 2, 3),
+ }
+
+ self.assertEqual(utils.FindMatch(data, "aaaa"), ("Four A", []))
+ self.assertEqual(utils.FindMatch(data, "bb"), ({"Two B": True}, []))
+
+ for i in ["foo", "bar", "bazX"]:
+ for j in range(1, 100, 7):
+ self.assertEqual(utils.FindMatch(data, "x%s%s" % (i, j)),
+ ((1, 2, 3), [i, str(j)]))
+
+ def testNoMatch(self):
+ self.assert_(utils.FindMatch({}, "") is None)
+ self.assert_(utils.FindMatch({}, "foo") is None)
+ self.assert_(utils.FindMatch({}, 1234) is None)
+
+ data = {
+ "X": "Hello World",
+ re.compile("^(something)$"): "Hello World",
+ }
+
+ self.assert_(utils.FindMatch(data, "") is None)
+ self.assert_(utils.FindMatch(data, "Hello World") is None)
+
+
if __name__ == '__main__':
testutils.GanetiTestProgram()