4 # Copyright (C) 2009, Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Ganeti configuration daemon
24 Ganeti-confd is a daemon to query master candidates for configuration values.
25 It uses UDP+HMAC for authentication with a global cluster key.
37 from optparse import OptionParser
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import daemon
42 from ganeti import ssconf
43 from ganeti.asyncnotifier import AsyncNotifier
44 from ganeti.confd.server import ConfdProcessor
47 class ConfdAsyncUDPServer(asyncore.dispatcher):
48 """The confd udp server, suitable for use with asyncore.
51 def __init__(self, bind_address, port, processor):
52 """Constructor for ConfdAsyncUDPServer
54 @type bind_address: string
55 @param bind_address: socket bind address ('' for all)
58 @type processor: L{confd.server.ConfdProcessor}
59 @param reader: ConfigReader to use to access the config
62 asyncore.dispatcher.__init__(self)
63 self.bind_address = bind_address
65 self.processor = processor
66 self.create_socket(socket.AF_INET, socket.SOCK_DGRAM)
67 self.bind((bind_address, port))
68 logging.debug("listening on ('%s':%d)" % (bind_address, port))
70 # this method is overriding an asyncore.dispatcher method
71 def handle_connect(self):
72 # Python thinks that the first udp message from a source qualifies as a
73 # "connect" and further ones are part of the same connection. We beg to
74 # differ and treat all messages equally.
77 # this method is overriding an asyncore.dispatcher method
78 def handle_read(self):
80 payload_in, address = self.recvfrom(4096)
82 payload_out = self.processor.ExecQuery(payload_in, ip, port)
83 if payload_out is not None:
84 self.sendto(payload_out, 0, (ip, port))
86 # we need to catch any exception here, log it, but proceed, because even
87 # if we failed handling a single request, we still want the confd to
89 logging.error("Unexpected exception", exc_info=True)
91 # this method is overriding an asyncore.dispatcher method
93 # No need to check if we can write to the UDP socket
97 class ConfdInotifyEventHandler(pyinotify.ProcessEvent):
99 def __init__(self, watch_manager, callback,
100 file=constants.CLUSTER_CONF_FILE):
101 """Constructor for ConfdInotifyEventHandler
103 @type watch_manager: L{pyinotify.WatchManager}
104 @param watch_manager: ganeti-confd inotify watch manager
105 @type callback: function accepting a boolean
106 @param callback: function to call when an inotify event happens
108 @param file: config file to watch
111 # no need to call the parent's constructor
112 self.watch_manager = watch_manager
113 self.callback = callback
114 self.mask = pyinotify.EventsCodes.IN_IGNORED | \
115 pyinotify.EventsCodes.IN_MODIFY
117 self.watch_handle = None
120 """Watch the given file
123 if self.watch_handle is None:
124 result = self.watch_manager.add_watch(self.file, self.mask)
125 if not self.file in result or result[self.file] <= 0:
126 raise errors.InotifyError("Could not add inotify watcher")
128 self.watch_handle = result[self.file]
131 """Stop watching the given file
134 if self.watch_handle is not None:
135 result = self.watch_manager.rm_watch(self.watch_handle)
136 if result[self.watch_handle]:
137 self.watch_handle = None
139 def process_IN_IGNORED(self, event):
140 # Due to the fact that we monitor just for the cluster config file (rather
141 # than for the whole data dir) when the file is replaced with another one
142 # (which is what happens normally in ganeti) we're going to receive an
143 # IN_IGNORED event from inotify, because of the file removal (which is
144 # contextual with the replacement). In such a case we need to create
145 # another watcher for the "new" file.
146 logging.debug("Received 'ignored' inotify event for %s" % event.path)
147 self.watch_handle = None
150 # Since the kernel believes the file we were interested in is gone, it's
151 # not going to notify us of any other events, until we set up, here, the
152 # new watch. This is not a race condition, though, since we're anyway
153 # going to realod the file after setting up the new watch.
155 except errors.ConfdFatalError, err:
156 logging.critical("Critical error, shutting down: %s" % err)
157 sys.exit(constants.EXIT_FAILURE)
159 # we need to catch any exception here, log it, but proceed, because even
160 # if we failed handling a single request, we still want the confd to
162 logging.error("Unexpected exception", exc_info=True)
164 def process_IN_MODIFY(self, event):
165 # This gets called when the config file is modified. Note that this doesn't
166 # usually happen in Ganeti, as the config file is normally replaced by a
167 # new one, at filesystem level, rather than actually modified (see
169 logging.debug("Received 'modify' inotify event for %s" % event.path)
173 except errors.ConfdFatalError, err:
174 logging.critical("Critical error, shutting down: %s" % err)
175 sys.exit(constants.EXIT_FAILURE)
177 # we need to catch any exception here, log it, but proceed, because even
178 # if we failed handling a single request, we still want the confd to
180 logging.error("Unexpected exception", exc_info=True)
182 def process_default(self, event):
183 logging.error("Received unhandled inotify event: %s" % event)
186 class ConfdConfigurationReloader(object):
187 """Logic to control when to reload the ganeti configuration
189 This class is able to alter between inotify and polling, to rate-limit the
190 number of reloads. When using inotify it also supports a fallback timed
191 check, to verify that the reload hasn't failed.
194 def __init__(self, processor, mainloop):
195 """Constructor for ConfdConfigurationReloader
197 @type processor: L{confd.server.ConfdProcessor}
198 @param processor: ganeti-confd ConfdProcessor
199 @type mainloop: L{daemon.Mainloop}
200 @param mainloop: ganeti-confd mainloop
203 self.processor = processor
204 self.mainloop = mainloop
207 self.last_notification = 0
209 # Asyncronous inotify handler for config changes
210 self.wm = pyinotify.WatchManager()
211 self.inotify_handler = ConfdInotifyEventHandler(self.wm, self.OnInotify)
212 self.notifier = AsyncNotifier(self.wm, self.inotify_handler)
214 self.timer_handle = None
217 def OnInotify(self, notifier_enabled):
218 """Receive an inotify notification.
220 @type notifier_enabled: boolean
221 @param notifier_enabled: whether the notifier is still enabled
224 current_time = time.time()
225 time_delta = current_time - self.last_notification
226 self.last_notification = current_time
228 if time_delta < constants.CONFD_CONFIG_RELOAD_RATELIMIT:
229 logging.debug("Moving from inotify mode to polling mode")
232 self.inotify_handler.disable()
234 if not self.polling and not notifier_enabled:
236 self.inotify_handler.enable()
237 except errors.InotifyError:
241 reloaded = self.processor.reader.Reload()
243 logging.info("Reloaded ganeti config")
245 logging.debug("Skipped double config reload")
246 except errors.ConfigurationError:
248 self.inotify_handler.disable()
251 # Reset the timer. If we're polling it will go to the polling rate, if
252 # we're not it will delay it again to its base safe timeout.
255 def _DisableTimer(self):
256 if self.timer_handle is not None:
257 self.mainloop.scheduler.cancel(self.timer_handle)
258 self.timer_handle = None
260 def _EnableTimer(self):
262 timeout = constants.CONFD_CONFIG_RELOAD_RATELIMIT
264 timeout = constants.CONFD_CONFIG_RELOAD_TIMEOUT
266 if self.timer_handle is None:
267 self.timer_handle = self.mainloop.scheduler.enter(
268 timeout, 1, self.OnTimer, [])
270 def _ResetTimer(self):
275 """Function called when the timer fires
278 self.timer_handle = None
282 if self.processor.reader is None:
287 reloaded = self.processor.reader.Reload()
288 except errors.ConfigurationError:
289 self.DisableConfd(silent=was_disabled)
292 if self.polling and reloaded:
293 logging.info("Reloaded ganeti config")
295 # We have reloaded the config files, but received no inotify event. If
296 # an event is pending though, we just happen to have timed out before
297 # receiving it, so this is not a problem, and we shouldn't alert
298 if not self.notifier.check_events() and not was_disabled:
299 logging.warning("Config file reload at timeout (inotify failure)")
301 # We're polling, but we haven't reloaded the config:
302 # Going back to inotify mode
303 logging.debug("Moving from polling mode to inotify mode")
306 self.inotify_handler.enable()
307 except errors.InotifyError:
310 logging.debug("Performed configuration check")
314 def DisableConfd(self, silent=False):
315 """Puts confd in non-serving mode
319 logging.warning("Confd is being disabled")
320 self.processor.Disable()
324 def EnableConfd(self):
325 self.processor.Enable()
326 logging.warning("Confd is being enabled")
331 def CheckConfd(options, args):
332 """Initial checks whether to run exit with a failure.
335 # TODO: collapse HMAC daemons handling in daemons GenericMain, when we'll
336 # have more than one.
337 if not os.path.isfile(constants.HMAC_CLUSTER_KEY):
338 print >> sys.stderr, "Need HMAC key %s to run" % constants.HMAC_CLUSTER_KEY
339 sys.exit(constants.EXIT_FAILURE)
342 def ExecConfd(options, args):
343 """Main confd function, executed with PID file held
346 mainloop = daemon.Mainloop()
348 # Asyncronous confd UDP server
349 processor = ConfdProcessor()
352 except errors.ConfigurationError:
353 # If enabling the processor has failed, we can still go on, but confd will
355 logging.warning("Confd is starting in disabled mode")
357 server = ConfdAsyncUDPServer(options.bind_address, options.port, processor)
359 # Configuration reloader
360 reloader = ConfdConfigurationReloader(processor, mainloop)
366 """Main function for the confd daemon.
369 parser = OptionParser(description="Ganeti configuration daemon",
370 usage="%prog [-f] [-d] [-b ADDRESS]",
371 version="%%prog (ganeti) %s" %
372 constants.RELEASE_VERSION)
374 dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
375 dirs.append((constants.LOG_OS_DIR, 0750))
376 dirs.append((constants.LOCK_DIR, 1777))
377 daemon.GenericMain(constants.CONFD, parser, dirs, CheckConfd, ExecConfd)
380 if __name__ == "__main__":