Skip to content
Snippets Groups Projects
Commit 125b74b2 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

workerpool: Change data structure for priority change

To prepare for the addition of a new function allowing changing a
pending task's priority, the internal data structure is slightly
changed. The (optional) task ID is stored as part of the task entry. A
new dictionary provides a mapping from the task ID to its task entry. If
the task ID is None, the entry is not added to the map.

Task entries used to be a tuple, but since modifying the priority
requires changing an entry, they are changed to lists in this patch.
Tuple items can not be modified.

The underlying idea is from [1].

[1]:
http://docs.python.org/library/heapq.html#priority-queue-implementation-notes



Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarBernardo Dal Seno <bdalseno@google.com>
parent 39f0eea5
No related branches found
Tags v2.0.0
No related merge requests found
......@@ -98,7 +98,7 @@ class BaseWorker(threading.Thread, object):
try:
assert self._HasRunningTaskUnlocked()
(priority, _, _) = self._current_task
(priority, _, _, _) = self._current_task
return priority
finally:
......@@ -163,7 +163,7 @@ class BaseWorker(threading.Thread, object):
finally:
pool._lock.release()
(priority, _, args) = self._current_task
(priority, _, _, args) = self._current_task
try:
# Run the actual task
assert defer is None
......@@ -196,8 +196,8 @@ class BaseWorker(threading.Thread, object):
if defer:
assert self._current_task
# Schedule again for later run
(_, _, args) = self._current_task
pool._AddTaskUnlocked(args, defer.priority)
(_, _, _, args) = self._current_task
pool._AddTaskUnlocked(args, defer.priority, None)
if self._current_task:
self._current_task = None
......@@ -227,6 +227,18 @@ class WorkerPool(object):
added to the pool. Due to the nature of threading, they're not
guaranteed to finish in the same order.
@type _tasks: list of tuples
@ivar _tasks: Each tuple has the format (priority, order ID, task ID,
arguments). Priority and order ID are numeric and essentially control the
sort order. The order ID is an increasing number denoting the order in
which tasks are added to the queue. The task ID is controlled by user of
workerpool, see L{AddTask} for details. The task arguments are C{None} for
abandoned tasks, otherwise a sequence of arguments to be passed to
L{BaseWorker.RunTask}). The list must fulfill the heap property (for use by
the C{heapq} module).
@type _taskdata: dict; (task IDs as keys, tuples as values)
@ivar _taskdata: Mapping from task IDs to entries in L{_tasks}
"""
def __init__(self, name, num_workers, worker_class):
"""Constructor for worker pool.
......@@ -255,6 +267,7 @@ class WorkerPool(object):
# Queued tasks
self._counter = itertools.count()
self._tasks = []
self._taskdata = {}
# Start workers
self.Resize(num_workers)
......@@ -268,42 +281,55 @@ class WorkerPool(object):
while self._quiescing:
self._pool_to_pool.wait()
def _AddTaskUnlocked(self, args, priority):
def _AddTaskUnlocked(self, args, priority, task_id):
"""Adds a task to the internal queue.
@type args: sequence
@param args: Arguments passed to L{BaseWorker.RunTask}
@type priority: number
@param priority: Task priority
@param task_id: Task ID
"""
assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
assert isinstance(priority, (int, long)), "Priority must be numeric"
task = [priority, self._counter.next(), task_id, args]
if task_id is not None:
assert task_id not in self._taskdata
# Keep a reference to change priority later if necessary
self._taskdata[task_id] = task
# A counter is used to ensure elements are processed in their incoming
# order. For processing they're sorted by priority and then counter.
heapq.heappush(self._tasks, (priority, self._counter.next(), args))
heapq.heappush(self._tasks, task)
# Notify a waiting worker
self._pool_to_worker.notify()
def AddTask(self, args, priority=_DEFAULT_PRIORITY):
def AddTask(self, args, priority=_DEFAULT_PRIORITY, task_id=None):
"""Adds a task to the queue.
@type args: sequence
@param args: arguments passed to L{BaseWorker.RunTask}
@type priority: number
@param priority: Task priority
@param task_id: Task ID
@note: The task ID can be essentially anything that can be used as a
dictionary key. Callers, however, must ensure a task ID is unique while a
task is in the pool or while it might return to the pool due to deferring
using L{DeferTask}.
"""
self._lock.acquire()
try:
self._WaitWhileQuiescingUnlocked()
self._AddTaskUnlocked(args, priority)
self._AddTaskUnlocked(args, priority, task_id)
finally:
self._lock.release()
def AddManyTasks(self, tasks, priority=_DEFAULT_PRIORITY):
def AddManyTasks(self, tasks, priority=_DEFAULT_PRIORITY, task_id=None):
"""Add a list of tasks to the queue.
@type tasks: list of tuples
......@@ -311,14 +337,18 @@ class WorkerPool(object):
@type priority: number or list of numbers
@param priority: Priority for all added tasks or a list with the priority
for each task
@type task_id: list
@param task_id: List with the ID for each task
@note: See L{AddTask} for a note on task IDs.
"""
assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
"Each task must be a sequence"
"Each task must be a sequence"
assert (isinstance(priority, (int, long)) or
compat.all(isinstance(prio, (int, long)) for prio in priority)), \
"Priority must be numeric or be a list of numeric values"
assert task_id is None or isinstance(task_id, (tuple, list)), \
"Task IDs must be in a sequence"
if isinstance(priority, (int, long)):
priority = [priority] * len(tasks)
......@@ -327,15 +357,23 @@ class WorkerPool(object):
" number of tasks (%s)" %
(len(priority), len(tasks)))
if task_id is None:
task_id = [None] * len(tasks)
elif len(task_id) != len(tasks):
raise errors.ProgrammerError("Number of task IDs (%s) doesn't match"
" number of tasks (%s)" %
(len(task_id), len(tasks)))
self._lock.acquire()
try:
self._WaitWhileQuiescingUnlocked()
assert compat.all(isinstance(prio, (int, long)) for prio in priority)
assert len(tasks) == len(priority)
assert len(tasks) == len(task_id)
for args, prio in zip(tasks, priority):
self._AddTaskUnlocked(args, prio)
for (args, prio, tid) in zip(tasks, priority, task_id):
self._AddTaskUnlocked(args, prio, tid)
finally:
self._lock.release()
......@@ -380,6 +418,11 @@ class WorkerPool(object):
finally:
self._worker_to_pool.notifyAll()
# Delete reference
(_, _, task_id, _) = task
if task_id is not None:
del self._taskdata[task_id]
return task
logging.debug("Waiting for tasks")
......
......@@ -30,6 +30,7 @@ import random
from ganeti import workerpool
from ganeti import errors
from ganeti import utils
import testutils
......@@ -290,7 +291,8 @@ class TestWorkerpool(unittest.TestCase):
wp._lock.acquire()
try:
# The task queue must be empty now
self.failUnless(not wp._tasks)
self.assertFalse(wp._tasks)
self.assertFalse(wp._taskdata)
finally:
wp._lock.release()
......@@ -367,6 +369,8 @@ class TestWorkerpool(unittest.TestCase):
self.assertRaises(errors.ProgrammerError, wp.AddManyTasks,
[("x", ), ("y", )], priority=[1] * 5)
self.assertRaises(errors.ProgrammerError, wp.AddManyTasks,
[("x", ), ("y", )], task_id=[1] * 5)
wp.Quiesce()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment