4 # Copyright (C) 2009, Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
"""Ganeti configuration daemon

Ganeti-confd is a daemon to query master candidates for configuration values.
It uses UDP+HMAC for authentication with a global cluster key.

"""
37 from optparse import OptionParser
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import daemon
42 from ganeti import ssconf
43 from ganeti.asyncnotifier import AsyncNotifier
44 from ganeti.confd.server import ConfdProcessor
class ConfdAsyncUDPServer(asyncore.dispatcher):
  """The confd UDP server, suitable for use with asyncore.

  Each datagram read from the bound socket is passed to the processor's
  ExecQuery method; a reply other than None is sent back to the sender.

  """
  def __init__(self, bind_address, port, processor):
    """Constructor for ConfdAsyncUDPServer

    @type bind_address: string
    @param bind_address: socket bind address ('' for all)
    @type port: int
    @param port: UDP port to bind to
    @type processor: L{confd.server.ConfdProcessor}
    @param processor: object whose ExecQuery method answers the queries

    """
    asyncore.dispatcher.__init__(self)
    self.bind_address = bind_address
    self.port = port
    self.processor = processor
    self.create_socket(socket.AF_INET, socket.SOCK_DGRAM)
    self.bind((bind_address, port))
    logging.debug("listening on ('%s':%d)", bind_address, port)

  # this method is overriding an asyncore.dispatcher method
  def handle_connect(self):
    # Python thinks that the first udp message from a source qualifies as a
    # "connect" and further ones are part of the same connection. We beg to
    # differ and treat all messages equally.
    pass

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    """Read one datagram, execute the query and send back the answer.

    """
    try:
      payload_in, address = self.recvfrom(4096)
      ip, port = address
      payload_out = self.processor.ExecQuery(payload_in, ip, port)
      if payload_out is not None:
        self.sendto(payload_out, 0, (ip, port))
    except Exception:
      # we need to catch any exception here, log it, but proceed, because even
      # if we failed handling a single request, we still want the confd to
      # continue working.
      logging.error("Unexpected exception", exc_info=True)

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # No need to check if we can write to the UDP socket
    return False
97 class ConfdInotifyEventHandler(pyinotify.ProcessEvent):
99 def __init__(self, watch_manager, callback,
100 file=constants.CLUSTER_CONF_FILE):
101 """Constructor for ConfdInotifyEventHandler
103 @type watch_manager: L{pyinotify.WatchManager}
104 @param watch_manager: ganeti-confd inotify watch manager
105 @type callback: function accepting a boolean
106 @param callback: function to call when an inotify event happens
108 @param file: config file to watch
111 # no need to call the parent's constructor
112 self.watch_manager = watch_manager
113 self.callback = callback
114 self.mask = pyinotify.EventsCodes.IN_IGNORED | \
115 pyinotify.EventsCodes.IN_MODIFY
117 self.watch_handle = None
121 """Watch the given file
124 if self.watch_handle is None:
125 result = self.watch_manager.add_watch(self.file, self.mask)
126 if not self.file in result or result[self.file] <= 0:
127 raise errors.InotifyError("Could not add inotify watcher")
129 self.watch_handle = result[self.file]
132 """Stop watching the given file
135 if self.watch_handle is not None:
136 result = self.watch_manager.rm_watch(self.watch_handle)
137 if result[self.watch_handle]:
138 self.watch_handle = None
140 def process_IN_IGNORED(self, event):
141 # Due to the fact that we monitor just for the cluster config file (rather
142 # than for the whole data dir) when the file is replaced with another one
143 # (which is what happens normally in ganeti) we're going to receive an
144 # IN_IGNORED event from inotify, because of the file removal (which is
145 # contextual with the replacement). In such a case we need to create
146 # another watcher for the "new" file.
147 logging.debug("Received 'ignored' inotify event for %s" % event.path)
148 self.watch_handle = None
151 # Since the kernel believes the file we were interested in is gone, it's
152 # not going to notify us of any other events, until we set up, here, the
153 # new watch. This is not a race condition, though, since we're anyway
154 # going to realod the file after setting up the new watch.
156 except errors.ConfdFatalError, err:
157 logging.critical("Critical error, shutting down: %s" % err)
158 sys.exit(constants.EXIT_FAILURE)
160 # we need to catch any exception here, log it, but proceed, because even
161 # if we failed handling a single request, we still want the confd to
163 logging.error("Unexpected exception", exc_info=True)
165 def process_IN_MODIFY(self, event):
166 # This gets called when the config file is modified. Note that this doesn't
167 # usually happen in Ganeti, as the config file is normally replaced by a
168 # new one, at filesystem level, rather than actually modified (see
170 logging.debug("Received 'modify' inotify event for %s" % event.path)
174 except errors.ConfdFatalError, err:
175 logging.critical("Critical error, shutting down: %s" % err)
176 sys.exit(constants.EXIT_FAILURE)
178 # we need to catch any exception here, log it, but proceed, because even
179 # if we failed handling a single request, we still want the confd to
181 logging.error("Unexpected exception", exc_info=True)
183 def process_default(self, event):
184 logging.error("Received unhandled inotify event: %s" % event)
187 class ConfdConfigurationReloader(object):
188 """Logic to control when to reload the ganeti configuration
190 This class is able to alter between inotify and polling, to rate-limit the
191 number of reloads. When using inotify it also supports a fallback timed
192 check, to verify that the reload hasn't failed.
195 def __init__(self, reader, mainloop):
196 """Constructor for ConfdConfigurationReloader
198 @type reader: L{ssconf.SimpleConfigReader}
199 @param reader: ganeti-confd SimpleConfigReader
200 @type mainloop: L{daemon.Mainloop}
201 @param mainloop: ganeti-confd mainloop
205 self.mainloop = mainloop
208 self.last_notification = 0
210 # Asyncronous inotify handler for config changes
211 self.wm = pyinotify.WatchManager()
212 self.inotify_handler = ConfdInotifyEventHandler(self.wm, self.OnInotify)
213 self.notifier = AsyncNotifier(self.wm, self.inotify_handler)
215 self.timer_handle = None
218 def OnInotify(self, notifier_enabled):
219 """Receive an inotify notification.
221 @type notifier_enabled: boolean
222 @param notifier_enabled: whether the notifier is still enabled
225 current_time = time.time()
226 time_delta = current_time - self.last_notification
227 self.last_notification = current_time
229 if time_delta < constants.CONFD_CONFIG_RELOAD_RATELIMIT:
230 logging.debug("Moving from inotify mode to polling mode")
233 self.inotify_handler.disable()
235 if not self.polling and not notifier_enabled:
237 self.inotify_handler.enable()
238 except errors.InotifyError:
239 raise errors.ConfdFatalError(err)
242 reloaded = self.reader.Reload()
244 logging.info("Reloaded ganeti config")
246 logging.debug("Skipped double config reload")
247 except errors.ConfigurationError:
248 # transform a ConfigurationError in a fatal error, that will cause confd
250 raise errors.ConfdFatalError(err)
252 # Reset the timer. If we're polling it will go to the polling rate, if
253 # we're not it will delay it again to its base safe timeout.
257 def _DisableTimer(self):
258 if self.timer_handle is not None:
259 self.mainloop.scheduler.cancel(self.timer_handle)
260 self.timer_handle = None
262 def _EnableTimer(self):
264 timeout = constants.CONFD_CONFIG_RELOAD_RATELIMIT
266 timeout = constants.CONFD_CONFIG_RELOAD_TIMEOUT
268 if self.timer_handle is None:
269 self.timer_handle = self.mainloop.scheduler.enter(
270 timeout, 1, self.OnTimer, [])
273 """Function called when the timer fires
276 self.timer_handle = None
278 reloaded = self.reader.Reload()
279 except errors.ConfigurationError:
280 # transform a ConfigurationError in a fatal error, that will cause confd
282 raise errors.ConfdFatalError(err)
284 if self.polling and reloaded:
285 logging.info("Reloaded ganeti config")
287 # We have reloaded the config files, but received no inotify event. If
288 # an event is pending though, we just happen to have timed out before
289 # receiving it, so this is not a problem, and we shouldn't alert
290 if not self.notifier.check_events():
291 logging.warning("Config file reload at timeout (inotify failure)")
293 # We're polling, but we haven't reloaded the config:
294 # Going back to inotify mode
295 logging.debug("Moving from polling mode to inotify mode")
297 self.inotify_handler.enable()
299 logging.debug("Performed configuration check")
def CheckConfd(options, args):
  """Pre-start checks: decide whether to run or exit with a failure.

  Exits with constants.EXIT_FAILURE when the HMAC cluster key file is
  missing, then hands over to ssconf.CheckMasterCandidate.

  @param options: parsed command line options; only C{debug} is read here
  @param args: positional command line arguments (unused)

  """
  # TODO: collapse HMAC daemons handling in daemons GenericMain, when we'll
  # have more than one.
  hmac_key = constants.HMAC_CLUSTER_KEY
  if not os.path.isfile(hmac_key):
    print >> sys.stderr, "Need HMAC key %s to run" % hmac_key
    sys.exit(constants.EXIT_FAILURE)

  ssconf.CheckMasterCandidate(options.debug)
def ExecConfd(options, args):
  """Main confd function, executed with PID file held

  @param options: parsed command line options; C{bind_address} and C{port}
      are read here
  @param args: positional command line arguments (unused)

  """
  mainloop = daemon.Mainloop()

  # confd-level SimpleConfigReader
  reader = ssconf.SimpleConfigReader()

  # Asynchronous confd UDP server
  processor = ConfdProcessor(reader)
  server = ConfdAsyncUDPServer(options.bind_address, options.port, processor)

  # Configuration reloader
  reloader = ConfdConfigurationReloader(reader, mainloop)

  # Without this call the server and the reloader are built and then dropped
  # as soon as the function returns.
  # NOTE(review): assumes daemon.Mainloop exposes Run() -- confirm.
  mainloop.Run()
337 """Main function for the confd daemon.
340 parser = OptionParser(description="Ganeti configuration daemon",
341 usage="%prog [-f] [-d] [-b ADDRESS]",
342 version="%%prog (ganeti) %s" %
343 constants.RELEASE_VERSION)
345 dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
346 dirs.append((constants.LOG_OS_DIR, 0750))
347 dirs.append((constants.LOCK_DIR, 1777))
348 daemon.GenericMain(constants.CONFD, parser, dirs, CheckConfd, ExecConfd)
351 if __name__ == "__main__":