confd: s/confd_event_handler/inotify_handler/
[ganeti-local] / daemons / ganeti-confd
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2009, Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti configuration daemon
23
24 Ganeti-confd is a daemon to query master candidates for configuration values.
25 It uses UDP+HMAC for authentication with a global cluster key.
26
27 """
28
29 import os
30 import sys
31 import logging
32 import asyncore
33 import socket
34 import pyinotify
35 import time
36
37 from optparse import OptionParser
38
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import daemon
42 from ganeti import ssconf
43 from ganeti.asyncnotifier import AsyncNotifier
44 from ganeti.confd.server import ConfdProcessor
45
46
47 class ConfdAsyncUDPServer(asyncore.dispatcher):
48   """The confd udp server, suitable for use with asyncore.
49
50   """
51   def __init__(self, bind_address, port, processor):
52     """Constructor for ConfdAsyncUDPServer
53
54     @type bind_address: string
55     @param bind_address: socket bind address ('' for all)
56     @type port: int
57     @param port: udp port
58     @type processor: L{confd.server.ConfdProcessor}
59     @param reader: ConfigReader to use to access the config
60
61     """
62     asyncore.dispatcher.__init__(self)
63     self.bind_address = bind_address
64     self.port = port
65     self.processor = processor
66     self.create_socket(socket.AF_INET, socket.SOCK_DGRAM)
67     self.bind((bind_address, port))
68     logging.debug("listening on ('%s':%d)" % (bind_address, port))
69
70   # this method is overriding an asyncore.dispatcher method
71   def handle_connect(self):
72     # Python thinks that the first udp message from a source qualifies as a
73     # "connect" and further ones are part of the same connection. We beg to
74     # differ and treat all messages equally.
75     pass
76
77   # this method is overriding an asyncore.dispatcher method
78   def handle_read(self):
79     try:
80       payload_in, address = self.recvfrom(4096)
81       ip, port = address
82       payload_out =  self.processor.ExecQuery(payload_in, ip, port)
83       if payload_out is not None:
84         self.sendto(payload_out, 0, (ip, port))
85     except:
86       # we need to catch any exception here, log it, but proceed, because even
87       # if we failed handling a single request, we still want the confd to
88       # continue working.
89       logging.error("Unexpected exception", exc_info=True)
90
91   # this method is overriding an asyncore.dispatcher method
92   def writable(self):
93     # No need to check if we can write to the UDP socket
94     return False
95
96
97 class ConfdInotifyEventHandler(pyinotify.ProcessEvent):
98
99   def __init__(self, watch_manager, callback,
100                file=constants.CLUSTER_CONF_FILE):
101     """Constructor for ConfdInotifyEventHandler
102
103     @type watch_manager: L{pyinotify.WatchManager}
104     @param watch_manager: ganeti-confd inotify watch manager
105     @type callback: function accepting a boolean
106     @param callback: function to call when an inotify event happens
107     @type file: string
108     @param file: config file to watch
109
110     """
111     # no need to call the parent's constructor
112     self.watch_manager = watch_manager
113     self.callback = callback
114     self.mask = pyinotify.EventsCodes.IN_IGNORED | \
115                 pyinotify.EventsCodes.IN_MODIFY
116     self.file = file
117     self.watch_handle = None
118     self.enable()
119
120   def enable(self):
121     """Watch the given file
122
123     """
124     if self.watch_handle is None:
125       result = self.watch_manager.add_watch(self.file, self.mask)
126       if not self.file in result or result[self.file] <= 0:
127         raise errors.InotifyError("Could not add inotify watcher")
128       else:
129         self.watch_handle = result[self.file]
130
131   def disable(self):
132     """Stop watching the given file
133
134     """
135     if self.watch_handle is not None:
136       result = self.watch_manager.rm_watch(self.watch_handle)
137       if result[self.watch_handle]:
138         self.watch_handle = None
139
140   def process_IN_IGNORED(self, event):
141     # Due to the fact that we monitor just for the cluster config file (rather
142     # than for the whole data dir) when the file is replaced with another one
143     # (which is what happens normally in ganeti) we're going to receive an
144     # IN_IGNORED event from inotify, because of the file removal (which is
145     # contextual with the replacement). In such a case we need to create
146     # another watcher for the "new" file.
147     logging.debug("Received 'ignored' inotify event for %s" % event.path)
148     self.watch_handle = None
149
150     try:
151       # Since the kernel believes the file we were interested in is gone, it's
152       # not going to notify us of any other events, until we set up, here, the
153       # new watch. This is not a race condition, though, since we're anyway
154       # going to realod the file after setting up the new watch.
155       self.callback(False)
156     except errors.ConfdFatalError, err:
157       logging.critical("Critical error, shutting down: %s" % err)
158       sys.exit(constants.EXIT_FAILURE)
159     except:
160       # we need to catch any exception here, log it, but proceed, because even
161       # if we failed handling a single request, we still want the confd to
162       # continue working.
163       logging.error("Unexpected exception", exc_info=True)
164
165   def process_IN_MODIFY(self, event):
166     # This gets called when the config file is modified. Note that this doesn't
167     # usually happen in Ganeti, as the config file is normally replaced by a
168     # new one, at filesystem level, rather than actually modified (see
169     # utils.WriteFile)
170     logging.debug("Received 'modify' inotify event for %s" % event.path)
171
172     try:
173       self.callback(True)
174     except errors.ConfdFatalError, err:
175       logging.critical("Critical error, shutting down: %s" % err)
176       sys.exit(constants.EXIT_FAILURE)
177     except:
178       # we need to catch any exception here, log it, but proceed, because even
179       # if we failed handling a single request, we still want the confd to
180       # continue working.
181       logging.error("Unexpected exception", exc_info=True)
182
183   def process_default(self, event):
184     logging.error("Received unhandled inotify event: %s" % event)
185
186
187 class ConfdConfigurationReloader(object):
188   """Logic to control when to reload the ganeti configuration
189
190   This class is able to alter between inotify and polling, to rate-limit the
191   number of reloads. When using inotify it also supports a fallback timed
192   check, to verify that the reload hasn't failed.
193
194   """
195   def __init__(self, reader, mainloop):
196     """Constructor for ConfdConfigurationReloader
197
198     @type reader: L{ssconf.SimpleConfigReader}
199     @param reader: ganeti-confd SimpleConfigReader
200     @type mainloop: L{daemon.Mainloop}
201     @param mainloop: ganeti-confd mainloop
202
203     """
204     self.reader = reader
205     self.mainloop = mainloop
206
207     self.polling = False
208     self.last_notification = 0
209
210     # Asyncronous inotify handler for config changes
211     self.wm = pyinotify.WatchManager()
212     self.inotify_handler = ConfdInotifyEventHandler(self.wm, self.OnInotify)
213     self.notifier = AsyncNotifier(self.wm, self.inotify_handler)
214
215     self.timer_handle = None
216     self._EnableTimer()
217
218   def OnInotify(self, notifier_enabled):
219     """Receive an inotify notification.
220
221     @type notifier_enabled: boolean
222     @param notifier_enabled: whether the notifier is still enabled
223
224     """
225     current_time = time.time()
226     time_delta = current_time - self.last_notification
227     self.last_notification = current_time
228
229     if time_delta < constants.CONFD_CONFIG_RELOAD_RATELIMIT:
230       logging.debug("Moving from inotify mode to polling mode")
231       self.polling = True
232       if notifier_enabled:
233         self.inotify_handler.disable()
234
235     if not self.polling and not notifier_enabled:
236       try:
237         self.inotify_handler.enable()
238       except errors.InotifyError:
239         raise errors.ConfdFatalError(err)
240
241     try:
242       reloaded = self.reader.Reload()
243       if reloaded:
244         logging.info("Reloaded ganeti config")
245       else:
246         logging.debug("Skipped double config reload")
247     except errors.ConfigurationError:
248       # transform a ConfigurationError in a fatal error, that will cause confd
249       # to quit.
250       raise errors.ConfdFatalError(err)
251
252     # Reset the timer. If we're polling it will go to the polling rate, if
253     # we're not it will delay it again to its base safe timeout.
254     self._DisableTimer()
255     self._EnableTimer()
256
257   def _DisableTimer(self):
258     if self.timer_handle is not None:
259       self.mainloop.scheduler.cancel(self.timer_handle)
260       self.timer_handle = None
261
262   def _EnableTimer(self):
263     if self.polling:
264       timeout = constants.CONFD_CONFIG_RELOAD_RATELIMIT
265     else:
266       timeout = constants.CONFD_CONFIG_RELOAD_TIMEOUT
267
268     if self.timer_handle is None:
269       self.timer_handle = self.mainloop.scheduler.enter(
270         timeout, 1, self.OnTimer, [])
271
272   def OnTimer(self):
273     """Function called when the timer fires
274
275     """
276     self.timer_handle = None
277     try:
278       reloaded = self.reader.Reload()
279     except errors.ConfigurationError:
280       # transform a ConfigurationError in a fatal error, that will cause confd
281       # to quit.
282       raise errors.ConfdFatalError(err)
283
284     if self.polling and reloaded:
285       logging.info("Reloaded ganeti config")
286     elif reloaded:
287       # We have reloaded the config files, but received no inotify event.  If
288       # an event is pending though, we just happen to have timed out before
289       # receiving it, so this is not a problem, and we shouldn't alert
290       if not self.notifier.check_events():
291         logging.warning("Config file reload at timeout (inotify failure)")
292     elif self.polling:
293       # We're polling, but we haven't reloaded the config:
294       # Going back to inotify mode
295       logging.debug("Moving from polling mode to inotify mode")
296       self.polling = False
297       self.inotify_handler.enable()
298     else:
299       logging.debug("Performed configuration check")
300
301     self._EnableTimer()
302
303
304 def CheckConfd(options, args):
305   """Initial checks whether to run exit with a failure.
306
307   """
308   # TODO: collapse HMAC daemons handling in daemons GenericMain, when we'll
309   # have more than one.
310   if not os.path.isfile(constants.HMAC_CLUSTER_KEY):
311     print >> sys.stderr, "Need HMAC key %s to run" % constants.HMAC_CLUSTER_KEY
312     sys.exit(constants.EXIT_FAILURE)
313
314   ssconf.CheckMasterCandidate(options.debug)
315
316
317 def ExecConfd(options, args):
318   """Main confd function, executed with PID file held
319
320   """
321   mainloop = daemon.Mainloop()
322
323   # confd-level SimpleConfigReader
324   reader = ssconf.SimpleConfigReader()
325
326   # Asyncronous confd UDP server
327   processor = ConfdProcessor(reader)
328   server = ConfdAsyncUDPServer(options.bind_address, options.port, processor)
329
330   # Configuration reloader
331   reloader = ConfdConfigurationReloader(reader, mainloop)
332
333   mainloop.Run()
334
335
336 def main():
337   """Main function for the confd daemon.
338
339   """
340   parser = OptionParser(description="Ganeti configuration daemon",
341                         usage="%prog [-f] [-d] [-b ADDRESS]",
342                         version="%%prog (ganeti) %s" %
343                         constants.RELEASE_VERSION)
344
345   dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
346   dirs.append((constants.LOG_OS_DIR, 0750))
347   dirs.append((constants.LOCK_DIR, 1777))
348   daemon.GenericMain(constants.CONFD, parser, dirs, CheckConfd, ExecConfd)
349
350
351 if __name__ == "__main__":
352   main()