4 # Copyright (C) 2009, Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Ganeti configuration daemon
24 Ganeti-confd is a daemon to query master candidates for configuration values.
25 It uses UDP+HMAC for authentication with a global cluster key.
37 from optparse import OptionParser
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import daemon
42 from ganeti import ssconf
43 from ganeti.asyncnotifier import AsyncNotifier
44 from ganeti.confd.server import ConfdProcessor
47 class ConfdAsyncUDPServer(asyncore.dispatcher):
48 """The confd udp server, suitable for use with asyncore.
51 def __init__(self, bind_address, port, processor):
52 """Constructor for ConfdAsyncUDPServer
54 @type bind_address: string
55 @param bind_address: socket bind address ('' for all)
58 @type processor: L{confd.server.ConfdProcessor}
59 @param reader: ConfigReader to use to access the config
62 asyncore.dispatcher.__init__(self)
63 self.bind_address = bind_address
65 self.processor = processor
66 self.create_socket(socket.AF_INET, socket.SOCK_DGRAM)
67 self.bind((bind_address, port))
68 logging.debug("listening on ('%s':%d)" % (bind_address, port))
70 # this method is overriding an asyncore.dispatcher method
71 def handle_connect(self):
72 # Python thinks that the first udp message from a source qualifies as a
73 # "connect" and further ones are part of the same connection. We beg to
74 # differ and treat all messages equally.
77 # this method is overriding an asyncore.dispatcher method
78 def handle_read(self):
80 payload_in, address = self.recvfrom(4096)
82 payload_out = self.processor.ExecQuery(payload_in, ip, port)
83 if payload_out is not None:
84 self.sendto(payload_out, 0, (ip, port))
86 # we need to catch any exception here, log it, but proceed, because even
87 # if we failed handling a single request, we still want the confd to
89 logging.error("Unexpected exception", exc_info=True)
91 # this method is overriding an asyncore.dispatcher method
93 # No need to check if we can write to the UDP socket
97 class ConfdInotifyEventHandler(pyinotify.ProcessEvent):
99 def __init__(self, watch_manager, callback,
100 file=constants.CLUSTER_CONF_FILE):
101 """Constructor for ConfdInotifyEventHandler
103 @type watch_manager: L{pyinotify.WatchManager}
104 @param watch_manager: ganeti-confd inotify watch manager
105 @type callback: function accepting a boolean
106 @param callback: function to call when an inotify event happens
108 @param file: config file to watch
111 # no need to call the parent's constructor
112 self.watch_manager = watch_manager
113 self.callback = callback
114 self.mask = pyinotify.EventsCodes.IN_IGNORED | \
115 pyinotify.EventsCodes.IN_MODIFY
117 self.watch_handle = None
120 """Watch the given file
123 if self.watch_handle is None:
124 result = self.watch_manager.add_watch(self.file, self.mask)
125 if not self.file in result or result[self.file] <= 0:
126 raise errors.InotifyError("Could not add inotify watcher")
128 self.watch_handle = result[self.file]
131 """Stop watching the given file
134 if self.watch_handle is not None:
135 result = self.watch_manager.rm_watch(self.watch_handle)
136 if result[self.watch_handle]:
137 self.watch_handle = None
139 def process_IN_IGNORED(self, event):
140 # Due to the fact that we monitor just for the cluster config file (rather
141 # than for the whole data dir) when the file is replaced with another one
142 # (which is what happens normally in ganeti) we're going to receive an
143 # IN_IGNORED event from inotify, because of the file removal (which is
144 # contextual with the replacement). In such a case we need to create
145 # another watcher for the "new" file.
146 logging.debug("Received 'ignored' inotify event for %s" % event.path)
147 self.watch_handle = None
150 # Since the kernel believes the file we were interested in is gone, it's
151 # not going to notify us of any other events, until we set up, here, the
152 # new watch. This is not a race condition, though, since we're anyway
153 # going to realod the file after setting up the new watch.
155 except errors.ConfdFatalError, err:
156 logging.critical("Critical error, shutting down: %s" % err)
157 sys.exit(constants.EXIT_FAILURE)
159 # we need to catch any exception here, log it, but proceed, because even
160 # if we failed handling a single request, we still want the confd to
162 logging.error("Unexpected exception", exc_info=True)
164 def process_IN_MODIFY(self, event):
165 # This gets called when the config file is modified. Note that this doesn't
166 # usually happen in Ganeti, as the config file is normally replaced by a
167 # new one, at filesystem level, rather than actually modified (see
169 logging.debug("Received 'modify' inotify event for %s" % event.path)
173 except errors.ConfdFatalError, err:
174 logging.critical("Critical error, shutting down: %s" % err)
175 sys.exit(constants.EXIT_FAILURE)
177 # we need to catch any exception here, log it, but proceed, because even
178 # if we failed handling a single request, we still want the confd to
180 logging.error("Unexpected exception", exc_info=True)
182 def process_default(self, event):
183 logging.error("Received unhandled inotify event: %s" % event)
186 class ConfdConfigurationReloader(object):
187 """Logic to control when to reload the ganeti configuration
189 This class is able to alter between inotify and polling, to rate-limit the
190 number of reloads. When using inotify it also supports a fallback timed
191 check, to verify that the reload hasn't failed.
194 def __init__(self, processor, mainloop):
195 """Constructor for ConfdConfigurationReloader
197 @type processor: L{confd.server.ConfdProcessor}
198 @param processor: ganeti-confd ConfdProcessor
199 @type mainloop: L{daemon.Mainloop}
200 @param mainloop: ganeti-confd mainloop
203 self.processor = processor
204 self.mainloop = mainloop
207 self.last_notification = 0
209 # Asyncronous inotify handler for config changes
210 self.wm = pyinotify.WatchManager()
211 self.inotify_handler = ConfdInotifyEventHandler(self.wm, self.OnInotify)
212 self.notifier = AsyncNotifier(self.wm, self.inotify_handler)
214 self.timer_handle = None
217 def OnInotify(self, notifier_enabled):
218 """Receive an inotify notification.
220 @type notifier_enabled: boolean
221 @param notifier_enabled: whether the notifier is still enabled
224 current_time = time.time()
225 time_delta = current_time - self.last_notification
226 self.last_notification = current_time
228 if time_delta < constants.CONFD_CONFIG_RELOAD_RATELIMIT:
229 logging.debug("Moving from inotify mode to polling mode")
232 self.inotify_handler.disable()
234 if not self.polling and not notifier_enabled:
236 self.inotify_handler.enable()
237 except errors.InotifyError:
238 raise errors.ConfdFatalError(err)
241 reloaded = self.processor.reader.Reload()
243 logging.info("Reloaded ganeti config")
245 logging.debug("Skipped double config reload")
246 except errors.ConfigurationError:
247 # transform a ConfigurationError in a fatal error, that will cause confd
249 raise errors.ConfdFatalError(err)
251 # Reset the timer. If we're polling it will go to the polling rate, if
252 # we're not it will delay it again to its base safe timeout.
256 def _DisableTimer(self):
257 if self.timer_handle is not None:
258 self.mainloop.scheduler.cancel(self.timer_handle)
259 self.timer_handle = None
261 def _EnableTimer(self):
263 timeout = constants.CONFD_CONFIG_RELOAD_RATELIMIT
265 timeout = constants.CONFD_CONFIG_RELOAD_TIMEOUT
267 if self.timer_handle is None:
268 self.timer_handle = self.mainloop.scheduler.enter(
269 timeout, 1, self.OnTimer, [])
272 """Function called when the timer fires
275 self.timer_handle = None
277 reloaded = self.processor.reader.Reload()
278 except errors.ConfigurationError:
279 # transform a ConfigurationError in a fatal error, that will cause confd
281 raise errors.ConfdFatalError(err)
283 if self.polling and reloaded:
284 logging.info("Reloaded ganeti config")
286 # We have reloaded the config files, but received no inotify event. If
287 # an event is pending though, we just happen to have timed out before
288 # receiving it, so this is not a problem, and we shouldn't alert
289 if not self.notifier.check_events():
290 logging.warning("Config file reload at timeout (inotify failure)")
292 # We're polling, but we haven't reloaded the config:
293 # Going back to inotify mode
294 logging.debug("Moving from polling mode to inotify mode")
296 self.inotify_handler.enable()
298 logging.debug("Performed configuration check")
303 def CheckConfd(options, args):
304 """Initial checks whether to run exit with a failure.
307 # TODO: collapse HMAC daemons handling in daemons GenericMain, when we'll
308 # have more than one.
309 if not os.path.isfile(constants.HMAC_CLUSTER_KEY):
310 print >> sys.stderr, "Need HMAC key %s to run" % constants.HMAC_CLUSTER_KEY
311 sys.exit(constants.EXIT_FAILURE)
313 ssconf.CheckMasterCandidate(options.debug)
316 def ExecConfd(options, args):
317 """Main confd function, executed with PID file held
320 mainloop = daemon.Mainloop()
322 # Asyncronous confd UDP server
323 processor = ConfdProcessor()
326 except errors.ConfigurationError:
327 # If enabling the processor has failed, we can still go on, but confd will be disabled
329 server = ConfdAsyncUDPServer(options.bind_address, options.port, processor)
331 # Configuration reloader
332 reloader = ConfdConfigurationReloader(processor, mainloop)
338 """Main function for the confd daemon.
341 parser = OptionParser(description="Ganeti configuration daemon",
342 usage="%prog [-f] [-d] [-b ADDRESS]",
343 version="%%prog (ganeti) %s" %
344 constants.RELEASE_VERSION)
346 dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
347 dirs.append((constants.LOG_OS_DIR, 0750))
348 dirs.append((constants.LOCK_DIR, 1777))
349 daemon.GenericMain(constants.CONFD, parser, dirs, CheckConfd, ExecConfd)
352 if __name__ == "__main__":