utils: Add function to find items in dictionary using regex
[ganeti-local] / test / ganeti.utils_unittest.py
index 5ff5fc9..08c753e 100755 (executable)
@@ -177,13 +177,15 @@ class TestPidFileFunctions(unittest.TestCase):
 
   def testPidFileFunctions(self):
     pid_file = self.f_dpn('test')
-    utils.WritePidFile('test')
+    fd = utils.WritePidFile(self.f_dpn('test'))
     self.failUnless(os.path.exists(pid_file),
                     "PID file should have been created")
     read_pid = utils.ReadPidFile(pid_file)
     self.failUnlessEqual(read_pid, os.getpid())
     self.failUnless(utils.IsProcessAlive(read_pid))
-    self.failUnlessRaises(errors.GenericError, utils.WritePidFile, 'test')
+    self.failUnlessRaises(errors.LockError, utils.WritePidFile,
+                          self.f_dpn('test'))
+    os.close(fd)
     utils.RemovePidFile('test')
     self.failIf(os.path.exists(pid_file),
                 "PID file should not exist anymore")
@@ -194,6 +196,9 @@ class TestPidFileFunctions(unittest.TestCase):
     fh.close()
     self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
                          "ReadPidFile should return 0 for invalid pid file")
+    # but now, even with the file existing, we should be able to lock it
+    fd = utils.WritePidFile(self.f_dpn('test'))
+    os.close(fd)
     utils.RemovePidFile('test')
     self.failIf(os.path.exists(pid_file),
                 "PID file should not exist anymore")
@@ -203,7 +208,7 @@ class TestPidFileFunctions(unittest.TestCase):
     r_fd, w_fd = os.pipe()
     new_pid = os.fork()
     if new_pid == 0: #child
-      utils.WritePidFile('child')
+      utils.WritePidFile(self.f_dpn('child'))
       os.write(w_fd, 'a')
       signal.pause()
       os._exit(0)
@@ -996,6 +1001,28 @@ class TestParseUnit(unittest.TestCase):
       self.assertRaises(errors.UnitParseError, ParseUnit, '1,3' + suffix)
 
 
+class TestParseCpuMask(unittest.TestCase):
+  """Test case for the ParseCpuMask function."""
+
+  def testWellFormed(self):
+    self.assertEqual(utils.ParseCpuMask(""), [])  # empty mask, empty list
+    self.assertEqual(utils.ParseCpuMask("1"), [1])
+    self.assertEqual(utils.ParseCpuMask("0-2,4,5-5"), [0,1,2,4,5])  # inclusive
+
+  def testInvalidInput(self):
+    self.assertRaises(errors.ParseError,
+                      utils.ParseCpuMask,
+                      "garbage")  # not numeric
+    self.assertRaises(errors.ParseError,
+                      utils.ParseCpuMask,
+                      "0,")  # trailing comma
+    self.assertRaises(errors.ParseError,
+                      utils.ParseCpuMask,
+                      "0-1-2")  # more than two range bounds
+    self.assertRaises(errors.ParseError,
+                      utils.ParseCpuMask,
+                      "2-1")  # lower bound above upper bound
+
 class TestSshKeys(testutils.GanetiTestCase):
   """Test case for the AddAuthorizedKey function"""
 
@@ -1222,11 +1249,8 @@ class TestListVisibleFiles(unittest.TestCase):
 class TestNewUUID(unittest.TestCase):
   """Test case for NewUUID"""
 
-  _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
-                        '[a-f0-9]{4}-[a-f0-9]{12}$')
-
   def runTest(self):
-    self.failUnless(self._re_uuid.match(utils.NewUUID()))
+    self.failUnless(utils.UUID_RE.match(utils.NewUUID()))  # pattern from utils
 
 
 class TestUniqueSequence(unittest.TestCase):
@@ -1481,6 +1505,7 @@ class TestForceDictType(unittest.TestCase):
       'b': constants.VTYPE_BOOL,
       'c': constants.VTYPE_STRING,
       'd': constants.VTYPE_SIZE,
+      "e": constants.VTYPE_MAYBE_STRING,
       }
 
   def _fdt(self, dict, allowed_values=None):
@@ -1504,12 +1529,17 @@ class TestForceDictType(unittest.TestCase):
     self.assertEqual(self._fdt({'b': 'True'}), {'b': True})
     self.assertEqual(self._fdt({'d': '4'}), {'d': 4})
     self.assertEqual(self._fdt({'d': '4M'}), {'d': 4})
+    self.assertEqual(self._fdt({"e": None, }), {"e": None, })
+    self.assertEqual(self._fdt({"e": "Hello World", }), {"e": "Hello World", })
+    self.assertEqual(self._fdt({"e": False, }), {"e": '', })
 
   def testErrors(self):
     self.assertRaises(errors.TypeEnforcementError, self._fdt, {'a': 'astring'})
     self.assertRaises(errors.TypeEnforcementError, self._fdt, {'c': True})
     self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': 'astring'})
     self.assertRaises(errors.TypeEnforcementError, self._fdt, {'d': '4 L'})
+    self.assertRaises(errors.TypeEnforcementError, self._fdt, {"e": object(), })
+    self.assertRaises(errors.TypeEnforcementError, self._fdt, {"e": [], })
 
 
 class TestIsNormAbsPath(unittest.TestCase):
@@ -2286,5 +2316,45 @@ class TestShellWriter(unittest.TestCase):
     self.assertEqual(buf.getvalue(), "")
 
 
+class TestCommaJoin(unittest.TestCase):
+  def test(self):
+    self.assertEqual(utils.CommaJoin([]), "")  # nothing to join
+    self.assertEqual(utils.CommaJoin([1, 2, 3]), "1, 2, 3")  # ints accepted
+    self.assertEqual(utils.CommaJoin(["Hello"]), "Hello")  # no separator
+    self.assertEqual(utils.CommaJoin(["Hello", "World"]), "Hello, World")
+    self.assertEqual(utils.CommaJoin(["Hello", "World", 99]),
+                     "Hello, World, 99")  # mixed str/int items
+
+
+class TestFindMatch(unittest.TestCase):
+  def test(self):
+    data = {
+      "aaaa": "Four A",
+      "bb": {"Two B": True},
+      re.compile(r"^x(foo|bar|bazX)([0-9]+)$"): (1, 2, 3),  # regex as key
+      }
+
+    self.assertEqual(utils.FindMatch(data, "aaaa"), ("Four A", []))
+    self.assertEqual(utils.FindMatch(data, "bb"), ({"Two B": True}, []))
+
+    for i in ["foo", "bar", "bazX"]:
+      for j in range(1, 100, 7):
+        self.assertEqual(utils.FindMatch(data, "x%s%s" % (i, j)),
+                         ((1, 2, 3), [i, str(j)]))  # value + regex groups
+
+  def testNoMatch(self):
+    self.assert_(utils.FindMatch({}, "") is None)
+    self.assert_(utils.FindMatch({}, "foo") is None)
+    self.assert_(utils.FindMatch({}, 1234) is None)  # non-string lookup
+
+    data = {
+      "X": "Hello World",
+      re.compile("^(something)$"): "Hello World",
+      }
+
+    self.assert_(utils.FindMatch(data, "") is None)
+    self.assert_(utils.FindMatch(data, "Hello World") is None)  # value, not key
+
+
 if __name__ == '__main__':
   testutils.GanetiTestProgram()