Commit 21c5ad52 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

workerpool: Move waiting for new tasks for a worker to the pool



This way fewer private variables of the pool are accesssed by the worker.
Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarIustin Pop <iustin@google.com>
parent 189d2714
......@@ -30,6 +30,9 @@ import threading
from ganeti import compat
_TERMINATE = object()
class BaseWorker(threading.Thread, object):
"""Base worker class for worker pools.
......@@ -82,34 +85,22 @@ class BaseWorker(threading.Thread, object):
while True:
try:
# We wait on lock to be told either terminate or do a task.
# Wait on lock to be told either to terminate or to do a task
pool._lock.acquire()
try:
if pool._ShouldWorkerTerminateUnlocked(self):
break
# We only wait if there's no task for us.
if not pool._tasks:
logging.debug("Waiting for tasks")
# wait() releases the lock and sleeps until notified
pool._pool_to_worker.wait()
task = pool._WaitForTaskUnlocked(self)
logging.debug("Notified while waiting")
if task is _TERMINATE:
# Told to terminate
break
# Were we woken up in order to terminate?
if pool._ShouldWorkerTerminateUnlocked(self):
break
if task is None:
# Spurious notification, ignore
continue
if not pool._tasks:
# Spurious notification, ignore
continue
self._current_task = task
# Get task from queue and tell pool about it
try:
self._current_task = pool._tasks.popleft()
finally:
pool._worker_to_pool.notifyAll()
assert self._HasRunningTaskUnlocked()
finally:
pool._lock.release()
......@@ -229,6 +220,39 @@ class WorkerPool(object):
finally:
self._lock.release()
def _WaitForTaskUnlocked(self, worker):
"""Waits for a task for a worker.
@type worker: L{BaseWorker}
@param worker: Worker thread
"""
if self._ShouldWorkerTerminateUnlocked(worker):
return _TERMINATE
# We only wait if there's no task for us.
if not self._tasks:
logging.debug("Waiting for tasks")
# wait() releases the lock and sleeps until notified
self._pool_to_worker.wait()
logging.debug("Notified while waiting")
# Were we woken up in order to terminate?
if self._ShouldWorkerTerminateUnlocked(worker):
return _TERMINATE
if not self._tasks:
# Spurious notification, ignore
return None
# Get task from queue and tell pool about it
try:
return self._tasks.popleft()
finally:
self._worker_to_pool.notifyAll()
def _ShouldWorkerTerminateUnlocked(self, worker):
"""Returns whether a worker should terminate.
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment