Revision b81b3c96
b/lib/tools/ensure_dirs.py | ||
---|---|---|
22 | 22 |
|
23 | 23 |
""" |
24 | 24 |
|
25 |
import errno |
|
26 | 25 |
import os |
27 | 26 |
import os.path |
28 | 27 |
import optparse |
29 | 28 |
import sys |
30 |
import stat |
|
31 | 29 |
import logging |
32 | 30 |
|
33 | 31 |
from ganeti import constants |
... | ... | |
49 | 47 |
]) |
50 | 48 |
|
51 | 49 |
|
52 |
class EnsureError(errors.GenericError): |
|
53 |
"""Top-level error class related to this script. |
|
54 |
|
|
55 |
""" |
|
56 |
|
|
57 |
|
|
58 |
def EnsurePermission(path, mode, uid=-1, gid=-1, must_exist=True, |
|
59 |
_chmod_fn=os.chmod, _chown_fn=os.chown, _stat_fn=os.stat): |
|
60 |
"""Ensures that given path has given mode. |
|
61 |
|
|
62 |
@param path: The path to the file |
|
63 |
@param mode: The mode of the file |
|
64 |
@param uid: The uid of the owner of this file |
|
65 |
@param gid: The gid of the owner of this file |
|
66 |
@param must_exist: Specifies if non-existance of path will be an error |
|
67 |
@param _chmod_fn: chmod function to use (unittest only) |
|
68 |
@param _chown_fn: chown function to use (unittest only) |
|
69 |
|
|
70 |
""" |
|
71 |
logging.debug("Checking %s", path) |
|
72 |
try: |
|
73 |
st = _stat_fn(path) |
|
74 |
|
|
75 |
fmode = stat.S_IMODE(st[stat.ST_MODE]) |
|
76 |
if fmode != mode: |
|
77 |
logging.debug("Changing mode of %s from %#o to %#o", path, fmode, mode) |
|
78 |
_chmod_fn(path, mode) |
|
79 |
|
|
80 |
if max(uid, gid) > -1: |
|
81 |
fuid = st[stat.ST_UID] |
|
82 |
fgid = st[stat.ST_GID] |
|
83 |
if fuid != uid or fgid != gid: |
|
84 |
logging.debug("Changing owner of %s from UID %s/GID %s to" |
|
85 |
" UID %s/GID %s", path, fuid, fgid, uid, gid) |
|
86 |
_chown_fn(path, uid, gid) |
|
87 |
except EnvironmentError, err: |
|
88 |
if err.errno == errno.ENOENT: |
|
89 |
if must_exist: |
|
90 |
raise EnsureError("Path %s should exist, but does not" % path) |
|
91 |
else: |
|
92 |
raise EnsureError("Error while changing permissions on %s: %s" % |
|
93 |
(path, err)) |
|
94 |
|
|
95 |
|
|
96 |
def EnsureDir(path, mode, uid, gid, _lstat_fn=os.lstat, _mkdir_fn=os.mkdir, |
|
97 |
_ensure_fn=EnsurePermission): |
|
98 |
"""Ensures that given path is a dir and has given mode, uid and gid set. |
|
99 |
|
|
100 |
@param path: The path to the file |
|
101 |
@param mode: The mode of the file |
|
102 |
@param uid: The uid of the owner of this file |
|
103 |
@param gid: The gid of the owner of this file |
|
104 |
@param _lstat_fn: Stat function to use (unittest only) |
|
105 |
@param _mkdir_fn: mkdir function to use (unittest only) |
|
106 |
@param _ensure_fn: ensure function to use (unittest only) |
|
107 |
|
|
108 |
""" |
|
109 |
logging.debug("Checking directory %s", path) |
|
110 |
try: |
|
111 |
# We don't want to follow symlinks |
|
112 |
st = _lstat_fn(path) |
|
113 |
except EnvironmentError, err: |
|
114 |
if err.errno != errno.ENOENT: |
|
115 |
raise EnsureError("stat(2) on %s failed: %s" % (path, err)) |
|
116 |
_mkdir_fn(path) |
|
117 |
else: |
|
118 |
if not stat.S_ISDIR(st[stat.ST_MODE]): |
|
119 |
raise EnsureError("Path %s is expected to be a directory, but isn't" % |
|
120 |
path) |
|
121 |
|
|
122 |
_ensure_fn(path, mode, uid=uid, gid=gid) |
|
123 |
|
|
124 |
|
|
125 | 50 |
def RecursiveEnsure(path, uid, gid, dir_perm, file_perm): |
126 | 51 |
"""Ensures permissions recursively down a directory. |
127 | 52 |
|
... | ... | |
141 | 66 |
|
142 | 67 |
for root, dirs, files in os.walk(path): |
143 | 68 |
for subdir in dirs: |
144 |
EnsurePermission(os.path.join(root, subdir), dir_perm, uid=uid, gid=gid) |
|
69 |
utils.EnforcePermission(os.path.join(root, subdir), dir_perm, uid=uid, |
|
70 |
gid=gid) |
|
145 | 71 |
|
146 | 72 |
for filename in files: |
147 |
EnsurePermission(os.path.join(root, filename), file_perm, uid=uid,
|
|
148 |
gid=gid) |
|
73 |
utils.EnforcePermission(os.path.join(root, filename), file_perm, uid=uid,
|
|
74 |
gid=gid)
|
|
149 | 75 |
|
150 | 76 |
|
151 | 77 |
def EnsureQueueDir(path, mode, uid, gid): |
... | ... | |
159 | 85 |
""" |
160 | 86 |
for filename in utils.ListVisibleFiles(path): |
161 | 87 |
if constants.JOB_FILE_RE.match(filename): |
162 |
EnsurePermission(utils.PathJoin(path, filename), mode, uid=uid, gid=gid) |
|
88 |
utils.EnforcePermission(utils.PathJoin(path, filename), mode, uid=uid, |
|
89 |
gid=gid) |
|
163 | 90 |
|
164 | 91 |
|
165 | 92 |
def ProcessPath(path): |
... | ... | |
176 | 103 |
# No additional parameters |
177 | 104 |
assert len(path[5:]) == 0 |
178 | 105 |
if pathtype == DIR: |
179 |
EnsureDir(pathname, mode, uid, gid)
|
|
106 |
utils.MakeDirWithPerm(pathname, mode, uid, gid)
|
|
180 | 107 |
elif pathtype == QUEUE_DIR: |
181 | 108 |
EnsureQueueDir(pathname, mode, uid, gid) |
182 | 109 |
elif pathtype == FILE: |
183 | 110 |
(must_exist, ) = path[5:] |
184 |
EnsurePermission(pathname, mode, uid=uid, gid=gid, must_exist=must_exist) |
|
111 |
utils.EnforcePermission(pathname, mode, uid=uid, gid=gid, |
|
112 |
must_exist=must_exist) |
|
185 | 113 |
|
186 | 114 |
|
187 | 115 |
def GetPaths(): |
... | ... | |
323 | 251 |
if opts.full_run: |
324 | 252 |
RecursiveEnsure(constants.JOB_QUEUE_ARCHIVE_DIR, getent.masterd_uid, |
325 | 253 |
getent.masterd_gid, 0700, 0600) |
326 |
except EnsureError, err:
|
|
254 |
except errors.GenericError, err:
|
|
327 | 255 |
logging.error("An error occurred while setting permissions: %s", err) |
328 | 256 |
return constants.EXIT_FAILURE |
329 | 257 |
|
b/lib/utils/io.py | ||
---|---|---|
28 | 28 |
import tempfile |
29 | 29 |
import errno |
30 | 30 |
import time |
31 |
import stat |
|
31 | 32 |
|
32 | 33 |
from ganeti import errors |
33 | 34 |
from ganeti import constants |
... | ... | |
331 | 332 |
raise |
332 | 333 |
|
333 | 334 |
|
335 |
def EnforcePermission(path, mode, uid=-1, gid=-1, must_exist=True, |
|
336 |
_chmod_fn=os.chmod, _chown_fn=os.chown, _stat_fn=os.stat): |
|
337 |
"""Enforces that given path has given permissions. |
|
338 |
|
|
339 |
@param path: The path to the file |
|
340 |
@param mode: The mode of the file |
|
341 |
@param uid: The uid of the owner of this file |
|
342 |
@param gid: The gid of the owner of this file |
|
343 |
@param must_exist: Specifies if non-existance of path will be an error |
|
344 |
@param _chmod_fn: chmod function to use (unittest only) |
|
345 |
@param _chown_fn: chown function to use (unittest only) |
|
346 |
|
|
347 |
""" |
|
348 |
logging.debug("Checking %s", path) |
|
349 |
try: |
|
350 |
st = _stat_fn(path) |
|
351 |
|
|
352 |
fmode = stat.S_IMODE(st[stat.ST_MODE]) |
|
353 |
if fmode != mode: |
|
354 |
logging.debug("Changing mode of %s from %#o to %#o", path, fmode, mode) |
|
355 |
_chmod_fn(path, mode) |
|
356 |
|
|
357 |
if max(uid, gid) > -1: |
|
358 |
fuid = st[stat.ST_UID] |
|
359 |
fgid = st[stat.ST_GID] |
|
360 |
if fuid != uid or fgid != gid: |
|
361 |
logging.debug("Changing owner of %s from UID %s/GID %s to" |
|
362 |
" UID %s/GID %s", path, fuid, fgid, uid, gid) |
|
363 |
_chown_fn(path, uid, gid) |
|
364 |
except EnvironmentError, err: |
|
365 |
if err.errno == errno.ENOENT: |
|
366 |
if must_exist: |
|
367 |
raise errors.GenericError("Path %s should exist, but does not" % path) |
|
368 |
else: |
|
369 |
raise errors.GenericError("Error while changing permissions on %s: %s" % |
|
370 |
(path, err)) |
|
371 |
|
|
372 |
|
|
373 |
def MakeDirWithPerm(path, mode, uid, gid, _lstat_fn=os.lstat, |
|
374 |
_mkdir_fn=os.mkdir, _perm_fn=EnforcePermission): |
|
375 |
"""Enforces that given path is a dir and has given mode, uid and gid set. |
|
376 |
|
|
377 |
@param path: The path to the file |
|
378 |
@param mode: The mode of the file |
|
379 |
@param uid: The uid of the owner of this file |
|
380 |
@param gid: The gid of the owner of this file |
|
381 |
@param _lstat_fn: Stat function to use (unittest only) |
|
382 |
@param _mkdir_fn: mkdir function to use (unittest only) |
|
383 |
@param _perm_fn: permission setter function to use (unittest only) |
|
384 |
|
|
385 |
""" |
|
386 |
logging.debug("Checking directory %s", path) |
|
387 |
try: |
|
388 |
# We don't want to follow symlinks |
|
389 |
st = _lstat_fn(path) |
|
390 |
except EnvironmentError, err: |
|
391 |
if err.errno != errno.ENOENT: |
|
392 |
raise errors.GenericError("stat(2) on %s failed: %s" % (path, err)) |
|
393 |
_mkdir_fn(path) |
|
394 |
else: |
|
395 |
if not stat.S_ISDIR(st[stat.ST_MODE]): |
|
396 |
raise errors.GenericError(("Path %s is expected to be a directory, but " |
|
397 |
"isn't") % path) |
|
398 |
|
|
399 |
_perm_fn(path, mode, uid=uid, gid=gid) |
|
400 |
|
|
401 |
|
|
334 | 402 |
def Makedirs(path, mode=0750): |
335 | 403 |
"""Super-mkdir; create a leaf directory and all intermediate ones. |
336 | 404 |
|
b/test/ganeti.tools.ensure_dirs_unittest.py | ||
---|---|---|
21 | 21 |
|
22 | 22 |
"""Script for testing ganeti.tools.ensure_dirs""" |
23 | 23 |
|
24 |
import errno |
|
25 |
import stat |
|
26 | 24 |
import unittest |
27 | 25 |
import os.path |
28 | 26 |
|
... | ... | |
31 | 29 |
import testutils |
32 | 30 |
|
33 | 31 |
|
34 |
def _MockStatResult(cb, mode, uid, gid): |
|
35 |
def _fn(path): |
|
36 |
if cb: |
|
37 |
cb() |
|
38 |
return { |
|
39 |
stat.ST_MODE: mode, |
|
40 |
stat.ST_UID: uid, |
|
41 |
stat.ST_GID: gid, |
|
42 |
} |
|
43 |
return _fn |
|
44 |
|
|
45 |
|
|
46 |
def _RaiseNoEntError(): |
|
47 |
raise EnvironmentError(errno.ENOENT, "not found") |
|
48 |
|
|
49 |
|
|
50 |
def _OtherStatRaise(): |
|
51 |
raise EnvironmentError() |
|
52 |
|
|
53 |
|
|
54 | 32 |
class TestEnsureDirsFunctions(unittest.TestCase): |
55 |
UID_A = 16024 |
|
56 |
UID_B = 25850 |
|
57 |
GID_A = 14028 |
|
58 |
GID_B = 29801 |
|
59 |
|
|
60 |
def setUp(self): |
|
61 |
self._chown_calls = [] |
|
62 |
self._chmod_calls = [] |
|
63 |
self._mkdir_calls = [] |
|
64 |
|
|
65 |
def tearDown(self): |
|
66 |
self.assertRaises(IndexError, self._mkdir_calls.pop) |
|
67 |
self.assertRaises(IndexError, self._chmod_calls.pop) |
|
68 |
self.assertRaises(IndexError, self._chown_calls.pop) |
|
69 |
|
|
70 |
def _FakeMkdir(self, path): |
|
71 |
self._mkdir_calls.append(path) |
|
72 |
|
|
73 |
def _FakeChown(self, path, uid, gid): |
|
74 |
self._chown_calls.append((path, uid, gid)) |
|
75 |
|
|
76 |
def _ChmodWrapper(self, cb): |
|
77 |
def _fn(path, mode): |
|
78 |
self._chmod_calls.append((path, mode)) |
|
79 |
if cb: |
|
80 |
cb() |
|
81 |
return _fn |
|
82 |
|
|
83 |
def _VerifyEnsure(self, path, mode, uid=-1, gid=-1): |
|
84 |
self.assertEqual(path, "/ganeti-qa-non-test") |
|
85 |
self.assertEqual(mode, 0700) |
|
86 |
self.assertEqual(uid, self.UID_A) |
|
87 |
self.assertEqual(gid, self.GID_A) |
|
88 |
|
|
89 |
def testEnsureDir(self): |
|
90 |
is_dir_stat = _MockStatResult(None, stat.S_IFDIR, 0, 0) |
|
91 |
ensure_dirs.EnsureDir("/ganeti-qa-non-test", 0700, self.UID_A, self.GID_A, |
|
92 |
_lstat_fn=is_dir_stat, _ensure_fn=self._VerifyEnsure) |
|
93 |
|
|
94 |
def testEnsureDirErrors(self): |
|
95 |
self.assertRaises(ensure_dirs.EnsureError, ensure_dirs.EnsureDir, |
|
96 |
"/ganeti-qa-non-test", 0700, 0, 0, |
|
97 |
_lstat_fn=_MockStatResult(None, 0, 0, 0)) |
|
98 |
self.assertRaises(IndexError, self._mkdir_calls.pop) |
|
99 |
|
|
100 |
other_stat_raise = _MockStatResult(_OtherStatRaise, stat.S_IFDIR, 0, 0) |
|
101 |
self.assertRaises(ensure_dirs.EnsureError, ensure_dirs.EnsureDir, |
|
102 |
"/ganeti-qa-non-test", 0700, 0, 0, |
|
103 |
_lstat_fn=other_stat_raise) |
|
104 |
self.assertRaises(IndexError, self._mkdir_calls.pop) |
|
105 |
|
|
106 |
non_exist_stat = _MockStatResult(_RaiseNoEntError, stat.S_IFDIR, 0, 0) |
|
107 |
ensure_dirs.EnsureDir("/ganeti-qa-non-test", 0700, self.UID_A, self.GID_A, |
|
108 |
_lstat_fn=non_exist_stat, _mkdir_fn=self._FakeMkdir, |
|
109 |
_ensure_fn=self._VerifyEnsure) |
|
110 |
self.assertEqual(self._mkdir_calls.pop(0), "/ganeti-qa-non-test") |
|
111 |
|
|
112 |
def testEnsurePermissionNoEnt(self): |
|
113 |
self.assertRaises(ensure_dirs.EnsureError, ensure_dirs.EnsurePermission, |
|
114 |
"/ganeti-qa-non-test", 0600, |
|
115 |
_chmod_fn=NotImplemented, _chown_fn=NotImplemented, |
|
116 |
_stat_fn=_MockStatResult(_RaiseNoEntError, 0, 0, 0)) |
|
117 |
|
|
118 |
def testEnsurePermissionNoEntMustNotExist(self): |
|
119 |
ensure_dirs.EnsurePermission("/ganeti-qa-non-test", 0600, must_exist=False, |
|
120 |
_chmod_fn=NotImplemented, |
|
121 |
_chown_fn=NotImplemented, |
|
122 |
_stat_fn=_MockStatResult(_RaiseNoEntError, |
|
123 |
0, 0, 0)) |
|
124 |
|
|
125 |
def testEnsurePermissionOtherErrorMustNotExist(self): |
|
126 |
self.assertRaises(ensure_dirs.EnsureError, ensure_dirs.EnsurePermission, |
|
127 |
"/ganeti-qa-non-test", 0600, must_exist=False, |
|
128 |
_chmod_fn=NotImplemented, _chown_fn=NotImplemented, |
|
129 |
_stat_fn=_MockStatResult(_OtherStatRaise, 0, 0, 0)) |
|
130 |
|
|
131 |
def testEnsurePermissionNoChanges(self): |
|
132 |
ensure_dirs.EnsurePermission("/ganeti-qa-non-test", 0600, |
|
133 |
_stat_fn=_MockStatResult(None, 0600, 0, 0), |
|
134 |
_chmod_fn=self._ChmodWrapper(None), |
|
135 |
_chown_fn=self._FakeChown) |
|
136 |
|
|
137 |
def testEnsurePermissionChangeMode(self): |
|
138 |
ensure_dirs.EnsurePermission("/ganeti-qa-non-test", 0444, |
|
139 |
_stat_fn=_MockStatResult(None, 0600, 0, 0), |
|
140 |
_chmod_fn=self._ChmodWrapper(None), |
|
141 |
_chown_fn=self._FakeChown) |
|
142 |
self.assertEqual(self._chmod_calls.pop(0), ("/ganeti-qa-non-test", 0444)) |
|
143 |
|
|
144 |
def testEnsurePermissionSetUidGid(self): |
|
145 |
ensure_dirs.EnsurePermission("/ganeti-qa-non-test", 0600, |
|
146 |
uid=self.UID_B, gid=self.GID_B, |
|
147 |
_stat_fn=_MockStatResult(None, 0600, |
|
148 |
self.UID_A, |
|
149 |
self.GID_A), |
|
150 |
_chmod_fn=self._ChmodWrapper(None), |
|
151 |
_chown_fn=self._FakeChown) |
|
152 |
self.assertEqual(self._chown_calls.pop(0), |
|
153 |
("/ganeti-qa-non-test", self.UID_B, self.GID_B)) |
|
154 |
|
|
155 | 33 |
def testPaths(self): |
156 | 34 |
paths = [(path[0], path[1]) for path in ensure_dirs.GetPaths()] |
157 | 35 |
|
b/test/ganeti.utils.io_unittest.py | ||
---|---|---|
28 | 28 |
import glob |
29 | 29 |
import time |
30 | 30 |
import signal |
31 |
import stat |
|
32 |
import errno |
|
31 | 33 |
|
32 | 34 |
from ganeti import constants |
33 | 35 |
from ganeti import utils |
... | ... | |
802 | 804 |
self.failUnless(utils.UUID_RE.match(utils.NewUUID())) |
803 | 805 |
|
804 | 806 |
|
807 |
def _MockStatResult(cb, mode, uid, gid): |
|
808 |
def _fn(path): |
|
809 |
if cb: |
|
810 |
cb() |
|
811 |
return { |
|
812 |
stat.ST_MODE: mode, |
|
813 |
stat.ST_UID: uid, |
|
814 |
stat.ST_GID: gid, |
|
815 |
} |
|
816 |
return _fn |
|
817 |
|
|
818 |
|
|
819 |
def _RaiseNoEntError(): |
|
820 |
raise EnvironmentError(errno.ENOENT, "not found") |
|
821 |
|
|
822 |
|
|
823 |
def _OtherStatRaise(): |
|
824 |
raise EnvironmentError() |
|
825 |
|
|
826 |
|
|
827 |
class TestPermissionEnforcements(unittest.TestCase): |
|
828 |
UID_A = 16024 |
|
829 |
UID_B = 25850 |
|
830 |
GID_A = 14028 |
|
831 |
GID_B = 29801 |
|
832 |
|
|
833 |
def setUp(self): |
|
834 |
self._chown_calls = [] |
|
835 |
self._chmod_calls = [] |
|
836 |
self._mkdir_calls = [] |
|
837 |
|
|
838 |
def tearDown(self): |
|
839 |
self.assertRaises(IndexError, self._mkdir_calls.pop) |
|
840 |
self.assertRaises(IndexError, self._chmod_calls.pop) |
|
841 |
self.assertRaises(IndexError, self._chown_calls.pop) |
|
842 |
|
|
843 |
def _FakeMkdir(self, path): |
|
844 |
self._mkdir_calls.append(path) |
|
845 |
|
|
846 |
def _FakeChown(self, path, uid, gid): |
|
847 |
self._chown_calls.append((path, uid, gid)) |
|
848 |
|
|
849 |
def _ChmodWrapper(self, cb): |
|
850 |
def _fn(path, mode): |
|
851 |
self._chmod_calls.append((path, mode)) |
|
852 |
if cb: |
|
853 |
cb() |
|
854 |
return _fn |
|
855 |
|
|
856 |
def _VerifyPerm(self, path, mode, uid=-1, gid=-1): |
|
857 |
self.assertEqual(path, "/ganeti-qa-non-test") |
|
858 |
self.assertEqual(mode, 0700) |
|
859 |
self.assertEqual(uid, self.UID_A) |
|
860 |
self.assertEqual(gid, self.GID_A) |
|
861 |
|
|
862 |
def testMakeDirWithPerm(self): |
|
863 |
is_dir_stat = _MockStatResult(None, stat.S_IFDIR, 0, 0) |
|
864 |
utils.MakeDirWithPerm("/ganeti-qa-non-test", 0700, self.UID_A, self.GID_A, |
|
865 |
_lstat_fn=is_dir_stat, _perm_fn=self._VerifyPerm) |
|
866 |
|
|
867 |
def testDirErrors(self): |
|
868 |
self.assertRaises(errors.GenericError, utils.MakeDirWithPerm, |
|
869 |
"/ganeti-qa-non-test", 0700, 0, 0, |
|
870 |
_lstat_fn=_MockStatResult(None, 0, 0, 0)) |
|
871 |
self.assertRaises(IndexError, self._mkdir_calls.pop) |
|
872 |
|
|
873 |
other_stat_raise = _MockStatResult(_OtherStatRaise, stat.S_IFDIR, 0, 0) |
|
874 |
self.assertRaises(errors.GenericError, utils.MakeDirWithPerm, |
|
875 |
"/ganeti-qa-non-test", 0700, 0, 0, |
|
876 |
_lstat_fn=other_stat_raise) |
|
877 |
self.assertRaises(IndexError, self._mkdir_calls.pop) |
|
878 |
|
|
879 |
non_exist_stat = _MockStatResult(_RaiseNoEntError, stat.S_IFDIR, 0, 0) |
|
880 |
utils.MakeDirWithPerm("/ganeti-qa-non-test", 0700, self.UID_A, self.GID_A, |
|
881 |
_lstat_fn=non_exist_stat, _mkdir_fn=self._FakeMkdir, |
|
882 |
_perm_fn=self._VerifyPerm) |
|
883 |
self.assertEqual(self._mkdir_calls.pop(0), "/ganeti-qa-non-test") |
|
884 |
|
|
885 |
def testEnforcePermissionNoEnt(self): |
|
886 |
self.assertRaises(errors.GenericError, utils.EnforcePermission, |
|
887 |
"/ganeti-qa-non-test", 0600, |
|
888 |
_chmod_fn=NotImplemented, _chown_fn=NotImplemented, |
|
889 |
_stat_fn=_MockStatResult(_RaiseNoEntError, 0, 0, 0)) |
|
890 |
|
|
891 |
def testEnforcePermissionNoEntMustNotExist(self): |
|
892 |
utils.EnforcePermission("/ganeti-qa-non-test", 0600, must_exist=False, |
|
893 |
_chmod_fn=NotImplemented, |
|
894 |
_chown_fn=NotImplemented, |
|
895 |
_stat_fn=_MockStatResult(_RaiseNoEntError, |
|
896 |
0, 0, 0)) |
|
897 |
|
|
898 |
def testEnforcePermissionOtherErrorMustNotExist(self): |
|
899 |
self.assertRaises(errors.GenericError, utils.EnforcePermission, |
|
900 |
"/ganeti-qa-non-test", 0600, must_exist=False, |
|
901 |
_chmod_fn=NotImplemented, _chown_fn=NotImplemented, |
|
902 |
_stat_fn=_MockStatResult(_OtherStatRaise, 0, 0, 0)) |
|
903 |
|
|
904 |
def testEnforcePermissionNoChanges(self): |
|
905 |
utils.EnforcePermission("/ganeti-qa-non-test", 0600, |
|
906 |
_stat_fn=_MockStatResult(None, 0600, 0, 0), |
|
907 |
_chmod_fn=self._ChmodWrapper(None), |
|
908 |
_chown_fn=self._FakeChown) |
|
909 |
|
|
910 |
def testEnforcePermissionChangeMode(self): |
|
911 |
utils.EnforcePermission("/ganeti-qa-non-test", 0444, |
|
912 |
_stat_fn=_MockStatResult(None, 0600, 0, 0), |
|
913 |
_chmod_fn=self._ChmodWrapper(None), |
|
914 |
_chown_fn=self._FakeChown) |
|
915 |
self.assertEqual(self._chmod_calls.pop(0), ("/ganeti-qa-non-test", 0444)) |
|
916 |
|
|
917 |
def testEnforcePermissionSetUidGid(self): |
|
918 |
utils.EnforcePermission("/ganeti-qa-non-test", 0600, |
|
919 |
uid=self.UID_B, gid=self.GID_B, |
|
920 |
_stat_fn=_MockStatResult(None, 0600, |
|
921 |
self.UID_A, |
|
922 |
self.GID_A), |
|
923 |
_chmod_fn=self._ChmodWrapper(None), |
|
924 |
_chown_fn=self._FakeChown) |
|
925 |
self.assertEqual(self._chown_calls.pop(0), |
|
926 |
("/ganeti-qa-non-test", self.UID_B, self.GID_B)) |
|
927 |
|
|
928 |
|
|
805 | 929 |
if __name__ == "__main__": |
806 | 930 |
testutils.GanetiTestProgram() |
Also available in: Unified diff