Statistics
| Branch: | Tag: | Revision:

root / lib / daemon.py @ 93f1e606

History | View | Annotate | Download (28.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module with helper classes and functions for daemons"""
23

    
24

    
25
import asyncore
26
import asynchat
27
import collections
28
import os
29
import signal
30
import logging
31
import sched
32
import time
33
import socket
34
import select
35
import sys
36

    
37
from ganeti import utils
38
from ganeti import constants
39
from ganeti import errors
40
from ganeti import netutils
41
from ganeti import ssconf
42
from ganeti import runtime
43
from ganeti import compat
44

    
45

    
46
class SchedulerBreakout(Exception):
47
  """Exception used to get out of the scheduler loop
48

49
  """
50

    
51

    
52
def AsyncoreDelayFunction(timeout):
53
  """Asyncore-compatible scheduler delay function.
54

55
  This is a delay function for sched that, rather than actually sleeping,
56
  executes asyncore events happening in the meantime.
57

58
  After an event has occurred, rather than returning, it raises a
59
  SchedulerBreakout exception, which will force the current scheduler.run()
60
  invocation to terminate, so that we can also check for signals. The main loop
61
  will then call the scheduler run again, which will allow it to actually
62
  process any due events.
63

64
  This is needed because scheduler.run() doesn't support a count=..., as
65
  asyncore loop, and the scheduler module documents throwing exceptions from
66
  inside the delay function as an allowed usage model.
67

68
  """
69
  asyncore.loop(timeout=timeout, count=1, use_poll=True)
70
  raise SchedulerBreakout()
71

    
72

    
73
class AsyncoreScheduler(sched.scheduler):
74
  """Event scheduler integrated with asyncore
75

76
  """
77
  def __init__(self, timefunc):
78
    """Initializes this class.
79

80
    """
81
    sched.scheduler.__init__(self, timefunc, self._LimitedDelay)
82
    self._max_delay = None
83

    
84
  def run(self, max_delay=None): # pylint: disable=W0221
85
    """Run any pending events.
86

87
    @type max_delay: None or number
88
    @param max_delay: Maximum delay (useful if caller has timeouts running)
89

90
    """
91
    assert self._max_delay is None
92

    
93
    # The delay function used by the scheduler can't be different on each run,
94
    # hence an instance variable must be used.
95
    if max_delay is None:
96
      self._max_delay = None
97
    else:
98
      self._max_delay = utils.RunningTimeout(max_delay, False)
99

    
100
    try:
101
      return sched.scheduler.run(self)
102
    finally:
103
      self._max_delay = None
104

    
105
  def _LimitedDelay(self, duration):
106
    """Custom delay function for C{sched.scheduler}.
107

108
    """
109
    if self._max_delay is None:
110
      timeout = duration
111
    else:
112
      timeout = min(duration, self._max_delay.Remaining())
113

    
114
    return AsyncoreDelayFunction(timeout)
115

    
116

    
117
class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
118
  """Base Ganeti Asyncore Dispacher
119

120
  """
121
  # this method is overriding an asyncore.dispatcher method
122
  def handle_error(self):
123
    """Log an error in handling any request, and proceed.
124

125
    """
126
    logging.exception("Error while handling asyncore request")
127

    
128
  # this method is overriding an asyncore.dispatcher method
129
  def writable(self):
130
    """Most of the time we don't want to check for writability.
131

132
    """
133
    return False
134

    
135

    
136
class AsyncStreamServer(GanetiBaseAsyncoreDispatcher):
137
  """A stream server to use with asyncore.
138

139
  Each request is accepted, and then dispatched to a separate asyncore
140
  dispatcher to handle.
141

142
  """
143

    
144
  _REQUEST_QUEUE_SIZE = 5
145

    
146
  def __init__(self, family, address):
147
    """Constructor for AsyncStreamServer
148

149
    @type family: integer
150
    @param family: socket family (one of socket.AF_*)
151
    @type address: address family dependent
152
    @param address: address to bind the socket to
153

154
    """
155
    GanetiBaseAsyncoreDispatcher.__init__(self)
156
    self.family = family
157
    self.create_socket(self.family, socket.SOCK_STREAM)
158
    self.set_reuse_addr()
159
    self.bind(address)
160
    self.listen(self._REQUEST_QUEUE_SIZE)
161

    
162
  # this method is overriding an asyncore.dispatcher method
163
  def handle_accept(self):
164
    """Accept a new client connection.
165

166
    Creates a new instance of the handler class, which will use asyncore to
167
    serve the client.
168

169
    """
170
    accept_result = utils.IgnoreSignals(self.accept)
171
    if accept_result is not None:
172
      connected_socket, client_address = accept_result
173
      if self.family == socket.AF_UNIX:
174
        # override the client address, as for unix sockets nothing meaningful
175
        # is passed in from accept anyway
176
        client_address = netutils.GetSocketCredentials(connected_socket)
177
      logging.info("Accepted connection from %s",
178
                   netutils.FormatAddress(client_address, family=self.family))
179
      self.handle_connection(connected_socket, client_address)
180

    
181
  def handle_connection(self, connected_socket, client_address):
182
    """Handle an already accepted connection.
183

184
    """
185
    raise NotImplementedError
186

    
187

    
188
class AsyncTerminatedMessageStream(asynchat.async_chat):
189
  """A terminator separated message stream asyncore module.
190

191
  Handles a stream connection receiving messages terminated by a defined
192
  separator. For each complete message handle_message is called.
193

194
  """
195
  def __init__(self, connected_socket, peer_address, terminator, family,
196
               unhandled_limit):
197
    """AsyncTerminatedMessageStream constructor.
198

199
    @type connected_socket: socket.socket
200
    @param connected_socket: connected stream socket to receive messages from
201
    @param peer_address: family-specific peer address
202
    @type terminator: string
203
    @param terminator: terminator separating messages in the stream
204
    @type family: integer
205
    @param family: socket family
206
    @type unhandled_limit: integer or None
207
    @param unhandled_limit: maximum unanswered messages
208

209
    """
210
    # python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by
211
    # using a positional argument rather than a keyword one.
212
    asynchat.async_chat.__init__(self, connected_socket)
213
    self.connected_socket = connected_socket
214
    # on python 2.4 there is no "family" attribute for the socket class
215
    # FIXME: when we move to python 2.5 or above remove the family parameter
216
    #self.family = self.connected_socket.family
217
    self.family = family
218
    self.peer_address = peer_address
219
    self.terminator = terminator
220
    self.unhandled_limit = unhandled_limit
221
    self.set_terminator(terminator)
222
    self.ibuffer = []
223
    self.receive_count = 0
224
    self.send_count = 0
225
    self.oqueue = collections.deque()
226
    self.iqueue = collections.deque()
227

    
228
  # this method is overriding an asynchat.async_chat method
229
  def collect_incoming_data(self, data):
230
    self.ibuffer.append(data)
231

    
232
  def _can_handle_message(self):
233
    return (self.unhandled_limit is None or
234
            (self.receive_count < self.send_count + self.unhandled_limit) and
235
             not self.iqueue)
236

    
237
  # this method is overriding an asynchat.async_chat method
238
  def found_terminator(self):
239
    message = "".join(self.ibuffer)
240
    self.ibuffer = []
241
    message_id = self.receive_count
242
    # We need to increase the receive_count after checking if the message can
243
    # be handled, but before calling handle_message
244
    can_handle = self._can_handle_message()
245
    self.receive_count += 1
246
    if can_handle:
247
      self.handle_message(message, message_id)
248
    else:
249
      self.iqueue.append((message, message_id))
250

    
251
  def handle_message(self, message, message_id):
252
    """Handle a terminated message.
253

254
    @type message: string
255
    @param message: message to handle
256
    @type message_id: integer
257
    @param message_id: stream's message sequence number
258

259
    """
260
    pass
261
    # TODO: move this method to raise NotImplementedError
262
    # raise NotImplementedError
263

    
264
  def send_message(self, message):
265
    """Send a message to the remote peer. This function is thread-safe.
266

267
    @type message: string
268
    @param message: message to send, without the terminator
269

270
    @warning: If calling this function from a thread different than the one
271
    performing the main asyncore loop, remember that you have to wake that one
272
    up.
273

274
    """
275
    # If we just append the message we received to the output queue, this
276
    # function can be safely called by multiple threads at the same time, and
277
    # we don't need locking, since deques are thread safe. handle_write in the
278
    # asyncore thread will handle the next input message if there are any
279
    # enqueued.
280
    self.oqueue.append(message)
281

    
282
  # this method is overriding an asyncore.dispatcher method
283
  def readable(self):
284
    # read from the socket if we can handle the next requests
285
    return self._can_handle_message() and asynchat.async_chat.readable(self)
286

    
287
  # this method is overriding an asyncore.dispatcher method
288
  def writable(self):
289
    # the output queue may become full just after we called writable. This only
290
    # works if we know we'll have something else waking us up from the select,
291
    # in such case, anyway.
292
    return asynchat.async_chat.writable(self) or self.oqueue
293

    
294
  # this method is overriding an asyncore.dispatcher method
295
  def handle_write(self):
296
    if self.oqueue:
297
      # if we have data in the output queue, then send_message was called.
298
      # this means we can process one more message from the input queue, if
299
      # there are any.
300
      data = self.oqueue.popleft()
301
      self.push(data + self.terminator)
302
      self.send_count += 1
303
      if self.iqueue:
304
        self.handle_message(*self.iqueue.popleft())
305
    self.initiate_send()
306

    
307
  def close_log(self):
308
    logging.info("Closing connection from %s",
309
                 netutils.FormatAddress(self.peer_address, family=self.family))
310
    self.close()
311

    
312
  # this method is overriding an asyncore.dispatcher method
313
  def handle_expt(self):
314
    self.close_log()
315

    
316
  # this method is overriding an asyncore.dispatcher method
317
  def handle_error(self):
318
    """Log an error in handling any request, and proceed.
319

320
    """
321
    logging.exception("Error while handling asyncore request")
322
    self.close_log()
323

    
324

    
325
class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
326
  """An improved asyncore udp socket.
327

328
  """
329
  def __init__(self, family):
330
    """Constructor for AsyncUDPSocket
331

332
    """
333
    GanetiBaseAsyncoreDispatcher.__init__(self)
334
    self._out_queue = []
335
    self._family = family
336
    self.create_socket(family, socket.SOCK_DGRAM)
337

    
338
  # this method is overriding an asyncore.dispatcher method
339
  def handle_connect(self):
340
    # Python thinks that the first udp message from a source qualifies as a
341
    # "connect" and further ones are part of the same connection. We beg to
342
    # differ and treat all messages equally.
343
    pass
344

    
345
  # this method is overriding an asyncore.dispatcher method
346
  def handle_read(self):
347
    recv_result = utils.IgnoreSignals(self.recvfrom,
348
                                      constants.MAX_UDP_DATA_SIZE)
349
    if recv_result is not None:
350
      payload, address = recv_result
351
      if self._family == socket.AF_INET6:
352
        # we ignore 'flow info' and 'scope id' as we don't need them
353
        ip, port, _, _ = address
354
      else:
355
        ip, port = address
356

    
357
      self.handle_datagram(payload, ip, port)
358

    
359
  def handle_datagram(self, payload, ip, port):
360
    """Handle an already read udp datagram
361

362
    """
363
    raise NotImplementedError
364

    
365
  # this method is overriding an asyncore.dispatcher method
366
  def writable(self):
367
    # We should check whether we can write to the socket only if we have
368
    # something scheduled to be written
369
    return bool(self._out_queue)
370

    
371
  # this method is overriding an asyncore.dispatcher method
372
  def handle_write(self):
373
    if not self._out_queue:
374
      logging.error("handle_write called with empty output queue")
375
      return
376
    (ip, port, payload) = self._out_queue[0]
377
    utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
378
    self._out_queue.pop(0)
379

    
380
  def enqueue_send(self, ip, port, payload):
381
    """Enqueue a datagram to be sent when possible
382

383
    """
384
    if len(payload) > constants.MAX_UDP_DATA_SIZE:
385
      raise errors.UdpDataSizeError("Packet too big: %s > %s" % (len(payload),
386
                                    constants.MAX_UDP_DATA_SIZE))
387
    self._out_queue.append((ip, port, payload))
388

    
389
  def process_next_packet(self, timeout=0):
390
    """Process the next datagram, waiting for it if necessary.
391

392
    @type timeout: float
393
    @param timeout: how long to wait for data
394
    @rtype: boolean
395
    @return: True if some data has been handled, False otherwise
396

397
    """
398
    result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
399
    if result is not None and result & select.POLLIN:
400
      self.handle_read()
401
      return True
402
    else:
403
      return False
404

    
405

    
406
class AsyncAwaker(GanetiBaseAsyncoreDispatcher):
407
  """A way to notify the asyncore loop that something is going on.
408

409
  If an asyncore daemon is multithreaded when a thread tries to push some data
410
  to a socket, the main loop handling asynchronous requests might be sleeping
411
  waiting on a select(). To avoid this it can create an instance of the
412
  AsyncAwaker, which other threads can use to wake it up.
413

414
  """
415
  def __init__(self, signal_fn=None):
416
    """Constructor for AsyncAwaker
417

418
    @type signal_fn: function
419
    @param signal_fn: function to call when awaken
420

421
    """
422
    GanetiBaseAsyncoreDispatcher.__init__(self)
423
    assert signal_fn is None or callable(signal_fn)
424
    (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
425
                                                          socket.SOCK_STREAM)
426
    self.in_socket.setblocking(0)
427
    self.in_socket.shutdown(socket.SHUT_WR)
428
    self.out_socket.shutdown(socket.SHUT_RD)
429
    self.set_socket(self.in_socket)
430
    self.need_signal = True
431
    self.signal_fn = signal_fn
432
    self.connected = True
433

    
434
  # this method is overriding an asyncore.dispatcher method
435
  def handle_read(self):
436
    utils.IgnoreSignals(self.recv, 4096)
437
    if self.signal_fn:
438
      self.signal_fn()
439
    self.need_signal = True
440

    
441
  # this method is overriding an asyncore.dispatcher method
442
  def close(self):
443
    asyncore.dispatcher.close(self)
444
    self.out_socket.close()
445

    
446
  def signal(self):
447
    """Signal the asyncore main loop.
448

449
    Any data we send here will be ignored, but it will cause the select() call
450
    to return.
451

452
    """
453
    # Yes, there is a race condition here. No, we don't care, at worst we're
454
    # sending more than one wakeup token, which doesn't harm at all.
455
    if self.need_signal:
456
      self.need_signal = False
457
      self.out_socket.send(chr(0))
458

    
459

    
460
class _ShutdownCheck:
461
  """Logic for L{Mainloop} shutdown.
462

463
  """
464
  def __init__(self, fn):
465
    """Initializes this class.
466

467
    @type fn: callable
468
    @param fn: Function returning C{None} if mainloop can be stopped or a
469
      duration in seconds after which the function should be called again
470
    @see: L{Mainloop.Run}
471

472
    """
473
    assert callable(fn)
474

    
475
    self._fn = fn
476
    self._defer = None
477

    
478
  def CanShutdown(self):
479
    """Checks whether mainloop can be stopped.
480

481
    @rtype: bool
482

483
    """
484
    if self._defer and self._defer.Remaining() > 0:
485
      # A deferred check has already been scheduled
486
      return False
487

    
488
    # Ask mainloop driver whether we can stop or should check again
489
    timeout = self._fn()
490

    
491
    if timeout is None:
492
      # Yes, can stop mainloop
493
      return True
494

    
495
    # Schedule another check in the future
496
    self._defer = utils.RunningTimeout(timeout, True)
497

    
498
    return False
499

    
500

    
501
class Mainloop(object):
502
  """Generic mainloop for daemons
503

504
  @ivar scheduler: A sched.scheduler object, which can be used to register
505
    timed events
506

507
  """
508
  _SHUTDOWN_TIMEOUT_PRIORITY = -(sys.maxint - 1)
509

    
510
  def __init__(self):
511
    """Constructs a new Mainloop instance.
512

513
    """
514
    self._signal_wait = []
515
    self.scheduler = AsyncoreScheduler(time.time)
516

    
517
    # Resolve uid/gids used
518
    runtime.GetEnts()
519

    
520
  @utils.SignalHandled([signal.SIGCHLD])
521
  @utils.SignalHandled([signal.SIGTERM])
522
  @utils.SignalHandled([signal.SIGINT])
523
  def Run(self, shutdown_wait_fn=None, signal_handlers=None):
524
    """Runs the mainloop.
525

526
    @type shutdown_wait_fn: callable
527
    @param shutdown_wait_fn: Function to check whether loop can be terminated;
528
      B{important}: function must be idempotent and must return either None
529
      for shutting down or a timeout for another call
530
    @type signal_handlers: dict
531
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator
532

533
    """
534
    assert isinstance(signal_handlers, dict) and \
535
           len(signal_handlers) > 0, \
536
           "Broken SignalHandled decorator"
537

    
538
    # Counter for received signals
539
    shutdown_signals = 0
540

    
541
    # Logic to wait for shutdown
542
    shutdown_waiter = None
543

    
544
    # Start actual main loop
545
    while True:
546
      if shutdown_signals == 1 and shutdown_wait_fn is not None:
547
        if shutdown_waiter is None:
548
          shutdown_waiter = _ShutdownCheck(shutdown_wait_fn)
549

    
550
        # Let mainloop driver decide if we can already abort
551
        if shutdown_waiter.CanShutdown():
552
          break
553

    
554
        # Re-evaluate in a second
555
        timeout = 1.0
556

    
557
      elif shutdown_signals >= 1:
558
        # Abort loop if more than one signal has been sent or no callback has
559
        # been given
560
        break
561

    
562
      else:
563
        # Wait forever on I/O events
564
        timeout = None
565

    
566
      if self.scheduler.empty():
567
        asyncore.loop(count=1, timeout=timeout, use_poll=True)
568
      else:
569
        try:
570
          self.scheduler.run(max_delay=timeout)
571
        except SchedulerBreakout:
572
          pass
573

    
574
      # Check whether a signal was raised
575
      for (sig, handler) in signal_handlers.items():
576
        if handler.called:
577
          self._CallSignalWaiters(sig)
578
          if sig in (signal.SIGTERM, signal.SIGINT):
579
            logging.info("Received signal %s asking for shutdown", sig)
580
            shutdown_signals += 1
581
          handler.Clear()
582

    
583
  def _CallSignalWaiters(self, signum):
584
    """Calls all signal waiters for a certain signal.
585

586
    @type signum: int
587
    @param signum: Signal number
588

589
    """
590
    for owner in self._signal_wait:
591
      owner.OnSignal(signum)
592

    
593
  def RegisterSignal(self, owner):
594
    """Registers a receiver for signal notifications
595

596
    The receiver must support a "OnSignal(self, signum)" function.
597

598
    @type owner: instance
599
    @param owner: Receiver
600

601
    """
602
    self._signal_wait.append(owner)
603

    
604

    
605
def _VerifyDaemonUser(daemon_name):
606
  """Verifies the process uid matches the configured uid.
607

608
  This method verifies that a daemon is started as the user it is
609
  intended to be run
610

611
  @param daemon_name: The name of daemon to be started
612
  @return: A tuple with the first item indicating success or not,
613
           the second item current uid and third with expected uid
614

615
  """
616
  getents = runtime.GetEnts()
617
  running_uid = os.getuid()
618
  daemon_uids = {
619
    constants.MASTERD: getents.masterd_uid,
620
    constants.RAPI: getents.rapi_uid,
621
    constants.NODED: getents.noded_uid,
622
    constants.CONFD: getents.confd_uid,
623
    }
624
  assert daemon_name in daemon_uids, "Invalid daemon %s" % daemon_name
625

    
626
  return (daemon_uids[daemon_name] == running_uid, running_uid,
627
          daemon_uids[daemon_name])
628

    
629

    
630
def _BeautifyError(err):
631
  """Try to format an error better.
632

633
  Since we're dealing with daemon startup errors, in many cases this
634
  will be due to socket error and such, so we try to format these cases better.
635

636
  @param err: an exception object
637
  @rtype: string
638
  @return: the formatted error description
639

640
  """
641
  try:
642
    if isinstance(err, socket.error):
643
      return "Socket-related error: %s (errno=%s)" % (err.args[1], err.args[0])
644
    elif isinstance(err, EnvironmentError):
645
      if err.filename is None:
646
        return "%s (errno=%s)" % (err.strerror, err.errno)
647
      else:
648
        return "%s (file %s) (errno=%s)" % (err.strerror, err.filename,
649
                                            err.errno)
650
    else:
651
      return str(err)
652
  except Exception: # pylint: disable=W0703
653
    logging.exception("Error while handling existing error %s", err)
654
    return "%s" % str(err)
655

    
656

    
657
def _HandleSigHup(reopen_fn, signum, frame): # pylint: disable=W0613
658
  """Handler for SIGHUP.
659

660
  @param reopen_fn: List of callback functions for reopening log files
661

662
  """
663
  logging.info("Reopening log files after receiving SIGHUP")
664

    
665
  for fn in reopen_fn:
666
    if fn:
667
      fn()
668

    
669

    
670
def GenericMain(daemon_name, optionparser,
671
                check_fn, prepare_fn, exec_fn,
672
                multithreaded=False, console_logging=False,
673
                default_ssl_cert=None, default_ssl_key=None,
674
                warn_breach=False):
675
  """Shared main function for daemons.
676

677
  @type daemon_name: string
678
  @param daemon_name: daemon name
679
  @type optionparser: optparse.OptionParser
680
  @param optionparser: initialized optionparser with daemon-specific options
681
                       (common -f -d options will be handled by this module)
682
  @type check_fn: function which accepts (options, args)
683
  @param check_fn: function that checks start conditions and exits if they're
684
                   not met
685
  @type prepare_fn: function which accepts (options, args)
686
  @param prepare_fn: function that is run before forking, or None;
687
      it's result will be passed as the third parameter to exec_fn, or
688
      if None was passed in, we will just pass None to exec_fn
689
  @type exec_fn: function which accepts (options, args, prepare_results)
690
  @param exec_fn: function that's executed with the daemon's pid file held, and
691
                  runs the daemon itself.
692
  @type multithreaded: bool
693
  @param multithreaded: Whether the daemon uses threads
694
  @type console_logging: boolean
695
  @param console_logging: if True, the daemon will fall back to the system
696
                          console if logging fails
697
  @type default_ssl_cert: string
698
  @param default_ssl_cert: Default SSL certificate path
699
  @type default_ssl_key: string
700
  @param default_ssl_key: Default SSL key path
701
  @type warn_breach: bool
702
  @param warn_breach: issue a warning at daemon launch time, before
703
      daemonizing, about the possibility of breaking parameter privacy
704
      invariants through the otherwise helpful debug logging.
705

706
  """
707
  optionparser.add_option("-f", "--foreground", dest="fork",
708
                          help="Don't detach from the current terminal",
709
                          default=True, action="store_false")
710
  optionparser.add_option("-d", "--debug", dest="debug",
711
                          help="Enable some debug messages",
712
                          default=False, action="store_true")
713
  optionparser.add_option("--syslog", dest="syslog",
714
                          help="Enable logging to syslog (except debug"
715
                          " messages); one of 'no', 'yes' or 'only' [%s]" %
716
                          constants.SYSLOG_USAGE,
717
                          default=constants.SYSLOG_USAGE,
718
                          choices=["no", "yes", "only"])
719

    
720
  family = ssconf.SimpleStore().GetPrimaryIPFamily()
721
  # family will default to AF_INET if there is no ssconf file (e.g. when
722
  # upgrading a cluster from 2.2 -> 2.3. This is intended, as Ganeti clusters
723
  # <= 2.2 can not be AF_INET6
724
  if daemon_name in constants.DAEMONS_PORTS:
725
    default_bind_address = constants.IP4_ADDRESS_ANY
726
    if family == netutils.IP6Address.family:
727
      default_bind_address = constants.IP6_ADDRESS_ANY
728

    
729
    default_port = netutils.GetDaemonPort(daemon_name)
730

    
731
    # For networked daemons we allow choosing the port and bind address
732
    optionparser.add_option("-p", "--port", dest="port",
733
                            help="Network port (default: %s)" % default_port,
734
                            default=default_port, type="int")
735
    optionparser.add_option("-b", "--bind", dest="bind_address",
736
                            help=("Bind address (default: '%s')" %
737
                                  default_bind_address),
738
                            default=default_bind_address, metavar="ADDRESS")
739
    optionparser.add_option("-i", "--interface", dest="bind_interface",
740
                            help=("Bind interface"), metavar="INTERFACE")
741

    
742
  if default_ssl_key is not None and default_ssl_cert is not None:
743
    optionparser.add_option("--no-ssl", dest="ssl",
744
                            help="Do not secure HTTP protocol with SSL",
745
                            default=True, action="store_false")
746
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
747
                            help=("SSL key path (default: %s)" %
748
                                  default_ssl_key),
749
                            default=default_ssl_key, type="string",
750
                            metavar="SSL_KEY_PATH")
751
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
752
                            help=("SSL certificate path (default: %s)" %
753
                                  default_ssl_cert),
754
                            default=default_ssl_cert, type="string",
755
                            metavar="SSL_CERT_PATH")
756

    
757
  # Disable the use of fork(2) if the daemon uses threads
758
  if multithreaded:
759
    utils.DisableFork()
760

    
761
  options, args = optionparser.parse_args()
762

    
763
  if getattr(options, "bind_interface", None) is not None:
764
    if options.bind_address != default_bind_address:
765
      msg = ("Can't specify both, bind address (%s) and bind interface (%s)" %
766
             (options.bind_address, options.bind_interface))
767
      print >> sys.stderr, msg
768
      sys.exit(constants.EXIT_FAILURE)
769
    interface_ip_addresses = \
770
      netutils.GetInterfaceIpAddresses(options.bind_interface)
771
    if family == netutils.IP6Address.family:
772
      if_addresses = interface_ip_addresses[constants.IP6_VERSION]
773
    else:
774
      if_addresses = interface_ip_addresses[constants.IP4_VERSION]
775
    if len(if_addresses) < 1:
776
      msg = "Failed to find IP for interface %s" % options.bind_interace
777
      print >> sys.stderr, msg
778
      sys.exit(constants.EXIT_FAILURE)
779
    options.bind_address = if_addresses[0]
780

    
781
  if getattr(options, "ssl", False):
782
    ssl_paths = {
783
      "certificate": options.ssl_cert,
784
      "key": options.ssl_key,
785
      }
786

    
787
    for name, path in ssl_paths.iteritems():
788
      if not os.path.isfile(path):
789
        print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
790
        sys.exit(constants.EXIT_FAILURE)
791

    
792
    # TODO: By initiating http.HttpSslParams here we would only read the files
793
    # once and have a proper validation (isfile returns False on directories)
794
    # at the same time.
795

    
796
  result, running_uid, expected_uid = _VerifyDaemonUser(daemon_name)
797
  if not result:
798
    msg = ("%s started using wrong user ID (%d), expected %d" %
799
           (daemon_name, running_uid, expected_uid))
800
    print >> sys.stderr, msg
801
    sys.exit(constants.EXIT_FAILURE)
802

    
803
  if check_fn is not None:
804
    check_fn(options, args)
805

    
806
  log_filename = constants.DAEMONS_LOGFILES[daemon_name]
807

    
808
  # node-daemon logging in lib/http/server.py, _HandleServerRequestInner
809
  if options.debug and warn_breach:
810
    sys.stderr.write(constants.DEBUG_MODE_CONFIDENTIALITY_WARNING % daemon_name)
811

    
812
  if options.fork:
813
    utils.CloseFDs()
814
    (wpipe, stdio_reopen_fn) = utils.Daemonize(logfile=log_filename)
815
  else:
816
    (wpipe, stdio_reopen_fn) = (None, None)
817

    
818
  log_reopen_fn = \
819
    utils.SetupLogging(log_filename, daemon_name,
820
                       debug=options.debug,
821
                       stderr_logging=not options.fork,
822
                       multithreaded=multithreaded,
823
                       syslog=options.syslog,
824
                       console_logging=console_logging)
825

    
826
  # Reopen log file(s) on SIGHUP
827
  signal.signal(signal.SIGHUP,
828
                compat.partial(_HandleSigHup, [log_reopen_fn, stdio_reopen_fn]))
829

    
830
  try:
831
    utils.WritePidFile(utils.DaemonPidFileName(daemon_name))
832
  except errors.PidFileLockError, err:
833
    print >> sys.stderr, "Error while locking PID file:\n%s" % err
834
    sys.exit(constants.EXIT_FAILURE)
835

    
836
  try:
837
    try:
838
      logging.info("%s daemon startup", daemon_name)
839
      if callable(prepare_fn):
840
        prep_results = prepare_fn(options, args)
841
      else:
842
        prep_results = None
843
    except Exception, err:
844
      utils.WriteErrorToFD(wpipe, _BeautifyError(err))
845
      raise
846

    
847
    if wpipe is not None:
848
      # we're done with the preparation phase, we close the pipe to
849
      # let the parent know it's safe to exit
850
      os.close(wpipe)
851

    
852
    exec_fn(options, args, prep_results)
853
  finally:
854
    utils.RemoveFile(utils.DaemonPidFileName(daemon_name))