Statistics
| Branch: | Tag: | Revision:

root / lib / daemon.py @ b11780bb

History | View | Annotate | Download (10.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module with helper classes and functions for daemons"""
23

    
24

    
25
import asyncore
26
import os
27
import signal
28
import logging
29
import sched
30
import time
31
import socket
32
import select
33
import sys
34

    
35
from ganeti import utils
36
from ganeti import constants
37
from ganeti import errors
38

    
39

    
40
class SchedulerBreakout(Exception):
41
  """Exception used to get out of the scheduler loop
42

43
  """
44

    
45

    
46
def AsyncoreDelayFunction(timeout):
47
  """Asyncore-compatible scheduler delay function.
48

49
  This is a delay function for sched that, rather than actually sleeping,
50
  executes asyncore events happening in the meantime.
51

52
  After an event has occurred, rather than returning, it raises a
53
  SchedulerBreakout exception, which will force the current scheduler.run()
54
  invocation to terminate, so that we can also check for signals. The main loop
55
  will then call the scheduler run again, which will allow it to actually
56
  process any due events.
57

58
  This is needed because scheduler.run() doesn't support a count=..., as
59
  asyncore loop, and the scheduler module documents throwing exceptions from
60
  inside the delay function as an allowed usage model.
61

62
  """
63
  asyncore.loop(timeout=timeout, count=1, use_poll=True)
64
  raise SchedulerBreakout()
65

    
66

    
67
class AsyncoreScheduler(sched.scheduler):
68
  """Event scheduler integrated with asyncore
69

70
  """
71
  def __init__(self, timefunc):
72
    sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
73

    
74

    
75
class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
76
  """Base Ganeti Asyncore Dispacher
77

78
  """
79
  # this method is overriding an asyncore.dispatcher method
80
  def handle_error(self):
81
    """Log an error in handling any request, and proceed.
82

83
    """
84
    logging.exception("Error while handling asyncore request")
85

    
86
  # this method is overriding an asyncore.dispatcher method
87
  def writable(self):
88
    """Most of the time we don't want to check for writability.
89

90
    """
91
    return False
92

    
93

    
94
class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
95
  """An improved asyncore udp socket.
96

97
  """
98
  def __init__(self):
99
    """Constructor for AsyncUDPSocket
100

101
    """
102
    GanetiBaseAsyncoreDispatcher.__init__(self)
103
    self._out_queue = []
104
    self.create_socket(socket.AF_INET, socket.SOCK_DGRAM)
105

    
106
  # this method is overriding an asyncore.dispatcher method
107
  def handle_connect(self):
108
    # Python thinks that the first udp message from a source qualifies as a
109
    # "connect" and further ones are part of the same connection. We beg to
110
    # differ and treat all messages equally.
111
    pass
112

    
113
  # this method is overriding an asyncore.dispatcher method
114
  def handle_read(self):
115
    payload, address = utils.IgnoreSignals(self.recvfrom,
116
                                           constants.MAX_UDP_DATA_SIZE)
117
    ip, port = address
118
    self.handle_datagram(payload, ip, port)
119

    
120
  def handle_datagram(self, payload, ip, port):
121
    """Handle an already read udp datagram
122

123
    """
124
    raise NotImplementedError
125

    
126
  # this method is overriding an asyncore.dispatcher method
127
  def writable(self):
128
    # We should check whether we can write to the socket only if we have
129
    # something scheduled to be written
130
    return bool(self._out_queue)
131

    
132
  # this method is overriding an asyncore.dispatcher method
133
  def handle_write(self):
134
    if not self._out_queue:
135
      logging.error("handle_write called with empty output queue")
136
      return
137
    (ip, port, payload) = self._out_queue[0]
138
    utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
139
    self._out_queue.pop(0)
140

    
141
  def enqueue_send(self, ip, port, payload):
142
    """Enqueue a datagram to be sent when possible
143

144
    """
145
    if len(payload) > constants.MAX_UDP_DATA_SIZE:
146
      raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
147
                                    constants.MAX_UDP_DATA_SIZE))
148
    self._out_queue.append((ip, port, payload))
149

    
150
  def process_next_packet(self, timeout=0):
151
    """Process the next datagram, waiting for it if necessary.
152

153
    @type timeout: float
154
    @param timeout: how long to wait for data
155
    @rtype: boolean
156
    @return: True if some data has been handled, False otherwise
157

158
    """
159
    result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
160
    if result is not None and result & select.POLLIN:
161
      self.handle_read()
162
      return True
163
    else:
164
      return False
165

    
166

    
167
class Mainloop(object):
168
  """Generic mainloop for daemons
169

170
  @ivar scheduler: A sched.scheduler object, which can be used to register
171
    timed events
172

173
  """
174
  def __init__(self):
175
    """Constructs a new Mainloop instance.
176

177
    """
178
    self._signal_wait = []
179
    self.scheduler = AsyncoreScheduler(time.time)
180

    
181
  @utils.SignalHandled([signal.SIGCHLD])
182
  @utils.SignalHandled([signal.SIGTERM])
183
  @utils.SignalHandled([signal.SIGINT])
184
  def Run(self, signal_handlers=None):
185
    """Runs the mainloop.
186

187
    @type signal_handlers: dict
188
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator
189

190
    """
191
    assert isinstance(signal_handlers, dict) and \
192
           len(signal_handlers) > 0, \
193
           "Broken SignalHandled decorator"
194
    running = True
195
    # Start actual main loop
196
    while running:
197
      if not self.scheduler.empty():
198
        try:
199
          self.scheduler.run()
200
        except SchedulerBreakout:
201
          pass
202
      else:
203
        asyncore.loop(count=1, use_poll=True)
204

    
205
      # Check whether a signal was raised
206
      for sig in signal_handlers:
207
        handler = signal_handlers[sig]
208
        if handler.called:
209
          self._CallSignalWaiters(sig)
210
          running = sig not in (signal.SIGTERM, signal.SIGINT)
211
          handler.Clear()
212

    
213
  def _CallSignalWaiters(self, signum):
214
    """Calls all signal waiters for a certain signal.
215

216
    @type signum: int
217
    @param signum: Signal number
218

219
    """
220
    for owner in self._signal_wait:
221
      owner.OnSignal(signum)
222

    
223
  def RegisterSignal(self, owner):
224
    """Registers a receiver for signal notifications
225

226
    The receiver must support a "OnSignal(self, signum)" function.
227

228
    @type owner: instance
229
    @param owner: Receiver
230

231
    """
232
    self._signal_wait.append(owner)
233

    
234

    
235
def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn,
236
                console_logging=False):
237
  """Shared main function for daemons.
238

239
  @type daemon_name: string
240
  @param daemon_name: daemon name
241
  @type optionparser: optparse.OptionParser
242
  @param optionparser: initialized optionparser with daemon-specific options
243
                       (common -f -d options will be handled by this module)
244
  @type dirs: list of (string, integer)
245
  @param dirs: list of directories that must be created if they don't exist,
246
               and the permissions to be used to create them
247
  @type check_fn: function which accepts (options, args)
248
  @param check_fn: function that checks start conditions and exits if they're
249
                   not met
250
  @type exec_fn: function which accepts (options, args)
251
  @param exec_fn: function that's executed with the daemon's pid file held, and
252
                  runs the daemon itself.
253
  @type console_logging: boolean
254
  @param console_logging: if True, the daemon will fall back to the system
255
                          console if logging fails
256

257
  """
258
  optionparser.add_option("-f", "--foreground", dest="fork",
259
                          help="Don't detach from the current terminal",
260
                          default=True, action="store_false")
261
  optionparser.add_option("-d", "--debug", dest="debug",
262
                          help="Enable some debug messages",
263
                          default=False, action="store_true")
264
  optionparser.add_option("--syslog", dest="syslog",
265
                          help="Enable logging to syslog (except debug"
266
                          " messages); one of 'no', 'yes' or 'only' [%s]" %
267
                          constants.SYSLOG_USAGE,
268
                          default=constants.SYSLOG_USAGE,
269
                          choices=["no", "yes", "only"])
270
  if daemon_name in constants.DAEMONS_PORTS:
271
    # for networked daemons we also allow choosing the bind port and address.
272
    # by default we use the port provided by utils.GetDaemonPort, and bind to
273
    # 0.0.0.0 (which is represented by and empty bind address.
274
    port = utils.GetDaemonPort(daemon_name)
275
    optionparser.add_option("-p", "--port", dest="port",
276
                            help="Network port (%s default)." % port,
277
                            default=port, type="int")
278
    optionparser.add_option("-b", "--bind", dest="bind_address",
279
                            help="Bind address",
280
                            default="", metavar="ADDRESS")
281

    
282
  if daemon_name in constants.DAEMONS_SSL:
283
    default_cert, default_key = constants.DAEMONS_SSL[daemon_name]
284
    optionparser.add_option("--no-ssl", dest="ssl",
285
                            help="Do not secure HTTP protocol with SSL",
286
                            default=True, action="store_false")
287
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
288
                            help="SSL key",
289
                            default=default_key, type="string")
290
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
291
                            help="SSL certificate",
292
                            default=default_cert, type="string")
293

    
294
  multithread = utils.no_fork = daemon_name in constants.MULTITHREADED_DAEMONS
295

    
296
  options, args = optionparser.parse_args()
297

    
298
  if hasattr(options, 'ssl') and options.ssl:
299
    if not (options.ssl_cert and options.ssl_key):
300
      print >> sys.stderr, "Need key and certificate to use ssl"
301
      sys.exit(constants.EXIT_FAILURE)
302
    for fname in (options.ssl_cert, options.ssl_key):
303
      if not os.path.isfile(fname):
304
        print >> sys.stderr, "Need ssl file %s to run" % fname
305
        sys.exit(constants.EXIT_FAILURE)
306

    
307
  if check_fn is not None:
308
    check_fn(options, args)
309

    
310
  utils.EnsureDirs(dirs)
311

    
312
  if options.fork:
313
    utils.CloseFDs()
314
    utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name])
315

    
316
  utils.WritePidFile(daemon_name)
317
  try:
318
    utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
319
                       debug=options.debug,
320
                       stderr_logging=not options.fork,
321
                       multithreaded=multithread,
322
                       program=daemon_name,
323
                       syslog=options.syslog,
324
                       console_logging=console_logging)
325
    logging.info("%s daemon startup", daemon_name)
326
    exec_fn(options, args)
327
  finally:
328
    utils.RemovePidFile(daemon_name)