4 # Copyright (C) 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the asyncnotifier module"""
31 # pylint: disable-msg=E0611
32 from pyinotify import pyinotify
36 from ganeti import asyncnotifier
37 from ganeti import daemon
38 from ganeti import utils
39 from ganeti import errors
44 class _MyErrorLoggingAsyncNotifier(asyncnotifier.ErrorLoggingAsyncNotifier):
45 def __init__(self, *args, **kwargs):
46 asyncnotifier.ErrorLoggingAsyncNotifier.__init__(self, *args, **kwargs)
49 def handle_error(self):
54 class TestSingleFileEventHandler(testutils.GanetiTestCase):
55 """Test daemon.Mainloop"""
57 NOTIFIERS = [NOTIFIER_TERM, NOTIFIER_NORM, NOTIFIER_ERR] = range(3)
60 testutils.GanetiTestCase.setUp(self)
61 self.mainloop = daemon.Mainloop()
62 self.chk_files = [self._CreateTempFile() for i in self.NOTIFIERS]
63 self.notified = [False for i in self.NOTIFIERS]
64 # We need one watch manager per notifier, as those contain the file
65 # descriptor which is monitored by asyncore
66 self.wms = [pyinotify.WatchManager() for i in self.NOTIFIERS]
67 self.cbk = [self.OnInotifyCallback(self, i)
68 for i in range(len(self.NOTIFIERS))]
69 self.ihandler = [asyncnotifier.SingleFileEventHandler(self.wms[i],
72 for i in range(len(self.NOTIFIERS))]
73 self.notifiers = [_MyErrorLoggingAsyncNotifier(self.wms[i],
75 for i in range(len(self.NOTIFIERS))]
76 # TERM notifier is enabled by default, as we use it to get out of the loop
77 self.ihandler[self.NOTIFIER_TERM].enable()
79 class OnInotifyCallback:
80 def __init__(self, testobj, i):
81 self.testobj = testobj
82 self.notified = testobj.notified
85 def __call__(self, enabled):
86 self.notified[self.i] = True
87 if self.i == self.testobj.NOTIFIER_TERM:
88 os.kill(os.getpid(), signal.SIGTERM)
89 elif self.i == self.testobj.NOTIFIER_ERR:
90 raise errors.GenericError("an error")
92 def testReplace(self):
93 utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
95 self.assert_(self.notified[self.NOTIFIER_TERM])
96 self.assertFalse(self.notified[self.NOTIFIER_NORM])
97 self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
98 self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
100 def testEnableDisable(self):
101 self.ihandler[self.NOTIFIER_TERM].enable()
102 self.ihandler[self.NOTIFIER_TERM].disable()
103 self.ihandler[self.NOTIFIER_TERM].disable()
104 self.ihandler[self.NOTIFIER_TERM].enable()
105 self.ihandler[self.NOTIFIER_TERM].disable()
106 self.ihandler[self.NOTIFIER_TERM].enable()
107 utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
109 self.assert_(self.notified[self.NOTIFIER_TERM])
110 self.assertFalse(self.notified[self.NOTIFIER_NORM])
111 self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
112 self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
114 def testDoubleEnable(self):
115 self.ihandler[self.NOTIFIER_TERM].enable()
116 self.ihandler[self.NOTIFIER_TERM].enable()
117 utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
119 self.assert_(self.notified[self.NOTIFIER_TERM])
120 self.assertFalse(self.notified[self.NOTIFIER_NORM])
121 self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
122 self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
124 def testDefaultDisabled(self):
125 utils.WriteFile(self.chk_files[self.NOTIFIER_NORM], data="dummy")
126 utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
128 self.assert_(self.notified[self.NOTIFIER_TERM])
129 # NORM notifier is disabled by default
130 self.assertFalse(self.notified[self.NOTIFIER_NORM])
131 self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
132 self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
134 def testBothEnabled(self):
135 self.ihandler[self.NOTIFIER_NORM].enable()
136 utils.WriteFile(self.chk_files[self.NOTIFIER_NORM], data="dummy")
137 utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
139 self.assert_(self.notified[self.NOTIFIER_TERM])
140 self.assert_(self.notified[self.NOTIFIER_NORM])
141 self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
142 self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
145 self.ihandler[self.NOTIFIER_ERR].enable()
146 utils.WriteFile(self.chk_files[self.NOTIFIER_ERR], data="dummy")
147 self.assertRaises(errors.GenericError, self.mainloop.Run)
148 self.assert_(self.notified[self.NOTIFIER_ERR])
149 self.assertEquals(self.notifiers[self.NOTIFIER_ERR].error_count, 1)
150 self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
151 self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
154 class TestSingleFileEventHandlerError(unittest.TestCase):
156 self.tmpdir = tempfile.mkdtemp()
159 shutil.rmtree(self.tmpdir)
162 wm = pyinotify.WatchManager()
163 handler = asyncnotifier.SingleFileEventHandler(wm, None,
164 utils.PathJoin(self.tmpdir,
166 self.assertRaises(errors.InotifyError, handler.enable)
167 self.assertRaises(errors.InotifyError, handler.enable)
169 self.assertRaises(errors.InotifyError, handler.enable)
172 if __name__ == "__main__":
173 testutils.GanetiTestProgram()