Move bootstrap-related RPC to generated wrappers
[ganeti-local] / test / ganeti.tools.ensure_dirs_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for testing ganeti.tools.ensure_dirs"""
23
24 import errno
25 import stat
26 import unittest
27 import os.path
28
29 from ganeti.tools import ensure_dirs
30
31 import testutils
32
33
34 def _MockStatResult(cb, mode, uid, gid):
35   def _fn(path):
36     if cb:
37       cb()
38     return {
39       stat.ST_MODE: mode,
40       stat.ST_UID: uid,
41       stat.ST_GID: gid,
42       }
43   return _fn
44
45
46 def _RaiseNoEntError():
47   raise EnvironmentError(errno.ENOENT, "not found")
48
49
50 def _OtherStatRaise():
51   raise EnvironmentError()
52
53
54 class TestEnsureDirsFunctions(unittest.TestCase):
55   UID_A = 16024
56   UID_B = 25850
57   GID_A = 14028
58   GID_B = 29801
59
60   def setUp(self):
61     self._chown_calls = []
62     self._chmod_calls = []
63     self._mkdir_calls = []
64
65   def tearDown(self):
66     self.assertRaises(IndexError, self._mkdir_calls.pop)
67     self.assertRaises(IndexError, self._chmod_calls.pop)
68     self.assertRaises(IndexError, self._chown_calls.pop)
69
70   def _FakeMkdir(self, path):
71     self._mkdir_calls.append(path)
72
73   def _FakeChown(self, path, uid, gid):
74     self._chown_calls.append((path, uid, gid))
75
76   def _ChmodWrapper(self, cb):
77     def _fn(path, mode):
78       self._chmod_calls.append((path, mode))
79       if cb:
80         cb()
81     return _fn
82
83   def _VerifyEnsure(self, path, mode, uid=-1, gid=-1):
84     self.assertEqual(path, "/ganeti-qa-non-test")
85     self.assertEqual(mode, 0700)
86     self.assertEqual(uid, self.UID_A)
87     self.assertEqual(gid, self.GID_A)
88
89   def testEnsureDir(self):
90     is_dir_stat = _MockStatResult(None, stat.S_IFDIR, 0, 0)
91     ensure_dirs.EnsureDir("/ganeti-qa-non-test", 0700, self.UID_A, self.GID_A,
92                           _lstat_fn=is_dir_stat, _ensure_fn=self._VerifyEnsure)
93
94   def testEnsureDirErrors(self):
95     self.assertRaises(ensure_dirs.EnsureError, ensure_dirs.EnsureDir,
96                       "/ganeti-qa-non-test", 0700, 0, 0,
97                       _lstat_fn=_MockStatResult(None, 0, 0, 0))
98     self.assertRaises(IndexError, self._mkdir_calls.pop)
99
100     other_stat_raise = _MockStatResult(_OtherStatRaise, stat.S_IFDIR, 0, 0)
101     self.assertRaises(ensure_dirs.EnsureError, ensure_dirs.EnsureDir,
102                       "/ganeti-qa-non-test", 0700, 0, 0,
103                       _lstat_fn=other_stat_raise)
104     self.assertRaises(IndexError, self._mkdir_calls.pop)
105
106     non_exist_stat = _MockStatResult(_RaiseNoEntError, stat.S_IFDIR, 0, 0)
107     ensure_dirs.EnsureDir("/ganeti-qa-non-test", 0700, self.UID_A, self.GID_A,
108                           _lstat_fn=non_exist_stat, _mkdir_fn=self._FakeMkdir,
109                           _ensure_fn=self._VerifyEnsure)
110     self.assertEqual(self._mkdir_calls.pop(0), "/ganeti-qa-non-test")
111
112   def testEnsurePermissionNoEnt(self):
113     self.assertRaises(ensure_dirs.EnsureError, ensure_dirs.EnsurePermission,
114                       "/ganeti-qa-non-test", 0600,
115                       _chmod_fn=NotImplemented, _chown_fn=NotImplemented,
116                       _stat_fn=_MockStatResult(_RaiseNoEntError, 0, 0, 0))
117
118   def testEnsurePermissionNoEntMustNotExist(self):
119     ensure_dirs.EnsurePermission("/ganeti-qa-non-test", 0600, must_exist=False,
120                                  _chmod_fn=NotImplemented,
121                                  _chown_fn=NotImplemented,
122                                  _stat_fn=_MockStatResult(_RaiseNoEntError,
123                                                           0, 0, 0))
124
125   def testEnsurePermissionOtherErrorMustNotExist(self):
126     self.assertRaises(ensure_dirs.EnsureError, ensure_dirs.EnsurePermission,
127                       "/ganeti-qa-non-test", 0600, must_exist=False,
128                       _chmod_fn=NotImplemented, _chown_fn=NotImplemented,
129                       _stat_fn=_MockStatResult(_OtherStatRaise, 0, 0, 0))
130
131   def testEnsurePermissionNoChanges(self):
132     ensure_dirs.EnsurePermission("/ganeti-qa-non-test", 0600,
133                                  _stat_fn=_MockStatResult(None, 0600, 0, 0),
134                                  _chmod_fn=self._ChmodWrapper(None),
135                                  _chown_fn=self._FakeChown)
136
137   def testEnsurePermissionChangeMode(self):
138     ensure_dirs.EnsurePermission("/ganeti-qa-non-test", 0444,
139                                  _stat_fn=_MockStatResult(None, 0600, 0, 0),
140                                  _chmod_fn=self._ChmodWrapper(None),
141                                  _chown_fn=self._FakeChown)
142     self.assertEqual(self._chmod_calls.pop(0), ("/ganeti-qa-non-test", 0444))
143
144   def testEnsurePermissionSetUidGid(self):
145     ensure_dirs.EnsurePermission("/ganeti-qa-non-test", 0600,
146                                  uid=self.UID_B, gid=self.GID_B,
147                                  _stat_fn=_MockStatResult(None, 0600,
148                                                           self.UID_A,
149                                                           self.GID_A),
150                                  _chmod_fn=self._ChmodWrapper(None),
151                                  _chown_fn=self._FakeChown)
152     self.assertEqual(self._chown_calls.pop(0),
153                      ("/ganeti-qa-non-test", self.UID_B, self.GID_B))
154
155   def testPaths(self):
156     paths = [(path[0], path[1]) for path in ensure_dirs.GetPaths()]
157
158     seen = []
159     last_dirname = ""
160     for path, pathtype in paths:
161       self.assertTrue(pathtype in ensure_dirs.ALL_TYPES)
162       dirname = os.path.dirname(path)
163       if dirname != last_dirname or pathtype == ensure_dirs.DIR:
164         if pathtype == ensure_dirs.FILE:
165           self.assertFalse(dirname in seen,
166                            msg="path %s; dirname %s seen in %s" % (path,
167                                                                    dirname,
168                                                                    seen))
169           last_dirname = dirname
170           seen.append(dirname)
171         elif pathtype == ensure_dirs.DIR:
172           self.assertFalse(path in seen)
173           last_dirname = path
174           seen.append(path)
175
176
if __name__ == "__main__":
  # Only when executed as a script: hand over to the helper from the
  # "testutils" module (presumably the project's unittest runner -- confirm)
  testutils.GanetiTestProgram()