bash_completion: Enable extglob while parsing file
[ganeti-local] / test / tempfile_fork_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for testing utils.ResetTempfileModule"""
23
24 import os
25 import sys
26 import errno
27 import shutil
28 import tempfile
29 import unittest
30 import logging
31
32 from ganeti import utils
33
34 import testutils
35
36
37 # This constant is usually at a much higher value. Setting it lower for test
38 # purposes.
39 tempfile.TMP_MAX = 3
40
41
42 class TestResetTempfileModule(unittest.TestCase):
43   def setUp(self):
44     self.tmpdir = tempfile.mkdtemp()
45
46   def tearDown(self):
47     shutil.rmtree(self.tmpdir)
48
49   def testNoReset(self):
50     if ((sys.hexversion >= 0x020703F0 and sys.hexversion < 0x03000000) or
51         sys.hexversion >= 0x030203F0):
52       # We can't test the no_reset case on Python 2.7+
53       return
54     # evil Debian sid...
55     if (hasattr(tempfile._RandomNameSequence, "rng") and
56         type(tempfile._RandomNameSequence.rng) == property):
57       return
58     self._Test(False)
59
60   def testReset(self):
61     self._Test(True)
62
63   def _Test(self, reset):
64     self.failIf(tempfile.TMP_MAX > 10)
65
66     # Initialize tempfile module
67     (fd, _) = tempfile.mkstemp(dir=self.tmpdir, prefix="init.", suffix="")
68     os.close(fd)
69
70     (notify_read, notify_write) = os.pipe()
71
72     pid = os.fork()
73     if pid == 0:
74       # Child
75       try:
76         try:
77           if reset:
78             utils.ResetTempfileModule()
79
80           os.close(notify_write)
81
82           # Wait for parent to close pipe
83           os.read(notify_read, 1)
84
85           try:
86             # This is a short-lived process, not caring about closing file
87             # descriptors
88             (_, path) = tempfile.mkstemp(dir=self.tmpdir,
89                                          prefix="test.", suffix="")
90           except EnvironmentError, err:
91             if err.errno == errno.EEXIST:
92               # Couldnt' create temporary file (e.g. because we run out of
93               # retries)
94               os._exit(2)
95             raise
96
97           logging.debug("Child created %s", path)
98
99           os._exit(0)
100         except Exception:
101           logging.exception("Unhandled error")
102       finally:
103         os._exit(1)
104
105     # Parent
106     os.close(notify_read)
107
108     # Create parent's temporary files
109     for _ in range(tempfile.TMP_MAX):
110       (fd, path) = tempfile.mkstemp(dir=self.tmpdir,
111                                     prefix="test.", suffix="")
112       os.close(fd)
113       logging.debug("Parent created %s", path)
114
115     # Notify child by closing pipe
116     os.close(notify_write)
117
118     (_, status) = os.waitpid(pid, 0)
119
120     self.failIf(os.WIFSIGNALED(status))
121
122     if reset:
123       # If the tempfile module was reset, it should not fail to create
124       # temporary files
125       expected = 0
126     else:
127       expected = 2
128
129     self.assertEqual(os.WEXITSTATUS(status), expected)
130
131
132 if __name__ == "__main__":
133   testutils.GanetiTestProgram()