4 # Copyright (C) 2006, 2007, 2008, 2010, 2011, 2012 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module with helper classes and functions for daemons"""
37 from ganeti import utils
38 from ganeti import constants
39 from ganeti import errors
40 from ganeti import netutils
41 from ganeti import ssconf
42 from ganeti import runtime
43 from ganeti import compat
class SchedulerBreakout(Exception):
  """Exception used to get out of the scheduler loop

  Raised by the delay function (L{AsyncoreDelayFunction}) so that the current
  C{scheduler.run()} invocation returns to its caller, allowing the main loop
  to check for signals between events.

  """
def AsyncoreDelayFunction(timeout):
  """Asyncore-compatible scheduler delay function.

  This is a delay function for sched that, rather than actually sleeping,
  executes asyncore events happening in the meantime.

  After an event has occurred, rather than returning, it raises a
  SchedulerBreakout exception, which will force the current scheduler.run()
  invocation to terminate, so that we can also check for signals. The main loop
  will then call the scheduler run again, which will allow it to actually
  process any due events.

  This is needed because scheduler.run() doesn't support a count=..., as
  asyncore loop, and the scheduler module documents throwing exceptions from
  inside the delay function as an allowed usage model.

  @type timeout: float
  @param timeout: maximum time to wait for asyncore events

  """
  asyncore.loop(timeout=timeout, count=1, use_poll=True)
  raise SchedulerBreakout()
class AsyncoreScheduler(sched.scheduler):
  """Event scheduler integrated with asyncore

  """
  def __init__(self, timefunc):
    """Initializes this class.

    @param timefunc: function returning the current time, passed through to
      C{sched.scheduler}; the delay function is fixed to L{_LimitedDelay}

    """
    sched.scheduler.__init__(self, timefunc, self._LimitedDelay)
    # Upper bound for the delay function; only set while run() is active
    self._max_delay = None

  def run(self, max_delay=None): # pylint: disable=W0221
    """Run any pending events.

    @type max_delay: None or number
    @param max_delay: Maximum delay (useful if caller has timeouts running)

    """
    assert self._max_delay is None

    # The delay function used by the scheduler can't be different on each run,
    # hence an instance variable must be used.
    if max_delay is None:
      self._max_delay = None
    else:
      self._max_delay = utils.RunningTimeout(max_delay, False)

    try:
      return sched.scheduler.run(self)
    finally:
      # Always reset, so the assertion above holds for the next run()
      self._max_delay = None

  def _LimitedDelay(self, duration):
    """Custom delay function for C{sched.scheduler}.

    Caps the scheduler's requested sleep at the remaining time of the
    caller-supplied maximum delay, if one was given to L{run}.

    """
    if self._max_delay is None:
      timeout = duration
    else:
      timeout = min(duration, self._max_delay.Remaining())

    return AsyncoreDelayFunction(timeout)
class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
  """Base Ganeti Asyncore Dispatcher

  Provides safe default behaviour for error handling and writability
  checks, shared by all Ganeti asyncore dispatchers.

  """
  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    """Most of the time we don't want to check for writability.

    """
    return False
class AsyncStreamServer(GanetiBaseAsyncoreDispatcher):
  """A stream server to use with asyncore.

  Each request is accepted, and then dispatched to a separate asyncore
  dispatcher to handle.

  """
  # Backlog passed to listen(2)
  _REQUEST_QUEUE_SIZE = 5

  def __init__(self, family, address):
    """Constructor for AsyncUnixStreamSocket

    @type family: integer
    @param family: socket family (one of socket.AF_*)
    @type address: address family dependent
    @param address: address to bind the socket to

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    self.family = family
    self.create_socket(self.family, socket.SOCK_STREAM)
    self.set_reuse_addr()
    self.bind(address)
    self.listen(self._REQUEST_QUEUE_SIZE)

  # this method is overriding an asyncore.dispatcher method
  def handle_accept(self):
    """Accept a new client connection.

    Creates a new instance of the handler class, which will use asyncore to
    serve the client.

    """
    # accept(2) can be interrupted by signals; IgnoreSignals retries and the
    # result is None when no connection was actually accepted
    accept_result = utils.IgnoreSignals(self.accept)
    if accept_result is not None:
      connected_socket, client_address = accept_result
      if self.family == socket.AF_UNIX:
        # override the client address, as for unix sockets nothing meaningful
        # is passed in from accept anyway
        client_address = netutils.GetSocketCredentials(connected_socket)
      logging.info("Accepted connection from %s",
                   netutils.FormatAddress(client_address, family=self.family))
      self.handle_connection(connected_socket, client_address)

  def handle_connection(self, connected_socket, client_address):
    """Handle an already accepted connection.

    Must be implemented by subclasses.

    """
    raise NotImplementedError
class AsyncTerminatedMessageStream(asynchat.async_chat):
  """A terminator separated message stream asyncore module.

  Handles a stream connection receiving messages terminated by a defined
  separator. For each complete message handle_message is called.

  """
  def __init__(self, connected_socket, peer_address, terminator, family,
               unhandled_limit=None):
    """AsyncTerminatedMessageStream constructor.

    @type connected_socket: socket.socket
    @param connected_socket: connected stream socket to receive messages from
    @param peer_address: family-specific peer address
    @type terminator: string
    @param terminator: terminator separating messages in the stream
    @type family: integer
    @param family: socket family
    @type unhandled_limit: integer or None
    @param unhandled_limit: maximum unanswered messages

    """
    # python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by
    # using a positional argument rather than a keyword one.
    asynchat.async_chat.__init__(self, connected_socket)
    self.connected_socket = connected_socket
    # on python 2.4 there is no "family" attribute for the socket class
    # FIXME: when we move to python 2.5 or above remove the family parameter
    #self.family = self.connected_socket.family
    self.family = family
    self.peer_address = peer_address
    self.terminator = terminator
    self.unhandled_limit = unhandled_limit
    self.set_terminator(terminator)
    # Accumulates chunks of the message currently being received
    self.ibuffer = []
    # Sequence counters for received/sent messages; their difference bounds
    # the number of outstanding (unanswered) messages
    self.receive_count = 0
    self.send_count = 0
    # deques are thread-safe, so no explicit locking is needed
    self.oqueue = collections.deque()
    self.iqueue = collections.deque()

  # this method is overriding an asynchat.async_chat method
  def collect_incoming_data(self, data):
    self.ibuffer.append(data)

  def _can_handle_message(self):
    # True when either no limit is configured, or we are under the limit of
    # unanswered messages and have no backlog of queued input messages
    return (self.unhandled_limit is None or
            (self.receive_count < self.send_count + self.unhandled_limit) and
             not self.iqueue)

  # this method is overriding an asynchat.async_chat method
  def found_terminator(self):
    message = "".join(self.ibuffer)
    self.ibuffer = []
    message_id = self.receive_count
    # We need to increase the receive_count after checking if the message can
    # be handled, but before calling handle_message
    can_handle = self._can_handle_message()
    self.receive_count += 1
    if can_handle:
      self.handle_message(message, message_id)
    else:
      # Over the unhandled limit: park the message until a reply is sent
      self.iqueue.append((message, message_id))

  def handle_message(self, message, message_id):
    """Handle a terminated message.

    @type message: string
    @param message: message to handle
    @type message_id: integer
    @param message_id: stream's message sequence number

    """
    pass
    # TODO: move this method to raise NotImplementedError
    # raise NotImplementedError

  def send_message(self, message):
    """Send a message to the remote peer. This function is thread-safe.

    @type message: string
    @param message: message to send, without the terminator

    @warning: If calling this function from a thread different than the one
    performing the main asyncore loop, remember that you have to wake that one
    up.

    """
    # If we just append the message we received to the output queue, this
    # function can be safely called by multiple threads at the same time, and
    # we don't need locking, since deques are thread safe. handle_write in the
    # asyncore thread will handle the next input message if there are any
    # queued.
    self.oqueue.append(message)

  # this method is overriding an asyncore.dispatcher method
  def readable(self):
    # read from the socket if we can handle the next requests
    return self._can_handle_message() and asynchat.async_chat.readable(self)

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # the output queue may become full just after we called writable. This only
    # works if we know we'll have something else waking us up from the select,
    # in such case, anyway.
    return asynchat.async_chat.writable(self) or self.oqueue

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    if self.oqueue:
      # if we have data in the output queue, then send_message was called.
      # this means we can process one more message from the input queue, if
      # there are any.
      data = self.oqueue.popleft()
      self.push(data + self.terminator)
      self.send_count += 1
      if self.iqueue:
        self.handle_message(*self.iqueue.popleft())
    self.initiate_send()

  def close_log(self):
    """Log the connection closing, then close it.

    """
    logging.info("Closing connection from %s",
                 netutils.FormatAddress(self.peer_address, family=self.family))
    self.close()

  # this method is overriding an asyncore.dispatcher method
  def handle_expt(self):
    self.close_log()

  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")
    self.close_log()
class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
  """An improved asyncore udp socket.

  """
  def __init__(self, family):
    """Constructor for AsyncUDPSocket

    @type family: integer
    @param family: socket family (one of socket.AF_*)

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    # Datagrams queued for sending, as (ip, port, payload) tuples
    self._out_queue = []
    self._family = family
    self.create_socket(family, socket.SOCK_DGRAM)

  # this method is overriding an asyncore.dispatcher method
  def handle_connect(self):
    # Python thinks that the first udp message from a source qualifies as a
    # "connect" and further ones are part of the same connection. We beg to
    # differ and treat all messages equally.
    pass

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    recv_result = utils.IgnoreSignals(self.recvfrom,
                                      constants.MAX_UDP_DATA_SIZE)
    if recv_result is not None:
      payload, address = recv_result
      if self._family == socket.AF_INET6:
        # we ignore 'flow info' and 'scope id' as we don't need them
        ip, port, _, _ = address
      else:
        ip, port = address

      self.handle_datagram(payload, ip, port)

  def handle_datagram(self, payload, ip, port):
    """Handle an already read udp datagram

    Must be implemented by subclasses.

    """
    raise NotImplementedError

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # We should check whether we can write to the socket only if we have
    # something scheduled to be written
    return bool(self._out_queue)

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    if not self._out_queue:
      logging.error("handle_write called with empty output queue")
      return
    (ip, port, payload) = self._out_queue[0]
    utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
    self._out_queue.pop(0)

  def enqueue_send(self, ip, port, payload):
    """Enqueue a datagram to be sent when possible

    @raise errors.UdpDataSizeError: if the payload exceeds the maximum
      datagram size

    """
    if len(payload) > constants.MAX_UDP_DATA_SIZE:
      raise errors.UdpDataSizeError("Packet too big: %s > %s" % (len(payload),
                                    constants.MAX_UDP_DATA_SIZE))
    self._out_queue.append((ip, port, payload))

  def process_next_packet(self, timeout=0):
    """Process the next datagram, waiting for it if necessary.

    @type timeout: float
    @param timeout: how long to wait for data

    @rtype: boolean
    @return: True if some data has been handled, False otherwise

    """
    result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
    if result is not None and result & select.POLLIN:
      self.handle_read()
      return True
    else:
      return False
class AsyncAwaker(GanetiBaseAsyncoreDispatcher):
  """A way to notify the asyncore loop that something is going on.

  If an asyncore daemon is multithreaded when a thread tries to push some data
  to a socket, the main loop handling asynchronous requests might be sleeping
  waiting on a select(). To avoid this it can create an instance of the
  AsyncAwaker, which other threads can use to wake it up.

  """
  def __init__(self, signal_fn=None):
    """Constructor for AsyncAwaker

    @type signal_fn: function
    @param signal_fn: function to call when awaken

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    assert signal_fn is None or callable(signal_fn)
    # One half is watched by asyncore (in_socket), the other is written to by
    # signal() from any thread (out_socket)
    (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
                                                          socket.SOCK_STREAM)
    self.in_socket.setblocking(0)
    self.in_socket.shutdown(socket.SHUT_WR)
    self.out_socket.shutdown(socket.SHUT_RD)
    self.set_socket(self.in_socket)
    self.need_signal = True
    self.signal_fn = signal_fn
    self.connected = True

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    # Drain the wakeup token(s); their content is irrelevant
    utils.IgnoreSignals(self.recv, 4096)
    if self.signal_fn:
      self.signal_fn()
    self.need_signal = True

  # this method is overriding an asyncore.dispatcher method
  def close(self):
    asyncore.dispatcher.close(self)
    self.out_socket.close()

  def signal(self):
    """Signal the asyncore main loop.

    Any data we send here will be ignored, but it will cause the select() call
    to return.

    """
    # Yes, there is a race condition here. No, we don't care, at worst we're
    # sending more than one wakeup token, which doesn't harm at all.
    if self.need_signal:
      self.need_signal = False
      self.out_socket.send("\0")
460 class _ShutdownCheck:
461 """Logic for L{Mainloop} shutdown.
464 def __init__(self, fn):
465 """Initializes this class.
468 @param fn: Function returning C{None} if mainloop can be stopped or a
469 duration in seconds after which the function should be called again
470 @see: L{Mainloop.Run}
478 def CanShutdown(self):
479 """Checks whether mainloop can be stopped.
484 if self._defer and self._defer.Remaining() > 0:
485 # A deferred check has already been scheduled
488 # Ask mainloop driver whether we can stop or should check again
492 # Yes, can stop mainloop
495 # Schedule another check in the future
496 self._defer = utils.RunningTimeout(timeout, True)
class Mainloop(object):
  """Generic mainloop for daemons

  @ivar scheduler: A sched.scheduler object, which can be used to register
    timed events

  """
  # Priority for shutdown-related timeout events (lower runs first)
  _SHUTDOWN_TIMEOUT_PRIORITY = -(sys.maxint - 1)

  def __init__(self):
    """Constructs a new Mainloop instance.

    """
    # Receivers registered via RegisterSignal(), notified through OnSignal()
    self._signal_wait = []
    self.scheduler = AsyncoreScheduler(time.time)

    # Resolve uid/gids used
    runtime.GetEnts()

  @utils.SignalHandled([signal.SIGCHLD])
  @utils.SignalHandled([signal.SIGTERM])
  @utils.SignalHandled([signal.SIGINT])
  def Run(self, shutdown_wait_fn=None, signal_handlers=None):
    """Runs the mainloop.

    @type shutdown_wait_fn: callable
    @param shutdown_wait_fn: Function to check whether loop can be terminated;
      B{important}: function must be idempotent and must return either None
      for shutting down or a timeout for another call
    @type signal_handlers: dict
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator

    """
    assert isinstance(signal_handlers, dict) and \
      len(signal_handlers) > 0, \
      "Broken SignalHandled decorator"

    # Counter for received signals
    shutdown_signals = 0

    # Logic to wait for shutdown
    shutdown_waiter = None

    # Start actual main loop
    while True:
      if shutdown_signals == 1 and shutdown_wait_fn is not None:
        if shutdown_waiter is None:
          shutdown_waiter = _ShutdownCheck(shutdown_wait_fn)

        # Let mainloop driver decide if we can already abort
        if shutdown_waiter.CanShutdown():
          break

        # Re-evaluate in a second
        timeout = 1.0

      elif shutdown_signals >= 1:
        # Abort loop if more than one signal has been sent or no callback has
        # been given
        break

      else:
        # Wait forever on I/O events
        timeout = None

      if self.scheduler.empty():
        asyncore.loop(count=1, timeout=timeout, use_poll=True)
      else:
        try:
          self.scheduler.run(max_delay=timeout)
        except SchedulerBreakout:
          pass

      # Check whether a signal was raised
      for (sig, handler) in signal_handlers.items():
        if handler.called:
          self._CallSignalWaiters(sig)
          if sig in (signal.SIGTERM, signal.SIGINT):
            logging.info("Received signal %s asking for shutdown", sig)
            shutdown_signals += 1
          handler.Clear()

  def _CallSignalWaiters(self, signum):
    """Calls all signal waiters for a certain signal.

    @type signum: int
    @param signum: Signal number

    """
    for owner in self._signal_wait:
      owner.OnSignal(signum)

  def RegisterSignal(self, owner):
    """Registers a receiver for signal notifications

    The receiver must support a "OnSignal(self, signum)" function.

    @type owner: instance
    @param owner: Receiver

    """
    self._signal_wait.append(owner)
def _VerifyDaemonUser(daemon_name):
  """Verifies the process uid matches the configured uid.

  This method verifies that a daemon is started as the user it is
  intended to be run as.

  @param daemon_name: The name of daemon to be started
  @return: A tuple with the first item indicating success or not,
           the second item current uid and third with expected uid

  """
  getents = runtime.GetEnts()
  running_uid = os.getuid()
  daemon_uids = {
    constants.MASTERD: getents.masterd_uid,
    constants.RAPI: getents.rapi_uid,
    constants.NODED: getents.noded_uid,
    constants.CONFD: getents.confd_uid,
    }
  assert daemon_name in daemon_uids, "Invalid daemon %s" % daemon_name

  return (daemon_uids[daemon_name] == running_uid, running_uid,
          daemon_uids[daemon_name])
630 def _BeautifyError(err):
631 """Try to format an error better.
633 Since we're dealing with daemon startup errors, in many cases this
634 will be due to socket error and such, so we try to format these cases better.
636 @param err: an exception object
638 @return: the formatted error description
642 if isinstance(err, socket.error):
643 return "Socket-related error: %s (errno=%s)" % (err.args[1], err.args[0])
644 elif isinstance(err, EnvironmentError):
645 if err.filename is None:
646 return "%s (errno=%s)" % (err.strerror, err.errno)
648 return "%s (file %s) (errno=%s)" % (err.strerror, err.filename,
652 except Exception: # pylint: disable=W0703
653 logging.exception("Error while handling existing error %s", err)
654 return "%s" % str(err)
657 def _HandleSigHup(reopen_fn, signum, frame): # pylint: disable=W0613
658 """Handler for SIGHUP.
660 @param reopen_fn: List of callback functions for reopening log files
663 logging.info("Reopening log files after receiving SIGHUP")
670 def GenericMain(daemon_name, optionparser,
671 check_fn, prepare_fn, exec_fn,
672 multithreaded=False, console_logging=False,
673 default_ssl_cert=None, default_ssl_key=None):
674 """Shared main function for daemons.
676 @type daemon_name: string
677 @param daemon_name: daemon name
678 @type optionparser: optparse.OptionParser
679 @param optionparser: initialized optionparser with daemon-specific options
680 (common -f -d options will be handled by this module)
681 @type check_fn: function which accepts (options, args)
682 @param check_fn: function that checks start conditions and exits if they're
684 @type prepare_fn: function which accepts (options, args)
685 @param prepare_fn: function that is run before forking, or None;
686 it's result will be passed as the third parameter to exec_fn, or
687 if None was passed in, we will just pass None to exec_fn
688 @type exec_fn: function which accepts (options, args, prepare_results)
689 @param exec_fn: function that's executed with the daemon's pid file held, and
690 runs the daemon itself.
691 @type multithreaded: bool
692 @param multithreaded: Whether the daemon uses threads
693 @type console_logging: boolean
694 @param console_logging: if True, the daemon will fall back to the system
695 console if logging fails
696 @type default_ssl_cert: string
697 @param default_ssl_cert: Default SSL certificate path
698 @type default_ssl_key: string
699 @param default_ssl_key: Default SSL key path
702 optionparser.add_option("-f", "--foreground", dest="fork",
703 help="Don't detach from the current terminal",
704 default=True, action="store_false")
705 optionparser.add_option("-d", "--debug", dest="debug",
706 help="Enable some debug messages",
707 default=False, action="store_true")
708 optionparser.add_option("--syslog", dest="syslog",
709 help="Enable logging to syslog (except debug"
710 " messages); one of 'no', 'yes' or 'only' [%s]" %
711 constants.SYSLOG_USAGE,
712 default=constants.SYSLOG_USAGE,
713 choices=["no", "yes", "only"])
715 family = ssconf.SimpleStore().GetPrimaryIPFamily()
716 # family will default to AF_INET if there is no ssconf file (e.g. when
717 # upgrading a cluster from 2.2 -> 2.3. This is intended, as Ganeti clusters
718 # <= 2.2 can not be AF_INET6
719 if daemon_name in constants.DAEMONS_PORTS:
720 default_bind_address = constants.IP4_ADDRESS_ANY
721 if family == netutils.IP6Address.family:
722 default_bind_address = constants.IP6_ADDRESS_ANY
724 default_port = netutils.GetDaemonPort(daemon_name)
726 # For networked daemons we allow choosing the port and bind address
727 optionparser.add_option("-p", "--port", dest="port",
728 help="Network port (default: %s)" % default_port,
729 default=default_port, type="int")
730 optionparser.add_option("-b", "--bind", dest="bind_address",
731 help=("Bind address (default: '%s')" %
732 default_bind_address),
733 default=default_bind_address, metavar="ADDRESS")
734 optionparser.add_option("-i", "--interface", dest="bind_interface",
735 help=("Bind interface"), metavar="INTERFACE")
737 if default_ssl_key is not None and default_ssl_cert is not None:
738 optionparser.add_option("--no-ssl", dest="ssl",
739 help="Do not secure HTTP protocol with SSL",
740 default=True, action="store_false")
741 optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
742 help=("SSL key path (default: %s)" %
744 default=default_ssl_key, type="string",
745 metavar="SSL_KEY_PATH")
746 optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
747 help=("SSL certificate path (default: %s)" %
749 default=default_ssl_cert, type="string",
750 metavar="SSL_CERT_PATH")
752 # Disable the use of fork(2) if the daemon uses threads
756 options, args = optionparser.parse_args()
758 if getattr(options, "bind_interface", None) is not None:
759 if options.bind_address != default_bind_address:
760 msg = ("Can't specify both, bind address (%s) and bind interface (%s)" %
761 (options.bind_address, options.bind_interface))
762 print >> sys.stderr, msg
763 sys.exit(constants.EXIT_FAILURE)
764 interface_ip_addresses = \
765 netutils.GetInterfaceIpAddresses(options.bind_interface)
766 if family == netutils.IP6Address.family:
767 if_addresses = interface_ip_addresses[constants.IP6_VERSION]
769 if_addresses = interface_ip_addresses[constants.IP4_VERSION]
770 if len(if_addresses) < 1:
771 msg = "Failed to find IP for interface %s" % options.bind_interace
772 print >> sys.stderr, msg
773 sys.exit(constants.EXIT_FAILURE)
774 options.bind_address = if_addresses[0]
776 if getattr(options, "ssl", False):
778 "certificate": options.ssl_cert,
779 "key": options.ssl_key,
782 for name, path in ssl_paths.iteritems():
783 if not os.path.isfile(path):
784 print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
785 sys.exit(constants.EXIT_FAILURE)
787 # TODO: By initiating http.HttpSslParams here we would only read the files
788 # once and have a proper validation (isfile returns False on directories)
791 result, running_uid, expected_uid = _VerifyDaemonUser(daemon_name)
793 msg = ("%s started using wrong user ID (%d), expected %d" %
794 (daemon_name, running_uid, expected_uid))
795 print >> sys.stderr, msg
796 sys.exit(constants.EXIT_FAILURE)
798 if check_fn is not None:
799 check_fn(options, args)
801 log_filename = constants.DAEMONS_LOGFILES[daemon_name]
805 (wpipe, stdio_reopen_fn) = utils.Daemonize(logfile=log_filename)
807 (wpipe, stdio_reopen_fn) = (None, None)
810 utils.SetupLogging(log_filename, daemon_name,
812 stderr_logging=not options.fork,
813 multithreaded=multithreaded,
814 syslog=options.syslog,
815 console_logging=console_logging)
817 # Reopen log file(s) on SIGHUP
818 signal.signal(signal.SIGHUP,
819 compat.partial(_HandleSigHup, [log_reopen_fn, stdio_reopen_fn]))
822 utils.WritePidFile(utils.DaemonPidFileName(daemon_name))
823 except errors.PidFileLockError, err:
824 print >> sys.stderr, "Error while locking PID file:\n%s" % err
825 sys.exit(constants.EXIT_FAILURE)
829 logging.info("%s daemon startup", daemon_name)
830 if callable(prepare_fn):
831 prep_results = prepare_fn(options, args)
834 except Exception, err:
835 utils.WriteErrorToFD(wpipe, _BeautifyError(err))
838 if wpipe is not None:
839 # we're done with the preparation phase, we close the pipe to
840 # let the parent know it's safe to exit
843 exec_fn(options, args, prep_results)
845 utils.RemoveFile(utils.DaemonPidFileName(daemon_name))