daemon.GenericMain: Don't use dict for SSL paths, improve CLI options
[ganeti-local] / daemons / ganeti-confd
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2009, Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti configuration daemon
23
24 Ganeti-confd is a daemon to query master candidates for configuration values.
25 It uses UDP+HMAC for authentication with a global cluster key.
26
27 """
28
29 import os
30 import sys
31 import logging
32 import pyinotify
33 import time
34
35 from optparse import OptionParser
36
37 from ganeti import asyncnotifier
38 from ganeti import confd
39 from ganeti.confd import server as confd_server
40 from ganeti import constants
41 from ganeti import errors
42 from ganeti import daemon
43 from ganeti import utils
44 from ganeti import ssconf
45
46
47 class ConfdAsyncUDPServer(daemon.AsyncUDPSocket):
48   """The confd udp server, suitable for use with asyncore.
49
50   """
51   def __init__(self, bind_address, port, processor):
52     """Constructor for ConfdAsyncUDPServer
53
54     @type bind_address: string
55     @param bind_address: socket bind address ('' for all)
56     @type port: int
57     @param port: udp port
58     @type processor: L{confd.server.ConfdProcessor}
59     @param processor: ConfdProcessor to use to handle queries
60
61     """
62     daemon.AsyncUDPSocket.__init__(self)
63     self.bind_address = bind_address
64     self.port = port
65     self.processor = processor
66     self.bind((bind_address, port))
67     logging.debug("listening on ('%s':%d)" % (bind_address, port))
68
69   # this method is overriding a daemon.AsyncUDPSocket method
70   def handle_datagram(self, payload_in, ip, port):
71     try:
72       query = confd.UnpackMagic(payload_in)
73     except errors.ConfdMagicError, err:
74       logging.debug(err)
75       return
76
77     answer =  self.processor.ExecQuery(query, ip, port)
78     if answer is not None:
79       try:
80         self.enqueue_send(ip, port, confd.PackMagic(answer))
81       except errors.UdpDataSizeError:
82         logging.error("Reply too big to fit in an udp packet.")
83
84
85 class ConfdInotifyEventHandler(pyinotify.ProcessEvent):
86
87   def __init__(self, watch_manager, callback,
88                file=constants.CLUSTER_CONF_FILE):
89     """Constructor for ConfdInotifyEventHandler
90
91     @type watch_manager: L{pyinotify.WatchManager}
92     @param watch_manager: ganeti-confd inotify watch manager
93     @type callback: function accepting a boolean
94     @param callback: function to call when an inotify event happens
95     @type file: string
96     @param file: config file to watch
97
98     """
99     # no need to call the parent's constructor
100     self.watch_manager = watch_manager
101     self.callback = callback
102     self.mask = pyinotify.EventsCodes.IN_IGNORED | \
103                 pyinotify.EventsCodes.IN_MODIFY
104     self.file = file
105     self.watch_handle = None
106
107   def enable(self):
108     """Watch the given file
109
110     """
111     if self.watch_handle is None:
112       result = self.watch_manager.add_watch(self.file, self.mask)
113       if not self.file in result or result[self.file] <= 0:
114         raise errors.InotifyError("Could not add inotify watcher")
115       else:
116         self.watch_handle = result[self.file]
117
118   def disable(self):
119     """Stop watching the given file
120
121     """
122     if self.watch_handle is not None:
123       result = self.watch_manager.rm_watch(self.watch_handle)
124       if result[self.watch_handle]:
125         self.watch_handle = None
126
127   def process_IN_IGNORED(self, event):
128     # Due to the fact that we monitor just for the cluster config file (rather
129     # than for the whole data dir) when the file is replaced with another one
130     # (which is what happens normally in ganeti) we're going to receive an
131     # IN_IGNORED event from inotify, because of the file removal (which is
132     # contextual with the replacement). In such a case we need to create
133     # another watcher for the "new" file.
134     logging.debug("Received 'ignored' inotify event for %s" % event.path)
135     self.watch_handle = None
136
137     try:
138       # Since the kernel believes the file we were interested in is gone, it's
139       # not going to notify us of any other events, until we set up, here, the
140       # new watch. This is not a race condition, though, since we're anyway
141       # going to realod the file after setting up the new watch.
142       self.callback(False)
143     except errors.ConfdFatalError, err:
144       logging.critical("Critical error, shutting down: %s" % err)
145       sys.exit(constants.EXIT_FAILURE)
146     except:
147       # we need to catch any exception here, log it, but proceed, because even
148       # if we failed handling a single request, we still want the confd to
149       # continue working.
150       logging.error("Unexpected exception", exc_info=True)
151
152   def process_IN_MODIFY(self, event):
153     # This gets called when the config file is modified. Note that this doesn't
154     # usually happen in Ganeti, as the config file is normally replaced by a
155     # new one, at filesystem level, rather than actually modified (see
156     # utils.WriteFile)
157     logging.debug("Received 'modify' inotify event for %s" % event.path)
158
159     try:
160       self.callback(True)
161     except errors.ConfdFatalError, err:
162       logging.critical("Critical error, shutting down: %s" % err)
163       sys.exit(constants.EXIT_FAILURE)
164     except:
165       # we need to catch any exception here, log it, but proceed, because even
166       # if we failed handling a single request, we still want the confd to
167       # continue working.
168       logging.error("Unexpected exception", exc_info=True)
169
170   def process_default(self, event):
171     logging.error("Received unhandled inotify event: %s" % event)
172
173
174 class ConfdConfigurationReloader(object):
175   """Logic to control when to reload the ganeti configuration
176
177   This class is able to alter between inotify and polling, to rate-limit the
178   number of reloads. When using inotify it also supports a fallback timed
179   check, to verify that the reload hasn't failed.
180
181   """
182   def __init__(self, processor, mainloop):
183     """Constructor for ConfdConfigurationReloader
184
185     @type processor: L{confd.server.ConfdProcessor}
186     @param processor: ganeti-confd ConfdProcessor
187     @type mainloop: L{daemon.Mainloop}
188     @param mainloop: ganeti-confd mainloop
189
190     """
191     self.processor = processor
192     self.mainloop = mainloop
193
194     self.polling = True
195     self.last_notification = 0
196
197     # Asyncronous inotify handler for config changes
198     self.wm = pyinotify.WatchManager()
199     self.inotify_handler = ConfdInotifyEventHandler(self.wm, self.OnInotify)
200     self.notifier = asyncnotifier.AsyncNotifier(self.wm, self.inotify_handler)
201
202     self.timer_handle = None
203     self._EnableTimer()
204
205   def OnInotify(self, notifier_enabled):
206     """Receive an inotify notification.
207
208     @type notifier_enabled: boolean
209     @param notifier_enabled: whether the notifier is still enabled
210
211     """
212     current_time = time.time()
213     time_delta = current_time - self.last_notification
214     self.last_notification = current_time
215
216     if time_delta < constants.CONFD_CONFIG_RELOAD_RATELIMIT:
217       logging.debug("Moving from inotify mode to polling mode")
218       self.polling = True
219       if notifier_enabled:
220         self.inotify_handler.disable()
221
222     if not self.polling and not notifier_enabled:
223       try:
224         self.inotify_handler.enable()
225       except errors.InotifyError:
226         self.polling = True
227
228     try:
229       reloaded = self.processor.reader.Reload()
230       if reloaded:
231         logging.info("Reloaded ganeti config")
232       else:
233         logging.debug("Skipped double config reload")
234     except errors.ConfigurationError:
235       self.DisableConfd()
236       self.inotify_handler.disable()
237       return
238
239     # Reset the timer. If we're polling it will go to the polling rate, if
240     # we're not it will delay it again to its base safe timeout.
241     self._ResetTimer()
242
243   def _DisableTimer(self):
244     if self.timer_handle is not None:
245       self.mainloop.scheduler.cancel(self.timer_handle)
246       self.timer_handle = None
247
248   def _EnableTimer(self):
249     if self.polling:
250       timeout = constants.CONFD_CONFIG_RELOAD_RATELIMIT
251     else:
252       timeout = constants.CONFD_CONFIG_RELOAD_TIMEOUT
253
254     if self.timer_handle is None:
255       self.timer_handle = self.mainloop.scheduler.enter(
256         timeout, 1, self.OnTimer, [])
257
258   def _ResetTimer(self):
259     self._DisableTimer()
260     self._EnableTimer()
261
262   def OnTimer(self):
263     """Function called when the timer fires
264
265     """
266     self.timer_handle = None
267     reloaded = False
268     was_disabled = False
269     try:
270       if self.processor.reader is None:
271         was_disabled = True
272         self.EnableConfd()
273         reloaded = True
274       else:
275         reloaded = self.processor.reader.Reload()
276     except errors.ConfigurationError:
277       self.DisableConfd(silent=was_disabled)
278       return
279
280     if self.polling and reloaded:
281       logging.info("Reloaded ganeti config")
282     elif reloaded:
283       # We have reloaded the config files, but received no inotify event.  If
284       # an event is pending though, we just happen to have timed out before
285       # receiving it, so this is not a problem, and we shouldn't alert
286       if not self.notifier.check_events() and not was_disabled:
287         logging.warning("Config file reload at timeout (inotify failure)")
288     elif self.polling:
289       # We're polling, but we haven't reloaded the config:
290       # Going back to inotify mode
291       logging.debug("Moving from polling mode to inotify mode")
292       self.polling = False
293       try:
294         self.inotify_handler.enable()
295       except errors.InotifyError:
296         self.polling = True
297     else:
298       logging.debug("Performed configuration check")
299
300     self._EnableTimer()
301
302   def DisableConfd(self, silent=False):
303     """Puts confd in non-serving mode
304
305     """
306     if not silent:
307       logging.warning("Confd is being disabled")
308     self.processor.Disable()
309     self.polling = False
310     self._ResetTimer()
311
312   def EnableConfd(self):
313     self.processor.Enable()
314     logging.warning("Confd is being enabled")
315     self.polling = True
316     self._ResetTimer()
317
318
319 def CheckConfd(options, args):
320   """Initial checks whether to run exit with a failure.
321
322   """
323   # TODO: collapse HMAC daemons handling in daemons GenericMain, when we'll
324   # have more than one.
325   if not os.path.isfile(constants.HMAC_CLUSTER_KEY):
326     print >> sys.stderr, "Need HMAC key %s to run" % constants.HMAC_CLUSTER_KEY
327     sys.exit(constants.EXIT_FAILURE)
328
329
330 def ExecConfd(options, args):
331   """Main confd function, executed with PID file held
332
333   """
334   mainloop = daemon.Mainloop()
335
336   # Asyncronous confd UDP server
337   processor = confd_server.ConfdProcessor()
338   try:
339     processor.Enable()
340   except errors.ConfigurationError:
341     # If enabling the processor has failed, we can still go on, but confd will
342     # be disabled
343     logging.warning("Confd is starting in disabled mode")
344     pass
345   server = ConfdAsyncUDPServer(options.bind_address, options.port, processor)
346
347   # Configuration reloader
348   reloader = ConfdConfigurationReloader(processor, mainloop)
349
350   mainloop.Run()
351
352
353 def main():
354   """Main function for the confd daemon.
355
356   """
357   parser = OptionParser(description="Ganeti configuration daemon",
358                         usage="%prog [-f] [-d] [-b ADDRESS]",
359                         version="%%prog (ganeti) %s" %
360                         constants.RELEASE_VERSION)
361
362   dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
363   dirs.append((constants.LOCK_DIR, 1777))
364   daemon.GenericMain(constants.CONFD, parser, dirs, CheckConfd, ExecConfd)
365
366
367 if __name__ == "__main__":
368   main()