4 # Copyright (C) 2006, 2007, 2008 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module with helper classes and functions for daemons"""
import asyncore
import asynchat
import collections
import grp
import logging
import os
import pwd
import sched
import select
import signal
import socket
import sys
import time

from ganeti import utils
from ganeti import constants
from ganeti import errors
from ganeti import netutils
45 _DEFAULT_RUN_USER = "root"
46 _DEFAULT_RUN_GROUP = "root"
49 class SchedulerBreakout(Exception):
50 """Exception used to get out of the scheduler loop
55 def AsyncoreDelayFunction(timeout):
56 """Asyncore-compatible scheduler delay function.
58 This is a delay function for sched that, rather than actually sleeping,
59 executes asyncore events happening in the meantime.
61 After an event has occurred, rather than returning, it raises a
62 SchedulerBreakout exception, which will force the current scheduler.run()
63 invocation to terminate, so that we can also check for signals. The main loop
64 will then call the scheduler run again, which will allow it to actually
65 process any due events.
67 This is needed because scheduler.run() doesn't support a count=..., as
68 asyncore loop, and the scheduler module documents throwing exceptions from
69 inside the delay function as an allowed usage model.
72 asyncore.loop(timeout=timeout, count=1, use_poll=True)
73 raise SchedulerBreakout()
76 class AsyncoreScheduler(sched.scheduler):
77 """Event scheduler integrated with asyncore
80 def __init__(self, timefunc):
81 sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
84 class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
85 """Base Ganeti Asyncore Dispacher
88 # this method is overriding an asyncore.dispatcher method
89 def handle_error(self):
90 """Log an error in handling any request, and proceed.
93 logging.exception("Error while handling asyncore request")
95 # this method is overriding an asyncore.dispatcher method
97 """Most of the time we don't want to check for writability.
103 def FormatAddress(family, address):
104 """Format a client's address
106 @type family: integer
107 @param family: socket family (one of socket.AF_*)
108 @type address: family specific (usually tuple)
109 @param address: address, as reported by this class
112 if family == socket.AF_INET and len(address) == 2:
113 return "%s:%d" % address
114 elif family == socket.AF_UNIX and len(address) == 3:
115 return "pid=%s, uid=%s, gid=%s" % address
120 class AsyncStreamServer(GanetiBaseAsyncoreDispatcher):
121 """A stream server to use with asyncore.
123 Each request is accepted, and then dispatched to a separate asyncore
124 dispatcher to handle.
128 _REQUEST_QUEUE_SIZE = 5
130 def __init__(self, family, address):
131 """Constructor for AsyncUnixStreamSocket
133 @type family: integer
134 @param family: socket family (one of socket.AF_*)
135 @type address: address family dependent
136 @param address: address to bind the socket to
139 GanetiBaseAsyncoreDispatcher.__init__(self)
141 self.create_socket(self.family, socket.SOCK_STREAM)
142 self.set_reuse_addr()
144 self.listen(self._REQUEST_QUEUE_SIZE)
146 # this method is overriding an asyncore.dispatcher method
147 def handle_accept(self):
148 """Accept a new client connection.
150 Creates a new instance of the handler class, which will use asyncore to
154 accept_result = utils.IgnoreSignals(self.accept)
155 if accept_result is not None:
156 connected_socket, client_address = accept_result
157 if self.family == socket.AF_UNIX:
158 # override the client address, as for unix sockets nothing meaningful
159 # is passed in from accept anyway
160 client_address = netutils.GetSocketCredentials(connected_socket)
161 logging.info("Accepted connection from %s",
162 FormatAddress(self.family, client_address))
163 self.handle_connection(connected_socket, client_address)
165 def handle_connection(self, connected_socket, client_address):
166 """Handle an already accepted connection.
169 raise NotImplementedError
172 class AsyncTerminatedMessageStream(asynchat.async_chat):
173 """A terminator separated message stream asyncore module.
175 Handles a stream connection receiving messages terminated by a defined
176 separator. For each complete message handle_message is called.
179 def __init__(self, connected_socket, peer_address, terminator, family,
181 """AsyncTerminatedMessageStream constructor.
183 @type connected_socket: socket.socket
184 @param connected_socket: connected stream socket to receive messages from
185 @param peer_address: family-specific peer address
186 @type terminator: string
187 @param terminator: terminator separating messages in the stream
188 @type family: integer
189 @param family: socket family
190 @type unhandled_limit: integer or None
191 @param unhandled_limit: maximum unanswered messages
194 # python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by
195 # using a positional argument rather than a keyword one.
196 asynchat.async_chat.__init__(self, connected_socket)
197 self.connected_socket = connected_socket
198 # on python 2.4 there is no "family" attribute for the socket class
199 # FIXME: when we move to python 2.5 or above remove the family parameter
200 #self.family = self.connected_socket.family
202 self.peer_address = peer_address
203 self.terminator = terminator
204 self.unhandled_limit = unhandled_limit
205 self.set_terminator(terminator)
207 self.receive_count = 0
209 self.oqueue = collections.deque()
210 self.iqueue = collections.deque()
212 # this method is overriding an asynchat.async_chat method
213 def collect_incoming_data(self, data):
214 self.ibuffer.append(data)
216 def _can_handle_message(self):
217 return (self.unhandled_limit is None or
218 (self.receive_count < self.send_count + self.unhandled_limit) and
221 # this method is overriding an asynchat.async_chat method
222 def found_terminator(self):
223 message = "".join(self.ibuffer)
225 message_id = self.receive_count
226 # We need to increase the receive_count after checking if the message can
227 # be handled, but before calling handle_message
228 can_handle = self._can_handle_message()
229 self.receive_count += 1
231 self.handle_message(message, message_id)
233 self.iqueue.append((message, message_id))
235 def handle_message(self, message, message_id):
236 """Handle a terminated message.
238 @type message: string
239 @param message: message to handle
240 @type message_id: integer
241 @param message_id: stream's message sequence number
245 # TODO: move this method to raise NotImplementedError
246 # raise NotImplementedError
248 def send_message(self, message):
249 """Send a message to the remote peer. This function is thread-safe.
251 @type message: string
252 @param message: message to send, without the terminator
254 @warning: If calling this function from a thread different than the one
255 performing the main asyncore loop, remember that you have to wake that one
259 # If we just append the message we received to the output queue, this
260 # function can be safely called by multiple threads at the same time, and
261 # we don't need locking, since deques are thread safe. handle_write in the
262 # asyncore thread will handle the next input message if there are any
264 self.oqueue.append(message)
266 # this method is overriding an asyncore.dispatcher method
268 # read from the socket if we can handle the next requests
269 return self._can_handle_message() and asynchat.async_chat.readable(self)
271 # this method is overriding an asyncore.dispatcher method
273 # the output queue may become full just after we called writable. This only
274 # works if we know we'll have something else waking us up from the select,
275 # in such case, anyway.
276 return asynchat.async_chat.writable(self) or self.oqueue
278 # this method is overriding an asyncore.dispatcher method
279 def handle_write(self):
281 # if we have data in the output queue, then send_message was called.
282 # this means we can process one more message from the input queue, if
284 data = self.oqueue.popleft()
285 self.push(data + self.terminator)
288 self.handle_message(*self.iqueue.popleft())
292 logging.info("Closing connection from %s",
293 FormatAddress(self.family, self.peer_address))
296 # this method is overriding an asyncore.dispatcher method
297 def handle_expt(self):
300 # this method is overriding an asyncore.dispatcher method
301 def handle_error(self):
302 """Log an error in handling any request, and proceed.
305 logging.exception("Error while handling asyncore request")
309 class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
310 """An improved asyncore udp socket.
313 def __init__(self, family):
314 """Constructor for AsyncUDPSocket
317 GanetiBaseAsyncoreDispatcher.__init__(self)
319 self._family = family
320 self.create_socket(family, socket.SOCK_DGRAM)
322 # this method is overriding an asyncore.dispatcher method
323 def handle_connect(self):
324 # Python thinks that the first udp message from a source qualifies as a
325 # "connect" and further ones are part of the same connection. We beg to
326 # differ and treat all messages equally.
329 # this method is overriding an asyncore.dispatcher method
330 def handle_read(self):
331 recv_result = utils.IgnoreSignals(self.recvfrom,
332 constants.MAX_UDP_DATA_SIZE)
333 if recv_result is not None:
334 payload, address = recv_result
335 if self._family == socket.AF_INET6:
336 # we ignore 'flow info' and 'scope id' as we don't need them
337 ip, port, _, _ = address
341 self.handle_datagram(payload, ip, port)
343 def handle_datagram(self, payload, ip, port):
344 """Handle an already read udp datagram
347 raise NotImplementedError
349 # this method is overriding an asyncore.dispatcher method
351 # We should check whether we can write to the socket only if we have
352 # something scheduled to be written
353 return bool(self._out_queue)
355 # this method is overriding an asyncore.dispatcher method
356 def handle_write(self):
357 if not self._out_queue:
358 logging.error("handle_write called with empty output queue")
360 (ip, port, payload) = self._out_queue[0]
361 utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
362 self._out_queue.pop(0)
364 def enqueue_send(self, ip, port, payload):
365 """Enqueue a datagram to be sent when possible
368 if len(payload) > constants.MAX_UDP_DATA_SIZE:
369 raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
370 constants.MAX_UDP_DATA_SIZE))
371 self._out_queue.append((ip, port, payload))
373 def process_next_packet(self, timeout=0):
374 """Process the next datagram, waiting for it if necessary.
377 @param timeout: how long to wait for data
379 @return: True if some data has been handled, False otherwise
382 result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
383 if result is not None and result & select.POLLIN:
390 class AsyncAwaker(GanetiBaseAsyncoreDispatcher):
391 """A way to notify the asyncore loop that something is going on.
393 If an asyncore daemon is multithreaded when a thread tries to push some data
394 to a socket, the main loop handling asynchronous requests might be sleeping
395 waiting on a select(). To avoid this it can create an instance of the
396 AsyncAwaker, which other threads can use to wake it up.
399 def __init__(self, signal_fn=None):
400 """Constructor for AsyncAwaker
402 @type signal_fn: function
403 @param signal_fn: function to call when awaken
406 GanetiBaseAsyncoreDispatcher.__init__(self)
407 assert signal_fn == None or callable(signal_fn)
408 (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
410 self.in_socket.setblocking(0)
411 self.in_socket.shutdown(socket.SHUT_WR)
412 self.out_socket.shutdown(socket.SHUT_RD)
413 self.set_socket(self.in_socket)
414 self.need_signal = True
415 self.signal_fn = signal_fn
416 self.connected = True
418 # this method is overriding an asyncore.dispatcher method
419 def handle_read(self):
420 utils.IgnoreSignals(self.recv, 4096)
423 self.need_signal = True
425 # this method is overriding an asyncore.dispatcher method
427 asyncore.dispatcher.close(self)
428 self.out_socket.close()
431 """Signal the asyncore main loop.
433 Any data we send here will be ignored, but it will cause the select() call
437 # Yes, there is a race condition here. No, we don't care, at worst we're
438 # sending more than one wakeup token, which doesn't harm at all.
440 self.need_signal = False
441 self.out_socket.send("\0")
444 class Mainloop(object):
445 """Generic mainloop for daemons
447 @ivar scheduler: A sched.scheduler object, which can be used to register
452 """Constructs a new Mainloop instance.
455 self._signal_wait = []
456 self.scheduler = AsyncoreScheduler(time.time)
458 @utils.SignalHandled([signal.SIGCHLD])
459 @utils.SignalHandled([signal.SIGTERM])
460 @utils.SignalHandled([signal.SIGINT])
461 def Run(self, signal_handlers=None):
462 """Runs the mainloop.
464 @type signal_handlers: dict
465 @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator
468 assert isinstance(signal_handlers, dict) and \
469 len(signal_handlers) > 0, \
470 "Broken SignalHandled decorator"
472 # Start actual main loop
474 if not self.scheduler.empty():
477 except SchedulerBreakout:
480 asyncore.loop(count=1, use_poll=True)
482 # Check whether a signal was raised
483 for sig in signal_handlers:
484 handler = signal_handlers[sig]
486 self._CallSignalWaiters(sig)
487 running = sig not in (signal.SIGTERM, signal.SIGINT)
490 def _CallSignalWaiters(self, signum):
491 """Calls all signal waiters for a certain signal.
494 @param signum: Signal number
497 for owner in self._signal_wait:
498 owner.OnSignal(signum)
500 def RegisterSignal(self, owner):
501 """Registers a receiver for signal notifications
503 The receiver must support a "OnSignal(self, signum)" function.
505 @type owner: instance
506 @param owner: Receiver
509 self._signal_wait.append(owner)
512 def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn,
513 multithreaded=False, console_logging=False,
514 default_ssl_cert=None, default_ssl_key=None,
515 user=_DEFAULT_RUN_USER, group=_DEFAULT_RUN_GROUP):
516 """Shared main function for daemons.
518 @type daemon_name: string
519 @param daemon_name: daemon name
520 @type optionparser: optparse.OptionParser
521 @param optionparser: initialized optionparser with daemon-specific options
522 (common -f -d options will be handled by this module)
523 @type dirs: list of (string, integer)
524 @param dirs: list of directories that must be created if they don't exist,
525 and the permissions to be used to create them
526 @type check_fn: function which accepts (options, args)
527 @param check_fn: function that checks start conditions and exits if they're
529 @type exec_fn: function which accepts (options, args)
530 @param exec_fn: function that's executed with the daemon's pid file held, and
531 runs the daemon itself.
532 @type multithreaded: bool
533 @param multithreaded: Whether the daemon uses threads
534 @type console_logging: boolean
535 @param console_logging: if True, the daemon will fall back to the system
536 console if logging fails
537 @type default_ssl_cert: string
538 @param default_ssl_cert: Default SSL certificate path
539 @type default_ssl_key: string
540 @param default_ssl_key: Default SSL key path
541 @param user: Default user to run as
543 @param group: Default group to run as
547 optionparser.add_option("-f", "--foreground", dest="fork",
548 help="Don't detach from the current terminal",
549 default=True, action="store_false")
550 optionparser.add_option("-d", "--debug", dest="debug",
551 help="Enable some debug messages",
552 default=False, action="store_true")
553 optionparser.add_option("--syslog", dest="syslog",
554 help="Enable logging to syslog (except debug"
555 " messages); one of 'no', 'yes' or 'only' [%s]" %
556 constants.SYSLOG_USAGE,
557 default=constants.SYSLOG_USAGE,
558 choices=["no", "yes", "only"])
560 if daemon_name in constants.DAEMONS_PORTS:
561 default_bind_address = constants.IP4_ADDRESS_ANY
562 default_port = netutils.GetDaemonPort(daemon_name)
564 # For networked daemons we allow choosing the port and bind address
565 optionparser.add_option("-p", "--port", dest="port",
566 help="Network port (default: %s)" % default_port,
567 default=default_port, type="int")
568 optionparser.add_option("-b", "--bind", dest="bind_address",
569 help=("Bind address (default: %s)" %
570 default_bind_address),
571 default=default_bind_address, metavar="ADDRESS")
573 if default_ssl_key is not None and default_ssl_cert is not None:
574 optionparser.add_option("--no-ssl", dest="ssl",
575 help="Do not secure HTTP protocol with SSL",
576 default=True, action="store_false")
577 optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
578 help=("SSL key path (default: %s)" %
580 default=default_ssl_key, type="string",
581 metavar="SSL_KEY_PATH")
582 optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
583 help=("SSL certificate path (default: %s)" %
585 default=default_ssl_cert, type="string",
586 metavar="SSL_CERT_PATH")
588 # Disable the use of fork(2) if the daemon uses threads
589 utils.no_fork = multithreaded
591 options, args = optionparser.parse_args()
593 if getattr(options, "ssl", False):
595 "certificate": options.ssl_cert,
596 "key": options.ssl_key,
599 for name, path in ssl_paths.iteritems():
600 if not os.path.isfile(path):
601 print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
602 sys.exit(constants.EXIT_FAILURE)
604 # TODO: By initiating http.HttpSslParams here we would only read the files
605 # once and have a proper validation (isfile returns False on directories)
608 if check_fn is not None:
609 check_fn(options, args)
611 utils.EnsureDirs(dirs)
615 uid = pwd.getpwnam(user).pw_uid
616 gid = grp.getgrnam(group).gr_gid
618 raise errors.ConfigurationError("User or group not existing on system:"
619 " %s:%s" % (user, group))
621 utils.Daemonize(constants.DAEMONS_LOGFILES[daemon_name], uid, gid)
623 utils.WritePidFile(daemon_name)
625 utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
627 stderr_logging=not options.fork,
628 multithreaded=multithreaded,
630 syslog=options.syslog,
631 console_logging=console_logging)
632 logging.info("%s daemon startup", daemon_name)
633 exec_fn(options, args)
635 utils.RemovePidFile(daemon_name)