4 # Copyright (C) 2006, 2007, 2008, 2010, 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module with helper classes and functions for daemons"""
37 from ganeti import utils
38 from ganeti import constants
39 from ganeti import errors
40 from ganeti import netutils
41 from ganeti import ssconf
42 from ganeti import runtime
43 from ganeti import compat
class SchedulerBreakout(Exception):
  """Raised to interrupt a running C{sched.scheduler.run()} call.

  It is thrown from inside the scheduler's delay function so that control
  returns to the caller of C{run()}, which can then check for signals before
  starting the scheduler again.

  """
def AsyncoreDelayFunction(timeout):
  """Asyncore-aware delay function for C{sched.scheduler}.

  Instead of sleeping, this runs one iteration of the asyncore poll loop,
  waiting at most C{timeout} seconds for events. It then unconditionally
  raises L{SchedulerBreakout}. That ends the current C{scheduler.run()} call,
  so the main loop can check for signals. Once the main loop starts the
  scheduler again, any events that are due get processed.

  The exception-based exit is needed because C{scheduler.run()} has nothing
  like the asyncore loop's C{count=...} argument. The scheduler module
  documents raising from inside the delay function as an allowed usage
  model.

  @type timeout: number
  @param timeout: upper bound, in seconds, on the asyncore wait
  @raise SchedulerBreakout: always, once the asyncore iteration is done

  """
  asyncore.loop(count=1, timeout=timeout, use_poll=True)
  raise SchedulerBreakout
73 class AsyncoreScheduler(sched.scheduler):
74 """Event scheduler integrated with asyncore
def __init__(self, timefunc):
  """Set up the scheduler, using C{self._LimitedDelay} as delay function.

  @param timefunc: time source, passed unchanged to C{sched.scheduler}

  """
  # No upper bound on the delay until run() installs one
  self._max_delay = None
  sched.scheduler.__init__(self, timefunc, self._LimitedDelay)
84 def run(self, max_delay=None): # pylint: disable=W0221
85 """Run any pending events.
87 @type max_delay: None or number
88 @param max_delay: Maximum delay (useful if caller has timeouts running)
91 assert self._max_delay is None
93 # The delay function used by the scheduler can't be different on each run,
94 # hence an instance variable must be used.
96 self._max_delay = None
98 self._max_delay = utils.RunningTimeout(max_delay, False)
101 return sched.scheduler.run(self)
103 self._max_delay = None
105 def _LimitedDelay(self, duration):
106 """Custom delay function for C{sched.scheduler}.
109 if self._max_delay is None:
112 timeout = min(duration, self._max_delay.Remaining())
114 return AsyncoreDelayFunction(timeout)
117 class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
118 """Base Ganeti Asyncore Dispacher
121 # this method is overriding an asyncore.dispatcher method
def handle_error(self):
  """Log the exception raised while serving a request, then carry on.

  Only the error (with traceback, via C{logging.exception}) is recorded;
  the dispatcher itself is not closed here.

  """
  message = "Error while handling asyncore request"
  logging.exception(message)
128 # this method is overriding an asyncore.dispatcher method
130 """Most of the time we don't want to check for writability.
136 class AsyncStreamServer(GanetiBaseAsyncoreDispatcher):
137 """A stream server to use with asyncore.
139 Each request is accepted, and then dispatched to a separate asyncore
140 dispatcher to handle.
144 _REQUEST_QUEUE_SIZE = 5
146 def __init__(self, family, address):
147 """Constructor for AsyncUnixStreamSocket
149 @type family: integer
150 @param family: socket family (one of socket.AF_*)
151 @type address: address family dependent
152 @param address: address to bind the socket to
155 GanetiBaseAsyncoreDispatcher.__init__(self)
157 self.create_socket(self.family, socket.SOCK_STREAM)
158 self.set_reuse_addr()
160 self.listen(self._REQUEST_QUEUE_SIZE)
162 # this method is overriding an asyncore.dispatcher method
def handle_accept(self):
  """Accept a pending client connection and pass it to C{handle_connection}.

  Nothing happens if C{utils.IgnoreSignals(self.accept)} yields None. That is
  presumably an accept that was interrupted or had nothing pending; confirm
  against the C{utils.IgnoreSignals} implementation.

  For C{AF_UNIX} sockets, C{accept} returns no meaningful peer address. The
  peer's socket credentials are used in its place.

  """
  result = utils.IgnoreSignals(self.accept)
  if result is None:
    return

  (conn, peer) = result
  if self.family == socket.AF_UNIX:
    # accept() gives nothing useful for unix sockets, so identify the peer by
    # its credentials instead
    peer = netutils.GetSocketCredentials(conn)

  logging.info("Accepted connection from %s",
               netutils.FormatAddress(peer, family=self.family))
  self.handle_connection(conn, peer)
def handle_connection(self, connected_socket, client_address):
  """Process a connection that has already been accepted.

  Subclasses must override this; the base version only raises.

  @param connected_socket: the socket obtained from C{accept}
  @param client_address: the peer address (for C{AF_UNIX} sockets, the peer's
      socket credentials)
  @raise NotImplementedError: always

  """
  raise NotImplementedError()
188 class AsyncTerminatedMessageStream(asynchat.async_chat):
189 """A terminator separated message stream asyncore module.
191 Handles a stream connection receiving messages terminated by a defined
192 separator. For each complete message handle_message is called.
195 def __init__(self, connected_socket, peer_address, terminator, family,
197 """AsyncTerminatedMessageStream constructor.
199 @type connected_socket: socket.socket
200 @param connected_socket: connected stream socket to receive messages from
201 @param peer_address: family-specific peer address
202 @type terminator: string
203 @param terminator: terminator separating messages in the stream
204 @type family: integer
205 @param family: socket family
206 @type unhandled_limit: integer or None
207 @param unhandled_limit: maximum unanswered messages
210 # python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by
211 # using a positional argument rather than a keyword one.
212 asynchat.async_chat.__init__(self, connected_socket)
213 self.connected_socket = connected_socket
214 # on python 2.4 there is no "family" attribute for the socket class
215 # FIXME: when we move to python 2.5 or above remove the family parameter
216 #self.family = self.connected_socket.family
218 self.peer_address = peer_address
219 self.terminator = terminator
220 self.unhandled_limit = unhandled_limit
221 self.set_terminator(terminator)
223 self.receive_count = 0
225 self.oqueue = collections.deque()
226 self.iqueue = collections.deque()
228 # this method is overriding an asynchat.async_chat method
def collect_incoming_data(self, data):
  """Buffer a chunk of received data until the terminator shows up.

  @param data: chunk of data handed over by asynchat

  """
  pending = self.ibuffer
  pending.append(data)
232 def _can_handle_message(self):
233 return (self.unhandled_limit is None or
234 (self.receive_count < self.send_count + self.unhandled_limit) and
237 # this method is overriding an asynchat.async_chat method
238 def found_terminator(self):
239 message = "".join(self.ibuffer)
241 message_id = self.receive_count
242 # We need to increase the receive_count after checking if the message can
243 # be handled, but before calling handle_message
244 can_handle = self._can_handle_message()
245 self.receive_count += 1
247 self.handle_message(message, message_id)
249 self.iqueue.append((message, message_id))
def handle_message(self, message, message_id):
  """Process one complete message from the stream.

  The base implementation does nothing with the message.

  @type message: string
  @param message: the message to process
  @type message_id: integer
  @param message_id: sequence number of the message within this stream

  """
  # TODO: move this method to raise NotImplementedError
  # raise NotImplementedError
  return
def send_message(self, message):
  """Queue a message for the remote peer; safe to call from any thread.

  The terminator is added when the message is actually written out.

  @type message: string
  @param message: message to send, without the terminator

  @warning: When calling this from a thread other than the one running the
      main asyncore loop, that thread has to be woken up so it notices the
      queued output.

  """
  # All this does is append to a deque, and deque appends are thread-safe,
  # so concurrent callers need no extra locking. handle_write, running in the
  # asyncore thread, drains this queue and then processes the next queued
  # input message, if there is one.
  outgoing = self.oqueue
  outgoing.append(message)
282 # this method is overriding an asyncore.dispatcher method
284 # read from the socket if we can handle the next requests
285 return self._can_handle_message() and asynchat.async_chat.readable(self)
287 # this method is overriding an asyncore.dispatcher method
289 # the output queue may become full just after we called writable. This only
290 # works if we know we'll have something else waking us up from the select,
291 # in such case, anyway.
292 return asynchat.async_chat.writable(self) or self.oqueue
294 # this method is overriding an asyncore.dispatcher method
295 def handle_write(self):
297 # if we have data in the output queue, then send_message was called.
298 # this means we can process one more message from the input queue, if
300 data = self.oqueue.popleft()
301 self.push(data + self.terminator)
304 self.handle_message(*self.iqueue.popleft())
308 logging.info("Closing connection from %s",
309 netutils.FormatAddress(self.peer_address, family=self.family))
312 # this method is overriding an asyncore.dispatcher method
313 def handle_expt(self):
316 # this method is overriding an asyncore.dispatcher method
317 def handle_error(self):
318 """Log an error in handling any request, and proceed.
321 logging.exception("Error while handling asyncore request")
325 class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
326 """An improved asyncore udp socket.
329 def __init__(self, family):
330 """Constructor for AsyncUDPSocket
333 GanetiBaseAsyncoreDispatcher.__init__(self)
335 self._family = family
336 self.create_socket(family, socket.SOCK_DGRAM)
338 # this method is overriding an asyncore.dispatcher method
def handle_connect(self):
  """Deliberately ignore asyncore's "connect" event on UDP sockets.

  Python treats the first datagram from a source as a "connect" and later
  ones as part of the same connection. Every datagram is treated the same
  here, so there is nothing to do.

  """
  return
345 # this method is overriding an asyncore.dispatcher method
346 def handle_read(self):
347 recv_result = utils.IgnoreSignals(self.recvfrom,
348 constants.MAX_UDP_DATA_SIZE)
349 if recv_result is not None:
350 payload, address = recv_result
351 if self._family == socket.AF_INET6:
352 # we ignore 'flow info' and 'scope id' as we don't need them
353 ip, port, _, _ = address
357 self.handle_datagram(payload, ip, port)
def handle_datagram(self, payload, ip, port):
  """Process a UDP datagram that has already been read from the socket.

  Subclasses must override this; the base version only raises.

  @param payload: the datagram contents
  @param ip: the sender's IP address
  @param port: the sender's port
  @raise NotImplementedError: always

  """
  raise NotImplementedError()
365 # this method is overriding an asyncore.dispatcher method
367 # We should check whether we can write to the socket only if we have
368 # something scheduled to be written
369 return bool(self._out_queue)
371 # this method is overriding an asyncore.dispatcher method
372 def handle_write(self):
373 if not self._out_queue:
374 logging.error("handle_write called with empty output queue")
376 (ip, port, payload) = self._out_queue[0]
377 utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
378 self._out_queue.pop(0)
def enqueue_send(self, ip, port, payload):
  """Queue a datagram; the asyncore write handler sends it later.

  @param ip: destination IP address
  @param port: destination port
  @param payload: datagram contents
  @raise errors.UdpDataSizeError: if C{payload} is longer than
      C{constants.MAX_UDP_DATA_SIZE}

  """
  size = len(payload)
  limit = constants.MAX_UDP_DATA_SIZE
  if size > limit:
    raise errors.UdpDataSizeError('Packet too big: %s > %s' % (size, limit))
  self._out_queue.append((ip, port, payload))
389 def process_next_packet(self, timeout=0):
390 """Process the next datagram, waiting for it if necessary.
393 @param timeout: how long to wait for data
395 @return: True if some data has been handled, False otherwise
398 result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
399 if result is not None and result & select.POLLIN:
406 class AsyncAwaker(GanetiBaseAsyncoreDispatcher):
407 """A way to notify the asyncore loop that something is going on.
409 If an asyncore daemon is multithreaded when a thread tries to push some data
410 to a socket, the main loop handling asynchronous requests might be sleeping
411 waiting on a select(). To avoid this it can create an instance of the
412 AsyncAwaker, which other threads can use to wake it up.
415 def __init__(self, signal_fn=None):
416 """Constructor for AsyncAwaker
418 @type signal_fn: function
419 @param signal_fn: function to call when awaken
422 GanetiBaseAsyncoreDispatcher.__init__(self)
423 assert signal_fn == None or callable(signal_fn)
424 (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
426 self.in_socket.setblocking(0)
427 self.in_socket.shutdown(socket.SHUT_WR)
428 self.out_socket.shutdown(socket.SHUT_RD)
429 self.set_socket(self.in_socket)
430 self.need_signal = True
431 self.signal_fn = signal_fn
432 self.connected = True
434 # this method is overriding an asyncore.dispatcher method
435 def handle_read(self):
436 utils.IgnoreSignals(self.recv, 4096)
439 self.need_signal = True
441 # this method is overriding an asyncore.dispatcher method
443 asyncore.dispatcher.close(self)
444 self.out_socket.close()
447 """Signal the asyncore main loop.
449 Any data we send here will be ignored, but it will cause the select() call
453 # Yes, there is a race condition here. No, we don't care, at worst we're
454 # sending more than one wakeup token, which doesn't harm at all.
456 self.need_signal = False
457 self.out_socket.send("\0")
460 class _ShutdownCheck:
461 """Logic for L{Mainloop} shutdown.
464 def __init__(self, fn):
465 """Initializes this class.
468 @param fn: Function returning C{None} if mainloop can be stopped or a
469 duration in seconds after which the function should be called again
470 @see: L{Mainloop.Run}
478 def CanShutdown(self):
479 """Checks whether mainloop can be stopped.
484 if self._defer and self._defer.Remaining() > 0:
485 # A deferred check has already been scheduled
488 # Ask mainloop driver whether we can stop or should check again
492 # Yes, can stop mainloop
495 # Schedule another check in the future
496 self._defer = utils.RunningTimeout(timeout, True)
501 class Mainloop(object):
502 """Generic mainloop for daemons
504 @ivar scheduler: A sched.scheduler object, which can be used to register
508 _SHUTDOWN_TIMEOUT_PRIORITY = -(sys.maxint - 1)
511 """Constructs a new Mainloop instance.
514 self._signal_wait = []
515 self.scheduler = AsyncoreScheduler(time.time)
517 # Resolve uid/gids used
520 @utils.SignalHandled([signal.SIGCHLD])
521 @utils.SignalHandled([signal.SIGTERM])
522 @utils.SignalHandled([signal.SIGINT])
523 def Run(self, shutdown_wait_fn=None, signal_handlers=None):
524 """Runs the mainloop.
526 @type shutdown_wait_fn: callable
527 @param shutdown_wait_fn: Function to check whether loop can be terminated;
528 B{important}: function must be idempotent and must return either None
529 for shutting down or a timeout for another call
530 @type signal_handlers: dict
531 @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator
534 assert isinstance(signal_handlers, dict) and \
535 len(signal_handlers) > 0, \
536 "Broken SignalHandled decorator"
538 # Counter for received signals
541 # Logic to wait for shutdown
542 shutdown_waiter = None
544 # Start actual main loop
546 if shutdown_signals == 1 and shutdown_wait_fn is not None:
547 if shutdown_waiter is None:
548 shutdown_waiter = _ShutdownCheck(shutdown_wait_fn)
550 # Let mainloop driver decide if we can already abort
551 if shutdown_waiter.CanShutdown():
554 # Re-evaluate in a second
557 elif shutdown_signals >= 1:
558 # Abort loop if more than one signal has been sent or no callback has
563 # Wait forever on I/O events
566 if self.scheduler.empty():
567 asyncore.loop(count=1, timeout=timeout, use_poll=True)
570 self.scheduler.run(max_delay=timeout)
571 except SchedulerBreakout:
574 # Check whether a signal was raised
575 for (sig, handler) in signal_handlers.items():
577 self._CallSignalWaiters(sig)
578 if sig in (signal.SIGTERM, signal.SIGINT):
579 logging.info("Received signal %s asking for shutdown", sig)
580 shutdown_signals += 1
def _CallSignalWaiters(self, signum):
  """Notify every registered receiver about a signal.

  Receivers are called via C{OnSignal}, in registration order.

  @type signum: int
  @param signum: number of the signal that was received

  """
  for receiver in self._signal_wait:
    receiver.OnSignal(signum)
def RegisterSignal(self, owner):
  """Add a receiver to be notified of signals.

  @type owner: instance
  @param owner: object providing an C{OnSignal(self, signum)} method

  """
  receivers = self._signal_wait
  receivers.append(owner)
605 def _VerifyDaemonUser(daemon_name):
606 """Verifies the process uid matches the configured uid.
608 This method verifies that a daemon is started as the user it is
611 @param daemon_name: The name of daemon to be started
612 @return: A tuple with the first item indicating success or not,
613 the second item current uid and third with expected uid
616 getents = runtime.GetEnts()
617 running_uid = os.getuid()
619 constants.MASTERD: getents.masterd_uid,
620 constants.RAPI: getents.rapi_uid,
621 constants.NODED: getents.noded_uid,
622 constants.CONFD: getents.confd_uid,
625 return (daemon_uids[daemon_name] == running_uid, running_uid,
626 daemon_uids[daemon_name])
629 def _BeautifyError(err):
630 """Try to format an error better.
632 Since we're dealing with daemon startup errors, in many cases this
633 will be due to socket error and such, so we try to format these cases better.
635 @param err: an exception object
637 @return: the formatted error description
641 if isinstance(err, socket.error):
642 return "Socket-related error: %s (errno=%s)" % (err.args[1], err.args[0])
643 elif isinstance(err, EnvironmentError):
644 if err.filename is None:
645 return "%s (errno=%s)" % (err.strerror, err.errno)
647 return "%s (file %s) (errno=%s)" % (err.strerror, err.filename,
651 except Exception: # pylint: disable=W0703
652 logging.exception("Error while handling existing error %s", err)
653 return "%s" % str(err)
656 def _HandleSigHup(reopen_fn, signum, frame): # pylint: disable=W0613
657 """Handler for SIGHUP.
659 @param reopen_fn: List of callback functions for reopening log files
662 logging.info("Reopening log files after receiving SIGHUP")
669 def GenericMain(daemon_name, optionparser,
670 check_fn, prepare_fn, exec_fn,
671 multithreaded=False, console_logging=False,
672 default_ssl_cert=None, default_ssl_key=None):
673 """Shared main function for daemons.
675 @type daemon_name: string
676 @param daemon_name: daemon name
677 @type optionparser: optparse.OptionParser
678 @param optionparser: initialized optionparser with daemon-specific options
679 (common -f -d options will be handled by this module)
680 @type check_fn: function which accepts (options, args)
681 @param check_fn: function that checks start conditions and exits if they're
683 @type prepare_fn: function which accepts (options, args)
684 @param prepare_fn: function that is run before forking, or None;
685 it's result will be passed as the third parameter to exec_fn, or
686 if None was passed in, we will just pass None to exec_fn
687 @type exec_fn: function which accepts (options, args, prepare_results)
688 @param exec_fn: function that's executed with the daemon's pid file held, and
689 runs the daemon itself.
690 @type multithreaded: bool
691 @param multithreaded: Whether the daemon uses threads
692 @type console_logging: boolean
693 @param console_logging: if True, the daemon will fall back to the system
694 console if logging fails
695 @type default_ssl_cert: string
696 @param default_ssl_cert: Default SSL certificate path
697 @type default_ssl_key: string
698 @param default_ssl_key: Default SSL key path
701 optionparser.add_option("-f", "--foreground", dest="fork",
702 help="Don't detach from the current terminal",
703 default=True, action="store_false")
704 optionparser.add_option("-d", "--debug", dest="debug",
705 help="Enable some debug messages",
706 default=False, action="store_true")
707 optionparser.add_option("--syslog", dest="syslog",
708 help="Enable logging to syslog (except debug"
709 " messages); one of 'no', 'yes' or 'only' [%s]" %
710 constants.SYSLOG_USAGE,
711 default=constants.SYSLOG_USAGE,
712 choices=["no", "yes", "only"])
714 if daemon_name in constants.DAEMONS_PORTS:
715 default_bind_address = constants.IP4_ADDRESS_ANY
716 family = ssconf.SimpleStore().GetPrimaryIPFamily()
717 # family will default to AF_INET if there is no ssconf file (e.g. when
718 # upgrading a cluster from 2.2 -> 2.3. This is intended, as Ganeti clusters
719 # <= 2.2 can not be AF_INET6
720 if family == netutils.IP6Address.family:
721 default_bind_address = constants.IP6_ADDRESS_ANY
723 default_port = netutils.GetDaemonPort(daemon_name)
725 # For networked daemons we allow choosing the port and bind address
726 optionparser.add_option("-p", "--port", dest="port",
727 help="Network port (default: %s)" % default_port,
728 default=default_port, type="int")
729 optionparser.add_option("-b", "--bind", dest="bind_address",
730 help=("Bind address (default: '%s')" %
731 default_bind_address),
732 default=default_bind_address, metavar="ADDRESS")
734 if default_ssl_key is not None and default_ssl_cert is not None:
735 optionparser.add_option("--no-ssl", dest="ssl",
736 help="Do not secure HTTP protocol with SSL",
737 default=True, action="store_false")
738 optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
739 help=("SSL key path (default: %s)" %
741 default=default_ssl_key, type="string",
742 metavar="SSL_KEY_PATH")
743 optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
744 help=("SSL certificate path (default: %s)" %
746 default=default_ssl_cert, type="string",
747 metavar="SSL_CERT_PATH")
749 # Disable the use of fork(2) if the daemon uses threads
753 options, args = optionparser.parse_args()
755 if getattr(options, "ssl", False):
757 "certificate": options.ssl_cert,
758 "key": options.ssl_key,
761 for name, path in ssl_paths.iteritems():
762 if not os.path.isfile(path):
763 print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
764 sys.exit(constants.EXIT_FAILURE)
766 # TODO: By initiating http.HttpSslParams here we would only read the files
767 # once and have a proper validation (isfile returns False on directories)
770 result, running_uid, expected_uid = _VerifyDaemonUser(daemon_name)
772 msg = ("%s started using wrong user ID (%d), expected %d" %
773 (daemon_name, running_uid, expected_uid))
774 print >> sys.stderr, msg
775 sys.exit(constants.EXIT_FAILURE)
777 if check_fn is not None:
778 check_fn(options, args)
782 (wpipe, stdio_reopen_fn) = \
783 utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name])
785 (wpipe, stdio_reopen_fn) = (None, None)
788 utils.SetupLogging(constants.DAEMONS_LOGFILES[daemon_name], daemon_name,
790 stderr_logging=not options.fork,
791 multithreaded=multithreaded,
792 syslog=options.syslog,
793 console_logging=console_logging)
795 # Reopen log file(s) on SIGHUP
796 signal.signal(signal.SIGHUP,
797 compat.partial(_HandleSigHup, [log_reopen_fn, stdio_reopen_fn]))
799 utils.WritePidFile(utils.DaemonPidFileName(daemon_name))
802 logging.info("%s daemon startup", daemon_name)
803 if callable(prepare_fn):
804 prep_results = prepare_fn(options, args)
807 except Exception, err:
808 utils.WriteErrorToFD(wpipe, _BeautifyError(err))
811 if wpipe is not None:
812 # we're done with the preparation phase, we close the pipe to
813 # let the parent know it's safe to exit
816 exec_fn(options, args, prep_results)
818 utils.RemoveFile(utils.DaemonPidFileName(daemon_name))