diff --git a/lib/workerpool.py b/lib/workerpool.py index 91f213bc20aa2dc4e53a9f39d59e4dfc27009d1c..6d533fe2d8cbca33888e9ea1c6cda439baf506cb 100644 --- a/lib/workerpool.py +++ b/lib/workerpool.py @@ -246,6 +246,7 @@ class WorkerPool(object): self._last_worker_id = 0 self._workers = [] self._quiescing = False + self._active = True # Terminating workers self._termworkers = [] @@ -340,6 +341,28 @@ class WorkerPool(object): finally: self._lock.release() + def SetActive(self, active): + """Enable/disable processing of tasks. + + This is different from L{Quiesce} in the sense that this function just + changes an internal flag and doesn't wait for the queue to be empty. Tasks + already being processed continue normally, but no new tasks will be + started. New tasks can still be added. + + @type active: bool + @param active: Whether tasks should be processed + + """ + self._lock.acquire() + try: + self._active = active + + if active: + # Tell all workers to continue processing + self._pool_to_worker.notifyAll() + finally: + self._lock.release() + def _WaitForTaskUnlocked(self, worker): """Waits for a task for a worker. @@ -351,7 +374,7 @@ class WorkerPool(object): return _TERMINATE # We only wait if there's no task for us. - if not self._tasks: + if not (self._active and self._tasks): logging.debug("Waiting for tasks") while True: @@ -364,7 +387,8 @@ class WorkerPool(object): if self._ShouldWorkerTerminateUnlocked(worker): return _TERMINATE - if self._tasks: + # Just loop if pool is not processing tasks at this time + if self._active and self._tasks: break # Get task from queue and tell pool about it diff --git a/test/ganeti.workerpool_unittest.py b/test/ganeti.workerpool_unittest.py index 89b3b1adfd9a5208450cf1e6b0b1b9a863c38e47..1ad8d741d6e408111c011be519a557327d53b272 100755 --- a/test/ganeti.workerpool_unittest.py +++ b/test/ganeti.workerpool_unittest.py @@ -170,6 +170,53 @@ class TestWorkerpool(unittest.TestCase): wp.TerminateWorkers() self._CheckWorkerCount(wp, 0) + def testActive(self): + ctx = CountingContext() + wp = workerpool.WorkerPool("TestActive", 5, CountingBaseWorker) + try: + self._CheckWorkerCount(wp, 5) + self.assertTrue(wp._active) + + # Process some tasks + for _ in range(10): + wp.AddTask((ctx, None)) + + wp.Quiesce() + self._CheckNoTasks(wp) + self.assertEquals(ctx.GetDoneTasks(), 10) + + # Repeat a few times + for count in range(10): + # Deactivate pool + wp.SetActive(False) + self._CheckNoTasks(wp) + + # Queue some more tasks + for _ in range(10): + wp.AddTask((ctx, None)) + + for _ in range(5): + # Short delays to give other threads a chance to cause breakage + time.sleep(.01) + wp.AddTask((ctx, "Hello world %s" % 999)) + self.assertFalse(wp._active) + + self.assertEquals(ctx.GetDoneTasks(), 10 + (count * 15)) + + # Start processing again + wp.SetActive(True) + self.assertTrue(wp._active) + + # Wait for tasks to finish + wp.Quiesce() + self._CheckNoTasks(wp) + self.assertEquals(ctx.GetDoneTasks(), 10 + (count * 15) + 15) + + self._CheckWorkerCount(wp, 5) + finally: + wp.TerminateWorkers() + self._CheckWorkerCount(wp, 0) + def testChecksum(self): # Tests whether all tasks are run and, since we're only using a single # thread, whether everything is started in order.