Statistics
| Branch: | Tag: | Revision:

root / test / py / tempfile_fork_unittest.py @ d01e51a5

History | View | Annotate | Download (3.4 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing utils.ResetTempfileModule"""
23

    
24
import os
25
import sys
26
import errno
27
import shutil
28
import tempfile
29
import unittest
30
import logging
31

    
32
from ganeti import utils
33

    
34
import testutils
35

    
36

    
37
# This constant is usually at a much higher value. Setting it lower for test
38
# purposes.
39
tempfile.TMP_MAX = 3
40

    
41

    
42
class TestResetTempfileModule(unittest.TestCase):
43
  def setUp(self):
44
    self.tmpdir = tempfile.mkdtemp()
45

    
46
  def tearDown(self):
47
    shutil.rmtree(self.tmpdir)
48

    
49
  def testNoReset(self):
50
    if ((sys.hexversion >= 0x020703F0 and sys.hexversion < 0x03000000) or
51
        sys.hexversion >= 0x030203F0):
52
      # We can't test the no_reset case on Python 2.7+
53
      return
54
    # evil Debian sid...
55
    if (hasattr(tempfile._RandomNameSequence, "rng") and
56
        type(tempfile._RandomNameSequence.rng) == property):
57
      return
58
    self._Test(False)
59

    
60
  def testReset(self):
61
    self._Test(True)
62

    
63
  def _Test(self, reset):
64
    self.failIf(tempfile.TMP_MAX > 10)
65

    
66
    # Initialize tempfile module
67
    (fd, _) = tempfile.mkstemp(dir=self.tmpdir, prefix="init.", suffix="")
68
    os.close(fd)
69

    
70
    (notify_read, notify_write) = os.pipe()
71

    
72
    pid = os.fork()
73
    if pid == 0:
74
      # Child
75
      try:
76
        try:
77
          if reset:
78
            utils.ResetTempfileModule()
79

    
80
          os.close(notify_write)
81

    
82
          # Wait for parent to close pipe
83
          os.read(notify_read, 1)
84

    
85
          try:
86
            # This is a short-lived process, not caring about closing file
87
            # descriptors
88
            (_, path) = tempfile.mkstemp(dir=self.tmpdir,
89
                                         prefix="test.", suffix="")
90
          except EnvironmentError, err:
91
            if err.errno == errno.EEXIST:
92
              # Couldnt' create temporary file (e.g. because we run out of
93
              # retries)
94
              os._exit(2)
95
            raise
96

    
97
          logging.debug("Child created %s", path)
98

    
99
          os._exit(0)
100
        except Exception:
101
          logging.exception("Unhandled error")
102
      finally:
103
        os._exit(1)
104

    
105
    # Parent
106
    os.close(notify_read)
107

    
108
    # Create parent's temporary files
109
    for _ in range(tempfile.TMP_MAX):
110
      (fd, path) = tempfile.mkstemp(dir=self.tmpdir,
111
                                    prefix="test.", suffix="")
112
      os.close(fd)
113
      logging.debug("Parent created %s", path)
114

    
115
    # Notify child by closing pipe
116
    os.close(notify_write)
117

    
118
    (_, status) = os.waitpid(pid, 0)
119

    
120
    self.failIf(os.WIFSIGNALED(status))
121

    
122
    if reset:
123
      # If the tempfile module was reset, it should not fail to create
124
      # temporary files
125
      expected = 0
126
    else:
127
      expected = 2
128

    
129
    self.assertEqual(os.WEXITSTATUS(status), expected)
130

    
131

    
132
if __name__ == "__main__":
133
  testutils.GanetiTestProgram()