Statistics
| Branch: | Tag: | Revision:

root / lib / daemon.py @ c124045f

History | View | Annotate | Download (10.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module with helper classes and functions for daemons"""
23

    
24

    
25
import asyncore
26
import os
27
import select
28
import signal
29
import errno
30
import logging
31
import sched
32
import time
33
import socket
34
import sys
35

    
36
from ganeti import utils
37
from ganeti import constants
38
from ganeti import errors
39

    
40

    
41
class SchedulerBreakout(Exception):
  """Control-flow exception used to leave the scheduler loop.

  The asyncore-based delay function raises this after each event-loop
  iteration, forcing scheduler.run() to return to the main loop so that
  signals can be checked between scheduled events.

  """
  pass
45

    
46

    
47
def AsyncoreDelayFunction(timeout):
  """Asyncore-compatible scheduler delay function.

  Rather than sleeping for C{timeout} seconds, this runs exactly one
  asyncore event-loop iteration (which waits at most that long) and then
  unconditionally raises L{SchedulerBreakout}. The exception forces the
  current scheduler.run() invocation to return, giving the main loop a
  chance to check for signals before calling the scheduler again, at which
  point any due events are actually processed.

  This trick is needed because scheduler.run() has no iteration count
  argument the way asyncore.loop() does, and the sched module explicitly
  allows the delay function to raise an exception as a way to interrupt
  the run.

  @type timeout: float
  @param timeout: maximum time to spend in the asyncore loop

  """
  # One polling pass of the event loop stands in for the sleep
  asyncore.loop(use_poll=True, timeout=timeout, count=1)
  raise SchedulerBreakout()
66

    
67

    
68
class AsyncoreScheduler(sched.scheduler):
  """Event scheduler whose waits are served by the asyncore loop.

  A standard L{sched.scheduler} with its delay function fixed to
  L{AsyncoreDelayFunction}, so time between scheduled events is spent
  dispatching asyncore I/O instead of sleeping.

  """
  def __init__(self, timefunc):
    """Initializes this scheduler.

    @param timefunc: function returning the current time (e.g. time.time)

    """
    sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
74

    
75

    
76
class AsyncUDPSocket(asyncore.dispatcher):
77
  """An improved asyncore udp socket.
78

79
  """
80
  def __init__(self):
81
    """Constructor for AsyncUDPSocket
82

83
    """
84
    asyncore.dispatcher.__init__(self)
85
    self._out_queue = []
86
    self.create_socket(socket.AF_INET, socket.SOCK_DGRAM)
87

    
88
  # this method is overriding an asyncore.dispatcher method
89
  def handle_connect(self):
90
    # Python thinks that the first udp message from a source qualifies as a
91
    # "connect" and further ones are part of the same connection. We beg to
92
    # differ and treat all messages equally.
93
    pass
94

    
95
  # this method is overriding an asyncore.dispatcher method
96
  def handle_read(self):
97
    try:
98
      try:
99
        payload, address = self.recvfrom(constants.MAX_UDP_DATA_SIZE)
100
      except socket.error, err:
101
        if err.errno == errno.EINTR:
102
          # we got a signal while trying to read. no need to do anything,
103
          # handle_read will be called again if there is data on the socket.
104
          return
105
        else:
106
          raise
107
      ip, port = address
108
      self.handle_datagram(payload, ip, port)
109
    except:
110
      # we need to catch any exception here, log it, but proceed, because even
111
      # if we failed handling a single request, we still want to continue.
112
      logging.error("Unexpected exception", exc_info=True)
113

    
114
  def handle_datagram(self, payload, ip, port):
115
    """Handle an already read udp datagram
116

117
    """
118
    raise NotImplementedError
119

    
120
  # this method is overriding an asyncore.dispatcher method
121
  def writable(self):
122
    # We should check whether we can write to the socket only if we have
123
    # something scheduled to be written
124
    return bool(self._out_queue)
125

    
126
  def handle_write(self):
127
    try:
128
      if not self._out_queue:
129
        logging.error("handle_write called with empty output queue")
130
        return
131
      (ip, port, payload) = self._out_queue[0]
132
      try:
133
        self.sendto(payload, 0, (ip, port))
134
      except socket.error, err:
135
        if err.errno == errno.EINTR:
136
          # we got a signal while trying to write. no need to do anything,
137
          # handle_write will be called again because we haven't emptied the
138
          # _out_queue, and we'll try again
139
          return
140
        else:
141
          raise
142
      self._out_queue.pop(0)
143
    except:
144
      # we need to catch any exception here, log it, but proceed, because even
145
      # if we failed sending a single datagram we still want to continue.
146
      logging.error("Unexpected exception", exc_info=True)
147

    
148
  def enqueue_send(self, ip, port, payload):
149
    """Enqueue a datagram to be sent when possible
150

151
    """
152
    if len(payload) > constants.MAX_UDP_DATA_SIZE:
153
      raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
154
                                    constants.MAX_UDP_DATA_SIZE))
155
    self._out_queue.append((ip, port, payload))
156

    
157

    
158
class Mainloop(object):
  """Generic mainloop for daemons

  Alternates between running due scheduler events and dispatching asyncore
  I/O, checking for handled signals after every iteration.

  """
  def __init__(self):
    """Constructs a new Mainloop instance.

    @ivar scheduler: A L{sched.scheduler} object, which can be used to register
    timed events

    """
    # Objects to notify via OnSignal() when a handled signal arrives
    self._signal_wait = []
    self.scheduler = AsyncoreScheduler(time.time)

  @utils.SignalHandled([signal.SIGCHLD])
  @utils.SignalHandled([signal.SIGTERM])
  def Run(self, stop_on_empty=False, signal_handlers=None):
    """Runs the mainloop.

    @type stop_on_empty: bool
    @param stop_on_empty: Whether to stop mainloop once all I/O waiters
                          unregistered
    @type signal_handlers: dict
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator

    """
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"
    running = True
    # Start actual main loop
    while running:
      # Stop if nothing is listening anymore. asyncore's global socket_map
      # holds every registered dispatcher; the previous check used
      # self._io_wait, which is never initialized and raised AttributeError.
      if stop_on_empty and not asyncore.socket_map:
        break

      if not self.scheduler.empty():
        try:
          self.scheduler.run()
        except SchedulerBreakout:
          # Raised by the delay function after a single asyncore iteration;
          # fall through so pending signals are checked below
          pass
      else:
        # Nothing scheduled: just dispatch one round of I/O events
        asyncore.loop(count=1, use_poll=True)

      # Check whether a signal was raised
      for sig, handler in signal_handlers.items():
        if handler.called:
          self._CallSignalWaiters(sig)
          # SIGTERM terminates the loop; other signals are only dispatched
          # to the registered waiters
          running = (sig != signal.SIGTERM)
          handler.Clear()

  def _CallSignalWaiters(self, signum):
    """Calls all signal waiters for a certain signal.

    @type signum: int
    @param signum: Signal number

    """
    for owner in self._signal_wait:
      owner.OnSignal(signum)

  def RegisterSignal(self, owner):
    """Registers a receiver for signal notifications

    The receiver must support a "OnSignal(self, signum)" function.

    @type owner: instance
    @param owner: Receiver

    """
    self._signal_wait.append(owner)
230

    
231

    
232
def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn):
  """Shared main function for daemons.

  Parses the command line (adding the common -f/-d options, plus network
  and SSL options for daemons listed in the corresponding constants),
  verifies start conditions via check_fn, optionally daemonizes, and then
  runs exec_fn with the daemon's pid file held. Both check_fn and exec_fn
  receive the parsed (options, args) pair.

  @type daemon_name: string
  @param daemon_name: daemon name
  @type optionparser: L{optparse.OptionParser}
  @param optionparser: initialized optionparser with daemon-specific options
                       (common -f -d options will be handled by this module)
  @type dirs: list of strings
  @param dirs: list of directories that must exist for this daemon to work
  @type check_fn: function which accepts (options, args)
  @param check_fn: function that checks start conditions and exits if they're
                   not met
  @type exec_fn: function which accepts (options, args)
  @param exec_fn: function that's executed with the daemon's pid file held, and
                  runs the daemon itself.

  """
  optionparser.add_option("-f", "--foreground", dest="fork",
                          help="Don't detach from the current terminal",
                          default=True, action="store_false")
  optionparser.add_option("-d", "--debug", dest="debug",
                          help="Enable some debug messages",
                          default=False, action="store_true")
  if daemon_name in constants.DAEMONS_PORTS:
    # for networked daemons we also allow choosing the bind port and address.
    # by default we use the port provided by utils.GetDaemonPort, and bind to
    # 0.0.0.0 (which is represented by an empty bind address).
    port = utils.GetDaemonPort(daemon_name)
    optionparser.add_option("-p", "--port", dest="port",
                            help="Network port (%s default)." % port,
                            default=port, type="int")
    optionparser.add_option("-b", "--bind", dest="bind_address",
                            help="Bind address",
                            default="", metavar="ADDRESS")

  if daemon_name in constants.DAEMONS_SSL:
    default_cert, default_key = constants.DAEMONS_SSL[daemon_name]
    optionparser.add_option("--no-ssl", dest="ssl",
                            help="Do not secure HTTP protocol with SSL",
                            default=True, action="store_false")
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
                            help="SSL key",
                            default=default_key, type="string")
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
                            help="SSL certificate",
                            default=default_cert, type="string")

  # NOTE(review): this both computes the local flag and sets utils.no_fork
  # as a module-level side effect -- presumably other utils code reads it;
  # confirm before changing.
  multithread = utils.no_fork = daemon_name in constants.MULTITHREADED_DAEMONS

  options, args = optionparser.parse_args()

  # When SSL is enabled, both the certificate and the key must be supplied
  # and exist on disk; otherwise exit with an error before daemonizing
  if hasattr(options, 'ssl') and options.ssl:
    if not (options.ssl_cert and options.ssl_key):
      print >> sys.stderr, "Need key and certificate to use ssl"
      sys.exit(constants.EXIT_FAILURE)
    for fname in (options.ssl_cert, options.ssl_key):
      if not os.path.isfile(fname):
        print >> sys.stderr, "Need ssl file %s to run" % fname
        sys.exit(constants.EXIT_FAILURE)

  # Daemon-specific start-condition checks (may exit the process)
  if check_fn is not None:
    check_fn(options, args)

  utils.EnsureDirs(dirs)

  if options.fork:
    # Detach from the terminal: close inherited fds, then daemonize with
    # output redirected to the daemon's logfile
    utils.CloseFDs()
    utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name])

  # Hold the pid file for the daemon's whole lifetime and remove it on the
  # way out, even when exec_fn raises
  utils.WritePidFile(daemon_name)
  try:
    utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
                       debug=options.debug,
                       stderr_logging=not options.fork,
                       multithreaded=multithread)
    logging.info("%s daemon startup" % daemon_name)
    exec_fn(options, args)
  finally:
    utils.RemovePidFile(daemon_name)