From: Michael Hanselmann Date: Fri, 19 Aug 2011 11:14:04 +0000 (+0200) Subject: ensure-dirs: Check mode and owner before changing X-Git-Tag: v2.5.0beta2~13 X-Git-Url: https://code.grnet.gr/git/ganeti-local/commitdiff_plain/c3f54085a6c91c0b36a205ea01e6769af9ec4059 ensure-dirs: Check mode and owner before changing This avoids many calls to chmod(2) and chown(2), and thereby ctime updates. Since I had to update the unittests anyway I untangled the code a bit, split it into more separate functions and added some more tests. Signed-off-by: Michael Hanselmann Reviewed-by: René Nussbaumer --- diff --git a/lib/tools/ensure_dirs.py b/lib/tools/ensure_dirs.py index ad1fb09..11f9134 100644 --- a/lib/tools/ensure_dirs.py +++ b/lib/tools/ensure_dirs.py @@ -56,7 +56,7 @@ class EnsureError(errors.GenericError): def EnsurePermission(path, mode, uid=-1, gid=-1, must_exist=True, - _chmod_fn=os.chmod, _chown_fn=os.chown): + _chmod_fn=os.chmod, _chown_fn=os.chown, _stat_fn=os.stat): """Ensures that given path has given mode. @param path: The path to the file @@ -70,10 +70,20 @@ def EnsurePermission(path, mode, uid=-1, gid=-1, must_exist=True, """ logging.debug("Checking %s", path) try: - _chmod_fn(path, mode) + st = _stat_fn(path) + + fmode = stat.S_IMODE(st[stat.ST_MODE]) + if fmode != mode: + logging.debug("Changing mode of %s from %#o to %#o", path, fmode, mode) + _chmod_fn(path, mode) if max(uid, gid) > -1: - _chown_fn(path, uid, gid) + fuid = st[stat.ST_UID] + fgid = st[stat.ST_GID] + if fuid != uid or fgid != gid: + logging.debug("Changing owner of %s from UID %s/GID %s to" + " UID %s/GID %s", path, fuid, fgid, uid, gid) + _chown_fn(path, uid, gid) except EnvironmentError, err: if err.errno == errno.ENOENT: if must_exist: diff --git a/test/ganeti.tools.ensure_dirs_unittest.py b/test/ganeti.tools.ensure_dirs_unittest.py index 999da06..f619b81 100755 --- a/test/ganeti.tools.ensure_dirs_unittest.py +++ b/test/ganeti.tools.ensure_dirs_unittest.py @@ -31,91 +31,126 @@ from ganeti.tools import ensure_dirs import testutils +def _MockStatResult(cb, mode, uid, gid): + def _fn(path): + if cb: + cb() + return { + stat.ST_MODE: mode, + stat.ST_UID: uid, + stat.ST_GID: gid, + } + return _fn + + +def _RaiseNoEntError(): + raise EnvironmentError(errno.ENOENT, "not found") + + +def _OtherStatRaise(): + raise EnvironmentError() + + class TestEnsureDirsFunctions(unittest.TestCase): - def _NoopMkdir(self, _): - self.mkdir_called = True - - @staticmethod - def _MockStatResult(mode, pre_fn=lambda: 0): - def _fn(path): - pre_fn() - return {stat.ST_MODE: mode} - return _fn + UID_A = 16024 + UID_B = 25850 + GID_A = 14028 + GID_B = 29801 - def _VerifyEnsure(self, path, mode, uid=-1, gid=-1): - self.assertEqual(path, "/ganeti-qa-non-test") - self.assertEqual(mode, 0700) - self.assertEqual(uid, 0) - self.assertEqual(gid, 0) + def setUp(self): + self._chown_calls = [] + self._chmod_calls = [] + self._mkdir_calls = [] + + def tearDown(self): + self.assertRaises(IndexError, self._mkdir_calls.pop) + self.assertRaises(IndexError, self._chmod_calls.pop) + self.assertRaises(IndexError, self._chown_calls.pop) - @staticmethod - def _RaiseNoEntError(): - noent_error = EnvironmentError() - noent_error.errno = errno.ENOENT - raise noent_error + def _FakeMkdir(self, path): + self._mkdir_calls.append(path) - @staticmethod - def _OtherStatRaise(): - raise EnvironmentError() + def _FakeChown(self, path, uid, gid): + self._chown_calls.append((path, uid, gid)) - def _ChmodWrapper(self, pre_fn=lambda: 0): + def _ChmodWrapper(self, cb): def _fn(path, mode): - self.chmod_called = True - pre_fn() + self._chmod_calls.append((path, mode)) + if cb: + cb() return _fn - def _NoopChown(self, path, uid, gid): - self.chown_called = True + def _VerifyEnsure(self, path, mode, uid=-1, gid=-1): + self.assertEqual(path, "/ganeti-qa-non-test") + self.assertEqual(mode, 0700) + self.assertEqual(uid, self.UID_A) + self.assertEqual(gid, self.GID_A) def testEnsureDir(self): - is_dir_stat = self._MockStatResult(stat.S_IFDIR) - not_dir_stat = self._MockStatResult(0) - non_exist_stat = self._MockStatResult(stat.S_IFDIR, - pre_fn=self._RaiseNoEntError) - other_stat_raise = self._MockStatResult(stat.S_IFDIR, - pre_fn=self._OtherStatRaise) + is_dir_stat = _MockStatResult(None, stat.S_IFDIR, 0, 0) + ensure_dirs.EnsureDir("/ganeti-qa-non-test", 0700, self.UID_A, self.GID_A, + _lstat_fn=is_dir_stat, _ensure_fn=self._VerifyEnsure) + def testEnsureDirErrors(self): self.assertRaises(ensure_dirs.EnsureError, ensure_dirs.EnsureDir, "/ganeti-qa-non-test", 0700, 0, 0, - _lstat_fn=not_dir_stat) + _lstat_fn=_MockStatResult(None, 0, 0, 0)) + self.assertRaises(IndexError, self._mkdir_calls.pop) + + other_stat_raise = _MockStatResult(_OtherStatRaise, stat.S_IFDIR, 0, 0) self.assertRaises(ensure_dirs.EnsureError, ensure_dirs.EnsureDir, "/ganeti-qa-non-test", 0700, 0, 0, _lstat_fn=other_stat_raise) - self.mkdir_called = False - ensure_dirs.EnsureDir("/ganeti-qa-non-test", 0700, 0, 0, - _lstat_fn=non_exist_stat, _mkdir_fn=self._NoopMkdir, + self.assertRaises(IndexError, self._mkdir_calls.pop) + + non_exist_stat = _MockStatResult(_RaiseNoEntError, stat.S_IFDIR, 0, 0) + ensure_dirs.EnsureDir("/ganeti-qa-non-test", 0700, self.UID_A, self.GID_A, + _lstat_fn=non_exist_stat, _mkdir_fn=self._FakeMkdir, _ensure_fn=self._VerifyEnsure) - self.assertTrue(self.mkdir_called) - self.mkdir_called = False - ensure_dirs.EnsureDir("/ganeti-qa-non-test", 0700, 0, 0, - _lstat_fn=is_dir_stat, _ensure_fn=self._VerifyEnsure) - self.assertFalse(self.mkdir_called) + self.assertEqual(self._mkdir_calls.pop(0), "/ganeti-qa-non-test") - def testEnsurePermission(self): - noent_chmod_fn = self._ChmodWrapper(pre_fn=self._RaiseNoEntError) + def testEnsurePermissionNoEnt(self): self.assertRaises(ensure_dirs.EnsureError, ensure_dirs.EnsurePermission, "/ganeti-qa-non-test", 0600, - _chmod_fn=noent_chmod_fn) - self.chmod_called = False + _chmod_fn=NotImplemented, _chown_fn=NotImplemented, + _stat_fn=_MockStatResult(_RaiseNoEntError, 0, 0, 0)) + + def testEnsurePermissionNoEntMustNotExist(self): ensure_dirs.EnsurePermission("/ganeti-qa-non-test", 0600, must_exist=False, - _chmod_fn=noent_chmod_fn) - self.assertTrue(self.chmod_called) + _chmod_fn=NotImplemented, + _chown_fn=NotImplemented, + _stat_fn=_MockStatResult(_RaiseNoEntError, + 0, 0, 0)) + + def testEnsurePermissionOtherErrorMustNotExist(self): self.assertRaises(ensure_dirs.EnsureError, ensure_dirs.EnsurePermission, "/ganeti-qa-non-test", 0600, must_exist=False, - _chmod_fn=self._ChmodWrapper(pre_fn=self._OtherStatRaise)) - self.chmod_called = False - self.chown_called = False + _chmod_fn=NotImplemented, _chown_fn=NotImplemented, + _stat_fn=_MockStatResult(_OtherStatRaise, 0, 0, 0)) + + def testEnsurePermissionNoChanges(self): + ensure_dirs.EnsurePermission("/ganeti-qa-non-test", 0600, + _stat_fn=_MockStatResult(None, 0600, 0, 0), + _chmod_fn=self._ChmodWrapper(None), + _chown_fn=self._FakeChown) + + def testEnsurePermissionChangeMode(self): + ensure_dirs.EnsurePermission("/ganeti-qa-non-test", 0444, + _stat_fn=_MockStatResult(None, 0600, 0, 0), + _chmod_fn=self._ChmodWrapper(None), + _chown_fn=self._FakeChown) + self.assertEqual(self._chmod_calls.pop(0), ("/ganeti-qa-non-test", 0444)) + + def testEnsurePermissionSetUidGid(self): ensure_dirs.EnsurePermission("/ganeti-qa-non-test", 0600, - _chmod_fn=self._ChmodWrapper(), - _chown_fn=self._NoopChown) - self.assertTrue(self.chmod_called) - self.assertFalse(self.chown_called) - self.chmod_called = False - ensure_dirs.EnsurePermission("/ganeti-qa-non-test", 0600, uid=1, gid=1, - _chmod_fn=self._ChmodWrapper(), - _chown_fn=self._NoopChown) - self.assertTrue(self.chmod_called) - self.assertTrue(self.chown_called) + uid=self.UID_B, gid=self.GID_B, + _stat_fn=_MockStatResult(None, 0600, + self.UID_A, + self.GID_A), + _chmod_fn=self._ChmodWrapper(None), + _chown_fn=self._FakeChown) + self.assertEqual(self._chown_calls.pop(0), + ("/ganeti-qa-non-test", self.UID_B, self.GID_B)) def testPaths(self): paths = [(path[0], path[1]) for path in ensure_dirs.GetPaths()]