Skip to content
Snippets Groups Projects
Commit bba69414 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

workerpool: Preserve task number when deferring


When a task is deferred it should receive the same task ID upon being
returned to the pool.

Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarBernardo Dal Seno <bdalseno@google.com>
parent 9a2564e7
No related branches found
No related tags found
No related merge requests found
...@@ -133,6 +133,22 @@ class BaseWorker(threading.Thread, object): ...@@ -133,6 +133,22 @@ class BaseWorker(threading.Thread, object):
""" """
return (self._current_task is not None) return (self._current_task is not None)
def _GetCurrentOrderAndTaskId(self):
"""Returns the order and task ID of the current task.
Should only be called from within L{RunTask}.
"""
self.pool._lock.acquire()
try:
assert self._HasRunningTaskUnlocked()
(_, order_id, task_id, _) = self._current_task
return (order_id, task_id)
finally:
self.pool._lock.release()
def run(self): def run(self):
"""Main thread function. """Main thread function.
...@@ -202,8 +218,8 @@ class BaseWorker(threading.Thread, object): ...@@ -202,8 +218,8 @@ class BaseWorker(threading.Thread, object):
if defer: if defer:
assert self._current_task assert self._current_task
# Schedule again for later run # Schedule again for later run
(_, _, _, args) = self._current_task (_, _, task_id, args) = self._current_task
pool._AddTaskUnlocked(args, defer.priority, None) pool._AddTaskUnlocked(args, defer.priority, task_id)
if self._current_task: if self._current_task:
self._current_task = None self._current_task = None
...@@ -299,6 +315,8 @@ class WorkerPool(object): ...@@ -299,6 +315,8 @@ class WorkerPool(object):
""" """
assert isinstance(args, (tuple, list)), "Arguments must be a sequence" assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
assert isinstance(priority, (int, long)), "Priority must be numeric" assert isinstance(priority, (int, long)), "Priority must be numeric"
assert task_id is None or isinstance(task_id, (int, long)), \
"Task ID must be numeric or None"
task = [priority, self._counter.next(), task_id, args] task = [priority, self._counter.next(), task_id, args]
......
...@@ -31,6 +31,7 @@ import random ...@@ -31,6 +31,7 @@ import random
from ganeti import workerpool from ganeti import workerpool
from ganeti import errors from ganeti import errors
from ganeti import utils from ganeti import utils
from ganeti import compat
import testutils import testutils
...@@ -114,12 +115,16 @@ class DeferringTaskContext: ...@@ -114,12 +115,16 @@ class DeferringTaskContext:
self.lock = threading.Lock() self.lock = threading.Lock()
self.prioresult = {} self.prioresult = {}
self.samepriodefer = {} self.samepriodefer = {}
self.num2ordertaskid = {}
class DeferringWorker(workerpool.BaseWorker): class DeferringWorker(workerpool.BaseWorker):
def RunTask(self, ctx, num, targetprio): def RunTask(self, ctx, num, targetprio):
ctx.lock.acquire() ctx.lock.acquire()
try: try:
otilst = ctx.num2ordertaskid.setdefault(num, [])
otilst.append(self._GetCurrentOrderAndTaskId())
if num in ctx.samepriodefer: if num in ctx.samepriodefer:
del ctx.samepriodefer[num] del ctx.samepriodefer[num]
raise workerpool.DeferTask() raise workerpool.DeferTask()
...@@ -466,6 +471,7 @@ class TestWorkerpool(unittest.TestCase): ...@@ -466,6 +471,7 @@ class TestWorkerpool(unittest.TestCase):
rnd = random.Random(14921) rnd = random.Random(14921)
data = {} data = {}
num2taskid = {}
for i in range(1, 333): for i in range(1, 333):
ctx.lock.acquire() ctx.lock.acquire()
try: try:
...@@ -475,7 +481,9 @@ class TestWorkerpool(unittest.TestCase): ...@@ -475,7 +481,9 @@ class TestWorkerpool(unittest.TestCase):
ctx.lock.release() ctx.lock.release()
prio = int(rnd.random() * 30) prio = int(rnd.random() * 30)
wp.AddTask((ctx, i, prio), priority=50) num2taskid[i] = 1000 * i
wp.AddTask((ctx, i, prio), priority=50,
task_id=num2taskid[i])
data.setdefault(prio, set()).add(i) data.setdefault(prio, set()).add(i)
# Cause some distortion # Cause some distortion
...@@ -492,6 +500,21 @@ class TestWorkerpool(unittest.TestCase): ...@@ -492,6 +500,21 @@ class TestWorkerpool(unittest.TestCase):
ctx.lock.acquire() ctx.lock.acquire()
try: try:
self.assertEqual(data, ctx.prioresult) self.assertEqual(data, ctx.prioresult)
all_order_ids = []
for (num, numordertaskid) in ctx.num2ordertaskid.items():
order_ids = map(compat.fst, numordertaskid)
self.assertFalse(utils.FindDuplicates(order_ids),
msg="Order ID has been reused")
all_order_ids.extend(order_ids)
for task_id in map(compat.snd, numordertaskid):
self.assertEqual(task_id, num2taskid[num],
msg=("Task %s used different task IDs" % num))
self.assertFalse(utils.FindDuplicates(all_order_ids),
msg="Order ID has been reused")
finally: finally:
ctx.lock.release() ctx.lock.release()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment