4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module with helper classes and functions for daemons"""
38 from ganeti import utils
39 from ganeti import constants
40 from ganeti import errors
# Default credentials daemons run under when the GenericMain caller does not
# override the user/group arguments.
_DEFAULT_RUN_USER = "root"
_DEFAULT_RUN_GROUP = "root"
class SchedulerBreakout(Exception):
  """Exception used to get out of the scheduler loop

  Raised by the scheduler's delay function so that the current
  scheduler.run() invocation returns control to the caller (the main
  loop), e.g. to check for signals.

  """
def AsyncoreDelayFunction(timeout):
  """Asyncore-compatible scheduler delay function.

  This is a delay function for sched that, rather than actually sleeping,
  executes asyncore events happening in the meantime.

  After an event has occurred, rather than returning, it raises a
  SchedulerBreakout exception, which will force the current scheduler.run()
  invocation to terminate, so that we can also check for signals. The main loop
  will then call the scheduler run again, which will allow it to actually
  process any due events.

  This is needed because scheduler.run() doesn't support a count=..., as
  asyncore loop, and the scheduler module documents throwing exceptions from
  inside the delay function as an allowed usage model.

  @type timeout: float
  @param timeout: maximum time to spend in the single asyncore.loop() pass

  """
  # Run at most one asyncore event (count=1), then unconditionally break out
  # of scheduler.run() via the exception below.
  asyncore.loop(timeout=timeout, count=1, use_poll=True)
  raise SchedulerBreakout()
class AsyncoreScheduler(sched.scheduler):
  """Event scheduler integrated with asyncore

  """
  def __init__(self, timefunc):
    """Initializes this class.

    @param timefunc: function returning the current time (e.g. time.time),
      passed unchanged to sched.scheduler

    """
    # AsyncoreDelayFunction makes the scheduler's "sleep" dispatch asyncore
    # events instead of blocking.
    sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
  """Base Ganeti Asyncore Dispatcher

  Common base for this module's dispatchers; centralizes error logging.

  """
  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    A failure in one handler is logged (with traceback) and the loop
    continues, rather than letting the exception propagate.

    """
    logging.exception("Error while handling asyncore request")

  # this method is overriding an asyncore.dispatcher method
  # NOTE(review): the "def writable(self):" line is not visible in this
  # excerpt; the docstring below belongs to that method -- confirm against
  # the full source.
    """Most of the time we don't want to check for writability.

    """
def FormatAddress(family, address):
  """Format a client's address

  @type family: integer
  @param family: socket family (one of socket.AF_*)
  @type address: family specific (usually tuple)
  @param address: address, as reported by this class
  @rtype: string
  @return: human-readable form of the address

  """
  # AF_INET addresses are (ip, port) pairs
  if family == socket.AF_INET and len(address) == 2:
    return "%s:%d" % address
  # AF_UNIX peers are identified by a (pid, uid, gid) credentials triple
  elif family == socket.AF_UNIX and len(address) == 3:
    return "pid=%s, uid=%s, gid=%s" % address
  # NOTE(review): handling for other families (or mismatched tuple sizes) is
  # not visible in this excerpt -- confirm the fall-through behaviour against
  # the full source.
class AsyncStreamServer(GanetiBaseAsyncoreDispatcher):
  """A stream server to use with asyncore.

  Each request is accepted, and then dispatched to a separate asyncore
  dispatcher to handle.

  """

  # Backlog passed to listen(): maximum pending (not yet accepted) connections
  _REQUEST_QUEUE_SIZE = 5

  def __init__(self, family, address):
    """Constructor for AsyncUnixStreamSocket

    @type family: integer
    @param family: socket family (one of socket.AF_*)
    @type address: address family dependent
    @param address: address to bind the socket to

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    # NOTE(review): self.family is read here, but its assignment (and the
    # bind() call for the address parameter) is not visible in this
    # excerpt -- confirm against the full source.
    self.create_socket(self.family, socket.SOCK_STREAM)
    self.set_reuse_addr()
    self.listen(self._REQUEST_QUEUE_SIZE)

  # this method is overriding an asyncore.dispatcher method
  def handle_accept(self):
    """Accept a new client connection.

    Creates a new instance of the handler class, which will use asyncore to
    serve the client.

    """
    # accept(), with signal interruptions handled by utils.IgnoreSignals;
    # a None result means there is nothing to accept right now
    accept_result = utils.IgnoreSignals(self.accept)
    if accept_result is not None:
      connected_socket, client_address = accept_result
      if self.family == socket.AF_UNIX:
        # override the client address, as for unix sockets nothing meaningful
        # is passed in from accept anyway
        client_address = utils.GetSocketCredentials(connected_socket)
      logging.info("Accepted connection from %s",
                   FormatAddress(self.family, client_address))
      self.handle_connection(connected_socket, client_address)

  def handle_connection(self, connected_socket, client_address):
    """Handle an already accepted connection.

    Must be implemented by subclasses.

    """
    raise NotImplementedError
class AsyncTerminatedMessageStream(asynchat.async_chat):
  """A terminator separated message stream asyncore module.

  Handles a stream connection receiving messages terminated by a defined
  separator. For each complete message handle_message is called.

  """
  def __init__(self, connected_socket, peer_address, terminator, family):
    """AsyncTerminatedMessageStream constructor.

    @type connected_socket: socket.socket
    @param connected_socket: connected stream socket to receive messages from
    @param peer_address: family-specific peer address
    @type terminator: string
    @param terminator: terminator separating messages in the stream
    @type family: integer
    @param family: socket family

    """
    # python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by
    # using a positional argument rather than a keyword one.
    asynchat.async_chat.__init__(self, connected_socket)
    self.connected_socket = connected_socket
    # on python 2.4 there is no "family" attribute for the socket class
    # FIXME: when we move to python 2.5 or above remove the family parameter
    #self.family = self.connected_socket.family
    # NOTE(review): the assignments of self.family and self.ibuffer (the
    # incoming-data buffer used below) are not visible in this excerpt.
    self.peer_address = peer_address
    self.terminator = terminator
    self.set_terminator(terminator)
    # Sequence number assigned to the next complete message
    self.next_incoming_message = 0

  # this method is overriding an asynchat.async_chat method
  def collect_incoming_data(self, data):
    # Accumulate raw chunks until the terminator is seen
    self.ibuffer.append(data)

  # this method is overriding an asynchat.async_chat method
  def found_terminator(self):
    # A full message is ready: join the buffered chunks and hand it over
    # together with a monotonically increasing sequence number.
    message = "".join(self.ibuffer)
    # NOTE(review): self.ibuffer is not reset here in this excerpt; without a
    # reset every message would include the previous ones -- confirm against
    # the full source.
    message_id = self.next_incoming_message
    self.next_incoming_message += 1
    self.handle_message(message, message_id)

  def handle_message(self, message, message_id):
    """Handle a terminated message.

    @type message: string
    @param message: message to handle
    @type message_id: integer
    @param message_id: stream's message sequence number

    """
    # TODO: move this method to raise NotImplementedError
    # raise NotImplementedError

  # NOTE(review): the two logging statements below belong to a close()
  # method whose "def" line is not visible in this excerpt.
    logging.info("Closing connection from %s",
                 FormatAddress(self.family, self.peer_address))

  # this method is overriding an asyncore.dispatcher method
  def handle_expt(self):
    """Handle exceptional conditions on the socket.

    NOTE(review): the method body is not visible in this excerpt.

    """

  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")
class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
  """An improved asyncore udp socket.

  """
  # NOTE(review): the "def __init__(self):" line is not visible in this
  # excerpt; the docstring and statements below belong to the constructor.
    """Constructor for AsyncUDPSocket

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    # NOTE(review): the initialization of self._out_queue (used by
    # enqueue_send/handle_write below) is not visible in this excerpt.
    self.create_socket(socket.AF_INET, socket.SOCK_DGRAM)

  # this method is overriding an asyncore.dispatcher method
  def handle_connect(self):
    """Deliberately do nothing on "connect" events.

    """
    # Python thinks that the first udp message from a source qualifies as a
    # "connect" and further ones are part of the same connection. We beg to
    # differ and treat all messages equally.

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    # Receive one datagram, with signal interruptions handled by
    # utils.IgnoreSignals (None result: nothing was received)
    recv_result = utils.IgnoreSignals(self.recvfrom,
                                      constants.MAX_UDP_DATA_SIZE)
    if recv_result is not None:
      payload, address = recv_result
      # NOTE(review): the unpacking of address into (ip, port) is not
      # visible in this excerpt -- ip/port are undefined as shown.
      self.handle_datagram(payload, ip, port)

  def handle_datagram(self, payload, ip, port):
    """Handle an already read udp datagram

    Must be implemented by subclasses.

    @type payload: string
    @param payload: datagram content
    @type ip: string
    @param ip: source IP address
    @type port: integer
    @param port: source port

    """
    raise NotImplementedError

  # this method is overriding an asyncore.dispatcher method
  # NOTE(review): the "def writable(self):" line is not visible in this
  # excerpt; the statements below belong to it.
    # We should check whether we can write to the socket only if we have
    # something scheduled to be written
    return bool(self._out_queue)

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    if not self._out_queue:
      logging.error("handle_write called with empty output queue")
      # NOTE(review): an early "return" is expected here, otherwise the
      # indexing below would fail on an empty queue -- confirm against the
      # full source.
    # Send the oldest queued datagram and drop it from the queue (FIFO)
    (ip, port, payload) = self._out_queue[0]
    utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
    self._out_queue.pop(0)

  def enqueue_send(self, ip, port, payload):
    """Enqueue a datagram to be sent when possible

    @type ip: string
    @param ip: destination IP address
    @type port: integer
    @param port: destination port
    @type payload: string
    @param payload: datagram content, at most MAX_UDP_DATA_SIZE bytes
    @raise errors.UdpDataSizeError: if the payload is too big

    """
    if len(payload) > constants.MAX_UDP_DATA_SIZE:
      raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
                                    constants.MAX_UDP_DATA_SIZE))
    self._out_queue.append((ip, port, payload))

  def process_next_packet(self, timeout=0):
    """Process the next datagram, waiting for it if necessary.

    @type timeout: float
    @param timeout: how long to wait for data
    @rtype: boolean
    @return: True if some data has been handled, False otherwise

    """
    # Block (up to timeout) until the socket becomes readable
    result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
    if result is not None and result & select.POLLIN:
      # NOTE(review): the body of this branch (reading the packet and the
      # True/False return statements) is not visible in this excerpt.
class AsyncAwaker(GanetiBaseAsyncoreDispatcher):
  """A way to notify the asyncore loop that something is going on.

  If an asyncore daemon is multithreaded when a thread tries to push some data
  to a socket, the main loop handling asynchronous requests might be sleeping
  waiting on a select(). To avoid this it can create an instance of the
  AsyncAwaker, which other threads can use to wake it up.

  """
  def __init__(self, signal_fn=None):
    """Constructor for AsyncAwaker

    @type signal_fn: function
    @param signal_fn: function to call when awaken

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    assert signal_fn == None or callable(signal_fn)
    # Internal socketpair: out_socket is written to by other threads,
    # in_socket is watched by asyncore and drained in handle_read.
    # NOTE(review): the continuation line of this call (second argument and
    # closing parenthesis) is not visible in this excerpt.
    (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
    self.in_socket.setblocking(0)
    self.set_socket(self.in_socket)
    self.need_signal = True
    self.signal_fn = signal_fn
    self.connected = True

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    # Drain the wakeup bytes; their content is irrelevant.
    utils.IgnoreSignals(self.recv, 4096)
    # NOTE(review): an invocation of self.signal_fn (when set) is expected
    # between these two statements but is not visible in this excerpt.
    self.need_signal = True

  # this method is overriding an asyncore.dispatcher method
  # NOTE(review): the "def close(self):" line is not visible in this
  # excerpt; the statements below belong to it.
    asyncore.dispatcher.close(self)
    self.out_socket.close()

  # NOTE(review): the "def signal(self):" line is not visible in this
  # excerpt; the docstring and statements below belong to it.
    """Signal the asyncore main loop.

    Any data we send here will be ignored, but it will cause the select() call
    in the main loop to return.

    """
    # Yes, there is a race condition here. No, we don't care, at worst we're
    # sending more than one wakeup token, which doesn't harm at all.
    self.need_signal = False
    self.out_socket.send("\0")
class Mainloop(object):
  """Generic mainloop for daemons

  @ivar scheduler: A sched.scheduler object, which can be used to register
    timed events

  """
  # NOTE(review): the "def __init__(self):" line is not visible in this
  # excerpt; the docstring and statements below belong to the constructor.
    """Constructs a new Mainloop instance.

    """
    # Receivers registered via RegisterSignal, notified on each signal
    self._signal_wait = []
    # asyncore-integrated scheduler for timed events
    self.scheduler = AsyncoreScheduler(time.time)

  @utils.SignalHandled([signal.SIGCHLD])
  @utils.SignalHandled([signal.SIGTERM])
  @utils.SignalHandled([signal.SIGINT])
  def Run(self, signal_handlers=None):
    """Runs the mainloop.

    @type signal_handlers: dict
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator

    """
    # The decorators above must have installed handlers for all three signals
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"

    # Start actual main loop
    # NOTE(review): the loop scaffolding ("running = True", "while running:",
    # "try:"/"self.scheduler.run()"/"pass", "else:") is not visible in this
    # excerpt; the fragments below are kept as-is.
      if not self.scheduler.empty():
        except SchedulerBreakout:
      asyncore.loop(count=1, use_poll=True)

      # Check whether a signal was raised
      for sig in signal_handlers:
        handler = signal_handlers[sig]
          self._CallSignalWaiters(sig)
          # Keep looping unless a termination signal arrived
          running = sig not in (signal.SIGTERM, signal.SIGINT)

  def _CallSignalWaiters(self, signum):
    """Calls all signal waiters for a certain signal.

    @type signum: int
    @param signum: Signal number

    """
    for owner in self._signal_wait:
      owner.OnSignal(signum)

  def RegisterSignal(self, owner):
    """Registers a receiver for signal notifications

    The receiver must support a "OnSignal(self, signum)" function.

    @type owner: instance
    @param owner: Receiver

    """
    self._signal_wait.append(owner)
def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn,
                multithreaded=False, console_logging=False,
                default_ssl_cert=None, default_ssl_key=None,
                user=_DEFAULT_RUN_USER, group=_DEFAULT_RUN_GROUP):
  """Shared main function for daemons.

  @type daemon_name: string
  @param daemon_name: daemon name
  @type optionparser: optparse.OptionParser
  @param optionparser: initialized optionparser with daemon-specific options
    (common -f -d options will be handled by this module)
  @type dirs: list of (string, integer)
  @param dirs: list of directories that must be created if they don't exist,
    and the permissions to be used to create them
  @type check_fn: function which accepts (options, args)
  @param check_fn: function that checks start conditions and exits if they're
    not met
  @type exec_fn: function which accepts (options, args)
  @param exec_fn: function that's executed with the daemon's pid file held, and
    runs the daemon itself.
  @type multithreaded: bool
  @param multithreaded: Whether the daemon uses threads
  @type console_logging: boolean
  @param console_logging: if True, the daemon will fall back to the system
    console if logging fails
  @type default_ssl_cert: string
  @param default_ssl_cert: Default SSL certificate path
  @type default_ssl_key: string
  @param default_ssl_key: Default SSL key path
  @type user: string
  @param user: Default user to run as
  @type group: string
  @param group: Default group to run as

  """
  # Options common to all daemons
  optionparser.add_option("-f", "--foreground", dest="fork",
                          help="Don't detach from the current terminal",
                          default=True, action="store_false")
  optionparser.add_option("-d", "--debug", dest="debug",
                          help="Enable some debug messages",
                          default=False, action="store_true")
  optionparser.add_option("--syslog", dest="syslog",
                          help="Enable logging to syslog (except debug"
                          " messages); one of 'no', 'yes' or 'only' [%s]" %
                          constants.SYSLOG_USAGE,
                          default=constants.SYSLOG_USAGE,
                          choices=["no", "yes", "only"])

  if daemon_name in constants.DAEMONS_PORTS:
    default_bind_address = "0.0.0.0"
    default_port = utils.GetDaemonPort(daemon_name)

    # For networked daemons we allow choosing the port and bind address
    optionparser.add_option("-p", "--port", dest="port",
                            help="Network port (default: %s)" % default_port,
                            default=default_port, type="int")
    optionparser.add_option("-b", "--bind", dest="bind_address",
                            help=("Bind address (default: %s)" %
                                  default_bind_address),
                            default=default_bind_address, metavar="ADDRESS")

  # SSL options are only added when both defaults are provided
  if default_ssl_key is not None and default_ssl_cert is not None:
    optionparser.add_option("--no-ssl", dest="ssl",
                            help="Do not secure HTTP protocol with SSL",
                            default=True, action="store_false")
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
                            help=("SSL key path (default: %s)" %
                            # NOTE(review): the continuation line closing this
                            # help expression is not visible in this excerpt.
                            default=default_ssl_key, type="string",
                            metavar="SSL_KEY_PATH")
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
                            help=("SSL certificate path (default: %s)" %
                            # NOTE(review): the continuation line closing this
                            # help expression is not visible in this excerpt.
                            default=default_ssl_cert, type="string",
                            metavar="SSL_CERT_PATH")

  # Disable the use of fork(2) if the daemon uses threads
  utils.no_fork = multithreaded

  options, args = optionparser.parse_args()

  if getattr(options, "ssl", False):
      # NOTE(review): the "ssl_paths = {" opener and closing brace of this
      # dict literal are not visible in this excerpt.
      "certificate": options.ssl_cert,
      "key": options.ssl_key,
    # Fail fast if any configured SSL file is missing
    for name, path in ssl_paths.iteritems():
      if not os.path.isfile(path):
        print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
        sys.exit(constants.EXIT_FAILURE)

    # TODO: By initiating http.HttpSslParams here we would only read the files
    # once and have a proper validation (isfile returns False on directories)

  # Daemon-specific start-condition checks (may exit the process)
  if check_fn is not None:
    check_fn(options, args)

  utils.EnsureDirs(dirs)

  # Resolve the user/group names to numeric ids before daemonizing
  # NOTE(review): the "try:"/"except KeyError:" lines wrapping these lookups
  # are not visible in this excerpt; the indentation below reflects that.
    uid = pwd.getpwnam(user).pw_uid
    gid = grp.getgrnam(group).gr_gid
    raise errors.ConfigurationError("User or group not existing on system:"
                                    " %s:%s" % (user, group))

  # NOTE(review): the "if options.fork:" guard above this call is not
  # visible in this excerpt.
    utils.Daemonize(constants.DAEMONS_LOGFILES[daemon_name], uid, gid)

  utils.WritePidFile(daemon_name)

  utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
                     stderr_logging=not options.fork,
                     multithreaded=multithreaded,
                     syslog=options.syslog,
                     console_logging=console_logging)
  logging.info("%s daemon startup", daemon_name)
  # NOTE(review): a "try:"/"finally:" pair around exec_fn, ensuring the pid
  # file is removed on exit, is not visible in this excerpt.
    exec_fn(options, args)
    utils.RemovePidFile(daemon_name)