Merge remote branch 'origin/devel-2.1'
[ganeti-local] / lib / daemon.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module with helper classes and functions for daemons"""
23
24
25 import asyncore
26 import os
27 import signal
28 import logging
29 import sched
30 import time
31 import socket
32 import select
33 import sys
34
35 from ganeti import utils
36 from ganeti import constants
37 from ganeti import errors
38
39
40 class SchedulerBreakout(Exception):
41   """Exception used to get out of the scheduler loop
42
43   """
44
45
46 def AsyncoreDelayFunction(timeout):
47   """Asyncore-compatible scheduler delay function.
48
49   This is a delay function for sched that, rather than actually sleeping,
50   executes asyncore events happening in the meantime.
51
52   After an event has occurred, rather than returning, it raises a
53   SchedulerBreakout exception, which will force the current scheduler.run()
54   invocation to terminate, so that we can also check for signals. The main loop
55   will then call the scheduler run again, which will allow it to actually
56   process any due events.
57
58   This is needed because scheduler.run() doesn't support a count=..., as
59   asyncore loop, and the scheduler module documents throwing exceptions from
60   inside the delay function as an allowed usage model.
61
62   """
63   asyncore.loop(timeout=timeout, count=1, use_poll=True)
64   raise SchedulerBreakout()
65
66
67 class AsyncoreScheduler(sched.scheduler):
68   """Event scheduler integrated with asyncore
69
70   """
71   def __init__(self, timefunc):
72     sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
73
74
75 class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
76   """Base Ganeti Asyncore Dispacher
77
78   """
79   # this method is overriding an asyncore.dispatcher method
80   def handle_error(self):
81     """Log an error in handling any request, and proceed.
82
83     """
84     logging.exception("Error while handling asyncore request")
85
86   # this method is overriding an asyncore.dispatcher method
87   def writable(self):
88     """Most of the time we don't want to check for writability.
89
90     """
91     return False
92
93
94 class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
95   """An improved asyncore udp socket.
96
97   """
98   def __init__(self):
99     """Constructor for AsyncUDPSocket
100
101     """
102     GanetiBaseAsyncoreDispatcher.__init__(self)
103     self._out_queue = []
104     self.create_socket(socket.AF_INET, socket.SOCK_DGRAM)
105
106   # this method is overriding an asyncore.dispatcher method
107   def handle_connect(self):
108     # Python thinks that the first udp message from a source qualifies as a
109     # "connect" and further ones are part of the same connection. We beg to
110     # differ and treat all messages equally.
111     pass
112
113   # this method is overriding an asyncore.dispatcher method
114   def handle_read(self):
115     recv_result = utils.IgnoreSignals(self.recvfrom,
116                                       constants.MAX_UDP_DATA_SIZE)
117     if recv_result is not None:
118       payload, address = recv_result
119       ip, port = address
120       self.handle_datagram(payload, ip, port)
121
122   def handle_datagram(self, payload, ip, port):
123     """Handle an already read udp datagram
124
125     """
126     raise NotImplementedError
127
128   # this method is overriding an asyncore.dispatcher method
129   def writable(self):
130     # We should check whether we can write to the socket only if we have
131     # something scheduled to be written
132     return bool(self._out_queue)
133
134   # this method is overriding an asyncore.dispatcher method
135   def handle_write(self):
136     if not self._out_queue:
137       logging.error("handle_write called with empty output queue")
138       return
139     (ip, port, payload) = self._out_queue[0]
140     utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
141     self._out_queue.pop(0)
142
143   def enqueue_send(self, ip, port, payload):
144     """Enqueue a datagram to be sent when possible
145
146     """
147     if len(payload) > constants.MAX_UDP_DATA_SIZE:
148       raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
149                                     constants.MAX_UDP_DATA_SIZE))
150     self._out_queue.append((ip, port, payload))
151
152   def process_next_packet(self, timeout=0):
153     """Process the next datagram, waiting for it if necessary.
154
155     @type timeout: float
156     @param timeout: how long to wait for data
157     @rtype: boolean
158     @return: True if some data has been handled, False otherwise
159
160     """
161     result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
162     if result is not None and result & select.POLLIN:
163       self.handle_read()
164       return True
165     else:
166       return False
167
168
169 class Mainloop(object):
170   """Generic mainloop for daemons
171
172   @ivar scheduler: A sched.scheduler object, which can be used to register
173     timed events
174
175   """
176   def __init__(self):
177     """Constructs a new Mainloop instance.
178
179     """
180     self._signal_wait = []
181     self.scheduler = AsyncoreScheduler(time.time)
182
183   @utils.SignalHandled([signal.SIGCHLD])
184   @utils.SignalHandled([signal.SIGTERM])
185   @utils.SignalHandled([signal.SIGINT])
186   def Run(self, signal_handlers=None):
187     """Runs the mainloop.
188
189     @type signal_handlers: dict
190     @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator
191
192     """
193     assert isinstance(signal_handlers, dict) and \
194            len(signal_handlers) > 0, \
195            "Broken SignalHandled decorator"
196     running = True
197     # Start actual main loop
198     while running:
199       if not self.scheduler.empty():
200         try:
201           self.scheduler.run()
202         except SchedulerBreakout:
203           pass
204       else:
205         asyncore.loop(count=1, use_poll=True)
206
207       # Check whether a signal was raised
208       for sig in signal_handlers:
209         handler = signal_handlers[sig]
210         if handler.called:
211           self._CallSignalWaiters(sig)
212           running = sig not in (signal.SIGTERM, signal.SIGINT)
213           handler.Clear()
214
215   def _CallSignalWaiters(self, signum):
216     """Calls all signal waiters for a certain signal.
217
218     @type signum: int
219     @param signum: Signal number
220
221     """
222     for owner in self._signal_wait:
223       owner.OnSignal(signum)
224
225   def RegisterSignal(self, owner):
226     """Registers a receiver for signal notifications
227
228     The receiver must support a "OnSignal(self, signum)" function.
229
230     @type owner: instance
231     @param owner: Receiver
232
233     """
234     self._signal_wait.append(owner)
235
236
237 def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn,
238                 multithreaded=False, console_logging=False,
239                 default_ssl_cert=None, default_ssl_key=None):
240   """Shared main function for daemons.
241
242   @type daemon_name: string
243   @param daemon_name: daemon name
244   @type optionparser: optparse.OptionParser
245   @param optionparser: initialized optionparser with daemon-specific options
246                        (common -f -d options will be handled by this module)
247   @type dirs: list of (string, integer)
248   @param dirs: list of directories that must be created if they don't exist,
249                and the permissions to be used to create them
250   @type check_fn: function which accepts (options, args)
251   @param check_fn: function that checks start conditions and exits if they're
252                    not met
253   @type exec_fn: function which accepts (options, args)
254   @param exec_fn: function that's executed with the daemon's pid file held, and
255                   runs the daemon itself.
256   @type multithreaded: bool
257   @param multithreaded: Whether the daemon uses threads
258   @type console_logging: boolean
259   @param console_logging: if True, the daemon will fall back to the system
260                           console if logging fails
261   @type default_ssl_cert: string
262   @param default_ssl_cert: Default SSL certificate path
263   @type default_ssl_key: string
264   @param default_ssl_key: Default SSL key path
265
266   """
267   optionparser.add_option("-f", "--foreground", dest="fork",
268                           help="Don't detach from the current terminal",
269                           default=True, action="store_false")
270   optionparser.add_option("-d", "--debug", dest="debug",
271                           help="Enable some debug messages",
272                           default=False, action="store_true")
273   optionparser.add_option("--syslog", dest="syslog",
274                           help="Enable logging to syslog (except debug"
275                           " messages); one of 'no', 'yes' or 'only' [%s]" %
276                           constants.SYSLOG_USAGE,
277                           default=constants.SYSLOG_USAGE,
278                           choices=["no", "yes", "only"])
279
280   if daemon_name in constants.DAEMONS_PORTS:
281     default_bind_address = "0.0.0.0"
282     default_port = utils.GetDaemonPort(daemon_name)
283
284     # For networked daemons we allow choosing the port and bind address
285     optionparser.add_option("-p", "--port", dest="port",
286                             help="Network port (default: %s)" % default_port,
287                             default=default_port, type="int")
288     optionparser.add_option("-b", "--bind", dest="bind_address",
289                             help=("Bind address (default: %s)" %
290                                   default_bind_address),
291                             default=default_bind_address, metavar="ADDRESS")
292
293   if default_ssl_key is not None and default_ssl_cert is not None:
294     optionparser.add_option("--no-ssl", dest="ssl",
295                             help="Do not secure HTTP protocol with SSL",
296                             default=True, action="store_false")
297     optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
298                             help=("SSL key path (default: %s)" %
299                                   default_ssl_key),
300                             default=default_ssl_key, type="string",
301                             metavar="SSL_KEY_PATH")
302     optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
303                             help=("SSL certificate path (default: %s)" %
304                                   default_ssl_cert),
305                             default=default_ssl_cert, type="string",
306                             metavar="SSL_CERT_PATH")
307
308   # Disable the use of fork(2) if the daemon uses threads
309   utils.no_fork = multithreaded
310
311   options, args = optionparser.parse_args()
312
313   if getattr(options, "ssl", False):
314     ssl_paths = {
315       "certificate": options.ssl_cert,
316       "key": options.ssl_key,
317       }
318
319     for name, path in ssl_paths.iteritems():
320       if not os.path.isfile(path):
321         print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
322         sys.exit(constants.EXIT_FAILURE)
323
324     # TODO: By initiating http.HttpSslParams here we would only read the files
325     # once and have a proper validation (isfile returns False on directories)
326     # at the same time.
327
328   if check_fn is not None:
329     check_fn(options, args)
330
331   utils.EnsureDirs(dirs)
332
333   if options.fork:
334     utils.CloseFDs()
335     utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name])
336
337   utils.WritePidFile(daemon_name)
338   try:
339     utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
340                        debug=options.debug,
341                        stderr_logging=not options.fork,
342                        multithreaded=multithreaded,
343                        program=daemon_name,
344                        syslog=options.syslog,
345                        console_logging=console_logging)
346     logging.info("%s daemon startup", daemon_name)
347     exec_fn(options, args)
348   finally:
349     utils.RemovePidFile(daemon_name)