Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.asyncnotifier_unittest.py @ 415feb2e

History | View | Annotate | Download (6.7 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the asyncnotifier module"""
23

    
24
import unittest
25
import signal
26
import os
27
import tempfile
28
import shutil
29

    
30
try:
31
  # pylint: disable=E0611
32
  from pyinotify import pyinotify
33
except ImportError:
34
  import pyinotify
35

    
36
from ganeti import asyncnotifier
37
from ganeti import daemon
38
from ganeti import utils
39
from ganeti import errors
40

    
41
import testutils
42

    
43

    
44
class _MyErrorLoggingAsyncNotifier(asyncnotifier.ErrorLoggingAsyncNotifier):
45
  def __init__(self, *args, **kwargs):
46
    asyncnotifier.ErrorLoggingAsyncNotifier.__init__(self, *args, **kwargs)
47
    self.error_count = 0
48

    
49
  def handle_error(self):
50
    self.error_count += 1
51
    raise
52

    
53

    
54
class TestSingleFileEventHandler(testutils.GanetiTestCase):
55
  """Test daemon.Mainloop"""
56

    
57
  NOTIFIERS = [NOTIFIER_TERM, NOTIFIER_NORM, NOTIFIER_ERR] = range(3)
58

    
59
  def setUp(self):
60
    testutils.GanetiTestCase.setUp(self)
61
    self.mainloop = daemon.Mainloop()
62
    self.chk_files = [self._CreateTempFile() for i in self.NOTIFIERS]
63
    self.notified = [False for i in self.NOTIFIERS]
64
    # We need one watch manager per notifier, as those contain the file
65
    # descriptor which is monitored by asyncore
66
    self.wms = [pyinotify.WatchManager() for i in self.NOTIFIERS]
67
    self.cbk = [self.OnInotifyCallback(self, i) for i in self.NOTIFIERS]
68
    self.ihandler = [asyncnotifier.SingleFileEventHandler(wm, cb, cf)
69
                     for (wm, cb, cf) in
70
                     zip(self.wms, self.cbk, self.chk_files)]
71
    self.notifiers = [_MyErrorLoggingAsyncNotifier(wm, ih)
72
                      for (wm, ih) in zip(self.wms, self.ihandler)]
73
    # TERM notifier is enabled by default, as we use it to get out of the loop
74
    self.ihandler[self.NOTIFIER_TERM].enable()
75

    
76
  def tearDown(self):
77
    # disable the inotifiers, before removing the files
78
    for i in self.ihandler:
79
      i.disable()
80
    testutils.GanetiTestCase.tearDown(self)
81
    # and unregister the fd's being polled
82
    for n in self.notifiers:
83
      n.del_channel()
84

    
85
  class OnInotifyCallback:
86
    def __init__(self, testobj, i):
87
      self.testobj = testobj
88
      self.notified = testobj.notified
89
      self.i = i
90

    
91
    def __call__(self, enabled):
92
      self.notified[self.i] = True
93
      if self.i == self.testobj.NOTIFIER_TERM:
94
        os.kill(os.getpid(), signal.SIGTERM)
95
      elif self.i == self.testobj.NOTIFIER_ERR:
96
        raise errors.GenericError("an error")
97

    
98
  def testReplace(self):
99
    utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
100
    self.mainloop.Run()
101
    self.assert_(self.notified[self.NOTIFIER_TERM])
102
    self.assertFalse(self.notified[self.NOTIFIER_NORM])
103
    self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
104
    self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
105

    
106
  def testEnableDisable(self):
107
    self.ihandler[self.NOTIFIER_TERM].enable()
108
    self.ihandler[self.NOTIFIER_TERM].disable()
109
    self.ihandler[self.NOTIFIER_TERM].disable()
110
    self.ihandler[self.NOTIFIER_TERM].enable()
111
    self.ihandler[self.NOTIFIER_TERM].disable()
112
    self.ihandler[self.NOTIFIER_TERM].enable()
113
    utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
114
    self.mainloop.Run()
115
    self.assert_(self.notified[self.NOTIFIER_TERM])
116
    self.assertFalse(self.notified[self.NOTIFIER_NORM])
117
    self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
118
    self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
119

    
120
  def testDoubleEnable(self):
121
    self.ihandler[self.NOTIFIER_TERM].enable()
122
    self.ihandler[self.NOTIFIER_TERM].enable()
123
    utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
124
    self.mainloop.Run()
125
    self.assert_(self.notified[self.NOTIFIER_TERM])
126
    self.assertFalse(self.notified[self.NOTIFIER_NORM])
127
    self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
128
    self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
129

    
130
  def testDefaultDisabled(self):
131
    utils.WriteFile(self.chk_files[self.NOTIFIER_NORM], data="dummy")
132
    utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
133
    self.mainloop.Run()
134
    self.assert_(self.notified[self.NOTIFIER_TERM])
135
    # NORM notifier is disabled by default
136
    self.assertFalse(self.notified[self.NOTIFIER_NORM])
137
    self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
138
    self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
139

    
140
  def testBothEnabled(self):
141
    self.ihandler[self.NOTIFIER_NORM].enable()
142
    utils.WriteFile(self.chk_files[self.NOTIFIER_NORM], data="dummy")
143
    utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
144
    self.mainloop.Run()
145
    self.assert_(self.notified[self.NOTIFIER_TERM])
146
    self.assert_(self.notified[self.NOTIFIER_NORM])
147
    self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
148
    self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
149

    
150
  def testError(self):
151
    self.ihandler[self.NOTIFIER_ERR].enable()
152
    utils.WriteFile(self.chk_files[self.NOTIFIER_ERR], data="dummy")
153
    self.assertRaises(errors.GenericError, self.mainloop.Run)
154
    self.assert_(self.notified[self.NOTIFIER_ERR])
155
    self.assertEquals(self.notifiers[self.NOTIFIER_ERR].error_count, 1)
156
    self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
157
    self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
158

    
159

    
160
class TestSingleFileEventHandlerError(unittest.TestCase):
161
  def setUp(self):
162
    self.tmpdir = tempfile.mkdtemp()
163

    
164
  def tearDown(self):
165
    shutil.rmtree(self.tmpdir)
166

    
167
  def test(self):
168
    wm = pyinotify.WatchManager()
169
    handler = asyncnotifier.SingleFileEventHandler(wm, None,
170
                                                   utils.PathJoin(self.tmpdir,
171
                                                                  "nonexist"))
172
    self.assertRaises(errors.InotifyError, handler.enable)
173
    self.assertRaises(errors.InotifyError, handler.enable)
174
    handler.disable()
175
    self.assertRaises(errors.InotifyError, handler.enable)
176

    
177

    
178
if __name__ == "__main__":
179
  testutils.GanetiTestProgram()