import unittest
import warnings
import OpenSSL
+from cStringIO import StringIO
import testutils
from ganeti import constants
self.assertRaises(errors.UnitParseError, ParseUnit, '1,3' + suffix)
+class TestParseCpuMask(unittest.TestCase):
+ """Test case for the ParseCpuMask function."""
+
+ def testWellFormed(self):
+ self.assertEqual(utils.ParseCpuMask(""), [])
+ self.assertEqual(utils.ParseCpuMask("1"), [1])
+ self.assertEqual(utils.ParseCpuMask("0-2,4,5-5"), [0,1,2,4,5])
+
+ def testInvalidInput(self):
+ self.assertRaises(errors.ParseError,
+ utils.ParseCpuMask,
+ "garbage")
+ self.assertRaises(errors.ParseError,
+ utils.ParseCpuMask,
+ "0,")
+ self.assertRaises(errors.ParseError,
+ utils.ParseCpuMask,
+ "0-1-2")
+ self.assertRaises(errors.ParseError,
+ utils.ParseCpuMask,
+ "2-1")
+
class TestSshKeys(testutils.GanetiTestCase):
"""Test case for the AddAuthorizedKey function"""
'b': constants.VTYPE_BOOL,
'c': constants.VTYPE_STRING,
'd': constants.VTYPE_SIZE,
+ "e": constants.VTYPE_MAYBE_STRING,
}
def _fdt(self, dict, allowed_values=None):
self.assertEqual(self._fdt({'b': 'True'}), {'b': True})
self.assertEqual(self._fdt({'d': '4'}), {'d': 4})
self.assertEqual(self._fdt({'d': '4M'}), {'d': 4})
+ self.assertEqual(self._fdt({"e": None, }), {"e": None, })
+ self.assertEqual(self._fdt({"e": "Hello World", }), {"e": "Hello World", })
+ self.assertEqual(self._fdt({"e": False, }), {"e": '', })
def testErrors(self):
self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'})
self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True})
self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'})
self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'})
+ self.assertRaises(errors.TypeEnforcementError, self._fdt, {"e": object(), })
+ self.assertRaises(errors.TypeEnforcementError, self._fdt, {"e": [], })
class TestIsNormAbsPath(unittest.TestCase):
self.assertEqual(utils.FormatSeconds(3912.8), "1h 5m 13s")
-class RunIgnoreProcessNotFound(unittest.TestCase):
+class TestIgnoreProcessNotFound(unittest.TestCase):
@staticmethod
def _WritePid(fd):
os.write(fd, str(os.getpid()))
self.assertFalse(utils.IgnoreProcessNotFound(os.kill, pid, 0))
+class TestShellWriter(unittest.TestCase):
+ def test(self):
+ buf = StringIO()
+ sw = utils.ShellWriter(buf)
+ sw.Write("#!/bin/bash")
+ sw.Write("if true; then")
+ sw.IncIndent()
+ try:
+ sw.Write("echo true")
+
+ sw.Write("for i in 1 2 3")
+ sw.Write("do")
+ sw.IncIndent()
+ try:
+ self.assertEqual(sw._indent, 2)
+ sw.Write("date")
+ finally:
+ sw.DecIndent()
+ sw.Write("done")
+ finally:
+ sw.DecIndent()
+ sw.Write("echo %s", utils.ShellQuote("Hello World"))
+ sw.Write("exit 0")
+
+ self.assertEqual(sw._indent, 0)
+
+ output = buf.getvalue()
+
+ self.assert_(output.endswith("\n"))
+
+ lines = output.splitlines()
+ self.assertEqual(len(lines), 9)
+ self.assertEqual(lines[0], "#!/bin/bash")
+ self.assert_(re.match(r"^\s+date$", lines[5]))
+ self.assertEqual(lines[7], "echo 'Hello World'")
+
+ def testEmpty(self):
+ buf = StringIO()
+ sw = utils.ShellWriter(buf)
+ sw = None
+ self.assertEqual(buf.getvalue(), "")
+
+
if __name__ == '__main__':
testutils.GanetiTestProgram()