Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-confd @ 0648750e

History | View | Annotate | Download (11.7 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2009, Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti configuration daemon
23

    
24
Ganeti-confd is a daemon to query master candidates for configuration values.
25
It uses UDP+HMAC for authentication with a global cluster key.
26

    
27
"""
28

    
29
import os
30
import sys
31
import logging
32
import pyinotify
33
import time
34

    
35
from optparse import OptionParser
36

    
37
from ganeti import asyncnotifier
38
from ganeti import confd
39
from ganeti.confd import server as confd_server
40
from ganeti import constants
41
from ganeti import errors
42
from ganeti import daemon
43
from ganeti import utils
44
from ganeti import ssconf
45

    
46

    
47
class ConfdAsyncUDPServer(daemon.AsyncUDPSocket):
48
  """The confd udp server, suitable for use with asyncore.
49

    
50
  """
51
  def __init__(self, bind_address, port, processor):
52
    """Constructor for ConfdAsyncUDPServer
53

    
54
    @type bind_address: string
55
    @param bind_address: socket bind address ('' for all)
56
    @type port: int
57
    @param port: udp port
58
    @type processor: L{confd.server.ConfdProcessor}
59
    @param processor: ConfdProcessor to use to handle queries
60

    
61
    """
62
    daemon.AsyncUDPSocket.__init__(self)
63
    self.bind_address = bind_address
64
    self.port = port
65
    self.processor = processor
66
    self.bind((bind_address, port))
67
    logging.debug("listening on ('%s':%d)" % (bind_address, port))
68

    
69
  # this method is overriding a daemon.AsyncUDPSocket method
70
  def handle_datagram(self, payload_in, ip, port):
71
    try:
72
      query = confd.UnpackMagic(payload_in)
73
    except errors.ConfdMagicError, err:
74
      logging.debug(err)
75
      return
76

    
77
    answer =  self.processor.ExecQuery(query, ip, port)
78
    if answer is not None:
79
      try:
80
        self.enqueue_send(ip, port, confd.PackMagic(answer))
81
      except errors.UdpDataSizeError:
82
        logging.error("Reply too big to fit in an udp packet.")
83

    
84

    
85
class ConfdInotifyEventHandler(pyinotify.ProcessEvent):
86

    
87
  def __init__(self, watch_manager, callback,
88
               file=constants.CLUSTER_CONF_FILE):
89
    """Constructor for ConfdInotifyEventHandler
90

    
91
    @type watch_manager: L{pyinotify.WatchManager}
92
    @param watch_manager: ganeti-confd inotify watch manager
93
    @type callback: function accepting a boolean
94
    @param callback: function to call when an inotify event happens
95
    @type file: string
96
    @param file: config file to watch
97

    
98
    """
99
    # no need to call the parent's constructor
100
    self.watch_manager = watch_manager
101
    self.callback = callback
102
    self.mask = pyinotify.EventsCodes.IN_IGNORED | \
103
                pyinotify.EventsCodes.IN_MODIFY
104
    self.file = file
105
    self.watch_handle = None
106

    
107
  def enable(self):
108
    """Watch the given file
109

    
110
    """
111
    if self.watch_handle is None:
112
      result = self.watch_manager.add_watch(self.file, self.mask)
113
      if not self.file in result or result[self.file] <= 0:
114
        raise errors.InotifyError("Could not add inotify watcher")
115
      else:
116
        self.watch_handle = result[self.file]
117

    
118
  def disable(self):
119
    """Stop watching the given file
120

    
121
    """
122
    if self.watch_handle is not None:
123
      result = self.watch_manager.rm_watch(self.watch_handle)
124
      if result[self.watch_handle]:
125
        self.watch_handle = None
126

    
127
  def process_IN_IGNORED(self, event):
128
    # Due to the fact that we monitor just for the cluster config file (rather
129
    # than for the whole data dir) when the file is replaced with another one
130
    # (which is what happens normally in ganeti) we're going to receive an
131
    # IN_IGNORED event from inotify, because of the file removal (which is
132
    # contextual with the replacement). In such a case we need to create
133
    # another watcher for the "new" file.
134
    logging.debug("Received 'ignored' inotify event for %s" % event.path)
135
    self.watch_handle = None
136

    
137
    try:
138
      # Since the kernel believes the file we were interested in is gone, it's
139
      # not going to notify us of any other events, until we set up, here, the
140
      # new watch. This is not a race condition, though, since we're anyway
141
      # going to realod the file after setting up the new watch.
142
      self.callback(False)
143
    except errors.ConfdFatalError, err:
144
      logging.critical("Critical error, shutting down: %s" % err)
145
      sys.exit(constants.EXIT_FAILURE)
146
    except:
147
      # we need to catch any exception here, log it, but proceed, because even
148
      # if we failed handling a single request, we still want the confd to
149
      # continue working.
150
      logging.error("Unexpected exception", exc_info=True)
151

    
152
  def process_IN_MODIFY(self, event):
153
    # This gets called when the config file is modified. Note that this doesn't
154
    # usually happen in Ganeti, as the config file is normally replaced by a
155
    # new one, at filesystem level, rather than actually modified (see
156
    # utils.WriteFile)
157
    logging.debug("Received 'modify' inotify event for %s" % event.path)
158

    
159
    try:
160
      self.callback(True)
161
    except errors.ConfdFatalError, err:
162
      logging.critical("Critical error, shutting down: %s" % err)
163
      sys.exit(constants.EXIT_FAILURE)
164
    except:
165
      # we need to catch any exception here, log it, but proceed, because even
166
      # if we failed handling a single request, we still want the confd to
167
      # continue working.
168
      logging.error("Unexpected exception", exc_info=True)
169

    
170
  def process_default(self, event):
171
    logging.error("Received unhandled inotify event: %s" % event)
172

    
173

    
174
class ConfdConfigurationReloader(object):
175
  """Logic to control when to reload the ganeti configuration
176

    
177
  This class is able to alter between inotify and polling, to rate-limit the
178
  number of reloads. When using inotify it also supports a fallback timed
179
  check, to verify that the reload hasn't failed.
180

    
181
  """
182
  def __init__(self, processor, mainloop):
183
    """Constructor for ConfdConfigurationReloader
184

    
185
    @type processor: L{confd.server.ConfdProcessor}
186
    @param processor: ganeti-confd ConfdProcessor
187
    @type mainloop: L{daemon.Mainloop}
188
    @param mainloop: ganeti-confd mainloop
189

    
190
    """
191
    self.processor = processor
192
    self.mainloop = mainloop
193

    
194
    self.polling = True
195
    self.last_notification = 0
196

    
197
    # Asyncronous inotify handler for config changes
198
    self.wm = pyinotify.WatchManager()
199
    self.inotify_handler = ConfdInotifyEventHandler(self.wm, self.OnInotify)
200
    self.notifier = asyncnotifier.AsyncNotifier(self.wm, self.inotify_handler)
201

    
202
    self.timer_handle = None
203
    self._EnableTimer()
204

    
205
  def OnInotify(self, notifier_enabled):
206
    """Receive an inotify notification.
207

    
208
    @type notifier_enabled: boolean
209
    @param notifier_enabled: whether the notifier is still enabled
210

    
211
    """
212
    current_time = time.time()
213
    time_delta = current_time - self.last_notification
214
    self.last_notification = current_time
215

    
216
    if time_delta < constants.CONFD_CONFIG_RELOAD_RATELIMIT:
217
      logging.debug("Moving from inotify mode to polling mode")
218
      self.polling = True
219
      if notifier_enabled:
220
        self.inotify_handler.disable()
221

    
222
    if not self.polling and not notifier_enabled:
223
      try:
224
        self.inotify_handler.enable()
225
      except errors.InotifyError:
226
        self.polling = True
227

    
228
    try:
229
      reloaded = self.processor.reader.Reload()
230
      if reloaded:
231
        logging.info("Reloaded ganeti config")
232
      else:
233
        logging.debug("Skipped double config reload")
234
    except errors.ConfigurationError:
235
      self.DisableConfd()
236
      self.inotify_handler.disable()
237
      return
238

    
239
    # Reset the timer. If we're polling it will go to the polling rate, if
240
    # we're not it will delay it again to its base safe timeout.
241
    self._ResetTimer()
242

    
243
  def _DisableTimer(self):
244
    if self.timer_handle is not None:
245
      self.mainloop.scheduler.cancel(self.timer_handle)
246
      self.timer_handle = None
247

    
248
  def _EnableTimer(self):
249
    if self.polling:
250
      timeout = constants.CONFD_CONFIG_RELOAD_RATELIMIT
251
    else:
252
      timeout = constants.CONFD_CONFIG_RELOAD_TIMEOUT
253

    
254
    if self.timer_handle is None:
255
      self.timer_handle = self.mainloop.scheduler.enter(
256
        timeout, 1, self.OnTimer, [])
257

    
258
  def _ResetTimer(self):
259
    self._DisableTimer()
260
    self._EnableTimer()
261

    
262
  def OnTimer(self):
263
    """Function called when the timer fires
264

    
265
    """
266
    self.timer_handle = None
267
    reloaded = False
268
    was_disabled = False
269
    try:
270
      if self.processor.reader is None:
271
        was_disabled = True
272
        self.EnableConfd()
273
        reloaded = True
274
      else:
275
        reloaded = self.processor.reader.Reload()
276
    except errors.ConfigurationError:
277
      self.DisableConfd(silent=was_disabled)
278
      return
279

    
280
    if self.polling and reloaded:
281
      logging.info("Reloaded ganeti config")
282
    elif reloaded:
283
      # We have reloaded the config files, but received no inotify event.  If
284
      # an event is pending though, we just happen to have timed out before
285
      # receiving it, so this is not a problem, and we shouldn't alert
286
      if not self.notifier.check_events() and not was_disabled:
287
        logging.warning("Config file reload at timeout (inotify failure)")
288
    elif self.polling:
289
      # We're polling, but we haven't reloaded the config:
290
      # Going back to inotify mode
291
      logging.debug("Moving from polling mode to inotify mode")
292
      self.polling = False
293
      try:
294
        self.inotify_handler.enable()
295
      except errors.InotifyError:
296
        self.polling = True
297
    else:
298
      logging.debug("Performed configuration check")
299

    
300
    self._EnableTimer()
301

    
302
  def DisableConfd(self, silent=False):
303
    """Puts confd in non-serving mode
304

    
305
    """
306
    if not silent:
307
      logging.warning("Confd is being disabled")
308
    self.processor.Disable()
309
    self.polling = False
310
    self._ResetTimer()
311

    
312
  def EnableConfd(self):
313
    self.processor.Enable()
314
    logging.warning("Confd is being enabled")
315
    self.polling = True
316
    self._ResetTimer()
317

    
318

    
319
def CheckConfd(options, args):
320
  """Initial checks whether to run exit with a failure.
321

    
322
  """
323
  # TODO: collapse HMAC daemons handling in daemons GenericMain, when we'll
324
  # have more than one.
325
  if not os.path.isfile(constants.HMAC_CLUSTER_KEY):
326
    print >> sys.stderr, "Need HMAC key %s to run" % constants.HMAC_CLUSTER_KEY
327
    sys.exit(constants.EXIT_FAILURE)
328

    
329

    
330
def ExecConfd(options, args):
331
  """Main confd function, executed with PID file held
332

    
333
  """
334
  mainloop = daemon.Mainloop()
335

    
336
  # Asyncronous confd UDP server
337
  processor = confd_server.ConfdProcessor()
338
  try:
339
    processor.Enable()
340
  except errors.ConfigurationError:
341
    # If enabling the processor has failed, we can still go on, but confd will
342
    # be disabled
343
    logging.warning("Confd is starting in disabled mode")
344
    pass
345
  server = ConfdAsyncUDPServer(options.bind_address, options.port, processor)
346

    
347
  # Configuration reloader
348
  reloader = ConfdConfigurationReloader(processor, mainloop)
349

    
350
  mainloop.Run()
351

    
352

    
353
def main():
354
  """Main function for the confd daemon.
355

    
356
  """
357
  parser = OptionParser(description="Ganeti configuration daemon",
358
                        usage="%prog [-f] [-d] [-b ADDRESS]",
359
                        version="%%prog (ganeti) %s" %
360
                        constants.RELEASE_VERSION)
361

    
362
  dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
363
  dirs.append((constants.LOCK_DIR, 1777))
364
  daemon.GenericMain(constants.CONFD, parser, dirs, CheckConfd, ExecConfd)
365

    
366

    
367
if __name__ == "__main__":
368
  main()