4 # Copyright (C) 2010, 2012 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Script for unittesting the asyncnotifier module"""
31 # pylint: disable=E0611
32 from pyinotify import pyinotify
36 from ganeti import asyncnotifier
37 from ganeti import daemon
38 from ganeti import utils
39 from ganeti import errors
class _MyErrorLoggingAsyncNotifier(asyncnotifier.ErrorLoggingAsyncNotifier):
    """Notifier that counts handled errors and lets them propagate.

    The tests read ``error_count`` to verify how many times the error
    handler ran for a given notifier, and they expect the triggering
    exception to escape from the main loop.

    """

    def __init__(self, *args, **kwargs):
        """Initialise the base notifier and reset the error counter.

        All arguments are passed through unchanged to
        ``asyncnotifier.ErrorLoggingAsyncNotifier.__init__``.

        """
        asyncnotifier.ErrorLoggingAsyncNotifier.__init__(self, *args, **kwargs)
        # Number of times handle_error() was invoked on this notifier
        self.error_count = 0

    def handle_error(self):
        """Record the error and re-raise it to the caller.

        NOTE(review): relies on being called while an exception is being
        handled (as asyncore does for its error hook), so that the bare
        ``raise`` has an active exception to re-raise.

        """
        self.error_count += 1
        raise
class TestSingleFileEventHandler(testutils.GanetiTestCase):
    """Test asyncnotifier.SingleFileEventHandler driven by daemon.Mainloop.

    Three notifiers are set up, each watching its own temporary file:

      - C{NOTIFIER_TERM}: enabled in setUp; its callback sends SIGTERM to
        the current process, which is how the tests leave the main loop
      - C{NOTIFIER_NORM}: disabled unless a test enables it; its callback
        only records the notification
      - C{NOTIFIER_ERR}: disabled unless a test enables it; its callback
        raises L{errors.GenericError}

    """

    NOTIFIERS = [NOTIFIER_TERM, NOTIFIER_NORM, NOTIFIER_ERR] = range(3)

    def setUp(self):
        """Create the main loop plus one file/handler/notifier per slot."""
        testutils.GanetiTestCase.setUp(self)
        self.mainloop = daemon.Mainloop()
        self.chk_files = [self._CreateTempFile() for _ in self.NOTIFIERS]
        self.notified = [False for _ in self.NOTIFIERS]
        # We need one watch manager per notifier, as those contain the file
        # descriptor which is monitored by asyncore
        self.wms = [pyinotify.WatchManager() for _ in self.NOTIFIERS]
        self.cbk = [self.OnInotifyCallback(self, i) for i in self.NOTIFIERS]
        self.ihandler = [asyncnotifier.SingleFileEventHandler(wm, cb, cf)
                         for (wm, cb, cf) in
                         zip(self.wms, self.cbk, self.chk_files)]
        self.notifiers = [_MyErrorLoggingAsyncNotifier(wm, ih)
                          for (wm, ih) in zip(self.wms, self.ihandler)]
        # TERM notifier is enabled by default, as we use it to get out of the
        # loop
        self.ihandler[self.NOTIFIER_TERM].enable()

    def tearDown(self):
        """Disable all handlers, clean up files and unregister notifiers."""
        # disable the inotifiers, before removing the files
        for handler in self.ihandler:
            handler.disable()
        testutils.GanetiTestCase.tearDown(self)
        # and unregister the fd's being polled
        for notifier in self.notifiers:
            notifier.del_channel()

    class OnInotifyCallback:
        """Callback recording a notification for one notifier slot."""

        def __init__(self, testobj, i):
            """Remember the owning test case and the slot index.

            @param testobj: test case owning the C{notified} list
            @param i: index of the notifier this callback belongs to

            """
            self.testobj = testobj
            # Shared list: writes here are visible through testobj.notified
            self.notified = testobj.notified
            self.i = i

        def __call__(self, enabled):
            """Mark the slot as notified and trigger its side effect.

            @param enabled: passed by the event handler; not used here
            @raise errors.GenericError: for the C{NOTIFIER_ERR} slot

            """
            self.notified[self.i] = True
            if self.i == self.testobj.NOTIFIER_TERM:
                # Ask our own process to terminate so the main loop returns
                os.kill(os.getpid(), signal.SIGTERM)
            elif self.i == self.testobj.NOTIFIER_ERR:
                raise errors.GenericError("an error")

    def testReplace(self):
        """Writing the watched file triggers the enabled TERM notifier."""
        utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
        self.mainloop.Run()
        self.assertTrue(self.notified[self.NOTIFIER_TERM])
        self.assertFalse(self.notified[self.NOTIFIER_NORM])
        self.assertEqual(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
        self.assertEqual(self.notifiers[self.NOTIFIER_NORM].error_count, 0)

    def testEnableDisable(self):
        """Repeated enable/disable calls leave a working handler behind."""
        self.ihandler[self.NOTIFIER_TERM].enable()
        self.ihandler[self.NOTIFIER_TERM].disable()
        self.ihandler[self.NOTIFIER_TERM].disable()
        self.ihandler[self.NOTIFIER_TERM].enable()
        self.ihandler[self.NOTIFIER_TERM].disable()
        self.ihandler[self.NOTIFIER_TERM].enable()
        utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
        self.mainloop.Run()
        self.assertTrue(self.notified[self.NOTIFIER_TERM])
        self.assertFalse(self.notified[self.NOTIFIER_NORM])
        self.assertEqual(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
        self.assertEqual(self.notifiers[self.NOTIFIER_NORM].error_count, 0)

    def testDoubleEnable(self):
        """Enabling an already enabled handler does not break it."""
        self.ihandler[self.NOTIFIER_TERM].enable()
        self.ihandler[self.NOTIFIER_TERM].enable()
        utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
        self.mainloop.Run()
        self.assertTrue(self.notified[self.NOTIFIER_TERM])
        self.assertFalse(self.notified[self.NOTIFIER_NORM])
        self.assertEqual(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
        self.assertEqual(self.notifiers[self.NOTIFIER_NORM].error_count, 0)

    def testDefaultDisabled(self):
        """A handler that was never enabled must not report changes."""
        utils.WriteFile(self.chk_files[self.NOTIFIER_NORM], data="dummy")
        utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
        self.mainloop.Run()
        self.assertTrue(self.notified[self.NOTIFIER_TERM])
        # NORM notifier is disabled by default
        self.assertFalse(self.notified[self.NOTIFIER_NORM])
        self.assertEqual(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
        self.assertEqual(self.notifiers[self.NOTIFIER_NORM].error_count, 0)

    def testBothEnabled(self):
        """With NORM enabled as well, both notifiers report their change."""
        self.ihandler[self.NOTIFIER_NORM].enable()
        utils.WriteFile(self.chk_files[self.NOTIFIER_NORM], data="dummy")
        utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
        self.mainloop.Run()
        self.assertTrue(self.notified[self.NOTIFIER_TERM])
        self.assertTrue(self.notified[self.NOTIFIER_NORM])
        self.assertEqual(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
        self.assertEqual(self.notifiers[self.NOTIFIER_NORM].error_count, 0)

    def testError(self):
        """An exception raised by a callback is counted and propagated."""
        self.ihandler[self.NOTIFIER_ERR].enable()
        utils.WriteFile(self.chk_files[self.NOTIFIER_ERR], data="dummy")
        # The ERR callback raises; the notifier's error hook re-raises it,
        # so it surfaces from the main loop
        self.assertRaises(errors.GenericError, self.mainloop.Run)
        self.assertTrue(self.notified[self.NOTIFIER_ERR])
        self.assertEqual(self.notifiers[self.NOTIFIER_ERR].error_count, 1)
        self.assertEqual(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
        self.assertEqual(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
class TestSingleFileEventHandlerError(unittest.TestCase):
    """Test enabling a SingleFileEventHandler on a non-existing file."""

    def setUp(self):
        """Create a private, empty temporary directory."""
        self.tmpdir = tempfile.mkdtemp()

    def tearDown(self):
        """Remove the temporary directory and everything below it."""
        shutil.rmtree(self.tmpdir)

    def test(self):
        """Enabling the handler fails every time with InotifyError."""
        wm = pyinotify.WatchManager()
        # The directory was just created empty, so this path cannot exist
        handler = asyncnotifier.SingleFileEventHandler(
            wm, None, utils.PathJoin(self.tmpdir, "nonexist"))
        self.assertRaises(errors.InotifyError, handler.enable)
        # A failed enable must not leave the handler in a state where a
        # retry behaves differently
        self.assertRaises(errors.InotifyError, handler.enable)
        # NOTE(review): the statement between the second and third check is
        # not visible in the listing; disable() is the presumed step, please
        # confirm against the original file
        handler.disable()
        self.assertRaises(errors.InotifyError, handler.enable)
# When executed as a script (rather than imported), hand control to the
# test program entry point provided by the testutils module.
if __name__ == "__main__":
    testutils.GanetiTestProgram()