bash_completion: Enable extglob while parsing file
[ganeti-local] / test / ganeti.asyncnotifier_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the asyncnotifier module"""
23
24 import unittest
25 import signal
26 import os
27 import tempfile
28 import shutil
29
30 try:
31   # pylint: disable=E0611
32   from pyinotify import pyinotify
33 except ImportError:
34   import pyinotify
35
36 from ganeti import asyncnotifier
37 from ganeti import daemon
38 from ganeti import utils
39 from ganeti import errors
40
41 import testutils
42
43
44 class _MyErrorLoggingAsyncNotifier(asyncnotifier.ErrorLoggingAsyncNotifier):
45   def __init__(self, *args, **kwargs):
46     asyncnotifier.ErrorLoggingAsyncNotifier.__init__(self, *args, **kwargs)
47     self.error_count = 0
48
49   def handle_error(self):
50     self.error_count += 1
51     raise
52
53
54 class TestSingleFileEventHandler(testutils.GanetiTestCase):
55   """Test daemon.Mainloop"""
56
57   NOTIFIERS = [NOTIFIER_TERM, NOTIFIER_NORM, NOTIFIER_ERR] = range(3)
58
59   def setUp(self):
60     testutils.GanetiTestCase.setUp(self)
61     self.mainloop = daemon.Mainloop()
62     self.chk_files = [self._CreateTempFile() for i in self.NOTIFIERS]
63     self.notified = [False for i in self.NOTIFIERS]
64     # We need one watch manager per notifier, as those contain the file
65     # descriptor which is monitored by asyncore
66     self.wms = [pyinotify.WatchManager() for i in self.NOTIFIERS]
67     self.cbk = [self.OnInotifyCallback(self, i) for i in self.NOTIFIERS]
68     self.ihandler = [asyncnotifier.SingleFileEventHandler(wm, cb, cf)
69                      for (wm, cb, cf) in
70                      zip(self.wms, self.cbk, self.chk_files)]
71     self.notifiers = [_MyErrorLoggingAsyncNotifier(wm, ih)
72                       for (wm, ih) in zip(self.wms, self.ihandler)]
73     # TERM notifier is enabled by default, as we use it to get out of the loop
74     self.ihandler[self.NOTIFIER_TERM].enable()
75
76   def tearDown(self):
77     # disable the inotifiers, before removing the files
78     for i in self.ihandler:
79       i.disable()
80     testutils.GanetiTestCase.tearDown(self)
81     # and unregister the fd's being polled
82     for n in self.notifiers:
83       n.del_channel()
84
85   class OnInotifyCallback:
86     def __init__(self, testobj, i):
87       self.testobj = testobj
88       self.notified = testobj.notified
89       self.i = i
90
91     def __call__(self, enabled):
92       self.notified[self.i] = True
93       if self.i == self.testobj.NOTIFIER_TERM:
94         os.kill(os.getpid(), signal.SIGTERM)
95       elif self.i == self.testobj.NOTIFIER_ERR:
96         raise errors.GenericError("an error")
97
98   def testReplace(self):
99     utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
100     self.mainloop.Run()
101     self.assert_(self.notified[self.NOTIFIER_TERM])
102     self.assertFalse(self.notified[self.NOTIFIER_NORM])
103     self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
104     self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
105
106   def testEnableDisable(self):
107     self.ihandler[self.NOTIFIER_TERM].enable()
108     self.ihandler[self.NOTIFIER_TERM].disable()
109     self.ihandler[self.NOTIFIER_TERM].disable()
110     self.ihandler[self.NOTIFIER_TERM].enable()
111     self.ihandler[self.NOTIFIER_TERM].disable()
112     self.ihandler[self.NOTIFIER_TERM].enable()
113     utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
114     self.mainloop.Run()
115     self.assert_(self.notified[self.NOTIFIER_TERM])
116     self.assertFalse(self.notified[self.NOTIFIER_NORM])
117     self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
118     self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
119
120   def testDoubleEnable(self):
121     self.ihandler[self.NOTIFIER_TERM].enable()
122     self.ihandler[self.NOTIFIER_TERM].enable()
123     utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
124     self.mainloop.Run()
125     self.assert_(self.notified[self.NOTIFIER_TERM])
126     self.assertFalse(self.notified[self.NOTIFIER_NORM])
127     self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
128     self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
129
130   def testDefaultDisabled(self):
131     utils.WriteFile(self.chk_files[self.NOTIFIER_NORM], data="dummy")
132     utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
133     self.mainloop.Run()
134     self.assert_(self.notified[self.NOTIFIER_TERM])
135     # NORM notifier is disabled by default
136     self.assertFalse(self.notified[self.NOTIFIER_NORM])
137     self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
138     self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
139
140   def testBothEnabled(self):
141     self.ihandler[self.NOTIFIER_NORM].enable()
142     utils.WriteFile(self.chk_files[self.NOTIFIER_NORM], data="dummy")
143     utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
144     self.mainloop.Run()
145     self.assert_(self.notified[self.NOTIFIER_TERM])
146     self.assert_(self.notified[self.NOTIFIER_NORM])
147     self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
148     self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
149
150   def testError(self):
151     self.ihandler[self.NOTIFIER_ERR].enable()
152     utils.WriteFile(self.chk_files[self.NOTIFIER_ERR], data="dummy")
153     self.assertRaises(errors.GenericError, self.mainloop.Run)
154     self.assert_(self.notified[self.NOTIFIER_ERR])
155     self.assertEquals(self.notifiers[self.NOTIFIER_ERR].error_count, 1)
156     self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
157     self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
158
159
160 class TestSingleFileEventHandlerError(unittest.TestCase):
161   def setUp(self):
162     self.tmpdir = tempfile.mkdtemp()
163
164   def tearDown(self):
165     shutil.rmtree(self.tmpdir)
166
167   def test(self):
168     wm = pyinotify.WatchManager()
169     handler = asyncnotifier.SingleFileEventHandler(wm, None,
170                                                    utils.PathJoin(self.tmpdir,
171                                                                   "nonexist"))
172     self.assertRaises(errors.InotifyError, handler.enable)
173     self.assertRaises(errors.InotifyError, handler.enable)
174     handler.disable()
175     self.assertRaises(errors.InotifyError, handler.enable)
176
177
178 if __name__ == "__main__":
179   testutils.GanetiTestProgram()