Introduce utils.IsValidIP{4,6}()
[ganeti-local] / test / ganeti.utils_unittest.py
index d72232b..e0cdd0d 100755 (executable)
@@ -75,8 +75,106 @@ class TestIsProcessAlive(unittest.TestCase):
     elif pid_non_existing < 0:
       raise SystemError("can't fork")
     os.waitpid(pid_non_existing, 0)
-    self.assert_(not IsProcessAlive(pid_non_existing),
-                 "nonexisting process detected")
+    self.assertFalse(IsProcessAlive(pid_non_existing),
+                     "nonexisting process detected")
+
+
+class TestGetProcStatusPath(unittest.TestCase):
+  def test(self):
+    self.assert_("/1234/" in utils._GetProcStatusPath(1234))
+    self.assertNotEqual(utils._GetProcStatusPath(1),
+                        utils._GetProcStatusPath(2))
+
+
+class TestIsProcessHandlingSignal(unittest.TestCase):
+  def setUp(self):
+    self.tmpdir = tempfile.mkdtemp()
+
+  def tearDown(self):
+    shutil.rmtree(self.tmpdir)
+
+  def testParseSigsetT(self):
+    self.assertEqual(len(utils._ParseSigsetT("0")), 0)
+    self.assertEqual(utils._ParseSigsetT("1"), set([1]))
+    self.assertEqual(utils._ParseSigsetT("1000a"), set([2, 4, 17]))
+    self.assertEqual(utils._ParseSigsetT("810002"), set([2, 17, 24, ]))
+    self.assertEqual(utils._ParseSigsetT("0000000180000202"),
+                     set([2, 10, 32, 33]))
+    self.assertEqual(utils._ParseSigsetT("0000000180000002"),
+                     set([2, 32, 33]))
+    self.assertEqual(utils._ParseSigsetT("0000000188000002"),
+                     set([2, 28, 32, 33]))
+    self.assertEqual(utils._ParseSigsetT("000000004b813efb"),
+                     set([1, 2, 4, 5, 6, 7, 8, 10, 11, 12, 13, 14, 17,
+                          24, 25, 26, 28, 31]))
+    self.assertEqual(utils._ParseSigsetT("ffffff"), set(range(1, 25)))
+
+  def testGetProcStatusField(self):
+    for field in ["SigCgt", "Name", "FDSize"]:
+      for value in ["", "0", "cat", "  1234 KB"]:
+        pstatus = "\n".join([
+          "VmPeak: 999 kB",
+          "%s: %s" % (field, value),
+          "TracerPid: 0",
+          ])
+        result = utils._GetProcStatusField(pstatus, field)
+        self.assertEqual(result, value.strip())
+
+  def test(self):
+    sp = utils.PathJoin(self.tmpdir, "status")
+
+    utils.WriteFile(sp, data="\n".join([
+      "Name:   bash",
+      "State:  S (sleeping)",
+      "SleepAVG:       98%",
+      "Pid:    22250",
+      "PPid:   10858",
+      "TracerPid:      0",
+      "SigBlk: 0000000000010000",
+      "SigIgn: 0000000000384004",
+      "SigCgt: 000000004b813efb",
+      "CapEff: 0000000000000000",
+      ]))
+
+    self.assert_(utils.IsProcessHandlingSignal(1234, 10, status_path=sp))
+
+  def testNoSigCgt(self):
+    sp = utils.PathJoin(self.tmpdir, "status")
+
+    utils.WriteFile(sp, data="\n".join([
+      "Name:   bash",
+      ]))
+
+    self.assertRaises(RuntimeError, utils.IsProcessHandlingSignal,
+                      1234, 10, status_path=sp)
+
+  def testNoSuchFile(self):
+    sp = utils.PathJoin(self.tmpdir, "notexist")
+
+    self.assertFalse(utils.IsProcessHandlingSignal(1234, 10, status_path=sp))
+
+  @staticmethod
+  def _TestRealProcess():
+    signal.signal(signal.SIGUSR1, signal.SIG_DFL)
+    if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
+      raise Exception("SIGUSR1 is handled when it should not be")
+
+    signal.signal(signal.SIGUSR1, lambda signum, frame: None)
+    if not utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
+      raise Exception("SIGUSR1 is not handled when it should be")
+
+    signal.signal(signal.SIGUSR1, signal.SIG_IGN)
+    if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
+      raise Exception("SIGUSR1 is not handled when it should be")
+
+    signal.signal(signal.SIGUSR1, signal.SIG_DFL)
+    if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
+      raise Exception("SIGUSR1 is handled when it should not be")
+
+    return True
+
+  def testRealProcess(self):
+    self.assert_(utils.RunInSeparateProcess(self._TestRealProcess))
 
 
 class TestPidFileFunctions(unittest.TestCase):
@@ -1071,7 +1169,7 @@ class TestTcpPing(unittest.TestCase):
 
   def setUp(self):
     self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-    self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
+    self.listener.bind((constants.IP4_ADDRESS_LOCALHOST, 0))
     self.listenerport = self.listener.getsockname()[1]
     self.listener.listen(1)
 
@@ -1081,15 +1179,15 @@ class TestTcpPing(unittest.TestCase):
     del self.listenerport
 
   def testTcpPingToLocalHostAccept(self):
-    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
+    self.assert_(TcpPing(constants.IP4_ADDRESS_LOCALHOST,
                          self.listenerport,
                          timeout=10,
                          live_port_needed=True,
-                         source=constants.LOCALHOST_IP_ADDRESS,
+                         source=constants.IP4_ADDRESS_LOCALHOST,
                          ),
                  "failed to connect to test listener")
 
-    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
+    self.assert_(TcpPing(constants.IP4_ADDRESS_LOCALHOST,
                          self.listenerport,
                          timeout=10,
                          live_port_needed=True,
@@ -1102,7 +1200,7 @@ class TestTcpPingDeaf(unittest.TestCase):
 
   def setUp(self):
     self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-    self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
+    self.deaflistener.bind((constants.IP4_ADDRESS_LOCALHOST, 0))
     self.deaflistenerport = self.deaflistener.getsockname()[1]
 
   def tearDown(self):
@@ -1110,15 +1208,15 @@ class TestTcpPingDeaf(unittest.TestCase):
     del self.deaflistenerport
 
   def testTcpPingToLocalHostAcceptDeaf(self):
-    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
+    self.failIf(TcpPing(constants.IP4_ADDRESS_LOCALHOST,
                         self.deaflistenerport,
                         timeout=constants.TCP_PING_TIMEOUT,
                         live_port_needed=True,
-                        source=constants.LOCALHOST_IP_ADDRESS,
+                        source=constants.IP4_ADDRESS_LOCALHOST,
                         ), # need successful connect(2)
                 "successfully connected to deaf listener")
 
-    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
+    self.failIf(TcpPing(constants.IP4_ADDRESS_LOCALHOST,
                         self.deaflistenerport,
                         timeout=constants.TCP_PING_TIMEOUT,
                         live_port_needed=True,
@@ -1126,15 +1224,15 @@ class TestTcpPingDeaf(unittest.TestCase):
                 "successfully connected to deaf listener (no source addr)")
 
   def testTcpPingToLocalHostNoAccept(self):
-    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
+    self.assert_(TcpPing(constants.IP4_ADDRESS_LOCALHOST,
                          self.deaflistenerport,
                          timeout=constants.TCP_PING_TIMEOUT,
                          live_port_needed=False,
-                         source=constants.LOCALHOST_IP_ADDRESS,
+                         source=constants.IP4_ADDRESS_LOCALHOST,
                          ), # ECONNREFUSED is OK
                  "failed to ping alive host on deaf port")
 
-    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
+    self.assert_(TcpPing(constants.IP4_ADDRESS_LOCALHOST,
                          self.deaflistenerport,
                          timeout=constants.TCP_PING_TIMEOUT,
                          live_port_needed=False,
@@ -1147,7 +1245,7 @@ class TestOwnIpAddress(unittest.TestCase):
 
   def testOwnLoopback(self):
     """check having the loopback ip"""
-    self.failUnless(OwnIpAddress(constants.LOCALHOST_IP_ADDRESS),
+    self.failUnless(OwnIpAddress(constants.IP4_ADDRESS_LOCALHOST),
                     "Should own the loopback address")
 
   def testNowOwnAddress(self):
@@ -1236,22 +1334,14 @@ class TestListVisibleFiles(unittest.TestCase):
   def tearDown(self):
     shutil.rmtree(self.path)
 
-  def _test(self, files, expected):
-    # Sort a copy
-    expected = expected[:]
-    expected.sort()
-
+  def _CreateFiles(self, files):
     for name in files:
-      f = open(os.path.join(self.path, name), 'w')
-      try:
-        f.write("Test\n")
-      finally:
-        f.close()
+      utils.WriteFile(os.path.join(self.path, name), data="test")
 
+  def _test(self, files, expected):
+    self._CreateFiles(files)
     found = ListVisibleFiles(self.path)
-    found.sort()
-
-    self.assertEqual(found, expected)
+    self.assertEqual(set(found), set(expected))
 
   def testAllVisible(self):
     files = ["a", "b", "c"]
@@ -1577,7 +1667,7 @@ class TestIsNormAbsPath(unittest.TestCase):
       self.assert_(IsNormAbsPath(path),
           "Path %s should result absolute and normalized" % path)
     else:
-      self.assert_(not IsNormAbsPath(path),
+      self.assertFalse(IsNormAbsPath(path),
           "Path %s should not result absolute and normalized" % path)
 
   def testBase(self):
@@ -1805,6 +1895,32 @@ class TestHostInfo(unittest.TestCase):
       HostInfo.NormalizeName(value)
 
 
+class TestValidateServiceName(unittest.TestCase):
+  def testValid(self):
+    testnames = [
+      0, 1, 2, 3, 1024, 65000, 65534, 65535,
+      "ganeti",
+      "gnt-masterd",
+      "HELLO_WORLD_SVC",
+      "hello.world.1",
+      "0", "80", "1111", "65535",
+      ]
+
+    for name in testnames:
+      self.assertEqual(utils.ValidateServiceName(name), name)
+
+  def testInvalid(self):
+    testnames = [
+      -15756, -1, 65536, 133428083,
+      "", "Hello World!", "!", "'", "\"", "\t", "\n", "`",
+      "-8546", "-1", "65536",
+      (129 * "A"),
+      ]
+
+    for name in testnames:
+      self.assertRaises(OpPrereqError, utils.ValidateServiceName, name)
+
+
 class TestParseAsn1Generalizedtime(unittest.TestCase):
   def test(self):
     # UTC
@@ -1948,7 +2064,7 @@ class TestMakedirs(unittest.TestCase):
 
   def testRecursiveExisting(self):
     path = utils.PathJoin(self.tmpdir, "B/moo/xyz")
-    self.assert_(not os.path.exists(path))
+    self.assertFalse(os.path.exists(path))
     os.mkdir(utils.PathJoin(self.tmpdir, "B"))
     utils.Makedirs(path)
     self.assert_(os.path.isdir(path))
@@ -2224,9 +2340,7 @@ class TestIgnoreSignals(unittest.TestCase):
 
   def testIgnoreSignals(self):
     sock_err_intr = socket.error(errno.EINTR, "Message")
-    sock_err_intr.errno = errno.EINTR
     sock_err_inval = socket.error(errno.EINVAL, "Message")
-    sock_err_inval.errno = errno.EINVAL
 
     env_err_intr = EnvironmentError(errno.EINTR, "Message")
     env_err_inval = EnvironmentError(errno.EINVAL, "Message")
@@ -2247,5 +2361,108 @@ class TestIgnoreSignals(unittest.TestCase):
     self.assertEquals(utils.IgnoreSignals(self._Return, 33), 33)
 
 
+class TestEnsureDirs(unittest.TestCase):
+  """Tests for EnsureDirs"""
+
+  def setUp(self):
+    self.dir = tempfile.mkdtemp()
+    self.old_umask = os.umask(0777)
+
+  def testEnsureDirs(self):
+    utils.EnsureDirs([
+        (utils.PathJoin(self.dir, "foo"), 0777),
+        (utils.PathJoin(self.dir, "bar"), 0000),
+        ])
+    self.assertEquals(os.stat(utils.PathJoin(self.dir, "foo"))[0] & 0777, 0777)
+    self.assertEquals(os.stat(utils.PathJoin(self.dir, "bar"))[0] & 0777, 0000)
+
+  def tearDown(self):
+    os.rmdir(utils.PathJoin(self.dir, "foo"))
+    os.rmdir(utils.PathJoin(self.dir, "bar"))
+    os.rmdir(self.dir)
+    os.umask(self.old_umask)
+
+
+class TestFormatSeconds(unittest.TestCase):
+  def test(self):
+    self.assertEqual(utils.FormatSeconds(1), "1s")
+    self.assertEqual(utils.FormatSeconds(3600), "1h 0m 0s")
+    self.assertEqual(utils.FormatSeconds(3599), "59m 59s")
+    self.assertEqual(utils.FormatSeconds(7200), "2h 0m 0s")
+    self.assertEqual(utils.FormatSeconds(7201), "2h 0m 1s")
+    self.assertEqual(utils.FormatSeconds(7281), "2h 1m 21s")
+    self.assertEqual(utils.FormatSeconds(29119), "8h 5m 19s")
+    self.assertEqual(utils.FormatSeconds(19431228), "224d 21h 33m 48s")
+    self.assertEqual(utils.FormatSeconds(-1), "-1s")
+    self.assertEqual(utils.FormatSeconds(-282), "-282s")
+    self.assertEqual(utils.FormatSeconds(-29119), "-29119s")
+
+  def testFloat(self):
+    self.assertEqual(utils.FormatSeconds(1.3), "1s")
+    self.assertEqual(utils.FormatSeconds(1.9), "2s")
+    self.assertEqual(utils.FormatSeconds(3912.12311), "1h 5m 12s")
+    self.assertEqual(utils.FormatSeconds(3912.8), "1h 5m 13s")
+
+
+class RunIgnoreProcessNotFound(unittest.TestCase):
+  @staticmethod
+  def _WritePid(fd):
+    os.write(fd, str(os.getpid()))
+    os.close(fd)
+    return True
+
+  def test(self):
+    (pid_read_fd, pid_write_fd) = os.pipe()
+
+    # Start short-lived process which writes its PID to pipe
+    self.assert_(utils.RunInSeparateProcess(self._WritePid, pid_write_fd))
+    os.close(pid_write_fd)
+
+    # Read PID from pipe
+    pid = int(os.read(pid_read_fd, 1024))
+    os.close(pid_read_fd)
+
+    # Try to send signal to process which exited recently
+    self.assertFalse(utils.IgnoreProcessNotFound(os.kill, pid, 0))
+
+
+class TestIsValidIP4(unittest.TestCase):
+  def test(self):
+    self.assert_(utils.IsValidIP4("127.0.0.1"))
+    self.assert_(utils.IsValidIP4("0.0.0.0"))
+    self.assert_(utils.IsValidIP4("255.255.255.255"))
+    self.assertFalse(utils.IsValidIP4("0"))
+    self.assertFalse(utils.IsValidIP4("1"))
+    self.assertFalse(utils.IsValidIP4("1.1.1"))
+    self.assertFalse(utils.IsValidIP4("255.255.255.256"))
+    self.assertFalse(utils.IsValidIP4("::1"))
+
+
+class TestIsValidIP6(unittest.TestCase):
+  def test(self):
+    self.assert_(utils.IsValidIP6("::"))
+    self.assert_(utils.IsValidIP6("::1"))
+    self.assert_(utils.IsValidIP6("1" + (":1" * 7)))
+    self.assert_(utils.IsValidIP6("ffff" + (":ffff" * 7)))
+    self.assertFalse(utils.IsValidIP6("0"))
+    self.assertFalse(utils.IsValidIP6(":1"))
+    self.assertFalse(utils.IsValidIP6("f" + (":f" * 6)))
+    self.assertFalse(utils.IsValidIP6("fffg" + (":ffff" * 7)))
+    self.assertFalse(utils.IsValidIP6("fffff" + (":ffff" * 7)))
+    self.assertFalse(utils.IsValidIP6("1" + (":1" * 8)))
+    self.assertFalse(utils.IsValidIP6("127.0.0.1"))
+
+
+class TestIsValidIP(unittest.TestCase):
+  def test(self):
+    self.assert_(utils.IsValidIP("0.0.0.0"))
+    self.assert_(utils.IsValidIP("127.0.0.1"))
+    self.assert_(utils.IsValidIP("::"))
+    self.assert_(utils.IsValidIP("::1"))
+    self.assertFalse(utils.IsValidIP("0"))
+    self.assertFalse(utils.IsValidIP("1.1.1.256"))
+    self.assertFalse(utils.IsValidIP("a:g::1"))
+
+
 if __name__ == '__main__':
   testutils.GanetiTestProgram()