X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/f91c72231c9b5d91dcc7d2c88f88460d7600c734..acd9ff9e995bb1dd7314ce7923aa4e5e888019a1:/daemons/ganeti-confd diff --git a/daemons/ganeti-confd b/daemons/ganeti-confd index df026da..3a685c7 100755 --- a/daemons/ganeti-confd +++ b/daemons/ganeti-confd @@ -26,24 +26,31 @@ It uses UDP+HMAC for authentication with a global cluster key. """ +# pylint: disable-msg=C0103 +# C0103: Invalid name ganeti-confd + import os import sys import logging -import asyncore -import socket -import pyinotify +import time + +try: + # pylint: disable-msg=E0611 + from pyinotify import pyinotify +except ImportError: + import pyinotify from optparse import OptionParser +from ganeti import asyncnotifier +from ganeti import confd +from ganeti.confd import server as confd_server from ganeti import constants from ganeti import errors from ganeti import daemon -from ganeti import ssconf -from ganeti.asyncnotifier import AsyncNotifier -from ganeti.confd.server import ConfdProcessor -class ConfdAsyncUDPServer(asyncore.dispatcher): +class ConfdAsyncUDPServer(daemon.AsyncUDPSocket): """The confd udp server, suitable for use with asyncore. """ @@ -55,163 +62,218 @@ class ConfdAsyncUDPServer(asyncore.dispatcher): @type port: int @param port: udp port @type processor: L{confd.server.ConfdProcessor} - @param reader: ConfigReader to use to access the config + @param processor: ConfdProcessor to use to handle queries """ - asyncore.dispatcher.__init__(self) + daemon.AsyncUDPSocket.__init__(self) self.bind_address = bind_address self.port = port self.processor = processor - self.create_socket(socket.AF_INET, socket.SOCK_DGRAM) self.bind((bind_address, port)) - logging.debug("listening on ('%s':%d)" % (bind_address, port)) + logging.debug("listening on ('%s':%d)", bind_address, port) - # this method is overriding an asyncore.dispatcher method - def handle_connect(self): - # Python thinks that the first udp message from a source qualifies as a - # "connect" and further ones are part of the same connection. We beg to - # differ and treat all messages equally. - pass - - # this method is overriding an asyncore.dispatcher method - def handle_read(self): + # this method is overriding a daemon.AsyncUDPSocket method + def handle_datagram(self, payload_in, ip, port): try: - payload_in, address = self.recvfrom(4096) - ip, port = address - payload_out = self.processor.ExecQuery(payload_in, ip, port) - if payload_out is not None: - self.sendto(payload_out, 0, (ip, port)) - except: - # we need to catch any exception here, log it, but proceed, because even - # if we failed handling a single request, we still want the confd to - # continue working. - logging.error("Unexpected exception", exc_info=True) - - # this method is overriding an asyncore.dispatcher method - def writable(self): - # No need to check if we can write to the UDP socket - return False - - -class ConfdInotifyEventHandler(pyinotify.ProcessEvent): - - def __init__(self, watch_manager, reader, - file=constants.CLUSTER_CONF_FILE): - """Constructor for ConfdInotifyEventHandler - - @type watch_manager: L{pyinotify.WatchManager} - @param watch_manager: ganeti-confd inotify watch manager - @type reader: L{ssconf.SimpleConfigReader} - @param reader: ganeti-confd SimpleConfigReader - @type file: string - @param file: config file to watch + query = confd.UnpackMagic(payload_in) + except errors.ConfdMagicError, err: + logging.debug(err) + return + + answer = self.processor.ExecQuery(query, ip, port) + if answer is not None: + try: + self.enqueue_send(ip, port, confd.PackMagic(answer)) + except errors.UdpDataSizeError: + logging.error("Reply too big to fit in an udp packet.") + + +class ConfdConfigurationReloader(object): + """Logic to control when to reload the ganeti configuration + + This class is able to alter between inotify and polling, to rate-limit the + number of reloads. When using inotify it also supports a fallback timed + check, to verify that the reload hasn't failed. + + """ + def __init__(self, processor, mainloop): + """Constructor for ConfdConfigurationReloader + + @type processor: L{confd.server.ConfdProcessor} + @param processor: ganeti-confd ConfdProcessor + @type mainloop: L{daemon.Mainloop} + @param mainloop: ganeti-confd mainloop """ - # no need to call the parent's constructor - self.watch_manager = watch_manager - self.reader = reader - self.mask = pyinotify.EventsCodes.IN_IGNORED | \ - pyinotify.EventsCodes.IN_MODIFY - self.file = file - self.add_config_watch() + self.processor = processor + self.mainloop = mainloop + + self.polling = True + self.last_notification = 0 + + # Asyncronous inotify handler for config changes + cfg_file = constants.CLUSTER_CONF_FILE + self.wm = pyinotify.WatchManager() + self.inotify_handler = asyncnotifier.SingleFileEventHandler(self.wm, + self.OnInotify, + cfg_file) + notifier_class = asyncnotifier.ErrorLoggingAsyncNotifier + self.notifier = notifier_class(self.wm, self.inotify_handler) + + self.timer_handle = None + self._EnableTimer() + + def OnInotify(self, notifier_enabled): + """Receive an inotify notification. - def add_config_watch(self): - """Add a watcher for the ganeti config file + @type notifier_enabled: boolean + @param notifier_enabled: whether the notifier is still enabled """ - result = self.watch_manager.add_watch(self.file, self.mask) - if not result[self.file] > 0: - raise errors.ConfdFatalError("Could not add inotify watcher") + current_time = time.time() + time_delta = current_time - self.last_notification + self.last_notification = current_time + + if time_delta < constants.CONFD_CONFIG_RELOAD_RATELIMIT: + logging.debug("Moving from inotify mode to polling mode") + self.polling = True + if notifier_enabled: + self.inotify_handler.disable() + + if not self.polling and not notifier_enabled: + try: + self.inotify_handler.enable() + except errors.InotifyError: + self.polling = True - def reload_config(self): try: - reloaded = self.reader.Reload() + reloaded = self.processor.reader.Reload() if reloaded: logging.info("Reloaded ganeti config") else: logging.debug("Skipped double config reload") except errors.ConfigurationError: - # transform a ConfigurationError in a fatal error, that will cause confd - # to quit. - raise errors.ConfdFatalError(err) - - def process_IN_IGNORED(self, event): - # Due to the fact that we monitor just for the cluster config file (rather - # than for the whole data dir) when the file is replaced with another one - # (which is what happens normally in ganeti) we're going to receive an - # IN_IGNORED event from inotify, because of the file removal (which is - # contextual with the replacement). In such a case we need to create - # another watcher for the "new" file. - logging.debug("Received 'ignored' inotify event for %s" % event.path) + self.DisableConfd() + self.inotify_handler.disable() + return - try: - # Since the kernel believes the file we were interested in is gone, it's - # not going to notify us of any other events, until we set up, here, the - # new watch. This is not a race condition, though, since we're anyway - # going to realod the file after setting up the new watch. - self.add_config_watch() - self.reload_config() - except errors.ConfdFatalError, err: - logging.critical("Critical error, shutting down: %s" % err) - sys.exit(constants.EXIT_FAILURE) - except: - # we need to catch any exception here, log it, but proceed, because even - # if we failed handling a single request, we still want the confd to - # continue working. - logging.error("Unexpected exception", exc_info=True) - - def process_IN_MODIFY(self, event): - # This gets called when the config file is modified. Note that this doesn't - # usually happen in Ganeti, as the config file is normally replaced by a - # new one, at filesystem level, rather than actually modified (see - # utils.WriteFile) - logging.debug("Received 'modify' inotify event for %s" % event.path) + # Reset the timer. If we're polling it will go to the polling rate, if + # we're not it will delay it again to its base safe timeout. + self._ResetTimer() + + def _DisableTimer(self): + if self.timer_handle is not None: + self.mainloop.scheduler.cancel(self.timer_handle) + self.timer_handle = None + + def _EnableTimer(self): + if self.polling: + timeout = constants.CONFD_CONFIG_RELOAD_RATELIMIT + else: + timeout = constants.CONFD_CONFIG_RELOAD_TIMEOUT + + if self.timer_handle is None: + self.timer_handle = self.mainloop.scheduler.enter( + timeout, 1, self.OnTimer, []) + def _ResetTimer(self): + self._DisableTimer() + self._EnableTimer() + + def OnTimer(self): + """Function called when the timer fires + + """ + self.timer_handle = None + reloaded = False + was_disabled = False try: - self.reload_config() - except errors.ConfdFatalError, err: - logging.critical("Critical error, shutting down: %s" % err) - sys.exit(constants.EXIT_FAILURE) - except: - # we need to catch any exception here, log it, but proceed, because even - # if we failed handling a single request, we still want the confd to - # continue working. - logging.error("Unexpected exception", exc_info=True) + if self.processor.reader is None: + was_disabled = True + self.EnableConfd() + reloaded = True + else: + reloaded = self.processor.reader.Reload() + except errors.ConfigurationError: + self.DisableConfd(silent=was_disabled) + return + + if self.polling and reloaded: + logging.info("Reloaded ganeti config") + elif reloaded: + # We have reloaded the config files, but received no inotify event. If + # an event is pending though, we just happen to have timed out before + # receiving it, so this is not a problem, and we shouldn't alert + if not self.notifier.check_events() and not was_disabled: + logging.warning("Config file reload at timeout (inotify failure)") + elif self.polling: + # We're polling, but we haven't reloaded the config: + # Going back to inotify mode + logging.debug("Moving from polling mode to inotify mode") + self.polling = False + try: + self.inotify_handler.enable() + except errors.InotifyError: + self.polling = True + else: + logging.debug("Performed configuration check") + + self._EnableTimer() + + def DisableConfd(self, silent=False): + """Puts confd in non-serving mode + + """ + if not silent: + logging.warning("Confd is being disabled") + self.processor.Disable() + self.polling = False + self._ResetTimer() - def process_default(self, event): - logging.error("Received unhandled inotify event: %s" % event) + def EnableConfd(self): + self.processor.Enable() + logging.warning("Confd is being enabled") + self.polling = True + self._ResetTimer() -def CheckCONFD(options, args): - """Initial checks whether to run exit with a failure +def CheckConfd(_, args): + """Initial checks whether to run exit with a failure. """ + if args: # confd doesn't take any arguments + print >> sys.stderr, ("Usage: %s [-f] [-d] [-b ADDRESS]" % sys.argv[0]) + sys.exit(constants.EXIT_FAILURE) + # TODO: collapse HMAC daemons handling in daemons GenericMain, when we'll # have more than one. - if not os.path.isfile(constants.HMAC_CLUSTER_KEY): - print >> sys.stderr, "Need HMAC key %s to run" % constants.HMAC_CLUSTER_KEY + if not os.path.isfile(constants.CONFD_HMAC_KEY): + print >> sys.stderr, "Need HMAC key %s to run" % constants.CONFD_HMAC_KEY sys.exit(constants.EXIT_FAILURE) - ssconf.CheckMasterCandidate(options.debug) - -def ExecCONFD(options, args): - """Main CONFD function, executed with pidfile held +def ExecConfd(options, _): + """Main confd function, executed with PID file held """ + # TODO: clarify how the server and reloader variables work (they are + # not used) + # pylint: disable-msg=W0612 mainloop = daemon.Mainloop() - # confd-level SimpleConfigReader - reader = ssconf.SimpleConfigReader() - # Asyncronous confd UDP server - processor = ConfdProcessor(reader) + processor = confd_server.ConfdProcessor() + try: + processor.Enable() + except errors.ConfigurationError: + # If enabling the processor has failed, we can still go on, but confd will + # be disabled + logging.warning("Confd is starting in disabled mode") + server = ConfdAsyncUDPServer(options.bind_address, options.port, processor) - # Asyncronous inotify handler for config changes - wm = pyinotify.WatchManager() - confd_event_handler = ConfdInotifyEventHandler(wm, reader) - notifier = AsyncNotifier(wm, confd_event_handler) + # Configuration reloader + reloader = ConfdConfigurationReloader(processor, mainloop) mainloop.Run() @@ -226,10 +288,9 @@ def main(): constants.RELEASE_VERSION) dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS] - dirs.append((constants.LOG_OS_DIR, 0750)) dirs.append((constants.LOCK_DIR, 1777)) - daemon.GenericMain(constants.CONFD, parser, dirs, CheckCONFD, ExecCONFD) + daemon.GenericMain(constants.CONFD, parser, dirs, CheckConfd, ExecConfd) -if __name__ == '__main__': +if __name__ == "__main__": main()