Commit 53b1d12b authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

Split conditions in worker pool

This patch splits the single threading.Condition object used in the
worker pool for synchronization into three.

- worker_to_pool: Notified if a worker wants to notify the pool
- pool_to_worker: Notified if the pool wants to notify a single
  or all workers
- pool_to_pool: Used for synchronization in Quiesce

Reviewed-by: ultrotter
parent 84b58db2
......@@ -95,7 +95,7 @@ class BaseWorker(threading.Thread, object):
logging.debug("Worker %s: waiting for tasks", self.worker_id)
# wait() releases the lock and sleeps until notified
pool._lock.wait()
pool._pool_to_worker.wait()
logging.debug("Worker %s: notified while waiting", self.worker_id)
......@@ -111,7 +111,7 @@ class BaseWorker(threading.Thread, object):
try:
self._current_task = pool._tasks.popleft()
finally:
pool._lock.notifyAll()
pool._worker_to_pool.notifyAll()
finally:
pool._lock.release()
......@@ -131,7 +131,7 @@ class BaseWorker(threading.Thread, object):
try:
if self._current_task:
self._current_task = None
pool._lock.notifyAll()
pool._worker_to_pool.notifyAll()
finally:
pool._lock.release()
......@@ -165,7 +165,10 @@ class WorkerPool(object):
"""
# Some of these variables are accessed by BaseWorker
self._lock = threading.Condition(threading.Lock())
self._lock = threading.Lock()
self._pool_to_pool = threading.Condition(self._lock)
self._pool_to_worker = threading.Condition(self._lock)
self._worker_to_pool = threading.Condition(self._lock)
self._worker_class = worker_class
self._last_worker_id = 0
self._workers = []
......@@ -193,11 +196,13 @@ class WorkerPool(object):
try:
# Don't add new tasks while we're quiescing
while self._quiescing:
self._lock.wait()
self._pool_to_pool.wait()
# Add task to internal queue
self._tasks.append(args)
self._lock.notify()
# Wake one idling worker up
self._pool_to_worker.notify()
finally:
self._lock.release()
......@@ -236,13 +241,13 @@ class WorkerPool(object):
# Wait while there are tasks pending or running
while self._tasks or self._HasRunningTasksUnlocked():
self._lock.wait()
self._worker_to_pool.wait()
finally:
self._quiescing = False
# Make sure AddTasks continues in case it was waiting
self._lock.notifyAll()
self._pool_to_pool.notifyAll()
self._lock.release()
......@@ -277,7 +282,7 @@ class WorkerPool(object):
self._termworkers += termworkers
# Notify workers that something has changed
self._lock.notifyAll()
self._pool_to_worker.notifyAll()
# Join all terminating workers
self._lock.release()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment