diff --git a/lib/server/masterd.py b/lib/server/masterd.py
index ba840d61c2f61e32c814558f252d9bd97037fc63..3985b635cce3afba02a083540e26f47effe0fddc 100644
--- a/lib/server/masterd.py
+++ b/lib/server/masterd.py
@@ -125,6 +125,63 @@ class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
     self.server.request_workers.AddTask((self.server, message, self))
 
 
+class _MasterShutdownCheck:
+  """Callable that decides when the master daemon may shut down.
+
+  """
+  #: Seconds to wait between checks while the job queue is still busy
+  _CHECK_INTERVAL = 5.0
+
+  #: Seconds to linger after all jobs are done (e.g. to give clients time to
+  #: retrieve the job status)
+  _SHUTDOWN_LINGER = 5.0
+
+  def __init__(self):
+    """Starts with no busy queue seen and no linger timer running.
+
+    """
+    self._had_active_jobs = None
+    self._linger_timeout = None
+
+  def __call__(self, jq_prepare_result):
+    """Determines if master daemon is ready for shutdown.
+
+    @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
+    @rtype: None or number
+    @return: None if master daemon may shut down now, otherwise the number of
+             seconds after which this check must be repeated
+
+    """
+    if jq_prepare_result:
+      # Queue is still busy: remember that and ask to be called again shortly
+      logging.info("Job queue has been notified for shutdown but is still"
+                   " busy; next check in %s seconds", self._CHECK_INTERVAL)
+      self._had_active_jobs = True
+      return self._CHECK_INTERVAL
+
+    if not self._had_active_jobs:
+      # The queue was never seen busy by this checker, so shut down right away
+      return None
+
+    # All jobs are done, but clients may still want to collect information:
+    # start the linger timer on the first idle check and wait until it expires.
+    if self._linger_timeout is None:
+      self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)
+
+    remaining = self._linger_timeout.Remaining()
+
+    logging.info("Job queue no longer busy; shutting down master daemon"
+                 " in %s seconds", remaining)
+
+    # TODO: Should the master daemon socket be closed at this point? Doing so
+    # wouldn't affect existing connections.
+
+    if remaining < 0:
+      return None
+    else:
+      return remaining
+
+
 class MasterServer(daemon.AsyncStreamServer):
   """Master Server.
 
@@ -154,6 +211,8 @@ class MasterServer(daemon.AsyncStreamServer):
     self.context = None
     self.request_workers = None
 
+    self._shutdown_check = None
+
   def handle_connection(self, connected_socket, client_address):
     # TODO: add connection count and limit the number of open connections to a
     # maximum number to avoid breaking for lack of file descriptors or memory.
@@ -165,6 +224,15 @@ class MasterServer(daemon.AsyncStreamServer):
                                                  CLIENT_REQUEST_WORKERS,
                                                  ClientRequestWorker)
 
+  def WaitForShutdown(self):
+    """Prepares job queue for shutdown; returns None if ready, else a timeout.
+
+    """
+    if self._shutdown_check is None:
+      self._shutdown_check = _MasterShutdownCheck()
+
+    return self._shutdown_check(self.context.jobqueue.PrepareShutdown())
+
   def server_cleanup(self):
     """Cleanup the server.
 
@@ -636,7 +704,7 @@ def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
     try:
       master.setup_queue()
       try:
-        mainloop.Run()
+        mainloop.Run(shutdown_wait_fn=master.WaitForShutdown)
       finally:
         master.server_cleanup()
     finally:
@@ -644,6 +712,8 @@ def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
   finally:
     utils.RemoveFile(constants.MASTER_SOCKET)
 
+  logging.info("Clean master daemon shutdown")
+
 
 def Main():
   """Main function"""