Statistics
| Branch: | Tag: | Revision:

root / lib / daemon.py @ aa9f8167

History | View | Annotate | Download (21.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module with helper classes and functions for daemons"""
23

    
24

    
25
import asyncore
26
import asynchat
27
import collections
28
import grp
29
import os
30
import pwd
31
import signal
32
import logging
33
import sched
34
import time
35
import socket
36
import select
37
import sys
38

    
39
from ganeti import utils
40
from ganeti import constants
41
from ganeti import errors
42
from ganeti import netutils
43

    
44

    
45
_DEFAULT_RUN_USER = "root"
46
_DEFAULT_RUN_GROUP = "root"
47

    
48

    
49
class SchedulerBreakout(Exception):
  """Raised by the delay function to break out of scheduler.run()

  """
53

    
54

    
55
def AsyncoreDelayFunction(timeout):
  """Delay function for sched that runs asyncore instead of sleeping.

  Rather than blocking for the requested amount of time, this function
  performs a single iteration of the asyncore loop, bounded by the given
  timeout, so that socket events are still served while the scheduler is
  "waiting".

  As soon as that iteration returns, a L{SchedulerBreakout} is raised to
  terminate the current scheduler.run() invocation; the main loop then gets
  a chance to check for signals before invoking the scheduler again, which
  will actually process any events that have become due. Raising from the
  delay function is an explicitly supported usage model of the sched module,
  and is needed because scheduler.run() has no count=... argument like the
  asyncore loop does.

  """
  asyncore.loop(timeout=timeout, count=1, use_poll=True)
  raise SchedulerBreakout()
74

    
75

    
76
class AsyncoreScheduler(sched.scheduler):
  """sched.scheduler variant whose delay function drives the asyncore loop

  """
  def __init__(self, timefunc):
    """Initializes this class.

    @param timefunc: function returning the current time

    """
    sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
82

    
83

    
84
class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
  """Base class for Ganeti asyncore dispatchers

  """
  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    """By default this dispatcher is never interested in writing.

    """
    return False
101

    
102

    
103
def FormatAddress(family, address):
  """Return a human-readable representation of a peer address.

  @type family: integer
  @param family: socket family (one of socket.AF_*)
  @type address: family specific (usually tuple)
  @param address: address, as reported by this class

  """
  if family == socket.AF_INET and len(address) == 2:
    # plain (host, port) pair
    return "%s:%d" % address
  if family == socket.AF_UNIX and len(address) == 3:
    # peer credentials triple for unix sockets
    return "pid=%s, uid=%s, gid=%s" % address
  # anything else gets the generic representation
  return str(address)
118

    
119

    
120
class AsyncStreamServer(GanetiBaseAsyncoreDispatcher):
  """A stream server to use with asyncore.

  Each request is accepted, and then dispatched to a separate asyncore
  dispatcher to handle.

  """

  # backlog passed to listen()
  _REQUEST_QUEUE_SIZE = 5

  def __init__(self, family, address):
    """Constructor for AsyncUnixStreamSocket

    @type family: integer
    @param family: socket family (one of socket.AF_*)
    @type address: address family dependent
    @param address: address to bind the socket to

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    self.family = family
    self.create_socket(self.family, socket.SOCK_STREAM)
    self.set_reuse_addr()
    self.bind(address)
    self.listen(self._REQUEST_QUEUE_SIZE)

  # this method is overriding an asyncore.dispatcher method
  def handle_accept(self):
    """Accept a new client connection.

    Creates a new instance of the handler class, which will use asyncore to
    serve the client.

    """
    result = utils.IgnoreSignals(self.accept)
    if result is None:
      # accept() was interrupted or returned nothing; try again later
      return
    (sock, addr) = result
    if self.family == socket.AF_UNIX:
      # override the client address, as for unix sockets nothing meaningful
      # is passed in from accept anyway
      addr = netutils.GetSocketCredentials(sock)
    logging.info("Accepted connection from %s",
                 FormatAddress(self.family, addr))
    self.handle_connection(sock, addr)

  def handle_connection(self, connected_socket, client_address):
    """Handle an already accepted connection.

    Must be provided by subclasses.

    """
    raise NotImplementedError
170

    
171

    
172
class AsyncTerminatedMessageStream(asynchat.async_chat):
  """A terminator separated message stream asyncore module.

  Handles a stream connection receiving messages terminated by a defined
  separator. For each complete message handle_message is called.

  """
  def __init__(self, connected_socket, peer_address, terminator, family,
               unhandled_limit):
    """AsyncTerminatedMessageStream constructor.

    @type connected_socket: socket.socket
    @param connected_socket: connected stream socket to receive messages from
    @param peer_address: family-specific peer address
    @type terminator: string
    @param terminator: terminator separating messages in the stream
    @type family: integer
    @param family: socket family
    @type unhandled_limit: integer or None
    @param unhandled_limit: maximum unanswered messages

    """
    # python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by
    # using a positional argument rather than a keyword one.
    asynchat.async_chat.__init__(self, connected_socket)
    self.connected_socket = connected_socket
    # on python 2.4 there is no "family" attribute for the socket class
    # FIXME: when we move to python 2.5 or above remove the family parameter
    #self.family = self.connected_socket.family
    self.family = family
    self.peer_address = peer_address
    self.terminator = terminator
    self.unhandled_limit = unhandled_limit
    self.set_terminator(terminator)
    # chunks of the message currently being received
    self.ibuffer = []
    # receive_count/send_count track received vs. answered messages, so the
    # unhandled_limit can be enforced in _can_handle_message
    self.receive_count = 0
    self.send_count = 0
    # oqueue: outgoing messages enqueued by send_message (deque, so it can be
    # appended to from other threads without locking)
    # iqueue: received messages deferred because the limit was reached
    self.oqueue = collections.deque()
    self.iqueue = collections.deque()

  # this method is overriding an asynchat.async_chat method
  def collect_incoming_data(self, data):
    # accumulate data until found_terminator assembles the full message
    self.ibuffer.append(data)

  def _can_handle_message(self):
    # A new message may be dispatched immediately when either no limit is
    # configured, or we are under the limit and nothing is already deferred
    # (iqueue can only be non-empty when a limit is set, so the precedence
    # of "or" over "and" here does not change the outcome)
    return (self.unhandled_limit is None or
            (self.receive_count < self.send_count + self.unhandled_limit) and
             not self.iqueue)

  # this method is overriding an asynchat.async_chat method
  def found_terminator(self):
    message = "".join(self.ibuffer)
    self.ibuffer = []
    # the message id is the zero-based sequence number of the message
    message_id = self.receive_count
    # We need to increase the receive_count after checking if the message can
    # be handled, but before calling handle_message
    can_handle = self._can_handle_message()
    self.receive_count += 1
    if can_handle:
      self.handle_message(message, message_id)
    else:
      # over the unhandled limit: defer until a reply has been sent
      self.iqueue.append((message, message_id))

  def handle_message(self, message, message_id):
    """Handle a terminated message.

    Meant to be overridden by subclasses; the base implementation ignores
    the message.

    @type message: string
    @param message: message to handle
    @type message_id: integer
    @param message_id: stream's message sequence number

    """
    pass
    # TODO: move this method to raise NotImplementedError
    # raise NotImplementedError

  def send_message(self, message):
    """Send a message to the remote peer. This function is thread-safe.

    @type message: string
    @param message: message to send, without the terminator

    @warning: If calling this function from a thread different than the one
    performing the main asyncore loop, remember that you have to wake that one
    up.

    """
    # If we just append the message we received to the output queue, this
    # function can be safely called by multiple threads at the same time, and
    # we don't need locking, since deques are thread safe. handle_write in the
    # asyncore thread will handle the next input message if there are any
    # enqueued.
    self.oqueue.append(message)

  # this method is overriding an asyncore.dispatcher method
  def readable(self):
    # read from the socket if we can handle the next requests
    return self._can_handle_message() and asynchat.async_chat.readable(self)

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # the output queue may become full just after we called writable. This only
    # works if we know we'll have something else waking us up from the select,
    # in such case, anyway.
    return asynchat.async_chat.writable(self) or self.oqueue

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    if self.oqueue:
      # if we have data in the output queue, then send_message was called.
      # this means we can process one more message from the input queue, if
      # there are any.
      data = self.oqueue.popleft()
      self.push(data + self.terminator)
      self.send_count += 1
      if self.iqueue:
        self.handle_message(*self.iqueue.popleft())
    self.initiate_send()

  def close_log(self):
    """Log that the connection is being closed, then close it.

    """
    logging.info("Closing connection from %s",
                 FormatAddress(self.family, self.peer_address))
    self.close()

  # this method is overriding an asyncore.dispatcher method
  def handle_expt(self):
    # exceptional socket condition: drop the connection
    self.close_log()

  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")
    self.close_log()
307

    
308

    
309
class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
  """An improved asyncore udp socket.

  """
  def __init__(self, family):
    """Constructor for AsyncUDPSocket

    @type family: integer
    @param family: socket family (one of socket.AF_*)

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    # Pending outgoing datagrams as (ip, port, payload) tuples; a deque is
    # used so handle_write can drop the head in O(1) (list.pop(0) is O(n))
    self._out_queue = collections.deque()
    self._family = family
    self.create_socket(family, socket.SOCK_DGRAM)

  # this method is overriding an asyncore.dispatcher method
  def handle_connect(self):
    # Python thinks that the first udp message from a source qualifies as a
    # "connect" and further ones are part of the same connection. We beg to
    # differ and treat all messages equally.
    pass

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    """Read one datagram and dispatch it to L{handle_datagram}.

    """
    recv_result = utils.IgnoreSignals(self.recvfrom,
                                      constants.MAX_UDP_DATA_SIZE)
    if recv_result is not None:
      payload, address = recv_result
      if self._family == socket.AF_INET6:
        # we ignore 'flow info' and 'scope id' as we don't need them
        ip, port, _, _ = address
      else:
        ip, port = address

      self.handle_datagram(payload, ip, port)

  def handle_datagram(self, payload, ip, port):
    """Handle an already read udp datagram

    Must be provided by subclasses.

    @type payload: string
    @param payload: datagram payload
    @type ip: string
    @param ip: source IP address
    @type port: integer
    @param port: source port

    """
    raise NotImplementedError

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # We should check whether we can write to the socket only if we have
    # something scheduled to be written
    return bool(self._out_queue)

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    """Send the datagram at the head of the output queue, if any.

    """
    if not self._out_queue:
      logging.error("handle_write called with empty output queue")
      return
    # peek first, dequeue only after the send attempt, so an exception
    # raised by sendto does not lose the datagram
    (ip, port, payload) = self._out_queue[0]
    utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
    self._out_queue.popleft()

  def enqueue_send(self, ip, port, payload):
    """Enqueue a datagram to be sent when possible

    @type ip: string
    @param ip: destination IP address
    @type port: integer
    @param port: destination port
    @type payload: string
    @param payload: datagram payload, at most constants.MAX_UDP_DATA_SIZE
      bytes long
    @raise errors.UdpDataSizeError: if the payload is too big

    """
    if len(payload) > constants.MAX_UDP_DATA_SIZE:
      raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
                                    constants.MAX_UDP_DATA_SIZE))
    self._out_queue.append((ip, port, payload))

  def process_next_packet(self, timeout=0):
    """Process the next datagram, waiting for it if necessary.

    @type timeout: float
    @param timeout: how long to wait for data
    @rtype: boolean
    @return: True if some data has been handled, False otherwise

    """
    result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
    if result is not None and result & select.POLLIN:
      self.handle_read()
      return True
    else:
      return False
388

    
389

    
390
class AsyncAwaker(GanetiBaseAsyncoreDispatcher):
  """A way to notify the asyncore loop that something is going on.

  If an asyncore daemon is multithreaded when a thread tries to push some data
  to a socket, the main loop handling asynchronous requests might be sleeping
  waiting on a select(). To avoid this it can create an instance of the
  AsyncAwaker, which other threads can use to wake it up.

  """
  def __init__(self, signal_fn=None):
    """Constructor for AsyncAwaker

    @type signal_fn: function
    @param signal_fn: function to call when awoken

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    # identity comparison with the None singleton (PEP 8), not "=="
    assert signal_fn is None or callable(signal_fn)
    (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
                                                          socket.SOCK_STREAM)
    self.in_socket.setblocking(0)
    # each end of the pair is used in only one direction: in_socket is read
    # by the asyncore loop, out_socket is written to by signal()
    self.in_socket.shutdown(socket.SHUT_WR)
    self.out_socket.shutdown(socket.SHUT_RD)
    self.set_socket(self.in_socket)
    # True when a new wakeup token would actually be needed; reset by
    # handle_read once the pending tokens have been consumed
    self.need_signal = True
    self.signal_fn = signal_fn
    self.connected = True

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    """Drain pending wakeup tokens and invoke the callback, if any.

    """
    utils.IgnoreSignals(self.recv, 4096)
    if self.signal_fn:
      self.signal_fn()
    self.need_signal = True

  # this method is overriding an asyncore.dispatcher method
  def close(self):
    asyncore.dispatcher.close(self)
    self.out_socket.close()

  def signal(self):
    """Signal the asyncore main loop.

    Any data we send here will be ignored, but it will cause the select() call
    to return.

    """
    # Yes, there is a race condition here. No, we don't care, at worst we're
    # sending more than one wakeup token, which doesn't harm at all.
    if self.need_signal:
      self.need_signal = False
      self.out_socket.send("\0")
442

    
443

    
444
class Mainloop(object):
  """Generic mainloop for daemons

  @ivar scheduler: A sched.scheduler object, which can be used to register
    timed events

  """
  def __init__(self):
    """Constructs a new Mainloop instance.

    """
    # receivers registered via RegisterSignal
    self._signal_wait = []
    self.scheduler = AsyncoreScheduler(time.time)

  @utils.SignalHandled([signal.SIGCHLD])
  @utils.SignalHandled([signal.SIGTERM])
  @utils.SignalHandled([signal.SIGINT])
  def Run(self, signal_handlers=None):
    """Runs the mainloop.

    @type signal_handlers: dict
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator

    """
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"
    keep_running = True
    while keep_running:
      # When the scheduler has pending events let it run (its delay function
      # drives asyncore and breaks out via SchedulerBreakout); otherwise run
      # a single asyncore iteration directly
      if self.scheduler.empty():
        asyncore.loop(count=1, use_poll=True)
      else:
        try:
          self.scheduler.run()
        except SchedulerBreakout:
          pass

      # Check whether any signal was raised in the meantime
      for (sig, handler) in signal_handlers.items():
        if handler.called:
          self._CallSignalWaiters(sig)
          # termination signals end the loop
          keep_running = sig not in (signal.SIGTERM, signal.SIGINT)
          handler.Clear()

  def _CallSignalWaiters(self, signum):
    """Calls all signal waiters for a certain signal.

    @type signum: int
    @param signum: Signal number

    """
    for receiver in self._signal_wait:
      receiver.OnSignal(signum)

  def RegisterSignal(self, owner):
    """Registers a receiver for signal notifications

    The receiver must support a "OnSignal(self, signum)" function.

    @type owner: instance
    @param owner: Receiver

    """
    self._signal_wait.append(owner)
510

    
511

    
512
def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn,
                multithreaded=False, console_logging=False,
                default_ssl_cert=None, default_ssl_key=None,
                user=_DEFAULT_RUN_USER, group=_DEFAULT_RUN_GROUP):
  """Shared main function for daemons.

  @type daemon_name: string
  @param daemon_name: daemon name
  @type optionparser: optparse.OptionParser
  @param optionparser: initialized optionparser with daemon-specific options
                       (common -f -d options will be handled by this module)
  @type dirs: list of (string, integer)
  @param dirs: list of directories that must be created if they don't exist,
               and the permissions to be used to create them
  @type check_fn: function which accepts (options, args)
  @param check_fn: function that checks start conditions and exits if they're
                   not met
  @type exec_fn: function which accepts (options, args)
  @param exec_fn: function that's executed with the daemon's pid file held, and
                  runs the daemon itself.
  @type multithreaded: bool
  @param multithreaded: Whether the daemon uses threads
  @type console_logging: boolean
  @param console_logging: if True, the daemon will fall back to the system
                          console if logging fails
  @type default_ssl_cert: string
  @param default_ssl_cert: Default SSL certificate path
  @type default_ssl_key: string
  @param default_ssl_key: Default SSL key path
  @type user: string
  @param user: Default user to run as
  @type group: string
  @param group: Default group to run as

  """
  # Options common to every daemon
  optionparser.add_option("-f", "--foreground", dest="fork",
                          help="Don't detach from the current terminal",
                          default=True, action="store_false")
  optionparser.add_option("-d", "--debug", dest="debug",
                          help="Enable some debug messages",
                          default=False, action="store_true")
  optionparser.add_option("--syslog", dest="syslog",
                          help="Enable logging to syslog (except debug"
                          " messages); one of 'no', 'yes' or 'only' [%s]" %
                          constants.SYSLOG_USAGE,
                          default=constants.SYSLOG_USAGE,
                          choices=["no", "yes", "only"])

  if daemon_name in constants.DAEMONS_PORTS:
    default_bind_address = constants.IP4_ADDRESS_ANY
    default_port = netutils.GetDaemonPort(daemon_name)

    # For networked daemons we allow choosing the port and bind address
    optionparser.add_option("-p", "--port", dest="port",
                            help="Network port (default: %s)" % default_port,
                            default=default_port, type="int")
    optionparser.add_option("-b", "--bind", dest="bind_address",
                            help=("Bind address (default: %s)" %
                                  default_bind_address),
                            default=default_bind_address, metavar="ADDRESS")

  # SSL options are only offered when both defaults were supplied
  if default_ssl_key is not None and default_ssl_cert is not None:
    optionparser.add_option("--no-ssl", dest="ssl",
                            help="Do not secure HTTP protocol with SSL",
                            default=True, action="store_false")
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
                            help=("SSL key path (default: %s)" %
                                  default_ssl_key),
                            default=default_ssl_key, type="string",
                            metavar="SSL_KEY_PATH")
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
                            help=("SSL certificate path (default: %s)" %
                                  default_ssl_cert),
                            default=default_ssl_cert, type="string",
                            metavar="SSL_CERT_PATH")

  # Disable the use of fork(2) if the daemon uses threads
  utils.no_fork = multithreaded

  options, args = optionparser.parse_args()

  # "ssl" only exists on options when the SSL options were added above, and
  # may have been turned off with --no-ssl; hence the getattr with default
  if getattr(options, "ssl", False):
    ssl_paths = {
      "certificate": options.ssl_cert,
      "key": options.ssl_key,
      }

    for name, path in ssl_paths.iteritems():
      if not os.path.isfile(path):
        print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
        sys.exit(constants.EXIT_FAILURE)

    # TODO: By initiating http.HttpSslParams here we would only read the files
    # once and have a proper validation (isfile returns False on directories)
    # at the same time.

  if check_fn is not None:
    check_fn(options, args)

  utils.EnsureDirs(dirs)

  if options.fork:
    # Resolve the uid/gid before daemonizing, so a missing account aborts
    # the start instead of failing later in the detached process
    try:
      uid = pwd.getpwnam(user).pw_uid
      gid = grp.getgrnam(group).gr_gid
    except KeyError:
      raise errors.ConfigurationError("User or group not existing on system:"
                                      " %s:%s" % (user, group))
    utils.CloseFDs()
    utils.Daemonize(constants.DAEMONS_LOGFILES[daemon_name], uid, gid)

  # From here on the pid file is held; it is removed again on any exit path
  utils.WritePidFile(daemon_name)
  try:
    utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
                       debug=options.debug,
                       stderr_logging=not options.fork,
                       multithreaded=multithreaded,
                       program=daemon_name,
                       syslog=options.syslog,
                       console_logging=console_logging)
    logging.info("%s daemon startup", daemon_name)
    exec_fn(options, args)
  finally:
    utils.RemovePidFile(daemon_name)