Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-confd @ 9748ab35

History | View | Annotate | Download (11.7 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2009, Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti configuration daemon
23

    
24
Ganeti-confd is a daemon to query master candidates for configuration values.
25
It uses UDP+HMAC for authentication with a global cluster key.
26

    
27
"""
28

    
29
import os
30
import sys
31
import logging
32
import asyncore
33
import socket
34
import pyinotify
35
import time
36
import errno
37

    
38
from optparse import OptionParser
39

    
40
from ganeti import constants
41
from ganeti import errors
42
from ganeti import daemon
43
from ganeti import ssconf
44
from ganeti.asyncnotifier import AsyncNotifier
45
from ganeti.confd.server import ConfdProcessor
46
from ganeti.confd import PackMagic, UnpackMagic
47

    
48

    
49
class ConfdAsyncUDPServer(daemon.AsyncUDPSocket):
50
  """The confd udp server, suitable for use with asyncore.
51

    
52
  """
53
  def __init__(self, bind_address, port, processor):
54
    """Constructor for ConfdAsyncUDPServer
55

    
56
    @type bind_address: string
57
    @param bind_address: socket bind address ('' for all)
58
    @type port: int
59
    @param port: udp port
60
    @type processor: L{confd.server.ConfdProcessor}
61
    @param processor: ConfdProcessor to use to handle queries
62

    
63
    """
64
    daemon.AsyncUDPSocket.__init__(self)
65
    self.bind_address = bind_address
66
    self.port = port
67
    self.processor = processor
68
    self.bind((bind_address, port))
69
    logging.debug("listening on ('%s':%d)" % (bind_address, port))
70

    
71
  # this method is overriding a daemon.AsyncUDPSocket method
72
  def handle_datagram(self, payload_in, ip, port):
73
    try:
74
      query = UnpackMagic(payload_in)
75
    except errors.ConfdMagicError, err:
76
      logging.debug(err)
77
      return
78

    
79
    answer =  self.processor.ExecQuery(query, ip, port)
80
    if answer is not None:
81
      try:
82
        self.enqueue_send(ip, port, PackMagic(answer))
83
      except errors.UdpDataSizeError:
84
        logging.error("Reply too big to fit in an udp packet.")
85

    
86

    
87
class ConfdInotifyEventHandler(pyinotify.ProcessEvent):
88

    
89
  def __init__(self, watch_manager, callback,
90
               file=constants.CLUSTER_CONF_FILE):
91
    """Constructor for ConfdInotifyEventHandler
92

    
93
    @type watch_manager: L{pyinotify.WatchManager}
94
    @param watch_manager: ganeti-confd inotify watch manager
95
    @type callback: function accepting a boolean
96
    @param callback: function to call when an inotify event happens
97
    @type file: string
98
    @param file: config file to watch
99

    
100
    """
101
    # no need to call the parent's constructor
102
    self.watch_manager = watch_manager
103
    self.callback = callback
104
    self.mask = pyinotify.EventsCodes.IN_IGNORED | \
105
                pyinotify.EventsCodes.IN_MODIFY
106
    self.file = file
107
    self.watch_handle = None
108

    
109
  def enable(self):
110
    """Watch the given file
111

    
112
    """
113
    if self.watch_handle is None:
114
      result = self.watch_manager.add_watch(self.file, self.mask)
115
      if not self.file in result or result[self.file] <= 0:
116
        raise errors.InotifyError("Could not add inotify watcher")
117
      else:
118
        self.watch_handle = result[self.file]
119

    
120
  def disable(self):
121
    """Stop watching the given file
122

    
123
    """
124
    if self.watch_handle is not None:
125
      result = self.watch_manager.rm_watch(self.watch_handle)
126
      if result[self.watch_handle]:
127
        self.watch_handle = None
128

    
129
  def process_IN_IGNORED(self, event):
130
    # Due to the fact that we monitor just for the cluster config file (rather
131
    # than for the whole data dir) when the file is replaced with another one
132
    # (which is what happens normally in ganeti) we're going to receive an
133
    # IN_IGNORED event from inotify, because of the file removal (which is
134
    # contextual with the replacement). In such a case we need to create
135
    # another watcher for the "new" file.
136
    logging.debug("Received 'ignored' inotify event for %s" % event.path)
137
    self.watch_handle = None
138

    
139
    try:
140
      # Since the kernel believes the file we were interested in is gone, it's
141
      # not going to notify us of any other events, until we set up, here, the
142
      # new watch. This is not a race condition, though, since we're anyway
143
      # going to realod the file after setting up the new watch.
144
      self.callback(False)
145
    except errors.ConfdFatalError, err:
146
      logging.critical("Critical error, shutting down: %s" % err)
147
      sys.exit(constants.EXIT_FAILURE)
148
    except:
149
      # we need to catch any exception here, log it, but proceed, because even
150
      # if we failed handling a single request, we still want the confd to
151
      # continue working.
152
      logging.error("Unexpected exception", exc_info=True)
153

    
154
  def process_IN_MODIFY(self, event):
155
    # This gets called when the config file is modified. Note that this doesn't
156
    # usually happen in Ganeti, as the config file is normally replaced by a
157
    # new one, at filesystem level, rather than actually modified (see
158
    # utils.WriteFile)
159
    logging.debug("Received 'modify' inotify event for %s" % event.path)
160

    
161
    try:
162
      self.callback(True)
163
    except errors.ConfdFatalError, err:
164
      logging.critical("Critical error, shutting down: %s" % err)
165
      sys.exit(constants.EXIT_FAILURE)
166
    except:
167
      # we need to catch any exception here, log it, but proceed, because even
168
      # if we failed handling a single request, we still want the confd to
169
      # continue working.
170
      logging.error("Unexpected exception", exc_info=True)
171

    
172
  def process_default(self, event):
173
    logging.error("Received unhandled inotify event: %s" % event)
174

    
175

    
176
class ConfdConfigurationReloader(object):
177
  """Logic to control when to reload the ganeti configuration
178

    
179
  This class is able to alter between inotify and polling, to rate-limit the
180
  number of reloads. When using inotify it also supports a fallback timed
181
  check, to verify that the reload hasn't failed.
182

    
183
  """
184
  def __init__(self, processor, mainloop):
185
    """Constructor for ConfdConfigurationReloader
186

    
187
    @type processor: L{confd.server.ConfdProcessor}
188
    @param processor: ganeti-confd ConfdProcessor
189
    @type mainloop: L{daemon.Mainloop}
190
    @param mainloop: ganeti-confd mainloop
191

    
192
    """
193
    self.processor = processor
194
    self.mainloop = mainloop
195

    
196
    self.polling = True
197
    self.last_notification = 0
198

    
199
    # Asyncronous inotify handler for config changes
200
    self.wm = pyinotify.WatchManager()
201
    self.inotify_handler = ConfdInotifyEventHandler(self.wm, self.OnInotify)
202
    self.notifier = AsyncNotifier(self.wm, self.inotify_handler)
203

    
204
    self.timer_handle = None
205
    self._EnableTimer()
206

    
207
  def OnInotify(self, notifier_enabled):
208
    """Receive an inotify notification.
209

    
210
    @type notifier_enabled: boolean
211
    @param notifier_enabled: whether the notifier is still enabled
212

    
213
    """
214
    current_time = time.time()
215
    time_delta = current_time - self.last_notification
216
    self.last_notification = current_time
217

    
218
    if time_delta < constants.CONFD_CONFIG_RELOAD_RATELIMIT:
219
      logging.debug("Moving from inotify mode to polling mode")
220
      self.polling = True
221
      if notifier_enabled:
222
        self.inotify_handler.disable()
223

    
224
    if not self.polling and not notifier_enabled:
225
      try:
226
        self.inotify_handler.enable()
227
      except errors.InotifyError:
228
        self.polling = True
229

    
230
    try:
231
      reloaded = self.processor.reader.Reload()
232
      if reloaded:
233
        logging.info("Reloaded ganeti config")
234
      else:
235
        logging.debug("Skipped double config reload")
236
    except errors.ConfigurationError:
237
      self.DisableConfd()
238
      self.inotify_handler.disable()
239
      return
240

    
241
    # Reset the timer. If we're polling it will go to the polling rate, if
242
    # we're not it will delay it again to its base safe timeout.
243
    self._ResetTimer()
244

    
245
  def _DisableTimer(self):
246
    if self.timer_handle is not None:
247
      self.mainloop.scheduler.cancel(self.timer_handle)
248
      self.timer_handle = None
249

    
250
  def _EnableTimer(self):
251
    if self.polling:
252
      timeout = constants.CONFD_CONFIG_RELOAD_RATELIMIT
253
    else:
254
      timeout = constants.CONFD_CONFIG_RELOAD_TIMEOUT
255

    
256
    if self.timer_handle is None:
257
      self.timer_handle = self.mainloop.scheduler.enter(
258
        timeout, 1, self.OnTimer, [])
259

    
260
  def _ResetTimer(self):
261
    self._DisableTimer()
262
    self._EnableTimer()
263

    
264
  def OnTimer(self):
265
    """Function called when the timer fires
266

    
267
    """
268
    self.timer_handle = None
269
    reloaded = False
270
    was_disabled = False
271
    try:
272
      if self.processor.reader is None:
273
        was_disabled = True
274
        self.EnableConfd()
275
        reloaded = True
276
      else:
277
        reloaded = self.processor.reader.Reload()
278
    except errors.ConfigurationError:
279
      self.DisableConfd(silent=was_disabled)
280
      return
281

    
282
    if self.polling and reloaded:
283
      logging.info("Reloaded ganeti config")
284
    elif reloaded:
285
      # We have reloaded the config files, but received no inotify event.  If
286
      # an event is pending though, we just happen to have timed out before
287
      # receiving it, so this is not a problem, and we shouldn't alert
288
      if not self.notifier.check_events() and not was_disabled:
289
        logging.warning("Config file reload at timeout (inotify failure)")
290
    elif self.polling:
291
      # We're polling, but we haven't reloaded the config:
292
      # Going back to inotify mode
293
      logging.debug("Moving from polling mode to inotify mode")
294
      self.polling = False
295
      try:
296
        self.inotify_handler.enable()
297
      except errors.InotifyError:
298
        self.polling = True
299
    else:
300
      logging.debug("Performed configuration check")
301

    
302
    self._EnableTimer()
303

    
304
  def DisableConfd(self, silent=False):
305
    """Puts confd in non-serving mode
306

    
307
    """
308
    if not silent:
309
      logging.warning("Confd is being disabled")
310
    self.processor.Disable()
311
    self.polling = False
312
    self._ResetTimer()
313

    
314
  def EnableConfd(self):
315
    self.processor.Enable()
316
    logging.warning("Confd is being enabled")
317
    self.polling = True
318
    self._ResetTimer()
319

    
320

    
321
def CheckConfd(options, args):
322
  """Initial checks whether to run exit with a failure.
323

    
324
  """
325
  # TODO: collapse HMAC daemons handling in daemons GenericMain, when we'll
326
  # have more than one.
327
  if not os.path.isfile(constants.HMAC_CLUSTER_KEY):
328
    print >> sys.stderr, "Need HMAC key %s to run" % constants.HMAC_CLUSTER_KEY
329
    sys.exit(constants.EXIT_FAILURE)
330

    
331

    
332
def ExecConfd(options, args):
333
  """Main confd function, executed with PID file held
334

    
335
  """
336
  mainloop = daemon.Mainloop()
337

    
338
  # Asyncronous confd UDP server
339
  processor = ConfdProcessor()
340
  try:
341
    processor.Enable()
342
  except errors.ConfigurationError:
343
    # If enabling the processor has failed, we can still go on, but confd will
344
    # be disabled
345
    logging.warning("Confd is starting in disabled mode")
346
    pass
347
  server = ConfdAsyncUDPServer(options.bind_address, options.port, processor)
348

    
349
  # Configuration reloader
350
  reloader = ConfdConfigurationReloader(processor, mainloop)
351

    
352
  mainloop.Run()
353

    
354

    
355
def main():
356
  """Main function for the confd daemon.
357

    
358
  """
359
  parser = OptionParser(description="Ganeti configuration daemon",
360
                        usage="%prog [-f] [-d] [-b ADDRESS]",
361
                        version="%%prog (ganeti) %s" %
362
                        constants.RELEASE_VERSION)
363

    
364
  dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
365
  dirs.append((constants.LOG_OS_DIR, 0750))
366
  dirs.append((constants.LOCK_DIR, 1777))
367
  daemon.GenericMain(constants.CONFD, parser, dirs, CheckConfd, ExecConfd)
368

    
369

    
370
if __name__ == "__main__":
371
  main()