Statistics
| Branch: | Tag: | Revision:

root / lib / daemon.py @ f5acf5d9

History | View | Annotate | Download (24.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module with helper classes and functions for daemons"""
23

    
24

    
25
import asyncore
26
import asynchat
27
import collections
28
import os
29
import signal
30
import logging
31
import sched
32
import time
33
import socket
34
import select
35
import sys
36

    
37
from ganeti import utils
38
from ganeti import constants
39
from ganeti import errors
40
from ganeti import netutils
41
from ganeti import ssconf
42
from ganeti import runtime
43
from ganeti import compat
44

    
45

    
46
class SchedulerBreakout(Exception):
  """Raised by the delay function to interrupt a scheduler.run() loop.

  Catching this exception lets the main loop regain control (e.g. to
  check for pending signals) between scheduled events.

  """
50

    
51

    
52
def AsyncoreDelayFunction(timeout):
  """Asyncore-compatible scheduler delay function.

  A delay function for sched that, instead of sleeping, runs one
  iteration of the asyncore event loop (up to C{timeout} seconds).

  Once that iteration returns it raises L{SchedulerBreakout} rather than
  returning normally. This forces the current scheduler.run() invocation
  to terminate, so the caller's main loop can check for signals before
  invoking the scheduler again to process any events now due.

  This dance is required because scheduler.run() has no count=... like
  the asyncore loop does, and the sched module explicitly documents
  raising an exception from the delay function as a supported way to
  break out of run().

  """
  asyncore.loop(timeout=timeout, count=1, use_poll=True)
  raise SchedulerBreakout()
71

    
72

    
73
class AsyncoreScheduler(sched.scheduler):
  """Event scheduler integrated with asyncore

  Uses L{AsyncoreDelayFunction} as the delay function, so waiting for the
  next scheduled event also serves asyncore events.

  """
  def __init__(self, timefunc):
    """Initializes this class.

    @param timefunc: function returning the current time (e.g. time.time)

    """
    sched.scheduler.__init__(self, timefunc, self._LimitedDelay)
    # Set only while run() is active; see run() below
    self._max_delay = None

  def run(self, max_delay=None): # pylint: disable=W0221
    """Run any pending events.

    @type max_delay: None or number
    @param max_delay: Maximum delay (useful if caller has timeouts running)

    """
    assert self._max_delay is None

    # The delay function used by the scheduler can't be different on each run,
    # hence an instance variable must be used.
    if max_delay is None:
      self._max_delay = None
    else:
      self._max_delay = utils.RunningTimeout(max_delay, False)

    try:
      return sched.scheduler.run(self)
    finally:
      # Always reset, even if run() terminated via SchedulerBreakout
      self._max_delay = None

  def _LimitedDelay(self, duration):
    """Custom delay function for C{sched.scheduler}.

    @param duration: requested delay until the next scheduled event

    """
    if self._max_delay is None:
      timeout = duration
    else:
      # Clamp to a non-negative value: once the max_delay budget has run
      # out, Remaining() returns a negative number, and passing a negative
      # timeout to asyncore.loop/select would raise an error instead of
      # returning immediately.
      timeout = max(0.0, min(duration, self._max_delay.Remaining()))

    return AsyncoreDelayFunction(timeout)
115

    
116

    
117
class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
  """Common base class for Ganeti's asyncore dispatchers.

  Provides shared error logging and makes dispatchers read-only by
  default (no writability polling).

  """
  # overrides asyncore.dispatcher.handle_error
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")

  # overrides asyncore.dispatcher.writable
  def writable(self):
    """By default there is nothing to write, so don't poll for it.

    """
    return False
134

    
135

    
136
class AsyncStreamServer(GanetiBaseAsyncoreDispatcher):
  """Asyncore-based stream server.

  Every incoming request is accepted and then handed off, via
  L{handle_connection}, to a separate asyncore dispatcher which serves it.

  """

  # backlog argument passed to listen()
  _REQUEST_QUEUE_SIZE = 5

  def __init__(self, family, address):
    """Constructor for AsyncUnixStreamSocket

    @type family: integer
    @param family: socket family (one of socket.AF_*)
    @type address: address family dependent
    @param address: address to bind the socket to

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    self.family = family
    self.create_socket(self.family, socket.SOCK_STREAM)
    self.set_reuse_addr()
    self.bind(address)
    self.listen(self._REQUEST_QUEUE_SIZE)

  # this method is overriding an asyncore.dispatcher method
  def handle_accept(self):
    """Accept a new client connection.

    Creates a new instance of the handler class, which will use asyncore to
    serve the client.

    """
    result = utils.IgnoreSignals(self.accept)
    if result is None:
      # accept() was interrupted by a signal; the next loop iteration
      # will pick the connection up
      return
    (sock, addr) = result
    if self.family == socket.AF_UNIX:
      # override the client address, as for unix sockets nothing meaningful
      # is passed in from accept anyway
      addr = netutils.GetSocketCredentials(sock)
    logging.info("Accepted connection from %s",
                 netutils.FormatAddress(addr, family=self.family))
    self.handle_connection(sock, addr)

  def handle_connection(self, connected_socket, client_address):
    """Handle an already accepted connection.

    """
    raise NotImplementedError
186

    
187

    
188
class AsyncTerminatedMessageStream(asynchat.async_chat):
  """A terminator separated message stream asyncore module.

  Handles a stream connection receiving messages terminated by a defined
  separator. For each complete message handle_message is called.

  """
  def __init__(self, connected_socket, peer_address, terminator, family,
               unhandled_limit):
    """AsyncTerminatedMessageStream constructor.

    @type connected_socket: socket.socket
    @param connected_socket: connected stream socket to receive messages from
    @param peer_address: family-specific peer address
    @type terminator: string
    @param terminator: terminator separating messages in the stream
    @type family: integer
    @param family: socket family
    @type unhandled_limit: integer or None
    @param unhandled_limit: maximum unanswered messages

    """
    # python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by
    # using a positional argument rather than a keyword one.
    asynchat.async_chat.__init__(self, connected_socket)
    self.connected_socket = connected_socket
    # on python 2.4 there is no "family" attribute for the socket class
    # FIXME: when we move to python 2.5 or above remove the family parameter
    #self.family = self.connected_socket.family
    self.family = family
    self.peer_address = peer_address
    self.terminator = terminator
    self.unhandled_limit = unhandled_limit
    self.set_terminator(terminator)
    # ibuffer accumulates partial data until a terminator is seen
    self.ibuffer = []
    # receive_count/send_count track message sequence numbers; their
    # difference is the number of currently unanswered messages
    self.receive_count = 0
    self.send_count = 0
    # deques are used (rather than lists) because their append/popleft are
    # thread-safe, allowing send_message to be called from other threads
    self.oqueue = collections.deque()
    self.iqueue = collections.deque()

  # this method is overriding an asynchat.async_chat method
  def collect_incoming_data(self, data):
    # buffer data until found_terminator assembles a full message
    self.ibuffer.append(data)

  def _can_handle_message(self):
    # With no limit configured every message is handled immediately (and
    # iqueue then never fills, so the precedence "A or (B and C)" is
    # equivalent to "(A or B) and C" here); with a limit we require both
    # headroom below the limit and an empty backlog queue.
    return (self.unhandled_limit is None or
            (self.receive_count < self.send_count + self.unhandled_limit) and
             not self.iqueue)

  # this method is overriding an asynchat.async_chat method
  def found_terminator(self):
    # a complete message has arrived; assemble it from the buffer
    message = "".join(self.ibuffer)
    self.ibuffer = []
    message_id = self.receive_count
    # We need to increase the receive_count after checking if the message can
    # be handled, but before calling handle_message
    can_handle = self._can_handle_message()
    self.receive_count += 1
    if can_handle:
      self.handle_message(message, message_id)
    else:
      # over the unhandled limit: park the message until a reply frees a slot
      self.iqueue.append((message, message_id))

  def handle_message(self, message, message_id):
    """Handle a terminated message.

    @type message: string
    @param message: message to handle
    @type message_id: integer
    @param message_id: stream's message sequence number

    """
    pass
    # TODO: move this method to raise NotImplementedError
    # raise NotImplementedError

  def send_message(self, message):
    """Send a message to the remote peer. This function is thread-safe.

    @type message: string
    @param message: message to send, without the terminator

    @warning: If calling this function from a thread different than the one
    performing the main asyncore loop, remember that you have to wake that one
    up.

    """
    # If we just append the message we received to the output queue, this
    # function can be safely called by multiple threads at the same time, and
    # we don't need locking, since deques are thread safe. handle_write in the
    # asyncore thread will handle the next input message if there are any
    # enqueued.
    self.oqueue.append(message)

  # this method is overriding an asyncore.dispatcher method
  def readable(self):
    # read from the socket if we can handle the next requests
    return self._can_handle_message() and asynchat.async_chat.readable(self)

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # the output queue may become full just after we called writable. This only
    # works if we know we'll have something else waking us up from the select,
    # in such case, anyway.
    return asynchat.async_chat.writable(self) or self.oqueue

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    if self.oqueue:
      # if we have data in the output queue, then send_message was called.
      # this means we can process one more message from the input queue, if
      # there are any.
      data = self.oqueue.popleft()
      self.push(data + self.terminator)
      self.send_count += 1
      if self.iqueue:
        # a reply went out, so one queued-up incoming message may be handled
        self.handle_message(*self.iqueue.popleft())
    self.initiate_send()

  def close_log(self):
    # log the peer we are disconnecting from, then close the channel
    logging.info("Closing connection from %s",
                 netutils.FormatAddress(self.peer_address, family=self.family))
    self.close()

  # this method is overriding an asyncore.dispatcher method
  def handle_expt(self):
    # out-of-band data / exceptional condition: drop the connection
    self.close_log()

  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")
    self.close_log()
323

    
324

    
325
class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
  """An improved asyncore udp socket.

  Queues outgoing datagrams and only reports itself writable while the
  queue is non-empty.

  """
  def __init__(self, family):
    """Constructor for AsyncUDPSocket

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    # FIFO of (ip, port, payload) tuples waiting to be sent
    self._out_queue = []
    self._family = family
    self.create_socket(family, socket.SOCK_DGRAM)

  # this method is overriding an asyncore.dispatcher method
  def handle_connect(self):
    # Python thinks that the first udp message from a source qualifies as a
    # "connect" and further ones are part of the same connection. We beg to
    # differ and treat all messages equally.
    pass

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    result = utils.IgnoreSignals(self.recvfrom,
                                 constants.MAX_UDP_DATA_SIZE)
    if result is None:
      # recvfrom was interrupted by a signal; nothing to process
      return
    (payload, address) = result
    if self._family == socket.AF_INET6:
      # we ignore 'flow info' and 'scope id' as we don't need them
      (ip, port, _, _) = address
    else:
      (ip, port) = address

    self.handle_datagram(payload, ip, port)

  def handle_datagram(self, payload, ip, port):
    """Handle an already read udp datagram

    """
    raise NotImplementedError

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # We should check whether we can write to the socket only if we have
    # something scheduled to be written
    return bool(self._out_queue)

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    if not self._out_queue:
      logging.error("handle_write called with empty output queue")
      return
    # only dequeue after the send, so an interrupted send is retried
    (ip, port, payload) = self._out_queue[0]
    utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
    self._out_queue.pop(0)

  def enqueue_send(self, ip, port, payload):
    """Enqueue a datagram to be sent when possible

    """
    if len(payload) > constants.MAX_UDP_DATA_SIZE:
      raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
                                    constants.MAX_UDP_DATA_SIZE))
    self._out_queue.append((ip, port, payload))

  def process_next_packet(self, timeout=0):
    """Process the next datagram, waiting for it if necessary.

    @type timeout: float
    @param timeout: how long to wait for data
    @rtype: boolean
    @return: True if some data has been handled, False otherwise

    """
    poll_flags = utils.WaitForFdCondition(self, select.POLLIN, timeout)
    if poll_flags is None or not (poll_flags & select.POLLIN):
      return False
    self.handle_read()
    return True
404

    
405

    
406
class AsyncAwaker(GanetiBaseAsyncoreDispatcher):
  """A way to notify the asyncore loop that something is going on.

  If an asyncore daemon is multithreaded when a thread tries to push some data
  to a socket, the main loop handling asynchronous requests might be sleeping
  waiting on a select(). To avoid this it can create an instance of the
  AsyncAwaker, which other threads can use to wake it up.

  """
  def __init__(self, signal_fn=None):
    """Constructor for AsyncAwaker

    @type signal_fn: function
    @param signal_fn: function to call when awaken

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    # Use identity comparison for None (PEP 8); "== None" could be fooled
    # by objects overriding __eq__
    assert signal_fn is None or callable(signal_fn)
    (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
                                                          socket.SOCK_STREAM)
    self.in_socket.setblocking(0)
    # each end of the pair is used in a single direction only
    self.in_socket.shutdown(socket.SHUT_WR)
    self.out_socket.shutdown(socket.SHUT_RD)
    self.set_socket(self.in_socket)
    # need_signal prevents piling up wakeup bytes while the loop is busy
    self.need_signal = True
    self.signal_fn = signal_fn
    self.connected = True

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    # drain wakeup tokens, invoke the callback (if any) and re-arm
    utils.IgnoreSignals(self.recv, 4096)
    if self.signal_fn:
      self.signal_fn()
    self.need_signal = True

  # this method is overriding an asyncore.dispatcher method
  def close(self):
    # close both ends of the socketpair, not just the dispatcher's
    asyncore.dispatcher.close(self)
    self.out_socket.close()

  def signal(self):
    """Signal the asyncore main loop.

    Any data we send here will be ignored, but it will cause the select() call
    to return.

    """
    # Yes, there is a race condition here. No, we don't care, at worst we're
    # sending more than one wakeup token, which doesn't harm at all.
    if self.need_signal:
      self.need_signal = False
      self.out_socket.send("\0")
458

    
459

    
460
class Mainloop(object):
  """Generic mainloop for daemons

  @ivar scheduler: A sched.scheduler object, which can be used to register
    timed events

  """
  def __init__(self):
    """Constructs a new Mainloop instance.

    """
    # receivers registered via RegisterSignal; notified on every signal
    self._signal_wait = []
    self.scheduler = AsyncoreScheduler(time.time)

    # Resolve uid/gids used
    runtime.GetEnts()

  @utils.SignalHandled([signal.SIGCHLD])
  @utils.SignalHandled([signal.SIGTERM])
  @utils.SignalHandled([signal.SIGINT])
  def Run(self, signal_handlers=None):
    """Runs the mainloop.

    Alternates between running due scheduler events and one asyncore loop
    iteration, checking for received signals after each pass. Returns
    after the first SIGTERM or SIGINT.

    @type signal_handlers: dict
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator

    """
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"

    # Counter for received signals
    shutdown_signals = 0

    # Start actual main loop
    while shutdown_signals < 1:
      if not self.scheduler.empty():
        try:
          self.scheduler.run()
        except SchedulerBreakout:
          # raised by AsyncoreDelayFunction to give us control back;
          # simply loop again so signals below can be checked
          pass
      else:
        asyncore.loop(count=1, use_poll=True)

      # Check whether a signal was raised
      for (sig, handler) in signal_handlers.items():
        if handler.called:
          self._CallSignalWaiters(sig)
          if sig in (signal.SIGTERM, signal.SIGINT):
            logging.info("Received signal %s asking for shutdown", sig)
            shutdown_signals += 1
          # reset the handler so the same signal can be detected again
          handler.Clear()

  def _CallSignalWaiters(self, signum):
    """Calls all signal waiters for a certain signal.

    @type signum: int
    @param signum: Signal number

    """
    for owner in self._signal_wait:
      owner.OnSignal(signum)

  def RegisterSignal(self, owner):
    """Registers a receiver for signal notifications

    The receiver must support a "OnSignal(self, signum)" function.

    @type owner: instance
    @param owner: Receiver

    """
    self._signal_wait.append(owner)
533

    
534

    
535
def _VerifyDaemonUser(daemon_name):
  """Verifies the process uid matches the configured uid.

  This method verifies that a daemon is started as the user it is
  intended to be run

  @param daemon_name: The name of daemon to be started
  @return: A tuple with the first item indicating success or not,
           the second item current uid and third with expected uid

  """
  ents = runtime.GetEnts()
  # map each daemon to the uid it is configured to run under
  uid_map = {
    constants.MASTERD: ents.masterd_uid,
    constants.RAPI: ents.rapi_uid,
    constants.NODED: ents.noded_uid,
    constants.CONFD: ents.confd_uid,
    }
  expected_uid = uid_map[daemon_name]
  current_uid = os.getuid()

  return (expected_uid == current_uid, current_uid, expected_uid)
557

    
558

    
559
def _BeautifyError(err):
560
  """Try to format an error better.
561

562
  Since we're dealing with daemon startup errors, in many cases this
563
  will be due to socket error and such, so we try to format these cases better.
564

565
  @param err: an exception object
566
  @rtype: string
567
  @return: the formatted error description
568

569
  """
570
  try:
571
    if isinstance(err, socket.error):
572
      return "Socket-related error: %s (errno=%s)" % (err.args[1], err.args[0])
573
    elif isinstance(err, EnvironmentError):
574
      if err.filename is None:
575
        return "%s (errno=%s)" % (err.strerror, err.errno)
576
      else:
577
        return "%s (file %s) (errno=%s)" % (err.strerror, err.filename,
578
                                            err.errno)
579
    else:
580
      return str(err)
581
  except Exception: # pylint: disable=W0703
582
    logging.exception("Error while handling existing error %s", err)
583
    return "%s" % str(err)
584

    
585

    
586
def _HandleSigHup(reopen_fn, signum, frame): # pylint: disable=W0613
587
  """Handler for SIGHUP.
588

589
  @param reopen_fn: List of callback functions for reopening log files
590

591
  """
592
  logging.info("Reopening log files after receiving SIGHUP")
593

    
594
  for fn in reopen_fn:
595
    if fn:
596
      fn()
597

    
598

    
599
def GenericMain(daemon_name, optionparser,
                check_fn, prepare_fn, exec_fn,
                multithreaded=False, console_logging=False,
                default_ssl_cert=None, default_ssl_key=None):
  """Shared main function for daemons.

  @type daemon_name: string
  @param daemon_name: daemon name
  @type optionparser: optparse.OptionParser
  @param optionparser: initialized optionparser with daemon-specific options
                       (common -f -d options will be handled by this module)
  @type check_fn: function which accepts (options, args)
  @param check_fn: function that checks start conditions and exits if they're
                   not met
  @type prepare_fn: function which accepts (options, args)
  @param prepare_fn: function that is run before forking, or None;
      it's result will be passed as the third parameter to exec_fn, or
      if None was passed in, we will just pass None to exec_fn
  @type exec_fn: function which accepts (options, args, prepare_results)
  @param exec_fn: function that's executed with the daemon's pid file held, and
                  runs the daemon itself.
  @type multithreaded: bool
  @param multithreaded: Whether the daemon uses threads
  @type console_logging: boolean
  @param console_logging: if True, the daemon will fall back to the system
                          console if logging fails
  @type default_ssl_cert: string
  @param default_ssl_cert: Default SSL certificate path
  @type default_ssl_key: string
  @param default_ssl_key: Default SSL key path

  """
  # Options shared by all daemons
  optionparser.add_option("-f", "--foreground", dest="fork",
                          help="Don't detach from the current terminal",
                          default=True, action="store_false")
  optionparser.add_option("-d", "--debug", dest="debug",
                          help="Enable some debug messages",
                          default=False, action="store_true")
  optionparser.add_option("--syslog", dest="syslog",
                          help="Enable logging to syslog (except debug"
                          " messages); one of 'no', 'yes' or 'only' [%s]" %
                          constants.SYSLOG_USAGE,
                          default=constants.SYSLOG_USAGE,
                          choices=["no", "yes", "only"])

  if daemon_name in constants.DAEMONS_PORTS:
    default_bind_address = constants.IP4_ADDRESS_ANY
    family = ssconf.SimpleStore().GetPrimaryIPFamily()
    # family will default to AF_INET if there is no ssconf file (e.g. when
    # upgrading a cluster from 2.2 -> 2.3. This is intended, as Ganeti clusters
    # <= 2.2 can not be AF_INET6
    if family == netutils.IP6Address.family:
      default_bind_address = constants.IP6_ADDRESS_ANY

    default_port = netutils.GetDaemonPort(daemon_name)

    # For networked daemons we allow choosing the port and bind address
    optionparser.add_option("-p", "--port", dest="port",
                            help="Network port (default: %s)" % default_port,
                            default=default_port, type="int")
    optionparser.add_option("-b", "--bind", dest="bind_address",
                            help=("Bind address (default: '%s')" %
                                  default_bind_address),
                            default=default_bind_address, metavar="ADDRESS")

  # SSL options are only offered when both defaults are provided
  if default_ssl_key is not None and default_ssl_cert is not None:
    optionparser.add_option("--no-ssl", dest="ssl",
                            help="Do not secure HTTP protocol with SSL",
                            default=True, action="store_false")
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
                            help=("SSL key path (default: %s)" %
                                  default_ssl_key),
                            default=default_ssl_key, type="string",
                            metavar="SSL_KEY_PATH")
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
                            help=("SSL certificate path (default: %s)" %
                                  default_ssl_cert),
                            default=default_ssl_cert, type="string",
                            metavar="SSL_CERT_PATH")

  # Disable the use of fork(2) if the daemon uses threads
  if multithreaded:
    utils.DisableFork()

  options, args = optionparser.parse_args()

  # Validate SSL file paths before doing anything irreversible
  if getattr(options, "ssl", False):
    ssl_paths = {
      "certificate": options.ssl_cert,
      "key": options.ssl_key,
      }

    for name, path in ssl_paths.iteritems():
      if not os.path.isfile(path):
        print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
        sys.exit(constants.EXIT_FAILURE)

    # TODO: By initiating http.HttpSslParams here we would only read the files
    # once and have a proper validation (isfile returns False on directories)
    # at the same time.

  # Refuse to run under the wrong uid
  result, running_uid, expected_uid = _VerifyDaemonUser(daemon_name)
  if not result:
    msg = ("%s started using wrong user ID (%d), expected %d" %
           (daemon_name, running_uid, expected_uid))
    print >> sys.stderr, msg
    sys.exit(constants.EXIT_FAILURE)

  if check_fn is not None:
    check_fn(options, args)

  if options.fork:
    # Daemonize; wpipe is used to report startup errors to the parent,
    # which stays alive until the pipe is closed
    utils.CloseFDs()
    (wpipe, stdio_reopen_fn) = \
      utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name])
  else:
    (wpipe, stdio_reopen_fn) = (None, None)

  log_reopen_fn = \
    utils.SetupLogging(constants.DAEMONS_LOGFILES[daemon_name], daemon_name,
                       debug=options.debug,
                       stderr_logging=not options.fork,
                       multithreaded=multithreaded,
                       syslog=options.syslog,
                       console_logging=console_logging)

  # Reopen log file(s) on SIGHUP
  signal.signal(signal.SIGHUP,
                compat.partial(_HandleSigHup, [log_reopen_fn, stdio_reopen_fn]))

  utils.WritePidFile(utils.DaemonPidFileName(daemon_name))
  try:
    try:
      logging.info("%s daemon startup", daemon_name)
      if callable(prepare_fn):
        prep_results = prepare_fn(options, args)
      else:
        prep_results = None
    except Exception, err:
      # report the preparation failure to the waiting parent process
      utils.WriteErrorToFD(wpipe, _BeautifyError(err))
      raise

    if wpipe is not None:
      # we're done with the preparation phase, we close the pipe to
      # let the parent know it's safe to exit
      os.close(wpipe)

    exec_fn(options, args, prep_results)
  finally:
    # always drop the pid file, even if exec_fn raised
    utils.RemoveFile(utils.DaemonPidFileName(daemon_name))