daemon.GenericMain: Don't use dict for SSL paths, improve CLI options
[ganeti-local] / lib / daemon.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module with helper classes and functions for daemons"""
23
24
25 import asyncore
26 import os
27 import signal
28 import errno
29 import logging
30 import sched
31 import time
32 import socket
33 import sys
34
35 from ganeti import utils
36 from ganeti import constants
37 from ganeti import errors
38
39
40 class SchedulerBreakout(Exception):
41   """Exception used to get out of the scheduler loop
42
43   """
44
45
46 def AsyncoreDelayFunction(timeout):
47   """Asyncore-compatible scheduler delay function.
48
49   This is a delay function for sched that, rather than actually sleeping,
50   executes asyncore events happening in the meantime.
51
52   After an event has occurred, rather than returning, it raises a
53   SchedulerBreakout exception, which will force the current scheduler.run()
54   invocation to terminate, so that we can also check for signals. The main loop
55   will then call the scheduler run again, which will allow it to actually
56   process any due events.
57
58   This is needed because scheduler.run() doesn't support a count=..., as
59   asyncore loop, and the scheduler module documents throwing exceptions from
60   inside the delay function as an allowed usage model.
61
62   """
63   asyncore.loop(timeout=timeout, count=1, use_poll=True)
64   raise SchedulerBreakout()
65
66
67 class AsyncoreScheduler(sched.scheduler):
68   """Event scheduler integrated with asyncore
69
70   """
71   def __init__(self, timefunc):
72     sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
73
74
75 class AsyncUDPSocket(asyncore.dispatcher):
76   """An improved asyncore udp socket.
77
78   """
79   def __init__(self):
80     """Constructor for AsyncUDPSocket
81
82     """
83     asyncore.dispatcher.__init__(self)
84     self._out_queue = []
85     self.create_socket(socket.AF_INET, socket.SOCK_DGRAM)
86
87   # this method is overriding an asyncore.dispatcher method
88   def handle_connect(self):
89     # Python thinks that the first udp message from a source qualifies as a
90     # "connect" and further ones are part of the same connection. We beg to
91     # differ and treat all messages equally.
92     pass
93
94   # this method is overriding an asyncore.dispatcher method
95   def handle_read(self):
96     try:
97       try:
98         payload, address = self.recvfrom(constants.MAX_UDP_DATA_SIZE)
99       except socket.error, err:
100         if err.errno == errno.EINTR:
101           # we got a signal while trying to read. no need to do anything,
102           # handle_read will be called again if there is data on the socket.
103           return
104         else:
105           raise
106       ip, port = address
107       self.handle_datagram(payload, ip, port)
108     except:
109       # we need to catch any exception here, log it, but proceed, because even
110       # if we failed handling a single request, we still want to continue.
111       logging.error("Unexpected exception", exc_info=True)
112
113   def handle_datagram(self, payload, ip, port):
114     """Handle an already read udp datagram
115
116     """
117     raise NotImplementedError
118
119   # this method is overriding an asyncore.dispatcher method
120   def writable(self):
121     # We should check whether we can write to the socket only if we have
122     # something scheduled to be written
123     return bool(self._out_queue)
124
125   def handle_write(self):
126     try:
127       if not self._out_queue:
128         logging.error("handle_write called with empty output queue")
129         return
130       (ip, port, payload) = self._out_queue[0]
131       try:
132         self.sendto(payload, 0, (ip, port))
133       except socket.error, err:
134         if err.errno == errno.EINTR:
135           # we got a signal while trying to write. no need to do anything,
136           # handle_write will be called again because we haven't emptied the
137           # _out_queue, and we'll try again
138           return
139         else:
140           raise
141       self._out_queue.pop(0)
142     except:
143       # we need to catch any exception here, log it, but proceed, because even
144       # if we failed sending a single datagram we still want to continue.
145       logging.error("Unexpected exception", exc_info=True)
146
147   def enqueue_send(self, ip, port, payload):
148     """Enqueue a datagram to be sent when possible
149
150     """
151     if len(payload) > constants.MAX_UDP_DATA_SIZE:
152       raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
153                                     constants.MAX_UDP_DATA_SIZE))
154     self._out_queue.append((ip, port, payload))
155
156
157 class Mainloop(object):
158   """Generic mainloop for daemons
159
160   @ivar scheduler: A sched.scheduler object, which can be used to register
161     timed events
162
163   """
164   def __init__(self):
165     """Constructs a new Mainloop instance.
166
167     """
168     self._signal_wait = []
169     self.scheduler = AsyncoreScheduler(time.time)
170
171   @utils.SignalHandled([signal.SIGCHLD])
172   @utils.SignalHandled([signal.SIGTERM])
173   def Run(self, signal_handlers=None):
174     """Runs the mainloop.
175
176     @type signal_handlers: dict
177     @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator
178
179     """
180     assert isinstance(signal_handlers, dict) and \
181            len(signal_handlers) > 0, \
182            "Broken SignalHandled decorator"
183     running = True
184     # Start actual main loop
185     while running:
186       if not self.scheduler.empty():
187         try:
188           self.scheduler.run()
189         except SchedulerBreakout:
190           pass
191       else:
192         asyncore.loop(count=1, use_poll=True)
193
194       # Check whether a signal was raised
195       for sig in signal_handlers:
196         handler = signal_handlers[sig]
197         if handler.called:
198           self._CallSignalWaiters(sig)
199           running = (sig != signal.SIGTERM)
200           handler.Clear()
201
202   def _CallSignalWaiters(self, signum):
203     """Calls all signal waiters for a certain signal.
204
205     @type signum: int
206     @param signum: Signal number
207
208     """
209     for owner in self._signal_wait:
210       owner.OnSignal(signum)
211
212   def RegisterSignal(self, owner):
213     """Registers a receiver for signal notifications
214
215     The receiver must support a "OnSignal(self, signum)" function.
216
217     @type owner: instance
218     @param owner: Receiver
219
220     """
221     self._signal_wait.append(owner)
222
223
224 def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn,
225                 multithreaded=False,
226                 default_ssl_cert=None, default_ssl_key=None):
227   """Shared main function for daemons.
228
229   @type daemon_name: string
230   @param daemon_name: daemon name
231   @type optionparser: optparse.OptionParser
232   @param optionparser: initialized optionparser with daemon-specific options
233                        (common -f -d options will be handled by this module)
234   @type dirs: list of strings
235   @param dirs: list of directories that must exist for this daemon to work
236   @type check_fn: function which accepts (options, args)
237   @param check_fn: function that checks start conditions and exits if they're
238                    not met
239   @type exec_fn: function which accepts (options, args)
240   @param exec_fn: function that's executed with the daemon's pid file held, and
241                   runs the daemon itself.
242   @type multithreaded: bool
243   @param multithreaded: Whether the daemon uses threads
244   @type default_ssl_cert: string
245   @param default_ssl_cert: Default SSL certificate path
246   @type default_ssl_key: string
247   @param default_ssl_key: Default SSL key path
248
249   """
250   optionparser.add_option("-f", "--foreground", dest="fork",
251                           help="Don't detach from the current terminal",
252                           default=True, action="store_false")
253   optionparser.add_option("-d", "--debug", dest="debug",
254                           help="Enable some debug messages",
255                           default=False, action="store_true")
256
257   if daemon_name in constants.DAEMONS_PORTS:
258     default_bind_address = "0.0.0.0"
259     default_port = utils.GetDaemonPort(daemon_name)
260
261     # For networked daemons we allow choosing the port and bind address
262     optionparser.add_option("-p", "--port", dest="port",
263                             help="Network port (default: %s)" % default_port,
264                             default=default_port, type="int")
265     optionparser.add_option("-b", "--bind", dest="bind_address",
266                             help=("Bind address (default: %s)" %
267                                   default_bind_address),
268                             default=default_bind_address, metavar="ADDRESS")
269
270   if default_ssl_key is not None and default_ssl_cert is not None:
271     optionparser.add_option("--no-ssl", dest="ssl",
272                             help="Do not secure HTTP protocol with SSL",
273                             default=True, action="store_false")
274     optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
275                             help=("SSL key path (default: %s)" %
276                                   default_ssl_key),
277                             default=default_ssl_key, type="string",
278                             metavar="SSL_KEY_PATH")
279     optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
280                             help=("SSL certificate path (default: %s)" %
281                                   default_ssl_cert),
282                             default=default_ssl_cert, type="string",
283                             metavar="SSL_CERT_PATH")
284
285   # Disable the use of fork(2) if the daemon uses threads
286   utils.no_fork = multithreaded
287
288   options, args = optionparser.parse_args()
289
290   if getattr(options, "ssl", False):
291     ssl_paths = {
292       "certificate": options.ssl_cert,
293       "key": options.ssl_key,
294       }
295
296     for name, path in ssl_paths.iteritems():
297       if not os.path.isfile(path):
298         print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
299         sys.exit(constants.EXIT_FAILURE)
300
301     # TODO: By initiating http.HttpSslParams here we would only read the files
302     # once and have a proper validation (isfile returns False on directories)
303     # at the same time.
304
305   if check_fn is not None:
306     check_fn(options, args)
307
308   utils.EnsureDirs(dirs)
309
310   if options.fork:
311     utils.CloseFDs()
312     utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name])
313
314   utils.WritePidFile(daemon_name)
315   try:
316     utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
317                        debug=options.debug,
318                        stderr_logging=not options.fork,
319                        multithreaded=multithreaded)
320     logging.info("%s daemon startup", daemon_name)
321     exec_fn(options, args)
322   finally:
323     utils.RemovePidFile(daemon_name)