Use setUp/tearDown for tests using temporary files.
[ganeti-local] / test / ganeti.hooks_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the hooks module"""
23
24
25 import unittest
26 import os
27 import time
28 import tempfile
29 import os.path
30
31 from ganeti import errors
32 from ganeti import opcodes
33 from ganeti import mcpu
34 from ganeti import backend
35 from ganeti import constants
36 from ganeti import cmdlib
37 from ganeti.constants import HKR_SUCCESS, HKR_FAIL, HKR_SKIP
38
39 from mocks import FakeConfig, FakeSStore, FakeProc
40
41 class FakeLU(cmdlib.LogicalUnit):
42   HPATH = "test"
43   def BuildHooksEnv(self):
44     return {}, ["localhost"], ["localhost"]
45
46 class TestHooksRunner(unittest.TestCase):
47   """Testing case for HooksRunner"""
48   def setUp(self):
49     self.torm = []
50     self.tmpdir = tempfile.mkdtemp()
51     self.torm.append((self.tmpdir, True))
52     self.logdir = tempfile.mkdtemp()
53     self.torm.append((self.logdir, True))
54     self.hpath = "fake"
55     self.ph_dirs = {}
56     for i in (constants.HOOKS_PHASE_PRE, constants.HOOKS_PHASE_POST):
57       dname = "%s/%s-%s.d" % (self.tmpdir, self.hpath, i)
58       os.mkdir(dname)
59       self.torm.append((dname, True))
60       self.ph_dirs[i] = dname
61     self.hr = backend.HooksRunner(hooks_base_dir=self.tmpdir)
62
63   def tearDown(self):
64     self.torm.reverse()
65     for path, kind in self.torm:
66       if kind:
67         os.rmdir(path)
68       else:
69         os.unlink(path)
70
71   def _rname(self, fname):
72     return "/".join(fname.split("/")[-2:])
73
74   def testEmpty(self):
75     """Test no hooks"""
76     for phase in (constants.HOOKS_PHASE_PRE, constants.HOOKS_PHASE_POST):
77       self.failUnlessEqual(self.hr.RunHooks(self.hpath, phase, {}), [])
78
79   def testSkipNonExec(self):
80     """Test skip non-exec file"""
81     for phase in (constants.HOOKS_PHASE_PRE, constants.HOOKS_PHASE_POST):
82       fname = "%s/test" % self.ph_dirs[phase]
83       f = open(fname, "w")
84       f.close()
85       self.torm.append((fname, False))
86       self.failUnlessEqual(self.hr.RunHooks(self.hpath, phase, {}),
87                            [(self._rname(fname), HKR_SKIP, "")])
88
89   def testSkipInvalidName(self):
90     """Test skip script with invalid name"""
91     for phase in (constants.HOOKS_PHASE_PRE, constants.HOOKS_PHASE_POST):
92       fname = "%s/a.off" % self.ph_dirs[phase]
93       f = open(fname, "w")
94       f.write("#!/bin/sh\nexit 0\n")
95       f.close()
96       os.chmod(fname, 0700)
97       self.torm.append((fname, False))
98       self.failUnlessEqual(self.hr.RunHooks(self.hpath, phase, {}),
99                            [(self._rname(fname), HKR_SKIP, "")])
100
101   def testSkipDir(self):
102     """Test skip directory"""
103     for phase in (constants.HOOKS_PHASE_PRE, constants.HOOKS_PHASE_POST):
104       fname = "%s/testdir" % self.ph_dirs[phase]
105       os.mkdir(fname)
106       self.torm.append((fname, True))
107       self.failUnlessEqual(self.hr.RunHooks(self.hpath, phase, {}),
108                            [(self._rname(fname), HKR_SKIP, "")])
109
110   def testSuccess(self):
111     """Test success execution"""
112     for phase in (constants.HOOKS_PHASE_PRE, constants.HOOKS_PHASE_POST):
113       fname = "%s/success" % self.ph_dirs[phase]
114       f = open(fname, "w")
115       f.write("#!/bin/sh\nexit 0\n")
116       f.close()
117       self.torm.append((fname, False))
118       os.chmod(fname, 0700)
119       self.failUnlessEqual(self.hr.RunHooks(self.hpath, phase, {}),
120                            [(self._rname(fname), HKR_SUCCESS, "")])
121
122   def testSymlink(self):
123     """Test running a symlink"""
124     for phase in (constants.HOOKS_PHASE_PRE, constants.HOOKS_PHASE_POST):
125       fname = "%s/success" % self.ph_dirs[phase]
126       os.symlink("/bin/true", fname)
127       self.torm.append((fname, False))
128       self.failUnlessEqual(self.hr.RunHooks(self.hpath, phase, {}),
129                            [(self._rname(fname), HKR_SUCCESS, "")])
130
131   def testFail(self):
132     """Test success execution"""
133     for phase in (constants.HOOKS_PHASE_PRE, constants.HOOKS_PHASE_POST):
134       fname = "%s/success" % self.ph_dirs[phase]
135       f = open(fname, "w")
136       f.write("#!/bin/sh\nexit 1\n")
137       f.close()
138       self.torm.append((fname, False))
139       os.chmod(fname, 0700)
140       self.failUnlessEqual(self.hr.RunHooks(self.hpath, phase, {}),
141                            [(self._rname(fname), HKR_FAIL, "")])
142
143   def testCombined(self):
144     """Test success, failure and skip all in one test"""
145     for phase in (constants.HOOKS_PHASE_PRE, constants.HOOKS_PHASE_POST):
146       expect = []
147       for fbase, ecode, rs in [("00succ", 0, HKR_SUCCESS),
148                                ("10fail", 1, HKR_FAIL),
149                                ("20inv.", 0, HKR_SKIP),
150                                ]:
151         fname = "%s/%s" % (self.ph_dirs[phase], fbase)
152         f = open(fname, "w")
153         f.write("#!/bin/sh\nexit %d\n" % ecode)
154         f.close()
155         self.torm.append((fname, False))
156         os.chmod(fname, 0700)
157         expect.append((self._rname(fname), rs, ""))
158       self.failUnlessEqual(self.hr.RunHooks(self.hpath, phase, {}), expect)
159
160   def testOrdering(self):
161     for phase in (constants.HOOKS_PHASE_PRE, constants.HOOKS_PHASE_POST):
162       expect = []
163       for fbase in ["10s1",
164                     "00s0",
165                     "10sa",
166                     "80sc",
167                     "60sd",
168                     ]:
169         fname = "%s/%s" % (self.ph_dirs[phase], fbase)
170         os.symlink("/bin/true", fname)
171         self.torm.append((fname, False))
172         expect.append((self._rname(fname), HKR_SUCCESS, ""))
173       expect.sort()
174       self.failUnlessEqual(self.hr.RunHooks(self.hpath, phase, {}), expect)
175
176   def testEnv(self):
177     """Test environment execution"""
178     for phase in (constants.HOOKS_PHASE_PRE, constants.HOOKS_PHASE_POST):
179       fbase = "success"
180       fname = "%s/%s" % (self.ph_dirs[phase], fbase)
181       os.symlink("/usr/bin/env", fname)
182       self.torm.append((fname, False))
183       env_snt = {"PHASE": phase}
184       env_exp = "PHASE=%s\n" % phase
185       self.failUnlessEqual(self.hr.RunHooks(self.hpath, phase, env_snt),
186                            [(self._rname(fname), HKR_SUCCESS, env_exp)])
187
188
189 class TestHooksMaster(unittest.TestCase):
190   """Testing case for HooksMaster"""
191
192   def _call_false(*args):
193     """Fake call_hooks_runner function which returns False."""
194     return False
195
196   @staticmethod
197   def _call_nodes_false(node_list, hpath, phase, env):
198     """Fake call_hooks_runner function.
199
200     Returns:
201       - list of False values with the same len as the node_list argument
202
203     """
204     return [False for node_name in node_list]
205
206   @staticmethod
207   def _call_script_fail(node_list, hpath, phase, env):
208     """Fake call_hooks_runner function.
209
210     Returns:
211       - list of False values with the same len as the node_list argument
212
213     """
214     return dict([(node_name, [("unittest", constants.HKR_FAIL, "error")])
215                  for node_name in node_list])
216
217   @staticmethod
218   def _call_script_succeed(node_list, hpath, phase, env):
219     """Fake call_hooks_runner function.
220
221     Returns:
222       - list of False values with the same len as the node_list argument
223
224     """
225     return dict([(node_name, [("unittest", constants.HKR_SUCCESS, "ok")])
226                  for node_name in node_list])
227
228   def testTotalFalse(self):
229     """Test complete rpc failure"""
230     cfg = FakeConfig()
231     sstore = FakeSStore()
232     op = opcodes.OpCode()
233     lu = FakeLU(None, op, cfg, sstore)
234     hm = mcpu.HooksMaster(self._call_false, FakeProc(), lu)
235     self.failUnlessRaises(errors.HooksFailure,
236                           hm.RunPhase, constants.HOOKS_PHASE_PRE)
237     hm.RunPhase(constants.HOOKS_PHASE_POST)
238
239   def testIndividualFalse(self):
240     """Test individual node failure"""
241     cfg = FakeConfig()
242     sstore = FakeSStore()
243     op = opcodes.OpCode()
244     lu = FakeLU(None, op, cfg, sstore)
245     hm = mcpu.HooksMaster(self._call_nodes_false, FakeProc(), lu)
246     hm.RunPhase(constants.HOOKS_PHASE_PRE)
247     #self.failUnlessRaises(errors.HooksFailure,
248     #                      hm.RunPhase, constants.HOOKS_PHASE_PRE)
249     hm.RunPhase(constants.HOOKS_PHASE_POST)
250
251   def testScriptFalse(self):
252     """Test individual rpc failure"""
253     cfg = FakeConfig()
254     op = opcodes.OpCode()
255     sstore = FakeSStore()
256     lu = FakeLU(None, op, cfg, sstore)
257     hm = mcpu.HooksMaster(self._call_script_fail, FakeProc(), lu)
258     self.failUnlessRaises(errors.HooksAbort,
259                           hm.RunPhase, constants.HOOKS_PHASE_PRE)
260     hm.RunPhase(constants.HOOKS_PHASE_POST)
261
262   def testScriptSucceed(self):
263     """Test individual rpc failure"""
264     cfg = FakeConfig()
265     op = opcodes.OpCode()
266     sstore = FakeSStore()
267     lu = FakeLU(None, op, cfg, sstore)
268     hm = mcpu.HooksMaster(self._call_script_succeed, FakeProc(), lu)
269     for phase in (constants.HOOKS_PHASE_PRE, constants.HOOKS_PHASE_POST):
270       hm.RunPhase(phase)
271
272 if __name__ == '__main__':
273   unittest.main()