import re
import select
import string
+import fcntl
import OpenSSL
import warnings
import distutils.version
self.failUnless(not runresult.failed)
+class TestStartDaemon(testutils.GanetiTestCase):
+ def setUp(self):
+ self.tmpdir = tempfile.mkdtemp(prefix="ganeti-test")
+ self.tmpfile = os.path.join(self.tmpdir, "test")
+
+ def tearDown(self):
+ shutil.rmtree(self.tmpdir)
+
+ def testShell(self):
+ utils.StartDaemon("echo Hello World > %s" % self.tmpfile)
+ self._wait(self.tmpfile, 60.0, "Hello World")
+
+ def testShellOutput(self):
+ utils.StartDaemon("echo Hello World", output=self.tmpfile)
+ self._wait(self.tmpfile, 60.0, "Hello World")
+
+ def testNoShellNoOutput(self):
+ utils.StartDaemon(["pwd"])
+
+ def testNoShellNoOutputTouch(self):
+ testfile = os.path.join(self.tmpdir, "check")
+ self.failIf(os.path.exists(testfile))
+ utils.StartDaemon(["touch", testfile])
+ self._wait(testfile, 60.0, "")
+
+ def testNoShellOutput(self):
+ utils.StartDaemon(["pwd"], output=self.tmpfile)
+ self._wait(self.tmpfile, 60.0, "/")
+
+ def testNoShellOutputCwd(self):
+ utils.StartDaemon(["pwd"], output=self.tmpfile, cwd=os.getcwd())
+ self._wait(self.tmpfile, 60.0, os.getcwd())
+
+ def testShellEnv(self):
+ utils.StartDaemon("echo \"$GNT_TEST_VAR\"", output=self.tmpfile,
+ env={ "GNT_TEST_VAR": "Hello World", })
+ self._wait(self.tmpfile, 60.0, "Hello World")
+
+ def testNoShellEnv(self):
+ utils.StartDaemon(["printenv", "GNT_TEST_VAR"], output=self.tmpfile,
+ env={ "GNT_TEST_VAR": "Hello World", })
+ self._wait(self.tmpfile, 60.0, "Hello World")
+
+ def testOutputFd(self):
+ fd = os.open(self.tmpfile, os.O_WRONLY | os.O_CREAT)
+ try:
+ utils.StartDaemon(["pwd"], output_fd=fd, cwd=os.getcwd())
+ finally:
+ os.close(fd)
+ self._wait(self.tmpfile, 60.0, os.getcwd())
+
+ def testPid(self):
+ pid = utils.StartDaemon("echo $$ > %s" % self.tmpfile)
+ self._wait(self.tmpfile, 60.0, str(pid))
+
+ def testPidFile(self):
+ pidfile = os.path.join(self.tmpdir, "pid")
+ checkfile = os.path.join(self.tmpdir, "abort")
+
+ pid = utils.StartDaemon("while sleep 5; do :; done", pidfile=pidfile,
+ output=self.tmpfile)
+ try:
+ fd = os.open(pidfile, os.O_RDONLY)
+ try:
+ # Check file is locked
+ self.assertRaises(errors.LockError, utils.LockFile, fd)
+
+ pidtext = os.read(fd, 100)
+ finally:
+ os.close(fd)
+
+ self.assertEqual(int(pidtext.strip()), pid)
+
+ self.assert_(utils.IsProcessAlive(pid))
+ finally:
+ # No matter what happens, kill daemon
+ utils.KillProcess(pid, timeout=5.0, waitpid=False)
+ self.failIf(utils.IsProcessAlive(pid))
+
+ self.assertEqual(utils.ReadFile(self.tmpfile), "")
+
+ def _wait(self, path, timeout, expected):
+ # Due to the asynchronous nature of daemon processes, polling is necessary.
+ # A timeout makes sure the test doesn't hang forever.
+ def _CheckFile():
+ if not (os.path.isfile(path) and
+ utils.ReadFile(path).strip() == expected):
+ raise utils.RetryAgain()
+
+ try:
+ utils.Retry(_CheckFile, (0.01, 1.5, 1.0), timeout)
+ except utils.RetryTimeout:
+ self.fail("Apparently the daemon didn't run in %s seconds and/or"
+ " didn't write the correct output" % timeout)
+
+ def testError(self):
+ self.assertRaises(errors.OpExecError, utils.StartDaemon,
+ ["./does-NOT-EXIST/here/0123456789"])
+ self.assertRaises(errors.OpExecError, utils.StartDaemon,
+ ["./does-NOT-EXIST/here/0123456789"],
+ output=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
+ self.assertRaises(errors.OpExecError, utils.StartDaemon,
+ ["./does-NOT-EXIST/here/0123456789"],
+ cwd=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
+ self.assertRaises(errors.OpExecError, utils.StartDaemon,
+ ["./does-NOT-EXIST/here/0123456789"],
+ output=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
+
+ fd = os.open(self.tmpfile, os.O_WRONLY | os.O_CREAT)
+ try:
+ self.assertRaises(errors.ProgrammerError, utils.StartDaemon,
+ ["./does-NOT-EXIST/here/0123456789"],
+ output=self.tmpfile, output_fd=fd)
+ finally:
+ os.close(fd)
+
+
+class TestSetCloseOnExecFlag(unittest.TestCase):
+ """Tests for SetCloseOnExecFlag"""
+
+ def setUp(self):
+ self.tmpfile = tempfile.TemporaryFile()
+
+ def testEnable(self):
+ utils.SetCloseOnExecFlag(self.tmpfile.fileno(), True)
+ self.failUnless(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFD) &
+ fcntl.FD_CLOEXEC)
+
+ def testDisable(self):
+ utils.SetCloseOnExecFlag(self.tmpfile.fileno(), False)
+ self.failIf(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFD) &
+ fcntl.FD_CLOEXEC)
+
+
+class TestSetNonblockFlag(unittest.TestCase):
+ def setUp(self):
+ self.tmpfile = tempfile.TemporaryFile()
+
+ def testEnable(self):
+ utils.SetNonblockFlag(self.tmpfile.fileno(), True)
+ self.failUnless(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFL) &
+ os.O_NONBLOCK)
+
+ def testDisable(self):
+ utils.SetNonblockFlag(self.tmpfile.fileno(), False)
+ self.failIf(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFL) &
+ os.O_NONBLOCK)
+
+
class TestRemoveFile(unittest.TestCase):
"""Test case for the RemoveFile function"""
os.unlink(self.tmpfile)
os.rmdir(self.tmpdir)
-
def testIgnoreDirs(self):
  """Test that RemoveFile() ignores directories"""
  # Calling RemoveFile on the temporary directory must simply return None
  outcome = RemoveFile(self.tmpdir)
  self.assertEqual(None, outcome)
-
def testIgnoreNotExisting(self):
  """Test that RemoveFile() ignores non-existing files"""
  # Two consecutive removals of the same path; neither call may raise
  for _ in range(2):
    RemoveFile(self.tmpfile)
-
def testRemoveFile(self):
  """Test that RemoveFile does remove a file"""
  RemoveFile(self.tmpfile)
  # The path must be gone afterwards; otherwise fail with the same message
  self.failIf(os.path.exists(self.tmpfile),
              "File '%s' not removed" % self.tmpfile)
-
def testRemoveSymlink(self):
"""Test that RemoveFile does remove symlinks"""
symlink = self.tmpdir + "/symlink"
class TestIsAbsNormPath(unittest.TestCase):
- """Testing case for IsProcessAlive"""
+ """Testing case for IsNormAbsPath"""
def _pathTestHelper(self, path, result):
if result:
self.failUnlessEqual(UnescapeAndSplit(sep.join(a), sep=sep), b)
+class TestGenerateSelfSignedX509Cert(unittest.TestCase):
+ def setUp(self):
+ self.tmpdir = tempfile.mkdtemp()
+
+ def tearDown(self):
+ shutil.rmtree(self.tmpdir)
+
+ def _checkRsaPrivateKey(self, key):
+ lines = key.splitlines()
+ return ("-----BEGIN RSA PRIVATE KEY-----" in lines and
+ "-----END RSA PRIVATE KEY-----" in lines)
+
+ def _checkCertificate(self, cert):
+ lines = cert.splitlines()
+ return ("-----BEGIN CERTIFICATE-----" in lines and
+ "-----END CERTIFICATE-----" in lines)
+
+ def test(self):
+ for common_name in [None, ".", "Ganeti", "node1.example.com"]:
+ (key_pem, cert_pem) = utils.GenerateSelfSignedX509Cert(common_name, 300)
+ self._checkRsaPrivateKey(key_pem)
+ self._checkCertificate(cert_pem)
+
+ key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
+ key_pem)
+ self.assert_(key.bits() >= 1024)
+ self.assertEqual(key.bits(), constants.RSA_KEY_BITS)
+ self.assertEqual(key.type(), OpenSSL.crypto.TYPE_RSA)
+
+ x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
+ cert_pem)
+ self.failIf(x509.has_expired())
+ self.assertEqual(x509.get_issuer().CN, common_name)
+ self.assertEqual(x509.get_subject().CN, common_name)
+ self.assertEqual(x509.get_pubkey().bits(), constants.RSA_KEY_BITS)
+
+ def testLegacy(self):
+ cert1_filename = os.path.join(self.tmpdir, "cert1.pem")
+
+ utils.GenerateSelfSignedSslCert(cert1_filename, validity=1)
+
+ cert1 = utils.ReadFile(cert1_filename)
+
+ self.assert_(self._checkRsaPrivateKey(cert1))
+ self.assert_(self._checkCertificate(cert1))
+
+
class TestPathJoin(unittest.TestCase):
"""Testing case for PathJoin"""