Statistics
| Branch: | Tag: | Revision:

root / lib / daemon.py @ 3329f4de

History | View | Annotate | Download (26.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module with helper classes and functions for daemons"""
23

    
24

    
25
import asyncore
26
import asynchat
27
import collections
28
import os
29
import signal
30
import logging
31
import sched
32
import time
33
import socket
34
import select
35
import sys
36

    
37
from ganeti import utils
38
from ganeti import constants
39
from ganeti import errors
40
from ganeti import netutils
41
from ganeti import ssconf
42
from ganeti import runtime
43
from ganeti import compat
44
from ganeti import pathutils
45

    
46

    
47
class SchedulerBreakout(Exception):
48
  """Exception used to get out of the scheduler loop
49

50
  """
51

    
52

    
53
def AsyncoreDelayFunction(timeout):
54
  """Asyncore-compatible scheduler delay function.
55

56
  This is a delay function for sched that, rather than actually sleeping,
57
  executes asyncore events happening in the meantime.
58

59
  After an event has occurred, rather than returning, it raises a
60
  SchedulerBreakout exception, which will force the current scheduler.run()
61
  invocation to terminate, so that we can also check for signals. The main loop
62
  will then call the scheduler run again, which will allow it to actually
63
  process any due events.
64

65
  This is needed because scheduler.run() doesn't support a count=..., as
66
  asyncore loop, and the scheduler module documents throwing exceptions from
67
  inside the delay function as an allowed usage model.
68

69
  """
70
  asyncore.loop(timeout=timeout, count=1, use_poll=True)
71
  raise SchedulerBreakout()
72

    
73

    
74
class AsyncoreScheduler(sched.scheduler):
75
  """Event scheduler integrated with asyncore
76

77
  """
78
  def __init__(self, timefunc):
79
    """Initializes this class.
80

81
    """
82
    sched.scheduler.__init__(self, timefunc, self._LimitedDelay)
83
    self._max_delay = None
84

    
85
  def run(self, max_delay=None): # pylint: disable=W0221
86
    """Run any pending events.
87

88
    @type max_delay: None or number
89
    @param max_delay: Maximum delay (useful if caller has timeouts running)
90

91
    """
92
    assert self._max_delay is None
93

    
94
    # The delay function used by the scheduler can't be different on each run,
95
    # hence an instance variable must be used.
96
    if max_delay is None:
97
      self._max_delay = None
98
    else:
99
      self._max_delay = utils.RunningTimeout(max_delay, False)
100

    
101
    try:
102
      return sched.scheduler.run(self)
103
    finally:
104
      self._max_delay = None
105

    
106
  def _LimitedDelay(self, duration):
107
    """Custom delay function for C{sched.scheduler}.
108

109
    """
110
    if self._max_delay is None:
111
      timeout = duration
112
    else:
113
      timeout = min(duration, self._max_delay.Remaining())
114

    
115
    return AsyncoreDelayFunction(timeout)
116

    
117

    
118
class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
119
  """Base Ganeti Asyncore Dispacher
120

121
  """
122
  # this method is overriding an asyncore.dispatcher method
123
  def handle_error(self):
124
    """Log an error in handling any request, and proceed.
125

126
    """
127
    logging.exception("Error while handling asyncore request")
128

    
129
  # this method is overriding an asyncore.dispatcher method
130
  def writable(self):
131
    """Most of the time we don't want to check for writability.
132

133
    """
134
    return False
135

    
136

    
137
class AsyncStreamServer(GanetiBaseAsyncoreDispatcher):
138
  """A stream server to use with asyncore.
139

140
  Each request is accepted, and then dispatched to a separate asyncore
141
  dispatcher to handle.
142

143
  """
144

    
145
  _REQUEST_QUEUE_SIZE = 5
146

    
147
  def __init__(self, family, address):
148
    """Constructor for AsyncUnixStreamSocket
149

150
    @type family: integer
151
    @param family: socket family (one of socket.AF_*)
152
    @type address: address family dependent
153
    @param address: address to bind the socket to
154

155
    """
156
    GanetiBaseAsyncoreDispatcher.__init__(self)
157
    self.family = family
158
    self.create_socket(self.family, socket.SOCK_STREAM)
159
    self.set_reuse_addr()
160
    self.bind(address)
161
    self.listen(self._REQUEST_QUEUE_SIZE)
162

    
163
  # this method is overriding an asyncore.dispatcher method
164
  def handle_accept(self):
165
    """Accept a new client connection.
166

167
    Creates a new instance of the handler class, which will use asyncore to
168
    serve the client.
169

170
    """
171
    accept_result = utils.IgnoreSignals(self.accept)
172
    if accept_result is not None:
173
      connected_socket, client_address = accept_result
174
      if self.family == socket.AF_UNIX:
175
        # override the client address, as for unix sockets nothing meaningful
176
        # is passed in from accept anyway
177
        client_address = netutils.GetSocketCredentials(connected_socket)
178
      logging.info("Accepted connection from %s",
179
                   netutils.FormatAddress(client_address, family=self.family))
180
      self.handle_connection(connected_socket, client_address)
181

    
182
  def handle_connection(self, connected_socket, client_address):
183
    """Handle an already accepted connection.
184

185
    """
186
    raise NotImplementedError
187

    
188

    
189
class AsyncTerminatedMessageStream(asynchat.async_chat):
190
  """A terminator separated message stream asyncore module.
191

192
  Handles a stream connection receiving messages terminated by a defined
193
  separator. For each complete message handle_message is called.
194

195
  """
196
  def __init__(self, connected_socket, peer_address, terminator, family,
197
               unhandled_limit):
198
    """AsyncTerminatedMessageStream constructor.
199

200
    @type connected_socket: socket.socket
201
    @param connected_socket: connected stream socket to receive messages from
202
    @param peer_address: family-specific peer address
203
    @type terminator: string
204
    @param terminator: terminator separating messages in the stream
205
    @type family: integer
206
    @param family: socket family
207
    @type unhandled_limit: integer or None
208
    @param unhandled_limit: maximum unanswered messages
209

210
    """
211
    # python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by
212
    # using a positional argument rather than a keyword one.
213
    asynchat.async_chat.__init__(self, connected_socket)
214
    self.connected_socket = connected_socket
215
    # on python 2.4 there is no "family" attribute for the socket class
216
    # FIXME: when we move to python 2.5 or above remove the family parameter
217
    #self.family = self.connected_socket.family
218
    self.family = family
219
    self.peer_address = peer_address
220
    self.terminator = terminator
221
    self.unhandled_limit = unhandled_limit
222
    self.set_terminator(terminator)
223
    self.ibuffer = []
224
    self.receive_count = 0
225
    self.send_count = 0
226
    self.oqueue = collections.deque()
227
    self.iqueue = collections.deque()
228

    
229
  # this method is overriding an asynchat.async_chat method
230
  def collect_incoming_data(self, data):
231
    self.ibuffer.append(data)
232

    
233
  def _can_handle_message(self):
234
    return (self.unhandled_limit is None or
235
            (self.receive_count < self.send_count + self.unhandled_limit) and
236
             not self.iqueue)
237

    
238
  # this method is overriding an asynchat.async_chat method
239
  def found_terminator(self):
240
    message = "".join(self.ibuffer)
241
    self.ibuffer = []
242
    message_id = self.receive_count
243
    # We need to increase the receive_count after checking if the message can
244
    # be handled, but before calling handle_message
245
    can_handle = self._can_handle_message()
246
    self.receive_count += 1
247
    if can_handle:
248
      self.handle_message(message, message_id)
249
    else:
250
      self.iqueue.append((message, message_id))
251

    
252
  def handle_message(self, message, message_id):
253
    """Handle a terminated message.
254

255
    @type message: string
256
    @param message: message to handle
257
    @type message_id: integer
258
    @param message_id: stream's message sequence number
259

260
    """
261
    pass
262
    # TODO: move this method to raise NotImplementedError
263
    # raise NotImplementedError
264

    
265
  def send_message(self, message):
266
    """Send a message to the remote peer. This function is thread-safe.
267

268
    @type message: string
269
    @param message: message to send, without the terminator
270

271
    @warning: If calling this function from a thread different than the one
272
    performing the main asyncore loop, remember that you have to wake that one
273
    up.
274

275
    """
276
    # If we just append the message we received to the output queue, this
277
    # function can be safely called by multiple threads at the same time, and
278
    # we don't need locking, since deques are thread safe. handle_write in the
279
    # asyncore thread will handle the next input message if there are any
280
    # enqueued.
281
    self.oqueue.append(message)
282

    
283
  # this method is overriding an asyncore.dispatcher method
284
  def readable(self):
285
    # read from the socket if we can handle the next requests
286
    return self._can_handle_message() and asynchat.async_chat.readable(self)
287

    
288
  # this method is overriding an asyncore.dispatcher method
289
  def writable(self):
290
    # the output queue may become full just after we called writable. This only
291
    # works if we know we'll have something else waking us up from the select,
292
    # in such case, anyway.
293
    return asynchat.async_chat.writable(self) or self.oqueue
294

    
295
  # this method is overriding an asyncore.dispatcher method
296
  def handle_write(self):
297
    if self.oqueue:
298
      # if we have data in the output queue, then send_message was called.
299
      # this means we can process one more message from the input queue, if
300
      # there are any.
301
      data = self.oqueue.popleft()
302
      self.push(data + self.terminator)
303
      self.send_count += 1
304
      if self.iqueue:
305
        self.handle_message(*self.iqueue.popleft())
306
    self.initiate_send()
307

    
308
  def close_log(self):
309
    logging.info("Closing connection from %s",
310
                 netutils.FormatAddress(self.peer_address, family=self.family))
311
    self.close()
312

    
313
  # this method is overriding an asyncore.dispatcher method
314
  def handle_expt(self):
315
    self.close_log()
316

    
317
  # this method is overriding an asyncore.dispatcher method
318
  def handle_error(self):
319
    """Log an error in handling any request, and proceed.
320

321
    """
322
    logging.exception("Error while handling asyncore request")
323
    self.close_log()
324

    
325

    
326
class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
327
  """An improved asyncore udp socket.
328

329
  """
330
  def __init__(self, family):
331
    """Constructor for AsyncUDPSocket
332

333
    """
334
    GanetiBaseAsyncoreDispatcher.__init__(self)
335
    self._out_queue = []
336
    self._family = family
337
    self.create_socket(family, socket.SOCK_DGRAM)
338

    
339
  # this method is overriding an asyncore.dispatcher method
340
  def handle_connect(self):
341
    # Python thinks that the first udp message from a source qualifies as a
342
    # "connect" and further ones are part of the same connection. We beg to
343
    # differ and treat all messages equally.
344
    pass
345

    
346
  # this method is overriding an asyncore.dispatcher method
347
  def handle_read(self):
348
    recv_result = utils.IgnoreSignals(self.recvfrom,
349
                                      constants.MAX_UDP_DATA_SIZE)
350
    if recv_result is not None:
351
      payload, address = recv_result
352
      if self._family == socket.AF_INET6:
353
        # we ignore 'flow info' and 'scope id' as we don't need them
354
        ip, port, _, _ = address
355
      else:
356
        ip, port = address
357

    
358
      self.handle_datagram(payload, ip, port)
359

    
360
  def handle_datagram(self, payload, ip, port):
361
    """Handle an already read udp datagram
362

363
    """
364
    raise NotImplementedError
365

    
366
  # this method is overriding an asyncore.dispatcher method
367
  def writable(self):
368
    # We should check whether we can write to the socket only if we have
369
    # something scheduled to be written
370
    return bool(self._out_queue)
371

    
372
  # this method is overriding an asyncore.dispatcher method
373
  def handle_write(self):
374
    if not self._out_queue:
375
      logging.error("handle_write called with empty output queue")
376
      return
377
    (ip, port, payload) = self._out_queue[0]
378
    utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
379
    self._out_queue.pop(0)
380

    
381
  def enqueue_send(self, ip, port, payload):
382
    """Enqueue a datagram to be sent when possible
383

384
    """
385
    if len(payload) > constants.MAX_UDP_DATA_SIZE:
386
      raise errors.UdpDataSizeError("Packet too big: %s > %s" % (len(payload),
387
                                    constants.MAX_UDP_DATA_SIZE))
388
    self._out_queue.append((ip, port, payload))
389

    
390
  def process_next_packet(self, timeout=0):
391
    """Process the next datagram, waiting for it if necessary.
392

393
    @type timeout: float
394
    @param timeout: how long to wait for data
395
    @rtype: boolean
396
    @return: True if some data has been handled, False otherwise
397

398
    """
399
    result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
400
    if result is not None and result & select.POLLIN:
401
      self.handle_read()
402
      return True
403
    else:
404
      return False
405

    
406

    
407
class AsyncAwaker(GanetiBaseAsyncoreDispatcher):
408
  """A way to notify the asyncore loop that something is going on.
409

410
  If an asyncore daemon is multithreaded when a thread tries to push some data
411
  to a socket, the main loop handling asynchronous requests might be sleeping
412
  waiting on a select(). To avoid this it can create an instance of the
413
  AsyncAwaker, which other threads can use to wake it up.
414

415
  """
416
  def __init__(self, signal_fn=None):
417
    """Constructor for AsyncAwaker
418

419
    @type signal_fn: function
420
    @param signal_fn: function to call when awaken
421

422
    """
423
    GanetiBaseAsyncoreDispatcher.__init__(self)
424
    assert signal_fn is None or callable(signal_fn)
425
    (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
426
                                                          socket.SOCK_STREAM)
427
    self.in_socket.setblocking(0)
428
    self.in_socket.shutdown(socket.SHUT_WR)
429
    self.out_socket.shutdown(socket.SHUT_RD)
430
    self.set_socket(self.in_socket)
431
    self.need_signal = True
432
    self.signal_fn = signal_fn
433
    self.connected = True
434

    
435
  # this method is overriding an asyncore.dispatcher method
436
  def handle_read(self):
437
    utils.IgnoreSignals(self.recv, 4096)
438
    if self.signal_fn:
439
      self.signal_fn()
440
    self.need_signal = True
441

    
442
  # this method is overriding an asyncore.dispatcher method
443
  def close(self):
444
    asyncore.dispatcher.close(self)
445
    self.out_socket.close()
446

    
447
  def signal(self):
448
    """Signal the asyncore main loop.
449

450
    Any data we send here will be ignored, but it will cause the select() call
451
    to return.
452

453
    """
454
    # Yes, there is a race condition here. No, we don't care, at worst we're
455
    # sending more than one wakeup token, which doesn't harm at all.
456
    if self.need_signal:
457
      self.need_signal = False
458
      self.out_socket.send("\0")
459

    
460

    
461
class _ShutdownCheck:
462
  """Logic for L{Mainloop} shutdown.
463

464
  """
465
  def __init__(self, fn):
466
    """Initializes this class.
467

468
    @type fn: callable
469
    @param fn: Function returning C{None} if mainloop can be stopped or a
470
      duration in seconds after which the function should be called again
471
    @see: L{Mainloop.Run}
472

473
    """
474
    assert callable(fn)
475

    
476
    self._fn = fn
477
    self._defer = None
478

    
479
  def CanShutdown(self):
480
    """Checks whether mainloop can be stopped.
481

482
    @rtype: bool
483

484
    """
485
    if self._defer and self._defer.Remaining() > 0:
486
      # A deferred check has already been scheduled
487
      return False
488

    
489
    # Ask mainloop driver whether we can stop or should check again
490
    timeout = self._fn()
491

    
492
    if timeout is None:
493
      # Yes, can stop mainloop
494
      return True
495

    
496
    # Schedule another check in the future
497
    self._defer = utils.RunningTimeout(timeout, True)
498

    
499
    return False
500

    
501

    
502
class Mainloop(object):
503
  """Generic mainloop for daemons
504

505
  @ivar scheduler: A sched.scheduler object, which can be used to register
506
    timed events
507

508
  """
509
  _SHUTDOWN_TIMEOUT_PRIORITY = -(sys.maxint - 1)
510

    
511
  def __init__(self):
512
    """Constructs a new Mainloop instance.
513

514
    """
515
    self._signal_wait = []
516
    self.scheduler = AsyncoreScheduler(time.time)
517

    
518
    # Resolve uid/gids used
519
    runtime.GetEnts()
520

    
521
  @utils.SignalHandled([signal.SIGCHLD])
522
  @utils.SignalHandled([signal.SIGTERM])
523
  @utils.SignalHandled([signal.SIGINT])
524
  def Run(self, shutdown_wait_fn=None, signal_handlers=None):
525
    """Runs the mainloop.
526

527
    @type shutdown_wait_fn: callable
528
    @param shutdown_wait_fn: Function to check whether loop can be terminated;
529
      B{important}: function must be idempotent and must return either None
530
      for shutting down or a timeout for another call
531
    @type signal_handlers: dict
532
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator
533

534
    """
535
    assert isinstance(signal_handlers, dict) and \
536
           len(signal_handlers) > 0, \
537
           "Broken SignalHandled decorator"
538

    
539
    # Counter for received signals
540
    shutdown_signals = 0
541

    
542
    # Logic to wait for shutdown
543
    shutdown_waiter = None
544

    
545
    # Start actual main loop
546
    while True:
547
      if shutdown_signals == 1 and shutdown_wait_fn is not None:
548
        if shutdown_waiter is None:
549
          shutdown_waiter = _ShutdownCheck(shutdown_wait_fn)
550

    
551
        # Let mainloop driver decide if we can already abort
552
        if shutdown_waiter.CanShutdown():
553
          break
554

    
555
        # Re-evaluate in a second
556
        timeout = 1.0
557

    
558
      elif shutdown_signals >= 1:
559
        # Abort loop if more than one signal has been sent or no callback has
560
        # been given
561
        break
562

    
563
      else:
564
        # Wait forever on I/O events
565
        timeout = None
566

    
567
      if self.scheduler.empty():
568
        asyncore.loop(count=1, timeout=timeout, use_poll=True)
569
      else:
570
        try:
571
          self.scheduler.run(max_delay=timeout)
572
        except SchedulerBreakout:
573
          pass
574

    
575
      # Check whether a signal was raised
576
      for (sig, handler) in signal_handlers.items():
577
        if handler.called:
578
          self._CallSignalWaiters(sig)
579
          if sig in (signal.SIGTERM, signal.SIGINT):
580
            logging.info("Received signal %s asking for shutdown", sig)
581
            shutdown_signals += 1
582
          handler.Clear()
583

    
584
  def _CallSignalWaiters(self, signum):
585
    """Calls all signal waiters for a certain signal.
586

587
    @type signum: int
588
    @param signum: Signal number
589

590
    """
591
    for owner in self._signal_wait:
592
      owner.OnSignal(signum)
593

    
594
  def RegisterSignal(self, owner):
595
    """Registers a receiver for signal notifications
596

597
    The receiver must support a "OnSignal(self, signum)" function.
598

599
    @type owner: instance
600
    @param owner: Receiver
601

602
    """
603
    self._signal_wait.append(owner)
604

    
605

    
606
def _VerifyDaemonUser(daemon_name):
607
  """Verifies the process uid matches the configured uid.
608

609
  This method verifies that a daemon is started as the user it is
610
  intended to be run
611

612
  @param daemon_name: The name of daemon to be started
613
  @return: A tuple with the first item indicating success or not,
614
           the second item current uid and third with expected uid
615

616
  """
617
  getents = runtime.GetEnts()
618
  running_uid = os.getuid()
619
  daemon_uids = {
620
    constants.MASTERD: getents.masterd_uid,
621
    constants.RAPI: getents.rapi_uid,
622
    constants.NODED: getents.noded_uid,
623
    constants.CONFD: getents.confd_uid,
624
    }
625
  assert daemon_name in daemon_uids, "Invalid daemon %s" % daemon_name
626

    
627
  return (daemon_uids[daemon_name] == running_uid, running_uid,
628
          daemon_uids[daemon_name])
629

    
630

    
631
def _BeautifyError(err):
632
  """Try to format an error better.
633

634
  Since we're dealing with daemon startup errors, in many cases this
635
  will be due to socket error and such, so we try to format these cases better.
636

637
  @param err: an exception object
638
  @rtype: string
639
  @return: the formatted error description
640

641
  """
642
  try:
643
    if isinstance(err, socket.error):
644
      return "Socket-related error: %s (errno=%s)" % (err.args[1], err.args[0])
645
    elif isinstance(err, EnvironmentError):
646
      if err.filename is None:
647
        return "%s (errno=%s)" % (err.strerror, err.errno)
648
      else:
649
        return "%s (file %s) (errno=%s)" % (err.strerror, err.filename,
650
                                            err.errno)
651
    else:
652
      return str(err)
653
  except Exception: # pylint: disable=W0703
654
    logging.exception("Error while handling existing error %s", err)
655
    return "%s" % str(err)
656

    
657

    
658
def _HandleSigHup(reopen_fn, signum, frame): # pylint: disable=W0613
659
  """Handler for SIGHUP.
660

661
  @param reopen_fn: List of callback functions for reopening log files
662

663
  """
664
  logging.info("Reopening log files after receiving SIGHUP")
665

    
666
  for fn in reopen_fn:
667
    if fn:
668
      fn()
669

    
670

    
671
def GenericMain(daemon_name, optionparser,
672
                check_fn, prepare_fn, exec_fn,
673
                multithreaded=False, console_logging=False,
674
                default_ssl_cert=None, default_ssl_key=None):
675
  """Shared main function for daemons.
676

677
  @type daemon_name: string
678
  @param daemon_name: daemon name
679
  @type optionparser: optparse.OptionParser
680
  @param optionparser: initialized optionparser with daemon-specific options
681
                       (common -f -d options will be handled by this module)
682
  @type check_fn: function which accepts (options, args)
683
  @param check_fn: function that checks start conditions and exits if they're
684
                   not met
685
  @type prepare_fn: function which accepts (options, args)
686
  @param prepare_fn: function that is run before forking, or None;
687
      it's result will be passed as the third parameter to exec_fn, or
688
      if None was passed in, we will just pass None to exec_fn
689
  @type exec_fn: function which accepts (options, args, prepare_results)
690
  @param exec_fn: function that's executed with the daemon's pid file held, and
691
                  runs the daemon itself.
692
  @type multithreaded: bool
693
  @param multithreaded: Whether the daemon uses threads
694
  @type console_logging: boolean
695
  @param console_logging: if True, the daemon will fall back to the system
696
                          console if logging fails
697
  @type default_ssl_cert: string
698
  @param default_ssl_cert: Default SSL certificate path
699
  @type default_ssl_key: string
700
  @param default_ssl_key: Default SSL key path
701

702
  """
703
  optionparser.add_option("-f", "--foreground", dest="fork",
704
                          help="Don't detach from the current terminal",
705
                          default=True, action="store_false")
706
  optionparser.add_option("-d", "--debug", dest="debug",
707
                          help="Enable some debug messages",
708
                          default=False, action="store_true")
709
  optionparser.add_option("--syslog", dest="syslog",
710
                          help="Enable logging to syslog (except debug"
711
                          " messages); one of 'no', 'yes' or 'only' [%s]" %
712
                          constants.SYSLOG_USAGE,
713
                          default=constants.SYSLOG_USAGE,
714
                          choices=["no", "yes", "only"])
715

    
716
  if daemon_name in constants.DAEMONS_PORTS:
717
    default_bind_address = constants.IP4_ADDRESS_ANY
718
    family = ssconf.SimpleStore().GetPrimaryIPFamily()
719
    # family will default to AF_INET if there is no ssconf file (e.g. when
720
    # upgrading a cluster from 2.2 -> 2.3. This is intended, as Ganeti clusters
721
    # <= 2.2 can not be AF_INET6
722
    if family == netutils.IP6Address.family:
723
      default_bind_address = constants.IP6_ADDRESS_ANY
724

    
725
    default_port = netutils.GetDaemonPort(daemon_name)
726

    
727
    # For networked daemons we allow choosing the port and bind address
728
    optionparser.add_option("-p", "--port", dest="port",
729
                            help="Network port (default: %s)" % default_port,
730
                            default=default_port, type="int")
731
    optionparser.add_option("-b", "--bind", dest="bind_address",
732
                            help=("Bind address (default: '%s')" %
733
                                  default_bind_address),
734
                            default=default_bind_address, metavar="ADDRESS")
735

    
736
  if default_ssl_key is not None and default_ssl_cert is not None:
737
    optionparser.add_option("--no-ssl", dest="ssl",
738
                            help="Do not secure HTTP protocol with SSL",
739
                            default=True, action="store_false")
740
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
741
                            help=("SSL key path (default: %s)" %
742
                                  default_ssl_key),
743
                            default=default_ssl_key, type="string",
744
                            metavar="SSL_KEY_PATH")
745
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
746
                            help=("SSL certificate path (default: %s)" %
747
                                  default_ssl_cert),
748
                            default=default_ssl_cert, type="string",
749
                            metavar="SSL_CERT_PATH")
750

    
751
  # Disable the use of fork(2) if the daemon uses threads
752
  if multithreaded:
753
    utils.DisableFork()
754

    
755
  options, args = optionparser.parse_args()
756

    
757
  if getattr(options, "ssl", False):
758
    ssl_paths = {
759
      "certificate": options.ssl_cert,
760
      "key": options.ssl_key,
761
      }
762

    
763
    for name, path in ssl_paths.iteritems():
764
      if not os.path.isfile(path):
765
        print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
766
        sys.exit(constants.EXIT_FAILURE)
767

    
768
    # TODO: By initiating http.HttpSslParams here we would only read the files
769
    # once and have a proper validation (isfile returns False on directories)
770
    # at the same time.
771

    
772
  result, running_uid, expected_uid = _VerifyDaemonUser(daemon_name)
773
  if not result:
774
    msg = ("%s started using wrong user ID (%d), expected %d" %
775
           (daemon_name, running_uid, expected_uid))
776
    print >> sys.stderr, msg
777
    sys.exit(constants.EXIT_FAILURE)
778

    
779
  if check_fn is not None:
780
    check_fn(options, args)
781

    
782
  log_filename = pathutils.GetLogFilename(daemon_name)
783

    
784
  if options.fork:
785
    utils.CloseFDs()
786
    (wpipe, stdio_reopen_fn) = utils.Daemonize(logfile=log_filename)
787
  else:
788
    (wpipe, stdio_reopen_fn) = (None, None)
789

    
790
  log_reopen_fn = \
791
    utils.SetupLogging(log_filename, daemon_name,
792
                       debug=options.debug,
793
                       stderr_logging=not options.fork,
794
                       multithreaded=multithreaded,
795
                       syslog=options.syslog,
796
                       console_logging=console_logging)
797

    
798
  # Reopen log file(s) on SIGHUP
799
  signal.signal(signal.SIGHUP,
800
                compat.partial(_HandleSigHup, [log_reopen_fn, stdio_reopen_fn]))
801

    
802
  try:
803
    utils.WritePidFile(utils.DaemonPidFileName(daemon_name))
804
  except errors.PidFileLockError, err:
805
    print >> sys.stderr, "Error while locking PID file:\n%s" % err
806
    sys.exit(constants.EXIT_FAILURE)
807

    
808
  try:
809
    try:
810
      logging.info("%s daemon startup", daemon_name)
811
      if callable(prepare_fn):
812
        prep_results = prepare_fn(options, args)
813
      else:
814
        prep_results = None
815
    except Exception, err:
816
      utils.WriteErrorToFD(wpipe, _BeautifyError(err))
817
      raise
818

    
819
    if wpipe is not None:
820
      # we're done with the preparation phase, we close the pipe to
821
      # let the parent know it's safe to exit
822
      os.close(wpipe)
823

    
824
    exec_fn(options, args, prep_results)
825
  finally:
826
    utils.RemoveFile(utils.DaemonPidFileName(daemon_name))