4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module with helper classes and functions for daemons"""
32 from ganeti import utils
33 from ganeti import constants
class Timer(object):
    """Bookkeeping record for one timer registered with the mainloop.

    The mainloop treats a timer as expired once the current time reaches
    C{start + interval} and then calls C{owner.OnTimer(timer_id)}.

    NOTE(review): the reviewed listing had lost the class header and the
    assignments of C{owner}, C{start} and C{repeat}; they are restored here
    because the mainloop reads C{timer.owner} and C{timer.start}. Confirm
    against the repository copy.

    """
    def __init__(self, owner, timer_id, start, interval, repeat):
        """Stores the timer parameters.

        @type owner: instance
        @param owner: Receiver, must support "OnTimer(self, timer_id)"
        @param timer_id: Identifier handed back to the receiver
        @type start: float
        @param start: Time (as returned by C{time.time()}) the timer was
                      started at
        @type interval: float
        @param interval: Seconds after C{start} at which the timer expires
        @param repeat: Whether this is a repeating timer or one-off

        """
        self.owner = owner
        self.timer_id = timer_id
        self.start = start
        self.interval = interval
        self.repeat = repeat
class Mainloop(object):
    """Generic mainloop for daemons

    Dispatches three kinds of events to registered receivers: I/O events on
    file descriptors (C{OnIO}), expired one-off timers (C{OnTimer}) and the
    SIGCHLD/SIGTERM signals (C{OnSignal}).

    Registrations and removals are only queued by the public methods and
    applied at the top of each loop iteration, so the tables the loop
    iterates over are never modified while callbacks are running.

    NOTE(review): this class was restored from a listing that had lost its
    indentation and a number of lines. The loop/try scaffolding of L{Run},
    the value of C{min_timeout} in L{_CalcTimeout} and the return value of
    L{AddTimer} were not visible and are assumptions to be confirmed
    against the repository copy.

    """
    def __init__(self):
        """Constructs a new Mainloop instance.

        """
        # fd -> receiver for active I/O waiters, plus queued changes
        self._io_wait = {}
        self._io_wait_add = []
        self._io_wait_remove = []

        # Receivers notified about signals
        self._signal_wait = []

        # timer_id -> Timer for active timers, plus queued changes
        self._timer_id_last = 0
        self._timer = {}
        self._timer_add = []
        self._timer_remove = []

    def Run(self, handle_sigchld=True, handle_sigterm=True,
            stop_on_empty=False):
        """Runs the mainloop.

        The loop ends after SIGTERM was handled (if C{handle_sigterm} is
        set) or, with C{stop_on_empty}, once nothing is registered anymore.
        Installed signal handlers are reset before returning.

        @type handle_sigchld: bool
        @param handle_sigchld: Whether to install handler for SIGCHLD
        @type handle_sigterm: bool
        @param handle_sigterm: Whether to install handler for SIGTERM
        @type stop_on_empty: bool
        @param stop_on_empty: Whether to stop mainloop once all I/O waiters
                              and timers are gone

        """
        poller = select.poll()

        # Setup signal handlers
        if handle_sigchld:
            sigchld_handler = utils.SignalHandler([signal.SIGCHLD])
        else:
            sigchld_handler = None
        try:
            if handle_sigterm:
                sigterm_handler = utils.SignalHandler([signal.SIGTERM])
            else:
                sigterm_handler = None

            try:
                running = True
                timeout = None
                timeout_needs_update = True

                # Start actual main loop
                while running:
                    # Entries could be added again afterwards, hence
                    # removing first
                    if self._io_wait_remove:
                        for fd in self._io_wait_remove:
                            # An fd may be queued for removal without ever
                            # having been registered; poll objects raise
                            # KeyError for unknown descriptors.
                            try:
                                poller.unregister(fd)
                            except KeyError:
                                pass
                            self._io_wait.pop(fd, None)
                        self._io_wait_remove = []

                    # Add new entries
                    if self._io_wait_add:
                        for (owner, fd, conditions) in self._io_wait_add:
                            self._io_wait[fd] = owner
                            poller.register(fd, conditions)
                        self._io_wait_add = []

                    # Add new timers
                    if self._timer_add:
                        timeout_needs_update = True
                        for timer in self._timer_add:
                            self._timer[timer.timer_id] = timer
                        del self._timer_add[:]

                    # Remove timers; the same ID can be queued twice (once
                    # by _CheckTimers, once by RemoveTimer), so a missing
                    # entry is not an error
                    if self._timer_remove:
                        timeout_needs_update = True
                        for timer_id in self._timer_remove:
                            self._timer.pop(timer_id, None)
                        del self._timer_remove[:]

                    # Stop if nothing is listening anymore
                    if stop_on_empty and not (self._io_wait or self._timer):
                        break

                    # Calculate timeout again if required
                    if timeout_needs_update:
                        timeout = self._CalcTimeout(time.time())
                        timeout_needs_update = False

                    # Wait for I/O events
                    try:
                        io_events = poller.poll(timeout)
                    except select.error:
                        # Fetched this way so the handler is valid syntax
                        # on every Python version
                        err = sys.exc_info()[1]
                        # EINTR can happen when signals are sent
                        if err.args and err.args[0] in (errno.EINTR,):
                            io_events = None
                        else:
                            raise

                    after_poll = time.time()

                    if io_events:
                        # Check for I/O events
                        for (evfd, evcond) in io_events:
                            owner = self._io_wait.get(evfd, None)
                            if owner:
                                owner.OnIO(evfd, evcond)

                    if self._timer:
                        self._CheckTimers(after_poll)

                    # Check whether signal was raised
                    if sigchld_handler and sigchld_handler.called:
                        self._CallSignalWaiters(signal.SIGCHLD)
                        sigchld_handler.Clear()

                    if sigterm_handler and sigterm_handler.called:
                        self._CallSignalWaiters(signal.SIGTERM)
                        running = False
                        sigterm_handler.Clear()
            finally:
                # Restore signal handlers
                if sigterm_handler:
                    sigterm_handler.Reset()
        finally:
            if sigchld_handler:
                sigchld_handler.Reset()

    def _CalcTimeout(self, now):
        """Calculates the poll timeout from the active timers.

        @type now: float
        @param now: Current time as returned by C{time.time()}
        @rtype: float or None
        @return: Milliseconds until the earliest timer expires (0 if one is
                 already due), or None if there are no timers

        """
        if not self._timer:
            return None

        # TODO: Repeating timers

        # NOTE(review): value not visible in the reviewed listing; 1ms is
        # assumed as the shortest non-zero wait - confirm.
        min_timeout = 0.001

        timeout = None
        for timer in self._timer.values():
            time_left = (timer.start + timer.interval) - now
            if timeout is None or time_left < timeout:
                timeout = time_left

        if timeout < 0:
            timeout = 0
        elif timeout < min_timeout:
            timeout = min_timeout

        # Timers are kept in seconds, poll() wants milliseconds
        return timeout * 1000.0

    def _CheckTimers(self, now):
        """Notifies the owners of all expired timers.

        Expired timers are queued for removal; the timer table itself is
        only changed by L{Run}.

        @type now: float
        @param now: Current time as returned by C{time.time()}

        """
        # TODO: Repeating timers
        for timer in self._timer.values():
            if now < (timer.start + timer.interval):
                continue

            timer.owner.OnTimer(timer.timer_id)

            # TODO: Repeating timers should not be removed
            self._timer_remove.append(timer.timer_id)

    def _CallSignalWaiters(self, signum):
        """Calls all signal waiters for a certain signal.

        @type signum: int
        @param signum: Signal number

        """
        for owner in self._signal_wait:
            # Pass on the signal that was actually raised
            owner.OnSignal(signum)

    def RegisterIO(self, owner, fd, condition):
        """Registers a receiver for I/O notifications

        The receiver must support a "OnIO(self, fd, conditions)" function.
        The registration takes effect the next time the mainloop processes
        its queues.

        @type owner: instance
        @param owner: Receiver
        @type fd: int
        @param fd: File descriptor
        @type condition: int
        @param condition: ORed field of conditions to be notified
                          (passed to the poll object's C{register})

        """
        # select.Poller also supports file() like objects, but we don't.
        assert isinstance(fd, (int, long)), \
            "Only integers are supported for file descriptors"

        self._io_wait_add.append((owner, fd, condition))

    def UnregisterIO(self, fd):
        """Unregister a file descriptor.

        It'll be unregistered the next time the mainloop checks for it.

        @type fd: int
        @param fd: File descriptor

        """
        # select.Poller also supports file() like objects, but we don't.
        assert isinstance(fd, (int, long)), \
            "Only integers are supported for file descriptors"

        self._io_wait_remove.append(fd)

    def RegisterSignal(self, owner):
        """Registers a receiver for signal notifications

        The receiver must support a "OnSignal(self, signum)" function.

        @type owner: instance
        @param owner: Receiver

        """
        self._signal_wait.append(owner)

    def AddTimer(self, owner, interval, repeat):
        """Registers a new timer.

        The receiver must support a "OnTimer(self, timer_id)" function.
        The timer starts counting from the time of this call.

        @type owner: instance
        @param owner: Receiver
        @type interval: int or float
        @param interval: Timer interval in seconds
        @type repeat: bool
        @param repeat: Whether this is a repeating timer or one-off
        @return: ID of the new timer, usable with L{RemoveTimer}
                 (NOTE(review): the return statement was not visible in the
                 reviewed listing - confirm)

        """
        # TODO: Implement repeating timers
        assert not repeat, "Repeating timers are not yet supported"

        self._timer_id_last += 1

        timer_id = self._timer_id_last

        self._timer_add.append(Timer(owner, timer_id, time.time(),
                                     float(interval), repeat))

        return timer_id

    def RemoveTimer(self, timer_id):
        """Removes a timer.

        The timer is dropped the next time the mainloop processes its
        queues.

        @param timer_id: Timer ID

        """
        self._timer_remove.append(timer_id)
def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn):
    """Shared main function for daemons.

    Adds the common command line options, parses the command line,
    validates the SSL files if SSL is enabled, optionally daemonizes and
    then runs C{exec_fn} while the pid file is held.

    @type daemon_name: string
    @param daemon_name: daemon name
    @type optionparser: L{optparse.OptionParser}
    @param optionparser: initialized optionparser with daemon-specific
                         options (common -f -d options will be handled by
                         this module)
    @type dirs: list of strings
    @param dirs: list of directories that must exist for this daemon to
                 work
    @type check_fn: function which accepts (options, args)
    @param check_fn: function that checks start conditions and exits if
                     they're not met; None skips the check
    @type exec_fn: function which accepts (options, args)
    @param exec_fn: function that's executed with the daemon's pid file
                    held, and runs the daemon itself.

    """
    # NOTE(review): this function was restored from a listing that had lost
    # several lines. The help texts of -b and -K, the "if options.fork"
    # guard with utils.CloseFDs(), the "debug" argument of SetupLogging and
    # the try/finally around the pid file are assumptions - confirm against
    # the repository copy.
    optionparser.add_option("-f", "--foreground", dest="fork",
                            help="Don't detach from the current terminal",
                            default=True, action="store_false")
    optionparser.add_option("-d", "--debug", dest="debug",
                            help="Enable some debug messages",
                            default=False, action="store_true")
    if daemon_name in constants.DAEMONS_PORTS:
        # For networked daemons we also allow choosing the bind port and
        # address. By default we use the port provided by
        # utils.GetDaemonPort, and bind to 0.0.0.0 (which is represented by
        # an empty bind address).
        port = utils.GetDaemonPort(daemon_name)
        optionparser.add_option("-p", "--port", dest="port",
                                help="Network port (%s default)." % port,
                                default=port, type="int")
        optionparser.add_option("-b", "--bind", dest="bind_address",
                                help="Bind address",
                                default="", metavar="ADDRESS")

    if daemon_name in constants.DAEMONS_SSL:
        default_cert, default_key = constants.DAEMONS_SSL[daemon_name]
        optionparser.add_option("--no-ssl", dest="ssl",
                                help="Do not secure HTTP protocol with SSL",
                                default=True, action="store_false")
        optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
                                help="SSL key",
                                default=default_key, type="string")
        optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
                                help="SSL certificate",
                                default=default_cert, type="string")

    # The flag is also published as a module attribute of utils
    multithread = utils.no_fork = \
        daemon_name in constants.MULTITHREADED_DAEMONS

    options, args = optionparser.parse_args()

    # The "ssl" option only exists for daemons listed in DAEMONS_SSL
    if hasattr(options, 'ssl') and options.ssl:
        if not (options.ssl_cert and options.ssl_key):
            print >> sys.stderr, "Need key and certificate to use ssl"
            sys.exit(constants.EXIT_FAILURE)
        for fname in (options.ssl_cert, options.ssl_key):
            if not os.path.isfile(fname):
                print >> sys.stderr, "Need ssl file %s to run" % fname
                sys.exit(constants.EXIT_FAILURE)

    if check_fn is not None:
        check_fn(options, args)

    utils.EnsureDirs(dirs)

    if options.fork:
        utils.CloseFDs()
        utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name])

    utils.WritePidFile(daemon_name)
    try:
        utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
                           debug=options.debug,
                           stderr_logging=not options.fork,
                           multithreaded=multithread)
        # Let the logging module do the formatting
        logging.info("%s daemon startup", daemon_name)
        exec_fn(options, args)
    finally:
        # The pid file must not outlive the daemon, even on errors
        utils.RemovePidFile(daemon_name)