burnin: move start_stop at the end
[ganeti-local] / test / ganeti.utils_unittest.py
index 21ec7c6..29fddfd 100755 (executable)
@@ -50,37 +50,11 @@ from ganeti.errors import LockError, UnitParseError, GenericError, \
 class TestIsProcessAlive(unittest.TestCase):
   """Testing case for IsProcessAlive"""
 
-  def _CreateZombie(self):
-    # create a zombie
-    r_fd, w_fd = os.pipe()
-    pid_zombie = os.fork()
-    if pid_zombie == 0:
-      # explicit close of read, write end will be closed only due to exit
-      os.close(r_fd)
-      os._exit(0)
-    elif pid_zombie < 0:
-      raise SystemError("can't fork")
-    # parent: we close our end of the w_fd, so reads will error as
-    # soon as the OS cleans the child's filedescriptors on its exit
-    os.close(w_fd)
-    # wait for 60 seconds at max for the exit (pathological case, just
-    # so that the test doesn't hang indefinitely)
-    r_set, w_set, e_set = select.select([r_fd], [], [], 60)
-    if not r_set and not e_set:
-      self.fail("Timeout exceeded in zombie creation")
-    return pid_zombie
-
   def testExists(self):
     mypid = os.getpid()
     self.assert_(IsProcessAlive(mypid),
                  "can't find myself running")
 
-  def testZombie(self):
-    pid_zombie = self._CreateZombie()
-    is_zombie = not IsProcessAlive(pid_zombie)
-    self.assert_(is_zombie, "zombie not detected as zombie")
-    os.waitpid(pid_zombie, os.WNOHANG)
-
   def testNotExisting(self):
     pid_non_existing = os.fork()
     if pid_non_existing == 0:
@@ -139,7 +113,7 @@ class TestPidFileFunctions(unittest.TestCase):
     read_pid = utils.ReadPidFile(pid_file)
     self.failUnlessEqual(read_pid, new_pid)
     self.failUnless(utils.IsProcessAlive(new_pid))
-    utils.KillProcess(new_pid)
+    utils.KillProcess(new_pid, waitpid=True)
     self.failIf(utils.IsProcessAlive(new_pid))
     utils.RemovePidFile('child')
     self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0)
@@ -303,6 +277,36 @@ class TestRemoveFile(unittest.TestCase):
       self.fail("File '%s' not removed" % symlink)
 
 
+class TestRename(unittest.TestCase):
+  """Test case for RenameFile"""
+
+  def setUp(self):
+    """Create a temporary directory"""
+    self.tmpdir = tempfile.mkdtemp()
+    self.tmpfile = os.path.join(self.tmpdir, "test1")
+
+    # Touch the file
+    open(self.tmpfile, "w").close()
+
+  def tearDown(self):
+    """Remove temporary directory"""
+    shutil.rmtree(self.tmpdir)
+
+  def testSimpleRename1(self):
+    """Simple rename 1"""
+    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
+
+  def testSimpleRename2(self):
+    """Simple rename 2"""
+    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
+                     mkdir=True)
+
+  def testRenameMkdir(self):
+    """Rename with mkdir"""
+    utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
+                     mkdir=True)
+
+
 class TestCheckdict(unittest.TestCase):
   """Test case for the CheckDict function"""
 
@@ -350,21 +354,42 @@ class TestFormatUnit(unittest.TestCase):
   """Test case for the FormatUnit function"""
 
   def testMiB(self):
-    self.assertEqual(FormatUnit(1), '1M')
-    self.assertEqual(FormatUnit(100), '100M')
-    self.assertEqual(FormatUnit(1023), '1023M')
+    self.assertEqual(FormatUnit(1, 'h'), '1M')
+    self.assertEqual(FormatUnit(100, 'h'), '100M')
+    self.assertEqual(FormatUnit(1023, 'h'), '1023M')
+
+    self.assertEqual(FormatUnit(1, 'm'), '1')
+    self.assertEqual(FormatUnit(100, 'm'), '100')
+    self.assertEqual(FormatUnit(1023, 'm'), '1023')
+
+    self.assertEqual(FormatUnit(1024, 'm'), '1024')
+    self.assertEqual(FormatUnit(1536, 'm'), '1536')
+    self.assertEqual(FormatUnit(17133, 'm'), '17133')
+    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575')
 
   def testGiB(self):
-    self.assertEqual(FormatUnit(1024), '1.0G')
-    self.assertEqual(FormatUnit(1536), '1.5G')
-    self.assertEqual(FormatUnit(17133), '16.7G')
-    self.assertEqual(FormatUnit(1024 * 1024 - 1), '1024.0G')
+    self.assertEqual(FormatUnit(1024, 'h'), '1.0G')
+    self.assertEqual(FormatUnit(1536, 'h'), '1.5G')
+    self.assertEqual(FormatUnit(17133, 'h'), '16.7G')
+    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G')
+
+    self.assertEqual(FormatUnit(1024, 'g'), '1.0')
+    self.assertEqual(FormatUnit(1536, 'g'), '1.5')
+    self.assertEqual(FormatUnit(17133, 'g'), '16.7')
+    self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0')
+
+    self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0')
+    self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0')
+    self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0')
 
   def testTiB(self):
-    self.assertEqual(FormatUnit(1024 * 1024), '1.0T')
-    self.assertEqual(FormatUnit(5120 * 1024), '5.0T')
-    self.assertEqual(FormatUnit(29829 * 1024), '29.1T')
+    self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T')
+    self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T')
+    self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T')
 
+    self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0')
+    self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0')
+    self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1')
 
 class TestParseUnit(unittest.TestCase):
   """Test case for the ParseUnit function"""
@@ -850,5 +875,26 @@ class TestTimeFunctions(unittest.TestCase):
     self.assertRaises(AssertionError, utils.MergeTime, (-9999, 0))
 
 
+class FieldSetTestCase(unittest.TestCase):
+  """Test case for FieldSets"""
+
+  def testSimpleMatch(self):
+    f = utils.FieldSet("a", "b", "c", "def")
+    self.failUnless(f.Matches("a"))
+    self.failIf(f.Matches("d"), "Substring matched")
+    self.failIf(f.Matches("defghi"), "Prefix string matched")
+    self.failIf(f.NonMatching(["b", "c"]))
+    self.failIf(f.NonMatching(["a", "b", "c", "def"]))
+    self.failUnless(f.NonMatching(["a", "d"]))
+
+  def testRegexMatch(self):
+    f = utils.FieldSet("a", "b([0-9]+)", "c")
+    self.failUnless(f.Matches("b1"))
+    self.failUnless(f.Matches("b99"))
+    self.failIf(f.Matches("b/1"))
+    self.failIf(f.NonMatching(["b12", "c"]))
+    self.failUnless(f.NonMatching(["a", "1"]))
+
+
 if __name__ == '__main__':
   unittest.main()