Statistics
| Branch: | Tag: | Revision:

root / lib / daemon.py @ 0070a462

History | View | Annotate | Download (20.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module with helper classes and functions for daemons"""
23

    
24

    
25
import asyncore
26
import asynchat
27
import collections
28
import os
29
import signal
30
import logging
31
import sched
32
import time
33
import socket
34
import select
35
import sys
36

    
37
from ganeti import utils
38
from ganeti import constants
39
from ganeti import errors
40
from ganeti import netutils
41
from ganeti import ssconf
42

    
43

    
44
class SchedulerBreakout(Exception):
  """Raised by the delay function to break out of scheduler.run()

  Used as control flow only; it carries no error information.

  """
48

    
49

    
50
def AsyncoreDelayFunction(timeout):
  """Scheduler delay function that serves asyncore events instead of sleeping.

  Rather than blocking for the requested delay, this runs one round of
  asyncore event processing (waiting at most C{timeout} for activity).

  Once that round completes it raises L{SchedulerBreakout} instead of
  returning, which aborts the current scheduler.run() invocation so the
  caller can check for pending signals; the main loop then re-enters the
  scheduler, which processes any events that became due in the meantime.

  This dance is needed because scheduler.run() has no count=... argument
  the way the asyncore loop does, and the sched module explicitly allows
  the delay function to raise as a way to interrupt the run.

  """
  asyncore.loop(count=1, use_poll=True, timeout=timeout)
  raise SchedulerBreakout()
69

    
70

    
71
class AsyncoreScheduler(sched.scheduler):
  """sched.scheduler variant whose waits are served by the asyncore loop

  """
  def __init__(self, timefunc):
    """Initializes this class.

    @param timefunc: function used to obtain the current time

    """
    # Plug in AsyncoreDelayFunction so that "sleeping" between scheduled
    # events still processes asyncore I/O
    sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
77

    
78

    
79
class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
  """Base class for Ganeti asyncore dispatchers

  Provides shared behaviour: errors are logged instead of aborting the
  loop, and write-readiness checks are disabled by default.

  """
  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    """Most of the time we don't want to check for writability.

    """
    # Subclasses that actually queue outgoing data override this
    return False
96

    
97

    
98
class AsyncStreamServer(GanetiBaseAsyncoreDispatcher):
  """A stream server to use with asyncore.

  Listens for connections; each accepted client is handed over to
  handle_connection, which subclasses implement (typically by creating a
  dedicated asyncore dispatcher for the client).

  """

  # backlog passed to listen()
  _REQUEST_QUEUE_SIZE = 5

  def __init__(self, family, address):
    """Constructor for AsyncUnixStreamSocket

    @type family: integer
    @param family: socket family (one of socket.AF_*)
    @type address: address family dependent
    @param address: address to bind the socket to

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    self.family = family
    self.create_socket(self.family, socket.SOCK_STREAM)
    self.set_reuse_addr()
    self.bind(address)
    self.listen(self._REQUEST_QUEUE_SIZE)

  # this method is overriding an asyncore.dispatcher method
  def handle_accept(self):
    """Accept a new client connection.

    Retries accept() if it was interrupted by a signal; on success the
    connection is logged and passed to handle_connection.

    """
    accept_result = utils.IgnoreSignals(self.accept)
    if accept_result is None:
      return
    (connected_socket, client_address) = accept_result
    if self.family == socket.AF_UNIX:
      # accept() gives us nothing meaningful for unix sockets, so use the
      # peer's socket credentials as the client address instead
      client_address = netutils.GetSocketCredentials(connected_socket)
    logging.info("Accepted connection from %s",
                 netutils.FormatAddress(client_address, family=self.family))
    self.handle_connection(connected_socket, client_address)

  def handle_connection(self, connected_socket, client_address):
    """Handle an already accepted connection.

    Must be implemented by subclasses.

    """
    raise NotImplementedError
148

    
149

    
150
class AsyncTerminatedMessageStream(asynchat.async_chat):
  """A terminator separated message stream asyncore module.

  Handles a stream connection receiving messages terminated by a defined
  separator. For each complete message handle_message is called.

  """
  def __init__(self, connected_socket, peer_address, terminator, family,
               unhandled_limit):
    """AsyncTerminatedMessageStream constructor.

    @type connected_socket: socket.socket
    @param connected_socket: connected stream socket to receive messages from
    @param peer_address: family-specific peer address
    @type terminator: string
    @param terminator: terminator separating messages in the stream
    @type family: integer
    @param family: socket family
    @type unhandled_limit: integer or None
    @param unhandled_limit: maximum unanswered messages

    """
    # python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by
    # using a positional argument rather than a keyword one.
    asynchat.async_chat.__init__(self, connected_socket)
    self.connected_socket = connected_socket
    # on python 2.4 there is no "family" attribute for the socket class
    # FIXME: when we move to python 2.5 or above remove the family parameter
    #self.family = self.connected_socket.family
    self.family = family
    self.peer_address = peer_address
    self.terminator = terminator
    self.unhandled_limit = unhandled_limit
    self.set_terminator(terminator)
    # ibuffer accumulates chunks of the current (incomplete) message
    self.ibuffer = []
    # receive_count/send_count track how many messages came in and how many
    # replies went out; their difference limits unanswered messages
    self.receive_count = 0
    self.send_count = 0
    # deques are used because append/popleft are atomic, allowing lockless
    # multi-thread access (see send_message)
    self.oqueue = collections.deque()
    self.iqueue = collections.deque()

  # this method is overriding an asynchat.async_chat method
  def collect_incoming_data(self, data):
    # buffer incoming chunks until found_terminator() assembles them
    self.ibuffer.append(data)

  def _can_handle_message(self):
    # True when the next incoming message may be dispatched immediately.
    # NOTE: "or" binds looser than "and" here, so with unhandled_limit=None
    # the result is always True (iqueue then stays empty, as every message
    # is handled as soon as it arrives).
    return (self.unhandled_limit is None or
            (self.receive_count < self.send_count + self.unhandled_limit) and
             not self.iqueue)

  # this method is overriding an asynchat.async_chat method
  def found_terminator(self):
    # A complete message has arrived: assemble it and either dispatch it or
    # park it on iqueue until a reply frees up capacity
    message = "".join(self.ibuffer)
    self.ibuffer = []
    message_id = self.receive_count
    # We need to increase the receive_count after checking if the message can
    # be handled, but before calling handle_message
    can_handle = self._can_handle_message()
    self.receive_count += 1
    if can_handle:
      self.handle_message(message, message_id)
    else:
      self.iqueue.append((message, message_id))

  def handle_message(self, message, message_id):
    """Handle a terminated message.

    @type message: string
    @param message: message to handle
    @type message_id: integer
    @param message_id: stream's message sequence number

    """
    pass
    # TODO: move this method to raise NotImplementedError
    # raise NotImplementedError

  def send_message(self, message):
    """Send a message to the remote peer. This function is thread-safe.

    @type message: string
    @param message: message to send, without the terminator

    @warning: If calling this function from a thread different than the one
    performing the main asyncore loop, remember that you have to wake that one
    up.

    """
    # If we just append the message we received to the output queue, this
    # function can be safely called by multiple threads at the same time, and
    # we don't need locking, since deques are thread safe. handle_write in the
    # asyncore thread will handle the next input message if there are any
    # enqueued.
    self.oqueue.append(message)

  # this method is overriding an asyncore.dispatcher method
  def readable(self):
    # read from the socket if we can handle the next requests
    return self._can_handle_message() and asynchat.async_chat.readable(self)

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # the output queue may become full just after we called writable. This only
    # works if we know we'll have something else waking us up from the select,
    # in such case, anyway.
    return asynchat.async_chat.writable(self) or self.oqueue

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    if self.oqueue:
      # if we have data in the output queue, then send_message was called.
      # this means we can process one more message from the input queue, if
      # there are any.
      data = self.oqueue.popleft()
      self.push(data + self.terminator)
      self.send_count += 1
      if self.iqueue:
        self.handle_message(*self.iqueue.popleft())
    self.initiate_send()

  def close_log(self):
    # log the disconnect before tearing the channel down
    logging.info("Closing connection from %s",
                 netutils.FormatAddress(self.peer_address, family=self.family))
    self.close()

  # this method is overriding an asyncore.dispatcher method
  def handle_expt(self):
    # out-of-band data / exceptional condition: just drop the connection
    self.close_log()

  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")
    self.close_log()
285

    
286

    
287
class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
  """An improved asyncore udp socket.

  Outgoing datagrams are queued and flushed as the socket becomes
  writable; incoming ones are dispatched to handle_datagram.

  """
  def __init__(self, family):
    """Constructor for AsyncUDPSocket

    @type family: integer
    @param family: socket family (one of socket.AF_*)

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    self._out_queue = []
    self._family = family
    self.create_socket(family, socket.SOCK_DGRAM)

  # this method is overriding an asyncore.dispatcher method
  def handle_connect(self):
    # Python thinks that the first udp message from a source qualifies as a
    # "connect" and further ones are part of the same connection. We beg to
    # differ and treat all messages equally.
    pass

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    """Receive one datagram and pass it on to handle_datagram.

    """
    recv_result = utils.IgnoreSignals(self.recvfrom,
                                      constants.MAX_UDP_DATA_SIZE)
    if recv_result is None:
      return
    (payload, address) = recv_result
    if self._family == socket.AF_INET6:
      # IPv6 addresses also include 'flow info' and 'scope id', neither of
      # which we need
      (ip, port, _, _) = address
    else:
      (ip, port) = address

    self.handle_datagram(payload, ip, port)

  def handle_datagram(self, payload, ip, port):
    """Handle an already read udp datagram

    Must be implemented by subclasses.

    """
    raise NotImplementedError

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # Only ask for write events while something is actually queued to be
    # sent out
    return bool(self._out_queue)

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    """Transmit the oldest queued datagram.

    """
    if not self._out_queue:
      logging.error("handle_write called with empty output queue")
      return
    (ip, port, payload) = self._out_queue[0]
    utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
    # only dequeue after a send attempt has been made
    self._out_queue.pop(0)

  def enqueue_send(self, ip, port, payload):
    """Enqueue a datagram to be sent when possible

    @raise errors.UdpDataSizeError: if the payload exceeds the maximum
        datagram size

    """
    if len(payload) > constants.MAX_UDP_DATA_SIZE:
      raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
                                    constants.MAX_UDP_DATA_SIZE))
    self._out_queue.append((ip, port, payload))

  def process_next_packet(self, timeout=0):
    """Process the next datagram, waiting for it if necessary.

    @type timeout: float
    @param timeout: how long to wait for data
    @rtype: boolean
    @return: True if some data has been handled, False otherwise

    """
    result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
    if result is None or not (result & select.POLLIN):
      return False
    self.handle_read()
    return True
366

    
367

    
368
class AsyncAwaker(GanetiBaseAsyncoreDispatcher):
  """A way to notify the asyncore loop that something is going on.

  If an asyncore daemon is multithreaded when a thread tries to push some data
  to a socket, the main loop handling asynchronous requests might be sleeping
  waiting on a select(). To avoid this it can create an instance of the
  AsyncAwaker, which other threads can use to wake it up.

  """
  def __init__(self, signal_fn=None):
    """Constructor for AsyncAwaker

    @type signal_fn: function
    @param signal_fn: function to call when awaken

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    # identity comparison with None ("is None") rather than "== None"
    assert signal_fn is None or callable(signal_fn)
    (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
                                                          socket.SOCK_STREAM)
    self.in_socket.setblocking(0)
    # the pair is used in one direction only: in_socket is read by the
    # asyncore loop, out_socket is written to by signal()
    self.in_socket.shutdown(socket.SHUT_WR)
    self.out_socket.shutdown(socket.SHUT_RD)
    self.set_socket(self.in_socket)
    self.need_signal = True
    self.signal_fn = signal_fn
    self.connected = True

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    """Drain pending wakeup bytes and run the optional callback.

    """
    utils.IgnoreSignals(self.recv, 4096)
    if self.signal_fn:
      self.signal_fn()
    # allow the next signal() call to send a fresh wakeup token
    self.need_signal = True

  # this method is overriding an asyncore.dispatcher method
  def close(self):
    """Close both ends of the socket pair.

    """
    asyncore.dispatcher.close(self)
    self.out_socket.close()

  def signal(self):
    """Signal the asyncore main loop.

    Any data we send here will be ignored, but it will cause the select() call
    to return.

    """
    # Yes, there is a race condition here. No, we don't care, at worst we're
    # sending more than one wakeup token, which doesn't harm at all.
    if self.need_signal:
      self.need_signal = False
      self.out_socket.send("\0")
420

    
421

    
422
class Mainloop(object):
  """Generic mainloop for daemons

  @ivar scheduler: A sched.scheduler object, which can be used to register
    timed events

  """
  def __init__(self):
    """Constructs a new Mainloop instance.

    """
    # receivers registered via RegisterSignal
    self._signal_wait = []
    self.scheduler = AsyncoreScheduler(time.time)

  @utils.SignalHandled([signal.SIGCHLD])
  @utils.SignalHandled([signal.SIGTERM])
  @utils.SignalHandled([signal.SIGINT])
  def Run(self, signal_handlers=None):
    """Runs the mainloop.

    @type signal_handlers: dict
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator

    """
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"

    running = True
    # Start actual main loop
    while running:
      if self.scheduler.empty():
        # no timed events pending, just serve asyncore requests
        asyncore.loop(count=1, use_poll=True)
      else:
        try:
          self.scheduler.run()
        except SchedulerBreakout:
          # raised by the delay function after each asyncore round so we
          # can fall through and look at the signal handlers below
          pass

      # Check whether a signal was raised
      for (sig, handler) in signal_handlers.items():
        if not handler.called:
          continue
        self._CallSignalWaiters(sig)
        # terminate on SIGTERM/SIGINT, keep running otherwise
        running = sig not in (signal.SIGTERM, signal.SIGINT)
        handler.Clear()

  def _CallSignalWaiters(self, signum):
    """Calls all signal waiters for a certain signal.

    @type signum: int
    @param signum: Signal number

    """
    for owner in self._signal_wait:
      owner.OnSignal(signum)

  def RegisterSignal(self, owner):
    """Registers a receiver for signal notifications

    The receiver must support a "OnSignal(self, signum)" function.

    @type owner: instance
    @param owner: Receiver

    """
    self._signal_wait.append(owner)
488

    
489

    
490
def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn,
                multithreaded=False, console_logging=False,
                default_ssl_cert=None, default_ssl_key=None):
  """Shared main function for daemons.

  @type daemon_name: string
  @param daemon_name: daemon name
  @type optionparser: optparse.OptionParser
  @param optionparser: initialized optionparser with daemon-specific options
                       (common -f -d options will be handled by this module)
  @type dirs: list of (string, integer)
  @param dirs: list of directories that must be created if they don't exist,
               and the permissions to be used to create them
  @type check_fn: function which accepts (options, args)
  @param check_fn: function that checks start conditions and exits if they're
                   not met
  @type exec_fn: function which accepts (options, args)
  @param exec_fn: function that's executed with the daemon's pid file held, and
                  runs the daemon itself.
  @type multithreaded: bool
  @param multithreaded: Whether the daemon uses threads
  @type console_logging: boolean
  @param console_logging: if True, the daemon will fall back to the system
                          console if logging fails
  @type default_ssl_cert: string
  @param default_ssl_cert: Default SSL certificate path
  @type default_ssl_key: string
  @param default_ssl_key: Default SSL key path

  """
  # Options common to every daemon
  optionparser.add_option("-f", "--foreground", dest="fork",
                          help="Don't detach from the current terminal",
                          default=True, action="store_false")
  optionparser.add_option("-d", "--debug", dest="debug",
                          help="Enable some debug messages",
                          default=False, action="store_true")
  optionparser.add_option("--syslog", dest="syslog",
                          help="Enable logging to syslog (except debug"
                          " messages); one of 'no', 'yes' or 'only' [%s]" %
                          constants.SYSLOG_USAGE,
                          default=constants.SYSLOG_USAGE,
                          choices=["no", "yes", "only"])

  if daemon_name in constants.DAEMONS_PORTS:
    default_bind_address = constants.IP4_ADDRESS_ANY
    family = ssconf.SimpleStore().GetPrimaryIPFamily()
    # family will default to AF_INET if there is no ssconf file (e.g. when
    # upgrading a cluster from 2.2 -> 2.3. This is intended, as Ganeti clusters
    # <= 2.2 can not be AF_INET6
    if family == netutils.IP6Address.family:
      default_bind_address = constants.IP6_ADDRESS_ANY

    default_port = netutils.GetDaemonPort(daemon_name)

    # For networked daemons we allow choosing the port and bind address
    optionparser.add_option("-p", "--port", dest="port",
                            help="Network port (default: %s)" % default_port,
                            default=default_port, type="int")
    optionparser.add_option("-b", "--bind", dest="bind_address",
                            help=("Bind address (default: '%s')" %
                                  default_bind_address),
                            default=default_bind_address, metavar="ADDRESS")

  # SSL options are only offered when both defaults are provided
  if default_ssl_key is not None and default_ssl_cert is not None:
    optionparser.add_option("--no-ssl", dest="ssl",
                            help="Do not secure HTTP protocol with SSL",
                            default=True, action="store_false")
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
                            help=("SSL key path (default: %s)" %
                                  default_ssl_key),
                            default=default_ssl_key, type="string",
                            metavar="SSL_KEY_PATH")
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
                            help=("SSL certificate path (default: %s)" %
                                  default_ssl_cert),
                            default=default_ssl_cert, type="string",
                            metavar="SSL_CERT_PATH")

  # Disable the use of fork(2) if the daemon uses threads
  utils.no_fork = multithreaded

  options, args = optionparser.parse_args()

  # getattr() because the "ssl" option only exists if SSL defaults were given
  if getattr(options, "ssl", False):
    ssl_paths = {
      "certificate": options.ssl_cert,
      "key": options.ssl_key,
      }

    for name, path in ssl_paths.iteritems():
      if not os.path.isfile(path):
        print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
        sys.exit(constants.EXIT_FAILURE)

    # TODO: By initiating http.HttpSslParams here we would only read the files
    # once and have a proper validation (isfile returns False on directories)
    # at the same time.

  if check_fn is not None:
    check_fn(options, args)

  utils.EnsureDirs(dirs)

  if options.fork:
    # Detach from the terminal before starting to serve
    utils.CloseFDs()
    utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name])

  # The pid file is held for the whole lifetime of the daemon and removed
  # on the way out, even if exec_fn raises
  utils.WritePidFile(daemon_name)
  try:
    utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
                       debug=options.debug,
                       stderr_logging=not options.fork,
                       multithreaded=multithreaded,
                       program=daemon_name,
                       syslog=options.syslog,
                       console_logging=console_logging)
    logging.info("%s daemon startup", daemon_name)
    exec_fn(options, args)
  finally:
    utils.RemovePidFile(daemon_name)