4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module with helper classes and functions for daemons"""
35 from ganeti import utils
36 from ganeti import constants
37 from ganeti import errors
class SchedulerBreakout(Exception):
  """Exception used to get out of the scheduler loop

  """
def AsyncoreDelayFunction(timeout):
  """Asyncore-compatible scheduler delay function.

  This is a delay function for sched that, rather than actually sleeping,
  executes asyncore events happening in the meantime.

  After an event has occurred, rather than returning, it raises a
  SchedulerBreakout exception, which will force the current scheduler.run()
  invocation to terminate, so that we can also check for signals. The main loop
  will then call the scheduler run again, which will allow it to actually
  process any due events.

  This is needed because scheduler.run() doesn't support a count=..., as
  asyncore loop, and the scheduler module documents throwing exceptions from
  inside the delay function as an allowed usage model.

  @type timeout: float
  @param timeout: maximum time to wait for asyncore events

  @raise SchedulerBreakout: always, to force scheduler.run() to return

  """
  # Process at most one batch of asyncore events, then break out of the
  # scheduler so the caller can check for pending signals.
  asyncore.loop(timeout=timeout, count=1, use_poll=True)
  raise SchedulerBreakout()
class AsyncoreScheduler(sched.scheduler):
  """Event scheduler integrated with asyncore

  Uses L{AsyncoreDelayFunction} as the delay function, so that asyncore
  events are processed while waiting for the next scheduled event.

  """
  def __init__(self, timefunc):
    """Initializes this class.

    @param timefunc: function returning the current time (e.g. time.time)

    """
    sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
  """Base Ganeti Asyncore Dispatcher

  """
  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    """Most of the time we don't want to check for writability.

    """
    return False
class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
  """An improved asyncore udp socket.

  """
  def __init__(self):
    """Constructor for AsyncUDPSocket

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    # Queue of (ip, port, payload) tuples waiting to be sent
    self._out_queue = []
    self.create_socket(socket.AF_INET, socket.SOCK_DGRAM)

  # this method is overriding an asyncore.dispatcher method
  def handle_connect(self):
    # Python thinks that the first udp message from a source qualifies as a
    # "connect" and further ones are part of the same connection. We beg to
    # differ and treat all messages equally.
    pass

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    recv_result = utils.IgnoreSignals(self.recvfrom,
                                      constants.MAX_UDP_DATA_SIZE)
    if recv_result is not None:
      payload, address = recv_result
      ip, port = address
      self.handle_datagram(payload, ip, port)

  def handle_datagram(self, payload, ip, port):
    """Handle an already read udp datagram

    Must be overridden by subclasses.

    @param payload: datagram content
    @param ip: source ip address
    @param port: source udp port

    """
    raise NotImplementedError

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # We should check whether we can write to the socket only if we have
    # something scheduled to be written
    return bool(self._out_queue)

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    if not self._out_queue:
      logging.error("handle_write called with empty output queue")
      return
    (ip, port, payload) = self._out_queue[0]
    utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
    self._out_queue.pop(0)

  def enqueue_send(self, ip, port, payload):
    """Enqueue a datagram to be sent when possible

    @param ip: destination ip address
    @param port: destination udp port
    @param payload: datagram content
    @raise errors.UdpDataSizeError: if the payload is too big

    """
    if len(payload) > constants.MAX_UDP_DATA_SIZE:
      raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
                                    constants.MAX_UDP_DATA_SIZE))
    self._out_queue.append((ip, port, payload))

  def process_next_packet(self, timeout=0):
    """Process the next datagram, waiting for it if necessary.

    @type timeout: float
    @param timeout: how long to wait for data

    @rtype: boolean
    @return: True if some data has been handled, False otherwise

    """
    result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
    if result is not None and result & select.POLLIN:
      self.handle_read()
      return True
    else:
      return False
class Mainloop(object):
  """Generic mainloop for daemons

  @ivar scheduler: A sched.scheduler object, which can be used to register
    timed events

  """
  def __init__(self):
    """Constructs a new Mainloop instance.

    """
    # Receivers to be notified when a signal arrives (see RegisterSignal)
    self._signal_wait = []
    self.scheduler = AsyncoreScheduler(time.time)

  @utils.SignalHandled([signal.SIGCHLD])
  @utils.SignalHandled([signal.SIGTERM])
  @utils.SignalHandled([signal.SIGINT])
  def Run(self, signal_handlers=None):
    """Runs the mainloop.

    @type signal_handlers: dict
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator

    """
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"
    running = True
    # Start actual main loop
    while running:
      if not self.scheduler.empty():
        try:
          self.scheduler.run()
        except SchedulerBreakout:
          pass
      else:
        asyncore.loop(count=1, use_poll=True)

      # Check whether a signal was raised
      for sig in signal_handlers:
        handler = signal_handlers[sig]
        if handler.called:
          self._CallSignalWaiters(sig)
          # Stop the loop on termination signals only
          running = sig not in (signal.SIGTERM, signal.SIGINT)
          handler.Clear()

  def _CallSignalWaiters(self, signum):
    """Calls all signal waiters for a certain signal.

    @type signum: int
    @param signum: Signal number

    """
    for owner in self._signal_wait:
      owner.OnSignal(signum)

  def RegisterSignal(self, owner):
    """Registers a receiver for signal notifications

    The receiver must support a "OnSignal(self, signum)" function.

    @type owner: instance
    @param owner: Receiver

    """
    self._signal_wait.append(owner)
def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn,
                multithreaded=False, console_logging=False,
                default_ssl_cert=None, default_ssl_key=None):
  """Shared main function for daemons.

  @type daemon_name: string
  @param daemon_name: daemon name
  @type optionparser: optparse.OptionParser
  @param optionparser: initialized optionparser with daemon-specific options
      (common -f -d options will be handled by this module)
  @type dirs: list of (string, integer)
  @param dirs: list of directories that must be created if they don't exist,
      and the permissions to be used to create them
  @type check_fn: function which accepts (options, args)
  @param check_fn: function that checks start conditions and exits if they're
      not met
  @type exec_fn: function which accepts (options, args)
  @param exec_fn: function that's executed with the daemon's pid file held, and
      runs the daemon itself.
  @type multithreaded: bool
  @param multithreaded: Whether the daemon uses threads
  @type console_logging: boolean
  @param console_logging: if True, the daemon will fall back to the system
      console if logging fails
  @type default_ssl_cert: string
  @param default_ssl_cert: Default SSL certificate path
  @type default_ssl_key: string
  @param default_ssl_key: Default SSL key path

  """
  optionparser.add_option("-f", "--foreground", dest="fork",
                          help="Don't detach from the current terminal",
                          default=True, action="store_false")
  optionparser.add_option("-d", "--debug", dest="debug",
                          help="Enable some debug messages",
                          default=False, action="store_true")
  optionparser.add_option("--syslog", dest="syslog",
                          help="Enable logging to syslog (except debug"
                          " messages); one of 'no', 'yes' or 'only' [%s]" %
                          constants.SYSLOG_USAGE,
                          default=constants.SYSLOG_USAGE,
                          choices=["no", "yes", "only"])

  if daemon_name in constants.DAEMONS_PORTS:
    default_bind_address = "0.0.0.0"
    default_port = utils.GetDaemonPort(daemon_name)

    # For networked daemons we allow choosing the port and bind address
    optionparser.add_option("-p", "--port", dest="port",
                            help="Network port (default: %s)" % default_port,
                            default=default_port, type="int")
    optionparser.add_option("-b", "--bind", dest="bind_address",
                            help=("Bind address (default: %s)" %
                                  default_bind_address),
                            default=default_bind_address, metavar="ADDRESS")

  if default_ssl_key is not None and default_ssl_cert is not None:
    optionparser.add_option("--no-ssl", dest="ssl",
                            help="Do not secure HTTP protocol with SSL",
                            default=True, action="store_false")
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
                            help=("SSL key path (default: %s)" %
                                  default_ssl_key),
                            default=default_ssl_key, type="string",
                            metavar="SSL_KEY_PATH")
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
                            help=("SSL certificate path (default: %s)" %
                                  default_ssl_cert),
                            default=default_ssl_cert, type="string",
                            metavar="SSL_CERT_PATH")

  # Disable the use of fork(2) if the daemon uses threads
  utils.no_fork = multithreaded

  options, args = optionparser.parse_args()

  if getattr(options, "ssl", False):
    ssl_paths = {
      "certificate": options.ssl_cert,
      "key": options.ssl_key,
      }

    for name, path in ssl_paths.iteritems():
      if not os.path.isfile(path):
        print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
        sys.exit(constants.EXIT_FAILURE)

    # TODO: By initiating http.HttpSslParams here we would only read the files
    # once and have a proper validation (isfile returns False on directories)
    # at the same time.

  if check_fn is not None:
    check_fn(options, args)

  utils.EnsureDirs(dirs)

  if options.fork:
    utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name])

  utils.WritePidFile(daemon_name)
  try:
    utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
                       debug=options.debug,
                       stderr_logging=not options.fork,
                       multithreaded=multithreaded,
                       program=daemon_name,
                       syslog=options.syslog,
                       console_logging=console_logging)
    logging.info("%s daemon startup", daemon_name)
    exec_fn(options, args)
  finally:
    # Always remove the pid file, even if exec_fn raised
    utils.RemovePidFile(daemon_name)