Rename OpGetTags and LUGetTags
[ganeti-local] / test / ganeti.asyncnotifier_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the asyncnotifier module"""
23
24 import unittest
25 import signal
26 import os
27 import tempfile
28 import shutil
29
30 try:
31   # pylint: disable-msg=E0611
32   from pyinotify import pyinotify
33 except ImportError:
34   import pyinotify
35
36 from ganeti import asyncnotifier
37 from ganeti import daemon
38 from ganeti import utils
39 from ganeti import errors
40
41 import testutils
42
43
44 class _MyErrorLoggingAsyncNotifier(asyncnotifier.ErrorLoggingAsyncNotifier):
45   def __init__(self, *args, **kwargs):
46     asyncnotifier.ErrorLoggingAsyncNotifier.__init__(self, *args, **kwargs)
47     self.error_count = 0
48
49   def handle_error(self):
50     self.error_count += 1
51     raise
52
53
54 class TestSingleFileEventHandler(testutils.GanetiTestCase):
55   """Test daemon.Mainloop"""
56
57   NOTIFIERS = [NOTIFIER_TERM, NOTIFIER_NORM, NOTIFIER_ERR] = range(3)
58
59   def setUp(self):
60     testutils.GanetiTestCase.setUp(self)
61     self.mainloop = daemon.Mainloop()
62     self.chk_files = [self._CreateTempFile() for i in self.NOTIFIERS]
63     self.notified = [False for i in self.NOTIFIERS]
64     # We need one watch manager per notifier, as those contain the file
65     # descriptor which is monitored by asyncore
66     self.wms = [pyinotify.WatchManager() for i in self.NOTIFIERS]
67     self.cbk = [self.OnInotifyCallback(self, i) for i in self.NOTIFIERS]
68     self.ihandler = [asyncnotifier.SingleFileEventHandler(wm, cb, cf)
69                      for (wm, cb, cf) in
70                      zip(self.wms, self.cbk, self.chk_files)]
71     self.notifiers = [_MyErrorLoggingAsyncNotifier(wm, ih)
72                       for (wm, ih) in zip(self.wms, self.ihandler)]
73     # TERM notifier is enabled by default, as we use it to get out of the loop
74     self.ihandler[self.NOTIFIER_TERM].enable()
75
76   class OnInotifyCallback:
77     def __init__(self, testobj, i):
78       self.testobj = testobj
79       self.notified = testobj.notified
80       self.i = i
81
82     def __call__(self, enabled):
83       self.notified[self.i] = True
84       if self.i == self.testobj.NOTIFIER_TERM:
85         os.kill(os.getpid(), signal.SIGTERM)
86       elif self.i == self.testobj.NOTIFIER_ERR:
87         raise errors.GenericError("an error")
88
89   def testReplace(self):
90     utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
91     self.mainloop.Run()
92     self.assert_(self.notified[self.NOTIFIER_TERM])
93     self.assertFalse(self.notified[self.NOTIFIER_NORM])
94     self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
95     self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
96
97   def testEnableDisable(self):
98     self.ihandler[self.NOTIFIER_TERM].enable()
99     self.ihandler[self.NOTIFIER_TERM].disable()
100     self.ihandler[self.NOTIFIER_TERM].disable()
101     self.ihandler[self.NOTIFIER_TERM].enable()
102     self.ihandler[self.NOTIFIER_TERM].disable()
103     self.ihandler[self.NOTIFIER_TERM].enable()
104     utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
105     self.mainloop.Run()
106     self.assert_(self.notified[self.NOTIFIER_TERM])
107     self.assertFalse(self.notified[self.NOTIFIER_NORM])
108     self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
109     self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
110
111   def testDoubleEnable(self):
112     self.ihandler[self.NOTIFIER_TERM].enable()
113     self.ihandler[self.NOTIFIER_TERM].enable()
114     utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
115     self.mainloop.Run()
116     self.assert_(self.notified[self.NOTIFIER_TERM])
117     self.assertFalse(self.notified[self.NOTIFIER_NORM])
118     self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
119     self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
120
121   def testDefaultDisabled(self):
122     utils.WriteFile(self.chk_files[self.NOTIFIER_NORM], data="dummy")
123     utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
124     self.mainloop.Run()
125     self.assert_(self.notified[self.NOTIFIER_TERM])
126     # NORM notifier is disabled by default
127     self.assertFalse(self.notified[self.NOTIFIER_NORM])
128     self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
129     self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
130
131   def testBothEnabled(self):
132     self.ihandler[self.NOTIFIER_NORM].enable()
133     utils.WriteFile(self.chk_files[self.NOTIFIER_NORM], data="dummy")
134     utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
135     self.mainloop.Run()
136     self.assert_(self.notified[self.NOTIFIER_TERM])
137     self.assert_(self.notified[self.NOTIFIER_NORM])
138     self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
139     self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
140
141   def testError(self):
142     self.ihandler[self.NOTIFIER_ERR].enable()
143     utils.WriteFile(self.chk_files[self.NOTIFIER_ERR], data="dummy")
144     self.assertRaises(errors.GenericError, self.mainloop.Run)
145     self.assert_(self.notified[self.NOTIFIER_ERR])
146     self.assertEquals(self.notifiers[self.NOTIFIER_ERR].error_count, 1)
147     self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
148     self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
149
150
151 class TestSingleFileEventHandlerError(unittest.TestCase):
152   def setUp(self):
153     self.tmpdir = tempfile.mkdtemp()
154
155   def tearDown(self):
156     shutil.rmtree(self.tmpdir)
157
158   def test(self):
159     wm = pyinotify.WatchManager()
160     handler = asyncnotifier.SingleFileEventHandler(wm, None,
161                                                    utils.PathJoin(self.tmpdir,
162                                                                   "nonexist"))
163     self.assertRaises(errors.InotifyError, handler.enable)
164     self.assertRaises(errors.InotifyError, handler.enable)
165     handler.disable()
166     self.assertRaises(errors.InotifyError, handler.enable)
167
168
169 if __name__ == "__main__":
170   testutils.GanetiTestProgram()