4 # Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for testing ganeti.utils.wrapper"""
32 from ganeti import constants
33 from ganeti import utils
38 class TestSetCloseOnExecFlag(unittest.TestCase):
39 """Tests for SetCloseOnExecFlag"""
42 self.tmpfile = tempfile.TemporaryFile()
45 utils.SetCloseOnExecFlag(self.tmpfile.fileno(), True)
46 self.failUnless(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFD) &
49 def testDisable(self):
50 utils.SetCloseOnExecFlag(self.tmpfile.fileno(), False)
51 self.failIf(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFD) &
55 class TestSetNonblockFlag(unittest.TestCase):
57 self.tmpfile = tempfile.TemporaryFile()
60 utils.SetNonblockFlag(self.tmpfile.fileno(), True)
61 self.failUnless(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFL) &
64 def testDisable(self):
65 utils.SetNonblockFlag(self.tmpfile.fileno(), False)
66 self.failIf(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFL) &
70 class TestIgnoreProcessNotFound(unittest.TestCase):
73 os.write(fd, str(os.getpid()))
78 (pid_read_fd, pid_write_fd) = os.pipe()
80 # Start short-lived process which writes its PID to pipe
81 self.assert_(utils.RunInSeparateProcess(self._WritePid, pid_write_fd))
82 os.close(pid_write_fd)
85 pid = int(os.read(pid_read_fd, 1024))
88 # Try to send signal to process which exited recently
89 self.assertFalse(utils.IgnoreProcessNotFound(os.kill, pid, 0))
92 class TestIgnoreSignals(unittest.TestCase):
93 """Test the IgnoreSignals decorator"""
96 def _Raise(exception):
103 def testIgnoreSignals(self):
104 sock_err_intr = socket.error(errno.EINTR, "Message")
105 sock_err_inval = socket.error(errno.EINVAL, "Message")
107 env_err_intr = EnvironmentError(errno.EINTR, "Message")
108 env_err_inval = EnvironmentError(errno.EINVAL, "Message")
110 self.assertRaises(socket.error, self._Raise, sock_err_intr)
111 self.assertRaises(socket.error, self._Raise, sock_err_inval)
112 self.assertRaises(EnvironmentError, self._Raise, env_err_intr)
113 self.assertRaises(EnvironmentError, self._Raise, env_err_inval)
115 self.assertEquals(utils.IgnoreSignals(self._Raise, sock_err_intr), None)
116 self.assertEquals(utils.IgnoreSignals(self._Raise, env_err_intr), None)
117 self.assertRaises(socket.error, utils.IgnoreSignals, self._Raise,
119 self.assertRaises(EnvironmentError, utils.IgnoreSignals, self._Raise,
122 self.assertEquals(utils.IgnoreSignals(self._Return, True), True)
123 self.assertEquals(utils.IgnoreSignals(self._Return, 33), 33)
126 class TestIsExecutable(unittest.TestCase):
128 self.tmpdir = tempfile.mkdtemp()
131 shutil.rmtree(self.tmpdir)
133 def testNonExisting(self):
134 fname = utils.PathJoin(self.tmpdir, "file")
135 assert not os.path.exists(fname)
136 self.assertFalse(utils.IsExecutable(fname))
138 def testNoFile(self):
139 path = utils.PathJoin(self.tmpdir, "something")
141 assert os.path.isdir(path)
142 self.assertFalse(utils.IsExecutable(path))
144 def testExecutable(self):
145 fname = utils.PathJoin(self.tmpdir, "file")
146 utils.WriteFile(fname, data="#!/bin/bash", mode=0700)
147 assert os.path.exists(fname)
148 self.assertTrue(utils.IsExecutable(fname))
150 self.assertTrue(self._TestSymlink(fname))
152 def testFileNotExecutable(self):
153 fname = utils.PathJoin(self.tmpdir, "file")
154 utils.WriteFile(fname, data="#!/bin/bash", mode=0600)
155 assert os.path.exists(fname)
156 self.assertFalse(utils.IsExecutable(fname))
158 self.assertFalse(self._TestSymlink(fname))
160 def _TestSymlink(self, fname):
161 assert os.path.exists(fname)
163 linkname = utils.PathJoin(self.tmpdir, "cmd")
164 os.symlink(fname, linkname)
166 assert os.path.islink(linkname)
168 return utils.IsExecutable(linkname)
171 if __name__ == "__main__":
172 testutils.GanetiTestProgram()