Commit f1d92ee1 authored by John Giannelos's avatar John Giannelos
Browse files

Major changes in snf-burnin:

* NetworkTestCase checks network status changes
* NetworkTestcase handles networks with specific CIDR
* NetworkTestCase handles servers with attachments (nics)
* Parallel testcase spawning runs the same test-set in different processes
* Fix pickling error raised from multiprocess module
* Fix --delete-stale when VMs are connected to networks
parent 46b07249
......@@ -87,7 +87,6 @@ yellow = '\x1b[33m'
green = '\x1b[32m'
normal = '\x1b[0m'
class burninFormatter(logging.Formatter):
err_fmt = red + "ERROR: %(msg)s" + normal
......@@ -116,7 +115,6 @@ class burninFormatter(logging.Formatter):
return result
log = logging.getLogger("burnin")
log.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
......@@ -260,23 +258,28 @@ class SpawnServerTestCase(unittest.TestCase):
def _get_ipv4(self, server):
"""Get the public IPv4 of a server from the detailed server info"""
public_addrs = filter(lambda x: x["id"] == "public",
server["addresses"]["values"])
public_addrs = filter(lambda x: x["network_id"] == "public",
server["attachments"]["values"])
self.assertEqual(len(public_addrs), 1)
ipv4_addrs = filter(lambda x: x["version"] == 4,
public_addrs[0]["values"])
self.assertEqual(len(ipv4_addrs), 1)
return ipv4_addrs[0]["addr"]
self.assertTrue(public_addrs[0]['ipv4'] != None)
return public_addrs[0]['ipv4']
def _get_ipv6(self, server):
"""Get the public IPv6 of a server from the detailed server info"""
public_addrs = filter(lambda x: x["id"] == "public",
server["addresses"]["values"])
public_addrs = filter(lambda x: x["network_id"] == "public",
server["attachments"]["values"])
self.assertEqual(len(public_addrs), 1)
ipv6_addrs = filter(lambda x: x["version"] == 6,
public_addrs[0]["values"])
self.assertEqual(len(ipv6_addrs), 1)
return ipv6_addrs[0]["addr"]
self.assertTrue(public_addrs[0]['ipv6'] != None)
return public_addrs[0]['ipv6']
def _connect_loginname(self, os):
"""Return the login name for connections based on the server OS"""
......@@ -446,7 +449,7 @@ class SpawnServerTestCase(unittest.TestCase):
servers = self.client.list_servers(detail=True)
servers = filter(lambda x: x["name"] == self.servername, servers)
self.assertEqual(len(servers), 1)
server = servers[0]
self.assertEqual(server["name"], self.servername)
self.assertEqual(server["flavorRef"], self.flavorid)
......@@ -506,43 +509,43 @@ class SpawnServerTestCase(unittest.TestCase):
self._insist_on_status_transition("BUILD", "ACTIVE",
self.build_fail, self.build_warning)
def test_003a_get_server_oob_console(self):
"""Test getting OOB server console over VNC
# def test_003a_get_server_oob_console(self):
# """Test getting OOB server console over VNC
Implementation of RFB protocol follows
http://www.realvnc.com/docs/rfbproto.pdf.
# Implementation of RFB protocol follows
# http://www.realvnc.com/docs/rfbproto.pdf.
"""
console = self.cyclades.get_server_console(self.serverid)
self.assertEquals(console['type'], "vnc")
sock = self._insist_on_tcp_connection(socket.AF_INET,
console["host"], console["port"])
# Step 1. ProtocolVersion message (par. 6.1.1)
version = sock.recv(1024)
self.assertEquals(version, 'RFB 003.008\n')
sock.send(version)
# Step 2. Security (par 6.1.2): Only VNC Authentication supported
sec = sock.recv(1024)
self.assertEquals(list(sec), ['\x01', '\x02'])
# Step 3. Request VNC Authentication (par 6.1.2)
sock.send('\x02')
# Step 4. Receive Challenge (par 6.2.2)
challenge = sock.recv(1024)
self.assertEquals(len(challenge), 16)
# Step 5. DES-Encrypt challenge, use password as key (par 6.2.2)
response = d3des_generate_response(
(console["password"] + '\0' * 8)[:8], challenge)
sock.send(response)
# Step 6. SecurityResult (par 6.1.3)
result = sock.recv(4)
self.assertEquals(list(result), ['\x00', '\x00', '\x00', '\x00'])
sock.close()
# """
# console = self.cyclades.get_server_console(self.serverid)
# self.assertEquals(console['type'], "vnc")
# sock = self._insist_on_tcp_connection(socket.AF_INET,
# console["host"], console["port"])
# # Step 1. ProtocolVersion message (par. 6.1.1)
# version = sock.recv(1024)
# self.assertEquals(version, 'RFB 003.008\n')
# sock.send(version)
# # Step 2. Security (par 6.1.2): Only VNC Authentication supported
# sec = sock.recv(1024)
# self.assertEquals(list(sec), ['\x01', '\x02'])
# # Step 3. Request VNC Authentication (par 6.1.2)
# sock.send('\x02')
# # Step 4. Receive Challenge (par 6.2.2)
# challenge = sock.recv(1024)
# self.assertEquals(len(challenge), 16)
# # Step 5. DES-Encrypt challenge, use password as key (par 6.2.2)
# response = d3des_generate_response(
# (console["password"] + '\0' * 8)[:8], challenge)
# sock.send(response)
# # Step 6. SecurityResult (par 6.1.3)
# result = sock.recv(4)
# self.assertEquals(list(result), ['\x00', '\x00', '\x00', '\x00'])
# sock.close()
def test_004_server_has_ipv4(self):
"""Test active server has a valid IPv4 address"""
......@@ -737,13 +740,14 @@ class NetworkTestCase(unittest.TestCase):
def _get_ipv4(self, server):
"""Get the public IPv4 of a server from the detailed server info"""
public_addrs = filter(lambda x: x["id"] == "public",
server["addresses"]["values"])
public_addrs = filter(lambda x: x["network_id"] == "public",
server["attachments"]["values"])
self.assertEqual(len(public_addrs), 1)
ipv4_addrs = filter(lambda x: x["version"] == 4,
public_addrs[0]["values"])
self.assertEqual(len(ipv4_addrs), 1)
return ipv4_addrs[0]["addr"]
self.assertTrue(public_addrs[0]['ipv4'] != None)
return public_addrs[0]['ipv4']
def _connect_loginname(self, os):
"""Return the login name for connections based on the server OS"""
......@@ -852,7 +856,7 @@ class NetworkTestCase(unittest.TestCase):
name = SNF_TEST_PREFIX + TEST_RUN_ID
previous_num = len(self.client.list_networks())
network = self.client.create_network(name)
network = self.client.create_network(name,cidr='10.0.0.1/28')
#Test if right name is assigned
self.assertEqual(network['name'], name)
......@@ -862,8 +866,21 @@ class NetworkTestCase(unittest.TestCase):
cls.networkid = network['id']
networks = self.client.list_networks()
fail_tmout = time.time() + self.action_timeout
#Test if new network is created
self.assertTrue(len(networks) > previous_num)
while True:
d = self.client.get_network_details(network['id'])
if d['status'] == 'ACTIVE':
connected = True
break
elif time.time() > fail_tmout:
self.assertLess(time.time(), fail_tmout)
else:
log.info("Waiting for network to become ACTIVE")
time.sleep(self.query_interval)
self.assertTrue(connected)
def test_002_connect_to_network(self):
"""Test connect VMs to network"""
......@@ -877,10 +894,11 @@ class NetworkTestCase(unittest.TestCase):
fail_tmout = time.time() + self.action_timeout
while True:
connected = (self.client.get_network_details(self.networkid))
connections = connected['servers']['values']
if (self.serverid['A'] in connections) \
and (self.serverid['B'] in connections):
netsA=[x['network_id'] for x in self.client.get_server_details(self.serverid['A'])['attachments']['values']]
netsB=[x['network_id'] for x in self.client.get_server_details(self.serverid['B'])['attachments']['values']]
if (self.networkid in netsA) and (self.networkid in netsB):
conn_exists = True
break
elif time.time() > fail_tmout:
......@@ -1042,7 +1060,7 @@ class NetworkTestCase(unittest.TestCase):
user=loginname, password=myPass
):
if len(sudo('ifconfig eth1 192.168.0.12')) == 0:
if len(sudo('ifconfig eth1 10.0.0.5')) == 0:
res = True
else:
......@@ -1053,7 +1071,7 @@ class NetworkTestCase(unittest.TestCase):
user=loginname, password=myPass
):
if len(run('ifconfig eth1 192.168.0.12')) == 0:
if len(run('ifconfig eth1 10.0.0.5')) == 0:
res = True
self.assertTrue(res)
......@@ -1094,7 +1112,7 @@ class NetworkTestCase(unittest.TestCase):
user=loginname, password=myPass
):
if len(sudo('ifconfig eth1 192.168.0.13')) == 0:
if len(sudo('ifconfig eth1 10.0.0.6')) == 0:
res = True
else:
......@@ -1105,7 +1123,7 @@ class NetworkTestCase(unittest.TestCase):
user=loginname, password=myPass
):
if len(run('ifconfig eth1 192.168.0.13')) == 0:
if len(run('ifconfig eth1 10.0.0.6')) == 0:
res = True
self.assertTrue(res)
......@@ -1141,7 +1159,7 @@ class NetworkTestCase(unittest.TestCase):
except socket.error:
raise AssertionError
cmd = "if ping -c 2 -w 3 192.168.0.13 >/dev/null; \
cmd = "if ping -c 2 -w 3 10.0.0.6 >/dev/null; \
then echo \'True\'; fi;"
stdin, stdout, stderr = ssh.exec_command(cmd)
lines = stdout.readlines()
......@@ -1159,19 +1177,31 @@ class NetworkTestCase(unittest.TestCase):
log.info("Disconnecting servers from private network")
prev_state = self.client.get_network_details(self.networkid)
prev_conn = len(prev_state['servers']['values'])
prev_nics = prev_state['attachments']['values']
prev_conn = len(prev_nics)
nicsA=[x['id'] for x in self.client.get_server_details(self.serverid['A'])['attachments']['values']]
nicsB=[x['id'] for x in self.client.get_server_details(self.serverid['B'])['attachments']['values']]
for nic in prev_nics:
if nic in nicsA:
self.client.disconnect_server(self.serverid['A'], nic)
if nic in nicsB:
self.client.disconnect_server(self.serverid['B'], nic)
self.client.disconnect_server(self.serverid['A'], self.networkid)
self.client.disconnect_server(self.serverid['B'], self.networkid)
#Insist on deleting until action timeout
fail_tmout = time.time() + self.action_timeout
while True:
netsA=[x['network_id'] for x in self.client.get_server_details(self.serverid['A'])['attachments']['values']]
netsB=[x['network_id'] for x in self.client.get_server_details(self.serverid['B'])['attachments']['values']]
connected = (self.client.get_network_details(self.networkid))
connections = connected['servers']['values']
if ((self.serverid['A'] not in connections) and
(self.serverid['B'] not in connections)):
connections = connected['attachments']['values']
if (self.networkid not in netsA) and (self.networkid not in netsB):
conn_exists = False
break
elif time.time() > fail_tmout:
......@@ -1187,13 +1217,28 @@ class NetworkTestCase(unittest.TestCase):
log.info("Submitting delete network request")
self.client.delete_network(self.networkid)
networks = self.client.list_networks()
fail_tmout = time.time() + self.action_timeout
while True:
curr_net = []
for net in networks:
curr_net.append(net['id'])
curr_net = []
networks = self.client.list_networks()
for net in networks:
curr_net.append(net['id'])
if self.networkid not in curr_net:
self.assertTrue(self.networkid not in curr_net)
break
self.assertTrue(self.networkid not in curr_net)
elif time.time() > fail_tmout:
self.assertLess(time.time(), fail_tmout)
else:
time.sleep(self.query_interval)
def test_006_cleanup_servers(self):
"""Cleanup servers created for this test"""
......@@ -1230,7 +1275,7 @@ class TestRunnerProcess(Process):
Process.__init__(self, **kw)
kwargs = kw["kwargs"]
self.testq = kwargs["testq"]
self.runner = kwargs["runner"]
self.worker_folder = kwargs["worker_folder"]
def run(self):
# Make sure this test runner process dies with the parent
......@@ -1238,25 +1283,75 @@ class TestRunnerProcess(Process):
#
# WARNING: This uses the prctl(2) call and is
# Linux-specific.
prctl.set_pdeathsig(signal.SIGHUP)
multi = logging.getLogger("multiprocess")
while True:
log.debug("I am process %d, GETting from queue is %s",
os.getpid(), self.testq)
multi.debug("I am process %d, GETting from queue is %s" %
(os.getpid(), self.testq))
msg = self.testq.get()
log.debug("Dequeued msg: %s", msg)
multi.debug("Dequeued msg: %s" % msg)
if msg == "TEST_RUNNER_TERMINATE":
raise SystemExit
elif issubclass(msg, unittest.TestCase):
# Assemble a TestSuite, and run it
log_file = os.path.join(self.worker_folder, 'details_' +
(msg.__name__) + "_" +
TEST_RUN_ID + '.log')
fail_file = os.path.join(self.worker_folder, 'failed_' +
(msg.__name__) + "_" +
TEST_RUN_ID + '.log')
error_file = os.path.join(self.worker_folder, 'error_' +
(msg.__name__) + "_" +
TEST_RUN_ID + '.log')
f = open(log_file, 'w')
fail = open(fail_file,'w')
error = open(error_file, 'w')
log.info(yellow + '* Starting testcase: %s' % msg + normal)
runner = unittest.TextTestRunner(f, verbosity=2, failfast = True)
suite = unittest.TestLoader().loadTestsFromTestCase(msg)
self.runner.run(suite)
result = runner.run(suite)
for res in result.errors:
log.error("snf-burnin encountered an error in " \
"testcase: %s" %msg)
log.error("See log for details")
error.write(str(res[0]) + '\n')
error.write(str(res[0].shortDescription()) + '\n')
error.write('\n')
for res in result.failures:
log.error("snf-burnin failed in testcase: %s" %msg)
log.error("See log for details")
fail.write(str(res[0]) + '\n')
fail.write(str(res[0].shortDescription()) + '\n')
fail.write('\n')
if NOFAILFAST == False:
sys.exit()
if (len(result.failures) == 0) and (len(result.errors) == 0):
log.debug("Passed testcase: %s" %msg)
f.close()
fail.close()
error.close()
else:
raise Exception("Cannot handle msg: %s" % msg)
def _run_cases_in_parallel(cases, fanout=1, runner=None):
def _run_cases_in_parallel(cases, fanout, image_folder):
"""Run instances of TestCase in parallel, in a number of distinct processes
The cases iterable specifies the TestCases to be executed in parallel,
......@@ -1267,31 +1362,48 @@ def _run_cases_in_parallel(cases, fanout=1, runner=None):
runner process.
"""
if runner is None:
runner = unittest.TextTestRunner(verbosity=2, failfast=True)
multi = logging.getLogger("multiprocess")
handler = logging.StreamHandler()
multi.addHandler(handler)
# testq: The master process enqueues TestCase objects into this queue,
# test runner processes pick them up for execution, in parallel.
testq = Queue()
if VERBOSE:
multi.setLevel(logging.DEBUG)
else:
multi.setLevel(logging.INFO)
testq = []
worker_folder = []
runners = []
for i in xrange(0,fanout):
testq.append(Queue())
worker_folder.append(os.path.join(image_folder, 'process'+str(i)))
os.mkdir(worker_folder[i])
for i in xrange(0, fanout):
kwargs = dict(testq=testq, runner=runner)
kwargs = dict(testq=testq[i], worker_folder=worker_folder[i])
runners.append(TestRunnerProcess(kwargs=kwargs))
log.info("Spawning %d test runner processes", len(runners))
multi.debug("Spawning %d test runner processes" %len(runners))
for p in runners:
p.start()
log.debug("Spawned %d test runners, PIDs are %s",
len(runners), [p.pid for p in runners])
# Enqueue test cases
map(testq.put, cases)
map(testq.put, ["TEST_RUNNER_TERMINATE"] * len(runners))
for i in xrange(0, fanout):
map(testq[i].put, cases)
testq[i].put("TEST_RUNNER_TERMINATE")
multi.debug("Spawned %d test runners, PIDs are %s" %
(len(runners), [p.pid for p in runners]))
log.debug("Joining %d processes", len(runners))
multi.debug("Joining %d processes" % len(runners))
for p in runners:
p.join()
log.debug("Done joining %d processes", len(runners))
multi.debug("Done joining %d processes" % len(runners))
def _spawn_server_test_case(**kwargs):
......@@ -1308,7 +1420,9 @@ def _spawn_server_test_case(**kwargs):
# Make sure the class can be pickled, by listing it among
# the attributes of __main__. A PicklingError is raised otherwise.
setattr(__main__, name, cls)
thismodule = sys.modules[__name__]
setattr(thismodule, name, cls)
return cls
......@@ -1320,7 +1434,9 @@ def _spawn_network_test_case(**kwargs):
# Make sure the class can be pickled, by listing it among
# the attributes of __main__. A PicklingError is raised otherwise.
setattr(__main__, name, cls)
thismodule = sys.modules[__name__]
setattr(thismodule, name, cls)
return cls
......@@ -1347,15 +1463,32 @@ def cleanup_servers(delete_stale=False):
print >> sys.stderr, "Use --delete-stale to delete them."
def cleanup_networks(delete_stale=False):
def cleanup_networks(action_timeout, query_interval,delete_stale=False):
def isSnfTest(s):
if s.find(SNF_TEST_PREFIX) == -1:
return False
else:
return True
c = CycladesClient(API, TOKEN)
networks = c.list_networks()
stale = [n for n in networks if n["name"].startswith(SNF_TEST_PREFIX)]
if len(stale) == 0:
return
fail_tmout = time.time() + action_timeout
while True:
servers = c.list_servers()
staleServers = [s for s in servers if s["name"].startswith(SNF_TEST_PREFIX)]
if len(staleServers) == 0:
break
elif time.time() > fail_tmout:
log.error("Stale servers not deleted from previous run")
sys.exit()
else:
time.sleep(query_interval)
print >> sys.stderr, yellow + "Found these stale networks from previous runs:" + normal
print " " + \
......@@ -1480,6 +1613,10 @@ def parse_arguments(args):
help="Define the absolute path where the output \
log is stored. ",
default="/var/log/burnin/")
parser.add_option("--verbose", "-V",
action="store_true", dest="verbose",
help="Print detailed output about multiple processes spawning",
default=False)
parser.add_option("--set-tests",
action="callback",
dest="tests",
......@@ -1492,9 +1629,6 @@ def parse_arguments(args):
default='all',
callback=parse_comma)
# FIXME: Change the default for build-fanout to 10
# FIXME: Allow the user to specify a specific set of Images to test
(opts, args) = parser.parse_args(args)
# Verify arguments
......@@ -1538,17 +1672,19 @@ def main():
(opts, args) = parse_arguments(sys.argv[1:])
global API, TOKEN, PLANKTON, PLANKTON_USER, NO_IPV6
global API, TOKEN, PLANKTON, PLANKTON_USER, NO_IPV6, VERBOSE, NOFAILFAST
API = opts.api
TOKEN = opts.token
PLANKTON = opts.plankton
PLANKTON_USER = opts.plankton_user
NO_IPV6 = opts.no_ipv6
VERBOSE = opts.verbose
NOFAILFAST = opts.nofailfast
# Cleanup stale servers from previous runs
if opts.show_stale:
cleanup_servers(delete_stale=opts.delete_stale)
cleanup_networks(delete_stale=opts.delete_stale)
cleanup_networks(opts.action_timeout, opts.query_interval, delete_stale=opts.delete_stale)
return 0
# Initialize a kamaki instance, get flavors, images
......@@ -1616,6 +1752,7 @@ def main():
query_interval=opts.query_interval,
)
NetworkTestCase = _spawn_network_test_case(
action_timeout=opts.action_timeout,
imageid=imageid,
......@@ -1643,49 +1780,9 @@ def main():
image_folder = os.path.join(test_folder, imageid)
os.mkdir(image_folder)
for case in seq_cases:
test = (key for key, value in test_dict.items()
if value == case).next()
log.info(yellow + '* Starting testcase: %s' %test + normal)
log_file = os.path.join(image_folder, 'details_' +
(case.__name__) + "_" +
TEST_RUN_ID + '.log')
fail_file = os.path.join(image_folder, 'failed_' +
(case.__name__) + "_" +
TEST_RUN_ID + '.log')
error_file = os.path.join(image_folder, 'error_' +
(case.__name__) + "_" +
TEST_RUN_ID + '.log')
f = open(log_file, "w")
fail = open(fail_file, "w")
error = open(error_file, "w")
suite = unittest.TestLoader().loadTestsFromTestCase(case)
runner = unittest.TextTestRunner(f, verbosity=2, failfast=True)
result = runner.run(suite)
for res in result.errors:
log.error("snf-burnin encountered an error in " \
"testcase: %s" %test)
log.error("See log for details")
error.write(str(res[0]) + '\n')
error.write(str(res[0].shortDescription()) + '\n')
error.write('\n')
for res in result.failures:
log.error("snf-burnin failed in testcase: %s" %test)
log.error("See log for details")
fail.write(str(res[0]) + '\n')
fail.write(str(res[0].shortDescription()) + '\n')
fail.write('\n')
if opts.nofailfast == False:
sys.exit()
if (len(result.failures) == 0) and (len(result.errors) == 0):
log.debug("Passed testcase: %s" %test)
log.info('Parallel spawn:')
_run_cases_in_parallel(seq_cases, opts.fanout, image_folder)
if __name__ == "__main__":
sys.exit(main())