Commit c2a8e8ba authored by Guido Trotter's avatar Guido Trotter
Browse files

WorkerPool.AddManyTasks



Useful if we want to add many tasks at once, without contention with the
previous one we added starting.
Signed-off-by: default avatarGuido Trotter <ultrotter@google.com>
Reviewed-by: default avatarMichael Hanselmann <hansmi@google.com>
parent 4c36bdf5
...@@ -180,6 +180,13 @@ class WorkerPool(object): ...@@ -180,6 +180,13 @@ class WorkerPool(object):
# TODO: Implement dynamic resizing? # TODO: Implement dynamic resizing?
def _WaitWhileQuiescingUnlocked(self):
"""Wait until the worker pool has finished quiescing.
"""
while self._quiescing:
self._pool_to_pool.wait()
def AddTask(self, *args): def AddTask(self, *args):
"""Adds a task to the queue. """Adds a task to the queue.
...@@ -188,11 +195,8 @@ class WorkerPool(object): ...@@ -188,11 +195,8 @@ class WorkerPool(object):
""" """
self._lock.acquire() self._lock.acquire()
try: try:
# Don't add new tasks while we're quiescing self._WaitWhileQuiescingUnlocked()
while self._quiescing:
self._pool_to_pool.wait()
# Add task to internal queue
self._tasks.append(args) self._tasks.append(args)
# Wake one idling worker up # Wake one idling worker up
...@@ -200,6 +204,24 @@ class WorkerPool(object): ...@@ -200,6 +204,24 @@ class WorkerPool(object):
finally: finally:
self._lock.release() self._lock.release()
def AddManyTasks(self, tasks):
"""Add a list of tasks to the queue.
@type tasks: list of tuples
@param tasks: list of args passed to L{BaseWorker.RunTask}
"""
self._lock.acquire()
try:
self._WaitWhileQuiescingUnlocked()
self._tasks.extend(tasks)
for _ in tasks:
self._pool_to_worker.notify()
finally:
self._lock.release()
def _ShouldWorkerTerminateUnlocked(self, worker): def _ShouldWorkerTerminateUnlocked(self, worker):
"""Returns whether a worker should terminate. """Returns whether a worker should terminate.
......
...@@ -121,6 +121,23 @@ class TestWorkerpool(unittest.TestCase): ...@@ -121,6 +121,23 @@ class TestWorkerpool(unittest.TestCase):
wp.TerminateWorkers() wp.TerminateWorkers()
self._CheckWorkerCount(wp, 0) self._CheckWorkerCount(wp, 0)
def testAddManyTasks(self):
wp = workerpool.WorkerPool("Test", 3, DummyBaseWorker)
try:
self._CheckWorkerCount(wp, 3)
wp.AddManyTasks(["Hello world %s" % i for i in range(10)])
wp.AddTask("A separate hello")
wp.AddTask("Once more, hi!")
wp.AddManyTasks([("Hello world %s" % i, ) for i in range(10)])
wp.Quiesce()
self._CheckNoTasks(wp)
finally:
wp.TerminateWorkers()
self._CheckWorkerCount(wp, 0)
def _CheckNoTasks(self, wp): def _CheckNoTasks(self, wp):
wp._lock.acquire() wp._lock.acquire()
try: try:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment