Statistics
| Branch: | Tag: | Revision:

root / lib / daemon.py @ 5a062513

History | View | Annotate | Download (11.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module with helper classes and functions for daemons"""
23

    
24

    
25
import asyncore
26
import os
27
import signal
28
import errno
29
import logging
30
import sched
31
import time
32
import socket
33
import select
34
import sys
35

    
36
from ganeti import utils
37
from ganeti import constants
38
from ganeti import errors
39

    
40

    
41
class SchedulerBreakout(Exception):
  """Exception raised to break out of the scheduler loop

  """
45

    
46

    
47
def AsyncoreDelayFunction(timeout):
  """Asyncore-compatible scheduler delay function.

  Instead of actually sleeping for the requested timeout, this runs a
  single iteration of the asyncore loop, serving any I/O events pending in
  the meantime.

  Once that iteration returns, rather than coming back to the scheduler,
  this raises L{SchedulerBreakout}: that aborts the current scheduler.run()
  invocation so the main loop can also check for signals, after which it
  calls scheduler.run() again, letting it actually process any due events.

  This is needed because scheduler.run() doesn't support a count=..., as
  asyncore loop, and the scheduler module documents throwing exceptions from
  inside the delay function as an allowed usage model.

  """
  asyncore.loop(count=1, timeout=timeout, use_poll=True)
  raise SchedulerBreakout()
66

    
67

    
68
class AsyncoreScheduler(sched.scheduler):
  """Event scheduler integrated with asyncore

  A sched.scheduler whose delay function is L{AsyncoreDelayFunction}, so
  that waiting for a timed event serves asyncore I/O instead of sleeping.

  """
  def __init__(self, timefunc):
    """Initializes this instance.

    @param timefunc: callable returning the current time (e.g. time.time)

    """
    # note: direct base-class call, sched.scheduler is an old-style class
    sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
74

    
75

    
76
class AsyncUDPSocket(asyncore.dispatcher):
  """An improved asyncore udp socket.

  Outgoing datagrams are queued via L{enqueue_send} and flushed by the
  asyncore loop; incoming ones are dispatched to the L{handle_datagram}
  hook, which subclasses must implement.

  """
  def __init__(self):
    """Constructor for AsyncUDPSocket

    """
    asyncore.dispatcher.__init__(self)
    # FIFO of (ip, port, payload) tuples waiting to be sent
    self._out_queue = []
    self.create_socket(socket.AF_INET, socket.SOCK_DGRAM)

  # this method is overriding an asyncore.dispatcher method
  def handle_connect(self):
    # Python thinks that the first udp message from a source qualifies as a
    # "connect" and further ones are part of the same connection. We beg to
    # differ and treat all messages equally.
    pass

  def do_read(self):
    """Reads one datagram and passes it to L{handle_datagram}.

    An interrupted system call (EINTR) is silently ignored; any other
    socket error is propagated to the caller.

    """
    try:
      payload, address = self.recvfrom(constants.MAX_UDP_DATA_SIZE)
    except socket.error, err:
      if err.errno == errno.EINTR:
        # we got a signal while trying to read. no need to do anything,
        # handle_read will be called again if there is data on the socket.
        return
      else:
        raise
    ip, port = address
    self.handle_datagram(payload, ip, port)

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    """Asyncore callback invoked when the socket is readable.

    """
    try:
      self.do_read()
    except: # pylint: disable-msg=W0702
      # we need to catch any exception here, log it, but proceed, because even
      # if we failed handling a single request, we still want to continue.
      logging.error("Unexpected exception", exc_info=True)

  def handle_datagram(self, payload, ip, port):
    """Handle an already read udp datagram

    Subclasses must override this to process incoming datagrams.

    """
    raise NotImplementedError

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # We should check whether we can write to the socket only if we have
    # something scheduled to be written
    return bool(self._out_queue)

  def handle_write(self):
    """Asyncore callback invoked when the socket is writable.

    Sends the oldest queued datagram; on EINTR the datagram stays in the
    queue so the send is retried on the next callback.

    """
    try:
      if not self._out_queue:
        logging.error("handle_write called with empty output queue")
        return
      (ip, port, payload) = self._out_queue[0]
      try:
        self.sendto(payload, 0, (ip, port))
      except socket.error, err:
        if err.errno == errno.EINTR:
          # we got a signal while trying to write. no need to do anything,
          # handle_write will be called again because we haven't emptied the
          # _out_queue, and we'll try again
          return
        else:
          raise
      # only dequeue after a successful send
      self._out_queue.pop(0)
    except: # pylint: disable-msg=W0702
      # we need to catch any exception here, log it, but proceed, because even
      # if we failed sending a single datagram we still want to continue.
      logging.error("Unexpected exception", exc_info=True)

  def enqueue_send(self, ip, port, payload):
    """Enqueue a datagram to be sent when possible

    @raise errors.UdpDataSizeError: if the payload exceeds the maximum
        allowed udp datagram size

    """
    if len(payload) > constants.MAX_UDP_DATA_SIZE:
      raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
                                    constants.MAX_UDP_DATA_SIZE))
    self._out_queue.append((ip, port, payload))

  def process_next_packet(self, timeout=0):
    """Process the next datagram, waiting for it if necessary.

    @type timeout: float
    @param timeout: how long to wait for data
    @rtype: boolean
    @return: True if some data has been handled, False otherwise

    """
    result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
    if result is not None and result & select.POLLIN:
      self.do_read()
      return True
    else:
      return False
175

    
176

    
177
class Mainloop(object):
  """Generic mainloop for daemons

  @ivar scheduler: A sched.scheduler object, which can be used to register
    timed events

  """
  def __init__(self):
    """Constructs a new Mainloop instance.

    """
    self.scheduler = AsyncoreScheduler(time.time)
    # receivers registered via RegisterSignal
    self._signal_wait = []

  @utils.SignalHandled([signal.SIGCHLD])
  @utils.SignalHandled([signal.SIGTERM])
  def Run(self, signal_handlers=None):
    """Runs the mainloop.

    Alternates between serving scheduled events (via the asyncore-aware
    scheduler) and plain asyncore iterations, checking for handled signals
    after each round.  Terminates when SIGTERM is received.

    @type signal_handlers: dict
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator

    """
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"
    keep_going = True
    # Start actual main loop
    while keep_going:
      if self.scheduler.empty():
        asyncore.loop(count=1, use_poll=True)
      else:
        # SchedulerBreakout is raised by the delay function after each
        # asyncore iteration; it just returns control to this loop
        try:
          self.scheduler.run()
        except SchedulerBreakout:
          pass

      # Check whether any of the handled signals was raised
      for (sig, handler) in signal_handlers.items():
        if handler.called:
          self._CallSignalWaiters(sig)
          keep_going = (sig != signal.SIGTERM)
          handler.Clear()

  def _CallSignalWaiters(self, signum):
    """Calls all signal waiters for a certain signal.

    @type signum: int
    @param signum: Signal number

    """
    for receiver in self._signal_wait:
      receiver.OnSignal(signum)

  def RegisterSignal(self, owner):
    """Registers a receiver for signal notifications

    The receiver must support a "OnSignal(self, signum)" function.

    @type owner: instance
    @param owner: Receiver

    """
    self._signal_wait.append(owner)
242

    
243

    
244
def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn):
  """Shared main function for daemons.

  @type daemon_name: string
  @param daemon_name: daemon name
  @type optionparser: optparse.OptionParser
  @param optionparser: initialized optionparser with daemon-specific options
                       (common -f -d options will be handled by this module)
  @type dirs: list of (string, integer)
  @param dirs: list of directories that must be created if they don't exist,
               and the permissions to be used to create them
  @type check_fn: function which accepts (options, args)
  @param check_fn: function that checks start conditions and exits if they're
                   not met
  @type exec_fn: function which accepts (options, args)
  @param exec_fn: function that's executed with the daemon's pid file held, and
                  runs the daemon itself.

  """
  # options shared by all daemons: -f keeps the process in the foreground,
  # -d enables debug messages, --syslog selects syslog usage
  optionparser.add_option("-f", "--foreground", dest="fork",
                          help="Don't detach from the current terminal",
                          default=True, action="store_false")
  optionparser.add_option("-d", "--debug", dest="debug",
                          help="Enable some debug messages",
                          default=False, action="store_true")
  optionparser.add_option("--syslog", dest="syslog",
                          help="Enable logging to syslog (except debug"
                          " messages); one of 'no', 'yes' or 'only' [%s]" %
                          constants.SYSLOG_USAGE,
                          default=constants.SYSLOG_USAGE,
                          choices=["no", "yes", "only"])
  if daemon_name in constants.DAEMONS_PORTS:
    # for networked daemons we also allow choosing the bind port and address.
    # by default we use the port provided by utils.GetDaemonPort, and bind to
    # 0.0.0.0 (which is represented by an empty bind address).
    port = utils.GetDaemonPort(daemon_name)
    optionparser.add_option("-p", "--port", dest="port",
                            help="Network port (%s default)." % port,
                            default=port, type="int")
    optionparser.add_option("-b", "--bind", dest="bind_address",
                            help="Bind address",
                            default="", metavar="ADDRESS")

  if daemon_name in constants.DAEMONS_SSL:
    # SSL-capable daemons get options to disable SSL or override the
    # default key/certificate paths
    default_cert, default_key = constants.DAEMONS_SSL[daemon_name]
    optionparser.add_option("--no-ssl", dest="ssl",
                            help="Do not secure HTTP protocol with SSL",
                            default=True, action="store_false")
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
                            help="SSL key",
                            default=default_key, type="string")
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
                            help="SSL certificate",
                            default=default_cert, type="string")

  # NOTE(review): this deliberately also sets utils.no_fork as a side
  # effect; presumably other modules read it -- confirm before changing
  multithread = utils.no_fork = daemon_name in constants.MULTITHREADED_DAEMONS

  options, args = optionparser.parse_args()

  if hasattr(options, 'ssl') and options.ssl:
    # SSL stays enabled: both the key and certificate files must exist,
    # otherwise we refuse to start
    if not (options.ssl_cert and options.ssl_key):
      print >> sys.stderr, "Need key and certificate to use ssl"
      sys.exit(constants.EXIT_FAILURE)
    for fname in (options.ssl_cert, options.ssl_key):
      if not os.path.isfile(fname):
        print >> sys.stderr, "Need ssl file %s to run" % fname
        sys.exit(constants.EXIT_FAILURE)

  # daemon-specific start condition checks (may exit the process)
  if check_fn is not None:
    check_fn(options, args)

  utils.EnsureDirs(dirs)

  if options.fork:
    # detach from the terminal: close inherited file descriptors and
    # daemonize with output going to the daemon's logfile
    utils.CloseFDs()
    utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name])

  utils.WritePidFile(daemon_name)
  try:
    utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
                       debug=options.debug,
                       stderr_logging=not options.fork,
                       multithreaded=multithread,
                       program=daemon_name,
                       syslog=options.syslog)
    logging.info("%s daemon startup", daemon_name)
    exec_fn(options, args)
  finally:
    # remove the pid file even if the daemon body raised
    utils.RemovePidFile(daemon_name)