#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module with helper classes and functions for daemons"""


import asyncore
import os
import signal
import errno
import logging
import sched
import time
import socket
import select
import sys

from ganeti import utils
from ganeti import constants
from ganeti import errors


class SchedulerBreakout(Exception):
  """Exception used to get out of the scheduler loop

  """


def AsyncoreDelayFunction(timeout):
  """Asyncore-compatible scheduler delay function.

  This is a delay function for sched that, rather than actually sleeping,
  executes asyncore events happening in the meantime.

  After an event has occurred, rather than returning, it raises a
  SchedulerBreakout exception, which will force the current scheduler.run()
  invocation to terminate, so that we can also check for signals. The main loop
  will then call the scheduler run again, which will allow it to actually
  process any due events.

  This is needed because scheduler.run() doesn't support a count=... parameter
  the way asyncore.loop() does, and the sched module documents raising
  exceptions from inside the delay function as an allowed usage model.

  """
  asyncore.loop(timeout=timeout, count=1, use_poll=True)
  raise SchedulerBreakout()


class AsyncoreScheduler(sched.scheduler):
  """Event scheduler integrated with asyncore

  """
  def __init__(self, timefunc):
    sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
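

# Illustrative sketch (not part of the original module): how the breakout
# mechanism above is meant to be consumed. scheduler.run() executes any due
# events, and the SchedulerBreakout raised by the delay function is caught so
# the caller can check for signals between iterations. Mainloop.Run below is
# the real consumer; "_SomeCallback" is a hypothetical function used only for
# illustration.
#
#   scheduler = AsyncoreScheduler(time.time)
#   scheduler.enter(60.0, 1, _SomeCallback, [])
#   try:
#     scheduler.run()
#   except SchedulerBreakout:
#     pass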


class AsyncUDPSocket(asyncore.dispatcher):
  """An improved asyncore udp socket.

  """
  def __init__(self):
    """Constructor for AsyncUDPSocket

    """
    asyncore.dispatcher.__init__(self)
    self._out_queue = []
    self.create_socket(socket.AF_INET, socket.SOCK_DGRAM)

  # this method is overriding an asyncore.dispatcher method
  def handle_connect(self):
    # Python thinks that the first udp message from a source qualifies as a
    # "connect" and further ones are part of the same connection. We beg to
    # differ and treat all messages equally.
    pass

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    payload, address = utils.IgnoreSignals(self.recvfrom,
                                           constants.MAX_UDP_DATA_SIZE)
    ip, port = address
    self.handle_datagram(payload, ip, port)

  def handle_datagram(self, payload, ip, port):
    """Handle an already read udp datagram

    """
    raise NotImplementedError

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # We should check whether we can write to the socket only if we have
    # something scheduled to be written
    return bool(self._out_queue)

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    if not self._out_queue:
      logging.error("handle_write called with empty output queue")
      return
    (ip, port, payload) = self._out_queue[0]
    utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
    self._out_queue.pop(0)

  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")

  def enqueue_send(self, ip, port, payload):
    """Enqueue a datagram to be sent when possible

    """
    if len(payload) > constants.MAX_UDP_DATA_SIZE:
      raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
                                    constants.MAX_UDP_DATA_SIZE))
    self._out_queue.append((ip, port, payload))

  def process_next_packet(self, timeout=0):
    """Process the next datagram, waiting for it if necessary.

    @type timeout: float
    @param timeout: how long to wait for data
    @rtype: boolean
    @return: True if some data has been handled, False otherwise

    """
    result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
    if result is not None and result & select.POLLIN:
      self.handle_read()
      return True
    else:
      return False
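

# Illustrative sketch (not part of the original module): a minimal
# AsyncUDPSocket subclass. handle_datagram() must be overridden by users of
# this class; the echo behaviour and the _ExampleEchoUDPSocket name are
# hypothetical and only show the intended API: enqueue_send() queues the
# reply, which asyncore flushes once the socket is writable. A caller would
# bind() the socket and then either rely on asyncore.loop() or poll manually
# via process_next_packet(timeout=...).
class _ExampleEchoUDPSocket(AsyncUDPSocket):
  def handle_datagram(self, payload, ip, port):
    # Send every received datagram straight back to its sender
    self.enqueue_send(ip, port, payload)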


class Mainloop(object):
  """Generic mainloop for daemons

  @ivar scheduler: A sched.scheduler object, which can be used to register
    timed events

  """
  def __init__(self):
    """Constructs a new Mainloop instance.

    """
    self._signal_wait = []
    self.scheduler = AsyncoreScheduler(time.time)

  @utils.SignalHandled([signal.SIGCHLD])
  @utils.SignalHandled([signal.SIGTERM])
  def Run(self, signal_handlers=None):
    """Runs the mainloop.

    @type signal_handlers: dict
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator

    """
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"
    running = True
    # Start actual main loop
    while running:
      if not self.scheduler.empty():
        try:
          self.scheduler.run()
        except SchedulerBreakout:
          pass
      else:
        asyncore.loop(count=1, use_poll=True)

      # Check whether a signal was raised
      for sig in signal_handlers:
        handler = signal_handlers[sig]
        if handler.called:
          self._CallSignalWaiters(sig)
          running = (sig != signal.SIGTERM)
          handler.Clear()

  def _CallSignalWaiters(self, signum):
    """Calls all signal waiters for a certain signal.

    @type signum: int
    @param signum: Signal number

    """
    for owner in self._signal_wait:
      owner.OnSignal(signum)

  def RegisterSignal(self, owner):
    """Registers a receiver for signal notifications

    The receiver must support an "OnSignal(self, signum)" function.

    @type owner: instance
    @param owner: Receiver

    """
    self._signal_wait.append(owner)
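

# Illustrative sketch (not part of the original module): how a daemon body
# might use Mainloop. The receiver object only needs an OnSignal(signum)
# method; "_ChildReaper" and "_OnTimer" are hypothetical names used purely
# for illustration.
#
#   class _ChildReaper(object):
#     def OnSignal(self, signum):
#       if signum == signal.SIGCHLD:
#         logging.info("a child process exited")
#
#   mainloop = Mainloop()
#   mainloop.RegisterSignal(_ChildReaper())
#   # timed events go through the asyncore-integrated scheduler
#   mainloop.scheduler.enter(30.0, 1, _OnTimer, [])
#   mainloop.Run()  # returns once SIGTERM is received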


def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn,
                console_logging=False):
  """Shared main function for daemons.

  @type daemon_name: string
  @param daemon_name: daemon name
  @type optionparser: optparse.OptionParser
  @param optionparser: initialized optionparser with daemon-specific options
                       (common -f -d options will be handled by this module)
  @type dirs: list of (string, integer)
  @param dirs: list of directories that must be created if they don't exist,
               and the permissions to be used to create them
  @type check_fn: function which accepts (options, args)
  @param check_fn: function that checks start conditions and exits if they're
                   not met
  @type exec_fn: function which accepts (options, args)
  @param exec_fn: function that's executed with the daemon's pid file held, and
                  runs the daemon itself.
  @type console_logging: boolean
  @param console_logging: if True, the daemon will fall back to the system
                          console if logging fails

  """
  optionparser.add_option("-f", "--foreground", dest="fork",
                          help="Don't detach from the current terminal",
                          default=True, action="store_false")
  optionparser.add_option("-d", "--debug", dest="debug",
                          help="Enable some debug messages",
                          default=False, action="store_true")
  optionparser.add_option("--syslog", dest="syslog",
                          help="Enable logging to syslog (except debug"
                          " messages); one of 'no', 'yes' or 'only' [%s]" %
                          constants.SYSLOG_USAGE,
                          default=constants.SYSLOG_USAGE,
                          choices=["no", "yes", "only"])
  if daemon_name in constants.DAEMONS_PORTS:
    # for networked daemons we also allow choosing the bind port and address.
    # by default we use the port provided by utils.GetDaemonPort, and bind to
    # 0.0.0.0 (which is represented by an empty bind address).
    port = utils.GetDaemonPort(daemon_name)
    optionparser.add_option("-p", "--port", dest="port",
                            help="Network port (%s default)." % port,
                            default=port, type="int")
    optionparser.add_option("-b", "--bind", dest="bind_address",
                            help="Bind address",
                            default="", metavar="ADDRESS")

  if daemon_name in constants.DAEMONS_SSL:
    default_cert, default_key = constants.DAEMONS_SSL[daemon_name]
    optionparser.add_option("--no-ssl", dest="ssl",
                            help="Do not secure HTTP protocol with SSL",
                            default=True, action="store_false")
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
                            help="SSL key",
                            default=default_key, type="string")
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
                            help="SSL certificate",
                            default=default_cert, type="string")

  multithread = utils.no_fork = daemon_name in constants.MULTITHREADED_DAEMONS

  options, args = optionparser.parse_args()

  if hasattr(options, 'ssl') and options.ssl:
    if not (options.ssl_cert and options.ssl_key):
      print >> sys.stderr, "Need key and certificate to use ssl"
      sys.exit(constants.EXIT_FAILURE)
    for fname in (options.ssl_cert, options.ssl_key):
      if not os.path.isfile(fname):
        print >> sys.stderr, "Need ssl file %s to run" % fname
        sys.exit(constants.EXIT_FAILURE)

  if check_fn is not None:
    check_fn(options, args)

  utils.EnsureDirs(dirs)

  if options.fork:
    utils.CloseFDs()
    utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name])

  utils.WritePidFile(daemon_name)
  try:
    utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
                       debug=options.debug,
                       stderr_logging=not options.fork,
                       multithreaded=multithread,
                       program=daemon_name,
                       syslog=options.syslog,
                       console_logging=console_logging)
    logging.info("%s daemon startup", daemon_name)
    exec_fn(options, args)
  finally:
    utils.RemovePidFile(daemon_name)
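

# Illustrative sketch (not part of the original module): how a daemon's
# entry-point script might call GenericMain. The daemon name, directory list
# and the _CheckNoded/_ExecNoded callbacks below are hypothetical
# placeholders, not taken from an actual Ganeti daemon.
#
#   import optparse
#   from ganeti import constants, daemon
#
#   def _CheckNoded(options, args):
#     pass  # verify start conditions, sys.exit() if they are not met
#
#   def _ExecNoded(options, args):
#     mainloop = daemon.Mainloop()
#     mainloop.Run()  # runs until SIGTERM, with the pid file held
#
#   parser = optparse.OptionParser(usage="%prog [-f] [-d]")
#   daemon.GenericMain("ganeti-noded", parser,
#                      [("/var/run/ganeti", 0755)],
#                      _CheckNoded, _ExecNoded)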