Change pyinotify import for broader compatibility
[ganeti-local] / daemons / ganeti-confd
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2009, Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti configuration daemon
23
24 Ganeti-confd is a daemon to query master candidates for configuration values.
25 It uses UDP+HMAC for authentication with a global cluster key.
26
27 """
28
29 import os
30 import sys
31 import logging
32 import time
33
34 try:
35   from pyinotify import pyinotify
36 except ImportError:
37   import pyinotify
38
39 from optparse import OptionParser
40
41 from ganeti import asyncnotifier
42 from ganeti import confd
43 from ganeti.confd import server as confd_server
44 from ganeti import constants
45 from ganeti import errors
46 from ganeti import daemon
47 from ganeti import ssconf
48
49
50 class ConfdAsyncUDPServer(daemon.AsyncUDPSocket):
51   """The confd udp server, suitable for use with asyncore.
52
53   """
54   def __init__(self, bind_address, port, processor):
55     """Constructor for ConfdAsyncUDPServer
56
57     @type bind_address: string
58     @param bind_address: socket bind address ('' for all)
59     @type port: int
60     @param port: udp port
61     @type processor: L{confd.server.ConfdProcessor}
62     @param processor: ConfdProcessor to use to handle queries
63
64     """
65     daemon.AsyncUDPSocket.__init__(self)
66     self.bind_address = bind_address
67     self.port = port
68     self.processor = processor
69     self.bind((bind_address, port))
70     logging.debug("listening on ('%s':%d)" % (bind_address, port))
71
72   # this method is overriding a daemon.AsyncUDPSocket method
73   def handle_datagram(self, payload_in, ip, port):
74     try:
75       query = confd.UnpackMagic(payload_in)
76     except errors.ConfdMagicError, err:
77       logging.debug(err)
78       return
79
80     answer =  self.processor.ExecQuery(query, ip, port)
81     if answer is not None:
82       try:
83         self.enqueue_send(ip, port, confd.PackMagic(answer))
84       except errors.UdpDataSizeError:
85         logging.error("Reply too big to fit in an udp packet.")
86
87
88 class ConfdInotifyEventHandler(pyinotify.ProcessEvent):
89
90   def __init__(self, watch_manager, callback,
91                file=constants.CLUSTER_CONF_FILE):
92     """Constructor for ConfdInotifyEventHandler
93
94     @type watch_manager: L{pyinotify.WatchManager}
95     @param watch_manager: ganeti-confd inotify watch manager
96     @type callback: function accepting a boolean
97     @param callback: function to call when an inotify event happens
98     @type file: string
99     @param file: config file to watch
100
101     """
102     # no need to call the parent's constructor
103     self.watch_manager = watch_manager
104     self.callback = callback
105     self.mask = pyinotify.EventsCodes.IN_IGNORED | \
106                 pyinotify.EventsCodes.IN_MODIFY
107     self.file = file
108     self.watch_handle = None
109
110   def enable(self):
111     """Watch the given file
112
113     """
114     if self.watch_handle is None:
115       result = self.watch_manager.add_watch(self.file, self.mask)
116       if not self.file in result or result[self.file] <= 0:
117         raise errors.InotifyError("Could not add inotify watcher")
118       else:
119         self.watch_handle = result[self.file]
120
121   def disable(self):
122     """Stop watching the given file
123
124     """
125     if self.watch_handle is not None:
126       result = self.watch_manager.rm_watch(self.watch_handle)
127       if result[self.watch_handle]:
128         self.watch_handle = None
129
130   def process_IN_IGNORED(self, event):
131     # Due to the fact that we monitor just for the cluster config file (rather
132     # than for the whole data dir) when the file is replaced with another one
133     # (which is what happens normally in ganeti) we're going to receive an
134     # IN_IGNORED event from inotify, because of the file removal (which is
135     # contextual with the replacement). In such a case we need to create
136     # another watcher for the "new" file.
137     logging.debug("Received 'ignored' inotify event for %s" % event.path)
138     self.watch_handle = None
139
140     try:
141       # Since the kernel believes the file we were interested in is gone, it's
142       # not going to notify us of any other events, until we set up, here, the
143       # new watch. This is not a race condition, though, since we're anyway
144       # going to realod the file after setting up the new watch.
145       self.callback(False)
146     except errors.ConfdFatalError, err:
147       logging.critical("Critical error, shutting down: %s" % err)
148       sys.exit(constants.EXIT_FAILURE)
149     except:
150       # we need to catch any exception here, log it, but proceed, because even
151       # if we failed handling a single request, we still want the confd to
152       # continue working.
153       logging.error("Unexpected exception", exc_info=True)
154
155   def process_IN_MODIFY(self, event):
156     # This gets called when the config file is modified. Note that this doesn't
157     # usually happen in Ganeti, as the config file is normally replaced by a
158     # new one, at filesystem level, rather than actually modified (see
159     # utils.WriteFile)
160     logging.debug("Received 'modify' inotify event for %s" % event.path)
161
162     try:
163       self.callback(True)
164     except errors.ConfdFatalError, err:
165       logging.critical("Critical error, shutting down: %s" % err)
166       sys.exit(constants.EXIT_FAILURE)
167     except:
168       # we need to catch any exception here, log it, but proceed, because even
169       # if we failed handling a single request, we still want the confd to
170       # continue working.
171       logging.error("Unexpected exception", exc_info=True)
172
173   def process_default(self, event):
174     logging.error("Received unhandled inotify event: %s" % event)
175
176
177 class ConfdConfigurationReloader(object):
178   """Logic to control when to reload the ganeti configuration
179
180   This class is able to alter between inotify and polling, to rate-limit the
181   number of reloads. When using inotify it also supports a fallback timed
182   check, to verify that the reload hasn't failed.
183
184   """
185   def __init__(self, processor, mainloop):
186     """Constructor for ConfdConfigurationReloader
187
188     @type processor: L{confd.server.ConfdProcessor}
189     @param processor: ganeti-confd ConfdProcessor
190     @type mainloop: L{daemon.Mainloop}
191     @param mainloop: ganeti-confd mainloop
192
193     """
194     self.processor = processor
195     self.mainloop = mainloop
196
197     self.polling = True
198     self.last_notification = 0
199
200     # Asyncronous inotify handler for config changes
201     self.wm = pyinotify.WatchManager()
202     self.inotify_handler = ConfdInotifyEventHandler(self.wm, self.OnInotify)
203     self.notifier = asyncnotifier.AsyncNotifier(self.wm, self.inotify_handler)
204
205     self.timer_handle = None
206     self._EnableTimer()
207
208   def OnInotify(self, notifier_enabled):
209     """Receive an inotify notification.
210
211     @type notifier_enabled: boolean
212     @param notifier_enabled: whether the notifier is still enabled
213
214     """
215     current_time = time.time()
216     time_delta = current_time - self.last_notification
217     self.last_notification = current_time
218
219     if time_delta < constants.CONFD_CONFIG_RELOAD_RATELIMIT:
220       logging.debug("Moving from inotify mode to polling mode")
221       self.polling = True
222       if notifier_enabled:
223         self.inotify_handler.disable()
224
225     if not self.polling and not notifier_enabled:
226       try:
227         self.inotify_handler.enable()
228       except errors.InotifyError:
229         self.polling = True
230
231     try:
232       reloaded = self.processor.reader.Reload()
233       if reloaded:
234         logging.info("Reloaded ganeti config")
235       else:
236         logging.debug("Skipped double config reload")
237     except errors.ConfigurationError:
238       self.DisableConfd()
239       self.inotify_handler.disable()
240       return
241
242     # Reset the timer. If we're polling it will go to the polling rate, if
243     # we're not it will delay it again to its base safe timeout.
244     self._ResetTimer()
245
246   def _DisableTimer(self):
247     if self.timer_handle is not None:
248       self.mainloop.scheduler.cancel(self.timer_handle)
249       self.timer_handle = None
250
251   def _EnableTimer(self):
252     if self.polling:
253       timeout = constants.CONFD_CONFIG_RELOAD_RATELIMIT
254     else:
255       timeout = constants.CONFD_CONFIG_RELOAD_TIMEOUT
256
257     if self.timer_handle is None:
258       self.timer_handle = self.mainloop.scheduler.enter(
259         timeout, 1, self.OnTimer, [])
260
261   def _ResetTimer(self):
262     self._DisableTimer()
263     self._EnableTimer()
264
265   def OnTimer(self):
266     """Function called when the timer fires
267
268     """
269     self.timer_handle = None
270     reloaded = False
271     was_disabled = False
272     try:
273       if self.processor.reader is None:
274         was_disabled = True
275         self.EnableConfd()
276         reloaded = True
277       else:
278         reloaded = self.processor.reader.Reload()
279     except errors.ConfigurationError:
280       self.DisableConfd(silent=was_disabled)
281       return
282
283     if self.polling and reloaded:
284       logging.info("Reloaded ganeti config")
285     elif reloaded:
286       # We have reloaded the config files, but received no inotify event.  If
287       # an event is pending though, we just happen to have timed out before
288       # receiving it, so this is not a problem, and we shouldn't alert
289       if not self.notifier.check_events() and not was_disabled:
290         logging.warning("Config file reload at timeout (inotify failure)")
291     elif self.polling:
292       # We're polling, but we haven't reloaded the config:
293       # Going back to inotify mode
294       logging.debug("Moving from polling mode to inotify mode")
295       self.polling = False
296       try:
297         self.inotify_handler.enable()
298       except errors.InotifyError:
299         self.polling = True
300     else:
301       logging.debug("Performed configuration check")
302
303     self._EnableTimer()
304
305   def DisableConfd(self, silent=False):
306     """Puts confd in non-serving mode
307
308     """
309     if not silent:
310       logging.warning("Confd is being disabled")
311     self.processor.Disable()
312     self.polling = False
313     self._ResetTimer()
314
315   def EnableConfd(self):
316     self.processor.Enable()
317     logging.warning("Confd is being enabled")
318     self.polling = True
319     self._ResetTimer()
320
321
322 def CheckConfd(options, args):
323   """Initial checks whether to run exit with a failure.
324
325   """
326   # TODO: collapse HMAC daemons handling in daemons GenericMain, when we'll
327   # have more than one.
328   if not os.path.isfile(constants.HMAC_CLUSTER_KEY):
329     print >> sys.stderr, "Need HMAC key %s to run" % constants.HMAC_CLUSTER_KEY
330     sys.exit(constants.EXIT_FAILURE)
331
332
333 def ExecConfd(options, args):
334   """Main confd function, executed with PID file held
335
336   """
337   mainloop = daemon.Mainloop()
338
339   # Asyncronous confd UDP server
340   processor = confd_server.ConfdProcessor()
341   try:
342     processor.Enable()
343   except errors.ConfigurationError:
344     # If enabling the processor has failed, we can still go on, but confd will
345     # be disabled
346     logging.warning("Confd is starting in disabled mode")
347     pass
348   server = ConfdAsyncUDPServer(options.bind_address, options.port, processor)
349
350   # Configuration reloader
351   reloader = ConfdConfigurationReloader(processor, mainloop)
352
353   mainloop.Run()
354
355
356 def main():
357   """Main function for the confd daemon.
358
359   """
360   parser = OptionParser(description="Ganeti configuration daemon",
361                         usage="%prog [-f] [-d] [-b ADDRESS]",
362                         version="%%prog (ganeti) %s" %
363                         constants.RELEASE_VERSION)
364
365   dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
366   dirs.append((constants.LOCK_DIR, 1777))
367   daemon.GenericMain(constants.CONFD, parser, dirs, CheckConfd, ExecConfd)
368
369
370 if __name__ == "__main__":
371   main()