Statistics
| Branch: | Tag: | Revision:

root / lib / daemon.py @ 7b4baeb1

History | View | Annotate | Download (23.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module with helper classes and functions for daemons"""
23

    
24

    
25
import asyncore
26
import asynchat
27
import collections
28
import os
29
import signal
30
import logging
31
import sched
32
import time
33
import socket
34
import select
35
import sys
36

    
37
from ganeti import utils
38
from ganeti import constants
39
from ganeti import errors
40
from ganeti import netutils
41
from ganeti import ssconf
42
from ganeti import runtime
43

    
44

    
45
class SchedulerBreakout(Exception):
  """Exception raised to break out of the scheduler loop

  """
49

    
50

    
51
def AsyncoreDelayFunction(timeout):
  """Asyncore-compatible scheduler delay function.

  Instead of sleeping, this delay function runs one round of asyncore event
  processing (for at most C{timeout} seconds) and then raises
  L{SchedulerBreakout} to force the current scheduler.run() invocation to
  return, giving the main loop a chance to check for signals. The main loop
  calls scheduler.run() again afterwards, which then processes any events
  that are actually due.

  This dance is needed because scheduler.run() has no count=... parameter
  like the asyncore loop does, and the sched module explicitly allows the
  delay function to raise an exception as a way to interrupt it.

  """
  asyncore.loop(count=1, timeout=timeout, use_poll=True)
  raise SchedulerBreakout()
70

    
71

    
72
class AsyncoreScheduler(sched.scheduler):
  """Event scheduler whose delays are spent processing asyncore events

  """
  def __init__(self, timefunc):
    # Plug AsyncoreDelayFunction in as the delay function so waiting for
    # scheduled events also serves asyncore requests
    sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
78

    
79

    
80
class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
  """Base Ganeti Asyncore Dispacher

  """
  # overriding asyncore.dispatcher's method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")

  # overriding asyncore.dispatcher's method
  def writable(self):
    """Most of the time we don't want to check for writability.

    """
    return False
97

    
98

    
99
class AsyncStreamServer(GanetiBaseAsyncoreDispatcher):
  """A stream server to use with asyncore.

  Each incoming request is accepted and then handed off to a separate
  asyncore dispatcher for handling.

  """

  _REQUEST_QUEUE_SIZE = 5

  def __init__(self, family, address):
    """Constructor for AsyncStreamServer

    @type family: integer
    @param family: socket family (one of socket.AF_*)
    @type address: address family dependent
    @param address: address to bind the socket to

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    self.family = family
    self.create_socket(self.family, socket.SOCK_STREAM)
    self.set_reuse_addr()
    self.bind(address)
    self.listen(self._REQUEST_QUEUE_SIZE)

  # overriding asyncore.dispatcher's method
  def handle_accept(self):
    """Accept a new client connection.

    Creates a new instance of the handler class, which will use asyncore to
    serve the client.

    """
    result = utils.IgnoreSignals(self.accept)
    if result is None:
      return
    (connected_socket, client_address) = result
    if self.family == socket.AF_UNIX:
      # accept() passes nothing meaningful for unix sockets, so use the
      # peer's credentials as the client address instead
      client_address = netutils.GetSocketCredentials(connected_socket)
    logging.info("Accepted connection from %s",
                 netutils.FormatAddress(client_address, family=self.family))
    self.handle_connection(connected_socket, client_address)

  def handle_connection(self, connected_socket, client_address):
    """Handle an already accepted connection.

    """
    raise NotImplementedError
149

    
150

    
151
class AsyncTerminatedMessageStream(asynchat.async_chat):
  """A terminator separated message stream asyncore module.

  Handles a stream connection receiving messages terminated by a defined
  separator. For each complete message handle_message is called.

  """
  def __init__(self, connected_socket, peer_address, terminator, family,
               unhandled_limit):
    """AsyncTerminatedMessageStream constructor.

    @type connected_socket: socket.socket
    @param connected_socket: connected stream socket to receive messages from
    @param peer_address: family-specific peer address
    @type terminator: string
    @param terminator: terminator separating messages in the stream
    @type family: integer
    @param family: socket family
    @type unhandled_limit: integer or None
    @param unhandled_limit: maximum unanswered messages

    """
    # python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by
    # using a positional argument rather than a keyword one.
    asynchat.async_chat.__init__(self, connected_socket)
    self.connected_socket = connected_socket
    # on python 2.4 there is no "family" attribute for the socket class
    # FIXME: when we move to python 2.5 or above remove the family parameter
    #self.family = self.connected_socket.family
    self.family = family
    self.peer_address = peer_address
    self.terminator = terminator
    self.unhandled_limit = unhandled_limit
    self.set_terminator(terminator)
    self.ibuffer = []  # chunks of the message currently being received
    self.receive_count = 0  # number of messages received so far
    self.send_count = 0  # number of replies sent so far
    self.oqueue = collections.deque()  # outgoing messages (thread-safe)
    self.iqueue = collections.deque()  # received messages deferred for later

  # this method is overriding an asynchat.async_chat method
  def collect_incoming_data(self, data):
    self.ibuffer.append(data)

  def _can_handle_message(self):
    # A message may be handled only if the unanswered-message limit is not
    # reached (or there is no limit at all) AND no earlier message is still
    # queued waiting to be handled (otherwise ordering would be broken).
    # NOTE: the previous version read "A or B and C", which parses as
    # "A or (B and C)" and would ignore the queue check when no limit is set.
    return ((self.unhandled_limit is None or
             self.receive_count < self.send_count + self.unhandled_limit) and
            not self.iqueue)

  # this method is overriding an asynchat.async_chat method
  def found_terminator(self):
    message = "".join(self.ibuffer)
    self.ibuffer = []
    message_id = self.receive_count
    # We need to increase the receive_count after checking if the message can
    # be handled, but before calling handle_message
    can_handle = self._can_handle_message()
    self.receive_count += 1
    if can_handle:
      self.handle_message(message, message_id)
    else:
      # over the limit: park the message until a reply frees a slot
      self.iqueue.append((message, message_id))

  def handle_message(self, message, message_id):
    """Handle a terminated message.

    @type message: string
    @param message: message to handle
    @type message_id: integer
    @param message_id: stream's message sequence number

    """
    pass
    # TODO: move this method to raise NotImplementedError
    # raise NotImplementedError

  def send_message(self, message):
    """Send a message to the remote peer. This function is thread-safe.

    @type message: string
    @param message: message to send, without the terminator

    @warning: If calling this function from a thread different than the one
    performing the main asyncore loop, remember that you have to wake that one
    up.

    """
    # If we just append the message we received to the output queue, this
    # function can be safely called by multiple threads at the same time, and
    # we don't need locking, since deques are thread safe. handle_write in the
    # asyncore thread will handle the next input message if there are any
    # enqueued.
    self.oqueue.append(message)

  # this method is overriding an asyncore.dispatcher method
  def readable(self):
    # read from the socket if we can handle the next requests
    return self._can_handle_message() and asynchat.async_chat.readable(self)

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # the output queue may become full just after we called writable. This only
    # works if we know we'll have something else waking us up from the select,
    # in such case, anyway.
    return asynchat.async_chat.writable(self) or self.oqueue

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    if self.oqueue:
      # if we have data in the output queue, then send_message was called.
      # this means we can process one more message from the input queue, if
      # there are any.
      data = self.oqueue.popleft()
      self.push(data + self.terminator)
      self.send_count += 1
      if self.iqueue:
        self.handle_message(*self.iqueue.popleft())
    self.initiate_send()

  def close_log(self):
    logging.info("Closing connection from %s",
                 netutils.FormatAddress(self.peer_address, family=self.family))
    self.close()

  # this method is overriding an asyncore.dispatcher method
  def handle_expt(self):
    self.close_log()

  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")
    self.close_log()
286

    
287

    
288
class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
  """An improved asyncore udp socket.

  """
  def __init__(self, family):
    """Constructor for AsyncUDPSocket

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    self._out_queue = []
    self._family = family
    self.create_socket(family, socket.SOCK_DGRAM)

  # overriding asyncore.dispatcher's method
  def handle_connect(self):
    # Python thinks that the first udp message from a source qualifies as a
    # "connect" and further ones are part of the same connection. We beg to
    # differ and treat all messages equally.
    pass

  # overriding asyncore.dispatcher's method
  def handle_read(self):
    recv_result = utils.IgnoreSignals(self.recvfrom,
                                      constants.MAX_UDP_DATA_SIZE)
    if recv_result is None:
      return
    (payload, address) = recv_result
    if self._family == socket.AF_INET6:
      # IPv6 addresses also carry 'flow info' and 'scope id'; neither is
      # needed here
      (ip, port, _, _) = address
    else:
      (ip, port) = address

    self.handle_datagram(payload, ip, port)

  def handle_datagram(self, payload, ip, port):
    """Handle an already read udp datagram

    """
    raise NotImplementedError

  # overriding asyncore.dispatcher's method
  def writable(self):
    # Check writability on the socket only while there is something queued
    # to be sent
    return bool(self._out_queue)

  # overriding asyncore.dispatcher's method
  def handle_write(self):
    if not self._out_queue:
      logging.error("handle_write called with empty output queue")
      return
    (ip, port, payload) = self._out_queue[0]
    utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
    self._out_queue.pop(0)

  def enqueue_send(self, ip, port, payload):
    """Enqueue a datagram to be sent when possible

    """
    if len(payload) > constants.MAX_UDP_DATA_SIZE:
      raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
                                    constants.MAX_UDP_DATA_SIZE))
    self._out_queue.append((ip, port, payload))

  def process_next_packet(self, timeout=0):
    """Process the next datagram, waiting for it if necessary.

    @type timeout: float
    @param timeout: how long to wait for data
    @rtype: boolean
    @return: True if some data has been handled, False otherwise

    """
    result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
    if result is not None and result & select.POLLIN:
      self.handle_read()
      return True
    return False
367

    
368

    
369
class AsyncAwaker(GanetiBaseAsyncoreDispatcher):
  """A way to notify the asyncore loop that something is going on.

  If an asyncore daemon is multithreaded when a thread tries to push some data
  to a socket, the main loop handling asynchronous requests might be sleeping
  waiting on a select(). To avoid this it can create an instance of the
  AsyncAwaker, which other threads can use to wake it up.

  """
  def __init__(self, signal_fn=None):
    """Constructor for AsyncAwaker

    @type signal_fn: function
    @param signal_fn: function to call when awaken

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    # "is None" (identity) rather than "== None" (equality), per the usual
    # Python idiom
    assert signal_fn is None or callable(signal_fn)
    (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
                                                          socket.SOCK_STREAM)
    self.in_socket.setblocking(0)
    # each end of the pair is used in one direction only: in_socket is read
    # by the asyncore loop, out_socket is written by signal()
    self.in_socket.shutdown(socket.SHUT_WR)
    self.out_socket.shutdown(socket.SHUT_RD)
    self.set_socket(self.in_socket)
    self.need_signal = True
    self.signal_fn = signal_fn
    self.connected = True

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    # drain the wakeup tokens; their content is irrelevant
    utils.IgnoreSignals(self.recv, 4096)
    if self.signal_fn:
      self.signal_fn()
    self.need_signal = True

  # this method is overriding an asyncore.dispatcher method
  def close(self):
    asyncore.dispatcher.close(self)
    self.out_socket.close()

  def signal(self):
    """Signal the asyncore main loop.

    Any data we send here will be ignored, but it will cause the select() call
    to return.

    """
    # Yes, there is a race condition here. No, we don't care, at worst we're
    # sending more than one wakeup token, which doesn't harm at all.
    if self.need_signal:
      self.need_signal = False
      self.out_socket.send("\0")
421

    
422

    
423
class Mainloop(object):
  """Generic mainloop for daemons

  @ivar scheduler: A sched.scheduler object, which can be used to register
    timed events

  """
  def __init__(self):
    """Constructs a new Mainloop instance.

    """
    self._signal_wait = []
    self.scheduler = AsyncoreScheduler(time.time)

  @utils.SignalHandled([signal.SIGCHLD])
  @utils.SignalHandled([signal.SIGTERM])
  @utils.SignalHandled([signal.SIGINT])
  def Run(self, signal_handlers=None):
    """Runs the mainloop.

    @type signal_handlers: dict
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator

    """
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"
    keep_running = True
    while keep_running:
      # Run due scheduler events when there are any; otherwise just serve
      # one round of asyncore requests
      if self.scheduler.empty():
        asyncore.loop(count=1, use_poll=True)
      else:
        try:
          self.scheduler.run()
        except SchedulerBreakout:
          pass

      # Check whether any signal was raised
      for (sig, handler) in signal_handlers.items():
        if handler.called:
          self._CallSignalWaiters(sig)
          keep_running = sig not in (signal.SIGTERM, signal.SIGINT)
          handler.Clear()

  def _CallSignalWaiters(self, signum):
    """Calls all signal waiters for a certain signal.

    @type signum: int
    @param signum: Signal number

    """
    for owner in self._signal_wait:
      owner.OnSignal(signum)

  def RegisterSignal(self, owner):
    """Registers a receiver for signal notifications

    The receiver must support a "OnSignal(self, signum)" function.

    @type owner: instance
    @param owner: Receiver

    """
    self._signal_wait.append(owner)
489

    
490

    
491
def _VerifyDaemonUser(daemon_name):
  """Verifies the process uid matches the configured uid.

  This method verifies that a daemon is started as the user it is
  intended to be run

  @param daemon_name: The name of daemon to be started
  @return: A tuple with the first item indicating success or not,
           the second item current uid and third with expected uid

  """
  getents = runtime.GetEnts()
  running_uid = os.getuid()
  # map each daemon to the uid it is supposed to run under; unknown daemon
  # names raise KeyError, just as before
  expected_uid = {
    constants.MASTERD: getents.masterd_uid,
    constants.RAPI: getents.rapi_uid,
    constants.NODED: getents.noded_uid,
    constants.CONFD: getents.confd_uid,
    }[daemon_name]

  return (expected_uid == running_uid, running_uid, expected_uid)
513

    
514

    
515
def _BeautifyError(err):
516
  """Try to format an error better.
517

518
  Since we're dealing with daemon startup errors, in many cases this
519
  will be due to socket error and such, so we try to format these cases better.
520

521
  @param err: an exception object
522
  @rtype: string
523
  @return: the formatted error description
524

525
  """
526
  try:
527
    if isinstance(err, socket.error):
528
      return "Socket-related error: %s (errno=%s)" % (err.args[1], err.args[0])
529
    elif isinstance(err, EnvironmentError):
530
      if err.filename is None:
531
        return "%s (errno=%s)" % (err.strerror, err.errno)
532
      else:
533
        return "%s (file %s) (errno=%s)" % (err.strerror, err.filename,
534
                                            err.errno)
535
    else:
536
      return str(err)
537
  except Exception: # pylint: disable-msg=W0703
538
    logging.exception("Error while handling existing error %s", err)
539
    return "%s" % str(err)
540

    
541

    
542
def GenericMain(daemon_name, optionparser,
                check_fn, prepare_fn, exec_fn,
                multithreaded=False, console_logging=False,
                default_ssl_cert=None, default_ssl_key=None):
  """Shared main function for daemons.

  @type daemon_name: string
  @param daemon_name: daemon name
  @type optionparser: optparse.OptionParser
  @param optionparser: initialized optionparser with daemon-specific options
                       (common -f -d options will be handled by this module)
  @type check_fn: function which accepts (options, args)
  @param check_fn: function that checks start conditions and exits if they're
                   not met
  @type prepare_fn: function which accepts (options, args)
  @param prepare_fn: function that is run before forking, or None;
      it's result will be passed as the third parameter to exec_fn, or
      if None was passed in, we will just pass None to exec_fn
  @type exec_fn: function which accepts (options, args, prepare_results)
  @param exec_fn: function that's executed with the daemon's pid file held, and
                  runs the daemon itself.
  @type multithreaded: bool
  @param multithreaded: Whether the daemon uses threads
  @type console_logging: boolean
  @param console_logging: if True, the daemon will fall back to the system
                          console if logging fails
  @type default_ssl_cert: string
  @param default_ssl_cert: Default SSL certificate path
  @type default_ssl_key: string
  @param default_ssl_key: Default SSL key path

  """
  # Options shared by every daemon (-f, -d, --syslog)
  optionparser.add_option("-f", "--foreground", dest="fork",
                          help="Don't detach from the current terminal",
                          default=True, action="store_false")
  optionparser.add_option("-d", "--debug", dest="debug",
                          help="Enable some debug messages",
                          default=False, action="store_true")
  optionparser.add_option("--syslog", dest="syslog",
                          help="Enable logging to syslog (except debug"
                          " messages); one of 'no', 'yes' or 'only' [%s]" %
                          constants.SYSLOG_USAGE,
                          default=constants.SYSLOG_USAGE,
                          choices=["no", "yes", "only"])

  if daemon_name in constants.DAEMONS_PORTS:
    default_bind_address = constants.IP4_ADDRESS_ANY
    family = ssconf.SimpleStore().GetPrimaryIPFamily()
    # family will default to AF_INET if there is no ssconf file (e.g. when
    # upgrading a cluster from 2.2 -> 2.3. This is intended, as Ganeti clusters
    # <= 2.2 can not be AF_INET6
    if family == netutils.IP6Address.family:
      default_bind_address = constants.IP6_ADDRESS_ANY

    default_port = netutils.GetDaemonPort(daemon_name)

    # For networked daemons we allow choosing the port and bind address
    optionparser.add_option("-p", "--port", dest="port",
                            help="Network port (default: %s)" % default_port,
                            default=default_port, type="int")
    optionparser.add_option("-b", "--bind", dest="bind_address",
                            help=("Bind address (default: '%s')" %
                                  default_bind_address),
                            default=default_bind_address, metavar="ADDRESS")

  # SSL options are registered only when both defaults were supplied
  if default_ssl_key is not None and default_ssl_cert is not None:
    optionparser.add_option("--no-ssl", dest="ssl",
                            help="Do not secure HTTP protocol with SSL",
                            default=True, action="store_false")
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
                            help=("SSL key path (default: %s)" %
                                  default_ssl_key),
                            default=default_ssl_key, type="string",
                            metavar="SSL_KEY_PATH")
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
                            help=("SSL certificate path (default: %s)" %
                                  default_ssl_cert),
                            default=default_ssl_cert, type="string",
                            metavar="SSL_CERT_PATH")

  # Disable the use of fork(2) if the daemon uses threads
  if multithreaded:
    utils.DisableFork()

  options, args = optionparser.parse_args()

  # Check the SSL files before daemonizing, so failures are reported to the
  # invoking user rather than only to the logs
  if getattr(options, "ssl", False):
    ssl_paths = {
      "certificate": options.ssl_cert,
      "key": options.ssl_key,
      }

    for name, path in ssl_paths.iteritems():
      if not os.path.isfile(path):
        print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
        sys.exit(constants.EXIT_FAILURE)

    # TODO: By initiating http.HttpSslParams here we would only read the files
    # once and have a proper validation (isfile returns False on directories)
    # at the same time.

  # Refuse to start under the wrong user ID, again before daemonizing
  result, running_uid, expected_uid = _VerifyDaemonUser(daemon_name)
  if not result:
    msg = ("%s started using wrong user ID (%d), expected %d" %
           (daemon_name, running_uid, expected_uid))
    print >> sys.stderr, msg
    sys.exit(constants.EXIT_FAILURE)

  if check_fn is not None:
    check_fn(options, args)

  if options.fork:
    utils.CloseFDs()
    # Daemonize() returns the write end of a pipe to the (still running)
    # parent process, used below to report startup errors
    wpipe = utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name])
  else:
    wpipe = None

  # Hold the pidfile for the daemon's whole lifetime; it is removed in the
  # finally clause below
  utils.WritePidFile(utils.DaemonPidFileName(daemon_name))
  try:
    try:
      utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
                         debug=options.debug,
                         stderr_logging=not options.fork,
                         multithreaded=multithreaded,
                         program=daemon_name,
                         syslog=options.syslog,
                         console_logging=console_logging)
      if callable(prepare_fn):
        prep_results = prepare_fn(options, args)
      else:
        prep_results = None
      logging.info("%s daemon startup", daemon_name)
    # Report startup failures to the waiting parent through the pipe before
    # re-raising them
    except Exception, err:
      utils.WriteErrorToFD(wpipe, _BeautifyError(err))
      raise

    if wpipe is not None:
      # we're done with the preparation phase, we close the pipe to
      # let the parent know it's safe to exit
      os.close(wpipe)

    exec_fn(options, args, prep_results)
  finally:
    utils.RemovePidFile(utils.DaemonPidFileName(daemon_name))