Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.asyncnotifier_unittest.py @ db859c7d

History | View | Annotate | Download (6.4 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the asyncnotifier module"""
23

    
24
import unittest
25
import signal
26
import os
27
import tempfile
28
import shutil
29

    
30
try:
31
  # pylint: disable=E0611
32
  from pyinotify import pyinotify
33
except ImportError:
34
  import pyinotify
35

    
36
from ganeti import asyncnotifier
37
from ganeti import daemon
38
from ganeti import utils
39
from ganeti import errors
40

    
41
import testutils
42

    
43

    
44
class _MyErrorLoggingAsyncNotifier(asyncnotifier.ErrorLoggingAsyncNotifier):
45
  def __init__(self, *args, **kwargs):
46
    asyncnotifier.ErrorLoggingAsyncNotifier.__init__(self, *args, **kwargs)
47
    self.error_count = 0
48

    
49
  def handle_error(self):
50
    self.error_count += 1
51
    raise
52

    
53

    
54
class TestSingleFileEventHandler(testutils.GanetiTestCase):
55
  """Test daemon.Mainloop"""
56

    
57
  NOTIFIERS = [NOTIFIER_TERM, NOTIFIER_NORM, NOTIFIER_ERR] = range(3)
58

    
59
  def setUp(self):
60
    testutils.GanetiTestCase.setUp(self)
61
    self.mainloop = daemon.Mainloop()
62
    self.chk_files = [self._CreateTempFile() for i in self.NOTIFIERS]
63
    self.notified = [False for i in self.NOTIFIERS]
64
    # We need one watch manager per notifier, as those contain the file
65
    # descriptor which is monitored by asyncore
66
    self.wms = [pyinotify.WatchManager() for i in self.NOTIFIERS]
67
    self.cbk = [self.OnInotifyCallback(self, i) for i in self.NOTIFIERS]
68
    self.ihandler = [asyncnotifier.SingleFileEventHandler(wm, cb, cf)
69
                     for (wm, cb, cf) in
70
                     zip(self.wms, self.cbk, self.chk_files)]
71
    self.notifiers = [_MyErrorLoggingAsyncNotifier(wm, ih)
72
                      for (wm, ih) in zip(self.wms, self.ihandler)]
73
    # TERM notifier is enabled by default, as we use it to get out of the loop
74
    self.ihandler[self.NOTIFIER_TERM].enable()
75

    
76
  class OnInotifyCallback:
77
    def __init__(self, testobj, i):
78
      self.testobj = testobj
79
      self.notified = testobj.notified
80
      self.i = i
81

    
82
    def __call__(self, enabled):
83
      self.notified[self.i] = True
84
      if self.i == self.testobj.NOTIFIER_TERM:
85
        os.kill(os.getpid(), signal.SIGTERM)
86
      elif self.i == self.testobj.NOTIFIER_ERR:
87
        raise errors.GenericError("an error")
88

    
89
  def testReplace(self):
90
    utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
91
    self.mainloop.Run()
92
    self.assert_(self.notified[self.NOTIFIER_TERM])
93
    self.assertFalse(self.notified[self.NOTIFIER_NORM])
94
    self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
95
    self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
96

    
97
  def testEnableDisable(self):
98
    self.ihandler[self.NOTIFIER_TERM].enable()
99
    self.ihandler[self.NOTIFIER_TERM].disable()
100
    self.ihandler[self.NOTIFIER_TERM].disable()
101
    self.ihandler[self.NOTIFIER_TERM].enable()
102
    self.ihandler[self.NOTIFIER_TERM].disable()
103
    self.ihandler[self.NOTIFIER_TERM].enable()
104
    utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
105
    self.mainloop.Run()
106
    self.assert_(self.notified[self.NOTIFIER_TERM])
107
    self.assertFalse(self.notified[self.NOTIFIER_NORM])
108
    self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
109
    self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
110

    
111
  def testDoubleEnable(self):
112
    self.ihandler[self.NOTIFIER_TERM].enable()
113
    self.ihandler[self.NOTIFIER_TERM].enable()
114
    utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
115
    self.mainloop.Run()
116
    self.assert_(self.notified[self.NOTIFIER_TERM])
117
    self.assertFalse(self.notified[self.NOTIFIER_NORM])
118
    self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
119
    self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
120

    
121
  def testDefaultDisabled(self):
122
    utils.WriteFile(self.chk_files[self.NOTIFIER_NORM], data="dummy")
123
    utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
124
    self.mainloop.Run()
125
    self.assert_(self.notified[self.NOTIFIER_TERM])
126
    # NORM notifier is disabled by default
127
    self.assertFalse(self.notified[self.NOTIFIER_NORM])
128
    self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
129
    self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
130

    
131
  def testBothEnabled(self):
132
    self.ihandler[self.NOTIFIER_NORM].enable()
133
    utils.WriteFile(self.chk_files[self.NOTIFIER_NORM], data="dummy")
134
    utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
135
    self.mainloop.Run()
136
    self.assert_(self.notified[self.NOTIFIER_TERM])
137
    self.assert_(self.notified[self.NOTIFIER_NORM])
138
    self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
139
    self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
140

    
141
  def testError(self):
142
    self.ihandler[self.NOTIFIER_ERR].enable()
143
    utils.WriteFile(self.chk_files[self.NOTIFIER_ERR], data="dummy")
144
    self.assertRaises(errors.GenericError, self.mainloop.Run)
145
    self.assert_(self.notified[self.NOTIFIER_ERR])
146
    self.assertEquals(self.notifiers[self.NOTIFIER_ERR].error_count, 1)
147
    self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
148
    self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
149

    
150

    
151
class TestSingleFileEventHandlerError(unittest.TestCase):
152
  def setUp(self):
153
    self.tmpdir = tempfile.mkdtemp()
154

    
155
  def tearDown(self):
156
    shutil.rmtree(self.tmpdir)
157

    
158
  def test(self):
159
    wm = pyinotify.WatchManager()
160
    handler = asyncnotifier.SingleFileEventHandler(wm, None,
161
                                                   utils.PathJoin(self.tmpdir,
162
                                                                  "nonexist"))
163
    self.assertRaises(errors.InotifyError, handler.enable)
164
    self.assertRaises(errors.InotifyError, handler.enable)
165
    handler.disable()
166
    self.assertRaises(errors.InotifyError, handler.enable)
167

    
168

    
169
if __name__ == "__main__":
170
  testutils.GanetiTestProgram()