- def testShell(self):
- utils.StartDaemon("echo Hello World > %s" % self.tmpfile)
- self._wait(self.tmpfile, 60.0, "Hello World")
-
- def testShellOutput(self):
- utils.StartDaemon("echo Hello World", output=self.tmpfile)
- self._wait(self.tmpfile, 60.0, "Hello World")
-
- def testNoShellNoOutput(self):
- utils.StartDaemon(["pwd"])
-
- def testNoShellNoOutputTouch(self):
- testfile = os.path.join(self.tmpdir, "check")
- self.failIf(os.path.exists(testfile))
- utils.StartDaemon(["touch", testfile])
- self._wait(testfile, 60.0, "")
-
- def testNoShellOutput(self):
- utils.StartDaemon(["pwd"], output=self.tmpfile)
- self._wait(self.tmpfile, 60.0, "/")
-
- def testNoShellOutputCwd(self):
- utils.StartDaemon(["pwd"], output=self.tmpfile, cwd=os.getcwd())
- self._wait(self.tmpfile, 60.0, os.getcwd())
-
- def testShellEnv(self):
- utils.StartDaemon("echo \"$GNT_TEST_VAR\"", output=self.tmpfile,
- env={ "GNT_TEST_VAR": "Hello World", })
- self._wait(self.tmpfile, 60.0, "Hello World")
-
- def testNoShellEnv(self):
- utils.StartDaemon(["printenv", "GNT_TEST_VAR"], output=self.tmpfile,
- env={ "GNT_TEST_VAR": "Hello World", })
- self._wait(self.tmpfile, 60.0, "Hello World")
-
- def testOutputFd(self):
- fd = os.open(self.tmpfile, os.O_WRONLY | os.O_CREAT)
- try:
- utils.StartDaemon(["pwd"], output_fd=fd, cwd=os.getcwd())
- finally:
- os.close(fd)
- self._wait(self.tmpfile, 60.0, os.getcwd())
-
- def testPid(self):
- pid = utils.StartDaemon("echo $$ > %s" % self.tmpfile)
- self._wait(self.tmpfile, 60.0, str(pid))
-
- def testPidFile(self):
- pidfile = os.path.join(self.tmpdir, "pid")
- checkfile = os.path.join(self.tmpdir, "abort")
-
- pid = utils.StartDaemon("while sleep 5; do :; done", pidfile=pidfile,
- output=self.tmpfile)
- try:
- fd = os.open(pidfile, os.O_RDONLY)
- try:
- # Check file is locked
- self.assertRaises(errors.LockError, utils.LockFile, fd)
-
- pidtext = os.read(fd, 100)
- finally:
- os.close(fd)
-
- self.assertEqual(int(pidtext.strip()), pid)
-
- self.assert_(utils.IsProcessAlive(pid))
- finally:
- # No matter what happens, kill daemon
- utils.KillProcess(pid, timeout=5.0, waitpid=False)
- self.failIf(utils.IsProcessAlive(pid))
-
- self.assertEqual(utils.ReadFile(self.tmpfile), "")
-
- def _wait(self, path, timeout, expected):
- # Due to the asynchronous nature of daemon processes, polling is necessary.
- # A timeout makes sure the test doesn't hang forever.
- def _CheckFile():
- if not (os.path.isfile(path) and
- utils.ReadFile(path).strip() == expected):
- raise utils.RetryAgain()
-
- try:
- utils.Retry(_CheckFile, (0.01, 1.5, 1.0), timeout)
- except utils.RetryTimeout:
- self.fail("Apparently the daemon didn't run in %s seconds and/or"
- " didn't write the correct output" % timeout)
-
- def testError(self):
- self.assertRaises(errors.OpExecError, utils.StartDaemon,
- ["./does-NOT-EXIST/here/0123456789"])
- self.assertRaises(errors.OpExecError, utils.StartDaemon,
- ["./does-NOT-EXIST/here/0123456789"],
- output=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
- self.assertRaises(errors.OpExecError, utils.StartDaemon,
- ["./does-NOT-EXIST/here/0123456789"],
- cwd=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
- self.assertRaises(errors.OpExecError, utils.StartDaemon,
- ["./does-NOT-EXIST/here/0123456789"],
- output=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
-
- fd = os.open(self.tmpfile, os.O_WRONLY | os.O_CREAT)
- try:
- self.assertRaises(errors.ProgrammerError, utils.StartDaemon,
- ["./does-NOT-EXIST/here/0123456789"],
- output=self.tmpfile, output_fd=fd)
- finally:
- os.close(fd)
-
-
-class TestSetCloseOnExecFlag(unittest.TestCase):
- """Tests for SetCloseOnExecFlag"""
-
- def setUp(self):
- self.tmpfile = tempfile.TemporaryFile()
-
- def testEnable(self):
- utils.SetCloseOnExecFlag(self.tmpfile.fileno(), True)
- self.failUnless(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFD) &
- fcntl.FD_CLOEXEC)
-
- def testDisable(self):
- utils.SetCloseOnExecFlag(self.tmpfile.fileno(), False)
- self.failIf(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFD) &
- fcntl.FD_CLOEXEC)
-
-
-class TestSetNonblockFlag(unittest.TestCase):
- def setUp(self):
- self.tmpfile = tempfile.TemporaryFile()
-
- def testEnable(self):
- utils.SetNonblockFlag(self.tmpfile.fileno(), True)
- self.failUnless(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFL) &
- os.O_NONBLOCK)
-
- def testDisable(self):
- utils.SetNonblockFlag(self.tmpfile.fileno(), False)
- self.failIf(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFL) &
- os.O_NONBLOCK)
-
-
-class TestRemoveFile(unittest.TestCase):
- """Test case for the RemoveFile function"""
-
- def setUp(self):
- """Create a temp dir and file for each case"""
- self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
- fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
- os.close(fd)
-
- def tearDown(self):
- if os.path.exists(self.tmpfile):
- os.unlink(self.tmpfile)
- os.rmdir(self.tmpdir)
-
- def testIgnoreDirs(self):
- """Test that RemoveFile() ignores directories"""
- self.assertEqual(None, RemoveFile(self.tmpdir))
-
- def testIgnoreNotExisting(self):
- """Test that RemoveFile() ignores non-existing files"""
- RemoveFile(self.tmpfile)
- RemoveFile(self.tmpfile)
-
- def testRemoveFile(self):
- """Test that RemoveFile does remove a file"""
- RemoveFile(self.tmpfile)
- if os.path.exists(self.tmpfile):
- self.fail("File '%s' not removed" % self.tmpfile)
-
- def testRemoveSymlink(self):
- """Test that RemoveFile does remove symlinks"""
- symlink = self.tmpdir + "/symlink"
- os.symlink("no-such-file", symlink)
- RemoveFile(symlink)
- if os.path.exists(symlink):
- self.fail("File '%s' not removed" % symlink)
- os.symlink(self.tmpfile, symlink)
- RemoveFile(symlink)
- if os.path.exists(symlink):
- self.fail("File '%s' not removed" % symlink)
-
-
-class TestRename(unittest.TestCase):
- """Test case for RenameFile"""
-
- def setUp(self):
- """Create a temporary directory"""
- self.tmpdir = tempfile.mkdtemp()
- self.tmpfile = os.path.join(self.tmpdir, "test1")
-
- # Touch the file
- open(self.tmpfile, "w").close()
-
- def tearDown(self):
- """Remove temporary directory"""
- shutil.rmtree(self.tmpdir)
-
- def testSimpleRename1(self):
- """Simple rename 1"""
- utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
- self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
-
- def testSimpleRename2(self):
- """Simple rename 2"""
- utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
- mkdir=True)
- self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
-
- def testRenameMkdir(self):
- """Rename with mkdir"""
- utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
- mkdir=True)
- self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
- self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz")))
-
- utils.RenameFile(os.path.join(self.tmpdir, "test/xyz"),
- os.path.join(self.tmpdir, "test/foo/bar/baz"),
- mkdir=True)
- self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
- self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test/foo/bar")))
- self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/foo/bar/baz")))
-
-
-class TestMatchNameComponent(unittest.TestCase):
- """Test case for the MatchNameComponent function"""
-
- def testEmptyList(self):
- """Test that there is no match against an empty list"""
-
- self.failUnlessEqual(MatchNameComponent("", []), None)
- self.failUnlessEqual(MatchNameComponent("test", []), None)
-
- def testSingleMatch(self):
- """Test that a single match is performed correctly"""
- mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
- for key in "test2", "test2.example", "test2.example.com":
- self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
-
- def testMultipleMatches(self):
- """Test that a multiple match is returned as None"""
- mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
- for key in "test1", "test1.example":
- self.failUnlessEqual(MatchNameComponent(key, mlist), None)
-
- def testFullMatch(self):
- """Test that a full match is returned correctly"""
- key1 = "test1"
- key2 = "test1.example"
- mlist = [key2, key2 + ".com"]
- self.failUnlessEqual(MatchNameComponent(key1, mlist), None)
- self.failUnlessEqual(MatchNameComponent(key2, mlist), key2)
-
- def testCaseInsensitivePartialMatch(self):
- """Test for the case_insensitive keyword"""
- mlist = ["test1.example.com", "test2.example.net"]
- self.assertEqual(MatchNameComponent("test2", mlist, case_sensitive=False),
- "test2.example.net")
- self.assertEqual(MatchNameComponent("Test2", mlist, case_sensitive=False),
- "test2.example.net")
- self.assertEqual(MatchNameComponent("teSt2", mlist, case_sensitive=False),
- "test2.example.net")
- self.assertEqual(MatchNameComponent("TeSt2", mlist, case_sensitive=False),
- "test2.example.net")
-
-
- def testCaseInsensitiveFullMatch(self):
- mlist = ["ts1.ex", "ts1.ex.org", "ts2.ex", "Ts2.ex"]
- # Between the two ts1 a full string match non-case insensitive should work
- self.assertEqual(MatchNameComponent("Ts1", mlist, case_sensitive=False),
- None)
- self.assertEqual(MatchNameComponent("Ts1.ex", mlist, case_sensitive=False),
- "ts1.ex")
- self.assertEqual(MatchNameComponent("ts1.ex", mlist, case_sensitive=False),
- "ts1.ex")
- # Between the two ts2 only case differs, so only case-match works
- self.assertEqual(MatchNameComponent("ts2.ex", mlist, case_sensitive=False),
- "ts2.ex")
- self.assertEqual(MatchNameComponent("Ts2.ex", mlist, case_sensitive=False),
- "Ts2.ex")
- self.assertEqual(MatchNameComponent("TS2.ex", mlist, case_sensitive=False),
- None)
-
-
-class TestTimestampForFilename(unittest.TestCase):
- def test(self):
- self.assert_("." not in utils.TimestampForFilename())
- self.assert_(":" not in utils.TimestampForFilename())
-
-
-class TestCreateBackup(testutils.GanetiTestCase):
- def setUp(self):
- testutils.GanetiTestCase.setUp(self)
-
- self.tmpdir = tempfile.mkdtemp()
-
- def tearDown(self):
- testutils.GanetiTestCase.tearDown(self)
-
- shutil.rmtree(self.tmpdir)
-
- def testEmpty(self):
- filename = utils.PathJoin(self.tmpdir, "config.data")
- utils.WriteFile(filename, data="")
- bname = utils.CreateBackup(filename)
- self.assertFileContent(bname, "")
- self.assertEqual(len(glob.glob("%s*" % filename)), 2)
- utils.CreateBackup(filename)
- self.assertEqual(len(glob.glob("%s*" % filename)), 3)
- utils.CreateBackup(filename)
- self.assertEqual(len(glob.glob("%s*" % filename)), 4)
-
- fifoname = utils.PathJoin(self.tmpdir, "fifo")
- os.mkfifo(fifoname)
- self.assertRaises(errors.ProgrammerError, utils.CreateBackup, fifoname)
-
- def testContent(self):
- bkpcount = 0
- for data in ["", "X", "Hello World!\n" * 100, "Binary data\0\x01\x02\n"]:
- for rep in [1, 2, 10, 127]:
- testdata = data * rep
-
- filename = utils.PathJoin(self.tmpdir, "test.data_")
- utils.WriteFile(filename, data=testdata)
- self.assertFileContent(filename, testdata)
-
- for _ in range(3):
- bname = utils.CreateBackup(filename)
- bkpcount += 1
- self.assertFileContent(bname, testdata)
- self.assertEqual(len(glob.glob("%s*" % filename)), 1 + bkpcount)
-
-
-class TestFormatUnit(unittest.TestCase):
- """Test case for the FormatUnit function"""
-
- def testMiB(self):
- self.assertEqual(FormatUnit(1, 'h'), '1M')
- self.assertEqual(FormatUnit(100, 'h'), '100M')
- self.assertEqual(FormatUnit(1023, 'h'), '1023M')
-
- self.assertEqual(FormatUnit(1, 'm'), '1')
- self.assertEqual(FormatUnit(100, 'm'), '100')
- self.assertEqual(FormatUnit(1023, 'm'), '1023')
-
- self.assertEqual(FormatUnit(1024, 'm'), '1024')
- self.assertEqual(FormatUnit(1536, 'm'), '1536')
- self.assertEqual(FormatUnit(17133, 'm'), '17133')
- self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575')
-
- def testGiB(self):
- self.assertEqual(FormatUnit(1024, 'h'), '1.0G')
- self.assertEqual(FormatUnit(1536, 'h'), '1.5G')
- self.assertEqual(FormatUnit(17133, 'h'), '16.7G')
- self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G')
-
- self.assertEqual(FormatUnit(1024, 'g'), '1.0')
- self.assertEqual(FormatUnit(1536, 'g'), '1.5')
- self.assertEqual(FormatUnit(17133, 'g'), '16.7')
- self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0')
-
- self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0')
- self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0')
- self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0')
-
- def testTiB(self):
- self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T')
- self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T')
- self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T')
-
- self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0')
- self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0')
- self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1')
-
-class TestParseUnit(unittest.TestCase):
- """Test case for the ParseUnit function"""
-
- SCALES = (('', 1),
- ('M', 1), ('G', 1024), ('T', 1024 * 1024),
- ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
- ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
-
- def testRounding(self):
- self.assertEqual(ParseUnit('0'), 0)
- self.assertEqual(ParseUnit('1'), 4)
- self.assertEqual(ParseUnit('2'), 4)
- self.assertEqual(ParseUnit('3'), 4)
-
- self.assertEqual(ParseUnit('124'), 124)
- self.assertEqual(ParseUnit('125'), 128)
- self.assertEqual(ParseUnit('126'), 128)
- self.assertEqual(ParseUnit('127'), 128)
- self.assertEqual(ParseUnit('128'), 128)
- self.assertEqual(ParseUnit('129'), 132)
- self.assertEqual(ParseUnit('130'), 132)
-
- def testFloating(self):
- self.assertEqual(ParseUnit('0'), 0)
- self.assertEqual(ParseUnit('0.5'), 4)
- self.assertEqual(ParseUnit('1.75'), 4)
- self.assertEqual(ParseUnit('1.99'), 4)
- self.assertEqual(ParseUnit('2.00'), 4)
- self.assertEqual(ParseUnit('2.01'), 4)
- self.assertEqual(ParseUnit('3.99'), 4)
- self.assertEqual(ParseUnit('4.00'), 4)
- self.assertEqual(ParseUnit('4.01'), 8)
- self.assertEqual(ParseUnit('1.5G'), 1536)
- self.assertEqual(ParseUnit('1.8G'), 1844)
- self.assertEqual(ParseUnit('8.28T'), 8682212)
-
- def testSuffixes(self):
- for sep in ('', ' ', ' ', "\t", "\t "):
- for suffix, scale in TestParseUnit.SCALES:
- for func in (lambda x: x, str.lower, str.upper):
- self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
- 1024 * scale)