diff --git a/lib/workerpool.py b/lib/workerpool.py index 63d85cc8c73d39cfe0e38a3aae486a976844ef59..0075bd774236a2dd4d1be4a1bc5f2d5403e7ee8b 100644 --- a/lib/workerpool.py +++ b/lib/workerpool.py @@ -48,8 +48,6 @@ class BaseWorker(threading.Thread, object): super(BaseWorker, self).__init__() self.pool = pool self.worker_id = worker_id - - # Also used by WorkerPool self._current_task = None def ShouldTerminate(self): @@ -58,6 +56,22 @@ class BaseWorker(threading.Thread, object): """ return self.pool.ShouldWorkerTerminate(self) + def _HasRunningTaskUnlocked(self): + """Returns whether this worker is currently running a task. + + """ + return (self._current_task is not None) + + def HasRunningTask(self): + """Returns whether this worker is currently running a task. + + """ + self.pool._lock.acquire() + try: + return self._HasRunningTaskUnlocked() + finally: + self.pool._lock.release() + def run(self): """Main thread function. @@ -66,7 +80,7 @@ class BaseWorker(threading.Thread, object): """ pool = self.pool - assert self._current_task is None + assert not self.HasRunningTask() while True: try: @@ -78,9 +92,13 @@ class BaseWorker(threading.Thread, object): # We only wait if there's no task for us. if not pool._tasks: + logging.debug("Worker %s: waiting for tasks", self.worker_id) + # wait() releases the lock and sleeps until notified pool._lock.wait() + logging.debug("Worker %s: notified while waiting", self.worker_id) + # Were we woken up in order to terminate? if pool._ShouldWorkerTerminateUnlocked(self): break @@ -99,20 +117,26 @@ class BaseWorker(threading.Thread, object): # Run the actual task try: + logging.debug("Worker %s: starting task %r", + self.worker_id, self._current_task) self.RunTask(*self._current_task) + logging.debug("Worker %s: done with task %r", + self.worker_id, self._current_task) except: logging.error("Worker %s: Caught unhandled exception", self.worker_id, exc_info=True) finally: - self._current_task = None - # Notify pool pool._lock.acquire() try: - pool._lock.notifyAll() + if self._current_task: + self._current_task = None + pool._lock.notifyAll() finally: pool._lock.release() + logging.debug("Worker %s: terminates", self.worker_id) + def RunTask(self, *args): """Function called to start a task. @@ -198,7 +222,7 @@ class WorkerPool(object): """ for worker in self._workers + self._termworkers: - if worker._current_task is not None: + if worker._HasRunningTaskUnlocked(): return True return False