#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module with helper classes and functions for daemons"""


import asyncore
import os
import signal
import logging
import sched
import time
import socket
import select
import sys

from ganeti import utils
from ganeti import constants
from ganeti import errors


class SchedulerBreakout(Exception):
  """Exception used to get out of the scheduler loop

  """


def AsyncoreDelayFunction(timeout):
  """Asyncore-compatible scheduler delay function.

  This is a delay function for sched that, rather than actually sleeping,
  executes asyncore events happening in the meantime.

  After an event has occurred, rather than returning, it raises a
  SchedulerBreakout exception, which will force the current scheduler.run()
  invocation to terminate, so that we can also check for signals. The main
  loop will then call scheduler.run() again, which will allow it to actually
  process any due events.

  This is needed because scheduler.run() doesn't support a count=...
  parameter the way the asyncore loop does, and the sched module documents
  raising exceptions from inside the delay function as an allowed usage model.

  """
  asyncore.loop(timeout=timeout, count=1, use_poll=True)
  raise SchedulerBreakout()


class AsyncoreScheduler(sched.scheduler):
  """Event scheduler integrated with asyncore

  """
  def __init__(self, timefunc):
    sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
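

# Illustrative sketch, not part of the original module: how an
# AsyncoreScheduler instance is meant to be driven.  Because
# AsyncoreDelayFunction raises SchedulerBreakout after at most one asyncore
# event, every scheduler.run() call returns early and the caller has to loop,
# which is exactly what Mainloop.Run does below.  The function name is
# hypothetical.
def _ExampleRunScheduler(scheduler):
  """Runs an AsyncoreScheduler until its event queue is empty.

  """
  while not scheduler.empty():
    try:
      scheduler.run()
    except SchedulerBreakout:
      # An asyncore event was processed (or the wait timed out); loop so
      # that any scheduler events which are now due get executed
      pass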


class AsyncUDPSocket(asyncore.dispatcher):
  """An improved asyncore udp socket.

  """
  def __init__(self):
    """Constructor for AsyncUDPSocket

    """
    asyncore.dispatcher.__init__(self)
    self._out_queue = []
    self.create_socket(socket.AF_INET, socket.SOCK_DGRAM)

  # this method is overriding an asyncore.dispatcher method
  def handle_connect(self):
    # Python thinks that the first udp message from a source qualifies as a
    # "connect" and further ones are part of the same connection. We beg to
    # differ and treat all messages equally.
    pass

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    payload, address = utils.IgnoreSignals(self.recvfrom,
                                           constants.MAX_UDP_DATA_SIZE)
    ip, port = address
    self.handle_datagram(payload, ip, port)

  def handle_datagram(self, payload, ip, port):
    """Handle an already read udp datagram

    """
    raise NotImplementedError

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # We should check whether we can write to the socket only if we have
    # something scheduled to be written
    return bool(self._out_queue)

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    if not self._out_queue:
      logging.error("handle_write called with empty output queue")
      return
    (ip, port, payload) = self._out_queue[0]
    utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
    self._out_queue.pop(0)

  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")

  def enqueue_send(self, ip, port, payload):
    """Enqueue a datagram to be sent when possible

    """
    if len(payload) > constants.MAX_UDP_DATA_SIZE:
      raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
                                    constants.MAX_UDP_DATA_SIZE))
    self._out_queue.append((ip, port, payload))

  def process_next_packet(self, timeout=0):
    """Process the next datagram, waiting for it if necessary.

    @type timeout: float
    @param timeout: how long to wait for data
    @rtype: boolean
    @return: True if some data has been handled, False otherwise

    """
    result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
    if result is not None and result & select.POLLIN:
      self.handle_read()
      return True
    else:
      return False
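

# Illustrative sketch, not part of the original module: a minimal
# AsyncUDPSocket subclass.  Only handle_datagram() has to be implemented;
# replies are queued with enqueue_send() and flushed by asyncore through
# handle_write() once the socket becomes writable.  A daemon would bind()
# the socket and let the asyncore loop (e.g. Mainloop.Run below) drive it.
# The class name is hypothetical.
class _ExampleEchoUDPSocket(AsyncUDPSocket):
  """Example subclass echoing every datagram back to its sender.

  """
  def handle_datagram(self, payload, ip, port):
    logging.debug("Echoing %d bytes back to %s:%s", len(payload), ip, port)
    self.enqueue_send(ip, port, payload)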


class Mainloop(object):
  """Generic mainloop for daemons

  @ivar scheduler: A sched.scheduler object, which can be used to register
    timed events

  """
  def __init__(self):
    """Constructs a new Mainloop instance.

    """
    self._signal_wait = []
    self.scheduler = AsyncoreScheduler(time.time)

  @utils.SignalHandled([signal.SIGCHLD])
  @utils.SignalHandled([signal.SIGTERM])
  def Run(self, signal_handlers=None):
    """Runs the mainloop.

    @type signal_handlers: dict
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator

    """
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"
    running = True
    # Start actual main loop
    while running:
      if not self.scheduler.empty():
        try:
          self.scheduler.run()
        except SchedulerBreakout:
          pass
      else:
        asyncore.loop(count=1, use_poll=True)

      # Check whether a signal was raised
      for sig in signal_handlers:
        handler = signal_handlers[sig]
        if handler.called:
          self._CallSignalWaiters(sig)
          running = (sig != signal.SIGTERM)
          handler.Clear()

  def _CallSignalWaiters(self, signum):
    """Calls all signal waiters for a certain signal.

    @type signum: int
    @param signum: Signal number

    """
    for owner in self._signal_wait:
      owner.OnSignal(signum)

  def RegisterSignal(self, owner):
    """Registers a receiver for signal notifications

    The receiver must support an "OnSignal(self, signum)" method.

    @type owner: instance
    @param owner: Receiver

    """
    self._signal_wait.append(owner)
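

# Illustrative sketch, not part of the original module: typical wiring of a
# daemon around Mainloop.  The receiver class, the helper function and the
# 60-second interval are hypothetical; the pattern (register signal
# receivers, schedule timed events, then call Run) is the intended usage of
# the class above.
class _ExampleSignalReceiver(object):
  """Example receiver implementing the OnSignal() interface.

  """
  def OnSignal(self, signum):
    logging.info("Received signal %s", signum)


def _ExampleDaemonLoop():
  """Builds a Mainloop, registers a receiver and a timed event, and runs.

  """
  mainloop = Mainloop()
  mainloop.RegisterSignal(_ExampleSignalReceiver())
  # Timed events go through the asyncore-aware scheduler; the standard
  # sched.scheduler.enter(delay, priority, action, argument) interface applies
  mainloop.scheduler.enter(60, 1, logging.info, ("still alive",))
  # Any asyncore dispatcher created before this point (e.g. an AsyncUDPSocket
  # subclass) is served by the same loop
  mainloop.Run()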


def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn,
                console_logging=False):
  """Shared main function for daemons.

  @type daemon_name: string
  @param daemon_name: daemon name
  @type optionparser: optparse.OptionParser
  @param optionparser: initialized optionparser with daemon-specific options
                       (common -f -d options will be handled by this module)
  @type dirs: list of (string, integer)
  @param dirs: list of directories that must be created if they don't exist,
               and the permissions to be used to create them
  @type check_fn: function which accepts (options, args)
  @param check_fn: function that checks start conditions and exits if they're
                   not met
  @type exec_fn: function which accepts (options, args)
  @param exec_fn: function that's executed with the daemon's pid file held, and
                  runs the daemon itself.
  @type console_logging: boolean
  @param console_logging: if True, the daemon will fall back to the system
                          console if logging fails

  """
  optionparser.add_option("-f", "--foreground", dest="fork",
                          help="Don't detach from the current terminal",
                          default=True, action="store_false")
  optionparser.add_option("-d", "--debug", dest="debug",
                          help="Enable some debug messages",
                          default=False, action="store_true")
  optionparser.add_option("--syslog", dest="syslog",
                          help="Enable logging to syslog (except debug"
                          " messages); one of 'no', 'yes' or 'only' [%s]" %
                          constants.SYSLOG_USAGE,
                          default=constants.SYSLOG_USAGE,
                          choices=["no", "yes", "only"])
  if daemon_name in constants.DAEMONS_PORTS:
    # for networked daemons we also allow choosing the bind port and address.
    # by default we use the port provided by utils.GetDaemonPort, and bind to
    # 0.0.0.0 (which is represented by an empty bind address).
    port = utils.GetDaemonPort(daemon_name)
    optionparser.add_option("-p", "--port", dest="port",
                            help="Network port (%s default)." % port,
                            default=port, type="int")
    optionparser.add_option("-b", "--bind", dest="bind_address",
                            help="Bind address",
                            default="", metavar="ADDRESS")

  if daemon_name in constants.DAEMONS_SSL:
    default_cert, default_key = constants.DAEMONS_SSL[daemon_name]
    optionparser.add_option("--no-ssl", dest="ssl",
                            help="Do not secure HTTP protocol with SSL",
                            default=True, action="store_false")
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
                            help="SSL key",
                            default=default_key, type="string")
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
                            help="SSL certificate",
                            default=default_cert, type="string")

  multithread = utils.no_fork = daemon_name in constants.MULTITHREADED_DAEMONS

  options, args = optionparser.parse_args()

  if hasattr(options, 'ssl') and options.ssl:
    if not (options.ssl_cert and options.ssl_key):
      print >> sys.stderr, "Need key and certificate to use ssl"
      sys.exit(constants.EXIT_FAILURE)
    for fname in (options.ssl_cert, options.ssl_key):
      if not os.path.isfile(fname):
        print >> sys.stderr, "Need ssl file %s to run" % fname
        sys.exit(constants.EXIT_FAILURE)

  if check_fn is not None:
    check_fn(options, args)

  utils.EnsureDirs(dirs)

  if options.fork:
    utils.CloseFDs()
    utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name])

  utils.WritePidFile(daemon_name)
  try:
    utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
                       debug=options.debug,
                       stderr_logging=not options.fork,
                       multithreaded=multithread,
                       program=daemon_name,
                       syslog=options.syslog,
                       console_logging=console_logging)
    logging.info("%s daemon startup", daemon_name)
    exec_fn(options, args)
  finally:
    utils.RemovePidFile(daemon_name)
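

# Illustrative sketch, not part of the original module: what a daemon's entry
# point built on GenericMain typically looks like.  The daemon name, the
# empty dirs list and the callbacks are placeholders; a real daemon passes
# its name as registered in the constants module (so its port, log file and
# SSL settings can be looked up) together with its own option parser.
def _ExampleDaemonMain():
  """Example entry point wiring check/exec callbacks into GenericMain.

  """
  import optparse

  def _CheckFn(options, args):
    # Verify start conditions here and sys.exit() if they are not met
    pass

  def _ExecFn(options, args):
    # Executed with the daemon's pidfile held and logging set up
    Mainloop().Run()

  parser = optparse.OptionParser(usage="%prog [-f] [-d]")
  GenericMain("example-daemon", parser, [], _CheckFn, _ExecFn)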