-from ganeti import serializer
-from ganeti.utils import IsProcessAlive, RunCmd, \
- RemoveFile, MatchNameComponent, FormatUnit, \
- ParseUnit, AddAuthorizedKey, RemoveAuthorizedKey, \
- ShellQuote, ShellQuoteArgs, TcpPing, ListVisibleFiles, \
- SetEtcHostsEntry, RemoveEtcHostsEntry, FirstFree, OwnIpAddress, \
- TailFile, ForceDictType, SafeEncode, IsNormAbsPath, FormatTime, \
- UnescapeAndSplit, RunParts, PathJoin, HostInfo
-
-from ganeti.errors import LockError, UnitParseError, GenericError, \
- ProgrammerError, OpPrereqError
-
-
-class TestIsProcessAlive(unittest.TestCase):
- """Testing case for IsProcessAlive"""
-
- def testExists(self):
- mypid = os.getpid()
- self.assert_(IsProcessAlive(mypid),
- "can't find myself running")
-
- def testNotExisting(self):
- pid_non_existing = os.fork()
- if pid_non_existing == 0:
- os._exit(0)
- elif pid_non_existing < 0:
- raise SystemError("can't fork")
- os.waitpid(pid_non_existing, 0)
- self.assert_(not IsProcessAlive(pid_non_existing),
- "nonexisting process detected")
-
-
-class TestPidFileFunctions(unittest.TestCase):
- """Tests for WritePidFile, RemovePidFile and ReadPidFile"""
-
- def setUp(self):
- self.dir = tempfile.mkdtemp()
- self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
- utils.DaemonPidFileName = self.f_dpn
-
- def testPidFileFunctions(self):
- pid_file = self.f_dpn('test')
- utils.WritePidFile('test')
- self.failUnless(os.path.exists(pid_file),
- "PID file should have been created")
- read_pid = utils.ReadPidFile(pid_file)
- self.failUnlessEqual(read_pid, os.getpid())
- self.failUnless(utils.IsProcessAlive(read_pid))
- self.failUnlessRaises(GenericError, utils.WritePidFile, 'test')
- utils.RemovePidFile('test')
- self.failIf(os.path.exists(pid_file),
- "PID file should not exist anymore")
- self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
- "ReadPidFile should return 0 for missing pid file")
- fh = open(pid_file, "w")
- fh.write("blah\n")
- fh.close()
- self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
- "ReadPidFile should return 0 for invalid pid file")
- utils.RemovePidFile('test')
- self.failIf(os.path.exists(pid_file),
- "PID file should not exist anymore")
-
- def testKill(self):
- pid_file = self.f_dpn('child')
- r_fd, w_fd = os.pipe()
- new_pid = os.fork()
- if new_pid == 0: #child
- utils.WritePidFile('child')
- os.write(w_fd, 'a')
- signal.pause()
- os._exit(0)
- return
- # else we are in the parent
- # wait until the child has written the pid file
- os.read(r_fd, 1)
- read_pid = utils.ReadPidFile(pid_file)
- self.failUnlessEqual(read_pid, new_pid)
- self.failUnless(utils.IsProcessAlive(new_pid))
- utils.KillProcess(new_pid, waitpid=True)
- self.failIf(utils.IsProcessAlive(new_pid))
- utils.RemovePidFile('child')
- self.failUnlessRaises(ProgrammerError, utils.KillProcess, 0)
-
- def tearDown(self):
- for name in os.listdir(self.dir):
- os.unlink(os.path.join(self.dir, name))
- os.rmdir(self.dir)
-
-
-class TestRunCmd(testutils.GanetiTestCase):
- """Testing case for the RunCmd function"""
-
- def setUp(self):
- testutils.GanetiTestCase.setUp(self)
- self.magic = time.ctime() + " ganeti test"
- self.fname = self._CreateTempFile()
-
- def testOk(self):
- """Test successful exit code"""
- result = RunCmd("/bin/sh -c 'exit 0'")
- self.assertEqual(result.exit_code, 0)
- self.assertEqual(result.output, "")
-
- def testFail(self):
- """Test fail exit code"""
- result = RunCmd("/bin/sh -c 'exit 1'")
- self.assertEqual(result.exit_code, 1)
- self.assertEqual(result.output, "")
-
- def testStdout(self):
- """Test standard output"""
- cmd = 'echo -n "%s"' % self.magic
- result = RunCmd("/bin/sh -c '%s'" % cmd)
- self.assertEqual(result.stdout, self.magic)
- result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
- self.assertEqual(result.output, "")
- self.assertFileContent(self.fname, self.magic)
-
- def testStderr(self):
- """Test standard error"""
- cmd = 'echo -n "%s"' % self.magic
- result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
- self.assertEqual(result.stderr, self.magic)
- result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
- self.assertEqual(result.output, "")
- self.assertFileContent(self.fname, self.magic)
-
- def testCombined(self):
- """Test combined output"""
- cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
- expected = "A" + self.magic + "B" + self.magic
- result = RunCmd("/bin/sh -c '%s'" % cmd)
- self.assertEqual(result.output, expected)
- result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
- self.assertEqual(result.output, "")
- self.assertFileContent(self.fname, expected)
-
- def testSignal(self):
- """Test signal"""
- result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
- self.assertEqual(result.signal, 15)
- self.assertEqual(result.output, "")
-
- def testListRun(self):
- """Test list runs"""
- result = RunCmd(["true"])
- self.assertEqual(result.signal, None)
- self.assertEqual(result.exit_code, 0)
- result = RunCmd(["/bin/sh", "-c", "exit 1"])
- self.assertEqual(result.signal, None)
- self.assertEqual(result.exit_code, 1)
- result = RunCmd(["echo", "-n", self.magic])
- self.assertEqual(result.signal, None)
- self.assertEqual(result.exit_code, 0)
- self.assertEqual(result.stdout, self.magic)
-
- def testFileEmptyOutput(self):
- """Test file output"""
- result = RunCmd(["true"], output=self.fname)
- self.assertEqual(result.signal, None)
- self.assertEqual(result.exit_code, 0)
- self.assertFileContent(self.fname, "")
-
- def testLang(self):
- """Test locale environment"""
- old_env = os.environ.copy()
- try:
- os.environ["LANG"] = "en_US.UTF-8"
- os.environ["LC_ALL"] = "en_US.UTF-8"
- result = RunCmd(["locale"])
- for line in result.output.splitlines():
- key, value = line.split("=", 1)
- # Ignore these variables, they're overridden by LC_ALL
- if key == "LANG" or key == "LANGUAGE":
- continue
- self.failIf(value and value != "C" and value != '"C"',
- "Variable %s is set to the invalid value '%s'" % (key, value))
- finally:
- os.environ = old_env
-
- def testDefaultCwd(self):
- """Test default working directory"""
- self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/")
-
- def testCwd(self):
- """Test default working directory"""
- self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
- self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
- cwd = os.getcwd()
- self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)
-
- def testResetEnv(self):
- """Test environment reset functionality"""
- self.failUnlessEqual(RunCmd(["env"], reset_env=True).stdout.strip(), "")
- self.failUnlessEqual(RunCmd(["env"], reset_env=True,
- env={"FOO": "bar",}).stdout.strip(), "FOO=bar")
-
-
-class TestRunParts(unittest.TestCase):
- """Testing case for the RunParts function"""
-
- def setUp(self):
- self.rundir = tempfile.mkdtemp(prefix="ganeti-test", suffix=".tmp")
-
- def tearDown(self):
- shutil.rmtree(self.rundir)
-
- def testEmpty(self):
- """Test on an empty dir"""
- self.failUnlessEqual(RunParts(self.rundir, reset_env=True), [])
-
- def testSkipWrongName(self):
- """Test that wrong files are skipped"""
- fname = os.path.join(self.rundir, "00test.dot")
- utils.WriteFile(fname, data="")
- os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
- relname = os.path.basename(fname)
- self.failUnlessEqual(RunParts(self.rundir, reset_env=True),
- [(relname, constants.RUNPARTS_SKIP, None)])
-
- def testSkipNonExec(self):
- """Test that non executable files are skipped"""
- fname = os.path.join(self.rundir, "00test")
- utils.WriteFile(fname, data="")
- relname = os.path.basename(fname)
- self.failUnlessEqual(RunParts(self.rundir, reset_env=True),
- [(relname, constants.RUNPARTS_SKIP, None)])
-
- def testError(self):
- """Test error on a broken executable"""
- fname = os.path.join(self.rundir, "00test")
- utils.WriteFile(fname, data="")
- os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
- (relname, status, error) = RunParts(self.rundir, reset_env=True)[0]
- self.failUnlessEqual(relname, os.path.basename(fname))
- self.failUnlessEqual(status, constants.RUNPARTS_ERR)
- self.failUnless(error)
-
- def testSorted(self):
- """Test executions are sorted"""
- files = []
- files.append(os.path.join(self.rundir, "64test"))
- files.append(os.path.join(self.rundir, "00test"))
- files.append(os.path.join(self.rundir, "42test"))
-
- for fname in files:
- utils.WriteFile(fname, data="")
-
- results = RunParts(self.rundir, reset_env=True)
-
- for fname in sorted(files):
- self.failUnlessEqual(os.path.basename(fname), results.pop(0)[0])
-
- def testOk(self):
- """Test correct execution"""
- fname = os.path.join(self.rundir, "00test")
- utils.WriteFile(fname, data="#!/bin/sh\n\necho -n ciao")
- os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
- (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
- self.failUnlessEqual(relname, os.path.basename(fname))
- self.failUnlessEqual(status, constants.RUNPARTS_RUN)
- self.failUnlessEqual(runresult.stdout, "ciao")
-
- def testRunFail(self):
- """Test correct execution, with run failure"""
- fname = os.path.join(self.rundir, "00test")
- utils.WriteFile(fname, data="#!/bin/sh\n\nexit 1")
- os.chmod(fname, stat.S_IREAD | stat.S_IEXEC)
- (relname, status, runresult) = RunParts(self.rundir, reset_env=True)[0]
- self.failUnlessEqual(relname, os.path.basename(fname))
- self.failUnlessEqual(status, constants.RUNPARTS_RUN)
- self.failUnlessEqual(runresult.exit_code, 1)
- self.failUnless(runresult.failed)
-
- def testRunMix(self):
- files = []
- files.append(os.path.join(self.rundir, "00test"))
- files.append(os.path.join(self.rundir, "42test"))
- files.append(os.path.join(self.rundir, "64test"))
- files.append(os.path.join(self.rundir, "99test"))
-
- files.sort()
-
- # 1st has errors in execution
- utils.WriteFile(files[0], data="#!/bin/sh\n\nexit 1")
- os.chmod(files[0], stat.S_IREAD | stat.S_IEXEC)
-
- # 2nd is skipped
- utils.WriteFile(files[1], data="")
-
- # 3rd cannot execute properly
- utils.WriteFile(files[2], data="")
- os.chmod(files[2], stat.S_IREAD | stat.S_IEXEC)
-
- # 4th execs
- utils.WriteFile(files[3], data="#!/bin/sh\n\necho -n ciao")
- os.chmod(files[3], stat.S_IREAD | stat.S_IEXEC)
-
- results = RunParts(self.rundir, reset_env=True)
-
- (relname, status, runresult) = results[0]
- self.failUnlessEqual(relname, os.path.basename(files[0]))
- self.failUnlessEqual(status, constants.RUNPARTS_RUN)
- self.failUnlessEqual(runresult.exit_code, 1)
- self.failUnless(runresult.failed)
-
- (relname, status, runresult) = results[1]
- self.failUnlessEqual(relname, os.path.basename(files[1]))
- self.failUnlessEqual(status, constants.RUNPARTS_SKIP)
- self.failUnlessEqual(runresult, None)
-
- (relname, status, runresult) = results[2]
- self.failUnlessEqual(relname, os.path.basename(files[2]))
- self.failUnlessEqual(status, constants.RUNPARTS_ERR)
- self.failUnless(runresult)
-
- (relname, status, runresult) = results[3]
- self.failUnlessEqual(relname, os.path.basename(files[3]))
- self.failUnlessEqual(status, constants.RUNPARTS_RUN)
- self.failUnlessEqual(runresult.output, "ciao")
- self.failUnlessEqual(runresult.exit_code, 0)
- self.failUnless(not runresult.failed)
-
-
-class TestRemoveFile(unittest.TestCase):
- """Test case for the RemoveFile function"""
-
- def setUp(self):
- """Create a temp dir and file for each case"""
- self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
- fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
- os.close(fd)
-
- def tearDown(self):
- if os.path.exists(self.tmpfile):
- os.unlink(self.tmpfile)
- os.rmdir(self.tmpdir)
-
-
- def testIgnoreDirs(self):
- """Test that RemoveFile() ignores directories"""
- self.assertEqual(None, RemoveFile(self.tmpdir))
-
-
- def testIgnoreNotExisting(self):
- """Test that RemoveFile() ignores non-existing files"""
- RemoveFile(self.tmpfile)
- RemoveFile(self.tmpfile)
-
-
- def testRemoveFile(self):
- """Test that RemoveFile does remove a file"""
- RemoveFile(self.tmpfile)
- if os.path.exists(self.tmpfile):
- self.fail("File '%s' not removed" % self.tmpfile)
-
-
- def testRemoveSymlink(self):
- """Test that RemoveFile does remove symlinks"""
- symlink = self.tmpdir + "/symlink"
- os.symlink("no-such-file", symlink)
- RemoveFile(symlink)
- if os.path.exists(symlink):
- self.fail("File '%s' not removed" % symlink)
- os.symlink(self.tmpfile, symlink)
- RemoveFile(symlink)
- if os.path.exists(symlink):
- self.fail("File '%s' not removed" % symlink)
-
-
-class TestRename(unittest.TestCase):
- """Test case for RenameFile"""
-
- def setUp(self):
- """Create a temporary directory"""
- self.tmpdir = tempfile.mkdtemp()
- self.tmpfile = os.path.join(self.tmpdir, "test1")
-
- # Touch the file
- open(self.tmpfile, "w").close()
-
- def tearDown(self):
- """Remove temporary directory"""
- shutil.rmtree(self.tmpdir)