4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module with helper classes and functions for daemons"""
35 from ganeti import utils
36 from ganeti import constants
37 from ganeti import errors
class SchedulerBreakout(Exception):
  """Exception used to get out of the scheduler loop.

  Raised by L{AsyncoreDelayFunction} after one asyncore event has been
  processed, so that the enclosing scheduler.run() call returns to the
  main loop (which can then check for pending signals).

  """
def AsyncoreDelayFunction(timeout):
  """Asyncore-compatible scheduler delay function.

  This is a delay function for sched that, rather than actually sleeping,
  executes asyncore events happening in the meantime.

  After an event has occurred, rather than returning, it raises a
  SchedulerBreakout exception, which will force the current scheduler.run()
  invocation to terminate, so that we can also check for signals. The main
  loop will then call the scheduler run again, which will allow it to
  actually process any due events.

  This is needed because scheduler.run() doesn't support a count=..., as
  asyncore loop, and the scheduler module documents throwing exceptions from
  inside the delay function as an allowed usage model.

  @type timeout: float
  @param timeout: maximum time to wait for asyncore events

  @raise SchedulerBreakout: always, after at most one asyncore iteration

  """
  asyncore.loop(timeout=timeout, count=1, use_poll=True)
  raise SchedulerBreakout()
class AsyncoreScheduler(sched.scheduler):
  """Event scheduler integrated with asyncore.

  Uses L{AsyncoreDelayFunction} as the delay function, so that asyncore
  events are processed while the scheduler is waiting for timed events.

  """
  def __init__(self, timefunc):
    """Initializes this class.

    @param timefunc: function returning the current time (e.g. time.time)

    """
    sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
class AsyncUDPSocket(asyncore.dispatcher):
  """An improved asyncore udp socket.

  Queues outgoing datagrams and only reports itself writable when there
  is something to send; incoming datagrams are dispatched to
  L{handle_datagram}, which subclasses must implement.

  """
  def __init__(self):
    """Constructor for AsyncUDPSocket

    """
    asyncore.dispatcher.__init__(self)
    # Queue of (ip, port, payload) tuples waiting to be sent
    self._out_queue = []
    self.create_socket(socket.AF_INET, socket.SOCK_DGRAM)

  # this method is overriding an asyncore.dispatcher method
  def handle_connect(self):
    # Python thinks that the first udp message from a source qualifies as a
    # "connect" and further ones are part of the same connection. We beg to
    # differ and treat all messages equally.
    pass

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    # IgnoreSignals retries the recvfrom call if it's interrupted by a signal
    payload, address = utils.IgnoreSignals(self.recvfrom,
                                           constants.MAX_UDP_DATA_SIZE)
    ip, port = address
    self.handle_datagram(payload, ip, port)

  def handle_datagram(self, payload, ip, port):
    """Handle an already read udp datagram

    Must be overridden by subclasses.

    @param payload: datagram contents
    @param ip: source IP address
    @param port: source port

    """
    raise NotImplementedError

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # We should check whether we can write to the socket only if we have
    # something scheduled to be written
    return bool(self._out_queue)

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    if not self._out_queue:
      # Nothing queued: asyncore shouldn't have called us; log and bail out
      logging.error("handle_write called with empty output queue")
      return
    (ip, port, payload) = self._out_queue[0]
    utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
    # Only dequeue after a successful send
    self._out_queue.pop(0)

  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")

  def enqueue_send(self, ip, port, payload):
    """Enqueue a datagram to be sent when possible

    @param ip: destination IP address
    @param port: destination port
    @param payload: datagram contents

    @raise errors.UdpDataSizeError: if the payload exceeds the maximum
      allowed UDP datagram size

    """
    if len(payload) > constants.MAX_UDP_DATA_SIZE:
      raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
                                    constants.MAX_UDP_DATA_SIZE))
    self._out_queue.append((ip, port, payload))

  def process_next_packet(self, timeout=0):
    """Process the next datagram, waiting for it if necessary.

    @type timeout: float
    @param timeout: how long to wait for data

    @rtype: boolean
    @return: True if some data has been handled, False otherwise

    """
    result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
    if result is not None and result & select.POLLIN:
      self.handle_read()
      return True
    else:
      return False
class Mainloop(object):
  """Generic mainloop for daemons

  @ivar scheduler: A sched.scheduler object, which can be used to register
    timed events

  """
  def __init__(self):
    """Constructs a new Mainloop instance.

    """
    # Receivers registered via RegisterSignal, notified on every signal
    self._signal_wait = []
    self.scheduler = AsyncoreScheduler(time.time)

  @utils.SignalHandled([signal.SIGCHLD])
  @utils.SignalHandled([signal.SIGTERM])
  @utils.SignalHandled([signal.SIGINT])
  def Run(self, signal_handlers=None):
    """Runs the mainloop.

    @type signal_handlers: dict
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator

    """
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"
    running = True
    # Start actual main loop
    while running:
      if not self.scheduler.empty():
        try:
          self.scheduler.run()
        except SchedulerBreakout:
          # Raised by the delay function after processing asyncore events;
          # fall through so we can check for pending signals below
          pass
      else:
        asyncore.loop(count=1, use_poll=True)

      # Check whether a signal was raised
      for sig in signal_handlers:
        handler = signal_handlers[sig]
        if handler.called:
          self._CallSignalWaiters(sig)
          # SIGTERM/SIGINT terminate the loop; other signals don't
          running = sig not in (signal.SIGTERM, signal.SIGINT)
          handler.Clear()

  def _CallSignalWaiters(self, signum):
    """Calls all signal waiters for a certain signal.

    @type signum: int
    @param signum: Signal number

    """
    for owner in self._signal_wait:
      owner.OnSignal(signum)

  def RegisterSignal(self, owner):
    """Registers a receiver for signal notifications

    The receiver must support a "OnSignal(self, signum)" function.

    @type owner: instance
    @param owner: Receiver

    """
    self._signal_wait.append(owner)
def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn,
                multithreaded=False, console_logging=False,
                default_ssl_cert=None, default_ssl_key=None):
  """Shared main function for daemons.

  @type daemon_name: string
  @param daemon_name: daemon name
  @type optionparser: optparse.OptionParser
  @param optionparser: initialized optionparser with daemon-specific options
      (common -f -d options will be handled by this module)
  @type dirs: list of (string, integer)
  @param dirs: list of directories that must be created if they don't exist,
      and the permissions to be used to create them
  @type check_fn: function which accepts (options, args)
  @param check_fn: function that checks start conditions and exits if they're
      not met
  @type exec_fn: function which accepts (options, args)
  @param exec_fn: function that's executed with the daemon's pid file held,
      and runs the daemon itself.
  @type multithreaded: bool
  @param multithreaded: Whether the daemon uses threads
  @type console_logging: boolean
  @param console_logging: if True, the daemon will fall back to the system
      console if logging fails
  @type default_ssl_cert: string
  @param default_ssl_cert: Default SSL certificate path
  @type default_ssl_key: string
  @param default_ssl_key: Default SSL key path

  """
  optionparser.add_option("-f", "--foreground", dest="fork",
                          help="Don't detach from the current terminal",
                          default=True, action="store_false")
  optionparser.add_option("-d", "--debug", dest="debug",
                          help="Enable some debug messages",
                          default=False, action="store_true")
  optionparser.add_option("--syslog", dest="syslog",
                          help="Enable logging to syslog (except debug"
                          " messages); one of 'no', 'yes' or 'only' [%s]" %
                          constants.SYSLOG_USAGE,
                          default=constants.SYSLOG_USAGE,
                          choices=["no", "yes", "only"])

  if daemon_name in constants.DAEMONS_PORTS:
    default_bind_address = "0.0.0.0"
    default_port = utils.GetDaemonPort(daemon_name)

    # For networked daemons we allow choosing the port and bind address
    optionparser.add_option("-p", "--port", dest="port",
                            help="Network port (default: %s)" % default_port,
                            default=default_port, type="int")
    optionparser.add_option("-b", "--bind", dest="bind_address",
                            help=("Bind address (default: %s)" %
                                  default_bind_address),
                            default=default_bind_address, metavar="ADDRESS")

  if default_ssl_key is not None and default_ssl_cert is not None:
    optionparser.add_option("--no-ssl", dest="ssl",
                            help="Do not secure HTTP protocol with SSL",
                            default=True, action="store_false")
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
                            help=("SSL key path (default: %s)" %
                                  default_ssl_key),
                            default=default_ssl_key, type="string",
                            metavar="SSL_KEY_PATH")
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
                            help=("SSL certificate path (default: %s)" %
                                  default_ssl_cert),
                            default=default_ssl_cert, type="string",
                            metavar="SSL_CERT_PATH")

  # Disable the use of fork(2) if the daemon uses threads
  utils.no_fork = multithreaded

  options, args = optionparser.parse_args()

  if getattr(options, "ssl", False):
    ssl_paths = {
      "certificate": options.ssl_cert,
      "key": options.ssl_key,
      }

    for name, path in ssl_paths.iteritems():
      if not os.path.isfile(path):
        print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
        sys.exit(constants.EXIT_FAILURE)

    # TODO: By initiating http.HttpSslParams here we would only read the files
    # once and have a proper validation (isfile returns False on directories)
    # at the same time

  if check_fn is not None:
    check_fn(options, args)

  utils.EnsureDirs(dirs)

  if options.fork:
    utils.CloseFDs()
    utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name])

  utils.WritePidFile(daemon_name)
  try:
    utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
                       debug=options.debug,
                       stderr_logging=not options.fork,
                       multithreaded=multithreaded,
                       program=daemon_name,
                       syslog=options.syslog,
                       console_logging=console_logging)
    logging.info("%s daemon startup", daemon_name)
    exec_fn(options, args)
  finally:
    # Always remove the pid file, even if the daemon crashed
    utils.RemovePidFile(daemon_name)