locking: Change locking order, move NAL after instances
[ganeti-local] / test / ganeti.utils.io_unittest.py
index ed67f68..8cee74b 100755 (executable)
@@ -28,6 +28,8 @@ import shutil
 import glob
 import time
 import signal
+import stat
+import errno
 
 from ganeti import constants
 from ganeti import utils
@@ -55,6 +57,12 @@ class TestReadFile(testutils.GanetiTestCase):
     h.update(data)
     self.assertEqual(h.hexdigest(), "893772354e4e690b9efd073eed433ce7")
 
+  def testCallback(self):
+    def _Cb(fh):
+      self.assertEqual(fh.tell(), 0)
+    data = utils.ReadFile(self._TestDataFilename("cert1.pem"), preread=_Cb)
+    self.assertEqual(len(data), 814)
+
   def testError(self):
     self.assertRaises(EnvironmentError, utils.ReadFile,
                       "/dev/null/does-not-exist")
@@ -231,9 +239,31 @@ class TestListVisibleFiles(unittest.TestCase):
     self.failUnlessRaises(errors.ProgrammerError, utils.ListVisibleFiles,
                           "/bin/../tmp")
 
+  def testMountpoint(self):
+    lvfmp_fn = compat.partial(utils.ListVisibleFiles,
+                              _is_mountpoint=lambda _: True)
+    self.assertEqual(lvfmp_fn(self.path), [])
+
+    # Create "lost+found" as a regular file
+    self._CreateFiles(["foo", "bar", ".baz", "lost+found"])
+    self.assertEqual(set(lvfmp_fn(self.path)),
+                     set(["foo", "bar", "lost+found"]))
+
+    # Replace "lost+found" with a directory
+    laf_path = utils.PathJoin(self.path, "lost+found")
+    utils.RemoveFile(laf_path)
+    os.mkdir(laf_path)
+    self.assertEqual(set(lvfmp_fn(self.path)), set(["foo", "bar"]))
+
+  def testLostAndFoundNoMountpoint(self):
+    files = ["foo", "bar", ".Hello World", "lost+found"]
+    expected = ["foo", "bar", "lost+found"]
+    self._test(files, expected)
+
 
-class TestWriteFile(unittest.TestCase):
+class TestWriteFile(testutils.GanetiTestCase):
   def setUp(self):
+    testutils.GanetiTestCase.setUp(self)
     self.tmpdir = None
     self.tfile = tempfile.NamedTemporaryFile()
     self.did_pre = False
@@ -241,6 +271,7 @@ class TestWriteFile(unittest.TestCase):
     self.did_write = False
 
   def tearDown(self):
+    testutils.GanetiTestCase.tearDown(self)
     if self.tmpdir:
       shutil.rmtree(self.tmpdir)
 
@@ -269,6 +300,14 @@ class TestWriteFile(unittest.TestCase):
     self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name)
     self.assertRaises(errors.ProgrammerError, utils.WriteFile,
                       self.tfile.name, data="test", atime=0)
+    self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name,
+                      mode=0400, keep_perms=utils.KP_ALWAYS)
+    self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name,
+                      uid=0, keep_perms=utils.KP_ALWAYS)
+    self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name,
+                      gid=0, keep_perms=utils.KP_ALWAYS)
+    self.assertRaises(errors.ProgrammerError, utils.WriteFile, self.tfile.name,
+                      mode=0400, uid=0, keep_perms=utils.KP_ALWAYS)
 
   def testPreWrite(self):
     utils.WriteFile(self.tfile.name, data="", prewrite=self.markPre)
@@ -365,6 +404,28 @@ class TestWriteFile(unittest.TestCase):
     self.assertTrue("test" in os.listdir(self.tmpdir))
     self.assertEqual(len(os.listdir(self.tmpdir)), 2)
 
+  def testFileMode(self):
+    self.tmpdir = tempfile.mkdtemp()
+    target = utils.PathJoin(self.tmpdir, "target")
+    self.assertRaises(OSError, utils.WriteFile, target, data="data",
+                      keep_perms=utils.KP_ALWAYS)
+    # All masks have only user bits set, to avoid interactions with umask
+    utils.WriteFile(target, data="data", mode=0200)
+    self.assertFileMode(target, 0200)
+    utils.WriteFile(target, data="data", mode=0400,
+                    keep_perms=utils.KP_IF_EXISTS)
+    self.assertFileMode(target, 0200)
+    utils.WriteFile(target, data="data", keep_perms=utils.KP_ALWAYS)
+    self.assertFileMode(target, 0200)
+    utils.WriteFile(target, data="data", mode=0700)
+    self.assertFileMode(target, 0700)
+
+  def testNewFileMode(self):
+    self.tmpdir = tempfile.mkdtemp()
+    target = utils.PathJoin(self.tmpdir, "target")
+    utils.WriteFile(target, data="data", mode=0400,
+                    keep_perms=utils.KP_IF_EXISTS)
+    self.assertFileMode(target, 0400)
 
 class TestFileID(testutils.GanetiTestCase):
   def testEquality(self):
@@ -375,7 +436,6 @@ class TestFileID(testutils.GanetiTestCase):
   def testUpdate(self):
     name = self._CreateTempFile()
     oldi = utils.GetFileID(path=name)
-    os.utime(name, None)
     fd = os.open(name, os.O_RDWR)
     try:
       newi = utils.GetFileID(fd=fd)
@@ -410,8 +470,8 @@ class TestRemoveFile(unittest.TestCase):
 
   def setUp(self):
     """Create a temp dir and file for each case"""
-    self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
-    fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
+    self.tmpdir = tempfile.mkdtemp("", "ganeti-unittest-")
+    fd, self.tmpfile = tempfile.mkstemp("", "", self.tmpdir)
     os.close(fd)
 
   def tearDown(self):
@@ -500,12 +560,15 @@ class TestRename(unittest.TestCase):
     self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
     self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/xyz")))
 
-    utils.RenameFile(os.path.join(self.tmpdir, "test/xyz"),
-                     os.path.join(self.tmpdir, "test/foo/bar/baz"),
-                     mkdir=True)
-    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test")))
-    self.assert_(os.path.isdir(os.path.join(self.tmpdir, "test/foo/bar")))
-    self.assert_(os.path.isfile(os.path.join(self.tmpdir, "test/foo/bar/baz")))
+    self.assertRaises(EnvironmentError, utils.RenameFile,
+                      os.path.join(self.tmpdir, "test/xyz"),
+                      os.path.join(self.tmpdir, "test/foo/bar/baz"),
+                      mkdir=True)
+
+    self.assertTrue(os.path.exists(os.path.join(self.tmpdir, "test/xyz")))
+    self.assertFalse(os.path.exists(os.path.join(self.tmpdir, "test/foo/bar")))
+    self.assertFalse(os.path.exists(os.path.join(self.tmpdir,
+                                                 "test/foo/bar/baz")))
 
 
 class TestMakedirs(unittest.TestCase):
@@ -567,10 +630,10 @@ class TestIsNormAbsPath(unittest.TestCase):
   def _pathTestHelper(self, path, result):
     if result:
       self.assert_(utils.IsNormAbsPath(path),
-          "Path %s should result absolute and normalized" % path)
+          msg="Path %s should result absolute and normalized" % path)
     else:
       self.assertFalse(utils.IsNormAbsPath(path),
-          "Path %s should not result absolute and normalized" % path)
+          msg="Path %s should not result absolute and normalized" % path)
 
   def testBase(self):
     self._pathTestHelper("/etc", True)
@@ -579,6 +642,68 @@ class TestIsNormAbsPath(unittest.TestCase):
     self._pathTestHelper("/etc/../root", False)
     self._pathTestHelper("/etc/", False)
 
+  def testSlashes(self):
+    # Root directory
+    self._pathTestHelper("/", True)
+
+    # POSIX' "implementation-defined" double slashes
+    self._pathTestHelper("//", True)
+
+    # Three and more slashes count as one, so the path is not normalized
+    for i in range(3, 10):
+      self._pathTestHelper("/" * i, False)
+
+
+class TestIsBelowDir(unittest.TestCase):
+  """Testing case for IsBelowDir"""
+
+  def testExactlyTheSame(self):
+    self.assertFalse(utils.IsBelowDir("/a/b", "/a/b"))
+    self.assertFalse(utils.IsBelowDir("/a/b", "/a/b/"))
+    self.assertFalse(utils.IsBelowDir("/a/b/", "/a/b"))
+    self.assertFalse(utils.IsBelowDir("/a/b/", "/a/b/"))
+
+  def testSamePrefix(self):
+    self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c"))
+    self.assertTrue(utils.IsBelowDir("/a/b/", "/a/b/e"))
+
+  def testSamePrefixButDifferentDir(self):
+    self.assertFalse(utils.IsBelowDir("/a/b", "/a/bc/d"))
+    self.assertFalse(utils.IsBelowDir("/a/b/", "/a/bc/e"))
+
+  def testSamePrefixButDirTraversal(self):
+    self.assertFalse(utils.IsBelowDir("/a/b", "/a/b/../c"))
+    self.assertFalse(utils.IsBelowDir("/a/b/", "/a/b/../d"))
+
+  def testSamePrefixAndTraversal(self):
+    self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c/../d"))
+    self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/c/./e"))
+    self.assertTrue(utils.IsBelowDir("/a/b", "/a/b/../b/./e"))
+
+  def testBothAbsPath(self):
+    self.assertRaises(ValueError, utils.IsBelowDir, "/a/b/c", "d")
+    self.assertRaises(ValueError, utils.IsBelowDir, "a/b/c", "/d")
+    self.assertRaises(ValueError, utils.IsBelowDir, "a/b/c", "d")
+    self.assertRaises(ValueError, utils.IsBelowDir, "", "/")
+    self.assertRaises(ValueError, utils.IsBelowDir, "/", "")
+
+  def testRoot(self):
+    self.assertFalse(utils.IsBelowDir("/", "/"))
+
+    for i in ["/a", "/tmp", "/tmp/foo/bar", "/tmp/"]:
+      self.assertTrue(utils.IsBelowDir("/", i))
+
+  def testSlashes(self):
+    # In POSIX a double slash is "implementation-defined".
+    self.assertFalse(utils.IsBelowDir("//", "//"))
+    self.assertFalse(utils.IsBelowDir("//", "/tmp"))
+    self.assertTrue(utils.IsBelowDir("//tmp", "//tmp/x"))
+
+    # Three (or more) slashes count as one
+    self.assertFalse(utils.IsBelowDir("/", "///"))
+    self.assertTrue(utils.IsBelowDir("/", "///tmp"))
+    self.assertTrue(utils.IsBelowDir("/tmp", "///tmp/a/b"))
+
 
 class TestPathJoin(unittest.TestCase):
   """Testing case for PathJoin"""
@@ -647,15 +772,15 @@ class TestPidFileFunctions(unittest.TestCase):
     self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
 
   def testPidFileFunctions(self):
-    pid_file = self.f_dpn('test')
-    fd = utils.WritePidFile(self.f_dpn('test'))
+    pid_file = self.f_dpn("test")
+    fd = utils.WritePidFile(self.f_dpn("test"))
     self.failUnless(os.path.exists(pid_file),
                     "PID file should have been created")
     read_pid = utils.ReadPidFile(pid_file)
     self.failUnlessEqual(read_pid, os.getpid())
     self.failUnless(utils.IsProcessAlive(read_pid))
-    self.failUnlessRaises(errors.LockError, utils.WritePidFile,
-                          self.f_dpn('test'))
+    self.failUnlessRaises(errors.PidFileLockError, utils.WritePidFile,
+                          self.f_dpn("test"))
     os.close(fd)
     utils.RemoveFile(self.f_dpn("test"))
     self.failIf(os.path.exists(pid_file),
@@ -668,19 +793,19 @@ class TestPidFileFunctions(unittest.TestCase):
     self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
                          "ReadPidFile should return 0 for invalid pid file")
     # but now, even with the file existing, we should be able to lock it
-    fd = utils.WritePidFile(self.f_dpn('test'))
+    fd = utils.WritePidFile(self.f_dpn("test"))
     os.close(fd)
     utils.RemoveFile(self.f_dpn("test"))
     self.failIf(os.path.exists(pid_file),
                 "PID file should not exist anymore")
 
   def testKill(self):
-    pid_file = self.f_dpn('child')
+    pid_file = self.f_dpn("child")
     r_fd, w_fd = os.pipe()
     new_pid = os.fork()
     if new_pid == 0: #child
-      utils.WritePidFile(self.f_dpn('child'))
-      os.write(w_fd, 'a')
+      utils.WritePidFile(self.f_dpn("child"))
+      os.write(w_fd, "a")
       signal.pause()
       os._exit(0)
       return
@@ -690,11 +815,28 @@ class TestPidFileFunctions(unittest.TestCase):
     read_pid = utils.ReadPidFile(pid_file)
     self.failUnlessEqual(read_pid, new_pid)
     self.failUnless(utils.IsProcessAlive(new_pid))
+
+    # Try writing to locked file
+    try:
+      utils.WritePidFile(pid_file)
+    except errors.PidFileLockError, err:
+      errmsg = str(err)
+      self.assertTrue(errmsg.endswith(" %s" % new_pid),
+                      msg=("Error message ('%s') didn't contain correct"
+                           " PID (%s)" % (errmsg, new_pid)))
+    else:
+      self.fail("Writing to locked file didn't fail")
+
     utils.KillProcess(new_pid, waitpid=True)
     self.failIf(utils.IsProcessAlive(new_pid))
-    utils.RemoveFile(self.f_dpn('child'))
+    utils.RemoveFile(self.f_dpn("child"))
     self.failUnlessRaises(errors.ProgrammerError, utils.KillProcess, 0)
 
+  def testExceptionType(self):
+    # Make sure the PID lock error is a subclass of LockError in case some code
+    # depends on it
+    self.assertTrue(issubclass(errors.PidFileLockError, errors.LockError))
+
   def tearDown(self):
     shutil.rmtree(self.dir)
 
@@ -702,14 +844,14 @@ class TestPidFileFunctions(unittest.TestCase):
 class TestSshKeys(testutils.GanetiTestCase):
   """Test case for the AddAuthorizedKey function"""
 
-  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
+  KEY_A = "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a"
   KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="198.51.100.4" '
-           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')
+           "ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b")
 
   def setUp(self):
     testutils.GanetiTestCase.setUp(self)
     self.tmpname = self._CreateTempFile()
-    handle = open(self.tmpname, 'w')
+    handle = open(self.tmpname, "w")
     try:
       handle.write("%s\n" % TestSshKeys.KEY_A)
       handle.write("%s\n" % TestSshKeys.KEY_B)
@@ -718,7 +860,7 @@ class TestSshKeys(testutils.GanetiTestCase):
 
   def testAddingNewKey(self):
     utils.AddAuthorizedKey(self.tmpname,
-                           'ssh-dss AAAAB3NzaC1kc3MAAACB root@test')
+                           "ssh-dss AAAAB3NzaC1kc3MAAACB root@test")
 
     self.assertFileContent(self.tmpname,
       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
@@ -728,26 +870,30 @@ class TestSshKeys(testutils.GanetiTestCase):
 
   def testAddingAlmostButNotCompletelyTheSameKey(self):
     utils.AddAuthorizedKey(self.tmpname,
-        'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test')
+        "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test")
 
+    # Only significant fields are compared, therefore the key won't be
+    # updated/added
     self.assertFileContent(self.tmpname,
       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
       'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
-      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
-      "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test\n")
+      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
 
   def testAddingExistingKeyWithSomeMoreSpaces(self):
     utils.AddAuthorizedKey(self.tmpname,
-        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
+      "ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a")
+    utils.AddAuthorizedKey(self.tmpname,
+      "ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22")
 
     self.assertFileContent(self.tmpname,
       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
       'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
-      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n")
+      " ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b\n"
+      "ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22\n")
 
   def testRemovingExistingKeyWithSomeMoreSpaces(self):
     utils.RemoveAuthorizedKey(self.tmpname,
-        'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a')
+        "ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a")
 
     self.assertFileContent(self.tmpname,
       'command="/usr/bin/fooserver -t --verbose",from="198.51.100.4"'
@@ -755,7 +901,7 @@ class TestSshKeys(testutils.GanetiTestCase):
 
   def testRemovingNonExistingKey(self):
     utils.RemoveAuthorizedKey(self.tmpname,
-        'ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test')
+        "ssh-dss  AAAAB3Nsdfj230xxjxJjsjwjsjdjU   root@test")
 
     self.assertFileContent(self.tmpname,
       "ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a\n"
@@ -770,5 +916,127 @@ class TestNewUUID(unittest.TestCase):
     self.failUnless(utils.UUID_RE.match(utils.NewUUID()))
 
 
+def _MockStatResult(cb, mode, uid, gid):
+  def _fn(path):
+    if cb:
+      cb()
+    return {
+      stat.ST_MODE: mode,
+      stat.ST_UID: uid,
+      stat.ST_GID: gid,
+      }
+  return _fn
+
+
+def _RaiseNoEntError():
+  raise EnvironmentError(errno.ENOENT, "not found")
+
+
+def _OtherStatRaise():
+  raise EnvironmentError()
+
+
+class TestPermissionEnforcements(unittest.TestCase):
+  UID_A = 16024
+  UID_B = 25850
+  GID_A = 14028
+  GID_B = 29801
+
+  def setUp(self):
+    self._chown_calls = []
+    self._chmod_calls = []
+    self._mkdir_calls = []
+
+  def tearDown(self):
+    self.assertRaises(IndexError, self._mkdir_calls.pop)
+    self.assertRaises(IndexError, self._chmod_calls.pop)
+    self.assertRaises(IndexError, self._chown_calls.pop)
+
+  def _FakeMkdir(self, path):
+    self._mkdir_calls.append(path)
+
+  def _FakeChown(self, path, uid, gid):
+    self._chown_calls.append((path, uid, gid))
+
+  def _ChmodWrapper(self, cb):
+    def _fn(path, mode):
+      self._chmod_calls.append((path, mode))
+      if cb:
+        cb()
+    return _fn
+
+  def _VerifyPerm(self, path, mode, uid=-1, gid=-1):
+    self.assertEqual(path, "/ganeti-qa-non-test")
+    self.assertEqual(mode, 0700)
+    self.assertEqual(uid, self.UID_A)
+    self.assertEqual(gid, self.GID_A)
+
+  def testMakeDirWithPerm(self):
+    is_dir_stat = _MockStatResult(None, stat.S_IFDIR, 0, 0)
+    utils.MakeDirWithPerm("/ganeti-qa-non-test", 0700, self.UID_A, self.GID_A,
+                          _lstat_fn=is_dir_stat, _perm_fn=self._VerifyPerm)
+
+  def testDirErrors(self):
+    self.assertRaises(errors.GenericError, utils.MakeDirWithPerm,
+                      "/ganeti-qa-non-test", 0700, 0, 0,
+                      _lstat_fn=_MockStatResult(None, 0, 0, 0))
+    self.assertRaises(IndexError, self._mkdir_calls.pop)
+
+    other_stat_raise = _MockStatResult(_OtherStatRaise, stat.S_IFDIR, 0, 0)
+    self.assertRaises(errors.GenericError, utils.MakeDirWithPerm,
+                      "/ganeti-qa-non-test", 0700, 0, 0,
+                      _lstat_fn=other_stat_raise)
+    self.assertRaises(IndexError, self._mkdir_calls.pop)
+
+    non_exist_stat = _MockStatResult(_RaiseNoEntError, stat.S_IFDIR, 0, 0)
+    utils.MakeDirWithPerm("/ganeti-qa-non-test", 0700, self.UID_A, self.GID_A,
+                          _lstat_fn=non_exist_stat, _mkdir_fn=self._FakeMkdir,
+                          _perm_fn=self._VerifyPerm)
+    self.assertEqual(self._mkdir_calls.pop(0), "/ganeti-qa-non-test")
+
+  def testEnforcePermissionNoEnt(self):
+    self.assertRaises(errors.GenericError, utils.EnforcePermission,
+                      "/ganeti-qa-non-test", 0600,
+                      _chmod_fn=NotImplemented, _chown_fn=NotImplemented,
+                      _stat_fn=_MockStatResult(_RaiseNoEntError, 0, 0, 0))
+
+  def testEnforcePermissionNoEntMustNotExist(self):
+    utils.EnforcePermission("/ganeti-qa-non-test", 0600, must_exist=False,
+                            _chmod_fn=NotImplemented,
+                            _chown_fn=NotImplemented,
+                            _stat_fn=_MockStatResult(_RaiseNoEntError,
+                                                          0, 0, 0))
+
+  def testEnforcePermissionOtherErrorMustNotExist(self):
+    self.assertRaises(errors.GenericError, utils.EnforcePermission,
+                      "/ganeti-qa-non-test", 0600, must_exist=False,
+                      _chmod_fn=NotImplemented, _chown_fn=NotImplemented,
+                      _stat_fn=_MockStatResult(_OtherStatRaise, 0, 0, 0))
+
+  def testEnforcePermissionNoChanges(self):
+    utils.EnforcePermission("/ganeti-qa-non-test", 0600,
+                            _stat_fn=_MockStatResult(None, 0600, 0, 0),
+                            _chmod_fn=self._ChmodWrapper(None),
+                            _chown_fn=self._FakeChown)
+
+  def testEnforcePermissionChangeMode(self):
+    utils.EnforcePermission("/ganeti-qa-non-test", 0444,
+                            _stat_fn=_MockStatResult(None, 0600, 0, 0),
+                            _chmod_fn=self._ChmodWrapper(None),
+                            _chown_fn=self._FakeChown)
+    self.assertEqual(self._chmod_calls.pop(0), ("/ganeti-qa-non-test", 0444))
+
+  def testEnforcePermissionSetUidGid(self):
+    utils.EnforcePermission("/ganeti-qa-non-test", 0600,
+                            uid=self.UID_B, gid=self.GID_B,
+                            _stat_fn=_MockStatResult(None, 0600,
+                                                     self.UID_A,
+                                                     self.GID_A),
+                            _chmod_fn=self._ChmodWrapper(None),
+                            _chown_fn=self._FakeChown)
+    self.assertEqual(self._chown_calls.pop(0),
+                     ("/ganeti-qa-non-test", self.UID_B, self.GID_B))
+
+
 if __name__ == "__main__":
   testutils.GanetiTestProgram()