# Copyright (C) 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Script for unittesting the daemon module"""
import os
import signal

from ganeti import daemon

import testutils
class TestMainloop(testutils.GanetiTestCase):
    """Test daemon.Mainloop.

    Each test queues events on the mainloop's scheduler, runs the loop and
    then checks which signals this process sent to itself (and, where the
    test case is registered as a signal receiver, which ones were reported
    back through OnSignal).

    NOTE(review): the extracted source had lost its indentation and several
    lines (the setUp header, the wrapped argument lists of four
    scheduler.enter() calls and the Run() calls ahead of three assertion
    groups). They are restored here from context -- confirm against the
    upstream file.
    """

    def setUp(self):
        """Create a fresh mainloop and empty event logs for every test."""
        testutils.GanetiTestCase.setUp(self)
        self.mainloop = daemon.Mainloop()
        # Signals passed to _SendSig, in the order they were sent.
        self.sendsig_events = []
        # Signals reported to OnSignal, in the order they arrived.
        self.onsignal_events = []

    def _CancelEvent(self, handle):
        """Cancel a scheduled event; used as a scheduler action itself."""
        self.mainloop.scheduler.cancel(handle)

    def _SendSig(self, sig):
        """Record sig and deliver it to the current process."""
        self.sendsig_events.append(sig)
        os.kill(os.getpid(), sig)

    def OnSignal(self, signum):
        """Record a signal reported after RegisterSignal(self)."""
        self.onsignal_events.append(signum)

    def testRunAndTermBySched(self):
        """A scheduled SIGTERM makes Run() return."""
        self.mainloop.scheduler.enter(0.1, 1, self._SendSig, [signal.SIGTERM])
        self.mainloop.Run()  # terminates by _SendSig being scheduled
        self.assertEqual(self.sendsig_events, [signal.SIGTERM])

    def testSchedulerCancel(self):
        """An event cancelled before Run() is never executed."""
        # NOTE(review): argument list restored; the event is cancelled before
        # it can fire, so the signal chosen does not affect the assertion.
        handle = self.mainloop.scheduler.enter(0.1, 1, self._SendSig,
                                               [signal.SIGTERM])
        self.mainloop.scheduler.cancel(handle)
        self.mainloop.scheduler.enter(0.2, 1, self._SendSig, [signal.SIGCHLD])
        self.mainloop.scheduler.enter(0.3, 1, self._SendSig, [signal.SIGTERM])
        self.mainloop.Run()
        self.assertEqual(self.sendsig_events,
                         [signal.SIGCHLD, signal.SIGTERM])

    def testRegisterSignal(self):
        """Every sent signal is reported to the registered receiver."""
        self.mainloop.RegisterSignal(self)
        self.mainloop.scheduler.enter(0.1, 1, self._SendSig, [signal.SIGCHLD])
        # NOTE(review): argument list restored; cancelled before it fires.
        handle = self.mainloop.scheduler.enter(0.1, 1, self._SendSig,
                                               [signal.SIGTERM])
        self.mainloop.scheduler.cancel(handle)
        self.mainloop.scheduler.enter(0.2, 1, self._SendSig, [signal.SIGCHLD])
        self.mainloop.scheduler.enter(0.3, 1, self._SendSig, [signal.SIGTERM])
        # ...not delivered because they are scheduled after TERM
        self.mainloop.scheduler.enter(0.4, 1, self._SendSig, [signal.SIGCHLD])
        self.mainloop.scheduler.enter(0.5, 1, self._SendSig, [signal.SIGCHLD])
        self.mainloop.Run()
        self.assertEqual(self.sendsig_events,
                         [signal.SIGCHLD, signal.SIGCHLD, signal.SIGTERM])
        self.assertEqual(self.onsignal_events, self.sendsig_events)

    def testDeferredCancel(self):
        """Events cancelled from inside the running loop never execute."""
        self.mainloop.RegisterSignal(self)
        self.mainloop.scheduler.enter(0.1, 1, self._SendSig, [signal.SIGCHLD])
        # NOTE(review): both argument lists restored; these two events are
        # cancelled by the zero-delay _CancelEvent actions queued below, so
        # the signals chosen do not affect the assertions.
        handle1 = self.mainloop.scheduler.enter(0.3, 2, self._SendSig,
                                                [signal.SIGCHLD])
        handle2 = self.mainloop.scheduler.enter(0.4, 2, self._SendSig,
                                                [signal.SIGTERM])
        self.mainloop.scheduler.enter(0, 1, self._CancelEvent, [handle1])
        self.mainloop.scheduler.enter(0, 1, self._CancelEvent, [handle2])
        self.mainloop.scheduler.enter(0.5, 1, self._SendSig, [signal.SIGTERM])
        self.mainloop.Run()
        self.assertEqual(self.sendsig_events,
                         [signal.SIGCHLD, signal.SIGTERM])
        self.assertEqual(self.onsignal_events, self.sendsig_events)
# Run the test program only when this file is executed as a script.
if __name__ == "__main__":
    testutils.GanetiTestProgram()