From bba69414ff37daf47bbcbff539b0ba8df9d7332c Mon Sep 17 00:00:00 2001 From: Michael Hanselmann <hansmi@google.com> Date: Thu, 1 Nov 2012 19:06:02 +0100 Subject: [PATCH] workerpool: Preserve task number when deferring When a task is deferred it should receive the same task ID upon being returned to the pool. Signed-off-by: Michael Hanselmann <hansmi@google.com> Reviewed-by: Bernardo Dal Seno <bdalseno@google.com> --- lib/workerpool.py | 22 ++++++++++++++++++++-- test/ganeti.workerpool_unittest.py | 25 ++++++++++++++++++++++++- 2 files changed, 44 insertions(+), 3 deletions(-) diff --git a/lib/workerpool.py b/lib/workerpool.py index d77825fcf..6b558ce2c 100644 --- a/lib/workerpool.py +++ b/lib/workerpool.py @@ -133,6 +133,22 @@ class BaseWorker(threading.Thread, object): """ return (self._current_task is not None) + def _GetCurrentOrderAndTaskId(self): + """Returns the order and task ID of the current task. + + Should only be called from within L{RunTask}. + + """ + self.pool._lock.acquire() + try: + assert self._HasRunningTaskUnlocked() + + (_, order_id, task_id, _) = self._current_task + + return (order_id, task_id) + finally: + self.pool._lock.release() + def run(self): """Main thread function. @@ -202,8 +218,8 @@ class BaseWorker(threading.Thread, object): if defer: assert self._current_task # Schedule again for later run - (_, _, _, args) = self._current_task - pool._AddTaskUnlocked(args, defer.priority, None) + (_, _, task_id, args) = self._current_task + pool._AddTaskUnlocked(args, defer.priority, task_id) if self._current_task: self._current_task = None @@ -299,6 +315,8 @@ class WorkerPool(object): """ assert isinstance(args, (tuple, list)), "Arguments must be a sequence" assert isinstance(priority, (int, long)), "Priority must be numeric" + assert task_id is None or isinstance(task_id, (int, long)), \ + "Task ID must be numeric or None" task = [priority, self._counter.next(), task_id, args] diff --git a/test/ganeti.workerpool_unittest.py b/test/ganeti.workerpool_unittest.py index f890db0a6..8f35a69a3 100755 --- a/test/ganeti.workerpool_unittest.py +++ b/test/ganeti.workerpool_unittest.py @@ -31,6 +31,7 @@ import random from ganeti import workerpool from ganeti import errors from ganeti import utils +from ganeti import compat import testutils @@ -114,12 +115,16 @@ class DeferringTaskContext: self.lock = threading.Lock() self.prioresult = {} self.samepriodefer = {} + self.num2ordertaskid = {} class DeferringWorker(workerpool.BaseWorker): def RunTask(self, ctx, num, targetprio): ctx.lock.acquire() try: + otilst = ctx.num2ordertaskid.setdefault(num, []) + otilst.append(self._GetCurrentOrderAndTaskId()) + if num in ctx.samepriodefer: del ctx.samepriodefer[num] raise workerpool.DeferTask() @@ -466,6 +471,7 @@ class TestWorkerpool(unittest.TestCase): rnd = random.Random(14921) data = {} + num2taskid = {} for i in range(1, 333): ctx.lock.acquire() try: @@ -475,7 +481,9 @@ class TestWorkerpool(unittest.TestCase): ctx.lock.release() prio = int(rnd.random() * 30) - wp.AddTask((ctx, i, prio), priority=50) + num2taskid[i] = 1000 * i + wp.AddTask((ctx, i, prio), priority=50, + task_id=num2taskid[i]) data.setdefault(prio, set()).add(i) # Cause some distortion @@ -492,6 +500,21 @@ class TestWorkerpool(unittest.TestCase): ctx.lock.acquire() try: self.assertEqual(data, ctx.prioresult) + + all_order_ids = [] + + for (num, numordertaskid) in ctx.num2ordertaskid.items(): + order_ids = map(compat.fst, numordertaskid) + self.assertFalse(utils.FindDuplicates(order_ids), + msg="Order ID has been reused") + all_order_ids.extend(order_ids) + + for task_id in map(compat.snd, numordertaskid): + self.assertEqual(task_id, num2taskid[num], + msg=("Task %s used different task IDs" % num)) + + self.assertFalse(utils.FindDuplicates(all_order_ids), + msg="Order ID has been reused") finally: ctx.lock.release() -- GitLab