X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/54cf65995770ef2cb2ab63759c7de232ac947642..da961187f97344fde390140ebb2f10d10d334d51:/lib/daemon.py diff --git a/lib/daemon.py b/lib/daemon.py index ab2a272..26f9cdf 100644 --- a/lib/daemon.py +++ b/lib/daemon.py @@ -25,62 +25,197 @@ import select import signal import errno +import time from ganeti import utils +class Timer(object): + def __init__(self, owner, timer_id, start, interval, repeat): + self.owner = owner + self.timer_id = timer_id + self.start = start + self.interval = interval + self.repeat = repeat + + class Mainloop(object): """Generic mainloop for daemons """ def __init__(self): - self._io_wait = [] + """Constructs a new Mainloop instance. + + """ + self._io_wait = {} + self._io_wait_add = [] + self._io_wait_remove = [] self._signal_wait = [] - self.sigchld_handler = None - self.sigterm_handler = None - self.quit = False + self._timer_id_last = 0 + self._timer = {} + self._timer_add = [] + self._timer_remove = [] - def Run(self): - # TODO: Does not yet support adding new event sources while running + def Run(self, handle_sigchld=True, handle_sigterm=True, stop_on_empty=False): + """Runs the mainloop. + + @type handle_sigchld: bool + @param handle_sigchld: Whether to install handler for SIGCHLD + @type handle_sigterm: bool + @param handle_sigterm: Whether to install handler for SIGTERM + @type stop_on_empty: bool + @param stop_on_empty: Whether to stop mainloop once all I/O waiters + unregistered + + """ poller = select.poll() - for (owner, fd, conditions) in self._io_wait: - poller.register(fd, conditions) - self.sigchld_handler = utils.SignalHandler([signal.SIGCHLD]) - self.sigterm_handler = utils.SignalHandler([signal.SIGTERM]) + # Setup signal handlers + if handle_sigchld: + sigchld_handler = utils.SignalHandler([signal.SIGCHLD]) + else: + sigchld_handler = None try: - while not self.quit: - try: - io_events = poller.poll() - except select.error, err: - # EINTR can happen when signals are sent - if err.args and err.args[0] in (errno.EINTR,): - io_events = None - else: - raise - - if io_events: - # Check for I/O events - for (evfd, evcond) in io_events: - for (owner, fd, conditions) in self._io_wait: - if fd == evfd and evcond & conditions: - owner.OnIO(fd, evcond) - - # Check whether signal was raised - if self.sigchld_handler.called: - for owner in self._signal_wait: - owner.OnSignal(signal.SIGCHLD) - self.sigchld_handler.Clear() - - if self.sigterm_handler.called: - self.quit = True - self.sigterm_handler.Clear() + if handle_sigterm: + sigterm_handler = utils.SignalHandler([signal.SIGTERM]) + else: + sigterm_handler = None + + try: + running = True + timeout = None + timeout_needs_update = True + + # Start actual main loop + while running: + # Entries could be added again afterwards, hence removing first + if self._io_wait_remove: + for fd in self._io_wait_remove: + try: + poller.unregister(fd) + except KeyError: + pass + try: + del self._io_wait[fd] + except KeyError: + pass + self._io_wait_remove = [] + + # Add new entries + if self._io_wait_add: + for (owner, fd, conditions) in self._io_wait_add: + self._io_wait[fd] = owner + poller.register(fd, conditions) + self._io_wait_add = [] + + # Add new timers + if self._timer_add: + timeout_needs_update = True + for timer in self._timer_add: + self._timer[timer.timer_id] = timer + del self._timer_add[:] + + # Remove timers + if self._timer_remove: + timeout_needs_update = True + for timer_id in self._timer_remove: + try: + del self._timer[timer_id] + except KeyError: + pass + del self._timer_remove[:] + + # Stop if nothing is listening anymore + if stop_on_empty and not (self._io_wait or self._timer): + break + + # Calculate timeout again if required + if timeout_needs_update: + timeout = self._CalcTimeout(time.time()) + timeout_needs_update = False + + # Wait for I/O events + try: + io_events = poller.poll(timeout) + except select.error, err: + # EINTR can happen when signals are sent + if err.args and err.args[0] in (errno.EINTR,): + io_events = None + else: + raise + + after_poll = time.time() + + if io_events: + # Check for I/O events + for (evfd, evcond) in io_events: + owner = self._io_wait.get(evfd, None) + if owner: + owner.OnIO(evfd, evcond) + + if self._timer: + self._CheckTimers(after_poll) + + # Check whether signal was raised + if sigchld_handler and sigchld_handler.called: + self._CallSignalWaiters(signal.SIGCHLD) + sigchld_handler.Clear() + + if sigterm_handler and sigterm_handler.called: + self._CallSignalWaiters(signal.SIGTERM) + running = False + sigterm_handler.Clear() + finally: + # Restore signal handlers + if sigterm_handler: + sigterm_handler.Reset() finally: - self.sigchld_handler.Reset() - self.sigchld_handler = None - self.sigterm_handler.Reset() - self.sigterm_handler = None + if sigchld_handler: + sigchld_handler.Reset() + + def _CalcTimeout(self, now): + if not self._timer: + return None + + timeout = None + + # TODO: Repeating timers + + min_timeout = 0.001 + + for timer in self._timer.itervalues(): + time_left = (timer.start + timer.interval) - now + if timeout is None or time_left < timeout: + timeout = time_left + if timeout < 0: + timeout = 0 + break + elif timeout < min_timeout: + timeout = min_timeout + break + + return timeout * 1000.0 + + def _CheckTimers(self, now): + # TODO: Repeating timers + for timer in self._timer.itervalues(): + if now < (timer.start + timer.interval): + continue + + timer.owner.OnTimer(timer.timer_id) + # TODO: Repeating timers should not be removed + self._timer_remove.append(timer.timer_id) + + def _CallSignalWaiters(self, signum): + """Calls all signal waiters for a certain signal. + + @type signum: int + @param signum: Signal number + + """ + for owner in self._signal_wait: + owner.OnSignal(signal.SIGCHLD) def RegisterIO(self, owner, fd, condition): """Registers a receiver for I/O notifications @@ -96,7 +231,26 @@ class Mainloop(object): (see select module) """ - self._io_wait.append((owner, fd, condition)) + # select.Poller also supports file() like objects, but we don't. + assert isinstance(fd, (int, long)), \ + "Only integers are supported for file descriptors" + + self._io_wait_add.append((owner, fd, condition)) + + def UnregisterIO(self, fd): + """Unregister a file descriptor. + + It'll be unregistered the next time the mainloop checks for it. + + @type fd: int + @param fd: File descriptor + + """ + # select.Poller also supports file() like objects, but we don't. + assert isinstance(fd, (int, long)), \ + "Only integers are supported for file descriptors" + + self._io_wait_remove.append(fd) def RegisterSignal(self, owner): """Registers a receiver for signal notifications @@ -108,3 +262,38 @@ class Mainloop(object): """ self._signal_wait.append(owner) + + def AddTimer(self, owner, interval, repeat): + """Add a new timer. + + The receiver must support a "OnTimer(self, timer_id)" function. + + @type owner: instance + @param owner: Receiver + @type interval: int or float + @param interval: Timer interval in seconds + @type repeat: bool + @param repeat: Whether this is a repeating timer or one-off + + """ + # TODO: Implement repeating timers + assert not repeat, "Repeating timers are not yet supported" + + # Get new ID + self._timer_id_last += 1 + + timer_id = self._timer_id_last + + self._timer_add.append(Timer(owner, timer_id, time.time(), + float(interval), repeat)) + + return timer_id + + def RemoveTimer(self, timer_id): + """Removes a timer. + + @type timer_id: int + @param timer_id: Timer ID + + """ + self._timer_remove.append(timer_id)