confd/client: make it possible to update peer list
[ganeti-local] / lib / daemon.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module with helper classes and functions for daemons"""
23
24
25 import asyncore
26 import os
27 import select
28 import signal
29 import errno
30 import logging
31 import sched
32 import time
33 import socket
34 import sys
35
36 from ganeti import utils
37 from ganeti import constants
38 from ganeti import errors
39
40
41 class SchedulerBreakout(Exception):
42   """Exception used to get out of the scheduler loop
43
44   """
45
46
47 def AsyncoreDelayFunction(timeout):
48   """Asyncore-compatible scheduler delay function.
49
50   This is a delay function for sched that, rather than actually sleeping,
51   executes asyncore events happening in the meantime.
52
53   After an event has occurred, rather than returning, it raises a
54   SchedulerBreakout exception, which will force the current scheduler.run()
55   invocation to terminate, so that we can also check for signals. The main loop
56   will then call the scheduler run again, which will allow it to actually
57   process any due events.
58
59   This is needed because scheduler.run() doesn't support a count=..., as
60   asyncore loop, and the scheduler module documents throwing exceptions from
61   inside the delay function as an allowed usage model.
62
63   """
64   asyncore.loop(timeout=timeout, count=1, use_poll=True)
65   raise SchedulerBreakout()
66
67
68 class AsyncoreScheduler(sched.scheduler):
69   """Event scheduler integrated with asyncore
70
71   """
72   def __init__(self, timefunc):
73     sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
74
75
76 class AsyncUDPSocket(asyncore.dispatcher):
77   """An improved asyncore udp socket.
78
79   """
80   def __init__(self):
81     """Constructor for AsyncUDPSocket
82
83     """
84     asyncore.dispatcher.__init__(self)
85     self._out_queue = []
86     self.create_socket(socket.AF_INET, socket.SOCK_DGRAM)
87
88   # this method is overriding an asyncore.dispatcher method
89   def handle_connect(self):
90     # Python thinks that the first udp message from a source qualifies as a
91     # "connect" and further ones are part of the same connection. We beg to
92     # differ and treat all messages equally.
93     pass
94
95   # this method is overriding an asyncore.dispatcher method
96   def handle_read(self):
97     try:
98       try:
99         payload, address = self.recvfrom(constants.MAX_UDP_DATA_SIZE)
100       except socket.error, err:
101         if err.errno == errno.EINTR:
102           # we got a signal while trying to read. no need to do anything,
103           # handle_read will be called again if there is data on the socket.
104           return
105         else:
106           raise
107       ip, port = address
108       self.handle_datagram(payload, ip, port)
109     except:
110       # we need to catch any exception here, log it, but proceed, because even
111       # if we failed handling a single request, we still want to continue.
112       logging.error("Unexpected exception", exc_info=True)
113
114   def handle_datagram(self, payload, ip, port):
115     """Handle an already read udp datagram
116
117     """
118     raise NotImplementedError
119
120   # this method is overriding an asyncore.dispatcher method
121   def writable(self):
122     # We should check whether we can write to the socket only if we have
123     # something scheduled to be written
124     return bool(self._out_queue)
125
126   def handle_write(self):
127     try:
128       if not self._out_queue:
129         logging.error("handle_write called with empty output queue")
130         return
131       (ip, port, payload) = self._out_queue[0]
132       try:
133         self.sendto(payload, 0, (ip, port))
134       except socket.error, err:
135         if err.errno == errno.EINTR:
136           # we got a signal while trying to write. no need to do anything,
137           # handle_write will be called again because we haven't emptied the
138           # _out_queue, and we'll try again
139           return
140         else:
141           raise
142       self._out_queue.pop(0)
143     except:
144       # we need to catch any exception here, log it, but proceed, because even
145       # if we failed sending a single datagram we still want to continue.
146       logging.error("Unexpected exception", exc_info=True)
147
148   def enqueue_send(self, ip, port, payload):
149     """Enqueue a datagram to be sent when possible
150
151     """
152     if len(payload) > constants.MAX_UDP_DATA_SIZE:
153       raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
154                                     constants.MAX_UDP_DATA_SIZE))
155     self._out_queue.append((ip, port, payload))
156
157
158 class Mainloop(object):
159   """Generic mainloop for daemons
160
161   """
162   def __init__(self):
163     """Constructs a new Mainloop instance.
164
165     @ivar scheduler: A L{sched.scheduler} object, which can be used to register
166     timed events
167
168     """
169     self._signal_wait = []
170     self.scheduler = AsyncoreScheduler(time.time)
171
172   @utils.SignalHandled([signal.SIGCHLD])
173   @utils.SignalHandled([signal.SIGTERM])
174   def Run(self, stop_on_empty=False, signal_handlers=None):
175     """Runs the mainloop.
176
177     @type stop_on_empty: bool
178     @param stop_on_empty: Whether to stop mainloop once all I/O waiters
179                           unregistered
180     @type signal_handlers: dict
181     @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator
182
183     """
184     assert isinstance(signal_handlers, dict) and \
185            len(signal_handlers) > 0, \
186            "Broken SignalHandled decorator"
187     running = True
188     # Start actual main loop
189     while running:
190       # Stop if nothing is listening anymore
191       if stop_on_empty and not (self._io_wait):
192         break
193
194       if not self.scheduler.empty():
195         try:
196           self.scheduler.run()
197         except SchedulerBreakout:
198           pass
199       else:
200         asyncore.loop(count=1, use_poll=True)
201
202       # Check whether a signal was raised
203       for sig in signal_handlers:
204         handler = signal_handlers[sig]
205         if handler.called:
206           self._CallSignalWaiters(sig)
207           running = (sig != signal.SIGTERM)
208           handler.Clear()
209
210   def _CallSignalWaiters(self, signum):
211     """Calls all signal waiters for a certain signal.
212
213     @type signum: int
214     @param signum: Signal number
215
216     """
217     for owner in self._signal_wait:
218       owner.OnSignal(signum)
219
220   def RegisterSignal(self, owner):
221     """Registers a receiver for signal notifications
222
223     The receiver must support a "OnSignal(self, signum)" function.
224
225     @type owner: instance
226     @param owner: Receiver
227
228     """
229     self._signal_wait.append(owner)
230
231
232 def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn):
233   """Shared main function for daemons.
234
235   @type daemon_name: string
236   @param daemon_name: daemon name
237   @type optionparser: L{optparse.OptionParser}
238   @param optionparser: initialized optionparser with daemon-specific options
239                        (common -f -d options will be handled by this module)
240   @type options: object @param options: OptionParser result, should contain at
241                  least the fork and the debug options
242   @type dirs: list of strings
243   @param dirs: list of directories that must exist for this daemon to work
244   @type check_fn: function which accepts (options, args)
245   @param check_fn: function that checks start conditions and exits if they're
246                    not met
247   @type exec_fn: function which accepts (options, args)
248   @param exec_fn: function that's executed with the daemon's pid file held, and
249                   runs the daemon itself.
250
251   """
252   optionparser.add_option("-f", "--foreground", dest="fork",
253                           help="Don't detach from the current terminal",
254                           default=True, action="store_false")
255   optionparser.add_option("-d", "--debug", dest="debug",
256                           help="Enable some debug messages",
257                           default=False, action="store_true")
258   if daemon_name in constants.DAEMONS_PORTS:
259     # for networked daemons we also allow choosing the bind port and address.
260     # by default we use the port provided by utils.GetDaemonPort, and bind to
261     # 0.0.0.0 (which is represented by and empty bind address.
262     port = utils.GetDaemonPort(daemon_name)
263     optionparser.add_option("-p", "--port", dest="port",
264                             help="Network port (%s default)." % port,
265                             default=port, type="int")
266     optionparser.add_option("-b", "--bind", dest="bind_address",
267                             help="Bind address",
268                             default="", metavar="ADDRESS")
269
270   if daemon_name in constants.DAEMONS_SSL:
271     default_cert, default_key = constants.DAEMONS_SSL[daemon_name]
272     optionparser.add_option("--no-ssl", dest="ssl",
273                             help="Do not secure HTTP protocol with SSL",
274                             default=True, action="store_false")
275     optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
276                             help="SSL key",
277                             default=default_key, type="string")
278     optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
279                             help="SSL certificate",
280                             default=default_cert, type="string")
281
282   multithread = utils.no_fork = daemon_name in constants.MULTITHREADED_DAEMONS
283
284   options, args = optionparser.parse_args()
285
286   if hasattr(options, 'ssl') and options.ssl:
287     if not (options.ssl_cert and options.ssl_key):
288       print >> sys.stderr, "Need key and certificate to use ssl"
289       sys.exit(constants.EXIT_FAILURE)
290     for fname in (options.ssl_cert, options.ssl_key):
291       if not os.path.isfile(fname):
292         print >> sys.stderr, "Need ssl file %s to run" % fname
293         sys.exit(constants.EXIT_FAILURE)
294
295   if check_fn is not None:
296     check_fn(options, args)
297
298   utils.EnsureDirs(dirs)
299
300   if options.fork:
301     utils.CloseFDs()
302     utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name])
303
304   utils.WritePidFile(daemon_name)
305   try:
306     utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
307                        debug=options.debug,
308                        stderr_logging=not options.fork,
309                        multithreaded=multithread)
310     logging.info("%s daemon startup" % daemon_name)
311     exec_fn(options, args)
312   finally:
313     utils.RemovePidFile(daemon_name)