#!/usr/bin/python
#
# Copyright (C) 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Script for unittesting the workerpool module"""
import unittest
import threading
import time
import sys
import zlib
from ganeti import workerpool
class CountingContext(object):
    """Lock-protected counter of finished tasks.

    DoneTask() adds one to the counter and GetDoneTasks() reads it back;
    both hold the internal lock while touching ``done``.
    """

    def __init__(self):
        # Only acquire()/release() are ever called on this condition object.
        self._lock = threading.Condition(threading.Lock())
        self.done = 0

    def DoneTask(self):
        """Record one more finished task."""
        self._lock.acquire()
        try:
            self.done += 1
        finally:
            self._lock.release()

    def GetDoneTasks(self):
        """Return the number of tasks recorded so far."""
        self._lock.acquire()
        try:
            return self.done
        finally:
            self._lock.release()

    @staticmethod
    def UpdateChecksum(current, value):
        """Fold ``str(value)`` into the running Adler-32 checksum.

        @param current: checksum computed so far
        @param value: object whose string form is added to the checksum
        @return: the updated checksum as returned by zlib.adler32

        """
        data = str(value)
        # zlib.adler32 rejects text strings on Python 3. On Python 2.6+
        # "bytes" is an alias of "str", so the data is passed on untouched.
        if not isinstance(data, bytes):
            data = data.encode("utf-8")
        return zlib.adler32(data, current)
class CountingBaseWorker(workerpool.BaseWorker):
    """Worker whose only job is to report each task as done."""

    def RunTask(self, ctx, text):
        """Mark one task as finished on ``ctx``; ``text`` is not used."""
        ctx.DoneTask()
class ChecksumContext(object):
    """Holds a running Adler-32 checksum together with the lock guarding it.

    ``lock`` and ``checksum`` are public attributes; code updating
    ``checksum`` is expected to hold ``lock`` while doing so.
    """

    # Adler-32 of empty input. A bytes literal is used because zlib.adler32
    # does not accept text strings on Python 3.
    CHECKSUM_START = zlib.adler32(b"")

    def __init__(self):
        self.lock = threading.Condition(threading.Lock())
        self.checksum = self.CHECKSUM_START

    @staticmethod
    def UpdateChecksum(current, value):
        """Fold ``str(value)`` into the running Adler-32 checksum.

        @param current: checksum computed so far
        @param value: object whose string form is added to the checksum
        @return: the updated checksum as returned by zlib.adler32

        """
        data = str(value)
        # On Python 2.6+ "bytes" is an alias of "str", so nothing changes
        # there; on Python 3 the text has to be encoded first.
        if not isinstance(data, bytes):
            data = data.encode("utf-8")
        return zlib.adler32(data, current)
class ChecksumBaseWorker(workerpool.BaseWorker):
    """Worker that folds every task's number into the context's checksum."""

    def RunTask(self, ctx, number):
        """Update ``ctx.checksum`` with ``number`` while holding ``ctx.lock``."""
        ctx.lock.acquire()
        try:
            updated = ctx.UpdateChecksum(ctx.checksum, number)
            ctx.checksum = updated
        finally:
            ctx.lock.release()
class TestWorkerpool(unittest.TestCase):
    """Workerpool tests"""

    def testCounting(self):
        """Ten tasks added one at a time must all be executed."""
        ctx = CountingContext()
        wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
        try:
            self._CheckWorkerCount(wp, 3)

            for i in range(10):
                wp.AddTask((ctx, "Hello world %s" % i))

            wp.Quiesce()
        finally:
            wp.TerminateWorkers()
            self._CheckWorkerCount(wp, 0)

        self.assertEqual(ctx.GetDoneTasks(), 10)

    def testNoTasks(self):
        """A pool that never receives tasks starts and terminates cleanly."""
        wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
        try:
            self._CheckWorkerCount(wp, 3)
            self._CheckNoTasks(wp)
        finally:
            wp.TerminateWorkers()
            self._CheckWorkerCount(wp, 0)

    def testNoTasksQuiesce(self):
        """Quiesce() on a pool without tasks leaves the queue empty."""
        wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
        try:
            self._CheckWorkerCount(wp, 3)
            self._CheckNoTasks(wp)
            wp.Quiesce()
            self._CheckNoTasks(wp)
        finally:
            wp.TerminateWorkers()
            self._CheckWorkerCount(wp, 0)

    def testChecksum(self):
        """The checksum built by the worker must match the local one."""
        # Tests whether all tasks are run and, since we're only using a single
        # thread, whether everything is started in order.
        wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
        try:
            self._CheckWorkerCount(wp, 1)

            ctx = ChecksumContext()
            checksum = ChecksumContext.CHECKSUM_START
            # The exact bounds are not significant; any non-empty sequence
            # of distinct numbers exercises the ordering check.
            for i in range(1, 100):
                checksum = ChecksumContext.UpdateChecksum(checksum, i)
                wp.AddTask((ctx, i))

            wp.Quiesce()

            self._CheckNoTasks(wp)

            # Check sum
            ctx.lock.acquire()
            try:
                self.assertEqual(checksum, ctx.checksum)
            finally:
                ctx.lock.release()
        finally:
            wp.TerminateWorkers()
            self._CheckWorkerCount(wp, 0)

    def testAddManyTasks(self):
        """Mixing AddManyTasks() and AddTask() runs all 22 tasks."""
        ctx = CountingContext()
        wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
        try:
            self._CheckWorkerCount(wp, 3)

            wp.AddManyTasks([(ctx, "Hello world %s" % i) for i in range(10)])
            wp.AddTask((ctx, "A separate hello"))
            wp.AddTask((ctx, "Once more, hi!"))
            wp.AddManyTasks([(ctx, "Hello world %s" % i) for i in range(10)])

            wp.Quiesce()

            self._CheckNoTasks(wp)
        finally:
            wp.TerminateWorkers()
            self._CheckWorkerCount(wp, 0)

        self.assertEqual(ctx.GetDoneTasks(), 22)

    def testManyTasksSequence(self):
        """AddManyTasks() must raise AssertionError for non-tuple items."""
        ctx = CountingContext()
        wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
        try:
            self._CheckWorkerCount(wp, 3)

            # Neither a list of plain strings nor a list of integers is a
            # valid task list
            self.assertRaises(AssertionError, wp.AddManyTasks,
                              ["Hello world %s" % i for i in range(10)])
            self.assertRaises(AssertionError, wp.AddManyTasks,
                              list(range(10)))

            wp.AddManyTasks([(ctx, "Hello world %s" % i) for i in range(10)])
            wp.AddTask((ctx, "A separate hello"))

            wp.Quiesce()

            self._CheckNoTasks(wp)
        finally:
            wp.TerminateWorkers()
            self._CheckWorkerCount(wp, 0)

        self.assertEqual(ctx.GetDoneTasks(), 11)

    def _CheckNoTasks(self, wp):
        """Assert, under the pool's lock, that no task is queued."""
        wp._lock.acquire()
        try:
            # The task queue must be empty now
            self.assertFalse(wp._tasks)
        finally:
            wp._lock.release()

    def _CheckWorkerCount(self, wp, num_workers):
        """Assert, under the pool's lock, that it has ``num_workers`` workers."""
        wp._lock.acquire()
        try:
            self.assertEqual(len(wp._workers), num_workers)
        finally:
            wp._lock.release()
if __name__ == '__main__':