X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/ef4ca33b77b510abfa77192bdc7af5500b3f08cc..fe759e4c195bce5bef72068075a8710c32b20909:/daemons/ganeti-confd diff --git a/daemons/ganeti-confd b/daemons/ganeti-confd index 34d08f3..c08426e 100755 --- a/daemons/ganeti-confd +++ b/daemons/ganeti-confd @@ -32,6 +32,8 @@ import logging import asyncore import socket import pyinotify +import time +import errno from optparse import OptionParser @@ -43,7 +45,7 @@ from ganeti.asyncnotifier import AsyncNotifier from ganeti.confd.server import ConfdProcessor -class ConfdAsyncUDPServer(asyncore.dispatcher): +class ConfdAsyncUDPServer(daemon.AsyncUDPSocket): """The confd udp server, suitable for use with asyncore. """ @@ -55,42 +57,21 @@ class ConfdAsyncUDPServer(asyncore.dispatcher): @type port: int @param port: udp port @type processor: L{confd.server.ConfdProcessor} - @param reader: ConfigReader to use to access the config + @param processor: ConfdProcessor to use to handle queries """ - asyncore.dispatcher.__init__(self) + daemon.AsyncUDPSocket.__init__(self) self.bind_address = bind_address self.port = port self.processor = processor - self.create_socket(socket.AF_INET, socket.SOCK_DGRAM) self.bind((bind_address, port)) logging.debug("listening on ('%s':%d)" % (bind_address, port)) - # this method is overriding an asyncore.dispatcher method - def handle_connect(self): - # Python thinks that the first udp message from a source qualifies as a - # "connect" and further ones are part of the same connection. We beg to - # differ and treat all messages equally. - pass - - # this method is overriding an asyncore.dispatcher method - def handle_read(self): - try: - payload_in, address = self.recvfrom(4096) - ip, port = address - payload_out = self.processor.ExecQuery(payload_in, ip, port) - if payload_out is not None: - self.sendto(payload_out, 0, (ip, port)) - except: - # we need to catch any exception here, log it, but proceed, because even - # if we failed handling a single request, we still want the confd to - # continue working. - logging.error("Unexpected exception", exc_info=True) - - # this method is overriding an asyncore.dispatcher method - def writable(self): - # No need to check if we can write to the UDP socket - return False + # this method is overriding a daemon.AsyncUDPSocket method + def handle_datagram(self, payload_in, ip, port): + payload_out = self.processor.ExecQuery(payload_in, ip, port) + if payload_out is not None: + self.enqueue_send(ip, port, payload_out) class ConfdInotifyEventHandler(pyinotify.ProcessEvent): @@ -114,7 +95,6 @@ class ConfdInotifyEventHandler(pyinotify.ProcessEvent): pyinotify.EventsCodes.IN_MODIFY self.file = file self.watch_handle = None - self.enable() def enable(self): """Watch the given file @@ -191,20 +171,29 @@ class ConfdConfigurationReloader(object): check, to verify that the reload hasn't failed. """ - def __init__(self, reader): + def __init__(self, processor, mainloop): """Constructor for ConfdConfigurationReloader - @type reader: L{ssconf.SimpleConfigReader} - @param reader: ganeti-confd SimpleConfigReader + @type processor: L{confd.server.ConfdProcessor} + @param processor: ganeti-confd ConfdProcessor + @type mainloop: L{daemon.Mainloop} + @param mainloop: ganeti-confd mainloop """ - self.reader = reader + self.processor = processor + self.mainloop = mainloop + + self.polling = True + self.last_notification = 0 # Asyncronous inotify handler for config changes self.wm = pyinotify.WatchManager() self.inotify_handler = ConfdInotifyEventHandler(self.wm, self.OnInotify) self.notifier = AsyncNotifier(self.wm, self.inotify_handler) + self.timer_handle = None + self._EnableTimer() + def OnInotify(self, notifier_enabled): """Receive an inotify notification. @@ -212,23 +201,111 @@ class ConfdConfigurationReloader(object): @param notifier_enabled: whether the notifier is still enabled """ - if not notifier_enabled: + current_time = time.time() + time_delta = current_time - self.last_notification + self.last_notification = current_time + + if time_delta < constants.CONFD_CONFIG_RELOAD_RATELIMIT: + logging.debug("Moving from inotify mode to polling mode") + self.polling = True + if notifier_enabled: + self.inotify_handler.disable() + + if not self.polling and not notifier_enabled: try: self.inotify_handler.enable() except errors.InotifyError: - raise errors.ConfdFatalError(err) + self.polling = True try: - reloaded = self.reader.Reload() + reloaded = self.processor.reader.Reload() if reloaded: logging.info("Reloaded ganeti config") else: logging.debug("Skipped double config reload") except errors.ConfigurationError: - # transform a ConfigurationError in a fatal error, that will cause confd - # to quit. - raise errors.ConfdFatalError(err) + self.DisableConfd() + self.inotify_handler.disable() + return + + # Reset the timer. If we're polling it will go to the polling rate, if + # we're not it will delay it again to its base safe timeout. + self._ResetTimer() + + def _DisableTimer(self): + if self.timer_handle is not None: + self.mainloop.scheduler.cancel(self.timer_handle) + self.timer_handle = None + + def _EnableTimer(self): + if self.polling: + timeout = constants.CONFD_CONFIG_RELOAD_RATELIMIT + else: + timeout = constants.CONFD_CONFIG_RELOAD_TIMEOUT + + if self.timer_handle is None: + self.timer_handle = self.mainloop.scheduler.enter( + timeout, 1, self.OnTimer, []) + + def _ResetTimer(self): + self._DisableTimer() + self._EnableTimer() + + def OnTimer(self): + """Function called when the timer fires + + """ + self.timer_handle = None + reloaded = False + was_disabled = False + try: + if self.processor.reader is None: + was_disabled = True + self.EnableConfd() + reloaded = True + else: + reloaded = self.processor.reader.Reload() + except errors.ConfigurationError: + self.DisableConfd(silent=was_disabled) + return + + if self.polling and reloaded: + logging.info("Reloaded ganeti config") + elif reloaded: + # We have reloaded the config files, but received no inotify event. If + # an event is pending though, we just happen to have timed out before + # receiving it, so this is not a problem, and we shouldn't alert + if not self.notifier.check_events() and not was_disabled: + logging.warning("Config file reload at timeout (inotify failure)") + elif self.polling: + # We're polling, but we haven't reloaded the config: + # Going back to inotify mode + logging.debug("Moving from polling mode to inotify mode") + self.polling = False + try: + self.inotify_handler.enable() + except errors.InotifyError: + self.polling = True + else: + logging.debug("Performed configuration check") + + self._EnableTimer() + def DisableConfd(self, silent=False): + """Puts confd in non-serving mode + + """ + if not silent: + logging.warning("Confd is being disabled") + self.processor.Disable() + self.polling = False + self._ResetTimer() + + def EnableConfd(self): + self.processor.Enable() + logging.warning("Confd is being enabled") + self.polling = True + self._ResetTimer() def CheckConfd(options, args): @@ -241,8 +318,6 @@ def CheckConfd(options, args): print >> sys.stderr, "Need HMAC key %s to run" % constants.HMAC_CLUSTER_KEY sys.exit(constants.EXIT_FAILURE) - ssconf.CheckMasterCandidate(options.debug) - def ExecConfd(options, args): """Main confd function, executed with PID file held @@ -250,15 +325,19 @@ def ExecConfd(options, args): """ mainloop = daemon.Mainloop() - # confd-level SimpleConfigReader - reader = ssconf.SimpleConfigReader() - # Asyncronous confd UDP server - processor = ConfdProcessor(reader) + processor = ConfdProcessor() + try: + processor.Enable() + except errors.ConfigurationError: + # If enabling the processor has failed, we can still go on, but confd will + # be disabled + logging.warning("Confd is starting in disabled mode") + pass server = ConfdAsyncUDPServer(options.bind_address, options.port, processor) # Configuration reloader - reloader = ConfdConfigurationReloader(reader) + reloader = ConfdConfigurationReloader(processor, mainloop) mainloop.Run()