diff --git a/lib/workerpool.py b/lib/workerpool.py index d2fbfddbdbd6fb64cfcb2c2ac8a96b6d192df832..25f31b4abe022a1cbf0d5ddf045671efd440da6a 100644 --- a/lib/workerpool.py +++ b/lib/workerpool.py @@ -30,6 +30,9 @@ import threading from ganeti import compat +_TERMINATE = object() + + class BaseWorker(threading.Thread, object): """Base worker class for worker pools. @@ -82,34 +85,22 @@ class BaseWorker(threading.Thread, object): while True: try: - # We wait on lock to be told either terminate or do a task. + # Wait on lock to be told either to terminate or to do a task pool._lock.acquire() try: - if pool._ShouldWorkerTerminateUnlocked(self): - break - - # We only wait if there's no task for us. - if not pool._tasks: - logging.debug("Waiting for tasks") - - # wait() releases the lock and sleeps until notified - pool._pool_to_worker.wait() + task = pool._WaitForTaskUnlocked(self) - logging.debug("Notified while waiting") + if task is _TERMINATE: + # Told to terminate + break - # Were we woken up in order to terminate? - if pool._ShouldWorkerTerminateUnlocked(self): - break + if task is None: + # Spurious notification, ignore + continue - if not pool._tasks: - # Spurious notification, ignore - continue + self._current_task = task - # Get task from queue and tell pool about it - try: - self._current_task = pool._tasks.popleft() - finally: - pool._worker_to_pool.notifyAll() + assert self._HasRunningTaskUnlocked() finally: pool._lock.release() @@ -229,6 +220,39 @@ class WorkerPool(object): finally: self._lock.release() + def _WaitForTaskUnlocked(self, worker): + """Waits for a task for a worker. + + @type worker: L{BaseWorker} + @param worker: Worker thread + + """ + if self._ShouldWorkerTerminateUnlocked(worker): + return _TERMINATE + + # We only wait if there's no task for us. + if not self._tasks: + logging.debug("Waiting for tasks") + + # wait() releases the lock and sleeps until notified + self._pool_to_worker.wait() + + logging.debug("Notified while waiting") + + # Were we woken up in order to terminate? + if self._ShouldWorkerTerminateUnlocked(worker): + return _TERMINATE + + if not self._tasks: + # Spurious notification, ignore + return None + + # Get task from queue and tell pool about it + try: + return self._tasks.popleft() + finally: + self._worker_to_pool.notifyAll() + def _ShouldWorkerTerminateUnlocked(self, worker): """Returns whether a worker should terminate.