From c2a8e8ba68cb798e25c569adc308fdfc820921f7 Mon Sep 17 00:00:00 2001
From: Guido Trotter <ultrotter@google.com>
Date: Thu, 17 Jun 2010 09:15:17 +0100
Subject: [PATCH] WorkerPool.AddManyTasks

Useful if we want to add many tasks at once, without contention with the
previous one we added starting.

Signed-off-by: Guido Trotter <ultrotter@google.com>
Reviewed-by: Michael Hanselmann <hansmi@google.com>
---
 lib/workerpool.py                  | 30 ++++++++++++++++++++++++++----
 test/ganeti.workerpool_unittest.py | 17 +++++++++++++++++
 2 files changed, 43 insertions(+), 4 deletions(-)

diff --git a/lib/workerpool.py b/lib/workerpool.py
index 1ec89af67..0887ba4bb 100644
--- a/lib/workerpool.py
+++ b/lib/workerpool.py
@@ -180,6 +180,13 @@ class WorkerPool(object):
 
   # TODO: Implement dynamic resizing?
 
+  def _WaitWhileQuiescingUnlocked(self):
+    """Wait until the worker pool has finished quiescing.
+
+    """
+    while self._quiescing:
+      self._pool_to_pool.wait()
+
   def AddTask(self, *args):
     """Adds a task to the queue.
 
@@ -188,11 +195,8 @@ class WorkerPool(object):
     """
     self._lock.acquire()
     try:
-      # Don't add new tasks while we're quiescing
-      while self._quiescing:
-        self._pool_to_pool.wait()
+      self._WaitWhileQuiescingUnlocked()
 
-      # Add task to internal queue
       self._tasks.append(args)
 
       # Wake one idling worker up
@@ -200,6 +204,24 @@ class WorkerPool(object):
     finally:
       self._lock.release()
 
+  def AddManyTasks(self, tasks):
+    """Add a list of tasks to the queue.
+
+    @type tasks: list of tuples
+    @param tasks: list of args passed to L{BaseWorker.RunTask}
+
+    """
+    self._lock.acquire()
+    try:
+      self._WaitWhileQuiescingUnlocked()
+
+      self._tasks.extend(tasks)
+
+      for _ in tasks:
+        self._pool_to_worker.notify()
+    finally:
+      self._lock.release()
+
   def _ShouldWorkerTerminateUnlocked(self, worker):
     """Returns whether a worker should terminate.
 
diff --git a/test/ganeti.workerpool_unittest.py b/test/ganeti.workerpool_unittest.py
index e9cc30c4c..f5aee8e93 100755
--- a/test/ganeti.workerpool_unittest.py
+++ b/test/ganeti.workerpool_unittest.py
@@ -121,6 +121,23 @@ class TestWorkerpool(unittest.TestCase):
       wp.TerminateWorkers()
       self._CheckWorkerCount(wp, 0)
 
+  def testAddManyTasks(self):
+    wp = workerpool.WorkerPool("Test", 3, DummyBaseWorker)
+    try:
+      self._CheckWorkerCount(wp, 3)
+
+      wp.AddManyTasks(["Hello world %s" % i for i in range(10)])
+      wp.AddTask("A separate hello")
+      wp.AddTask("Once more, hi!")
+      wp.AddManyTasks([("Hello world %s" % i, ) for i in range(10)])
+
+      wp.Quiesce()
+
+      self._CheckNoTasks(wp)
+    finally:
+      wp.TerminateWorkers()
+      self._CheckWorkerCount(wp, 0)
+
   def _CheckNoTasks(self, wp):
     wp._lock.acquire()
     try:
-- 
GitLab