Merge branch 'stable-2.7' into stable-2.8
[ganeti-local] / test / py / ganeti.utils.wrapper_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for testing ganeti.utils.wrapper"""
23
24 import errno
25 import fcntl
26 import os
27 import socket
28 import tempfile
29 import unittest
30 import shutil
31
32 from ganeti import constants
33 from ganeti import utils
34
35 import testutils
36
37
38 class TestSetCloseOnExecFlag(unittest.TestCase):
39   """Tests for SetCloseOnExecFlag"""
40
41   def setUp(self):
42     self.tmpfile = tempfile.TemporaryFile()
43
44   def testEnable(self):
45     utils.SetCloseOnExecFlag(self.tmpfile.fileno(), True)
46     self.failUnless(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFD) &
47                     fcntl.FD_CLOEXEC)
48
49   def testDisable(self):
50     utils.SetCloseOnExecFlag(self.tmpfile.fileno(), False)
51     self.failIf(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFD) &
52                 fcntl.FD_CLOEXEC)
53
54
55 class TestSetNonblockFlag(unittest.TestCase):
56   def setUp(self):
57     self.tmpfile = tempfile.TemporaryFile()
58
59   def testEnable(self):
60     utils.SetNonblockFlag(self.tmpfile.fileno(), True)
61     self.failUnless(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFL) &
62                     os.O_NONBLOCK)
63
64   def testDisable(self):
65     utils.SetNonblockFlag(self.tmpfile.fileno(), False)
66     self.failIf(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFL) &
67                 os.O_NONBLOCK)
68
69
70 class TestIgnoreProcessNotFound(unittest.TestCase):
71   @staticmethod
72   def _WritePid(fd):
73     os.write(fd, str(os.getpid()))
74     os.close(fd)
75     return True
76
77   def test(self):
78     (pid_read_fd, pid_write_fd) = os.pipe()
79
80     # Start short-lived process which writes its PID to pipe
81     self.assert_(utils.RunInSeparateProcess(self._WritePid, pid_write_fd))
82     os.close(pid_write_fd)
83
84     # Read PID from pipe
85     pid = int(os.read(pid_read_fd, 1024))
86     os.close(pid_read_fd)
87
88     # Try to send signal to process which exited recently
89     self.assertFalse(utils.IgnoreProcessNotFound(os.kill, pid, 0))
90
91
92 class TestIgnoreSignals(unittest.TestCase):
93   """Test the IgnoreSignals decorator"""
94
95   @staticmethod
96   def _Raise(exception):
97     raise exception
98
99   @staticmethod
100   def _Return(rval):
101     return rval
102
103   def testIgnoreSignals(self):
104     sock_err_intr = socket.error(errno.EINTR, "Message")
105     sock_err_inval = socket.error(errno.EINVAL, "Message")
106
107     env_err_intr = EnvironmentError(errno.EINTR, "Message")
108     env_err_inval = EnvironmentError(errno.EINVAL, "Message")
109
110     self.assertRaises(socket.error, self._Raise, sock_err_intr)
111     self.assertRaises(socket.error, self._Raise, sock_err_inval)
112     self.assertRaises(EnvironmentError, self._Raise, env_err_intr)
113     self.assertRaises(EnvironmentError, self._Raise, env_err_inval)
114
115     self.assertEquals(utils.IgnoreSignals(self._Raise, sock_err_intr), None)
116     self.assertEquals(utils.IgnoreSignals(self._Raise, env_err_intr), None)
117     self.assertRaises(socket.error, utils.IgnoreSignals, self._Raise,
118                       sock_err_inval)
119     self.assertRaises(EnvironmentError, utils.IgnoreSignals, self._Raise,
120                       env_err_inval)
121
122     self.assertEquals(utils.IgnoreSignals(self._Return, True), True)
123     self.assertEquals(utils.IgnoreSignals(self._Return, 33), 33)
124
125
126 class TestIsExecutable(unittest.TestCase):
127   def setUp(self):
128     self.tmpdir = tempfile.mkdtemp()
129
130   def tearDown(self):
131     shutil.rmtree(self.tmpdir)
132
133   def testNonExisting(self):
134     fname = utils.PathJoin(self.tmpdir, "file")
135     assert not os.path.exists(fname)
136     self.assertFalse(utils.IsExecutable(fname))
137
138   def testNoFile(self):
139     path = utils.PathJoin(self.tmpdir, "something")
140     os.mkdir(path)
141     assert os.path.isdir(path)
142     self.assertFalse(utils.IsExecutable(path))
143
144   def testExecutable(self):
145     fname = utils.PathJoin(self.tmpdir, "file")
146     utils.WriteFile(fname, data="#!/bin/bash", mode=0700)
147     assert os.path.exists(fname)
148     self.assertTrue(utils.IsExecutable(fname))
149
150     self.assertTrue(self._TestSymlink(fname))
151
152   def testFileNotExecutable(self):
153     fname = utils.PathJoin(self.tmpdir, "file")
154     utils.WriteFile(fname, data="#!/bin/bash", mode=0600)
155     assert os.path.exists(fname)
156     self.assertFalse(utils.IsExecutable(fname))
157
158     self.assertFalse(self._TestSymlink(fname))
159
160   def _TestSymlink(self, fname):
161     assert os.path.exists(fname)
162
163     linkname = utils.PathJoin(self.tmpdir, "cmd")
164     os.symlink(fname, linkname)
165
166     assert os.path.islink(linkname)
167
168     return utils.IsExecutable(linkname)
169
170
171 if __name__ == "__main__":
172   testutils.GanetiTestProgram()