Statistics
| Branch: | Tag: | Revision:

root / lib / daemon.py @ e0545ee9

History | View | Annotate | Download (23.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module with helper classes and functions for daemons"""
23

    
24

    
25
import asyncore
26
import asynchat
27
import collections
28
import os
29
import signal
30
import logging
31
import sched
32
import time
33
import socket
34
import select
35
import sys
36

    
37
from ganeti import utils
38
from ganeti import constants
39
from ganeti import errors
40
from ganeti import netutils
41
from ganeti import ssconf
42
from ganeti import runtime
43
from ganeti import compat
44

    
45

    
46
class SchedulerBreakout(Exception):
  """Raised by the delay function to break out of scheduler.run()

  """
50

    
51

    
52
def AsyncoreDelayFunction(timeout):
  """Scheduler delay function that serves asyncore instead of sleeping.

  Rather than blocking for the requested amount of time, this runs a
  single round of asyncore event processing (bounded by the timeout) and
  then raises L{SchedulerBreakout}. The exception aborts the current
  scheduler.run() invocation, giving the main loop a chance to check for
  signals; the main loop then re-enters the scheduler, which can process
  any events that have meanwhile become due.

  This workaround is needed because scheduler.run() has no count=...
  parameter like the asyncore loop does; the sched module documents
  raising from the delay function as a supported way to interrupt run().

  """
  asyncore.loop(count=1, timeout=timeout, use_poll=True)
  raise SchedulerBreakout()
71

    
72

    
73
class AsyncoreScheduler(sched.scheduler):
  """sched.scheduler variant hooked into the asyncore event loop

  """
  def __init__(self, timefunc):
    # Waiting for the next scheduled event also pumps asyncore I/O,
    # courtesy of AsyncoreDelayFunction, instead of plain sleeping
    sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
79

    
80

    
81
class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
  """Base Ganeti Asyncore Dispatcher

  Shared base providing sane defaults for error handling and
  writability checks in all Ganeti asyncore dispatchers.

  """
  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    """Most of the time we don't want to check for writability.

    """
    return False
98

    
99

    
100
class AsyncStreamServer(GanetiBaseAsyncoreDispatcher):
  """A stream server to use with asyncore.

  Incoming connections are accepted here, then handed over to a separate
  asyncore dispatcher (created by L{handle_connection}) which serves the
  client.

  """

  # backlog passed to listen()
  _REQUEST_QUEUE_SIZE = 5

  def __init__(self, family, address):
    """Sets up the listening socket.

    @type family: integer
    @param family: socket family (one of socket.AF_*)
    @type address: address family dependent
    @param address: address to bind the socket to

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    self.family = family
    self.create_socket(self.family, socket.SOCK_STREAM)
    self.set_reuse_addr()
    self.bind(address)
    self.listen(self._REQUEST_QUEUE_SIZE)

  # this method is overriding an asyncore.dispatcher method
  def handle_accept(self):
    """Accept a new client connection.

    Creates a new instance of the handler class, which will use asyncore
    to serve the client.

    """
    result = utils.IgnoreSignals(self.accept)
    if result is None:
      return
    (conn, addr) = result
    if self.family == socket.AF_UNIX:
      # accept() passes nothing meaningful as the address for unix
      # sockets, so use the peer's credentials instead
      addr = netutils.GetSocketCredentials(conn)
    logging.info("Accepted connection from %s",
                 netutils.FormatAddress(addr, family=self.family))
    self.handle_connection(conn, addr)

  def handle_connection(self, connected_socket, client_address):
    """Handle an already accepted connection.

    """
    raise NotImplementedError
150

    
151

    
152
class AsyncTerminatedMessageStream(asynchat.async_chat):
  """A terminator separated message stream asyncore module.

  Handles a stream connection receiving messages terminated by a defined
  separator. For each complete message handle_message is called.

  Flow control: when unhandled_limit is set, at most that many messages
  may be awaiting an answer at once; further complete messages are parked
  in iqueue (and the socket stops being readable) until a reply is sent.

  """
  def __init__(self, connected_socket, peer_address, terminator, family,
               unhandled_limit):
    """AsyncTerminatedMessageStream constructor.

    @type connected_socket: socket.socket
    @param connected_socket: connected stream socket to receive messages from
    @param peer_address: family-specific peer address
    @type terminator: string
    @param terminator: terminator separating messages in the stream
    @type family: integer
    @param family: socket family
    @type unhandled_limit: integer or None
    @param unhandled_limit: maximum unanswered messages

    """
    # python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by
    # using a positional argument rather than a keyword one.
    asynchat.async_chat.__init__(self, connected_socket)
    self.connected_socket = connected_socket
    # on python 2.4 there is no "family" attribute for the socket class
    # FIXME: when we move to python 2.5 or above remove the family parameter
    #self.family = self.connected_socket.family
    self.family = family
    self.peer_address = peer_address
    self.terminator = terminator
    self.unhandled_limit = unhandled_limit
    self.set_terminator(terminator)
    # chunks of the message currently being received; joined and reset in
    # found_terminator
    self.ibuffer = []
    # receive_count: complete messages received; send_count: replies sent
    self.receive_count = 0
    self.send_count = 0
    # oqueue: outgoing replies queued by send_message (deque, thread-safe)
    # iqueue: (message, message_id) pairs deferred by the unhandled limit
    self.oqueue = collections.deque()
    self.iqueue = collections.deque()

  # this method is overriding an asynchat.async_chat method
  def collect_incoming_data(self, data):
    # buffer raw chunks until the terminator is seen
    self.ibuffer.append(data)

  def _can_handle_message(self):
    # A message may be handled immediately when no limit is configured,
    # or when fewer than unhandled_limit messages await a reply and no
    # earlier message is already parked in iqueue (which must keep FIFO
    # order)
    return (self.unhandled_limit is None or
            (self.receive_count < self.send_count + self.unhandled_limit) and
             not self.iqueue)

  # this method is overriding an asynchat.async_chat method
  def found_terminator(self):
    message = "".join(self.ibuffer)
    self.ibuffer = []
    # message ids are assigned in arrival order, starting at 0
    message_id = self.receive_count
    # We need to increase the receive_count after checking if the message can
    # be handled, but before calling handle_message
    can_handle = self._can_handle_message()
    self.receive_count += 1
    if can_handle:
      self.handle_message(message, message_id)
    else:
      # over the unhandled limit: defer until handle_write sends a reply
      self.iqueue.append((message, message_id))

  def handle_message(self, message, message_id):
    """Handle a terminated message.

    @type message: string
    @param message: message to handle
    @type message_id: integer
    @param message_id: stream's message sequence number

    """
    pass
    # TODO: move this method to raise NotImplementedError
    # raise NotImplementedError

  def send_message(self, message):
    """Send a message to the remote peer. This function is thread-safe.

    @type message: string
    @param message: message to send, without the terminator

    @warning: If calling this function from a thread different than the one
    performing the main asyncore loop, remember that you have to wake that one
    up.

    """
    # If we just append the message we received to the output queue, this
    # function can be safely called by multiple threads at the same time, and
    # we don't need locking, since deques are thread safe. handle_write in the
    # asyncore thread will handle the next input message if there are any
    # enqueued.
    self.oqueue.append(message)

  # this method is overriding an asyncore.dispatcher method
  def readable(self):
    # read from the socket if we can handle the next requests
    return self._can_handle_message() and asynchat.async_chat.readable(self)

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # the output queue may become full just after we called writable. This only
    # works if we know we'll have something else waking us up from the select,
    # in such case, anyway.
    return asynchat.async_chat.writable(self) or self.oqueue

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    if self.oqueue:
      # if we have data in the output queue, then send_message was called.
      # this means we can process one more message from the input queue, if
      # there are any.
      data = self.oqueue.popleft()
      self.push(data + self.terminator)
      self.send_count += 1
      if self.iqueue:
        self.handle_message(*self.iqueue.popleft())
    self.initiate_send()

  def close_log(self):
    # log the peer we are dropping, then close the channel
    logging.info("Closing connection from %s",
                 netutils.FormatAddress(self.peer_address, family=self.family))
    self.close()

  # this method is overriding an asyncore.dispatcher method
  def handle_expt(self):
    # exceptional socket condition (e.g. out-of-band data): drop the peer
    self.close_log()

  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")
    self.close_log()
287

    
288

    
289
class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
  """An improved asyncore udp socket.

  Keeps a queue of outgoing datagrams and only reports itself writable
  while that queue is non-empty.

  """
  def __init__(self, family):
    """Constructor for AsyncUDPSocket

    @type family: integer
    @param family: socket family (one of socket.AF_*)

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    self._out_queue = []
    self._family = family
    self.create_socket(family, socket.SOCK_DGRAM)

  # this method is overriding an asyncore.dispatcher method
  def handle_connect(self):
    # Python treats the first datagram from a source as a "connect" and
    # later ones as part of the same connection; we treat every datagram
    # equally, so there is nothing to do here.
    pass

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    received = utils.IgnoreSignals(self.recvfrom,
                                   constants.MAX_UDP_DATA_SIZE)
    if received is None:
      return
    (payload, address) = received
    if self._family == socket.AF_INET6:
      # drop the 'flow info' and 'scope id' components of the v6 address
      (ip, port, _, _) = address
    else:
      (ip, port) = address

    self.handle_datagram(payload, ip, port)

  def handle_datagram(self, payload, ip, port):
    """Handle an already read udp datagram

    """
    raise NotImplementedError

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # only watch for writability while there is output scheduled
    return bool(self._out_queue)

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    if not self._out_queue:
      logging.error("handle_write called with empty output queue")
      return
    (dest_ip, dest_port, data) = self._out_queue[0]
    utils.IgnoreSignals(self.sendto, data, 0, (dest_ip, dest_port))
    # dequeue only after the send, so a failed send keeps the datagram
    self._out_queue.pop(0)

  def enqueue_send(self, ip, port, payload):
    """Enqueue a datagram to be sent when possible

    @raise errors.UdpDataSizeError: if the payload exceeds the maximum
      allowed UDP datagram size

    """
    if len(payload) > constants.MAX_UDP_DATA_SIZE:
      raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
                                    constants.MAX_UDP_DATA_SIZE))
    self._out_queue.append((ip, port, payload))

  def process_next_packet(self, timeout=0):
    """Process the next datagram, waiting for it if necessary.

    @type timeout: float
    @param timeout: how long to wait for data
    @rtype: boolean
    @return: True if some data has been handled, False otherwise

    """
    poll_result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
    if poll_result is None or not (poll_result & select.POLLIN):
      return False
    self.handle_read()
    return True
368

    
369

    
370
class AsyncAwaker(GanetiBaseAsyncoreDispatcher):
  """A way to notify the asyncore loop that something is going on.

  If an asyncore daemon is multithreaded when a thread tries to push some data
  to a socket, the main loop handling asynchronous requests might be sleeping
  waiting on a select(). To avoid this it can create an instance of the
  AsyncAwaker, which other threads can use to wake it up.

  """
  def __init__(self, signal_fn=None):
    """Constructor for AsyncAwaker

    @type signal_fn: function
    @param signal_fn: function to call when awaken

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    # PEP 8: comparison to None must use "is", not "=="
    assert signal_fn is None or callable(signal_fn)
    # in_socket is the read end watched by the asyncore loop; out_socket
    # is the write end used (possibly from other threads) via signal()
    (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
                                                          socket.SOCK_STREAM)
    self.in_socket.setblocking(0)
    # each end is used in a single direction only
    self.in_socket.shutdown(socket.SHUT_WR)
    self.out_socket.shutdown(socket.SHUT_RD)
    self.set_socket(self.in_socket)
    # True when no wakeup byte is pending; limits redundant writes
    # between two handle_read invocations
    self.need_signal = True
    self.signal_fn = signal_fn
    self.connected = True

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    """Drain pending wakeup bytes and run the callback, if any.

    """
    utils.IgnoreSignals(self.recv, 4096)
    if self.signal_fn:
      self.signal_fn()
    self.need_signal = True

  # this method is overriding an asyncore.dispatcher method
  def close(self):
    """Close both ends of the socketpair.

    """
    asyncore.dispatcher.close(self)
    self.out_socket.close()

  def signal(self):
    """Signal the asyncore main loop.

    Any data we send here will be ignored, but it will cause the select() call
    to return.

    """
    # Yes, there is a race condition here. No, we don't care, at worst we're
    # sending more than one wakeup token, which doesn't harm at all.
    if self.need_signal:
      self.need_signal = False
      self.out_socket.send("\0")
422

    
423

    
424
class Mainloop(object):
  """Generic mainloop for daemons

  @ivar scheduler: A sched.scheduler object, which can be used to register
    timed events

  """
  def __init__(self):
    """Constructs a new Mainloop instance.

    """
    # receivers registered via RegisterSignal, notified on every signal
    self._signal_wait = []
    self.scheduler = AsyncoreScheduler(time.time)

    # Resolve uid/gids used
    runtime.GetEnts()

  @utils.SignalHandled([signal.SIGCHLD])
  @utils.SignalHandled([signal.SIGTERM])
  @utils.SignalHandled([signal.SIGINT])
  def Run(self, signal_handlers=None):
    """Runs the mainloop.

    Loops serving asyncore events (via the scheduler when timed events
    are pending) until a SIGTERM or SIGINT is received.

    @type signal_handlers: dict
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator

    """
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"

    # Counter for received signals
    shutdown_signals = 0

    # Start actual main loop
    while shutdown_signals < 1:
      if not self.scheduler.empty():
        try:
          # the scheduler's delay function (AsyncoreDelayFunction) also
          # serves asyncore events and raises SchedulerBreakout after
          # each round, so we regularly fall through to the signal check
          self.scheduler.run()
        except SchedulerBreakout:
          pass
      else:
        # no timed events pending: serve one round of asyncore events
        asyncore.loop(count=1, use_poll=True)

      # Check whether a signal was raised
      for (sig, handler) in signal_handlers.items():
        if handler.called:
          self._CallSignalWaiters(sig)
          if sig in (signal.SIGTERM, signal.SIGINT):
            logging.info("Received signal %s asking for shutdown", sig)
            shutdown_signals += 1
          handler.Clear()

  def _CallSignalWaiters(self, signum):
    """Calls all signal waiters for a certain signal.

    @type signum: int
    @param signum: Signal number

    """
    for owner in self._signal_wait:
      owner.OnSignal(signum)

  def RegisterSignal(self, owner):
    """Registers a receiver for signal notifications

    The receiver must support a "OnSignal(self, signum)" function.

    @type owner: instance
    @param owner: Receiver

    """
    self._signal_wait.append(owner)
497

    
498

    
499
def _VerifyDaemonUser(daemon_name):
  """Verifies the process uid matches the configured uid.

  This method verifies that a daemon is started as the user it is
  intended to be run

  @param daemon_name: The name of daemon to be started
  @return: A tuple with the first item indicating success or not,
           the second item current uid and third with expected uid

  """
  getents = runtime.GetEnts()
  # map each daemon to the uid it is expected to run under
  expected_uid = {
    constants.MASTERD: getents.masterd_uid,
    constants.RAPI: getents.rapi_uid,
    constants.NODED: getents.noded_uid,
    constants.CONFD: getents.confd_uid,
    }[daemon_name]
  actual_uid = os.getuid()

  return (expected_uid == actual_uid, actual_uid, expected_uid)
521

    
522

    
523
def _BeautifyError(err):
524
  """Try to format an error better.
525

526
  Since we're dealing with daemon startup errors, in many cases this
527
  will be due to socket error and such, so we try to format these cases better.
528

529
  @param err: an exception object
530
  @rtype: string
531
  @return: the formatted error description
532

533
  """
534
  try:
535
    if isinstance(err, socket.error):
536
      return "Socket-related error: %s (errno=%s)" % (err.args[1], err.args[0])
537
    elif isinstance(err, EnvironmentError):
538
      if err.filename is None:
539
        return "%s (errno=%s)" % (err.strerror, err.errno)
540
      else:
541
        return "%s (file %s) (errno=%s)" % (err.strerror, err.filename,
542
                                            err.errno)
543
    else:
544
      return str(err)
545
  except Exception: # pylint: disable=W0703
546
    logging.exception("Error while handling existing error %s", err)
547
    return "%s" % str(err)
548

    
549

    
550
def _HandleSigHup(reopen_fn, signum, frame): # pylint: disable=W0613
551
  """Handler for SIGHUP.
552

553
  @param reopen_fn: List of callback functions for reopening log files
554

555
  """
556
  logging.info("Reopening log files after receiving SIGHUP")
557

    
558
  for fn in reopen_fn:
559
    if fn:
560
      fn()
561

    
562

    
563
def GenericMain(daemon_name, optionparser,
                check_fn, prepare_fn, exec_fn,
                multithreaded=False, console_logging=False,
                default_ssl_cert=None, default_ssl_key=None):
  """Shared main function for daemons.

  Parses common options, verifies the running user, daemonizes (unless
  running in the foreground), sets up logging and the pid file, then
  hands control to exec_fn.

  @type daemon_name: string
  @param daemon_name: daemon name
  @type optionparser: optparse.OptionParser
  @param optionparser: initialized optionparser with daemon-specific options
                       (common -f -d options will be handled by this module)
  @type check_fn: function which accepts (options, args)
  @param check_fn: function that checks start conditions and exits if they're
                   not met
  @type prepare_fn: function which accepts (options, args)
  @param prepare_fn: function that is run before forking, or None;
      its result will be passed as the third parameter to exec_fn, or
      if None was passed in, we will just pass None to exec_fn
  @type exec_fn: function which accepts (options, args, prepare_results)
  @param exec_fn: function that's executed with the daemon's pid file held, and
                  runs the daemon itself.
  @type multithreaded: bool
  @param multithreaded: Whether the daemon uses threads
  @type console_logging: boolean
  @param console_logging: if True, the daemon will fall back to the system
                          console if logging fails
  @type default_ssl_cert: string
  @param default_ssl_cert: Default SSL certificate path
  @type default_ssl_key: string
  @param default_ssl_key: Default SSL key path

  """
  # Options common to all daemons
  optionparser.add_option("-f", "--foreground", dest="fork",
                          help="Don't detach from the current terminal",
                          default=True, action="store_false")
  optionparser.add_option("-d", "--debug", dest="debug",
                          help="Enable some debug messages",
                          default=False, action="store_true")
  optionparser.add_option("--syslog", dest="syslog",
                          help="Enable logging to syslog (except debug"
                          " messages); one of 'no', 'yes' or 'only' [%s]" %
                          constants.SYSLOG_USAGE,
                          default=constants.SYSLOG_USAGE,
                          choices=["no", "yes", "only"])

  if daemon_name in constants.DAEMONS_PORTS:
    default_bind_address = constants.IP4_ADDRESS_ANY
    family = ssconf.SimpleStore().GetPrimaryIPFamily()
    # family will default to AF_INET if there is no ssconf file (e.g. when
    # upgrading a cluster from 2.2 -> 2.3. This is intended, as Ganeti clusters
    # <= 2.2 can not be AF_INET6
    if family == netutils.IP6Address.family:
      default_bind_address = constants.IP6_ADDRESS_ANY

    default_port = netutils.GetDaemonPort(daemon_name)

    # For networked daemons we allow choosing the port and bind address
    optionparser.add_option("-p", "--port", dest="port",
                            help="Network port (default: %s)" % default_port,
                            default=default_port, type="int")
    optionparser.add_option("-b", "--bind", dest="bind_address",
                            help=("Bind address (default: '%s')" %
                                  default_bind_address),
                            default=default_bind_address, metavar="ADDRESS")

  # SSL options are only offered when both defaults are provided
  if default_ssl_key is not None and default_ssl_cert is not None:
    optionparser.add_option("--no-ssl", dest="ssl",
                            help="Do not secure HTTP protocol with SSL",
                            default=True, action="store_false")
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
                            help=("SSL key path (default: %s)" %
                                  default_ssl_key),
                            default=default_ssl_key, type="string",
                            metavar="SSL_KEY_PATH")
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
                            help=("SSL certificate path (default: %s)" %
                                  default_ssl_cert),
                            default=default_ssl_cert, type="string",
                            metavar="SSL_CERT_PATH")

  # Disable the use of fork(2) if the daemon uses threads
  if multithreaded:
    utils.DisableFork()

  options, args = optionparser.parse_args()

  if getattr(options, "ssl", False):
    ssl_paths = {
      "certificate": options.ssl_cert,
      "key": options.ssl_key,
      }

    # fail early if the SSL files are missing
    for name, path in ssl_paths.iteritems():
      if not os.path.isfile(path):
        print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
        sys.exit(constants.EXIT_FAILURE)

    # TODO: By initiating http.HttpSslParams here we would only read the files
    # once and have a proper validation (isfile returns False on directories)
    # at the same time.

  result, running_uid, expected_uid = _VerifyDaemonUser(daemon_name)
  if not result:
    msg = ("%s started using wrong user ID (%d), expected %d" %
           (daemon_name, running_uid, expected_uid))
    print >> sys.stderr, msg
    sys.exit(constants.EXIT_FAILURE)

  if check_fn is not None:
    check_fn(options, args)

  if options.fork:
    # daemonize: wpipe lets the child report startup errors to the
    # still-waiting parent before it exits
    utils.CloseFDs()
    (wpipe, stdio_reopen_fn) = \
      utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name])
  else:
    (wpipe, stdio_reopen_fn) = (None, None)

  log_reopen_fn = \
    utils.SetupLogging(constants.DAEMONS_LOGFILES[daemon_name], daemon_name,
                       debug=options.debug,
                       stderr_logging=not options.fork,
                       multithreaded=multithreaded,
                       syslog=options.syslog,
                       console_logging=console_logging)

  # Reopen log file(s) on SIGHUP
  signal.signal(signal.SIGHUP,
                compat.partial(_HandleSigHup, [log_reopen_fn, stdio_reopen_fn]))

  utils.WritePidFile(utils.DaemonPidFileName(daemon_name))
  try:
    try:
      logging.info("%s daemon startup", daemon_name)
      if callable(prepare_fn):
        prep_results = prepare_fn(options, args)
      else:
        prep_results = None
    except Exception, err:
      # report the startup failure to the waiting parent (if any), then
      # re-raise so the daemon exits
      utils.WriteErrorToFD(wpipe, _BeautifyError(err))
      raise

    if wpipe is not None:
      # we're done with the preparation phase, we close the pipe to
      # let the parent know it's safe to exit
      os.close(wpipe)

    exec_fn(options, args, prep_results)
  finally:
    # always drop the pid file, even if exec_fn raised
    utils.RemoveFile(utils.DaemonPidFileName(daemon_name))