4 # Copyright (C) 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for unittesting the asyncnotifier module"""
31 # pylint: disable-msg=E0611
32 from pyinotify import pyinotify
36 from ganeti import asyncnotifier
37 from ganeti import daemon
38 from ganeti import utils
39 from ganeti import errors
44 class _MyErrorLoggingAsyncNotifier(asyncnotifier.ErrorLoggingAsyncNotifier):
45 def __init__(self, *args, **kwargs):
46 asyncnotifier.ErrorLoggingAsyncNotifier.__init__(self, *args, **kwargs)
49 def handle_error(self):
54 class TestSingleFileEventHandler(testutils.GanetiTestCase):
55 """Test daemon.Mainloop"""
57 NOTIFIERS = [NOTIFIER_TERM, NOTIFIER_NORM, NOTIFIER_ERR] = range(3)
60 testutils.GanetiTestCase.setUp(self)
61 self.mainloop = daemon.Mainloop()
62 self.chk_files = [self._CreateTempFile() for i in self.NOTIFIERS]
63 self.notified = [False for i in self.NOTIFIERS]
64 # We need one watch manager per notifier, as those contain the file
65 # descriptor which is monitored by asyncore
66 self.wms = [pyinotify.WatchManager() for i in self.NOTIFIERS]
67 self.cbk = [self.OnInotifyCallback(self, i) for i in self.NOTIFIERS]
68 self.ihandler = [asyncnotifier.SingleFileEventHandler(wm, cb, cf)
70 zip(self.wms, self.cbk, self.chk_files)]
71 self.notifiers = [_MyErrorLoggingAsyncNotifier(wm, ih)
72 for (wm, ih) in zip(self.wms, self.ihandler)]
73 # TERM notifier is enabled by default, as we use it to get out of the loop
74 self.ihandler[self.NOTIFIER_TERM].enable()
76 class OnInotifyCallback:
77 def __init__(self, testobj, i):
78 self.testobj = testobj
79 self.notified = testobj.notified
82 def __call__(self, enabled):
83 self.notified[self.i] = True
84 if self.i == self.testobj.NOTIFIER_TERM:
85 os.kill(os.getpid(), signal.SIGTERM)
86 elif self.i == self.testobj.NOTIFIER_ERR:
87 raise errors.GenericError("an error")
89 def testReplace(self):
90 utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
92 self.assert_(self.notified[self.NOTIFIER_TERM])
93 self.assertFalse(self.notified[self.NOTIFIER_NORM])
94 self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
95 self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
97 def testEnableDisable(self):
98 self.ihandler[self.NOTIFIER_TERM].enable()
99 self.ihandler[self.NOTIFIER_TERM].disable()
100 self.ihandler[self.NOTIFIER_TERM].disable()
101 self.ihandler[self.NOTIFIER_TERM].enable()
102 self.ihandler[self.NOTIFIER_TERM].disable()
103 self.ihandler[self.NOTIFIER_TERM].enable()
104 utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
106 self.assert_(self.notified[self.NOTIFIER_TERM])
107 self.assertFalse(self.notified[self.NOTIFIER_NORM])
108 self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
109 self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
111 def testDoubleEnable(self):
112 self.ihandler[self.NOTIFIER_TERM].enable()
113 self.ihandler[self.NOTIFIER_TERM].enable()
114 utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
116 self.assert_(self.notified[self.NOTIFIER_TERM])
117 self.assertFalse(self.notified[self.NOTIFIER_NORM])
118 self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
119 self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
121 def testDefaultDisabled(self):
122 utils.WriteFile(self.chk_files[self.NOTIFIER_NORM], data="dummy")
123 utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
125 self.assert_(self.notified[self.NOTIFIER_TERM])
126 # NORM notifier is disabled by default
127 self.assertFalse(self.notified[self.NOTIFIER_NORM])
128 self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
129 self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
131 def testBothEnabled(self):
132 self.ihandler[self.NOTIFIER_NORM].enable()
133 utils.WriteFile(self.chk_files[self.NOTIFIER_NORM], data="dummy")
134 utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
136 self.assert_(self.notified[self.NOTIFIER_TERM])
137 self.assert_(self.notified[self.NOTIFIER_NORM])
138 self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
139 self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
142 self.ihandler[self.NOTIFIER_ERR].enable()
143 utils.WriteFile(self.chk_files[self.NOTIFIER_ERR], data="dummy")
144 self.assertRaises(errors.GenericError, self.mainloop.Run)
145 self.assert_(self.notified[self.NOTIFIER_ERR])
146 self.assertEquals(self.notifiers[self.NOTIFIER_ERR].error_count, 1)
147 self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
148 self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
151 class TestSingleFileEventHandlerError(unittest.TestCase):
153 self.tmpdir = tempfile.mkdtemp()
156 shutil.rmtree(self.tmpdir)
159 wm = pyinotify.WatchManager()
160 handler = asyncnotifier.SingleFileEventHandler(wm, None,
161 utils.PathJoin(self.tmpdir,
163 self.assertRaises(errors.InotifyError, handler.enable)
164 self.assertRaises(errors.InotifyError, handler.enable)
166 self.assertRaises(errors.InotifyError, handler.enable)
169 if __name__ == "__main__":
170 testutils.GanetiTestProgram()