From 5483fd739f21071e98aeff1805cd172f59691b7f Mon Sep 17 00:00:00 2001 From: Michael Hanselmann Date: Thu, 17 Nov 2011 12:07:57 +0100 Subject: [PATCH] masterd: Shutdown only once running jobs have been processed Until now, if masterd received a fatal signal, it would start shutting down immediately. In the meantime it would hang while jobs are still processed. Clients couldn't connect anymore to retrieve a jobs' status. This this patch masterd checks if any job is running before shutting down. If there is it'll check again every five seconds. Once all jobs are finished, it waits another five seconds to give clients a chance to retrieve the jobs' status. After that masterd will shutdown in a clean fashion. If a second signal is received the old behaviour is preserved. Signed-off-by: Michael Hanselmann Reviewed-by: Iustin Pop --- lib/server/masterd.py | 72 ++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 71 insertions(+), 1 deletion(-) diff --git a/lib/server/masterd.py b/lib/server/masterd.py index ba840d6..3985b63 100644 --- a/lib/server/masterd.py +++ b/lib/server/masterd.py @@ -125,6 +125,63 @@ class MasterClientHandler(daemon.AsyncTerminatedMessageStream): self.server.request_workers.AddTask((self.server, message, self)) +class _MasterShutdownCheck: + """Logic for master daemon shutdown. + + """ + #: How long to wait between checks + _CHECK_INTERVAL = 5.0 + + #: How long to wait after all jobs are done (e.g. to give clients time to + #: retrieve the job status) + _SHUTDOWN_LINGER = 5.0 + + def __init__(self): + """Initializes this class. + + """ + self._had_active_jobs = None + self._linger_timeout = None + + def __call__(self, jq_prepare_result): + """Determines if master daemon is ready for shutdown. + + @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown} + @rtype: None or number + @return: None if master daemon is ready, timeout if the check must be + repeated + + """ + if jq_prepare_result: + # Check again shortly + logging.info("Job queue has been notified for shutdown but is still" + " busy; next check in %s seconds", self._CHECK_INTERVAL) + self._had_active_jobs = True + return self._CHECK_INTERVAL + + if not self._had_active_jobs: + # Can shut down as there were no active jobs on the first check + return None + + # No jobs are running anymore, but maybe some clients want to collect some + # information. Give them a short amount of time. + if self._linger_timeout is None: + self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True) + + remaining = self._linger_timeout.Remaining() + + logging.info("Job queue no longer busy; shutting down master daemon" + " in %s seconds", remaining) + + # TODO: Should the master daemon socket be closed at this point? Doing so + # wouldn't affect existing connections. + + if remaining < 0: + return None + else: + return remaining + + class MasterServer(daemon.AsyncStreamServer): """Master Server. @@ -154,6 +211,8 @@ class MasterServer(daemon.AsyncStreamServer): self.context = None self.request_workers = None + self._shutdown_check = None + def handle_connection(self, connected_socket, client_address): # TODO: add connection count and limit the number of open connections to a # maximum number to avoid breaking for lack of file descriptors or memory. @@ -165,6 +224,15 @@ class MasterServer(daemon.AsyncStreamServer): CLIENT_REQUEST_WORKERS, ClientRequestWorker) + def WaitForShutdown(self): + """Prepares server for shutdown. + + """ + if self._shutdown_check is None: + self._shutdown_check = _MasterShutdownCheck() + + return self._shutdown_check(self.context.jobqueue.PrepareShutdown()) + def server_cleanup(self): """Cleanup the server. @@ -636,7 +704,7 @@ def ExecMasterd(options, args, prep_data): # pylint: disable=W0613 try: master.setup_queue() try: - mainloop.Run() + mainloop.Run(shutdown_wait_fn=master.WaitForShutdown) finally: master.server_cleanup() finally: @@ -644,6 +712,8 @@ def ExecMasterd(options, args, prep_data): # pylint: disable=W0613 finally: utils.RemoveFile(constants.MASTER_SOCKET) + logging.info("Clean master daemon shutdown") + def Main(): """Main function""" -- 1.7.10.4