SimpleConfigReader: s/Reload/_Load/
[ganeti-local] / lib / daemon.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module with helper classes and functions for daemons"""
23
24
25 import os
26 import select
27 import signal
28 import errno
29 import time
30 import logging
31
32 from ganeti import utils
33 from ganeti import constants
34
35
36 class Timer(object):
37   def __init__(self, owner, timer_id, start, interval, repeat):
38     self.owner = owner
39     self.timer_id = timer_id
40     self.start = start
41     self.interval = interval
42     self.repeat = repeat
43
44
45 class Mainloop(object):
46   """Generic mainloop for daemons
47
48   """
49   def __init__(self):
50     """Constructs a new Mainloop instance.
51
52     """
53     self._io_wait = {}
54     self._io_wait_add = []
55     self._io_wait_remove = []
56     self._signal_wait = []
57     self._timer_id_last = 0
58     self._timer = {}
59     self._timer_add = []
60     self._timer_remove = []
61
62   def Run(self, handle_sigchld=True, handle_sigterm=True, stop_on_empty=False):
63     """Runs the mainloop.
64
65     @type handle_sigchld: bool
66     @param handle_sigchld: Whether to install handler for SIGCHLD
67     @type handle_sigterm: bool
68     @param handle_sigterm: Whether to install handler for SIGTERM
69     @type stop_on_empty: bool
70     @param stop_on_empty: Whether to stop mainloop once all I/O waiters
71                           unregistered
72
73     """
74     poller = select.poll()
75
76     # Setup signal handlers
77     if handle_sigchld:
78       sigchld_handler = utils.SignalHandler([signal.SIGCHLD])
79     else:
80       sigchld_handler = None
81     try:
82       if handle_sigterm:
83         sigterm_handler = utils.SignalHandler([signal.SIGTERM])
84       else:
85         sigterm_handler = None
86
87       try:
88         running = True
89         timeout = None
90         timeout_needs_update = True
91
92         # Start actual main loop
93         while running:
94           # Entries could be added again afterwards, hence removing first
95           if self._io_wait_remove:
96             for fd in self._io_wait_remove:
97               try:
98                 poller.unregister(fd)
99               except KeyError:
100                 pass
101               try:
102                 del self._io_wait[fd]
103               except KeyError:
104                 pass
105             self._io_wait_remove = []
106
107           # Add new entries
108           if self._io_wait_add:
109             for (owner, fd, conditions) in self._io_wait_add:
110               self._io_wait[fd] = owner
111               poller.register(fd, conditions)
112             self._io_wait_add = []
113
114           # Add new timers
115           if self._timer_add:
116             timeout_needs_update = True
117             for timer in self._timer_add:
118               self._timer[timer.timer_id] = timer
119             del self._timer_add[:]
120
121           # Remove timers
122           if self._timer_remove:
123             timeout_needs_update = True
124             for timer_id in self._timer_remove:
125               try:
126                 del self._timer[timer_id]
127               except KeyError:
128                 pass
129             del self._timer_remove[:]
130
131           # Stop if nothing is listening anymore
132           if stop_on_empty and not (self._io_wait or self._timer):
133             break
134
135           # Calculate timeout again if required
136           if timeout_needs_update:
137             timeout = self._CalcTimeout(time.time())
138             timeout_needs_update = False
139
140           # Wait for I/O events
141           try:
142             io_events = poller.poll(timeout)
143           except select.error, err:
144             # EINTR can happen when signals are sent
145             if err.args and err.args[0] in (errno.EINTR,):
146               io_events = None
147             else:
148               raise
149
150           after_poll = time.time()
151
152           if io_events:
153             # Check for I/O events
154             for (evfd, evcond) in io_events:
155               owner = self._io_wait.get(evfd, None)
156               if owner:
157                 owner.OnIO(evfd, evcond)
158
159           if self._timer:
160             self._CheckTimers(after_poll)
161
162           # Check whether signal was raised
163           if sigchld_handler and sigchld_handler.called:
164             self._CallSignalWaiters(signal.SIGCHLD)
165             sigchld_handler.Clear()
166
167           if sigterm_handler and sigterm_handler.called:
168             self._CallSignalWaiters(signal.SIGTERM)
169             running = False
170             sigterm_handler.Clear()
171       finally:
172         # Restore signal handlers
173         if sigterm_handler:
174           sigterm_handler.Reset()
175     finally:
176       if sigchld_handler:
177         sigchld_handler.Reset()
178
179   def _CalcTimeout(self, now):
180     if not self._timer:
181       return None
182
183     timeout = None
184
185     # TODO: Repeating timers
186
187     min_timeout = 0.001
188
189     for timer in self._timer.itervalues():
190       time_left = (timer.start + timer.interval) - now
191       if timeout is None or time_left < timeout:
192         timeout = time_left
193       if timeout < 0:
194         timeout = 0
195         break
196       elif timeout < min_timeout:
197         timeout = min_timeout
198         break
199
200     return timeout * 1000.0
201
202   def _CheckTimers(self, now):
203     # TODO: Repeating timers
204     for timer in self._timer.itervalues():
205       if now < (timer.start + timer.interval):
206         continue
207
208       timer.owner.OnTimer(timer.timer_id)
209
210       # TODO: Repeating timers should not be removed
211       self._timer_remove.append(timer.timer_id)
212
213   def _CallSignalWaiters(self, signum):
214     """Calls all signal waiters for a certain signal.
215
216     @type signum: int
217     @param signum: Signal number
218
219     """
220     for owner in self._signal_wait:
221       owner.OnSignal(signal.SIGCHLD)
222
223   def RegisterIO(self, owner, fd, condition):
224     """Registers a receiver for I/O notifications
225
226     The receiver must support a "OnIO(self, fd, conditions)" function.
227
228     @type owner: instance
229     @param owner: Receiver
230     @type fd: int
231     @param fd: File descriptor
232     @type condition: int
233     @param condition: ORed field of conditions to be notified
234                       (see select module)
235
236     """
237     # select.Poller also supports file() like objects, but we don't.
238     assert isinstance(fd, (int, long)), \
239       "Only integers are supported for file descriptors"
240
241     self._io_wait_add.append((owner, fd, condition))
242
243   def UnregisterIO(self, fd):
244     """Unregister a file descriptor.
245
246     It'll be unregistered the next time the mainloop checks for it.
247
248     @type fd: int
249     @param fd: File descriptor
250
251     """
252     # select.Poller also supports file() like objects, but we don't.
253     assert isinstance(fd, (int, long)), \
254       "Only integers are supported for file descriptors"
255
256     self._io_wait_remove.append(fd)
257
258   def RegisterSignal(self, owner):
259     """Registers a receiver for signal notifications
260
261     The receiver must support a "OnSignal(self, signum)" function.
262
263     @type owner: instance
264     @param owner: Receiver
265
266     """
267     self._signal_wait.append(owner)
268
269   def AddTimer(self, owner, interval, repeat):
270     """Add a new timer.
271
272     The receiver must support a "OnTimer(self, timer_id)" function.
273
274     @type owner: instance
275     @param owner: Receiver
276     @type interval: int or float
277     @param interval: Timer interval in seconds
278     @type repeat: bool
279     @param repeat: Whether this is a repeating timer or one-off
280
281     """
282     # TODO: Implement repeating timers
283     assert not repeat, "Repeating timers are not yet supported"
284
285     # Get new ID
286     self._timer_id_last += 1
287
288     timer_id = self._timer_id_last
289
290     self._timer_add.append(Timer(owner, timer_id, time.time(),
291                                  float(interval), repeat))
292
293     return timer_id
294
295   def RemoveTimer(self, timer_id):
296     """Removes a timer.
297
298     @type timer_id: int
299     @param timer_id: Timer ID
300
301     """
302     self._timer_remove.append(timer_id)
303
304
305 def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn):
306   """Shared main function for daemons.
307
308   @type daemon_name: string
309   @param daemon_name: daemon name
310   @type optionparser: L{optparse.OptionParser}
311   @param optionparser: initialized optionparser with daemon-specific options
312                        (common -f -d options will be handled by this module)
313   @type options: object @param options: OptionParser result, should contain at
314                  least the fork and the debug options
315   @type dirs: list of strings
316   @param dirs: list of directories that must exist for this daemon to work
317   @type check_fn: function which accepts (options, args)
318   @param check_fn: function that checks start conditions and exits if they're
319                    not met
320   @type exec_fn: function which accepts (options, args)
321   @param exec_fn: function that's executed with the daemon's pid file held, and
322                   runs the daemon itself.
323
324   """
325   optionparser.add_option("-f", "--foreground", dest="fork",
326                           help="Don't detach from the current terminal",
327                           default=True, action="store_false")
328   optionparser.add_option("-d", "--debug", dest="debug",
329                           help="Enable some debug messages",
330                           default=False, action="store_true")
331   if daemon_name in constants.DAEMONS_PORTS:
332     # for networked daemons we also allow choosing the bind port and address.
333     # by default we use the port provided by utils.GetDaemonPort, and bind to
334     # 0.0.0.0 (which is represented by and empty bind address.
335     port = utils.GetDaemonPort(daemon_name)
336     optionparser.add_option("-p", "--port", dest="port",
337                             help="Network port (%s default)." % port,
338                             default=port, type="int")
339     optionparser.add_option("-b", "--bind", dest="bind_address",
340                             help="Bind address",
341                             default="", metavar="ADDRESS")
342
343   if daemon_name in constants.DAEMONS_SSL:
344     default_cert, default_key = constants.DAEMONS_SSL[daemon_name]
345     optionparser.add_option("--no-ssl", dest="ssl",
346                             help="Do not secure HTTP protocol with SSL",
347                             default=True, action="store_false")
348     optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
349                             help="SSL key",
350                             default=default_key, type="string")
351     optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
352                             help="SSL certificate",
353                             default=default_cert, type="string")
354
355   multithread = utils.no_fork = daemon_name in constants.MULTITHREADED_DAEMONS
356
357   options, args = optionparser.parse_args()
358
359   if hasattr(options, 'ssl') and options.ssl:
360     if not (options.ssl_cert and options.ssl_key):
361       print >> sys.stderr, "Need key and certificate to use ssl"
362       sys.exit(constants.EXIT_FAILURE)
363     for fname in (options.ssl_cert, options.ssl_key):
364       if not os.path.isfile(fname):
365         print >> sys.stderr, "Need ssl file %s to run" % fname
366         sys.exit(constants.EXIT_FAILURE)
367
368   if check_fn is not None:
369     check_fn(options, args)
370
371   utils.EnsureDirs(dirs)
372
373   if options.fork:
374     utils.CloseFDs()
375     utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name])
376
377   utils.WritePidFile(daemon_name)
378   try:
379     utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
380                        debug=options.debug,
381                        stderr_logging=not options.fork,
382                        multithreaded=multithread)
383     logging.info("%s daemon startup" % daemon_name)
384     exec_fn(options, args)
385   finally:
386     utils.RemovePidFile(daemon_name)
387