#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module with helper classes and functions for daemons"""


import asyncore
import os
import signal
import errno
import logging
import sched
import time
import socket
import select
import sys

from ganeti import utils
from ganeti import constants
from ganeti import errors


class SchedulerBreakout(Exception):
  """Exception used to get out of the scheduler loop

  """


def AsyncoreDelayFunction(timeout):
  """Asyncore-compatible scheduler delay function.

  This is a delay function for sched that, rather than actually sleeping,
  executes asyncore events happening in the meantime.

  After an event has occurred, rather than returning, it raises a
  SchedulerBreakout exception, which forces the current scheduler.run()
  invocation to terminate, so that we can also check for signals. The main
  loop will then call scheduler.run() again, allowing it to actually process
  any due events.

  This is needed because scheduler.run() doesn't support a count=... parameter
  the way asyncore.loop() does, and the sched module documents throwing
  exceptions from inside the delay function as an allowed usage model.

  """
  asyncore.loop(timeout=timeout, count=1, use_poll=True)
  raise SchedulerBreakout()


class AsyncoreScheduler(sched.scheduler):
  """Event scheduler integrated with asyncore

  """
  def __init__(self, timefunc):
    sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
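

# Example (a minimal sketch; _BeatHeart and the 60-second period are purely
# illustrative): daemons normally don't instantiate AsyncoreScheduler directly
# but use the scheduler attribute of the Mainloop class defined below, with
# the standard sched.scheduler API, e.g.:
#
#   def _BeatHeart(mainloop):
#     logging.debug("heartbeat")
#     # re-arm the timer; the signature is enter(delay, priority, action, args)
#     mainloop.scheduler.enter(60.0, 1, _BeatHeart, [mainloop])
#
#   mainloop = Mainloop()
#   mainloop.scheduler.enter(60.0, 1, _BeatHeart, [mainloop])
#   mainloop.Run()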


class AsyncUDPSocket(asyncore.dispatcher):
  """An improved asyncore udp socket.

  """
  def __init__(self):
    """Constructor for AsyncUDPSocket

    """
    asyncore.dispatcher.__init__(self)
    self._out_queue = []
    self.create_socket(socket.AF_INET, socket.SOCK_DGRAM)

  # this method is overriding an asyncore.dispatcher method
  def handle_connect(self):
    # Python thinks that the first udp message from a source qualifies as a
    # "connect" and further ones are part of the same connection. We beg to
    # differ and treat all messages equally.
    pass

  def do_read(self):
    try:
      payload, address = self.recvfrom(constants.MAX_UDP_DATA_SIZE)
    except socket.error, err:
      if err.errno == errno.EINTR:
        # we got a signal while trying to read. no need to do anything,
        # handle_read will be called again if there is data on the socket.
        return
      else:
        raise
    ip, port = address
    self.handle_datagram(payload, ip, port)

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    try:
      self.do_read()
    except: # pylint: disable-msg=W0702
      # we need to catch any exception here, log it, but proceed, because even
      # if we failed handling a single request, we still want to continue.
      logging.error("Unexpected exception", exc_info=True)

  def handle_datagram(self, payload, ip, port):
    """Handle an already read udp datagram

    """
    raise NotImplementedError

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # We should check whether we can write to the socket only if we have
    # something scheduled to be written
    return bool(self._out_queue)

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    try:
      if not self._out_queue:
        logging.error("handle_write called with empty output queue")
        return
      (ip, port, payload) = self._out_queue[0]
      try:
        self.sendto(payload, 0, (ip, port))
      except socket.error, err:
        if err.errno == errno.EINTR:
          # we got a signal while trying to write. no need to do anything,
          # handle_write will be called again because we haven't emptied the
          # _out_queue, and we'll try again
          return
        else:
          raise
      self._out_queue.pop(0)
    except: # pylint: disable-msg=W0702
      # we need to catch any exception here, log it, but proceed, because even
      # if we failed sending a single datagram we still want to continue.
      logging.error("Unexpected exception", exc_info=True)

  def enqueue_send(self, ip, port, payload):
    """Enqueue a datagram to be sent when possible

    """
    if len(payload) > constants.MAX_UDP_DATA_SIZE:
      raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
                                    constants.MAX_UDP_DATA_SIZE))
    self._out_queue.append((ip, port, payload))

  def process_next_packet(self, timeout=0):
    """Process the next datagram, waiting for it if necessary.

    @type timeout: float
    @param timeout: how long to wait for data
    @rtype: boolean
    @return: True if some data has been handled, False otherwise

    """
    result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
    if result is not None and result & select.POLLIN:
      self.do_read()
      return True
    else:
      return False
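

# Example (a minimal sketch; EchoUDPSocket and the port number are purely
# illustrative): concrete users subclass AsyncUDPSocket, implement
# handle_datagram() for incoming packets and queue replies via enqueue_send():
#
#   class EchoUDPSocket(AsyncUDPSocket):
#     def handle_datagram(self, payload, ip, port):
#       # send the payload straight back to the peer it came from
#       self.enqueue_send(ip, port, payload)
#
#   sock = EchoUDPSocket()
#   sock.bind(("", 1234))
#   # either drive it through asyncore.loop()/Mainloop, or poll manually:
#   while sock.process_next_packet(timeout=1.0):
#     pass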


class Mainloop(object):
  """Generic mainloop for daemons

  @ivar scheduler: A sched.scheduler object, which can be used to register
    timed events

  """
  def __init__(self):
    """Constructs a new Mainloop instance.

    """
    self._signal_wait = []
    self.scheduler = AsyncoreScheduler(time.time)

  @utils.SignalHandled([signal.SIGCHLD])
  @utils.SignalHandled([signal.SIGTERM])
  def Run(self, signal_handlers=None):
    """Runs the mainloop.

    @type signal_handlers: dict
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator

    """
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"
    running = True
    # Start actual main loop
    while running:
      if not self.scheduler.empty():
        try:
          self.scheduler.run()
        except SchedulerBreakout:
          pass
      else:
        asyncore.loop(count=1, use_poll=True)

      # Check whether a signal was raised
      for sig in signal_handlers:
        handler = signal_handlers[sig]
        if handler.called:
          self._CallSignalWaiters(sig)
          running = (sig != signal.SIGTERM)
          handler.Clear()

  def _CallSignalWaiters(self, signum):
    """Calls all signal waiters for a certain signal.

    @type signum: int
    @param signum: Signal number

    """
    for owner in self._signal_wait:
      owner.OnSignal(signum)

    
233
  def RegisterSignal(self, owner):
234
    """Registers a receiver for signal notifications
235

236
    The receiver must support a "OnSignal(self, signum)" function.
237

238
    @type owner: instance
239
    @param owner: Receiver
240

241
    """
242
    self._signal_wait.append(owner)
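

# Example (a minimal sketch; _ChildReaper is an illustrative name): a signal
# receiver is any object with an OnSignal(signum) method; register it and run
# the loop, which keeps going until SIGTERM is received:
#
#   class _ChildReaper:
#     def OnSignal(self, signum):
#       if signum == signal.SIGCHLD:
#         logging.debug("a child process exited")
#
#   mainloop = Mainloop()
#   mainloop.RegisterSignal(_ChildReaper())
#   mainloop.Run()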


def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn,
                console_logging=False):
  """Shared main function for daemons.

  @type daemon_name: string
  @param daemon_name: daemon name
  @type optionparser: optparse.OptionParser
  @param optionparser: initialized optionparser with daemon-specific options
                       (common -f -d options will be handled by this module)
  @type dirs: list of (string, integer)
  @param dirs: list of directories that must be created if they don't exist,
               and the permissions to be used to create them
  @type check_fn: function which accepts (options, args)
  @param check_fn: function that checks start conditions and exits if they're
                   not met
  @type exec_fn: function which accepts (options, args)
  @param exec_fn: function that's executed with the daemon's pid file held, and
                  runs the daemon itself.
  @type console_logging: boolean
  @param console_logging: if True, the daemon will fall back to the system
                          console if logging fails

  """
  optionparser.add_option("-f", "--foreground", dest="fork",
                          help="Don't detach from the current terminal",
                          default=True, action="store_false")
  optionparser.add_option("-d", "--debug", dest="debug",
                          help="Enable some debug messages",
                          default=False, action="store_true")
  optionparser.add_option("--syslog", dest="syslog",
                          help="Enable logging to syslog (except debug"
                          " messages); one of 'no', 'yes' or 'only' [%s]" %
                          constants.SYSLOG_USAGE,
                          default=constants.SYSLOG_USAGE,
                          choices=["no", "yes", "only"])
  if daemon_name in constants.DAEMONS_PORTS:
    # for networked daemons we also allow choosing the bind port and address.
    # by default we use the port provided by utils.GetDaemonPort, and bind to
    # 0.0.0.0 (which is represented by an empty bind address).
    port = utils.GetDaemonPort(daemon_name)
    optionparser.add_option("-p", "--port", dest="port",
                            help="Network port (%s default)." % port,
                            default=port, type="int")
    optionparser.add_option("-b", "--bind", dest="bind_address",
                            help="Bind address",
                            default="", metavar="ADDRESS")

  if daemon_name in constants.DAEMONS_SSL:
    default_cert, default_key = constants.DAEMONS_SSL[daemon_name]
    optionparser.add_option("--no-ssl", dest="ssl",
                            help="Do not secure HTTP protocol with SSL",
                            default=True, action="store_false")
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
                            help="SSL key",
                            default=default_key, type="string")
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
                            help="SSL certificate",
                            default=default_cert, type="string")

  multithread = utils.no_fork = daemon_name in constants.MULTITHREADED_DAEMONS

  options, args = optionparser.parse_args()

  if hasattr(options, 'ssl') and options.ssl:
    if not (options.ssl_cert and options.ssl_key):
      print >> sys.stderr, "Need key and certificate to use ssl"
      sys.exit(constants.EXIT_FAILURE)
    for fname in (options.ssl_cert, options.ssl_key):
      if not os.path.isfile(fname):
        print >> sys.stderr, "Need ssl file %s to run" % fname
        sys.exit(constants.EXIT_FAILURE)

  if check_fn is not None:
    check_fn(options, args)

  utils.EnsureDirs(dirs)

  if options.fork:
    utils.CloseFDs()
    utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name])

  utils.WritePidFile(daemon_name)
  try:
    utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
                       debug=options.debug,
                       stderr_logging=not options.fork,
                       multithreaded=multithread,
                       program=daemon_name,
                       syslog=options.syslog,
                       console_logging=console_logging)
    logging.info("%s daemon startup", daemon_name)
    exec_fn(options, args)
  finally:
    utils.RemovePidFile(daemon_name)
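

# Example (a minimal sketch; the "food" daemon name, the directory tuple and
# the hook functions are purely illustrative, and a real daemon name must be
# known to the constants module): a daemon's entry-point script would
# typically build an option parser, define its check and exec hooks, and hand
# control over to GenericMain:
#
#   import optparse
#
#   def _CheckFooD(options, args):
#     if args:
#       print >> sys.stderr, "food takes no arguments"
#       sys.exit(constants.EXIT_FAILURE)
#
#   def _ExecFooD(options, args):
#     mainloop = Mainloop()
#     mainloop.Run()
#
#   def main():
#     parser = optparse.OptionParser(usage="%prog [-f] [-d]")
#     dirs = [("/var/run/ganeti", 0755)]
#     GenericMain("food", parser, dirs, _CheckFooD, _ExecFooD)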