Commit 9a2564e7 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

workerpool: Add method to change task's priority



Using the task ID a pending task's priority can be changed. This will be
used to change the priority of jobs in the workerpool.
Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarBernardo Dal Seno <bdalseno@google.com>
parent 125b74b2
......@@ -54,6 +54,12 @@ class DeferTask(Exception):
self.priority = priority
class NoSuchTask(Exception):
"""Exception raised when a task can't be found.
"""
class BaseWorker(threading.Thread, object):
"""Base worker class for worker pools.
......@@ -377,6 +383,52 @@ class WorkerPool(object):
finally:
self._lock.release()
def ChangeTaskPriority(self, task_id, priority):
"""Changes a task's priority.
@param task_id: Task ID
@type priority: number
@param priority: New task priority
@raise NoSuchTask: When the task referred by C{task_id} can not be found
(it may never have existed, may have already been processed, or is
currently running)
"""
assert isinstance(priority, (int, long)), "Priority must be numeric"
self._lock.acquire()
try:
logging.debug("About to change priority of task %s to %s",
task_id, priority)
# Find old task
oldtask = self._taskdata.get(task_id, None)
if oldtask is None:
msg = "Task '%s' was not found" % task_id
logging.debug(msg)
raise NoSuchTask(msg)
# Prepare new task
newtask = [priority] + oldtask[1:]
# Mark old entry as abandoned (this doesn't change the sort order and
# therefore doesn't invalidate the heap property of L{self._tasks}).
# See also <http://docs.python.org/library/heapq.html#priority-queue-
# implementation-notes>.
oldtask[-1] = None
# Change reference to new task entry and forget the old one
assert task_id is not None
self._taskdata[task_id] = newtask
# Add a new task with the old number and arguments
heapq.heappush(self._tasks, newtask)
# Notify a waiting worker
self._pool_to_worker.notify()
finally:
self._lock.release()
def SetActive(self, active):
"""Enable/disable processing of tasks.
......@@ -418,8 +470,15 @@ class WorkerPool(object):
finally:
self._worker_to_pool.notifyAll()
(_, _, task_id, args) = task
# If the priority was changed, "args" is None
if args is None:
# Try again
logging.debug("Found abandoned task (%r)", task)
continue
# Delete reference
(_, _, task_id, _) = task
if task_id is not None:
del self._taskdata[task_id]
......
......@@ -132,6 +132,26 @@ class DeferringWorker(workerpool.BaseWorker):
ctx.lock.release()
class PriorityContext:
def __init__(self):
self.lock = threading.Lock()
self.result = []
class PriorityWorker(workerpool.BaseWorker):
def RunTask(self, ctx, data):
ctx.lock.acquire()
try:
ctx.result.append((self.GetCurrentPriority(), data))
finally:
ctx.lock.release()
class NotImplementedWorker(workerpool.BaseWorker):
def RunTask(self):
raise NotImplementedError
class TestWorkerpool(unittest.TestCase):
"""Workerpool tests"""
......@@ -274,6 +294,7 @@ class TestWorkerpool(unittest.TestCase):
["Hello world %s" % i for i in range(10)])
self.assertRaises(AssertionError, wp.AddManyTasks,
[i for i in range(10)])
self.assertRaises(AssertionError, wp.AddManyTasks, [], task_id=0)
wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
wp.AddTask((ctx, "A separate hello"))
......@@ -479,6 +500,209 @@ class TestWorkerpool(unittest.TestCase):
wp.TerminateWorkers()
self._CheckWorkerCount(wp, 0)
def testChangeTaskPriority(self):
wp = workerpool.WorkerPool("Test", 1, PriorityWorker)
try:
self._CheckWorkerCount(wp, 1)
ctx = PriorityContext()
# Use static seed for this test
rnd = random.Random(4727)
# Disable processing of tasks
wp.SetActive(False)
# No task ID
self.assertRaises(workerpool.NoSuchTask, wp.ChangeTaskPriority,
None, 0)
# Pre-generate task IDs and priorities
count = 100
task_ids = range(0, count)
priorities = range(200, 200 + count) * 2
rnd.shuffle(task_ids)
rnd.shuffle(priorities)
# Make sure there are some duplicate priorities, but not all
priorities[count * 2 - 10:count * 2 - 1] = \
priorities[count - 10: count - 1]
assert len(priorities) == 2 * count
assert priorities[0:(count - 1)] != priorities[count:(2 * count - 1)]
# Add some tasks; this loop consumes the first half of all previously
# generated priorities
for (idx, task_id) in enumerate(task_ids):
wp.AddTask((ctx, idx),
priority=priorities.pop(),
task_id=task_id)
self.assertEqual(len(wp._tasks), len(task_ids))
self.assertEqual(len(wp._taskdata), len(task_ids))
# Tasks have been added, so half of the priorities should have been
# consumed
assert len(priorities) == len(task_ids)
# Change task priority
expected = []
for ((idx, task_id), prio) in zip(enumerate(task_ids), priorities):
wp.ChangeTaskPriority(task_id, prio)
expected.append((prio, idx))
self.assertEqual(len(wp._taskdata), len(task_ids))
# Half the entries are now abandoned tasks
self.assertEqual(len(wp._tasks), len(task_ids) * 2)
assert len(priorities) == count
assert len(task_ids) == count
# Start processing
wp.SetActive(True)
# Wait for tasks to finish
wp.Quiesce()
self._CheckNoTasks(wp)
for task_id in task_ids:
# All tasks are done
self.assertRaises(workerpool.NoSuchTask, wp.ChangeTaskPriority,
task_id, 0)
# Check result
ctx.lock.acquire()
try:
self.assertEqual(ctx.result, sorted(expected))
finally:
ctx.lock.release()
self._CheckWorkerCount(wp, 1)
finally:
wp.TerminateWorkers()
self._CheckWorkerCount(wp, 0)
def testChangeTaskPriorityInteralStructures(self):
wp = workerpool.WorkerPool("Test", 1, NotImplementedWorker)
try:
self._CheckWorkerCount(wp, 1)
# Use static seed for this test
rnd = random.Random(643)
(num1, num2) = rnd.sample(range(1000), 2)
# Disable processing of tasks
wp.SetActive(False)
self.assertFalse(wp._tasks)
self.assertFalse(wp._taskdata)
# No priority or task ID
wp.AddTask(())
self.assertEqual(wp._tasks, [
[workerpool._DEFAULT_PRIORITY, 0, None, ()],
])
self.assertFalse(wp._taskdata)
# No task ID
wp.AddTask((), priority=7413)
self.assertEqual(wp._tasks, [
[workerpool._DEFAULT_PRIORITY, 0, None, ()],
[7413, 1, None, ()],
])
self.assertFalse(wp._taskdata)
# Start adding real tasks
wp.AddTask((), priority=10267659, task_id=num1)
self.assertEqual(wp._tasks, [
[workerpool._DEFAULT_PRIORITY, 0, None, ()],
[7413, 1, None, ()],
[10267659, 2, num1, ()],
])
self.assertEqual(wp._taskdata, {
num1: [10267659, 2, num1, ()],
})
wp.AddTask((), priority=123, task_id=num2)
self.assertEqual(sorted(wp._tasks), [
[workerpool._DEFAULT_PRIORITY, 0, None, ()],
[123, 3, num2, ()],
[7413, 1, None, ()],
[10267659, 2, num1, ()],
])
self.assertEqual(wp._taskdata, {
num1: [10267659, 2, num1, ()],
num2: [123, 3, num2, ()],
})
wp.ChangeTaskPriority(num1, 100)
self.assertEqual(sorted(wp._tasks), [
[workerpool._DEFAULT_PRIORITY, 0, None, ()],
[100, 2, num1, ()],
[123, 3, num2, ()],
[7413, 1, None, ()],
[10267659, 2, num1, None],
])
self.assertEqual(wp._taskdata, {
num1: [100, 2, num1, ()],
num2: [123, 3, num2, ()],
})
wp.ChangeTaskPriority(num2, 91337)
self.assertEqual(sorted(wp._tasks), [
[workerpool._DEFAULT_PRIORITY, 0, None, ()],
[100, 2, num1, ()],
[123, 3, num2, None],
[7413, 1, None, ()],
[91337, 3, num2, ()],
[10267659, 2, num1, None],
])
self.assertEqual(wp._taskdata, {
num1: [100, 2, num1, ()],
num2: [91337, 3, num2, ()],
})
wp.ChangeTaskPriority(num1, 10139)
self.assertEqual(sorted(wp._tasks), [
[workerpool._DEFAULT_PRIORITY, 0, None, ()],
[100, 2, num1, None],
[123, 3, num2, None],
[7413, 1, None, ()],
[10139, 2, num1, ()],
[91337, 3, num2, ()],
[10267659, 2, num1, None],
])
self.assertEqual(wp._taskdata, {
num1: [10139, 2, num1, ()],
num2: [91337, 3, num2, ()],
})
# Change to the same priority once again
wp.ChangeTaskPriority(num1, 10139)
self.assertEqual(sorted(wp._tasks), [
[workerpool._DEFAULT_PRIORITY, 0, None, ()],
[100, 2, num1, None],
[123, 3, num2, None],
[7413, 1, None, ()],
[10139, 2, num1, None],
[10139, 2, num1, ()],
[91337, 3, num2, ()],
[10267659, 2, num1, None],
])
self.assertEqual(wp._taskdata, {
num1: [10139, 2, num1, ()],
num2: [91337, 3, num2, ()],
})
self._CheckWorkerCount(wp, 1)
finally:
wp.TerminateWorkers()
self._CheckWorkerCount(wp, 0)
if __name__ == "__main__":
testutils.GanetiTestProgram()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment