Statistics
| Branch: | Tag: | Revision:

root / lib / daemon.py @ a0d2fe2c

History | View | Annotate | Download (21.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module with helper classes and functions for daemons"""
23

    
24

    
25
import asyncore
26
import asynchat
27
import collections
28
import os
29
import signal
30
import logging
31
import sched
32
import time
33
import socket
34
import select
35
import sys
36

    
37
from ganeti import utils
38
from ganeti import constants
39
from ganeti import errors
40
from ganeti import netutils
41
from ganeti import ssconf
42
from ganeti import runtime
43

    
44

    
45
class SchedulerBreakout(Exception):
  """Control-flow exception raised to break out of the scheduler loop.

  Raised by the scheduler's delay function so that scheduler.run()
  returns control to the daemon main loop between events.

  """
49

    
50

    
51
def AsyncoreDelayFunction(timeout):
  """Scheduler delay function that services asyncore instead of sleeping.

  Instead of blocking for the requested delay, run one asyncore
  event-loop pass bounded by that delay, so network events keep being
  processed while timed events are pending.  Afterwards raise
  L{SchedulerBreakout}, forcing the enclosing scheduler.run() call to
  return to the main loop, which can then check for signals before
  re-entering the scheduler and dispatching any events that became due.

  This works around scheduler.run() not supporting a count=... argument
  the way asyncore.loop does; the sched module explicitly documents
  raising from the delay function as an allowed usage model.

  """
  asyncore.loop(count=1, use_poll=True, timeout=timeout)
  raise SchedulerBreakout()
70

    
71

    
72
class AsyncoreScheduler(sched.scheduler):
  """Event scheduler integrated with asyncore

  """
  def __init__(self, timefunc):
    """Initializes this class.

    @type timefunc: callable
    @param timefunc: function returning the current time (e.g. time.time)

    """
    # Plug in AsyncoreDelayFunction so waiting between scheduled events
    # services asyncore I/O instead of sleeping
    sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
78

    
79

    
80
class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
  """Base class for Ganeti asyncore dispatchers.

  Provides the error-handling and writability defaults shared by the
  Ganeti dispatcher classes.

  """
  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log any error during request handling and keep the loop running.

    """
    logging.exception("Error while handling asyncore request")

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    """By default never ask select() about writability.

    """
    return False
97

    
98

    
99
class AsyncStreamServer(GanetiBaseAsyncoreDispatcher):
  """Asyncore-based stream server.

  Every incoming connection is accepted and then handed over to a
  separate asyncore dispatcher via L{handle_connection}.

  """

  # backlog passed to listen()
  _REQUEST_QUEUE_SIZE = 5

  def __init__(self, family, address):
    """Constructor for AsyncUnixStreamSocket

    @type family: integer
    @param family: socket family (one of socket.AF_*)
    @type address: address family dependent
    @param address: address to bind the socket to

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    self.family = family
    self.create_socket(self.family, socket.SOCK_STREAM)
    self.set_reuse_addr()
    self.bind(address)
    self.listen(self._REQUEST_QUEUE_SIZE)

  # this method is overriding an asyncore.dispatcher method
  def handle_accept(self):
    """Accept a new client connection.

    Delegates serving the client to L{handle_connection}, which is
    expected to create a new asyncore dispatcher for it.

    """
    result = utils.IgnoreSignals(self.accept)
    if result is None:
      return
    (sock, addr) = result
    if self.family == socket.AF_UNIX:
      # unix sockets don't report a meaningful peer address via accept(),
      # so substitute the peer's credentials instead
      addr = netutils.GetSocketCredentials(sock)
    logging.info("Accepted connection from %s",
                 netutils.FormatAddress(addr, family=self.family))
    self.handle_connection(sock, addr)

  def handle_connection(self, connected_socket, client_address):
    """Handle an already accepted connection.

    Subclasses must override this.

    """
    raise NotImplementedError
149

    
150

    
151
class AsyncTerminatedMessageStream(asynchat.async_chat):
  """A terminator separated message stream asyncore module.

  Handles a stream connection receiving messages terminated by a defined
  separator. For each complete message handle_message is called.

  """
  def __init__(self, connected_socket, peer_address, terminator, family,
               unhandled_limit):
    """AsyncTerminatedMessageStream constructor.

    @type connected_socket: socket.socket
    @param connected_socket: connected stream socket to receive messages from
    @param peer_address: family-specific peer address
    @type terminator: string
    @param terminator: terminator separating messages in the stream
    @type family: integer
    @param family: socket family
    @type unhandled_limit: integer or None
    @param unhandled_limit: maximum unanswered messages

    """
    # python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by
    # using a positional argument rather than a keyword one.
    asynchat.async_chat.__init__(self, connected_socket)
    self.connected_socket = connected_socket
    # on python 2.4 there is no "family" attribute for the socket class
    # FIXME: when we move to python 2.5 or above remove the family parameter
    #self.family = self.connected_socket.family
    self.family = family
    self.peer_address = peer_address
    self.terminator = terminator
    self.unhandled_limit = unhandled_limit
    self.set_terminator(terminator)
    # chunks of the current, not-yet-terminated message
    self.ibuffer = []
    # number of complete messages seen so far (also used as message_id)
    self.receive_count = 0
    # number of replies pushed out so far
    self.send_count = 0
    # outgoing messages queued by send_message (deque: thread-safe appends)
    self.oqueue = collections.deque()
    # incoming messages deferred because the unhandled limit was reached
    self.iqueue = collections.deque()

  # this method is overriding an asynchat.async_chat method
  def collect_incoming_data(self, data):
    # buffer the partial message; joined once the terminator is seen
    self.ibuffer.append(data)

  def _can_handle_message(self):
    """Return whether a newly received message may be handled right away.

    True when either no limit is configured, or we are below the
    unanswered-message limit and no earlier messages are still queued.

    """
    # NOTE(review): "or" binds looser than "and", so with unhandled_limit
    # set to None this returns True without consulting iqueue.  That is
    # safe here because with no limit messages are never deferred, hence
    # iqueue stays empty -- but confirm before relying on it elsewhere.
    return (self.unhandled_limit is None or
            (self.receive_count < self.send_count + self.unhandled_limit) and
             not self.iqueue)

  # this method is overriding an asynchat.async_chat method
  def found_terminator(self):
    """Assemble the buffered chunks and handle (or defer) the message.

    """
    message = "".join(self.ibuffer)
    self.ibuffer = []
    # the message id is the position of the message in the stream
    message_id = self.receive_count
    # We need to increase the receive_count after checking if the message can
    # be handled, but before calling handle_message
    can_handle = self._can_handle_message()
    self.receive_count += 1
    if can_handle:
      self.handle_message(message, message_id)
    else:
      # over the unanswered limit: park the message until a reply is sent
      self.iqueue.append((message, message_id))

  def handle_message(self, message, message_id):
    """Handle a terminated message.

    @type message: string
    @param message: message to handle
    @type message_id: integer
    @param message_id: stream's message sequence number

    """
    pass
    # TODO: move this method to raise NotImplementedError
    # raise NotImplementedError

  def send_message(self, message):
    """Send a message to the remote peer. This function is thread-safe.

    @type message: string
    @param message: message to send, without the terminator

    @warning: If calling this function from a thread different than the one
    performing the main asyncore loop, remember that you have to wake that one
    up.

    """
    # If we just append the message we received to the output queue, this
    # function can be safely called by multiple threads at the same time, and
    # we don't need locking, since deques are thread safe. handle_write in the
    # asyncore thread will handle the next input message if there are any
    # enqueued.
    self.oqueue.append(message)

  # this method is overriding an asyncore.dispatcher method
  def readable(self):
    # read from the socket if we can handle the next requests
    return self._can_handle_message() and asynchat.async_chat.readable(self)

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # the output queue may become full just after we called writable. This only
    # works if we know we'll have something else waking us up from the select,
    # in such case, anyway.
    return asynchat.async_chat.writable(self) or self.oqueue

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    if self.oqueue:
      # if we have data in the output queue, then send_message was called.
      # this means we can process one more message from the input queue, if
      # there are any.
      data = self.oqueue.popleft()
      self.push(data + self.terminator)
      self.send_count += 1
      if self.iqueue:
        self.handle_message(*self.iqueue.popleft())
    self.initiate_send()

  def close_log(self):
    """Log the disconnection and close the connection.

    """
    logging.info("Closing connection from %s",
                 netutils.FormatAddress(self.peer_address, family=self.family))
    self.close()

  # this method is overriding an asyncore.dispatcher method
  def handle_expt(self):
    # out-of-band data / exceptional condition: just drop the connection
    self.close_log()

  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")
    self.close_log()
286

    
287

    
288
class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
  """An improved asyncore udp socket.

  Reads datagrams and dispatches them to L{handle_datagram}; outgoing
  datagrams are queued via L{enqueue_send} and flushed one per write
  event.

  """
  def __init__(self, family):
    """Constructor for AsyncUDPSocket

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    self._out_queue = []
    self._family = family
    self.create_socket(family, socket.SOCK_DGRAM)

  # this method is overriding an asyncore.dispatcher method
  def handle_connect(self):
    # Python thinks that the first udp message from a source qualifies as a
    # "connect" and further ones are part of the same connection. We beg to
    # differ and treat all messages equally.
    pass

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    """Read one datagram and hand it over to L{handle_datagram}.

    """
    data = utils.IgnoreSignals(self.recvfrom, constants.MAX_UDP_DATA_SIZE)
    if data is None:
      return
    (payload, sender) = data
    if self._family == socket.AF_INET6:
      # we ignore 'flow info' and 'scope id' as we don't need them
      (ip, port, _, _) = sender
    else:
      (ip, port) = sender
    self.handle_datagram(payload, ip, port)

  def handle_datagram(self, payload, ip, port):
    """Handle an already read udp datagram

    """
    raise NotImplementedError

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # We should check whether we can write to the socket only if we have
    # something scheduled to be written
    return bool(self._out_queue)

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    """Send the oldest queued datagram.

    """
    queue = self._out_queue
    if not queue:
      logging.error("handle_write called with empty output queue")
      return
    (ip, port, payload) = queue[0]
    utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
    # only dequeue after the send went through
    queue.pop(0)

  def enqueue_send(self, ip, port, payload):
    """Enqueue a datagram to be sent when possible

    """
    if len(payload) > constants.MAX_UDP_DATA_SIZE:
      raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
                                    constants.MAX_UDP_DATA_SIZE))
    self._out_queue.append((ip, port, payload))

  def process_next_packet(self, timeout=0):
    """Process the next datagram, waiting for it if necessary.

    @type timeout: float
    @param timeout: how long to wait for data
    @rtype: boolean
    @return: True if some data has been handled, False otherwise

    """
    revents = utils.WaitForFdCondition(self, select.POLLIN, timeout)
    if revents is not None and revents & select.POLLIN:
      self.handle_read()
      return True
    return False
367

    
368

    
369
class AsyncAwaker(GanetiBaseAsyncoreDispatcher):
  """A way to notify the asyncore loop that something is going on.

  If an asyncore daemon is multithreaded when a thread tries to push some data
  to a socket, the main loop handling asynchronous requests might be sleeping
  waiting on a select(). To avoid this it can create an instance of the
  AsyncAwaker, which other threads can use to wake it up.

  """
  def __init__(self, signal_fn=None):
    """Constructor for AsyncAwaker

    @type signal_fn: function
    @param signal_fn: function to call when awaken

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    # identity check ("is None") instead of equality ("== None"), per PEP 8
    assert signal_fn is None or callable(signal_fn)
    # Connected socket pair: other threads write to out_socket, the asyncore
    # loop is woken up by in_socket becoming readable
    (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
                                                          socket.SOCK_STREAM)
    self.in_socket.setblocking(0)
    # each end of the pair is used in one direction only
    self.in_socket.shutdown(socket.SHUT_WR)
    self.out_socket.shutdown(socket.SHUT_RD)
    self.set_socket(self.in_socket)
    self.need_signal = True
    self.signal_fn = signal_fn
    self.connected = True

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    """Drain the wakeup token(s) and invoke the callback, if any.

    """
    utils.IgnoreSignals(self.recv, 4096)
    if self.signal_fn:
      self.signal_fn()
    # re-arm: the next signal() call must send a token again
    self.need_signal = True

  # this method is overriding an asyncore.dispatcher method
  def close(self):
    """Close both ends of the socket pair.

    """
    asyncore.dispatcher.close(self)
    self.out_socket.close()

  def signal(self):
    """Signal the asyncore main loop.

    Any data we send here will be ignored, but it will cause the select() call
    to return.

    """
    # Yes, there is a race condition here. No, we don't care, at worst we're
    # sending more than one wakeup token, which doesn't harm at all.
    if self.need_signal:
      self.need_signal = False
      self.out_socket.send("\0")
421

    
422

    
423
class Mainloop(object):
  """Generic mainloop for daemons

  @ivar scheduler: A sched.scheduler object, which can be used to register
    timed events

  """
  def __init__(self):
    """Constructs a new Mainloop instance.

    """
    # receivers registered via RegisterSignal
    self._signal_wait = []
    self.scheduler = AsyncoreScheduler(time.time)

  @utils.SignalHandled([signal.SIGCHLD])
  @utils.SignalHandled([signal.SIGTERM])
  @utils.SignalHandled([signal.SIGINT])
  def Run(self, signal_handlers=None):
    """Runs the mainloop.

    @type signal_handlers: dict
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator

    """
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"
    running = True
    # Start actual main loop
    while running:
      if not self.scheduler.empty():
        try:
          self.scheduler.run()
        except SchedulerBreakout:
          pass
      else:
        asyncore.loop(count=1, use_poll=True)

      # Check whether a signal was raised
      for sig in signal_handlers:
        handler = signal_handlers[sig]
        if handler.called:
          self._CallSignalWaiters(sig)
          # Latch the shutdown decision instead of recomputing "running"
          # for every signal: previously a non-terminating signal (e.g.
          # SIGCHLD) handled later in the same pass would overwrite the
          # effect of an earlier SIGTERM/SIGINT and keep the loop alive
          if sig in (signal.SIGTERM, signal.SIGINT):
            running = False
          handler.Clear()

  def _CallSignalWaiters(self, signum):
    """Calls all signal waiters for a certain signal.

    @type signum: int
    @param signum: Signal number

    """
    for owner in self._signal_wait:
      owner.OnSignal(signum)

  def RegisterSignal(self, owner):
    """Registers a receiver for signal notifications

    The receiver must support a "OnSignal(self, signum)" function.

    @type owner: instance
    @param owner: Receiver

    """
    self._signal_wait.append(owner)
489

    
490

    
491
def _VerifyDaemonUser(daemon_name):
  """Verifies the process uid matches the configured uid.

  Checks that the daemon is being started under the user account it is
  configured to run as.

  @param daemon_name: The name of daemon to be started
  @return: A tuple with the first item indicating success or not,
           the second item current uid and third with expected uid

  """
  ents = runtime.GetEnts()
  # map every known daemon to the uid it must run under
  uid_map = {
    constants.MASTERD: ents.masterd_uid,
    constants.RAPI: ents.rapi_uid,
    constants.NODED: ents.noded_uid,
    constants.CONFD: ents.confd_uid,
    }
  expected_uid = uid_map[daemon_name]
  current_uid = os.getuid()
  return (expected_uid == current_uid, current_uid, expected_uid)
513

    
514

    
515
def GenericMain(daemon_name, optionparser, check_fn, exec_fn,
                multithreaded=False, console_logging=False,
                default_ssl_cert=None, default_ssl_key=None):
  """Shared main function for daemons.

  @type daemon_name: string
  @param daemon_name: daemon name
  @type optionparser: optparse.OptionParser
  @param optionparser: initialized optionparser with daemon-specific options
                       (common -f -d options will be handled by this module)
  @type check_fn: function which accepts (options, args)
  @param check_fn: function that checks start conditions and exits if they're
                   not met
  @type exec_fn: function which accepts (options, args)
  @param exec_fn: function that's executed with the daemon's pid file held, and
                  runs the daemon itself.
  @type multithreaded: bool
  @param multithreaded: Whether the daemon uses threads
  @type console_logging: boolean
  @param console_logging: if True, the daemon will fall back to the system
                          console if logging fails
  @type default_ssl_cert: string
  @param default_ssl_cert: Default SSL certificate path
  @type default_ssl_key: string
  @param default_ssl_key: Default SSL key path

  """
  # Options shared by every daemon
  optionparser.add_option("-f", "--foreground", dest="fork",
                          help="Don't detach from the current terminal",
                          default=True, action="store_false")
  optionparser.add_option("-d", "--debug", dest="debug",
                          help="Enable some debug messages",
                          default=False, action="store_true")
  optionparser.add_option("--syslog", dest="syslog",
                          help="Enable logging to syslog (except debug"
                          " messages); one of 'no', 'yes' or 'only' [%s]" %
                          constants.SYSLOG_USAGE,
                          default=constants.SYSLOG_USAGE,
                          choices=["no", "yes", "only"])

  if daemon_name in constants.DAEMONS_PORTS:
    default_bind_address = constants.IP4_ADDRESS_ANY
    family = ssconf.SimpleStore().GetPrimaryIPFamily()
    # family will default to AF_INET if there is no ssconf file (e.g. when
    # upgrading a cluster from 2.2 -> 2.3. This is intended, as Ganeti clusters
    # <= 2.2 can not be AF_INET6
    if family == netutils.IP6Address.family:
      default_bind_address = constants.IP6_ADDRESS_ANY

    default_port = netutils.GetDaemonPort(daemon_name)

    # For networked daemons we allow choosing the port and bind address
    optionparser.add_option("-p", "--port", dest="port",
                            help="Network port (default: %s)" % default_port,
                            default=default_port, type="int")
    optionparser.add_option("-b", "--bind", dest="bind_address",
                            help=("Bind address (default: '%s')" %
                                  default_bind_address),
                            default=default_bind_address, metavar="ADDRESS")

  # SSL options are only offered when the caller supplied both defaults
  if default_ssl_key is not None and default_ssl_cert is not None:
    optionparser.add_option("--no-ssl", dest="ssl",
                            help="Do not secure HTTP protocol with SSL",
                            default=True, action="store_false")
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
                            help=("SSL key path (default: %s)" %
                                  default_ssl_key),
                            default=default_ssl_key, type="string",
                            metavar="SSL_KEY_PATH")
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
                            help=("SSL certificate path (default: %s)" %
                                  default_ssl_cert),
                            default=default_ssl_cert, type="string",
                            metavar="SSL_CERT_PATH")

  # Disable the use of fork(2) if the daemon uses threads
  utils.no_fork = multithreaded

  options, args = optionparser.parse_args()

  # If SSL is enabled (the "ssl" attribute only exists when the SSL options
  # were registered above), verify the configured files exist before starting
  if getattr(options, "ssl", False):
    ssl_paths = {
      "certificate": options.ssl_cert,
      "key": options.ssl_key,
      }

    for name, path in ssl_paths.iteritems():
      if not os.path.isfile(path):
        print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
        sys.exit(constants.EXIT_FAILURE)

    # TODO: By initiating http.HttpSslParams here we would only read the files
    # once and have a proper validation (isfile returns False on directories)
    # at the same time.

  # Refuse to start under the wrong user account
  result, running_uid, expected_uid = _VerifyDaemonUser(daemon_name)
  if not result:
    msg = ("%s started using wrong user ID (%d), expected %d" %
           (daemon_name, running_uid, expected_uid))
    print >> sys.stderr, msg
    sys.exit(constants.EXIT_FAILURE)

  # Daemon-specific start-condition checks, if any
  if check_fn is not None:
    check_fn(options, args)

  # Detach from the terminal unless running in the foreground; this must
  # happen before the pid file is written, so the pid is the daemon's
  if options.fork:
    utils.CloseFDs()
    utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name])

  # Hold the pid file for the whole lifetime of the daemon and make sure
  # it is removed on exit, even if startup (e.g. logging setup) fails
  utils.WritePidFile(daemon_name)
  try:
    utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
                       debug=options.debug,
                       stderr_logging=not options.fork,
                       multithreaded=multithreaded,
                       program=daemon_name,
                       syslog=options.syslog,
                       console_logging=console_logging)
    logging.info("%s daemon startup", daemon_name)
    exec_fn(options, args)
  finally:
    utils.RemovePidFile(daemon_name)