diff --git a/doc/design-2.3.rst b/doc/design-2.3.rst
index efd8bf254bf73deb6cf8c2b9ea15728282244549..71a9d4d93d18ca9aa747e4f12d293d3b6eed9daa 100644
--- a/doc/design-2.3.rst
+++ b/doc/design-2.3.rst
@@ -20,6 +20,54 @@ As for 2.1 and 2.2 we divide the 2.3 design into three areas:
 Core changes
+Job priorities
+Current state and shortcomings
+.. TODO: Describe current situation
+Proposed changes
+.. TODO: Describe changes to job queue and potentially client programs
+Worker pool
+To support job priorities in the job queue, the worker pool underlying
+the job queue must be enhanced to support task priorities. Currently
+tasks are processed in the order they are added to the queue (but, due
+to their nature, they don't necessarily finish in that order). All tasks
+are equal. To support tasks with higher or lower priority, a few changes
+have to be made to the queue inside a worker pool.
+Each task is assigned a priority when added to the queue. This priority
+can not be changed until the task is executed (this is fine as in all
+current use-cases, tasks are added to a pool and then forgotten about
+until they're done).
+A task's priority can be compared to Unix' process priorities. The lower
+the priority number, the closer to the queue's front it is. A task with
+priority 0 is going to be run before one with priority 10. Tasks with
+the same priority are executed in the order in which they were added.
+While a task is running it can query its own priority. If it's not ready
+yet for finishing, it can raise an exception to defer itself, optionally
+changing its own priority. This is useful for the following cases:
+- A task is trying to acquire locks, but those locks are still held by
+  other tasks. By deferring itself, the task gives others a chance to
+  run. This is especially useful when all workers are busy.
+- If a task decides it hasn't gotten its locks in a long time, it can
+  start to increase its own priority.
+- Tasks waiting for long-running operations running asynchronously could
+  defer themselves while waiting for a long-running operation.
+With these changes, the job queue will be able to implement per-job
 Feature changes
diff --git a/lib/workerpool.py b/lib/workerpool.py
index 8127329467897fecc45a60f9cc904037a520c777..1838d97b7c3a275f89067c7898f307df494072c2 100644
--- a/lib/workerpool.py
+++ b/lib/workerpool.py
@@ -23,14 +23,34 @@
-import collections
 import logging
 import threading
+import heapq
 from ganeti import compat
+from ganeti import errors
 _TERMINATE = object()
+class DeferTask(Exception):
+  """Special exception class to defer a task.
+  This class can be raised by L{BaseWorker.RunTask} to defer the execution of a
+  task. Optionally, the priority of the task can be changed.
+  """
+  def __init__(self, priority=None):
+    """Initializes this class.
+    @type priority: number
+    @param priority: New task priority (None means no change)
+    """
+    Exception.__init__(self)
+    self.priority = priority
 class BaseWorker(threading.Thread, object):
@@ -64,6 +84,22 @@ class BaseWorker(threading.Thread, object):
+  def GetCurrentPriority(self):
+    """Returns the priority of the current task.
+    Should only be called from within L{RunTask}.
+    """
+    self.pool._lock.acquire()
+    try:
+      assert self._HasRunningTaskUnlocked()
+      (priority, _, _) = self._current_task
+      return priority
+    finally:
+      self.pool._lock.release()
   def _HasRunningTaskUnlocked(self):
     """Returns whether this worker is currently running a task.
@@ -80,6 +116,8 @@ class BaseWorker(threading.Thread, object):
     while True:
       assert self._current_task is None
+      defer = None
         # Wait on lock to be told either to terminate or to do a task
@@ -104,11 +142,24 @@ class BaseWorker(threading.Thread, object):
-        # Run the actual task
+        (priority, _, args) = self._current_task
-          logging.debug("Starting task %r", self._current_task)
-          self.RunTask(*self._current_task)
-          logging.debug("Done with task %r", self._current_task)
+          # Run the actual task
+          assert defer is None
+          logging.debug("Starting task %r, priority %s", args, priority)
+          self.RunTask(*args) # pylint: disable-msg=W0142
+          logging.debug("Done with task %r, priority %s", args, priority)
+        except DeferTask, err:
+          defer = err
+          if defer.priority is None:
+            # Use same priority
+            defer.priority = priority
+          logging.debug("Deferring task %r, new priority %s", defer.priority)
+          assert self._HasRunningTaskUnlocked()
         except: # pylint: disable-msg=W0702
           logging.exception("Caught unhandled exception")
@@ -117,6 +168,12 @@ class BaseWorker(threading.Thread, object):
         # Notify pool
+          if defer:
+            assert self._current_task
+            # Schedule again for later run
+            (_, _, args) = self._current_task
+            pool._AddTaskUnlocked(args, defer.priority)
           if self._current_task:
             self._current_task = None
@@ -170,7 +227,8 @@ class WorkerPool(object):
     self._termworkers = []
     # Queued tasks
-    self._tasks = collections.deque()
+    self._counter = 0
+    self._tasks = []
     # Start workers
@@ -184,44 +242,77 @@ class WorkerPool(object):
     while self._quiescing:
-  def _AddTaskUnlocked(self, args):
+  def _AddTaskUnlocked(self, args, priority):
+    """Adds a task to the internal queue.
+    @type args: sequence
+    @param args: Arguments passed to L{BaseWorker.RunTask}
+    @type priority: number
+    @param priority: Task priority
+    """
     assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
+    assert isinstance(priority, (int, long)), "Priority must be numeric"
-    self._tasks.append(args)
+    # This counter is used to ensure elements are processed in their
+    # incoming order. For processing they're sorted by priority and then
+    # counter.
+    self._counter += 1
+    heapq.heappush(self._tasks, (priority, self._counter, args))
     # Notify a waiting worker
-  def AddTask(self, args):
+  def AddTask(self, args, priority=_DEFAULT_PRIORITY):
     """Adds a task to the queue.
     @type args: sequence
     @param args: arguments passed to L{BaseWorker.RunTask}
+    @type priority: number
+    @param priority: Task priority
-      self._AddTaskUnlocked(args)
+      self._AddTaskUnlocked(args, priority)
-  def AddManyTasks(self, tasks):
+  def AddManyTasks(self, tasks, priority=_DEFAULT_PRIORITY):
     """Add a list of tasks to the queue.
     @type tasks: list of tuples
     @param tasks: list of args passed to L{BaseWorker.RunTask}
+    @type priority: number or list of numbers
+    @param priority: Priority for all added tasks or a list with the priority
+                     for each task
     assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
       "Each task must be a sequence"
+    assert (isinstance(priority, (int, long)) or
+            compat.all(isinstance(prio, (int, long)) for prio in priority)), \
+           "Priority must be numeric or be a list of numeric values"
+    if isinstance(priority, (int, long)):
+      priority = [priority] * len(tasks)
+    elif len(priority) != len(tasks):
+      raise errors.ProgrammerError("Number of priorities (%s) doesn't match"
+                                   " number of tasks (%s)" %
+                                   (len(priority), len(tasks)))
-      for args in tasks:
-        self._AddTaskUnlocked(args)
+      assert compat.all(isinstance(prio, (int, long)) for prio in priority)
+      assert len(tasks) == len(priority)
+      for args, priority in zip(tasks, priority):
+        self._AddTaskUnlocked(args, priority)
@@ -254,7 +345,7 @@ class WorkerPool(object):
     # Get task from queue and tell pool about it
-      return self._tasks.popleft()
+      return heapq.heappop(self._tasks)
diff --git a/test/ganeti.workerpool_unittest.py b/test/ganeti.workerpool_unittest.py
index 586cc5e9f12fb38fa7c651cacb657314f0a13878..63868622ef52621eee2fb9a878b55888e845dee7 100755
--- a/test/ganeti.workerpool_unittest.py
+++ b/test/ganeti.workerpool_unittest.py
@@ -1,7 +1,7 @@
-# Copyright (C) 2008 Google Inc.
+# Copyright (C) 2008, 2009, 2010 Google Inc.
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -26,13 +26,15 @@ import threading
 import time
 import sys
 import zlib
+import random
 from ganeti import workerpool
+from ganeti import errors
 import testutils
-class CountingContext(object):
+class CountingContext(object):
   def __init__(self):
     self._lock = threading.Condition(threading.Lock())
     self.done = 0
@@ -57,7 +59,6 @@ class CountingContext(object):
 class CountingBaseWorker(workerpool.BaseWorker):
   def RunTask(self, ctx, text):
@@ -83,6 +84,46 @@ class ChecksumBaseWorker(workerpool.BaseWorker):
+class ListBuilderContext:
+  def __init__(self):
+    self.lock = threading.Lock()
+    self.result = []
+    self.prioresult = {}
+class ListBuilderWorker(workerpool.BaseWorker):
+  def RunTask(self, ctx, data):
+    ctx.lock.acquire()
+    try:
+      ctx.result.append((self.GetCurrentPriority(), data))
+      ctx.prioresult.setdefault(self.GetCurrentPriority(), []).append(data)
+    finally:
+      ctx.lock.release()
+class DeferringTaskContext:
+  def __init__(self):
+    self.lock = threading.Lock()
+    self.prioresult = {}
+    self.samepriodefer = {}
+class DeferringWorker(workerpool.BaseWorker):
+  def RunTask(self, ctx, num, targetprio):
+    ctx.lock.acquire()
+    try:
+      if num in ctx.samepriodefer:
+        del ctx.samepriodefer[num]
+        raise workerpool.DeferTask()
+      if self.GetCurrentPriority() > targetprio:
+        raise workerpool.DeferTask(priority=self.GetCurrentPriority() - 1)
+      ctx.prioresult.setdefault(self.GetCurrentPriority(), set()).add(num)
+    finally:
+      ctx.lock.release()
 class TestWorkerpool(unittest.TestCase):
   """Workerpool tests"""
@@ -206,6 +247,220 @@ class TestWorkerpool(unittest.TestCase):
+  def testPriorityChecksum(self):
+    # Tests whether all tasks are run and, since we're only using a single
+    # thread, whether everything is started in order and respects the priority
+    wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
+    try:
+      self._CheckWorkerCount(wp, 1)
+      ctx = ChecksumContext()
+      data = {}
+      tasks = []
+      priorities = []
+      for i in range(1, 333):
+        prio = i % 7
+        tasks.append((ctx, i))
+        priorities.append(prio)
+        data.setdefault(prio, []).append(i)
+      wp.AddManyTasks(tasks, priority=priorities)
+      wp.Quiesce()
+      self._CheckNoTasks(wp)
+      # Check sum
+      ctx.lock.acquire()
+      try:
+        checksum = ChecksumContext.CHECKSUM_START
+        for priority in sorted(data.keys()):
+          for i in data[priority]:
+            checksum = ChecksumContext.UpdateChecksum(checksum, i)
+        self.assertEqual(checksum, ctx.checksum)
+      finally:
+        ctx.lock.release()
+      self._CheckWorkerCount(wp, 1)
+    finally:
+      wp.TerminateWorkers()
+      self._CheckWorkerCount(wp, 0)
+  def testPriorityListManyTasks(self):
+    # Tests whether all tasks are run and, since we're only using a single
+    # thread, whether everything is started in order and respects the priority
+    wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
+    try:
+      self._CheckWorkerCount(wp, 1)
+      ctx = ListBuilderContext()
+      # Use static seed for this test
+      rnd = random.Random(0)
+      data = {}
+      tasks = []
+      priorities = []
+      for i in range(1, 333):
+        prio = int(rnd.random() * 10)
+        tasks.append((ctx, i))
+        priorities.append(prio)
+        data.setdefault(prio, []).append((prio, i))
+      wp.AddManyTasks(tasks, priority=priorities)
+      self.assertRaises(errors.ProgrammerError, wp.AddManyTasks,
+                        [("x", ), ("y", )], priority=[1] * 5)
+      wp.Quiesce()
+      self._CheckNoTasks(wp)
+      # Check result
+      ctx.lock.acquire()
+      try:
+        expresult = []
+        for priority in sorted(data.keys()):
+          expresult.extend(data[priority])
+        self.assertEqual(expresult, ctx.result)
+      finally:
+        ctx.lock.release()
+      self._CheckWorkerCount(wp, 1)
+    finally:
+      wp.TerminateWorkers()
+      self._CheckWorkerCount(wp, 0)
+  def testPriorityListSingleTasks(self):
+    # Tests whether all tasks are run and, since we're only using a single
+    # thread, whether everything is started in order and respects the priority
+    wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
+    try:
+      self._CheckWorkerCount(wp, 1)
+      ctx = ListBuilderContext()
+      # Use static seed for this test
+      rnd = random.Random(26279)
+      data = {}
+      for i in range(1, 333):
+        prio = int(rnd.random() * 30)
+        wp.AddTask((ctx, i), priority=prio)
+        data.setdefault(prio, []).append(i)
+        # Cause some distortion
+        if i % 11 == 0:
+          time.sleep(.001)
+        if i % 41 == 0:
+          wp.Quiesce()
+      wp.Quiesce()
+      self._CheckNoTasks(wp)
+      # Check result
+      ctx.lock.acquire()
+      try:
+        self.assertEqual(data, ctx.prioresult)
+      finally:
+        ctx.lock.release()
+      self._CheckWorkerCount(wp, 1)
+    finally:
+      wp.TerminateWorkers()
+      self._CheckWorkerCount(wp, 0)
+  def testPriorityListSingleTasks(self):
+    # Tests whether all tasks are run and, since we're only using a single
+    # thread, whether everything is started in order and respects the priority
+    wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
+    try:
+      self._CheckWorkerCount(wp, 1)
+      ctx = ListBuilderContext()
+      # Use static seed for this test
+      rnd = random.Random(26279)
+      data = {}
+      for i in range(1, 333):
+        prio = int(rnd.random() * 30)
+        wp.AddTask((ctx, i), priority=prio)
+        data.setdefault(prio, []).append(i)
+        # Cause some distortion
+        if i % 11 == 0:
+          time.sleep(.001)
+        if i % 41 == 0:
+          wp.Quiesce()
+      wp.Quiesce()
+      self._CheckNoTasks(wp)
+      # Check result
+      ctx.lock.acquire()
+      try:
+        self.assertEqual(data, ctx.prioresult)
+      finally:
+        ctx.lock.release()
+      self._CheckWorkerCount(wp, 1)
+    finally:
+      wp.TerminateWorkers()
+      self._CheckWorkerCount(wp, 0)
+  def testDeferTask(self):
+    # Tests whether all tasks are run and, since we're only using a single
+    # thread, whether everything is started in order and respects the priority
+    wp = workerpool.WorkerPool("Test", 1, DeferringWorker)
+    try:
+      self._CheckWorkerCount(wp, 1)
+      ctx = DeferringTaskContext()
+      # Use static seed for this test
+      rnd = random.Random(14921)
+      data = {}
+      for i in range(1, 333):
+        ctx.lock.acquire()
+        try:
+          if i % 5 == 0:
+            ctx.samepriodefer[i] = True
+        finally:
+          ctx.lock.release()
+        prio = int(rnd.random() * 30)
+        wp.AddTask((ctx, i, prio), priority=50)
+        data.setdefault(prio, set()).add(i)
+        # Cause some distortion
+        if i % 24 == 0:
+          time.sleep(.001)
+        if i % 31 == 0:
+          wp.Quiesce()
+      wp.Quiesce()
+      self._CheckNoTasks(wp)
+      # Check result
+      ctx.lock.acquire()
+      try:
+        self.assertEqual(data, ctx.prioresult)
+      finally:
+        ctx.lock.release()
+      self._CheckWorkerCount(wp, 1)
+    finally:
+      wp.TerminateWorkers()
+      self._CheckWorkerCount(wp, 0)
 if __name__ == '__main__':