Commit 129cce69 authored by Michael Hanselmann's avatar Michael Hanselmann

Add unit tests for cmdlib._WipeDisks

This is in preparation for adding disk wipe on growing disks.
Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarBernardo Dal Seno <bdalseno@google.com>
parent 1be3e86c
......@@ -385,11 +385,13 @@ class TestClusterVerifyFiles(unittest.TestCase):
class _FakeLU:
def __init__(self, cfg=NotImplemented, proc=NotImplemented):
def __init__(self, cfg=NotImplemented, proc=NotImplemented,
rpc=NotImplemented):
self.warning_log = []
self.info_log = []
self.cfg = cfg
self.proc = proc
self.rpc = rpc
def LogWarning(self, text, *args):
self.warning_log.append((text, args))
......@@ -1238,5 +1240,162 @@ class TestGenerateDiskTemplate(unittest.TestCase):
])
class _ConfigForDiskWipe:
def SetDiskID(self, device, node):
assert isinstance(device, objects.Disk)
assert node == "node1.example.com"
class _RpcForDiskWipe:
def __init__(self, pause_cb, wipe_cb):
self._pause_cb = pause_cb
self._wipe_cb = wipe_cb
def call_blockdev_pause_resume_sync(self, node, disks, pause):
assert node == "node1.example.com"
return rpc.RpcResult(data=self._pause_cb(disks, pause))
def call_blockdev_wipe(self, node, bdev, offset, size):
assert node == "node1.example.com"
return rpc.RpcResult(data=self._wipe_cb(bdev, offset, size))
class _DiskPauseTracker:
def __init__(self):
self.history = []
def __call__(self, (disks, instance), pause):
assert instance.disks == disks
self.history.extend((i.logical_id, i.size, pause)
for i in disks)
return (True, [True] * len(disks))
class TestWipeDisks(unittest.TestCase):
def testPauseFailure(self):
def _FailPause((disks, _), pause):
self.assertEqual(len(disks), 3)
self.assertTrue(pause)
return (False, "error")
lu = _FakeLU(rpc=_RpcForDiskWipe(_FailPause, NotImplemented),
cfg=_ConfigForDiskWipe())
disks = [
objects.Disk(dev_type=constants.LD_LV),
objects.Disk(dev_type=constants.LD_LV),
objects.Disk(dev_type=constants.LD_LV),
]
instance = objects.Instance(name="inst21201",
primary_node="node1.example.com",
disk_template=constants.DT_PLAIN,
disks=disks)
self.assertRaises(errors.OpExecError, cmdlib._WipeDisks, lu, instance)
def testFailingWipe(self):
pt = _DiskPauseTracker()
def _WipeCb((disk, _), offset, size):
assert disk.logical_id == "disk0"
return (False, None)
lu = _FakeLU(rpc=_RpcForDiskWipe(pt, _WipeCb),
cfg=_ConfigForDiskWipe())
disks = [
objects.Disk(dev_type=constants.LD_LV, logical_id="disk0",
size=100 * 1024),
objects.Disk(dev_type=constants.LD_LV, logical_id="disk1",
size=500 * 1024),
objects.Disk(dev_type=constants.LD_LV, logical_id="disk2", size=256),
]
instance = objects.Instance(name="inst562",
primary_node="node1.example.com",
disk_template=constants.DT_PLAIN,
disks=disks)
try:
cmdlib._WipeDisks(lu, instance)
except errors.OpExecError, err:
self.assertTrue(str(err), "Could not wipe disk 0 at offset 0 ")
else:
self.fail("Did not raise exception")
self.assertEqual(pt.history, [
("disk0", 100 * 1024, True),
("disk1", 500 * 1024, True),
("disk2", 256, True),
("disk0", 100 * 1024, False),
("disk1", 500 * 1024, False),
("disk2", 256, False),
])
def testNormalWipe(self):
pt = _DiskPauseTracker()
progress = {}
def _WipeCb((disk, _), offset, size):
assert isinstance(offset, (long, int))
assert isinstance(size, (long, int))
max_chunk_size = (disk.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT)
self.assertTrue(offset >= 0)
self.assertTrue((offset + size) <= disk.size)
self.assertTrue(size > 0)
self.assertTrue(size <= constants.MAX_WIPE_CHUNK)
self.assertTrue(size <= max_chunk_size)
self.assertTrue(offset == 0 or disk.logical_id in progress)
# Keep track of progress
cur_progress = progress.setdefault(disk.logical_id, 0)
self.assertEqual(cur_progress, offset)
progress[disk.logical_id] += size
return (True, None)
lu = _FakeLU(rpc=_RpcForDiskWipe(pt, _WipeCb),
cfg=_ConfigForDiskWipe())
disks = [
objects.Disk(dev_type=constants.LD_LV, logical_id="disk0", size=1024),
objects.Disk(dev_type=constants.LD_LV, logical_id="disk1",
size=500 * 1024),
objects.Disk(dev_type=constants.LD_LV, logical_id="disk2", size=128),
objects.Disk(dev_type=constants.LD_LV, logical_id="disk3",
size=constants.MAX_WIPE_CHUNK),
]
instance = objects.Instance(name="inst3560",
primary_node="node1.example.com",
disk_template=constants.DT_PLAIN,
disks=disks)
cmdlib._WipeDisks(lu, instance)
self.assertEqual(pt.history, [
("disk0", 1024, True),
("disk1", 500 * 1024, True),
("disk2", 128, True),
("disk3", constants.MAX_WIPE_CHUNK, True),
("disk0", 1024, False),
("disk1", 500 * 1024, False),
("disk2", 128, False),
("disk3", constants.MAX_WIPE_CHUNK, False),
])
# Ensure the complete disk has been wiped
self.assertEqual(progress, dict((i.logical_id, i.size) for i in disks))
if __name__ == "__main__":
testutils.GanetiTestProgram()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment