4 # Copyright (C) 2006, 2007, 2008, 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module with helper classes and functions for daemons"""
import asynchat
import asyncore
import collections
import logging
import os
import sched
import select
import signal
import socket
import sys
import time

from ganeti import utils
from ganeti import constants
from ganeti import errors
from ganeti import netutils
from ganeti import ssconf
from ganeti import runtime
class SchedulerBreakout(Exception):
  """Exception used to get out of the scheduler loop

  """
def AsyncoreDelayFunction(timeout):
  """Asyncore-compatible scheduler delay function.

  This is a delay function for sched that, rather than actually sleeping,
  executes asyncore events happening in the meantime.

  After an event has occurred, rather than returning, it raises a
  SchedulerBreakout exception, which will force the current scheduler.run()
  invocation to terminate, so that we can also check for signals. The main loop
  will then call the scheduler run again, which will allow it to actually
  process any due events.

  This is needed because scheduler.run() doesn't support a count=..., as
  asyncore loop, and the scheduler module documents throwing exceptions from
  inside the delay function as an allowed usage model.

  @type timeout: float
  @param timeout: maximum time to spend processing asyncore events

  """
  asyncore.loop(timeout=timeout, count=1, use_poll=True)
  raise SchedulerBreakout()
class AsyncoreScheduler(sched.scheduler):
  """Event scheduler integrated with asyncore

  """
  def __init__(self, timefunc):
    """Initializes this class.

    @param timefunc: function to use as the time source (e.g. time.time)

    """
    # AsyncoreDelayFunction processes asyncore events instead of sleeping
    # between scheduled events
    sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
  """Base Ganeti Asyncore Dispatcher

  """
  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    """Most of the time we don't want to check for writability.

    """
    return False
class AsyncStreamServer(GanetiBaseAsyncoreDispatcher):
  """A stream server to use with asyncore.

  Each request is accepted, and then dispatched to a separate asyncore
  dispatcher to handle.

  """

  # backlog passed to listen(); small, as requests are dispatched quickly
  _REQUEST_QUEUE_SIZE = 5

  def __init__(self, family, address):
    """Constructor for AsyncUnixStreamSocket

    @type family: integer
    @param family: socket family (one of socket.AF_*)
    @type address: address family dependent
    @param address: address to bind the socket to

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    self.family = family
    self.create_socket(self.family, socket.SOCK_STREAM)
    self.set_reuse_addr()
    self.bind(address)
    self.listen(self._REQUEST_QUEUE_SIZE)

  # this method is overriding an asyncore.dispatcher method
  def handle_accept(self):
    """Accept a new client connection.

    Creates a new instance of the handler class, which will use asyncore to
    serve it.

    """
    accept_result = utils.IgnoreSignals(self.accept)
    if accept_result is not None:
      connected_socket, client_address = accept_result
      if self.family == socket.AF_UNIX:
        # override the client address, as for unix sockets nothing meaningful
        # is passed in from accept anyway
        client_address = netutils.GetSocketCredentials(connected_socket)
      logging.info("Accepted connection from %s",
                   netutils.FormatAddress(client_address, family=self.family))
      self.handle_connection(connected_socket, client_address)

  def handle_connection(self, connected_socket, client_address):
    """Handle an already accepted connection.

    Must be overridden by subclasses to actually serve the request.

    """
    raise NotImplementedError
class AsyncTerminatedMessageStream(asynchat.async_chat):
  """A terminator separated message stream asyncore module.

  Handles a stream connection receiving messages terminated by a defined
  separator. For each complete message handle_message is called.

  """
  def __init__(self, connected_socket, peer_address, terminator, family,
               unhandled_limit):
    """AsyncTerminatedMessageStream constructor.

    @type connected_socket: socket.socket
    @param connected_socket: connected stream socket to receive messages from
    @param peer_address: family-specific peer address
    @type terminator: string
    @param terminator: terminator separating messages in the stream
    @type family: integer
    @param family: socket family
    @type unhandled_limit: integer or None
    @param unhandled_limit: maximum unanswered messages

    """
    # python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by
    # using a positional argument rather than a keyword one.
    asynchat.async_chat.__init__(self, connected_socket)
    self.connected_socket = connected_socket
    # on python 2.4 there is no "family" attribute for the socket class
    # FIXME: when we move to python 2.5 or above remove the family parameter
    #self.family = self.connected_socket.family
    self.family = family
    self.peer_address = peer_address
    self.terminator = terminator
    self.unhandled_limit = unhandled_limit
    self.set_terminator(terminator)
    self.ibuffer = []
    self.receive_count = 0
    self.send_count = 0
    self.oqueue = collections.deque()
    self.iqueue = collections.deque()

  # this method is overriding an asynchat.async_chat method
  def collect_incoming_data(self, data):
    self.ibuffer.append(data)

  def _can_handle_message(self):
    # we can process another message if there is no limit, or if we have
    # answered enough of the previously received ones and have no backlog
    return (self.unhandled_limit is None or
            (self.receive_count < self.send_count + self.unhandled_limit) and
             not self.iqueue)

  # this method is overriding an asynchat.async_chat method
  def found_terminator(self):
    message = "".join(self.ibuffer)
    self.ibuffer = []
    message_id = self.receive_count
    # We need to increase the receive_count after checking if the message can
    # be handled, but before calling handle_message
    can_handle = self._can_handle_message()
    self.receive_count += 1
    if can_handle:
      self.handle_message(message, message_id)
    else:
      # queue the message until an answer frees a slot (see handle_write)
      self.iqueue.append((message, message_id))

  def handle_message(self, message, message_id):
    """Handle a terminated message.

    @type message: string
    @param message: message to handle
    @type message_id: integer
    @param message_id: stream's message sequence number

    """
    pass
    # TODO: move this method to raise NotImplementedError
    # raise NotImplementedError

  def send_message(self, message):
    """Send a message to the remote peer. This function is thread-safe.

    @type message: string
    @param message: message to send, without the terminator

    @warning: If calling this function from a thread different than the one
    performing the main asyncore loop, remember that you have to wake that one
    up.

    """
    # If we just append the message we received to the output queue, this
    # function can be safely called by multiple threads at the same time, and
    # we don't need locking, since deques are thread safe. handle_write in the
    # asyncore thread will handle the next input message if there are any
    # queued
    self.oqueue.append(message)

  # this method is overriding an asyncore.dispatcher method
  def readable(self):
    # read from the socket if we can handle the next requests
    return self._can_handle_message() and asynchat.async_chat.readable(self)

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # the output queue may become full just after we called writable. This only
    # works if we know we'll have something else waking us up from the select,
    # in such case, anyway.
    return asynchat.async_chat.writable(self) or self.oqueue

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    if self.oqueue:
      # if we have data in the output queue, then send_message was called.
      # this means we can process one more message from the input queue, if
      # there are any.
      data = self.oqueue.popleft()
      self.push(data + self.terminator)
      self.send_count += 1
      if self.iqueue:
        self.handle_message(*self.iqueue.popleft())
    self.initiate_send()

  def close_log(self):
    """Log the connection closing, then close it.

    """
    logging.info("Closing connection from %s",
                 netutils.FormatAddress(self.peer_address, family=self.family))
    self.close()

  # this method is overriding an asyncore.dispatcher method
  def handle_expt(self):
    self.close_log()

  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")
    self.close_log()
class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
  """An improved asyncore udp socket.

  """
  def __init__(self, family):
    """Constructor for AsyncUDPSocket

    @type family: integer
    @param family: socket family (one of socket.AF_*)

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    # queue of (ip, port, payload) tuples waiting to be sent
    self._out_queue = []
    self._family = family
    self.create_socket(family, socket.SOCK_DGRAM)

  # this method is overriding an asyncore.dispatcher method
  def handle_connect(self):
    # Python thinks that the first udp message from a source qualifies as a
    # "connect" and further ones are part of the same connection. We beg to
    # differ and treat all messages equally.
    pass

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    recv_result = utils.IgnoreSignals(self.recvfrom,
                                      constants.MAX_UDP_DATA_SIZE)
    if recv_result is not None:
      payload, address = recv_result
      if self._family == socket.AF_INET6:
        # we ignore 'flow info' and 'scope id' as we don't need them
        ip, port, _, _ = address
      else:
        ip, port = address

      self.handle_datagram(payload, ip, port)

  def handle_datagram(self, payload, ip, port):
    """Handle an already read udp datagram

    Must be overridden by subclasses.

    """
    raise NotImplementedError

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # We should check whether we can write to the socket only if we have
    # something scheduled to be written
    return bool(self._out_queue)

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    if not self._out_queue:
      logging.error("handle_write called with empty output queue")
      return
    (ip, port, payload) = self._out_queue[0]
    utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
    self._out_queue.pop(0)

  def enqueue_send(self, ip, port, payload):
    """Enqueue a datagram to be sent when possible

    """
    if len(payload) > constants.MAX_UDP_DATA_SIZE:
      raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
                                    constants.MAX_UDP_DATA_SIZE))
    self._out_queue.append((ip, port, payload))

  def process_next_packet(self, timeout=0):
    """Process the next datagram, waiting for it if necessary.

    @type timeout: float
    @param timeout: how long to wait for data
    @rtype: boolean
    @return: True if some data has been handled, False otherwise

    """
    result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
    if result is not None and result & select.POLLIN:
      self.handle_read()
      return True
    else:
      return False
class AsyncAwaker(GanetiBaseAsyncoreDispatcher):
  """A way to notify the asyncore loop that something is going on.

  If an asyncore daemon is multithreaded when a thread tries to push some data
  to a socket, the main loop handling asynchronous requests might be sleeping
  waiting on a select(). To avoid this it can create an instance of the
  AsyncAwaker, which other threads can use to wake it up.

  """
  def __init__(self, signal_fn=None):
    """Constructor for AsyncAwaker

    @type signal_fn: function
    @param signal_fn: function to call when awaken

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    assert signal_fn == None or callable(signal_fn)
    (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
                                                          socket.SOCK_STREAM)
    self.in_socket.setblocking(0)
    # the in socket only receives, the out socket only sends
    self.in_socket.shutdown(socket.SHUT_WR)
    self.out_socket.shutdown(socket.SHUT_RD)
    self.set_socket(self.in_socket)
    self.need_signal = True
    self.signal_fn = signal_fn
    self.connected = True

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    utils.IgnoreSignals(self.recv, 4096)
    if self.signal_fn:
      self.signal_fn()
    self.need_signal = True

  # this method is overriding an asyncore.dispatcher method
  def close(self):
    asyncore.dispatcher.close(self)
    self.out_socket.close()

  def signal(self):
    """Signal the asyncore main loop.

    Any data we send here will be ignored, but it will cause the select() call
    to return.

    """
    # Yes, there is a race condition here. No, we don't care, at worst we're
    # sending more than one wakeup token, which doesn't harm at all.
    if self.need_signal:
      self.need_signal = False
      self.out_socket.send("\0")
class Mainloop(object):
  """Generic mainloop for daemons

  @ivar scheduler: A sched.scheduler object, which can be used to register
    timed events

  """
  def __init__(self):
    """Constructs a new Mainloop instance.

    """
    self._signal_wait = []
    self.scheduler = AsyncoreScheduler(time.time)

  @utils.SignalHandled([signal.SIGCHLD])
  @utils.SignalHandled([signal.SIGTERM])
  @utils.SignalHandled([signal.SIGINT])
  def Run(self, signal_handlers=None):
    """Runs the mainloop.

    @type signal_handlers: dict
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator

    """
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"
    running = True
    # Start actual main loop
    while running:
      if not self.scheduler.empty():
        try:
          self.scheduler.run()
        except SchedulerBreakout:
          pass
      else:
        asyncore.loop(count=1, use_poll=True)

      # Check whether a signal was raised
      for sig in signal_handlers:
        handler = signal_handlers[sig]
        if handler.called:
          self._CallSignalWaiters(sig)
          # stop looping on termination signals
          running = sig not in (signal.SIGTERM, signal.SIGINT)
          handler.Clear()

  def _CallSignalWaiters(self, signum):
    """Calls all signal waiters for a certain signal.

    @type signum: int
    @param signum: Signal number

    """
    for owner in self._signal_wait:
      owner.OnSignal(signum)

  def RegisterSignal(self, owner):
    """Registers a receiver for signal notifications

    The receiver must support a "OnSignal(self, signum)" function.

    @type owner: instance
    @param owner: Receiver

    """
    self._signal_wait.append(owner)
def _VerifyDaemonUser(daemon_name):
  """Verifies the process uid matches the configured uid.

  This method verifies that a daemon is started as the user it is
  intended to be run

  @param daemon_name: The name of daemon to be started
  @return: A tuple with the first item indicating success or not,
           the second item current uid and third with expected uid

  """
  getents = runtime.GetEnts()
  running_uid = os.getuid()
  daemon_uids = {
    constants.MASTERD: getents.masterd_uid,
    constants.RAPI: getents.rapi_uid,
    constants.NODED: getents.noded_uid,
    constants.CONFD: getents.confd_uid,
    }

  return (daemon_uids[daemon_name] == running_uid, running_uid,
          daemon_uids[daemon_name])
515 def _BeautifyError(err):
516 """Try to format an error better.
518 Since we're dealing with daemon startup errors, in many cases this
519 will be due to socket error and such, so we try to format these cases better.
521 @param err: an exception object
523 @return: the formatted error description
527 if isinstance(err, socket.error):
528 return "Socket-related error: %s (errno=%s)" % (err.args[1], err.args[0])
529 elif isinstance(err, EnvironmentError):
530 if err.filename is None:
531 return "%s (errno=%s)" % (err.strerror, err.errno)
533 return "%s (file %s) (errno=%s)" % (err.strerror, err.filename,
537 except Exception: # pylint: disable-msg=W0703
538 logging.exception("Error while handling existing error %s", err)
539 return "%s" % str(err)
542 def GenericMain(daemon_name, optionparser,
543 check_fn, prepare_fn, exec_fn,
544 multithreaded=False, console_logging=False,
545 default_ssl_cert=None, default_ssl_key=None):
546 """Shared main function for daemons.
548 @type daemon_name: string
549 @param daemon_name: daemon name
550 @type optionparser: optparse.OptionParser
551 @param optionparser: initialized optionparser with daemon-specific options
552 (common -f -d options will be handled by this module)
553 @type check_fn: function which accepts (options, args)
554 @param check_fn: function that checks start conditions and exits if they're
556 @type prepare_fn: function which accepts (options, args)
557 @param prepare_fn: function that is run before forking, or None;
558 it's result will be passed as the third parameter to exec_fn, or
559 if None was passed in, we will just pass None to exec_fn
560 @type exec_fn: function which accepts (options, args, prepare_results)
561 @param exec_fn: function that's executed with the daemon's pid file held, and
562 runs the daemon itself.
563 @type multithreaded: bool
564 @param multithreaded: Whether the daemon uses threads
565 @type console_logging: boolean
566 @param console_logging: if True, the daemon will fall back to the system
567 console if logging fails
568 @type default_ssl_cert: string
569 @param default_ssl_cert: Default SSL certificate path
570 @type default_ssl_key: string
571 @param default_ssl_key: Default SSL key path
574 optionparser.add_option("-f", "--foreground", dest="fork",
575 help="Don't detach from the current terminal",
576 default=True, action="store_false")
577 optionparser.add_option("-d", "--debug", dest="debug",
578 help="Enable some debug messages",
579 default=False, action="store_true")
580 optionparser.add_option("--syslog", dest="syslog",
581 help="Enable logging to syslog (except debug"
582 " messages); one of 'no', 'yes' or 'only' [%s]" %
583 constants.SYSLOG_USAGE,
584 default=constants.SYSLOG_USAGE,
585 choices=["no", "yes", "only"])
587 if daemon_name in constants.DAEMONS_PORTS:
588 default_bind_address = constants.IP4_ADDRESS_ANY
589 family = ssconf.SimpleStore().GetPrimaryIPFamily()
590 # family will default to AF_INET if there is no ssconf file (e.g. when
591 # upgrading a cluster from 2.2 -> 2.3. This is intended, as Ganeti clusters
592 # <= 2.2 can not be AF_INET6
593 if family == netutils.IP6Address.family:
594 default_bind_address = constants.IP6_ADDRESS_ANY
596 default_port = netutils.GetDaemonPort(daemon_name)
598 # For networked daemons we allow choosing the port and bind address
599 optionparser.add_option("-p", "--port", dest="port",
600 help="Network port (default: %s)" % default_port,
601 default=default_port, type="int")
602 optionparser.add_option("-b", "--bind", dest="bind_address",
603 help=("Bind address (default: '%s')" %
604 default_bind_address),
605 default=default_bind_address, metavar="ADDRESS")
607 if default_ssl_key is not None and default_ssl_cert is not None:
608 optionparser.add_option("--no-ssl", dest="ssl",
609 help="Do not secure HTTP protocol with SSL",
610 default=True, action="store_false")
611 optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
612 help=("SSL key path (default: %s)" %
614 default=default_ssl_key, type="string",
615 metavar="SSL_KEY_PATH")
616 optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
617 help=("SSL certificate path (default: %s)" %
619 default=default_ssl_cert, type="string",
620 metavar="SSL_CERT_PATH")
622 # Disable the use of fork(2) if the daemon uses threads
623 utils.no_fork = multithreaded
625 options, args = optionparser.parse_args()
627 if getattr(options, "ssl", False):
629 "certificate": options.ssl_cert,
630 "key": options.ssl_key,
633 for name, path in ssl_paths.iteritems():
634 if not os.path.isfile(path):
635 print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
636 sys.exit(constants.EXIT_FAILURE)
638 # TODO: By initiating http.HttpSslParams here we would only read the files
639 # once and have a proper validation (isfile returns False on directories)
642 result, running_uid, expected_uid = _VerifyDaemonUser(daemon_name)
644 msg = ("%s started using wrong user ID (%d), expected %d" %
645 (daemon_name, running_uid, expected_uid))
646 print >> sys.stderr, msg
647 sys.exit(constants.EXIT_FAILURE)
649 if check_fn is not None:
650 check_fn(options, args)
654 wpipe = utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name])
658 utils.WritePidFile(utils.DaemonPidFileName(daemon_name))
661 utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
663 stderr_logging=not options.fork,
664 multithreaded=multithreaded,
666 syslog=options.syslog,
667 console_logging=console_logging)
668 if callable(prepare_fn):
669 prep_results = prepare_fn(options, args)
672 logging.info("%s daemon startup", daemon_name)
673 except Exception, err:
674 utils.WriteErrorToFD(wpipe, _BeautifyError(err))
677 if wpipe is not None:
678 # we're done with the preparation phase, we close the pipe to
679 # let the parent know it's safe to exit
682 exec_fn(options, args, prep_results)
684 utils.RemovePidFile(daemon_name)