Revision b81b3c96 test/ganeti.utils.io_unittest.py
b/test/ganeti.utils.io_unittest.py | ||
---|---|---|
28 | 28 |
import glob |
29 | 29 |
import time |
30 | 30 |
import signal |
31 |
import stat |
|
32 |
import errno |
|
31 | 33 |
|
32 | 34 |
from ganeti import constants |
33 | 35 |
from ganeti import utils |
... | ... | |
802 | 804 |
self.failUnless(utils.UUID_RE.match(utils.NewUUID())) |
803 | 805 |
|
804 | 806 |
|
807 |
def _MockStatResult(cb, mode, uid, gid): |
|
808 |
def _fn(path): |
|
809 |
if cb: |
|
810 |
cb() |
|
811 |
return { |
|
812 |
stat.ST_MODE: mode, |
|
813 |
stat.ST_UID: uid, |
|
814 |
stat.ST_GID: gid, |
|
815 |
} |
|
816 |
return _fn |
|
817 |
|
|
818 |
|
|
819 |
def _RaiseNoEntError(): |
|
820 |
raise EnvironmentError(errno.ENOENT, "not found") |
|
821 |
|
|
822 |
|
|
823 |
def _OtherStatRaise(): |
|
824 |
raise EnvironmentError() |
|
825 |
|
|
826 |
|
|
827 |
class TestPermissionEnforcements(unittest.TestCase): |
|
828 |
UID_A = 16024 |
|
829 |
UID_B = 25850 |
|
830 |
GID_A = 14028 |
|
831 |
GID_B = 29801 |
|
832 |
|
|
833 |
def setUp(self): |
|
834 |
self._chown_calls = [] |
|
835 |
self._chmod_calls = [] |
|
836 |
self._mkdir_calls = [] |
|
837 |
|
|
838 |
def tearDown(self): |
|
839 |
self.assertRaises(IndexError, self._mkdir_calls.pop) |
|
840 |
self.assertRaises(IndexError, self._chmod_calls.pop) |
|
841 |
self.assertRaises(IndexError, self._chown_calls.pop) |
|
842 |
|
|
843 |
def _FakeMkdir(self, path): |
|
844 |
self._mkdir_calls.append(path) |
|
845 |
|
|
846 |
def _FakeChown(self, path, uid, gid): |
|
847 |
self._chown_calls.append((path, uid, gid)) |
|
848 |
|
|
849 |
def _ChmodWrapper(self, cb): |
|
850 |
def _fn(path, mode): |
|
851 |
self._chmod_calls.append((path, mode)) |
|
852 |
if cb: |
|
853 |
cb() |
|
854 |
return _fn |
|
855 |
|
|
856 |
def _VerifyPerm(self, path, mode, uid=-1, gid=-1): |
|
857 |
self.assertEqual(path, "/ganeti-qa-non-test") |
|
858 |
self.assertEqual(mode, 0700) |
|
859 |
self.assertEqual(uid, self.UID_A) |
|
860 |
self.assertEqual(gid, self.GID_A) |
|
861 |
|
|
862 |
def testMakeDirWithPerm(self): |
|
863 |
is_dir_stat = _MockStatResult(None, stat.S_IFDIR, 0, 0) |
|
864 |
utils.MakeDirWithPerm("/ganeti-qa-non-test", 0700, self.UID_A, self.GID_A, |
|
865 |
_lstat_fn=is_dir_stat, _perm_fn=self._VerifyPerm) |
|
866 |
|
|
867 |
def testDirErrors(self): |
|
868 |
self.assertRaises(errors.GenericError, utils.MakeDirWithPerm, |
|
869 |
"/ganeti-qa-non-test", 0700, 0, 0, |
|
870 |
_lstat_fn=_MockStatResult(None, 0, 0, 0)) |
|
871 |
self.assertRaises(IndexError, self._mkdir_calls.pop) |
|
872 |
|
|
873 |
other_stat_raise = _MockStatResult(_OtherStatRaise, stat.S_IFDIR, 0, 0) |
|
874 |
self.assertRaises(errors.GenericError, utils.MakeDirWithPerm, |
|
875 |
"/ganeti-qa-non-test", 0700, 0, 0, |
|
876 |
_lstat_fn=other_stat_raise) |
|
877 |
self.assertRaises(IndexError, self._mkdir_calls.pop) |
|
878 |
|
|
879 |
non_exist_stat = _MockStatResult(_RaiseNoEntError, stat.S_IFDIR, 0, 0) |
|
880 |
utils.MakeDirWithPerm("/ganeti-qa-non-test", 0700, self.UID_A, self.GID_A, |
|
881 |
_lstat_fn=non_exist_stat, _mkdir_fn=self._FakeMkdir, |
|
882 |
_perm_fn=self._VerifyPerm) |
|
883 |
self.assertEqual(self._mkdir_calls.pop(0), "/ganeti-qa-non-test") |
|
884 |
|
|
885 |
def testEnforcePermissionNoEnt(self): |
|
886 |
self.assertRaises(errors.GenericError, utils.EnforcePermission, |
|
887 |
"/ganeti-qa-non-test", 0600, |
|
888 |
_chmod_fn=NotImplemented, _chown_fn=NotImplemented, |
|
889 |
_stat_fn=_MockStatResult(_RaiseNoEntError, 0, 0, 0)) |
|
890 |
|
|
891 |
def testEnforcePermissionNoEntMustNotExist(self): |
|
892 |
utils.EnforcePermission("/ganeti-qa-non-test", 0600, must_exist=False, |
|
893 |
_chmod_fn=NotImplemented, |
|
894 |
_chown_fn=NotImplemented, |
|
895 |
_stat_fn=_MockStatResult(_RaiseNoEntError, |
|
896 |
0, 0, 0)) |
|
897 |
|
|
898 |
def testEnforcePermissionOtherErrorMustNotExist(self): |
|
899 |
self.assertRaises(errors.GenericError, utils.EnforcePermission, |
|
900 |
"/ganeti-qa-non-test", 0600, must_exist=False, |
|
901 |
_chmod_fn=NotImplemented, _chown_fn=NotImplemented, |
|
902 |
_stat_fn=_MockStatResult(_OtherStatRaise, 0, 0, 0)) |
|
903 |
|
|
904 |
def testEnforcePermissionNoChanges(self): |
|
905 |
utils.EnforcePermission("/ganeti-qa-non-test", 0600, |
|
906 |
_stat_fn=_MockStatResult(None, 0600, 0, 0), |
|
907 |
_chmod_fn=self._ChmodWrapper(None), |
|
908 |
_chown_fn=self._FakeChown) |
|
909 |
|
|
910 |
def testEnforcePermissionChangeMode(self): |
|
911 |
utils.EnforcePermission("/ganeti-qa-non-test", 0444, |
|
912 |
_stat_fn=_MockStatResult(None, 0600, 0, 0), |
|
913 |
_chmod_fn=self._ChmodWrapper(None), |
|
914 |
_chown_fn=self._FakeChown) |
|
915 |
self.assertEqual(self._chmod_calls.pop(0), ("/ganeti-qa-non-test", 0444)) |
|
916 |
|
|
917 |
def testEnforcePermissionSetUidGid(self): |
|
918 |
utils.EnforcePermission("/ganeti-qa-non-test", 0600, |
|
919 |
uid=self.UID_B, gid=self.GID_B, |
|
920 |
_stat_fn=_MockStatResult(None, 0600, |
|
921 |
self.UID_A, |
|
922 |
self.GID_A), |
|
923 |
_chmod_fn=self._ChmodWrapper(None), |
|
924 |
_chown_fn=self._FakeChown) |
|
925 |
self.assertEqual(self._chown_calls.pop(0), |
|
926 |
("/ganeti-qa-non-test", self.UID_B, self.GID_B)) |
|
927 |
|
|
928 |
|
|
805 | 929 |
if __name__ == "__main__": |
806 | 930 |
testutils.GanetiTestProgram() |
Also available in: Unified diff