#!/usr/bin/python
#

# Copyright (C) 2009, Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Ganeti configuration daemon

Ganeti-confd is a daemon to query master candidates for configuration values.
It uses UDP+HMAC for authentication with a global cluster key.

"""

import os
import sys
import logging
import asyncore
import socket
import stat
import time
import errno

from optparse import OptionParser

import pyinotify

from ganeti import constants
from ganeti import errors
from ganeti import daemon
from ganeti import ssconf
from ganeti.asyncnotifier import AsyncNotifier
from ganeti.confd.server import ConfdProcessor


# Directory modes are assembled from stat constants rather than written as
# numeric literals: a bare "1777" is a *decimal* number in Python, not the
# octal permission set it looks like.
_LOG_OS_DIR_MODE = stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP  # rwxr-x---
_LOCK_DIR_MODE = (stat.S_ISVTX | stat.S_IRWXU |
                  stat.S_IRWXG | stat.S_IRWXO)  # rwxrwxrwt


class ConfdAsyncUDPServer(daemon.AsyncUDPSocket):
    """The confd udp server, suitable for use with asyncore.

    """
    def __init__(self, bind_address, port, processor):
        """Constructor for ConfdAsyncUDPServer

        @type bind_address: string
        @param bind_address: socket bind address ('' for all)
        @type port: int
        @param port: udp port
        @type processor: L{confd.server.ConfdProcessor}
        @param processor: ConfdProcessor to use to handle queries

        """
        daemon.AsyncUDPSocket.__init__(self)
        self.bind_address = bind_address
        self.port = port
        self.processor = processor
        self.bind((bind_address, port))
        logging.debug("listening on ('%s':%d)", bind_address, port)

    # this method is overriding a daemon.AsyncUDPSocket method
    def handle_datagram(self, payload_in, ip, port):
        """Validate an incoming datagram and queue the reply, if any.

        The payload must start with C{constants.CONFD_MAGIC_FOURCC}; the
        rest is handed to the processor. Datagrams that are too short or
        carry a different prefix are logged at debug level and dropped.

        @type payload_in: string
        @param payload_in: raw datagram contents
        @param ip: address of the sender, passed on to the processor and
            used as the destination of the reply
        @param port: port of the sender, used like C{ip}

        """
        # Use one length for both the size check and the split, so the two
        # cannot drift apart if the magic prefix ever changes size.
        magic_len = len(constants.CONFD_MAGIC_FOURCC)

        if len(payload_in) < magic_len:
            logging.debug("Received a query which is too short to be true")
            return

        magic_number = payload_in[:magic_len]
        query = payload_in[magic_len:]

        if magic_number != constants.CONFD_MAGIC_FOURCC:
            logging.debug("Received a query with an unknown magic number")
            return

        answer = self.processor.ExecQuery(query, ip, port)
        if answer is None:
            # The processor produced nothing to send back.
            return

        payload_out = ''.join([constants.CONFD_MAGIC_FOURCC, answer])
        try:
            self.enqueue_send(ip, port, payload_out)
        except errors.UdpDataSizeError:
            logging.error("Reply too big to fit in an udp packet.")


class ConfdInotifyEventHandler(pyinotify.ProcessEvent):
    """Inotify event handler watching a single configuration file.

    Every handled event is forwarded to the callback given at construction
    time, together with a flag telling whether the watch is still in place.

    """
    def __init__(self, watch_manager, callback,
                 file=constants.CLUSTER_CONF_FILE):
        """Constructor for ConfdInotifyEventHandler

        @type watch_manager: L{pyinotify.WatchManager}
        @param watch_manager: ganeti-confd inotify watch manager
        @type callback: function accepting a boolean
        @param callback: function to call when an inotify event happens
        @type file: string
        @param file: config file to watch

        """
        # no need to call the parent's constructor
        self.watch_manager = watch_manager
        self.callback = callback
        self.mask = (pyinotify.EventsCodes.IN_IGNORED |
                     pyinotify.EventsCodes.IN_MODIFY)
        self.file = file
        # None while no watch is installed
        self.watch_handle = None

    def enable(self):
        """Watch the given file

        Does nothing if a watch is already installed.

        @raise errors.InotifyError: if the watch could not be added

        """
        if self.watch_handle is not None:
            return

        result = self.watch_manager.add_watch(self.file, self.mask)
        # A missing entry or a non-positive value for our path is treated
        # as a failure to install the watch.
        if self.file not in result or result[self.file] <= 0:
            raise errors.InotifyError("Could not add inotify watcher")
        self.watch_handle = result[self.file]

    def disable(self):
        """Stop watching the given file

        The handle is only forgotten if the watch manager reports that the
        removal succeeded.

        """
        if self.watch_handle is None:
            return

        result = self.watch_manager.rm_watch(self.watch_handle)
        if result[self.watch_handle]:
            self.watch_handle = None

    def _RunCallback(self, notifier_enabled):
        """Invoke the callback, shielding the daemon from its failures.

        @type notifier_enabled: boolean
        @param notifier_enabled: value passed through to the callback

        """
        try:
            self.callback(notifier_enabled)
        except errors.ConfdFatalError:
            # sys.exc_info() is used to get at the exception object because
            # it is spelled the same way on every Python version.
            logging.critical("Critical error, shutting down: %s",
                             sys.exc_info()[1])
            sys.exit(constants.EXIT_FAILURE)
        except Exception:
            # we need to catch any exception here, log it, but proceed,
            # because even if we failed handling a single request, we still
            # want the confd to continue working.
            logging.error("Unexpected exception", exc_info=True)

    def process_IN_IGNORED(self, event):
        """Handle the watch being dropped by the kernel."""
        # Due to the fact that we monitor just for the cluster config file
        # (rather than for the whole data dir) when the file is replaced
        # with another one (which is what happens normally in ganeti) we're
        # going to receive an IN_IGNORED event from inotify, because of the
        # file removal (which is contextual with the replacement). In such a
        # case we need to create another watcher for the "new" file.
        logging.debug("Received 'ignored' inotify event for %s", event.path)
        self.watch_handle = None

        # Since the kernel believes the file we were interested in is gone,
        # it's not going to notify us of any other events, until we set up,
        # here, the new watch. This is not a race condition, though, since
        # we're anyway going to reload the file after setting up the new
        # watch.
        self._RunCallback(False)

    def process_IN_MODIFY(self, event):
        """Handle an in-place modification of the watched file."""
        # This gets called when the config file is modified. Note that this
        # doesn't usually happen in Ganeti, as the config file is normally
        # replaced by a new one, at filesystem level, rather than actually
        # modified (see utils.WriteFile)
        logging.debug("Received 'modify' inotify event for %s", event.path)
        self._RunCallback(True)

    def process_default(self, event):
        """Log any event type that has no dedicated handler."""
        logging.error("Received unhandled inotify event: %s", event)


class ConfdConfigurationReloader(object):
    """Logic to control when to reload the ganeti configuration

    This class is able to alternate between inotify and polling, to
    rate-limit the number of reloads. When using inotify it also supports a
    fallback timed check, to verify that the reload hasn't failed.

    """
    def __init__(self, processor, mainloop):
        """Constructor for ConfdConfigurationReloader

        @type processor: L{confd.server.ConfdProcessor}
        @param processor: ganeti-confd ConfdProcessor
        @type mainloop: L{daemon.Mainloop}
        @param mainloop: ganeti-confd mainloop

        """
        self.processor = processor
        self.mainloop = mainloop

        # Start in polling mode; OnTimer switches to inotify once a poll
        # finds nothing to reload.
        self.polling = True
        self.last_notification = 0

        # Asynchronous inotify handler for config changes
        self.wm = pyinotify.WatchManager()
        self.inotify_handler = ConfdInotifyEventHandler(self.wm,
                                                        self.OnInotify)
        self.notifier = AsyncNotifier(self.wm, self.inotify_handler)

        self.timer_handle = None
        self._EnableTimer()

    def OnInotify(self, notifier_enabled):
        """Receive an inotify notification.

        @type notifier_enabled: boolean
        @param notifier_enabled: whether the notifier is still enabled

        """
        current_time = time.time()
        time_delta = current_time - self.last_notification
        self.last_notification = current_time

        # Two notifications closer together than the rate limit: stop
        # listening to inotify and let the timer poll instead.
        if time_delta < constants.CONFD_CONFIG_RELOAD_RATELIMIT:
            logging.debug("Moving from inotify mode to polling mode")
            self.polling = True
            if notifier_enabled:
                self.inotify_handler.disable()

        # Still in inotify mode but the watch is gone: install it again,
        # falling back to polling if that fails.
        if not self.polling and not notifier_enabled:
            try:
                self.inotify_handler.enable()
            except errors.InotifyError:
                self.polling = True

        # NOTE(review): OnTimer treats "processor.reader is None" as the
        # disabled state; this method does not check for it -- confirm an
        # event cannot arrive while confd is disabled.
        try:
            reloaded = self.processor.reader.Reload()
        except errors.ConfigurationError:
            self.DisableConfd()
            self.inotify_handler.disable()
            return

        if reloaded:
            logging.info("Reloaded ganeti config")
        else:
            logging.debug("Skipped double config reload")

        # Reset the timer. If we're polling it will go to the polling rate,
        # if we're not it will delay it again to its base safe timeout.
        self._ResetTimer()

    def _DisableTimer(self):
        """Cancel the pending timer, if there is one."""
        if self.timer_handle is not None:
            self.mainloop.scheduler.cancel(self.timer_handle)
            self.timer_handle = None

    def _EnableTimer(self):
        """Schedule OnTimer, unless a timer is already pending.

        The delay is the rate limit while polling and the (fallback) reload
        timeout while relying on inotify.

        """
        if self.polling:
            timeout = constants.CONFD_CONFIG_RELOAD_RATELIMIT
        else:
            timeout = constants.CONFD_CONFIG_RELOAD_TIMEOUT

        if self.timer_handle is None:
            self.timer_handle = self.mainloop.scheduler.enter(
                timeout, 1, self.OnTimer, [])

    def _ResetTimer(self):
        """Restart the timer with the delay matching the current mode."""
        self._DisableTimer()
        self._EnableTimer()

    def OnTimer(self):
        """Function called when the timer fires

        """
        # The timer that called us has been consumed.
        self.timer_handle = None
        reloaded = False
        was_disabled = False
        try:
            if self.processor.reader is None:
                # No reader means confd is disabled: try to bring it back.
                was_disabled = True
                self.EnableConfd()
                reloaded = True
            else:
                reloaded = self.processor.reader.Reload()
        except errors.ConfigurationError:
            # Don't warn again if we were already disabled.
            self.DisableConfd(silent=was_disabled)
            return

        if self.polling and reloaded:
            logging.info("Reloaded ganeti config")
        elif reloaded:
            # We have reloaded the config files, but received no inotify
            # event. If an event is pending though, we just happen to have
            # timed out before receiving it, so this is not a problem, and
            # we shouldn't alert
            if not self.notifier.check_events() and not was_disabled:
                logging.warning(
                    "Config file reload at timeout (inotify failure)")
        elif self.polling:
            # We're polling, but we haven't reloaded the config:
            # Going back to inotify mode
            logging.debug("Moving from polling mode to inotify mode")
            self.polling = False
            try:
                self.inotify_handler.enable()
            except errors.InotifyError:
                self.polling = True
        else:
            logging.debug("Performed configuration check")

        self._EnableTimer()

    def DisableConfd(self, silent=False):
        """Puts confd in non-serving mode

        @type silent: boolean
        @param silent: if True, don't log a warning about the change

        """
        if not silent:
            logging.warning("Confd is being disabled")
        self.processor.Disable()
        self.polling = False
        self._ResetTimer()

    def EnableConfd(self):
        """Puts confd back in serving mode, starting out in polling mode

        """
        self.processor.Enable()
        logging.warning("Confd is being enabled")
        self.polling = True
        self._ResetTimer()


def CheckConfd(options, args):
    """Initial checks whether to run or exit with a failure.

    Exits with C{constants.EXIT_FAILURE} if the HMAC cluster key file does
    not exist. Neither argument is used by the check itself.

    """
    # TODO: collapse HMAC daemons handling in daemons GenericMain, when we'll
    # have more than one.
    if not os.path.isfile(constants.HMAC_CLUSTER_KEY):
        sys.stderr.write("Need HMAC key %s to run\n" %
                         constants.HMAC_CLUSTER_KEY)
        sys.exit(constants.EXIT_FAILURE)


def ExecConfd(options, args):
    """Main confd function, executed with PID file held

    Reads C{options.bind_address} and C{options.port}; C{args} is unused.

    """
    mainloop = daemon.Mainloop()

    # Asynchronous confd UDP server
    processor = ConfdProcessor()
    try:
        processor.Enable()
    except errors.ConfigurationError:
        # If enabling the processor has failed, we can still go on, but
        # confd will be disabled
        logging.warning("Confd is starting in disabled mode")

    # Neither object is used again below; the locals keep them referenced
    # for as long as the mainloop runs.
    server = ConfdAsyncUDPServer(options.bind_address, options.port,
                                 processor)

    # Configuration reloader
    reloader = ConfdConfigurationReloader(processor, mainloop)

    mainloop.Run()


def main():
    """Main function for the confd daemon.

    """
    parser = OptionParser(description="Ganeti configuration daemon",
                          usage="%prog [-f] [-d] [-b ADDRESS]",
                          version="%%prog (ganeti) %s" %
                          constants.RELEASE_VERSION)

    dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
    dirs.append((constants.LOG_OS_DIR, _LOG_OS_DIR_MODE))
    # This used to pass the decimal literal 1777 (octal 3361) instead of
    # the sticky, world-writable mode 01777.
    dirs.append((constants.LOCK_DIR, _LOCK_DIR_MODE))
    daemon.GenericMain(constants.CONFD, parser, dirs, CheckConfd, ExecConfd)


if __name__ == "__main__":
    main()