Initial commit.
[ganeti-local] / testing / ganeti.hooks_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the hooks module"""
23
24
25 import unittest
26 import os
27 import time
28 import tempfile
29 import os.path
30
31 from ganeti import errors
32 from ganeti import opcodes
33 from ganeti import mcpu
34 from ganeti import backend
35 from ganeti import constants
36 from ganeti import cmdlib
37 from ganeti.constants import HKR_SUCCESS, HKR_FAIL, HKR_SKIP
38
39 from fake_config import FakeConfig
40
41 class FakeLU(cmdlib.LogicalUnit):
42   HPATH = "test"
43   def BuildHooksEnv(self):
44     return {}, ["localhost"], ["localhost"]
45
46 class TestHooksRunner(unittest.TestCase):
47   """Testing case for HooksRunner"""
48   def setUp(self):
49     self.torm = []
50     self.tmpdir = tempfile.mkdtemp()
51     self.torm.append((self.tmpdir, True))
52     self.logdir = tempfile.mkdtemp()
53     self.torm.append((self.logdir, True))
54     self.hpath = "fake"
55     self.ph_dirs = {}
56     for i in (constants.HOOKS_PHASE_PRE, constants.HOOKS_PHASE_POST):
57       dname = "%s/%s-%s.d" % (self.tmpdir, self.hpath, i)
58       os.mkdir(dname)
59       self.torm.append((dname, True))
60       self.ph_dirs[i] = dname
61     self.hr = backend.HooksRunner(hooks_base_dir=self.tmpdir)
62
63   def tearDown(self):
64     self.torm.reverse()
65     for path, kind in self.torm:
66       if kind:
67         os.rmdir(path)
68       else:
69         os.unlink(path)
70
71   def _rname(self, fname):
72     return "/".join(fname.split("/")[-2:])
73
74   def testEmpty(self):
75     """Test no hooks"""
76     for phase in (constants.HOOKS_PHASE_PRE, constants.HOOKS_PHASE_POST):
77       self.failUnlessEqual(self.hr.RunHooks(self.hpath, phase, {}), [])
78
79   def testSkipNonExec(self):
80     """Test skip non-exec file"""
81     for phase in (constants.HOOKS_PHASE_PRE, constants.HOOKS_PHASE_POST):
82       fname = "%s/test" % self.ph_dirs[phase]
83       f = open(fname, "w")
84       f.close()
85       self.torm.append((fname, False))
86       self.failUnlessEqual(self.hr.RunHooks(self.hpath, phase, {}),
87                            [(self._rname(fname), HKR_SKIP, "")])
88
89   def testSkipInvalidName(self):
90     """Test skip script with invalid name"""
91     for phase in (constants.HOOKS_PHASE_PRE, constants.HOOKS_PHASE_POST):
92       fname = "%s/a.off" % self.ph_dirs[phase]
93       f = open(fname, "w")
94       f.write("#!/bin/sh\nexit 0\n")
95       f.close()
96       os.chmod(fname, 0700)
97       self.torm.append((fname, False))
98       self.failUnlessEqual(self.hr.RunHooks(self.hpath, phase, {}),
99                            [(self._rname(fname), HKR_SKIP, "")])
100
101   def testSkipDir(self):
102     """Test skip directory"""
103     for phase in (constants.HOOKS_PHASE_PRE, constants.HOOKS_PHASE_POST):
104       fname = "%s/testdir" % self.ph_dirs[phase]
105       os.mkdir(fname)
106       self.torm.append((fname, True))
107       self.failUnlessEqual(self.hr.RunHooks(self.hpath, phase, {}),
108                            [(self._rname(fname), HKR_SKIP, "")])
109
110   def testSuccess(self):
111     """Test success execution"""
112     for phase in (constants.HOOKS_PHASE_PRE, constants.HOOKS_PHASE_POST):
113       fname = "%s/success" % self.ph_dirs[phase]
114       f = open(fname, "w")
115       f.write("#!/bin/sh\nexit 0\n")
116       f.close()
117       self.torm.append((fname, False))
118       os.chmod(fname, 0700)
119       self.failUnlessEqual(self.hr.RunHooks(self.hpath, phase, {}),
120                            [(self._rname(fname), HKR_SUCCESS, "")])
121
122   def testSymlink(self):
123     """Test running a symlink"""
124     for phase in (constants.HOOKS_PHASE_PRE, constants.HOOKS_PHASE_POST):
125       fname = "%s/success" % self.ph_dirs[phase]
126       os.symlink("/bin/true", fname)
127       self.torm.append((fname, False))
128       self.failUnlessEqual(self.hr.RunHooks(self.hpath, phase, {}),
129                            [(self._rname(fname), HKR_SUCCESS, "")])
130
131   def testFail(self):
132     """Test success execution"""
133     for phase in (constants.HOOKS_PHASE_PRE, constants.HOOKS_PHASE_POST):
134       fname = "%s/success" % self.ph_dirs[phase]
135       f = open(fname, "w")
136       f.write("#!/bin/sh\nexit 1\n")
137       f.close()
138       self.torm.append((fname, False))
139       os.chmod(fname, 0700)
140       self.failUnlessEqual(self.hr.RunHooks(self.hpath, phase, {}),
141                            [(self._rname(fname), HKR_FAIL, "")])
142
143   def testCombined(self):
144     """Test success, failure and skip all in one test"""
145     for phase in (constants.HOOKS_PHASE_PRE, constants.HOOKS_PHASE_POST):
146       expect = []
147       for fbase, ecode, rs in [("00succ", 0, HKR_SUCCESS),
148                                ("10fail", 1, HKR_FAIL),
149                                ("20inv.", 0, HKR_SKIP),
150                                ]:
151         fname = "%s/%s" % (self.ph_dirs[phase], fbase)
152         f = open(fname, "w")
153         f.write("#!/bin/sh\nexit %d\n" % ecode)
154         f.close()
155         self.torm.append((fname, False))
156         os.chmod(fname, 0700)
157         expect.append((self._rname(fname), rs, ""))
158       self.failUnlessEqual(self.hr.RunHooks(self.hpath, phase, {}), expect)
159
160   def testOrdering(self):
161     for phase in (constants.HOOKS_PHASE_PRE, constants.HOOKS_PHASE_POST):
162       expect = []
163       for fbase in ["10s1",
164                     "00s0",
165                     "10sa",
166                     "80sc",
167                     "60sd",
168                     ]:
169         fname = "%s/%s" % (self.ph_dirs[phase], fbase)
170         os.symlink("/bin/true", fname)
171         self.torm.append((fname, False))
172         expect.append((self._rname(fname), HKR_SUCCESS, ""))
173       expect.sort()
174       self.failUnlessEqual(self.hr.RunHooks(self.hpath, phase, {}), expect)
175
176   def testEnv(self):
177     """Test environment execution"""
178     for phase in (constants.HOOKS_PHASE_PRE, constants.HOOKS_PHASE_POST):
179       fbase = "success"
180       fname = "%s/%s" % (self.ph_dirs[phase], fbase)
181       os.symlink("/usr/bin/env", fname)
182       self.torm.append((fname, False))
183       env_snt = {"PHASE": phase}
184       env_exp = "PHASE=%s\n" % phase
185       self.failUnlessEqual(self.hr.RunHooks(self.hpath, phase, env_snt),
186                            [(self._rname(fname), HKR_SUCCESS, env_exp)])
187
188
189 class TestHooksMaster(unittest.TestCase):
190   """Testing case for HooksMaster"""
191
192   def _call_false(*args):
193     """Fake call_hooks_runner function which returns False."""
194     return False
195
196   @staticmethod
197   def _call_nodes_false(node_list, hpath, phase, env):
198     """Fake call_hooks_runner function.
199
200     Returns:
201       - list of False values with the same len as the node_list argument
202
203     """
204     return [False for node_name in node_list]
205
206   @staticmethod
207   def _call_script_fail(node_list, hpath, phase, env):
208     """Fake call_hooks_runner function.
209
210     Returns:
211       - list of False values with the same len as the node_list argument
212
213     """
214     return dict([(node_name, [("unittest", constants.HKR_FAIL, "error")])
215                  for node_name in node_list])
216
217   @staticmethod
218   def _call_script_succeed(node_list, hpath, phase, env):
219     """Fake call_hooks_runner function.
220
221     Returns:
222       - list of False values with the same len as the node_list argument
223
224     """
225     return dict([(node_name, [("unittest", constants.HKR_SUCCESS, "ok")])
226                  for node_name in node_list])
227
228   def testTotalFalse(self):
229     """Test complete rpc failure"""
230     cfg = FakeConfig()
231     op = opcodes.OpCode()
232     lu = FakeLU(None, op, cfg, None)
233     hm = mcpu.HooksMaster(self._call_false, cfg, lu)
234     self.failUnlessRaises(errors.HooksFailure,
235                           hm.RunPhase, constants.HOOKS_PHASE_PRE)
236     hm.RunPhase(constants.HOOKS_PHASE_POST)
237
238   def testIndividualFalse(self):
239     """Test individual rpc failure"""
240     cfg = FakeConfig()
241     op = opcodes.OpCode()
242     lu = FakeLU(None, op, cfg, None)
243     hm = mcpu.HooksMaster(self._call_nodes_false, cfg, lu)
244     self.failUnlessRaises(errors.HooksFailure,
245                           hm.RunPhase, constants.HOOKS_PHASE_PRE)
246     hm.RunPhase(constants.HOOKS_PHASE_POST)
247
248   def testScriptFalse(self):
249     """Test individual rpc failure"""
250     cfg = FakeConfig()
251     op = opcodes.OpCode()
252     lu = FakeLU(None, op, cfg, None)
253     hm = mcpu.HooksMaster(self._call_script_fail, cfg, lu)
254     self.failUnlessRaises(errors.HooksAbort,
255                           hm.RunPhase, constants.HOOKS_PHASE_PRE)
256     hm.RunPhase(constants.HOOKS_PHASE_POST)
257
258   def testScriptSucceed(self):
259     """Test individual rpc failure"""
260     cfg = FakeConfig()
261     op = opcodes.OpCode()
262     lu = FakeLU(None, op, cfg, None)
263     hm = mcpu.HooksMaster(self._call_script_succeed, cfg, lu)
264     for phase in (constants.HOOKS_PHASE_PRE, constants.HOOKS_PHASE_POST):
265       hm.RunPhase(phase)
266
267 if __name__ == '__main__':
268   unittest.main()