X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/6c948699089dcd61ffbe772f1ee51a6aa2f4c7f1..ad54f3d2e3a370a1b8db3d6fb0ecbebcf0f37f88:/daemons/ganeti-confd diff --git a/daemons/ganeti-confd b/daemons/ganeti-confd index d86cb68..b3cf688 100755 --- a/daemons/ganeti-confd +++ b/daemons/ganeti-confd @@ -29,21 +29,25 @@ It uses UDP+HMAC for authentication with a global cluster key. import os import sys import logging -import asyncore -import socket -import pyinotify +import time + +try: + from pyinotify import pyinotify +except ImportError: + import pyinotify from optparse import OptionParser +from ganeti import asyncnotifier +from ganeti import confd +from ganeti.confd import server as confd_server from ganeti import constants from ganeti import errors from ganeti import daemon from ganeti import ssconf -from ganeti.asyncnotifier import AsyncNotifier -from ganeti.confd.server import ConfdProcessor -class ConfdAsyncUDPServer(asyncore.dispatcher): +class ConfdAsyncUDPServer(daemon.AsyncUDPSocket): """The confd udp server, suitable for use with asyncore. """ @@ -55,85 +59,73 @@ class ConfdAsyncUDPServer(asyncore.dispatcher): @type port: int @param port: udp port @type processor: L{confd.server.ConfdProcessor} - @param reader: ConfigReader to use to access the config + @param processor: ConfdProcessor to use to handle queries """ - asyncore.dispatcher.__init__(self) + daemon.AsyncUDPSocket.__init__(self) self.bind_address = bind_address self.port = port self.processor = processor - self.create_socket(socket.AF_INET, socket.SOCK_DGRAM) self.bind((bind_address, port)) logging.debug("listening on ('%s':%d)" % (bind_address, port)) - # this method is overriding an asyncore.dispatcher method - def handle_connect(self): - # Python thinks that the first udp message from a source qualifies as a - # "connect" and further ones are part of the same connection. We beg to - # differ and treat all messages equally. 
- pass - - # this method is overriding an asyncore.dispatcher method - def handle_read(self): + # this method is overriding a daemon.AsyncUDPSocket method + def handle_datagram(self, payload_in, ip, port): try: - payload_in, address = self.recvfrom(4096) - ip, port = address - payload_out = self.processor.ExecQuery(payload_in, ip, port) - if payload_out is not None: - self.sendto(payload_out, 0, (ip, port)) - except: - # we need to catch any exception here, log it, but proceed, because even - # if we failed handling a single request, we still want the confd to - # continue working. - logging.error("Unexpected exception", exc_info=True) + query = confd.UnpackMagic(payload_in) + except errors.ConfdMagicError, err: + logging.debug(err) + return - # this method is overriding an asyncore.dispatcher method - def writable(self): - # No need to check if we can write to the UDP socket - return False + answer = self.processor.ExecQuery(query, ip, port) + if answer is not None: + try: + self.enqueue_send(ip, port, confd.PackMagic(answer)) + except errors.UdpDataSizeError: + logging.error("Reply too big to fit in an udp packet.") class ConfdInotifyEventHandler(pyinotify.ProcessEvent): - def __init__(self, watch_manager, reader, + def __init__(self, watch_manager, callback, file=constants.CLUSTER_CONF_FILE): """Constructor for ConfdInotifyEventHandler @type watch_manager: L{pyinotify.WatchManager} @param watch_manager: ganeti-confd inotify watch manager - @type reader: L{ssconf.SimpleConfigReader} - @param reader: ganeti-confd SimpleConfigReader + @type callback: function accepting a boolean + @param callback: function to call when an inotify event happens @type file: string @param file: config file to watch """ # no need to call the parent's constructor self.watch_manager = watch_manager - self.reader = reader + self.callback = callback self.mask = pyinotify.EventsCodes.IN_IGNORED | \ pyinotify.EventsCodes.IN_MODIFY self.file = file - self.add_config_watch() + 
self.watch_handle = None - def add_config_watch(self): - """Add a watcher for the ganeti config file + def enable(self): + """Watch the given file """ - result = self.watch_manager.add_watch(self.file, self.mask) - if not result[self.file] > 0: - raise errors.ConfdFatalError("Could not add inotify watcher") - - def reload_config(self): - try: - reloaded = self.reader.Reload() - if reloaded: - logging.info("Reloaded ganeti config") + if self.watch_handle is None: + result = self.watch_manager.add_watch(self.file, self.mask) + if not self.file in result or result[self.file] <= 0: + raise errors.InotifyError("Could not add inotify watcher") else: - logging.debug("Skipped double config reload") - except errors.ConfigurationError: - # transform a ConfigurationError in a fatal error, that will cause confd - # to quit. - raise errors.ConfdFatalError(err) + self.watch_handle = result[self.file] + + def disable(self): + """Stop watching the given file + + """ + if self.watch_handle is not None: + result = self.watch_manager.rm_watch(self.watch_handle) + if result[self.watch_handle]: + self.watch_handle = None def process_IN_IGNORED(self, event): # Due to the fact that we monitor just for the cluster config file (rather @@ -143,14 +135,14 @@ class ConfdInotifyEventHandler(pyinotify.ProcessEvent): # contextual with the replacement). In such a case we need to create # another watcher for the "new" file. logging.debug("Received 'ignored' inotify event for %s" % event.path) + self.watch_handle = None try: # Since the kernel believes the file we were interested in is gone, it's # not going to notify us of any other events, until we set up, here, the # new watch. This is not a race condition, though, since we're anyway # going to realod the file after setting up the new watch. 
- self.add_config_watch() - self.reload_config() + self.callback(False) except errors.ConfdFatalError, err: logging.critical("Critical error, shutting down: %s" % err) sys.exit(constants.EXIT_FAILURE) @@ -168,7 +160,7 @@ class ConfdInotifyEventHandler(pyinotify.ProcessEvent): logging.debug("Received 'modify' inotify event for %s" % event.path) try: - self.reload_config() + self.callback(True) except errors.ConfdFatalError, err: logging.critical("Critical error, shutting down: %s" % err) sys.exit(constants.EXIT_FAILURE) @@ -182,6 +174,151 @@ class ConfdInotifyEventHandler(pyinotify.ProcessEvent): logging.error("Received unhandled inotify event: %s" % event) +class ConfdConfigurationReloader(object): + """Logic to control when to reload the ganeti configuration + + This class is able to alternate between inotify and polling, to rate-limit the + number of reloads. When using inotify it also supports a fallback timed + check, to verify that the reload hasn't failed. + + """ + def __init__(self, processor, mainloop): + """Constructor for ConfdConfigurationReloader + + @type processor: L{confd.server.ConfdProcessor} + @param processor: ganeti-confd ConfdProcessor + @type mainloop: L{daemon.Mainloop} + @param mainloop: ganeti-confd mainloop + + """ + self.processor = processor + self.mainloop = mainloop + + self.polling = True + self.last_notification = 0 + + # Asynchronous inotify handler for config changes + self.wm = pyinotify.WatchManager() + self.inotify_handler = ConfdInotifyEventHandler(self.wm, self.OnInotify) + self.notifier = asyncnotifier.AsyncNotifier(self.wm, self.inotify_handler) + + self.timer_handle = None + self._EnableTimer() + + def OnInotify(self, notifier_enabled): + """Receive an inotify notification.
+ + @type notifier_enabled: boolean + @param notifier_enabled: whether the notifier is still enabled + + """ + current_time = time.time() + time_delta = current_time - self.last_notification + self.last_notification = current_time + + if time_delta < constants.CONFD_CONFIG_RELOAD_RATELIMIT: + logging.debug("Moving from inotify mode to polling mode") + self.polling = True + if notifier_enabled: + self.inotify_handler.disable() + + if not self.polling and not notifier_enabled: + try: + self.inotify_handler.enable() + except errors.InotifyError: + self.polling = True + + try: + reloaded = self.processor.reader.Reload() + if reloaded: + logging.info("Reloaded ganeti config") + else: + logging.debug("Skipped double config reload") + except errors.ConfigurationError: + self.DisableConfd() + self.inotify_handler.disable() + return + + # Reset the timer. If we're polling it will go to the polling rate, if + # we're not it will delay it again to its base safe timeout. + self._ResetTimer() + + def _DisableTimer(self): + if self.timer_handle is not None: + self.mainloop.scheduler.cancel(self.timer_handle) + self.timer_handle = None + + def _EnableTimer(self): + if self.polling: + timeout = constants.CONFD_CONFIG_RELOAD_RATELIMIT + else: + timeout = constants.CONFD_CONFIG_RELOAD_TIMEOUT + + if self.timer_handle is None: + self.timer_handle = self.mainloop.scheduler.enter( + timeout, 1, self.OnTimer, []) + + def _ResetTimer(self): + self._DisableTimer() + self._EnableTimer() + + def OnTimer(self): + """Function called when the timer fires + + """ + self.timer_handle = None + reloaded = False + was_disabled = False + try: + if self.processor.reader is None: + was_disabled = True + self.EnableConfd() + reloaded = True + else: + reloaded = self.processor.reader.Reload() + except errors.ConfigurationError: + self.DisableConfd(silent=was_disabled) + return + + if self.polling and reloaded: + logging.info("Reloaded ganeti config") + elif reloaded: + # We have reloaded the config 
files, but received no inotify event. If + # an event is pending though, we just happen to have timed out before + # receiving it, so this is not a problem, and we shouldn't alert + if not self.notifier.check_events() and not was_disabled: + logging.warning("Config file reload at timeout (inotify failure)") + elif self.polling: + # We're polling, but we haven't reloaded the config: + # Going back to inotify mode + logging.debug("Moving from polling mode to inotify mode") + self.polling = False + try: + self.inotify_handler.enable() + except errors.InotifyError: + self.polling = True + else: + logging.debug("Performed configuration check") + + self._EnableTimer() + + def DisableConfd(self, silent=False): + """Puts confd in non-serving mode + + """ + if not silent: + logging.warning("Confd is being disabled") + self.processor.Disable() + self.polling = False + self._ResetTimer() + + def EnableConfd(self): + self.processor.Enable() + logging.warning("Confd is being enabled") + self.polling = True + self._ResetTimer() + + def CheckConfd(options, args): """Initial checks whether to run exit with a failure. 
@@ -192,8 +329,6 @@ def CheckConfd(options, args): print >> sys.stderr, "Need HMAC key %s to run" % constants.HMAC_CLUSTER_KEY sys.exit(constants.EXIT_FAILURE) - ssconf.CheckMasterCandidate(options.debug) - def ExecConfd(options, args): """Main confd function, executed with PID file held @@ -201,17 +336,19 @@ def ExecConfd(options, args): """ mainloop = daemon.Mainloop() - # confd-level SimpleConfigReader - reader = ssconf.SimpleConfigReader() - # Asyncronous confd UDP server - processor = ConfdProcessor(reader) + processor = confd_server.ConfdProcessor() + try: + processor.Enable() + except errors.ConfigurationError: + # If enabling the processor has failed, we can still go on, but confd will + # be disabled + logging.warning("Confd is starting in disabled mode") + pass server = ConfdAsyncUDPServer(options.bind_address, options.port, processor) - # Asyncronous inotify handler for config changes - wm = pyinotify.WatchManager() - confd_event_handler = ConfdInotifyEventHandler(wm, reader) - notifier = AsyncNotifier(wm, confd_event_handler) + # Configuration reloader + reloader = ConfdConfigurationReloader(processor, mainloop) mainloop.Run() @@ -226,7 +363,6 @@ def main(): constants.RELEASE_VERSION) dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS] - dirs.append((constants.LOG_OS_DIR, 0750)) dirs.append((constants.LOCK_DIR, 1777)) daemon.GenericMain(constants.CONFD, parser, dirs, CheckConfd, ExecConfd)