From 5483fd739f21071e98aeff1805cd172f59691b7f Mon Sep 17 00:00:00 2001
From: Michael Hanselmann <hansmi@google.com>
Date: Thu, 17 Nov 2011 12:07:57 +0100
Subject: [PATCH] masterd: Shutdown only once running jobs have been processed

Until now, if masterd received a fatal signal, it would start shutting
down immediately. In the meantime it would hang while jobs are still
processed. Clients couldn't connect anymore to retrieve a jobs' status.

This this patch masterd checks if any job is running before shutting
down. If there is it'll check again every five seconds. Once all jobs
are finished, it waits another five seconds to give clients a chance to
retrieve the jobs' status. After that masterd will shutdown in a clean
fashion.

If a second signal is received the old behaviour is preserved.

Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Iustin Pop <iustin@google.com>
---
 lib/server/masterd.py | 72 ++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 71 insertions(+), 1 deletion(-)

diff --git a/lib/server/masterd.py b/lib/server/masterd.py
index ba840d61c..3985b635c 100644
--- a/lib/server/masterd.py
+++ b/lib/server/masterd.py
@@ -125,6 +125,63 @@ class MasterClientHandler(daemon.AsyncTerminatedMessageStream):
     self.server.request_workers.AddTask((self.server, message, self))
 
 
+class _MasterShutdownCheck:
+  """Logic for master daemon shutdown.
+
+  """
+  #: How long to wait between checks
+  _CHECK_INTERVAL = 5.0
+
+  #: How long to wait after all jobs are done (e.g. to give clients time to
+  #: retrieve the job status)
+  _SHUTDOWN_LINGER = 5.0
+
+  def __init__(self):
+    """Initializes this class.
+
+    """
+    self._had_active_jobs = None
+    self._linger_timeout = None
+
+  def __call__(self, jq_prepare_result):
+    """Determines if master daemon is ready for shutdown.
+
+    @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
+    @rtype: None or number
+    @return: None if master daemon is ready, timeout if the check must be
+             repeated
+
+    """
+    if jq_prepare_result:
+      # Check again shortly
+      logging.info("Job queue has been notified for shutdown but is still"
+                   " busy; next check in %s seconds", self._CHECK_INTERVAL)
+      self._had_active_jobs = True
+      return self._CHECK_INTERVAL
+
+    if not self._had_active_jobs:
+      # Can shut down as there were no active jobs on the first check
+      return None
+
+    # No jobs are running anymore, but maybe some clients want to collect some
+    # information. Give them a short amount of time.
+    if self._linger_timeout is None:
+      self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)
+
+    remaining = self._linger_timeout.Remaining()
+
+    logging.info("Job queue no longer busy; shutting down master daemon"
+                 " in %s seconds", remaining)
+
+    # TODO: Should the master daemon socket be closed at this point? Doing so
+    # wouldn't affect existing connections.
+
+    if remaining < 0:
+      return None
+    else:
+      return remaining
+
+
 class MasterServer(daemon.AsyncStreamServer):
   """Master Server.
 
@@ -154,6 +211,8 @@ class MasterServer(daemon.AsyncStreamServer):
     self.context = None
     self.request_workers = None
 
+    self._shutdown_check = None
+
   def handle_connection(self, connected_socket, client_address):
     # TODO: add connection count and limit the number of open connections to a
     # maximum number to avoid breaking for lack of file descriptors or memory.
@@ -165,6 +224,15 @@ class MasterServer(daemon.AsyncStreamServer):
                                                  CLIENT_REQUEST_WORKERS,
                                                  ClientRequestWorker)
 
+  def WaitForShutdown(self):
+    """Prepares server for shutdown.
+
+    """
+    if self._shutdown_check is None:
+      self._shutdown_check = _MasterShutdownCheck()
+
+    return self._shutdown_check(self.context.jobqueue.PrepareShutdown())
+
   def server_cleanup(self):
     """Cleanup the server.
 
@@ -636,7 +704,7 @@ def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
     try:
       master.setup_queue()
       try:
-        mainloop.Run()
+        mainloop.Run(shutdown_wait_fn=master.WaitForShutdown)
       finally:
         master.server_cleanup()
     finally:
@@ -644,6 +712,8 @@ def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
   finally:
     utils.RemoveFile(constants.MASTER_SOCKET)
 
+  logging.info("Clean master daemon shutdown")
+
 
 def Main():
   """Main function"""
-- 
GitLab