def _MockStatResult(cb, mode, uid, gid):
    """Build a fake stat/lstat callable for use in tests.

    The returned function accepts a path (which it ignores). When ``cb`` is
    truthy it is called first, with no arguments, so a test can make the fake
    raise. Otherwise, or after ``cb`` returns, a new dict is returned that
    maps ``stat.ST_MODE``, ``stat.ST_UID`` and ``stat.ST_GID`` to the given
    ``mode``, ``uid`` and ``gid``.
    """
    canned_fields = (
        (stat.ST_MODE, mode),
        (stat.ST_UID, uid),
        (stat.ST_GID, gid),
    )

    def _fn(path):
        if cb:
            cb()
        # Hand out a fresh dict on every call so callers never share state.
        return dict(canned_fields)

    return _fn
+
+
def _RaiseNoEntError():
    """Raise an EnvironmentError with errno ENOENT and text "not found".

    Intended as the ``cb`` of a fake stat function to simulate a missing path.
    """
    missing_path_error = EnvironmentError(errno.ENOENT, "not found")
    raise missing_path_error
+
+
def _OtherStatRaise():
    """Raise an EnvironmentError that carries no errno or message.

    Intended as the ``cb`` of a fake stat function to simulate a failure that
    is not "file not found".
    """
    unspecified_error = EnvironmentError()
    raise unspecified_error
+
+
class TestPermissionEnforcements(unittest.TestCase):
    """Tests for utils.MakeDirWithPerm and utils.EnforcePermission.

    The tests inject fakes for stat/lstat, mkdir, chmod and chown through the
    underscore-prefixed keyword arguments of the functions under test. The
    fake mkdir/chmod/chown record their calls in per-test lists; each test
    pops the calls it expects and tearDown fails if any call is left over.

    Octal literals use the ``0o`` prefix, which is accepted by Python 2.6+
    and Python 3 (the legacy ``0700`` spelling is a SyntaxError on Python 3).
    """

    # Two distinct uid/gid pairs so that ownership changes are observable.
    UID_A = 16024
    UID_B = 25850
    GID_A = 14028
    GID_B = 29801

    def setUp(self):
        """Start each test with empty call logs."""
        self._chown_calls = []
        self._chmod_calls = []
        self._mkdir_calls = []

    def tearDown(self):
        """Fail if a recorded fake call was not consumed by the test."""
        self.assertEqual(self._mkdir_calls, [])
        self.assertEqual(self._chmod_calls, [])
        self.assertEqual(self._chown_calls, [])

    def _FakeMkdir(self, path):
        """Record a mkdir request instead of creating anything."""
        self._mkdir_calls.append(path)

    def _FakeChown(self, path, uid, gid):
        """Record a chown request as a (path, uid, gid) tuple."""
        self._chown_calls.append((path, uid, gid))

    def _ChmodWrapper(self, cb):
        """Return a fake chmod that logs (path, mode), then calls cb if set."""
        def _fn(path, mode):
            self._chmod_calls.append((path, mode))
            if cb:
                cb()
        return _fn

    def _VerifyPerm(self, path, mode, uid=-1, gid=-1):
        """Stand-in permission function asserting the expected arguments."""
        self.assertEqual(path, "/ganeti-qa-non-test")
        self.assertEqual(mode, 0o700)
        self.assertEqual(uid, self.UID_A)
        self.assertEqual(gid, self.GID_A)

    def testMakeDirWithPerm(self):
        """lstat reports a directory; _VerifyPerm checks what is passed on."""
        is_dir_stat = _MockStatResult(None, stat.S_IFDIR, 0, 0)
        utils.MakeDirWithPerm("/ganeti-qa-non-test", 0o700,
                              self.UID_A, self.GID_A,
                              _lstat_fn=is_dir_stat,
                              _perm_fn=self._VerifyPerm)

    def testDirErrors(self):
        """Error paths of MakeDirWithPerm, then creation of a missing dir."""
        # lstat succeeds but the reported mode (0) lacks S_IFDIR.
        self.assertRaises(errors.GenericError, utils.MakeDirWithPerm,
                          "/ganeti-qa-non-test", 0o700, 0, 0,
                          _lstat_fn=_MockStatResult(None, 0, 0, 0))
        self.assertEqual(self._mkdir_calls, [])

        # lstat fails with an EnvironmentError that has no errno.
        other_stat_raise = _MockStatResult(_OtherStatRaise, stat.S_IFDIR, 0, 0)
        self.assertRaises(errors.GenericError, utils.MakeDirWithPerm,
                          "/ganeti-qa-non-test", 0o700, 0, 0,
                          _lstat_fn=other_stat_raise)
        self.assertEqual(self._mkdir_calls, [])

        # lstat fails with ENOENT: the fake mkdir must be called once.
        non_exist_stat = _MockStatResult(_RaiseNoEntError, stat.S_IFDIR, 0, 0)
        utils.MakeDirWithPerm("/ganeti-qa-non-test", 0o700,
                              self.UID_A, self.GID_A,
                              _lstat_fn=non_exist_stat,
                              _mkdir_fn=self._FakeMkdir,
                              _perm_fn=self._VerifyPerm)
        self.assertEqual(self._mkdir_calls.pop(0), "/ganeti-qa-non-test")

    def testEnforcePermissionNoEnt(self):
        """stat raising ENOENT is expected to surface as GenericError."""
        # NotImplemented is not callable, so any chmod/chown attempt errors.
        self.assertRaises(errors.GenericError, utils.EnforcePermission,
                          "/ganeti-qa-non-test", 0o600,
                          _chmod_fn=NotImplemented, _chown_fn=NotImplemented,
                          _stat_fn=_MockStatResult(_RaiseNoEntError, 0, 0, 0))

    def testEnforcePermissionNoEntMustNotExist(self):
        """With must_exist=False, stat raising ENOENT must not raise."""
        utils.EnforcePermission("/ganeti-qa-non-test", 0o600,
                                must_exist=False,
                                _chmod_fn=NotImplemented,
                                _chown_fn=NotImplemented,
                                _stat_fn=_MockStatResult(_RaiseNoEntError,
                                                         0, 0, 0))

    def testEnforcePermissionOtherErrorMustNotExist(self):
        """A non-ENOENT stat error raises even with must_exist=False."""
        self.assertRaises(errors.GenericError, utils.EnforcePermission,
                          "/ganeti-qa-non-test", 0o600, must_exist=False,
                          _chmod_fn=NotImplemented, _chown_fn=NotImplemented,
                          _stat_fn=_MockStatResult(_OtherStatRaise, 0, 0, 0))

    def testEnforcePermissionNoChanges(self):
        """Mode already matches: tearDown verifies nothing was recorded."""
        utils.EnforcePermission("/ganeti-qa-non-test", 0o600,
                                _stat_fn=_MockStatResult(None, 0o600, 0, 0),
                                _chmod_fn=self._ChmodWrapper(None),
                                _chown_fn=self._FakeChown)

    def testEnforcePermissionChangeMode(self):
        """Mode differs from stat: exactly one chmod to the new mode."""
        utils.EnforcePermission("/ganeti-qa-non-test", 0o444,
                                _stat_fn=_MockStatResult(None, 0o600, 0, 0),
                                _chmod_fn=self._ChmodWrapper(None),
                                _chown_fn=self._FakeChown)
        self.assertEqual(self._chmod_calls.pop(0),
                         ("/ganeti-qa-non-test", 0o444))

    def testEnforcePermissionSetUidGid(self):
        """Owner differs from stat: exactly one chown to the new uid/gid."""
        utils.EnforcePermission("/ganeti-qa-non-test", 0o600,
                                uid=self.UID_B, gid=self.GID_B,
                                _stat_fn=_MockStatResult(None, 0o600,
                                                         self.UID_A,
                                                         self.GID_A),
                                _chmod_fn=self._ChmodWrapper(None),
                                _chown_fn=self._FakeChown)
        self.assertEqual(self._chown_calls.pop(0),
                         ("/ganeti-qa-non-test", self.UID_B, self.GID_B))
+
+