@utils.SignalHandled([signal.SIGCHLD])
@utils.SignalHandled([signal.SIGTERM])
+ @utils.SignalHandled([signal.SIGINT])
def Run(self, signal_handlers=None):
"""Runs the mainloop.
handler = signal_handlers[sig]
if handler.called:
self._CallSignalWaiters(sig)
- running = (sig != signal.SIGTERM)
+ running = sig not in (signal.SIGTERM, signal.SIGINT)
handler.Clear()
def _CallSignalWaiters(self, signum):
self.mainloop.Run() # terminates by _SendSig being scheduled
self.assertEquals(self.sendsig_events, [signal.SIGTERM])
+ def testTerminatingSignals(self):  # exercises mainloop.Run() with SIGINT and then SIGTERM as the stopping signal
+ self.mainloop.scheduler.enter(0.1, 1, self._SendSig, [signal.SIGCHLD])  # SIGCHLD is outside the (SIGTERM, SIGINT) tuple that clears `running` in Run()
+ self.mainloop.scheduler.enter(0.2, 1, self._SendSig, [signal.SIGINT])  # larger first argument than the SIGCHLD entry; presumably a delay, so it fires second -- TODO confirm
+ self.mainloop.Run()  # NOTE(review): expected to return once SIGINT is handled; loop exit condition is not visible here -- confirm
+ self.assertEquals(self.sendsig_events, [signal.SIGCHLD, signal.SIGINT])  # both signals must be recorded, SIGCHLD before SIGINT
+ self.mainloop.scheduler.enter(0.1, 1, self._SendSig, [signal.SIGTERM])  # second round on the same mainloop object, this time with SIGTERM
+ self.mainloop.Run()  # NOTE(review): expected to return once SIGTERM is handled -- confirm
+ self.assertEquals(self.sendsig_events, [signal.SIGCHLD, signal.SIGINT,  # events from the first Run() are expected to still be in the list
+ signal.SIGTERM])
+
def testSchedulerCancel(self):
handle = self.mainloop.scheduler.enter(0.1, 1, self._SendSig,
[signal.SIGTERM])