4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Module with helper classes and functions for daemons"""
import asyncore
import errno
import logging
import os
import sched
import signal
import socket
import sys
import time

from ganeti import constants
from ganeti import errors
from ganeti import utils
class SchedulerBreakout(Exception):
  """Exception used to get out of the scheduler loop

  """
def AsyncoreDelayFunction(timeout):
  """Asyncore-compatible scheduler delay function.

  This is a delay function for sched that, rather than actually sleeping,
  executes asyncore events happening in the meantime.

  After an event has occurred, rather than returning, it raises a
  SchedulerBreakout exception, which will force the current scheduler.run()
  invocation to terminate, so that we can also check for signals. The main loop
  will then call the scheduler run again, which will allow it to actually
  process any due events.

  This is needed because scheduler.run() doesn't support a count=..., as
  asyncore loop, and the scheduler module documents throwing exceptions from
  inside the delay function as an allowed usage model.

  @type timeout: float
  @param timeout: maximum time to wait for asyncore events, in seconds

  @raise SchedulerBreakout: always, after one asyncore loop iteration

  """
  asyncore.loop(timeout=timeout, count=1, use_poll=True)
  raise SchedulerBreakout()
class AsyncoreScheduler(sched.scheduler):
  """Event scheduler integrated with asyncore

  """
  def __init__(self, timefunc):
    """Initializes this class.

    @param timefunc: function returning the current time (e.g. time.time);
      passed through to L{sched.scheduler}, with AsyncoreDelayFunction used
      as the delay function so that asyncore events are processed while
      waiting for scheduled events

    """
    sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
76 class AsyncUDPSocket(asyncore.dispatcher):
77 """An improved asyncore udp socket.
81 """Constructor for AsyncUDPSocket
84 asyncore.dispatcher.__init__(self)
86 self.create_socket(socket.AF_INET, socket.SOCK_DGRAM)
88 # this method is overriding an asyncore.dispatcher method
89 def handle_connect(self):
90 # Python thinks that the first udp message from a source qualifies as a
91 # "connect" and further ones are part of the same connection. We beg to
92 # differ and treat all messages equally.
95 # this method is overriding an asyncore.dispatcher method
96 def handle_read(self):
99 payload, address = self.recvfrom(constants.MAX_UDP_DATA_SIZE)
100 except socket.error, err:
101 if err.errno == errno.EINTR:
102 # we got a signal while trying to read. no need to do anything,
103 # handle_read will be called again if there is data on the socket.
108 self.handle_datagram(payload, ip, port)
110 # we need to catch any exception here, log it, but proceed, because even
111 # if we failed handling a single request, we still want to continue.
112 logging.error("Unexpected exception", exc_info=True)
114 def handle_datagram(self, payload, ip, port):
115 """Handle an already read udp datagram
118 raise NotImplementedError
120 # this method is overriding an asyncore.dispatcher method
122 # We should check whether we can write to the socket only if we have
123 # something scheduled to be written
124 return bool(self._out_queue)
126 def handle_write(self):
128 if not self._out_queue:
129 logging.error("handle_write called with empty output queue")
131 (ip, port, payload) = self._out_queue[0]
133 self.sendto(payload, 0, (ip, port))
134 except socket.error, err:
135 if err.errno == errno.EINTR:
136 # we got a signal while trying to write. no need to do anything,
137 # handle_write will be called again because we haven't emptied the
138 # _out_queue, and we'll try again
142 self._out_queue.pop(0)
144 # we need to catch any exception here, log it, but proceed, because even
145 # if we failed sending a single datagram we still want to continue.
146 logging.error("Unexpected exception", exc_info=True)
148 def enqueue_send(self, ip, port, payload):
149 """Enqueue a datagram to be sent when possible
152 if len(payload) > constants.MAX_UDP_DATA_SIZE:
153 raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
154 constants.MAX_UDP_DATA_SIZE))
155 self._out_queue.append((ip, port, payload))
class Mainloop(object):
  """Generic mainloop for daemons

  """
  def __init__(self):
    """Constructs a new Mainloop instance.

    @ivar scheduler: A L{sched.scheduler} object, which can be used to register
      timed events

    """
    # NOTE(review): Run() reads self._io_wait to decide whether any I/O
    # waiters are still registered; initialized empty here — confirm the
    # intended container type against the (unseen) registration code.
    self._io_wait = []
    # Receivers registered via RegisterSignal, notified on every signal
    self._signal_wait = []
    self.scheduler = AsyncoreScheduler(time.time)

  @utils.SignalHandled([signal.SIGCHLD])
  @utils.SignalHandled([signal.SIGTERM])
  def Run(self, stop_on_empty=False, signal_handlers=None):
    """Runs the mainloop.

    @type stop_on_empty: bool
    @param stop_on_empty: Whether to stop mainloop once all I/O waiters
                          unregistered
    @type signal_handlers: dict
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator

    """
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"
    running = True
    # Start actual main loop
    while running:
      # Stop if nothing is listening anymore
      if stop_on_empty and not (self._io_wait):
        break

      # Run timed events if any are due; the delay function raises
      # SchedulerBreakout after processing asyncore events, so we get back
      # here regularly to check for signals.
      if not self.scheduler.empty():
        try:
          self.scheduler.run()
        except SchedulerBreakout:
          pass
      else:
        asyncore.loop(count=1, use_poll=True)

      # Check whether a signal was raised
      for sig in signal_handlers:
        handler = signal_handlers[sig]
        if handler.called:
          self._CallSignalWaiters(sig)
          # SIGTERM means shut down; any other handled signal keeps us going
          running = (sig != signal.SIGTERM)
          handler.Clear()

  def _CallSignalWaiters(self, signum):
    """Calls all signal waiters for a certain signal.

    @type signum: int
    @param signum: Signal number

    """
    for owner in self._signal_wait:
      owner.OnSignal(signum)

  def RegisterSignal(self, owner):
    """Registers a receiver for signal notifications

    The receiver must support a "OnSignal(self, signum)" function.

    @type owner: instance
    @param owner: Receiver

    """
    self._signal_wait.append(owner)
def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn):
  """Shared main function for daemons.

  @type daemon_name: string
  @param daemon_name: daemon name
  @type optionparser: L{optparse.OptionParser}
  @param optionparser: initialized optionparser with daemon-specific options
                       (common -f -d options will be handled by this module)
  @type dirs: list of strings
  @param dirs: list of directories that must exist for this daemon to work
  @type check_fn: function which accepts (options, args)
  @param check_fn: function that checks start conditions and exits if they're
                   not met
  @type exec_fn: function which accepts (options, args)
  @param exec_fn: function that's executed with the daemon's pid file held, and
                  runs the daemon itself.

  """
  optionparser.add_option("-f", "--foreground", dest="fork",
                          help="Don't detach from the current terminal",
                          default=True, action="store_false")
  optionparser.add_option("-d", "--debug", dest="debug",
                          help="Enable some debug messages",
                          default=False, action="store_true")
  if daemon_name in constants.DAEMONS_PORTS:
    # for networked daemons we also allow choosing the bind port and address.
    # by default we use the port provided by utils.GetDaemonPort, and bind to
    # 0.0.0.0 (which is represented by an empty bind address).
    port = utils.GetDaemonPort(daemon_name)
    optionparser.add_option("-p", "--port", dest="port",
                            help="Network port (%s default)." % port,
                            default=port, type="int")
    optionparser.add_option("-b", "--bind", dest="bind_address",
                            help="Bind address",
                            default="", metavar="ADDRESS")

  if daemon_name in constants.DAEMONS_SSL:
    default_cert, default_key = constants.DAEMONS_SSL[daemon_name]
    optionparser.add_option("--no-ssl", dest="ssl",
                            help="Do not secure HTTP protocol with SSL",
                            default=True, action="store_false")
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
                            help="SSL key",
                            default=default_key, type="string")
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
                            help="SSL certificate",
                            default=default_cert, type="string")

  multithread = utils.no_fork = daemon_name in constants.MULTITHREADED_DAEMONS

  options, args = optionparser.parse_args()

  if hasattr(options, 'ssl') and options.ssl:
    if not (options.ssl_cert and options.ssl_key):
      print >> sys.stderr, "Need key and certificate to use ssl"
      sys.exit(constants.EXIT_FAILURE)
    for fname in (options.ssl_cert, options.ssl_key):
      if not os.path.isfile(fname):
        print >> sys.stderr, "Need ssl file %s to run" % fname
        sys.exit(constants.EXIT_FAILURE)

  if check_fn is not None:
    check_fn(options, args)

  utils.EnsureDirs(dirs)

  # only detach from the terminal when running in the background; otherwise
  # keep the current process so stderr logging (below) reaches the user
  if options.fork:
    utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name])

  utils.WritePidFile(daemon_name)
  try:
    utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
                       debug=options.debug,
                       stderr_logging=not options.fork,
                       multithreaded=multithread)
    logging.info("%s daemon startup", daemon_name)
    exec_fn(options, args)
  finally:
    # always drop the pid file, even if the daemon body raised
    utils.RemovePidFile(daemon_name)