import select
import signal
import errno
+import time
from ganeti import utils
+class Timer(object):
+ def __init__(self, owner, timer_id, start, interval, repeat):
+ self.owner = owner
+ self.timer_id = timer_id
+ self.start = start
+ self.interval = interval
+ self.repeat = repeat
+
+
class Mainloop(object):
"""Generic mainloop for daemons
self._io_wait_add = []
self._io_wait_remove = []
self._signal_wait = []
+ self._timer_id_last = 0
+ self._timer = {}
+ self._timer_add = []
+ self._timer_remove = []
def Run(self, handle_sigchld=True, handle_sigterm=True, stop_on_empty=False):
"""Runs the mainloop.
try:
running = True
+ timeout = None
+ timeout_needs_update = True
# Start actual main loop
while running:
poller.register(fd, conditions)
self._io_wait_add = []
+ # Add new timers
+ if self._timer_add:
+ timeout_needs_update = True
+ for timer in self._timer_add:
+ self._timer[timer.timer_id] = timer
+ del self._timer_add[:]
+
+ # Remove timers
+ if self._timer_remove:
+ timeout_needs_update = True
+ for timer_id in self._timer_remove:
+ try:
+ del self._timer[timer_id]
+ except KeyError:
+ pass
+ del self._timer_remove[:]
+
# Stop if nothing is listening anymore
- if stop_on_empty and not self._io_wait:
+ if stop_on_empty and not (self._io_wait or self._timer):
break
+ # Calculate timeout again if required
+ if timeout_needs_update:
+ timeout = self._CalcTimeout(time.time())
+ timeout_needs_update = False
+
# Wait for I/O events
try:
- io_events = poller.poll()
+ io_events = poller.poll(timeout)
except select.error, err:
# EINTR can happen when signals are sent
if err.args and err.args[0] in (errno.EINTR,):
else:
raise
+ after_poll = time.time()
+
if io_events:
# Check for I/O events
for (evfd, evcond) in io_events:
if owner:
owner.OnIO(evfd, evcond)
+ if self._timer:
+ self._CheckTimers(after_poll)
+
# Check whether signal was raised
if sigchld_handler and sigchld_handler.called:
self._CallSignalWaiters(signal.SIGCHLD)
if sigchld_handler:
sigchld_handler.Reset()
+ def _CalcTimeout(self, now):
+ if not self._timer:
+ return None
+
+ timeout = None
+
+ # TODO: Repeating timers
+
+ min_timeout = 0.001
+
+ for timer in self._timer.itervalues():
+ time_left = (timer.start + timer.interval) - now
+ if timeout is None or time_left < timeout:
+ timeout = time_left
+ if timeout < 0:
+ timeout = 0
+ break
+ elif timeout < min_timeout:
+ timeout = min_timeout
+ break
+
+ return timeout * 1000.0
+
+ def _CheckTimers(self, now):
+ # TODO: Repeating timers
+ for timer in self._timer.itervalues():
+ if now < (timer.start + timer.interval):
+ continue
+
+ timer.owner.OnTimer(timer.timer_id)
+
+ # TODO: Repeating timers should not be removed
+ self._timer_remove.append(timer.timer_id)
+
def _CallSignalWaiters(self, signum):
"""Calls all signal waiters for a certain signal.
"""
self._signal_wait.append(owner)
+
+ def AddTimer(self, owner, interval, repeat):
+ """Add a new timer.
+
+ The receiver must support a "OnTimer(self, timer_id)" function.
+
+ @type owner: instance
+ @param owner: Receiver
+ @type interval: int or float
+ @param interval: Timer interval in seconds
+ @type repeat: bool
+ @param repeat: Whether this is a repeating timer or one-off
+
+ """
+ # TODO: Implement repeating timers
+ assert not repeat, "Repeating timers are not yet supported"
+
+ # Get new ID
+ self._timer_id_last += 1
+
+ timer_id = self._timer_id_last
+
+ self._timer_add.append(Timer(owner, timer_id, time.time(),
+ float(interval), repeat))
+
+ return timer_id
+
+ def RemoveTimer(self, timer_id):
+ """Removes a timer.
+
+ @type timer_id: int
+ @param timer_id: Timer ID
+
+ """
+ self._timer_remove.append(timer_id)