-import errno
-
-import ganeti
-import testutils
-from ganeti import constants
-from ganeti import compat
-from ganeti import utils
-from ganeti import errors
-from ganeti import serializer
-from ganeti.utils import IsProcessAlive, RunCmd, \
- RemoveFile, MatchNameComponent, FormatUnit, \
- ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
- ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
- SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress, \
- TailFile, ForceDictType, SafeEncode, IsNormAbsPath, FormatTime, \
- UnescapeAndSplit, RunParts, PathJoin, HostInfo, ReadOneLineFile
-
-from ganeti.errors import LockError, UnitParseError, GenericError, \
- ProgrammerError, OpPrereqError
-
-
-class TestIsProcessAlive(unittest.TestCase):
- """Testing case for IsProcessAlive"""
-
- def testExists(self):
- mypid = os.getpid()
- self.assert_(IsProcessAlive(mypid),
- "can't find myself running")
-
- def testNotExisting(self):
- pid_non_existing = os.fork()
- if pid_non_existing == 0:
- os._exit(0)
- elif pid_non_existing < 0:
- raise SystemError("can't fork")
- os.waitpid(pid_non_existing, 0)
- self.assert_(not IsProcessAlive(pid_non_existing),
- "nonexisting process detected")
-
-
-class TestPidFileFunctions(unittest.TestCase):
- """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
-
- def setUp(self):
- self.dir = tempfile.mkdtemp()
- self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
- utils.DaemonPidFileName = self.f_dpn
-
- def testPidFileFunctions(self):
- pid_file = self.f_dpn('test')
- utils.WritePidFile('test')
- self.failUnless(os.path.exists(pid_file),
- "PID file should have been created")
- read_pid = utils.ReadPidFile(pid_file)
- self.failUnlessEqual(read_pid, os.getpid())
- self.failUnless(utils.IsProcessAlive(read_pid))
- self.failUnlessRaises(GenericError, utils.WritePidFile, 'test')
- utils.RemovePidFile('test')
- self.failIf(os.path.exists(pid_file),
- "PID file should not exist anymore")
- self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
- "ReadPidFile should return 0 for missing pid file")
- fh = open(pid_file, "w")
- fh.write("blah\n")
- fh.close()
- self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
- "ReadPidFile should return 0 for invalid pid file")
- utils.RemovePidFile('test')
- self.failIf(os.path.exists(pid_file),
- "PID file should not exist anymore")
-
- def testKill(self):
- pid_file = self.f_dpn('child')
- r_fd, w_fd = os.pipe()
- new_pid = os.fork()
- if new_pid == 0: #child
- utils.WritePidFile('child')
- os.write(w_fd, 'a')
- signal.pause()
- os._exit(0)
- return
- # else we are in the parent
- # wait until the child has written the pid file
- os.read(r_fd, 1)
- read_pid = utils.ReadPidFile(pid_file)
- self.failUnlessEqual(read_pid, new_pid)
- self.failUnless(utils.IsProcessAlive(new_pid))
- utils.KillProcess(new_pid, waitpid=True)
- self.failIf(utils.IsProcessAlive(new_pid))
- utils.RemovePidFile('child')
- self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0)
-
- def tearDown(self):
- for name in os.listdir(self.dir):
- os.unlink(os.path.join(self.dir, name))
- os.rmdir(self.dir)
-
-
-class TestRunCmd(testutils.GanetiTestCase):
- """Testing case for the RunCmd function"""
-
- def setUp(self):
- testutils.GanetiTestCase.setUp(self)
- self.magic = time.ctime() + " ganeti test"
- self.fname = self._CreateTempFile()
-
- def testOk(self):
- """Test successful exit code"""
- result = RunCmd("/bin/sh -c 'exit 0'")
- self.assertEqual(result.exit_code, 0)
- self.assertEqual(result.output, "")
-
- def testFail(self):
- """Test fail exit code"""
- result = RunCmd("/bin/sh -c 'exit 1'")
- self.assertEqual(result.exit_code, 1)
- self.assertEqual(result.output, "")
-
- def testStdout(self):
- """Test standard output"""
- cmd = 'echo -n "%s"' % self.magic
- result = RunCmd("/bin/sh -c '%s'" % cmd)
- self.assertEqual(result.stdout, self.magic)
- result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
- self.assertEqual(result.output, "")
- self.assertFileContent(self.fname, self.magic)
-
- def testStderr(self):
- """Test standard error"""
- cmd = 'echo -n "%s"' % self.magic
- result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
- self.assertEqual(result.stderr, self.magic)
- result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
- self.assertEqual(result.output, "")
- self.assertFileContent(self.fname, self.magic)
-
- def testCombined(self):
- """Test combined output"""
- cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
- expected = "A" + self.magic + "B" + self.magic
- result = RunCmd("/bin/sh -c '%s'" % cmd)
- self.assertEqual(result.output, expected)
- result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
- self.assertEqual(result.output, "")
- self.assertFileContent(self.fname, expected)
-
- def testSignal(self):
- """Test signal"""
- result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
- self.assertEqual(result.signal, 15)
- self.assertEqual(result.output, "")
-
- def testListRun(self):
- """Test list runs"""
- result = RunCmd(["true"])
- self.assertEqual(result.signal, None)
- self.assertEqual(result.exit_code, 0)
- result = RunCmd(["/bin/sh", "-c", "exit 1"])
- self.assertEqual(result.signal, None)
- self.assertEqual(result.exit_code, 1)
- result = RunCmd(["echo", "-n", self.magic])
- self.assertEqual(result.signal, None)
- self.assertEqual(result.exit_code, 0)
- self.assertEqual(result.stdout, self.magic)
-
- def testFileEmptyOutput(self):
- """Test file output"""
- result = RunCmd(["true"], output=self.fname)
- self.assertEqual(result.signal, None)
- self.assertEqual(result.exit_code, 0)
- self.assertFileContent(self.fname, "")
-
- def testLang(self):
- """Test locale environment"""
- old_env = os.environ.copy()
- try:
- os.environ["LANG"] = "en_US.UTF-8"
- os.environ["LC_ALL"] = "en_US.UTF-8"
- result = RunCmd(["locale"])
- for line in result.output.splitlines():
- key, value = line.split("=", 1)
- # Ignore these variables, they're overridden by LC_ALL
- if key == "LANG" or key == "LANGUAGE":
- continue
- self.failIf(value and value != "C" and value != '"C"',
- "Variable %s is set to the invalid value '%s'" % (key, value))
- finally:
- os.environ = old_env
-
- def testDefaultCwd(self):
- """Test default working directory"""
- self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/")
-
- def testCwd(self):
- """Test default working directory"""
- self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
- self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
- cwd = os.getcwd()
- self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
-
- def testResetEnv(self):
- """Test environment reset functionality"""
- self.failUnlessEqual(RunCmd(["env"], reset_env=True).stdout.strip(), "")
- self.failUnlessEqual(RunCmd(["env"], reset_env=True,
- env={"FOO": "bar",}).stdout.strip(), "FOO=bar")
-
-
-class TestRunParts(unittest.TestCase):
- """Testing case for the RunParts function"""
-
- def setUp(self):
- self.rundir = tempfile.mkdtemp(prefix="ganeti-test", suffix=".tmp")
-
- def tearDown(self):
- shutil.rmtree(self.rundir)
-
- def testEmpty(self):
- """Test on an empty dir"""
- self.failUnlessEqual(RunParts(self.rundir, reset_env=True), [])
-
- def testSkipWrongName(self):
- """Test that wrong files are skipped"""
- fname = os.path.join(self.rundir, "00test.dot")
- utils.WriteFile(fname, data="")
- os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
- relname = os.path.basename(fname)
- self.failUnlessEqual(RunParts(self.rundir, reset_env=True),
- [(relname, constants.RUNPARTS_SKIP, None)])
-
- def testSkipNonExec(self):
- """Test that non executable files are skipped"""
- fname = os.path.join(self.rundir, "00test")
- utils.WriteFile(fname, data="")
- relname = os.path.basename(fname)
- self.failUnlessEqual(RunParts(self.rundir, reset_env=True),
- [(relname, constants.RUNPARTS_SKIP, None)])
-
- def testError(self):
- """Test error on a broken executable"""
- fname = os.path.join(self.rundir, "00test")
- utils.WriteFile(fname, data="")
- os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
- (relname, status, error) = RunParts(self.rundir, reset_env=True)[0]
- self.failUnlessEqual(relname, os.path.basename(fname))
- self.failUnlessEqual(status, constants.RUNPARTS_ERR)
- self.failUnless(error)
-
- def testSorted(self):
- """Test executions are sorted"""
- files = []
- files.append(os.path.join(self.rundir, "64test"))
- files.append(os.path.join(self.rundir, "00test"))
- files.append(os.path.join(self.rundir, "42test"))
-
- for fname in files:
- utils.WriteFile(fname, data="")
-
- results = RunParts(self.rundir, reset_env=True)
-
- for fname in sorted(files):
- self.failUnlessEqual(os.path.basename(fname), results.pop(0)[0])
-
- def testOk(self):
- """Test correct execution"""
- fname = os.path.join(self.rundir, "00test")
- utils.WriteFile(fname, data="#!/bin/sh\n\necho -n ciao")
- os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
- (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
- self.failUnlessEqual(relname, os.path.basename(fname))
- self.failUnlessEqual(status, constants.RUNPARTS_RUN)
- self.failUnlessEqual(runresult.stdout, "ciao")
-
- def testRunFail(self):
- """Test correct execution, with run failure"""
- fname = os.path.join(self.rundir, "00test")
- utils.WriteFile(fname, data="#!/bin/sh\n\nexit 1")
- os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
- (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
- self.failUnlessEqual(relname, os.path.basename(fname))
- self.failUnlessEqual(status, constants.RUNPARTS_RUN)
- self.failUnlessEqual(runresult.exit_code, 1)
- self.failUnless(runresult.failed)
-
- def testRunMix(self):
- files = []
- files.append(os.path.join(self.rundir, "00test"))
- files.append(os.path.join(self.rundir, "42test"))
- files.append(os.path.join(self.rundir, "64test"))
- files.append(os.path.join(self.rundir, "99test"))
-
- files.sort()
-
- # 1st has errors in execution
- utils.WriteFile(files[0], data="#!/bin/sh\n\nexit 1")
- os.chmod(files[0], stat.S_IREAD | stat.S_IEXEC)
-
- # 2nd is skipped
- utils.WriteFile(files[1], data="")
-
- # 3rd cannot execute properly
- utils.WriteFile(files[2], data="")
- os.chmod(files[2], stat.S_IREAD | stat.S_IEXEC)
-
- # 4th execs
- utils.WriteFile(files[3], data="#!/bin/sh\n\necho -n ciao")
- os.chmod(files[3], stat.S_IREAD | stat.S_IEXEC)
-
- results = RunParts(self.rundir, reset_env=True)
-
- (relname, status, runresult) = results[0]
- self.failUnlessEqual(relname, os.path.basename(files[0]))
- self.failUnlessEqual(status, constants.RUNPARTS_RUN)
- self.failUnlessEqual(runresult.exit_code, 1)
- self.failUnless(runresult.failed)
-
- (relname, status, runresult) = results[1]
- self.failUnlessEqual(relname, os.path.basename(files[1]))
- self.failUnlessEqual(status, constants.RUNPARTS_SKIP)
- self.failUnlessEqual(runresult, None)
-
- (relname, status, runresult) = results[2]
- self.failUnlessEqual(relname, os.path.basename(files[2]))
- self.failUnlessEqual(status, constants.RUNPARTS_ERR)
- self.failUnless(runresult)
-
- (relname, status, runresult) = results[3]
- self.failUnlessEqual(relname, os.path.basename(files[3]))
- self.failUnlessEqual(status, constants.RUNPARTS_RUN)
- self.failUnlessEqual(runresult.output, "ciao")
- self.failUnlessEqual(runresult.exit_code, 0)
- self.failUnless(not runresult.failed)
-
-
-class TestStartDaemon(testutils.GanetiTestCase):
- def setUp(self):
- self.tmpdir = tempfile.mkdtemp(prefix="ganeti-test")
- self.tmpfile = os.path.join(self.tmpdir, "test")
-
- def tearDown(self):
- shutil.rmtree(self.tmpdir)
-
- def testShell(self):
- utils.StartDaemon("echo Hello World > %s" % self.tmpfile)
- self._wait(self.tmpfile, 60.0, "Hello World")
-
- def testShellOutput(self):
- utils.StartDaemon("echo Hello World", output=self.tmpfile)
- self._wait(self.tmpfile, 60.0, "Hello World")
-
- def testNoShellNoOutput(self):
- utils.StartDaemon(["pwd"])
-
- def testNoShellNoOutputTouch(self):
- testfile = os.path.join(self.tmpdir, "check")
- self.failIf(os.path.exists(testfile))
- utils.StartDaemon(["touch", testfile])
- self._wait(testfile, 60.0, "")
-
- def testNoShellOutput(self):
- utils.StartDaemon(["pwd"], output=self.tmpfile)
- self._wait(self.tmpfile, 60.0, "/")
-
- def testNoShellOutputCwd(self):
- utils.StartDaemon(["pwd"], output=self.tmpfile, cwd=os.getcwd())
- self._wait(self.tmpfile, 60.0, os.getcwd())
-
- def testShellEnv(self):
- utils.StartDaemon("echo \"$GNT_TEST_VAR\"", output=self.tmpfile,
- env={ "GNT_TEST_VAR": "Hello World", })
- self._wait(self.tmpfile, 60.0, "Hello World")
-
- def testNoShellEnv(self):
- utils.StartDaemon(["printenv", "GNT_TEST_VAR"], output=self.tmpfile,
- env={ "GNT_TEST_VAR": "Hello World", })
- self._wait(self.tmpfile, 60.0, "Hello World")
-
- def testOutputFd(self):
- fd = os.open(self.tmpfile, os.O_WRONLY | os.O_CREAT)
- try:
- utils.StartDaemon(["pwd"], output_fd=fd, cwd=os.getcwd())
- finally:
- os.close(fd)
- self._wait(self.tmpfile, 60.0, os.getcwd())
-
- def testPid(self):
- pid = utils.StartDaemon("echo $$ > %s" % self.tmpfile)
- self._wait(self.tmpfile, 60.0, str(pid))
-
- def testPidFile(self):
- pidfile = os.path.join(self.tmpdir, "pid")
- checkfile = os.path.join(self.tmpdir, "abort")
-
- pid = utils.StartDaemon("while sleep 5; do :; done", pidfile=pidfile,
- output=self.tmpfile)
- try:
- fd = os.open(pidfile, os.O_RDONLY)
- try:
- # Check file is locked
- self.assertRaises(errors.LockError, utils.LockFile, fd)
-
- pidtext = os.read(fd, 100)
- finally:
- os.close(fd)
-
- self.assertEqual(int(pidtext.strip()), pid)
-
- self.assert_(utils.IsProcessAlive(pid))
- finally:
- # No matter what happens, kill daemon
- utils.KillProcess(pid, timeout=5.0, waitpid=False)
- self.failIf(utils.IsProcessAlive(pid))
-
- self.assertEqual(utils.ReadFile(self.tmpfile), "")
-
- def _wait(self, path, timeout, expected):
- # Due to the asynchronous nature of daemon processes, polling is necessary.
- # A timeout makes sure the test doesn't hang forever.
- def _CheckFile():
- if not (os.path.isfile(path) and
- utils.ReadFile(path).strip() == expected):
- raise utils.RetryAgain()
-
- try:
- utils.Retry(_CheckFile, (0.01, 1.5, 1.0), timeout)
- except utils.RetryTimeout:
- self.fail("Apparently the daemon didn't run in %s seconds and/or"
- " didn't write the correct output" % timeout)
-
- def testError(self):
- self.assertRaises(errors.OpExecError, utils.StartDaemon,
- ["./does-NOT-EXIST/here/0123456789"])
- self.assertRaises(errors.OpExecError, utils.StartDaemon,
- ["./does-NOT-EXIST/here/0123456789"],
- output=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
- self.assertRaises(errors.OpExecError, utils.StartDaemon,
- ["./does-NOT-EXIST/here/0123456789"],
- cwd=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
- self.assertRaises(errors.OpExecError, utils.StartDaemon,
- ["./does-NOT-EXIST/here/0123456789"],
- output=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
-
- fd = os.open(self.tmpfile, os.O_WRONLY | os.O_CREAT)
- try:
- self.assertRaises(errors.ProgrammerError, utils.StartDaemon,
- ["./does-NOT-EXIST/here/0123456789"],
- output=self.tmpfile, output_fd=fd)
- finally:
- os.close(fd)
-
-
-class TestSetCloseOnExecFlag(unittest.TestCase):
- """Tests for SetCloseOnExecFlag"""
-
- def setUp(self):
- self.tmpfile = tempfile.TemporaryFile()
-
- def testEnable(self):
- utils.SetCloseOnExecFlag(self.tmpfile.fileno(), True)
- self.failUnless(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFD) &
- fcntl.FD_CLOEXEC)
-
- def testDisable(self):
- utils.SetCloseOnExecFlag(self.tmpfile.fileno(), False)
- self.failIf(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFD) &
- fcntl.FD_CLOEXEC)
-
-
-class TestSetNonblockFlag(unittest.TestCase):
- def setUp(self):
- self.tmpfile = tempfile.TemporaryFile()
-
- def testEnable(self):
- utils.SetNonblockFlag(self.tmpfile.fileno(), True)
- self.failUnless(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFL) &
- os.O_NONBLOCK)
-
- def testDisable(self):
- utils.SetNonblockFlag(self.tmpfile.fileno(), False)
- self.failIf(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFL) &
- os.O_NONBLOCK)
-
-
-class TestRemoveFile(unittest.TestCase):
- """Test case for the RemoveFile function"""
-
- def setUp(self):
- """Create a temp dir and file for each case"""
- self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
- fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
- os.close(fd)
-
- def tearDown(self):
- if os.path.exists(self.tmpfile):
- os.unlink(self.tmpfile)
- os.rmdir(self.tmpdir)
-
- def testIgnoreDirs(self):
- """Test that RemoveFile() ignores directories"""
- self.assertEqual(None, RemoveFile(self.tmpdir))
-
- def testIgnoreNotExisting(self):
- """Test that RemoveFile() ignores non-existing files"""
- RemoveFile(self.tmpfile)
- RemoveFile(self.tmpfile)
-
- def testRemoveFile(self):
- """Test that RemoveFile does remove a file"""
- RemoveFile(self.tmpfile)
- if os.path.exists(self.tmpfile):
- self.fail("File '%s' not removed" % self.tmpfile)
-
- def testRemoveSymlink(self):
- """Test that RemoveFile does remove symlinks"""
- symlink = self.tmpdir + "/symlink"
- os.symlink("no-such-file", symlink)
- RemoveFile(symlink)
- if os.path.exists(symlink):
- self.fail("File '%s' not removed" % symlink)
- os.symlink(self.tmpfile, symlink)
- RemoveFile(symlink)
- if os.path.exists(symlink):
- self.fail("File '%s' not removed" % symlink)
-
-
-class TestRename(unittest.TestCase):
- """Test case for RenameFile"""
-
- def setUp(self):
- """Create a temporary directory"""
- self.tmpdir = tempfile.mkdtemp()
- self.tmpfile = os.path.join(self.tmpdir, "test1")
-
- # Touch the file
- open(self.tmpfile, "w").close()
-
- def tearDown(self):
- """Remove temporary directory"""
- shutil.rmtree(self.tmpdir)
-
- def testSimpleRename1(self):
- """Simple rename 1"""
- utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"))
- self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
-
- def testSimpleRename2(self):
- """Simple rename 2"""
- utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "xyz"),
- mkdir=True)
- self.assert_(os.path.isfile(os.path.join(self.tmpdir, "xyz")))
-
- def testRenameMkdir(self):
- """Rename with mkdir"""
- utils.RenameFile(self.tmpfile, os.path.join(self.tmpdir, "test/xyz"),
- mkdir=True)
- self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
- self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz")))
-
- utils.RenameFile(os.path.join(self.tmpdir, "test/xyz"),
- os.path.join(self.tmpdir, "test/foo/bar/baz"),
- mkdir=True)
- self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
- self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test/foo/bar")))
- self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/foo/bar/baz")))
-
-
-class TestMatchNameComponent(unittest.TestCase):
- """Test case for the MatchNameComponent function"""
-
- def testEmptyList(self):
- """Test that there is no match against an empty list"""
-
- self.failUnlessEqual(MatchNameComponent("", []), None)
- self.failUnlessEqual(MatchNameComponent("test", []), None)
-
- def testSingleMatch(self):
- """Test that a single match is performed correctly"""
- mlist = ["test1.example.com", "test2.example.com", "test3.example.com"]
- for key in "test2", "test2.example", "test2.example.com":
- self.failUnlessEqual(MatchNameComponent(key, mlist), mlist[1])
-
- def testMultipleMatches(self):
- """Test that a multiple match is returned as None"""
- mlist = ["test1.example.com", "test1.example.org", "test1.example.net"]
- for key in "test1", "test1.example":
- self.failUnlessEqual(MatchNameComponent(key, mlist), None)
-
- def testFullMatch(self):
- """Test that a full match is returned correctly"""
- key1 = "test1"
- key2 = "test1.example"
- mlist = [key2, key2 + ".com"]
- self.failUnlessEqual(MatchNameComponent(key1, mlist), None)
- self.failUnlessEqual(MatchNameComponent(key2, mlist), key2)
-
- def testCaseInsensitivePartialMatch(self):
- """Test for the case_insensitive keyword"""
- mlist = ["test1.example.com", "test2.example.net"]
- self.assertEqual(MatchNameComponent("test2", mlist, case_sensitive=False),
- "test2.example.net")
- self.assertEqual(MatchNameComponent("Test2", mlist, case_sensitive=False),
- "test2.example.net")
- self.assertEqual(MatchNameComponent("teSt2", mlist, case_sensitive=False),
- "test2.example.net")
- self.assertEqual(MatchNameComponent("TeSt2", mlist, case_sensitive=False),
- "test2.example.net")
-
-
- def testCaseInsensitiveFullMatch(self):
- mlist = ["ts1.ex", "ts1.ex.org", "ts2.ex", "Ts2.ex"]
- # Between the two ts1 a full string match non-case insensitive should work
- self.assertEqual(MatchNameComponent("Ts1", mlist, case_sensitive=False),
- None)
- self.assertEqual(MatchNameComponent("Ts1.ex", mlist, case_sensitive=False),
- "ts1.ex")
- self.assertEqual(MatchNameComponent("ts1.ex", mlist, case_sensitive=False),
- "ts1.ex")
- # Between the two ts2 only case differs, so only case-match works
- self.assertEqual(MatchNameComponent("ts2.ex", mlist, case_sensitive=False),
- "ts2.ex")
- self.assertEqual(MatchNameComponent("Ts2.ex", mlist, case_sensitive=False),
- "Ts2.ex")
- self.assertEqual(MatchNameComponent("TS2.ex", mlist, case_sensitive=False),
- None)
-
-
-class TestReadFile(testutils.GanetiTestCase):
-
- def testReadAll(self):
- data = utils.ReadFile(self._TestDataFilename("cert1.pem"))
- self.assertEqual(len(data), 814)
-
- h = compat.md5_hash()
- h.update(data)
- self.assertEqual(h.hexdigest(), "a491efb3efe56a0535f924d5f8680fd4")
-
- def testReadSize(self):
- data = utils.ReadFile(self._TestDataFilename("cert1.pem"),
- size=100)
- self.assertEqual(len(data), 100)
-
- h = compat.md5_hash()
- h.update(data)
- self.assertEqual(h.hexdigest(), "893772354e4e690b9efd073eed433ce7")
-
- def testError(self):
- self.assertRaises(EnvironmentError, utils.ReadFile,
- "/dev/null/does-not-exist")
-
-
-class TestReadOneLineFile(testutils.GanetiTestCase):
-
- def setUp(self):
- testutils.GanetiTestCase.setUp(self)
-
- def testDefault(self):
- data = ReadOneLineFile(self._TestDataFilename("cert1.pem"))
- self.assertEqual(len(data), 27)
- self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
-
- def testNotStrict(self):
- data = ReadOneLineFile(self._TestDataFilename("cert1.pem"), strict=False)
- self.assertEqual(len(data), 27)
- self.assertEqual(data, "-----BEGIN CERTIFICATE-----")
-
- def testStrictFailure(self):
- self.assertRaises(errors.GenericError, ReadOneLineFile,
- self._TestDataFilename("cert1.pem"), strict=True)
-
- def testLongLine(self):
- dummydata = (1024 * "Hello World! ")
- myfile = self._CreateTempFile()
- utils.WriteFile(myfile, data=dummydata)
- datastrict = ReadOneLineFile(myfile, strict=True)
- datalax = ReadOneLineFile(myfile, strict=False)
- self.assertEqual(dummydata, datastrict)
- self.assertEqual(dummydata, datalax)
-
- def testNewline(self):
- myfile = self._CreateTempFile()
- myline = "myline"
- for nl in ["", "\n", "\r\n"]:
- dummydata = "%s%s" % (myline, nl)
- utils.WriteFile(myfile, data=dummydata)
- datalax = ReadOneLineFile(myfile, strict=False)
- self.assertEqual(myline, datalax)
- datastrict = ReadOneLineFile(myfile, strict=True)
- self.assertEqual(myline, datastrict)
-
- def testWhitespaceAndMultipleLines(self):
- myfile = self._CreateTempFile()
- for nl in ["", "\n", "\r\n"]:
- for ws in [" ", "\t", "\t\t \t", "\t "]:
- dummydata = (1024 * ("Foo bar baz %s%s" % (ws, nl)))
- utils.WriteFile(myfile, data=dummydata)
- datalax = ReadOneLineFile(myfile, strict=False)
- if nl:
- self.assert_(set("\r\n") & set(dummydata))
- self.assertRaises(errors.GenericError, ReadOneLineFile,
- myfile, strict=True)
- explen = len("Foo bar baz ") + len(ws)
- self.assertEqual(len(datalax), explen)
- self.assertEqual(datalax, dummydata[:explen])
- self.assertFalse(set("\r\n") & set(datalax))
- else:
- datastrict = ReadOneLineFile(myfile, strict=True)
- self.assertEqual(dummydata, datastrict)
- self.assertEqual(dummydata, datalax)
-
- def testEmptylines(self):
- myfile = self._CreateTempFile()
- myline = "myline"
- for nl in ["\n", "\r\n"]:
- for ol in ["", "otherline"]:
- dummydata = "%s%s%s%s%s%s" % (nl, nl, myline, nl, ol, nl)
- utils.WriteFile(myfile, data=dummydata)
- self.assert_(set("\r\n") & set(dummydata))
- datalax = ReadOneLineFile(myfile, strict=False)
- self.assertEqual(myline, datalax)
- if ol:
- self.assertRaises(errors.GenericError, ReadOneLineFile,
- myfile, strict=True)
- else:
- datastrict = ReadOneLineFile(myfile, strict=True)
- self.assertEqual(myline, datastrict)
-
-
-class TestTimestampForFilename(unittest.TestCase):
- def test(self):
- self.assert_("." not in utils.TimestampForFilename())
- self.assert_(":" not in utils.TimestampForFilename())
-
-
-class TestCreateBackup(testutils.GanetiTestCase):
- def setUp(self):
- testutils.GanetiTestCase.setUp(self)
-
- self.tmpdir = tempfile.mkdtemp()
-
- def tearDown(self):
- testutils.GanetiTestCase.tearDown(self)
-
- shutil.rmtree(self.tmpdir)
-
- def testEmpty(self):
- filename = utils.PathJoin(self.tmpdir, "config.data")
- utils.WriteFile(filename, data="")
- bname = utils.CreateBackup(filename)
- self.assertFileContent(bname, "")
- self.assertEqual(len(glob.glob("%s*" % filename)), 2)
- utils.CreateBackup(filename)
- self.assertEqual(len(glob.glob("%s*" % filename)), 3)
- utils.CreateBackup(filename)
- self.assertEqual(len(glob.glob("%s*" % filename)), 4)
-
- fifoname = utils.PathJoin(self.tmpdir, "fifo")
- os.mkfifo(fifoname)
- self.assertRaises(errors.ProgrammerError, utils.CreateBackup, fifoname)
-
- def testContent(self):
- bkpcount = 0
- for data in ["", "X", "Hello World!\n" * 100, "Binary data\0\x01\x02\n"]:
- for rep in [1, 2, 10, 127]:
- testdata = data * rep
-
- filename = utils.PathJoin(self.tmpdir, "test.data_")
- utils.WriteFile(filename, data=testdata)
- self.assertFileContent(filename, testdata)
-
- for _ in range(3):
- bname = utils.CreateBackup(filename)
- bkpcount += 1
- self.assertFileContent(bname, testdata)
- self.assertEqual(len(glob.glob("%s*" % filename)), 1 + bkpcount)
-
-
-class TestFormatUnit(unittest.TestCase):
- """Test case for the FormatUnit function"""
-
- def testMiB(self):
- self.assertEqual(FormatUnit(1, 'h'), '1M')
- self.assertEqual(FormatUnit(100, 'h'), '100M')
- self.assertEqual(FormatUnit(1023, 'h'), '1023M')
-
- self.assertEqual(FormatUnit(1, 'm'), '1')
- self.assertEqual(FormatUnit(100, 'm'), '100')
- self.assertEqual(FormatUnit(1023, 'm'), '1023')
-
- self.assertEqual(FormatUnit(1024, 'm'), '1024')
- self.assertEqual(FormatUnit(1536, 'm'), '1536')
- self.assertEqual(FormatUnit(17133, 'm'), '17133')
- self.assertEqual(FormatUnit(1024 * 1024 - 1, 'm'), '1048575')
-
- def testGiB(self):
- self.assertEqual(FormatUnit(1024, 'h'), '1.0G')
- self.assertEqual(FormatUnit(1536, 'h'), '1.5G')
- self.assertEqual(FormatUnit(17133, 'h'), '16.7G')
- self.assertEqual(FormatUnit(1024 * 1024 - 1, 'h'), '1024.0G')
-
- self.assertEqual(FormatUnit(1024, 'g'), '1.0')
- self.assertEqual(FormatUnit(1536, 'g'), '1.5')
- self.assertEqual(FormatUnit(17133, 'g'), '16.7')
- self.assertEqual(FormatUnit(1024 * 1024 - 1, 'g'), '1024.0')
-
- self.assertEqual(FormatUnit(1024 * 1024, 'g'), '1024.0')
- self.assertEqual(FormatUnit(5120 * 1024, 'g'), '5120.0')
- self.assertEqual(FormatUnit(29829 * 1024, 'g'), '29829.0')
-
- def testTiB(self):
- self.assertEqual(FormatUnit(1024 * 1024, 'h'), '1.0T')
- self.assertEqual(FormatUnit(5120 * 1024, 'h'), '5.0T')
- self.assertEqual(FormatUnit(29829 * 1024, 'h'), '29.1T')
-
- self.assertEqual(FormatUnit(1024 * 1024, 't'), '1.0')
- self.assertEqual(FormatUnit(5120 * 1024, 't'), '5.0')
- self.assertEqual(FormatUnit(29829 * 1024, 't'), '29.1')
-
-class TestParseUnit(unittest.TestCase):
- """Test case for the ParseUnit function"""
-
- SCALES = (('', 1),
- ('M', 1), ('G', 1024), ('T', 1024 * 1024),
- ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
- ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))
-
- def testRounding(self):
- self.assertEqual(ParseUnit('0'), 0)
- self.assertEqual(ParseUnit('1'), 4)
- self.assertEqual(ParseUnit('2'), 4)
- self.assertEqual(ParseUnit('3'), 4)
-
- self.assertEqual(ParseUnit('124'), 124)
- self.assertEqual(ParseUnit('125'), 128)
- self.assertEqual(ParseUnit('126'), 128)
- self.assertEqual(ParseUnit('127'), 128)
- self.assertEqual(ParseUnit('128'), 128)
- self.assertEqual(ParseUnit('129'), 132)
- self.assertEqual(ParseUnit('130'), 132)
-
- def testFloating(self):
- self.assertEqual(ParseUnit('0'), 0)
- self.assertEqual(ParseUnit('0.5'), 4)
- self.assertEqual(ParseUnit('1.75'), 4)
- self.assertEqual(ParseUnit('1.99'), 4)
- self.assertEqual(ParseUnit('2.00'), 4)
- self.assertEqual(ParseUnit('2.01'), 4)
- self.assertEqual(ParseUnit('3.99'), 4)
- self.assertEqual(ParseUnit('4.00'), 4)
- self.assertEqual(ParseUnit('4.01'), 8)
- self.assertEqual(ParseUnit('1.5G'), 1536)
- self.assertEqual(ParseUnit('1.8G'), 1844)
- self.assertEqual(ParseUnit('8.28T'), 8682212)
-
- def testSuffixes(self):
- for sep in ('', ' ', ' ', "\t", "\t "):
- for suffix, scale in TestParseUnit.SCALES:
- for func in (lambda x: x, str.lower, str.upper):
- self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
- 1024 * scale)
-
- def testInvalidInput(self):
- for sep in ('-', '_', ',', 'a'):
- for suffix, _ in TestParseUnit.SCALES:
- self.assertRaises(UnitParseError, ParseUnit, '1' + sep + suffix)
-
- for suffix, _ in TestParseUnit.SCALES:
- self.assertRaises(UnitParseError, ParseUnit, '1,3' + suffix)
-
-
-class TestSshKeys(testutils.GanetiTestCase):
- """Test case for the AddAuthorizedKey function"""
-
- KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
- KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
- 'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
-
- def setUp(self):
- testutils.GanetiTestCase.setUp(self)
- self.tmpname = self._CreateTempFile()
- handle = open(self.tmpname, 'w')
- try:
- handle.write("%s\n" % TestSshKeys.KEY_A)
- handle.write("%s\n" % TestSshKeys.KEY_B)
- finally:
- handle.close()
-
- def testAddingNewKey(self):
- AddAuthorizedKey(self.tmpname, 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
-
- self.assertFileContent(self.tmpname,
- "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
- 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
- " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
- "ssh-dss AAAAB3NzaC1kc3MAAACB root@test\n")
-
- def testAddingAlmostButNotCompletelyTheSameKey(self):
- AddAuthorizedKey(self.tmpname,
- 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
-
- self.assertFileContent(self.tmpname,
- "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
- 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
- " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
- "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
-
- def testAddingExistingKeyWithSomeMoreSpaces(self):
- AddAuthorizedKey(self.tmpname,
- 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a')
-
- self.assertFileContent(self.tmpname,
- "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
- 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
- " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
-
- def testRemovingExistingKeyWithSomeMoreSpaces(self):
- RemoveAuthorizedKey(self.tmpname,
- 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a')
-
- self.assertFileContent(self.tmpname,
- 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
- " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
-
- def testRemovingNonExistingKey(self):
- RemoveAuthorizedKey(self.tmpname,
- 'ssh-dss AAAAB3Nsdfj230xxjxJjsjwjsjdjU root@test')
-
- self.assertFileContent(self.tmpname,
- "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
- 'command="/usr/bin/fooserver -t --verbose",from="1.2.3.4"'
- " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
-
-
class TestEtcHosts(testutils.GanetiTestCase):
  """Tests for SetEtcHostsEntry and RemoveEtcHostsEntry.

  Each test starts from the same three-line hosts file written by setUp
  and then checks the resulting file content as well as the file mode.

  """
  # The three lines making up the fixture file
  _HEADER = "# This is a test file for /etc/hosts\n"
  _LOOPBACK = "127.0.0.1\tlocalhost\n"
  _ROUTER = "192.168.1.1 router gw\n"

  # Octal 644; built with int() so that no octal literal syntax is needed
  _MODE = int("644", 8)

  def setUp(self):
    """Create a temporary file holding the fixture lines."""
    testutils.GanetiTestCase.setUp(self)
    self.tmpname = self._CreateTempFile()
    fixture = open(self.tmpname, 'w')
    try:
      fixture.write(self._HEADER + self._LOOPBACK + self._ROUTER)
    finally:
      fixture.close()

  def _CheckHostsFile(self, *lines):
    """Assert that the file consists of C{lines} and has the fixture mode.

    @param lines: expected file content, one string per line (each one
        including its trailing newline)

    """
    self.assertFileContent(self.tmpname, "".join(lines))
    self.assertFileMode(self.tmpname, self._MODE)

  def testSettingNewIp(self):
    """An unknown IP is expected to show up as a new last line."""
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])

    self._CheckHostsFile(self._HEADER, self._LOOPBACK, self._ROUTER,
                         "1.2.3.4\tmyhost.domain.tld myhost\n")

  def testSettingExistingIp(self):
    """For a known IP the existing line is expected to be replaced."""
    SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
                     ['myhost'])

    self._CheckHostsFile(self._HEADER, self._LOOPBACK,
                         "192.168.1.1\tmyhost.domain.tld myhost\n")

  def testSettingDuplicateName(self):
    """An alias equal to the hostname is expected to be written once."""
    SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])

    self._CheckHostsFile(self._HEADER, self._LOOPBACK, self._ROUTER,
                         "1.2.3.4\tmyhost\n")

  def testRemovingExistingHost(self):
    """Removing the first name of a line is expected to keep its alias."""
    RemoveEtcHostsEntry(self.tmpname, 'router')

    self._CheckHostsFile(self._HEADER, self._LOOPBACK,
                         "192.168.1.1 gw\n")

  def testRemovingSingleExistingHost(self):
    """Removing the only name of a line is expected to drop the line."""
    RemoveEtcHostsEntry(self.tmpname, 'localhost')

    self._CheckHostsFile(self._HEADER, self._ROUTER)

  def testRemovingNonExistingHost(self):
    """Removing an unknown name is expected to leave the file as it was."""
    RemoveEtcHostsEntry(self.tmpname, 'myhost')

    self._CheckHostsFile(self._HEADER, self._LOOPBACK, self._ROUTER)

  def testRemovingAlias(self):
    """Removing an alias is expected to keep the rest of its line."""
    RemoveEtcHostsEntry(self.tmpname, 'gw')

    self._CheckHostsFile(self._HEADER, self._LOOPBACK,
                         "192.168.1.1 router\n")
-
-
class TestShellQuoting(unittest.TestCase):
  """Checks ShellQuote and ShellQuoteArgs against fixed expectations"""

  def testShellQuote(self):
    """Quote single strings; each pair is (input, expected result)."""
    samples = [
      ('abc', "abc"),
      ('ab"c', "'ab\"c'"),
      ("a'bc", "'a'\\''bc'"),
      ("a b c", "'a b c'"),
      ("a b\\ c", "'a b\\ c'"),
      ]
    for (plain, quoted) in samples:
      self.assertEqual(ShellQuote(plain), quoted)

  def testShellQuoteArgs(self):
    """Quote argument lists; each pair is (arguments, expected result)."""
    samples = [
      (['a', 'b', 'c'], "a b c"),
      (['a', 'b"', 'c'], "a 'b\"' c"),
      (['a', 'b\'', 'c'], "a 'b'\\\''' c"),
      ]
    for (arguments, command_line) in samples:
      self.assertEqual(ShellQuoteArgs(arguments), command_line)
-
-
class TestTcpPing(unittest.TestCase):
  """Testcase for TCP version of ping - against listen(2)ing port"""

  def setUp(self):
    """Open a listening TCP socket on the loopback address.

    Binding to port 0 leaves the port choice to the operating system; the
    port actually assigned is read back and stored in C{listenerport}.

    """
    self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.listener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
    self.listenerport = self.listener.getsockname()[1]
    self.listener.listen(1)

  def tearDown(self):
    """Shut down and close the listener.

    The socket is closed explicitly, and also when shutdown fails, instead
    of leaving the file descriptor to the garbage collector.

    """
    try:
      self.listener.shutdown(socket.SHUT_RDWR)
    finally:
      self.listener.close()
      del self.listener
      del self.listenerport

  def testTcpPingToLocalHostAccept(self):
    """Ping the listener with and without an explicit source address."""
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
                         self.listenerport,
                         timeout=10,
                         live_port_needed=True,
                         source=constants.LOCALHOST_IP_ADDRESS,
                         ),
                 "failed to connect to test listener")

    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
                         self.listenerport,
                         timeout=10,
                         live_port_needed=True,
                         ),
                 "failed to connect to test listener (no source)")
-
-
class TestTcpPingDeaf(unittest.TestCase):
  """Testcase for TCP version of ping - against non listen(2)ing port"""

  def setUp(self):
    """Bind a TCP socket on the loopback address without listening on it.

    Binding to port 0 leaves the port choice to the operating system; the
    port actually assigned is read back and stored in C{deaflistenerport}.

    """
    self.deaflistener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.deaflistener.bind((constants.LOCALHOST_IP_ADDRESS, 0))
    self.deaflistenerport = self.deaflistener.getsockname()[1]

  def tearDown(self):
    """Close the bound socket.

    The socket is closed explicitly instead of leaving the file descriptor
    to the garbage collector.

    """
    self.deaflistener.close()
    del self.deaflistener
    del self.deaflistenerport

  def testTcpPingToLocalHostAcceptDeaf(self):
    """With live_port_needed=True the ping must be reported as failed."""
    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
                        self.deaflistenerport,
                        timeout=constants.TCP_PING_TIMEOUT,
                        live_port_needed=True,
                        source=constants.LOCALHOST_IP_ADDRESS,
                        ), # need successful connect(2)
                "successfully connected to deaf listener")

    self.failIf(TcpPing(constants.LOCALHOST_IP_ADDRESS,
                        self.deaflistenerport,
                        timeout=constants.TCP_PING_TIMEOUT,
                        live_port_needed=True,
                        ), # need successful connect(2)
                "successfully connected to deaf listener (no source addr)")

  def testTcpPingToLocalHostNoAccept(self):
    """With live_port_needed=False the ping must be reported as successful."""
    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
                         self.deaflistenerport,
                         timeout=constants.TCP_PING_TIMEOUT,
                         live_port_needed=False,
                         source=constants.LOCALHOST_IP_ADDRESS,
                         ), # ECONNREFUSED is OK
                 "failed to ping alive host on deaf port")

    self.assert_(TcpPing(constants.LOCALHOST_IP_ADDRESS,
                         self.deaflistenerport,
                         timeout=constants.TCP_PING_TIMEOUT,
                         live_port_needed=False,
                         ), # ECONNREFUSED is OK
                 "failed to ping alive host on deaf port (no source addr)")
-
-
class TestOwnIpAddress(unittest.TestCase):
  """Tests for OwnIpAddress"""

  def testOwnLoopback(self):
    """The loopback address has to be reported as owned"""
    owned = OwnIpAddress(constants.LOCALHOST_IP_ADDRESS)
    self.failUnless(owned, "Should own the loopback address")

  def testNowOwnAddress(self):
    """An address from the documentation range must not be owned"""

    # RFC 5735 sets 192.0.2.0/24 aside for tests and documentation, so no
    # address from it *should* be configured on this machine. Should this
    # ever fail, the test ought to be extended to try several addresses.
    foreign_ip = "192.0.2.1"
    owned = OwnIpAddress(foreign_ip)
    self.failIf(owned, "Should not own IP address %s" % foreign_ip)
-
-
def _GetSocketCredentials(path):
  """Return the peer credentials seen when connecting to a Unix socket.

  @param path: filesystem path of the Unix socket to connect to
  @return: whatever L{utils.GetSocketCredentials} reports for the
      connected socket

  """
  client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
  try:
    # Give up after ten seconds instead of blocking indefinitely
    client.settimeout(10)
    client.connect(path)
    credentials = utils.GetSocketCredentials(client)
  finally:
    client.close()
  return credentials
-
-
-class TestGetSocketCredentials(unittest.TestCase):
- def setUp(self):
- self.tmpdir = tempfile.mkdtemp()
- self.sockpath = utils.PathJoin(self.tmpdir, "sock")
-
- self.listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- self.listener.settimeout(10)
- self.listener.bind(self.sockpath)
- self.listener.listen(1)
-
- def tearDown(self):
- self.listener.shutdown(socket.SHUT_RDWR)
- self.listener.close()
- shutil.rmtree(self.tmpdir)
-
- def test(self):
- (c2pr, c2pw) = os.pipe()
-
- # Start child process
- child = os.fork()
- if child == 0:
- try:
- data = serializer.DumpJson(_GetSocketCredentials(self.sockpath))
-
- os.write(c2pw, data)
- os.close(c2pw)