ganeti-confd: explicitely log failed big sends
[ganeti-local] / daemons / ganeti-confd
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2009, Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti configuration daemon
23
24 Ganeti-confd is a daemon to query master candidates for configuration values.
25 It uses UDP+HMAC for authentication with a global cluster key.
26
27 """
28
29 import os
30 import sys
31 import logging
32 import asyncore
33 import socket
34 import pyinotify
35 import time
36 import errno
37
38 from optparse import OptionParser
39
40 from ganeti import constants
41 from ganeti import errors
42 from ganeti import daemon
43 from ganeti import ssconf
44 from ganeti.asyncnotifier import AsyncNotifier
45 from ganeti.confd.server import ConfdProcessor
46
47
48 class ConfdAsyncUDPServer(daemon.AsyncUDPSocket):
49   """The confd udp server, suitable for use with asyncore.
50
51   """
52   def __init__(self, bind_address, port, processor):
53     """Constructor for ConfdAsyncUDPServer
54
55     @type bind_address: string
56     @param bind_address: socket bind address ('' for all)
57     @type port: int
58     @param port: udp port
59     @type processor: L{confd.server.ConfdProcessor}
60     @param processor: ConfdProcessor to use to handle queries
61
62     """
63     daemon.AsyncUDPSocket.__init__(self)
64     self.bind_address = bind_address
65     self.port = port
66     self.processor = processor
67     self.bind((bind_address, port))
68     logging.debug("listening on ('%s':%d)" % (bind_address, port))
69
70   # this method is overriding a daemon.AsyncUDPSocket method
71   def handle_datagram(self, payload_in, ip, port):
72
73     if len(payload_in) < len(constants.CONFD_MAGIC_FOURCC):
74       logging.debug("Received a query which is too short to be true")
75       return
76
77     magic_number = payload_in[:4]
78     query = payload_in[4:]
79
80     if magic_number != constants.CONFD_MAGIC_FOURCC:
81       logging.debug("Received a query with an unknown magic number")
82       return
83
84     answer =  self.processor.ExecQuery(query, ip, port)
85     if answer is not None:
86       payload_out = ''.join([constants.CONFD_MAGIC_FOURCC, answer])
87       try:
88         self.enqueue_send(ip, port, payload_out)
89       except errors.UdpDataSizeError:
90         logging.error("Reply too big to fit in an udp packet.")
91
92
93 class ConfdInotifyEventHandler(pyinotify.ProcessEvent):
94
95   def __init__(self, watch_manager, callback,
96                file=constants.CLUSTER_CONF_FILE):
97     """Constructor for ConfdInotifyEventHandler
98
99     @type watch_manager: L{pyinotify.WatchManager}
100     @param watch_manager: ganeti-confd inotify watch manager
101     @type callback: function accepting a boolean
102     @param callback: function to call when an inotify event happens
103     @type file: string
104     @param file: config file to watch
105
106     """
107     # no need to call the parent's constructor
108     self.watch_manager = watch_manager
109     self.callback = callback
110     self.mask = pyinotify.EventsCodes.IN_IGNORED | \
111                 pyinotify.EventsCodes.IN_MODIFY
112     self.file = file
113     self.watch_handle = None
114
115   def enable(self):
116     """Watch the given file
117
118     """
119     if self.watch_handle is None:
120       result = self.watch_manager.add_watch(self.file, self.mask)
121       if not self.file in result or result[self.file] <= 0:
122         raise errors.InotifyError("Could not add inotify watcher")
123       else:
124         self.watch_handle = result[self.file]
125
126   def disable(self):
127     """Stop watching the given file
128
129     """
130     if self.watch_handle is not None:
131       result = self.watch_manager.rm_watch(self.watch_handle)
132       if result[self.watch_handle]:
133         self.watch_handle = None
134
135   def process_IN_IGNORED(self, event):
136     # Due to the fact that we monitor just for the cluster config file (rather
137     # than for the whole data dir) when the file is replaced with another one
138     # (which is what happens normally in ganeti) we're going to receive an
139     # IN_IGNORED event from inotify, because of the file removal (which is
140     # contextual with the replacement). In such a case we need to create
141     # another watcher for the "new" file.
142     logging.debug("Received 'ignored' inotify event for %s" % event.path)
143     self.watch_handle = None
144
145     try:
146       # Since the kernel believes the file we were interested in is gone, it's
147       # not going to notify us of any other events, until we set up, here, the
148       # new watch. This is not a race condition, though, since we're anyway
149       # going to realod the file after setting up the new watch.
150       self.callback(False)
151     except errors.ConfdFatalError, err:
152       logging.critical("Critical error, shutting down: %s" % err)
153       sys.exit(constants.EXIT_FAILURE)
154     except:
155       # we need to catch any exception here, log it, but proceed, because even
156       # if we failed handling a single request, we still want the confd to
157       # continue working.
158       logging.error("Unexpected exception", exc_info=True)
159
160   def process_IN_MODIFY(self, event):
161     # This gets called when the config file is modified. Note that this doesn't
162     # usually happen in Ganeti, as the config file is normally replaced by a
163     # new one, at filesystem level, rather than actually modified (see
164     # utils.WriteFile)
165     logging.debug("Received 'modify' inotify event for %s" % event.path)
166
167     try:
168       self.callback(True)
169     except errors.ConfdFatalError, err:
170       logging.critical("Critical error, shutting down: %s" % err)
171       sys.exit(constants.EXIT_FAILURE)
172     except:
173       # we need to catch any exception here, log it, but proceed, because even
174       # if we failed handling a single request, we still want the confd to
175       # continue working.
176       logging.error("Unexpected exception", exc_info=True)
177
178   def process_default(self, event):
179     logging.error("Received unhandled inotify event: %s" % event)
180
181
182 class ConfdConfigurationReloader(object):
183   """Logic to control when to reload the ganeti configuration
184
185   This class is able to alter between inotify and polling, to rate-limit the
186   number of reloads. When using inotify it also supports a fallback timed
187   check, to verify that the reload hasn't failed.
188
189   """
190   def __init__(self, processor, mainloop):
191     """Constructor for ConfdConfigurationReloader
192
193     @type processor: L{confd.server.ConfdProcessor}
194     @param processor: ganeti-confd ConfdProcessor
195     @type mainloop: L{daemon.Mainloop}
196     @param mainloop: ganeti-confd mainloop
197
198     """
199     self.processor = processor
200     self.mainloop = mainloop
201
202     self.polling = True
203     self.last_notification = 0
204
205     # Asyncronous inotify handler for config changes
206     self.wm = pyinotify.WatchManager()
207     self.inotify_handler = ConfdInotifyEventHandler(self.wm, self.OnInotify)
208     self.notifier = AsyncNotifier(self.wm, self.inotify_handler)
209
210     self.timer_handle = None
211     self._EnableTimer()
212
213   def OnInotify(self, notifier_enabled):
214     """Receive an inotify notification.
215
216     @type notifier_enabled: boolean
217     @param notifier_enabled: whether the notifier is still enabled
218
219     """
220     current_time = time.time()
221     time_delta = current_time - self.last_notification
222     self.last_notification = current_time
223
224     if time_delta < constants.CONFD_CONFIG_RELOAD_RATELIMIT:
225       logging.debug("Moving from inotify mode to polling mode")
226       self.polling = True
227       if notifier_enabled:
228         self.inotify_handler.disable()
229
230     if not self.polling and not notifier_enabled:
231       try:
232         self.inotify_handler.enable()
233       except errors.InotifyError:
234         self.polling = True
235
236     try:
237       reloaded = self.processor.reader.Reload()
238       if reloaded:
239         logging.info("Reloaded ganeti config")
240       else:
241         logging.debug("Skipped double config reload")
242     except errors.ConfigurationError:
243       self.DisableConfd()
244       self.inotify_handler.disable()
245       return
246
247     # Reset the timer. If we're polling it will go to the polling rate, if
248     # we're not it will delay it again to its base safe timeout.
249     self._ResetTimer()
250
251   def _DisableTimer(self):
252     if self.timer_handle is not None:
253       self.mainloop.scheduler.cancel(self.timer_handle)
254       self.timer_handle = None
255
256   def _EnableTimer(self):
257     if self.polling:
258       timeout = constants.CONFD_CONFIG_RELOAD_RATELIMIT
259     else:
260       timeout = constants.CONFD_CONFIG_RELOAD_TIMEOUT
261
262     if self.timer_handle is None:
263       self.timer_handle = self.mainloop.scheduler.enter(
264         timeout, 1, self.OnTimer, [])
265
266   def _ResetTimer(self):
267     self._DisableTimer()
268     self._EnableTimer()
269
270   def OnTimer(self):
271     """Function called when the timer fires
272
273     """
274     self.timer_handle = None
275     reloaded = False
276     was_disabled = False
277     try:
278       if self.processor.reader is None:
279         was_disabled = True
280         self.EnableConfd()
281         reloaded = True
282       else:
283         reloaded = self.processor.reader.Reload()
284     except errors.ConfigurationError:
285       self.DisableConfd(silent=was_disabled)
286       return
287
288     if self.polling and reloaded:
289       logging.info("Reloaded ganeti config")
290     elif reloaded:
291       # We have reloaded the config files, but received no inotify event.  If
292       # an event is pending though, we just happen to have timed out before
293       # receiving it, so this is not a problem, and we shouldn't alert
294       if not self.notifier.check_events() and not was_disabled:
295         logging.warning("Config file reload at timeout (inotify failure)")
296     elif self.polling:
297       # We're polling, but we haven't reloaded the config:
298       # Going back to inotify mode
299       logging.debug("Moving from polling mode to inotify mode")
300       self.polling = False
301       try:
302         self.inotify_handler.enable()
303       except errors.InotifyError:
304         self.polling = True
305     else:
306       logging.debug("Performed configuration check")
307
308     self._EnableTimer()
309
310   def DisableConfd(self, silent=False):
311     """Puts confd in non-serving mode
312
313     """
314     if not silent:
315       logging.warning("Confd is being disabled")
316     self.processor.Disable()
317     self.polling = False
318     self._ResetTimer()
319
320   def EnableConfd(self):
321     self.processor.Enable()
322     logging.warning("Confd is being enabled")
323     self.polling = True
324     self._ResetTimer()
325
326
327 def CheckConfd(options, args):
328   """Initial checks whether to run exit with a failure.
329
330   """
331   # TODO: collapse HMAC daemons handling in daemons GenericMain, when we'll
332   # have more than one.
333   if not os.path.isfile(constants.HMAC_CLUSTER_KEY):
334     print >> sys.stderr, "Need HMAC key %s to run" % constants.HMAC_CLUSTER_KEY
335     sys.exit(constants.EXIT_FAILURE)
336
337
338 def ExecConfd(options, args):
339   """Main confd function, executed with PID file held
340
341   """
342   mainloop = daemon.Mainloop()
343
344   # Asyncronous confd UDP server
345   processor = ConfdProcessor()
346   try:
347     processor.Enable()
348   except errors.ConfigurationError:
349     # If enabling the processor has failed, we can still go on, but confd will
350     # be disabled
351     logging.warning("Confd is starting in disabled mode")
352     pass
353   server = ConfdAsyncUDPServer(options.bind_address, options.port, processor)
354
355   # Configuration reloader
356   reloader = ConfdConfigurationReloader(processor, mainloop)
357
358   mainloop.Run()
359
360
361 def main():
362   """Main function for the confd daemon.
363
364   """
365   parser = OptionParser(description="Ganeti configuration daemon",
366                         usage="%prog [-f] [-d] [-b ADDRESS]",
367                         version="%%prog (ganeti) %s" %
368                         constants.RELEASE_VERSION)
369
370   dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
371   dirs.append((constants.LOG_OS_DIR, 0750))
372   dirs.append((constants.LOCK_DIR, 1777))
373   daemon.GenericMain(constants.CONFD, parser, dirs, CheckConfd, ExecConfd)
374
375
376 if __name__ == "__main__":
377   main()