Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-confd @ 1fea2f0d

History | View | Annotate | Download (11.6 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2009, Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti configuration daemon
23

    
24
Ganeti-confd is a daemon to query master candidates for configuration values.
25
It uses UDP+HMAC for authentication with a global cluster key.
26

    
27
"""
28

    
29
import os
30
import sys
31
import logging
32
import pyinotify
33
import time
34

    
35
from optparse import OptionParser
36

    
37
from ganeti import asyncnotifier
38
from ganeti import confd
39
from ganeti.confd import server as confd_server
40
from ganeti import constants
41
from ganeti import errors
42
from ganeti import daemon
43
from ganeti import ssconf
44

    
45

    
46
class ConfdAsyncUDPServer(daemon.AsyncUDPSocket):
47
  """The confd udp server, suitable for use with asyncore.
48

    
49
  """
50
  def __init__(self, bind_address, port, processor):
51
    """Constructor for ConfdAsyncUDPServer
52

    
53
    @type bind_address: string
54
    @param bind_address: socket bind address ('' for all)
55
    @type port: int
56
    @param port: udp port
57
    @type processor: L{confd.server.ConfdProcessor}
58
    @param processor: ConfdProcessor to use to handle queries
59

    
60
    """
61
    daemon.AsyncUDPSocket.__init__(self)
62
    self.bind_address = bind_address
63
    self.port = port
64
    self.processor = processor
65
    self.bind((bind_address, port))
66
    logging.debug("listening on ('%s':%d)" % (bind_address, port))
67

    
68
  # this method is overriding a daemon.AsyncUDPSocket method
69
  def handle_datagram(self, payload_in, ip, port):
70
    try:
71
      query = confd.UnpackMagic(payload_in)
72
    except errors.ConfdMagicError, err:
73
      logging.debug(err)
74
      return
75

    
76
    answer =  self.processor.ExecQuery(query, ip, port)
77
    if answer is not None:
78
      try:
79
        self.enqueue_send(ip, port, confd.PackMagic(answer))
80
      except errors.UdpDataSizeError:
81
        logging.error("Reply too big to fit in an udp packet.")
82

    
83

    
84
class ConfdInotifyEventHandler(pyinotify.ProcessEvent):
85

    
86
  def __init__(self, watch_manager, callback,
87
               file=constants.CLUSTER_CONF_FILE):
88
    """Constructor for ConfdInotifyEventHandler
89

    
90
    @type watch_manager: L{pyinotify.WatchManager}
91
    @param watch_manager: ganeti-confd inotify watch manager
92
    @type callback: function accepting a boolean
93
    @param callback: function to call when an inotify event happens
94
    @type file: string
95
    @param file: config file to watch
96

    
97
    """
98
    # no need to call the parent's constructor
99
    self.watch_manager = watch_manager
100
    self.callback = callback
101
    self.mask = pyinotify.EventsCodes.IN_IGNORED | \
102
                pyinotify.EventsCodes.IN_MODIFY
103
    self.file = file
104
    self.watch_handle = None
105

    
106
  def enable(self):
107
    """Watch the given file
108

    
109
    """
110
    if self.watch_handle is None:
111
      result = self.watch_manager.add_watch(self.file, self.mask)
112
      if not self.file in result or result[self.file] <= 0:
113
        raise errors.InotifyError("Could not add inotify watcher")
114
      else:
115
        self.watch_handle = result[self.file]
116

    
117
  def disable(self):
118
    """Stop watching the given file
119

    
120
    """
121
    if self.watch_handle is not None:
122
      result = self.watch_manager.rm_watch(self.watch_handle)
123
      if result[self.watch_handle]:
124
        self.watch_handle = None
125

    
126
  def process_IN_IGNORED(self, event):
127
    # Due to the fact that we monitor just for the cluster config file (rather
128
    # than for the whole data dir) when the file is replaced with another one
129
    # (which is what happens normally in ganeti) we're going to receive an
130
    # IN_IGNORED event from inotify, because of the file removal (which is
131
    # contextual with the replacement). In such a case we need to create
132
    # another watcher for the "new" file.
133
    logging.debug("Received 'ignored' inotify event for %s" % event.path)
134
    self.watch_handle = None
135

    
136
    try:
137
      # Since the kernel believes the file we were interested in is gone, it's
138
      # not going to notify us of any other events, until we set up, here, the
139
      # new watch. This is not a race condition, though, since we're anyway
140
      # going to realod the file after setting up the new watch.
141
      self.callback(False)
142
    except errors.ConfdFatalError, err:
143
      logging.critical("Critical error, shutting down: %s" % err)
144
      sys.exit(constants.EXIT_FAILURE)
145
    except:
146
      # we need to catch any exception here, log it, but proceed, because even
147
      # if we failed handling a single request, we still want the confd to
148
      # continue working.
149
      logging.error("Unexpected exception", exc_info=True)
150

    
151
  def process_IN_MODIFY(self, event):
152
    # This gets called when the config file is modified. Note that this doesn't
153
    # usually happen in Ganeti, as the config file is normally replaced by a
154
    # new one, at filesystem level, rather than actually modified (see
155
    # utils.WriteFile)
156
    logging.debug("Received 'modify' inotify event for %s" % event.path)
157

    
158
    try:
159
      self.callback(True)
160
    except errors.ConfdFatalError, err:
161
      logging.critical("Critical error, shutting down: %s" % err)
162
      sys.exit(constants.EXIT_FAILURE)
163
    except:
164
      # we need to catch any exception here, log it, but proceed, because even
165
      # if we failed handling a single request, we still want the confd to
166
      # continue working.
167
      logging.error("Unexpected exception", exc_info=True)
168

    
169
  def process_default(self, event):
170
    logging.error("Received unhandled inotify event: %s" % event)
171

    
172

    
173
class ConfdConfigurationReloader(object):
174
  """Logic to control when to reload the ganeti configuration
175

    
176
  This class is able to alter between inotify and polling, to rate-limit the
177
  number of reloads. When using inotify it also supports a fallback timed
178
  check, to verify that the reload hasn't failed.
179

    
180
  """
181
  def __init__(self, processor, mainloop):
182
    """Constructor for ConfdConfigurationReloader
183

    
184
    @type processor: L{confd.server.ConfdProcessor}
185
    @param processor: ganeti-confd ConfdProcessor
186
    @type mainloop: L{daemon.Mainloop}
187
    @param mainloop: ganeti-confd mainloop
188

    
189
    """
190
    self.processor = processor
191
    self.mainloop = mainloop
192

    
193
    self.polling = True
194
    self.last_notification = 0
195

    
196
    # Asyncronous inotify handler for config changes
197
    self.wm = pyinotify.WatchManager()
198
    self.inotify_handler = ConfdInotifyEventHandler(self.wm, self.OnInotify)
199
    self.notifier = asyncnotifier.AsyncNotifier(self.wm, self.inotify_handler)
200

    
201
    self.timer_handle = None
202
    self._EnableTimer()
203

    
204
  def OnInotify(self, notifier_enabled):
205
    """Receive an inotify notification.
206

    
207
    @type notifier_enabled: boolean
208
    @param notifier_enabled: whether the notifier is still enabled
209

    
210
    """
211
    current_time = time.time()
212
    time_delta = current_time - self.last_notification
213
    self.last_notification = current_time
214

    
215
    if time_delta < constants.CONFD_CONFIG_RELOAD_RATELIMIT:
216
      logging.debug("Moving from inotify mode to polling mode")
217
      self.polling = True
218
      if notifier_enabled:
219
        self.inotify_handler.disable()
220

    
221
    if not self.polling and not notifier_enabled:
222
      try:
223
        self.inotify_handler.enable()
224
      except errors.InotifyError:
225
        self.polling = True
226

    
227
    try:
228
      reloaded = self.processor.reader.Reload()
229
      if reloaded:
230
        logging.info("Reloaded ganeti config")
231
      else:
232
        logging.debug("Skipped double config reload")
233
    except errors.ConfigurationError:
234
      self.DisableConfd()
235
      self.inotify_handler.disable()
236
      return
237

    
238
    # Reset the timer. If we're polling it will go to the polling rate, if
239
    # we're not it will delay it again to its base safe timeout.
240
    self._ResetTimer()
241

    
242
  def _DisableTimer(self):
243
    if self.timer_handle is not None:
244
      self.mainloop.scheduler.cancel(self.timer_handle)
245
      self.timer_handle = None
246

    
247
  def _EnableTimer(self):
248
    if self.polling:
249
      timeout = constants.CONFD_CONFIG_RELOAD_RATELIMIT
250
    else:
251
      timeout = constants.CONFD_CONFIG_RELOAD_TIMEOUT
252

    
253
    if self.timer_handle is None:
254
      self.timer_handle = self.mainloop.scheduler.enter(
255
        timeout, 1, self.OnTimer, [])
256

    
257
  def _ResetTimer(self):
258
    self._DisableTimer()
259
    self._EnableTimer()
260

    
261
  def OnTimer(self):
262
    """Function called when the timer fires
263

    
264
    """
265
    self.timer_handle = None
266
    reloaded = False
267
    was_disabled = False
268
    try:
269
      if self.processor.reader is None:
270
        was_disabled = True
271
        self.EnableConfd()
272
        reloaded = True
273
      else:
274
        reloaded = self.processor.reader.Reload()
275
    except errors.ConfigurationError:
276
      self.DisableConfd(silent=was_disabled)
277
      return
278

    
279
    if self.polling and reloaded:
280
      logging.info("Reloaded ganeti config")
281
    elif reloaded:
282
      # We have reloaded the config files, but received no inotify event.  If
283
      # an event is pending though, we just happen to have timed out before
284
      # receiving it, so this is not a problem, and we shouldn't alert
285
      if not self.notifier.check_events() and not was_disabled:
286
        logging.warning("Config file reload at timeout (inotify failure)")
287
    elif self.polling:
288
      # We're polling, but we haven't reloaded the config:
289
      # Going back to inotify mode
290
      logging.debug("Moving from polling mode to inotify mode")
291
      self.polling = False
292
      try:
293
        self.inotify_handler.enable()
294
      except errors.InotifyError:
295
        self.polling = True
296
    else:
297
      logging.debug("Performed configuration check")
298

    
299
    self._EnableTimer()
300

    
301
  def DisableConfd(self, silent=False):
302
    """Puts confd in non-serving mode
303

    
304
    """
305
    if not silent:
306
      logging.warning("Confd is being disabled")
307
    self.processor.Disable()
308
    self.polling = False
309
    self._ResetTimer()
310

    
311
  def EnableConfd(self):
312
    self.processor.Enable()
313
    logging.warning("Confd is being enabled")
314
    self.polling = True
315
    self._ResetTimer()
316

    
317

    
318
def CheckConfd(options, args):
319
  """Initial checks whether to run exit with a failure.
320

    
321
  """
322
  # TODO: collapse HMAC daemons handling in daemons GenericMain, when we'll
323
  # have more than one.
324
  if not os.path.isfile(constants.HMAC_CLUSTER_KEY):
325
    print >> sys.stderr, "Need HMAC key %s to run" % constants.HMAC_CLUSTER_KEY
326
    sys.exit(constants.EXIT_FAILURE)
327

    
328

    
329
def ExecConfd(options, args):
330
  """Main confd function, executed with PID file held
331

    
332
  """
333
  mainloop = daemon.Mainloop()
334

    
335
  # Asyncronous confd UDP server
336
  processor = confd_server.ConfdProcessor()
337
  try:
338
    processor.Enable()
339
  except errors.ConfigurationError:
340
    # If enabling the processor has failed, we can still go on, but confd will
341
    # be disabled
342
    logging.warning("Confd is starting in disabled mode")
343
    pass
344
  server = ConfdAsyncUDPServer(options.bind_address, options.port, processor)
345

    
346
  # Configuration reloader
347
  reloader = ConfdConfigurationReloader(processor, mainloop)
348

    
349
  mainloop.Run()
350

    
351

    
352
def main():
353
  """Main function for the confd daemon.
354

    
355
  """
356
  parser = OptionParser(description="Ganeti configuration daemon",
357
                        usage="%prog [-f] [-d] [-b ADDRESS]",
358
                        version="%%prog (ganeti) %s" %
359
                        constants.RELEASE_VERSION)
360

    
361
  dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
362
  dirs.append((constants.LOCK_DIR, 1777))
363
  daemon.GenericMain(constants.CONFD, parser, dirs, CheckConfd, ExecConfd)
364

    
365

    
366
if __name__ == "__main__":
367
  main()