Statistics
| Branch: | Tag: | Revision:

root / lib / daemon.py @ 5f3269fc

History | View | Annotate | Download (10.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module with helper classes and functions for daemons"""
23

    
24

    
25
import asyncore
26
import os
27
import select
28
import signal
29
import errno
30
import logging
31
import sched
32
import time
33
import socket
34

    
35
from ganeti import utils
36
from ganeti import constants
37
from ganeti import errors
38

    
39

    
40
class SchedulerBreakout(Exception):
41
  """Exception used to get out of the scheduler loop
42

43
  """
44

    
45

    
46
def AsyncoreDelayFunction(timeout):
47
  """Asyncore-compatible scheduler delay function.
48

49
  This is a delay function for sched that, rather than actually sleeping,
50
  executes asyncore events happening in the meantime.
51

52
  After an event has occurred, rather than returning, it raises a
53
  SchedulerBreakout exception, which will force the current scheduler.run()
54
  invocation to terminate, so that we can also check for signals. The main loop
55
  will then call the scheduler run again, which will allow it to actually
56
  process any due events.
57

58
  This is needed because scheduler.run() doesn't support a count=..., as
59
  asyncore loop, and the scheduler module documents throwing exceptions from
60
  inside the delay function as an allowed usage model.
61

62
  """
63
  asyncore.loop(timeout=timeout, count=1, use_poll=True)
64
  raise SchedulerBreakout()
65

    
66

    
67
class AsyncoreScheduler(sched.scheduler):
68
  """Event scheduler integrated with asyncore
69

70
  """
71
  def __init__(self, timefunc):
72
    sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
73

    
74

    
75
class AsyncUDPSocket(asyncore.dispatcher):
76
  """An improved asyncore udp socket.
77

78
  """
79
  def __init__(self):
80
    """Constructor for AsyncUDPSocket
81

82
    """
83
    asyncore.dispatcher.__init__(self)
84
    self._out_queue = []
85
    self.create_socket(socket.AF_INET, socket.SOCK_DGRAM)
86

    
87
  # this method is overriding an asyncore.dispatcher method
88
  def handle_connect(self):
89
    # Python thinks that the first udp message from a source qualifies as a
90
    # "connect" and further ones are part of the same connection. We beg to
91
    # differ and treat all messages equally.
92
    pass
93

    
94
  # this method is overriding an asyncore.dispatcher method
95
  def handle_read(self):
96
    try:
97
      try:
98
        payload, address = self.recvfrom(4096)
99
      except socket.error, err:
100
        if err.errno == errno.EINTR:
101
          # we got a signal while trying to read. no need to do anything,
102
          # handle_read will be called again if there is data on the socket.
103
          return
104
        else:
105
          raise
106
      ip, port = address
107
      self.handle_datagram(payload, ip, port)
108
    except:
109
      # we need to catch any exception here, log it, but proceed, because even
110
      # if we failed handling a single request, we still want to continue.
111
      logging.error("Unexpected exception", exc_info=True)
112

    
113
  def handle_datagram(self, payload, ip, port):
114
    """Handle an already read udp datagram
115

116
    """
117
    raise NotImplementedError
118

    
119
  # this method is overriding an asyncore.dispatcher method
120
  def writable(self):
121
    # We should check whether we can write to the socket only if we have
122
    # something scheduled to be written
123
    return bool(self._out_queue)
124

    
125
  def handle_write(self):
126
    try:
127
      if not self._out_queue:
128
        logging.error("handle_write called with empty output queue")
129
        return
130
      (ip, port, payload) = self._out_queue[0]
131
      try:
132
        self.sendto(payload, 0, (ip, port))
133
      except socket.error, err:
134
        if err.errno == errno.EINTR:
135
          # we got a signal while trying to write. no need to do anything,
136
          # handle_write will be called again because we haven't emptied the
137
          # _out_queue, and we'll try again
138
          return
139
        else:
140
          raise
141
      self._out_queue.pop(0)
142
    except:
143
      # we need to catch any exception here, log it, but proceed, because even
144
      # if we failed sending a single datagram we still want to continue.
145
      logging.error("Unexpected exception", exc_info=True)
146

    
147
  def enqueue_send(self, ip, port, payload):
148
    """Enqueue a datagram to be sent when possible
149

150
    """
151
    self._out_queue.append((ip, port, payload))
152

    
153

    
154
class Mainloop(object):
155
  """Generic mainloop for daemons
156

157
  """
158
  def __init__(self):
159
    """Constructs a new Mainloop instance.
160

161
    @ivar scheduler: A L{sched.scheduler} object, which can be used to register
162
    timed events
163

164
    """
165
    self._signal_wait = []
166
    self.scheduler = AsyncoreScheduler(time.time)
167

    
168
  @utils.SignalHandled([signal.SIGCHLD])
169
  @utils.SignalHandled([signal.SIGTERM])
170
  def Run(self, stop_on_empty=False, signal_handlers=None):
171
    """Runs the mainloop.
172

173
    @type stop_on_empty: bool
174
    @param stop_on_empty: Whether to stop mainloop once all I/O waiters
175
                          unregistered
176
    @type signal_handlers: dict
177
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator
178

179
    """
180
    assert isinstance(signal_handlers, dict) and \
181
           len(signal_handlers) > 0, \
182
           "Broken SignalHandled decorator"
183
    running = True
184
    # Start actual main loop
185
    while running:
186
      # Stop if nothing is listening anymore
187
      if stop_on_empty and not (self._io_wait):
188
        break
189

    
190
      if not self.scheduler.empty():
191
        try:
192
          self.scheduler.run()
193
        except SchedulerBreakout:
194
          pass
195
      else:
196
        asyncore.loop(count=1, use_poll=True)
197

    
198
      # Check whether a signal was raised
199
      for sig in signal_handlers:
200
        handler = signal_handlers[sig]
201
        if handler.called:
202
          self._CallSignalWaiters(sig)
203
          running = (sig != signal.SIGTERM)
204
          handler.Clear()
205

    
206
  def _CallSignalWaiters(self, signum):
207
    """Calls all signal waiters for a certain signal.
208

209
    @type signum: int
210
    @param signum: Signal number
211

212
    """
213
    for owner in self._signal_wait:
214
      owner.OnSignal(signum)
215

    
216
  def RegisterSignal(self, owner):
217
    """Registers a receiver for signal notifications
218

219
    The receiver must support a "OnSignal(self, signum)" function.
220

221
    @type owner: instance
222
    @param owner: Receiver
223

224
    """
225
    self._signal_wait.append(owner)
226

    
227

    
228
def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn):
229
  """Shared main function for daemons.
230

231
  @type daemon_name: string
232
  @param daemon_name: daemon name
233
  @type optionparser: L{optparse.OptionParser}
234
  @param optionparser: initialized optionparser with daemon-specific options
235
                       (common -f -d options will be handled by this module)
236
  @type options: object @param options: OptionParser result, should contain at
237
                 least the fork and the debug options
238
  @type dirs: list of strings
239
  @param dirs: list of directories that must exist for this daemon to work
240
  @type check_fn: function which accepts (options, args)
241
  @param check_fn: function that checks start conditions and exits if they're
242
                   not met
243
  @type exec_fn: function which accepts (options, args)
244
  @param exec_fn: function that's executed with the daemon's pid file held, and
245
                  runs the daemon itself.
246

247
  """
248
  optionparser.add_option("-f", "--foreground", dest="fork",
249
                          help="Don't detach from the current terminal",
250
                          default=True, action="store_false")
251
  optionparser.add_option("-d", "--debug", dest="debug",
252
                          help="Enable some debug messages",
253
                          default=False, action="store_true")
254
  if daemon_name in constants.DAEMONS_PORTS:
255
    # for networked daemons we also allow choosing the bind port and address.
256
    # by default we use the port provided by utils.GetDaemonPort, and bind to
257
    # 0.0.0.0 (which is represented by and empty bind address.
258
    port = utils.GetDaemonPort(daemon_name)
259
    optionparser.add_option("-p", "--port", dest="port",
260
                            help="Network port (%s default)." % port,
261
                            default=port, type="int")
262
    optionparser.add_option("-b", "--bind", dest="bind_address",
263
                            help="Bind address",
264
                            default="", metavar="ADDRESS")
265

    
266
  if daemon_name in constants.DAEMONS_SSL:
267
    default_cert, default_key = constants.DAEMONS_SSL[daemon_name]
268
    optionparser.add_option("--no-ssl", dest="ssl",
269
                            help="Do not secure HTTP protocol with SSL",
270
                            default=True, action="store_false")
271
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
272
                            help="SSL key",
273
                            default=default_key, type="string")
274
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
275
                            help="SSL certificate",
276
                            default=default_cert, type="string")
277

    
278
  multithread = utils.no_fork = daemon_name in constants.MULTITHREADED_DAEMONS
279

    
280
  options, args = optionparser.parse_args()
281

    
282
  if hasattr(options, 'ssl') and options.ssl:
283
    if not (options.ssl_cert and options.ssl_key):
284
      print >> sys.stderr, "Need key and certificate to use ssl"
285
      sys.exit(constants.EXIT_FAILURE)
286
    for fname in (options.ssl_cert, options.ssl_key):
287
      if not os.path.isfile(fname):
288
        print >> sys.stderr, "Need ssl file %s to run" % fname
289
        sys.exit(constants.EXIT_FAILURE)
290

    
291
  if check_fn is not None:
292
    check_fn(options, args)
293

    
294
  utils.EnsureDirs(dirs)
295

    
296
  if options.fork:
297
    utils.CloseFDs()
298
    utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name])
299

    
300
  utils.WritePidFile(daemon_name)
301
  try:
302
    utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
303
                       debug=options.debug,
304
                       stderr_logging=not options.fork,
305
                       multithreaded=multithread)
306
    logging.info("%s daemon startup" % daemon_name)
307
    exec_fn(options, args)
308
  finally:
309
    utils.RemovePidFile(daemon_name)
310