ConfdAsyncUDPServer: fix a docstring
[ganeti-local] / daemons / ganeti-confd
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2009, Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti configuration daemon
23
24 Ganeti-confd is a daemon to query master candidates for configuration values.
25 It uses UDP+HMAC for authentication with a global cluster key.
26
27 """
28
29 import os
30 import sys
31 import logging
32 import asyncore
33 import socket
34 import pyinotify
35 import time
36 import errno
37
38 from optparse import OptionParser
39
40 from ganeti import constants
41 from ganeti import errors
42 from ganeti import daemon
43 from ganeti import ssconf
44 from ganeti.asyncnotifier import AsyncNotifier
45 from ganeti.confd.server import ConfdProcessor
46
47
48 class ConfdAsyncUDPServer(daemon.AsyncUDPSocket):
49   """The confd udp server, suitable for use with asyncore.
50
51   """
52   def __init__(self, bind_address, port, processor):
53     """Constructor for ConfdAsyncUDPServer
54
55     @type bind_address: string
56     @param bind_address: socket bind address ('' for all)
57     @type port: int
58     @param port: udp port
59     @type processor: L{confd.server.ConfdProcessor}
60     @param processor: ConfdProcessor to use to handle queries
61
62     """
63     daemon.AsyncUDPSocket.__init__(self)
64     self.bind_address = bind_address
65     self.port = port
66     self.processor = processor
67     self.bind((bind_address, port))
68     logging.debug("listening on ('%s':%d)" % (bind_address, port))
69
70   # this method is overriding a daemon.AsyncUDPSocket method
71   def handle_datagram(self, payload_in, ip, port):
72     payload_out =  self.processor.ExecQuery(payload_in, ip, port)
73     if payload_out is not None:
74       self.enqueue_send(ip, port, payload_out)
75
76
77 class ConfdInotifyEventHandler(pyinotify.ProcessEvent):
78
79   def __init__(self, watch_manager, callback,
80                file=constants.CLUSTER_CONF_FILE):
81     """Constructor for ConfdInotifyEventHandler
82
83     @type watch_manager: L{pyinotify.WatchManager}
84     @param watch_manager: ganeti-confd inotify watch manager
85     @type callback: function accepting a boolean
86     @param callback: function to call when an inotify event happens
87     @type file: string
88     @param file: config file to watch
89
90     """
91     # no need to call the parent's constructor
92     self.watch_manager = watch_manager
93     self.callback = callback
94     self.mask = pyinotify.EventsCodes.IN_IGNORED | \
95                 pyinotify.EventsCodes.IN_MODIFY
96     self.file = file
97     self.watch_handle = None
98
99   def enable(self):
100     """Watch the given file
101
102     """
103     if self.watch_handle is None:
104       result = self.watch_manager.add_watch(self.file, self.mask)
105       if not self.file in result or result[self.file] <= 0:
106         raise errors.InotifyError("Could not add inotify watcher")
107       else:
108         self.watch_handle = result[self.file]
109
110   def disable(self):
111     """Stop watching the given file
112
113     """
114     if self.watch_handle is not None:
115       result = self.watch_manager.rm_watch(self.watch_handle)
116       if result[self.watch_handle]:
117         self.watch_handle = None
118
119   def process_IN_IGNORED(self, event):
120     # Due to the fact that we monitor just for the cluster config file (rather
121     # than for the whole data dir) when the file is replaced with another one
122     # (which is what happens normally in ganeti) we're going to receive an
123     # IN_IGNORED event from inotify, because of the file removal (which is
124     # contextual with the replacement). In such a case we need to create
125     # another watcher for the "new" file.
126     logging.debug("Received 'ignored' inotify event for %s" % event.path)
127     self.watch_handle = None
128
129     try:
130       # Since the kernel believes the file we were interested in is gone, it's
131       # not going to notify us of any other events, until we set up, here, the
132       # new watch. This is not a race condition, though, since we're anyway
133       # going to realod the file after setting up the new watch.
134       self.callback(False)
135     except errors.ConfdFatalError, err:
136       logging.critical("Critical error, shutting down: %s" % err)
137       sys.exit(constants.EXIT_FAILURE)
138     except:
139       # we need to catch any exception here, log it, but proceed, because even
140       # if we failed handling a single request, we still want the confd to
141       # continue working.
142       logging.error("Unexpected exception", exc_info=True)
143
144   def process_IN_MODIFY(self, event):
145     # This gets called when the config file is modified. Note that this doesn't
146     # usually happen in Ganeti, as the config file is normally replaced by a
147     # new one, at filesystem level, rather than actually modified (see
148     # utils.WriteFile)
149     logging.debug("Received 'modify' inotify event for %s" % event.path)
150
151     try:
152       self.callback(True)
153     except errors.ConfdFatalError, err:
154       logging.critical("Critical error, shutting down: %s" % err)
155       sys.exit(constants.EXIT_FAILURE)
156     except:
157       # we need to catch any exception here, log it, but proceed, because even
158       # if we failed handling a single request, we still want the confd to
159       # continue working.
160       logging.error("Unexpected exception", exc_info=True)
161
162   def process_default(self, event):
163     logging.error("Received unhandled inotify event: %s" % event)
164
165
166 class ConfdConfigurationReloader(object):
167   """Logic to control when to reload the ganeti configuration
168
169   This class is able to alter between inotify and polling, to rate-limit the
170   number of reloads. When using inotify it also supports a fallback timed
171   check, to verify that the reload hasn't failed.
172
173   """
174   def __init__(self, processor, mainloop):
175     """Constructor for ConfdConfigurationReloader
176
177     @type processor: L{confd.server.ConfdProcessor}
178     @param processor: ganeti-confd ConfdProcessor
179     @type mainloop: L{daemon.Mainloop}
180     @param mainloop: ganeti-confd mainloop
181
182     """
183     self.processor = processor
184     self.mainloop = mainloop
185
186     self.polling = True
187     self.last_notification = 0
188
189     # Asyncronous inotify handler for config changes
190     self.wm = pyinotify.WatchManager()
191     self.inotify_handler = ConfdInotifyEventHandler(self.wm, self.OnInotify)
192     self.notifier = AsyncNotifier(self.wm, self.inotify_handler)
193
194     self.timer_handle = None
195     self._EnableTimer()
196
197   def OnInotify(self, notifier_enabled):
198     """Receive an inotify notification.
199
200     @type notifier_enabled: boolean
201     @param notifier_enabled: whether the notifier is still enabled
202
203     """
204     current_time = time.time()
205     time_delta = current_time - self.last_notification
206     self.last_notification = current_time
207
208     if time_delta < constants.CONFD_CONFIG_RELOAD_RATELIMIT:
209       logging.debug("Moving from inotify mode to polling mode")
210       self.polling = True
211       if notifier_enabled:
212         self.inotify_handler.disable()
213
214     if not self.polling and not notifier_enabled:
215       try:
216         self.inotify_handler.enable()
217       except errors.InotifyError:
218         self.polling = True
219
220     try:
221       reloaded = self.processor.reader.Reload()
222       if reloaded:
223         logging.info("Reloaded ganeti config")
224       else:
225         logging.debug("Skipped double config reload")
226     except errors.ConfigurationError:
227       self.DisableConfd()
228       self.inotify_handler.disable()
229       return
230
231     # Reset the timer. If we're polling it will go to the polling rate, if
232     # we're not it will delay it again to its base safe timeout.
233     self._ResetTimer()
234
235   def _DisableTimer(self):
236     if self.timer_handle is not None:
237       self.mainloop.scheduler.cancel(self.timer_handle)
238       self.timer_handle = None
239
240   def _EnableTimer(self):
241     if self.polling:
242       timeout = constants.CONFD_CONFIG_RELOAD_RATELIMIT
243     else:
244       timeout = constants.CONFD_CONFIG_RELOAD_TIMEOUT
245
246     if self.timer_handle is None:
247       self.timer_handle = self.mainloop.scheduler.enter(
248         timeout, 1, self.OnTimer, [])
249
250   def _ResetTimer(self):
251     self._DisableTimer()
252     self._EnableTimer()
253
254   def OnTimer(self):
255     """Function called when the timer fires
256
257     """
258     self.timer_handle = None
259     reloaded = False
260     was_disabled = False
261     try:
262       if self.processor.reader is None:
263         was_disabled = True
264         self.EnableConfd()
265         reloaded = True
266       else:
267         reloaded = self.processor.reader.Reload()
268     except errors.ConfigurationError:
269       self.DisableConfd(silent=was_disabled)
270       return
271
272     if self.polling and reloaded:
273       logging.info("Reloaded ganeti config")
274     elif reloaded:
275       # We have reloaded the config files, but received no inotify event.  If
276       # an event is pending though, we just happen to have timed out before
277       # receiving it, so this is not a problem, and we shouldn't alert
278       if not self.notifier.check_events() and not was_disabled:
279         logging.warning("Config file reload at timeout (inotify failure)")
280     elif self.polling:
281       # We're polling, but we haven't reloaded the config:
282       # Going back to inotify mode
283       logging.debug("Moving from polling mode to inotify mode")
284       self.polling = False
285       try:
286         self.inotify_handler.enable()
287       except errors.InotifyError:
288         self.polling = True
289     else:
290       logging.debug("Performed configuration check")
291
292     self._EnableTimer()
293
294   def DisableConfd(self, silent=False):
295     """Puts confd in non-serving mode
296
297     """
298     if not silent:
299       logging.warning("Confd is being disabled")
300     self.processor.Disable()
301     self.polling = False
302     self._ResetTimer()
303
304   def EnableConfd(self):
305     self.processor.Enable()
306     logging.warning("Confd is being enabled")
307     self.polling = True
308     self._ResetTimer()
309
310
311 def CheckConfd(options, args):
312   """Initial checks whether to run exit with a failure.
313
314   """
315   # TODO: collapse HMAC daemons handling in daemons GenericMain, when we'll
316   # have more than one.
317   if not os.path.isfile(constants.HMAC_CLUSTER_KEY):
318     print >> sys.stderr, "Need HMAC key %s to run" % constants.HMAC_CLUSTER_KEY
319     sys.exit(constants.EXIT_FAILURE)
320
321
322 def ExecConfd(options, args):
323   """Main confd function, executed with PID file held
324
325   """
326   mainloop = daemon.Mainloop()
327
328   # Asyncronous confd UDP server
329   processor = ConfdProcessor()
330   try:
331     processor.Enable()
332   except errors.ConfigurationError:
333     # If enabling the processor has failed, we can still go on, but confd will
334     # be disabled
335     logging.warning("Confd is starting in disabled mode")
336     pass
337   server = ConfdAsyncUDPServer(options.bind_address, options.port, processor)
338
339   # Configuration reloader
340   reloader = ConfdConfigurationReloader(processor, mainloop)
341
342   mainloop.Run()
343
344
345 def main():
346   """Main function for the confd daemon.
347
348   """
349   parser = OptionParser(description="Ganeti configuration daemon",
350                         usage="%prog [-f] [-d] [-b ADDRESS]",
351                         version="%%prog (ganeti) %s" %
352                         constants.RELEASE_VERSION)
353
354   dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
355   dirs.append((constants.LOG_OS_DIR, 0750))
356   dirs.append((constants.LOCK_DIR, 1777))
357   daemon.GenericMain(constants.CONFD, parser, dirs, CheckConfd, ExecConfd)
358
359
360 if __name__ == "__main__":
361   main()