Statistics
| Branch: | Tag: | Revision:

root / lib / daemon.py @ 981732fb

History | View | Annotate | Download (21.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module with helper classes and functions for daemons"""
23

    
24

    
25
import asyncore
26
import asynchat
27
import collections
28
import grp
29
import os
30
import pwd
31
import signal
32
import logging
33
import sched
34
import time
35
import socket
36
import select
37
import sys
38

    
39
from ganeti import utils
40
from ganeti import constants
41
from ganeti import errors
42
from ganeti import netutils
43
from ganeti import ssconf
44

    
45

    
46
_DEFAULT_RUN_USER = "root"
47
_DEFAULT_RUN_GROUP = "root"
48

    
49

    
50
class SchedulerBreakout(Exception):
  """Raised by the scheduler's delay function to abort scheduler.run().

  Breaking out of the scheduler loop returns control to the daemon main
  loop, which can then check for pending signals before re-entering it.

  """
54

    
55

    
56
def AsyncoreDelayFunction(timeout):
57
  """Asyncore-compatible scheduler delay function.
58

59
  This is a delay function for sched that, rather than actually sleeping,
60
  executes asyncore events happening in the meantime.
61

62
  After an event has occurred, rather than returning, it raises a
63
  SchedulerBreakout exception, which will force the current scheduler.run()
64
  invocation to terminate, so that we can also check for signals. The main loop
65
  will then call the scheduler run again, which will allow it to actually
66
  process any due events.
67

68
  This is needed because scheduler.run() doesn't support a count=..., as
69
  asyncore loop, and the scheduler module documents throwing exceptions from
70
  inside the delay function as an allowed usage model.
71

72
  """
73
  asyncore.loop(timeout=timeout, count=1, use_poll=True)
74
  raise SchedulerBreakout()
75

    
76

    
77
class AsyncoreScheduler(sched.scheduler):
  """Event scheduler whose delay function drives the asyncore loop.

  """
  def __init__(self, timefunc):
    """Initializes the scheduler.

    @param timefunc: function returning the current time (e.g. time.time)

    """
    sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
83

    
84

    
85
class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
  """Common base class for Ganeti's asyncore dispatchers.

  Centralizes error logging and makes dispatchers non-writable by default.

  """
  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log any error raised while handling a request, then proceed.

    """
    logging.exception("Error while handling asyncore request")

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    """Report this dispatcher as not interested in write events.

    Most dispatchers here never queue output, so checking for writability
    would only waste select() wakeups.

    """
    return False
102

    
103

    
104
class AsyncStreamServer(GanetiBaseAsyncoreDispatcher):
  """Stream (SOCK_STREAM) listening socket for use with asyncore.

  Every incoming connection is accepted and then handed over to
  handle_connection, which subclasses implement (typically by creating a
  dedicated asyncore dispatcher to serve that client).

  """

  # backlog passed to listen()
  _REQUEST_QUEUE_SIZE = 5

  def __init__(self, family, address):
    """Creates, binds and starts listening on the server socket.

    @type family: integer
    @param family: socket family (one of socket.AF_*)
    @type address: address family dependent
    @param address: address to bind the socket to

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    self.family = family
    self.create_socket(self.family, socket.SOCK_STREAM)
    self.set_reuse_addr()
    self.bind(address)
    self.listen(self._REQUEST_QUEUE_SIZE)

  # this method is overriding an asyncore.dispatcher method
  def handle_accept(self):
    """Accept a new client connection.

    The accepted socket is passed on to handle_connection; accept() is
    retried on EINTR via utils.IgnoreSignals.

    """
    result = utils.IgnoreSignals(self.accept)
    if result is None:
      return
    (connected_socket, client_address) = result
    if self.family == socket.AF_UNIX:
      # accept() passes nothing meaningful as the address for unix sockets,
      # so use the peer's credentials instead
      client_address = netutils.GetSocketCredentials(connected_socket)
    logging.info("Accepted connection from %s",
                 netutils.FormatAddress(client_address, family=self.family))
    self.handle_connection(connected_socket, client_address)

  def handle_connection(self, connected_socket, client_address):
    """Handle an already accepted connection.

    Must be implemented by subclasses.

    """
    raise NotImplementedError
154

    
155

    
156
class AsyncTerminatedMessageStream(asynchat.async_chat):
  """A terminator separated message stream asyncore module.

  Handles a stream connection receiving messages terminated by a defined
  separator. For each complete message handle_message is called.

  """
  def __init__(self, connected_socket, peer_address, terminator, family,
               unhandled_limit):
    """AsyncTerminatedMessageStream constructor.

    @type connected_socket: socket.socket
    @param connected_socket: connected stream socket to receive messages from
    @param peer_address: family-specific peer address
    @type terminator: string
    @param terminator: terminator separating messages in the stream
    @type family: integer
    @param family: socket family
    @type unhandled_limit: integer or None
    @param unhandled_limit: maximum unanswered messages (None means no limit)

    """
    # python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by
    # using a positional argument rather than a keyword one.
    asynchat.async_chat.__init__(self, connected_socket)
    self.connected_socket = connected_socket
    # on python 2.4 there is no "family" attribute for the socket class
    # FIXME: when we move to python 2.5 or above remove the family parameter
    #self.family = self.connected_socket.family
    self.family = family
    self.peer_address = peer_address
    self.terminator = terminator
    self.unhandled_limit = unhandled_limit
    self.set_terminator(terminator)
    # chunks of the current, not yet terminated, incoming message
    self.ibuffer = []
    # number of complete messages received; doubles as the next message id
    self.receive_count = 0
    # number of replies pushed out so far
    self.send_count = 0
    # outgoing messages (appended by send_message, possibly from other
    # threads; deque appends are thread-safe)
    self.oqueue = collections.deque()
    # received messages deferred because the unhandled limit was reached
    self.iqueue = collections.deque()

  # this method is overriding an asynchat.async_chat method
  def collect_incoming_data(self, data):
    # accumulate raw chunks until found_terminator fires
    self.ibuffer.append(data)

  def _can_handle_message(self):
    # True when a newly received message may be handled immediately.
    # Note: "and" binds tighter than "or", so with unhandled_limit=None this
    # is always True; iqueue can only become non-empty once a limit is set,
    # because found_terminator only defers messages when this returns False.
    return (self.unhandled_limit is None or
            (self.receive_count < self.send_count + self.unhandled_limit) and
             not self.iqueue)

  # this method is overriding an asynchat.async_chat method
  def found_terminator(self):
    # a complete message has arrived; assemble it from the buffered chunks
    message = "".join(self.ibuffer)
    self.ibuffer = []
    message_id = self.receive_count
    # We need to increase the receive_count after checking if the message can
    # be handled, but before calling handle_message
    can_handle = self._can_handle_message()
    self.receive_count += 1
    if can_handle:
      self.handle_message(message, message_id)
    else:
      # over the unhandled limit: defer until a reply frees a slot
      # (see handle_write)
      self.iqueue.append((message, message_id))

  def handle_message(self, message, message_id):
    """Handle a terminated message.

    Meant to be overridden by subclasses; the default does nothing.

    @type message: string
    @param message: message to handle
    @type message_id: integer
    @param message_id: stream's message sequence number

    """
    pass
    # TODO: move this method to raise NotImplementedError
    # raise NotImplementedError

  def send_message(self, message):
    """Send a message to the remote peer. This function is thread-safe.

    @type message: string
    @param message: message to send, without the terminator

    @warning: If calling this function from a thread different than the one
    performing the main asyncore loop, remember that you have to wake that one
    up.

    """
    # If we just append the message we received to the output queue, this
    # function can be safely called by multiple threads at the same time, and
    # we don't need locking, since deques are thread safe. handle_write in the
    # asyncore thread will handle the next input message if there are any
    # enqueued.
    self.oqueue.append(message)

  # this method is overriding an asyncore.dispatcher method
  def readable(self):
    # read from the socket if we can handle the next requests
    return self._can_handle_message() and asynchat.async_chat.readable(self)

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # the output queue may become full just after we called writable. This only
    # works if we know we'll have something else waking us up from the select,
    # in such case, anyway.
    return asynchat.async_chat.writable(self) or self.oqueue

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    if self.oqueue:
      # if we have data in the output queue, then send_message was called.
      # this means we can process one more message from the input queue, if
      # there are any.
      data = self.oqueue.popleft()
      self.push(data + self.terminator)
      self.send_count += 1
      if self.iqueue:
        self.handle_message(*self.iqueue.popleft())
    self.initiate_send()

  def close_log(self):
    """Log the connection being closed, then close it.

    """
    logging.info("Closing connection from %s",
                 netutils.FormatAddress(self.peer_address, family=self.family))
    self.close()

  # this method is overriding an asyncore.dispatcher method
  def handle_expt(self):
    # exceptional socket condition: drop the connection
    self.close_log()

  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")
    self.close_log()
291

    
292

    
293
class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
  """An improved asyncore udp socket.

  Outgoing datagrams are queued and flushed as the socket becomes writable;
  incoming ones are dispatched to handle_datagram.

  """
  def __init__(self, family):
    """Constructor for AsyncUDPSocket

    @type family: integer
    @param family: socket family (one of socket.AF_*)

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    self._out_queue = []
    self._family = family
    self.create_socket(family, socket.SOCK_DGRAM)

  # this method is overriding an asyncore.dispatcher method
  def handle_connect(self):
    # Python regards the first udp message from a source as a "connect" and
    # later ones as part of the same connection; we treat all datagrams
    # equally, so there is nothing to do here.
    pass

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    """Receive one datagram and hand it over to handle_datagram.

    """
    result = utils.IgnoreSignals(self.recvfrom, constants.MAX_UDP_DATA_SIZE)
    if result is None:
      return
    (payload, address) = result
    if self._family == socket.AF_INET6:
      # discard the 'flow info' and 'scope id' members, which we don't use
      (ip, port, _, _) = address
    else:
      (ip, port) = address

    self.handle_datagram(payload, ip, port)

  def handle_datagram(self, payload, ip, port):
    """Handle an already read udp datagram

    Must be implemented by subclasses.

    """
    raise NotImplementedError

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # only ask for write events while something is queued to be sent
    return bool(self._out_queue)

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    """Send the oldest queued datagram.

    """
    if not self._out_queue:
      logging.error("handle_write called with empty output queue")
      return
    (ip, port, payload) = self._out_queue[0]
    utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
    self._out_queue.pop(0)

  def enqueue_send(self, ip, port, payload):
    """Enqueue a datagram to be sent when possible

    @raise errors.UdpDataSizeError: if the payload exceeds the maximum
        udp datagram size

    """
    if len(payload) > constants.MAX_UDP_DATA_SIZE:
      raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
                                    constants.MAX_UDP_DATA_SIZE))
    self._out_queue.append((ip, port, payload))

  def process_next_packet(self, timeout=0):
    """Process the next datagram, waiting for it if necessary.

    @type timeout: float
    @param timeout: how long to wait for data
    @rtype: boolean
    @return: True if some data has been handled, False otherwise

    """
    result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
    if result is None or not (result & select.POLLIN):
      return False
    self.handle_read()
    return True
372

    
373

    
374
class AsyncAwaker(GanetiBaseAsyncoreDispatcher):
  """A way to notify the asyncore loop that something is going on.

  If an asyncore daemon is multithreaded when a thread tries to push some data
  to a socket, the main loop handling asynchronous requests might be sleeping
  waiting on a select(). To avoid this it can create an instance of the
  AsyncAwaker, which other threads can use to wake it up.

  """
  def __init__(self, signal_fn=None):
    """Constructor for AsyncAwaker

    @type signal_fn: function
    @param signal_fn: function to call when awaken

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    # identity comparison ("is None") is the correct test for None
    assert signal_fn is None or callable(signal_fn)
    # in_socket is watched by the asyncore loop; out_socket is written to by
    # signal() (possibly from other threads) to wake the loop up
    (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
                                                          socket.SOCK_STREAM)
    self.in_socket.setblocking(0)
    # each end is used in one direction only; shut down the other
    self.in_socket.shutdown(socket.SHUT_WR)
    self.out_socket.shutdown(socket.SHUT_RD)
    self.set_socket(self.in_socket)
    # True when no wakeup token is currently in flight
    self.need_signal = True
    self.signal_fn = signal_fn
    self.connected = True

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    """Drain pending wakeup tokens and run the optional callback.

    """
    utils.IgnoreSignals(self.recv, 4096)
    if self.signal_fn:
      self.signal_fn()
    self.need_signal = True

  # this method is overriding an asyncore.dispatcher method
  def close(self):
    """Close both ends of the socket pair.

    """
    asyncore.dispatcher.close(self)
    self.out_socket.close()

  def signal(self):
    """Signal the asyncore main loop.

    Any data we send here will be ignored, but it will cause the select() call
    to return.

    """
    # Yes, there is a race condition here. No, we don't care, at worst we're
    # sending more than one wakeup token, which doesn't harm at all.
    if self.need_signal:
      self.need_signal = False
      self.out_socket.send("\0")
426

    
427

    
428
class Mainloop(object):
  """Generic mainloop for daemons

  @ivar scheduler: A sched.scheduler object, which can be used to register
    timed events

  """
  def __init__(self):
    """Constructs a new Mainloop instance.

    """
    # receivers registered through RegisterSignal
    self._signal_wait = []
    self.scheduler = AsyncoreScheduler(time.time)

  @utils.SignalHandled([signal.SIGCHLD])
  @utils.SignalHandled([signal.SIGTERM])
  @utils.SignalHandled([signal.SIGINT])
  def Run(self, signal_handlers=None):
    """Runs the mainloop.

    @type signal_handlers: dict
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator

    """
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"
    running = True
    while running:
      # With timed events pending, drive the scheduler (whose delay function
      # also runs asyncore events); otherwise run one plain asyncore round
      if self.scheduler.empty():
        asyncore.loop(count=1, use_poll=True)
      else:
        try:
          self.scheduler.run()
        except SchedulerBreakout:
          pass

      # Check whether any signal was raised in the meantime
      for (sig, handler) in signal_handlers.items():
        if not handler.called:
          continue
        self._CallSignalWaiters(sig)
        # termination signals end the loop after this iteration
        running = sig not in (signal.SIGTERM, signal.SIGINT)
        handler.Clear()

  def _CallSignalWaiters(self, signum):
    """Calls all signal waiters for a certain signal.

    @type signum: int
    @param signum: Signal number

    """
    for owner in self._signal_wait:
      owner.OnSignal(signum)

  def RegisterSignal(self, owner):
    """Registers a receiver for signal notifications

    The receiver must support a "OnSignal(self, signum)" function.

    @type owner: instance
    @param owner: Receiver

    """
    self._signal_wait.append(owner)
494

    
495

    
496
def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn,
                multithreaded=False, console_logging=False,
                default_ssl_cert=None, default_ssl_key=None,
                user=_DEFAULT_RUN_USER, group=_DEFAULT_RUN_GROUP):
  """Shared main function for daemons.

  @type daemon_name: string
  @param daemon_name: daemon name
  @type optionparser: optparse.OptionParser
  @param optionparser: initialized optionparser with daemon-specific options
                       (common -f -d options will be handled by this module)
  @type dirs: list of (string, integer)
  @param dirs: list of directories that must be created if they don't exist,
               and the permissions to be used to create them
  @type check_fn: function which accepts (options, args)
  @param check_fn: function that checks start conditions and exits if they're
                   not met
  @type exec_fn: function which accepts (options, args)
  @param exec_fn: function that's executed with the daemon's pid file held, and
                  runs the daemon itself.
  @type multithreaded: bool
  @param multithreaded: Whether the daemon uses threads
  @type console_logging: boolean
  @param console_logging: if True, the daemon will fall back to the system
                          console if logging fails
  @type default_ssl_cert: string
  @param default_ssl_cert: Default SSL certificate path
  @type default_ssl_key: string
  @param default_ssl_key: Default SSL key path
  @type user: string
  @param user: Default user to run as
  @type group: string
  @param group: Default group to run as

  """
  # Options common to all daemons
  optionparser.add_option("-f", "--foreground", dest="fork",
                          help="Don't detach from the current terminal",
                          default=True, action="store_false")
  optionparser.add_option("-d", "--debug", dest="debug",
                          help="Enable some debug messages",
                          default=False, action="store_true")
  optionparser.add_option("--syslog", dest="syslog",
                          help="Enable logging to syslog (except debug"
                          " messages); one of 'no', 'yes' or 'only' [%s]" %
                          constants.SYSLOG_USAGE,
                          default=constants.SYSLOG_USAGE,
                          choices=["no", "yes", "only"])

  if daemon_name in constants.DAEMONS_PORTS:
    # Networked daemon: pick the default bind address based on the cluster's
    # primary IP family, if available
    default_bind_address = constants.IP4_ADDRESS_ANY
    try:
      family = ssconf.SimpleStore().GetPrimaryIPFamily()
      if family == netutils.IP6Address.family:
        default_bind_address = constants.IP6_ADDRESS_ANY
    except errors.ConfigurationError:
      # This case occurs when adding a node, as there is no ssconf available
      # when noded is first started. In that case, however, the correct
      # bind_address must be passed
      pass

    default_port = netutils.GetDaemonPort(daemon_name)

    # For networked daemons we allow choosing the port and bind address
    optionparser.add_option("-p", "--port", dest="port",
                            help="Network port (default: %s)" % default_port,
                            default=default_port, type="int")
    optionparser.add_option("-b", "--bind", dest="bind_address",
                            help=("Bind address (default: '%s')" %
                                  default_bind_address),
                            default=default_bind_address, metavar="ADDRESS")

  if default_ssl_key is not None and default_ssl_cert is not None:
    # SSL-capable daemon: allow disabling SSL and overriding the paths
    optionparser.add_option("--no-ssl", dest="ssl",
                            help="Do not secure HTTP protocol with SSL",
                            default=True, action="store_false")
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
                            help=("SSL key path (default: %s)" %
                                  default_ssl_key),
                            default=default_ssl_key, type="string",
                            metavar="SSL_KEY_PATH")
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
                            help=("SSL certificate path (default: %s)" %
                                  default_ssl_cert),
                            default=default_ssl_cert, type="string",
                            metavar="SSL_CERT_PATH")

  # Disable the use of fork(2) if the daemon uses threads
  utils.no_fork = multithreaded

  options, args = optionparser.parse_args()

  if getattr(options, "ssl", False):
    # SSL enabled (or the option was never added): verify that both files
    # exist before going any further
    ssl_paths = {
      "certificate": options.ssl_cert,
      "key": options.ssl_key,
      }

    for name, path in ssl_paths.iteritems():
      if not os.path.isfile(path):
        print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
        sys.exit(constants.EXIT_FAILURE)

    # TODO: By initiating http.HttpSslParams here we would only read the files
    # once and have a proper validation (isfile returns False on directories)
    # at the same time.

  # Let the daemon validate its own start conditions (may exit)
  if check_fn is not None:
    check_fn(options, args)

  utils.EnsureDirs(dirs)

  if options.fork:
    # Resolve the credentials to drop to before forking, so a bad user/group
    # fails early and visibly
    try:
      uid = pwd.getpwnam(user).pw_uid
      gid = grp.getgrnam(group).gr_gid
    except KeyError:
      raise errors.ConfigurationError("User or group not existing on system:"
                                      " %s:%s" % (user, group))
    utils.CloseFDs()
    utils.Daemonize(constants.DAEMONS_LOGFILES[daemon_name], uid, gid)

  # Hold the pid file for the whole lifetime of the daemon; it is removed
  # even if exec_fn raises
  utils.WritePidFile(daemon_name)
  try:
    utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
                       debug=options.debug,
                       stderr_logging=not options.fork,
                       multithreaded=multithreaded,
                       program=daemon_name,
                       syslog=options.syslog,
                       console_logging=console_logging)
    logging.info("%s daemon startup", daemon_name)
    exec_fn(options, args)
  finally:
    utils.RemovePidFile(daemon_name)