Statistics
| Branch: | Tag: | Revision:

root / lib / daemon.py @ 92b61ec7

History | View | Annotate | Download (11.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module with helper classes and functions for daemons"""
23

    
24

    
25
import asyncore
26
import os
27
import signal
28
import errno
29
import logging
30
import sched
31
import time
32
import socket
33
import sys
34

    
35
from ganeti import utils
36
from ganeti import constants
37
from ganeti import errors
38

    
39

    
40
class SchedulerBreakout(Exception):
  """Exception used to get out of the scheduler loop

  Raised by the asyncore-aware delay function to force the current
  scheduler.run() invocation to return to the main loop.

  """
44

    
45

    
46
def AsyncoreDelayFunction(timeout):
  """Asyncore-compatible scheduler delay function.

  Instead of sleeping for the requested amount of time, this delay
  function services asyncore events for at most one loop iteration and
  then raises L{SchedulerBreakout}.  The exception aborts the current
  scheduler.run() call, giving the main loop a chance to check for
  signals before invoking the scheduler again to process any due events.

  This trick is needed because scheduler.run() has no count=... argument
  the way the asyncore loop does, and the sched module explicitly allows
  the delay function to raise an exception as a way to interrupt it.

  @param timeout: maximum time to spend inside the asyncore loop

  """
  # Service pending I/O instead of blocking in time.sleep()
  asyncore.loop(count=1, use_poll=True, timeout=timeout)
  # Force scheduler.run() to return so signals can be handled
  raise SchedulerBreakout()
65

    
66

    
67
class AsyncoreScheduler(sched.scheduler):
  """Event scheduler integrated with asyncore

  A plain sched.scheduler wired up with L{AsyncoreDelayFunction}, so that
  waiting for a scheduled event also keeps asyncore channels serviced.

  """
  def __init__(self, timefunc):
    """Initializes this class.

    @param timefunc: function returning the current time (e.g. time.time)

    """
    sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
73

    
74

    
75
class AsyncUDPSocket(asyncore.dispatcher):
  """An improved asyncore udp socket.

  Buffers outgoing datagrams in a queue and delivers incoming ones to the
  L{handle_datagram} hook, which subclasses must implement.

  """
  def __init__(self):
    """Constructor for AsyncUDPSocket

    """
    asyncore.dispatcher.__init__(self)
    # FIFO of (ip, port, payload) tuples waiting to be sent
    self._out_queue = []
    self.create_socket(socket.AF_INET, socket.SOCK_DGRAM)

  # this method is overriding an asyncore.dispatcher method
  def handle_connect(self):
    # Python thinks that the first udp message from a source qualifies as a
    # "connect" and further ones are part of the same connection. We beg to
    # differ and treat all messages equally.
    pass

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    try:
      try:
        payload, address = self.recvfrom(constants.MAX_UDP_DATA_SIZE)
      # "except E, err" is a syntax error on Python 3; the "as" form is
      # equivalent and valid on Python 2.6+ as well
      except socket.error as err:
        if err.errno == errno.EINTR:
          # we got a signal while trying to read. no need to do anything,
          # handle_read will be called again if there is data on the socket.
          return
        else:
          raise
      ip, port = address
      self.handle_datagram(payload, ip, port)
    except: # pylint: disable-msg=W0702
      # we need to catch any exception here, log it, but proceed, because even
      # if we failed handling a single request, we still want to continue.
      logging.error("Unexpected exception", exc_info=True)

  def handle_datagram(self, payload, ip, port):
    """Handle an already read udp datagram

    @param payload: datagram contents
    @param ip: sender address
    @param port: sender port

    """
    raise NotImplementedError

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # We should check whether we can write to the socket only if we have
    # something scheduled to be written
    return bool(self._out_queue)

  def handle_write(self):
    try:
      if not self._out_queue:
        logging.error("handle_write called with empty output queue")
        return
      (ip, port, payload) = self._out_queue[0]
      try:
        self.sendto(payload, 0, (ip, port))
      except socket.error as err:
        if err.errno == errno.EINTR:
          # we got a signal while trying to write. no need to do anything,
          # handle_write will be called again because we haven't emptied the
          # _out_queue, and we'll try again
          return
        else:
          raise
      # only dequeue after a successful send, so an EINTR retry resends it
      self._out_queue.pop(0)
    except: # pylint: disable-msg=W0702
      # we need to catch any exception here, log it, but proceed, because even
      # if we failed sending a single datagram we still want to continue.
      logging.error("Unexpected exception", exc_info=True)

  def enqueue_send(self, ip, port, payload):
    """Enqueue a datagram to be sent when possible

    @param ip: destination address
    @param port: destination port
    @param payload: datagram contents
    @raise errors.UdpDataSizeError: if the payload exceeds the maximum
        allowed udp datagram size

    """
    if len(payload) > constants.MAX_UDP_DATA_SIZE:
      raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
                                    constants.MAX_UDP_DATA_SIZE))
    self._out_queue.append((ip, port, payload))
155

    
156

    
157
class Mainloop(object):
  """Generic mainloop for daemons

  @ivar scheduler: A sched.scheduler object, which can be used to register
    timed events

  """
  def __init__(self):
    """Constructs a new Mainloop instance.

    """
    # Objects to be notified via OnSignal() when a signal arrives
    self._signal_wait = []
    self.scheduler = AsyncoreScheduler(time.time)

  @utils.SignalHandled([signal.SIGCHLD])
  @utils.SignalHandled([signal.SIGTERM])
  def Run(self, signal_handlers=None):
    """Runs the mainloop.

    @type signal_handlers: dict
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator

    """
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"
    keep_going = True
    # Start actual main loop
    while keep_going:
      if self.scheduler.empty():
        # Nothing scheduled: just service asyncore events once
        asyncore.loop(count=1, use_poll=True)
      else:
        try:
          self.scheduler.run()
        except SchedulerBreakout:
          # Expected: the delay function raises this after each iteration
          pass

      # Check whether a signal was raised
      for (signum, handler) in signal_handlers.items():
        if handler.called:
          self._CallSignalWaiters(signum)
          # SIGTERM terminates the loop; any other signal keeps it running
          keep_going = (signum != signal.SIGTERM)
          handler.Clear()

  def _CallSignalWaiters(self, signum):
    """Calls all signal waiters for a certain signal.

    @type signum: int
    @param signum: Signal number

    """
    for waiter in self._signal_wait:
      waiter.OnSignal(signum)

  def RegisterSignal(self, owner):
    """Registers a receiver for signal notifications

    The receiver must support a "OnSignal(self, signum)" function.

    @type owner: instance
    @param owner: Receiver

    """
    self._signal_wait.append(owner)
222

    
223

    
224
def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn,
                multithreaded=False,
                default_ssl_cert=None, default_ssl_key=None):
  """Shared main function for daemons.

  @type daemon_name: string
  @param daemon_name: daemon name
  @type optionparser: optparse.OptionParser
  @param optionparser: initialized optionparser with daemon-specific options
                       (common -f -d options will be handled by this module)
  @type dirs: list of strings
  @param dirs: list of directories that must exist for this daemon to work
  @type check_fn: function which accepts (options, args)
  @param check_fn: function that checks start conditions and exits if they're
                   not met
  @type exec_fn: function which accepts (options, args)
  @param exec_fn: function that's executed with the daemon's pid file held, and
                  runs the daemon itself.
  @type multithreaded: bool
  @param multithreaded: Whether the daemon uses threads
  @type default_ssl_cert: string
  @param default_ssl_cert: Default SSL certificate path
  @type default_ssl_key: string
  @param default_ssl_key: Default SSL key path

  """
  optionparser.add_option("-f", "--foreground", dest="fork",
                          help="Don't detach from the current terminal",
                          default=True, action="store_false")
  optionparser.add_option("-d", "--debug", dest="debug",
                          help="Enable some debug messages",
                          default=False, action="store_true")
  optionparser.add_option("--syslog", dest="syslog",
                          help="Enable logging to syslog (except debug"
                          " messages); one of 'no', 'yes' or 'only' [%s]" %
                          constants.SYSLOG_USAGE,
                          default=constants.SYSLOG_USAGE,
                          choices=["no", "yes", "only"])

  if daemon_name in constants.DAEMONS_PORTS:
    default_bind_address = "0.0.0.0"
    default_port = utils.GetDaemonPort(daemon_name)

    # For networked daemons we allow choosing the port and bind address
    optionparser.add_option("-p", "--port", dest="port",
                            help="Network port (default: %s)" % default_port,
                            default=default_port, type="int")
    optionparser.add_option("-b", "--bind", dest="bind_address",
                            help=("Bind address (default: %s)" %
                                  default_bind_address),
                            default=default_bind_address, metavar="ADDRESS")

  if default_ssl_key is not None and default_ssl_cert is not None:
    optionparser.add_option("--no-ssl", dest="ssl",
                            help="Do not secure HTTP protocol with SSL",
                            default=True, action="store_false")
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
                            help=("SSL key path (default: %s)" %
                                  default_ssl_key),
                            default=default_ssl_key, type="string",
                            metavar="SSL_KEY_PATH")
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
                            help=("SSL certificate path (default: %s)" %
                                  default_ssl_cert),
                            default=default_ssl_cert, type="string",
                            metavar="SSL_CERT_PATH")

  # Disable the use of fork(2) if the daemon uses threads
  utils.no_fork = multithreaded

  options, args = optionparser.parse_args()

  if getattr(options, "ssl", False):
    ssl_paths = {
      "certificate": options.ssl_cert,
      "key": options.ssl_key,
      }

    # items() instead of the Python 2-only iteritems(), so this also runs
    # on Python 3; behavior is identical for plain iteration
    for name, path in ssl_paths.items():
      if not os.path.isfile(path):
        # sys.stderr.write instead of the Python 2-only "print >>" statement
        sys.stderr.write("SSL %s file '%s' was not found\n" % (name, path))
        sys.exit(constants.EXIT_FAILURE)

    # TODO: By initiating http.HttpSslParams here we would only read the files
    # once and have a proper validation (isfile returns False on directories)
    # at the same time.

  if check_fn is not None:
    check_fn(options, args)

  utils.EnsureDirs(dirs)

  if options.fork:
    utils.CloseFDs()
    utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name])

  utils.WritePidFile(daemon_name)
  try:
    utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
                       debug=options.debug,
                       stderr_logging=not options.fork,
                       multithreaded=multithreaded,
                       program=daemon_name,
                       syslog=options.syslog)
    logging.info("%s daemon startup", daemon_name)
    exec_fn(options, args)
  finally:
    # Remove the pid file even when exec_fn raises or exits
    utils.RemovePidFile(daemon_name)