# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module with helper classes and functions for daemons"""
import asyncore
import errno
import logging
import os
import sched
import select
import signal
import socket
import sys
import time

from ganeti import utils
from ganeti import constants
from ganeti import errors
class SchedulerBreakout(Exception):
  """Exception used to get out of the scheduler loop

  Raised by the scheduler's delay function so that a single
  scheduler.run() invocation terminates instead of blocking.

  """
def AsyncoreDelayFunction(timeout):
  """Asyncore-compatible scheduler delay function.

  This is a delay function for sched that, rather than actually sleeping,
  executes asyncore events happening in the meantime.

  After an event has occurred, rather than returning, it raises a
  SchedulerBreakout exception, which will force the current scheduler.run()
  invocation to terminate, so that we can also check for signals. The main
  loop will then call the scheduler run again, which will allow it to
  actually process any due events.

  This is needed because scheduler.run() doesn't support a count=..., as
  asyncore loop, and the scheduler module documents throwing exceptions from
  inside the delay function as an allowed usage model.

  @type timeout: float
  @param timeout: maximum time to spend dispatching asyncore events

  """
  # Dispatch at most one round of asyncore events, then break out of the
  # scheduler so the caller can check for pending signals.
  asyncore.loop(timeout=timeout, count=1, use_poll=True)
  raise SchedulerBreakout()
class AsyncoreScheduler(sched.scheduler):
  """Event scheduler integrated with asyncore

  """
  def __init__(self, timefunc):
    """Initializes this class.

    @param timefunc: function returning the current time; passed through to
        sched.scheduler together with AsyncoreDelayFunction, so that waiting
        for a timed event dispatches asyncore events instead of sleeping

    """
    # NOTE: sched.scheduler is an old-style class in Python 2, so the base
    # constructor is called directly rather than via super()
    sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
class AsyncUDPSocket(asyncore.dispatcher):
  """An improved asyncore udp socket.

  Outgoing datagrams are queued via enqueue_send() and flushed by
  handle_write() when asyncore reports the socket writable; incoming
  datagrams are dispatched to the handle_datagram() hook, which
  subclasses must implement.

  """
  def __init__(self):
    """Constructor for AsyncUDPSocket

    """
    asyncore.dispatcher.__init__(self)
    # Queue of (ip, port, payload) tuples waiting to be sent
    self._out_queue = []
    self.create_socket(socket.AF_INET, socket.SOCK_DGRAM)

  # this method is overriding an asyncore.dispatcher method
  def handle_connect(self):
    # Python thinks that the first udp message from a source qualifies as a
    # "connect" and further ones are part of the same connection. We beg to
    # differ and treat all messages equally.
    pass

  def do_read(self):
    """Reads one datagram from the socket and hands it to handle_datagram().

    """
    try:
      payload, address = self.recvfrom(constants.MAX_UDP_DATA_SIZE)
    # "except ... as" is valid on Python 2.6+ and 3.x, unlike "except X, e"
    except socket.error as err:
      if err.errno == errno.EINTR:
        # we got a signal while trying to read. no need to do anything,
        # handle_read will be called again if there is data on the socket.
        return
      else:
        raise
    ip, port = address
    self.handle_datagram(payload, ip, port)

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    try:
      self.do_read()
    except: # pylint: disable-msg=W0702
      # we need to catch any exception here, log it, but proceed, because even
      # if we failed handling a single request, we still want to continue.
      logging.error("Unexpected exception", exc_info=True)

  def handle_datagram(self, payload, ip, port):
    """Handle an already read udp datagram

    Must be overridden by subclasses.

    """
    raise NotImplementedError

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # We should check whether we can write to the socket only if we have
    # something scheduled to be written
    return bool(self._out_queue)

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    try:
      if not self._out_queue:
        logging.error("handle_write called with empty output queue")
        return
      (ip, port, payload) = self._out_queue[0]
      try:
        self.sendto(payload, 0, (ip, port))
      except socket.error as err:
        if err.errno == errno.EINTR:
          # we got a signal while trying to write. no need to do anything,
          # handle_write will be called again because we haven't emptied the
          # _out_queue, and we'll try again
          return
        else:
          raise
      # Only dequeue after a successful send, so interrupted datagrams are
      # retried rather than dropped
      self._out_queue.pop(0)
    except: # pylint: disable-msg=W0702
      # we need to catch any exception here, log it, but proceed, because even
      # if we failed sending a single datagram we still want to continue.
      logging.error("Unexpected exception", exc_info=True)

  def enqueue_send(self, ip, port, payload):
    """Enqueue a datagram to be sent when possible

    @raise errors.UdpDataSizeError: if the payload is bigger than the
        maximum datagram size

    """
    if len(payload) > constants.MAX_UDP_DATA_SIZE:
      raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
                                    constants.MAX_UDP_DATA_SIZE))
    self._out_queue.append((ip, port, payload))

  def process_next_packet(self, timeout=0):
    """Process the next datagram, waiting for it if necessary.

    @type timeout: float
    @param timeout: how long to wait for data

    @rtype: boolean
    @return: True if some data has been handled, False otherwise

    """
    result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
    if result is not None and result & select.POLLIN:
      self.do_read()
      return True
    else:
      return False
class Mainloop(object):
  """Generic mainloop for daemons

  @ivar scheduler: A sched.scheduler object, which can be used to register
    timed events

  """
  def __init__(self):
    """Constructs a new Mainloop instance.

    """
    # Objects to notify via OnSignal() when a signal arrives
    self._signal_wait = []
    self.scheduler = AsyncoreScheduler(time.time)

  @utils.SignalHandled([signal.SIGCHLD])
  @utils.SignalHandled([signal.SIGTERM])
  def Run(self, signal_handlers=None):
    """Runs the mainloop.

    @type signal_handlers: dict
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator

    """
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"

    running = True
    # Start actual main loop
    while running:
      if not self.scheduler.empty():
        # Timed events pending: run the scheduler; the delay function
        # dispatches asyncore events and breaks out via SchedulerBreakout
        try:
          self.scheduler.run()
        except SchedulerBreakout:
          pass
      else:
        asyncore.loop(count=1, use_poll=True)

      # Check whether a signal was raised
      for sig in signal_handlers:
        handler = signal_handlers[sig]
        if handler.called:
          self._CallSignalWaiters(sig)
          # SIGTERM ends the loop; any other handled signal keeps running
          running = (sig != signal.SIGTERM)
          handler.Clear()

  def _CallSignalWaiters(self, signum):
    """Calls all signal waiters for a certain signal.

    @type signum: int
    @param signum: Signal number

    """
    for owner in self._signal_wait:
      owner.OnSignal(signum)

  def RegisterSignal(self, owner):
    """Registers a receiver for signal notifications

    The receiver must support a "OnSignal(self, signum)" function.

    @type owner: instance
    @param owner: Receiver

    """
    self._signal_wait.append(owner)
def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn,
                multithreaded,
                default_ssl_cert=None, default_ssl_key=None):
  """Shared main function for daemons.

  @type daemon_name: string
  @param daemon_name: daemon name
  @type optionparser: optparse.OptionParser
  @param optionparser: initialized optionparser with daemon-specific options
                       (common -f -d options will be handled by this module)
  @type dirs: list of strings
  @param dirs: list of directories that must exist for this daemon to work
  @type check_fn: function which accepts (options, args)
  @param check_fn: function that checks start conditions and exits if they're
                   not met
  @type exec_fn: function which accepts (options, args)
  @param exec_fn: function that's executed with the daemon's pid file held, and
                  runs the daemon itself.
  @type multithreaded: bool
  @param multithreaded: Whether the daemon uses threads
  @type default_ssl_cert: string
  @param default_ssl_cert: Default SSL certificate path
  @type default_ssl_key: string
  @param default_ssl_key: Default SSL key path

  """
  optionparser.add_option("-f", "--foreground", dest="fork",
                          help="Don't detach from the current terminal",
                          default=True, action="store_false")
  optionparser.add_option("-d", "--debug", dest="debug",
                          help="Enable some debug messages",
                          default=False, action="store_true")
  optionparser.add_option("--syslog", dest="syslog",
                          help="Enable logging to syslog (except debug"
                          " messages); one of 'no', 'yes' or 'only' [%s]" %
                          constants.SYSLOG_USAGE,
                          default=constants.SYSLOG_USAGE,
                          choices=["no", "yes", "only"])

  if daemon_name in constants.DAEMONS_PORTS:
    default_bind_address = "0.0.0.0"
    default_port = utils.GetDaemonPort(daemon_name)

    # For networked daemons we allow choosing the port and bind address
    optionparser.add_option("-p", "--port", dest="port",
                            help="Network port (default: %s)" % default_port,
                            default=default_port, type="int")
    optionparser.add_option("-b", "--bind", dest="bind_address",
                            help=("Bind address (default: %s)" %
                                  default_bind_address),
                            default=default_bind_address, metavar="ADDRESS")

  if default_ssl_key is not None and default_ssl_cert is not None:
    optionparser.add_option("--no-ssl", dest="ssl",
                            help="Do not secure HTTP protocol with SSL",
                            default=True, action="store_false")
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
                            help=("SSL key path (default: %s)" %
                                  default_ssl_key),
                            default=default_ssl_key, type="string",
                            metavar="SSL_KEY_PATH")
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
                            help=("SSL certificate path (default: %s)" %
                                  default_ssl_cert),
                            default=default_ssl_cert, type="string",
                            metavar="SSL_CERT_PATH")

  # Disable the use of fork(2) if the daemon uses threads
  utils.no_fork = multithreaded

  options, args = optionparser.parse_args()

  if getattr(options, "ssl", False):
    ssl_paths = {
      "certificate": options.ssl_cert,
      "key": options.ssl_key,
      }

    # items() instead of the Python-2-only iteritems(); identical behavior
    for name, path in ssl_paths.items():
      if not os.path.isfile(path):
        # sys.stderr.write is portable across Python 2 and 3, unlike
        # the "print >>" statement; output is unchanged
        sys.stderr.write("SSL %s file '%s' was not found\n" % (name, path))
        sys.exit(constants.EXIT_FAILURE)

    # TODO: By initiating http.HttpSslParams here we would only read the files
    # once and have a proper validation (isfile returns False on directories)
    # at the same time.

  if check_fn is not None:
    check_fn(options, args)

  utils.EnsureDirs(dirs)

  if options.fork:
    utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name])

  utils.WritePidFile(daemon_name)
  try:
    # NOTE(review): exact SetupLogging keyword set assumed from visible
    # fragments (debug/program elided in this chunk) -- confirm signature
    utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
                       debug=options.debug,
                       stderr_logging=not options.fork,
                       multithreaded=multithreaded,
                       program=daemon_name,
                       syslog=options.syslog)
    logging.info("%s daemon startup", daemon_name)
    exec_fn(options, args)
  finally:
    # Always remove the pid file, even if exec_fn raises, so a crashed
    # daemon does not leave a stale pid file behind
    utils.RemovePidFile(daemon_name)