Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils.filelock_unittest.py @ 7578ab0a

History | View | Annotate | Download (4.3 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.utils.filelock"""
23

    
24
import os
25
import tempfile
26
import unittest
27

    
28
from ganeti import constants
29
from ganeti import utils
30
from ganeti import errors
31

    
32
import testutils
33

    
34

    
35
class _BaseFileLockTest:
36
  """Test case for the FileLock class"""
37

    
38
  def testSharedNonblocking(self):
39
    self.lock.Shared(blocking=False)
40
    self.lock.Close()
41

    
42
  def testExclusiveNonblocking(self):
43
    self.lock.Exclusive(blocking=False)
44
    self.lock.Close()
45

    
46
  def testUnlockNonblocking(self):
47
    self.lock.Unlock(blocking=False)
48
    self.lock.Close()
49

    
50
  def testSharedBlocking(self):
51
    self.lock.Shared(blocking=True)
52
    self.lock.Close()
53

    
54
  def testExclusiveBlocking(self):
55
    self.lock.Exclusive(blocking=True)
56
    self.lock.Close()
57

    
58
  def testUnlockBlocking(self):
59
    self.lock.Unlock(blocking=True)
60
    self.lock.Close()
61

    
62
  def testSharedExclusiveUnlock(self):
63
    self.lock.Shared(blocking=False)
64
    self.lock.Exclusive(blocking=False)
65
    self.lock.Unlock(blocking=False)
66
    self.lock.Close()
67

    
68
  def testExclusiveSharedUnlock(self):
69
    self.lock.Exclusive(blocking=False)
70
    self.lock.Shared(blocking=False)
71
    self.lock.Unlock(blocking=False)
72
    self.lock.Close()
73

    
74
  def testSimpleTimeout(self):
75
    # These will succeed on the first attempt, hence a short timeout
76
    self.lock.Shared(blocking=True, timeout=10.0)
77
    self.lock.Exclusive(blocking=False, timeout=10.0)
78
    self.lock.Unlock(blocking=True, timeout=10.0)
79
    self.lock.Close()
80

    
81
  @staticmethod
82
  def _TryLockInner(filename, shared, blocking):
83
    lock = utils.FileLock.Open(filename)
84

    
85
    if shared:
86
      fn = lock.Shared
87
    else:
88
      fn = lock.Exclusive
89

    
90
    try:
91
      # The timeout doesn't really matter as the parent process waits for us to
92
      # finish anyway.
93
      fn(blocking=blocking, timeout=0.01)
94
    except errors.LockError, err:
95
      return False
96

    
97
    return True
98

    
99
  def _TryLock(self, *args):
100
    return utils.RunInSeparateProcess(self._TryLockInner, self.tmpfile.name,
101
                                      *args)
102

    
103
  def testTimeout(self):
104
    for blocking in [True, False]:
105
      self.lock.Exclusive(blocking=True)
106
      self.failIf(self._TryLock(False, blocking))
107
      self.failIf(self._TryLock(True, blocking))
108

    
109
      self.lock.Shared(blocking=True)
110
      self.assert_(self._TryLock(True, blocking))
111
      self.failIf(self._TryLock(False, blocking))
112

    
113
  def testCloseShared(self):
114
    self.lock.Close()
115
    self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
116

    
117
  def testCloseExclusive(self):
118
    self.lock.Close()
119
    self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
120

    
121
  def testCloseUnlock(self):
122
    self.lock.Close()
123
    self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
124

    
125

    
126
class TestFileLockWithFilename(testutils.GanetiTestCase, _BaseFileLockTest):
127
  TESTDATA = "Hello World\n" * 10
128

    
129
  def setUp(self):
130
    testutils.GanetiTestCase.setUp(self)
131

    
132
    self.tmpfile = tempfile.NamedTemporaryFile()
133
    utils.WriteFile(self.tmpfile.name, data=self.TESTDATA)
134
    self.lock = utils.FileLock.Open(self.tmpfile.name)
135

    
136
    # Ensure "Open" didn't truncate file
137
    self.assertFileContent(self.tmpfile.name, self.TESTDATA)
138

    
139
  def tearDown(self):
140
    self.assertFileContent(self.tmpfile.name, self.TESTDATA)
141

    
142
    testutils.GanetiTestCase.tearDown(self)
143

    
144

    
145
class TestFileLockWithFileObject(unittest.TestCase, _BaseFileLockTest):
146
  def setUp(self):
147
    self.tmpfile = tempfile.NamedTemporaryFile()
148
    self.lock = utils.FileLock(open(self.tmpfile.name, "w"), self.tmpfile.name)
149

    
150

    
151
if __name__ == "__main__":
152
  testutils.GanetiTestProgram()