Revision 9d1b963f test/ganeti.utils_unittest.py
b/test/ganeti.utils_unittest.py | ||
---|---|---|
1165 | 1165 |
self.failUnlessEqual(TailFile(fname, lines=i), data[-i:]) |
1166 | 1166 |
|
1167 | 1167 |
|
1168 |
class _BaseFileLockTest: |
|
1169 |
"""Test case for the FileLock class""" |
|
1170 |
|
|
1171 |
def testSharedNonblocking(self): |
|
1172 |
self.lock.Shared(blocking=False) |
|
1173 |
self.lock.Close() |
|
1174 |
|
|
1175 |
def testExclusiveNonblocking(self): |
|
1176 |
self.lock.Exclusive(blocking=False) |
|
1177 |
self.lock.Close() |
|
1178 |
|
|
1179 |
def testUnlockNonblocking(self): |
|
1180 |
self.lock.Unlock(blocking=False) |
|
1181 |
self.lock.Close() |
|
1182 |
|
|
1183 |
def testSharedBlocking(self): |
|
1184 |
self.lock.Shared(blocking=True) |
|
1185 |
self.lock.Close() |
|
1186 |
|
|
1187 |
def testExclusiveBlocking(self): |
|
1188 |
self.lock.Exclusive(blocking=True) |
|
1189 |
self.lock.Close() |
|
1190 |
|
|
1191 |
def testUnlockBlocking(self): |
|
1192 |
self.lock.Unlock(blocking=True) |
|
1193 |
self.lock.Close() |
|
1194 |
|
|
1195 |
def testSharedExclusiveUnlock(self): |
|
1196 |
self.lock.Shared(blocking=False) |
|
1197 |
self.lock.Exclusive(blocking=False) |
|
1198 |
self.lock.Unlock(blocking=False) |
|
1199 |
self.lock.Close() |
|
1200 |
|
|
1201 |
def testExclusiveSharedUnlock(self): |
|
1202 |
self.lock.Exclusive(blocking=False) |
|
1203 |
self.lock.Shared(blocking=False) |
|
1204 |
self.lock.Unlock(blocking=False) |
|
1205 |
self.lock.Close() |
|
1206 |
|
|
1207 |
def testSimpleTimeout(self): |
|
1208 |
# These will succeed on the first attempt, hence a short timeout |
|
1209 |
self.lock.Shared(blocking=True, timeout=10.0) |
|
1210 |
self.lock.Exclusive(blocking=False, timeout=10.0) |
|
1211 |
self.lock.Unlock(blocking=True, timeout=10.0) |
|
1212 |
self.lock.Close() |
|
1213 |
|
|
1214 |
@staticmethod |
|
1215 |
def _TryLockInner(filename, shared, blocking): |
|
1216 |
lock = utils.FileLock.Open(filename) |
|
1217 |
|
|
1218 |
if shared: |
|
1219 |
fn = lock.Shared |
|
1220 |
else: |
|
1221 |
fn = lock.Exclusive |
|
1222 |
|
|
1223 |
try: |
|
1224 |
# The timeout doesn't really matter as the parent process waits for us to |
|
1225 |
# finish anyway. |
|
1226 |
fn(blocking=blocking, timeout=0.01) |
|
1227 |
except errors.LockError, err: |
|
1228 |
return False |
|
1229 |
|
|
1230 |
return True |
|
1231 |
|
|
1232 |
def _TryLock(self, *args): |
|
1233 |
return utils.RunInSeparateProcess(self._TryLockInner, self.tmpfile.name, |
|
1234 |
*args) |
|
1235 |
|
|
1236 |
def testTimeout(self): |
|
1237 |
for blocking in [True, False]: |
|
1238 |
self.lock.Exclusive(blocking=True) |
|
1239 |
self.failIf(self._TryLock(False, blocking)) |
|
1240 |
self.failIf(self._TryLock(True, blocking)) |
|
1241 |
|
|
1242 |
self.lock.Shared(blocking=True) |
|
1243 |
self.assert_(self._TryLock(True, blocking)) |
|
1244 |
self.failIf(self._TryLock(False, blocking)) |
|
1245 |
|
|
1246 |
def testCloseShared(self): |
|
1247 |
self.lock.Close() |
|
1248 |
self.assertRaises(AssertionError, self.lock.Shared, blocking=False) |
|
1249 |
|
|
1250 |
def testCloseExclusive(self): |
|
1251 |
self.lock.Close() |
|
1252 |
self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False) |
|
1253 |
|
|
1254 |
def testCloseUnlock(self): |
|
1255 |
self.lock.Close() |
|
1256 |
self.assertRaises(AssertionError, self.lock.Unlock, blocking=False) |
|
1257 |
|
|
1258 |
|
|
1259 |
class TestFileLockWithFilename(testutils.GanetiTestCase, _BaseFileLockTest): |
|
1260 |
TESTDATA = "Hello World\n" * 10 |
|
1261 |
|
|
1262 |
def setUp(self): |
|
1263 |
testutils.GanetiTestCase.setUp(self) |
|
1264 |
|
|
1265 |
self.tmpfile = tempfile.NamedTemporaryFile() |
|
1266 |
utils.WriteFile(self.tmpfile.name, data=self.TESTDATA) |
|
1267 |
self.lock = utils.FileLock.Open(self.tmpfile.name) |
|
1268 |
|
|
1269 |
# Ensure "Open" didn't truncate file |
|
1270 |
self.assertFileContent(self.tmpfile.name, self.TESTDATA) |
|
1271 |
|
|
1272 |
def tearDown(self): |
|
1273 |
self.assertFileContent(self.tmpfile.name, self.TESTDATA) |
|
1274 |
|
|
1275 |
testutils.GanetiTestCase.tearDown(self) |
|
1276 |
|
|
1277 |
|
|
1278 |
class TestFileLockWithFileObject(unittest.TestCase, _BaseFileLockTest): |
|
1279 |
def setUp(self): |
|
1280 |
self.tmpfile = tempfile.NamedTemporaryFile() |
|
1281 |
self.lock = utils.FileLock(open(self.tmpfile.name, "w"), self.tmpfile.name) |
|
1282 |
|
|
1283 |
|
|
1284 | 1168 |
class TestTimeFunctions(unittest.TestCase): |
1285 | 1169 |
"""Test case for time functions""" |
1286 | 1170 |
|
Also available in: Unified diff