4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module with helper classes and functions for daemons"""
30 from ganeti import utils
34 def __init__(self, owner, timer_id, start, interval, repeat):
36 self.timer_id = timer_id
38 self.interval = interval
42 class Mainloop(object):
43 """Generic mainloop for daemons
47 """Constructs a new Mainloop instance.
51 self._io_wait_add = []
52 self._io_wait_remove = []
53 self._signal_wait = []
54 self._timer_id_last = 0
57 self._timer_remove = []
59 def Run(self, handle_sigchld=True, handle_sigterm=True, stop_on_empty=False):
62 @type handle_sigchld: bool
63 @param handle_sigchld: Whether to install handler for SIGCHLD
64 @type handle_sigterm: bool
65 @param handle_sigterm: Whether to install handler for SIGTERM
66 @type stop_on_empty: bool
67 @param stop_on_empty: Whether to stop mainloop once all I/O waiters
71 poller = select.poll()
73 # Setup signal handlers
75 sigchld_handler = utils.SignalHandler([signal.SIGCHLD])
77 sigchld_handler = None
80 sigterm_handler = utils.SignalHandler([signal.SIGTERM])
82 sigterm_handler = None
87 timeout_needs_update = True
89 # Start actual main loop
91 # Entries could be added again afterwards, hence removing first
92 if self._io_wait_remove:
93 for fd in self._io_wait_remove:
102 self._io_wait_remove = []
105 if self._io_wait_add:
106 for (owner, fd, conditions) in self._io_wait_add:
107 self._io_wait[fd] = owner
108 poller.register(fd, conditions)
109 self._io_wait_add = []
113 timeout_needs_update = True
114 for timer in self._timer_add:
115 self._timer[timer.timer_id] = timer
116 del self._timer_add[:]
119 if self._timer_remove:
120 timeout_needs_update = True
121 for timer_id in self._timer_remove:
123 del self._timer[timer_id]
126 del self._timer_remove[:]
128 # Stop if nothing is listening anymore
129 if stop_on_empty and not (self._io_wait or self._timer):
132 # Calculate timeout again if required
133 if timeout_needs_update:
134 timeout = self._CalcTimeout(time.time())
136 # Wait for I/O events
138 io_events = poller.poll(timeout)
139 except select.error, err:
140 # EINTR can happen when signals are sent
141 if err.args and err.args[0] in (errno.EINTR,):
146 after_poll = time.time()
149 # Check for I/O events
150 for (evfd, evcond) in io_events:
151 owner = self._io_wait.get(evfd, None)
153 owner.OnIO(evfd, evcond)
156 self._CheckTimers(after_poll)
158 # Check whether signal was raised
159 if sigchld_handler and sigchld_handler.called:
160 self._CallSignalWaiters(signal.SIGCHLD)
161 sigchld_handler.Clear()
163 if sigterm_handler and sigterm_handler.called:
164 self._CallSignalWaiters(signal.SIGTERM)
166 sigterm_handler.Clear()
168 # Restore signal handlers
170 sigterm_handler.Reset()
173 sigchld_handler.Reset()
175 def _CalcTimeout(self, now):
181 # TODO: Repeating timers
185 for timer in self._timer.itervalues():
186 time_left = (timer.start + timer.interval) - now
187 if timeout is None or time_left < timeout:
192 elif timeout < min_timeout:
193 timeout = min_timeout
196 return timeout * 1000.0
198 def _CheckTimers(self, now):
199 # TODO: Repeating timers
200 for timer in self._timer.itervalues():
201 if now < (timer.start + timer.interval):
204 timer.owner.OnTimer(timer.timer_id)
206 # TODO: Repeating timers should not be removed
207 self._timer_remove.append(timer.timer_id)
209 def _CallSignalWaiters(self, signum):
210 """Calls all signal waiters for a certain signal.
213 @param signum: Signal number
216 for owner in self._signal_wait:
217 owner.OnSignal(signal.SIGCHLD)
219 def RegisterIO(self, owner, fd, condition):
220 """Registers a receiver for I/O notifications
222 The receiver must support a "OnIO(self, fd, conditions)" function.
224 @type owner: instance
225 @param owner: Receiver
227 @param fd: File descriptor
229 @param condition: ORed field of conditions to be notified
233 # select.Poller also supports file() like objects, but we don't.
234 assert isinstance(fd, (int, long)), \
235 "Only integers are supported for file descriptors"
237 self._io_wait_add.append((owner, fd, condition))
239 def UnregisterIO(self, fd):
240 """Unregister a file descriptor.
242 It'll be unregistered the next time the mainloop checks for it.
245 @param fd: File descriptor
248 # select.Poller also supports file() like objects, but we don't.
249 assert isinstance(fd, (int, long)), \
250 "Only integers are supported for file descriptors"
252 self._io_wait_remove.append(fd)
254 def RegisterSignal(self, owner):
255 """Registers a receiver for signal notifications
257 The receiver must support a "OnSignal(self, signum)" function.
259 @type owner: instance
260 @param owner: Receiver
263 self._signal_wait.append(owner)
265 def AddTimer(self, owner, interval, repeat):
268 The receiver must support a "OnTimer(self, timer_id)" function.
270 @type owner: instance
271 @param owner: Receiver
272 @type interval: int or float
273 @param interval: Timer interval in seconds
275 @param repeat: Whether this is a repeating timer or one-off
278 # TODO: Implement repeating timers
279 assert not repeat, "Repeating timers are not yet supported"
282 self._timer_id_last += 1
284 timer_id = self._timer_id_last
286 self._timer_add.append(Timer(owner, timer_id, time.time(),
287 float(interval), repeat))
291 def RemoveTimer(self, timer_id):
295 @param timer_id: Timer ID
298 self._timer_remove.append(timer_id)