From 21c5ad52c8a75d3781f2f0b2e9d7e5688559dbe8 Mon Sep 17 00:00:00 2001 From: Michael Hanselmann <hansmi@google.com> Date: Sat, 17 Jul 2010 22:32:43 +0200 Subject: [PATCH] workerpool: Move waiting for new tasks for a worker to the pool This way fewer private variables of the pool are accesssed by the worker. Signed-off-by: Michael Hanselmann <hansmi@google.com> Reviewed-by: Iustin Pop <iustin@google.com> --- lib/workerpool.py | 68 ++++++++++++++++++++++++++++++++--------------- 1 file changed, 46 insertions(+), 22 deletions(-) diff --git a/lib/workerpool.py b/lib/workerpool.py index d2fbfddbd..25f31b4ab 100644 --- a/lib/workerpool.py +++ b/lib/workerpool.py @@ -30,6 +30,9 @@ import threading from ganeti import compat +_TERMINATE = object() + + class BaseWorker(threading.Thread, object): """Base worker class for worker pools. @@ -82,34 +85,22 @@ class BaseWorker(threading.Thread, object): while True: try: - # We wait on lock to be told either terminate or do a task. + # Wait on lock to be told either to terminate or to do a task pool._lock.acquire() try: - if pool._ShouldWorkerTerminateUnlocked(self): - break - - # We only wait if there's no task for us. - if not pool._tasks: - logging.debug("Waiting for tasks") - - # wait() releases the lock and sleeps until notified - pool._pool_to_worker.wait() + task = pool._WaitForTaskUnlocked(self) - logging.debug("Notified while waiting") + if task is _TERMINATE: + # Told to terminate + break - # Were we woken up in order to terminate? - if pool._ShouldWorkerTerminateUnlocked(self): - break + if task is None: + # Spurious notification, ignore + continue - if not pool._tasks: - # Spurious notification, ignore - continue + self._current_task = task - # Get task from queue and tell pool about it - try: - self._current_task = pool._tasks.popleft() - finally: - pool._worker_to_pool.notifyAll() + assert self._HasRunningTaskUnlocked() finally: pool._lock.release() @@ -229,6 +220,39 @@ class WorkerPool(object): finally: self._lock.release() + def _WaitForTaskUnlocked(self, worker): + """Waits for a task for a worker. + + @type worker: L{BaseWorker} + @param worker: Worker thread + + """ + if self._ShouldWorkerTerminateUnlocked(worker): + return _TERMINATE + + # We only wait if there's no task for us. + if not self._tasks: + logging.debug("Waiting for tasks") + + # wait() releases the lock and sleeps until notified + self._pool_to_worker.wait() + + logging.debug("Notified while waiting") + + # Were we woken up in order to terminate? + if self._ShouldWorkerTerminateUnlocked(worker): + return _TERMINATE + + if not self._tasks: + # Spurious notification, ignore + return None + + # Get task from queue and tell pool about it + try: + return self._tasks.popleft() + finally: + self._worker_to_pool.notifyAll() + def _ShouldWorkerTerminateUnlocked(self, worker): """Returns whether a worker should terminate. -- GitLab