4 # Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for testing ganeti.utils.filelock"""
28 from ganeti import constants
29 from ganeti import utils
30 from ganeti import errors
35 class _BaseFileLockTest:
36 """Test case for the FileLock class"""
# Shared test methods for FileLock. This class declares no base class; the
# concrete test classes in this module combine it with
# testutils.GanetiTestCase or unittest.TestCase.
# Every test uses `self.lock`, and the cross-process helper reads
# `self.tmpfile.name`; neither attribute is assigned inside this class.
# Take a shared lock without blocking.
38 def testSharedNonblocking(self):
39 self.lock.Shared(blocking=False)
# Take an exclusive lock without blocking.
42 def testExclusiveNonblocking(self):
43 self.lock.Exclusive(blocking=False)
# Call Unlock (non-blocking) with no lock call preceding it in this test.
46 def testUnlockNonblocking(self):
47 self.lock.Unlock(blocking=False)
# Take a shared lock in blocking mode.
50 def testSharedBlocking(self):
51 self.lock.Shared(blocking=True)
# Take an exclusive lock in blocking mode.
54 def testExclusiveBlocking(self):
55 self.lock.Exclusive(blocking=True)
# Call Unlock (blocking) with no lock call preceding it in this test.
58 def testUnlockBlocking(self):
59 self.lock.Unlock(blocking=True)
# Shared, then exclusive, then unlock, all on the same lock object and all
# non-blocking.
62 def testSharedExclusiveUnlock(self):
63 self.lock.Shared(blocking=False)
64 self.lock.Exclusive(blocking=False)
65 self.lock.Unlock(blocking=False)
# Exclusive, then shared, then unlock, all on the same lock object and all
# non-blocking.
68 def testExclusiveSharedUnlock(self):
69 self.lock.Exclusive(blocking=False)
70 self.lock.Shared(blocking=False)
71 self.lock.Unlock(blocking=False)
# Exercise the `timeout` keyword on Shared, Exclusive and Unlock.
74 def testSimpleTimeout(self):
75 # These will succeed on the first attempt, hence a short timeout
76 self.lock.Shared(blocking=True, timeout=10.0)
77 self.lock.Exclusive(blocking=False, timeout=10.0)
78 self.lock.Unlock(blocking=True, timeout=10.0)
# Helper handed to utils.RunInSeparateProcess by _TryLock: opens its own
# FileLock on `filename` and makes one lock attempt with a 0.01s timeout.
# It takes no `self` parameter — presumably declared as a staticmethod; confirm.
# NOTE(review): `fn` is not bound in the code visible here — presumably
# lock.Shared or lock.Exclusive, selected by `shared`; confirm. The `try:`
# matching the `except` below and the helper's return values are not visible
# here either.
82 def _TryLockInner(filename, shared, blocking):
83 lock = utils.FileLock.Open(filename)
91 # The timeout doesn't really matter as the parent process waits for us to
93 fn(blocking=blocking, timeout=0.01)
# NOTE(review): "except X, name" is Python 2-only syntax, and `err` is not
# used in the visible code.
94 except errors.LockError, err:
# Runs _TryLockInner through utils.RunInSeparateProcess, passing the temp
# file's name as the first argument. The call continues on a line not visible
# here — presumably it forwards *args; confirm.
99 def _TryLock(self, *args):
100 return utils.RunInSeparateProcess(self._TryLockInner, self.tmpfile.name,
# For blocking=True and blocking=False in turn:
#  - while this process holds the lock exclusively, both _TryLock(False, ...)
#    and _TryLock(True, ...) must give a false result;
#  - while this process holds it shared, _TryLock(True, ...) must give a true
#    result and _TryLock(False, ...) a false one.
# The first _TryLock argument presumably maps to `shared` in _TryLockInner,
# and the result presumably reports whether the other process got the lock —
# confirm against the full helper.
# NOTE(review): failIf / assert_ are deprecated unittest aliases of
# assertFalse / assertTrue.
103 def testTimeout(self):
104 for blocking in [True, False]:
105 self.lock.Exclusive(blocking=True)
106 self.failIf(self._TryLock(False, blocking))
107 self.failIf(self._TryLock(True, blocking))
109 self.lock.Shared(blocking=True)
110 self.assert_(self._TryLock(True, blocking))
111 self.failIf(self._TryLock(False, blocking))
# The three tests below each expect AssertionError from a lock operation —
# presumably after the lock has been closed in a statement not visible here
# (going by the test names); confirm.
113 def testCloseShared(self):
115 self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
117 def testCloseExclusive(self):
119 self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
121 def testCloseUnlock(self):
123 self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
# Runs the _BaseFileLockTest methods against a lock obtained with
# utils.FileLock.Open() on a named temporary file pre-filled with TESTDATA.
126 class TestFileLockWithFilename(testutils.GanetiTestCase, _BaseFileLockTest):
# Content written to the temp file before the lock is opened; the same value
# is compared against the file content twice below.
127 TESTDATA = "Hello World\n" * 10
# NOTE(review): the `def` lines of the fixture methods are not visible here;
# the statements below presumably belong to setUp and then tearDown — confirm.
130 testutils.GanetiTestCase.setUp(self)
# Create the temp file, fill it with TESTDATA, then open the lock by file name.
132 self.tmpfile = tempfile.NamedTemporaryFile()
133 utils.WriteFile(self.tmpfile.name, data=self.TESTDATA)
134 self.lock = utils.FileLock.Open(self.tmpfile.name)
136 # Ensure "Open" didn't truncate file
137 self.assertFileContent(self.tmpfile.name, self.TESTDATA)
# Second content check, followed by the base-class tearDown call.
140 self.assertFileContent(self.tmpfile.name, self.TESTDATA)
142 testutils.GanetiTestCase.tearDown(self)
# Runs the _BaseFileLockTest methods against a FileLock built directly from an
# already-open file object plus the file's name, rather than via FileLock.Open.
145 class TestFileLockWithFileObject(unittest.TestCase, _BaseFileLockTest):
# NOTE(review): the enclosing `def` line is not visible here — presumably
# setUp; confirm.
# The temp file is re-opened by name in "w" mode and that file object is
# passed to the FileLock constructor.
147 self.tmpfile = tempfile.NamedTemporaryFile()
148 self.lock = utils.FileLock(open(self.tmpfile.name, "w"), self.tmpfile.name)
# When executed as a script (not imported), hand control to
# testutils.GanetiTestProgram.
151 if __name__ == "__main__":
152 testutils.GanetiTestProgram()