5826f9b3e89fa2a3684dffa9041a1109dddf0ab5
[ganeti-local] / test / ganeti.hooks_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the hooks module"""
23
24
25 import unittest
26 import os
27 import time
28 import tempfile
29 import os.path
30
31 from ganeti import errors
32 from ganeti import opcodes
33 from ganeti import mcpu
34 from ganeti import backend
35 from ganeti import constants
36 from ganeti import cmdlib
37 from ganeti import rpc
38 from ganeti.constants import HKR_SUCCESS, HKR_FAIL, HKR_SKIP
39
40 from mocks import FakeConfig, FakeProc, FakeContext
41
42 class FakeLU(cmdlib.LogicalUnit):
43   HPATH = "test"
44   def BuildHooksEnv(self):
45     return {}, ["localhost"], ["localhost"]
46
47 class TestHooksRunner(unittest.TestCase):
48   """Testing case for HooksRunner"""
49   def setUp(self):
50     self.torm = []
51     self.tmpdir = tempfile.mkdtemp()
52     self.torm.append((self.tmpdir, True))
53     self.logdir = tempfile.mkdtemp()
54     self.torm.append((self.logdir, True))
55     self.hpath = "fake"
56     self.ph_dirs = {}
57     for i in (constants.HOOKS_PHASE_PRE, constants.HOOKS_PHASE_POST):
58       dname = "%s/%s-%s.d" % (self.tmpdir, self.hpath, i)
59       os.mkdir(dname)
60       self.torm.append((dname, True))
61       self.ph_dirs[i] = dname
62     self.hr = backend.HooksRunner(hooks_base_dir=self.tmpdir)
63
64   def tearDown(self):
65     self.torm.reverse()
66     for path, kind in self.torm:
67       if kind:
68         os.rmdir(path)
69       else:
70         os.unlink(path)
71
72   def _rname(self, fname):
73     return "/".join(fname.split("/")[-2:])
74
75   def testEmpty(self):
76     """Test no hooks"""
77     for phase in (constants.HOOKS_PHASE_PRE, constants.HOOKS_PHASE_POST):
78       self.failUnlessEqual(self.hr.RunHooks(self.hpath, phase, {}), [])
79
80   def testSkipNonExec(self):
81     """Test skip non-exec file"""
82     for phase in (constants.HOOKS_PHASE_PRE, constants.HOOKS_PHASE_POST):
83       fname = "%s/test" % self.ph_dirs[phase]
84       f = open(fname, "w")
85       f.close()
86       self.torm.append((fname, False))
87       self.failUnlessEqual(self.hr.RunHooks(self.hpath, phase, {}),
88                            [(self._rname(fname), HKR_SKIP, "")])
89
90   def testSkipInvalidName(self):
91     """Test skip script with invalid name"""
92     for phase in (constants.HOOKS_PHASE_PRE, constants.HOOKS_PHASE_POST):
93       fname = "%s/a.off" % self.ph_dirs[phase]
94       f = open(fname, "w")
95       f.write("#!/bin/sh\nexit 0\n")
96       f.close()
97       os.chmod(fname, 0700)
98       self.torm.append((fname, False))
99       self.failUnlessEqual(self.hr.RunHooks(self.hpath, phase, {}),
100                            [(self._rname(fname), HKR_SKIP, "")])
101
102   def testSkipDir(self):
103     """Test skip directory"""
104     for phase in (constants.HOOKS_PHASE_PRE, constants.HOOKS_PHASE_POST):
105       fname = "%s/testdir" % self.ph_dirs[phase]
106       os.mkdir(fname)
107       self.torm.append((fname, True))
108       self.failUnlessEqual(self.hr.RunHooks(self.hpath, phase, {}),
109                            [(self._rname(fname), HKR_SKIP, "")])
110
111   def testSuccess(self):
112     """Test success execution"""
113     for phase in (constants.HOOKS_PHASE_PRE, constants.HOOKS_PHASE_POST):
114       fname = "%s/success" % self.ph_dirs[phase]
115       f = open(fname, "w")
116       f.write("#!/bin/sh\nexit 0\n")
117       f.close()
118       self.torm.append((fname, False))
119       os.chmod(fname, 0700)
120       self.failUnlessEqual(self.hr.RunHooks(self.hpath, phase, {}),
121                            [(self._rname(fname), HKR_SUCCESS, "")])
122
123   def testSymlink(self):
124     """Test running a symlink"""
125     for phase in (constants.HOOKS_PHASE_PRE, constants.HOOKS_PHASE_POST):
126       fname = "%s/success" % self.ph_dirs[phase]
127       os.symlink("/bin/true", fname)
128       self.torm.append((fname, False))
129       self.failUnlessEqual(self.hr.RunHooks(self.hpath, phase, {}),
130                            [(self._rname(fname), HKR_SUCCESS, "")])
131
132   def testFail(self):
133     """Test success execution"""
134     for phase in (constants.HOOKS_PHASE_PRE, constants.HOOKS_PHASE_POST):
135       fname = "%s/success" % self.ph_dirs[phase]
136       f = open(fname, "w")
137       f.write("#!/bin/sh\nexit 1\n")
138       f.close()
139       self.torm.append((fname, False))
140       os.chmod(fname, 0700)
141       self.failUnlessEqual(self.hr.RunHooks(self.hpath, phase, {}),
142                            [(self._rname(fname), HKR_FAIL, "")])
143
144   def testCombined(self):
145     """Test success, failure and skip all in one test"""
146     for phase in (constants.HOOKS_PHASE_PRE, constants.HOOKS_PHASE_POST):
147       expect = []
148       for fbase, ecode, rs in [("00succ", 0, HKR_SUCCESS),
149                                ("10fail", 1, HKR_FAIL),
150                                ("20inv.", 0, HKR_SKIP),
151                                ]:
152         fname = "%s/%s" % (self.ph_dirs[phase], fbase)
153         f = open(fname, "w")
154         f.write("#!/bin/sh\nexit %d\n" % ecode)
155         f.close()
156         self.torm.append((fname, False))
157         os.chmod(fname, 0700)
158         expect.append((self._rname(fname), rs, ""))
159       self.failUnlessEqual(self.hr.RunHooks(self.hpath, phase, {}), expect)
160
161   def testOrdering(self):
162     for phase in (constants.HOOKS_PHASE_PRE, constants.HOOKS_PHASE_POST):
163       expect = []
164       for fbase in ["10s1",
165                     "00s0",
166                     "10sa",
167                     "80sc",
168                     "60sd",
169                     ]:
170         fname = "%s/%s" % (self.ph_dirs[phase], fbase)
171         os.symlink("/bin/true", fname)
172         self.torm.append((fname, False))
173         expect.append((self._rname(fname), HKR_SUCCESS, ""))
174       expect.sort()
175       self.failUnlessEqual(self.hr.RunHooks(self.hpath, phase, {}), expect)
176
177   def testEnv(self):
178     """Test environment execution"""
179     for phase in (constants.HOOKS_PHASE_PRE, constants.HOOKS_PHASE_POST):
180       fbase = "success"
181       fname = "%s/%s" % (self.ph_dirs[phase], fbase)
182       os.symlink("/usr/bin/env", fname)
183       self.torm.append((fname, False))
184       env_snt = {"PHASE": phase}
185       env_exp = "PHASE=%s" % phase
186       self.failUnlessEqual(self.hr.RunHooks(self.hpath, phase, env_snt),
187                            [(self._rname(fname), HKR_SUCCESS, env_exp)])
188
189
190 class TestHooksMaster(unittest.TestCase):
191   """Testing case for HooksMaster"""
192
193   def _call_false(*args):
194     """Fake call_hooks_runner function which returns False."""
195     return False
196
197   @staticmethod
198   def _call_nodes_false(node_list, hpath, phase, env):
199     """Fake call_hooks_runner function.
200
201     @rtype: dict of node -> L{rpc.RpcResult} with an rpc error
202     @return: rpc failure from all nodes
203
204     """
205     return dict([(node, rpc.RpcResult('error', failed=True,
206                   node=node, call='FakeError')) for node in node_list])
207
208   @staticmethod
209   def _call_script_fail(node_list, hpath, phase, env):
210     """Fake call_hooks_runner function.
211
212     @rtype: dict of node -> L{rpc.RpcResult} with a failed script result
213     @return: script execution failure from all nodes
214
215     """
216     rr = rpc.RpcResult
217     return dict([(node, rr((True, [("utest", constants.HKR_FAIL, "err")]),
218                            node=node, call='FakeScriptFail'))
219                   for node in node_list])
220
221   @staticmethod
222   def _call_script_succeed(node_list, hpath, phase, env):
223     """Fake call_hooks_runner function.
224
225     @rtype: dict of node -> L{rpc.RpcResult} with a successful script result
226     @return: script execution from all nodes
227
228     """
229     rr = rpc.RpcResult
230     return dict([(node, rr(True, [("utest", constants.HKR_SUCCESS, "ok")],
231                            node=node, call='FakeScriptOk'))
232                  for node in node_list])
233
234   def setUp(self):
235     self.op = opcodes.OpCode()
236     self.context = FakeContext()
237     # WARNING: here we pass None as RpcRunner instance since we know
238     # our usage via HooksMaster will not use lu.rpc
239     self.lu = FakeLU(FakeProc(), self.op, self.context, None)
240
241   def testTotalFalse(self):
242     """Test complete rpc failure"""
243     hm = mcpu.HooksMaster(self._call_false, FakeProc(), self.lu)
244     self.failUnlessRaises(errors.HooksFailure,
245                           hm.RunPhase, constants.HOOKS_PHASE_PRE)
246     hm.RunPhase(constants.HOOKS_PHASE_POST)
247
248   def testIndividualFalse(self):
249     """Test individual node failure"""
250     hm = mcpu.HooksMaster(self._call_nodes_false, FakeProc(), self.lu)
251     hm.RunPhase(constants.HOOKS_PHASE_PRE)
252     #self.failUnlessRaises(errors.HooksFailure,
253     #                      hm.RunPhase, constants.HOOKS_PHASE_PRE)
254     hm.RunPhase(constants.HOOKS_PHASE_POST)
255
256   def testScriptFalse(self):
257     """Test individual rpc failure"""
258     hm = mcpu.HooksMaster(self._call_script_fail, FakeProc(), self.lu)
259     self.failUnlessRaises(errors.HooksAbort,
260                           hm.RunPhase, constants.HOOKS_PHASE_PRE)
261     hm.RunPhase(constants.HOOKS_PHASE_POST)
262
263   def testScriptSucceed(self):
264     """Test individual rpc failure"""
265     hm = mcpu.HooksMaster(self._call_script_succeed, FakeProc(), self.lu)
266     for phase in (constants.HOOKS_PHASE_PRE, constants.HOOKS_PHASE_POST):
267       hm.RunPhase(phase)
268
269 if __name__ == '__main__':
270   unittest.main()