elif pid_non_existing < 0:
raise SystemError("can't fork")
os.waitpid(pid_non_existing, 0)
- self.assert_(not IsProcessAlive(pid_non_existing),
- "nonexisting process detected")
+ self.assertFalse(IsProcessAlive(pid_non_existing),
+ "nonexisting process detected")
+
+
+class TestGetProcStatusPath(unittest.TestCase):
+  """Tests for utils._GetProcStatusPath."""
+
+  def test(self):
+    # The PID has to show up as a path component of its own
+    self.assertTrue("/1234/" in utils._GetProcStatusPath(1234))
+    # Different PIDs must never be mapped to the same path
+    self.assertNotEqual(utils._GetProcStatusPath(1),
+                        utils._GetProcStatusPath(2))
+
+
+class TestIsProcessHandlingSignal(unittest.TestCase):
+  """Tests for utils.IsProcessHandlingSignal and its parsing helpers."""
+
+  def setUp(self):
+    # Scratch directory holding the fake status files written by the tests
+    self.tmpdir = tempfile.mkdtemp()
+
+  def tearDown(self):
+    shutil.rmtree(self.tmpdir)
+
+  def testParseSigsetT(self):
+    # Inputs are hexadecimal bitmask strings; as the expected sets below
+    # show, bit N (counted from zero) stands for signal number N + 1
+    self.assertEqual(len(utils._ParseSigsetT("0")), 0)
+    self.assertEqual(utils._ParseSigsetT("1"), set([1]))
+    self.assertEqual(utils._ParseSigsetT("1000a"), set([2, 4, 17]))
+    self.assertEqual(utils._ParseSigsetT("810002"), set([2, 17, 24]))
+    self.assertEqual(utils._ParseSigsetT("0000000180000202"),
+                     set([2, 10, 32, 33]))
+    self.assertEqual(utils._ParseSigsetT("0000000180000002"),
+                     set([2, 32, 33]))
+    self.assertEqual(utils._ParseSigsetT("0000000188000002"),
+                     set([2, 28, 32, 33]))
+    self.assertEqual(utils._ParseSigsetT("000000004b813efb"),
+                     set([1, 2, 4, 5, 6, 7, 8, 10, 11, 12, 13, 14, 17,
+                          24, 25, 26, 28, 31]))
+    self.assertEqual(utils._ParseSigsetT("ffffff"), set(range(1, 25)))
+
+  def testGetProcStatusField(self):
+    # The looked-up field is placed between two unrelated lines; the result
+    # is expected to be the value with surrounding whitespace removed
+    for field in ["SigCgt", "Name", "FDSize"]:
+      for value in ["", "0", "cat", " 1234 KB"]:
+        pstatus = "\n".join([
+          "VmPeak: 999 kB",
+          "%s: %s" % (field, value),
+          "TracerPid: 0",
+          ])
+        result = utils._GetProcStatusField(pstatus, field)
+        self.assertEqual(result, value.strip())
+
+  def test(self):
+    sp = utils.PathJoin(self.tmpdir, "status")
+
+    # The SigCgt mask below has the bit for signal 10 set (see
+    # testParseSigsetT for the same value)
+    utils.WriteFile(sp, data="\n".join([
+      "Name: bash",
+      "State: S (sleeping)",
+      "SleepAVG: 98%",
+      "Pid: 22250",
+      "PPid: 10858",
+      "TracerPid: 0",
+      "SigBlk: 0000000000010000",
+      "SigIgn: 0000000000384004",
+      "SigCgt: 000000004b813efb",
+      "CapEff: 0000000000000000",
+      ]))
+
+    self.assertTrue(utils.IsProcessHandlingSignal(1234, 10, status_path=sp))
+
+  def testNoSigCgt(self):
+    sp = utils.PathJoin(self.tmpdir, "status")
+
+    # A status file without any SigCgt line must be reported as an error
+    utils.WriteFile(sp, data="\n".join([
+      "Name: bash",
+      ]))
+
+    self.assertRaises(RuntimeError, utils.IsProcessHandlingSignal,
+                      1234, 10, status_path=sp)
+
+  def testNoSuchFile(self):
+    sp = utils.PathJoin(self.tmpdir, "notexist")
+
+    # A missing status file yields False instead of an exception
+    self.assertFalse(utils.IsProcessHandlingSignal(1234, 10, status_path=sp))
+
+  @staticmethod
+  def _TestRealProcess():
+    # Executed via utils.RunInSeparateProcess (see testRealProcess) so the
+    # signal dispositions changed here don't affect the test runner itself
+    signal.signal(signal.SIGUSR1, signal.SIG_DFL)
+    if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
+      raise Exception("SIGUSR1 is handled when it should not be")
+
+    signal.signal(signal.SIGUSR1, lambda signum, frame: None)
+    if not utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
+      raise Exception("SIGUSR1 is not handled when it should be")
+
+    # An ignored signal must not be reported as handled
+    signal.signal(signal.SIGUSR1, signal.SIG_IGN)
+    if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
+      raise Exception("SIGUSR1 is handled when it should not be")
+
+    signal.signal(signal.SIGUSR1, signal.SIG_DFL)
+    if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
+      raise Exception("SIGUSR1 is handled when it should not be")
+
+    return True
+
+  def testRealProcess(self):
+    self.assertTrue(utils.RunInSeparateProcess(self._TestRealProcess))
class TestPidFileFunctions(unittest.TestCase):
def setUp(self):
self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
+ self.listener.bind((constants.IP4_ADDRESS_LOCALHOST, 0))
self.listenerport = self.listener.getsockname()[1]
self.listener.listen(1)
del self.listenerport
def testTcpPingToLocalHostAccept(self):
- self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
+ self.assert_(TcpPing(constants.IP4_ADDRESS_LOCALHOST,
self.listenerport,
timeout=10,
live_port_needed=True,
- source=constants.LOCALHOST_IP_ADDRESS,
+ source=constants.IP4_ADDRESS_LOCALHOST,
),
"failed to connect to test listener")
- self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
+ self.assert_(TcpPing(constants.IP4_ADDRESS_LOCALHOST,
self.listenerport,
timeout=10,
live_port_needed=True,
def setUp(self):
self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
+ self.deaflistener.bind((constants.IP4_ADDRESS_LOCALHOST, 0))
self.deaflistenerport = self.deaflistener.getsockname()[1]
def tearDown(self):
del self.deaflistenerport
def testTcpPingToLocalHostAcceptDeaf(self):
- self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
+ self.failIf(TcpPing(constants.IP4_ADDRESS_LOCALHOST,
self.deaflistenerport,
timeout=constants.TCP_PING_TIMEOUT,
live_port_needed=True,
- source=constants.LOCALHOST_IP_ADDRESS,
+ source=constants.IP4_ADDRESS_LOCALHOST,
), # need successful connect(2)
"successfully connected to deaf listener")
- self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
+ self.failIf(TcpPing(constants.IP4_ADDRESS_LOCALHOST,
self.deaflistenerport,
timeout=constants.TCP_PING_TIMEOUT,
live_port_needed=True,
"successfully connected to deaf listener (no source addr)")
def testTcpPingToLocalHostNoAccept(self):
- self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
+ self.assert_(TcpPing(constants.IP4_ADDRESS_LOCALHOST,
self.deaflistenerport,
timeout=constants.TCP_PING_TIMEOUT,
live_port_needed=False,
- source=constants.LOCALHOST_IP_ADDRESS,
+ source=constants.IP4_ADDRESS_LOCALHOST,
), # ECONNREFUSED is OK
"failed to ping alive host on deaf port")
- self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
+ self.assert_(TcpPing(constants.IP4_ADDRESS_LOCALHOST,
self.deaflistenerport,
timeout=constants.TCP_PING_TIMEOUT,
live_port_needed=False,
def testOwnLoopback(self):
"""check having the loopback ip"""
- self.failUnless(OwnIpAddress(constants.LOCALHOST_IP_ADDRESS),
+ self.failUnless(OwnIpAddress(constants.IP4_ADDRESS_LOCALHOST),
"Should own the loopback address")
def testNowOwnAddress(self):
def tearDown(self):
shutil.rmtree(self.path)
- def _test(self, files, expected):
- # Sort a copy
- expected = expected[:]
- expected.sort()
-
+ def _CreateFiles(self, files):
for name in files:
- f = open(os.path.join(self.path, name), 'w')
- try:
- f.write("Test\n")
- finally:
- f.close()
+ utils.WriteFile(os.path.join(self.path, name), data="test")
+ def _test(self, files, expected):
+ self._CreateFiles(files)
found = ListVisibleFiles(self.path)
- found.sort()
-
- self.assertEqual(found, expected)
+ self.assertEqual(set(found), set(expected))
def testAllVisible(self):
files = ["a", "b", "c"]
self.assert_(IsNormAbsPath(path),
"Path %s should result absolute and normalized" % path)
else:
- self.assert_(not IsNormAbsPath(path),
+ self.assertFalse(IsNormAbsPath(path),
"Path %s should not result absolute and normalized" % path)
def testBase(self):
HostInfo.NormalizeName(value)
+class TestValidateServiceName(unittest.TestCase):
+  """Tests for utils.ValidateServiceName."""
+
+  def testValid(self):
+    # Every accepted value is expected to be handed back unmodified
+    port_numbers = [0, 1, 2, 3, 1024, 65000, 65534, 65535]
+    service_names = [
+      "ganeti",
+      "gnt-masterd",
+      "HELLO_WORLD_SVC",
+      "hello.world.1",
+      ]
+    port_strings = ["0", "80", "1111", "65535"]
+
+    for candidate in port_numbers + service_names + port_strings:
+      self.assertEqual(utils.ValidateServiceName(candidate), candidate)
+
+  def testInvalid(self):
+    # Out-of-range numbers, names with unsupported characters, out-of-range
+    # numeric strings and a 129 character name
+    bad_numbers = [-15756, -1, 65536, 133428083]
+    bad_names = ["", "Hello World!", "!", "'", "\"", "\t", "\n", "`"]
+    bad_port_strings = ["-8546", "-1", "65536"]
+    too_long = [129 * "A"]
+
+    rejected = bad_numbers + bad_names + bad_port_strings + too_long
+    for candidate in rejected:
+      self.assertRaises(OpPrereqError, utils.ValidateServiceName, candidate)
+
+
class TestParseAsn1Generalizedtime(unittest.TestCase):
def test(self):
# UTC
def testRecursiveExisting(self):
path = utils.PathJoin(self.tmpdir, "B/moo/xyz")
- self.assert_(not os.path.exists(path))
+ self.assertFalse(os.path.exists(path))
os.mkdir(utils.PathJoin(self.tmpdir, "B"))
utils.Makedirs(path)
self.assert_(os.path.isdir(path))
def testIgnoreSignals(self):
sock_err_intr = socket.error(errno.EINTR, "Message")
- sock_err_intr.errno = errno.EINTR
sock_err_inval = socket.error(errno.EINVAL, "Message")
- sock_err_inval.errno = errno.EINVAL
env_err_intr = EnvironmentError(errno.EINTR, "Message")
env_err_inval = EnvironmentError(errno.EINVAL, "Message")
self.assertEquals(utils.IgnoreSignals(self._Return, 33), 33)
+class TestEnsureDirs(unittest.TestCase):
+  """Tests for EnsureDirs"""
+
+  def setUp(self):
+    self.dir = tempfile.mkdtemp()
+    # With a umask of 0777 a plain mkdir would strip all permission bits, so
+    # the exact modes asserted in the test cannot come from creation defaults
+    self.old_umask = os.umask(0777)
+
+  def testEnsureDirs(self):
+    foo_path = utils.PathJoin(self.dir, "foo")
+    bar_path = utils.PathJoin(self.dir, "bar")
+
+    utils.EnsureDirs([
+      (foo_path, 0777),
+      (bar_path, 0000),
+      ])
+
+    # Only the permission bits of the resulting mode are compared
+    self.assertEqual(os.stat(foo_path).st_mode & 0777, 0777)
+    self.assertEqual(os.stat(bar_path).st_mode & 0777, 0000)
+
+  def tearDown(self):
+    try:
+      os.rmdir(utils.PathJoin(self.dir, "foo"))
+      os.rmdir(utils.PathJoin(self.dir, "bar"))
+      os.rmdir(self.dir)
+    finally:
+      # Restore the umask even if cleanup fails (e.g. the test aborted before
+      # the directories were created); otherwise all following tests in this
+      # process would keep running with a umask of 0777
+      os.umask(self.old_umask)
+
+
+class TestFormatSeconds(unittest.TestCase):
+  """Tests for utils.FormatSeconds."""
+
+  def test(self):
+    # (input seconds, expected text); negative values are expected to be
+    # printed as plain seconds without being split into larger units
+    expected = [
+      (1, "1s"),
+      (3600, "1h 0m 0s"),
+      (3599, "59m 59s"),
+      (7200, "2h 0m 0s"),
+      (7201, "2h 0m 1s"),
+      (7281, "2h 1m 21s"),
+      (29119, "8h 5m 19s"),
+      (19431228, "224d 21h 33m 48s"),
+      (-1, "-1s"),
+      (-282, "-282s"),
+      (-29119, "-29119s"),
+      ]
+
+    for (secs, text) in expected:
+      self.assertEqual(utils.FormatSeconds(secs), text)
+
+  def testFloat(self):
+    # Fractional input is expected to be rounded to the nearest second
+    expected = [
+      (1.3, "1s"),
+      (1.9, "2s"),
+      (3912.12311, "1h 5m 12s"),
+      (3912.8, "1h 5m 13s"),
+      ]
+
+    for (secs, text) in expected:
+      self.assertEqual(utils.FormatSeconds(secs), text)
+
+
+class RunIgnoreProcessNotFound(unittest.TestCase):
+  """Tests utils.IgnoreProcessNotFound against an already exited process."""
+
+  @staticmethod
+  def _WritePid(fd):
+    # Runs in the child process: report our own PID through the pipe
+    os.write(fd, str(os.getpid()))
+    os.close(fd)
+    return True
+
+  def test(self):
+    (pid_read_fd, pid_write_fd) = os.pipe()
+
+    # Both pipe ends are closed via "finally" so that a failing assertion or
+    # an unparsable PID doesn't leak file descriptors into later tests
+    try:
+      try:
+        # Start short-lived process which writes its PID to pipe
+        self.assertTrue(utils.RunInSeparateProcess(self._WritePid,
+                                                   pid_write_fd))
+      finally:
+        os.close(pid_write_fd)
+
+      # Read PID from pipe
+      pid = int(os.read(pid_read_fd, 1024))
+    finally:
+      os.close(pid_read_fd)
+
+    # Try to send signal to process which exited recently
+    self.assertFalse(utils.IgnoreProcessNotFound(os.kill, pid, 0))
+
+
+class TestIsValidIP4(unittest.TestCase):
+  """Tests for utils.IsValidIP4."""
+
+  def test(self):
+    # The address under test is part of the failure message so a failing
+    # case can be identified without looking up the line number
+    for addr in ["127.0.0.1", "0.0.0.0", "255.255.255.255"]:
+      self.assertTrue(utils.IsValidIP4(addr),
+                      "%r should be a valid IPv4 address" % addr)
+
+    # Too few octets, an octet above 255 and an IPv6 address
+    for addr in ["0", "1", "1.1.1", "255.255.255.256", "::1"]:
+      self.assertFalse(utils.IsValidIP4(addr),
+                       "%r should not be a valid IPv4 address" % addr)
+
+
+class TestIsValidIP6(unittest.TestCase):
+  """Tests for utils.IsValidIP6."""
+
+  def test(self):
+    # The address under test is part of the failure message so a failing
+    # case can be identified without looking up the line number
+    valid = [
+      "::",
+      "::1",
+      "1" + (":1" * 7),        # eight groups
+      "ffff" + (":ffff" * 7),  # eight groups of four hex digits
+      ]
+    for addr in valid:
+      self.assertTrue(utils.IsValidIP6(addr),
+                      "%r should be a valid IPv6 address" % addr)
+
+    invalid = [
+      "0",
+      ":1",
+      "f" + (":f" * 6),         # only seven groups
+      "fffg" + (":ffff" * 7),   # "g" is not a hex digit
+      "fffff" + (":ffff" * 7),  # first group has five digits
+      "1" + (":1" * 8),         # nine groups
+      "127.0.0.1",              # IPv4 address
+      ]
+    for addr in invalid:
+      self.assertFalse(utils.IsValidIP6(addr),
+                       "%r should not be a valid IPv6 address" % addr)
+
+
+class TestIsValidIP(unittest.TestCase):
+  """Tests for utils.IsValidIP using both IPv4 and IPv6 addresses."""
+
+  def test(self):
+    # The address under test is part of the failure message so a failing
+    # case can be identified without looking up the line number
+    for addr in ["0.0.0.0", "127.0.0.1", "::", "::1"]:
+      self.assertTrue(utils.IsValidIP(addr),
+                      "%r should be a valid IP address" % addr)
+
+    for addr in ["0", "1.1.1.256", "a:g::1"]:
+      self.assertFalse(utils.IsValidIP(addr),
+                       "%r should not be a valid IP address" % addr)
+
+
if __name__ == '__main__':
testutils.GanetiTestProgram()