# Copyright (C) 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Script for unittesting the asyncnotifier module"""

29 # pylint: disable-msg=E0611
30 from pyinotify import pyinotify
34 from ganeti import asyncnotifier
35 from ganeti import daemon
36 from ganeti import utils
37 from ganeti import errors
class _MyErrorLoggingAsyncNotifier(asyncnotifier.ErrorLoggingAsyncNotifier):
    """Notifier that counts the errors handed to it and then re-raises them.

    The tests read ``error_count`` to check how many errors each notifier
    saw, and expect the original exception to propagate out of the main loop.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments to the base notifier and reset the counter."""
        asyncnotifier.ErrorLoggingAsyncNotifier.__init__(self, *args, **kwargs)
        # Must exist before any event is processed: the tests assert on it
        # even for notifiers that never see an error.
        self.error_count = 0

    def handle_error(self):
        """Record the error and let the active exception propagate.

        NOTE(review): presumably invoked by asyncore from inside an
        ``except`` block, which is what makes the bare ``raise`` valid --
        confirm against the dispatcher in use.
        """
        self.error_count += 1
        raise
class TestSingleFileEventHandler(testutils.GanetiTestCase):
    """Test asyncnotifier.SingleFileEventHandler driven by daemon.Mainloop.

    Three handler/notifier pairs are created, each watching its own
    temporary file:

      - NOTIFIER_TERM: its callback sends SIGTERM to this process, which is
        how the tests get out of the main loop; enabled in setUp
      - NOTIFIER_NORM: its callback only records the notification
      - NOTIFIER_ERR: its callback raises errors.GenericError

    """

    NOTIFIERS = [NOTIFIER_TERM, NOTIFIER_NORM, NOTIFIER_ERR] = range(3)

    def setUp(self):
        """Create one temp file, watch manager, handler and notifier per slot."""
        testutils.GanetiTestCase.setUp(self)
        self.mainloop = daemon.Mainloop()
        self.chk_files = [self._CreateTempFile() for _ in self.NOTIFIERS]
        self.notified = [False for _ in self.NOTIFIERS]
        # We need one watch manager per notifier, as those contain the file
        # descriptor which is monitored by asyncore
        self.wms = [pyinotify.WatchManager() for _ in self.NOTIFIERS]
        self.cbk = [self.OnInotifyCallback(self, i) for i in self.NOTIFIERS]
        # NOTE(review): argument order (watch manager, callback, file name)
        # reconstructed from the names prepared above -- confirm against
        # asyncnotifier.SingleFileEventHandler.
        self.ihandler = [
            asyncnotifier.SingleFileEventHandler(
                self.wms[i], self.cbk[i], self.chk_files[i])
            for i in self.NOTIFIERS
        ]
        # NOTE(review): the handler is assumed to be the notifier's second
        # argument -- confirm against asyncnotifier.ErrorLoggingAsyncNotifier.
        self.notifiers = [
            _MyErrorLoggingAsyncNotifier(self.wms[i], self.ihandler[i])
            for i in self.NOTIFIERS
        ]
        # TERM notifier is enabled by default, as we use it to get out of the
        # loop
        self.ihandler[self.NOTIFIER_TERM].enable()

    class OnInotifyCallback:
        """Callable handed to a handler; records that slot ``i`` was notified."""

        def __init__(self, testobj, i):
            self.testobj = testobj
            # Shared list object, so updates are visible on the test case too.
            self.notified = testobj.notified
            self.i = i

        def __call__(self, enabled):
            self.notified[self.i] = True
            if self.i == self.testobj.NOTIFIER_TERM:
                # Terminate the main loop the test is blocked in.
                os.kill(os.getpid(), signal.SIGTERM)
            elif self.i == self.testobj.NOTIFIER_ERR:
                raise errors.GenericError("an error")

    def testReplace(self):
        """Writing the TERM file notifies only the TERM handler."""
        utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
        # Events are only dispatched while the main loop is running.
        self.mainloop.Run()
        self.assertTrue(self.notified[self.NOTIFIER_TERM])
        self.assertFalse(self.notified[self.NOTIFIER_NORM])
        self.assertEqual(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
        self.assertEqual(self.notifiers[self.NOTIFIER_NORM].error_count, 0)

    def testEnableDisable(self):
        """Repeated enable/disable calls leave a finally-enabled handler working."""
        self.ihandler[self.NOTIFIER_TERM].enable()
        self.ihandler[self.NOTIFIER_TERM].disable()
        self.ihandler[self.NOTIFIER_TERM].disable()
        self.ihandler[self.NOTIFIER_TERM].enable()
        self.ihandler[self.NOTIFIER_TERM].disable()
        self.ihandler[self.NOTIFIER_TERM].enable()
        utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
        self.mainloop.Run()
        self.assertTrue(self.notified[self.NOTIFIER_TERM])
        self.assertFalse(self.notified[self.NOTIFIER_NORM])
        self.assertEqual(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
        self.assertEqual(self.notifiers[self.NOTIFIER_NORM].error_count, 0)

    def testDoubleEnable(self):
        """Enabling an already enabled handler is harmless."""
        self.ihandler[self.NOTIFIER_TERM].enable()
        self.ihandler[self.NOTIFIER_TERM].enable()
        utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
        self.mainloop.Run()
        self.assertTrue(self.notified[self.NOTIFIER_TERM])
        self.assertFalse(self.notified[self.NOTIFIER_NORM])
        self.assertEqual(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
        self.assertEqual(self.notifiers[self.NOTIFIER_NORM].error_count, 0)

    def testDefaultDisabled(self):
        """A handler that was never enabled does not report changes."""
        utils.WriteFile(self.chk_files[self.NOTIFIER_NORM], data="dummy")
        utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
        self.mainloop.Run()
        self.assertTrue(self.notified[self.NOTIFIER_TERM])
        # NORM notifier is disabled by default
        self.assertFalse(self.notified[self.NOTIFIER_NORM])
        self.assertEqual(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
        self.assertEqual(self.notifiers[self.NOTIFIER_NORM].error_count, 0)

    def testBothEnabled(self):
        """With NORM enabled as well, both handlers are notified."""
        self.ihandler[self.NOTIFIER_NORM].enable()
        utils.WriteFile(self.chk_files[self.NOTIFIER_NORM], data="dummy")
        utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
        self.mainloop.Run()
        self.assertTrue(self.notified[self.NOTIFIER_TERM])
        self.assertTrue(self.notified[self.NOTIFIER_NORM])
        self.assertEqual(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
        self.assertEqual(self.notifiers[self.NOTIFIER_NORM].error_count, 0)

    def testErrorDuringInotify(self):
        """An exception raised by a callback is counted and propagated."""
        self.ihandler[self.NOTIFIER_ERR].enable()
        utils.WriteFile(self.chk_files[self.NOTIFIER_ERR], data="dummy")
        # The TERM file is not touched here: the loop is left through the
        # exception raised by the ERR callback.
        self.assertRaises(errors.GenericError, self.mainloop.Run)
        self.assertTrue(self.notified[self.NOTIFIER_ERR])
        self.assertEqual(self.notifiers[self.NOTIFIER_ERR].error_count, 1)
        self.assertEqual(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
        self.assertEqual(self.notifiers[self.NOTIFIER_TERM].error_count, 0)


# Run the test program only when this file is executed as a script.
if __name__ == "__main__":
    testutils.GanetiTestProgram()