diff --git a/lib/workerpool.py b/lib/workerpool.py index d9e2ae348aef4b2d065940b4ba6ea3d72c1b1380..58cce3672a0c415e2e723117b532f55f596f498c 100644 --- a/lib/workerpool.py +++ b/lib/workerpool.py @@ -98,7 +98,7 @@ class BaseWorker(threading.Thread, object): try: assert self._HasRunningTaskUnlocked() - (priority, _, _) = self._current_task + (priority, _, _, _) = self._current_task return priority finally: @@ -163,7 +163,7 @@ class BaseWorker(threading.Thread, object): finally: pool._lock.release() - (priority, _, args) = self._current_task + (priority, _, _, args) = self._current_task try: # Run the actual task assert defer is None @@ -196,8 +196,8 @@ class BaseWorker(threading.Thread, object): if defer: assert self._current_task # Schedule again for later run - (_, _, args) = self._current_task - pool._AddTaskUnlocked(args, defer.priority) + (_, _, _, args) = self._current_task + pool._AddTaskUnlocked(args, defer.priority, None) if self._current_task: self._current_task = None @@ -227,6 +227,18 @@ class WorkerPool(object): added to the pool. Due to the nature of threading, they're not guaranteed to finish in the same order. + @type _tasks: list of tuples + @ivar _tasks: Each tuple has the format (priority, order ID, task ID, + arguments). Priority and order ID are numeric and essentially control the + sort order. The order ID is an increasing number denoting the order in + which tasks are added to the queue. The task ID is controlled by user of + workerpool, see L{AddTask} for details. The task arguments are C{None} for + abandoned tasks, otherwise a sequence of arguments to be passed to + L{BaseWorker.RunTask}). The list must fulfill the heap property (for use by + the C{heapq} module). + @type _taskdata: dict; (task IDs as keys, tuples as values) + @ivar _taskdata: Mapping from task IDs to entries in L{_tasks} + """ def __init__(self, name, num_workers, worker_class): """Constructor for worker pool. @@ -255,6 +267,7 @@ class WorkerPool(object): # Queued tasks self._counter = itertools.count() self._tasks = [] + self._taskdata = {} # Start workers self.Resize(num_workers) @@ -268,42 +281,55 @@ class WorkerPool(object): while self._quiescing: self._pool_to_pool.wait() - def _AddTaskUnlocked(self, args, priority): + def _AddTaskUnlocked(self, args, priority, task_id): """Adds a task to the internal queue. @type args: sequence @param args: Arguments passed to L{BaseWorker.RunTask} @type priority: number @param priority: Task priority + @param task_id: Task ID """ assert isinstance(args, (tuple, list)), "Arguments must be a sequence" assert isinstance(priority, (int, long)), "Priority must be numeric" + task = [priority, self._counter.next(), task_id, args] + + if task_id is not None: + assert task_id not in self._taskdata + # Keep a reference to change priority later if necessary + self._taskdata[task_id] = task + # A counter is used to ensure elements are processed in their incoming # order. For processing they're sorted by priority and then counter. - heapq.heappush(self._tasks, (priority, self._counter.next(), args)) + heapq.heappush(self._tasks, task) # Notify a waiting worker self._pool_to_worker.notify() - def AddTask(self, args, priority=_DEFAULT_PRIORITY): + def AddTask(self, args, priority=_DEFAULT_PRIORITY, task_id=None): """Adds a task to the queue. @type args: sequence @param args: arguments passed to L{BaseWorker.RunTask} @type priority: number @param priority: Task priority + @param task_id: Task ID + @note: The task ID can be essentially anything that can be used as a + dictionary key. Callers, however, must ensure a task ID is unique while a + task is in the pool or while it might return to the pool due to deferring + using L{DeferTask}. """ self._lock.acquire() try: self._WaitWhileQuiescingUnlocked() - self._AddTaskUnlocked(args, priority) + self._AddTaskUnlocked(args, priority, task_id) finally: self._lock.release() - def AddManyTasks(self, tasks, priority=_DEFAULT_PRIORITY): + def AddManyTasks(self, tasks, priority=_DEFAULT_PRIORITY, task_id=None): """Add a list of tasks to the queue. @type tasks: list of tuples @@ -311,14 +337,18 @@ class WorkerPool(object): @type priority: number or list of numbers @param priority: Priority for all added tasks or a list with the priority for each task + @type task_id: list + @param task_id: List with the ID for each task + @note: See L{AddTask} for a note on task IDs. """ assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \ - "Each task must be a sequence" - + "Each task must be a sequence" assert (isinstance(priority, (int, long)) or compat.all(isinstance(prio, (int, long)) for prio in priority)), \ "Priority must be numeric or be a list of numeric values" + assert task_id is None or isinstance(task_id, (tuple, list)), \ + "Task IDs must be in a sequence" if isinstance(priority, (int, long)): priority = [priority] * len(tasks) @@ -327,15 +357,23 @@ class WorkerPool(object): " number of tasks (%s)" % (len(priority), len(tasks))) + if task_id is None: + task_id = [None] * len(tasks) + elif len(task_id) != len(tasks): + raise errors.ProgrammerError("Number of task IDs (%s) doesn't match" + " number of tasks (%s)" % + (len(task_id), len(tasks))) + self._lock.acquire() try: self._WaitWhileQuiescingUnlocked() assert compat.all(isinstance(prio, (int, long)) for prio in priority) assert len(tasks) == len(priority) + assert len(tasks) == len(task_id) - for args, prio in zip(tasks, priority): - self._AddTaskUnlocked(args, prio) + for (args, prio, tid) in zip(tasks, priority, task_id): + self._AddTaskUnlocked(args, prio, tid) finally: self._lock.release() @@ -380,6 +418,11 @@ class WorkerPool(object): finally: self._worker_to_pool.notifyAll() + # Delete reference + (_, _, task_id, _) = task + if task_id is not None: + del self._taskdata[task_id] + return task logging.debug("Waiting for tasks") diff --git a/test/ganeti.workerpool_unittest.py b/test/ganeti.workerpool_unittest.py index 90576db35efa622266293ad689570bbcf45bcac4..2449cef67cbfefb0fad0a555b2dd5927b1e4368e 100755 --- a/test/ganeti.workerpool_unittest.py +++ b/test/ganeti.workerpool_unittest.py @@ -30,6 +30,7 @@ import random from ganeti import workerpool from ganeti import errors +from ganeti import utils import testutils @@ -290,7 +291,8 @@ class TestWorkerpool(unittest.TestCase): wp._lock.acquire() try: # The task queue must be empty now - self.failUnless(not wp._tasks) + self.assertFalse(wp._tasks) + self.assertFalse(wp._taskdata) finally: wp._lock.release() @@ -367,6 +369,8 @@ class TestWorkerpool(unittest.TestCase): self.assertRaises(errors.ProgrammerError, wp.AddManyTasks, [("x", ), ("y", )], priority=[1] * 5) + self.assertRaises(errors.ProgrammerError, wp.AddManyTasks, + [("x", ), ("y", )], task_id=[1] * 5) wp.Quiesce()