import select
import signal
import errno
-import time
import logging
from ganeti import utils
from ganeti import constants
-class Timer(object):
- def __init__(self, owner, timer_id, start, interval, repeat):
- self.owner = owner
- self.timer_id = timer_id
- self.start = start
- self.interval = interval
- self.repeat = repeat
-
-
class Mainloop(object):
"""Generic mainloop for daemons
self._io_wait_add = []
self._io_wait_remove = []
self._signal_wait = []
- self._timer_id_last = 0
- self._timer = {}
- self._timer_add = []
- self._timer_remove = []
def Run(self, handle_sigchld=True, handle_sigterm=True, stop_on_empty=False):
"""Runs the mainloop.
try:
running = True
- timeout = None
- timeout_needs_update = True
# Start actual main loop
while running:
poller.register(fd, conditions)
self._io_wait_add = []
- # Add new timers
- if self._timer_add:
- timeout_needs_update = True
- for timer in self._timer_add:
- self._timer[timer.timer_id] = timer
- del self._timer_add[:]
-
- # Remove timers
- if self._timer_remove:
- timeout_needs_update = True
- for timer_id in self._timer_remove:
- try:
- del self._timer[timer_id]
- except KeyError:
- pass
- del self._timer_remove[:]
-
# Stop if nothing is listening anymore
- if stop_on_empty and not (self._io_wait or self._timer):
+ if stop_on_empty and not (self._io_wait):
break
- # Calculate timeout again if required
- if timeout_needs_update:
- timeout = self._CalcTimeout(time.time())
- timeout_needs_update = False
-
# Wait for I/O events
try:
- io_events = poller.poll(timeout)
+ io_events = poller.poll(None)
except select.error, err:
# EINTR can happen when signals are sent
if err.args and err.args[0] in (errno.EINTR,):
else:
raise
- after_poll = time.time()
-
if io_events:
# Check for I/O events
for (evfd, evcond) in io_events:
if owner:
owner.OnIO(evfd, evcond)
- if self._timer:
- self._CheckTimers(after_poll)
-
# Check whether signal was raised
if sigchld_handler and sigchld_handler.called:
self._CallSignalWaiters(signal.SIGCHLD)
if sigchld_handler:
sigchld_handler.Reset()
- def _CalcTimeout(self, now):
- if not self._timer:
- return None
-
- timeout = None
-
- # TODO: Repeating timers
-
- min_timeout = 0.001
-
- for timer in self._timer.itervalues():
- time_left = (timer.start + timer.interval) - now
- if timeout is None or time_left < timeout:
- timeout = time_left
- if timeout < 0:
- timeout = 0
- break
- elif timeout < min_timeout:
- timeout = min_timeout
- break
-
- return timeout * 1000.0
-
- def _CheckTimers(self, now):
- # TODO: Repeating timers
- for timer in self._timer.itervalues():
- if now < (timer.start + timer.interval):
- continue
-
- timer.owner.OnTimer(timer.timer_id)
-
- # TODO: Repeating timers should not be removed
- self._timer_remove.append(timer.timer_id)
-
def _CallSignalWaiters(self, signum):
"""Calls all signal waiters for a certain signal.
"""
self._signal_wait.append(owner)
- def AddTimer(self, owner, interval, repeat):
- """Add a new timer.
-
- The receiver must support a "OnTimer(self, timer_id)" function.
-
- @type owner: instance
- @param owner: Receiver
- @type interval: int or float
- @param interval: Timer interval in seconds
- @type repeat: bool
- @param repeat: Whether this is a repeating timer or one-off
-
- """
- # TODO: Implement repeating timers
- assert not repeat, "Repeating timers are not yet supported"
-
- # Get new ID
- self._timer_id_last += 1
-
- timer_id = self._timer_id_last
-
- self._timer_add.append(Timer(owner, timer_id, time.time(),
- float(interval), repeat))
-
- return timer_id
-
- def RemoveTimer(self, timer_id):
- """Removes a timer.
-
- @type timer_id: int
- @param timer_id: Timer ID
-
- """
- self._timer_remove.append(timer_id)
-
def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn):
"""Shared main function for daemons.