raise
+class FileLock(object):
+ """Utility class for file locks.
+
+ """
+ def __init__(self, filename):
+ self.filename = filename
+ self.fd = open(self.filename, "w")
+
+ def __del__(self):
+ self.Close()
+
+ def Close(self):
+ if self.fd:
+ self.fd.close()
+ self.fd = None
+
+ def _flock(self, flag, blocking, errmsg):
+ assert self.fd, "Lock was closed"
+
+ if not blocking:
+ flag |= fcntl.LOCK_NB
+
+ try:
+ fcntl.flock(self.fd, flag)
+ except IOError, err:
+ logging.exception("fcntl.flock failed")
+ if err.errno in (errno.EAGAIN, ):
+ raise errors.LockError(errmsg)
+ raise
+
+ def Exclusive(self, blocking=False):
+ """Locks the file in exclusive mode.
+
+ """
+ self._flock(fcntl.LOCK_EX, blocking,
+ "Failed to lock %s in exclusive mode" % self.filename)
+
+ def Shared(self, blocking=False):
+ """Locks the file in shared mode.
+
+ """
+ self._flock(fcntl.LOCK_SH, blocking,
+ "Failed to lock %s in shared mode" % self.filename)
+
+ def Unlock(self, blocking=True):
+ """Unlocks the file.
+
+ According to "man flock", unlocking can also be a nonblocking operation:
+ "To make a non-blocking request, include LOCK_NB with any of the above
+ operations"
+
+ """
+ self._flock(fcntl.LOCK_UN, blocking,
+ "Failed to unlock %s" % self.filename)
+
+
class SignalHandler(object):
"""Generic signal handler class.
import socket
import shutil
import re
-import tempfile
import ganeti
import testutils
self._test(["a", "b"], ["a", "b"])
self._test(["a", "b", "a"], ["a", "b"])
+
class TestFirstFree(unittest.TestCase):
"""Test case for the FirstFree function"""
self.failUnlessEqual(FirstFree([3, 4, 6], base=3), 5)
self.failUnlessRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
+
+class TestFileLock(unittest.TestCase):
+ """Test case for the FileLock class"""
+
+ def setUp(self):
+ self.tmpfile = tempfile.NamedTemporaryFile()
+ self.lock = utils.FileLock(self.tmpfile.name)
+
+ def testSharedNonblocking(self):
+ self.lock.Shared(blocking=False)
+ self.lock.Close()
+
+ def testExclusiveNonblocking(self):
+ self.lock.Exclusive(blocking=False)
+ self.lock.Close()
+
+ def testUnlockNonblocking(self):
+ self.lock.Unlock(blocking=False)
+ self.lock.Close()
+
+ def testSharedBlocking(self):
+ self.lock.Shared(blocking=True)
+ self.lock.Close()
+
+ def testExclusiveBlocking(self):
+ self.lock.Exclusive(blocking=True)
+ self.lock.Close()
+
+ def testUnlockBlocking(self):
+ self.lock.Unlock(blocking=True)
+ self.lock.Close()
+
+ def testSharedExclusiveUnlock(self):
+ self.lock.Shared(blocking=False)
+ self.lock.Exclusive(blocking=False)
+ self.lock.Unlock(blocking=False)
+ self.lock.Close()
+
+ def testExclusiveSharedUnlock(self):
+ self.lock.Exclusive(blocking=False)
+ self.lock.Shared(blocking=False)
+ self.lock.Unlock(blocking=False)
+ self.lock.Close()
+
+ def testCloseShared(self):
+ self.lock.Close()
+ self.assertRaises(AssertionError, self.lock.Shared, blocking=False)
+
+ def testCloseExclusive(self):
+ self.lock.Close()
+ self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)
+
+ def testCloseUnlock(self):
+ self.lock.Close()
+ self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
+
+
if __name__ == '__main__':
unittest.main()