Extract base class from SingleFileEventHandler
[ganeti-local] / test / ganeti.asyncnotifier_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the asyncnotifier module"""
23
24 import unittest
25 import signal
26 import os
27 import tempfile
28 import shutil
29
30 try:
31   # pylint: disable-msg=E0611
32   from pyinotify import pyinotify
33 except ImportError:
34   import pyinotify
35
36 from ganeti import asyncnotifier
37 from ganeti import daemon
38 from ganeti import utils
39 from ganeti import errors
40
41 import testutils
42
43
class _MyErrorLoggingAsyncNotifier(asyncnotifier.ErrorLoggingAsyncNotifier):
  """Notifier which counts its C{handle_error} invocations and re-raises.

  The counter is exposed as C{error_count} so tests can check how many
  errors went through a given notifier.

  """
  def __init__(self, *args, **kwargs):
    """Initializes the parent notifier and resets the error counter.

    All arguments are handed over to the parent constructor unchanged.

    """
    parent = asyncnotifier.ErrorLoggingAsyncNotifier
    parent.__init__(self, *args, **kwargs)
    self.error_count = 0

  def handle_error(self):
    """Counts the error, then re-raises the exception being handled.

    """
    self.error_count = self.error_count + 1
    # Bare raise: let the currently active exception propagate to the caller
    raise
52
53
class TestSingleFileEventHandler(testutils.GanetiTestCase):
  """Test asyncnotifier.SingleFileEventHandler, driven by daemon.Mainloop.

  Three notifiers are created, each one watching its own temporary file:
    - C{NOTIFIER_TERM}: its callback sends SIGTERM to the current process;
      it is enabled in L{setUp} and used to get out of the main loop
    - C{NOTIFIER_NORM}: its callback only records that it was notified
    - C{NOTIFIER_ERR}: its callback raises L{errors.GenericError}

  """
  NOTIFIERS = [NOTIFIER_TERM, NOTIFIER_NORM, NOTIFIER_ERR] = range(3)

  def setUp(self):
    """Creates the main loop plus one file/handler/notifier per index.

    """
    testutils.GanetiTestCase.setUp(self)
    self.mainloop = daemon.Mainloop()
    self.chk_files = [self._CreateTempFile() for _ in self.NOTIFIERS]
    self.notified = [False for _ in self.NOTIFIERS]
    # We need one watch manager per notifier, as those contain the file
    # descriptor which is monitored by asyncore
    self.wms = [pyinotify.WatchManager() for _ in self.NOTIFIERS]
    self.cbk = [self.OnInotifyCallback(self, idx) for idx in self.NOTIFIERS]
    self.ihandler = [asyncnotifier.SingleFileEventHandler(wm, cbk, path)
                     for (wm, cbk, path) in zip(self.wms, self.cbk,
                                                self.chk_files)]
    self.notifiers = [_MyErrorLoggingAsyncNotifier(wm, handler)
                      for (wm, handler) in zip(self.wms, self.ihandler)]
    # TERM notifier is enabled by default, as we use it to get out of the loop
    self.ihandler[self.NOTIFIER_TERM].enable()

  class OnInotifyCallback(object):
    """Callback recording the notification of one notifier.

    """
    def __init__(self, testobj, i):
      """Initializes the callback.

      @param testobj: the test case owning the shared C{notified} list
      @param i: index of the notifier this callback belongs to

      """
      self.testobj = testobj
      self.notified = testobj.notified
      self.i = i

    def __call__(self, enabled):
      """Marks the notifier as notified and runs its specific action.

      The TERM notifier sends SIGTERM to the current process, the ERR
      notifier raises L{errors.GenericError}; C{enabled} is not used.

      """
      self.notified[self.i] = True
      if self.i == self.testobj.NOTIFIER_TERM:
        os.kill(os.getpid(), signal.SIGTERM)
      elif self.i == self.testobj.NOTIFIER_ERR:
        raise errors.GenericError("an error")

  def _TriggerTermAndRun(self):
    """Writes the TERM file, runs the main loop, checks TERM was notified.

    """
    utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
    self.mainloop.Run()
    self.assertTrue(self.notified[self.NOTIFIER_TERM])

  def _AssertNoErrors(self):
    """Checks that neither the TERM nor the NORM notifier handled an error.

    """
    self.assertEqual(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
    self.assertEqual(self.notifiers[self.NOTIFIER_NORM].error_count, 0)

  def testReplace(self):
    self._TriggerTermAndRun()
    self.assertFalse(self.notified[self.NOTIFIER_NORM])
    self._AssertNoErrors()

  def testEnableDisable(self):
    term_handler = self.ihandler[self.NOTIFIER_TERM]
    # Repeated and redundant toggling must leave the handler usable
    term_handler.enable()
    term_handler.disable()
    term_handler.disable()
    term_handler.enable()
    term_handler.disable()
    term_handler.enable()
    self._TriggerTermAndRun()
    self.assertFalse(self.notified[self.NOTIFIER_NORM])
    self._AssertNoErrors()

  def testDoubleEnable(self):
    term_handler = self.ihandler[self.NOTIFIER_TERM]
    term_handler.enable()
    term_handler.enable()
    self._TriggerTermAndRun()
    self.assertFalse(self.notified[self.NOTIFIER_NORM])
    self._AssertNoErrors()

  def testDefaultDisabled(self):
    utils.WriteFile(self.chk_files[self.NOTIFIER_NORM], data="dummy")
    self._TriggerTermAndRun()
    # NORM notifier is disabled by default
    self.assertFalse(self.notified[self.NOTIFIER_NORM])
    self._AssertNoErrors()

  def testBothEnabled(self):
    self.ihandler[self.NOTIFIER_NORM].enable()
    utils.WriteFile(self.chk_files[self.NOTIFIER_NORM], data="dummy")
    self._TriggerTermAndRun()
    self.assertTrue(self.notified[self.NOTIFIER_NORM])
    self._AssertNoErrors()

  def testError(self):
    self.ihandler[self.NOTIFIER_ERR].enable()
    utils.WriteFile(self.chk_files[self.NOTIFIER_ERR], data="dummy")
    # The error raised by the callback is expected to come out of Run()
    self.assertRaises(errors.GenericError, self.mainloop.Run)
    self.assertTrue(self.notified[self.NOTIFIER_ERR])
    self.assertEqual(self.notifiers[self.NOTIFIER_ERR].error_count, 1)
    self.assertEqual(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
    self.assertEqual(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
152
153
class TestSingleFileEventHandlerError(unittest.TestCase):
  """Checks enabling a handler whose watched file does not exist.

  """
  def setUp(self):
    """Creates a scratch directory for the test.

    """
    self.tmpdir = tempfile.mkdtemp()

  def tearDown(self):
    """Removes the scratch directory again.

    """
    shutil.rmtree(self.tmpdir)

  def test(self):
    """Enabling must raise InotifyError every time, also after a disable.

    """
    watch_manager = pyinotify.WatchManager()
    missing_path = utils.PathJoin(self.tmpdir, "nonexist")
    handler = asyncnotifier.SingleFileEventHandler(watch_manager, None,
                                                   missing_path)
    enable_fn = handler.enable

    # Two failures in a row, then one more after an explicit disable
    self.assertRaises(errors.InotifyError, enable_fn)
    self.assertRaises(errors.InotifyError, enable_fn)
    handler.disable()
    self.assertRaises(errors.InotifyError, enable_fn)
170
171
# Run the test suite through the Ganeti test runner when executed as a script
if __name__ == "__main__":
  testutils.GanetiTestProgram()