ErrorLoggingAsyncNotifier
[ganeti-local] / daemons / ganeti-confd
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2009, Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti configuration daemon
23
24 Ganeti-confd is a daemon to query master candidates for configuration values.
25 It uses UDP+HMAC for authentication with a global cluster key.
26
27 """
28
29 # pylint: disable-msg=C0103
30 # C0103: Invalid name ganeti-confd
31
32 import os
33 import sys
34 import logging
35 import time
36
37 try:
38   # pylint: disable-msg=E0611
39   from pyinotify import pyinotify
40 except ImportError:
41   import pyinotify
42
43 from optparse import OptionParser
44
45 from ganeti import asyncnotifier
46 from ganeti import confd
47 from ganeti.confd import server as confd_server
48 from ganeti import constants
49 from ganeti import errors
50 from ganeti import daemon
51
52
53 class ConfdAsyncUDPServer(daemon.AsyncUDPSocket):
54   """The confd udp server, suitable for use with asyncore.
55
56   """
57   def __init__(self, bind_address, port, processor):
58     """Constructor for ConfdAsyncUDPServer
59
60     @type bind_address: string
61     @param bind_address: socket bind address ('' for all)
62     @type port: int
63     @param port: udp port
64     @type processor: L{confd.server.ConfdProcessor}
65     @param processor: ConfdProcessor to use to handle queries
66
67     """
68     daemon.AsyncUDPSocket.__init__(self)
69     self.bind_address = bind_address
70     self.port = port
71     self.processor = processor
72     self.bind((bind_address, port))
73     logging.debug("listening on ('%s':%d)", bind_address, port)
74
75   # this method is overriding a daemon.AsyncUDPSocket method
76   def handle_datagram(self, payload_in, ip, port):
77     try:
78       query = confd.UnpackMagic(payload_in)
79     except errors.ConfdMagicError, err:
80       logging.debug(err)
81       return
82
83     answer =  self.processor.ExecQuery(query, ip, port)
84     if answer is not None:
85       try:
86         self.enqueue_send(ip, port, confd.PackMagic(answer))
87       except errors.UdpDataSizeError:
88         logging.error("Reply too big to fit in an udp packet.")
89
90
91 class ConfdConfigurationReloader(object):
92   """Logic to control when to reload the ganeti configuration
93
94   This class is able to alter between inotify and polling, to rate-limit the
95   number of reloads. When using inotify it also supports a fallback timed
96   check, to verify that the reload hasn't failed.
97
98   """
99   def __init__(self, processor, mainloop):
100     """Constructor for ConfdConfigurationReloader
101
102     @type processor: L{confd.server.ConfdProcessor}
103     @param processor: ganeti-confd ConfdProcessor
104     @type mainloop: L{daemon.Mainloop}
105     @param mainloop: ganeti-confd mainloop
106
107     """
108     self.processor = processor
109     self.mainloop = mainloop
110
111     self.polling = True
112     self.last_notification = 0
113
114     # Asyncronous inotify handler for config changes
115     cfg_file = constants.CLUSTER_CONF_FILE
116     self.wm = pyinotify.WatchManager()
117     self.inotify_handler = asyncnotifier.SingleFileEventHandler(self.wm,
118                                                                 self.OnInotify,
119                                                                 cfg_file)
120     notifier_class = asyncnotifier.ErrorLoggingAsyncNotifier
121     self.notifier = notifier_class(self.wm, self.inotify_handler)
122
123     self.timer_handle = None
124     self._EnableTimer()
125
126   def OnInotify(self, notifier_enabled):
127     """Receive an inotify notification.
128
129     @type notifier_enabled: boolean
130     @param notifier_enabled: whether the notifier is still enabled
131
132     """
133     current_time = time.time()
134     time_delta = current_time - self.last_notification
135     self.last_notification = current_time
136
137     if time_delta < constants.CONFD_CONFIG_RELOAD_RATELIMIT:
138       logging.debug("Moving from inotify mode to polling mode")
139       self.polling = True
140       if notifier_enabled:
141         self.inotify_handler.disable()
142
143     if not self.polling and not notifier_enabled:
144       try:
145         self.inotify_handler.enable()
146       except errors.InotifyError:
147         self.polling = True
148
149     try:
150       reloaded = self.processor.reader.Reload()
151       if reloaded:
152         logging.info("Reloaded ganeti config")
153       else:
154         logging.debug("Skipped double config reload")
155     except errors.ConfigurationError:
156       self.DisableConfd()
157       self.inotify_handler.disable()
158       return
159
160     # Reset the timer. If we're polling it will go to the polling rate, if
161     # we're not it will delay it again to its base safe timeout.
162     self._ResetTimer()
163
164   def _DisableTimer(self):
165     if self.timer_handle is not None:
166       self.mainloop.scheduler.cancel(self.timer_handle)
167       self.timer_handle = None
168
169   def _EnableTimer(self):
170     if self.polling:
171       timeout = constants.CONFD_CONFIG_RELOAD_RATELIMIT
172     else:
173       timeout = constants.CONFD_CONFIG_RELOAD_TIMEOUT
174
175     if self.timer_handle is None:
176       self.timer_handle = self.mainloop.scheduler.enter(
177         timeout, 1, self.OnTimer, [])
178
179   def _ResetTimer(self):
180     self._DisableTimer()
181     self._EnableTimer()
182
183   def OnTimer(self):
184     """Function called when the timer fires
185
186     """
187     self.timer_handle = None
188     reloaded = False
189     was_disabled = False
190     try:
191       if self.processor.reader is None:
192         was_disabled = True
193         self.EnableConfd()
194         reloaded = True
195       else:
196         reloaded = self.processor.reader.Reload()
197     except errors.ConfigurationError:
198       self.DisableConfd(silent=was_disabled)
199       return
200
201     if self.polling and reloaded:
202       logging.info("Reloaded ganeti config")
203     elif reloaded:
204       # We have reloaded the config files, but received no inotify event.  If
205       # an event is pending though, we just happen to have timed out before
206       # receiving it, so this is not a problem, and we shouldn't alert
207       if not self.notifier.check_events() and not was_disabled:
208         logging.warning("Config file reload at timeout (inotify failure)")
209     elif self.polling:
210       # We're polling, but we haven't reloaded the config:
211       # Going back to inotify mode
212       logging.debug("Moving from polling mode to inotify mode")
213       self.polling = False
214       try:
215         self.inotify_handler.enable()
216       except errors.InotifyError:
217         self.polling = True
218     else:
219       logging.debug("Performed configuration check")
220
221     self._EnableTimer()
222
223   def DisableConfd(self, silent=False):
224     """Puts confd in non-serving mode
225
226     """
227     if not silent:
228       logging.warning("Confd is being disabled")
229     self.processor.Disable()
230     self.polling = False
231     self._ResetTimer()
232
233   def EnableConfd(self):
234     self.processor.Enable()
235     logging.warning("Confd is being enabled")
236     self.polling = True
237     self._ResetTimer()
238
239
240 def CheckConfd(_, args):
241   """Initial checks whether to run exit with a failure.
242
243   """
244   if args: # confd doesn't take any arguments
245     print >> sys.stderr, ("Usage: %s [-f] [-d] [-b ADDRESS]" % sys.argv[0])
246     sys.exit(constants.EXIT_FAILURE)
247
248   # TODO: collapse HMAC daemons handling in daemons GenericMain, when we'll
249   # have more than one.
250   if not os.path.isfile(constants.CONFD_HMAC_KEY):
251     print >> sys.stderr, "Need HMAC key %s to run" % constants.CONFD_HMAC_KEY
252     sys.exit(constants.EXIT_FAILURE)
253
254
255 def ExecConfd(options, _):
256   """Main confd function, executed with PID file held
257
258   """
259   # TODO: clarify how the server and reloader variables work (they are
260   # not used)
261   # pylint: disable-msg=W0612
262   mainloop = daemon.Mainloop()
263
264   # Asyncronous confd UDP server
265   processor = confd_server.ConfdProcessor()
266   try:
267     processor.Enable()
268   except errors.ConfigurationError:
269     # If enabling the processor has failed, we can still go on, but confd will
270     # be disabled
271     logging.warning("Confd is starting in disabled mode")
272
273   server = ConfdAsyncUDPServer(options.bind_address, options.port, processor)
274
275   # Configuration reloader
276   reloader = ConfdConfigurationReloader(processor, mainloop)
277
278   mainloop.Run()
279
280
281 def main():
282   """Main function for the confd daemon.
283
284   """
285   parser = OptionParser(description="Ganeti configuration daemon",
286                         usage="%prog [-f] [-d] [-b ADDRESS]",
287                         version="%%prog (ganeti) %s" %
288                         constants.RELEASE_VERSION)
289
290   dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
291   dirs.append((constants.LOCK_DIR, 1777))
292   daemon.GenericMain(constants.CONFD, parser, dirs, CheckConfd, ExecConfd)
293
294
295 if __name__ == "__main__":
296   main()