WaitForSocketCondition: rename, handle EINTR
[ganeti-local] / lib / daemon.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module with helper classes and functions for daemons"""
23
24
25 import asyncore
26 import os
27 import signal
28 import errno
29 import logging
30 import sched
31 import time
32 import socket
33 import sys
34
35 from ganeti import utils
36 from ganeti import constants
37 from ganeti import errors
38
39
40 class SchedulerBreakout(Exception):
41   """Exception used to get out of the scheduler loop
42
43   """
44
45
46 def AsyncoreDelayFunction(timeout):
47   """Asyncore-compatible scheduler delay function.
48
49   This is a delay function for sched that, rather than actually sleeping,
50   executes asyncore events happening in the meantime.
51
52   After an event has occurred, rather than returning, it raises a
53   SchedulerBreakout exception, which will force the current scheduler.run()
54   invocation to terminate, so that we can also check for signals. The main loop
55   will then call the scheduler run again, which will allow it to actually
56   process any due events.
57
58   This is needed because scheduler.run() doesn't support a count=..., as
59   asyncore loop, and the scheduler module documents throwing exceptions from
60   inside the delay function as an allowed usage model.
61
62   """
63   asyncore.loop(timeout=timeout, count=1, use_poll=True)
64   raise SchedulerBreakout()
65
66
67 class AsyncoreScheduler(sched.scheduler):
68   """Event scheduler integrated with asyncore
69
70   """
71   def __init__(self, timefunc):
72     sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
73
74
75 class AsyncUDPSocket(asyncore.dispatcher):
76   """An improved asyncore udp socket.
77
78   """
79   def __init__(self):
80     """Constructor for AsyncUDPSocket
81
82     """
83     asyncore.dispatcher.__init__(self)
84     self._out_queue = []
85     self.create_socket(socket.AF_INET, socket.SOCK_DGRAM)
86
87   # this method is overriding an asyncore.dispatcher method
88   def handle_connect(self):
89     # Python thinks that the first udp message from a source qualifies as a
90     # "connect" and further ones are part of the same connection. We beg to
91     # differ and treat all messages equally.
92     pass
93
94   def do_read(self):
95     try:
96       payload, address = self.recvfrom(constants.MAX_UDP_DATA_SIZE)
97     except socket.error, err:
98       if err.errno == errno.EINTR:
99         # we got a signal while trying to read. no need to do anything,
100         # handle_read will be called again if there is data on the socket.
101         return
102       else:
103         raise
104     ip, port = address
105     self.handle_datagram(payload, ip, port)
106
107   # this method is overriding an asyncore.dispatcher method
108   def handle_read(self):
109     try:
110       self.do_read()
111     except: # pylint: disable-msg=W0702
112       # we need to catch any exception here, log it, but proceed, because even
113       # if we failed handling a single request, we still want to continue.
114       logging.error("Unexpected exception", exc_info=True)
115
116   def handle_datagram(self, payload, ip, port):
117     """Handle an already read udp datagram
118
119     """
120     raise NotImplementedError
121
122   # this method is overriding an asyncore.dispatcher method
123   def writable(self):
124     # We should check whether we can write to the socket only if we have
125     # something scheduled to be written
126     return bool(self._out_queue)
127
128   def handle_write(self):
129     try:
130       if not self._out_queue:
131         logging.error("handle_write called with empty output queue")
132         return
133       (ip, port, payload) = self._out_queue[0]
134       try:
135         self.sendto(payload, 0, (ip, port))
136       except socket.error, err:
137         if err.errno == errno.EINTR:
138           # we got a signal while trying to write. no need to do anything,
139           # handle_write will be called again because we haven't emptied the
140           # _out_queue, and we'll try again
141           return
142         else:
143           raise
144       self._out_queue.pop(0)
145     except: # pylint: disable-msg=W0702
146       # we need to catch any exception here, log it, but proceed, because even
147       # if we failed sending a single datagram we still want to continue.
148       logging.error("Unexpected exception", exc_info=True)
149
150   def enqueue_send(self, ip, port, payload):
151     """Enqueue a datagram to be sent when possible
152
153     """
154     if len(payload) > constants.MAX_UDP_DATA_SIZE:
155       raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
156                                     constants.MAX_UDP_DATA_SIZE))
157     self._out_queue.append((ip, port, payload))
158
159
160 class Mainloop(object):
161   """Generic mainloop for daemons
162
163   @ivar scheduler: A sched.scheduler object, which can be used to register
164     timed events
165
166   """
167   def __init__(self):
168     """Constructs a new Mainloop instance.
169
170     """
171     self._signal_wait = []
172     self.scheduler = AsyncoreScheduler(time.time)
173
174   @utils.SignalHandled([signal.SIGCHLD])
175   @utils.SignalHandled([signal.SIGTERM])
176   def Run(self, signal_handlers=None):
177     """Runs the mainloop.
178
179     @type signal_handlers: dict
180     @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator
181
182     """
183     assert isinstance(signal_handlers, dict) and \
184            len(signal_handlers) > 0, \
185            "Broken SignalHandled decorator"
186     running = True
187     # Start actual main loop
188     while running:
189       if not self.scheduler.empty():
190         try:
191           self.scheduler.run()
192         except SchedulerBreakout:
193           pass
194       else:
195         asyncore.loop(count=1, use_poll=True)
196
197       # Check whether a signal was raised
198       for sig in signal_handlers:
199         handler = signal_handlers[sig]
200         if handler.called:
201           self._CallSignalWaiters(sig)
202           running = (sig != signal.SIGTERM)
203           handler.Clear()
204
205   def _CallSignalWaiters(self, signum):
206     """Calls all signal waiters for a certain signal.
207
208     @type signum: int
209     @param signum: Signal number
210
211     """
212     for owner in self._signal_wait:
213       owner.OnSignal(signum)
214
215   def RegisterSignal(self, owner):
216     """Registers a receiver for signal notifications
217
218     The receiver must support a "OnSignal(self, signum)" function.
219
220     @type owner: instance
221     @param owner: Receiver
222
223     """
224     self._signal_wait.append(owner)
225
226
227 def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn):
228   """Shared main function for daemons.
229
230   @type daemon_name: string
231   @param daemon_name: daemon name
232   @type optionparser: optparse.OptionParser
233   @param optionparser: initialized optionparser with daemon-specific options
234                        (common -f -d options will be handled by this module)
235   @type dirs: list of strings
236   @param dirs: list of directories that must exist for this daemon to work
237   @type check_fn: function which accepts (options, args)
238   @param check_fn: function that checks start conditions and exits if they're
239                    not met
240   @type exec_fn: function which accepts (options, args)
241   @param exec_fn: function that's executed with the daemon's pid file held, and
242                   runs the daemon itself.
243
244   """
245   optionparser.add_option("-f", "--foreground", dest="fork",
246                           help="Don't detach from the current terminal",
247                           default=True, action="store_false")
248   optionparser.add_option("-d", "--debug", dest="debug",
249                           help="Enable some debug messages",
250                           default=False, action="store_true")
251   optionparser.add_option("--syslog", dest="syslog",
252                           help="Enable logging to syslog (except debug"
253                           " messages); one of 'no', 'yes' or 'only' [%s]" %
254                           constants.SYSLOG_USAGE,
255                           default=constants.SYSLOG_USAGE,
256                           choices=["no", "yes", "only"])
257   if daemon_name in constants.DAEMONS_PORTS:
258     # for networked daemons we also allow choosing the bind port and address.
259     # by default we use the port provided by utils.GetDaemonPort, and bind to
260     # 0.0.0.0 (which is represented by and empty bind address.
261     port = utils.GetDaemonPort(daemon_name)
262     optionparser.add_option("-p", "--port", dest="port",
263                             help="Network port (%s default)." % port,
264                             default=port, type="int")
265     optionparser.add_option("-b", "--bind", dest="bind_address",
266                             help="Bind address",
267                             default="", metavar="ADDRESS")
268
269   if daemon_name in constants.DAEMONS_SSL:
270     default_cert, default_key = constants.DAEMONS_SSL[daemon_name]
271     optionparser.add_option("--no-ssl", dest="ssl",
272                             help="Do not secure HTTP protocol with SSL",
273                             default=True, action="store_false")
274     optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
275                             help="SSL key",
276                             default=default_key, type="string")
277     optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
278                             help="SSL certificate",
279                             default=default_cert, type="string")
280
281   multithread = utils.no_fork = daemon_name in constants.MULTITHREADED_DAEMONS
282
283   options, args = optionparser.parse_args()
284
285   if hasattr(options, 'ssl') and options.ssl:
286     if not (options.ssl_cert and options.ssl_key):
287       print >> sys.stderr, "Need key and certificate to use ssl"
288       sys.exit(constants.EXIT_FAILURE)
289     for fname in (options.ssl_cert, options.ssl_key):
290       if not os.path.isfile(fname):
291         print >> sys.stderr, "Need ssl file %s to run" % fname
292         sys.exit(constants.EXIT_FAILURE)
293
294   if check_fn is not None:
295     check_fn(options, args)
296
297   utils.EnsureDirs(dirs)
298
299   if options.fork:
300     utils.CloseFDs()
301     utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name])
302
303   utils.WritePidFile(daemon_name)
304   try:
305     utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
306                        debug=options.debug,
307                        stderr_logging=not options.fork,
308                        multithreaded=multithread,
309                        program=daemon_name,
310                        syslog=options.syslog)
311     logging.info("%s daemon startup", daemon_name)
312     exec_fn(options, args)
313   finally:
314     utils.RemovePidFile(daemon_name)