locking: Change locking order, move NAL after instances
[ganeti-local] / test / ganeti.utils.io_unittest.py
index 109232a..8cee74b 100755 (executable)
@@ -470,8 +470,8 @@ class TestRemoveFile(unittest.TestCase):
 
   def setUp(self):
     """Create a temp dir and file for each case"""
-    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
-    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
+    self.tmpdir = tempfile.mkdtemp("", "ganeti-unittest-")
+    fd, self.tmpfile = tempfile.mkstemp("", "", self.tmpdir)
     os.close(fd)
 
   def tearDown(self):
@@ -630,10 +630,10 @@ class TestIsNormAbsPath(unittest.TestCase):
   def _pathTestHelper(self, path, result):
     if result:
       self.assert_(utils.IsNormAbsPath(path),
-          "Path %s should result absolute and normalized" % path)
+          msg="Path %s should result absolute and normalized" % path)
     else:
       self.assertFalse(utils.IsNormAbsPath(path),
-          "Path %s should not result absolute and normalized" % path)
+          msg="Path %s should not result absolute and normalized" % path)
 
   def testBase(self):
     self._pathTestHelper("/etc", True)
@@ -642,10 +642,27 @@ class TestIsNormAbsPath(unittest.TestCase):
     self._pathTestHelper("/etc/../root", False)
     self._pathTestHelper("/etc/", False)
 
+  def testSlashes(self):
+    # Root directory
+    self._pathTestHelper("/", True)
+
+    # POSIX' "implementation-defined" double slashes
+    self._pathTestHelper("//", True)
+
+    # Three and more slashes count as one, so the path is not normalized
+    for i in range(3, 10):
+      self._pathTestHelper("/" * i, False)
+
 
 class TestIsBelowDir(unittest.TestCase):
   """Testing case for IsBelowDir"""
 
+  def testExactlyTheSame(self):
+    self.assertFalse(utils.IsBelowDir("/a/b", "/a/b"))
+    self.assertFalse(utils.IsBelowDir("/a/b", "/a/b/"))
+    self.assertFalse(utils.IsBelowDir("/a/b/", "/a/b"))
+    self.assertFalse(utils.IsBelowDir("/a/b/", "/a/b/"))
+
   def testSamePrefix(self):
     self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c"))
     self.assertTrue(utils.IsBelowDir("/a/b/", "/a/b/e"))
@@ -667,6 +684,25 @@ class TestIsBelowDir(unittest.TestCase):
     self.assertRaises(ValueError, utils.IsBelowDir, "/a/b/c", "d")
     self.assertRaises(ValueError, utils.IsBelowDir, "a/b/c", "/d")
     self.assertRaises(ValueError, utils.IsBelowDir, "a/b/c", "d")
+    self.assertRaises(ValueError, utils.IsBelowDir, "", "/")
+    self.assertRaises(ValueError, utils.IsBelowDir, "/", "")
+
+  def testRoot(self):
+    self.assertFalse(utils.IsBelowDir("/", "/"))
+
+    for i in ["/a", "/tmp", "/tmp/foo/bar", "/tmp/"]:
+      self.assertTrue(utils.IsBelowDir("/", i))
+
+  def testSlashes(self):
+    # In POSIX a double slash is "implementation-defined".
+    self.assertFalse(utils.IsBelowDir("//", "//"))
+    self.assertFalse(utils.IsBelowDir("//", "/tmp"))
+    self.assertTrue(utils.IsBelowDir("//tmp", "//tmp/x"))
+
+    # Three (or more) slashes count as one
+    self.assertFalse(utils.IsBelowDir("/", "///"))
+    self.assertTrue(utils.IsBelowDir("/", "///tmp"))
+    self.assertTrue(utils.IsBelowDir("/tmp", "///tmp/a/b"))
 
 
 class TestPathJoin(unittest.TestCase):
@@ -736,15 +772,15 @@ class TestPidFileFunctions(unittest.TestCase):
     self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
 
   def testPidFileFunctions(self):
-    pid_file = self.f_dpn('test')
-    fd = utils.WritePidFile(self.f_dpn('test'))
+    pid_file = self.f_dpn("test")
+    fd = utils.WritePidFile(self.f_dpn("test"))
     self.failUnless(os.path.exists(pid_file),
                     "PID file should have been created")
     read_pid = utils.ReadPidFile(pid_file)
     self.failUnlessEqual(read_pid, os.getpid())
     self.failUnless(utils.IsProcessAlive(read_pid))
     self.failUnlessRaises(errors.PidFileLockError, utils.WritePidFile,
-                          self.f_dpn('test'))
+                          self.f_dpn("test"))
     os.close(fd)
     utils.RemoveFile(self.f_dpn("test"))
     self.failIf(os.path.exists(pid_file),
@@ -757,19 +793,19 @@ class TestPidFileFunctions(unittest.TestCase):
     self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
                          "ReadPidFile should return 0 for invalid pid file")
     # but now, even with the file existing, we should be able to lock it
-    fd = utils.WritePidFile(self.f_dpn('test'))
+    fd = utils.WritePidFile(self.f_dpn("test"))
     os.close(fd)
     utils.RemoveFile(self.f_dpn("test"))
     self.failIf(os.path.exists(pid_file),
                 "PID file should not exist anymore")
 
   def testKill(self):
-    pid_file = self.f_dpn('child')
+    pid_file = self.f_dpn("child")
     r_fd, w_fd = os.pipe()
     new_pid = os.fork()
     if new_pid == 0: #child
-      utils.WritePidFile(self.f_dpn('child'))
-      os.write(w_fd, 'a')
+      utils.WritePidFile(self.f_dpn("child"))
+      os.write(w_fd, "a")
       signal.pause()
       os._exit(0)
       return
@@ -793,7 +829,7 @@ class TestPidFileFunctions(unittest.TestCase):
 
     utils.KillProcess(new_pid, waitpid=True)
     self.failIf(utils.IsProcessAlive(new_pid))
-    utils.RemoveFile(self.f_dpn('child'))
+    utils.RemoveFile(self.f_dpn("child"))
     self.failUnlessRaises(errors.ProgrammerError, utils.KillProcess, 0)
 
   def testExceptionType(self):
@@ -808,14 +844,14 @@ class TestPidFileFunctions(unittest.TestCase):
 class TestSshKeys(testutils.GanetiTestCase):
   """Test case for the AddAuthorizedKey function"""
 
-  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
+  KEY_A = "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a"
   KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="198.51.100.4" '
-           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
+           "ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b")
 
   def setUp(self):
     testutils.GanetiTestCase.setUp(self)
     self.tmpname = self._CreateTempFile()
-    handle = open(self.tmpname, 'w')
+    handle = open(self.tmpname, "w")
     try:
       handle.write("%s\n" % TestSshKeys.KEY_A)
       handle.write("%s\n" % TestSshKeys.KEY_B)
@@ -824,7 +860,7 @@ class TestSshKeys(testutils.GanetiTestCase):
 
   def testAddingNewKey(self):
     utils.AddAuthorizedKey(self.tmpname,
-                           'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
+                           "ssh-dss AAAAB3NzaC1kc3MAAACB root@test")
 
     self.assertFileContent(self.tmpname,
       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
@@ -834,26 +870,30 @@ class TestSshKeys(testutils.GanetiTestCase):
 
   def testAddingAlmostButNotCompletelyTheSameKey(self):
     utils.AddAuthorizedKey(self.tmpname,
-        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
+        "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test")
 
+    # Only significant fields are compared, therefore the key won't be
+    # updated/added
     self.assertFileContent(self.tmpname,
       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
       'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
-      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
-      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
+      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
 
   def testAddingExistingKeyWithSomeMoreSpaces(self):
     utils.AddAuthorizedKey(self.tmpname,
-        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
+      "ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a")
+    utils.AddAuthorizedKey(self.tmpname,
+      "ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22")
 
     self.assertFileContent(self.tmpname,
       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
       'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
-      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
+      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
+      "ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22\n")
 
   def testRemovingExistingKeyWithSomeMoreSpaces(self):
     utils.RemoveAuthorizedKey(self.tmpname,
-        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
+        "ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a")
 
     self.assertFileContent(self.tmpname,
       'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
@@ -861,7 +901,7 @@ class TestSshKeys(testutils.GanetiTestCase):
 
   def testRemovingNonExistingKey(self):
     utils.RemoveAuthorizedKey(self.tmpname,
-        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
+        "ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test")
 
     self.assertFileContent(self.tmpname,
       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"