Move fourcc packing/unpacking to main confd module
[ganeti-local] / daemons / ganeti-confd
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2009, Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti configuration daemon
23
24 Ganeti-confd is a daemon to query master candidates for configuration values.
25 It uses UDP+HMAC for authentication with a global cluster key.
26
27 """
28
29 import os
30 import sys
31 import logging
32 import asyncore
33 import socket
34 import pyinotify
35 import time
36 import errno
37
38 from optparse import OptionParser
39
40 from ganeti import constants
41 from ganeti import errors
42 from ganeti import daemon
43 from ganeti import ssconf
44 from ganeti.asyncnotifier import AsyncNotifier
45 from ganeti.confd.server import ConfdProcessor
46 from ganeti.confd import PackMagic, UnpackMagic
47
48
49 class ConfdAsyncUDPServer(daemon.AsyncUDPSocket):
50   """The confd udp server, suitable for use with asyncore.
51
52   """
53   def __init__(self, bind_address, port, processor):
54     """Constructor for ConfdAsyncUDPServer
55
56     @type bind_address: string
57     @param bind_address: socket bind address ('' for all)
58     @type port: int
59     @param port: udp port
60     @type processor: L{confd.server.ConfdProcessor}
61     @param processor: ConfdProcessor to use to handle queries
62
63     """
64     daemon.AsyncUDPSocket.__init__(self)
65     self.bind_address = bind_address
66     self.port = port
67     self.processor = processor
68     self.bind((bind_address, port))
69     logging.debug("listening on ('%s':%d)" % (bind_address, port))
70
71   # this method is overriding a daemon.AsyncUDPSocket method
72   def handle_datagram(self, payload_in, ip, port):
73     try:
74       query = UnpackMagic(payload_in)
75     except errors.ConfdMagicError, err:
76       logging.debug(err)
77       return
78
79     answer =  self.processor.ExecQuery(query, ip, port)
80     if answer is not None:
81       try:
82         self.enqueue_send(ip, port, PackMagic(answer))
83       except errors.UdpDataSizeError:
84         logging.error("Reply too big to fit in an udp packet.")
85
86
87 class ConfdInotifyEventHandler(pyinotify.ProcessEvent):
88
89   def __init__(self, watch_manager, callback,
90                file=constants.CLUSTER_CONF_FILE):
91     """Constructor for ConfdInotifyEventHandler
92
93     @type watch_manager: L{pyinotify.WatchManager}
94     @param watch_manager: ganeti-confd inotify watch manager
95     @type callback: function accepting a boolean
96     @param callback: function to call when an inotify event happens
97     @type file: string
98     @param file: config file to watch
99
100     """
101     # no need to call the parent's constructor
102     self.watch_manager = watch_manager
103     self.callback = callback
104     self.mask = pyinotify.EventsCodes.IN_IGNORED | \
105                 pyinotify.EventsCodes.IN_MODIFY
106     self.file = file
107     self.watch_handle = None
108
109   def enable(self):
110     """Watch the given file
111
112     """
113     if self.watch_handle is None:
114       result = self.watch_manager.add_watch(self.file, self.mask)
115       if not self.file in result or result[self.file] <= 0:
116         raise errors.InotifyError("Could not add inotify watcher")
117       else:
118         self.watch_handle = result[self.file]
119
120   def disable(self):
121     """Stop watching the given file
122
123     """
124     if self.watch_handle is not None:
125       result = self.watch_manager.rm_watch(self.watch_handle)
126       if result[self.watch_handle]:
127         self.watch_handle = None
128
129   def process_IN_IGNORED(self, event):
130     # Due to the fact that we monitor just for the cluster config file (rather
131     # than for the whole data dir) when the file is replaced with another one
132     # (which is what happens normally in ganeti) we're going to receive an
133     # IN_IGNORED event from inotify, because of the file removal (which is
134     # contextual with the replacement). In such a case we need to create
135     # another watcher for the "new" file.
136     logging.debug("Received 'ignored' inotify event for %s" % event.path)
137     self.watch_handle = None
138
139     try:
140       # Since the kernel believes the file we were interested in is gone, it's
141       # not going to notify us of any other events, until we set up, here, the
142       # new watch. This is not a race condition, though, since we're anyway
143       # going to realod the file after setting up the new watch.
144       self.callback(False)
145     except errors.ConfdFatalError, err:
146       logging.critical("Critical error, shutting down: %s" % err)
147       sys.exit(constants.EXIT_FAILURE)
148     except:
149       # we need to catch any exception here, log it, but proceed, because even
150       # if we failed handling a single request, we still want the confd to
151       # continue working.
152       logging.error("Unexpected exception", exc_info=True)
153
154   def process_IN_MODIFY(self, event):
155     # This gets called when the config file is modified. Note that this doesn't
156     # usually happen in Ganeti, as the config file is normally replaced by a
157     # new one, at filesystem level, rather than actually modified (see
158     # utils.WriteFile)
159     logging.debug("Received 'modify' inotify event for %s" % event.path)
160
161     try:
162       self.callback(True)
163     except errors.ConfdFatalError, err:
164       logging.critical("Critical error, shutting down: %s" % err)
165       sys.exit(constants.EXIT_FAILURE)
166     except:
167       # we need to catch any exception here, log it, but proceed, because even
168       # if we failed handling a single request, we still want the confd to
169       # continue working.
170       logging.error("Unexpected exception", exc_info=True)
171
172   def process_default(self, event):
173     logging.error("Received unhandled inotify event: %s" % event)
174
175
176 class ConfdConfigurationReloader(object):
177   """Logic to control when to reload the ganeti configuration
178
179   This class is able to alter between inotify and polling, to rate-limit the
180   number of reloads. When using inotify it also supports a fallback timed
181   check, to verify that the reload hasn't failed.
182
183   """
184   def __init__(self, processor, mainloop):
185     """Constructor for ConfdConfigurationReloader
186
187     @type processor: L{confd.server.ConfdProcessor}
188     @param processor: ganeti-confd ConfdProcessor
189     @type mainloop: L{daemon.Mainloop}
190     @param mainloop: ganeti-confd mainloop
191
192     """
193     self.processor = processor
194     self.mainloop = mainloop
195
196     self.polling = True
197     self.last_notification = 0
198
199     # Asyncronous inotify handler for config changes
200     self.wm = pyinotify.WatchManager()
201     self.inotify_handler = ConfdInotifyEventHandler(self.wm, self.OnInotify)
202     self.notifier = AsyncNotifier(self.wm, self.inotify_handler)
203
204     self.timer_handle = None
205     self._EnableTimer()
206
207   def OnInotify(self, notifier_enabled):
208     """Receive an inotify notification.
209
210     @type notifier_enabled: boolean
211     @param notifier_enabled: whether the notifier is still enabled
212
213     """
214     current_time = time.time()
215     time_delta = current_time - self.last_notification
216     self.last_notification = current_time
217
218     if time_delta < constants.CONFD_CONFIG_RELOAD_RATELIMIT:
219       logging.debug("Moving from inotify mode to polling mode")
220       self.polling = True
221       if notifier_enabled:
222         self.inotify_handler.disable()
223
224     if not self.polling and not notifier_enabled:
225       try:
226         self.inotify_handler.enable()
227       except errors.InotifyError:
228         self.polling = True
229
230     try:
231       reloaded = self.processor.reader.Reload()
232       if reloaded:
233         logging.info("Reloaded ganeti config")
234       else:
235         logging.debug("Skipped double config reload")
236     except errors.ConfigurationError:
237       self.DisableConfd()
238       self.inotify_handler.disable()
239       return
240
241     # Reset the timer. If we're polling it will go to the polling rate, if
242     # we're not it will delay it again to its base safe timeout.
243     self._ResetTimer()
244
245   def _DisableTimer(self):
246     if self.timer_handle is not None:
247       self.mainloop.scheduler.cancel(self.timer_handle)
248       self.timer_handle = None
249
250   def _EnableTimer(self):
251     if self.polling:
252       timeout = constants.CONFD_CONFIG_RELOAD_RATELIMIT
253     else:
254       timeout = constants.CONFD_CONFIG_RELOAD_TIMEOUT
255
256     if self.timer_handle is None:
257       self.timer_handle = self.mainloop.scheduler.enter(
258         timeout, 1, self.OnTimer, [])
259
260   def _ResetTimer(self):
261     self._DisableTimer()
262     self._EnableTimer()
263
264   def OnTimer(self):
265     """Function called when the timer fires
266
267     """
268     self.timer_handle = None
269     reloaded = False
270     was_disabled = False
271     try:
272       if self.processor.reader is None:
273         was_disabled = True
274         self.EnableConfd()
275         reloaded = True
276       else:
277         reloaded = self.processor.reader.Reload()
278     except errors.ConfigurationError:
279       self.DisableConfd(silent=was_disabled)
280       return
281
282     if self.polling and reloaded:
283       logging.info("Reloaded ganeti config")
284     elif reloaded:
285       # We have reloaded the config files, but received no inotify event.  If
286       # an event is pending though, we just happen to have timed out before
287       # receiving it, so this is not a problem, and we shouldn't alert
288       if not self.notifier.check_events() and not was_disabled:
289         logging.warning("Config file reload at timeout (inotify failure)")
290     elif self.polling:
291       # We're polling, but we haven't reloaded the config:
292       # Going back to inotify mode
293       logging.debug("Moving from polling mode to inotify mode")
294       self.polling = False
295       try:
296         self.inotify_handler.enable()
297       except errors.InotifyError:
298         self.polling = True
299     else:
300       logging.debug("Performed configuration check")
301
302     self._EnableTimer()
303
304   def DisableConfd(self, silent=False):
305     """Puts confd in non-serving mode
306
307     """
308     if not silent:
309       logging.warning("Confd is being disabled")
310     self.processor.Disable()
311     self.polling = False
312     self._ResetTimer()
313
314   def EnableConfd(self):
315     self.processor.Enable()
316     logging.warning("Confd is being enabled")
317     self.polling = True
318     self._ResetTimer()
319
320
321 def CheckConfd(options, args):
322   """Initial checks whether to run exit with a failure.
323
324   """
325   # TODO: collapse HMAC daemons handling in daemons GenericMain, when we'll
326   # have more than one.
327   if not os.path.isfile(constants.HMAC_CLUSTER_KEY):
328     print >> sys.stderr, "Need HMAC key %s to run" % constants.HMAC_CLUSTER_KEY
329     sys.exit(constants.EXIT_FAILURE)
330
331
332 def ExecConfd(options, args):
333   """Main confd function, executed with PID file held
334
335   """
336   mainloop = daemon.Mainloop()
337
338   # Asyncronous confd UDP server
339   processor = ConfdProcessor()
340   try:
341     processor.Enable()
342   except errors.ConfigurationError:
343     # If enabling the processor has failed, we can still go on, but confd will
344     # be disabled
345     logging.warning("Confd is starting in disabled mode")
346     pass
347   server = ConfdAsyncUDPServer(options.bind_address, options.port, processor)
348
349   # Configuration reloader
350   reloader = ConfdConfigurationReloader(processor, mainloop)
351
352   mainloop.Run()
353
354
355 def main():
356   """Main function for the confd daemon.
357
358   """
359   parser = OptionParser(description="Ganeti configuration daemon",
360                         usage="%prog [-f] [-d] [-b ADDRESS]",
361                         version="%%prog (ganeti) %s" %
362                         constants.RELEASE_VERSION)
363
364   dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
365   dirs.append((constants.LOG_OS_DIR, 0750))
366   dirs.append((constants.LOCK_DIR, 1777))
367   daemon.GenericMain(constants.CONFD, parser, dirs, CheckConfd, ExecConfd)
368
369
370 if __name__ == "__main__":
371   main()