Generate import-export unittest certs in parallel
[ganeti-local] / test / ganeti.asyncnotifier_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the asyncnotifier module"""
23
24 import unittest
25 import signal
26 import os
27
28 try:
29   # pylint: disable-msg=E0611
30   from pyinotify import pyinotify
31 except ImportError:
32   import pyinotify
33
34 from ganeti import asyncnotifier
35 from ganeti import daemon
36 from ganeti import utils
37 from ganeti import errors
38
39 import testutils
40
41
42 class _MyErrorLoggingAsyncNotifier(asyncnotifier.ErrorLoggingAsyncNotifier):
43   def __init__(self, *args, **kwargs):
44     asyncnotifier.ErrorLoggingAsyncNotifier.__init__(self, *args, **kwargs)
45     self.error_count = 0
46
47   def handle_error(self):
48     self.error_count += 1
49     raise
50
51
52 class TestSingleFileEventHandler(testutils.GanetiTestCase):
53   """Test daemon.Mainloop"""
54
55   NOTIFIERS = [NOTIFIER_TERM, NOTIFIER_NORM, NOTIFIER_ERR] = range(3)
56
57   def setUp(self):
58     testutils.GanetiTestCase.setUp(self)
59     self.mainloop = daemon.Mainloop()
60     self.chk_files = [self._CreateTempFile() for i in self.NOTIFIERS]
61     self.notified = [False for i in self.NOTIFIERS]
62     # We need one watch manager per notifier, as those contain the file
63     # descriptor which is monitored by asyncore
64     self.wms = [pyinotify.WatchManager() for i in self.NOTIFIERS]
65     self.cbk = [self.OnInotifyCallback(self, i)
66                  for i in range(len(self.NOTIFIERS))]
67     self.ihandler = [asyncnotifier.SingleFileEventHandler(self.wms[i],
68                                                           self.cbk[i],
69                                                           self.chk_files[i])
70                       for i in range(len(self.NOTIFIERS))]
71     self.notifiers = [_MyErrorLoggingAsyncNotifier(self.wms[i],
72                                                    self.ihandler[i])
73                        for i in range(len(self.NOTIFIERS))]
74     # TERM notifier is enabled by default, as we use it to get out of the loop
75     self.ihandler[self.NOTIFIER_TERM].enable()
76
77   class OnInotifyCallback:
78     def __init__(self, testobj, i):
79       self.testobj = testobj
80       self.notified = testobj.notified
81       self.i = i
82
83     def __call__(self, enabled):
84       self.notified[self.i] = True
85       if self.i == self.testobj.NOTIFIER_TERM:
86         os.kill(os.getpid(), signal.SIGTERM)
87       elif self.i == self.testobj.NOTIFIER_ERR:
88         raise errors.GenericError("an error")
89
90   def testReplace(self):
91     utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
92     self.mainloop.Run()
93     self.assert_(self.notified[self.NOTIFIER_TERM])
94     self.assert_(not self.notified[self.NOTIFIER_NORM])
95     self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
96     self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
97
98   def testEnableDisable(self):
99     self.ihandler[self.NOTIFIER_TERM].enable()
100     self.ihandler[self.NOTIFIER_TERM].disable()
101     self.ihandler[self.NOTIFIER_TERM].disable()
102     self.ihandler[self.NOTIFIER_TERM].enable()
103     self.ihandler[self.NOTIFIER_TERM].disable()
104     self.ihandler[self.NOTIFIER_TERM].enable()
105     utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
106     self.mainloop.Run()
107     self.assert_(self.notified[self.NOTIFIER_TERM])
108     self.assert_(not self.notified[self.NOTIFIER_NORM])
109     self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
110     self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
111
112   def testDoubleEnable(self):
113     self.ihandler[self.NOTIFIER_TERM].enable()
114     self.ihandler[self.NOTIFIER_TERM].enable()
115     utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
116     self.mainloop.Run()
117     self.assert_(self.notified[self.NOTIFIER_TERM])
118     self.assert_(not self.notified[self.NOTIFIER_NORM])
119     self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
120     self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
121
122   def testDefaultDisabled(self):
123     utils.WriteFile(self.chk_files[self.NOTIFIER_NORM], data="dummy")
124     utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
125     self.mainloop.Run()
126     self.assert_(self.notified[self.NOTIFIER_TERM])
127     # NORM notifier is disabled by default
128     self.assert_(not self.notified[self.NOTIFIER_NORM])
129     self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
130     self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
131
132   def testBothEnabled(self):
133     self.ihandler[self.NOTIFIER_NORM].enable()
134     utils.WriteFile(self.chk_files[self.NOTIFIER_NORM], data="dummy")
135     utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
136     self.mainloop.Run()
137     self.assert_(self.notified[self.NOTIFIER_TERM])
138     self.assert_(self.notified[self.NOTIFIER_NORM])
139     self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
140     self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
141
142   def testError(self):
143     self.ihandler[self.NOTIFIER_ERR].enable()
144     utils.WriteFile(self.chk_files[self.NOTIFIER_ERR], data="dummy")
145     self.assertRaises(errors.GenericError, self.mainloop.Run)
146     self.assert_(self.notified[self.NOTIFIER_ERR])
147     self.assertEquals(self.notifiers[self.NOTIFIER_ERR].error_count, 1)
148     self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
149     self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
150
151
152 if __name__ == "__main__":
153   testutils.GanetiTestProgram()