Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.asyncnotifier_unittest.py @ e3cc4c69

History | View | Annotate | Download (5.9 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for unittesting the asyncnotifier module"""
23

    
24
import unittest
25
import signal
26
import os
27

    
28
try:
29
  # pylint: disable-msg=E0611
30
  from pyinotify import pyinotify
31
except ImportError:
32
  import pyinotify
33

    
34
from ganeti import asyncnotifier
35
from ganeti import daemon
36
from ganeti import utils
37
from ganeti import errors
38

    
39
import testutils
40

    
41

    
42
class _MyErrorLoggingAsyncNotifier(asyncnotifier.ErrorLoggingAsyncNotifier):
43
  def __init__(self, *args, **kwargs):
44
    asyncnotifier.ErrorLoggingAsyncNotifier.__init__(self, *args, **kwargs)
45
    self.error_count = 0
46

    
47
  def handle_error(self):
48
    self.error_count += 1
49
    raise
50

    
51

    
52
class TestSingleFileEventHandler(testutils.GanetiTestCase):
53
  """Test daemon.Mainloop"""
54

    
55
  NOTIFIERS = [NOTIFIER_TERM, NOTIFIER_NORM, NOTIFIER_ERR] = range(3)
56

    
57
  def setUp(self):
58
    testutils.GanetiTestCase.setUp(self)
59
    self.mainloop = daemon.Mainloop()
60
    self.chk_files = [self._CreateTempFile() for i in self.NOTIFIERS]
61
    self.notified = [False for i in self.NOTIFIERS]
62
    # We need one watch manager per notifier, as those contain the file
63
    # descriptor which is monitored by asyncore
64
    self.wms = [pyinotify.WatchManager() for i in self.NOTIFIERS]
65
    self.cbk = [self.OnInotifyCallback(self, i)
66
                 for i in range(len(self.NOTIFIERS))]
67
    self.ihandler = [asyncnotifier.SingleFileEventHandler(self.wms[i],
68
                                                          self.cbk[i],
69
                                                          self.chk_files[i])
70
                      for i in range(len(self.NOTIFIERS))]
71
    self.notifiers = [_MyErrorLoggingAsyncNotifier(self.wms[i],
72
                                                   self.ihandler[i])
73
                       for i in range(len(self.NOTIFIERS))]
74
    # TERM notifier is enabled by default, as we use it to get out of the loop
75
    self.ihandler[self.NOTIFIER_TERM].enable()
76

    
77
  class OnInotifyCallback:
78
    def __init__(self, testobj, i):
79
      self.testobj = testobj
80
      self.notified = testobj.notified
81
      self.i = i
82

    
83
    def __call__(self, enabled):
84
      self.notified[self.i] = True
85
      if self.i == self.testobj.NOTIFIER_TERM:
86
        os.kill(os.getpid(), signal.SIGTERM)
87
      elif self.i == self.testobj.NOTIFIER_ERR:
88
        raise errors.GenericError("an error")
89

    
90
  def testReplace(self):
91
    utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
92
    self.mainloop.Run()
93
    self.assert_(self.notified[self.NOTIFIER_TERM])
94
    self.assert_(not self.notified[self.NOTIFIER_NORM])
95
    self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
96
    self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
97

    
98
  def testEnableDisable(self):
99
    self.ihandler[self.NOTIFIER_TERM].enable()
100
    self.ihandler[self.NOTIFIER_TERM].disable()
101
    self.ihandler[self.NOTIFIER_TERM].disable()
102
    self.ihandler[self.NOTIFIER_TERM].enable()
103
    self.ihandler[self.NOTIFIER_TERM].disable()
104
    self.ihandler[self.NOTIFIER_TERM].enable()
105
    utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
106
    self.mainloop.Run()
107
    self.assert_(self.notified[self.NOTIFIER_TERM])
108
    self.assert_(not self.notified[self.NOTIFIER_NORM])
109
    self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
110
    self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
111

    
112
  def testDoubleEnable(self):
113
    self.ihandler[self.NOTIFIER_TERM].enable()
114
    self.ihandler[self.NOTIFIER_TERM].enable()
115
    utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
116
    self.mainloop.Run()
117
    self.assert_(self.notified[self.NOTIFIER_TERM])
118
    self.assert_(not self.notified[self.NOTIFIER_NORM])
119
    self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
120
    self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
121

    
122
  def testDefaultDisabled(self):
123
    utils.WriteFile(self.chk_files[self.NOTIFIER_NORM], data="dummy")
124
    utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
125
    self.mainloop.Run()
126
    self.assert_(self.notified[self.NOTIFIER_TERM])
127
    # NORM notifier is disabled by default
128
    self.assert_(not self.notified[self.NOTIFIER_NORM])
129
    self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
130
    self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
131

    
132
  def testBothEnabled(self):
133
    self.ihandler[self.NOTIFIER_NORM].enable()
134
    utils.WriteFile(self.chk_files[self.NOTIFIER_NORM], data="dummy")
135
    utils.WriteFile(self.chk_files[self.NOTIFIER_TERM], data="dummy")
136
    self.mainloop.Run()
137
    self.assert_(self.notified[self.NOTIFIER_TERM])
138
    self.assert_(self.notified[self.NOTIFIER_NORM])
139
    self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
140
    self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
141

    
142
  def testError(self):
143
    self.ihandler[self.NOTIFIER_ERR].enable()
144
    utils.WriteFile(self.chk_files[self.NOTIFIER_ERR], data="dummy")
145
    self.assertRaises(errors.GenericError, self.mainloop.Run)
146
    self.assert_(self.notified[self.NOTIFIER_ERR])
147
    self.assertEquals(self.notifiers[self.NOTIFIER_ERR].error_count, 1)
148
    self.assertEquals(self.notifiers[self.NOTIFIER_NORM].error_count, 0)
149
    self.assertEquals(self.notifiers[self.NOTIFIER_TERM].error_count, 0)
150

    
151

    
152
if __name__ == "__main__":
153
  testutils.GanetiTestProgram()