Commit 5483fd73 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

masterd: Shutdown only once running jobs have been processed



Until now, if masterd received a fatal signal, it would start shutting
down immediately. In the meantime it would hang while jobs are still
processed. Clients couldn't connect anymore to retrieve a jobs' status.

This this patch masterd checks if any job is running before shutting
down. If there is it'll check again every five seconds. Once all jobs
are finished, it waits another five seconds to give clients a chance to
retrieve the jobs' status. After that masterd will shutdown in a clean
fashion.

If a second signal is received the old behaviour is preserved.
Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarIustin Pop <iustin@google.com>
parent 2d6b5414
......@@ -125,6 +125,63 @@ class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
self.server.request_workers.AddTask((self.server, message, self))
class _MasterShutdownCheck:
"""Logic for master daemon shutdown.
"""
#: How long to wait between checks
_CHECK_INTERVAL = 5.0
#: How long to wait after all jobs are done (e.g. to give clients time to
#: retrieve the job status)
_SHUTDOWN_LINGER = 5.0
def __init__(self):
"""Initializes this class.
"""
self._had_active_jobs = None
self._linger_timeout = None
def __call__(self, jq_prepare_result):
"""Determines if master daemon is ready for shutdown.
@param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
@rtype: None or number
@return: None if master daemon is ready, timeout if the check must be
repeated
"""
if jq_prepare_result:
# Check again shortly
logging.info("Job queue has been notified for shutdown but is still"
" busy; next check in %s seconds", self._CHECK_INTERVAL)
self._had_active_jobs = True
return self._CHECK_INTERVAL
if not self._had_active_jobs:
# Can shut down as there were no active jobs on the first check
return None
# No jobs are running anymore, but maybe some clients want to collect some
# information. Give them a short amount of time.
if self._linger_timeout is None:
self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)
remaining = self._linger_timeout.Remaining()
logging.info("Job queue no longer busy; shutting down master daemon"
" in %s seconds", remaining)
# TODO: Should the master daemon socket be closed at this point? Doing so
# wouldn't affect existing connections.
if remaining < 0:
return None
else:
return remaining
class MasterServer(daemon.AsyncStreamServer):
"""Master Server.
......@@ -154,6 +211,8 @@ class MasterServer(daemon.AsyncStreamServer):
self.context = None
self.request_workers = None
self._shutdown_check = None
def handle_connection(self, connected_socket, client_address):
# TODO: add connection count and limit the number of open connections to a
# maximum number to avoid breaking for lack of file descriptors or memory.
......@@ -165,6 +224,15 @@ class MasterServer(daemon.AsyncStreamServer):
CLIENT_REQUEST_WORKERS,
ClientRequestWorker)
def WaitForShutdown(self):
"""Prepares server for shutdown.
"""
if self._shutdown_check is None:
self._shutdown_check = _MasterShutdownCheck()
return self._shutdown_check(self.context.jobqueue.PrepareShutdown())
def server_cleanup(self):
"""Cleanup the server.
......@@ -636,7 +704,7 @@ def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
try:
master.setup_queue()
try:
mainloop.Run()
mainloop.Run(shutdown_wait_fn=master.WaitForShutdown)
finally:
master.server_cleanup()
finally:
......@@ -644,6 +712,8 @@ def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
finally:
utils.RemoveFile(constants.MASTER_SOCKET)
logging.info("Clean master daemon shutdown")
def Main():
"""Main function"""
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment