Merge branch 'devel-2.1'
[ganeti-local] / lib / daemon.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module with helper classes and functions for daemons"""
23
24
25 import asyncore
26 import os
27 import signal
28 import errno
29 import logging
30 import sched
31 import time
32 import socket
33 import select
34 import sys
35
36 from ganeti import utils
37 from ganeti import constants
38 from ganeti import errors
39
40
41 class SchedulerBreakout(Exception):
42   """Exception used to get out of the scheduler loop
43
44   """
45
46
47 def AsyncoreDelayFunction(timeout):
48   """Asyncore-compatible scheduler delay function.
49
50   This is a delay function for sched that, rather than actually sleeping,
51   executes asyncore events happening in the meantime.
52
53   After an event has occurred, rather than returning, it raises a
54   SchedulerBreakout exception, which will force the current scheduler.run()
55   invocation to terminate, so that we can also check for signals. The main loop
56   will then call the scheduler run again, which will allow it to actually
57   process any due events.
58
59   This is needed because scheduler.run() doesn't support a count=..., as
60   asyncore loop, and the scheduler module documents throwing exceptions from
61   inside the delay function as an allowed usage model.
62
63   """
64   asyncore.loop(timeout=timeout, count=1, use_poll=True)
65   raise SchedulerBreakout()
66
67
68 class AsyncoreScheduler(sched.scheduler):
69   """Event scheduler integrated with asyncore
70
71   """
72   def __init__(self, timefunc):
73     sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
74
75
76 class AsyncUDPSocket(asyncore.dispatcher):
77   """An improved asyncore udp socket.
78
79   """
80   def __init__(self):
81     """Constructor for AsyncUDPSocket
82
83     """
84     asyncore.dispatcher.__init__(self)
85     self._out_queue = []
86     self.create_socket(socket.AF_INET, socket.SOCK_DGRAM)
87
88   # this method is overriding an asyncore.dispatcher method
89   def handle_connect(self):
90     # Python thinks that the first udp message from a source qualifies as a
91     # "connect" and further ones are part of the same connection. We beg to
92     # differ and treat all messages equally.
93     pass
94
95   def do_read(self):
96     try:
97       payload, address = self.recvfrom(constants.MAX_UDP_DATA_SIZE)
98     except socket.error, err:
99       if err.errno == errno.EINTR:
100         # we got a signal while trying to read. no need to do anything,
101         # handle_read will be called again if there is data on the socket.
102         return
103       else:
104         raise
105     ip, port = address
106     self.handle_datagram(payload, ip, port)
107
108   # this method is overriding an asyncore.dispatcher method
109   def handle_read(self):
110     try:
111       self.do_read()
112     except: # pylint: disable-msg=W0702
113       # we need to catch any exception here, log it, but proceed, because even
114       # if we failed handling a single request, we still want to continue.
115       logging.error("Unexpected exception", exc_info=True)
116
117   def handle_datagram(self, payload, ip, port):
118     """Handle an already read udp datagram
119
120     """
121     raise NotImplementedError
122
123   # this method is overriding an asyncore.dispatcher method
124   def writable(self):
125     # We should check whether we can write to the socket only if we have
126     # something scheduled to be written
127     return bool(self._out_queue)
128
129   def handle_write(self):
130     try:
131       if not self._out_queue:
132         logging.error("handle_write called with empty output queue")
133         return
134       (ip, port, payload) = self._out_queue[0]
135       try:
136         self.sendto(payload, 0, (ip, port))
137       except socket.error, err:
138         if err.errno == errno.EINTR:
139           # we got a signal while trying to write. no need to do anything,
140           # handle_write will be called again because we haven't emptied the
141           # _out_queue, and we'll try again
142           return
143         else:
144           raise
145       self._out_queue.pop(0)
146     except: # pylint: disable-msg=W0702
147       # we need to catch any exception here, log it, but proceed, because even
148       # if we failed sending a single datagram we still want to continue.
149       logging.error("Unexpected exception", exc_info=True)
150
151   def enqueue_send(self, ip, port, payload):
152     """Enqueue a datagram to be sent when possible
153
154     """
155     if len(payload) > constants.MAX_UDP_DATA_SIZE:
156       raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
157                                     constants.MAX_UDP_DATA_SIZE))
158     self._out_queue.append((ip, port, payload))
159
160   def process_next_packet(self, timeout=0):
161     """Process the next datagram, waiting for it if necessary.
162
163     @type timeout: float
164     @param timeout: how long to wait for data
165     @rtype: boolean
166     @return: True if some data has been handled, False otherwise
167
168     """
169     result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
170     if result is not None and result & select.POLLIN:
171       self.do_read()
172       return True
173     else:
174       return False
175
176
177 class Mainloop(object):
178   """Generic mainloop for daemons
179
180   @ivar scheduler: A sched.scheduler object, which can be used to register
181     timed events
182
183   """
184   def __init__(self):
185     """Constructs a new Mainloop instance.
186
187     """
188     self._signal_wait = []
189     self.scheduler = AsyncoreScheduler(time.time)
190
191   @utils.SignalHandled([signal.SIGCHLD])
192   @utils.SignalHandled([signal.SIGTERM])
193   def Run(self, signal_handlers=None):
194     """Runs the mainloop.
195
196     @type signal_handlers: dict
197     @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator
198
199     """
200     assert isinstance(signal_handlers, dict) and \
201            len(signal_handlers) > 0, \
202            "Broken SignalHandled decorator"
203     running = True
204     # Start actual main loop
205     while running:
206       if not self.scheduler.empty():
207         try:
208           self.scheduler.run()
209         except SchedulerBreakout:
210           pass
211       else:
212         asyncore.loop(count=1, use_poll=True)
213
214       # Check whether a signal was raised
215       for sig in signal_handlers:
216         handler = signal_handlers[sig]
217         if handler.called:
218           self._CallSignalWaiters(sig)
219           running = (sig != signal.SIGTERM)
220           handler.Clear()
221
222   def _CallSignalWaiters(self, signum):
223     """Calls all signal waiters for a certain signal.
224
225     @type signum: int
226     @param signum: Signal number
227
228     """
229     for owner in self._signal_wait:
230       owner.OnSignal(signum)
231
232   def RegisterSignal(self, owner):
233     """Registers a receiver for signal notifications
234
235     The receiver must support a "OnSignal(self, signum)" function.
236
237     @type owner: instance
238     @param owner: Receiver
239
240     """
241     self._signal_wait.append(owner)
242
243
244 def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn,
245                 multithreaded=False,
246                 default_ssl_cert=None, default_ssl_key=None):
247   """Shared main function for daemons.
248
249   @type daemon_name: string
250   @param daemon_name: daemon name
251   @type optionparser: optparse.OptionParser
252   @param optionparser: initialized optionparser with daemon-specific options
253                        (common -f -d options will be handled by this module)
254   @type dirs: list of strings
255   @param dirs: list of directories that must exist for this daemon to work
256   @type check_fn: function which accepts (options, args)
257   @param check_fn: function that checks start conditions and exits if they're
258                    not met
259   @type exec_fn: function which accepts (options, args)
260   @param exec_fn: function that's executed with the daemon's pid file held, and
261                   runs the daemon itself.
262   @type multithreaded: bool
263   @param multithreaded: Whether the daemon uses threads
264   @type default_ssl_cert: string
265   @param default_ssl_cert: Default SSL certificate path
266   @type default_ssl_key: string
267   @param default_ssl_key: Default SSL key path
268
269   """
270   optionparser.add_option("-f", "--foreground", dest="fork",
271                           help="Don't detach from the current terminal",
272                           default=True, action="store_false")
273   optionparser.add_option("-d", "--debug", dest="debug",
274                           help="Enable some debug messages",
275                           default=False, action="store_true")
276   optionparser.add_option("--syslog", dest="syslog",
277                           help="Enable logging to syslog (except debug"
278                           " messages); one of 'no', 'yes' or 'only' [%s]" %
279                           constants.SYSLOG_USAGE,
280                           default=constants.SYSLOG_USAGE,
281                           choices=["no", "yes", "only"])
282
283   if daemon_name in constants.DAEMONS_PORTS:
284     default_bind_address = "0.0.0.0"
285     default_port = utils.GetDaemonPort(daemon_name)
286
287     # For networked daemons we allow choosing the port and bind address
288     optionparser.add_option("-p", "--port", dest="port",
289                             help="Network port (default: %s)" % default_port,
290                             default=default_port, type="int")
291     optionparser.add_option("-b", "--bind", dest="bind_address",
292                             help=("Bind address (default: %s)" %
293                                   default_bind_address),
294                             default=default_bind_address, metavar="ADDRESS")
295
296   if default_ssl_key is not None and default_ssl_cert is not None:
297     optionparser.add_option("--no-ssl", dest="ssl",
298                             help="Do not secure HTTP protocol with SSL",
299                             default=True, action="store_false")
300     optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
301                             help=("SSL key path (default: %s)" %
302                                   default_ssl_key),
303                             default=default_ssl_key, type="string",
304                             metavar="SSL_KEY_PATH")
305     optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
306                             help=("SSL certificate path (default: %s)" %
307                                   default_ssl_cert),
308                             default=default_ssl_cert, type="string",
309                             metavar="SSL_CERT_PATH")
310
311   # Disable the use of fork(2) if the daemon uses threads
312   utils.no_fork = multithreaded
313
314   options, args = optionparser.parse_args()
315
316   if getattr(options, "ssl", False):
317     ssl_paths = {
318       "certificate": options.ssl_cert,
319       "key": options.ssl_key,
320       }
321
322     for name, path in ssl_paths.iteritems():
323       if not os.path.isfile(path):
324         print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
325         sys.exit(constants.EXIT_FAILURE)
326
327     # TODO: By initiating http.HttpSslParams here we would only read the files
328     # once and have a proper validation (isfile returns False on directories)
329     # at the same time.
330
331   if check_fn is not None:
332     check_fn(options, args)
333
334   utils.EnsureDirs(dirs)
335
336   if options.fork:
337     utils.CloseFDs()
338     utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name])
339
340   utils.WritePidFile(daemon_name)
341   try:
342     utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
343                        debug=options.debug,
344                        stderr_logging=not options.fork,
345                        multithreaded=multithreaded,
346                        program=daemon_name,
347                        syslog=options.syslog)
348     logging.info("%s daemon startup", daemon_name)
349     exec_fn(options, args)
350   finally:
351     utils.RemovePidFile(daemon_name)