4 # Copyright (C) 2006, 2007, 2008, 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module with helper classes and functions for daemons"""
39 from ganeti import utils
40 from ganeti import constants
41 from ganeti import errors
42 from ganeti import netutils
43 from ganeti import ssconf
# Default user/group a daemon runs as; GenericMain accepts user= and group=
# parameters that default to these values.
_DEFAULT_RUN_USER = "root"
_DEFAULT_RUN_GROUP = "root"
class SchedulerBreakout(Exception):
  """Exception used to get out of the scheduler loop

  Raised by AsyncoreDelayFunction after each asyncore loop iteration, so
  that scheduler.run() returns control to the caller between events.

  """
def AsyncoreDelayFunction(timeout):
  """Asyncore-compatible scheduler delay function.

  This is a delay function for sched that, rather than actually sleeping,
  executes asyncore events happening in the meantime.

  After an event has occurred, rather than returning, it raises a
  SchedulerBreakout exception, which will force the current scheduler.run()
  invocation to terminate, so that we can also check for signals. The main loop
  will then call the scheduler run again, which will allow it to actually
  process any due events.

  This is needed because scheduler.run() doesn't support a count=..., as
  asyncore loop, and the scheduler module documents throwing exceptions from
  inside the delay function as an allowed usage model.

  @type timeout: float
  @param timeout: maximum time to wait for asyncore events, in seconds

  """
  asyncore.loop(timeout=timeout, count=1, use_poll=True)
  raise SchedulerBreakout()
class AsyncoreScheduler(sched.scheduler):
  """Event scheduler integrated with asyncore

  Uses AsyncoreDelayFunction as the delay function, so waiting for a
  scheduled event also processes pending asyncore I/O.

  """
  def __init__(self, timefunc):
    sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)
class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
  """Base Ganeti Asyncore Dispatcher

  Provides shared error handling and a write-disabled default for all
  Ganeti asyncore dispatchers.

  """
  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    """Most of the time we don't want to check for writability.

    """
    return False
class AsyncStreamServer(GanetiBaseAsyncoreDispatcher):
  """A stream server to use with asyncore.

  Each request is accepted, and then dispatched to a separate asyncore
  dispatcher to handle.

  """

  # backlog passed to listen(); queued-but-unaccepted connections limit
  _REQUEST_QUEUE_SIZE = 5

  def __init__(self, family, address):
    """Constructor for AsyncStreamServer

    @type family: integer
    @param family: socket family (one of socket.AF_*)
    @type address: address family dependent
    @param address: address to bind the socket to

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    self.family = family
    self.create_socket(self.family, socket.SOCK_STREAM)
    self.set_reuse_addr()
    self.bind(address)
    self.listen(self._REQUEST_QUEUE_SIZE)

  # this method is overriding an asyncore.dispatcher method
  def handle_accept(self):
    """Accept a new client connection.

    Accepts the connection (retrying on EINTR via utils.IgnoreSignals) and
    passes it on to handle_connection, which subclasses must implement.

    """
    accept_result = utils.IgnoreSignals(self.accept)
    if accept_result is not None:
      connected_socket, client_address = accept_result
      if self.family == socket.AF_UNIX:
        # override the client address, as for unix sockets nothing meaningful
        # is passed in from accept anyway
        client_address = netutils.GetSocketCredentials(connected_socket)
      logging.info("Accepted connection from %s",
                   netutils.FormatAddress(client_address, family=self.family))
      self.handle_connection(connected_socket, client_address)

  def handle_connection(self, connected_socket, client_address):
    """Handle an already accepted connection.

    Must be implemented by subclasses.

    """
    raise NotImplementedError
class AsyncTerminatedMessageStream(asynchat.async_chat):
  """A terminator separated message stream asyncore module.

  Handles a stream connection receiving messages terminated by a defined
  separator. For each complete message handle_message is called.

  """
  def __init__(self, connected_socket, peer_address, terminator, family,
               unhandled_limit):
    """AsyncTerminatedMessageStream constructor.

    @type connected_socket: socket.socket
    @param connected_socket: connected stream socket to receive messages from
    @param peer_address: family-specific peer address
    @type terminator: string
    @param terminator: terminator separating messages in the stream
    @type family: integer
    @param family: socket family
    @type unhandled_limit: integer or None
    @param unhandled_limit: maximum unanswered messages, or None for no limit

    """
    # python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by
    # using a positional argument rather than a keyword one.
    asynchat.async_chat.__init__(self, connected_socket)
    self.connected_socket = connected_socket
    # on python 2.4 there is no "family" attribute for the socket class
    # FIXME: when we move to python 2.5 or above remove the family parameter
    #self.family = self.connected_socket.family
    self.family = family
    self.peer_address = peer_address
    self.terminator = terminator
    self.unhandled_limit = unhandled_limit
    self.set_terminator(terminator)
    # buffer of partial-message data, joined in found_terminator
    self.ibuffer = []
    # number of messages fully received / fully sent so far
    self.receive_count = 0
    self.send_count = 0
    # output queue of outgoing messages; deque appends are thread safe
    self.oqueue = collections.deque()
    # input queue of received-but-not-yet-handled (message, id) pairs
    self.iqueue = collections.deque()

  # this method is overriding an asynchat.async_chat method
  def collect_incoming_data(self, data):
    self.ibuffer.append(data)

  def _can_handle_message(self):
    # A message can be handled immediately if no limit is set, or if the
    # number of unanswered messages is below the limit and nothing is queued
    return (self.unhandled_limit is None or
            (self.receive_count < self.send_count + self.unhandled_limit) and
             not self.iqueue)

  # this method is overriding an asynchat.async_chat method
  def found_terminator(self):
    message = "".join(self.ibuffer)
    self.ibuffer = []
    message_id = self.receive_count
    # We need to increase the receive_count after checking if the message can
    # be handled, but before calling handle_message
    can_handle = self._can_handle_message()
    self.receive_count += 1
    if can_handle:
      self.handle_message(message, message_id)
    else:
      self.iqueue.append((message, message_id))

  def handle_message(self, message, message_id):
    """Handle a terminated message.

    @type message: string
    @param message: message to handle
    @type message_id: integer
    @param message_id: stream's message sequence number

    """
    pass
    # TODO: move this method to raise NotImplementedError
    # raise NotImplementedError

  def send_message(self, message):
    """Send a message to the remote peer. This function is thread-safe.

    @type message: string
    @param message: message to send, without the terminator

    @warning: If calling this function from a thread different than the one
    performing the main asyncore loop, remember that you have to wake that one
    up.

    """
    # If we just append the message we received to the output queue, this
    # function can be safely called by multiple threads at the same time, and
    # we don't need locking, since deques are thread safe. handle_write in the
    # asyncore thread will handle the next input message if there are any
    # queued.
    self.oqueue.append(message)

  # this method is overriding an asyncore.dispatcher method
  def readable(self):
    # read from the socket if we can handle the next requests
    return self._can_handle_message() and asynchat.async_chat.readable(self)

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # the output queue may become full just after we called writable. This only
    # works if we know we'll have something else waking us up from the select,
    # in such case, anyway.
    return asynchat.async_chat.writable(self) or self.oqueue

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    if self.oqueue:
      # if we have data in the output queue, then send_message was called.
      # this means we can process one more message from the input queue, if
      # there are any.
      data = self.oqueue.popleft()
      self.push(data + self.terminator)
      self.send_count += 1
      if self.iqueue:
        self.handle_message(*self.iqueue.popleft())
    self.initiate_send()

  def close_log(self):
    """Log the connection closure, then close.

    """
    logging.info("Closing connection from %s",
                 netutils.FormatAddress(self.peer_address, family=self.family))
    self.close()

  # this method is overriding an asyncore.dispatcher method
  def handle_expt(self):
    self.close_log()

  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")
    self.close_log()
class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
  """An improved asyncore udp socket.

  Queues outgoing datagrams and dispatches incoming ones to
  handle_datagram, which subclasses must implement.

  """
  def __init__(self, family):
    """Constructor for AsyncUDPSocket

    @type family: integer
    @param family: socket family (one of socket.AF_*)

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    # queue of (ip, port, payload) tuples waiting to be sent
    self._out_queue = []
    self._family = family
    self.create_socket(family, socket.SOCK_DGRAM)

  # this method is overriding an asyncore.dispatcher method
  def handle_connect(self):
    # Python thinks that the first udp message from a source qualifies as a
    # "connect" and further ones are part of the same connection. We beg to
    # differ and treat all messages equally.
    pass

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    recv_result = utils.IgnoreSignals(self.recvfrom,
                                      constants.MAX_UDP_DATA_SIZE)
    if recv_result is not None:
      payload, address = recv_result
      if self._family == socket.AF_INET6:
        # we ignore 'flow info' and 'scope id' as we don't need them
        ip, port, _, _ = address
      else:
        ip, port = address

      self.handle_datagram(payload, ip, port)

  def handle_datagram(self, payload, ip, port):
    """Handle an already read udp datagram

    Must be implemented by subclasses.

    """
    raise NotImplementedError

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # We should check whether we can write to the socket only if we have
    # something scheduled to be written
    return bool(self._out_queue)

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    if not self._out_queue:
      logging.error("handle_write called with empty output queue")
      return
    (ip, port, payload) = self._out_queue[0]
    utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
    self._out_queue.pop(0)

  def enqueue_send(self, ip, port, payload):
    """Enqueue a datagram to be sent when possible

    @raise errors.UdpDataSizeError: if the payload exceeds the maximum
        allowed UDP datagram size

    """
    if len(payload) > constants.MAX_UDP_DATA_SIZE:
      raise errors.UdpDataSizeError('Packet too big: %s > %s' % (len(payload),
                                    constants.MAX_UDP_DATA_SIZE))
    self._out_queue.append((ip, port, payload))

  def process_next_packet(self, timeout=0):
    """Process the next datagram, waiting for it if necessary.

    @type timeout: float
    @param timeout: how long to wait for data
    @rtype: boolean
    @return: True if some data has been handled, False otherwise

    """
    result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
    if result is not None and result & select.POLLIN:
      self.handle_read()
      return True
    else:
      return False
class AsyncAwaker(GanetiBaseAsyncoreDispatcher):
  """A way to notify the asyncore loop that something is going on.

  If an asyncore daemon is multithreaded when a thread tries to push some data
  to a socket, the main loop handling asynchronous requests might be sleeping
  waiting on a select(). To avoid this it can create an instance of the
  AsyncAwaker, which other threads can use to wake it up.

  """
  def __init__(self, signal_fn=None):
    """Constructor for AsyncAwaker

    @type signal_fn: function
    @param signal_fn: function to call when awaken

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    assert signal_fn == None or callable(signal_fn)
    # in_socket is read by the asyncore loop; out_socket is written to by
    # signal() (possibly from other threads)
    (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
                                                          socket.SOCK_STREAM)
    self.in_socket.setblocking(0)
    self.in_socket.shutdown(socket.SHUT_WR)
    self.out_socket.shutdown(socket.SHUT_RD)
    self.set_socket(self.in_socket)
    self.need_signal = True
    self.signal_fn = signal_fn
    self.connected = True

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    utils.IgnoreSignals(self.recv, 4096)
    if self.signal_fn:
      self.signal_fn()
    self.need_signal = True

  # this method is overriding an asyncore.dispatcher method
  def close(self):
    asyncore.dispatcher.close(self)
    self.out_socket.close()

  def signal(self):
    """Signal the asyncore main loop.

    Any data we send here will be ignored, but it will cause the select() call
    to return.

    """
    # Yes, there is a race condition here. No, we don't care, at worst we're
    # sending more than one wakeup token, which doesn't harm at all.
    if self.need_signal:
      self.need_signal = False
      self.out_socket.send("\0")
class Mainloop(object):
  """Generic mainloop for daemons

  @ivar scheduler: A sched.scheduler object, which can be used to register
    timed events

  """
  def __init__(self):
    """Constructs a new Mainloop instance.

    """
    self._signal_wait = []
    self.scheduler = AsyncoreScheduler(time.time)

  @utils.SignalHandled([signal.SIGCHLD])
  @utils.SignalHandled([signal.SIGTERM])
  @utils.SignalHandled([signal.SIGINT])
  def Run(self, signal_handlers=None):
    """Runs the mainloop.

    @type signal_handlers: dict
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator

    """
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"
    running = True
    # Start actual main loop
    while running:
      if not self.scheduler.empty():
        try:
          self.scheduler.run()
        except SchedulerBreakout:
          pass
      else:
        asyncore.loop(count=1, use_poll=True)

      # Check whether a signal was raised
      for sig in signal_handlers:
        handler = signal_handlers[sig]
        if handler.called:
          self._CallSignalWaiters(sig)
          # stop the loop on termination signals only
          running = sig not in (signal.SIGTERM, signal.SIGINT)
          handler.Clear()

  def _CallSignalWaiters(self, signum):
    """Calls all signal waiters for a certain signal.

    @type signum: int
    @param signum: Signal number

    """
    for owner in self._signal_wait:
      owner.OnSignal(signum)

  def RegisterSignal(self, owner):
    """Registers a receiver for signal notifications

    The receiver must support a "OnSignal(self, signum)" function.

    @type owner: instance
    @param owner: Receiver

    """
    self._signal_wait.append(owner)
def GenericMain(daemon_name, optionparser, dirs, check_fn, exec_fn,
                multithreaded=False, console_logging=False,
                default_ssl_cert=None, default_ssl_key=None,
                user=_DEFAULT_RUN_USER, group=_DEFAULT_RUN_GROUP):
  """Shared main function for daemons.

  @type daemon_name: string
  @param daemon_name: daemon name
  @type optionparser: optparse.OptionParser
  @param optionparser: initialized optionparser with daemon-specific options
      (common -f -d options will be handled by this module)
  @type dirs: list of (string, integer)
  @param dirs: list of directories that must be created if they don't exist,
      and the permissions to be used to create them
  @type check_fn: function which accepts (options, args)
  @param check_fn: function that checks start conditions and exits if they're
      not met
  @type exec_fn: function which accepts (options, args)
  @param exec_fn: function that's executed with the daemon's pid file held, and
      runs the daemon itself.
  @type multithreaded: bool
  @param multithreaded: Whether the daemon uses threads
  @type console_logging: boolean
  @param console_logging: if True, the daemon will fall back to the system
      console if logging fails
  @type default_ssl_cert: string
  @param default_ssl_cert: Default SSL certificate path
  @type default_ssl_key: string
  @param default_ssl_key: Default SSL key path
  @type user: string
  @param user: Default user to run as
  @type group: string
  @param group: Default group to run as

  """
  optionparser.add_option("-f", "--foreground", dest="fork",
                          help="Don't detach from the current terminal",
                          default=True, action="store_false")
  optionparser.add_option("-d", "--debug", dest="debug",
                          help="Enable some debug messages",
                          default=False, action="store_true")
  optionparser.add_option("--syslog", dest="syslog",
                          help="Enable logging to syslog (except debug"
                          " messages); one of 'no', 'yes' or 'only' [%s]" %
                          constants.SYSLOG_USAGE,
                          default=constants.SYSLOG_USAGE,
                          choices=["no", "yes", "only"])

  if daemon_name in constants.DAEMONS_PORTS:
    default_bind_address = constants.IP4_ADDRESS_ANY
    family = ssconf.SimpleStore().GetPrimaryIPFamily()
    # family will default to AF_INET if there is no ssconf file (e.g. when
    # upgrading a cluster from 2.2 -> 2.3. This is intended, as Ganeti clusters
    # <= 2.2 can not be AF_INET6
    if family == netutils.IP6Address.family:
      default_bind_address = constants.IP6_ADDRESS_ANY

    default_port = netutils.GetDaemonPort(daemon_name)

    # For networked daemons we allow choosing the port and bind address
    optionparser.add_option("-p", "--port", dest="port",
                            help="Network port (default: %s)" % default_port,
                            default=default_port, type="int")
    optionparser.add_option("-b", "--bind", dest="bind_address",
                            help=("Bind address (default: '%s')" %
                                  default_bind_address),
                            default=default_bind_address, metavar="ADDRESS")

  if default_ssl_key is not None and default_ssl_cert is not None:
    optionparser.add_option("--no-ssl", dest="ssl",
                            help="Do not secure HTTP protocol with SSL",
                            default=True, action="store_false")
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
                            help=("SSL key path (default: %s)" %
                                  default_ssl_key),
                            default=default_ssl_key, type="string",
                            metavar="SSL_KEY_PATH")
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
                            help=("SSL certificate path (default: %s)" %
                                  default_ssl_cert),
                            default=default_ssl_cert, type="string",
                            metavar="SSL_CERT_PATH")

  # Disable the use of fork(2) if the daemon uses threads
  utils.no_fork = multithreaded

  options, args = optionparser.parse_args()

  if getattr(options, "ssl", False):
    ssl_paths = {
      "certificate": options.ssl_cert,
      "key": options.ssl_key,
      }

    for name, path in ssl_paths.iteritems():
      if not os.path.isfile(path):
        print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
        sys.exit(constants.EXIT_FAILURE)

    # TODO: By initiating http.HttpSslParams here we would only read the files
    # once and have a proper validation (isfile returns False on directories)
    # at the same time.

  if check_fn is not None:
    check_fn(options, args)

  utils.EnsureDirs(dirs)

  if options.fork:
    try:
      uid = pwd.getpwnam(user).pw_uid
      gid = grp.getgrnam(group).gr_gid
    except KeyError:
      raise errors.ConfigurationError("User or group not existing on system:"
                                      " %s:%s" % (user, group))
    utils.Daemonize(constants.DAEMONS_LOGFILES[daemon_name], uid, gid)

  utils.WritePidFile(daemon_name)
  try:
    utils.SetupLogging(logfile=constants.DAEMONS_LOGFILES[daemon_name],
                       debug=options.debug,
                       stderr_logging=not options.fork,
                       multithreaded=multithreaded,
                       program=daemon_name,
                       syslog=options.syslog,
                       console_logging=console_logging)
    logging.info("%s daemon startup", daemon_name)
    exec_fn(options, args)
  finally:
    utils.RemovePidFile(daemon_name)