4 # Copyright (C) 2009, Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Ganeti configuration daemon
24 Ganeti-confd is a daemon to query master candidates for configuration values.
25 It uses UDP+HMAC for authentication with a global cluster key.
38 from optparse import OptionParser
40 from ganeti import constants
41 from ganeti import errors
42 from ganeti import daemon
43 from ganeti import ssconf
44 from ganeti.asyncnotifier import AsyncNotifier
45 from ganeti.confd.server import ConfdProcessor
48 class ConfdAsyncUDPServer(daemon.AsyncUDPSocket):
49 """The confd udp server, suitable for use with asyncore.
52 def __init__(self, bind_address, port, processor):
53 """Constructor for ConfdAsyncUDPServer
55 @type bind_address: string
56 @param bind_address: socket bind address ('' for all)
59 @type processor: L{confd.server.ConfdProcessor}
60 @param processor: ConfdProcessor to use to handle queries
63 daemon.AsyncUDPSocket.__init__(self)
64 self.bind_address = bind_address
66 self.processor = processor
67 self.bind((bind_address, port))
68 logging.debug("listening on ('%s':%d)" % (bind_address, port))
70 # this method is overriding a daemon.AsyncUDPSocket method
71 def handle_datagram(self, payload_in, ip, port):
73 if len(payload_in) < len(constants.CONFD_MAGIC_FOURCC):
74 logging.debug("Received a query which is too short to be true")
77 magic_number = payload_in[:4]
78 query = payload_in[4:]
80 if magic_number != constants.CONFD_MAGIC_FOURCC:
81 logging.debug("Received a query with an unknown magic number")
84 answer = self.processor.ExecQuery(query, ip, port)
85 if answer is not None:
86 payload_out = ''.join([constants.CONFD_MAGIC_FOURCC, answer])
88 self.enqueue_send(ip, port, payload_out)
89 except errors.UdpDataSizeError:
90 logging.error("Reply too big to fit in an udp packet.")
93 class ConfdInotifyEventHandler(pyinotify.ProcessEvent):
95 def __init__(self, watch_manager, callback,
96 file=constants.CLUSTER_CONF_FILE):
97 """Constructor for ConfdInotifyEventHandler
99 @type watch_manager: L{pyinotify.WatchManager}
100 @param watch_manager: ganeti-confd inotify watch manager
101 @type callback: function accepting a boolean
102 @param callback: function to call when an inotify event happens
104 @param file: config file to watch
107 # no need to call the parent's constructor
108 self.watch_manager = watch_manager
109 self.callback = callback
110 self.mask = pyinotify.EventsCodes.IN_IGNORED | \
111 pyinotify.EventsCodes.IN_MODIFY
113 self.watch_handle = None
116 """Watch the given file
119 if self.watch_handle is None:
120 result = self.watch_manager.add_watch(self.file, self.mask)
121 if not self.file in result or result[self.file] <= 0:
122 raise errors.InotifyError("Could not add inotify watcher")
124 self.watch_handle = result[self.file]
127 """Stop watching the given file
130 if self.watch_handle is not None:
131 result = self.watch_manager.rm_watch(self.watch_handle)
132 if result[self.watch_handle]:
133 self.watch_handle = None
135 def process_IN_IGNORED(self, event):
136 # Due to the fact that we monitor just for the cluster config file (rather
137 # than for the whole data dir) when the file is replaced with another one
138 # (which is what happens normally in ganeti) we're going to receive an
139 # IN_IGNORED event from inotify, because of the file removal (which is
140 # contextual with the replacement). In such a case we need to create
141 # another watcher for the "new" file.
142 logging.debug("Received 'ignored' inotify event for %s" % event.path)
143 self.watch_handle = None
146 # Since the kernel believes the file we were interested in is gone, it's
147 # not going to notify us of any other events, until we set up, here, the
148 # new watch. This is not a race condition, though, since we're anyway
149 # going to realod the file after setting up the new watch.
151 except errors.ConfdFatalError, err:
152 logging.critical("Critical error, shutting down: %s" % err)
153 sys.exit(constants.EXIT_FAILURE)
155 # we need to catch any exception here, log it, but proceed, because even
156 # if we failed handling a single request, we still want the confd to
158 logging.error("Unexpected exception", exc_info=True)
160 def process_IN_MODIFY(self, event):
161 # This gets called when the config file is modified. Note that this doesn't
162 # usually happen in Ganeti, as the config file is normally replaced by a
163 # new one, at filesystem level, rather than actually modified (see
165 logging.debug("Received 'modify' inotify event for %s" % event.path)
169 except errors.ConfdFatalError, err:
170 logging.critical("Critical error, shutting down: %s" % err)
171 sys.exit(constants.EXIT_FAILURE)
173 # we need to catch any exception here, log it, but proceed, because even
174 # if we failed handling a single request, we still want the confd to
176 logging.error("Unexpected exception", exc_info=True)
178 def process_default(self, event):
179 logging.error("Received unhandled inotify event: %s" % event)
182 class ConfdConfigurationReloader(object):
183 """Logic to control when to reload the ganeti configuration
185 This class is able to alter between inotify and polling, to rate-limit the
186 number of reloads. When using inotify it also supports a fallback timed
187 check, to verify that the reload hasn't failed.
190 def __init__(self, processor, mainloop):
191 """Constructor for ConfdConfigurationReloader
193 @type processor: L{confd.server.ConfdProcessor}
194 @param processor: ganeti-confd ConfdProcessor
195 @type mainloop: L{daemon.Mainloop}
196 @param mainloop: ganeti-confd mainloop
199 self.processor = processor
200 self.mainloop = mainloop
203 self.last_notification = 0
205 # Asyncronous inotify handler for config changes
206 self.wm = pyinotify.WatchManager()
207 self.inotify_handler = ConfdInotifyEventHandler(self.wm, self.OnInotify)
208 self.notifier = AsyncNotifier(self.wm, self.inotify_handler)
210 self.timer_handle = None
213 def OnInotify(self, notifier_enabled):
214 """Receive an inotify notification.
216 @type notifier_enabled: boolean
217 @param notifier_enabled: whether the notifier is still enabled
220 current_time = time.time()
221 time_delta = current_time - self.last_notification
222 self.last_notification = current_time
224 if time_delta < constants.CONFD_CONFIG_RELOAD_RATELIMIT:
225 logging.debug("Moving from inotify mode to polling mode")
228 self.inotify_handler.disable()
230 if not self.polling and not notifier_enabled:
232 self.inotify_handler.enable()
233 except errors.InotifyError:
237 reloaded = self.processor.reader.Reload()
239 logging.info("Reloaded ganeti config")
241 logging.debug("Skipped double config reload")
242 except errors.ConfigurationError:
244 self.inotify_handler.disable()
247 # Reset the timer. If we're polling it will go to the polling rate, if
248 # we're not it will delay it again to its base safe timeout.
251 def _DisableTimer(self):
252 if self.timer_handle is not None:
253 self.mainloop.scheduler.cancel(self.timer_handle)
254 self.timer_handle = None
256 def _EnableTimer(self):
258 timeout = constants.CONFD_CONFIG_RELOAD_RATELIMIT
260 timeout = constants.CONFD_CONFIG_RELOAD_TIMEOUT
262 if self.timer_handle is None:
263 self.timer_handle = self.mainloop.scheduler.enter(
264 timeout, 1, self.OnTimer, [])
266 def _ResetTimer(self):
271 """Function called when the timer fires
274 self.timer_handle = None
278 if self.processor.reader is None:
283 reloaded = self.processor.reader.Reload()
284 except errors.ConfigurationError:
285 self.DisableConfd(silent=was_disabled)
288 if self.polling and reloaded:
289 logging.info("Reloaded ganeti config")
291 # We have reloaded the config files, but received no inotify event. If
292 # an event is pending though, we just happen to have timed out before
293 # receiving it, so this is not a problem, and we shouldn't alert
294 if not self.notifier.check_events() and not was_disabled:
295 logging.warning("Config file reload at timeout (inotify failure)")
297 # We're polling, but we haven't reloaded the config:
298 # Going back to inotify mode
299 logging.debug("Moving from polling mode to inotify mode")
302 self.inotify_handler.enable()
303 except errors.InotifyError:
306 logging.debug("Performed configuration check")
310 def DisableConfd(self, silent=False):
311 """Puts confd in non-serving mode
315 logging.warning("Confd is being disabled")
316 self.processor.Disable()
320 def EnableConfd(self):
321 self.processor.Enable()
322 logging.warning("Confd is being enabled")
327 def CheckConfd(options, args):
328 """Initial checks whether to run exit with a failure.
331 # TODO: collapse HMAC daemons handling in daemons GenericMain, when we'll
332 # have more than one.
333 if not os.path.isfile(constants.HMAC_CLUSTER_KEY):
334 print >> sys.stderr, "Need HMAC key %s to run" % constants.HMAC_CLUSTER_KEY
335 sys.exit(constants.EXIT_FAILURE)
338 def ExecConfd(options, args):
339 """Main confd function, executed with PID file held
342 mainloop = daemon.Mainloop()
344 # Asyncronous confd UDP server
345 processor = ConfdProcessor()
348 except errors.ConfigurationError:
349 # If enabling the processor has failed, we can still go on, but confd will
351 logging.warning("Confd is starting in disabled mode")
353 server = ConfdAsyncUDPServer(options.bind_address, options.port, processor)
355 # Configuration reloader
356 reloader = ConfdConfigurationReloader(processor, mainloop)
362 """Main function for the confd daemon.
365 parser = OptionParser(description="Ganeti configuration daemon",
366 usage="%prog [-f] [-d] [-b ADDRESS]",
367 version="%%prog (ganeti) %s" %
368 constants.RELEASE_VERSION)
370 dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
371 dirs.append((constants.LOG_OS_DIR, 0750))
372 dirs.append((constants.LOCK_DIR, 1777))
373 daemon.GenericMain(constants.CONFD, parser, dirs, CheckConfd, ExecConfd)
376 if __name__ == "__main__":