-
René Nussbaumer authored
hansmi helped me with merging the conflict. Thanks Conflicts: lib/workerpool.py Signed-off-by:
René Nussbaumer <rn@google.com> Reviewed-by:
Iustin Pop <iustin@google.com>
c30421e0
ganeti.workerpool_unittest.py 12.01 KiB
#!/usr/bin/python
#
# Copyright (C) 2008, 2009, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Script for unittesting the workerpool module"""
import unittest
import threading
import time
import sys
import zlib
import random
from ganeti import workerpool
from ganeti import errors
import testutils
class CountingContext(object):
def __init__(self):
self._lock = threading.Condition(threading.Lock())
self.done = 0
def DoneTask(self):
self._lock.acquire()
try:
self.done += 1
finally:
self._lock.release()
def GetDoneTasks(self):
self._lock.acquire()
try:
return self.done
finally:
self._lock.release()
@staticmethod
def UpdateChecksum(current, value):
return zlib.adler32(str(value), current)
class CountingBaseWorker(workerpool.BaseWorker):
def RunTask(self, ctx, text):
ctx.DoneTask()
class ChecksumContext:
CHECKSUM_START = zlib.adler32("")
def __init__(self):
self.lock = threading.Condition(threading.Lock())
self.checksum = self.CHECKSUM_START
@staticmethod
def UpdateChecksum(current, value):
return zlib.adler32(str(value), current)
class ChecksumBaseWorker(workerpool.BaseWorker):
def RunTask(self, ctx, number):
name = "number%s" % number
self.SetTaskName(name)
# This assertion needs to be checked before updating the checksum. A
# failing assertion will then cause the result to be wrong.
assert self.getName() == ("%s/%s" % (self._worker_id, name))
ctx.lock.acquire()
try:
ctx.checksum = ctx.UpdateChecksum(ctx.checksum, number)
finally:
ctx.lock.release()
class ListBuilderContext:
def __init__(self):
self.lock = threading.Lock()
self.result = []
self.prioresult = {}
class ListBuilderWorker(workerpool.BaseWorker):
def RunTask(self, ctx, data):
ctx.lock.acquire()
try:
ctx.result.append((self.GetCurrentPriority(), data))
ctx.prioresult.setdefault(self.GetCurrentPriority(), []).append(data)
finally:
ctx.lock.release()
class DeferringTaskContext:
def __init__(self):
self.lock = threading.Lock()
self.prioresult = {}
self.samepriodefer = {}
class DeferringWorker(workerpool.BaseWorker):
def RunTask(self, ctx, num, targetprio):
ctx.lock.acquire()
try:
if num in ctx.samepriodefer:
del ctx.samepriodefer[num]
raise workerpool.DeferTask()
if self.GetCurrentPriority() > targetprio:
raise workerpool.DeferTask(priority=self.GetCurrentPriority() - 1)
ctx.prioresult.setdefault(self.GetCurrentPriority(), set()).add(num)
finally:
ctx.lock.release()
class TestWorkerpool(unittest.TestCase):
"""Workerpool tests"""
def testCounting(self):
ctx = CountingContext()
wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
try:
self._CheckWorkerCount(wp, 3)
for i in range(10):
wp.AddTask((ctx, "Hello world %s" % i))
wp.Quiesce()
finally:
wp.TerminateWorkers()
self._CheckWorkerCount(wp, 0)
self.assertEquals(ctx.GetDoneTasks(), 10)
def testNoTasks(self):
wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
try:
self._CheckWorkerCount(wp, 3)
self._CheckNoTasks(wp)
finally:
wp.TerminateWorkers()
self._CheckWorkerCount(wp, 0)
def testNoTasksQuiesce(self):
wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
try:
self._CheckWorkerCount(wp, 3)
self._CheckNoTasks(wp)
wp.Quiesce()
self._CheckNoTasks(wp)
finally:
wp.TerminateWorkers()
self._CheckWorkerCount(wp, 0)
def testChecksum(self):
# Tests whether all tasks are run and, since we're only using a single
# thread, whether everything is started in order.
wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
try:
self._CheckWorkerCount(wp, 1)
ctx = ChecksumContext()
checksum = ChecksumContext.CHECKSUM_START
for i in range(1, 100):
checksum = ChecksumContext.UpdateChecksum(checksum, i)
wp.AddTask((ctx, i))
wp.Quiesce()
self._CheckNoTasks(wp)
# Check sum
ctx.lock.acquire()
try:
self.assertEqual(checksum, ctx.checksum)
finally:
ctx.lock.release()
finally:
wp.TerminateWorkers()
self._CheckWorkerCount(wp, 0)
def testAddManyTasks(self):
ctx = CountingContext()
wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
try:
self._CheckWorkerCount(wp, 3)
wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
wp.AddTask((ctx, "A separate hello"))
wp.AddTask((ctx, "Once more, hi!"))
wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
wp.Quiesce()
self._CheckNoTasks(wp)
finally:
wp.TerminateWorkers()
self._CheckWorkerCount(wp, 0)
self.assertEquals(ctx.GetDoneTasks(), 22)
def testManyTasksSequence(self):
ctx = CountingContext()
wp = workerpool.WorkerPool("Test", 3, CountingBaseWorker)
try:
self._CheckWorkerCount(wp, 3)
self.assertRaises(AssertionError, wp.AddManyTasks,
["Hello world %s" % i for i in range(10)])
self.assertRaises(AssertionError, wp.AddManyTasks,
[i for i in range(10)])
wp.AddManyTasks([(ctx, "Hello world %s" % i, ) for i in range(10)])
wp.AddTask((ctx, "A separate hello"))
wp.Quiesce()
self._CheckNoTasks(wp)
finally:
wp.TerminateWorkers()
self._CheckWorkerCount(wp, 0)
self.assertEquals(ctx.GetDoneTasks(), 11)
def _CheckNoTasks(self, wp):
wp._lock.acquire()
try:
# The task queue must be empty now
self.failUnless(not wp._tasks)
finally:
wp._lock.release()
def _CheckWorkerCount(self, wp, num_workers):
wp._lock.acquire()
try:
self.assertEqual(len(wp._workers), num_workers)
finally:
wp._lock.release()
def testPriorityChecksum(self):
# Tests whether all tasks are run and, since we're only using a single
# thread, whether everything is started in order and respects the priority
wp = workerpool.WorkerPool("Test", 1, ChecksumBaseWorker)
try:
self._CheckWorkerCount(wp, 1)
ctx = ChecksumContext()
data = {}
tasks = []
priorities = []
for i in range(1, 333):
prio = i % 7
tasks.append((ctx, i))
priorities.append(prio)
data.setdefault(prio, []).append(i)
wp.AddManyTasks(tasks, priority=priorities)
wp.Quiesce()
self._CheckNoTasks(wp)
# Check sum
ctx.lock.acquire()
try:
checksum = ChecksumContext.CHECKSUM_START
for priority in sorted(data.keys()):
for i in data[priority]:
checksum = ChecksumContext.UpdateChecksum(checksum, i)
self.assertEqual(checksum, ctx.checksum)
finally:
ctx.lock.release()
self._CheckWorkerCount(wp, 1)
finally:
wp.TerminateWorkers()
self._CheckWorkerCount(wp, 0)
def testPriorityListManyTasks(self):
# Tests whether all tasks are run and, since we're only using a single
# thread, whether everything is started in order and respects the priority
wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
try:
self._CheckWorkerCount(wp, 1)
ctx = ListBuilderContext()
# Use static seed for this test
rnd = random.Random(0)
data = {}
tasks = []
priorities = []
for i in range(1, 333):
prio = int(rnd.random() * 10)
tasks.append((ctx, i))
priorities.append(prio)
data.setdefault(prio, []).append((prio, i))
wp.AddManyTasks(tasks, priority=priorities)
self.assertRaises(errors.ProgrammerError, wp.AddManyTasks,
[("x", ), ("y", )], priority=[1] * 5)
wp.Quiesce()
self._CheckNoTasks(wp)
# Check result
ctx.lock.acquire()
try:
expresult = []
for priority in sorted(data.keys()):
expresult.extend(data[priority])
self.assertEqual(expresult, ctx.result)
finally:
ctx.lock.release()
self._CheckWorkerCount(wp, 1)
finally:
wp.TerminateWorkers()
self._CheckWorkerCount(wp, 0)
def testPriorityListSingleTasks(self):
# Tests whether all tasks are run and, since we're only using a single
# thread, whether everything is started in order and respects the priority
wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
try:
self._CheckWorkerCount(wp, 1)
ctx = ListBuilderContext()
# Use static seed for this test
rnd = random.Random(26279)
data = {}
for i in range(1, 333):
prio = int(rnd.random() * 30)
wp.AddTask((ctx, i), priority=prio)
data.setdefault(prio, []).append(i)
# Cause some distortion
if i % 11 == 0:
time.sleep(.001)
if i % 41 == 0:
wp.Quiesce()
wp.Quiesce()
self._CheckNoTasks(wp)
# Check result
ctx.lock.acquire()
try:
self.assertEqual(data, ctx.prioresult)
finally:
ctx.lock.release()
self._CheckWorkerCount(wp, 1)
finally:
wp.TerminateWorkers()
self._CheckWorkerCount(wp, 0)
def testPriorityListSingleTasks(self):
# Tests whether all tasks are run and, since we're only using a single
# thread, whether everything is started in order and respects the priority
wp = workerpool.WorkerPool("Test", 1, ListBuilderWorker)
try:
self._CheckWorkerCount(wp, 1)
ctx = ListBuilderContext()
# Use static seed for this test
rnd = random.Random(26279)
data = {}
for i in range(1, 333):
prio = int(rnd.random() * 30)
wp.AddTask((ctx, i), priority=prio)
data.setdefault(prio, []).append(i)
# Cause some distortion
if i % 11 == 0:
time.sleep(.001)
if i % 41 == 0:
wp.Quiesce()
wp.Quiesce()
self._CheckNoTasks(wp)
# Check result
ctx.lock.acquire()
try:
self.assertEqual(data, ctx.prioresult)
finally:
ctx.lock.release()
self._CheckWorkerCount(wp, 1)
finally:
wp.TerminateWorkers()
self._CheckWorkerCount(wp, 0)
def testDeferTask(self):
# Tests whether all tasks are run and, since we're only using a single
# thread, whether everything is started in order and respects the priority
wp = workerpool.WorkerPool("Test", 1, DeferringWorker)
try:
self._CheckWorkerCount(wp, 1)
ctx = DeferringTaskContext()
# Use static seed for this test
rnd = random.Random(14921)
data = {}
for i in range(1, 333):
ctx.lock.acquire()
try:
if i % 5 == 0:
ctx.samepriodefer[i] = True
finally:
ctx.lock.release()
prio = int(rnd.random() * 30)
wp.AddTask((ctx, i, prio), priority=50)
data.setdefault(prio, set()).add(i)
# Cause some distortion
if i % 24 == 0:
time.sleep(.001)
if i % 31 == 0:
wp.Quiesce()
wp.Quiesce()
self._CheckNoTasks(wp)
# Check result
ctx.lock.acquire()
try:
self.assertEqual(data, ctx.prioresult)
finally:
ctx.lock.release()
self._CheckWorkerCount(wp, 1)
finally:
wp.TerminateWorkers()
self._CheckWorkerCount(wp, 0)
if __name__ == '__main__':
testutils.GanetiTestProgram()