Merge branch 'stable-2.8' into stable-2.9
[ganeti-local] / lib / daemon.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module with helper classes and functions for daemons"""
23
24
25 import asyncore
26 import asynchat
27 import collections
28 import os
29 import signal
30 import logging
31 import sched
32 import time
33 import socket
34 import select
35 import sys
36
37 from ganeti import utils
38 from ganeti import constants
39 from ganeti import errors
40 from ganeti import netutils
41 from ganeti import ssconf
42 from ganeti import runtime
43 from ganeti import compat
44
45
46 class SchedulerBreakout(Exception):
47   """Exception used to get out of the scheduler loop
48
49   """
50
51
52 def AsyncoreDelayFunction(timeout):
53   """Asyncore-compatible scheduler delay function.
54
55   This is a delay function for sched that, rather than actually sleeping,
56   executes asyncore events happening in the meantime.
57
58   After an event has occurred, rather than returning, it raises a
59   SchedulerBreakout exception, which will force the current scheduler.run()
60   invocation to terminate, so that we can also check for signals. The main loop
61   will then call the scheduler run again, which will allow it to actually
62   process any due events.
63
64   This is needed because scheduler.run() doesn't support a count=..., as
65   asyncore loop, and the scheduler module documents throwing exceptions from
66   inside the delay function as an allowed usage model.
67
68   """
69   asyncore.loop(timeout=timeout, count=1, use_poll=True)
70   raise SchedulerBreakout()
71
72
73 class AsyncoreScheduler(sched.scheduler):
74   """Event scheduler integrated with asyncore
75
76   """
77   def __init__(self, timefunc):
78     """Initializes this class.
79
80     """
81     sched.scheduler.__init__(self, timefunc, self._LimitedDelay)
82     self._max_delay = None
83
84   def run(self, max_delay=None): # pylint: disable=W0221
85     """Run any pending events.
86
87     @type max_delay: None or number
88     @param max_delay: Maximum delay (useful if caller has timeouts running)
89
90     """
91     assert self._max_delay is None
92
93     # The delay function used by the scheduler can't be different on each run,
94     # hence an instance variable must be used.
95     if max_delay is None:
96       self._max_delay = None
97     else:
98       self._max_delay = utils.RunningTimeout(max_delay, False)
99
100     try:
101       return sched.scheduler.run(self)
102     finally:
103       self._max_delay = None
104
105   def _LimitedDelay(self, duration):
106     """Custom delay function for C{sched.scheduler}.
107
108     """
109     if self._max_delay is None:
110       timeout = duration
111     else:
112       timeout = min(duration, self._max_delay.Remaining())
113
114     return AsyncoreDelayFunction(timeout)
115
116
117 class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
118   """Base Ganeti Asyncore Dispacher
119
120   """
121   # this method is overriding an asyncore.dispatcher method
122   def handle_error(self):
123     """Log an error in handling any request, and proceed.
124
125     """
126     logging.exception("Error while handling asyncore request")
127
128   # this method is overriding an asyncore.dispatcher method
129   def writable(self):
130     """Most of the time we don't want to check for writability.
131
132     """
133     return False
134
135
136 class AsyncStreamServer(GanetiBaseAsyncoreDispatcher):
137   """A stream server to use with asyncore.
138
139   Each request is accepted, and then dispatched to a separate asyncore
140   dispatcher to handle.
141
142   """
143
144   _REQUEST_QUEUE_SIZE = 5
145
146   def __init__(self, family, address):
147     """Constructor for AsyncUnixStreamSocket
148
149     @type family: integer
150     @param family: socket family (one of socket.AF_*)
151     @type address: address family dependent
152     @param address: address to bind the socket to
153
154     """
155     GanetiBaseAsyncoreDispatcher.__init__(self)
156     self.family = family
157     self.create_socket(self.family, socket.SOCK_STREAM)
158     self.set_reuse_addr()
159     self.bind(address)
160     self.listen(self._REQUEST_QUEUE_SIZE)
161
162   # this method is overriding an asyncore.dispatcher method
163   def handle_accept(self):
164     """Accept a new client connection.
165
166     Creates a new instance of the handler class, which will use asyncore to
167     serve the client.
168
169     """
170     accept_result = utils.IgnoreSignals(self.accept)
171     if accept_result is not None:
172       connected_socket, client_address = accept_result
173       if self.family == socket.AF_UNIX:
174         # override the client address, as for unix sockets nothing meaningful
175         # is passed in from accept anyway
176         client_address = netutils.GetSocketCredentials(connected_socket)
177       logging.info("Accepted connection from %s",
178                    netutils.FormatAddress(client_address, family=self.family))
179       self.handle_connection(connected_socket, client_address)
180
181   def handle_connection(self, connected_socket, client_address):
182     """Handle an already accepted connection.
183
184     """
185     raise NotImplementedError
186
187
188 class AsyncTerminatedMessageStream(asynchat.async_chat):
189   """A terminator separated message stream asyncore module.
190
191   Handles a stream connection receiving messages terminated by a defined
192   separator. For each complete message handle_message is called.
193
194   """
195   def __init__(self, connected_socket, peer_address, terminator, family,
196                unhandled_limit):
197     """AsyncTerminatedMessageStream constructor.
198
199     @type connected_socket: socket.socket
200     @param connected_socket: connected stream socket to receive messages from
201     @param peer_address: family-specific peer address
202     @type terminator: string
203     @param terminator: terminator separating messages in the stream
204     @type family: integer
205     @param family: socket family
206     @type unhandled_limit: integer or None
207     @param unhandled_limit: maximum unanswered messages
208
209     """
210     # python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by
211     # using a positional argument rather than a keyword one.
212     asynchat.async_chat.__init__(self, connected_socket)
213     self.connected_socket = connected_socket
214     # on python 2.4 there is no "family" attribute for the socket class
215     # FIXME: when we move to python 2.5 or above remove the family parameter
216     #self.family = self.connected_socket.family
217     self.family = family
218     self.peer_address = peer_address
219     self.terminator = terminator
220     self.unhandled_limit = unhandled_limit
221     self.set_terminator(terminator)
222     self.ibuffer = []
223     self.receive_count = 0
224     self.send_count = 0
225     self.oqueue = collections.deque()
226     self.iqueue = collections.deque()
227
228   # this method is overriding an asynchat.async_chat method
229   def collect_incoming_data(self, data):
230     self.ibuffer.append(data)
231
232   def _can_handle_message(self):
233     return (self.unhandled_limit is None or
234             (self.receive_count < self.send_count + self.unhandled_limit) and
235              not self.iqueue)
236
237   # this method is overriding an asynchat.async_chat method
238   def found_terminator(self):
239     message = "".join(self.ibuffer)
240     self.ibuffer = []
241     message_id = self.receive_count
242     # We need to increase the receive_count after checking if the message can
243     # be handled, but before calling handle_message
244     can_handle = self._can_handle_message()
245     self.receive_count += 1
246     if can_handle:
247       self.handle_message(message, message_id)
248     else:
249       self.iqueue.append((message, message_id))
250
251   def handle_message(self, message, message_id):
252     """Handle a terminated message.
253
254     @type message: string
255     @param message: message to handle
256     @type message_id: integer
257     @param message_id: stream's message sequence number
258
259     """
260     pass
261     # TODO: move this method to raise NotImplementedError
262     # raise NotImplementedError
263
264   def send_message(self, message):
265     """Send a message to the remote peer. This function is thread-safe.
266
267     @type message: string
268     @param message: message to send, without the terminator
269
270     @warning: If calling this function from a thread different than the one
271     performing the main asyncore loop, remember that you have to wake that one
272     up.
273
274     """
275     # If we just append the message we received to the output queue, this
276     # function can be safely called by multiple threads at the same time, and
277     # we don't need locking, since deques are thread safe. handle_write in the
278     # asyncore thread will handle the next input message if there are any
279     # enqueued.
280     self.oqueue.append(message)
281
282   # this method is overriding an asyncore.dispatcher method
283   def readable(self):
284     # read from the socket if we can handle the next requests
285     return self._can_handle_message() and asynchat.async_chat.readable(self)
286
287   # this method is overriding an asyncore.dispatcher method
288   def writable(self):
289     # the output queue may become full just after we called writable. This only
290     # works if we know we'll have something else waking us up from the select,
291     # in such case, anyway.
292     return asynchat.async_chat.writable(self) or self.oqueue
293
294   # this method is overriding an asyncore.dispatcher method
295   def handle_write(self):
296     if self.oqueue:
297       # if we have data in the output queue, then send_message was called.
298       # this means we can process one more message from the input queue, if
299       # there are any.
300       data = self.oqueue.popleft()
301       self.push(data + self.terminator)
302       self.send_count += 1
303       if self.iqueue:
304         self.handle_message(*self.iqueue.popleft())
305     self.initiate_send()
306
307   def close_log(self):
308     logging.info("Closing connection from %s",
309                  netutils.FormatAddress(self.peer_address, family=self.family))
310     self.close()
311
312   # this method is overriding an asyncore.dispatcher method
313   def handle_expt(self):
314     self.close_log()
315
316   # this method is overriding an asyncore.dispatcher method
317   def handle_error(self):
318     """Log an error in handling any request, and proceed.
319
320     """
321     logging.exception("Error while handling asyncore request")
322     self.close_log()
323
324
325 class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
326   """An improved asyncore udp socket.
327
328   """
329   def __init__(self, family):
330     """Constructor for AsyncUDPSocket
331
332     """
333     GanetiBaseAsyncoreDispatcher.__init__(self)
334     self._out_queue = []
335     self._family = family
336     self.create_socket(family, socket.SOCK_DGRAM)
337
338   # this method is overriding an asyncore.dispatcher method
339   def handle_connect(self):
340     # Python thinks that the first udp message from a source qualifies as a
341     # "connect" and further ones are part of the same connection. We beg to
342     # differ and treat all messages equally.
343     pass
344
345   # this method is overriding an asyncore.dispatcher method
346   def handle_read(self):
347     recv_result = utils.IgnoreSignals(self.recvfrom,
348                                       constants.MAX_UDP_DATA_SIZE)
349     if recv_result is not None:
350       payload, address = recv_result
351       if self._family == socket.AF_INET6:
352         # we ignore 'flow info' and 'scope id' as we don't need them
353         ip, port, _, _ = address
354       else:
355         ip, port = address
356
357       self.handle_datagram(payload, ip, port)
358
359   def handle_datagram(self, payload, ip, port):
360     """Handle an already read udp datagram
361
362     """
363     raise NotImplementedError
364
365   # this method is overriding an asyncore.dispatcher method
366   def writable(self):
367     # We should check whether we can write to the socket only if we have
368     # something scheduled to be written
369     return bool(self._out_queue)
370
371   # this method is overriding an asyncore.dispatcher method
372   def handle_write(self):
373     if not self._out_queue:
374       logging.error("handle_write called with empty output queue")
375       return
376     (ip, port, payload) = self._out_queue[0]
377     utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
378     self._out_queue.pop(0)
379
380   def enqueue_send(self, ip, port, payload):
381     """Enqueue a datagram to be sent when possible
382
383     """
384     if len(payload) > constants.MAX_UDP_DATA_SIZE:
385       raise errors.UdpDataSizeError("Packet too big: %s > %s" % (len(payload),
386                                     constants.MAX_UDP_DATA_SIZE))
387     self._out_queue.append((ip, port, payload))
388
389   def process_next_packet(self, timeout=0):
390     """Process the next datagram, waiting for it if necessary.
391
392     @type timeout: float
393     @param timeout: how long to wait for data
394     @rtype: boolean
395     @return: True if some data has been handled, False otherwise
396
397     """
398     result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
399     if result is not None and result & select.POLLIN:
400       self.handle_read()
401       return True
402     else:
403       return False
404
405
406 class AsyncAwaker(GanetiBaseAsyncoreDispatcher):
407   """A way to notify the asyncore loop that something is going on.
408
409   If an asyncore daemon is multithreaded when a thread tries to push some data
410   to a socket, the main loop handling asynchronous requests might be sleeping
411   waiting on a select(). To avoid this it can create an instance of the
412   AsyncAwaker, which other threads can use to wake it up.
413
414   """
415   def __init__(self, signal_fn=None):
416     """Constructor for AsyncAwaker
417
418     @type signal_fn: function
419     @param signal_fn: function to call when awaken
420
421     """
422     GanetiBaseAsyncoreDispatcher.__init__(self)
423     assert signal_fn is None or callable(signal_fn)
424     (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
425                                                           socket.SOCK_STREAM)
426     self.in_socket.setblocking(0)
427     self.in_socket.shutdown(socket.SHUT_WR)
428     self.out_socket.shutdown(socket.SHUT_RD)
429     self.set_socket(self.in_socket)
430     self.need_signal = True
431     self.signal_fn = signal_fn
432     self.connected = True
433
434   # this method is overriding an asyncore.dispatcher method
435   def handle_read(self):
436     utils.IgnoreSignals(self.recv, 4096)
437     if self.signal_fn:
438       self.signal_fn()
439     self.need_signal = True
440
441   # this method is overriding an asyncore.dispatcher method
442   def close(self):
443     asyncore.dispatcher.close(self)
444     self.out_socket.close()
445
446   def signal(self):
447     """Signal the asyncore main loop.
448
449     Any data we send here will be ignored, but it will cause the select() call
450     to return.
451
452     """
453     # Yes, there is a race condition here. No, we don't care, at worst we're
454     # sending more than one wakeup token, which doesn't harm at all.
455     if self.need_signal:
456       self.need_signal = False
457       self.out_socket.send("\0")
458
459
460 class _ShutdownCheck:
461   """Logic for L{Mainloop} shutdown.
462
463   """
464   def __init__(self, fn):
465     """Initializes this class.
466
467     @type fn: callable
468     @param fn: Function returning C{None} if mainloop can be stopped or a
469       duration in seconds after which the function should be called again
470     @see: L{Mainloop.Run}
471
472     """
473     assert callable(fn)
474
475     self._fn = fn
476     self._defer = None
477
478   def CanShutdown(self):
479     """Checks whether mainloop can be stopped.
480
481     @rtype: bool
482
483     """
484     if self._defer and self._defer.Remaining() > 0:
485       # A deferred check has already been scheduled
486       return False
487
488     # Ask mainloop driver whether we can stop or should check again
489     timeout = self._fn()
490
491     if timeout is None:
492       # Yes, can stop mainloop
493       return True
494
495     # Schedule another check in the future
496     self._defer = utils.RunningTimeout(timeout, True)
497
498     return False
499
500
501 class Mainloop(object):
502   """Generic mainloop for daemons
503
504   @ivar scheduler: A sched.scheduler object, which can be used to register
505     timed events
506
507   """
508   _SHUTDOWN_TIMEOUT_PRIORITY = -(sys.maxint - 1)
509
510   def __init__(self):
511     """Constructs a new Mainloop instance.
512
513     """
514     self._signal_wait = []
515     self.scheduler = AsyncoreScheduler(time.time)
516
517     # Resolve uid/gids used
518     runtime.GetEnts()
519
520   @utils.SignalHandled([signal.SIGCHLD])
521   @utils.SignalHandled([signal.SIGTERM])
522   @utils.SignalHandled([signal.SIGINT])
523   def Run(self, shutdown_wait_fn=None, signal_handlers=None):
524     """Runs the mainloop.
525
526     @type shutdown_wait_fn: callable
527     @param shutdown_wait_fn: Function to check whether loop can be terminated;
528       B{important}: function must be idempotent and must return either None
529       for shutting down or a timeout for another call
530     @type signal_handlers: dict
531     @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator
532
533     """
534     assert isinstance(signal_handlers, dict) and \
535            len(signal_handlers) > 0, \
536            "Broken SignalHandled decorator"
537
538     # Counter for received signals
539     shutdown_signals = 0
540
541     # Logic to wait for shutdown
542     shutdown_waiter = None
543
544     # Start actual main loop
545     while True:
546       if shutdown_signals == 1 and shutdown_wait_fn is not None:
547         if shutdown_waiter is None:
548           shutdown_waiter = _ShutdownCheck(shutdown_wait_fn)
549
550         # Let mainloop driver decide if we can already abort
551         if shutdown_waiter.CanShutdown():
552           break
553
554         # Re-evaluate in a second
555         timeout = 1.0
556
557       elif shutdown_signals >= 1:
558         # Abort loop if more than one signal has been sent or no callback has
559         # been given
560         break
561
562       else:
563         # Wait forever on I/O events
564         timeout = None
565
566       if self.scheduler.empty():
567         asyncore.loop(count=1, timeout=timeout, use_poll=True)
568       else:
569         try:
570           self.scheduler.run(max_delay=timeout)
571         except SchedulerBreakout:
572           pass
573
574       # Check whether a signal was raised
575       for (sig, handler) in signal_handlers.items():
576         if handler.called:
577           self._CallSignalWaiters(sig)
578           if sig in (signal.SIGTERM, signal.SIGINT):
579             logging.info("Received signal %s asking for shutdown", sig)
580             shutdown_signals += 1
581           handler.Clear()
582
583   def _CallSignalWaiters(self, signum):
584     """Calls all signal waiters for a certain signal.
585
586     @type signum: int
587     @param signum: Signal number
588
589     """
590     for owner in self._signal_wait:
591       owner.OnSignal(signum)
592
593   def RegisterSignal(self, owner):
594     """Registers a receiver for signal notifications
595
596     The receiver must support a "OnSignal(self, signum)" function.
597
598     @type owner: instance
599     @param owner: Receiver
600
601     """
602     self._signal_wait.append(owner)
603
604
605 def _VerifyDaemonUser(daemon_name):
606   """Verifies the process uid matches the configured uid.
607
608   This method verifies that a daemon is started as the user it is
609   intended to be run
610
611   @param daemon_name: The name of daemon to be started
612   @return: A tuple with the first item indicating success or not,
613            the second item current uid and third with expected uid
614
615   """
616   getents = runtime.GetEnts()
617   running_uid = os.getuid()
618   daemon_uids = {
619     constants.MASTERD: getents.masterd_uid,
620     constants.RAPI: getents.rapi_uid,
621     constants.NODED: getents.noded_uid,
622     constants.CONFD: getents.confd_uid,
623     }
624   assert daemon_name in daemon_uids, "Invalid daemon %s" % daemon_name
625
626   return (daemon_uids[daemon_name] == running_uid, running_uid,
627           daemon_uids[daemon_name])
628
629
630 def _BeautifyError(err):
631   """Try to format an error better.
632
633   Since we're dealing with daemon startup errors, in many cases this
634   will be due to socket error and such, so we try to format these cases better.
635
636   @param err: an exception object
637   @rtype: string
638   @return: the formatted error description
639
640   """
641   try:
642     if isinstance(err, socket.error):
643       return "Socket-related error: %s (errno=%s)" % (err.args[1], err.args[0])
644     elif isinstance(err, EnvironmentError):
645       if err.filename is None:
646         return "%s (errno=%s)" % (err.strerror, err.errno)
647       else:
648         return "%s (file %s) (errno=%s)" % (err.strerror, err.filename,
649                                             err.errno)
650     else:
651       return str(err)
652   except Exception: # pylint: disable=W0703
653     logging.exception("Error while handling existing error %s", err)
654     return "%s" % str(err)
655
656
657 def _HandleSigHup(reopen_fn, signum, frame): # pylint: disable=W0613
658   """Handler for SIGHUP.
659
660   @param reopen_fn: List of callback functions for reopening log files
661
662   """
663   logging.info("Reopening log files after receiving SIGHUP")
664
665   for fn in reopen_fn:
666     if fn:
667       fn()
668
669
670 def GenericMain(daemon_name, optionparser,
671                 check_fn, prepare_fn, exec_fn,
672                 multithreaded=False, console_logging=False,
673                 default_ssl_cert=None, default_ssl_key=None):
674   """Shared main function for daemons.
675
676   @type daemon_name: string
677   @param daemon_name: daemon name
678   @type optionparser: optparse.OptionParser
679   @param optionparser: initialized optionparser with daemon-specific options
680                        (common -f -d options will be handled by this module)
681   @type check_fn: function which accepts (options, args)
682   @param check_fn: function that checks start conditions and exits if they're
683                    not met
684   @type prepare_fn: function which accepts (options, args)
685   @param prepare_fn: function that is run before forking, or None;
686       it's result will be passed as the third parameter to exec_fn, or
687       if None was passed in, we will just pass None to exec_fn
688   @type exec_fn: function which accepts (options, args, prepare_results)
689   @param exec_fn: function that's executed with the daemon's pid file held, and
690                   runs the daemon itself.
691   @type multithreaded: bool
692   @param multithreaded: Whether the daemon uses threads
693   @type console_logging: boolean
694   @param console_logging: if True, the daemon will fall back to the system
695                           console if logging fails
696   @type default_ssl_cert: string
697   @param default_ssl_cert: Default SSL certificate path
698   @type default_ssl_key: string
699   @param default_ssl_key: Default SSL key path
700
701   """
702   optionparser.add_option("-f", "--foreground", dest="fork",
703                           help="Don't detach from the current terminal",
704                           default=True, action="store_false")
705   optionparser.add_option("-d", "--debug", dest="debug",
706                           help="Enable some debug messages",
707                           default=False, action="store_true")
708   optionparser.add_option("--syslog", dest="syslog",
709                           help="Enable logging to syslog (except debug"
710                           " messages); one of 'no', 'yes' or 'only' [%s]" %
711                           constants.SYSLOG_USAGE,
712                           default=constants.SYSLOG_USAGE,
713                           choices=["no", "yes", "only"])
714
715   family = ssconf.SimpleStore().GetPrimaryIPFamily()
716   # family will default to AF_INET if there is no ssconf file (e.g. when
717   # upgrading a cluster from 2.2 -> 2.3. This is intended, as Ganeti clusters
718   # <= 2.2 can not be AF_INET6
719   if daemon_name in constants.DAEMONS_PORTS:
720     default_bind_address = constants.IP4_ADDRESS_ANY
721     if family == netutils.IP6Address.family:
722       default_bind_address = constants.IP6_ADDRESS_ANY
723
724     default_port = netutils.GetDaemonPort(daemon_name)
725
726     # For networked daemons we allow choosing the port and bind address
727     optionparser.add_option("-p", "--port", dest="port",
728                             help="Network port (default: %s)" % default_port,
729                             default=default_port, type="int")
730     optionparser.add_option("-b", "--bind", dest="bind_address",
731                             help=("Bind address (default: '%s')" %
732                                   default_bind_address),
733                             default=default_bind_address, metavar="ADDRESS")
734     optionparser.add_option("-i", "--interface", dest="bind_interface",
735                             help=("Bind interface"), metavar="INTERFACE")
736
737   if default_ssl_key is not None and default_ssl_cert is not None:
738     optionparser.add_option("--no-ssl", dest="ssl",
739                             help="Do not secure HTTP protocol with SSL",
740                             default=True, action="store_false")
741     optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
742                             help=("SSL key path (default: %s)" %
743                                   default_ssl_key),
744                             default=default_ssl_key, type="string",
745                             metavar="SSL_KEY_PATH")
746     optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
747                             help=("SSL certificate path (default: %s)" %
748                                   default_ssl_cert),
749                             default=default_ssl_cert, type="string",
750                             metavar="SSL_CERT_PATH")
751
752   # Disable the use of fork(2) if the daemon uses threads
753   if multithreaded:
754     utils.DisableFork()
755
756   options, args = optionparser.parse_args()
757
758   if getattr(options, "bind_interface", None) is not None:
759     if options.bind_address != default_bind_address:
760       msg = ("Can't specify both, bind address (%s) and bind interface (%s)" %
761              (options.bind_address, options.bind_interface))
762       print >> sys.stderr, msg
763       sys.exit(constants.EXIT_FAILURE)
764     interface_ip_addresses = \
765       netutils.GetInterfaceIpAddresses(options.bind_interface)
766     if family == netutils.IP6Address.family:
767       if_addresses = interface_ip_addresses[constants.IP6_VERSION]
768     else:
769       if_addresses = interface_ip_addresses[constants.IP4_VERSION]
770     if len(if_addresses) < 1:
771       msg = "Failed to find IP for interface %s" % options.bind_interace
772       print >> sys.stderr, msg
773       sys.exit(constants.EXIT_FAILURE)
774     options.bind_address = if_addresses[0]
775
776   if getattr(options, "ssl", False):
777     ssl_paths = {
778       "certificate": options.ssl_cert,
779       "key": options.ssl_key,
780       }
781
782     for name, path in ssl_paths.iteritems():
783       if not os.path.isfile(path):
784         print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
785         sys.exit(constants.EXIT_FAILURE)
786
787     # TODO: By initiating http.HttpSslParams here we would only read the files
788     # once and have a proper validation (isfile returns False on directories)
789     # at the same time.
790
791   result, running_uid, expected_uid = _VerifyDaemonUser(daemon_name)
792   if not result:
793     msg = ("%s started using wrong user ID (%d), expected %d" %
794            (daemon_name, running_uid, expected_uid))
795     print >> sys.stderr, msg
796     sys.exit(constants.EXIT_FAILURE)
797
798   if check_fn is not None:
799     check_fn(options, args)
800
801   log_filename = constants.DAEMONS_LOGFILES[daemon_name]
802
803   if options.fork:
804     utils.CloseFDs()
805     (wpipe, stdio_reopen_fn) = utils.Daemonize(logfile=log_filename)
806   else:
807     (wpipe, stdio_reopen_fn) = (None, None)
808
809   log_reopen_fn = \
810     utils.SetupLogging(log_filename, daemon_name,
811                        debug=options.debug,
812                        stderr_logging=not options.fork,
813                        multithreaded=multithreaded,
814                        syslog=options.syslog,
815                        console_logging=console_logging)
816
817   # Reopen log file(s) on SIGHUP
818   signal.signal(signal.SIGHUP,
819                 compat.partial(_HandleSigHup, [log_reopen_fn, stdio_reopen_fn]))
820
821   try:
822     utils.WritePidFile(utils.DaemonPidFileName(daemon_name))
823   except errors.PidFileLockError, err:
824     print >> sys.stderr, "Error while locking PID file:\n%s" % err
825     sys.exit(constants.EXIT_FAILURE)
826
827   try:
828     try:
829       logging.info("%s daemon startup", daemon_name)
830       if callable(prepare_fn):
831         prep_results = prepare_fn(options, args)
832       else:
833         prep_results = None
834     except Exception, err:
835       utils.WriteErrorToFD(wpipe, _BeautifyError(err))
836       raise
837
838     if wpipe is not None:
839       # we're done with the preparation phase, we close the pipe to
840       # let the parent know it's safe to exit
841       os.close(wpipe)
842
843     exec_fn(options, args, prep_results)
844   finally:
845     utils.RemoveFile(utils.DaemonPidFileName(daemon_name))