Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-confd @ 4f16b4a0

History | View | Annotate | Download (12.5 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2009, Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti configuration daemon
23

    
24
Ganeti-confd is a daemon to query master candidates for configuration values.
25
It uses UDP+HMAC for authentication with a global cluster key.
26

    
27
"""
28

    
29
import os
30
import sys
31
import logging
32
import asyncore
33
import socket
34
import pyinotify
35
import time
36
import errno
37

    
38
from optparse import OptionParser
39

    
40
from ganeti import constants
41
from ganeti import errors
42
from ganeti import daemon
43
from ganeti import ssconf
44
from ganeti.asyncnotifier import AsyncNotifier
45
from ganeti.confd.server import ConfdProcessor
46

    
47

    
48
class ConfdAsyncUDPServer(asyncore.dispatcher):
49
  """The confd udp server, suitable for use with asyncore.
50

    
51
  """
52
  def __init__(self, bind_address, port, processor):
53
    """Constructor for ConfdAsyncUDPServer
54

    
55
    @type bind_address: string
56
    @param bind_address: socket bind address ('' for all)
57
    @type port: int
58
    @param port: udp port
59
    @type processor: L{confd.server.ConfdProcessor}
60
    @param reader: ConfigReader to use to access the config
61

    
62
    """
63
    asyncore.dispatcher.__init__(self)
64
    self.bind_address = bind_address
65
    self.port = port
66
    self.processor = processor
67
    self.create_socket(socket.AF_INET, socket.SOCK_DGRAM)
68
    self.bind((bind_address, port))
69
    logging.debug("listening on ('%s':%d)" % (bind_address, port))
70

    
71
  # this method is overriding an asyncore.dispatcher method
72
  def handle_connect(self):
73
    # Python thinks that the first udp message from a source qualifies as a
74
    # "connect" and further ones are part of the same connection. We beg to
75
    # differ and treat all messages equally.
76
    pass
77

    
78
  # this method is overriding an asyncore.dispatcher method
79
  def handle_read(self):
80
    try:
81
      try:
82
        payload_in, address = self.recvfrom(4096)
83
      except socket.error, err:
84
        if err.errno == errno.EINTR:
85
          # we got a signal while trying to read. no need to do anything,
86
          # handle_read will be called again if there is data on the socket.
87
          return
88
        else:
89
          raise
90
      ip, port = address
91
      payload_out =  self.processor.ExecQuery(payload_in, ip, port)
92
      if payload_out is not None:
93
        self.sendto(payload_out, 0, (ip, port))
94
    except:
95
      # we need to catch any exception here, log it, but proceed, because even
96
      # if we failed handling a single request, we still want the confd to
97
      # continue working.
98
      logging.error("Unexpected exception", exc_info=True)
99

    
100
  # this method is overriding an asyncore.dispatcher method
101
  def writable(self):
102
    # No need to check if we can write to the UDP socket
103
    return False
104

    
105

    
106
class ConfdInotifyEventHandler(pyinotify.ProcessEvent):
107

    
108
  def __init__(self, watch_manager, callback,
109
               file=constants.CLUSTER_CONF_FILE):
110
    """Constructor for ConfdInotifyEventHandler
111

    
112
    @type watch_manager: L{pyinotify.WatchManager}
113
    @param watch_manager: ganeti-confd inotify watch manager
114
    @type callback: function accepting a boolean
115
    @param callback: function to call when an inotify event happens
116
    @type file: string
117
    @param file: config file to watch
118

    
119
    """
120
    # no need to call the parent's constructor
121
    self.watch_manager = watch_manager
122
    self.callback = callback
123
    self.mask = pyinotify.EventsCodes.IN_IGNORED | \
124
                pyinotify.EventsCodes.IN_MODIFY
125
    self.file = file
126
    self.watch_handle = None
127

    
128
  def enable(self):
129
    """Watch the given file
130

    
131
    """
132
    if self.watch_handle is None:
133
      result = self.watch_manager.add_watch(self.file, self.mask)
134
      if not self.file in result or result[self.file] <= 0:
135
        raise errors.InotifyError("Could not add inotify watcher")
136
      else:
137
        self.watch_handle = result[self.file]
138

    
139
  def disable(self):
140
    """Stop watching the given file
141

    
142
    """
143
    if self.watch_handle is not None:
144
      result = self.watch_manager.rm_watch(self.watch_handle)
145
      if result[self.watch_handle]:
146
        self.watch_handle = None
147

    
148
  def process_IN_IGNORED(self, event):
149
    # Due to the fact that we monitor just for the cluster config file (rather
150
    # than for the whole data dir) when the file is replaced with another one
151
    # (which is what happens normally in ganeti) we're going to receive an
152
    # IN_IGNORED event from inotify, because of the file removal (which is
153
    # contextual with the replacement). In such a case we need to create
154
    # another watcher for the "new" file.
155
    logging.debug("Received 'ignored' inotify event for %s" % event.path)
156
    self.watch_handle = None
157

    
158
    try:
159
      # Since the kernel believes the file we were interested in is gone, it's
160
      # not going to notify us of any other events, until we set up, here, the
161
      # new watch. This is not a race condition, though, since we're anyway
162
      # going to realod the file after setting up the new watch.
163
      self.callback(False)
164
    except errors.ConfdFatalError, err:
165
      logging.critical("Critical error, shutting down: %s" % err)
166
      sys.exit(constants.EXIT_FAILURE)
167
    except:
168
      # we need to catch any exception here, log it, but proceed, because even
169
      # if we failed handling a single request, we still want the confd to
170
      # continue working.
171
      logging.error("Unexpected exception", exc_info=True)
172

    
173
  def process_IN_MODIFY(self, event):
174
    # This gets called when the config file is modified. Note that this doesn't
175
    # usually happen in Ganeti, as the config file is normally replaced by a
176
    # new one, at filesystem level, rather than actually modified (see
177
    # utils.WriteFile)
178
    logging.debug("Received 'modify' inotify event for %s" % event.path)
179

    
180
    try:
181
      self.callback(True)
182
    except errors.ConfdFatalError, err:
183
      logging.critical("Critical error, shutting down: %s" % err)
184
      sys.exit(constants.EXIT_FAILURE)
185
    except:
186
      # we need to catch any exception here, log it, but proceed, because even
187
      # if we failed handling a single request, we still want the confd to
188
      # continue working.
189
      logging.error("Unexpected exception", exc_info=True)
190

    
191
  def process_default(self, event):
192
    logging.error("Received unhandled inotify event: %s" % event)
193

    
194

    
195
class ConfdConfigurationReloader(object):
196
  """Logic to control when to reload the ganeti configuration
197

    
198
  This class is able to alter between inotify and polling, to rate-limit the
199
  number of reloads. When using inotify it also supports a fallback timed
200
  check, to verify that the reload hasn't failed.
201

    
202
  """
203
  def __init__(self, processor, mainloop):
204
    """Constructor for ConfdConfigurationReloader
205

    
206
    @type processor: L{confd.server.ConfdProcessor}
207
    @param processor: ganeti-confd ConfdProcessor
208
    @type mainloop: L{daemon.Mainloop}
209
    @param mainloop: ganeti-confd mainloop
210

    
211
    """
212
    self.processor = processor
213
    self.mainloop = mainloop
214

    
215
    self.polling = True
216
    self.last_notification = 0
217

    
218
    # Asyncronous inotify handler for config changes
219
    self.wm = pyinotify.WatchManager()
220
    self.inotify_handler = ConfdInotifyEventHandler(self.wm, self.OnInotify)
221
    self.notifier = AsyncNotifier(self.wm, self.inotify_handler)
222

    
223
    self.timer_handle = None
224
    self._EnableTimer()
225

    
226
  def OnInotify(self, notifier_enabled):
227
    """Receive an inotify notification.
228

    
229
    @type notifier_enabled: boolean
230
    @param notifier_enabled: whether the notifier is still enabled
231

    
232
    """
233
    current_time = time.time()
234
    time_delta = current_time - self.last_notification
235
    self.last_notification = current_time
236

    
237
    if time_delta < constants.CONFD_CONFIG_RELOAD_RATELIMIT:
238
      logging.debug("Moving from inotify mode to polling mode")
239
      self.polling = True
240
      if notifier_enabled:
241
        self.inotify_handler.disable()
242

    
243
    if not self.polling and not notifier_enabled:
244
      try:
245
        self.inotify_handler.enable()
246
      except errors.InotifyError:
247
        self.polling = True
248

    
249
    try:
250
      reloaded = self.processor.reader.Reload()
251
      if reloaded:
252
        logging.info("Reloaded ganeti config")
253
      else:
254
        logging.debug("Skipped double config reload")
255
    except errors.ConfigurationError:
256
      self.DisableConfd()
257
      self.inotify_handler.disable()
258
      return
259

    
260
    # Reset the timer. If we're polling it will go to the polling rate, if
261
    # we're not it will delay it again to its base safe timeout.
262
    self._ResetTimer()
263

    
264
  def _DisableTimer(self):
265
    if self.timer_handle is not None:
266
      self.mainloop.scheduler.cancel(self.timer_handle)
267
      self.timer_handle = None
268

    
269
  def _EnableTimer(self):
270
    if self.polling:
271
      timeout = constants.CONFD_CONFIG_RELOAD_RATELIMIT
272
    else:
273
      timeout = constants.CONFD_CONFIG_RELOAD_TIMEOUT
274

    
275
    if self.timer_handle is None:
276
      self.timer_handle = self.mainloop.scheduler.enter(
277
        timeout, 1, self.OnTimer, [])
278

    
279
  def _ResetTimer(self):
280
    self._DisableTimer()
281
    self._EnableTimer()
282

    
283
  def OnTimer(self):
284
    """Function called when the timer fires
285

    
286
    """
287
    self.timer_handle = None
288
    reloaded = False
289
    was_disabled = False
290
    try:
291
      if self.processor.reader is None:
292
        was_disabled = True
293
        self.EnableConfd()
294
        reloaded = True
295
      else:
296
        reloaded = self.processor.reader.Reload()
297
    except errors.ConfigurationError:
298
      self.DisableConfd(silent=was_disabled)
299
      return
300

    
301
    if self.polling and reloaded:
302
      logging.info("Reloaded ganeti config")
303
    elif reloaded:
304
      # We have reloaded the config files, but received no inotify event.  If
305
      # an event is pending though, we just happen to have timed out before
306
      # receiving it, so this is not a problem, and we shouldn't alert
307
      if not self.notifier.check_events() and not was_disabled:
308
        logging.warning("Config file reload at timeout (inotify failure)")
309
    elif self.polling:
310
      # We're polling, but we haven't reloaded the config:
311
      # Going back to inotify mode
312
      logging.debug("Moving from polling mode to inotify mode")
313
      self.polling = False
314
      try:
315
        self.inotify_handler.enable()
316
      except errors.InotifyError:
317
        self.polling = True
318
    else:
319
      logging.debug("Performed configuration check")
320

    
321
    self._EnableTimer()
322

    
323
  def DisableConfd(self, silent=False):
324
    """Puts confd in non-serving mode
325

    
326
    """
327
    if not silent:
328
      logging.warning("Confd is being disabled")
329
    self.processor.Disable()
330
    self.polling = False
331
    self._ResetTimer()
332

    
333
  def EnableConfd(self):
334
    self.processor.Enable()
335
    logging.warning("Confd is being enabled")
336
    self.polling = True
337
    self._ResetTimer()
338

    
339

    
340
def CheckConfd(options, args):
341
  """Initial checks whether to run exit with a failure.
342

    
343
  """
344
  # TODO: collapse HMAC daemons handling in daemons GenericMain, when we'll
345
  # have more than one.
346
  if not os.path.isfile(constants.HMAC_CLUSTER_KEY):
347
    print >> sys.stderr, "Need HMAC key %s to run" % constants.HMAC_CLUSTER_KEY
348
    sys.exit(constants.EXIT_FAILURE)
349

    
350

    
351
def ExecConfd(options, args):
352
  """Main confd function, executed with PID file held
353

    
354
  """
355
  mainloop = daemon.Mainloop()
356

    
357
  # Asyncronous confd UDP server
358
  processor = ConfdProcessor()
359
  try:
360
    processor.Enable()
361
  except errors.ConfigurationError:
362
    # If enabling the processor has failed, we can still go on, but confd will
363
    # be disabled
364
    logging.warning("Confd is starting in disabled mode")
365
    pass
366
  server = ConfdAsyncUDPServer(options.bind_address, options.port, processor)
367

    
368
  # Configuration reloader
369
  reloader = ConfdConfigurationReloader(processor, mainloop)
370

    
371
  mainloop.Run()
372

    
373

    
374
def main():
375
  """Main function for the confd daemon.
376

    
377
  """
378
  parser = OptionParser(description="Ganeti configuration daemon",
379
                        usage="%prog [-f] [-d] [-b ADDRESS]",
380
                        version="%%prog (ganeti) %s" %
381
                        constants.RELEASE_VERSION)
382

    
383
  dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
384
  dirs.append((constants.LOG_OS_DIR, 0750))
385
  dirs.append((constants.LOCK_DIR, 1777))
386
  daemon.GenericMain(constants.CONFD, parser, dirs, CheckConfd, ExecConfd)
387

    
388

    
389
if __name__ == "__main__":
390
  main()