4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module with helper classes and functions for daemons"""
38 from ganeti import utils
39 from ganeti import constants
40 from ganeti import errors
# Default credentials the daemon runs under when the caller of GenericMain
# does not override them via its user/group parameters.
_DEFAULT_RUN_USER = "root"
_DEFAULT_RUN_GROUP = "root"
# Control-flow exception, not an error condition: raised by
# AsyncoreDelayFunction to abort scheduler.run() so the main loop can also
# check for pending signals between scheduler passes.
class SchedulerBreakout(Exception):
  """Exception used to get out of the scheduler loop

  """
def AsyncoreDelayFunction(timeout):
  """Asyncore-compatible scheduler delay function.

  This is a delay function for sched that, rather than actually sleeping,
  executes asyncore events happening in the meantime.

  After an event has occurred, rather than returning, it raises a
  SchedulerBreakout exception, which will force the current scheduler.run()
  invocation to terminate, so that we can also check for signals. The main loop
  will then call the scheduler run again, which will allow it to actually
  process any due events.

  This is needed because scheduler.run() doesn't support a count=..., as
  asyncore loop, and the scheduler module documents throwing exceptions from
  inside the delay function as an allowed usage model.

  @type timeout: float
  @param timeout: maximum time to spend dispatching asyncore events

  """
  # Dispatch at most one round of asyncore events, then break out of the
  # scheduler loop unconditionally (see docstring for why).
  asyncore.loop(timeout=timeout, count=1, use_poll=True)
  raise SchedulerBreakout()
class AsyncoreScheduler(sched.scheduler):
  """Event scheduler integrated with asyncore

  While waiting for its next timed event this scheduler keeps dispatching
  asyncore events, via AsyncoreDelayFunction, instead of sleeping.

  """
  def __init__(self, timefunc):
    # Use AsyncoreDelayFunction as the delay function so asyncore traffic
    # is serviced while the scheduler waits for its next due event.
    sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
  """Base Ganeti Asyncore Dispatcher

  Common base class for Ganeti asyncore dispatchers: logs (rather than
  propagates) errors raised while handling a request, and defaults to not
  being writable.

  """
  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    # Swallow the error after logging the full traceback, so a single
    # broken request cannot take down the whole asyncore event loop.
    logging.exception("Error while handling asyncore request")

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    """Most of the time we don't want to check for writability.

    """
    return False
def FormatAddress(family, address):
  """Format a client's address

  @type family: integer
  @param family: socket family (one of socket.AF_*)
  @type address: family specific (usually tuple)
  @param address: address, as reported by this class
  @rtype: string
  @return: human-readable form of the address

  """
  if family == socket.AF_INET and len(address) == 2:
    # TCP/IP peers come in as (host, port)
    return "%s:%d" % address
  elif family == socket.AF_UNIX and len(address) == 3:
    # Unix socket peers are reported as their (pid, uid, gid) credentials
    return "pid=%s, uid=%s, gid=%s" % address
  else:
    # Fall back to a generic representation rather than silently returning
    # None for families/shapes we don't know how to pretty-print
    return str(address)
class AsyncStreamServer(GanetiBaseAsyncoreDispatcher):
  """A stream server to use with asyncore.

  Each request is accepted, and then dispatched to a separate asyncore
  dispatcher to handle.

  """

  # Backlog passed to listen(); further pending connections are refused.
  _REQUEST_QUEUE_SIZE = 5

  def __init__(self, family, address):
    """Constructor for AsyncUnixStreamSocket

    @type family: integer
    @param family: socket family (one of socket.AF_*)
    @type address: address family dependent
    @param address: address to bind the socket to

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    # NOTE(review): "self.family = family" and "self.bind(address)" appear to
    # be missing around here -- self.family is read just below and in
    # handle_accept; confirm against the original file.
    self.create_socket(self.family, socket.SOCK_STREAM)
    self.set_reuse_addr()
    self.listen(self._REQUEST_QUEUE_SIZE)

  # this method is overriding an asyncore.dispatcher method
  def handle_accept(self):
    """Accept a new client connection.

    Creates a new instance of the handler class, which will use asyncore to
    handle the connection's traffic.

    """
    # IgnoreSignals retries the accept on EINTR and yields None when no
    # connection was actually established, so signals can't crash the loop.
    accept_result = utils.IgnoreSignals(self.accept)
    if accept_result is not None:
      connected_socket, client_address = accept_result
      if self.family == socket.AF_UNIX:
        # override the client address, as for unix sockets nothing meaningful
        # is passed in from accept anyway
        client_address = utils.GetSocketCredentials(connected_socket)
      logging.info("Accepted connection from %s",
                   FormatAddress(self.family, client_address))
      self.handle_connection(connected_socket, client_address)

  def handle_connection(self, connected_socket, client_address):
    """Handle an already accepted connection.

    Must be overridden by subclasses.

    """
    raise NotImplementedError
class AsyncTerminatedMessageStream(asynchat.async_chat):
  """A terminator separated message stream asyncore module.

  Handles a stream connection receiving messages terminated by a defined
  separator. For each complete message handle_message is called.

  """
  def __init__(self, connected_socket, peer_address, terminator, family):
    """AsyncTerminatedMessageStream constructor.

    @type connected_socket: socket.socket
    @param connected_socket: connected stream socket to receive messages from
    @param peer_address: family-specific peer address
    @type terminator: string
    @param terminator: terminator separating messages in the stream
    @type family: integer
    @param family: socket family

    """
    # python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by
    # using a positional argument rather than a keyword one.
    asynchat.async_chat.__init__(self, connected_socket)
    self.connected_socket = connected_socket
    # on python 2.4 there is no "family" attribute for the socket class
    # FIXME: when we move to python 2.5 or above remove the family parameter
    #self.family = self.connected_socket.family
    # NOTE(review): the assignments "self.family = family" and
    # "self.ibuffer = []" appear to be missing here -- both attributes are
    # used below; confirm against the original file.
    self.peer_address = peer_address
    self.terminator = terminator
    self.set_terminator(terminator)
    # Sequence number passed to handle_message for the next complete message.
    self.next_incoming_message = 0

  # this method is overriding an asynchat.async_chat method
  def collect_incoming_data(self, data):
    # Buffer raw chunks until the terminator is found.
    self.ibuffer.append(data)

  # this method is overriding an asynchat.async_chat method
  def found_terminator(self):
    # A whole message has arrived: assemble the buffered chunks and dispatch
    # it together with its sequence number.
    # NOTE(review): resetting self.ibuffer to an empty list appears to be
    # missing here; confirm against the original file.
    message = "".join(self.ibuffer)
    message_id = self.next_incoming_message
    self.next_incoming_message += 1
    self.handle_message(message, message_id)

  def handle_message(self, message, message_id):
    """Handle a terminated message.

    @type message: string
    @param message: message to handle
    @type message_id: integer
    @param message_id: stream's message sequence number

    """
    # TODO: move this method to raise NotImplementedError
    # raise NotImplementedError

  # NOTE(review): a method header (upstream: "def close_log(self):") and a
  # trailing "self.close()" appear to be missing around this logging call;
  # confirm against the original file.
    logging.info("Closing connection from %s",
                 FormatAddress(self.family, self.peer_address))

  # this method is overriding an asyncore.dispatcher method
  def handle_expt(self):
    # NOTE(review): the method body appears to be missing here; confirm
    # against the original file.

  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")
class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
  """An improved asyncore udp socket.

  """
  # NOTE(review): a "def __init__(self):" header appears to be missing before
  # this docstring; confirm against the original file.
    """Constructor for AsyncUDPSocket

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    # NOTE(review): the initialization "self._out_queue = []" appears to be
    # missing here -- it is used by writable/handle_write/enqueue_send.
    self.create_socket(socket.AF_INET, socket.SOCK_DGRAM)

  # this method is overriding an asyncore.dispatcher method
  def handle_connect(self):
    # Python thinks that the first udp message from a source qualifies as a
    # "connect" and further ones are part of the same connection. We beg to
    # differ and treat all messages equally.
    # NOTE(review): the no-op body ("pass") appears to be missing here.

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    # Read a single datagram; IgnoreSignals retries recvfrom on EINTR and
    # returns None if it was interrupted.
    recv_result = utils.IgnoreSignals(self.recvfrom,
                                      constants.MAX_UDP_DATA_SIZE)
    if recv_result is not None:
      payload, address = recv_result
      # NOTE(review): the unpacking "ip, port = address" appears to be
      # missing here -- ip/port are otherwise undefined; confirm against
      # the original file.
      self.handle_datagram(payload, ip, port)

  def handle_datagram(self, payload, ip, port):
    """Handle an already read udp datagram

    Must be overridden by subclasses.

    """
    raise NotImplementedError

  # this method is overriding an asyncore.dispatcher method
  # NOTE(review): a "def writable(self):" header appears to be missing before
  # these lines; confirm against the original file.
    # We should check whether we can write to the socket only if we have
    # something scheduled to be written
    return bool(self._out_queue)

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    # Send at most one queued datagram per call; asyncore keeps calling us
    # while writable() stays true.
    if not self._out_queue:
      logging.error("handle_write called with empty output queue")
      # NOTE(review): an early "return" appears to be missing here --
      # otherwise the indexing below raises IndexError; confirm.
    (ip, port, payload) = self._out_queue[0]
    utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
    self._out_queue.pop(0)

  def enqueue_send(self, ip, port, payload):
    """Enqueue a datagram to be sent when possible

    @type ip: string
    @param ip: destination ip address
    @type port: integer
    @param port: destination port
    @type payload: string
    @param payload: datagram payload, at most MAX_UDP_DATA_SIZE bytes

    """
    if len(payload) > constants.MAX_UDP_DATA_SIZE:
      raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
                                    constants.MAX_UDP_DATA_SIZE))
    self._out_queue.append((ip, port, payload))

  def process_next_packet(self, timeout=0):
    """Process the next datagram, waiting for it if necessary.

    @type timeout: float
    @param timeout: how long to wait for data

    @rtype: boolean
    @return: True if some data has been handled, False otherwise

    """
    result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
    if result is not None and result & select.POLLIN:
      # NOTE(review): the remainder of this method (reading the datagram and
      # the True/False return statements) appears to be missing; confirm
      # against the original file.
class AsyncAwaker(GanetiBaseAsyncoreDispatcher):
  """A way to notify the asyncore loop that something is going on.

  If an asyncore daemon is multithreaded when a thread tries to push some data
  to a socket, the main loop handling asynchronous requests might be sleeping
  waiting on a select(). To avoid this it can create an instance of the
  AsyncAwaker, which other threads can use to wake it up.

  """
  def __init__(self, signal_fn=None):
    """Constructor for AsyncAwaker

    @type signal_fn: function
    @param signal_fn: function to call when awaken

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    assert signal_fn == None or callable(signal_fn)
    # in_socket is watched by asyncore; out_socket is written to by other
    # threads that want to wake the main loop up.
    # NOTE(review): the continuation of this call (the socket type argument
    # and closing parenthesis) appears to be missing; confirm against the
    # original file.
    (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
    self.in_socket.setblocking(0)
    # Make each end half-closed in the direction it never uses.
    self.in_socket.shutdown(socket.SHUT_WR)
    self.out_socket.shutdown(socket.SHUT_RD)
    self.set_socket(self.in_socket)
    self.need_signal = True
    self.signal_fn = signal_fn
    self.connected = True

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    # Drain the wakeup bytes; their content is irrelevant.
    utils.IgnoreSignals(self.recv, 4096)
    # NOTE(review): the call to self.signal_fn (and any locking around the
    # flag reset) appears to be missing here; confirm against the original.
    self.need_signal = True

  # this method is overriding an asyncore.dispatcher method
  # NOTE(review): a "def close(self):" header appears to be missing before
  # these lines; confirm against the original file.
    asyncore.dispatcher.close(self)
    self.out_socket.close()

  # NOTE(review): a method header (upstream: "def signal(self):") appears to
  # be missing before this docstring; confirm against the original file.
    """Signal the asyncore main loop.

    Any data we send here will be ignored, but it will cause the select() call
    in the main loop to return.

    """
    # Yes, there is a race condition here. No, we don't care, at worst we're
    # sending more than one wakeup token, which doesn't harm at all.
    self.need_signal = False
    self.out_socket.send("\0")
class Mainloop(object):
  """Generic mainloop for daemons

  @ivar scheduler: A sched.scheduler object, which can be used to register
    timed events

  """
  # NOTE(review): a "def __init__(self):" header appears to be missing before
  # this docstring; confirm against the original file.
    """Constructs a new Mainloop instance.

    """
    # Receivers registered via RegisterSignal; each is notified through its
    # OnSignal method whenever a handled signal arrives.
    self._signal_wait = []
    # Scheduler that keeps dispatching asyncore events while waiting.
    self.scheduler = AsyncoreScheduler(time.time)

  @utils.SignalHandled([signal.SIGCHLD])
  @utils.SignalHandled([signal.SIGTERM])
  @utils.SignalHandled([signal.SIGINT])
  def Run(self, signal_handlers=None):
    """Runs the mainloop.

    @type signal_handlers: dict
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator

    """
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"
    # Start actual main loop
    # NOTE(review): the "running = True / while running:" loop scaffolding
    # and the "try: self.scheduler.run()" wrapper appear to be missing in
    # this view; confirm against the original file.
    if not self.scheduler.empty():
    except SchedulerBreakout:
      # NOTE(review): the "pass" body and the "else:" branch header for the
      # plain asyncore case appear to be missing around here; confirm.
    asyncore.loop(count=1, use_poll=True)

    # Check whether a signal was raised
    for sig in signal_handlers:
      handler = signal_handlers[sig]
      # NOTE(review): the "if handler.called:" guard and "handler.Clear()"
      # appear to be missing around these lines; confirm.
      self._CallSignalWaiters(sig)
      # Terminating signals end the loop; anything else keeps it running.
      running = sig not in (signal.SIGTERM, signal.SIGINT)

  def _CallSignalWaiters(self, signum):
    """Calls all signal waiters for a certain signal.

    @type signum: int
    @param signum: Signal number

    """
    for owner in self._signal_wait:
      owner.OnSignal(signum)

  def RegisterSignal(self, owner):
    """Registers a receiver for signal notifications

    The receiver must support a "OnSignal(self, signum)" function.

    @type owner: instance
    @param owner: Receiver

    """
    self._signal_wait.append(owner)
def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn,
                multithreaded=False, console_logging=False,
                default_ssl_cert=None, default_ssl_key=None,
                user=_DEFAULT_RUN_USER, group=_DEFAULT_RUN_GROUP):
  """Shared main function for daemons.

  @type daemon_name: string
  @param daemon_name: daemon name
  @type optionparser: optparse.OptionParser
  @param optionparser: initialized optionparser with daemon-specific options
    (common -f -d options will be handled by this module)
  @type dirs: list of (string, integer)
  @param dirs: list of directories that must be created if they don't exist,
    and the permissions to be used to create them
  @type check_fn: function which accepts (options, args)
  @param check_fn: function that checks start conditions and exits if they're
    not met
  @type exec_fn: function which accepts (options, args)
  @param exec_fn: function that's executed with the daemon's pid file held, and
    runs the daemon itself.
  @type multithreaded: bool
  @param multithreaded: Whether the daemon uses threads
  @type console_logging: boolean
  @param console_logging: if True, the daemon will fall back to the system
    console if logging fails
  @type default_ssl_cert: string
  @param default_ssl_cert: Default SSL certificate path
  @type default_ssl_key: string
  @param default_ssl_key: Default SSL key path
  @type user: string
  @param user: Default user to run as
  @type group: string
  @param group: Default group to run as

  """
  # Options common to every daemon
  optionparser.add_option("-f", "--foreground", dest="fork",
                          help="Don't detach from the current terminal",
                          default=True, action="store_false")
  optionparser.add_option("-d", "--debug", dest="debug",
                          help="Enable some debug messages",
                          default=False, action="store_true")
  optionparser.add_option("--syslog", dest="syslog",
                          help="Enable logging to syslog (except debug"
                          " messages); one of 'no', 'yes' or 'only' [%s]" %
                          constants.SYSLOG_USAGE,
                          default=constants.SYSLOG_USAGE,
                          choices=["no", "yes", "only"])

  if daemon_name in constants.DAEMONS_PORTS:
    default_bind_address = "0.0.0.0"
    default_port = utils.GetDaemonPort(daemon_name)

    # For networked daemons we allow choosing the port and bind address
    optionparser.add_option("-p", "--port", dest="port",
                            help="Network port (default: %s)" % default_port,
                            default=default_port, type="int")
    optionparser.add_option("-b", "--bind", dest="bind_address",
                            help=("Bind address (default: %s)" %
                                  default_bind_address),
                            default=default_bind_address, metavar="ADDRESS")

  if default_ssl_key is not None and default_ssl_cert is not None:
    # SSL options, only offered when the caller supplied default paths
    optionparser.add_option("--no-ssl", dest="ssl",
                            help="Do not secure HTTP protocol with SSL",
                            default=True, action="store_false")
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
                            help=("SSL key path (default: %s)" %
                            # NOTE(review): the continuation supplying
                            # default_ssl_key and a closing parenthesis
                            # appears to be missing here; confirm.
                            default=default_ssl_key, type="string",
                            metavar="SSL_KEY_PATH")
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
                            help=("SSL certificate path (default: %s)" %
                            # NOTE(review): the continuation supplying
                            # default_ssl_cert and a closing parenthesis
                            # appears to be missing here; confirm.
                            default=default_ssl_cert, type="string",
                            metavar="SSL_CERT_PATH")

  # Disable the use of fork(2) if the daemon uses threads
  utils.no_fork = multithreaded

  options, args = optionparser.parse_args()

  if getattr(options, "ssl", False):
    # NOTE(review): the assignment opening this dict literal (upstream:
    # "ssl_paths = {") and its closing brace appear to be missing around
    # these entries; confirm against the original file.
      "certificate": options.ssl_cert,
      "key": options.ssl_key,
    # Refuse to start if any configured SSL file does not exist
    for name, path in ssl_paths.iteritems():
      if not os.path.isfile(path):
        print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
        sys.exit(constants.EXIT_FAILURE)

    # TODO: By initiating http.HttpSslParams here we would only read the files
    # once and have a proper validation (isfile returns False on directories)

  if check_fn is not None:
    check_fn(options, args)

  utils.EnsureDirs(dirs)

  # NOTE(review): an "if options.fork:" guard and a "try:/except KeyError:"
  # wrapper around the user/group lookups appear to be missing here (the
  # raise below only makes sense inside an except clause); confirm against
  # the original file.
  uid = pwd.getpwnam(user).pw_uid
  gid = grp.getgrnam(group).gr_gid
  raise errors.ConfigurationError("User or group not existing on system:"
                                  " %s:%s" % (user, group))
  utils.Daemonize(constants.DAEMONS_LOGFILES[daemon_name], uid, gid)

  utils.WritePidFile(daemon_name)
  # NOTE(review): a "try:" protecting the pid file (matched by the
  # RemovePidFile call at the end) appears to be missing here; confirm.
  utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
                     stderr_logging=not options.fork,
                     multithreaded=multithreaded,
                     syslog=options.syslog,
                     console_logging=console_logging)
  logging.info("%s daemon startup", daemon_name)
  exec_fn(options, args)
  utils.RemovePidFile(daemon_name)