X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/562bee4d2fe51f809be9e1206178e5d55312bd1d..2e6469a14ae2b2c7b547076efe3fdc8eacae6050:/daemons/ganeti-confd diff --git a/daemons/ganeti-confd b/daemons/ganeti-confd index a0e6a0c..d19157e 100755 --- a/daemons/ganeti-confd +++ b/daemons/ganeti-confd @@ -1,7 +1,7 @@ #!/usr/bin/python # -# Copyright (C) 2009, Google Inc. +# Copyright (C) 2009, 2010 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -26,24 +26,32 @@ It uses UDP+HMAC for authentication with a global cluster key. """ +# pylint: disable-msg=C0103 +# C0103: Invalid name ganeti-confd + import os import sys import logging -import asyncore -import socket -import pyinotify +import time + +try: + # pylint: disable-msg=E0611 + from pyinotify import pyinotify +except ImportError: + import pyinotify from optparse import OptionParser +from ganeti import asyncnotifier +from ganeti import confd +from ganeti.confd import server as confd_server from ganeti import constants from ganeti import errors from ganeti import daemon -from ganeti import ssconf -from ganeti.asyncnotifier import AsyncNotifier -from ganeti.confd.server import ConfdProcessor +from ganeti import netutils -class ConfdAsyncUDPServer(asyncore.dispatcher): +class ConfdAsyncUDPServer(daemon.AsyncUDPSocket): """The confd udp server, suitable for use with asyncore. """ @@ -51,202 +59,227 @@ class ConfdAsyncUDPServer(asyncore.dispatcher): """Constructor for ConfdAsyncUDPServer @type bind_address: string - @param bind_address: socket bind address ('' for all) + @param bind_address: socket bind address @type port: int @param port: udp port @type processor: L{confd.server.ConfdProcessor} - @param reader: ConfigReader to use to access the config + @param processor: ConfdProcessor to use to handle queries """ - asyncore.dispatcher.__init__(self) + family = netutils.IPAddress.GetAddressFamily(bind_address) + daemon.AsyncUDPSocket.__init__(self, family) self.bind_address = bind_address self.port = port self.processor = processor - self.create_socket(socket.AF_INET, socket.SOCK_DGRAM) self.bind((bind_address, port)) - logging.debug("listening on ('%s':%d)" % (bind_address, port)) - - # this method is overriding an asyncore.dispatcher method - def handle_connect(self): - # Python thinks that the first udp message from a source qualifies as a - # "connect" and further ones are part of the same connection. We beg to - # differ and treat all messages equally. - pass + logging.debug("listening on ('%s':%d)", bind_address, port) - # this method is overriding an asyncore.dispatcher method - def handle_read(self): + # this method is overriding a daemon.AsyncUDPSocket method + def handle_datagram(self, payload_in, ip, port): try: - payload_in, address = self.recvfrom(4096) - ip, port = address - payload_out = self.processor.ExecQuery(payload_in, ip, port) - if payload_out is not None: - self.sendto(payload_out, 0, (ip, port)) - except: - # we need to catch any exception here, log it, but proceed, because even - # if we failed handling a single request, we still want the confd to - # continue working. - logging.error("Unexpected exception", exc_info=True) - - # this method is overriding an asyncore.dispatcher method - def writable(self): - # No need to check if we can write to the UDP socket - return False - - -class ConfdInotifyEventHandler(pyinotify.ProcessEvent): - - def __init__(self, watch_manager, reader, - file=constants.CLUSTER_CONF_FILE): - """Constructor for ConfdInotifyEventHandler - - @type watch_manager: L{pyinotify.WatchManager} - @param watch_manager: ganeti-confd inotify watch manager - @type reader: L{ssconf.SimpleConfigReader} - @param reader: ganeti-confd SimpleConfigReader - @type file: string - @param file: config file to watch + query = confd.UnpackMagic(payload_in) + except errors.ConfdMagicError, err: + logging.debug(err) + return - """ - # no need to call the parent's constructor - self.watch_manager = watch_manager - self.reader = reader - self.mask = pyinotify.EventsCodes.IN_IGNORED | \ - pyinotify.EventsCodes.IN_MODIFY - self.file = file - self.watch_handle = None - self.enable() - - def enable(self): - """Watch the given file + answer = self.processor.ExecQuery(query, ip, port) + if answer is not None: + try: + self.enqueue_send(ip, port, confd.PackMagic(answer)) + except errors.UdpDataSizeError: + logging.error("Reply too big to fit in an udp packet.") + + +class ConfdConfigurationReloader(object): + """Logic to control when to reload the ganeti configuration + + This class is able to alter between inotify and polling, to rate-limit the + number of reloads. When using inotify it also supports a fallback timed + check, to verify that the reload hasn't failed. + + """ + def __init__(self, processor, mainloop): + """Constructor for ConfdConfigurationReloader + + @type processor: L{confd.server.ConfdProcessor} + @param processor: ganeti-confd ConfdProcessor + @type mainloop: L{daemon.Mainloop} + @param mainloop: ganeti-confd mainloop """ - if self.watch_handle is None: - result = self.watch_manager.add_watch(self.file, self.mask) - if not self.file in result or result[self.file] <= 0: - raise errors.ConfdFatalError("Could not add inotify watcher") - else: - self.watch_handle = result[self.file] + self.processor = processor + self.mainloop = mainloop + + self.polling = True + self.last_notification = 0 + + # Asyncronous inotify handler for config changes + cfg_file = constants.CLUSTER_CONF_FILE + self.wm = pyinotify.WatchManager() + self.inotify_handler = asyncnotifier.SingleFileEventHandler(self.wm, + self.OnInotify, + cfg_file) + notifier_class = asyncnotifier.ErrorLoggingAsyncNotifier + self.notifier = notifier_class(self.wm, self.inotify_handler) - def disable(self): - """Stop watching the given file + self.timer_handle = None + self._EnableTimer() + + def OnInotify(self, notifier_enabled): + """Receive an inotify notification. + + @type notifier_enabled: boolean + @param notifier_enabled: whether the notifier is still enabled """ - if self.watch_handle is not None: - result = self.watch_manager.rm_watch(self.watch_handle) - if result[self.watch_handle]: - self.watch_handle = None + current_time = time.time() + time_delta = current_time - self.last_notification + self.last_notification = current_time + + if time_delta < constants.CONFD_CONFIG_RELOAD_RATELIMIT: + logging.debug("Moving from inotify mode to polling mode") + self.polling = True + if notifier_enabled: + self.inotify_handler.disable() + + if not self.polling and not notifier_enabled: + try: + self.inotify_handler.enable() + except errors.InotifyError: + self.polling = True - def reload_config(self): try: - reloaded = self.reader.Reload() + reloaded = self.processor.reader.Reload() if reloaded: logging.info("Reloaded ganeti config") else: logging.debug("Skipped double config reload") except errors.ConfigurationError: - # transform a ConfigurationError in a fatal error, that will cause confd - # to quit. - raise errors.ConfdFatalError(err) - - def process_IN_IGNORED(self, event): - # Due to the fact that we monitor just for the cluster config file (rather - # than for the whole data dir) when the file is replaced with another one - # (which is what happens normally in ganeti) we're going to receive an - # IN_IGNORED event from inotify, because of the file removal (which is - # contextual with the replacement). In such a case we need to create - # another watcher for the "new" file. - logging.debug("Received 'ignored' inotify event for %s" % event.path) - self.watch_handle = None + self.DisableConfd() + self.inotify_handler.disable() + return - try: - # Since the kernel believes the file we were interested in is gone, it's - # not going to notify us of any other events, until we set up, here, the - # new watch. This is not a race condition, though, since we're anyway - # going to realod the file after setting up the new watch. - self.enable() - self.reload_config() - except errors.ConfdFatalError, err: - logging.critical("Critical error, shutting down: %s" % err) - sys.exit(constants.EXIT_FAILURE) - except: - # we need to catch any exception here, log it, but proceed, because even - # if we failed handling a single request, we still want the confd to - # continue working. - logging.error("Unexpected exception", exc_info=True) - - def process_IN_MODIFY(self, event): - # This gets called when the config file is modified. Note that this doesn't - # usually happen in Ganeti, as the config file is normally replaced by a - # new one, at filesystem level, rather than actually modified (see - # utils.WriteFile) - logging.debug("Received 'modify' inotify event for %s" % event.path) + # Reset the timer. If we're polling it will go to the polling rate, if + # we're not it will delay it again to its base safe timeout. + self._ResetTimer() - try: - self.reload_config() - except errors.ConfdFatalError, err: - logging.critical("Critical error, shutting down: %s" % err) - sys.exit(constants.EXIT_FAILURE) - except: - # we need to catch any exception here, log it, but proceed, because even - # if we failed handling a single request, we still want the confd to - # continue working. - logging.error("Unexpected exception", exc_info=True) + def _DisableTimer(self): + if self.timer_handle is not None: + self.mainloop.scheduler.cancel(self.timer_handle) + self.timer_handle = None - def process_default(self, event): - logging.error("Received unhandled inotify event: %s" % event) + def _EnableTimer(self): + if self.polling: + timeout = constants.CONFD_CONFIG_RELOAD_RATELIMIT + else: + timeout = constants.CONFD_CONFIG_RELOAD_TIMEOUT + if self.timer_handle is None: + self.timer_handle = self.mainloop.scheduler.enter( + timeout, 1, self.OnTimer, []) -class ConfdConfigurationReloader(object): - """Logic to control when to reload the ganeti configuration + def _ResetTimer(self): + self._DisableTimer() + self._EnableTimer() - This class is able to alter between inotify and polling, to rate-limit the - number of reloads. When using inotify it also supports a fallback timed - check, to verify that the reload hasn't failed. - - """ - def __init__(self, reader): - """Constructor for ConfdConfigurationReloader + def OnTimer(self): + """Function called when the timer fires - @type reader: L{ssconf.SimpleConfigReader} - @param reader: ganeti-confd SimpleConfigReader + """ + self.timer_handle = None + reloaded = False + was_disabled = False + try: + if self.processor.reader is None: + was_disabled = True + self.EnableConfd() + reloaded = True + else: + reloaded = self.processor.reader.Reload() + except errors.ConfigurationError: + self.DisableConfd(silent=was_disabled) + return + + if self.polling and reloaded: + logging.info("Reloaded ganeti config") + elif reloaded: + # We have reloaded the config files, but received no inotify event. If + # an event is pending though, we just happen to have timed out before + # receiving it, so this is not a problem, and we shouldn't alert + if not self.notifier.check_events() and not was_disabled: + logging.warning("Config file reload at timeout (inotify failure)") + elif self.polling: + # We're polling, but we haven't reloaded the config: + # Going back to inotify mode + logging.debug("Moving from polling mode to inotify mode") + self.polling = False + try: + self.inotify_handler.enable() + except errors.InotifyError: + self.polling = True + else: + logging.debug("Performed configuration check") + + self._EnableTimer() + + def DisableConfd(self, silent=False): + """Puts confd in non-serving mode """ - self.reader = reader + if not silent: + logging.warning("Confd is being disabled") + self.processor.Disable() + self.polling = False + self._ResetTimer() - # Asyncronous inotify handler for config changes - self.wm = pyinotify.WatchManager() - self.inotify_handler = ConfdInotifyEventHandler(self.wm, reader) - self.notifier = AsyncNotifier(self.wm, self.confd_event_handler) + def EnableConfd(self): + self.processor.Enable() + logging.warning("Confd is being enabled") + self.polling = True + self._ResetTimer() -def CheckConfd(options, args): +def CheckConfd(_, args): """Initial checks whether to run exit with a failure. """ + if args: # confd doesn't take any arguments + print >> sys.stderr, ("Usage: %s [-f] [-d] [-b ADDRESS]" % sys.argv[0]) + sys.exit(constants.EXIT_FAILURE) + # TODO: collapse HMAC daemons handling in daemons GenericMain, when we'll # have more than one. - if not os.path.isfile(constants.HMAC_CLUSTER_KEY): - print >> sys.stderr, "Need HMAC key %s to run" % constants.HMAC_CLUSTER_KEY + if not os.path.isfile(constants.CONFD_HMAC_KEY): + print >> sys.stderr, "Need HMAC key %s to run" % constants.CONFD_HMAC_KEY sys.exit(constants.EXIT_FAILURE) - ssconf.CheckMasterCandidate(options.debug) + # TODO: once we have a cluster param specifying the address family + # preference, we need to check if the requested options.bind_address does not + # conflict with that. If so, we might warn or EXIT_FAILURE. -def ExecConfd(options, args): +def ExecConfd(options, _): """Main confd function, executed with PID file held """ + # TODO: clarify how the server and reloader variables work (they are + # not used) + # pylint: disable-msg=W0612 mainloop = daemon.Mainloop() - # confd-level SimpleConfigReader - reader = ssconf.SimpleConfigReader() - # Asyncronous confd UDP server - processor = ConfdProcessor(reader) + processor = confd_server.ConfdProcessor() + try: + processor.Enable() + except errors.ConfigurationError: + # If enabling the processor has failed, we can still go on, but confd will + # be disabled + logging.warning("Confd is starting in disabled mode") + server = ConfdAsyncUDPServer(options.bind_address, options.port, processor) # Configuration reloader - reloader = ConfdConfigurationReloader(reader) + reloader = ConfdConfigurationReloader(processor, mainloop) mainloop.Run() @@ -261,7 +294,6 @@ def main(): constants.RELEASE_VERSION) dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS] - dirs.append((constants.LOG_OS_DIR, 0750)) dirs.append((constants.LOCK_DIR, 1777)) daemon.GenericMain(constants.CONFD, parser, dirs, CheckConfd, ExecConfd)