Introduce utils.IsValidIP{4,6}()
[ganeti-local] / test / ganeti.utils_unittest.py
index 2c964d5..e0cdd0d 100755 (executable)
@@ -75,8 +75,8 @@ class TestIsProcessAlive(unittest.TestCase):
     elif pid_non_existing < 0:
       raise SystemError("can't fork")
     os.waitpid(pid_non_existing, 0)
-    self.assert_(not IsProcessAlive(pid_non_existing),
-                 "nonexisting process detected")
+    self.assertFalse(IsProcessAlive(pid_non_existing),
+                     "nonexisting process detected")
 
 
 class TestGetProcStatusPath(unittest.TestCase):
@@ -1169,7 +1169,7 @@ class TestTcpPing(unittest.TestCase):
 
   def setUp(self):
     self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-    self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
+    self.listener.bind((constants.IP4_ADDRESS_LOCALHOST, 0))
     self.listenerport = self.listener.getsockname()[1]
     self.listener.listen(1)
 
@@ -1179,15 +1179,15 @@ class TestTcpPing(unittest.TestCase):
     del self.listenerport
 
   def testTcpPingToLocalHostAccept(self):
-    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
+    self.assert_(TcpPing(constants.IP4_ADDRESS_LOCALHOST,
                          self.listenerport,
                          timeout=10,
                          live_port_needed=True,
-                         source=constants.LOCALHOST_IP_ADDRESS,
+                         source=constants.IP4_ADDRESS_LOCALHOST,
                          ),
                  "failed to connect to test listener")
 
-    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
+    self.assert_(TcpPing(constants.IP4_ADDRESS_LOCALHOST,
                          self.listenerport,
                          timeout=10,
                          live_port_needed=True,
@@ -1200,7 +1200,7 @@ class TestTcpPingDeaf(unittest.TestCase):
 
   def setUp(self):
     self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-    self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
+    self.deaflistener.bind((constants.IP4_ADDRESS_LOCALHOST, 0))
     self.deaflistenerport = self.deaflistener.getsockname()[1]
 
   def tearDown(self):
@@ -1208,15 +1208,15 @@ class TestTcpPingDeaf(unittest.TestCase):
     del self.deaflistenerport
 
   def testTcpPingToLocalHostAcceptDeaf(self):
-    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
+    self.failIf(TcpPing(constants.IP4_ADDRESS_LOCALHOST,
                         self.deaflistenerport,
                         timeout=constants.TCP_PING_TIMEOUT,
                         live_port_needed=True,
-                        source=constants.LOCALHOST_IP_ADDRESS,
+                        source=constants.IP4_ADDRESS_LOCALHOST,
                         ), # need successful connect(2)
                 "successfully connected to deaf listener")
 
-    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
+    self.failIf(TcpPing(constants.IP4_ADDRESS_LOCALHOST,
                         self.deaflistenerport,
                         timeout=constants.TCP_PING_TIMEOUT,
                         live_port_needed=True,
@@ -1224,15 +1224,15 @@ class TestTcpPingDeaf(unittest.TestCase):
                 "successfully connected to deaf listener (no source addr)")
 
   def testTcpPingToLocalHostNoAccept(self):
-    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
+    self.assert_(TcpPing(constants.IP4_ADDRESS_LOCALHOST,
                          self.deaflistenerport,
                          timeout=constants.TCP_PING_TIMEOUT,
                          live_port_needed=False,
-                         source=constants.LOCALHOST_IP_ADDRESS,
+                         source=constants.IP4_ADDRESS_LOCALHOST,
                          ), # ECONNREFUSED is OK
                  "failed to ping alive host on deaf port")
 
-    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
+    self.assert_(TcpPing(constants.IP4_ADDRESS_LOCALHOST,
                          self.deaflistenerport,
                          timeout=constants.TCP_PING_TIMEOUT,
                          live_port_needed=False,
@@ -1245,7 +1245,7 @@ class TestOwnIpAddress(unittest.TestCase):
 
   def testOwnLoopback(self):
     """check having the loopback ip"""
-    self.failUnless(OwnIpAddress(constants.LOCALHOST_IP_ADDRESS),
+    self.failUnless(OwnIpAddress(constants.IP4_ADDRESS_LOCALHOST),
                     "Should own the loopback address")
 
   def testNowOwnAddress(self):
@@ -1334,22 +1334,14 @@ class TestListVisibleFiles(unittest.TestCase):
   def tearDown(self):
     shutil.rmtree(self.path)
 
-  def _test(self, files, expected):
-    # Sort a copy
-    expected = expected[:]
-    expected.sort()
-
+  def _CreateFiles(self, files):
     for name in files:
-      f = open(os.path.join(self.path, name), 'w')
-      try:
-        f.write("Test\n")
-      finally:
-        f.close()
+      utils.WriteFile(os.path.join(self.path, name), data="test")
 
+  def _test(self, files, expected):
+    self._CreateFiles(files)
     found = ListVisibleFiles(self.path)
-    found.sort()
-
-    self.assertEqual(found, expected)
+    self.assertEqual(set(found), set(expected))
 
   def testAllVisible(self):
     files = ["a", "b", "c"]
@@ -1675,7 +1667,7 @@ class TestIsNormAbsPath(unittest.TestCase):
       self.assert_(IsNormAbsPath(path),
           "Path %s should result absolute and normalized" % path)
     else:
-      self.assert_(not IsNormAbsPath(path),
+      self.assertFalse(IsNormAbsPath(path),
           "Path %s should not result absolute and normalized" % path)
 
   def testBase(self):
@@ -1903,6 +1895,32 @@ class TestHostInfo(unittest.TestCase):
       HostInfo.NormalizeName(value)
 
 
+class TestValidateServiceName(unittest.TestCase):  # Tests for utils.ValidateServiceName
+  def testValid(self):  # Every value below must be returned unchanged
+    testnames = [
+      0, 1, 2, 3, 1024, 65000, 65534, 65535,  # integers from 0 up to 65535
+      "ganeti",
+      "gnt-masterd",
+      "HELLO_WORLD_SVC",
+      "hello.world.1",
+      "0", "80", "1111", "65535",  # numbers given as strings
+      ]
+
+    for name in testnames:
+      self.assertEqual(utils.ValidateServiceName(name), name)
+
+  def testInvalid(self):  # Every value below must raise OpPrereqError
+    testnames = [
+      -15756, -1, 65536, 133428083,  # negative integers and integers above 65535
+      "", "Hello World!", "!", "'", "\"", "\t", "\n", "`",
+      "-8546", "-1", "65536",  # negative or above 65535, given as strings
+      (129 * "A"),  # a name of 129 characters
+      ]
+
+    for name in testnames:
+      self.assertRaises(OpPrereqError, utils.ValidateServiceName, name)
+
+
 class TestParseAsn1Generalizedtime(unittest.TestCase):
   def test(self):
     # UTC
@@ -2046,7 +2064,7 @@ class TestMakedirs(unittest.TestCase):
 
   def testRecursiveExisting(self):
     path = utils.PathJoin(self.tmpdir, "B/moo/xyz")
-    self.assert_(not os.path.exists(path))
+    self.assertFalse(os.path.exists(path))
     os.mkdir(utils.PathJoin(self.tmpdir, "B"))
     utils.Makedirs(path)
     self.assert_(os.path.isdir(path))
@@ -2364,5 +2382,87 @@ class TestEnsureDirs(unittest.TestCase):
     os.rmdir(self.dir)
     os.umask(self.old_umask)
 
+
+class TestFormatSeconds(unittest.TestCase):  # Tests for utils.FormatSeconds
+  def test(self):  # Integer inputs
+    self.assertEqual(utils.FormatSeconds(1), "1s")
+    self.assertEqual(utils.FormatSeconds(3600), "1h 0m 0s")  # zero-valued smaller units are still expected
+    self.assertEqual(utils.FormatSeconds(3599), "59m 59s")  # no hour part expected below one hour
+    self.assertEqual(utils.FormatSeconds(7200), "2h 0m 0s")
+    self.assertEqual(utils.FormatSeconds(7201), "2h 0m 1s")
+    self.assertEqual(utils.FormatSeconds(7281), "2h 1m 21s")
+    self.assertEqual(utils.FormatSeconds(29119), "8h 5m 19s")
+    self.assertEqual(utils.FormatSeconds(19431228), "224d 21h 33m 48s")  # days are the largest unit exercised here
+    self.assertEqual(utils.FormatSeconds(-1), "-1s")  # negative inputs are expected as plain signed seconds
+    self.assertEqual(utils.FormatSeconds(-282), "-282s")
+    self.assertEqual(utils.FormatSeconds(-29119), "-29119s")  # compare with 29119 above: no h/m split expected
+
+  def testFloat(self):  # Float inputs; expected strings correspond to rounding to the nearest second
+    self.assertEqual(utils.FormatSeconds(1.3), "1s")
+    self.assertEqual(utils.FormatSeconds(1.9), "2s")
+    self.assertEqual(utils.FormatSeconds(3912.12311), "1h 5m 12s")
+    self.assertEqual(utils.FormatSeconds(3912.8), "1h 5m 13s")
+
+
+class RunIgnoreProcessNotFound(unittest.TestCase):  # Exercises utils.IgnoreProcessNotFound with the PID of an exited process
+  @staticmethod
+  def _WritePid(fd):  # Helper passed to utils.RunInSeparateProcess below
+    os.write(fd, str(os.getpid()))  # writes the PID of whichever process runs this helper
+    os.close(fd)
+    return True  # the caller asserts on the value returned by RunInSeparateProcess
+
+  def test(self):
+    (pid_read_fd, pid_write_fd) = os.pipe()  # pipe used to receive the helper's PID
+
+    # Start short-lived process which writes its PID to pipe
+    self.assert_(utils.RunInSeparateProcess(self._WritePid, pid_write_fd))
+    os.close(pid_write_fd)  # write end is no longer needed in this process
+
+    # Read PID from pipe
+    pid = int(os.read(pid_read_fd, 1024))  # PID was written as a decimal string
+    os.close(pid_read_fd)
+
+    # Try to send signal to process which exited recently
+    self.assertFalse(utils.IgnoreProcessNotFound(os.kill, pid, 0))  # os.kill(pid, 0); a false result is expected
+
+
+class TestIsValidIP4(unittest.TestCase):  # Tests for utils.IsValidIP4
+  def test(self):
+    self.assert_(utils.IsValidIP4("127.0.0.1"))  # accepted: four dotted decimal parts
+    self.assert_(utils.IsValidIP4("0.0.0.0"))  # accepted: all parts zero
+    self.assert_(utils.IsValidIP4("255.255.255.255"))  # accepted: all parts 255
+    self.assertFalse(utils.IsValidIP4("0"))  # rejected: a single number
+    self.assertFalse(utils.IsValidIP4("1"))
+    self.assertFalse(utils.IsValidIP4("1.1.1"))  # rejected: only three parts
+    self.assertFalse(utils.IsValidIP4("255.255.255.256"))  # rejected: last part is 256
+    self.assertFalse(utils.IsValidIP4("::1"))  # rejected: colon notation
+
+
+class TestIsValidIP6(unittest.TestCase):  # Tests for utils.IsValidIP6
+  def test(self):
+    self.assert_(utils.IsValidIP6("::"))  # accepted
+    self.assert_(utils.IsValidIP6("::1"))  # accepted
+    self.assert_(utils.IsValidIP6("1" + (":1" * 7)))  # accepted: eight groups of "1"
+    self.assert_(utils.IsValidIP6("ffff" + (":ffff" * 7)))  # accepted: eight groups of "ffff"
+    self.assertFalse(utils.IsValidIP6("0"))  # rejected: a single number
+    self.assertFalse(utils.IsValidIP6(":1"))  # rejected: single leading colon
+    self.assertFalse(utils.IsValidIP6("f" + (":f" * 6)))  # rejected: only seven groups
+    self.assertFalse(utils.IsValidIP6("fffg" + (":ffff" * 7)))  # rejected: "g" in the first group
+    self.assertFalse(utils.IsValidIP6("fffff" + (":ffff" * 7)))  # rejected: first group has five digits
+    self.assertFalse(utils.IsValidIP6("1" + (":1" * 8)))  # rejected: nine groups
+    self.assertFalse(utils.IsValidIP6("127.0.0.1"))  # rejected: dotted decimal notation
+
+
+class TestIsValidIP(unittest.TestCase):  # Tests for utils.IsValidIP with both dotted and colon notations
+  def test(self):
+    self.assert_(utils.IsValidIP("0.0.0.0"))  # accepted: dotted decimal
+    self.assert_(utils.IsValidIP("127.0.0.1"))
+    self.assert_(utils.IsValidIP("::"))  # accepted: colon notation
+    self.assert_(utils.IsValidIP("::1"))
+    self.assertFalse(utils.IsValidIP("0"))  # rejected: a single number
+    self.assertFalse(utils.IsValidIP("1.1.1.256"))  # rejected: last part is 256
+    self.assertFalse(utils.IsValidIP("a:g::1"))  # rejected: contains the group "g"
+
+
 if __name__ == '__main__':
   testutils.GanetiTestProgram()