Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils.wrapper_unittest.py @ 10b86782

History | View | Annotate | Download (5.1 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.utils.wrapper"""
23

    
24
import errno
25
import fcntl
26
import os
27
import socket
28
import tempfile
29
import unittest
30
import shutil
31

    
32
from ganeti import constants
33
from ganeti import utils
34

    
35
import testutils
36

    
37

    
38
class TestSetCloseOnExecFlag(unittest.TestCase):
39
  """Tests for SetCloseOnExecFlag"""
40

    
41
  def setUp(self):
42
    self.tmpfile = tempfile.TemporaryFile()
43

    
44
  def testEnable(self):
45
    utils.SetCloseOnExecFlag(self.tmpfile.fileno(), True)
46
    self.failUnless(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFD) &
47
                    fcntl.FD_CLOEXEC)
48

    
49
  def testDisable(self):
50
    utils.SetCloseOnExecFlag(self.tmpfile.fileno(), False)
51
    self.failIf(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFD) &
52
                fcntl.FD_CLOEXEC)
53

    
54

    
55
class TestSetNonblockFlag(unittest.TestCase):
56
  def setUp(self):
57
    self.tmpfile = tempfile.TemporaryFile()
58

    
59
  def testEnable(self):
60
    utils.SetNonblockFlag(self.tmpfile.fileno(), True)
61
    self.failUnless(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFL) &
62
                    os.O_NONBLOCK)
63

    
64
  def testDisable(self):
65
    utils.SetNonblockFlag(self.tmpfile.fileno(), False)
66
    self.failIf(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFL) &
67
                os.O_NONBLOCK)
68

    
69

    
70
class TestIgnoreProcessNotFound(unittest.TestCase):
71
  @staticmethod
72
  def _WritePid(fd):
73
    os.write(fd, str(os.getpid()))
74
    os.close(fd)
75
    return True
76

    
77
  def test(self):
78
    (pid_read_fd, pid_write_fd) = os.pipe()
79

    
80
    # Start short-lived process which writes its PID to pipe
81
    self.assert_(utils.RunInSeparateProcess(self._WritePid, pid_write_fd))
82
    os.close(pid_write_fd)
83

    
84
    # Read PID from pipe
85
    pid = int(os.read(pid_read_fd, 1024))
86
    os.close(pid_read_fd)
87

    
88
    # Try to send signal to process which exited recently
89
    self.assertFalse(utils.IgnoreProcessNotFound(os.kill, pid, 0))
90

    
91

    
92
class TestIgnoreSignals(unittest.TestCase):
93
  """Test the IgnoreSignals decorator"""
94

    
95
  @staticmethod
96
  def _Raise(exception):
97
    raise exception
98

    
99
  @staticmethod
100
  def _Return(rval):
101
    return rval
102

    
103
  def testIgnoreSignals(self):
104
    sock_err_intr = socket.error(errno.EINTR, "Message")
105
    sock_err_inval = socket.error(errno.EINVAL, "Message")
106

    
107
    env_err_intr = EnvironmentError(errno.EINTR, "Message")
108
    env_err_inval = EnvironmentError(errno.EINVAL, "Message")
109

    
110
    self.assertRaises(socket.error, self._Raise, sock_err_intr)
111
    self.assertRaises(socket.error, self._Raise, sock_err_inval)
112
    self.assertRaises(EnvironmentError, self._Raise, env_err_intr)
113
    self.assertRaises(EnvironmentError, self._Raise, env_err_inval)
114

    
115
    self.assertEquals(utils.IgnoreSignals(self._Raise, sock_err_intr), None)
116
    self.assertEquals(utils.IgnoreSignals(self._Raise, env_err_intr), None)
117
    self.assertRaises(socket.error, utils.IgnoreSignals, self._Raise,
118
                      sock_err_inval)
119
    self.assertRaises(EnvironmentError, utils.IgnoreSignals, self._Raise,
120
                      env_err_inval)
121

    
122
    self.assertEquals(utils.IgnoreSignals(self._Return, True), True)
123
    self.assertEquals(utils.IgnoreSignals(self._Return, 33), 33)
124

    
125

    
126
class TestIsExecutable(unittest.TestCase):
127
  def setUp(self):
128
    self.tmpdir = tempfile.mkdtemp()
129

    
130
  def tearDown(self):
131
    shutil.rmtree(self.tmpdir)
132

    
133
  def testNonExisting(self):
134
    fname = utils.PathJoin(self.tmpdir, "file")
135
    assert not os.path.exists(fname)
136
    self.assertFalse(utils.IsExecutable(fname))
137

    
138
  def testNoFile(self):
139
    path = utils.PathJoin(self.tmpdir, "something")
140
    os.mkdir(path)
141
    assert os.path.isdir(path)
142
    self.assertFalse(utils.IsExecutable(path))
143

    
144
  def testExecutable(self):
145
    fname = utils.PathJoin(self.tmpdir, "file")
146
    utils.WriteFile(fname, data="#!/bin/bash", mode=0700)
147
    assert os.path.exists(fname)
148
    self.assertTrue(utils.IsExecutable(fname))
149

    
150
    self.assertTrue(self._TestSymlink(fname))
151

    
152
  def testFileNotExecutable(self):
153
    fname = utils.PathJoin(self.tmpdir, "file")
154
    utils.WriteFile(fname, data="#!/bin/bash", mode=0600)
155
    assert os.path.exists(fname)
156
    self.assertFalse(utils.IsExecutable(fname))
157

    
158
    self.assertFalse(self._TestSymlink(fname))
159

    
160
  def _TestSymlink(self, fname):
161
    assert os.path.exists(fname)
162

    
163
    linkname = utils.PathJoin(self.tmpdir, "cmd")
164
    os.symlink(fname, linkname)
165

    
166
    assert os.path.islink(linkname)
167

    
168
    return utils.IsExecutable(linkname)
169

    
170

    
171
if __name__ == "__main__":
172
  testutils.GanetiTestProgram()