Skip to content
Snippets Groups Projects
Commit 52c47e4e authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

workerpool: Add support for task priority


To add job priorities, the worker pool underlying the job queue must
support priorities per task. This patch adds them to the worker pool.

Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarIustin Pop <iustin@google.com>
parent ea8ac9c9
No related branches found
No related tags found
No related merge requests found
...@@ -20,6 +20,54 @@ As for 2.1 and 2.2 we divide the 2.3 design into three areas: ...@@ -20,6 +20,54 @@ As for 2.1 and 2.2 we divide the 2.3 design into three areas:
Core changes Core changes
------------ ------------
Job priorities
~~~~~~~~~~~~~~
Current state and shortcomings
++++++++++++++++++++++++++++++
.. TODO: Describe current situation
Proposed changes
++++++++++++++++
.. TODO: Describe changes to job queue and potentially client programs
Worker pool
^^^^^^^^^^^
To support job priorities in the job queue, the worker pool underlying
the job queue must be enhanced to support task priorities. Currently
tasks are processed in the order they are added to the queue (but, due
to their nature, they don't necessarily finish in that order). All tasks
are equal. To support tasks with higher or lower priority, a few changes
have to be made to the queue inside a worker pool.
Each task is assigned a priority when added to the queue. This priority
can not be changed until the task is executed (this is fine as in all
current use-cases, tasks are added to a pool and then forgotten about
until they're done).
A task's priority can be compared to Unix' process priorities. The lower
the priority number, the closer to the queue's front it is. A task with
priority 0 is going to be run before one with priority 10. Tasks with
the same priority are executed in the order in which they were added.
While a task is running it can query its own priority. If it's not ready
yet for finishing, it can raise an exception to defer itself, optionally
changing its own priority. This is useful for the following cases:
- A task is trying to acquire locks, but those locks are still held by
other tasks. By deferring itself, the task gives others a chance to
run. This is especially useful when all workers are busy.
- If a task decides it hasn't gotten its locks in a long time, it can
start to increase its own priority.
- Tasks waiting for long-running operations running asynchronously could
defer themselves while waiting for a long-running operation.
With these changes, the job queue will be able to implement per-job
priorities.
Feature changes Feature changes
--------------- ---------------
......
...@@ -23,14 +23,34 @@ ...@@ -23,14 +23,34 @@
""" """
import collections
import logging import logging
import threading import threading
import heapq
from ganeti import compat from ganeti import compat
from ganeti import errors
_TERMINATE = object() _TERMINATE = object()
_DEFAULT_PRIORITY = 0
class DeferTask(Exception):
"""Special exception class to defer a task.
This class can be raised by L{BaseWorker.RunTask} to defer the execution of a
task. Optionally, the priority of the task can be changed.
"""
def __init__(self, priority=None):
"""Initializes this class.
@type priority: number
@param priority: New task priority (None means no change)
"""
Exception.__init__(self)
self.priority = priority
class BaseWorker(threading.Thread, object): class BaseWorker(threading.Thread, object):
...@@ -64,6 +84,22 @@ class BaseWorker(threading.Thread, object): ...@@ -64,6 +84,22 @@ class BaseWorker(threading.Thread, object):
finally: finally:
self.pool._lock.release() self.pool._lock.release()
def GetCurrentPriority(self):
"""Returns the priority of the current task.
Should only be called from within L{RunTask}.
"""
self.pool._lock.acquire()
try:
assert self._HasRunningTaskUnlocked()
(priority, _, _) = self._current_task
return priority
finally:
self.pool._lock.release()
def _HasRunningTaskUnlocked(self): def _HasRunningTaskUnlocked(self):
"""Returns whether this worker is currently running a task. """Returns whether this worker is currently running a task.
...@@ -80,6 +116,8 @@ class BaseWorker(threading.Thread, object): ...@@ -80,6 +116,8 @@ class BaseWorker(threading.Thread, object):
while True: while True:
assert self._current_task is None assert self._current_task is None
defer = None
try: try:
# Wait on lock to be told either to terminate or to do a task # Wait on lock to be told either to terminate or to do a task
pool._lock.acquire() pool._lock.acquire()
...@@ -104,11 +142,24 @@ class BaseWorker(threading.Thread, object): ...@@ -104,11 +142,24 @@ class BaseWorker(threading.Thread, object):
finally: finally:
pool._lock.release() pool._lock.release()
# Run the actual task (priority, _, args) = self._current_task
try: try:
logging.debug("Starting task %r", self._current_task) # Run the actual task
self.RunTask(*self._current_task) assert defer is None
logging.debug("Done with task %r", self._current_task) logging.debug("Starting task %r, priority %s", args, priority)
self.RunTask(*args) # pylint: disable-msg=W0142
logging.debug("Done with task %r, priority %s", args, priority)
except DeferTask, err:
defer = err
if defer.priority is None:
# Use same priority
defer.priority = priority
logging.debug("Deferring task %r, new priority %s", defer.priority)
assert self._HasRunningTaskUnlocked()
except: # pylint: disable-msg=W0702 except: # pylint: disable-msg=W0702
logging.exception("Caught unhandled exception") logging.exception("Caught unhandled exception")
...@@ -117,6 +168,12 @@ class BaseWorker(threading.Thread, object): ...@@ -117,6 +168,12 @@ class BaseWorker(threading.Thread, object):
# Notify pool # Notify pool
pool._lock.acquire() pool._lock.acquire()
try: try:
if defer:
assert self._current_task
# Schedule again for later run
(_, _, args) = self._current_task
pool._AddTaskUnlocked(args, defer.priority)
if self._current_task: if self._current_task:
self._current_task = None self._current_task = None
pool._worker_to_pool.notifyAll() pool._worker_to_pool.notifyAll()
...@@ -170,7 +227,8 @@ class WorkerPool(object): ...@@ -170,7 +227,8 @@ class WorkerPool(object):
self._termworkers = [] self._termworkers = []
# Queued tasks # Queued tasks
self._tasks = collections.deque() self._counter = 0
self._tasks = []
# Start workers # Start workers
self.Resize(num_workers) self.Resize(num_workers)
...@@ -184,44 +242,77 @@ class WorkerPool(object): ...@@ -184,44 +242,77 @@ class WorkerPool(object):
while self._quiescing: while self._quiescing:
self._pool_to_pool.wait() self._pool_to_pool.wait()
def _AddTaskUnlocked(self, args): def _AddTaskUnlocked(self, args, priority):
"""Adds a task to the internal queue.
@type args: sequence
@param args: Arguments passed to L{BaseWorker.RunTask}
@type priority: number
@param priority: Task priority
"""
assert isinstance(args, (tuple, list)), "Arguments must be a sequence" assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
assert isinstance(priority, (int, long)), "Priority must be numeric"
self._tasks.append(args) # This counter is used to ensure elements are processed in their
# incoming order. For processing they're sorted by priority and then
# counter.
self._counter += 1
heapq.heappush(self._tasks, (priority, self._counter, args))
# Notify a waiting worker # Notify a waiting worker
self._pool_to_worker.notify() self._pool_to_worker.notify()
def AddTask(self, args): def AddTask(self, args, priority=_DEFAULT_PRIORITY):
"""Adds a task to the queue. """Adds a task to the queue.
@type args: sequence @type args: sequence
@param args: arguments passed to L{BaseWorker.RunTask} @param args: arguments passed to L{BaseWorker.RunTask}
@type priority: number
@param priority: Task priority
""" """
self._lock.acquire() self._lock.acquire()
try: try:
self._WaitWhileQuiescingUnlocked() self._WaitWhileQuiescingUnlocked()
self._AddTaskUnlocked(args) self._AddTaskUnlocked(args, priority)
finally: finally:
self._lock.release() self._lock.release()
def AddManyTasks(self, tasks): def AddManyTasks(self, tasks, priority=_DEFAULT_PRIORITY):
"""Add a list of tasks to the queue. """Add a list of tasks to the queue.
@type tasks: list of tuples @type tasks: list of tuples
@param tasks: list of args passed to L{BaseWorker.RunTask} @param tasks: list of args passed to L{BaseWorker.RunTask}
@type priority: number or list of numbers
@param priority: Priority for all added tasks or a list with the priority
for each task
""" """
assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \ assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
"Each task must be a sequence" "Each task must be a sequence"
assert (isinstance(priority, (int, long)) or
compat.all(isinstance(prio, (int, long)) for prio in priority)), \
"Priority must be numeric or be a list of numeric values"
if isinstance(priority, (int, long)):
priority = [priority] * len(tasks)
elif len(priority) != len(tasks):
raise errors.ProgrammerError("Number of priorities (%s) doesn't match"
" number of tasks (%s)" %
(len(priority), len(tasks)))
self._lock.acquire() self._lock.acquire()
try: try:
self._WaitWhileQuiescingUnlocked() self._WaitWhileQuiescingUnlocked()
for args in tasks: assert compat.all(isinstance(prio, (int, long)) for prio in priority)
self._AddTaskUnlocked(args) assert len(tasks) == len(priority)
for args, priority in zip(tasks, priority):
self._AddTaskUnlocked(args, priority)
finally: finally:
self._lock.release() self._lock.release()
...@@ -254,7 +345,7 @@ class WorkerPool(object): ...@@ -254,7 +345,7 @@ class WorkerPool(object):
# Get task from queue and tell pool about it # Get task from queue and tell pool about it
try: try:
return self._tasks.popleft() return heapq.heappop(self._tasks)
finally: finally:
self._worker_to_pool.notifyAll() self._worker_to_pool.notifyAll()
......
#!/usr/bin/python #!/usr/bin/python
# #
# Copyright (C) 2008 Google Inc. # Copyright (C) 2008, 2009, 2010 Google Inc.
# #
# This program is free software; you can redistribute it and/or modify # This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by # it under the terms of the GNU General Public License as published by
...@@ -26,13 +26,15 @@ import threading ...@@ -26,13 +26,15 @@ import threading
import time import time
import sys import sys
import zlib import zlib
import random
from ganeti import workerpool from ganeti import workerpool
from ganeti import errors
import testutils import testutils
class CountingContext(object):
class CountingContext(object):
def __init__(self): def __init__(self):
self._lock = threading.Condition(threading.Lock()) self._lock = threading.Condition(threading.Lock())
self.done = 0 self.done = 0
...@@ -57,7 +59,6 @@ class CountingContext(object): ...@@ -57,7 +59,6 @@ class CountingContext(object):
class CountingBaseWorker(workerpool.BaseWorker): class CountingBaseWorker(workerpool.BaseWorker):
def RunTask(self, ctx, text): def RunTask(self, ctx, text):
ctx.DoneTask() ctx.DoneTask()
...@@ -83,6 +84,46 @@ class ChecksumBaseWorker(workerpool.BaseWorker): ...@@ -83,6 +84,46 @@ class ChecksumBaseWorker(workerpool.BaseWorker):
ctx.lock.release() ctx.lock.release()
class ListBuilderContext:
def __init__(self):
self.lock = threading.Lock()
self.result = []
self.prioresult = {}
class ListBuilderWorker(workerpool.BaseWorker):
def RunTask(self, ctx, data):
ctx.lock.acquire()
try:
ctx.result.append((self.GetCurrentPriority(), data))
ctx.prioresult.setdefault(self.GetCurrentPriority(), []).append(data)
finally:
ctx.lock.release()
class DeferringTaskContext:
def __init__(self):
self.lock = threading.Lock()
self.prioresult = {}
self.samepriodefer = {}
class DeferringWorker(workerpool.BaseWorker):
def RunTask(self, ctx, num, targetprio):
ctx.lock.acquire()
try:
if num in ctx.samepriodefer:
del ctx.samepriodefer[num]
raise workerpool.DeferTask()
if self.GetCurrentPriority() > targetprio:
raise workerpool.DeferTask(priority=self.GetCurrentPriority() - 1)
ctx.prioresult.setdefault(self.GetCurrentPriority(), set()).add(num)
finally:
ctx.lock.release()
class TestWorkerpool(unittest.TestCase): class TestWorkerpool(unittest.TestCase):
"""Workerpool tests""" """Workerpool tests"""
...@@ -206,6 +247,220 @@ class TestWorkerpool(unittest.TestCase): ...@@ -206,6 +247,220 @@ class TestWorkerpool(unittest.TestCase):
finally: finally:
wp._lock.release() wp._lock.release()
def testPriorityChecksum(self):
# Tests whether all tasks are run and, since we're only using a single
# thread, whether everything is started in order and respects the priority
wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
try:
self._CheckWorkerCount(wp, 1)
ctx = ChecksumContext()
data = {}
tasks = []
priorities = []
for i in range(1, 333):
prio = i % 7
tasks.append((ctx, i))
priorities.append(prio)
data.setdefault(prio, []).append(i)
wp.AddManyTasks(tasks, priority=priorities)
wp.Quiesce()
self._CheckNoTasks(wp)
# Check sum
ctx.lock.acquire()
try:
checksum = ChecksumContext.CHECKSUM_START
for priority in sorted(data.keys()):
for i in data[priority]:
checksum = ChecksumContext.UpdateChecksum(checksum, i)
self.assertEqual(checksum, ctx.checksum)
finally:
ctx.lock.release()
self._CheckWorkerCount(wp, 1)
finally:
wp.TerminateWorkers()
self._CheckWorkerCount(wp, 0)
def testPriorityListManyTasks(self):
# Tests whether all tasks are run and, since we're only using a single
# thread, whether everything is started in order and respects the priority
wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
try:
self._CheckWorkerCount(wp, 1)
ctx = ListBuilderContext()
# Use static seed for this test
rnd = random.Random(0)
data = {}
tasks = []
priorities = []
for i in range(1, 333):
prio = int(rnd.random() * 10)
tasks.append((ctx, i))
priorities.append(prio)
data.setdefault(prio, []).append((prio, i))
wp.AddManyTasks(tasks, priority=priorities)
self.assertRaises(errors.ProgrammerError, wp.AddManyTasks,
[("x", ), ("y", )], priority=[1] * 5)
wp.Quiesce()
self._CheckNoTasks(wp)
# Check result
ctx.lock.acquire()
try:
expresult = []
for priority in sorted(data.keys()):
expresult.extend(data[priority])
self.assertEqual(expresult, ctx.result)
finally:
ctx.lock.release()
self._CheckWorkerCount(wp, 1)
finally:
wp.TerminateWorkers()
self._CheckWorkerCount(wp, 0)
def testPriorityListSingleTasks(self):
# Tests whether all tasks are run and, since we're only using a single
# thread, whether everything is started in order and respects the priority
wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
try:
self._CheckWorkerCount(wp, 1)
ctx = ListBuilderContext()
# Use static seed for this test
rnd = random.Random(26279)
data = {}
for i in range(1, 333):
prio = int(rnd.random() * 30)
wp.AddTask((ctx, i), priority=prio)
data.setdefault(prio, []).append(i)
# Cause some distortion
if i % 11 == 0:
time.sleep(.001)
if i % 41 == 0:
wp.Quiesce()
wp.Quiesce()
self._CheckNoTasks(wp)
# Check result
ctx.lock.acquire()
try:
self.assertEqual(data, ctx.prioresult)
finally:
ctx.lock.release()
self._CheckWorkerCount(wp, 1)
finally:
wp.TerminateWorkers()
self._CheckWorkerCount(wp, 0)
def testPriorityListSingleTasks(self):
# Tests whether all tasks are run and, since we're only using a single
# thread, whether everything is started in order and respects the priority
wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
try:
self._CheckWorkerCount(wp, 1)
ctx = ListBuilderContext()
# Use static seed for this test
rnd = random.Random(26279)
data = {}
for i in range(1, 333):
prio = int(rnd.random() * 30)
wp.AddTask((ctx, i), priority=prio)
data.setdefault(prio, []).append(i)
# Cause some distortion
if i % 11 == 0:
time.sleep(.001)
if i % 41 == 0:
wp.Quiesce()
wp.Quiesce()
self._CheckNoTasks(wp)
# Check result
ctx.lock.acquire()
try:
self.assertEqual(data, ctx.prioresult)
finally:
ctx.lock.release()
self._CheckWorkerCount(wp, 1)
finally:
wp.TerminateWorkers()
self._CheckWorkerCount(wp, 0)
def testDeferTask(self):
# Tests whether all tasks are run and, since we're only using a single
# thread, whether everything is started in order and respects the priority
wp = workerpool.WorkerPool("Test", 1, DeferringWorker)
try:
self._CheckWorkerCount(wp, 1)
ctx = DeferringTaskContext()
# Use static seed for this test
rnd = random.Random(14921)
data = {}
for i in range(1, 333):
ctx.lock.acquire()
try:
if i % 5 == 0:
ctx.samepriodefer[i] = True
finally:
ctx.lock.release()
prio = int(rnd.random() * 30)
wp.AddTask((ctx, i, prio), priority=50)
data.setdefault(prio, set()).add(i)
# Cause some distortion
if i % 24 == 0:
time.sleep(.001)
if i % 31 == 0:
wp.Quiesce()
wp.Quiesce()
self._CheckNoTasks(wp)
# Check result
ctx.lock.acquire()
try:
self.assertEqual(data, ctx.prioresult)
finally:
ctx.lock.release()
self._CheckWorkerCount(wp, 1)
finally:
wp.TerminateWorkers()
self._CheckWorkerCount(wp, 0)
if __name__ == '__main__': if __name__ == '__main__':
testutils.GanetiTestProgram() testutils.GanetiTestProgram()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment