# NOTE: navigation residue ("Newer" / "Older") from a web export stood here; it is not part of the module.
# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Script for unittesting the utils module"""
import errno
import fcntl
import glob
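# NOTE(review): commit-annotation residue ("Alexander Schreiber committed") from a web export stood here.
# Some import lines may have been lost at this spot (re, shutil and errors are used below) -- verify.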
import socket
import stat
import tempfile
import time
import unittest
import random
import operator
from ganeti import constants
from ganeti import utils
from ganeti.utils import RunCmd, \
FirstFree, \
class TestParseCpuMask(unittest.TestCase):
    """Checks for utils.ParseCpuMask."""

    def testWellFormed(self):
        """Each well-formed mask must parse to the listed result."""
        expectations = [
            ("", []),
            ("1", [1]),
            ("0-2,4,5-5", [0, 1, 2, 4, 5]),
        ]
        for mask, cpus in expectations:
            self.assertEqual(utils.ParseCpuMask(mask), cpus)

    def testInvalidInput(self):
        """Each malformed mask must raise errors.ParseError."""
        bad_masks = ["garbage", "0,", "0-1-2", "2-1", "1-a"]
        for mask in bad_masks:
            self.assertRaises(errors.ParseError, utils.ParseCpuMask, mask)
class TestGetMounts(unittest.TestCase):
    """Checks for utils.GetMounts()."""

    # Sample mount table content written to a temporary file in setUp().
    TESTDATA = (
        "rootfs / rootfs rw 0 0\n"
        "none /sys sysfs rw,nosuid,nodev,noexec,relatime 0 0\n"
        "none /proc proc rw,nosuid,nodev,noexec,relatime 0 0\n")

    def setUp(self):
        """Create a named temporary file and fill it with TESTDATA."""
        self.tmpfile = tempfile.NamedTemporaryFile()
        utils.WriteFile(self.tmpfile.name, data=self.TESTDATA)

    def testGetMounts(self):
        """GetMounts() must return one 4-tuple per TESTDATA line."""
        shared_opts = "rw,nosuid,nodev,noexec,relatime"
        expected = [
            ("rootfs", "/", "rootfs", "rw"),
            ("none", "/sys", "sysfs", shared_opts),
            ("none", "/proc", "proc", shared_opts),
        ]
        mounts = utils.GetMounts(filename=self.tmpfile.name)
        self.assertEqual(mounts, expected)
class TestFirstFree(unittest.TestCase):
    """Test case for the FirstFree function."""

    def test(self):
        """Check FirstFree with a gap, empty input and a custom base.

        Uses assertEqual/assertRaises instead of the deprecated
        failUnlessEqual/failUnlessRaises aliases, matching the other
        test cases in this file.
        """
        self.assertEqual(FirstFree([0, 1, 3]), 2)
        self.assertEqual(FirstFree([]), None)
        self.assertEqual(FirstFree([3, 4, 6]), 0)
        self.assertEqual(FirstFree([3, 4, 6], base=3), 5)
        # Presumably rejected because 0 lies below base=3 -- confirm
        # against the FirstFree implementation.
        self.assertRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
class TestTimeFunctions(unittest.TestCase):
    """Checks for utils.SplitTime and utils.MergeTime."""

    def runTest(self):
        """Run the SplitTime checks, then the MergeTime checks."""
        # (input seconds, expected SplitTime result)
        split_cases = [
            (1, (1, 0)),
            (1.5, (1, 500000)),
            (1218448917.4809151, (1218448917, 480915)),
            (123.48012, (123, 480120)),
            (123.9996, (123, 999600)),
            (123.9995, (123, 999500)),
            (123.9994, (123, 999400)),
            (123.999999999, (123, 999999)),
        ]
        for seconds, parts in split_cases:
            self.assertEqual(utils.SplitTime(seconds), parts)
        self.assertRaises(AssertionError, utils.SplitTime, -1)

        # (input tuple, expected MergeTime result), compared exactly
        exact_merges = [
            ((1, 0), 1.0),
            ((1, 500000), 1.5),
            ((1218448917, 500000), 1218448917.5),
        ]
        for parts, seconds in exact_merges:
            self.assertEqual(utils.MergeTime(parts), seconds)

        # Same, but the result is rounded to three decimals before comparing
        rounded_merges = [
            ((1218448917, 481000), 1218448917.481),
            ((1, 801000), 1.801),
        ]
        for parts, seconds in rounded_merges:
            self.assertEqual(round(utils.MergeTime(parts), 3), seconds)

        # Tuples for which MergeTime must raise AssertionError
        rejected = [(0, -1), (0, 1000000), (0, 9999999), (-1, 0), (-9999, 0)]
        for parts in rejected:
            self.assertRaises(AssertionError, utils.MergeTime, parts)
class FieldSetTestCase(unittest.TestCase):
    """Test case for FieldSets.

    Uses assertTrue/assertFalse instead of the deprecated
    failUnless/failIf aliases.
    """

    def testSimpleMatch(self):
        """Literal field names must match as whole strings only."""
        f = utils.FieldSet("a", "b", "c", "def")
        self.assertTrue(f.Matches("a"))
        self.assertFalse(f.Matches("d"), "Substring matched")
        self.assertFalse(f.Matches("defghi"), "Prefix string matched")
        # NonMatching() is expected to be falsy when every name matches.
        self.assertFalse(f.NonMatching(["b", "c"]))
        self.assertFalse(f.NonMatching(["a", "b", "c", "def"]))
        self.assertTrue(f.NonMatching(["a", "d"]))

    def testRegexMatch(self):
        """Field definitions given as regular expressions must match."""
        f = utils.FieldSet("a", "b([0-9]+)", "c")
        self.assertTrue(f.Matches("b1"))
        self.assertTrue(f.Matches("b99"))
        self.assertFalse(f.Matches("b/1"))
        self.assertFalse(f.NonMatching(["b12", "c"]))
        self.assertTrue(f.NonMatching(["a", "1"]))
class TestForceDictType(unittest.TestCase):
    """Test case for ForceDictType."""

    # Key name -> enforced value type used by all tests in this class.
    KEY_TYPES = {
        "a": constants.VTYPE_INT,
        "b": constants.VTYPE_BOOL,
        "c": constants.VTYPE_STRING,
        "d": constants.VTYPE_SIZE,
        "e": constants.VTYPE_MAYBE_STRING,
    }

    def _fdt(self, dict, allowed_values=None):
        """Run utils.ForceDictType on ``dict`` and return that same object.

        The two calls are alternatives: previously the second call ran
        unconditionally, so a dict was coerced twice whenever
        ``allowed_values`` was None.

        @param dict: the dictionary to coerce using KEY_TYPES
        @param allowed_values: optional value forwarded to ForceDictType
            under the same keyword
        @return: the object passed in as ``dict``

        """
        if allowed_values is None:
            utils.ForceDictType(dict, self.KEY_TYPES)
        else:
            utils.ForceDictType(dict, self.KEY_TYPES,
                                allowed_values=allowed_values)
        return dict

    def testSimpleDict(self):
        """Valid inputs must come back with the expected values."""
        self.assertEqual(self._fdt({}), {})
        self.assertEqual(self._fdt({'a': 1}), {'a': 1})
        self.assertEqual(self._fdt({'a': '1'}), {'a': 1})
        self.assertEqual(self._fdt({'a': 1, 'b': 1}), {'a': 1, 'b': True})
        self.assertEqual(self._fdt({'b': 1, 'c': 'foo'}),
                         {'b': True, 'c': 'foo'})
        self.assertEqual(self._fdt({'b': 1, 'c': False}),
                         {'b': True, 'c': ''})
        self.assertEqual(self._fdt({'b': 'false'}), {'b': False})
        self.assertEqual(self._fdt({'b': 'False'}), {'b': False})
        self.assertEqual(self._fdt({'b': False}), {'b': False})
        self.assertEqual(self._fdt({'b': 'true'}), {'b': True})
        self.assertEqual(self._fdt({'b': 'True'}), {'b': True})
        self.assertEqual(self._fdt({'d': '4'}), {'d': 4})
        self.assertEqual(self._fdt({'d': '4M'}), {'d': 4})
        self.assertEqual(self._fdt({"e": None, }), {"e": None, })
        self.assertEqual(self._fdt({"e": "Hello World", }),
                         {"e": "Hello World", })
        self.assertEqual(self._fdt({"e": False, }), {"e": '', })
        # With "hello" passed as an allowed value it must be kept as-is,
        # even though key "b" is typed as a boolean.
        self.assertEqual(self._fdt({"b": "hello", }, ["hello"]),
                         {"b": "hello"})

    def testErrors(self):
        """Invalid inputs must raise the expected error classes."""
        type_error = errors.TypeEnforcementError
        self.assertRaises(type_error, self._fdt, {'a': 'astring'})
        self.assertRaises(type_error, self._fdt, {"b": "hello"})
        self.assertRaises(type_error, self._fdt, {'c': True})
        self.assertRaises(type_error, self._fdt, {'d': 'astring'})
        self.assertRaises(type_error, self._fdt, {'d': '4 L'})
        self.assertRaises(type_error, self._fdt, {"e": object(), })
        self.assertRaises(type_error, self._fdt, {"e": [], })
        # "x" is not a key of KEY_TYPES.
        self.assertRaises(type_error, self._fdt, {"x": None, })
        # A list is passed where a dictionary is expected.
        self.assertRaises(type_error, self._fdt, [])
        self.assertRaises(errors.ProgrammerError, utils.ForceDictType,
                          {"b": "hello"}, {"b": "no-such-type"})
class TestValidateServiceName(unittest.TestCase):
    """Checks for utils.ValidateServiceName."""

    def testValid(self):
        """Every accepted name must be returned unchanged."""
        accepted = [
            0, 1, 2, 3, 1024, 65000, 65534, 65535,
            "ganeti",
            "gnt-masterd",
            "HELLO_WORLD_SVC",
            "hello.world.1",
            "0", "80", "1111", "65535",
        ]
        for candidate in accepted:
            self.assertEqual(utils.ValidateServiceName(candidate), candidate)

    def testInvalid(self):
        """Every rejected name must raise errors.OpPrereqError."""
        rejected = [
            -15756, -1, 65536, 133428083,
            "", "Hello World!", "!", "'", "\"", "\t", "\n", "`",
            "-8546", "-1", "65536",
            (129 * "A"),
        ]
        for candidate in rejected:
            self.assertRaises(errors.OpPrereqError,
                              utils.ValidateServiceName, candidate)
class TestReadLockedPidFile(unittest.TestCase):
    """Test case for utils.ReadLockedPidFile.

    Uses assertTrue instead of the deprecated assert_ alias.
    """

    def setUp(self):
        """Create a scratch directory for the PID files."""
        self.tmpdir = tempfile.mkdtemp()

    def tearDown(self):
        """Remove the scratch directory and everything in it."""
        shutil.rmtree(self.tmpdir)

    def testNonExistent(self):
        """A missing PID file must yield None."""
        path = utils.PathJoin(self.tmpdir, "nonexist")
        self.assertTrue(utils.ReadLockedPidFile(path) is None)

    def testUnlocked(self):
        """An existing PID file that nobody has locked must yield None."""
        path = utils.PathJoin(self.tmpdir, "pid")
        utils.WriteFile(path, data="123")
        self.assertTrue(utils.ReadLockedPidFile(path) is None)

    def testLocked(self):
        """The PID is returned only while the file is exclusively locked."""
        path = utils.PathJoin(self.tmpdir, "pid")
        utils.WriteFile(path, data="123")

        fl = utils.FileLock.Open(path)
        try:
            fl.Exclusive(blocking=True)
            self.assertEqual(utils.ReadLockedPidFile(path), 123)
        finally:
            # Always close the lock object, even if the assertion failed.
            fl.Close()

        # After the lock object is closed the result must be None again.
        self.assertTrue(utils.ReadLockedPidFile(path) is None)

    def testError(self):
        """Errors other than a missing file must be raised to the caller."""
        path = utils.PathJoin(self.tmpdir, "foobar", "pid")
        # "foobar" is created as a regular file, so it cannot be traversed
        # as a directory when "foobar/pid" is opened.
        utils.WriteFile(utils.PathJoin(self.tmpdir, "foobar"), data="")
        # open(2) should return ENOTDIR
        self.assertRaises(EnvironmentError, utils.ReadLockedPidFile, path)
# NOTE: line-number residue (261-290) from a web export stood here; it is not part of the module.
class TestFindMatch(unittest.TestCase):
    """Test case for utils.FindMatch.

    Uses assertTrue instead of the deprecated assert_ alias.
    """

    def test(self):
        """String and compiled-regex keys must both be found."""
        data = {
            "aaaa": "Four A",
            "bb": {"Two B": True},
            re.compile(r"^x(foo|bar|bazX)([0-9]+)$"): (1, 2, 3),
        }

        # Plain string keys: the value comes back with an empty list.
        self.assertEqual(utils.FindMatch(data, "aaaa"), ("Four A", []))
        self.assertEqual(utils.FindMatch(data, "bb"), ({"Two B": True}, []))

        # Regex key: the value comes back with the captured groups.
        for i in ["foo", "bar", "bazX"]:
            for j in range(1, 100, 7):
                self.assertEqual(utils.FindMatch(data, "x%s%s" % (i, j)),
                                 ((1, 2, 3), [i, str(j)]))

    def testNoMatch(self):
        """FindMatch must return None when no key matches."""
        self.assertTrue(utils.FindMatch({}, "") is None)
        self.assertTrue(utils.FindMatch({}, "foo") is None)
        self.assertTrue(utils.FindMatch({}, 1234) is None)

        data = {
            "X": "Hello World",
            re.compile("^(something)$"): "Hello World",
        }

        self.assertTrue(utils.FindMatch(data, "") is None)
        # "Hello World" occurs only as a value in data, never as a key.
        self.assertTrue(utils.FindMatch(data, "Hello World") is None)
class TestTryConvert(unittest.TestCase):
    """Checks for utils.TryConvert."""

    def test(self):
        """TryConvert(fn, src) must yield the listed result for each case."""
        # (source value, conversion function, expected result)
        cases = [
            ("1", int, 1),
            ("a", int, "a"),
            ("", bool, False),
            ("a", bool, True),
        ]
        for value, converter, expected in cases:
            self.assertEqual(utils.TryConvert(converter, value), expected)