4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module with helper classes and functions for daemons"""
30 from ganeti import utils
def __init__(self, owner, timer_id, start, interval, repeat):
    """Stores the parameters describing a single timer.

    @type owner: instance
    @param owner: Receiver; its "OnTimer(self, timer_id)" function is
        called once the timer has expired
    @type timer_id: int
    @param timer_id: ID identifying this timer
    @type start: float
    @param start: Time at which the timer was started
    @type interval: float
    @param interval: Seconds after C{start} at which the timer expires
    @type repeat: bool
    @param repeat: Whether this is a repeating timer or one-off

    """
    # owner and start are read by the mainloop when it calculates the poll
    # timeout and when it fires expired timers.
    self.owner = owner
    self.timer_id = timer_id
    self.start = start
    self.interval = interval
    self.repeat = repeat
class Mainloop(object):
    """Generic mainloop for daemons

    Receivers register for I/O, signal and timer notifications. Changes to
    the I/O and timer registrations are only queued by the registration
    functions; the running mainloop applies them.

    """
    def __init__(self):
        """Constructs a new Mainloop instance.

        """
        # File descriptor -> receiver, for everything currently registered
        self._io_wait = {}
        # Queued (owner, fd, condition) registrations
        self._io_wait_add = []
        # Queued file descriptors to unregister
        self._io_wait_remove = []
        # Receivers notified about signals
        self._signal_wait = []
        # Last timer ID handed out; new IDs count up from here
        self._timer_id_last = 0
        # Timer ID -> timer, for all active timers
        self._timer = {}
        # Queued timers to activate
        self._timer_add = []
        # Queued timer IDs to remove
        self._timer_remove = []
# Runs the mainloop: applies the queued I/O and timer (un)registrations,
# polls the registered file descriptors and then notifies the receivers
# about I/O events, expired timers and raised signals.
# NOTE(review): this extract does not contain every line of the function
# (the embedded line numbers jump, e.g. 75 -> 77 and 93 -> 102); statements
# structuring the code below, such as if/else, try/finally and the loop
# header, are not visible here - confirm against the complete file.
59 def Run(self, handle_sigchld=True, handle_sigterm=True, stop_on_empty=False):
62 @type handle_sigchld: bool
63 @param handle_sigchld: Whether to install handler for SIGCHLD
64 @type handle_sigterm: bool
65 @param handle_sigterm: Whether to install handler for SIGTERM
66 @type stop_on_empty: bool
67 @param stop_on_empty: Whether to stop mainloop once all I/O waiters
# Poll object with which the file descriptors get registered below
71 poller = select.poll()
73 # Setup signal handlers
# Each handler variable ends up as a utils.SignalHandler or as None;
# presumably selected by handle_sigchld/handle_sigterm - the branching lines
# are not visible here.
75 sigchld_handler = utils.SignalHandler([signal.SIGCHLD])
77 sigchld_handler = None
80 sigterm_handler = utils.SignalHandler([signal.SIGTERM])
82 sigterm_handler = None
# Forces the timeout to be calculated before it is used for polling
87 timeout_needs_update = True
89 # Start actual main loop
91 # Entries could be added again afterwards, hence removing first
92 if self._io_wait_remove:
93 for fd in self._io_wait_remove:
# The removal queue is replaced by a new, empty list once handled
102 self._io_wait_remove = []
# Apply queued registrations: remember the receiver for each file descriptor
# and register the descriptor with the poller
105 if self._io_wait_add:
106 for (owner, fd, conditions) in self._io_wait_add:
107 self._io_wait[fd] = owner
108 poller.register(fd, conditions)
109 self._io_wait_add = []
# Queued timers become active, keyed by their ID; the set of timers changed,
# hence the timeout has to be calculated again. The queue is emptied in
# place.
113 timeout_needs_update = True
114 for timer in self._timer_add:
115 self._timer[timer.timer_id] = timer
116 del self._timer_add[:]
# Timers queued for removal are dropped from the active timers; this also
# requires a new timeout.
119 if self._timer_remove:
120 timeout_needs_update = True
121 for timer_id in self._timer_remove:
123 del self._timer[timer_id]
126 del self._timer_remove[:]
128 # Stop if nothing is listening anymore
129 if stop_on_empty and not (self._io_wait or self._timer):
132 # Calculate timeout again if required
133 if timeout_needs_update:
134 timeout = self._CalcTimeout(time.time())
135 timeout_needs_update = False
137 # Wait for I/O events
139 io_events = poller.poll(timeout)
140 except select.error, err:
141 # EINTR can happen when signals are sent
142 if err.args and err.args[0] in (errno.EINTR,):
# Time right after polling; the timers are checked against this value
147 after_poll = time.time()
150 # Check for I/O events
151 for (evfd, evcond) in io_events:
# Receiver registered for this file descriptor, None if there is none
152 owner = self._io_wait.get(evfd, None)
154 owner.OnIO(evfd, evcond)
# Notify the owners of expired timers
157 self._CheckTimers(after_poll)
159 # Check whether signal was raised
# A handler whose "called" attribute is set leads to the signal receivers
# being notified from here; the handler is cleared afterwards.
160 if sigchld_handler and sigchld_handler.called:
161 self._CallSignalWaiters(signal.SIGCHLD)
162 sigchld_handler.Clear()
164 if sigterm_handler and sigterm_handler.called:
165 self._CallSignalWaiters(signal.SIGTERM)
167 sigterm_handler.Clear()
169 # Restore signal handlers
# NOTE(review): both handlers can be None (see above), so these calls need a
# guard; none is visible in this extract - confirm.
171 sigterm_handler.Reset()
174 sigchld_handler.Reset()
# Calculates the poll timeout from the active timers in self._timer, given
# the current time "now"; Run() passes the result to poller.poll().
# NOTE(review): this extract does not contain every line of the function
# (the embedded line numbers jump, e.g. 176 -> 182 and 188 -> 193); among
# others the initial value of "timeout" and the definition of "min_timeout"
# are not visible here - confirm against the complete file.
176 def _CalcTimeout(self, now):
182 # TODO: Repeating timers
186 for timer in self._timer.itervalues():
# Time left until this timer expires; negative once the expiry time has
# passed
187 time_left = (timer.start + timer.interval) - now
# True for the first timer and for every timer with less time left than
# "timeout"; presumably time_left then becomes the new timeout - the
# assignment is not visible here.
188 if timeout is None or time_left < timeout:
# Timeouts below min_timeout are raised to min_timeout
193 elif timeout < min_timeout:
194 timeout = min_timeout
# Timer intervals are documented in seconds (see AddTimer), whereas the
# timeout of select's poll() is given in milliseconds
197 return timeout * 1000.0
def _CheckTimers(self, now):
    """Notifies the owners of all timers which have expired.

    A timer has expired once C{now} is no longer before its start time plus
    its interval. Expired timers are queued for removal after their owner
    has been notified.

    @type now: float
    @param now: Time the timers are checked against

    """
    # TODO: Repeating timers
    for pending in self._timer.itervalues():
        expires_at = pending.start + pending.interval
        if not (now < expires_at):
            pending.owner.OnTimer(pending.timer_id)

            # TODO: Repeating timers should not be removed
            self._timer_remove.append(pending.timer_id)
def _CallSignalWaiters(self, signum):
    """Calls all signal waiters for a certain signal.

    @type signum: int
    @param signum: Signal number

    """
    # Pass on the signal which was actually raised; the mainloop calls this
    # function for SIGTERM as well as for SIGCHLD.
    for owner in self._signal_wait:
        owner.OnSignal(signum)
def RegisterIO(self, owner, fd, condition):
    """Queues the registration of a receiver for I/O notifications

    The receiver must support a "OnIO(self, fd, conditions)" function. The
    registration is appended to the queue of pending registrations, which
    the running mainloop applies.

    @type owner: instance
    @param owner: Receiver
    @type fd: int
    @param fd: File descriptor
    @type condition: int
    @param condition: ORed field of conditions to be notified

    """
    # File() like objects would be accepted by select.Poller, but are not
    # supported here.
    assert isinstance(fd, (int, long)), \
        "Only integers are supported for file descriptors"

    registration = (owner, fd, condition)
    self._io_wait_add.append(registration)
def UnregisterIO(self, fd):
    """Queues a file descriptor for being unregistered.

    The file descriptor is only appended to the removal queue here; it'll
    be unregistered the next time the mainloop checks that queue.

    @type fd: int
    @param fd: File descriptor

    """
    # File() like objects would be accepted by select.Poller, but are not
    # supported here.
    assert isinstance(fd, (int, long)), \
        "Only integers are supported for file descriptors"

    pending_removals = self._io_wait_remove
    pending_removals.append(fd)
def RegisterSignal(self, owner):
    """Adds a receiver to the list of signal waiters

    The receiver must support a "OnSignal(self, signum)" function.

    @type owner: instance
    @param owner: Receiver

    """
    signal_waiters = self._signal_wait
    signal_waiters.append(owner)
def AddTimer(self, owner, interval, repeat):
    """Adds a timer

    The receiver must support a "OnTimer(self, timer_id)" function.

    @type owner: instance
    @param owner: Receiver
    @type interval: int or float
    @param interval: Timer interval in seconds
    @type repeat: bool
    @param repeat: Whether this is a repeating timer or one-off
    @rtype: int
    @return: ID of the new timer; it is passed to the receiver's "OnTimer"
        function and accepted by L{RemoveTimer}

    """
    # TODO: Implement repeating timers
    assert not repeat, "Repeating timers are not yet supported"

    # IDs are handed out by counting up from the last one used
    self._timer_id_last += 1

    timer_id = self._timer_id_last

    # The timer starts now, but is only queued here; the running mainloop
    # moves queued timers to its active timers.
    self._timer_add.append(Timer(owner, timer_id, time.time(),
                                 float(interval), repeat))

    # Without the ID callers have no means of removing the timer again
    return timer_id
def RemoveTimer(self, timer_id):
    """Queues a timer for removal

    The ID is only appended to the queue of timers to remove here.

    @type timer_id: int
    @param timer_id: Timer ID

    """
    pending_removals = self._timer_remove
    pending_removals.append(timer_id)