From 27caa99307862e7a37a9f0062d009710d875b320 Mon Sep 17 00:00:00 2001 From: Michael Hanselmann <hansmi@google.com> Date: Wed, 16 Nov 2011 09:37:49 +0100 Subject: [PATCH] workerpool: Allow processing of new tasks to be stopped MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This is different from βQuiesceβ in the sense that this function just changes an internal flag and doesn't wait for the queue to be empty. Tasks already being processed continue normally, but no new tasks will be started. New tasks can still be added, but won't be processed. Signed-off-by: Michael Hanselmann <hansmi@google.com> Reviewed-by: Iustin Pop <iustin@google.com> --- lib/workerpool.py | 28 ++++++++++++++++-- test/ganeti.workerpool_unittest.py | 47 ++++++++++++++++++++++++++++++ 2 files changed, 73 insertions(+), 2 deletions(-) diff --git a/lib/workerpool.py b/lib/workerpool.py index 91f213bc2..6d533fe2d 100644 --- a/lib/workerpool.py +++ b/lib/workerpool.py @@ -246,6 +246,7 @@ class WorkerPool(object): self._last_worker_id = 0 self._workers = [] self._quiescing = False + self._active = True # Terminating workers self._termworkers = [] @@ -340,6 +341,28 @@ class WorkerPool(object): finally: self._lock.release() + def SetActive(self, active): + """Enable/disable processing of tasks. + + This is different from L{Quiesce} in the sense that this function just + changes an internal flag and doesn't wait for the queue to be empty. Tasks + already being processed continue normally, but no new tasks will be + started. New tasks can still be added. + + @type active: bool + @param active: Whether tasks should be processed + + """ + self._lock.acquire() + try: + self._active = active + + if active: + # Tell all workers to continue processing + self._pool_to_worker.notifyAll() + finally: + self._lock.release() + def _WaitForTaskUnlocked(self, worker): """Waits for a task for a worker. @@ -351,7 +374,7 @@ class WorkerPool(object): return _TERMINATE # We only wait if there's no task for us. - if not self._tasks: + if not (self._active and self._tasks): logging.debug("Waiting for tasks") while True: @@ -364,7 +387,8 @@ class WorkerPool(object): if self._ShouldWorkerTerminateUnlocked(worker): return _TERMINATE - if self._tasks: + # Just loop if pool is not processing tasks at this time + if self._active and self._tasks: break # Get task from queue and tell pool about it diff --git a/test/ganeti.workerpool_unittest.py b/test/ganeti.workerpool_unittest.py index 89b3b1adf..1ad8d741d 100755 --- a/test/ganeti.workerpool_unittest.py +++ b/test/ganeti.workerpool_unittest.py @@ -170,6 +170,53 @@ class TestWorkerpool(unittest.TestCase): wp.TerminateWorkers() self._CheckWorkerCount(wp, 0) + def testActive(self): + ctx = CountingContext() + wp = workerpool.WorkerPool("TestActive", 5, CountingBaseWorker) + try: + self._CheckWorkerCount(wp, 5) + self.assertTrue(wp._active) + + # Process some tasks + for _ in range(10): + wp.AddTask((ctx, None)) + + wp.Quiesce() + self._CheckNoTasks(wp) + self.assertEquals(ctx.GetDoneTasks(), 10) + + # Repeat a few times + for count in range(10): + # Deactivate pool + wp.SetActive(False) + self._CheckNoTasks(wp) + + # Queue some more tasks + for _ in range(10): + wp.AddTask((ctx, None)) + + for _ in range(5): + # Short delays to give other threads a chance to cause breakage + time.sleep(.01) + wp.AddTask((ctx, "Hello world %s" % 999)) + self.assertFalse(wp._active) + + self.assertEquals(ctx.GetDoneTasks(), 10 + (count * 15)) + + # Start processing again + wp.SetActive(True) + self.assertTrue(wp._active) + + # Wait for tasks to finish + wp.Quiesce() + self._CheckNoTasks(wp) + self.assertEquals(ctx.GetDoneTasks(), 10 + (count * 15) + 15) + + self._CheckWorkerCount(wp, 5) + finally: + wp.TerminateWorkers() + self._CheckWorkerCount(wp, 0) + def testChecksum(self): # Tests whether all tasks are run and, since we're only using a single # thread, whether everything is started in order. -- GitLab