Statistics
| Branch: | Tag: | Revision:

root / lib / daemon.py @ 37e62cb9

History | View | Annotate | Download (21 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module with helper classes and functions for daemons"""
23

    
24

    
25
import asyncore
26
import asynchat
27
import collections
28
import grp
29
import os
30
import pwd
31
import signal
32
import logging
33
import sched
34
import time
35
import socket
36
import select
37
import sys
38

    
39
from ganeti import utils
40
from ganeti import constants
41
from ganeti import errors
42

    
43

    
44
_DEFAULT_RUN_USER = "root"
45
_DEFAULT_RUN_GROUP = "root"
46

    
47

    
48
class SchedulerBreakout(Exception):
  """Raised by the scheduler delay function to break out of scheduler.run().

  Never signals an error: it only forces the current scheduler.run()
  invocation to return so the main loop can check for signals.

  """
52

    
53

    
54
def AsyncoreDelayFunction(timeout):
  """Scheduler delay function that serves asyncore events instead of sleeping.

  Rather than blocking for the given timeout, this processes at most one
  round of pending asyncore events. It then raises L{SchedulerBreakout},
  which makes the current scheduler.run() invocation return, so the main
  loop can check for signals before calling scheduler.run() again (at which
  point any due timed events are actually executed).

  This dance is necessary because scheduler.run() has no count=... argument
  the way asyncore.loop() does, and the sched module explicitly allows the
  delay function to raise an exception as a way of interrupting the run.

  """
  asyncore.loop(count=1, timeout=timeout, use_poll=True)
  raise SchedulerBreakout()
73

    
74

    
75
class AsyncoreScheduler(sched.scheduler):
  """sched.scheduler subclass wired into the asyncore event loop.

  The stock delay function is replaced by L{AsyncoreDelayFunction}, so
  waiting for the next timed event also serves asyncore requests.

  """
  def __init__(self, timefunc):
    """Initializes this class.

    @param timefunc: function returning the current time (e.g. time.time)

    """
    sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
81

    
82

    
83
class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
  """Base class for the Ganeti asyncore dispatchers.

  Centralizes the error handling and the default writability policy shared
  by all dispatchers in this module.

  """
  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log any error raised while handling a request, then carry on.

    """
    logging.exception("Error while handling asyncore request")

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    """By default we are never interested in write events.

    """
    return False
100

    
101

    
102
def FormatAddress(family, address):
  """Render a peer's address as a human-readable string.

  @type family: integer
  @param family: socket family (one of socket.AF_*)
  @type address: family specific (usually tuple)
  @param address: address, as reported by this class

  """
  # (host, port) pair for IPv4 connections
  if family == socket.AF_INET and len(address) == 2:
    return "%s:%d" % address
  # (pid, uid, gid) triple as produced by GetSocketCredentials
  if family == socket.AF_UNIX and len(address) == 3:
    return "pid=%s, uid=%s, gid=%s" % address
  # Anything else: fall back to the default representation
  return str(address)
117

    
118

    
119
class AsyncStreamServer(GanetiBaseAsyncoreDispatcher):
  """Stream (TCP or unix) server built on asyncore.

  Incoming connections are accepted here and then handed over to
  handle_connection, which subclasses must implement (typically by creating
  a separate asyncore dispatcher serving that client).

  """

  # backlog passed to listen(2)
  _REQUEST_QUEUE_SIZE = 5

  def __init__(self, family, address):
    """Constructor for AsyncStreamServer.

    @type family: integer
    @param family: socket family (one of socket.AF_*)
    @type address: address family dependent
    @param address: address to bind the socket to

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    self.family = family
    self.create_socket(self.family, socket.SOCK_STREAM)
    self.set_reuse_addr()
    self.bind(address)
    self.listen(self._REQUEST_QUEUE_SIZE)

  # this method is overriding an asyncore.dispatcher method
  def handle_accept(self):
    """Accept a new client connection.

    Delegates the actual serving of the client to handle_connection.

    """
    result = utils.IgnoreSignals(self.accept)
    if result is None:
      # accept() was interrupted/failed; nothing to do
      return
    (connected_socket, client_address) = result
    if self.family == socket.AF_UNIX:
      # for unix sockets accept() reports nothing meaningful, so ask the
      # kernel for the peer's credentials instead
      client_address = utils.GetSocketCredentials(connected_socket)
    logging.info("Accepted connection from %s",
                 FormatAddress(self.family, client_address))
    self.handle_connection(connected_socket, client_address)

  def handle_connection(self, connected_socket, client_address):
    """Handle an already accepted connection.

    """
    raise NotImplementedError
169

    
170

    
171
class AsyncTerminatedMessageStream(asynchat.async_chat):
  """A terminator separated message stream asyncore module.

  Handles a stream connection receiving messages terminated by a defined
  separator. For each complete message handle_message is called.

  """
  def __init__(self, connected_socket, peer_address, terminator, family,
               unhandled_limit):
    """AsyncTerminatedMessageStream constructor.

    @type connected_socket: socket.socket
    @param connected_socket: connected stream socket to receive messages from
    @param peer_address: family-specific peer address
    @type terminator: string
    @param terminator: terminator separating messages in the stream
    @type family: integer
    @param family: socket family
    @type unhandled_limit: integer or None
    @param unhandled_limit: maximum unanswered messages

    """
    # python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by
    # using a positional argument rather than a keyword one.
    asynchat.async_chat.__init__(self, connected_socket)
    self.connected_socket = connected_socket
    # on python 2.4 there is no "family" attribute for the socket class
    # FIXME: when we move to python 2.5 or above remove the family parameter
    #self.family = self.connected_socket.family
    self.family = family
    self.peer_address = peer_address
    self.terminator = terminator
    self.unhandled_limit = unhandled_limit
    self.set_terminator(terminator)
    # ibuffer accumulates the chunks of the current (not yet terminated)
    # message; they are joined in found_terminator
    self.ibuffer = []
    # receive_count/send_count track how many messages came in and how many
    # replies went out; their difference is the number of unanswered messages
    self.receive_count = 0
    self.send_count = 0
    # oqueue: outgoing messages (thread-safe deque, see send_message)
    # iqueue: received messages deferred because of unhandled_limit
    self.oqueue = collections.deque()
    self.iqueue = collections.deque()

  # this method is overriding an asynchat.async_chat method
  def collect_incoming_data(self, data):
    # buffer the chunk; the full message is assembled in found_terminator
    self.ibuffer.append(data)

  def _can_handle_message(self):
    # True when a newly received message may be dispatched right away:
    # either no limit is configured, or we are below the unanswered-message
    # limit and no earlier messages are still queued. Note "and" binds
    # tighter than "or" here; with unhandled_limit None every message is
    # handled immediately, so iqueue stays empty and the precedence does not
    # change the outcome in practice.
    return (self.unhandled_limit is None or
            (self.receive_count < self.send_count + self.unhandled_limit) and
             not self.iqueue)

  # this method is overriding an asynchat.async_chat method
  def found_terminator(self):
    # a complete message has arrived: join the buffered chunks
    message = "".join(self.ibuffer)
    self.ibuffer = []
    message_id = self.receive_count
    # We need to increase the receive_count after checking if the message can
    # be handled, but before calling handle_message
    can_handle = self._can_handle_message()
    self.receive_count += 1
    if can_handle:
      self.handle_message(message, message_id)
    else:
      # over the unanswered-message limit: defer until a reply is sent
      self.iqueue.append((message, message_id))

  def handle_message(self, message, message_id):
    """Handle a terminated message.

    @type message: string
    @param message: message to handle
    @type message_id: integer
    @param message_id: stream's message sequence number

    """
    pass
    # TODO: move this method to raise NotImplementedError
    # raise NotImplementedError

  def send_message(self, message):
    """Send a message to the remote peer. This function is thread-safe.

    @type message: string
    @param message: message to send, without the terminator

    @warning: If calling this function from a thread different than the one
    performing the main asyncore loop, remember that you have to wake that one
    up.

    """
    # If we just append the message we received to the output queue, this
    # function can be safely called by multiple threads at the same time, and
    # we don't need locking, since deques are thread safe. handle_write in the
    # asyncore thread will handle the next input message if there are any
    # enqueued.
    self.oqueue.append(message)

  # this method is overriding an asyncore.dispatcher method
  def readable(self):
    # read from the socket if we can handle the next requests
    return self._can_handle_message() and asynchat.async_chat.readable(self)

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # the output queue may become full just after we called writable. This only
    # works if we know we'll have something else waking us up from the select,
    # in such case, anyway.
    # NOTE: this may return the deque itself rather than a bool; asyncore
    # only uses its truth value, so that is fine.
    return asynchat.async_chat.writable(self) or self.oqueue

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    if self.oqueue:
      # if we have data in the output queue, then send_message was called.
      # this means we can process one more message from the input queue, if
      # there are any.
      data = self.oqueue.popleft()
      self.push(data + self.terminator)
      self.send_count += 1
      if self.iqueue:
        # a reply went out, so one deferred input message may now be handled
        self.handle_message(*self.iqueue.popleft())
    self.initiate_send()

  def close_log(self):
    # log the disconnection, then close the channel
    logging.info("Closing connection from %s",
                 FormatAddress(self.family, self.peer_address))
    self.close()

  # this method is overriding an asyncore.dispatcher method
  def handle_expt(self):
    # out-of-band data / exceptional condition: drop the connection
    self.close_log()

  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")
    self.close_log()
306

    
307

    
308
class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
  """An improved asyncore udp socket.

  Supports queueing outgoing datagrams and dispatches each incoming one to
  handle_datagram, which subclasses must implement.

  """
  def __init__(self):
    """Constructor for AsyncUDPSocket

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    # queued outgoing datagrams, as (ip, port, payload) tuples
    self._out_queue = []
    self.create_socket(socket.AF_INET, socket.SOCK_DGRAM)

  # this method is overriding an asyncore.dispatcher method
  def handle_connect(self):
    # Python thinks that the first udp message from a source qualifies as a
    # "connect" and further ones are part of the same connection. We beg to
    # differ and treat all messages equally.
    pass

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    """Read one datagram and pass it on to handle_datagram.

    """
    result = utils.IgnoreSignals(self.recvfrom,
                                 constants.MAX_UDP_DATA_SIZE)
    if result is None:
      # recvfrom was interrupted; try again on the next readable event
      return
    (payload, (ip, port)) = result
    self.handle_datagram(payload, ip, port)

  def handle_datagram(self, payload, ip, port):
    """Handle an already read udp datagram

    """
    raise NotImplementedError

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # We should check whether we can write to the socket only if we have
    # something scheduled to be written
    return bool(self._out_queue)

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    """Send the oldest queued datagram, if any.

    """
    if not self._out_queue:
      logging.error("handle_write called with empty output queue")
      return
    # peek first, pop only after the send: if sendto raises, the datagram
    # stays queued and can be retried
    (ip, port, payload) = self._out_queue[0]
    utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
    self._out_queue.pop(0)

  def enqueue_send(self, ip, port, payload):
    """Enqueue a datagram to be sent when possible

    @raise errors.UdpDataSizeError: if the payload exceeds the maximum
      datagram size

    """
    if len(payload) > constants.MAX_UDP_DATA_SIZE:
      raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
                                    constants.MAX_UDP_DATA_SIZE))
    self._out_queue.append((ip, port, payload))

  def process_next_packet(self, timeout=0):
    """Process the next datagram, waiting for it if necessary.

    @type timeout: float
    @param timeout: how long to wait for data
    @rtype: boolean
    @return: True if some data has been handled, False otherwise

    """
    poll_result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
    if poll_result is None or not (poll_result & select.POLLIN):
      return False
    self.handle_read()
    return True
381

    
382

    
383
class AsyncAwaker(GanetiBaseAsyncoreDispatcher):
  """A way to notify the asyncore loop that something is going on.

  If an asyncore daemon is multithreaded when a thread tries to push some data
  to a socket, the main loop handling asynchronous requests might be sleeping
  waiting on a select(). To avoid this it can create an instance of the
  AsyncAwaker, which other threads can use to wake it up.

  """
  def __init__(self, signal_fn=None):
    """Constructor for AsyncAwaker

    @type signal_fn: function
    @param signal_fn: function to call when awaken

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    # PEP 8: comparisons against None use "is", not "==" (was "== None")
    assert signal_fn is None or callable(signal_fn)
    # A socketpair acts as a loopback pipe: the asyncore loop selects on
    # in_socket while other threads write wakeup tokens to out_socket
    (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
                                                          socket.SOCK_STREAM)
    self.in_socket.setblocking(0)
    # each end of the pair is used in one direction only
    self.in_socket.shutdown(socket.SHUT_WR)
    self.out_socket.shutdown(socket.SHUT_RD)
    self.set_socket(self.in_socket)
    # need_signal is True when the next signal() call must actually send a
    # wakeup byte; it is re-armed once the loop has consumed the pending ones
    self.need_signal = True
    self.signal_fn = signal_fn
    self.connected = True

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    """Drain pending wakeup bytes and invoke the optional callback.

    """
    utils.IgnoreSignals(self.recv, 4096)
    if self.signal_fn:
      self.signal_fn()
    self.need_signal = True

  # this method is overriding an asyncore.dispatcher method
  def close(self):
    """Close both ends of the wakeup socket pair.

    """
    asyncore.dispatcher.close(self)
    self.out_socket.close()

  def signal(self):
    """Signal the asyncore main loop.

    Any data we send here will be ignored, but it will cause the select() call
    to return.

    """
    # Yes, there is a race condition here. No, we don't care, at worst we're
    # sending more than one wakeup token, which doesn't harm at all.
    if self.need_signal:
      self.need_signal = False
      self.out_socket.send("\0")
435

    
436

    
437
class Mainloop(object):
  """Generic mainloop for daemons

  @ivar scheduler: A sched.scheduler object, which can be used to register
    timed events

  """
  def __init__(self):
    """Constructs a new Mainloop instance.

    """
    # receivers registered via RegisterSignal, notified on every signal
    self._signal_wait = []
    self.scheduler = AsyncoreScheduler(time.time)

  @utils.SignalHandled([signal.SIGCHLD])
  @utils.SignalHandled([signal.SIGTERM])
  @utils.SignalHandled([signal.SIGINT])
  def Run(self, signal_handlers=None):
    """Runs the mainloop.

    @type signal_handlers: dict
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator

    """
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"
    running = True
    while running:
      # Run scheduled events when some are pending, otherwise serve one
      # round of plain asyncore events
      if self.scheduler.empty():
        asyncore.loop(count=1, use_poll=True)
      else:
        try:
          self.scheduler.run()
        except SchedulerBreakout:
          # raised by the delay function after each asyncore round so we
          # can fall through and look at the signal flags below
          pass

      # Check whether a signal was raised
      for (sig, handler) in signal_handlers.items():
        if handler.called:
          self._CallSignalWaiters(sig)
          # termination signals end the loop after notifying waiters
          running = sig not in (signal.SIGTERM, signal.SIGINT)
          handler.Clear()

  def _CallSignalWaiters(self, signum):
    """Notifies all registered receivers about a signal.

    @type signum: int
    @param signum: Signal number

    """
    for owner in self._signal_wait:
      owner.OnSignal(signum)

  def RegisterSignal(self, owner):
    """Registers a receiver for signal notifications

    The receiver must support a "OnSignal(self, signum)" function.

    @type owner: instance
    @param owner: Receiver

    """
    self._signal_wait.append(owner)
503

    
504

    
505
def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn,
                multithreaded=False, console_logging=False,
                default_ssl_cert=None, default_ssl_key=None,
                user=_DEFAULT_RUN_USER, group=_DEFAULT_RUN_GROUP):
  """Shared main function for daemons.

  @type daemon_name: string
  @param daemon_name: daemon name
  @type optionparser: optparse.OptionParser
  @param optionparser: initialized optionparser with daemon-specific options
                       (common -f -d options will be handled by this module)
  @type dirs: list of (string, integer)
  @param dirs: list of directories that must be created if they don't exist,
               and the permissions to be used to create them
  @type check_fn: function which accepts (options, args)
  @param check_fn: function that checks start conditions and exits if they're
                   not met
  @type exec_fn: function which accepts (options, args)
  @param exec_fn: function that's executed with the daemon's pid file held, and
                  runs the daemon itself.
  @type multithreaded: bool
  @param multithreaded: Whether the daemon uses threads
  @type console_logging: boolean
  @param console_logging: if True, the daemon will fall back to the system
                          console if logging fails
  @type default_ssl_cert: string
  @param default_ssl_cert: Default SSL certificate path
  @type default_ssl_key: string
  @param default_ssl_key: Default SSL key path
  @type user: string
  @param user: Default user to run as
  @type group: string
  @param group: Default group to run as

  """
  # Options common to every daemon
  optionparser.add_option("-f", "--foreground", dest="fork",
                          help="Don't detach from the current terminal",
                          default=True, action="store_false")
  optionparser.add_option("-d", "--debug", dest="debug",
                          help="Enable some debug messages",
                          default=False, action="store_true")
  optionparser.add_option("--syslog", dest="syslog",
                          help="Enable logging to syslog (except debug"
                          " messages); one of 'no', 'yes' or 'only' [%s]" %
                          constants.SYSLOG_USAGE,
                          default=constants.SYSLOG_USAGE,
                          choices=["no", "yes", "only"])

  if daemon_name in constants.DAEMONS_PORTS:
    default_bind_address = "0.0.0.0"
    default_port = utils.GetDaemonPort(daemon_name)

    # For networked daemons we allow choosing the port and bind address
    optionparser.add_option("-p", "--port", dest="port",
                            help="Network port (default: %s)" % default_port,
                            default=default_port, type="int")
    optionparser.add_option("-b", "--bind", dest="bind_address",
                            help=("Bind address (default: %s)" %
                                  default_bind_address),
                            default=default_bind_address, metavar="ADDRESS")

  # SSL options are only offered when the caller supplied both defaults
  if default_ssl_key is not None and default_ssl_cert is not None:
    optionparser.add_option("--no-ssl", dest="ssl",
                            help="Do not secure HTTP protocol with SSL",
                            default=True, action="store_false")
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
                            help=("SSL key path (default: %s)" %
                                  default_ssl_key),
                            default=default_ssl_key, type="string",
                            metavar="SSL_KEY_PATH")
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
                            help=("SSL certificate path (default: %s)" %
                                  default_ssl_cert),
                            default=default_ssl_cert, type="string",
                            metavar="SSL_CERT_PATH")

  # Disable the use of fork(2) if the daemon uses threads
  utils.no_fork = multithreaded

  options, args = optionparser.parse_args()

  # "ssl" only exists when the SSL options were registered above, hence
  # the getattr with a False default
  if getattr(options, "ssl", False):
    ssl_paths = {
      "certificate": options.ssl_cert,
      "key": options.ssl_key,
      }

    for name, path in ssl_paths.iteritems():
      if not os.path.isfile(path):
        print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
        sys.exit(constants.EXIT_FAILURE)

    # TODO: By initiating http.HttpSslParams here we would only read the files
    # once and have a proper validation (isfile returns False on directories)
    # at the same time.

  # daemon-specific startup checks (may exit the process)
  if check_fn is not None:
    check_fn(options, args)

  utils.EnsureDirs(dirs)

  if options.fork:
    # resolve the uid/gid to drop to before daemonizing; a missing account
    # is a configuration error
    try:
      uid = pwd.getpwnam(user).pw_uid
      gid = grp.getgrnam(group).gr_gid
    except KeyError:
      raise errors.ConfigurationError("User or group not existing on system:"
                                      " %s:%s" % (user, group))
    utils.CloseFDs()
    utils.Daemonize(constants.DAEMONS_LOGFILES[daemon_name], uid, gid)

  # hold the pidfile while the daemon body runs; always remove it on exit
  utils.WritePidFile(daemon_name)
  try:
    utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
                       debug=options.debug,
                       stderr_logging=not options.fork,
                       multithreaded=multithreaded,
                       program=daemon_name,
                       syslog=options.syslog,
                       console_logging=console_logging)
    logging.info("%s daemon startup", daemon_name)
    exec_fn(options, args)
  finally:
    utils.RemovePidFile(daemon_name)