Commit 27caa993 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

workerpool: Allow processing of new tasks to be stopped



This is different from “Quiesce” in the sense that this function just
changes an internal flag and doesn't wait for the queue to be empty.
Tasks already being processed continue normally, but no new tasks will
be started. New tasks can still be added, but won't be processed.
Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarIustin Pop <iustin@google.com>
parent 2db05c94
......@@ -246,6 +246,7 @@ class WorkerPool(object):
self._last_worker_id = 0
self._workers = []
self._quiescing = False
self._active = True
# Terminating workers
self._termworkers = []
......@@ -340,6 +341,28 @@ class WorkerPool(object):
finally:
self._lock.release()
def SetActive(self, active):
"""Enable/disable processing of tasks.
This is different from L{Quiesce} in the sense that this function just
changes an internal flag and doesn't wait for the queue to be empty. Tasks
already being processed continue normally, but no new tasks will be
started. New tasks can still be added.
@type active: bool
@param active: Whether tasks should be processed
"""
self._lock.acquire()
try:
self._active = active
if active:
# Tell all workers to continue processing
self._pool_to_worker.notifyAll()
finally:
self._lock.release()
def _WaitForTaskUnlocked(self, worker):
"""Waits for a task for a worker.
......@@ -351,7 +374,7 @@ class WorkerPool(object):
return _TERMINATE
# We only wait if there's no task for us.
if not self._tasks:
if not (self._active and self._tasks):
logging.debug("Waiting for tasks")
while True:
......@@ -364,7 +387,8 @@ class WorkerPool(object):
if self._ShouldWorkerTerminateUnlocked(worker):
return _TERMINATE
if self._tasks:
# Just loop if pool is not processing tasks at this time
if self._active and self._tasks:
break
# Get task from queue and tell pool about it
......
......@@ -170,6 +170,53 @@ class TestWorkerpool(unittest.TestCase):
wp.TerminateWorkers()
self._CheckWorkerCount(wp, 0)
def testActive(self):
ctx = CountingContext()
wp = workerpool.WorkerPool("TestActive", 5, CountingBaseWorker)
try:
self._CheckWorkerCount(wp, 5)
self.assertTrue(wp._active)
# Process some tasks
for _ in range(10):
wp.AddTask((ctx, None))
wp.Quiesce()
self._CheckNoTasks(wp)
self.assertEquals(ctx.GetDoneTasks(), 10)
# Repeat a few times
for count in range(10):
# Deactivate pool
wp.SetActive(False)
self._CheckNoTasks(wp)
# Queue some more tasks
for _ in range(10):
wp.AddTask((ctx, None))
for _ in range(5):
# Short delays to give other threads a chance to cause breakage
time.sleep(.01)
wp.AddTask((ctx, "Hello world %s" % 999))
self.assertFalse(wp._active)
self.assertEquals(ctx.GetDoneTasks(), 10 + (count * 15))
# Start processing again
wp.SetActive(True)
self.assertTrue(wp._active)
# Wait for tasks to finish
wp.Quiesce()
self._CheckNoTasks(wp)
self.assertEquals(ctx.GetDoneTasks(), 10 + (count * 15) + 15)
self._CheckWorkerCount(wp, 5)
finally:
wp.TerminateWorkers()
self._CheckWorkerCount(wp, 0)
def testChecksum(self):
# Tests whether all tasks are run and, since we're only using a single
# thread, whether everything is started in order.
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment