Revision 5483fd73
b/lib/server/masterd.py | ||
---|---|---|
125 | 125 |
self.server.request_workers.AddTask((self.server, message, self)) |
126 | 126 |
|
127 | 127 |
|
128 |
class _MasterShutdownCheck: |
|
129 |
"""Logic for master daemon shutdown. |
|
130 |
|
|
131 |
""" |
|
132 |
#: How long to wait between checks |
|
133 |
_CHECK_INTERVAL = 5.0 |
|
134 |
|
|
135 |
#: How long to wait after all jobs are done (e.g. to give clients time to |
|
136 |
#: retrieve the job status) |
|
137 |
_SHUTDOWN_LINGER = 5.0 |
|
138 |
|
|
139 |
def __init__(self): |
|
140 |
"""Initializes this class. |
|
141 |
|
|
142 |
""" |
|
143 |
self._had_active_jobs = None |
|
144 |
self._linger_timeout = None |
|
145 |
|
|
146 |
def __call__(self, jq_prepare_result): |
|
147 |
"""Determines if master daemon is ready for shutdown. |
|
148 |
|
|
149 |
@param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown} |
|
150 |
@rtype: None or number |
|
151 |
@return: None if master daemon is ready, timeout if the check must be |
|
152 |
repeated |
|
153 |
|
|
154 |
""" |
|
155 |
if jq_prepare_result: |
|
156 |
# Check again shortly |
|
157 |
logging.info("Job queue has been notified for shutdown but is still" |
|
158 |
" busy; next check in %s seconds", self._CHECK_INTERVAL) |
|
159 |
self._had_active_jobs = True |
|
160 |
return self._CHECK_INTERVAL |
|
161 |
|
|
162 |
if not self._had_active_jobs: |
|
163 |
# Can shut down as there were no active jobs on the first check |
|
164 |
return None |
|
165 |
|
|
166 |
# No jobs are running anymore, but maybe some clients want to collect some |
|
167 |
# information. Give them a short amount of time. |
|
168 |
if self._linger_timeout is None: |
|
169 |
self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True) |
|
170 |
|
|
171 |
remaining = self._linger_timeout.Remaining() |
|
172 |
|
|
173 |
logging.info("Job queue no longer busy; shutting down master daemon" |
|
174 |
" in %s seconds", remaining) |
|
175 |
|
|
176 |
# TODO: Should the master daemon socket be closed at this point? Doing so |
|
177 |
# wouldn't affect existing connections. |
|
178 |
|
|
179 |
if remaining < 0: |
|
180 |
return None |
|
181 |
else: |
|
182 |
return remaining |
|
183 |
|
|
184 |
|
|
128 | 185 |
class MasterServer(daemon.AsyncStreamServer): |
129 | 186 |
"""Master Server. |
130 | 187 |
|
... | ... | |
154 | 211 |
self.context = None |
155 | 212 |
self.request_workers = None |
156 | 213 |
|
214 |
self._shutdown_check = None |
|
215 |
|
|
157 | 216 |
def handle_connection(self, connected_socket, client_address): |
158 | 217 |
# TODO: add connection count and limit the number of open connections to a |
159 | 218 |
# maximum number to avoid breaking for lack of file descriptors or memory. |
... | ... | |
165 | 224 |
CLIENT_REQUEST_WORKERS, |
166 | 225 |
ClientRequestWorker) |
167 | 226 |
|
227 |
def WaitForShutdown(self): |
|
228 |
"""Prepares server for shutdown. |
|
229 |
|
|
230 |
""" |
|
231 |
if self._shutdown_check is None: |
|
232 |
self._shutdown_check = _MasterShutdownCheck() |
|
233 |
|
|
234 |
return self._shutdown_check(self.context.jobqueue.PrepareShutdown()) |
|
235 |
|
|
168 | 236 |
def server_cleanup(self): |
169 | 237 |
"""Cleanup the server. |
170 | 238 |
|
... | ... | |
636 | 704 |
try: |
637 | 705 |
master.setup_queue() |
638 | 706 |
try: |
639 |
mainloop.Run() |
|
707 |
mainloop.Run(shutdown_wait_fn=master.WaitForShutdown)
|
|
640 | 708 |
finally: |
641 | 709 |
master.server_cleanup() |
642 | 710 |
finally: |
... | ... | |
644 | 712 |
finally: |
645 | 713 |
utils.RemoveFile(constants.MASTER_SOCKET) |
646 | 714 |
|
715 |
logging.info("Clean master daemon shutdown") |
|
716 |
|
|
647 | 717 |
|
648 | 718 |
def Main(): |
649 | 719 |
"""Main function""" |
Also available in: Unified diff