Statistics
| Branch: | Tag: | Revision:

root / lib / daemon.py @ 1a8337f2

History | View | Annotate | Download (20.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module with helper classes and functions for daemons"""
23

    
24

    
25
import asyncore
26
import asynchat
27
import collections
28
import grp
29
import os
30
import pwd
31
import signal
32
import logging
33
import sched
34
import time
35
import socket
36
import select
37
import sys
38

    
39
from ganeti import utils
40
from ganeti import constants
41
from ganeti import errors
42
from ganeti import netutils
43

    
44

    
45
_DEFAULT_RUN_USER = "root"
46
_DEFAULT_RUN_GROUP = "root"
47

    
48

    
49
class SchedulerBreakout(Exception):
  """Raised by the scheduler delay function to abort scheduler.run().

  Breaking out of the scheduler loop gives the main loop a chance to
  check for pending signals before scheduling resumes.

  """
53

    
54

    
55
def AsyncoreDelayFunction(timeout):
  """Delay function for sched that services asyncore instead of sleeping.

  Runs a single iteration of the asyncore event loop, waiting at most
  ``timeout`` seconds, and then raises L{SchedulerBreakout} rather than
  returning.  The exception forces the current scheduler.run() invocation
  to terminate so the main loop can also check for signals; the main loop
  then calls scheduler.run() again, which actually processes any events
  that have become due.

  This dance is needed because scheduler.run() has no count=... argument
  the way the asyncore loop does, and the sched module documents raising
  exceptions from the delay function as an allowed usage model.

  @type timeout: float
  @param timeout: maximum time to spend in the asyncore loop

  """
  asyncore.loop(timeout=timeout, count=1, use_poll=True)
  raise SchedulerBreakout()
74

    
75

    
76
class AsyncoreScheduler(sched.scheduler):
  """sched.scheduler subclass whose delay function drives asyncore.

  """
  def __init__(self, timefunc):
    """Initializes this class.

    @param timefunc: function returning the current time (e.g. time.time)

    """
    sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
82

    
83

    
84
class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
  """Base class for Ganeti's asyncore dispatchers.

  Provides common error logging, and opts out of writability checks by
  default since most of our dispatchers only ever read.

  """
  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    """By default we are never interested in write events.

    """
    return False
101

    
102

    
103
class AsyncStreamServer(GanetiBaseAsyncoreDispatcher):
  """A stream server to use with asyncore.

  Accepts each incoming connection and hands it over to a separate
  asyncore dispatcher (created by L{handle_connection}) for serving.

  """

  _REQUEST_QUEUE_SIZE = 5

  def __init__(self, family, address):
    """Constructor for AsyncStreamServer.

    @type family: integer
    @param family: socket family (one of socket.AF_*)
    @type address: address family dependent
    @param address: address to bind the socket to

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    self.family = family
    self.create_socket(self.family, socket.SOCK_STREAM)
    self.set_reuse_addr()
    self.bind(address)
    self.listen(self._REQUEST_QUEUE_SIZE)

  # this method is overriding an asyncore.dispatcher method
  def handle_accept(self):
    """Accept a new client connection.

    Creates a new instance of the handler class, which will use asyncore to
    serve the client.

    """
    result = utils.IgnoreSignals(self.accept)
    if result is None:
      return
    (sock, addr) = result
    if self.family == socket.AF_UNIX:
      # accept(2) passes nothing meaningful for unix sockets, so use the
      # peer credentials as the client address instead
      addr = netutils.GetSocketCredentials(sock)
    logging.info("Accepted connection from %s",
                 netutils.FormatAddress(self.family, addr))
    self.handle_connection(sock, addr)

  def handle_connection(self, connected_socket, client_address):
    """Handle an already accepted connection.

    """
    raise NotImplementedError
153

    
154

    
155
class AsyncTerminatedMessageStream(asynchat.async_chat):
  """A terminator separated message stream asyncore module.

  Handles a stream connection receiving messages terminated by a defined
  separator. For each complete message handle_message is called.

  """
  def __init__(self, connected_socket, peer_address, terminator, family,
               unhandled_limit):
    """AsyncTerminatedMessageStream constructor.

    @type connected_socket: socket.socket
    @param connected_socket: connected stream socket to receive messages from
    @param peer_address: family-specific peer address
    @type terminator: string
    @param terminator: terminator separating messages in the stream
    @type family: integer
    @param family: socket family
    @type unhandled_limit: integer or None
    @param unhandled_limit: maximum unanswered messages (None means no limit)

    """
    # python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by
    # using a positional argument rather than a keyword one.
    asynchat.async_chat.__init__(self, connected_socket)
    self.connected_socket = connected_socket
    # on python 2.4 there is no "family" attribute for the socket class
    # FIXME: when we move to python 2.5 or above remove the family parameter
    #self.family = self.connected_socket.family
    self.family = family
    self.peer_address = peer_address
    self.terminator = terminator
    self.unhandled_limit = unhandled_limit
    self.set_terminator(terminator)
    # chunks of the message currently being received
    self.ibuffer = []
    # number of complete messages received so far
    self.receive_count = 0
    # number of replies pushed out so far
    self.send_count = 0
    # outgoing messages; a deque so other threads can append safely
    self.oqueue = collections.deque()
    # incoming messages deferred because the unhandled limit was reached
    self.iqueue = collections.deque()

  # this method is overriding an asynchat.async_chat method
  def collect_incoming_data(self, data):
    # buffer incoming chunks until found_terminator assembles them
    self.ibuffer.append(data)

  def _can_handle_message(self):
    # True if we may dispatch the next message right away rather than
    # queueing it in iqueue.
    # NOTE(review): "or" binds looser than "and" here, so with
    # unhandled_limit=None this is always True (and iqueue then never
    # grows); confirm the grouping is intended if limits are added.
    return (self.unhandled_limit is None or
            (self.receive_count < self.send_count + self.unhandled_limit) and
             not self.iqueue)

  # this method is overriding an asynchat.async_chat method
  def found_terminator(self):
    # a full message has arrived: assemble it and reset the input buffer
    message = "".join(self.ibuffer)
    self.ibuffer = []
    message_id = self.receive_count
    # We need to increase the receive_count after checking if the message can
    # be handled, but before calling handle_message
    can_handle = self._can_handle_message()
    self.receive_count += 1
    if can_handle:
      self.handle_message(message, message_id)
    else:
      # over the unhandled limit: defer until a reply frees a slot
      self.iqueue.append((message, message_id))

  def handle_message(self, message, message_id):
    """Handle a terminated message.

    Meant to be overridden by subclasses; the base implementation drops
    the message.

    @type message: string
    @param message: message to handle
    @type message_id: integer
    @param message_id: stream's message sequence number

    """
    pass
    # TODO: move this method to raise NotImplementedError
    # raise NotImplementedError

  def send_message(self, message):
    """Send a message to the remote peer. This function is thread-safe.

    @type message: string
    @param message: message to send, without the terminator

    @warning: If calling this function from a thread different than the one
    performing the main asyncore loop, remember that you have to wake that one
    up.

    """
    # If we just append the message we received to the output queue, this
    # function can be safely called by multiple threads at the same time, and
    # we don't need locking, since deques are thread safe. handle_write in the
    # asyncore thread will handle the next input message if there are any
    # enqueued.
    self.oqueue.append(message)

  # this method is overriding an asyncore.dispatcher method
  def readable(self):
    # read from the socket if we can handle the next requests
    return self._can_handle_message() and asynchat.async_chat.readable(self)

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # the output queue may become full just after we called writable. This only
    # works if we know we'll have something else waking us up from the select,
    # in such case, anyway.
    return asynchat.async_chat.writable(self) or self.oqueue

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    if self.oqueue:
      # if we have data in the output queue, then send_message was called.
      # this means we can process one more message from the input queue, if
      # there are any.
      data = self.oqueue.popleft()
      self.push(data + self.terminator)
      self.send_count += 1
      if self.iqueue:
        # a reply went out, so one deferred incoming message may now be served
        self.handle_message(*self.iqueue.popleft())
    self.initiate_send()

  def close_log(self):
    # log the disconnection before closing the socket
    logging.info("Closing connection from %s",
                 netutils.FormatAddress(self.family, self.peer_address))
    self.close()

  # this method is overriding an asyncore.dispatcher method
  def handle_expt(self):
    # out-of-band data or exceptional condition: drop the connection
    self.close_log()

  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")
    self.close_log()
290

    
291

    
292
class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
  """An improved asyncore udp socket.

  Reads datagrams and dispatches them to L{handle_datagram}; outgoing
  datagrams are queued and written when the socket is writable.

  """
  def __init__(self, family):
    """Constructor for AsyncUDPSocket

    @type family: integer
    @param family: socket family (one of socket.AF_*)

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    self._family = family
    self._out_queue = []
    self.create_socket(family, socket.SOCK_DGRAM)

  # this method is overriding an asyncore.dispatcher method
  def handle_connect(self):
    # Python thinks that the first udp message from a source qualifies as a
    # "connect" and further ones are part of the same connection. We beg to
    # differ and treat all messages equally.
    pass

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    result = utils.IgnoreSignals(self.recvfrom,
                                 constants.MAX_UDP_DATA_SIZE)
    if result is None:
      return
    (payload, address) = result
    if self._family == socket.AF_INET6:
      # IPv6 addresses also carry 'flow info' and 'scope id'; we only
      # care about the first two members
      (ip, port) = address[:2]
    else:
      (ip, port) = address
    self.handle_datagram(payload, ip, port)

  def handle_datagram(self, payload, ip, port):
    """Handle an already read udp datagram

    """
    raise NotImplementedError

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # We should check whether we can write to the socket only if we have
    # something scheduled to be written
    return bool(self._out_queue)

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    if not self._out_queue:
      logging.error("handle_write called with empty output queue")
      return
    # send the oldest queued datagram; only dequeue once sent
    (ip, port, payload) = self._out_queue[0]
    utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
    self._out_queue.pop(0)

  def enqueue_send(self, ip, port, payload):
    """Enqueue a datagram to be sent when possible

    """
    payload_len = len(payload)
    if payload_len > constants.MAX_UDP_DATA_SIZE:
      raise errors.UdpDataSizeError('Packet too big: %s > %s' %
                                    (payload_len,
                                     constants.MAX_UDP_DATA_SIZE))
    self._out_queue.append((ip, port, payload))

  def process_next_packet(self, timeout=0):
    """Process the next datagram, waiting for it if necessary.

    @type timeout: float
    @param timeout: how long to wait for data
    @rtype: boolean
    @return: True if some data has been handled, False otherwise

    """
    result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
    if result is None or not (result & select.POLLIN):
      return False
    self.handle_read()
    return True
371

    
372

    
373
class AsyncAwaker(GanetiBaseAsyncoreDispatcher):
  """A way to notify the asyncore loop that something is going on.

  If an asyncore daemon is multithreaded when a thread tries to push some data
  to a socket, the main loop handling asynchronous requests might be sleeping
  waiting on a select(). To avoid this it can create an instance of the
  AsyncAwaker, which other threads can use to wake it up.

  """
  def __init__(self, signal_fn=None):
    """Constructor for AsyncAwaker

    @type signal_fn: function
    @param signal_fn: function to call when awaken

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    # Identity comparison with None (PEP 8), not equality
    assert signal_fn is None or callable(signal_fn)
    # A local socket pair: asyncore watches the read end, other threads
    # wake it up by writing a token to the other end
    (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
                                                          socket.SOCK_STREAM)
    self.in_socket.setblocking(0)
    # Each end is used in one direction only; shut down the other half
    self.in_socket.shutdown(socket.SHUT_WR)
    self.out_socket.shutdown(socket.SHUT_RD)
    self.set_socket(self.in_socket)
    # True when no wakeup token is known to be in flight yet
    self.need_signal = True
    self.signal_fn = signal_fn
    self.connected = True

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    """Drain pending wakeup tokens and invoke the signal callback.

    """
    utils.IgnoreSignals(self.recv, 4096)
    if self.signal_fn:
      self.signal_fn()
    # Tokens consumed; the next signal() call must send a fresh one
    self.need_signal = True

  # this method is overriding an asyncore.dispatcher method
  def close(self):
    """Close both ends of the socket pair.

    """
    asyncore.dispatcher.close(self)
    self.out_socket.close()

  def signal(self):
    """Signal the asyncore main loop.

    Any data we send here will be ignored, but it will cause the select() call
    to return.

    """
    # Yes, there is a race condition here. No, we don't care, at worst we're
    # sending more than one wakeup token, which doesn't harm at all.
    if self.need_signal:
      self.need_signal = False
      self.out_socket.send("\0")
425

    
426

    
427
class Mainloop(object):
  """Generic mainloop for daemons

  @ivar scheduler: A sched.scheduler object, which can be used to register
    timed events

  """
  def __init__(self):
    """Constructs a new Mainloop instance.

    """
    self.scheduler = AsyncoreScheduler(time.time)
    self._signal_wait = []

  @utils.SignalHandled([signal.SIGCHLD])
  @utils.SignalHandled([signal.SIGTERM])
  @utils.SignalHandled([signal.SIGINT])
  def Run(self, signal_handlers=None):
    """Runs the mainloop.

    @type signal_handlers: dict
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator

    """
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"
    keep_going = True
    # Main loop: service either the scheduler or plain asyncore events,
    # then look at the signals caught by the decorator-installed handlers
    while keep_going:
      if self.scheduler.empty():
        asyncore.loop(count=1, use_poll=True)
      else:
        try:
          self.scheduler.run()
        except SchedulerBreakout:
          # raised by AsyncoreDelayFunction; just loop around again
          pass

      # Check whether a signal was raised
      for (sig, handler) in signal_handlers.items():
        if handler.called:
          self._CallSignalWaiters(sig)
          # SIGTERM/SIGINT terminate the loop; SIGCHLD does not
          keep_going = sig not in (signal.SIGTERM, signal.SIGINT)
          handler.Clear()

  def _CallSignalWaiters(self, signum):
    """Calls all signal waiters for a certain signal.

    @type signum: int
    @param signum: Signal number

    """
    for receiver in self._signal_wait:
      receiver.OnSignal(signum)

  def RegisterSignal(self, owner):
    """Registers a receiver for signal notifications

    The receiver must support a "OnSignal(self, signum)" function.

    @type owner: instance
    @param owner: Receiver

    """
    self._signal_wait.append(owner)
493

    
494

    
495
def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn,
                multithreaded=False, console_logging=False,
                default_ssl_cert=None, default_ssl_key=None,
                user=_DEFAULT_RUN_USER, group=_DEFAULT_RUN_GROUP):
  """Shared main function for daemons.

  @type daemon_name: string
  @param daemon_name: daemon name
  @type optionparser: optparse.OptionParser
  @param optionparser: initialized optionparser with daemon-specific options
                       (common -f -d options will be handled by this module)
  @type dirs: list of (string, integer)
  @param dirs: list of directories that must be created if they don't exist,
               and the permissions to be used to create them
  @type check_fn: function which accepts (options, args)
  @param check_fn: function that checks start conditions and exits if they're
                   not met
  @type exec_fn: function which accepts (options, args)
  @param exec_fn: function that's executed with the daemon's pid file held, and
                  runs the daemon itself.
  @type multithreaded: bool
  @param multithreaded: Whether the daemon uses threads
  @type console_logging: boolean
  @param console_logging: if True, the daemon will fall back to the system
                          console if logging fails
  @type default_ssl_cert: string
  @param default_ssl_cert: Default SSL certificate path
  @type default_ssl_key: string
  @param default_ssl_key: Default SSL key path
  @type user: string
  @param user: Default user to run as
  @type group: string
  @param group: Default group to run as

  """
  # Options shared by every daemon
  optionparser.add_option("-f", "--foreground", dest="fork",
                          help="Don't detach from the current terminal",
                          default=True, action="store_false")
  optionparser.add_option("-d", "--debug", dest="debug",
                          help="Enable some debug messages",
                          default=False, action="store_true")
  optionparser.add_option("--syslog", dest="syslog",
                          help="Enable logging to syslog (except debug"
                          " messages); one of 'no', 'yes' or 'only' [%s]" %
                          constants.SYSLOG_USAGE,
                          default=constants.SYSLOG_USAGE,
                          choices=["no", "yes", "only"])

  if daemon_name in constants.DAEMONS_PORTS:
    default_bind_address = constants.IP4_ADDRESS_ANY
    default_port = netutils.GetDaemonPort(daemon_name)

    # For networked daemons we allow choosing the port and bind address
    optionparser.add_option("-p", "--port", dest="port",
                            help="Network port (default: %s)" % default_port,
                            default=default_port, type="int")
    optionparser.add_option("-b", "--bind", dest="bind_address",
                            help=("Bind address (default: %s)" %
                                  default_bind_address),
                            default=default_bind_address, metavar="ADDRESS")

  # SSL options only make sense when the caller supplied defaults for both
  if default_ssl_key is not None and default_ssl_cert is not None:
    optionparser.add_option("--no-ssl", dest="ssl",
                            help="Do not secure HTTP protocol with SSL",
                            default=True, action="store_false")
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
                            help=("SSL key path (default: %s)" %
                                  default_ssl_key),
                            default=default_ssl_key, type="string",
                            metavar="SSL_KEY_PATH")
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
                            help=("SSL certificate path (default: %s)" %
                                  default_ssl_cert),
                            default=default_ssl_cert, type="string",
                            metavar="SSL_CERT_PATH")

  # Disable the use of fork(2) if the daemon uses threads
  utils.no_fork = multithreaded

  options, args = optionparser.parse_args()

  # "ssl" only exists if the --no-ssl option was registered above
  if getattr(options, "ssl", False):
    ssl_paths = {
      "certificate": options.ssl_cert,
      "key": options.ssl_key,
      }

    # Bail out early if either SSL file is missing
    for name, path in ssl_paths.iteritems():
      if not os.path.isfile(path):
        print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
        sys.exit(constants.EXIT_FAILURE)

    # TODO: By initiating http.HttpSslParams here we would only read the files
    # once and have a proper validation (isfile returns False on directories)
    # at the same time.

  if check_fn is not None:
    check_fn(options, args)

  # Make sure the daemon's working directories exist before dropping
  # privileges / daemonizing
  utils.EnsureDirs(dirs)

  if options.fork:
    # Resolve the target uid/gid before daemonizing so a bad user/group
    # is reported while we can still print to the terminal
    try:
      uid = pwd.getpwnam(user).pw_uid
      gid = grp.getgrnam(group).gr_gid
    except KeyError:
      raise errors.ConfigurationError("User or group not existing on system:"
                                      " %s:%s" % (user, group))
    utils.CloseFDs()
    utils.Daemonize(constants.DAEMONS_LOGFILES[daemon_name], uid, gid)

  # Hold the pid file for the whole lifetime of the daemon; remove it on
  # the way out even if exec_fn raises
  utils.WritePidFile(daemon_name)
  try:
    utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
                       debug=options.debug,
                       stderr_logging=not options.fork,
                       multithreaded=multithreaded,
                       program=daemon_name,
                       syslog=options.syslog,
                       console_logging=console_logging)
    logging.info("%s daemon startup", daemon_name)
    exec_fn(options, args)
  finally:
    utils.RemovePidFile(daemon_name)