4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module with helper classes and functions for daemons"""
35 from ganeti import utils
36 from ganeti import constants
37 from ganeti import errors
class SchedulerBreakout(Exception):
  """Exception used to get out of the scheduler loop

  Raised by AsyncoreDelayFunction after each asyncore iteration so that
  scheduler.run() returns control to the main loop (e.g. to check signals).

  """
def AsyncoreDelayFunction(timeout):
  """Asyncore-compatible scheduler delay function.

  This is a delay function for sched that, rather than actually sleeping,
  executes asyncore events happening in the meantime.

  After an event has occurred, rather than returning, it raises a
  SchedulerBreakout exception, which will force the current scheduler.run()
  invocation to terminate, so that we can also check for signals. The main loop
  will then call the scheduler run again, which will allow it to actually
  process any due events.

  This is needed because scheduler.run() doesn't support a count=..., as
  asyncore loop, and the scheduler module documents throwing exceptions from
  inside the delay function as an allowed usage model.

  @type timeout: float
  @param timeout: maximum time to wait for asyncore events, in seconds
  @raise SchedulerBreakout: always, after one asyncore loop iteration

  """
  # Process at most one round of asyncore events instead of sleeping, then
  # break out of scheduler.run() so the caller's main loop regains control.
  asyncore.loop(timeout=timeout, count=1, use_poll=True)
  raise SchedulerBreakout()
class AsyncoreScheduler(sched.scheduler):
  """Event scheduler integrated with asyncore

  A sched.scheduler whose delay function runs asyncore events instead of
  sleeping (see AsyncoreDelayFunction).

  """
  def __init__(self, timefunc):
    """Initializes this class.

    @param timefunc: function returning the current time (e.g. time.time)

    """
    sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
  """Base Ganeti Asyncore Dispatcher

  Overrides error handling (log and continue rather than abort) and
  writability (not writable by default) for all Ganeti dispatchers.

  """
  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    """Most of the time we don't want to check for writability.

    Subclasses that queue outgoing data override this.

    """
    return False
def FormatAddress(family, address):
  """Format a client's address

  @type family: integer
  @param family: socket family (one of socket.AF_*)
  @type address: family specific (usually tuple)
  @param address: address, as reported by this class
  @rtype: string
  @return: human-readable representation of the address

  """
  if family == socket.AF_INET and len(address) == 2:
    return "%s:%d" % address
  elif family == socket.AF_UNIX and len(address) == 3:
    return "pid=%s, uid=%s, gid=%s" % address
  else:
    # Fall back to a generic representation for unhandled families or
    # unexpected address shapes, rather than silently returning None
    return str(address)
class AsyncStreamServer(GanetiBaseAsyncoreDispatcher):
  """A stream server to use with asyncore.

  Each request is accepted, and then dispatched to a separate asyncore
  dispatcher to handle.

  """

  # backlog passed to listen()
  _REQUEST_QUEUE_SIZE = 5

  def __init__(self, family, address):
    """Constructor for AsyncStreamServer

    @type family: integer
    @param family: socket family (one of socket.AF_*)
    @type address: address family dependent
    @param address: address to bind the socket to

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    # handle_accept needs the family to decide how to interpret the peer
    # address, so remember it
    self.family = family
    self.create_socket(self.family, socket.SOCK_STREAM)
    self.set_reuse_addr()
    self.bind(address)
    self.listen(self._REQUEST_QUEUE_SIZE)

  # this method is overriding an asyncore.dispatcher method
  def handle_accept(self):
    """Accept a new client connection.

    Creates a new instance of the handler class, which will use asyncore to
    serve the client.

    """
    accept_result = utils.IgnoreSignals(self.accept)
    if accept_result is not None:
      connected_socket, client_address = accept_result
      if self.family == socket.AF_UNIX:
        # override the client address, as for unix sockets nothing meaningful
        # is passed in from accept anyway
        client_address = utils.GetSocketCredentials(connected_socket)
      logging.info("Accepted connection from %s",
                   FormatAddress(self.family, client_address))
      self.handle_connection(connected_socket, client_address)

  def handle_connection(self, connected_socket, client_address):
    """Handle an already accepted connection.

    Must be implemented by subclasses.

    """
    raise NotImplementedError
class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
  """An improved asyncore udp socket.

  Queues outgoing datagrams and sends them only when the socket is writable,
  and dispatches incoming datagrams to L{handle_datagram}.

  """
  def __init__(self):
    """Constructor for AsyncUDPSocket

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    # queue of (ip, port, payload) tuples waiting to be sent
    self._out_queue = []
    self.create_socket(socket.AF_INET, socket.SOCK_DGRAM)

  # this method is overriding an asyncore.dispatcher method
  def handle_connect(self):
    # Python thinks that the first udp message from a source qualifies as a
    # "connect" and further ones are part of the same connection. We beg to
    # differ and treat all messages equally.
    pass

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    recv_result = utils.IgnoreSignals(self.recvfrom,
                                      constants.MAX_UDP_DATA_SIZE)
    if recv_result is not None:
      payload, address = recv_result
      ip, port = address
      self.handle_datagram(payload, ip, port)

  def handle_datagram(self, payload, ip, port):
    """Handle an already read udp datagram

    Must be implemented by subclasses.

    """
    raise NotImplementedError

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # We should check whether we can write to the socket only if we have
    # something scheduled to be written
    return bool(self._out_queue)

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    if not self._out_queue:
      logging.error("handle_write called with empty output queue")
      # nothing to send; must not fall through and index the empty queue
      return
    (ip, port, payload) = self._out_queue[0]
    utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
    # only dequeue after the send was attempted
    self._out_queue.pop(0)

  def enqueue_send(self, ip, port, payload):
    """Enqueue a datagram to be sent when possible

    @raise errors.UdpDataSizeError: if the payload is too big

    """
    if len(payload) > constants.MAX_UDP_DATA_SIZE:
      raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
                                    constants.MAX_UDP_DATA_SIZE))
    self._out_queue.append((ip, port, payload))

  def process_next_packet(self, timeout=0):
    """Process the next datagram, waiting for it if necessary.

    @type timeout: float
    @param timeout: how long to wait for data
    @rtype: boolean
    @return: True if some data has been handled, False otherwise

    """
    result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
    if result is not None and result & select.POLLIN:
      self.handle_read()
      return True
    else:
      return False
class Mainloop(object):
  """Generic mainloop for daemons

  @ivar scheduler: A sched.scheduler object, which can be used to register
    timed events

  """
  def __init__(self):
    """Constructs a new Mainloop instance.

    """
    # receivers registered via RegisterSignal, notified on each signal
    self._signal_wait = []
    self.scheduler = AsyncoreScheduler(time.time)

  @utils.SignalHandled([signal.SIGCHLD])
  @utils.SignalHandled([signal.SIGTERM])
  @utils.SignalHandled([signal.SIGINT])
  def Run(self, signal_handlers=None):
    """Runs the mainloop.

    Alternates between running scheduled events and asyncore iterations,
    checking after each pass whether a signal was delivered; terminates on
    SIGTERM/SIGINT.

    @type signal_handlers: dict
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator

    """
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"
    running = True
    # Start actual main loop
    while running:
      if not self.scheduler.empty():
        try:
          self.scheduler.run()
        except SchedulerBreakout:
          # raised by the delay function after each asyncore pass, so that we
          # get here regularly to check for signals
          pass
      else:
        asyncore.loop(count=1, use_poll=True)

      # Check whether a signal was raised
      for sig in signal_handlers:
        handler = signal_handlers[sig]
        if handler.called:
          self._CallSignalWaiters(sig)
          # terminate on SIGTERM/SIGINT, keep running otherwise
          running = sig not in (signal.SIGTERM, signal.SIGINT)
          # reset the flag so the same signal is not handled twice
          handler.Clear()

  def _CallSignalWaiters(self, signum):
    """Calls all signal waiters for a certain signal.

    @type signum: int
    @param signum: Signal number

    """
    for owner in self._signal_wait:
      owner.OnSignal(signum)

  def RegisterSignal(self, owner):
    """Registers a receiver for signal notifications

    The receiver must support a "OnSignal(self, signum)" function.

    @type owner: instance
    @param owner: Receiver

    """
    self._signal_wait.append(owner)
def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn,
                multithreaded=False, console_logging=False,
                default_ssl_cert=None, default_ssl_key=None):
  """Shared main function for daemons.

  @type daemon_name: string
  @param daemon_name: daemon name
  @type optionparser: optparse.OptionParser
  @param optionparser: initialized optionparser with daemon-specific options
      (common -f -d options will be handled by this module)
  @type dirs: list of (string, integer)
  @param dirs: list of directories that must be created if they don't exist,
      and the permissions to be used to create them
  @type check_fn: function which accepts (options, args)
  @param check_fn: function that checks start conditions and exits if they're
      not met
  @type exec_fn: function which accepts (options, args)
  @param exec_fn: function that's executed with the daemon's pid file held, and
      runs the daemon itself.
  @type multithreaded: bool
  @param multithreaded: Whether the daemon uses threads
  @type console_logging: boolean
  @param console_logging: if True, the daemon will fall back to the system
      console if logging fails
  @type default_ssl_cert: string
  @param default_ssl_cert: Default SSL certificate path
  @type default_ssl_key: string
  @param default_ssl_key: Default SSL key path

  """
  optionparser.add_option("-f", "--foreground", dest="fork",
                          help="Don't detach from the current terminal",
                          default=True, action="store_false")
  optionparser.add_option("-d", "--debug", dest="debug",
                          help="Enable some debug messages",
                          default=False, action="store_true")
  optionparser.add_option("--syslog", dest="syslog",
                          help="Enable logging to syslog (except debug"
                          " messages); one of 'no', 'yes' or 'only' [%s]" %
                          constants.SYSLOG_USAGE,
                          default=constants.SYSLOG_USAGE,
                          choices=["no", "yes", "only"])

  if daemon_name in constants.DAEMONS_PORTS:
    default_bind_address = "0.0.0.0"
    default_port = utils.GetDaemonPort(daemon_name)

    # For networked daemons we allow choosing the port and bind address
    optionparser.add_option("-p", "--port", dest="port",
                            help="Network port (default: %s)" % default_port,
                            default=default_port, type="int")
    optionparser.add_option("-b", "--bind", dest="bind_address",
                            help=("Bind address (default: %s)" %
                                  default_bind_address),
                            default=default_bind_address, metavar="ADDRESS")

  if default_ssl_key is not None and default_ssl_cert is not None:
    optionparser.add_option("--no-ssl", dest="ssl",
                            help="Do not secure HTTP protocol with SSL",
                            default=True, action="store_false")
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
                            help=("SSL key path (default: %s)" %
                                  default_ssl_key),
                            default=default_ssl_key, type="string",
                            metavar="SSL_KEY_PATH")
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
                            help=("SSL certificate path (default: %s)" %
                                  default_ssl_cert),
                            default=default_ssl_cert, type="string",
                            metavar="SSL_CERT_PATH")

  # Disable the use of fork(2) if the daemon uses threads
  utils.no_fork = multithreaded

  options, args = optionparser.parse_args()

  if getattr(options, "ssl", False):
    ssl_paths = {
      "certificate": options.ssl_cert,
      "key": options.ssl_key,
      }
    for name, path in ssl_paths.iteritems():
      if not os.path.isfile(path):
        print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
        sys.exit(constants.EXIT_FAILURE)

    # TODO: By initiating http.HttpSslParams here we would only read the files
    # once and have a proper validation (isfile returns False on directories)
    # at the same time.

  if check_fn is not None:
    check_fn(options, args)

  utils.EnsureDirs(dirs)

  if options.fork:
    utils.CloseFDs()
    utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name])

  utils.WritePidFile(daemon_name)
  try:
    utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
                       debug=options.debug,
                       stderr_logging=not options.fork,
                       multithreaded=multithreaded,
                       program=daemon_name,
                       syslog=options.syslog,
                       console_logging=console_logging)
    logging.info("%s daemon startup", daemon_name)
    exec_fn(options, args)
  finally:
    # remove the pid file even if exec_fn raised, so a stale pid file does
    # not prevent the daemon from being restarted
    utils.RemovePidFile(daemon_name)