diff --git a/lib/workerpool.py b/lib/workerpool.py index 0887ba4bb1ea94eaab9b6da3537db5e3142dd0ce..0ca9155c6f4fb5e2b4d1a8888240828e6e099794 100644 --- a/lib/workerpool.py +++ b/lib/workerpool.py @@ -27,6 +27,8 @@ import collections import logging import threading +from ganeti import compat + class BaseWorker(threading.Thread, object): """Base worker class for worker pools. @@ -211,6 +213,9 @@ class WorkerPool(object): @param tasks: list of args passed to L{BaseWorker.RunTask} """ + assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \ + "Each task must be a sequence" + self._lock.acquire() try: self._WaitWhileQuiescingUnlocked() diff --git a/test/ganeti.workerpool_unittest.py b/test/ganeti.workerpool_unittest.py index ca8fc1ae1ca4c9fde48b439e67d39ec154b0d5f0..cd15123d3c1c9ad3254d92128754aef7fd4722c9 100755 --- a/test/ganeti.workerpool_unittest.py +++ b/test/ganeti.workerpool_unittest.py @@ -169,6 +169,28 @@ class TestWorkerpool(unittest.TestCase): self.assertEquals(ctx.GetDoneTasks(), 22) + def testManyTasksSequence(self): + ctx = CountingContext() + wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker) + try: + self._CheckWorkerCount(wp, 3) + self.assertRaises(AssertionError, wp.AddManyTasks, + ["Hello world %s" % i for i in range(10)]) + self.assertRaises(AssertionError, wp.AddManyTasks, + [i for i in range(10)]) + + wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)]) + wp.AddTask(ctx, "A separate hello") + + wp.Quiesce() + + self._CheckNoTasks(wp) + finally: + wp.TerminateWorkers() + self._CheckWorkerCount(wp, 0) + + self.assertEquals(ctx.GetDoneTasks(), 11) + def _CheckNoTasks(self, wp): wp._lock.acquire() try: