Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-confd @ 5f3269fc

History | View | Annotate | Download (11.5 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2009, Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti configuration daemon
23

    
24
Ganeti-confd is a daemon to query master candidates for configuration values.
25
It uses UDP+HMAC for authentication with a global cluster key.
26

    
27
"""
28

    
29
import os
30
import sys
31
import logging
32
import asyncore
33
import socket
34
import pyinotify
35
import time
36
import errno
37

    
38
from optparse import OptionParser
39

    
40
from ganeti import constants
41
from ganeti import errors
42
from ganeti import daemon
43
from ganeti import ssconf
44
from ganeti.asyncnotifier import AsyncNotifier
45
from ganeti.confd.server import ConfdProcessor
46

    
47

    
48
class ConfdAsyncUDPServer(daemon.AsyncUDPSocket):
49
  """The confd udp server, suitable for use with asyncore.
50

    
51
  """
52
  def __init__(self, bind_address, port, processor):
53
    """Constructor for ConfdAsyncUDPServer
54

    
55
    @type bind_address: string
56
    @param bind_address: socket bind address ('' for all)
57
    @type port: int
58
    @param port: udp port
59
    @type processor: L{confd.server.ConfdProcessor}
60
    @param reader: ConfigReader to use to access the config
61

    
62
    """
63
    daemon.AsyncUDPSocket.__init__(self)
64
    self.bind_address = bind_address
65
    self.port = port
66
    self.processor = processor
67
    self.bind((bind_address, port))
68
    logging.debug("listening on ('%s':%d)" % (bind_address, port))
69

    
70
  # this method is overriding a daemon.AsyncUDPSocket method
71
  def handle_datagram(self, payload_in, ip, port):
72
    payload_out =  self.processor.ExecQuery(payload_in, ip, port)
73
    if payload_out is not None:
74
      self.enqueue_send(ip, port, payload_out)
75

    
76

    
77
class ConfdInotifyEventHandler(pyinotify.ProcessEvent):
78

    
79
  def __init__(self, watch_manager, callback,
80
               file=constants.CLUSTER_CONF_FILE):
81
    """Constructor for ConfdInotifyEventHandler
82

    
83
    @type watch_manager: L{pyinotify.WatchManager}
84
    @param watch_manager: ganeti-confd inotify watch manager
85
    @type callback: function accepting a boolean
86
    @param callback: function to call when an inotify event happens
87
    @type file: string
88
    @param file: config file to watch
89

    
90
    """
91
    # no need to call the parent's constructor
92
    self.watch_manager = watch_manager
93
    self.callback = callback
94
    self.mask = pyinotify.EventsCodes.IN_IGNORED | \
95
                pyinotify.EventsCodes.IN_MODIFY
96
    self.file = file
97
    self.watch_handle = None
98

    
99
  def enable(self):
100
    """Watch the given file
101

    
102
    """
103
    if self.watch_handle is None:
104
      result = self.watch_manager.add_watch(self.file, self.mask)
105
      if not self.file in result or result[self.file] <= 0:
106
        raise errors.InotifyError("Could not add inotify watcher")
107
      else:
108
        self.watch_handle = result[self.file]
109

    
110
  def disable(self):
111
    """Stop watching the given file
112

    
113
    """
114
    if self.watch_handle is not None:
115
      result = self.watch_manager.rm_watch(self.watch_handle)
116
      if result[self.watch_handle]:
117
        self.watch_handle = None
118

    
119
  def process_IN_IGNORED(self, event):
120
    # Due to the fact that we monitor just for the cluster config file (rather
121
    # than for the whole data dir) when the file is replaced with another one
122
    # (which is what happens normally in ganeti) we're going to receive an
123
    # IN_IGNORED event from inotify, because of the file removal (which is
124
    # contextual with the replacement). In such a case we need to create
125
    # another watcher for the "new" file.
126
    logging.debug("Received 'ignored' inotify event for %s" % event.path)
127
    self.watch_handle = None
128

    
129
    try:
130
      # Since the kernel believes the file we were interested in is gone, it's
131
      # not going to notify us of any other events, until we set up, here, the
132
      # new watch. This is not a race condition, though, since we're anyway
133
      # going to realod the file after setting up the new watch.
134
      self.callback(False)
135
    except errors.ConfdFatalError, err:
136
      logging.critical("Critical error, shutting down: %s" % err)
137
      sys.exit(constants.EXIT_FAILURE)
138
    except:
139
      # we need to catch any exception here, log it, but proceed, because even
140
      # if we failed handling a single request, we still want the confd to
141
      # continue working.
142
      logging.error("Unexpected exception", exc_info=True)
143

    
144
  def process_IN_MODIFY(self, event):
145
    # This gets called when the config file is modified. Note that this doesn't
146
    # usually happen in Ganeti, as the config file is normally replaced by a
147
    # new one, at filesystem level, rather than actually modified (see
148
    # utils.WriteFile)
149
    logging.debug("Received 'modify' inotify event for %s" % event.path)
150

    
151
    try:
152
      self.callback(True)
153
    except errors.ConfdFatalError, err:
154
      logging.critical("Critical error, shutting down: %s" % err)
155
      sys.exit(constants.EXIT_FAILURE)
156
    except:
157
      # we need to catch any exception here, log it, but proceed, because even
158
      # if we failed handling a single request, we still want the confd to
159
      # continue working.
160
      logging.error("Unexpected exception", exc_info=True)
161

    
162
  def process_default(self, event):
163
    logging.error("Received unhandled inotify event: %s" % event)
164

    
165

    
166
class ConfdConfigurationReloader(object):
167
  """Logic to control when to reload the ganeti configuration
168

    
169
  This class is able to alter between inotify and polling, to rate-limit the
170
  number of reloads. When using inotify it also supports a fallback timed
171
  check, to verify that the reload hasn't failed.
172

    
173
  """
174
  def __init__(self, processor, mainloop):
175
    """Constructor for ConfdConfigurationReloader
176

    
177
    @type processor: L{confd.server.ConfdProcessor}
178
    @param processor: ganeti-confd ConfdProcessor
179
    @type mainloop: L{daemon.Mainloop}
180
    @param mainloop: ganeti-confd mainloop
181

    
182
    """
183
    self.processor = processor
184
    self.mainloop = mainloop
185

    
186
    self.polling = True
187
    self.last_notification = 0
188

    
189
    # Asyncronous inotify handler for config changes
190
    self.wm = pyinotify.WatchManager()
191
    self.inotify_handler = ConfdInotifyEventHandler(self.wm, self.OnInotify)
192
    self.notifier = AsyncNotifier(self.wm, self.inotify_handler)
193

    
194
    self.timer_handle = None
195
    self._EnableTimer()
196

    
197
  def OnInotify(self, notifier_enabled):
198
    """Receive an inotify notification.
199

    
200
    @type notifier_enabled: boolean
201
    @param notifier_enabled: whether the notifier is still enabled
202

    
203
    """
204
    current_time = time.time()
205
    time_delta = current_time - self.last_notification
206
    self.last_notification = current_time
207

    
208
    if time_delta < constants.CONFD_CONFIG_RELOAD_RATELIMIT:
209
      logging.debug("Moving from inotify mode to polling mode")
210
      self.polling = True
211
      if notifier_enabled:
212
        self.inotify_handler.disable()
213

    
214
    if not self.polling and not notifier_enabled:
215
      try:
216
        self.inotify_handler.enable()
217
      except errors.InotifyError:
218
        self.polling = True
219

    
220
    try:
221
      reloaded = self.processor.reader.Reload()
222
      if reloaded:
223
        logging.info("Reloaded ganeti config")
224
      else:
225
        logging.debug("Skipped double config reload")
226
    except errors.ConfigurationError:
227
      self.DisableConfd()
228
      self.inotify_handler.disable()
229
      return
230

    
231
    # Reset the timer. If we're polling it will go to the polling rate, if
232
    # we're not it will delay it again to its base safe timeout.
233
    self._ResetTimer()
234

    
235
  def _DisableTimer(self):
236
    if self.timer_handle is not None:
237
      self.mainloop.scheduler.cancel(self.timer_handle)
238
      self.timer_handle = None
239

    
240
  def _EnableTimer(self):
241
    if self.polling:
242
      timeout = constants.CONFD_CONFIG_RELOAD_RATELIMIT
243
    else:
244
      timeout = constants.CONFD_CONFIG_RELOAD_TIMEOUT
245

    
246
    if self.timer_handle is None:
247
      self.timer_handle = self.mainloop.scheduler.enter(
248
        timeout, 1, self.OnTimer, [])
249

    
250
  def _ResetTimer(self):
251
    self._DisableTimer()
252
    self._EnableTimer()
253

    
254
  def OnTimer(self):
255
    """Function called when the timer fires
256

    
257
    """
258
    self.timer_handle = None
259
    reloaded = False
260
    was_disabled = False
261
    try:
262
      if self.processor.reader is None:
263
        was_disabled = True
264
        self.EnableConfd()
265
        reloaded = True
266
      else:
267
        reloaded = self.processor.reader.Reload()
268
    except errors.ConfigurationError:
269
      self.DisableConfd(silent=was_disabled)
270
      return
271

    
272
    if self.polling and reloaded:
273
      logging.info("Reloaded ganeti config")
274
    elif reloaded:
275
      # We have reloaded the config files, but received no inotify event.  If
276
      # an event is pending though, we just happen to have timed out before
277
      # receiving it, so this is not a problem, and we shouldn't alert
278
      if not self.notifier.check_events() and not was_disabled:
279
        logging.warning("Config file reload at timeout (inotify failure)")
280
    elif self.polling:
281
      # We're polling, but we haven't reloaded the config:
282
      # Going back to inotify mode
283
      logging.debug("Moving from polling mode to inotify mode")
284
      self.polling = False
285
      try:
286
        self.inotify_handler.enable()
287
      except errors.InotifyError:
288
        self.polling = True
289
    else:
290
      logging.debug("Performed configuration check")
291

    
292
    self._EnableTimer()
293

    
294
  def DisableConfd(self, silent=False):
295
    """Puts confd in non-serving mode
296

    
297
    """
298
    if not silent:
299
      logging.warning("Confd is being disabled")
300
    self.processor.Disable()
301
    self.polling = False
302
    self._ResetTimer()
303

    
304
  def EnableConfd(self):
305
    self.processor.Enable()
306
    logging.warning("Confd is being enabled")
307
    self.polling = True
308
    self._ResetTimer()
309

    
310

    
311
def CheckConfd(options, args):
312
  """Initial checks whether to run exit with a failure.
313

    
314
  """
315
  # TODO: collapse HMAC daemons handling in daemons GenericMain, when we'll
316
  # have more than one.
317
  if not os.path.isfile(constants.HMAC_CLUSTER_KEY):
318
    print >> sys.stderr, "Need HMAC key %s to run" % constants.HMAC_CLUSTER_KEY
319
    sys.exit(constants.EXIT_FAILURE)
320

    
321

    
322
def ExecConfd(options, args):
323
  """Main confd function, executed with PID file held
324

    
325
  """
326
  mainloop = daemon.Mainloop()
327

    
328
  # Asyncronous confd UDP server
329
  processor = ConfdProcessor()
330
  try:
331
    processor.Enable()
332
  except errors.ConfigurationError:
333
    # If enabling the processor has failed, we can still go on, but confd will
334
    # be disabled
335
    logging.warning("Confd is starting in disabled mode")
336
    pass
337
  server = ConfdAsyncUDPServer(options.bind_address, options.port, processor)
338

    
339
  # Configuration reloader
340
  reloader = ConfdConfigurationReloader(processor, mainloop)
341

    
342
  mainloop.Run()
343

    
344

    
345
def main():
346
  """Main function for the confd daemon.
347

    
348
  """
349
  parser = OptionParser(description="Ganeti configuration daemon",
350
                        usage="%prog [-f] [-d] [-b ADDRESS]",
351
                        version="%%prog (ganeti) %s" %
352
                        constants.RELEASE_VERSION)
353

    
354
  dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
355
  dirs.append((constants.LOG_OS_DIR, 0750))
356
  dirs.append((constants.LOCK_DIR, 1777))
357
  daemon.GenericMain(constants.CONFD, parser, dirs, CheckConfd, ExecConfd)
358

    
359

    
360
if __name__ == "__main__":
361
  main()