diff --git a/lib/workerpool.py b/lib/workerpool.py index 1ec89af67a1a913eba6240696b60089957fd1644..0887ba4bb1ea94eaab9b6da3537db5e3142dd0ce 100644 --- a/lib/workerpool.py +++ b/lib/workerpool.py @@ -180,6 +180,13 @@ class WorkerPool(object): # TODO: Implement dynamic resizing? + def _WaitWhileQuiescingUnlocked(self): + """Wait until the worker pool has finished quiescing. + + """ + while self._quiescing: + self._pool_to_pool.wait() + def AddTask(self, *args): """Adds a task to the queue. @@ -188,11 +195,8 @@ class WorkerPool(object): """ self._lock.acquire() try: - # Don't add new tasks while we're quiescing - while self._quiescing: - self._pool_to_pool.wait() + self._WaitWhileQuiescingUnlocked() - # Add task to internal queue self._tasks.append(args) # Wake one idling worker up @@ -200,6 +204,24 @@ class WorkerPool(object): finally: self._lock.release() + def AddManyTasks(self, tasks): + """Add a list of tasks to the queue. + + @type tasks: list of tuples + @param tasks: list of args passed to L{BaseWorker.RunTask} + + """ + self._lock.acquire() + try: + self._WaitWhileQuiescingUnlocked() + + self._tasks.extend(tasks) + + for _ in tasks: + self._pool_to_worker.notify() + finally: + self._lock.release() + def _ShouldWorkerTerminateUnlocked(self, worker): """Returns whether a worker should terminate. diff --git a/test/ganeti.workerpool_unittest.py b/test/ganeti.workerpool_unittest.py index e9cc30c4cee4efcdd88e4852f8b921e256119bd5..f5aee8e93c37b6cc0f747de5cbe20677b1286d9a 100755 --- a/test/ganeti.workerpool_unittest.py +++ b/test/ganeti.workerpool_unittest.py @@ -121,6 +121,23 @@ class TestWorkerpool(unittest.TestCase): wp.TerminateWorkers() self._CheckWorkerCount(wp, 0) + def testAddManyTasks(self): + wp = workerpool.WorkerPool("Test", 3, DummyBaseWorker) + try: + self._CheckWorkerCount(wp, 3) + + wp.AddManyTasks(["Hello world %s" % i for i in range(10)]) + wp.AddTask("A separate hello") + wp.AddTask("Once more, hi!") + wp.AddManyTasks([("Hello world %s" % i, ) for i in range(10)]) + + wp.Quiesce() + + self._CheckNoTasks(wp) + finally: + wp.TerminateWorkers() + self._CheckWorkerCount(wp, 0) + def _CheckNoTasks(self, wp): wp._lock.acquire() try: