Statistics
| Branch: | Tag: | Revision:

root / daemons / ganeti-confd @ e9c8deab

History | View | Annotate | Download (8.8 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2009, Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti configuration daemon
23

    
24
Ganeti-confd is a daemon to query master candidates for configuration values.
25
It uses UDP+HMAC for authentication with a global cluster key.
26

    
27
"""
28

    
29
# pylint: disable-msg=C0103
30
# C0103: Invalid name ganeti-confd
31

    
32
import os
33
import sys
34
import logging
35
import time
36

    
37
try:
38
  # pylint: disable-msg=E0611
39
  from pyinotify import pyinotify
40
except ImportError:
41
  import pyinotify
42

    
43
from optparse import OptionParser
44

    
45
from ganeti import asyncnotifier
46
from ganeti import confd
47
from ganeti.confd import server as confd_server
48
from ganeti import constants
49
from ganeti import errors
50
from ganeti import daemon
51

    
52

    
53
class ConfdAsyncUDPServer(daemon.AsyncUDPSocket):
54
  """The confd udp server, suitable for use with asyncore.
55

    
56
  """
57
  def __init__(self, bind_address, port, processor):
58
    """Constructor for ConfdAsyncUDPServer
59

    
60
    @type bind_address: string
61
    @param bind_address: socket bind address ('' for all)
62
    @type port: int
63
    @param port: udp port
64
    @type processor: L{confd.server.ConfdProcessor}
65
    @param processor: ConfdProcessor to use to handle queries
66

    
67
    """
68
    daemon.AsyncUDPSocket.__init__(self)
69
    self.bind_address = bind_address
70
    self.port = port
71
    self.processor = processor
72
    self.bind((bind_address, port))
73
    logging.debug("listening on ('%s':%d)", bind_address, port)
74

    
75
  # this method is overriding a daemon.AsyncUDPSocket method
76
  def handle_datagram(self, payload_in, ip, port):
77
    try:
78
      query = confd.UnpackMagic(payload_in)
79
    except errors.ConfdMagicError, err:
80
      logging.debug(err)
81
      return
82

    
83
    answer =  self.processor.ExecQuery(query, ip, port)
84
    if answer is not None:
85
      try:
86
        self.enqueue_send(ip, port, confd.PackMagic(answer))
87
      except errors.UdpDataSizeError:
88
        logging.error("Reply too big to fit in an udp packet.")
89

    
90

    
91
class ConfdConfigurationReloader(object):
92
  """Logic to control when to reload the ganeti configuration
93

    
94
  This class is able to alter between inotify and polling, to rate-limit the
95
  number of reloads. When using inotify it also supports a fallback timed
96
  check, to verify that the reload hasn't failed.
97

    
98
  """
99
  def __init__(self, processor, mainloop):
100
    """Constructor for ConfdConfigurationReloader
101

    
102
    @type processor: L{confd.server.ConfdProcessor}
103
    @param processor: ganeti-confd ConfdProcessor
104
    @type mainloop: L{daemon.Mainloop}
105
    @param mainloop: ganeti-confd mainloop
106

    
107
    """
108
    self.processor = processor
109
    self.mainloop = mainloop
110

    
111
    self.polling = True
112
    self.last_notification = 0
113

    
114
    # Asyncronous inotify handler for config changes
115
    cfg_file = constants.CLUSTER_CONF_FILE
116
    self.wm = pyinotify.WatchManager()
117
    self.inotify_handler = asyncnotifier.SingleFileEventHandler(self.wm,
118
                                                                self.OnInotify,
119
                                                                cfg_file)
120
    notifier_class = asyncnotifier.ErrorLoggingAsyncNotifier
121
    self.notifier = notifier_class(self.wm, self.inotify_handler)
122

    
123
    self.timer_handle = None
124
    self._EnableTimer()
125

    
126
  def OnInotify(self, notifier_enabled):
127
    """Receive an inotify notification.
128

    
129
    @type notifier_enabled: boolean
130
    @param notifier_enabled: whether the notifier is still enabled
131

    
132
    """
133
    current_time = time.time()
134
    time_delta = current_time - self.last_notification
135
    self.last_notification = current_time
136

    
137
    if time_delta < constants.CONFD_CONFIG_RELOAD_RATELIMIT:
138
      logging.debug("Moving from inotify mode to polling mode")
139
      self.polling = True
140
      if notifier_enabled:
141
        self.inotify_handler.disable()
142

    
143
    if not self.polling and not notifier_enabled:
144
      try:
145
        self.inotify_handler.enable()
146
      except errors.InotifyError:
147
        self.polling = True
148

    
149
    try:
150
      reloaded = self.processor.reader.Reload()
151
      if reloaded:
152
        logging.info("Reloaded ganeti config")
153
      else:
154
        logging.debug("Skipped double config reload")
155
    except errors.ConfigurationError:
156
      self.DisableConfd()
157
      self.inotify_handler.disable()
158
      return
159

    
160
    # Reset the timer. If we're polling it will go to the polling rate, if
161
    # we're not it will delay it again to its base safe timeout.
162
    self._ResetTimer()
163

    
164
  def _DisableTimer(self):
165
    if self.timer_handle is not None:
166
      self.mainloop.scheduler.cancel(self.timer_handle)
167
      self.timer_handle = None
168

    
169
  def _EnableTimer(self):
170
    if self.polling:
171
      timeout = constants.CONFD_CONFIG_RELOAD_RATELIMIT
172
    else:
173
      timeout = constants.CONFD_CONFIG_RELOAD_TIMEOUT
174

    
175
    if self.timer_handle is None:
176
      self.timer_handle = self.mainloop.scheduler.enter(
177
        timeout, 1, self.OnTimer, [])
178

    
179
  def _ResetTimer(self):
180
    self._DisableTimer()
181
    self._EnableTimer()
182

    
183
  def OnTimer(self):
184
    """Function called when the timer fires
185

    
186
    """
187
    self.timer_handle = None
188
    reloaded = False
189
    was_disabled = False
190
    try:
191
      if self.processor.reader is None:
192
        was_disabled = True
193
        self.EnableConfd()
194
        reloaded = True
195
      else:
196
        reloaded = self.processor.reader.Reload()
197
    except errors.ConfigurationError:
198
      self.DisableConfd(silent=was_disabled)
199
      return
200

    
201
    if self.polling and reloaded:
202
      logging.info("Reloaded ganeti config")
203
    elif reloaded:
204
      # We have reloaded the config files, but received no inotify event.  If
205
      # an event is pending though, we just happen to have timed out before
206
      # receiving it, so this is not a problem, and we shouldn't alert
207
      if not self.notifier.check_events() and not was_disabled:
208
        logging.warning("Config file reload at timeout (inotify failure)")
209
    elif self.polling:
210
      # We're polling, but we haven't reloaded the config:
211
      # Going back to inotify mode
212
      logging.debug("Moving from polling mode to inotify mode")
213
      self.polling = False
214
      try:
215
        self.inotify_handler.enable()
216
      except errors.InotifyError:
217
        self.polling = True
218
    else:
219
      logging.debug("Performed configuration check")
220

    
221
    self._EnableTimer()
222

    
223
  def DisableConfd(self, silent=False):
224
    """Puts confd in non-serving mode
225

    
226
    """
227
    if not silent:
228
      logging.warning("Confd is being disabled")
229
    self.processor.Disable()
230
    self.polling = False
231
    self._ResetTimer()
232

    
233
  def EnableConfd(self):
234
    self.processor.Enable()
235
    logging.warning("Confd is being enabled")
236
    self.polling = True
237
    self._ResetTimer()
238

    
239

    
240
def CheckConfd(_, args):
241
  """Initial checks whether to run exit with a failure.
242

    
243
  """
244
  if args: # confd doesn't take any arguments
245
    print >> sys.stderr, ("Usage: %s [-f] [-d] [-b ADDRESS]" % sys.argv[0])
246
    sys.exit(constants.EXIT_FAILURE)
247

    
248
  # TODO: collapse HMAC daemons handling in daemons GenericMain, when we'll
249
  # have more than one.
250
  if not os.path.isfile(constants.CONFD_HMAC_KEY):
251
    print >> sys.stderr, "Need HMAC key %s to run" % constants.CONFD_HMAC_KEY
252
    sys.exit(constants.EXIT_FAILURE)
253

    
254

    
255
def ExecConfd(options, _):
256
  """Main confd function, executed with PID file held
257

    
258
  """
259
  # TODO: clarify how the server and reloader variables work (they are
260
  # not used)
261
  # pylint: disable-msg=W0612
262
  mainloop = daemon.Mainloop()
263

    
264
  # Asyncronous confd UDP server
265
  processor = confd_server.ConfdProcessor()
266
  try:
267
    processor.Enable()
268
  except errors.ConfigurationError:
269
    # If enabling the processor has failed, we can still go on, but confd will
270
    # be disabled
271
    logging.warning("Confd is starting in disabled mode")
272

    
273
  server = ConfdAsyncUDPServer(options.bind_address, options.port, processor)
274

    
275
  # Configuration reloader
276
  reloader = ConfdConfigurationReloader(processor, mainloop)
277

    
278
  mainloop.Run()
279

    
280

    
281
def main():
282
  """Main function for the confd daemon.
283

    
284
  """
285
  parser = OptionParser(description="Ganeti configuration daemon",
286
                        usage="%prog [-f] [-d] [-b ADDRESS]",
287
                        version="%%prog (ganeti) %s" %
288
                        constants.RELEASE_VERSION)
289

    
290
  dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
291
  dirs.append((constants.LOCK_DIR, 1777))
292
  daemon.GenericMain(constants.CONFD, parser, dirs, CheckConfd, ExecConfd)
293

    
294

    
295
if __name__ == "__main__":
296
  main()