Confd IPv6 support
[ganeti-local] / lib / daemon.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module with helper classes and functions for daemons"""
23
24
25 import asyncore
26 import asynchat
27 import collections
28 import grp
29 import os
30 import pwd
31 import signal
32 import logging
33 import sched
34 import time
35 import socket
36 import select
37 import sys
38
39 from ganeti import utils
40 from ganeti import constants
41 from ganeti import errors
42 from ganeti import netutils
43
44
45 _DEFAULT_RUN_USER = "root"
46 _DEFAULT_RUN_GROUP = "root"
47
48
49 class SchedulerBreakout(Exception):
50   """Exception used to get out of the scheduler loop
51
52   """
53
54
55 def AsyncoreDelayFunction(timeout):
56   """Asyncore-compatible scheduler delay function.
57
58   This is a delay function for sched that, rather than actually sleeping,
59   executes asyncore events happening in the meantime.
60
61   After an event has occurred, rather than returning, it raises a
62   SchedulerBreakout exception, which will force the current scheduler.run()
63   invocation to terminate, so that we can also check for signals. The main loop
64   will then call the scheduler run again, which will allow it to actually
65   process any due events.
66
67   This is needed because scheduler.run() doesn't support a count=..., as
68   asyncore loop, and the scheduler module documents throwing exceptions from
69   inside the delay function as an allowed usage model.
70
71   """
72   asyncore.loop(timeout=timeout, count=1, use_poll=True)
73   raise SchedulerBreakout()
74
75
76 class AsyncoreScheduler(sched.scheduler):
77   """Event scheduler integrated with asyncore
78
79   """
80   def __init__(self, timefunc):
81     sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
82
83
84 class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
85   """Base Ganeti Asyncore Dispacher
86
87   """
88   # this method is overriding an asyncore.dispatcher method
89   def handle_error(self):
90     """Log an error in handling any request, and proceed.
91
92     """
93     logging.exception("Error while handling asyncore request")
94
95   # this method is overriding an asyncore.dispatcher method
96   def writable(self):
97     """Most of the time we don't want to check for writability.
98
99     """
100     return False
101
102
103 def FormatAddress(family, address):
104   """Format a client's address
105
106   @type family: integer
107   @param family: socket family (one of socket.AF_*)
108   @type address: family specific (usually tuple)
109   @param address: address, as reported by this class
110
111   """
112   if family == socket.AF_INET and len(address) == 2:
113     return "%s:%d" % address
114   elif family == socket.AF_UNIX and len(address) == 3:
115     return "pid=%s, uid=%s, gid=%s" % address
116   else:
117     return str(address)
118
119
120 class AsyncStreamServer(GanetiBaseAsyncoreDispatcher):
121   """A stream server to use with asyncore.
122
123   Each request is accepted, and then dispatched to a separate asyncore
124   dispatcher to handle.
125
126   """
127
128   _REQUEST_QUEUE_SIZE = 5
129
130   def __init__(self, family, address):
131     """Constructor for AsyncUnixStreamSocket
132
133     @type family: integer
134     @param family: socket family (one of socket.AF_*)
135     @type address: address family dependent
136     @param address: address to bind the socket to
137
138     """
139     GanetiBaseAsyncoreDispatcher.__init__(self)
140     self.family = family
141     self.create_socket(self.family, socket.SOCK_STREAM)
142     self.set_reuse_addr()
143     self.bind(address)
144     self.listen(self._REQUEST_QUEUE_SIZE)
145
146   # this method is overriding an asyncore.dispatcher method
147   def handle_accept(self):
148     """Accept a new client connection.
149
150     Creates a new instance of the handler class, which will use asyncore to
151     serve the client.
152
153     """
154     accept_result = utils.IgnoreSignals(self.accept)
155     if accept_result is not None:
156       connected_socket, client_address = accept_result
157       if self.family == socket.AF_UNIX:
158         # override the client address, as for unix sockets nothing meaningful
159         # is passed in from accept anyway
160         client_address = netutils.GetSocketCredentials(connected_socket)
161       logging.info("Accepted connection from %s",
162                    FormatAddress(self.family, client_address))
163       self.handle_connection(connected_socket, client_address)
164
165   def handle_connection(self, connected_socket, client_address):
166     """Handle an already accepted connection.
167
168     """
169     raise NotImplementedError
170
171
172 class AsyncTerminatedMessageStream(asynchat.async_chat):
173   """A terminator separated message stream asyncore module.
174
175   Handles a stream connection receiving messages terminated by a defined
176   separator. For each complete message handle_message is called.
177
178   """
179   def __init__(self, connected_socket, peer_address, terminator, family,
180                unhandled_limit):
181     """AsyncTerminatedMessageStream constructor.
182
183     @type connected_socket: socket.socket
184     @param connected_socket: connected stream socket to receive messages from
185     @param peer_address: family-specific peer address
186     @type terminator: string
187     @param terminator: terminator separating messages in the stream
188     @type family: integer
189     @param family: socket family
190     @type unhandled_limit: integer or None
191     @param unhandled_limit: maximum unanswered messages
192
193     """
194     # python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by
195     # using a positional argument rather than a keyword one.
196     asynchat.async_chat.__init__(self, connected_socket)
197     self.connected_socket = connected_socket
198     # on python 2.4 there is no "family" attribute for the socket class
199     # FIXME: when we move to python 2.5 or above remove the family parameter
200     #self.family = self.connected_socket.family
201     self.family = family
202     self.peer_address = peer_address
203     self.terminator = terminator
204     self.unhandled_limit = unhandled_limit
205     self.set_terminator(terminator)
206     self.ibuffer = []
207     self.receive_count = 0
208     self.send_count = 0
209     self.oqueue = collections.deque()
210     self.iqueue = collections.deque()
211
212   # this method is overriding an asynchat.async_chat method
213   def collect_incoming_data(self, data):
214     self.ibuffer.append(data)
215
216   def _can_handle_message(self):
217     return (self.unhandled_limit is None or
218             (self.receive_count < self.send_count + self.unhandled_limit) and
219              not self.iqueue)
220
221   # this method is overriding an asynchat.async_chat method
222   def found_terminator(self):
223     message = "".join(self.ibuffer)
224     self.ibuffer = []
225     message_id = self.receive_count
226     # We need to increase the receive_count after checking if the message can
227     # be handled, but before calling handle_message
228     can_handle = self._can_handle_message()
229     self.receive_count += 1
230     if can_handle:
231       self.handle_message(message, message_id)
232     else:
233       self.iqueue.append((message, message_id))
234
235   def handle_message(self, message, message_id):
236     """Handle a terminated message.
237
238     @type message: string
239     @param message: message to handle
240     @type message_id: integer
241     @param message_id: stream's message sequence number
242
243     """
244     pass
245     # TODO: move this method to raise NotImplementedError
246     # raise NotImplementedError
247
248   def send_message(self, message):
249     """Send a message to the remote peer. This function is thread-safe.
250
251     @type message: string
252     @param message: message to send, without the terminator
253
254     @warning: If calling this function from a thread different than the one
255     performing the main asyncore loop, remember that you have to wake that one
256     up.
257
258     """
259     # If we just append the message we received to the output queue, this
260     # function can be safely called by multiple threads at the same time, and
261     # we don't need locking, since deques are thread safe. handle_write in the
262     # asyncore thread will handle the next input message if there are any
263     # enqueued.
264     self.oqueue.append(message)
265
266   # this method is overriding an asyncore.dispatcher method
267   def readable(self):
268     # read from the socket if we can handle the next requests
269     return self._can_handle_message() and asynchat.async_chat.readable(self)
270
271   # this method is overriding an asyncore.dispatcher method
272   def writable(self):
273     # the output queue may become full just after we called writable. This only
274     # works if we know we'll have something else waking us up from the select,
275     # in such case, anyway.
276     return asynchat.async_chat.writable(self) or self.oqueue
277
278   # this method is overriding an asyncore.dispatcher method
279   def handle_write(self):
280     if self.oqueue:
281       # if we have data in the output queue, then send_message was called.
282       # this means we can process one more message from the input queue, if
283       # there are any.
284       data = self.oqueue.popleft()
285       self.push(data + self.terminator)
286       self.send_count += 1
287       if self.iqueue:
288         self.handle_message(*self.iqueue.popleft())
289     self.initiate_send()
290
291   def close_log(self):
292     logging.info("Closing connection from %s",
293                  FormatAddress(self.family, self.peer_address))
294     self.close()
295
296   # this method is overriding an asyncore.dispatcher method
297   def handle_expt(self):
298     self.close_log()
299
300   # this method is overriding an asyncore.dispatcher method
301   def handle_error(self):
302     """Log an error in handling any request, and proceed.
303
304     """
305     logging.exception("Error while handling asyncore request")
306     self.close_log()
307
308
309 class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
310   """An improved asyncore udp socket.
311
312   """
313   def __init__(self, family):
314     """Constructor for AsyncUDPSocket
315
316     """
317     GanetiBaseAsyncoreDispatcher.__init__(self)
318     self._out_queue = []
319     self._family = family
320     self.create_socket(family, socket.SOCK_DGRAM)
321
322   # this method is overriding an asyncore.dispatcher method
323   def handle_connect(self):
324     # Python thinks that the first udp message from a source qualifies as a
325     # "connect" and further ones are part of the same connection. We beg to
326     # differ and treat all messages equally.
327     pass
328
329   # this method is overriding an asyncore.dispatcher method
330   def handle_read(self):
331     recv_result = utils.IgnoreSignals(self.recvfrom,
332                                       constants.MAX_UDP_DATA_SIZE)
333     if recv_result is not None:
334       payload, address = recv_result
335       if self._family == socket.AF_INET6:
336         # we ignore 'flow info' and 'scope id' as we don't need them
337         ip, port, _, _ = address
338       else:
339         ip, port = address
340
341       self.handle_datagram(payload, ip, port)
342
343   def handle_datagram(self, payload, ip, port):
344     """Handle an already read udp datagram
345
346     """
347     raise NotImplementedError
348
349   # this method is overriding an asyncore.dispatcher method
350   def writable(self):
351     # We should check whether we can write to the socket only if we have
352     # something scheduled to be written
353     return bool(self._out_queue)
354
355   # this method is overriding an asyncore.dispatcher method
356   def handle_write(self):
357     if not self._out_queue:
358       logging.error("handle_write called with empty output queue")
359       return
360     (ip, port, payload) = self._out_queue[0]
361     utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
362     self._out_queue.pop(0)
363
364   def enqueue_send(self, ip, port, payload):
365     """Enqueue a datagram to be sent when possible
366
367     """
368     if len(payload) > constants.MAX_UDP_DATA_SIZE:
369       raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
370                                     constants.MAX_UDP_DATA_SIZE))
371     self._out_queue.append((ip, port, payload))
372
373   def process_next_packet(self, timeout=0):
374     """Process the next datagram, waiting for it if necessary.
375
376     @type timeout: float
377     @param timeout: how long to wait for data
378     @rtype: boolean
379     @return: True if some data has been handled, False otherwise
380
381     """
382     result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
383     if result is not None and result & select.POLLIN:
384       self.handle_read()
385       return True
386     else:
387       return False
388
389
390 class AsyncAwaker(GanetiBaseAsyncoreDispatcher):
391   """A way to notify the asyncore loop that something is going on.
392
393   If an asyncore daemon is multithreaded when a thread tries to push some data
394   to a socket, the main loop handling asynchronous requests might be sleeping
395   waiting on a select(). To avoid this it can create an instance of the
396   AsyncAwaker, which other threads can use to wake it up.
397
398   """
399   def __init__(self, signal_fn=None):
400     """Constructor for AsyncAwaker
401
402     @type signal_fn: function
403     @param signal_fn: function to call when awaken
404
405     """
406     GanetiBaseAsyncoreDispatcher.__init__(self)
407     assert signal_fn == None or callable(signal_fn)
408     (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
409                                                           socket.SOCK_STREAM)
410     self.in_socket.setblocking(0)
411     self.in_socket.shutdown(socket.SHUT_WR)
412     self.out_socket.shutdown(socket.SHUT_RD)
413     self.set_socket(self.in_socket)
414     self.need_signal = True
415     self.signal_fn = signal_fn
416     self.connected = True
417
418   # this method is overriding an asyncore.dispatcher method
419   def handle_read(self):
420     utils.IgnoreSignals(self.recv, 4096)
421     if self.signal_fn:
422       self.signal_fn()
423     self.need_signal = True
424
425   # this method is overriding an asyncore.dispatcher method
426   def close(self):
427     asyncore.dispatcher.close(self)
428     self.out_socket.close()
429
430   def signal(self):
431     """Signal the asyncore main loop.
432
433     Any data we send here will be ignored, but it will cause the select() call
434     to return.
435
436     """
437     # Yes, there is a race condition here. No, we don't care, at worst we're
438     # sending more than one wakeup token, which doesn't harm at all.
439     if self.need_signal:
440       self.need_signal = False
441       self.out_socket.send("\0")
442
443
444 class Mainloop(object):
445   """Generic mainloop for daemons
446
447   @ivar scheduler: A sched.scheduler object, which can be used to register
448     timed events
449
450   """
451   def __init__(self):
452     """Constructs a new Mainloop instance.
453
454     """
455     self._signal_wait = []
456     self.scheduler = AsyncoreScheduler(time.time)
457
458   @utils.SignalHandled([signal.SIGCHLD])
459   @utils.SignalHandled([signal.SIGTERM])
460   @utils.SignalHandled([signal.SIGINT])
461   def Run(self, signal_handlers=None):
462     """Runs the mainloop.
463
464     @type signal_handlers: dict
465     @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator
466
467     """
468     assert isinstance(signal_handlers, dict) and \
469            len(signal_handlers) > 0, \
470            "Broken SignalHandled decorator"
471     running = True
472     # Start actual main loop
473     while running:
474       if not self.scheduler.empty():
475         try:
476           self.scheduler.run()
477         except SchedulerBreakout:
478           pass
479       else:
480         asyncore.loop(count=1, use_poll=True)
481
482       # Check whether a signal was raised
483       for sig in signal_handlers:
484         handler = signal_handlers[sig]
485         if handler.called:
486           self._CallSignalWaiters(sig)
487           running = sig not in (signal.SIGTERM, signal.SIGINT)
488           handler.Clear()
489
490   def _CallSignalWaiters(self, signum):
491     """Calls all signal waiters for a certain signal.
492
493     @type signum: int
494     @param signum: Signal number
495
496     """
497     for owner in self._signal_wait:
498       owner.OnSignal(signum)
499
500   def RegisterSignal(self, owner):
501     """Registers a receiver for signal notifications
502
503     The receiver must support a "OnSignal(self, signum)" function.
504
505     @type owner: instance
506     @param owner: Receiver
507
508     """
509     self._signal_wait.append(owner)
510
511
512 def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn,
513                 multithreaded=False, console_logging=False,
514                 default_ssl_cert=None, default_ssl_key=None,
515                 user=_DEFAULT_RUN_USER, group=_DEFAULT_RUN_GROUP):
516   """Shared main function for daemons.
517
518   @type daemon_name: string
519   @param daemon_name: daemon name
520   @type optionparser: optparse.OptionParser
521   @param optionparser: initialized optionparser with daemon-specific options
522                        (common -f -d options will be handled by this module)
523   @type dirs: list of (string, integer)
524   @param dirs: list of directories that must be created if they don't exist,
525                and the permissions to be used to create them
526   @type check_fn: function which accepts (options, args)
527   @param check_fn: function that checks start conditions and exits if they're
528                    not met
529   @type exec_fn: function which accepts (options, args)
530   @param exec_fn: function that's executed with the daemon's pid file held, and
531                   runs the daemon itself.
532   @type multithreaded: bool
533   @param multithreaded: Whether the daemon uses threads
534   @type console_logging: boolean
535   @param console_logging: if True, the daemon will fall back to the system
536                           console if logging fails
537   @type default_ssl_cert: string
538   @param default_ssl_cert: Default SSL certificate path
539   @type default_ssl_key: string
540   @param default_ssl_key: Default SSL key path
541   @param user: Default user to run as
542   @type user: string
543   @param group: Default group to run as
544   @type group: string
545
546   """
547   optionparser.add_option("-f", "--foreground", dest="fork",
548                           help="Don't detach from the current terminal",
549                           default=True, action="store_false")
550   optionparser.add_option("-d", "--debug", dest="debug",
551                           help="Enable some debug messages",
552                           default=False, action="store_true")
553   optionparser.add_option("--syslog", dest="syslog",
554                           help="Enable logging to syslog (except debug"
555                           " messages); one of 'no', 'yes' or 'only' [%s]" %
556                           constants.SYSLOG_USAGE,
557                           default=constants.SYSLOG_USAGE,
558                           choices=["no", "yes", "only"])
559
560   if daemon_name in constants.DAEMONS_PORTS:
561     default_bind_address = constants.IP4_ADDRESS_ANY
562     default_port = netutils.GetDaemonPort(daemon_name)
563
564     # For networked daemons we allow choosing the port and bind address
565     optionparser.add_option("-p", "--port", dest="port",
566                             help="Network port (default: %s)" % default_port,
567                             default=default_port, type="int")
568     optionparser.add_option("-b", "--bind", dest="bind_address",
569                             help=("Bind address (default: %s)" %
570                                   default_bind_address),
571                             default=default_bind_address, metavar="ADDRESS")
572
573   if default_ssl_key is not None and default_ssl_cert is not None:
574     optionparser.add_option("--no-ssl", dest="ssl",
575                             help="Do not secure HTTP protocol with SSL",
576                             default=True, action="store_false")
577     optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
578                             help=("SSL key path (default: %s)" %
579                                   default_ssl_key),
580                             default=default_ssl_key, type="string",
581                             metavar="SSL_KEY_PATH")
582     optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
583                             help=("SSL certificate path (default: %s)" %
584                                   default_ssl_cert),
585                             default=default_ssl_cert, type="string",
586                             metavar="SSL_CERT_PATH")
587
588   # Disable the use of fork(2) if the daemon uses threads
589   utils.no_fork = multithreaded
590
591   options, args = optionparser.parse_args()
592
593   if getattr(options, "ssl", False):
594     ssl_paths = {
595       "certificate": options.ssl_cert,
596       "key": options.ssl_key,
597       }
598
599     for name, path in ssl_paths.iteritems():
600       if not os.path.isfile(path):
601         print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
602         sys.exit(constants.EXIT_FAILURE)
603
604     # TODO: By initiating http.HttpSslParams here we would only read the files
605     # once and have a proper validation (isfile returns False on directories)
606     # at the same time.
607
608   if check_fn is not None:
609     check_fn(options, args)
610
611   utils.EnsureDirs(dirs)
612
613   if options.fork:
614     try:
615       uid = pwd.getpwnam(user).pw_uid
616       gid = grp.getgrnam(group).gr_gid
617     except KeyError:
618       raise errors.ConfigurationError("User or group not existing on system:"
619                                       " %s:%s" % (user, group))
620     utils.CloseFDs()
621     utils.Daemonize(constants.DAEMONS_LOGFILES[daemon_name], uid, gid)
622
623   utils.WritePidFile(daemon_name)
624   try:
625     utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
626                        debug=options.debug,
627                        stderr_logging=not options.fork,
628                        multithreaded=multithreaded,
629                        program=daemon_name,
630                        syslog=options.syslog,
631                        console_logging=console_logging)
632     logging.info("%s daemon startup", daemon_name)
633     exec_fn(options, args)
634   finally:
635     utils.RemovePidFile(daemon_name)