X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/b14b975f7986d80d5a349d18342ca92e1a8e775f..944bf54895c1d4491c6d06ad464aa6e97844c366:/lib/daemon.py?ds=sidebyside

diff --git a/lib/daemon.py b/lib/daemon.py
index 3aa514a..26f9cdf 100644
--- a/lib/daemon.py
+++ b/lib/daemon.py
@@ -25,10 +25,20 @@
 import select
 import signal
 import errno
+import time
 
 from ganeti import utils
 
 
+class Timer(object):
+  def __init__(self, owner, timer_id, start, interval, repeat):
+    self.owner = owner
+    self.timer_id = timer_id
+    self.start = start
+    self.interval = interval
+    self.repeat = repeat
+
+
 class Mainloop(object):
   """Generic mainloop for daemons
 
@@ -41,6 +51,10 @@ class Mainloop(object):
     self._io_wait_add = []
     self._io_wait_remove = []
     self._signal_wait = []
+    self._timer_id_last = 0
+    self._timer = {}
+    self._timer_add = []
+    self._timer_remove = []
 
   def Run(self, handle_sigchld=True, handle_sigterm=True, stop_on_empty=False):
     """Runs the mainloop.
@@ -69,6 +83,8 @@ class Mainloop(object):
 
     try:
       running = True
+      timeout = None
+      timeout_needs_update = True
 
       # Start actual main loop
       while running:
@@ -92,13 +108,35 @@ class Mainloop(object):
             poller.register(fd, conditions)
           self._io_wait_add = []
 
+        # Add new timers
+        if self._timer_add:
+          timeout_needs_update = True
+          for timer in self._timer_add:
+            self._timer[timer.timer_id] = timer
+          del self._timer_add[:]
+
+        # Remove timers
+        if self._timer_remove:
+          timeout_needs_update = True
+          for timer_id in self._timer_remove:
+            try:
+              del self._timer[timer_id]
+            except KeyError:
+              pass
+          del self._timer_remove[:]
+
         # Stop if nothing is listening anymore
-        if stop_on_empty and not self._io_wait:
+        if stop_on_empty and not (self._io_wait or self._timer):
           break
 
+        # Calculate timeout again if required
+        if timeout_needs_update:
+          timeout = self._CalcTimeout(time.time())
+          timeout_needs_update = False
+
         # Wait for I/O events
         try:
-          io_events = poller.poll()
+          io_events = poller.poll(timeout)
         except select.error, err:
           # EINTR can happen when signals are sent
           if err.args and err.args[0] in (errno.EINTR,):
@@ -106,6 +144,8 @@ class Mainloop(object):
           else:
             raise
 
+        after_poll = time.time()
+
         if io_events:
           # Check for I/O events
           for (evfd, evcond) in io_events:
@@ -113,6 +153,9 @@ class Mainloop(object):
             if owner:
               owner.OnIO(evfd, evcond)
 
+        if self._timer:
+          self._CheckTimers(after_poll)
+
         # Check whether signal was raised
         if sigchld_handler and sigchld_handler.called:
           self._CallSignalWaiters(signal.SIGCHLD)
@@ -130,6 +173,40 @@ class Mainloop(object):
       if sigchld_handler:
         sigchld_handler.Reset()
 
+  def _CalcTimeout(self, now):
+    if not self._timer:
+      return None
+
+    timeout = None
+
+    # TODO: Repeating timers
+
+    min_timeout = 0.001
+
+    for timer in self._timer.itervalues():
+      time_left = (timer.start + timer.interval) - now
+      if timeout is None or time_left < timeout:
+        timeout = time_left
+        if timeout < 0:
+          timeout = 0
+          break
+        elif timeout < min_timeout:
+          timeout = min_timeout
+          break
+
+    return timeout * 1000.0
+
+  def _CheckTimers(self, now):
+    # TODO: Repeating timers
+    for timer in self._timer.itervalues():
+      if now < (timer.start + timer.interval):
+        continue
+
+      timer.owner.OnTimer(timer.timer_id)
+
+      # TODO: Repeating timers should not be removed
+      self._timer_remove.append(timer.timer_id)
+
   def _CallSignalWaiters(self, signum):
     """Calls all signal waiters for a certain signal.
 
@@ -185,3 +262,38 @@ class Mainloop(object):
 
     """
     self._signal_wait.append(owner)
+
+  def AddTimer(self, owner, interval, repeat):
+    """Add a new timer.
+
+    The receiver must support a "OnTimer(self, timer_id)" function.
+
+    @type owner: instance
+    @param owner: Receiver
+    @type interval: int or float
+    @param interval: Timer interval in seconds
+    @type repeat: bool
+    @param repeat: Whether this is a repeating timer or one-off
+
+    """
+    # TODO: Implement repeating timers
+    assert not repeat, "Repeating timers are not yet supported"
+
+    # Get new ID
+    self._timer_id_last += 1
+
+    timer_id = self._timer_id_last
+
+    self._timer_add.append(Timer(owner, timer_id, time.time(),
+                                 float(interval), repeat))
+
+    return timer_id
+
+  def RemoveTimer(self, timer_id):
+    """Removes a timer.
+
+    @type timer_id: int
+    @param timer_id: Timer ID
+
+    """
+    self._timer_remove.append(timer_id)