bash_completion: Enable extglob while parsing file
[ganeti-local] / test / ganeti.utils.filelock_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for testing ganeti.utils.filelock"""
23
24 import os
25 import tempfile
26 import unittest
27
28 from ganeti import constants
29 from ganeti import utils
30 from ganeti import errors
31
32 import testutils
33
34
35 class _BaseFileLockTest:
36   """Test case for the FileLock class"""
37
38   def testSharedNonblocking(self):
39     self.lock.Shared(blocking=False)
40     self.lock.Close()
41
42   def testExclusiveNonblocking(self):
43     self.lock.Exclusive(blocking=False)
44     self.lock.Close()
45
46   def testUnlockNonblocking(self):
47     self.lock.Unlock(blocking=False)
48     self.lock.Close()
49
50   def testSharedBlocking(self):
51     self.lock.Shared(blocking=True)
52     self.lock.Close()
53
54   def testExclusiveBlocking(self):
55     self.lock.Exclusive(blocking=True)
56     self.lock.Close()
57
58   def testUnlockBlocking(self):
59     self.lock.Unlock(blocking=True)
60     self.lock.Close()
61
62   def testSharedExclusiveUnlock(self):
63     self.lock.Shared(blocking=False)
64     self.lock.Exclusive(blocking=False)
65     self.lock.Unlock(blocking=False)
66     self.lock.Close()
67
68   def testExclusiveSharedUnlock(self):
69     self.lock.Exclusive(blocking=False)
70     self.lock.Shared(blocking=False)
71     self.lock.Unlock(blocking=False)
72     self.lock.Close()
73
74   def testSimpleTimeout(self):
75     # These will succeed on the first attempt, hence a short timeout
76     self.lock.Shared(blocking=True, timeout=10.0)
77     self.lock.Exclusive(blocking=False, timeout=10.0)
78     self.lock.Unlock(blocking=True, timeout=10.0)
79     self.lock.Close()
80
81   @staticmethod
82   def _TryLockInner(filename, shared, blocking):
83     lock = utils.FileLock.Open(filename)
84
85     if shared:
86       fn = lock.Shared
87     else:
88       fn = lock.Exclusive
89
90     try:
91       # The timeout doesn't really matter as the parent process waits for us to
92       # finish anyway.
93       fn(blocking=blocking, timeout=0.01)
94     except errors.LockError, err:
95       return False
96
97     return True
98
99   def _TryLock(self, *args):
100     return utils.RunInSeparateProcess(self._TryLockInner, self.tmpfile.name,
101                                       *args)
102
103   def testTimeout(self):
104     for blocking in [True, False]:
105       self.lock.Exclusive(blocking=True)
106       self.failIf(self._TryLock(False, blocking))
107       self.failIf(self._TryLock(True, blocking))
108
109       self.lock.Shared(blocking=True)
110       self.assert_(self._TryLock(True, blocking))
111       self.failIf(self._TryLock(False, blocking))
112
113   def testCloseShared(self):
114     self.lock.Close()
115     self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
116
117   def testCloseExclusive(self):
118     self.lock.Close()
119     self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
120
121   def testCloseUnlock(self):
122     self.lock.Close()
123     self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
124
125
126 class TestFileLockWithFilename(testutils.GanetiTestCase, _BaseFileLockTest):
127   TESTDATA = "Hello World\n" * 10
128
129   def setUp(self):
130     testutils.GanetiTestCase.setUp(self)
131
132     self.tmpfile = tempfile.NamedTemporaryFile()
133     utils.WriteFile(self.tmpfile.name, data=self.TESTDATA)
134     self.lock = utils.FileLock.Open(self.tmpfile.name)
135
136     # Ensure "Open" didn't truncate file
137     self.assertFileContent(self.tmpfile.name, self.TESTDATA)
138
139   def tearDown(self):
140     self.assertFileContent(self.tmpfile.name, self.TESTDATA)
141
142     testutils.GanetiTestCase.tearDown(self)
143
144
145 class TestFileLockWithFileObject(unittest.TestCase, _BaseFileLockTest):
146   def setUp(self):
147     self.tmpfile = tempfile.NamedTemporaryFile()
148     self.lock = utils.FileLock(open(self.tmpfile.name, "w"), self.tmpfile.name)
149
150
151 if __name__ == "__main__":
152   testutils.GanetiTestProgram()