4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module with helper classes and functions for daemons"""
import asyncore
import errno
import logging
import os
import sched
import signal
import socket
import sys
import time

from ganeti import utils
from ganeti import constants
from ganeti import errors
class SchedulerBreakout(Exception):
  """Exception used to get out of the scheduler loop.

  Raised by L{AsyncoreDelayFunction} after one asyncore event-processing
  pass, so that a C{scheduler.run()} invocation returns control to the
  caller (allowing signal checks) instead of blocking until all events
  are processed.

  """
def AsyncoreDelayFunction(timeout):
  """Asyncore-compatible scheduler delay function.

  This is a delay function for sched that, rather than actually sleeping,
  executes asyncore events happening in the meantime.

  After an event has occurred, rather than returning, it raises a
  SchedulerBreakout exception, which will force the current scheduler.run()
  invocation to terminate, so that we can also check for signals. The main
  loop will then call the scheduler run again, which will allow it to
  actually process any due events.

  This is needed because scheduler.run() doesn't support a count=..., as
  asyncore loop, and the scheduler module documents throwing exceptions from
  inside the delay function as an allowed usage model.

  @type timeout: float
  @param timeout: maximum time to spend processing asyncore events

  """
  # Run at most one asyncore event-processing pass, then break out of the
  # scheduler so the main loop can check for pending signals.
  asyncore.loop(timeout=timeout, count=1, use_poll=True)
  raise SchedulerBreakout()
class AsyncoreScheduler(sched.scheduler):
  """Event scheduler integrated with asyncore.

  A standard C{sched.scheduler} whose delay function processes asyncore
  events (via L{AsyncoreDelayFunction}) instead of sleeping, so timed
  events and socket events can share one main loop.

  """
  def __init__(self, timefunc):
    """Initializes this class.

    @param timefunc: function returning the current time (e.g. C{time.time})

    """
    sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
class AsyncUDPSocket(asyncore.dispatcher):
  """An improved asyncore udp socket.

  Queues outgoing datagrams and sends them only when the socket is
  writable; subclasses implement L{handle_datagram} to process incoming
  datagrams.

  """
  def __init__(self):
    """Constructor for AsyncUDPSocket

    """
    asyncore.dispatcher.__init__(self)
    # Queue of (ip, port, payload) tuples waiting to be sent
    self._out_queue = []
    self.create_socket(socket.AF_INET, socket.SOCK_DGRAM)

  # this method is overriding an asyncore.dispatcher method
  def handle_connect(self):
    # Python thinks that the first udp message from a source qualifies as a
    # "connect" and further ones are part of the same connection. We beg to
    # differ and treat all messages equally.
    pass

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    try:
      try:
        payload, address = self.recvfrom(constants.MAX_UDP_DATA_SIZE)
      except socket.error as err:
        if err.errno == errno.EINTR:
          # we got a signal while trying to read. no need to do anything,
          # handle_read will be called again if there is data on the socket.
          return
        else:
          raise
      ip, port = address
      self.handle_datagram(payload, ip, port)
    except Exception: # pylint: disable-msg=W0703
      # we need to catch any exception here, log it, but proceed, because even
      # if we failed handling a single request, we still want to continue.
      logging.error("Unexpected exception", exc_info=True)

  def handle_datagram(self, payload, ip, port):
    """Handle an already read udp datagram

    Must be overridden by subclasses.

    @param payload: datagram contents
    @param ip: sender IP address
    @param port: sender port

    """
    raise NotImplementedError

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # We should check whether we can write to the socket only if we have
    # something scheduled to be written
    return bool(self._out_queue)

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    try:
      if not self._out_queue:
        logging.error("handle_write called with empty output queue")
        return
      (ip, port, payload) = self._out_queue[0]
      try:
        self.sendto(payload, 0, (ip, port))
      except socket.error as err:
        if err.errno == errno.EINTR:
          # we got a signal while trying to write. no need to do anything,
          # handle_write will be called again because we haven't emptied the
          # _out_queue, and we'll try again
          return
        else:
          raise
      # Datagram sent successfully; drop it from the queue
      self._out_queue.pop(0)
    except Exception: # pylint: disable-msg=W0703
      # we need to catch any exception here, log it, but proceed, because even
      # if we failed sending a single datagram we still want to continue.
      logging.error("Unexpected exception", exc_info=True)

  def enqueue_send(self, ip, port, payload):
    """Enqueue a datagram to be sent when possible

    @param ip: destination IP address
    @param port: destination port
    @param payload: datagram contents
    @raise errors.UdpDataSizeError: if the payload exceeds the maximum
        UDP datagram size

    """
    if len(payload) > constants.MAX_UDP_DATA_SIZE:
      raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
                                    constants.MAX_UDP_DATA_SIZE))
    self._out_queue.append((ip, port, payload))
class Mainloop(object):
  """Generic mainloop for daemons

  @ivar scheduler: A sched.scheduler object, which can be used to register
    timed events

  """
  def __init__(self):
    """Constructs a new Mainloop instance.

    """
    # Receivers to be notified when a handled signal arrives
    self._signal_wait = []
    self.scheduler = AsyncoreScheduler(time.time)

  @utils.SignalHandled([signal.SIGCHLD])
  @utils.SignalHandled([signal.SIGTERM])
  def Run(self, signal_handlers=None):
    """Runs the mainloop.

    Processes scheduled events and asyncore I/O until SIGTERM is
    received.

    @type signal_handlers: dict
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator

    """
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"
    running = True
    # Start actual main loop
    while running:
      if not self.scheduler.empty():
        try:
          self.scheduler.run()
        except SchedulerBreakout:
          # Expected: the delay function breaks out after each asyncore
          # pass so we can check for signals below
          pass
      else:
        asyncore.loop(count=1, use_poll=True)

      # Check whether a signal was raised
      for sig in signal_handlers:
        handler = signal_handlers[sig]
        if handler.called:
          self._CallSignalWaiters(sig)
          # SIGTERM terminates the main loop
          running = (sig != signal.SIGTERM)
          handler.Clear()

  def _CallSignalWaiters(self, signum):
    """Calls all signal waiters for a certain signal.

    @type signum: int
    @param signum: Signal number

    """
    for owner in self._signal_wait:
      owner.OnSignal(signum)

  def RegisterSignal(self, owner):
    """Registers a receiver for signal notifications

    The receiver must support a "OnSignal(self, signum)" function.

    @type owner: instance
    @param owner: Receiver

    """
    self._signal_wait.append(owner)
def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn,
                multithreaded=False,
                default_ssl_cert=None, default_ssl_key=None):
  """Shared main function for daemons.

  @type daemon_name: string
  @param daemon_name: daemon name
  @type optionparser: optparse.OptionParser
  @param optionparser: initialized optionparser with daemon-specific options
      (common -f -d options will be handled by this module)
  @type dirs: list of strings
  @param dirs: list of directories that must exist for this daemon to work
  @type check_fn: function which accepts (options, args)
  @param check_fn: function that checks start conditions and exits if they're
      not met
  @type exec_fn: function which accepts (options, args)
  @param exec_fn: function that's executed with the daemon's pid file held,
      and runs the daemon itself.
  @type multithreaded: bool
  @param multithreaded: Whether the daemon uses threads
  @type default_ssl_cert: string
  @param default_ssl_cert: Default SSL certificate path
  @type default_ssl_key: string
  @param default_ssl_key: Default SSL key path

  """
  optionparser.add_option("-f", "--foreground", dest="fork",
                          help="Don't detach from the current terminal",
                          default=True, action="store_false")
  optionparser.add_option("-d", "--debug", dest="debug",
                          help="Enable some debug messages",
                          default=False, action="store_true")

  if daemon_name in constants.DAEMONS_PORTS:
    default_bind_address = "0.0.0.0"
    default_port = utils.GetDaemonPort(daemon_name)

    # For networked daemons we allow choosing the port and bind address
    optionparser.add_option("-p", "--port", dest="port",
                            help="Network port (default: %s)" % default_port,
                            default=default_port, type="int")
    optionparser.add_option("-b", "--bind", dest="bind_address",
                            help=("Bind address (default: %s)" %
                                  default_bind_address),
                            default=default_bind_address, metavar="ADDRESS")

  if default_ssl_key is not None and default_ssl_cert is not None:
    optionparser.add_option("--no-ssl", dest="ssl",
                            help="Do not secure HTTP protocol with SSL",
                            default=True, action="store_false")
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
                            help=("SSL key path (default: %s)" %
                                  default_ssl_key),
                            default=default_ssl_key, type="string",
                            metavar="SSL_KEY_PATH")
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
                            help=("SSL certificate path (default: %s)" %
                                  default_ssl_cert),
                            default=default_ssl_cert, type="string",
                            metavar="SSL_CERT_PATH")

  # Disable the use of fork(2) if the daemon uses threads
  utils.no_fork = multithreaded

  options, args = optionparser.parse_args()

  if getattr(options, "ssl", False):
    ssl_paths = {
      "certificate": options.ssl_cert,
      "key": options.ssl_key,
      }

    for name, path in ssl_paths.items():
      if not os.path.isfile(path):
        sys.stderr.write("SSL %s file '%s' was not found\n" % (name, path))
        sys.exit(constants.EXIT_FAILURE)

    # TODO: By initiating http.HttpSslParams here we would only read the files
    # once and have a proper validation (isfile returns False on directories)
    # at the same time

  if check_fn is not None:
    check_fn(options, args)

  utils.EnsureDirs(dirs)

  if options.fork:
    utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name])

  utils.WritePidFile(daemon_name)
  try:
    # Ensure the pid file is removed even if the daemon function raises
    utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
                       debug=options.debug,
                       stderr_logging=not options.fork,
                       multithreaded=multithreaded)
    logging.info("%s daemon startup", daemon_name)
    exec_fn(options, args)
  finally:
    utils.RemovePidFile(daemon_name)