Commit b3558df1 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

workerpool: Don't notify if there was no task

Workers have to notify their pool if they finished a task to make
the WorkerPool.Quiesce function work. This is done in the finally:
clause to notify even in case of an exception. However, before
we notified on each run, even if there was no task, thereby creating
some sort of an endless loop of notifications. In a future patch
we should split the single condition object into several to
produce less spurious notifications.

While we're at this, this patch also adds two new functions to
BaseWorker to query whether it's currently running a task and then
uses one of these functions in the WorkerPool instead of querying
the internal variable directly.

Reviewed-by: iustinp
parent 195c7f91
......@@ -48,8 +48,6 @@ class BaseWorker(threading.Thread, object):
super(BaseWorker, self).__init__()
self.pool = pool
self.worker_id = worker_id
# Also used by WorkerPool
self._current_task = None
def ShouldTerminate(self):
......@@ -58,6 +56,22 @@ class BaseWorker(threading.Thread, object):
"""
return self.pool.ShouldWorkerTerminate(self)
def _HasRunningTaskUnlocked(self):
"""Returns whether this worker is currently running a task.
"""
return (self._current_task is not None)
def HasRunningTask(self):
"""Returns whether this worker is currently running a task.
"""
self.pool._lock.acquire()
try:
return self._HasRunningTaskUnlocked()
finally:
self.pool._lock.release()
def run(self):
"""Main thread function.
......@@ -66,7 +80,7 @@ class BaseWorker(threading.Thread, object):
"""
pool = self.pool
assert self._current_task is None
assert not self.HasRunningTask()
while True:
try:
......@@ -78,9 +92,13 @@ class BaseWorker(threading.Thread, object):
# We only wait if there's no task for us.
if not pool._tasks:
logging.debug("Worker %s: waiting for tasks", self.worker_id)
# wait() releases the lock and sleeps until notified
pool._lock.wait()
logging.debug("Worker %s: notified while waiting", self.worker_id)
# Were we woken up in order to terminate?
if pool._ShouldWorkerTerminateUnlocked(self):
break
......@@ -99,20 +117,26 @@ class BaseWorker(threading.Thread, object):
# Run the actual task
try:
logging.debug("Worker %s: starting task %r",
self.worker_id, self._current_task)
self.RunTask(*self._current_task)
logging.debug("Worker %s: done with task %r",
self.worker_id, self._current_task)
except:
logging.error("Worker %s: Caught unhandled exception",
self.worker_id, exc_info=True)
finally:
self._current_task = None
# Notify pool
pool._lock.acquire()
try:
pool._lock.notifyAll()
if self._current_task:
self._current_task = None
pool._lock.notifyAll()
finally:
pool._lock.release()
logging.debug("Worker %s: terminates", self.worker_id)
def RunTask(self, *args):
"""Function called to start a task.
......@@ -198,7 +222,7 @@ class WorkerPool(object):
"""
for worker in self._workers + self._termworkers:
if worker._current_task is not None:
if worker._HasRunningTaskUnlocked():
return True
return False
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment