Fix unittests for ganeti-rapi
[ganeti-local] / test / ganeti.hooks_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the hooks module"""
23
24
25 import unittest
26 import os
27 import time
28 import tempfile
29 import os.path
30
31 from ganeti import errors
32 from ganeti import opcodes
33 from ganeti import mcpu
34 from ganeti import backend
35 from ganeti import constants
36 from ganeti import cmdlib
37 from ganeti.constants import HKR_SUCCESS, HKR_FAIL, HKR_SKIP
38
39 from mocks import FakeConfig, FakeSStore, FakeProc, FakeContext
40
41 class FakeLU(cmdlib.LogicalUnit):
42   HPATH = "test"
43   def BuildHooksEnv(self):
44     return {}, ["localhost"], ["localhost"]
45
46 class TestHooksRunner(unittest.TestCase):
47   """Testing case for HooksRunner"""
48   def setUp(self):
49     self.torm = []
50     self.tmpdir = tempfile.mkdtemp()
51     self.torm.append((self.tmpdir, True))
52     self.logdir = tempfile.mkdtemp()
53     self.torm.append((self.logdir, True))
54     self.hpath = "fake"
55     self.ph_dirs = {}
56     for i in (constants.HOOKS_PHASE_PRE, constants.HOOKS_PHASE_POST):
57       dname = "%s/%s-%s.d" % (self.tmpdir, self.hpath, i)
58       os.mkdir(dname)
59       self.torm.append((dname, True))
60       self.ph_dirs[i] = dname
61     self.hr = backend.HooksRunner(hooks_base_dir=self.tmpdir)
62
63   def tearDown(self):
64     self.torm.reverse()
65     for path, kind in self.torm:
66       if kind:
67         os.rmdir(path)
68       else:
69         os.unlink(path)
70
71   def _rname(self, fname):
72     return "/".join(fname.split("/")[-2:])
73
74   def testEmpty(self):
75     """Test no hooks"""
76     for phase in (constants.HOOKS_PHASE_PRE, constants.HOOKS_PHASE_POST):
77       self.failUnlessEqual(self.hr.RunHooks(self.hpath, phase, {}), [])
78
79   def testSkipNonExec(self):
80     """Test skip non-exec file"""
81     for phase in (constants.HOOKS_PHASE_PRE, constants.HOOKS_PHASE_POST):
82       fname = "%s/test" % self.ph_dirs[phase]
83       f = open(fname, "w")
84       f.close()
85       self.torm.append((fname, False))
86       self.failUnlessEqual(self.hr.RunHooks(self.hpath, phase, {}),
87                            [(self._rname(fname), HKR_SKIP, "")])
88
89   def testSkipInvalidName(self):
90     """Test skip script with invalid name"""
91     for phase in (constants.HOOKS_PHASE_PRE, constants.HOOKS_PHASE_POST):
92       fname = "%s/a.off" % self.ph_dirs[phase]
93       f = open(fname, "w")
94       f.write("#!/bin/sh\nexit 0\n")
95       f.close()
96       os.chmod(fname, 0700)
97       self.torm.append((fname, False))
98       self.failUnlessEqual(self.hr.RunHooks(self.hpath, phase, {}),
99                            [(self._rname(fname), HKR_SKIP, "")])
100
101   def testSkipDir(self):
102     """Test skip directory"""
103     for phase in (constants.HOOKS_PHASE_PRE, constants.HOOKS_PHASE_POST):
104       fname = "%s/testdir" % self.ph_dirs[phase]
105       os.mkdir(fname)
106       self.torm.append((fname, True))
107       self.failUnlessEqual(self.hr.RunHooks(self.hpath, phase, {}),
108                            [(self._rname(fname), HKR_SKIP, "")])
109
110   def testSuccess(self):
111     """Test success execution"""
112     for phase in (constants.HOOKS_PHASE_PRE, constants.HOOKS_PHASE_POST):
113       fname = "%s/success" % self.ph_dirs[phase]
114       f = open(fname, "w")
115       f.write("#!/bin/sh\nexit 0\n")
116       f.close()
117       self.torm.append((fname, False))
118       os.chmod(fname, 0700)
119       self.failUnlessEqual(self.hr.RunHooks(self.hpath, phase, {}),
120                            [(self._rname(fname), HKR_SUCCESS, "")])
121
122   def testSymlink(self):
123     """Test running a symlink"""
124     for phase in (constants.HOOKS_PHASE_PRE, constants.HOOKS_PHASE_POST):
125       fname = "%s/success" % self.ph_dirs[phase]
126       os.symlink("/bin/true", fname)
127       self.torm.append((fname, False))
128       self.failUnlessEqual(self.hr.RunHooks(self.hpath, phase, {}),
129                            [(self._rname(fname), HKR_SUCCESS, "")])
130
131   def testFail(self):
132     """Test success execution"""
133     for phase in (constants.HOOKS_PHASE_PRE, constants.HOOKS_PHASE_POST):
134       fname = "%s/success" % self.ph_dirs[phase]
135       f = open(fname, "w")
136       f.write("#!/bin/sh\nexit 1\n")
137       f.close()
138       self.torm.append((fname, False))
139       os.chmod(fname, 0700)
140       self.failUnlessEqual(self.hr.RunHooks(self.hpath, phase, {}),
141                            [(self._rname(fname), HKR_FAIL, "")])
142
143   def testCombined(self):
144     """Test success, failure and skip all in one test"""
145     for phase in (constants.HOOKS_PHASE_PRE, constants.HOOKS_PHASE_POST):
146       expect = []
147       for fbase, ecode, rs in [("00succ", 0, HKR_SUCCESS),
148                                ("10fail", 1, HKR_FAIL),
149                                ("20inv.", 0, HKR_SKIP),
150                                ]:
151         fname = "%s/%s" % (self.ph_dirs[phase], fbase)
152         f = open(fname, "w")
153         f.write("#!/bin/sh\nexit %d\n" % ecode)
154         f.close()
155         self.torm.append((fname, False))
156         os.chmod(fname, 0700)
157         expect.append((self._rname(fname), rs, ""))
158       self.failUnlessEqual(self.hr.RunHooks(self.hpath, phase, {}), expect)
159
160   def testOrdering(self):
161     for phase in (constants.HOOKS_PHASE_PRE, constants.HOOKS_PHASE_POST):
162       expect = []
163       for fbase in ["10s1",
164                     "00s0",
165                     "10sa",
166                     "80sc",
167                     "60sd",
168                     ]:
169         fname = "%s/%s" % (self.ph_dirs[phase], fbase)
170         os.symlink("/bin/true", fname)
171         self.torm.append((fname, False))
172         expect.append((self._rname(fname), HKR_SUCCESS, ""))
173       expect.sort()
174       self.failUnlessEqual(self.hr.RunHooks(self.hpath, phase, {}), expect)
175
176   def testEnv(self):
177     """Test environment execution"""
178     for phase in (constants.HOOKS_PHASE_PRE, constants.HOOKS_PHASE_POST):
179       fbase = "success"
180       fname = "%s/%s" % (self.ph_dirs[phase], fbase)
181       os.symlink("/usr/bin/env", fname)
182       self.torm.append((fname, False))
183       env_snt = {"PHASE": phase}
184       env_exp = "PHASE=%s\n" % phase
185       self.failUnlessEqual(self.hr.RunHooks(self.hpath, phase, env_snt),
186                            [(self._rname(fname), HKR_SUCCESS, env_exp)])
187
188
189 class TestHooksMaster(unittest.TestCase):
190   """Testing case for HooksMaster"""
191
192   def _call_false(*args):
193     """Fake call_hooks_runner function which returns False."""
194     return False
195
196   @staticmethod
197   def _call_nodes_false(node_list, hpath, phase, env):
198     """Fake call_hooks_runner function.
199
200     Returns:
201       - list of False values with the same len as the node_list argument
202
203     """
204     return [False for node_name in node_list]
205
206   @staticmethod
207   def _call_script_fail(node_list, hpath, phase, env):
208     """Fake call_hooks_runner function.
209
210     Returns:
211       - list of False values with the same len as the node_list argument
212
213     """
214     return dict([(node_name, [("unittest", constants.HKR_FAIL, "error")])
215                  for node_name in node_list])
216
217   @staticmethod
218   def _call_script_succeed(node_list, hpath, phase, env):
219     """Fake call_hooks_runner function.
220
221     Returns:
222       - list of False values with the same len as the node_list argument
223
224     """
225     return dict([(node_name, [("unittest", constants.HKR_SUCCESS, "ok")])
226                  for node_name in node_list])
227
228   def setUp(self):
229     self.sstore = FakeSStore()
230     self.op = opcodes.OpCode()
231     self.context = FakeContext()
232     self.lu = FakeLU(None, self.op, self.context, self.sstore)
233
234   def testTotalFalse(self):
235     """Test complete rpc failure"""
236     hm = mcpu.HooksMaster(self._call_false, FakeProc(), self.lu)
237     self.failUnlessRaises(errors.HooksFailure,
238                           hm.RunPhase, constants.HOOKS_PHASE_PRE)
239     hm.RunPhase(constants.HOOKS_PHASE_POST)
240
241   def testIndividualFalse(self):
242     """Test individual node failure"""
243     hm = mcpu.HooksMaster(self._call_nodes_false, FakeProc(), self.lu)
244     hm.RunPhase(constants.HOOKS_PHASE_PRE)
245     #self.failUnlessRaises(errors.HooksFailure,
246     #                      hm.RunPhase, constants.HOOKS_PHASE_PRE)
247     hm.RunPhase(constants.HOOKS_PHASE_POST)
248
249   def testScriptFalse(self):
250     """Test individual rpc failure"""
251     hm = mcpu.HooksMaster(self._call_script_fail, FakeProc(), self.lu)
252     self.failUnlessRaises(errors.HooksAbort,
253                           hm.RunPhase, constants.HOOKS_PHASE_PRE)
254     hm.RunPhase(constants.HOOKS_PHASE_POST)
255
256   def testScriptSucceed(self):
257     """Test individual rpc failure"""
258     hm = mcpu.HooksMaster(self._call_script_succeed, FakeProc(), self.lu)
259     for phase in (constants.HOOKS_PHASE_PRE, constants.HOOKS_PHASE_POST):
260       hm.RunPhase(phase)
261
262 if __name__ == '__main__':
263   unittest.main()