From b3558df1ce05334d20560d16cec79e7e4286ae09 Mon Sep 17 00:00:00 2001 From: Michael Hanselmann <hansmi@google.com> Date: Tue, 8 Jul 2008 15:03:50 +0000 Subject: [PATCH] workerpool: Don't notify if there was no task Workers have to notify their pool if they finished a task to make the WorkerPool.Quiesce function work. This is done in the finally: clause to notify even in case of an exception. However, before we notified on each run, even if there was no task, thereby creating some sort of an endless loop of notifications. In a future patch we should split the single condition object into several to produce less spurious notifications. While we're at this, this patch also adds two new functions to BaseWorker to query whether it's currently running a task and then uses one of these functions in the WorkerPool instead of querying the internal variable directly. Reviewed-by: iustinp --- lib/workerpool.py | 38 +++++++++++++++++++++++++++++++------- 1 file changed, 31 insertions(+), 7 deletions(-) diff --git a/lib/workerpool.py b/lib/workerpool.py index 63d85cc8c..0075bd774 100644 --- a/lib/workerpool.py +++ b/lib/workerpool.py @@ -48,8 +48,6 @@ class BaseWorker(threading.Thread, object): super(BaseWorker, self).__init__() self.pool = pool self.worker_id = worker_id - - # Also used by WorkerPool self._current_task = None def ShouldTerminate(self): @@ -58,6 +56,22 @@ class BaseWorker(threading.Thread, object): """ return self.pool.ShouldWorkerTerminate(self) + def _HasRunningTaskUnlocked(self): + """Returns whether this worker is currently running a task. + + """ + return (self._current_task is not None) + + def HasRunningTask(self): + """Returns whether this worker is currently running a task. + + """ + self.pool._lock.acquire() + try: + return self._HasRunningTaskUnlocked() + finally: + self.pool._lock.release() + def run(self): """Main thread function. @@ -66,7 +80,7 @@ class BaseWorker(threading.Thread, object): """ pool = self.pool - assert self._current_task is None + assert not self.HasRunningTask() while True: try: @@ -78,9 +92,13 @@ class BaseWorker(threading.Thread, object): # We only wait if there's no task for us. if not pool._tasks: + logging.debug("Worker %s: waiting for tasks", self.worker_id) + # wait() releases the lock and sleeps until notified pool._lock.wait() + logging.debug("Worker %s: notified while waiting", self.worker_id) + # Were we woken up in order to terminate? if pool._ShouldWorkerTerminateUnlocked(self): break @@ -99,20 +117,26 @@ class BaseWorker(threading.Thread, object): # Run the actual task try: + logging.debug("Worker %s: starting task %r", + self.worker_id, self._current_task) self.RunTask(*self._current_task) + logging.debug("Worker %s: done with task %r", + self.worker_id, self._current_task) except: logging.error("Worker %s: Caught unhandled exception", self.worker_id, exc_info=True) finally: - self._current_task = None - # Notify pool pool._lock.acquire() try: - pool._lock.notifyAll() + if self._current_task: + self._current_task = None + pool._lock.notifyAll() finally: pool._lock.release() + logging.debug("Worker %s: terminates", self.worker_id) + def RunTask(self, *args): """Function called to start a task. @@ -198,7 +222,7 @@ class WorkerPool(object): """ for worker in self._workers + self._termworkers: - if worker._current_task is not None: + if worker._HasRunningTaskUnlocked(): return True return False -- GitLab