Statistics
| Branch: | Tag: | Revision:

root / lib / daemon.py @ b66ab629

History | View | Annotate | Download (16.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module with helper classes and functions for daemons"""
23

    
24

    
25
import asyncore
26
import asynchat
27
import os
28
import signal
29
import logging
30
import sched
31
import time
32
import socket
33
import select
34
import sys
35

    
36
from ganeti import utils
37
from ganeti import constants
38
from ganeti import errors
39

    
40

    
41
class SchedulerBreakout(Exception):
  """Signal used to abort the current scheduler.run() invocation.

  Raised by the scheduler delay function so that control returns to the
  main loop between events.

  """
45

    
46

    
47
def AsyncoreDelayFunction(timeout):
  """Asyncore-compatible scheduler delay function.

  Instead of sleeping for the requested interval, this delay function
  drives one round of asyncore event processing and then raises
  L{SchedulerBreakout}. The exception aborts the current scheduler.run()
  invocation, handing control back to the main loop so it can check for
  signals; the main loop then re-enters the scheduler, which actually
  processes any events that are due.

  This roundabout approach is needed because scheduler.run() has no
  count=... argument the way asyncore's loop does, while the sched module
  explicitly allows the delay function to raise as a way of breaking out.

  """
  asyncore.loop(count=1, timeout=timeout, use_poll=True)
  raise SchedulerBreakout()
66

    
67

    
68
class AsyncoreScheduler(sched.scheduler):
  """sched.scheduler variant whose delay function runs asyncore events

  """
  def __init__(self, timefunc):
    """Initializes this class.

    @param timefunc: function returning the current time (e.g. time.time)

    """
    # old-style base class on python 2, so no super() here
    sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
74

    
75

    
76
class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
  """Shared base class for Ganeti's asyncore dispatchers.

  """
  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Called by asyncore on unhandled exceptions: log and carry on.

    """
    logging.exception("Error while handling asyncore request")

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    """By default never ask for write events.

    Subclasses with outgoing data override this.

    """
    return False
93

    
94

    
95
def FormatAddress(family, address):
  """Render a client's address as a human-readable string.

  @type family: integer
  @param family: socket family (one of socket.AF_*)
  @type address: family specific (usually tuple)
  @param address: address, as reported by this class

  """
  if family == socket.AF_INET and len(address) == 2:
    # (ip, port) pair
    return "%s:%d" % address
  if family == socket.AF_UNIX and len(address) == 3:
    # credentials triple as produced for unix sockets
    return "pid=%s, uid=%s, gid=%s" % address
  # anything unrecognized: fall back to the default representation
  return str(address)
110

    
111

    
112
class AsyncStreamServer(GanetiBaseAsyncoreDispatcher):
  """Asyncore-based stream server.

  Connections are accepted here and then handed off, one separate asyncore
  dispatcher per client, via L{handle_connection}.

  """

  # backlog passed to listen()
  _REQUEST_QUEUE_SIZE = 5

  def __init__(self, family, address):
    """Constructor for AsyncUnixStreamSocket

    @type family: integer
    @param family: socket family (one of socket.AF_*)
    @type address: address family dependent
    @param address: address to bind the socket to

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    self.family = family
    self.create_socket(self.family, socket.SOCK_STREAM)
    self.set_reuse_addr()
    self.bind(address)
    self.listen(self._REQUEST_QUEUE_SIZE)

  # this method is overriding an asyncore.dispatcher method
  def handle_accept(self):
    """Accept a new client connection.

    Logs the peer and delegates serving the client to
    L{handle_connection}.

    """
    result = utils.IgnoreSignals(self.accept)
    if result is None:
      return
    (connected_socket, client_address) = result
    if self.family == socket.AF_UNIX:
      # accept() reports nothing meaningful for unix sockets; substitute
      # the peer's credentials instead
      client_address = utils.GetSocketCredentials(connected_socket)
    logging.info("Accepted connection from %s",
                 FormatAddress(self.family, client_address))
    self.handle_connection(connected_socket, client_address)

  def handle_connection(self, connected_socket, client_address):
    """Handle an already accepted connection.

    Must be provided by subclasses.

    """
    raise NotImplementedError
162

    
163

    
164
class AsyncTerminatedMessageStream(asynchat.async_chat):
  """Asynchat-based stream of terminator-delimited messages.

  Collects data arriving on a connected stream socket and invokes
  L{handle_message} once per complete message, where messages are
  separated by a fixed terminator string.

  """
  def __init__(self, connected_socket, peer_address, terminator, family):
    """AsyncTerminatedMessageStream constructor.

    @type connected_socket: socket.socket
    @param connected_socket: connected stream socket to receive messages from
    @param peer_address: family-specific peer address
    @type terminator: string
    @param terminator: terminator separating messages in the stream
    @type family: integer
    @param family: socket family

    """
    # python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by
    # using a positional argument rather than a keyword one.
    asynchat.async_chat.__init__(self, connected_socket)
    self.connected_socket = connected_socket
    # on python 2.4 there is no "family" attribute for the socket class
    # FIXME: when we move to python 2.5 or above remove the family parameter
    #self.family = self.connected_socket.family
    self.family = family
    self.peer_address = peer_address
    self.terminator = terminator
    self.set_terminator(terminator)
    # chunks of the message currently being received
    self.ibuffer = []
    # sequence number handed to handle_message for the next message
    self.next_incoming_message = 0

  # this method is overriding an asynchat.async_chat method
  def collect_incoming_data(self, data):
    # buffer chunks until the terminator shows up
    self.ibuffer.append(data)

  # this method is overriding an asynchat.async_chat method
  def found_terminator(self):
    # a complete message has arrived: give it a sequence number, reset the
    # buffer and dispatch it
    message_id = self.next_incoming_message
    self.next_incoming_message += 1
    message = "".join(self.ibuffer)
    self.ibuffer = []
    self.handle_message(message, message_id)

  def handle_message(self, message, message_id):
    """Handle a terminated message.

    @type message: string
    @param message: message to handle
    @type message_id: integer
    @param message_id: stream's message sequence number

    """
    pass
    # TODO: move this method to raise NotImplementedError
    # raise NotImplementedError

  def close_log(self):
    """Log the connection shutdown, then close the channel.

    """
    logging.info("Closing connection from %s",
                 FormatAddress(self.family, self.peer_address))
    self.close()

  # this method is overriding an asyncore.dispatcher method
  def handle_expt(self):
    # out-of-band data indicates trouble; drop the connection
    self.close_log()

  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")
    self.close_log()
238

    
239

    
240
class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
  """An improved asyncore udp socket.

  Outgoing datagrams are queued, and write events are only requested from
  asyncore while the queue is non-empty.

  """
  def __init__(self):
    """Constructor for AsyncUDPSocket

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    # FIFO of (ip, port, payload) tuples waiting to be sent
    self._out_queue = []
    self.create_socket(socket.AF_INET, socket.SOCK_DGRAM)

  # this method is overriding an asyncore.dispatcher method
  def handle_connect(self):
    # asyncore considers the first datagram from a peer a "connect" and the
    # following ones part of that connection; we treat every datagram the
    # same, so there is nothing to do here
    pass

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    recv_result = utils.IgnoreSignals(self.recvfrom,
                                      constants.MAX_UDP_DATA_SIZE)
    if recv_result is None:
      return
    (payload, (ip, port)) = recv_result
    self.handle_datagram(payload, ip, port)

  def handle_datagram(self, payload, ip, port):
    """Handle an already read udp datagram

    Must be provided by subclasses.

    """
    raise NotImplementedError

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # only ask asyncore for write events while datagrams are queued
    return bool(self._out_queue)

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    if not self._out_queue:
      logging.error("handle_write called with empty output queue")
      return
    # peek, send, then dequeue: if sendto raises, the datagram stays queued
    (ip, port, payload) = self._out_queue[0]
    utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
    self._out_queue.pop(0)

  def enqueue_send(self, ip, port, payload):
    """Enqueue a datagram to be sent when possible

    """
    if len(payload) > constants.MAX_UDP_DATA_SIZE:
      raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
                                    constants.MAX_UDP_DATA_SIZE))
    self._out_queue.append((ip, port, payload))

  def process_next_packet(self, timeout=0):
    """Process the next datagram, waiting for it if necessary.

    @type timeout: float
    @param timeout: how long to wait for data
    @rtype: boolean
    @return: True if some data has been handled, False otherwise

    """
    poll_result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
    if poll_result is None or not (poll_result & select.POLLIN):
      return False
    self.handle_read()
    return True
313

    
314

    
315
class Mainloop(object):
  """Generic mainloop for daemons

  Alternates between running due scheduler events and one round of asyncore
  polling, and dispatches received signals to registered listeners.

  @ivar scheduler: A sched.scheduler object, which can be used to register
    timed events

  """
  def __init__(self):
    """Constructs a new Mainloop instance.

    """
    # receivers registered via RegisterSignal; each must provide OnSignal()
    self._signal_wait = []
    self.scheduler = AsyncoreScheduler(time.time)

  @utils.SignalHandled([signal.SIGCHLD])
  @utils.SignalHandled([signal.SIGTERM])
  @utils.SignalHandled([signal.SIGINT])
  def Run(self, signal_handlers=None):
    """Runs the mainloop.

    Loops until SIGTERM or SIGINT is received.

    @type signal_handlers: dict
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator

    """
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"
    running = True
    # Start actual main loop
    while running:
      if not self.scheduler.empty():
        try:
          self.scheduler.run()
        except SchedulerBreakout:
          pass
      else:
        asyncore.loop(count=1, use_poll=True)

      # Check whether a signal was raised
      for sig in signal_handlers:
        handler = signal_handlers[sig]
        if handler.called:
          self._CallSignalWaiters(sig)
          if sig in (signal.SIGTERM, signal.SIGINT):
            # Request shutdown. Only ever set "running" to False here: the
            # previous "running = sig not in (...)" assignment allowed a
            # non-termination signal (e.g. SIGCHLD) handled later in this
            # same batch to flip "running" back to True, silently dropping
            # the termination request.
            running = False
          handler.Clear()

  def _CallSignalWaiters(self, signum):
    """Calls all signal waiters for a certain signal.

    @type signum: int
    @param signum: Signal number

    """
    for owner in self._signal_wait:
      owner.OnSignal(signum)

  def RegisterSignal(self, owner):
    """Registers a receiver for signal notifications

    The receiver must support a "OnSignal(self, signum)" function.

    @type owner: instance
    @param owner: Receiver

    """
    self._signal_wait.append(owner)
381

    
382

    
383
def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn,
                multithreaded=False, console_logging=False,
                default_ssl_cert=None, default_ssl_key=None):
  """Shared main function for daemons.

  @type daemon_name: string
  @param daemon_name: daemon name
  @type optionparser: optparse.OptionParser
  @param optionparser: initialized optionparser with daemon-specific options
                       (common -f -d options will be handled by this module)
  @type dirs: list of (string, integer)
  @param dirs: list of directories that must be created if they don't exist,
               and the permissions to be used to create them
  @type check_fn: function which accepts (options, args)
  @param check_fn: function that checks start conditions and exits if they're
                   not met
  @type exec_fn: function which accepts (options, args)
  @param exec_fn: function that's executed with the daemon's pid file held, and
                  runs the daemon itself.
  @type multithreaded: bool
  @param multithreaded: Whether the daemon uses threads
  @type console_logging: boolean
  @param console_logging: if True, the daemon will fall back to the system
                          console if logging fails
  @type default_ssl_cert: string
  @param default_ssl_cert: Default SSL certificate path
  @type default_ssl_key: string
  @param default_ssl_key: Default SSL key path

  """
  # Options every daemon shares
  optionparser.add_option("-f", "--foreground", dest="fork",
                          help="Don't detach from the current terminal",
                          default=True, action="store_false")
  optionparser.add_option("-d", "--debug", dest="debug",
                          help="Enable some debug messages",
                          default=False, action="store_true")
  optionparser.add_option("--syslog", dest="syslog",
                          help="Enable logging to syslog (except debug"
                          " messages); one of 'no', 'yes' or 'only' [%s]" %
                          constants.SYSLOG_USAGE,
                          default=constants.SYSLOG_USAGE,
                          choices=["no", "yes", "only"])

  if daemon_name in constants.DAEMONS_PORTS:
    # For networked daemons we allow choosing the port and bind address
    default_bind_address = "0.0.0.0"
    default_port = utils.GetDaemonPort(daemon_name)

    optionparser.add_option("-p", "--port", dest="port",
                            help="Network port (default: %s)" % default_port,
                            default=default_port, type="int")
    optionparser.add_option("-b", "--bind", dest="bind_address",
                            help=("Bind address (default: %s)" %
                                  default_bind_address),
                            default=default_bind_address, metavar="ADDRESS")

  # SSL options are only offered when defaults for both files were supplied
  if default_ssl_key is not None and default_ssl_cert is not None:
    optionparser.add_option("--no-ssl", dest="ssl",
                            help="Do not secure HTTP protocol with SSL",
                            default=True, action="store_false")
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
                            help=("SSL key path (default: %s)" %
                                  default_ssl_key),
                            default=default_ssl_key, type="string",
                            metavar="SSL_KEY_PATH")
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
                            help=("SSL certificate path (default: %s)" %
                                  default_ssl_cert),
                            default=default_ssl_cert, type="string",
                            metavar="SSL_CERT_PATH")

  # Disable the use of fork(2) if the daemon uses threads
  utils.no_fork = multithreaded

  options, args = optionparser.parse_args()

  if getattr(options, "ssl", False):
    # Check that both SSL files exist before going any further
    ssl_files = {
      "certificate": options.ssl_cert,
      "key": options.ssl_key,
      }

    for kind, path in ssl_files.iteritems():
      if not os.path.isfile(path):
        print >> sys.stderr, "SSL %s file '%s' was not found" % (kind, path)
        sys.exit(constants.EXIT_FAILURE)

    # TODO: By initiating http.HttpSslParams here we would only read the files
    # once and have a proper validation (isfile returns False on directories)
    # at the same time.

  if check_fn is not None:
    check_fn(options, args)

  utils.EnsureDirs(dirs)

  if options.fork:
    utils.CloseFDs()
    utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name])

  utils.WritePidFile(daemon_name)
  try:
    utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
                       debug=options.debug,
                       stderr_logging=not options.fork,
                       multithreaded=multithreaded,
                       program=daemon_name,
                       syslog=options.syslog,
                       console_logging=console_logging)
    logging.info("%s daemon startup", daemon_name)
    exec_fn(options, args)
  finally:
    # always drop the pid file, even if exec_fn raised
    utils.RemovePidFile(daemon_name)