Merge branch 'devel-2.1'
[ganeti-local] / lib / daemon.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module with helper classes and functions for daemons"""
23
24
25 import asyncore
26 import os
27 import signal
28 import logging
29 import sched
30 import time
31 import socket
32 import select
33 import sys
34
35 from ganeti import utils
36 from ganeti import constants
37 from ganeti import errors
38
39
40 class SchedulerBreakout(Exception):
41   """Exception used to get out of the scheduler loop
42
43   """
44
45
46 def AsyncoreDelayFunction(timeout):
47   """Asyncore-compatible scheduler delay function.
48
49   This is a delay function for sched that, rather than actually sleeping,
50   executes asyncore events happening in the meantime.
51
52   After an event has occurred, rather than returning, it raises a
53   SchedulerBreakout exception, which will force the current scheduler.run()
54   invocation to terminate, so that we can also check for signals. The main loop
55   will then call the scheduler run again, which will allow it to actually
56   process any due events.
57
58   This is needed because scheduler.run() doesn't support a count=..., as
59   asyncore loop, and the scheduler module documents throwing exceptions from
60   inside the delay function as an allowed usage model.
61
62   """
63   asyncore.loop(timeout=timeout, count=1, use_poll=True)
64   raise SchedulerBreakout()
65
66
67 class AsyncoreScheduler(sched.scheduler):
68   """Event scheduler integrated with asyncore
69
70   """
71   def __init__(self, timefunc):
72     sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
73
74
75 class AsyncUDPSocket(asyncore.dispatcher):
76   """An improved asyncore udp socket.
77
78   """
79   def __init__(self):
80     """Constructor for AsyncUDPSocket
81
82     """
83     asyncore.dispatcher.__init__(self)
84     self._out_queue = []
85     self.create_socket(socket.AF_INET, socket.SOCK_DGRAM)
86
87   # this method is overriding an asyncore.dispatcher method
88   def handle_connect(self):
89     # Python thinks that the first udp message from a source qualifies as a
90     # "connect" and further ones are part of the same connection. We beg to
91     # differ and treat all messages equally.
92     pass
93
94   # this method is overriding an asyncore.dispatcher method
95   def handle_read(self):
96     payload, address = utils.IgnoreSignals(self.recvfrom,
97                                            constants.MAX_UDP_DATA_SIZE)
98     ip, port = address
99     self.handle_datagram(payload, ip, port)
100
101   def handle_datagram(self, payload, ip, port):
102     """Handle an already read udp datagram
103
104     """
105     raise NotImplementedError
106
107   # this method is overriding an asyncore.dispatcher method
108   def writable(self):
109     # We should check whether we can write to the socket only if we have
110     # something scheduled to be written
111     return bool(self._out_queue)
112
113   # this method is overriding an asyncore.dispatcher method
114   def handle_write(self):
115     if not self._out_queue:
116       logging.error("handle_write called with empty output queue")
117       return
118     (ip, port, payload) = self._out_queue[0]
119     utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
120     self._out_queue.pop(0)
121
122   # this method is overriding an asyncore.dispatcher method
123   def handle_error(self):
124     """Log an error in handling any request, and proceed.
125
126     """
127     logging.exception("Error while handling asyncore request")
128
129   def enqueue_send(self, ip, port, payload):
130     """Enqueue a datagram to be sent when possible
131
132     """
133     if len(payload) > constants.MAX_UDP_DATA_SIZE:
134       raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
135                                     constants.MAX_UDP_DATA_SIZE))
136     self._out_queue.append((ip, port, payload))
137
138   def process_next_packet(self, timeout=0):
139     """Process the next datagram, waiting for it if necessary.
140
141     @type timeout: float
142     @param timeout: how long to wait for data
143     @rtype: boolean
144     @return: True if some data has been handled, False otherwise
145
146     """
147     result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
148     if result is not None and result & select.POLLIN:
149       self.handle_read()
150       return True
151     else:
152       return False
153
154
155 class Mainloop(object):
156   """Generic mainloop for daemons
157
158   @ivar scheduler: A sched.scheduler object, which can be used to register
159     timed events
160
161   """
162   def __init__(self):
163     """Constructs a new Mainloop instance.
164
165     """
166     self._signal_wait = []
167     self.scheduler = AsyncoreScheduler(time.time)
168
169   @utils.SignalHandled([signal.SIGCHLD])
170   @utils.SignalHandled([signal.SIGTERM])
171   @utils.SignalHandled([signal.SIGINT])
172   def Run(self, signal_handlers=None):
173     """Runs the mainloop.
174
175     @type signal_handlers: dict
176     @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator
177
178     """
179     assert isinstance(signal_handlers, dict) and \
180            len(signal_handlers) > 0, \
181            "Broken SignalHandled decorator"
182     running = True
183     # Start actual main loop
184     while running:
185       if not self.scheduler.empty():
186         try:
187           self.scheduler.run()
188         except SchedulerBreakout:
189           pass
190       else:
191         asyncore.loop(count=1, use_poll=True)
192
193       # Check whether a signal was raised
194       for sig in signal_handlers:
195         handler = signal_handlers[sig]
196         if handler.called:
197           self._CallSignalWaiters(sig)
198           running = sig not in (signal.SIGTERM, signal.SIGINT)
199           handler.Clear()
200
201   def _CallSignalWaiters(self, signum):
202     """Calls all signal waiters for a certain signal.
203
204     @type signum: int
205     @param signum: Signal number
206
207     """
208     for owner in self._signal_wait:
209       owner.OnSignal(signum)
210
211   def RegisterSignal(self, owner):
212     """Registers a receiver for signal notifications
213
214     The receiver must support a "OnSignal(self, signum)" function.
215
216     @type owner: instance
217     @param owner: Receiver
218
219     """
220     self._signal_wait.append(owner)
221
222
223 def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn,
224                 multithreaded=False, console_logging=False,
225                 default_ssl_cert=None, default_ssl_key=None):
226   """Shared main function for daemons.
227
228   @type daemon_name: string
229   @param daemon_name: daemon name
230   @type optionparser: optparse.OptionParser
231   @param optionparser: initialized optionparser with daemon-specific options
232                        (common -f -d options will be handled by this module)
233   @type dirs: list of (string, integer)
234   @param dirs: list of directories that must be created if they don't exist,
235                and the permissions to be used to create them
236   @type check_fn: function which accepts (options, args)
237   @param check_fn: function that checks start conditions and exits if they're
238                    not met
239   @type exec_fn: function which accepts (options, args)
240   @param exec_fn: function that's executed with the daemon's pid file held, and
241                   runs the daemon itself.
242   @type multithreaded: bool
243   @param multithreaded: Whether the daemon uses threads
244   @type console_logging: boolean
245   @param console_logging: if True, the daemon will fall back to the system
246                           console if logging fails
247   @type default_ssl_cert: string
248   @param default_ssl_cert: Default SSL certificate path
249   @type default_ssl_key: string
250   @param default_ssl_key: Default SSL key path
251
252   """
253   optionparser.add_option("-f", "--foreground", dest="fork",
254                           help="Don't detach from the current terminal",
255                           default=True, action="store_false")
256   optionparser.add_option("-d", "--debug", dest="debug",
257                           help="Enable some debug messages",
258                           default=False, action="store_true")
259   optionparser.add_option("--syslog", dest="syslog",
260                           help="Enable logging to syslog (except debug"
261                           " messages); one of 'no', 'yes' or 'only' [%s]" %
262                           constants.SYSLOG_USAGE,
263                           default=constants.SYSLOG_USAGE,
264                           choices=["no", "yes", "only"])
265
266   if daemon_name in constants.DAEMONS_PORTS:
267     default_bind_address = "0.0.0.0"
268     default_port = utils.GetDaemonPort(daemon_name)
269
270     # For networked daemons we allow choosing the port and bind address
271     optionparser.add_option("-p", "--port", dest="port",
272                             help="Network port (default: %s)" % default_port,
273                             default=default_port, type="int")
274     optionparser.add_option("-b", "--bind", dest="bind_address",
275                             help=("Bind address (default: %s)" %
276                                   default_bind_address),
277                             default=default_bind_address, metavar="ADDRESS")
278
279   if default_ssl_key is not None and default_ssl_cert is not None:
280     optionparser.add_option("--no-ssl", dest="ssl",
281                             help="Do not secure HTTP protocol with SSL",
282                             default=True, action="store_false")
283     optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
284                             help=("SSL key path (default: %s)" %
285                                   default_ssl_key),
286                             default=default_ssl_key, type="string",
287                             metavar="SSL_KEY_PATH")
288     optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
289                             help=("SSL certificate path (default: %s)" %
290                                   default_ssl_cert),
291                             default=default_ssl_cert, type="string",
292                             metavar="SSL_CERT_PATH")
293
294   # Disable the use of fork(2) if the daemon uses threads
295   utils.no_fork = multithreaded
296
297   options, args = optionparser.parse_args()
298
299   if getattr(options, "ssl", False):
300     ssl_paths = {
301       "certificate": options.ssl_cert,
302       "key": options.ssl_key,
303       }
304
305     for name, path in ssl_paths.iteritems():
306       if not os.path.isfile(path):
307         print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
308         sys.exit(constants.EXIT_FAILURE)
309
310     # TODO: By initiating http.HttpSslParams here we would only read the files
311     # once and have a proper validation (isfile returns False on directories)
312     # at the same time.
313
314   if check_fn is not None:
315     check_fn(options, args)
316
317   utils.EnsureDirs(dirs)
318
319   if options.fork:
320     utils.CloseFDs()
321     utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name])
322
323   utils.WritePidFile(daemon_name)
324   try:
325     utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
326                        debug=options.debug,
327                        stderr_logging=not options.fork,
328                        multithreaded=multithreaded,
329                        program=daemon_name,
330                        syslog=options.syslog,
331                        console_logging=console_logging)
332     logging.info("%s daemon startup", daemon_name)
333     exec_fn(options, args)
334   finally:
335     utils.RemovePidFile(daemon_name)