4 # Copyright (C) 2006, 2007, 2008, 2010, 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module with helper classes and functions for daemons"""
import asyncore
import asynchat
import collections
import os
import signal
import logging
import sched
import time
import socket
import select
import sys

from ganeti import utils
from ganeti import constants
from ganeti import errors
from ganeti import netutils
from ganeti import ssconf
from ganeti import runtime
from ganeti import compat
class SchedulerBreakout(Exception):
  """Exception used to get out of the scheduler loop

  """
def AsyncoreDelayFunction(timeout):
  """Asyncore-compatible scheduler delay function.

  This is a delay function for sched that, rather than actually sleeping,
  executes asyncore events happening in the meantime.

  After an event has occurred, rather than returning, it raises a
  SchedulerBreakout exception, which will force the current scheduler.run()
  invocation to terminate, so that we can also check for signals. The main loop
  will then call the scheduler run again, which will allow it to actually
  process any due events.

  This is needed because scheduler.run() doesn't support a count=..., as
  asyncore loop, and the scheduler module documents throwing exceptions from
  inside the delay function as an allowed usage model.

  @type timeout: float
  @param timeout: maximum time to wait for asyncore events

  """
  # Run at most one pass of the asyncore loop instead of sleeping, so I/O
  # keeps being serviced while scheduled events are pending.
  asyncore.loop(timeout=timeout, count=1, use_poll=True)
  # Always break out of scheduler.run() so the caller can check for signals.
  raise SchedulerBreakout()
class AsyncoreScheduler(sched.scheduler):
  """Event scheduler integrated with asyncore

  Uses L{AsyncoreDelayFunction} as delay function, so that waiting for a
  scheduled event also services asyncore I/O events.

  """
  def __init__(self, timefunc):
    """Initializes this class.

    @param timefunc: function returning the current time (e.g. time.time)

    """
    sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
  """Base Ganeti Asyncore Dispatcher

  """
  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    """Most of the time we don't want to check for writability.

    """
    return False
class AsyncStreamServer(GanetiBaseAsyncoreDispatcher):
  """A stream server to use with asyncore.

  Each request is accepted, and then dispatched to a separate asyncore
  dispatcher to handle.

  """

  _REQUEST_QUEUE_SIZE = 5

  def __init__(self, family, address):
    """Constructor for AsyncUnixStreamSocket

    @type family: integer
    @param family: socket family (one of socket.AF_*)
    @type address: address family dependent
    @param address: address to bind the socket to

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    self.family = family
    self.create_socket(self.family, socket.SOCK_STREAM)
    self.set_reuse_addr()
    self.bind(address)
    self.listen(self._REQUEST_QUEUE_SIZE)

  # this method is overriding an asyncore.dispatcher method
  def handle_accept(self):
    """Accept a new client connection.

    Creates a new instance of the handler class, which will use asyncore to
    serve the client.

    """
    # accept() may be interrupted by signals; IgnoreSignals retries and
    # returns None if nothing was actually accepted
    accept_result = utils.IgnoreSignals(self.accept)
    if accept_result is not None:
      connected_socket, client_address = accept_result
      if self.family == socket.AF_UNIX:
        # override the client address, as for unix sockets nothing meaningful
        # is passed in from accept anyway
        client_address = netutils.GetSocketCredentials(connected_socket)
      logging.info("Accepted connection from %s",
                   netutils.FormatAddress(client_address, family=self.family))
      self.handle_connection(connected_socket, client_address)

  def handle_connection(self, connected_socket, client_address):
    """Handle an already accepted connection.

    Must be implemented by subclasses.

    """
    raise NotImplementedError
class AsyncTerminatedMessageStream(asynchat.async_chat):
  """A terminator separated message stream asyncore module.

  Handles a stream connection receiving messages terminated by a defined
  separator. For each complete message handle_message is called.

  """
  def __init__(self, connected_socket, peer_address, terminator, family,
               unhandled_limit):
    """AsyncTerminatedMessageStream constructor.

    @type connected_socket: socket.socket
    @param connected_socket: connected stream socket to receive messages from
    @param peer_address: family-specific peer address
    @type terminator: string
    @param terminator: terminator separating messages in the stream
    @type family: integer
    @param family: socket family
    @type unhandled_limit: integer or None
    @param unhandled_limit: maximum unanswered messages

    """
    # python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by
    # using a positional argument rather than a keyword one.
    asynchat.async_chat.__init__(self, connected_socket)
    self.connected_socket = connected_socket
    # on python 2.4 there is no "family" attribute for the socket class
    # FIXME: when we move to python 2.5 or above remove the family parameter
    #self.family = self.connected_socket.family
    self.family = family
    self.peer_address = peer_address
    self.terminator = terminator
    self.unhandled_limit = unhandled_limit
    self.set_terminator(terminator)
    # buffer of partial message data received so far
    self.ibuffer = []
    # number of messages received/sent, used for flow control
    self.receive_count = 0
    self.send_count = 0
    # output queue (thread-safe deque) and queue of deferred input messages
    self.oqueue = collections.deque()
    self.iqueue = collections.deque()

  # this method is overriding an asynchat.async_chat method
  def collect_incoming_data(self, data):
    self.ibuffer.append(data)

  def _can_handle_message(self):
    # either there is no limit, or we're under it and have no backlog
    return (self.unhandled_limit is None or
            (self.receive_count < self.send_count + self.unhandled_limit) and
             not self.iqueue)

  # this method is overriding an asynchat.async_chat method
  def found_terminator(self):
    message = "".join(self.ibuffer)
    self.ibuffer = []
    message_id = self.receive_count
    # We need to increase the receive_count after checking if the message can
    # be handled, but before calling handle_message
    can_handle = self._can_handle_message()
    self.receive_count += 1
    if can_handle:
      self.handle_message(message, message_id)
    else:
      # over the unhandled limit: defer until a reply frees a slot
      self.iqueue.append((message, message_id))

  def handle_message(self, message, message_id):
    """Handle a terminated message.

    @type message: string
    @param message: message to handle
    @type message_id: integer
    @param message_id: stream's message sequence number

    """
    pass
    # TODO: move this method to raise NotImplementedError
    # raise NotImplementedError

  def send_message(self, message):
    """Send a message to the remote peer. This function is thread-safe.

    @type message: string
    @param message: message to send, without the terminator

    @warning: If calling this function from a thread different than the one
    performing the main asyncore loop, remember that you have to wake that one
    up.

    """
    # If we just append the message we received to the output queue, this
    # function can be safely called by multiple threads at the same time, and
    # we don't need locking, since deques are thread safe. handle_write in the
    # asyncore thread will handle the next input message if there are any
    # queued
    self.oqueue.append(message)

  # this method is overriding an asyncore.dispatcher method
  def readable(self):
    # read from the socket if we can handle the next requests
    return self._can_handle_message() and asynchat.async_chat.readable(self)

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # the output queue may become full just after we called writable. This only
    # works if we know we'll have something else waking us up from the select,
    # in such case, anyway.
    return asynchat.async_chat.writable(self) or self.oqueue

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    if self.oqueue:
      # if we have data in the output queue, then send_message was called.
      # this means we can process one more message from the input queue, if
      # there are any queued.
      data = self.oqueue.popleft()
      self.push(data + self.terminator)
      self.send_count += 1
      if self.iqueue:
        self.handle_message(*self.iqueue.popleft())
    self.initiate_send()

  def close_log(self):
    """Log the connection being closed, then close it."""
    logging.info("Closing connection from %s",
                 netutils.FormatAddress(self.peer_address, family=self.family))
    self.close()

  # this method is overriding an asyncore.dispatcher method
  def handle_expt(self):
    self.close_log()

  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")
    self.close_log()
class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
  """An improved asyncore udp socket.

  """
  def __init__(self, family):
    """Constructor for AsyncUDPSocket

    @type family: integer
    @param family: socket family (one of socket.AF_*)

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    # queue of (ip, port, payload) tuples waiting to be sent
    self._out_queue = []
    self._family = family
    self.create_socket(family, socket.SOCK_DGRAM)

  # this method is overriding an asyncore.dispatcher method
  def handle_connect(self):
    # Python thinks that the first udp message from a source qualifies as a
    # "connect" and further ones are part of the same connection. We beg to
    # differ and treat all messages equally.
    pass

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    recv_result = utils.IgnoreSignals(self.recvfrom,
                                      constants.MAX_UDP_DATA_SIZE)
    if recv_result is not None:
      payload, address = recv_result
      if self._family == socket.AF_INET6:
        # we ignore 'flow info' and 'scope id' as we don't need them
        ip, port, _, _ = address
      else:
        ip, port = address
      self.handle_datagram(payload, ip, port)

  def handle_datagram(self, payload, ip, port):
    """Handle an already read udp datagram

    Must be implemented by subclasses.

    """
    raise NotImplementedError

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # We should check whether we can write to the socket only if we have
    # something scheduled to be written
    return bool(self._out_queue)

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    if not self._out_queue:
      logging.error("handle_write called with empty output queue")
      return
    (ip, port, payload) = self._out_queue[0]
    utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
    self._out_queue.pop(0)

  def enqueue_send(self, ip, port, payload):
    """Enqueue a datagram to be sent when possible

    @raise errors.UdpDataSizeError: if the payload is too big

    """
    if len(payload) > constants.MAX_UDP_DATA_SIZE:
      raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
                                    constants.MAX_UDP_DATA_SIZE))
    self._out_queue.append((ip, port, payload))

  def process_next_packet(self, timeout=0):
    """Process the next datagram, waiting for it if necessary.

    @type timeout: float
    @param timeout: how long to wait for data
    @rtype: boolean
    @return: True if some data has been handled, False otherwise

    """
    result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
    if result is not None and result & select.POLLIN:
      self.handle_read()
      return True
    else:
      return False
class AsyncAwaker(GanetiBaseAsyncoreDispatcher):
  """A way to notify the asyncore loop that something is going on.

  If an asyncore daemon is multithreaded when a thread tries to push some data
  to a socket, the main loop handling asynchronous requests might be sleeping
  waiting on a select(). To avoid this it can create an instance of the
  AsyncAwaker, which other threads can use to wake it up.

  """
  def __init__(self, signal_fn=None):
    """Constructor for AsyncAwaker

    @type signal_fn: function
    @param signal_fn: function to call when awaken

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    assert signal_fn == None or callable(signal_fn)
    # the in side is watched by asyncore, the out side is written to by
    # whichever thread wants to wake the loop up
    (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
                                                          socket.SOCK_STREAM)
    self.in_socket.setblocking(0)
    self.in_socket.shutdown(socket.SHUT_WR)
    self.out_socket.shutdown(socket.SHUT_RD)
    self.set_socket(self.in_socket)
    self.need_signal = True
    self.signal_fn = signal_fn
    self.connected = True

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    # drain the wakeup tokens; their content is irrelevant
    utils.IgnoreSignals(self.recv, 4096)
    if self.signal_fn:
      self.signal_fn()
    self.need_signal = True

  # this method is overriding an asyncore.dispatcher method
  def close(self):
    asyncore.dispatcher.close(self)
    self.out_socket.close()

  def signal(self):
    """Signal the asyncore main loop.

    Any data we send here will be ignored, but it will cause the select() call
    to return.

    """
    # Yes, there is a race condition here. No, we don't care, at worst we're
    # sending more than one wakeup token, which doesn't harm at all.
    if self.need_signal:
      self.need_signal = False
      self.out_socket.send("\0")
class Mainloop(object):
  """Generic mainloop for daemons

  @ivar scheduler: A sched.scheduler object, which can be used to register
    timed events

  """
  def __init__(self):
    """Constructs a new Mainloop instance.

    """
    self._signal_wait = []
    self.scheduler = AsyncoreScheduler(time.time)

    # Resolve uid/gids used
    runtime.GetEnts()

  @utils.SignalHandled([signal.SIGCHLD])
  @utils.SignalHandled([signal.SIGTERM])
  @utils.SignalHandled([signal.SIGINT])
  def Run(self, signal_handlers=None):
    """Runs the mainloop.

    @type signal_handlers: dict
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator

    """
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"
    running = True
    # Start actual main loop
    while running:
      if not self.scheduler.empty():
        try:
          self.scheduler.run()
        except SchedulerBreakout:
          pass
      else:
        asyncore.loop(count=1, use_poll=True)

      # Check whether a signal was raised
      for sig in signal_handlers:
        handler = signal_handlers[sig]
        if handler.called:
          self._CallSignalWaiters(sig)
          # SIGTERM/SIGINT terminate the loop; other signals just notify
          running = sig not in (signal.SIGTERM, signal.SIGINT)
          handler.Clear()

  def _CallSignalWaiters(self, signum):
    """Calls all signal waiters for a certain signal.

    @type signum: int
    @param signum: Signal number

    """
    for owner in self._signal_wait:
      owner.OnSignal(signum)

  def RegisterSignal(self, owner):
    """Registers a receiver for signal notifications

    The receiver must support a "OnSignal(self, signum)" function.

    @type owner: instance
    @param owner: Receiver

    """
    self._signal_wait.append(owner)
def _VerifyDaemonUser(daemon_name):
  """Verifies the process uid matches the configured uid.

  This method verifies that a daemon is started as the user it is
  intended to be run

  @param daemon_name: The name of daemon to be started
  @return: A tuple with the first item indicating success or not,
           the second item current uid and third with expected uid

  """
  getents = runtime.GetEnts()
  running_uid = os.getuid()
  # map of daemon name -> uid it is expected to run under
  daemon_uids = {
    constants.MASTERD: getents.masterd_uid,
    constants.RAPI: getents.rapi_uid,
    constants.NODED: getents.noded_uid,
    constants.CONFD: getents.confd_uid,
    }
  return (daemon_uids[daemon_name] == running_uid, running_uid,
          daemon_uids[daemon_name])
def _BeautifyError(err):
  """Try to format an error better.

  Since we're dealing with daemon startup errors, in many cases this
  will be due to socket error and such, so we try to format these cases better.

  @param err: an exception object
  @rtype: string
  @return: the formatted error description

  """
  try:
    if isinstance(err, socket.error):
      return "Socket-related error: %s (errno=%s)" % (err.args[1], err.args[0])
    elif isinstance(err, EnvironmentError):
      if err.filename is None:
        return "%s (errno=%s)" % (err.strerror, err.errno)
      else:
        return "%s (file %s) (errno=%s)" % (err.strerror, err.filename,
                                            err.errno)
    else:
      return str(err)
  # formatting must never raise: fall back to plain str() on any failure
  except Exception: # pylint: disable-msg=W0703
    logging.exception("Error while handling existing error %s", err)
    return "%s" % str(err)
def _HandleSigHup(reopen_cb, signum, frame): # pylint: disable-msg=W0613
  """Handler for SIGHUP.

  @param reopen_cb: Callback function for reopening log files

  """
  assert callable(reopen_cb)
  logging.info("Reopening log files after receiving SIGHUP")
  # actually reopen the log files; without this the handler is a no-op
  reopen_cb()
558 def GenericMain(daemon_name, optionparser,
559 check_fn, prepare_fn, exec_fn,
560 multithreaded=False, console_logging=False,
561 default_ssl_cert=None, default_ssl_key=None):
562 """Shared main function for daemons.
564 @type daemon_name: string
565 @param daemon_name: daemon name
566 @type optionparser: optparse.OptionParser
567 @param optionparser: initialized optionparser with daemon-specific options
568 (common -f -d options will be handled by this module)
569 @type check_fn: function which accepts (options, args)
570 @param check_fn: function that checks start conditions and exits if they're
572 @type prepare_fn: function which accepts (options, args)
573 @param prepare_fn: function that is run before forking, or None;
574 it's result will be passed as the third parameter to exec_fn, or
575 if None was passed in, we will just pass None to exec_fn
576 @type exec_fn: function which accepts (options, args, prepare_results)
577 @param exec_fn: function that's executed with the daemon's pid file held, and
578 runs the daemon itself.
579 @type multithreaded: bool
580 @param multithreaded: Whether the daemon uses threads
581 @type console_logging: boolean
582 @param console_logging: if True, the daemon will fall back to the system
583 console if logging fails
584 @type default_ssl_cert: string
585 @param default_ssl_cert: Default SSL certificate path
586 @type default_ssl_key: string
587 @param default_ssl_key: Default SSL key path
590 optionparser.add_option("-f", "--foreground", dest="fork",
591 help="Don't detach from the current terminal",
592 default=True, action="store_false")
593 optionparser.add_option("-d", "--debug", dest="debug",
594 help="Enable some debug messages",
595 default=False, action="store_true")
596 optionparser.add_option("--syslog", dest="syslog",
597 help="Enable logging to syslog (except debug"
598 " messages); one of 'no', 'yes' or 'only' [%s]" %
599 constants.SYSLOG_USAGE,
600 default=constants.SYSLOG_USAGE,
601 choices=["no", "yes", "only"])
603 if daemon_name in constants.DAEMONS_PORTS:
604 default_bind_address = constants.IP4_ADDRESS_ANY
605 family = ssconf.SimpleStore().GetPrimaryIPFamily()
606 # family will default to AF_INET if there is no ssconf file (e.g. when
607 # upgrading a cluster from 2.2 -> 2.3. This is intended, as Ganeti clusters
608 # <= 2.2 can not be AF_INET6
609 if family == netutils.IP6Address.family:
610 default_bind_address = constants.IP6_ADDRESS_ANY
612 default_port = netutils.GetDaemonPort(daemon_name)
614 # For networked daemons we allow choosing the port and bind address
615 optionparser.add_option("-p", "--port", dest="port",
616 help="Network port (default: %s)" % default_port,
617 default=default_port, type="int")
618 optionparser.add_option("-b", "--bind", dest="bind_address",
619 help=("Bind address (default: '%s')" %
620 default_bind_address),
621 default=default_bind_address, metavar="ADDRESS")
623 if default_ssl_key is not None and default_ssl_cert is not None:
624 optionparser.add_option("--no-ssl", dest="ssl",
625 help="Do not secure HTTP protocol with SSL",
626 default=True, action="store_false")
627 optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
628 help=("SSL key path (default: %s)" %
630 default=default_ssl_key, type="string",
631 metavar="SSL_KEY_PATH")
632 optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
633 help=("SSL certificate path (default: %s)" %
635 default=default_ssl_cert, type="string",
636 metavar="SSL_CERT_PATH")
638 # Disable the use of fork(2) if the daemon uses threads
642 options, args = optionparser.parse_args()
644 if getattr(options, "ssl", False):
646 "certificate": options.ssl_cert,
647 "key": options.ssl_key,
650 for name, path in ssl_paths.iteritems():
651 if not os.path.isfile(path):
652 print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
653 sys.exit(constants.EXIT_FAILURE)
655 # TODO: By initiating http.HttpSslParams here we would only read the files
656 # once and have a proper validation (isfile returns False on directories)
659 result, running_uid, expected_uid = _VerifyDaemonUser(daemon_name)
661 msg = ("%s started using wrong user ID (%d), expected %d" %
662 (daemon_name, running_uid, expected_uid))
663 print >> sys.stderr, msg
664 sys.exit(constants.EXIT_FAILURE)
666 if check_fn is not None:
667 check_fn(options, args)
671 wpipe = utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name])
676 utils.SetupLogging(constants.DAEMONS_LOGFILES[daemon_name], daemon_name,
678 stderr_logging=not options.fork,
679 multithreaded=multithreaded,
680 syslog=options.syslog,
681 console_logging=console_logging)
683 # Reopen log file(s) on SIGHUP
684 signal.signal(signal.SIGHUP, compat.partial(_HandleSigHup, log_reopen_fn))
686 utils.WritePidFile(utils.DaemonPidFileName(daemon_name))
689 logging.info("%s daemon startup", daemon_name)
690 if callable(prepare_fn):
691 prep_results = prepare_fn(options, args)
694 except Exception, err:
695 utils.WriteErrorToFD(wpipe, _BeautifyError(err))
698 if wpipe is not None:
699 # we're done with the preparation phase, we close the pipe to
700 # let the parent know it's safe to exit
703 exec_fn(options, args, prep_results)
705 utils.RemoveFile(utils.DaemonPidFileName(daemon_name))