self.failUnlessRaises(errors.ProgrammerError, utils.ListVisibleFiles,
"/bin/../tmp")
+ def testMountpoint(self):
+ # Exercises ListVisibleFiles with the "_is_mountpoint" hook forced to
+ # report True for every path
+ lvfmp_fn = compat.partial(utils.ListVisibleFiles,
+ _is_mountpoint=lambda _: True)
+ # Nothing has been created by this test yet, so the listing must be empty
+ self.assertEqual(lvfmp_fn(self.path), [])
+
+ # Create "lost+found" as a regular file
+ self._CreateFiles(["foo", "bar", ".baz", "lost+found"])
+ # The dot-file is expected to be hidden, while a regular file named
+ # "lost+found" is still expected in the listing
+ self.assertEqual(set(lvfmp_fn(self.path)),
+ set(["foo", "bar", "lost+found"]))
+
+ # Replace "lost+found" with a directory
+ laf_path = utils.PathJoin(self.path, "lost+found")
+ utils.RemoveFile(laf_path)
+ os.mkdir(laf_path)
+ # Once it is a directory, "lost+found" is expected to be left out
+ self.assertEqual(set(lvfmp_fn(self.path)), set(["foo", "bar"]))
+
+ def testLostAndFoundNoMountpoint(self):
+ # Without the mountpoint override, "lost+found" is expected to be listed
+ # like any other visible entry; only the dot-file is expected to be dropped.
+ # NOTE(review): self._test is defined outside this excerpt; presumably it
+ # creates "files" and compares the listing with "expected" -- confirm.
+ files = ["foo", "bar", ".Hello World", "lost+found"]
+ expected = ["foo", "bar", "lost+found"]
+ self._test(files, expected)
+
class TestWriteFile(testutils.GanetiTestCase):
def setUp(self):
def testUpdate(self):
name = self._CreateTempFile()
oldi = utils.GetFileID(path=name)
- os.utime(name, None)
fd = os.open(name, os.O_RDWR)
try:
newi = utils.GetFileID(fd=fd)
def setUp(self):
"""Create a temp dir and file for each case"""
- self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
- fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
+ self.tmpdir = tempfile.mkdtemp("", "ganeti-unittest-")
+ fd, self.tmpfile = tempfile.mkstemp("", "", self.tmpdir)
os.close(fd)
def tearDown(self):
def _pathTestHelper(self, path, result):
+ # Asserts that utils.IsNormAbsPath(path) is true when "result" is true and
+ # false otherwise; the failure message names the path being checked
if result:
self.assert_(utils.IsNormAbsPath(path),
- "Path %s should result absolute and normalized" % path)
+ msg="Path %s should result absolute and normalized" % path)
else:
self.assertFalse(utils.IsNormAbsPath(path),
- "Path %s should not result absolute and normalized" % path)
+ msg="Path %s should not result absolute and normalized" % path)
def testBase(self):
+ # A plain absolute path is expected to pass; a ".." component or a trailing
+ # slash is expected to make the path count as not normalized
self._pathTestHelper("/etc", True)
self._pathTestHelper("/etc/../root", False)
self._pathTestHelper("/etc/", False)
+ def testSlashes(self):
+ # Covers paths that consist solely of slashes
+ # Root directory
+ self._pathTestHelper("/", True)
+
+ # POSIX' "implementation-defined" double slashes
+ self._pathTestHelper("//", True)
+
+ # Three or more slashes count as one, so the path is not normalized
+ # (checked here for runs of 3 up to 9 slashes)
+ for i in range(3, 10):
+ self._pathTestHelper("/" * i, False)
+
class TestIsBelowDir(unittest.TestCase):
"""Testing case for IsBelowDir"""
+ def testExactlyTheSame(self):
+ # A directory is not expected to count as being below itself, regardless
+ # of whether either argument carries a trailing slash
+ self.assertFalse(utils.IsBelowDir("/a/b", "/a/b"))
+ self.assertFalse(utils.IsBelowDir("/a/b", "/a/b/"))
+ self.assertFalse(utils.IsBelowDir("/a/b/", "/a/b"))
+ self.assertFalse(utils.IsBelowDir("/a/b/", "/a/b/"))
+
def testSamePrefix(self):
self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c"))
self.assertTrue(utils.IsBelowDir("/a/b/", "/a/b/e"))
self.assertRaises(ValueError, utils.IsBelowDir, "/a/b/c", "d")
self.assertRaises(ValueError, utils.IsBelowDir, "a/b/c", "/d")
self.assertRaises(ValueError, utils.IsBelowDir, "a/b/c", "d")
+ self.assertRaises(ValueError, utils.IsBelowDir, "", "/")
+ self.assertRaises(ValueError, utils.IsBelowDir, "/", "")
+
+ def testRoot(self):
+ # The root directory is not expected to be below itself ...
+ self.assertFalse(utils.IsBelowDir("/", "/"))
+
+ # ... while each of these absolute paths (one with a trailing slash) is
+ # expected to be below the root
+ for i in ["/a", "/tmp", "/tmp/foo/bar", "/tmp/"]:
+ self.assertTrue(utils.IsBelowDir("/", i))
+
+ def testSlashes(self):
+ # Checks how runs of leading slashes are treated by IsBelowDir
+ # In POSIX a double slash is "implementation-defined".
+ # A path under "/" is therefore not expected to be below "//"
+ self.assertFalse(utils.IsBelowDir("//", "//"))
+ self.assertFalse(utils.IsBelowDir("//", "/tmp"))
+ self.assertTrue(utils.IsBelowDir("//tmp", "//tmp/x"))
+
+ # Three (or more) slashes count as one
+ self.assertFalse(utils.IsBelowDir("/", "///"))
+ self.assertTrue(utils.IsBelowDir("/", "///tmp"))
+ self.assertTrue(utils.IsBelowDir("/tmp", "///tmp/a/b"))
class TestPathJoin(unittest.TestCase):
self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
def testPidFileFunctions(self):
- pid_file = self.f_dpn('test')
- fd = utils.WritePidFile(self.f_dpn('test'))
+ pid_file = self.f_dpn("test")
+ fd = utils.WritePidFile(self.f_dpn("test"))
self.failUnless(os.path.exists(pid_file),
"PID file should have been created")
read_pid = utils.ReadPidFile(pid_file)
self.failUnlessEqual(read_pid, os.getpid())
self.failUnless(utils.IsProcessAlive(read_pid))
- self.failUnlessRaises(errors.LockError, utils.WritePidFile,
- self.f_dpn('test'))
+ self.failUnlessRaises(errors.PidFileLockError, utils.WritePidFile,
+ self.f_dpn("test"))
os.close(fd)
utils.RemoveFile(self.f_dpn("test"))
self.failIf(os.path.exists(pid_file),
self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
"ReadPidFile should return 0 for invalid pid file")
# but now, even with the file existing, we should be able to lock it
- fd = utils.WritePidFile(self.f_dpn('test'))
+ fd = utils.WritePidFile(self.f_dpn("test"))
os.close(fd)
utils.RemoveFile(self.f_dpn("test"))
self.failIf(os.path.exists(pid_file),
"PID file should not exist anymore")
def testKill(self):
- pid_file = self.f_dpn('child')
+ pid_file = self.f_dpn("child")
r_fd, w_fd = os.pipe()
new_pid = os.fork()
if new_pid == 0: #child
- utils.WritePidFile(self.f_dpn('child'))
- os.write(w_fd, 'a')
+ utils.WritePidFile(self.f_dpn("child"))
+ os.write(w_fd, "a")
signal.pause()
os._exit(0)
return
read_pid = utils.ReadPidFile(pid_file)
self.failUnlessEqual(read_pid, new_pid)
self.failUnless(utils.IsProcessAlive(new_pid))
+
+ # Try writing to locked file
+ try:
+ utils.WritePidFile(pid_file)
+ except errors.PidFileLockError, err:
+ errmsg = str(err)
+ self.assertTrue(errmsg.endswith(" %s" % new_pid),
+ msg=("Error message ('%s') didn't contain correct"
+ " PID (%s)" % (errmsg, new_pid)))
+ else:
+ self.fail("Writing to locked file didn't fail")
+
utils.KillProcess(new_pid, waitpid=True)
self.failIf(utils.IsProcessAlive(new_pid))
- utils.RemoveFile(self.f_dpn('child'))
+ utils.RemoveFile(self.f_dpn("child"))
self.failUnlessRaises(errors.ProgrammerError, utils.KillProcess, 0)
+ def testExceptionType(self):
+ # Make sure the PID lock error is a subclass of LockError in case some code
+ # depends on it (i.e. an "except errors.LockError" clause must still catch
+ # errors.PidFileLockError)
+ self.assertTrue(issubclass(errors.PidFileLockError, errors.LockError))
+
def tearDown(self):
+ # Remove self.dir and everything below it once the test has finished
shutil.rmtree(self.dir)
class TestSshKeys(testutils.GanetiTestCase):
"""Test case for the AddAuthorizedKey function"""
- KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
+ KEY_A = "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a"
KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="198.51.100.4" '
- 'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
+ "ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b")
def setUp(self):
testutils.GanetiTestCase.setUp(self)
self.tmpname = self._CreateTempFile()
- handle = open(self.tmpname, 'w')
+ handle = open(self.tmpname, "w")
try:
handle.write("%s\n" % TestSshKeys.KEY_A)
handle.write("%s\n" % TestSshKeys.KEY_B)
def testAddingNewKey(self):
utils.AddAuthorizedKey(self.tmpname,
- 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
+ "ssh-dss AAAAB3NzaC1kc3MAAACB root@test")
self.assertFileContent(self.tmpname,
"ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
def testAddingAlmostButNotCompletelyTheSameKey(self):
+ # The key passed in has the same type and key data as KEY_A and differs
+ # only in the trailing comment ("root@test" instead of "root@key-a")
utils.AddAuthorizedKey(self.tmpname,
- 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
+ "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test")
+ # Only significant fields are compared, therefore the key won't be
+ # updated/added
self.assertFileContent(self.tmpname,
"ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
- " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
- "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
+ " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
def testAddingExistingKeyWithSomeMoreSpaces(self):
+ # Re-adding KEY_A is expected to leave the file without a duplicate entry.
+ # The second key has KEY_B's type and key data but neither its options nor
+ # its comment, and it is expected to be appended as a separate line.
+ # NOTE(review): presumably the options count as a significant field --
+ # confirm against utils.AddAuthorizedKey.
utils.AddAuthorizedKey(self.tmpname,
- 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a')
+ "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a")
+ utils.AddAuthorizedKey(self.tmpname,
+ "ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22")
self.assertFileContent(self.tmpname,
"ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
- " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
+ " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
+ "ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22\n")
def testRemovingExistingKeyWithSomeMoreSpaces(self):
utils.RemoveAuthorizedKey(self.tmpname,
- 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a')
+ "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a")
self.assertFileContent(self.tmpname,
'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
def testRemovingNonExistingKey(self):
utils.RemoveAuthorizedKey(self.tmpname,
- 'ssh-dss AAAAB3Nsdfj230xxjxJjsjwjsjdjU root@test')
+ "ssh-dss AAAAB3Nsdfj230xxjxJjsjwjsjdjU root@test")
self.assertFileContent(self.tmpname,
"ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"