Commit fad3554f authored by John Giannelos's avatar John Giannelos
Browse files

Fixed insist on actions / fixed testing conditions

parent b7f7a2f5
#!/usr/bin/env python
#!/usr/bin/env python
# Copyright 2011 GRNET S.A. All rights reserved.
#
......@@ -354,20 +354,6 @@ class SpawnServerTestCase(unittest.TestCase):
# The hostname must be of the form 'prefix-id'
self.assertTrue(hostname.endswith("-%d\n" % self.serverid))
def _file_md5(filename, block_size = 2**20):
f = open(filename)
md5 = hashlib.md5()
while True:
data = f.read(block_size)
if not data:
break
md5.update(data)
f.close()
return md5.digest()
def _check_file_through_ssh(self, hostip, username, password, remotepath, content):
msg = "Trying file injection through SSH to %s, as %s/%s" % (hostip, username, password)
log.info(msg)
......@@ -392,7 +378,7 @@ class SpawnServerTestCase(unittest.TestCase):
remote_content = b64encode(f.read())
# Check if files are the same
return remote_content == content
return (remote_content == content)
def _skipIf(self, condition, msg):
if condition:
......@@ -458,44 +444,44 @@ class SpawnServerTestCase(unittest.TestCase):
self._insist_on_status_transition("BUILD", "ACTIVE",
self.build_fail, self.build_warning)
# def test_003a_get_server_oob_console(self):
# """Test getting OOB server console over VNC
def test_003a_get_server_oob_console(self):
"""Test getting OOB server console over VNC
# Implementation of RFB protocol follows
# http://www.realvnc.com/docs/rfbproto.pdf.
Implementation of RFB protocol follows
http://www.realvnc.com/docs/rfbproto.pdf.
# """
"""
# console = self.cyclades.get_server_console(self.serverid)
# self.assertEquals(console['type'], "vnc")
# sock = self._insist_on_tcp_connection(socket.AF_UNSPEC,
# console["host"], console["port"])
# # Step 1. ProtocolVersion message (par. 6.1.1)
# version = sock.recv(1024)
# self.assertEquals(version, 'RFB 003.008\n')
# sock.send(version)
# # Step 2. Security (par 6.1.2): Only VNC Authentication supported
# sec = sock.recv(1024)
# self.assertEquals(list(sec), ['\x01', '\x02'])
# # Step 3. Request VNC Authentication (par 6.1.2)
# sock.send('\x02')
# # Step 4. Receive Challenge (par 6.2.2)
# challenge = sock.recv(1024)
# self.assertEquals(len(challenge), 16)
# # Step 5. DES-Encrypt challenge, use password as key (par 6.2.2)
# response = d3des_generate_response(
# (console["password"] + '\0' * 8)[:8], challenge)
# sock.send(response)
# # Step 6. SecurityResult (par 6.1.3)
# result = sock.recv(4)
# self.assertEquals(list(result), ['\x00', '\x00', '\x00', '\x00'])
# sock.close()
console = self.cyclades.get_server_console(self.serverid)
self.assertEquals(console['type'], "vnc")
sock = self._insist_on_tcp_connection(socket.AF_UNSPEC,
console["host"], console["port"])
# Step 1. ProtocolVersion message (par. 6.1.1)
version = sock.recv(1024)
self.assertEquals(version, 'RFB 003.008\n')
sock.send(version)
# Step 2. Security (par 6.1.2): Only VNC Authentication supported
sec = sock.recv(1024)
self.assertEquals(list(sec), ['\x01', '\x02'])
# Step 3. Request VNC Authentication (par 6.1.2)
sock.send('\x02')
# Step 4. Receive Challenge (par 6.2.2)
challenge = sock.recv(1024)
self.assertEquals(len(challenge), 16)
# Step 5. DES-Encrypt challenge, use password as key (par 6.2.2)
response = d3des_generate_response(
(console["password"] + '\0' * 8)[:8], challenge)
sock.send(response)
# Step 6. SecurityResult (par 6.1.3)
result = sock.recv(4)
self.assertEquals(list(result), ['\x00', '\x00', '\x00', '\x00'])
sock.close()
def test_004_server_has_ipv4(self):
"""Test active server has a valid IPv4 address"""
......@@ -503,11 +489,11 @@ class SpawnServerTestCase(unittest.TestCase):
ipv4 = self._get_ipv4(server)
self.assertEquals(IP(ipv4).version(), 4)
# def test_005_server_has_ipv6(self):
# """Test active server has a valid IPv6 address"""
# server = self.client.get_server_details(self.serverid)
# ipv6 = self._get_ipv6(server)
# self.assertEquals(IP(ipv6).version(), 6)
def test_005_server_has_ipv6(self):
"""Test active server has a valid IPv6 address"""
server = self.client.get_server_details(self.serverid)
ipv6 = self._get_ipv6(server)
self.assertEquals(IP(ipv6).version(), 6)
def test_006_server_responds_to_ping_IPv4(self):
"""Test server responds to ping on IPv4 address"""
......@@ -519,15 +505,15 @@ class SpawnServerTestCase(unittest.TestCase):
self._ping_once,
False, ip)
# def test_007_server_responds_to_ping_IPv6(self):
# """Test server responds to ping on IPv6 address"""
# server = self.client.get_server_details(self.serverid)
# ip = self._get_ipv6(server)
# self._try_until_timeout_expires(self.action_timeout,
# self.action_timeout,
# "PING IPv6 to %s" % ip,
# self._ping_once,
# True, ip)
def test_007_server_responds_to_ping_IPv6(self):
"""Test server responds to ping on IPv6 address"""
server = self.client.get_server_details(self.serverid)
ip = self._get_ipv6(server)
self._try_until_timeout_expires(self.action_timeout,
self.action_timeout,
"PING IPv6 to %s" % ip,
self._ping_once,
True, ip)
def test_008_submit_shutdown_request(self):
"""Test submit request to shutdown server"""
......@@ -560,12 +546,12 @@ class SpawnServerTestCase(unittest.TestCase):
self._insist_on_ssh_hostname(self._get_ipv4(server),
self.username, self.passwd)
# def test_013_ssh_to_server_IPv6(self):
# """Test SSH to server public IPv6 works, verify hostname"""
# self._skipIf(self.is_windows, "only valid for Linux servers")
# server = self.client.get_server_details(self.serverid)
# self._insist_on_ssh_hostname(self._get_ipv6(server),
# self.username, self.passwd)
def test_013_ssh_to_server_IPv6(self):
"""Test SSH to server public IPv6 works, verify hostname"""
self._skipIf(self.is_windows, "only valid for Linux servers")
server = self.client.get_server_details(self.serverid)
self._insist_on_ssh_hostname(self._get_ipv6(server),
self.username, self.passwd)
def test_014_rdp_to_server_IPv4(self):
"Test RDP connection to server public IPv4 works"""
......@@ -579,16 +565,16 @@ class SpawnServerTestCase(unittest.TestCase):
# FIXME: Use rdesktop, analyze exit code? see manpage [costasd]
sock.close()
# def test_015_rdp_to_server_IPv6(self):
# "Test RDP connection to server public IPv6 works"""
# self._skipIf(not self.is_windows, "only valid for Windows servers")
# server = self.client.get_server_details(self.serverid)
# ipv6 = self._get_ipv6(server)
# sock = _get_tcp_connection(socket.AF_INET6, ipv6, 3389)
def test_015_rdp_to_server_IPv6(self):
"Test RDP connection to server public IPv6 works"""
self._skipIf(not self.is_windows, "only valid for Windows servers")
server = self.client.get_server_details(self.serverid)
ipv6 = self._get_ipv6(server)
sock = _get_tcp_connection(socket.AF_INET6, ipv6, 3389)
# # No actual RDP processing done. We assume the RDP server is there
# # if the connection to the RDP port is successful.
# sock.close()
# No actual RDP processing done. We assume the RDP server is there
# if the connection to the RDP port is successful.
sock.close()
def test_016_personality_is_enforced(self):
"""Test file injection for personality enforcement"""
......@@ -621,6 +607,7 @@ class SpawnServerTestCase(unittest.TestCase):
class NetworkTestCase(unittest.TestCase):
""" Testing networking in cyclades """
@classmethod
def setUpClass(cls):
"Initialize kamaki, get list of current networks"
......@@ -640,6 +627,8 @@ class NetworkTestCase(unittest.TestCase):
servername = "%s%s for %s" % (SNF_TEST_PREFIX, TEST_RUN_ID, imagename)
is_windows = imagename.lower().find("windows") >= 0
#Run testcases for server spawning in order to ensure it is done right
setupCase = _spawn_server_test_case(imageid=str(imageid), flavorid=flavorid,
imagename=imagename,
personality=None,
......@@ -679,43 +668,80 @@ class NetworkTestCase(unittest.TestCase):
def test_002_connect_to_network(self):
"""Test connect VM to network"""
servers = self.compute.list_servers()
server = choice(servers)
#Pick a server created only for the test
server = choice([s for s in servers if s['name'].startswith(SNF_TEST_PREFIX)])
self.client.connect_server(server['id'], self.networkid)
#Update class attributes
cls = type(self)
cls.serverid = server['id']
#FIXME: Insist on new connection instead of this
time.sleep(15)
#Insist on connecting until action timeout
connected = (self.client.get_network_details(self.networkid))
connections = len(connected['servers']['values'])
fail_tmout = time.time()+self.action_timeout
self.assertTrue(connections>=1)
while True:
connections = connected['servers']['values']
if (self.serverid in connections):
conn_exists = True
if time.time() > fail_tmout:
self.assertLess(time.time(), fail_tmout)
else:
time.sleep(self.query_interval)
self.assertTrue(conn_exists)
def test_003_disconnect_from_network(self):
prev_state = self.client.get_network_details(self.networkid)
prev_conn = len(prev_state['servers']['values'])
self.client.disconnect_server(self.serverid, self.networkid)
#FIXME: Insist on deleting instead of this
time.sleep(15)
#Insist on deleting until action timeout
connected = (self.client.get_network_details(self.networkid))
curr_conn = len(connected['servers']['values'])
fail_tmout = time.time()+self.action_timeout
self.assertTrue(curr_conn < prev_conn)
while True:
connections = connected['servers']['values']
if (self.serverid not in connections):
conn_exists = False
if time.time() > fail_tmout:
self.assertLess(time.time(), fail_tmout)
else:
time.sleep(self.query_interval)
self.assertFalse(conn_exists)
def test_004_destroy_network(self):
"""Test submit delete network request"""
self.client.delete_network(self.networkid)
self.client.delete_network(self.networkid)
networks = self.client.list_networks()
self.assertEqual(len(networks),1)
curr_net = []
for net in networks:
curr_net.appent(net['id'])
self.assertTrue(self.networkid not in curr_net)
def test_005_cleanup_servers(self):
"""Cleanup servers created for this test"""
self.compute.delete_server(self.server_id)
fail_tmout = time.time()+self.action_timeout
#Ensure server gets deleted
while True:
status = self.compute.get_server_details(self.serverid)
if status == 'DELETED':
deleted = True
elif time.time() > fail_tmout:
self.assertLess(time.time(), fail_tmout)
else:
time.sleep(self.query_interval)
self.assertTrue(deleted)
class TestRunnerProcess(Process):
"""A distinct process used to execute part of the tests in parallel"""
......@@ -803,7 +829,18 @@ def _spawn_server_test_case(**kwargs):
# Make sure the class can be pickled, by listing it among
# the attributes of __main__. A PicklingError is raised otherwise.
setattr(__main__, name, cls)
return cls
return cls
def _spawn_network_test_case(**kwargs):
"""Construct a new unit test case class from NetworkTestCase"""
name = "NetworkTestCase"+TEST_RUN_ID
cls = type(name, (NetworkTestCase,), kwargs)
# Make sure the class can be pickled, by listing it among
# the attributes of __main__. A PicklingError is raised otherwise.
setattr(__main__, name, cls)
return cls
def cleanup_servers(delete_stale=False):
......@@ -909,7 +946,7 @@ def parse_arguments(args):
"name starts with `%s'" % SNF_TEST_PREFIX,
default=False)
parser.add_option("--force-personality",
action="store", dest="personality_path",
action="store", type="string", dest="personality_path",
help="Force a personality file injection. File path required. ",
default=None)
......@@ -1003,22 +1040,25 @@ def main():
servername = "%s%s for %s" % (SNF_TEST_PREFIX, TEST_RUN_ID, imagename)
is_windows = imagename.lower().find("windows") >= 0
ServerTestCase = _spawn_server_test_case(imageid=imageid, flavorid=flavorid,
imagename=imagename,
personality=personality,
servername=servername,
is_windows=is_windows,
action_timeout=opts.action_timeout,
build_warning=opts.build_warning,
build_fail=opts.build_fail,
query_interval=opts.query_interval,
)
ServerTestCase = _spawn_server_test_case(imageid=imageid, flavorid=flavorid,
imagename=imagename,
personality=personality,
servername=servername,
is_windows=is_windows,
action_timeout=opts.action_timeout,
build_warning=opts.build_warning,
build_fail=opts.build_fail,
query_interval=opts.query_interval,
)
#Running all the testcases sequentially
#seq_cases = [UnauthorizedTestCase, FlavorsTestCase, ImagesTestCase, ServerTestCase, NetworkTestCase]
seq_cases = [NetworkTestCase]
newNetworkTestCase = _spawn_network_test_case(action_timeout = opts.action_timeout,
query_interval = opts.query_interval)
seq_cases = [newNetworkTestCase]
for case in seq_cases:
suite = unittest.TestLoader().loadTestsFromTestCase(case)
unittest.TextTestRunner(verbosity=2).run(suite)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment