Add new “daemon-util” script to start/stop Ganeti daemons
[ganeti-local] / daemons / ganeti-confd
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2009, Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti configuration daemon
23
24 Ganeti-confd is a daemon to query master candidates for configuration values.
25 It uses UDP+HMAC for authentication with a global cluster key.
26
27 """
28
29 import os
30 import sys
31 import logging
32 import pyinotify
33 import time
34
35 from optparse import OptionParser
36
37 from ganeti import asyncnotifier
38 from ganeti import confd
39 from ganeti.confd import server as confd_server
40 from ganeti import constants
41 from ganeti import errors
42 from ganeti import daemon
43 from ganeti import ssconf
44
45
46 class ConfdAsyncUDPServer(daemon.AsyncUDPSocket):
47   """The confd udp server, suitable for use with asyncore.
48
49   """
50   def __init__(self, bind_address, port, processor):
51     """Constructor for ConfdAsyncUDPServer
52
53     @type bind_address: string
54     @param bind_address: socket bind address ('' for all)
55     @type port: int
56     @param port: udp port
57     @type processor: L{confd.server.ConfdProcessor}
58     @param processor: ConfdProcessor to use to handle queries
59
60     """
61     daemon.AsyncUDPSocket.__init__(self)
62     self.bind_address = bind_address
63     self.port = port
64     self.processor = processor
65     self.bind((bind_address, port))
66     logging.debug("listening on ('%s':%d)" % (bind_address, port))
67
68   # this method is overriding a daemon.AsyncUDPSocket method
69   def handle_datagram(self, payload_in, ip, port):
70     try:
71       query = confd.UnpackMagic(payload_in)
72     except errors.ConfdMagicError, err:
73       logging.debug(err)
74       return
75
76     answer =  self.processor.ExecQuery(query, ip, port)
77     if answer is not None:
78       try:
79         self.enqueue_send(ip, port, confd.PackMagic(answer))
80       except errors.UdpDataSizeError:
81         logging.error("Reply too big to fit in an udp packet.")
82
83
84 class ConfdInotifyEventHandler(pyinotify.ProcessEvent):
85
86   def __init__(self, watch_manager, callback,
87                file=constants.CLUSTER_CONF_FILE):
88     """Constructor for ConfdInotifyEventHandler
89
90     @type watch_manager: L{pyinotify.WatchManager}
91     @param watch_manager: ganeti-confd inotify watch manager
92     @type callback: function accepting a boolean
93     @param callback: function to call when an inotify event happens
94     @type file: string
95     @param file: config file to watch
96
97     """
98     # no need to call the parent's constructor
99     self.watch_manager = watch_manager
100     self.callback = callback
101     self.mask = pyinotify.EventsCodes.IN_IGNORED | \
102                 pyinotify.EventsCodes.IN_MODIFY
103     self.file = file
104     self.watch_handle = None
105
106   def enable(self):
107     """Watch the given file
108
109     """
110     if self.watch_handle is None:
111       result = self.watch_manager.add_watch(self.file, self.mask)
112       if not self.file in result or result[self.file] <= 0:
113         raise errors.InotifyError("Could not add inotify watcher")
114       else:
115         self.watch_handle = result[self.file]
116
117   def disable(self):
118     """Stop watching the given file
119
120     """
121     if self.watch_handle is not None:
122       result = self.watch_manager.rm_watch(self.watch_handle)
123       if result[self.watch_handle]:
124         self.watch_handle = None
125
126   def process_IN_IGNORED(self, event):
127     # Due to the fact that we monitor just for the cluster config file (rather
128     # than for the whole data dir) when the file is replaced with another one
129     # (which is what happens normally in ganeti) we're going to receive an
130     # IN_IGNORED event from inotify, because of the file removal (which is
131     # contextual with the replacement). In such a case we need to create
132     # another watcher for the "new" file.
133     logging.debug("Received 'ignored' inotify event for %s" % event.path)
134     self.watch_handle = None
135
136     try:
137       # Since the kernel believes the file we were interested in is gone, it's
138       # not going to notify us of any other events, until we set up, here, the
139       # new watch. This is not a race condition, though, since we're anyway
140       # going to realod the file after setting up the new watch.
141       self.callback(False)
142     except errors.ConfdFatalError, err:
143       logging.critical("Critical error, shutting down: %s" % err)
144       sys.exit(constants.EXIT_FAILURE)
145     except:
146       # we need to catch any exception here, log it, but proceed, because even
147       # if we failed handling a single request, we still want the confd to
148       # continue working.
149       logging.error("Unexpected exception", exc_info=True)
150
151   def process_IN_MODIFY(self, event):
152     # This gets called when the config file is modified. Note that this doesn't
153     # usually happen in Ganeti, as the config file is normally replaced by a
154     # new one, at filesystem level, rather than actually modified (see
155     # utils.WriteFile)
156     logging.debug("Received 'modify' inotify event for %s" % event.path)
157
158     try:
159       self.callback(True)
160     except errors.ConfdFatalError, err:
161       logging.critical("Critical error, shutting down: %s" % err)
162       sys.exit(constants.EXIT_FAILURE)
163     except:
164       # we need to catch any exception here, log it, but proceed, because even
165       # if we failed handling a single request, we still want the confd to
166       # continue working.
167       logging.error("Unexpected exception", exc_info=True)
168
169   def process_default(self, event):
170     logging.error("Received unhandled inotify event: %s" % event)
171
172
173 class ConfdConfigurationReloader(object):
174   """Logic to control when to reload the ganeti configuration
175
176   This class is able to alter between inotify and polling, to rate-limit the
177   number of reloads. When using inotify it also supports a fallback timed
178   check, to verify that the reload hasn't failed.
179
180   """
181   def __init__(self, processor, mainloop):
182     """Constructor for ConfdConfigurationReloader
183
184     @type processor: L{confd.server.ConfdProcessor}
185     @param processor: ganeti-confd ConfdProcessor
186     @type mainloop: L{daemon.Mainloop}
187     @param mainloop: ganeti-confd mainloop
188
189     """
190     self.processor = processor
191     self.mainloop = mainloop
192
193     self.polling = True
194     self.last_notification = 0
195
196     # Asyncronous inotify handler for config changes
197     self.wm = pyinotify.WatchManager()
198     self.inotify_handler = ConfdInotifyEventHandler(self.wm, self.OnInotify)
199     self.notifier = asyncnotifier.AsyncNotifier(self.wm, self.inotify_handler)
200
201     self.timer_handle = None
202     self._EnableTimer()
203
204   def OnInotify(self, notifier_enabled):
205     """Receive an inotify notification.
206
207     @type notifier_enabled: boolean
208     @param notifier_enabled: whether the notifier is still enabled
209
210     """
211     current_time = time.time()
212     time_delta = current_time - self.last_notification
213     self.last_notification = current_time
214
215     if time_delta < constants.CONFD_CONFIG_RELOAD_RATELIMIT:
216       logging.debug("Moving from inotify mode to polling mode")
217       self.polling = True
218       if notifier_enabled:
219         self.inotify_handler.disable()
220
221     if not self.polling and not notifier_enabled:
222       try:
223         self.inotify_handler.enable()
224       except errors.InotifyError:
225         self.polling = True
226
227     try:
228       reloaded = self.processor.reader.Reload()
229       if reloaded:
230         logging.info("Reloaded ganeti config")
231       else:
232         logging.debug("Skipped double config reload")
233     except errors.ConfigurationError:
234       self.DisableConfd()
235       self.inotify_handler.disable()
236       return
237
238     # Reset the timer. If we're polling it will go to the polling rate, if
239     # we're not it will delay it again to its base safe timeout.
240     self._ResetTimer()
241
242   def _DisableTimer(self):
243     if self.timer_handle is not None:
244       self.mainloop.scheduler.cancel(self.timer_handle)
245       self.timer_handle = None
246
247   def _EnableTimer(self):
248     if self.polling:
249       timeout = constants.CONFD_CONFIG_RELOAD_RATELIMIT
250     else:
251       timeout = constants.CONFD_CONFIG_RELOAD_TIMEOUT
252
253     if self.timer_handle is None:
254       self.timer_handle = self.mainloop.scheduler.enter(
255         timeout, 1, self.OnTimer, [])
256
257   def _ResetTimer(self):
258     self._DisableTimer()
259     self._EnableTimer()
260
261   def OnTimer(self):
262     """Function called when the timer fires
263
264     """
265     self.timer_handle = None
266     reloaded = False
267     was_disabled = False
268     try:
269       if self.processor.reader is None:
270         was_disabled = True
271         self.EnableConfd()
272         reloaded = True
273       else:
274         reloaded = self.processor.reader.Reload()
275     except errors.ConfigurationError:
276       self.DisableConfd(silent=was_disabled)
277       return
278
279     if self.polling and reloaded:
280       logging.info("Reloaded ganeti config")
281     elif reloaded:
282       # We have reloaded the config files, but received no inotify event.  If
283       # an event is pending though, we just happen to have timed out before
284       # receiving it, so this is not a problem, and we shouldn't alert
285       if not self.notifier.check_events() and not was_disabled:
286         logging.warning("Config file reload at timeout (inotify failure)")
287     elif self.polling:
288       # We're polling, but we haven't reloaded the config:
289       # Going back to inotify mode
290       logging.debug("Moving from polling mode to inotify mode")
291       self.polling = False
292       try:
293         self.inotify_handler.enable()
294       except errors.InotifyError:
295         self.polling = True
296     else:
297       logging.debug("Performed configuration check")
298
299     self._EnableTimer()
300
301   def DisableConfd(self, silent=False):
302     """Puts confd in non-serving mode
303
304     """
305     if not silent:
306       logging.warning("Confd is being disabled")
307     self.processor.Disable()
308     self.polling = False
309     self._ResetTimer()
310
311   def EnableConfd(self):
312     self.processor.Enable()
313     logging.warning("Confd is being enabled")
314     self.polling = True
315     self._ResetTimer()
316
317
318 def CheckConfd(options, args):
319   """Initial checks whether to run exit with a failure.
320
321   """
322   # TODO: collapse HMAC daemons handling in daemons GenericMain, when we'll
323   # have more than one.
324   if not os.path.isfile(constants.HMAC_CLUSTER_KEY):
325     print >> sys.stderr, "Need HMAC key %s to run" % constants.HMAC_CLUSTER_KEY
326     sys.exit(constants.EXIT_FAILURE)
327
328
329 def ExecConfd(options, args):
330   """Main confd function, executed with PID file held
331
332   """
333   mainloop = daemon.Mainloop()
334
335   # Asyncronous confd UDP server
336   processor = confd_server.ConfdProcessor()
337   try:
338     processor.Enable()
339   except errors.ConfigurationError:
340     # If enabling the processor has failed, we can still go on, but confd will
341     # be disabled
342     logging.warning("Confd is starting in disabled mode")
343     pass
344   server = ConfdAsyncUDPServer(options.bind_address, options.port, processor)
345
346   # Configuration reloader
347   reloader = ConfdConfigurationReloader(processor, mainloop)
348
349   mainloop.Run()
350
351
352 def main():
353   """Main function for the confd daemon.
354
355   """
356   parser = OptionParser(description="Ganeti configuration daemon",
357                         usage="%prog [-f] [-d] [-b ADDRESS]",
358                         version="%%prog (ganeti) %s" %
359                         constants.RELEASE_VERSION)
360
361   dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
362   dirs.append((constants.LOCK_DIR, 1777))
363   daemon.GenericMain(constants.CONFD, parser, dirs, CheckConfd, ExecConfd)
364
365
366 if __name__ == "__main__":
367   main()