Statistics
| Branch: | Tag: | Revision:

root / lib / daemon.py @ 743b53d4

History | View | Annotate | Download (18.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module with helper classes and functions for daemons"""
23

    
24

    
25
import asyncore
26
import asynchat
27
import grp
28
import os
29
import pwd
30
import signal
31
import logging
32
import sched
33
import time
34
import socket
35
import select
36
import sys
37

    
38
from ganeti import utils
39
from ganeti import constants
40
from ganeti import errors
41

    
42

    
43
_DEFAULT_RUN_USER = "root"
44
_DEFAULT_RUN_GROUP = "root"
45

    
46

    
47
class SchedulerBreakout(Exception):
  """Raised by the delay function to abort a scheduler.run() invocation.

  """
51

    
52

    
53
def AsyncoreDelayFunction(timeout):
  """Asyncore-compatible scheduler delay function.

  Rather than sleeping for the requested interval, this runs pending
  asyncore events for at most that long.  After one asyncore iteration it
  raises L{SchedulerBreakout} instead of returning, which forces the
  current scheduler.run() call to terminate so the main loop can check
  for signals; the main loop then re-enters the scheduler, which will
  process any events that have become due.

  This dance is needed because scheduler.run() has no count=... argument
  the way asyncore.loop does, and the sched module documents raising from
  the delay function as a supported usage model.

  """
  asyncore.loop(timeout=timeout, count=1, use_poll=True)
  raise SchedulerBreakout()
72

    
73

    
74
class AsyncoreScheduler(sched.scheduler):
  """Event scheduler integrated with asyncore.

  A plain sched.scheduler whose delay function drives the asyncore event
  loop instead of sleeping, so timed events and network events can share
  a single main loop.

  """
  def __init__(self, timefunc):
    # Delegate to sched.scheduler, substituting the asyncore-aware delay
    # function for the usual time.sleep.
    sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
80

    
81

    
82
class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
  """Base Ganeti asyncore dispatcher.

  Shared behaviour for all Ganeti dispatchers: request-handling errors are
  logged instead of propagating, and sockets are treated as read-only
  unless a subclass overrides writability.

  """
  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    """Most of the time we don't want to check for writability.

    """
    return False
99

    
100

    
101
def FormatAddress(family, address):
  """Format a client's address for display.

  @type family: integer
  @param family: socket family (one of socket.AF_*)
  @type address: family specific (usually tuple)
  @param address: address, as reported by this class

  """
  if family == socket.AF_INET and len(address) == 2:
    return "%s:%d" % address
  if family == socket.AF_UNIX and len(address) == 3:
    return "pid=%s, uid=%s, gid=%s" % address
  # Unknown family or unexpected address shape: fall back to repr-style
  return str(address)
116

    
117

    
118
class AsyncStreamServer(GanetiBaseAsyncoreDispatcher):
  """A stream server to use with asyncore.

  Accepts each incoming request and hands it off, via
  L{handle_connection}, to a separate asyncore dispatcher that serves it.

  """

  _REQUEST_QUEUE_SIZE = 5

  def __init__(self, family, address):
    """Constructor for AsyncStreamServer.

    @type family: integer
    @param family: socket family (one of socket.AF_*)
    @type address: address family dependent
    @param address: address to bind the socket to

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    self.family = family
    self.create_socket(self.family, socket.SOCK_STREAM)
    self.set_reuse_addr()
    self.bind(address)
    self.listen(self._REQUEST_QUEUE_SIZE)

  # this method is overriding an asyncore.dispatcher method
  def handle_accept(self):
    """Accept a new client connection.

    Creates a new instance of the handler class, which will use asyncore to
    serve the client.

    """
    accept_result = utils.IgnoreSignals(self.accept)
    if accept_result is None:
      return
    connected_socket, client_address = accept_result
    if self.family == socket.AF_UNIX:
      # accept() reports nothing meaningful for unix sockets, so replace
      # the client address with the peer's credentials instead
      client_address = utils.GetSocketCredentials(connected_socket)
    logging.info("Accepted connection from %s",
                 FormatAddress(self.family, client_address))
    self.handle_connection(connected_socket, client_address)

  def handle_connection(self, connected_socket, client_address):
    """Handle an already accepted connection.

    """
    raise NotImplementedError
168

    
169

    
170
class AsyncTerminatedMessageStream(asynchat.async_chat):
  """A terminator separated message stream asyncore module.

  Handles a stream connection receiving messages terminated by a defined
  separator. For each complete message handle_message is called.

  """
  def __init__(self, connected_socket, peer_address, terminator, family):
    """AsyncTerminatedMessageStream constructor.

    @type connected_socket: socket.socket
    @param connected_socket: connected stream socket to receive messages from
    @param peer_address: family-specific peer address
    @type terminator: string
    @param terminator: terminator separating messages in the stream
    @type family: integer
    @param family: socket family

    """
    # python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by
    # using a positional argument rather than a keyword one.
    asynchat.async_chat.__init__(self, connected_socket)
    self.connected_socket = connected_socket
    # on python 2.4 there is no "family" attribute for the socket class
    # FIXME: when we move to python 2.5 or above remove the family parameter
    #self.family = self.connected_socket.family
    self.family = family
    self.peer_address = peer_address
    self.terminator = terminator
    self.set_terminator(terminator)
    # ibuffer collects partial data until the terminator is seen;
    # next_incoming_message numbers complete messages sequentially
    self.ibuffer = []
    self.next_incoming_message = 0

  # this method is overriding an asynchat.async_chat method
  def collect_incoming_data(self, data):
    # Buffer partial message data until a terminator arrives
    self.ibuffer.append(data)

  # this method is overriding an asynchat.async_chat method
  def found_terminator(self):
    # A complete message has arrived: hand the buffered data over together
    # with its sequence number, resetting state before the callback runs
    buffered, self.ibuffer = self.ibuffer, []
    current_id = self.next_incoming_message
    self.next_incoming_message = current_id + 1
    self.handle_message("".join(buffered), current_id)

  def handle_message(self, message, message_id):
    """Handle a terminated message.

    @type message: string
    @param message: message to handle
    @type message_id: integer
    @param message_id: stream's message sequence number

    """
    pass
    # TODO: move this method to raise NotImplementedError
    # raise NotImplementedError

  def close_log(self):
    # Log the disconnect before tearing the channel down
    logging.info("Closing connection from %s",
                 FormatAddress(self.family, self.peer_address))
    self.close()

  # this method is overriding an asyncore.dispatcher method
  def handle_expt(self):
    # Out-of-band data / exceptional condition: drop the connection
    self.close_log()

  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")
    self.close_log()
244

    
245

    
246
class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
  """An improved asyncore udp socket.

  Queues outgoing datagrams and dispatches each incoming one to
  L{handle_datagram}.

  """
  def __init__(self):
    """Constructor for AsyncUDPSocket.

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    # FIFO of (ip, port, payload) tuples awaiting transmission
    self._out_queue = []
    self.create_socket(socket.AF_INET, socket.SOCK_DGRAM)

  # this method is overriding an asyncore.dispatcher method
  def handle_connect(self):
    # Python thinks that the first udp message from a source qualifies as a
    # "connect" and further ones are part of the same connection. We beg to
    # differ and treat all messages equally.
    pass

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    recv_result = utils.IgnoreSignals(self.recvfrom,
                                      constants.MAX_UDP_DATA_SIZE)
    if recv_result is None:
      return
    payload, (ip, port) = recv_result
    self.handle_datagram(payload, ip, port)

  def handle_datagram(self, payload, ip, port):
    """Handle an already read udp datagram.

    """
    raise NotImplementedError

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # We should check whether we can write to the socket only if we have
    # something scheduled to be written
    return bool(self._out_queue)

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    if not self._out_queue:
      logging.error("handle_write called with empty output queue")
      return
    ip, port, payload = self._out_queue[0]
    utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
    # Only dequeue once the send attempt has been made
    self._out_queue.pop(0)

  def enqueue_send(self, ip, port, payload):
    """Enqueue a datagram to be sent when possible.

    """
    if len(payload) > constants.MAX_UDP_DATA_SIZE:
      raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
                                    constants.MAX_UDP_DATA_SIZE))
    self._out_queue.append((ip, port, payload))

  def process_next_packet(self, timeout=0):
    """Process the next datagram, waiting for it if necessary.

    @type timeout: float
    @param timeout: how long to wait for data
    @rtype: boolean
    @return: True if some data has been handled, False otherwise

    """
    result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
    if result is None or not (result & select.POLLIN):
      return False
    self.handle_read()
    return True
319

    
320

    
321
class AsyncAwaker(GanetiBaseAsyncoreDispatcher):
  """A way to notify the asyncore loop that something is going on.

  If an asyncore daemon is multithreaded when a thread tries to push some data
  to a socket, the main loop handling asynchronous requests might be sleeping
  waiting on a select(). To avoid this it can create an instance of the
  AsyncAwaker, which other threads can use to wake it up.

  """
  def __init__(self, signal_fn=None):
    """Constructor for AsyncAwaker

    @type signal_fn: function
    @param signal_fn: function to call when awaken

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    # Identity check against None ("is", not "=="): equality would invoke
    # __eq__ on arbitrary objects, which is neither intended nor idiomatic
    assert signal_fn is None or callable(signal_fn)
    # Internal socket pair: the asyncore loop watches in_socket, other
    # threads write a token to out_socket to wake it up
    (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
                                                          socket.SOCK_STREAM)
    self.in_socket.setblocking(0)
    self.set_socket(self.in_socket)
    # need_signal throttles wakeups: once a token is in flight, further
    # signal() calls are no-ops until the loop has drained it
    self.need_signal = True
    self.signal_fn = signal_fn
    self.connected = True

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    """Drain pending wakeup tokens and invoke the optional callback.

    """
    utils.IgnoreSignals(self.recv, 4096)
    if self.signal_fn:
      self.signal_fn()
    self.need_signal = True

  # this method is overriding an asyncore.dispatcher method
  def close(self):
    """Close both ends of the internal socket pair.

    """
    asyncore.dispatcher.close(self)
    self.out_socket.close()

  def signal(self):
    """Signal the asyncore main loop.

    Any data we send here will be ignored, but it will cause the select() call
    to return.

    """
    # Yes, there is a race condition here. No, we don't care, at worst we're
    # sending more than one wakeup token, which doesn't harm at all.
    if self.need_signal:
      self.need_signal = False
      self.out_socket.send("\0")
371

    
372

    
373
class Mainloop(object):
  """Generic mainloop for daemons.

  @ivar scheduler: A sched.scheduler object, which can be used to register
    timed events

  """
  def __init__(self):
    """Constructs a new Mainloop instance.

    """
    # Receivers registered via RegisterSignal, notified on every signal
    self._signal_wait = []
    self.scheduler = AsyncoreScheduler(time.time)

  @utils.SignalHandled([signal.SIGCHLD])
  @utils.SignalHandled([signal.SIGTERM])
  @utils.SignalHandled([signal.SIGINT])
  def Run(self, signal_handlers=None):
    """Runs the mainloop.

    @type signal_handlers: dict
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator

    """
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"
    running = True
    # Start actual main loop
    while running:
      if self.scheduler.empty():
        # Nothing scheduled: just run one asyncore iteration
        asyncore.loop(count=1, use_poll=True)
      else:
        # The asyncore-aware delay function breaks out of run() via
        # SchedulerBreakout so we can check for signals below
        try:
          self.scheduler.run()
        except SchedulerBreakout:
          pass

      # Check whether a signal was raised
      for (sig, handler) in signal_handlers.items():
        if handler.called:
          self._CallSignalWaiters(sig)
          # SIGTERM/SIGINT terminate the loop; other signals keep it going
          running = sig not in (signal.SIGTERM, signal.SIGINT)
          handler.Clear()

  def _CallSignalWaiters(self, signum):
    """Calls all signal waiters for a certain signal.

    @type signum: int
    @param signum: Signal number

    """
    for owner in self._signal_wait:
      owner.OnSignal(signum)

  def RegisterSignal(self, owner):
    """Registers a receiver for signal notifications

    The receiver must support a "OnSignal(self, signum)" function.

    @type owner: instance
    @param owner: Receiver

    """
    self._signal_wait.append(owner)
439

    
440

    
441
def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn,
                multithreaded=False, console_logging=False,
                default_ssl_cert=None, default_ssl_key=None,
                user=_DEFAULT_RUN_USER, group=_DEFAULT_RUN_GROUP):
  """Shared main function for daemons.

  Parses common command-line options, optionally daemonizes, writes the
  pid file and then runs C{exec_fn}; the pid file is removed when
  C{exec_fn} returns or raises.

  @type daemon_name: string
  @param daemon_name: daemon name
  @type optionparser: optparse.OptionParser
  @param optionparser: initialized optionparser with daemon-specific options
                       (common -f -d options will be handled by this module)
  @type dirs: list of (string, integer)
  @param dirs: list of directories that must be created if they don't exist,
               and the permissions to be used to create them
  @type check_fn: function which accepts (options, args)
  @param check_fn: function that checks start conditions and exits if they're
                   not met
  @type exec_fn: function which accepts (options, args)
  @param exec_fn: function that's executed with the daemon's pid file held, and
                  runs the daemon itself.
  @type multithreaded: bool
  @param multithreaded: Whether the daemon uses threads
  @type console_logging: boolean
  @param console_logging: if True, the daemon will fall back to the system
                          console if logging fails
  @type default_ssl_cert: string
  @param default_ssl_cert: Default SSL certificate path
  @type default_ssl_key: string
  @param default_ssl_key: Default SSL key path
  @type user: string
  @param user: Default user to run as
  @type group: string
  @param group: Default group to run as

  """
  # Options common to every daemon
  optionparser.add_option("-f", "--foreground", dest="fork",
                          help="Don't detach from the current terminal",
                          default=True, action="store_false")
  optionparser.add_option("-d", "--debug", dest="debug",
                          help="Enable some debug messages",
                          default=False, action="store_true")
  optionparser.add_option("--syslog", dest="syslog",
                          help="Enable logging to syslog (except debug"
                          " messages); one of 'no', 'yes' or 'only' [%s]" %
                          constants.SYSLOG_USAGE,
                          default=constants.SYSLOG_USAGE,
                          choices=["no", "yes", "only"])

  if daemon_name in constants.DAEMONS_PORTS:
    default_bind_address = "0.0.0.0"
    default_port = utils.GetDaemonPort(daemon_name)

    # For networked daemons we allow choosing the port and bind address
    optionparser.add_option("-p", "--port", dest="port",
                            help="Network port (default: %s)" % default_port,
                            default=default_port, type="int")
    optionparser.add_option("-b", "--bind", dest="bind_address",
                            help=("Bind address (default: %s)" %
                                  default_bind_address),
                            default=default_bind_address, metavar="ADDRESS")

  # SSL options are only offered when both defaults were supplied
  if default_ssl_key is not None and default_ssl_cert is not None:
    optionparser.add_option("--no-ssl", dest="ssl",
                            help="Do not secure HTTP protocol with SSL",
                            default=True, action="store_false")
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
                            help=("SSL key path (default: %s)" %
                                  default_ssl_key),
                            default=default_ssl_key, type="string",
                            metavar="SSL_KEY_PATH")
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
                            help=("SSL certificate path (default: %s)" %
                                  default_ssl_cert),
                            default=default_ssl_cert, type="string",
                            metavar="SSL_CERT_PATH")

  # Disable the use of fork(2) if the daemon uses threads
  utils.no_fork = multithreaded

  options, args = optionparser.parse_args()

  # getattr guards daemons for which the --no-ssl option was never added
  if getattr(options, "ssl", False):
    ssl_paths = {
      "certificate": options.ssl_cert,
      "key": options.ssl_key,
      }

    for name, path in ssl_paths.iteritems():
      if not os.path.isfile(path):
        print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
        sys.exit(constants.EXIT_FAILURE)

    # TODO: By initiating http.HttpSslParams here we would only read the files
    # once and have a proper validation (isfile returns False on directories)
    # at the same time.

  if check_fn is not None:
    check_fn(options, args)

  utils.EnsureDirs(dirs)

  if options.fork:
    # Resolve the run-as user/group up front so a misconfiguration fails
    # before we detach from the terminal
    try:
      uid = pwd.getpwnam(user).pw_uid
      gid = grp.getgrnam(group).gr_gid
    except KeyError:
      raise errors.ConfigurationError("User or group not existing on system:"
                                      " %s:%s" % (user, group))
    utils.CloseFDs()
    utils.Daemonize(constants.DAEMONS_LOGFILES[daemon_name], uid, gid)

  utils.WritePidFile(daemon_name)
  try:
    utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
                       debug=options.debug,
                       stderr_logging=not options.fork,
                       multithreaded=multithreaded,
                       program=daemon_name,
                       syslog=options.syslog,
                       console_logging=console_logging)
    logging.info("%s daemon startup", daemon_name)
    exec_fn(options, args)
  finally:
    # Always drop the pid file, whether exec_fn returned or raised
    utils.RemovePidFile(daemon_name)