Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-confd @ 07b8a2b5

History | View | Annotate | Download (11.7 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2009, Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti configuration daemon
23

    
24
Ganeti-confd is a daemon to query master candidates for configuration values.
25
It uses UDP+HMAC for authentication with a global cluster key.
26

    
27
"""
28

    
29
import os
30
import sys
31
import logging
32
import time
33

    
34
try:
35
  from pyinotify import pyinotify
36
except ImportError:
37
  import pyinotify
38

    
39
from optparse import OptionParser
40

    
41
from ganeti import asyncnotifier
42
from ganeti import confd
43
from ganeti.confd import server as confd_server
44
from ganeti import constants
45
from ganeti import errors
46
from ganeti import daemon
47
from ganeti import ssconf
48

    
49

    
50
class ConfdAsyncUDPServer(daemon.AsyncUDPSocket):
51
  """The confd udp server, suitable for use with asyncore.
52

    
53
  """
54
  def __init__(self, bind_address, port, processor):
55
    """Constructor for ConfdAsyncUDPServer
56

    
57
    @type bind_address: string
58
    @param bind_address: socket bind address ('' for all)
59
    @type port: int
60
    @param port: udp port
61
    @type processor: L{confd.server.ConfdProcessor}
62
    @param processor: ConfdProcessor to use to handle queries
63

    
64
    """
65
    daemon.AsyncUDPSocket.__init__(self)
66
    self.bind_address = bind_address
67
    self.port = port
68
    self.processor = processor
69
    self.bind((bind_address, port))
70
    logging.debug("listening on ('%s':%d)", bind_address, port)
71

    
72
  # this method is overriding a daemon.AsyncUDPSocket method
73
  def handle_datagram(self, payload_in, ip, port):
74
    try:
75
      query = confd.UnpackMagic(payload_in)
76
    except errors.ConfdMagicError, err:
77
      logging.debug(err)
78
      return
79

    
80
    answer =  self.processor.ExecQuery(query, ip, port)
81
    if answer is not None:
82
      try:
83
        self.enqueue_send(ip, port, confd.PackMagic(answer))
84
      except errors.UdpDataSizeError:
85
        logging.error("Reply too big to fit in an udp packet.")
86

    
87

    
88
class ConfdInotifyEventHandler(pyinotify.ProcessEvent):
89

    
90
  def __init__(self, watch_manager, callback,
91
               file=constants.CLUSTER_CONF_FILE):
92
    """Constructor for ConfdInotifyEventHandler
93

    
94
    @type watch_manager: L{pyinotify.WatchManager}
95
    @param watch_manager: ganeti-confd inotify watch manager
96
    @type callback: function accepting a boolean
97
    @param callback: function to call when an inotify event happens
98
    @type file: string
99
    @param file: config file to watch
100

    
101
    """
102
    # no need to call the parent's constructor
103
    self.watch_manager = watch_manager
104
    self.callback = callback
105
    self.mask = pyinotify.EventsCodes.IN_IGNORED | \
106
                pyinotify.EventsCodes.IN_MODIFY
107
    self.file = file
108
    self.watch_handle = None
109

    
110
  def enable(self):
111
    """Watch the given file
112

    
113
    """
114
    if self.watch_handle is None:
115
      result = self.watch_manager.add_watch(self.file, self.mask)
116
      if not self.file in result or result[self.file] <= 0:
117
        raise errors.InotifyError("Could not add inotify watcher")
118
      else:
119
        self.watch_handle = result[self.file]
120

    
121
  def disable(self):
122
    """Stop watching the given file
123

    
124
    """
125
    if self.watch_handle is not None:
126
      result = self.watch_manager.rm_watch(self.watch_handle)
127
      if result[self.watch_handle]:
128
        self.watch_handle = None
129

    
130
  def process_IN_IGNORED(self, event):
131
    # Due to the fact that we monitor just for the cluster config file (rather
132
    # than for the whole data dir) when the file is replaced with another one
133
    # (which is what happens normally in ganeti) we're going to receive an
134
    # IN_IGNORED event from inotify, because of the file removal (which is
135
    # contextual with the replacement). In such a case we need to create
136
    # another watcher for the "new" file.
137
    logging.debug("Received 'ignored' inotify event for %s", event.path)
138
    self.watch_handle = None
139

    
140
    try:
141
      # Since the kernel believes the file we were interested in is gone, it's
142
      # not going to notify us of any other events, until we set up, here, the
143
      # new watch. This is not a race condition, though, since we're anyway
144
      # going to realod the file after setting up the new watch.
145
      self.callback(False)
146
    except errors.ConfdFatalError, err:
147
      logging.critical("Critical error, shutting down: %s", err)
148
      sys.exit(constants.EXIT_FAILURE)
149
    except:
150
      # we need to catch any exception here, log it, but proceed, because even
151
      # if we failed handling a single request, we still want the confd to
152
      # continue working.
153
      logging.error("Unexpected exception", exc_info=True)
154

    
155
  def process_IN_MODIFY(self, event):
156
    # This gets called when the config file is modified. Note that this doesn't
157
    # usually happen in Ganeti, as the config file is normally replaced by a
158
    # new one, at filesystem level, rather than actually modified (see
159
    # utils.WriteFile)
160
    logging.debug("Received 'modify' inotify event for %s", event.path)
161

    
162
    try:
163
      self.callback(True)
164
    except errors.ConfdFatalError, err:
165
      logging.critical("Critical error, shutting down: %s", err)
166
      sys.exit(constants.EXIT_FAILURE)
167
    except:
168
      # we need to catch any exception here, log it, but proceed, because even
169
      # if we failed handling a single request, we still want the confd to
170
      # continue working.
171
      logging.error("Unexpected exception", exc_info=True)
172

    
173
  def process_default(self, event):
174
    logging.error("Received unhandled inotify event: %s", event)
175

    
176

    
177
class ConfdConfigurationReloader(object):
178
  """Logic to control when to reload the ganeti configuration
179

    
180
  This class is able to alter between inotify and polling, to rate-limit the
181
  number of reloads. When using inotify it also supports a fallback timed
182
  check, to verify that the reload hasn't failed.
183

    
184
  """
185
  def __init__(self, processor, mainloop):
186
    """Constructor for ConfdConfigurationReloader
187

    
188
    @type processor: L{confd.server.ConfdProcessor}
189
    @param processor: ganeti-confd ConfdProcessor
190
    @type mainloop: L{daemon.Mainloop}
191
    @param mainloop: ganeti-confd mainloop
192

    
193
    """
194
    self.processor = processor
195
    self.mainloop = mainloop
196

    
197
    self.polling = True
198
    self.last_notification = 0
199

    
200
    # Asyncronous inotify handler for config changes
201
    self.wm = pyinotify.WatchManager()
202
    self.inotify_handler = ConfdInotifyEventHandler(self.wm, self.OnInotify)
203
    self.notifier = asyncnotifier.AsyncNotifier(self.wm, self.inotify_handler)
204

    
205
    self.timer_handle = None
206
    self._EnableTimer()
207

    
208
  def OnInotify(self, notifier_enabled):
209
    """Receive an inotify notification.
210

    
211
    @type notifier_enabled: boolean
212
    @param notifier_enabled: whether the notifier is still enabled
213

    
214
    """
215
    current_time = time.time()
216
    time_delta = current_time - self.last_notification
217
    self.last_notification = current_time
218

    
219
    if time_delta < constants.CONFD_CONFIG_RELOAD_RATELIMIT:
220
      logging.debug("Moving from inotify mode to polling mode")
221
      self.polling = True
222
      if notifier_enabled:
223
        self.inotify_handler.disable()
224

    
225
    if not self.polling and not notifier_enabled:
226
      try:
227
        self.inotify_handler.enable()
228
      except errors.InotifyError:
229
        self.polling = True
230

    
231
    try:
232
      reloaded = self.processor.reader.Reload()
233
      if reloaded:
234
        logging.info("Reloaded ganeti config")
235
      else:
236
        logging.debug("Skipped double config reload")
237
    except errors.ConfigurationError:
238
      self.DisableConfd()
239
      self.inotify_handler.disable()
240
      return
241

    
242
    # Reset the timer. If we're polling it will go to the polling rate, if
243
    # we're not it will delay it again to its base safe timeout.
244
    self._ResetTimer()
245

    
246
  def _DisableTimer(self):
247
    if self.timer_handle is not None:
248
      self.mainloop.scheduler.cancel(self.timer_handle)
249
      self.timer_handle = None
250

    
251
  def _EnableTimer(self):
252
    if self.polling:
253
      timeout = constants.CONFD_CONFIG_RELOAD_RATELIMIT
254
    else:
255
      timeout = constants.CONFD_CONFIG_RELOAD_TIMEOUT
256

    
257
    if self.timer_handle is None:
258
      self.timer_handle = self.mainloop.scheduler.enter(
259
        timeout, 1, self.OnTimer, [])
260

    
261
  def _ResetTimer(self):
262
    self._DisableTimer()
263
    self._EnableTimer()
264

    
265
  def OnTimer(self):
266
    """Function called when the timer fires
267

    
268
    """
269
    self.timer_handle = None
270
    reloaded = False
271
    was_disabled = False
272
    try:
273
      if self.processor.reader is None:
274
        was_disabled = True
275
        self.EnableConfd()
276
        reloaded = True
277
      else:
278
        reloaded = self.processor.reader.Reload()
279
    except errors.ConfigurationError:
280
      self.DisableConfd(silent=was_disabled)
281
      return
282

    
283
    if self.polling and reloaded:
284
      logging.info("Reloaded ganeti config")
285
    elif reloaded:
286
      # We have reloaded the config files, but received no inotify event.  If
287
      # an event is pending though, we just happen to have timed out before
288
      # receiving it, so this is not a problem, and we shouldn't alert
289
      if not self.notifier.check_events() and not was_disabled:
290
        logging.warning("Config file reload at timeout (inotify failure)")
291
    elif self.polling:
292
      # We're polling, but we haven't reloaded the config:
293
      # Going back to inotify mode
294
      logging.debug("Moving from polling mode to inotify mode")
295
      self.polling = False
296
      try:
297
        self.inotify_handler.enable()
298
      except errors.InotifyError:
299
        self.polling = True
300
    else:
301
      logging.debug("Performed configuration check")
302

    
303
    self._EnableTimer()
304

    
305
  def DisableConfd(self, silent=False):
306
    """Puts confd in non-serving mode
307

    
308
    """
309
    if not silent:
310
      logging.warning("Confd is being disabled")
311
    self.processor.Disable()
312
    self.polling = False
313
    self._ResetTimer()
314

    
315
  def EnableConfd(self):
316
    self.processor.Enable()
317
    logging.warning("Confd is being enabled")
318
    self.polling = True
319
    self._ResetTimer()
320

    
321

    
322
def CheckConfd(options, args):
323
  """Initial checks whether to run exit with a failure.
324

    
325
  """
326
  # TODO: collapse HMAC daemons handling in daemons GenericMain, when we'll
327
  # have more than one.
328
  if not os.path.isfile(constants.HMAC_CLUSTER_KEY):
329
    print >> sys.stderr, "Need HMAC key %s to run" % constants.HMAC_CLUSTER_KEY
330
    sys.exit(constants.EXIT_FAILURE)
331

    
332

    
333
def ExecConfd(options, args):
334
  """Main confd function, executed with PID file held
335

    
336
  """
337
  mainloop = daemon.Mainloop()
338

    
339
  # Asyncronous confd UDP server
340
  processor = confd_server.ConfdProcessor()
341
  try:
342
    processor.Enable()
343
  except errors.ConfigurationError:
344
    # If enabling the processor has failed, we can still go on, but confd will
345
    # be disabled
346
    logging.warning("Confd is starting in disabled mode")
347
    pass
348
  server = ConfdAsyncUDPServer(options.bind_address, options.port, processor)
349

    
350
  # Configuration reloader
351
  reloader = ConfdConfigurationReloader(processor, mainloop)
352

    
353
  mainloop.Run()
354

    
355

    
356
def main():
357
  """Main function for the confd daemon.
358

    
359
  """
360
  parser = OptionParser(description="Ganeti configuration daemon",
361
                        usage="%prog [-f] [-d] [-b ADDRESS]",
362
                        version="%%prog (ganeti) %s" %
363
                        constants.RELEASE_VERSION)
364

    
365
  dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
366
  dirs.append((constants.LOCK_DIR, 1777))
367
  daemon.GenericMain(constants.CONFD, parser, dirs, CheckConfd, ExecConfd)
368

    
369

    
370
if __name__ == "__main__":
371
  main()