Commit 92c1c259 authored by Vangelis Koukis's avatar Vangelis Koukis
Browse files

Execute integration tests in parallel

Add support for execution of integration tests in parallel,
in processes executing distinct instances of the TextTestRunner.

A single TestCase runs in the same process, so class fixtures work.
Distinct subclasses of TestCase are run in parallel and may share no
data.

Refs #1155
parent 8c0c42ce
......@@ -35,19 +35,24 @@
"""Perform integration testing on a running Synnefo deployment"""
import __main__
import datetime
import inspect
import logging
import os
import paramiko
import prctl
import subprocess
import signal
import socket
import struct
import sys
import time
from IPy import IP
from multiprocessing import Process, Queue
from random import choice
from kamaki.client import Client, ClientError
from vncauthproxy.d3des import generate_response as d3des_generate_response
......@@ -60,12 +65,14 @@ except ImportError:
import unittest
API = "http://dev67.dev.grnet.gr:8000/api/v1.1"
TOKEN = "46e427d657b20defe352804f0eb6f8a2"
API = None
TOKEN = None
DEFAULT_API = "http://dev67.dev.grnet.gr:8000/api/v1.1"
DEFAULT_TOKEN = "46e427d657b20defe352804f0eb6f8a2"
# A unique id identifying this test run
UNIQUE_RUN_ID = datetime.datetime.strftime(datetime.datetime.now(),
"%Y%m%d%H%M%S")
SNF_TEST_PREFIX = "snf-test"
TEST_RUN_ID = datetime.datetime.strftime(datetime.datetime.now(),
"%Y%m%d%H%M%S")
SNF_TEST_PREFIX = "snf-test-"
# Setup logging (FIXME - verigak)
logging.basicConfig(format="%(message)s")
......@@ -216,17 +223,19 @@ class SpawnServerTestCase(unittest.TestCase):
def _connect_loginname(self, os):
"""Return the login name for connections based on the server OS"""
if os in ('ubuntu', 'kubuntu', 'fedora'):
return 'user'
elif os == 'windows':
return 'Administrator'
if os in ("ubuntu", "kubuntu", "fedora"):
return "user"
elif os == "windows":
return "Administrator"
else:
return 'root'
return "root"
def _verify_server_status(self, current_status, new_status):
"""Verify a server has switched to a specified status"""
server = self.client.get_server_details(self.serverid)
self.assertIn(server["status"], (current_status, new_status))
if server["status"] not in (current_status, new_status):
return None # Do not raise exception, return so the test fails
self.assertEquals(server["status"], new_status)
def _get_connected_tcp_socket(self, family, host, port):
......@@ -279,7 +288,7 @@ class SpawnServerTestCase(unittest.TestCase):
fail_tmout = time.time() + fail_timeout
while True:
self.assertLess(time.time(), fail_tmout,
"operation '%s' timed out" % opmsg)
"operation `%s' timed out" % opmsg)
if time.time() > warn_tmout:
log.warning("Server %d: `%s' operation `%s' not done yet",
self.serverid, self.servername, opmsg)
......@@ -291,7 +300,8 @@ class SpawnServerTestCase(unittest.TestCase):
time.sleep(self.query_interval)
def _insist_on_tcp_connection(self, family, host, port):
familystr = {socket.AF_INET: 'IPv4', socket.AF_INET6: 'IPv6'}
familystr = {socket.AF_INET: "IPv4", socket.AF_INET6: "IPv6",
socket.AF_UNSPEC: "Unspecified-IPv4/6"}
msg = "connect over %s to %s:%s" % \
(familystr.get(family, "Unknown"), host, port)
sock = self._try_until_timeout_expires(
......@@ -309,6 +319,9 @@ class SpawnServerTestCase(unittest.TestCase):
self._try_until_timeout_expires(warn_timeout, fail_timeout,
msg, self._verify_server_status,
current_status, new_status)
# Ensure the status is actually the expected one
server = self.client.get_server_details(self.serverid)
self.assertEquals(server["status"], new_status)
def _insist_on_ssh_hostname(self, hostip, username, password):
msg = "SSH to %s, as %s/%s" % (hostip, username, password)
......@@ -531,13 +544,85 @@ class SpawnServerTestCase(unittest.TestCase):
def test_019_server_no_longer_in_server_list(self):
"""Test server is no longer in server list"""
servers = self.client.list_servers()
self.assertNotIn(self.serverid, [s['id'] for s in servers])
self.assertNotIn(self.serverid, [s["id"] for s in servers])
class TestRunnerProcess(Process):
"""A distinct process used to execute part of the tests in parallel"""
def __init__(self, **kw):
Process.__init__(self, **kw)
kwargs = kw["kwargs"]
self.testq = kwargs["testq"]
self.runner = kwargs["runner"]
def run(self):
# Make sure this test runner process dies with the parent
# and is not left behind.
#
# WARNING: This uses the prctl(2) call and is
# Linux-specific.
prctl.set_pdeathsig(signal.SIGHUP)
while True:
log.debug("I am process %d, GETting from queue is %s",
os.getpid(), self.testq)
msg = self.testq.get()
log.debug("Dequeued msg: %s", msg)
if msg == "TEST_RUNNER_TERMINATE":
raise SystemExit
elif issubclass(msg, unittest.TestCase):
# Assemble a TestSuite, and run it
suite = unittest.TestLoader().loadTestsFromTestCase(msg)
self.runner.run(suite)
else:
raise Exception("Cannot handle msg: %s" % msg)
def _run_cases_in_parallel(cases, fanout=1, runner=None):
"""Run instances of TestCase in parallel, in a number of distinct processes
The cases iterable specifies the TestCases to be executed in parallel,
by test runners running in distinct processes.
The fanout parameter specifies the number of processes to spawn,
and defaults to 1.
The runner argument specifies the test runner class to use inside each
runner process.
"""
if runner is None:
runner = unittest.TextTestRunner()
# testq: The master process enqueues TestCase objects into this queue,
# test runner processes pick them up for execution, in parallel.
testq = Queue()
runners = []
for i in xrange(0, fanout):
kwargs = dict(testq=testq, runner=runner)
runners.append(TestRunnerProcess(kwargs=kwargs))
log.info("Spawning %d test runner processes", len(runners))
for p in runners:
p.start()
log.debug("Spawned %d test runners, PIDs are %s",
len(runners), [p.pid for p in runners])
# Enqueue test cases
map(testq.put, cases)
map(testq.put, ["TEST_RUNNER_TERMINATE"] * len(runners))
log.debug("Joining %d processes", len(runners))
for p in runners:
p.join()
log.debug("Done joining %d processes", len(runners))
def _spawn_server_test_case(**kwargs):
"""Construct a new unit test case class from SpawnServerTestCase"""
name = "SpawnServerTestCase_%s" % kwargs["imagename"].replace(" ", "_")
name = "SpawnServerTestCase_%d" % kwargs["imageid"]
cls = type(name, (SpawnServerTestCase,), kwargs)
# Patch extra parameters into test names by manipulating method docstrings
......@@ -545,7 +630,7 @@ def _spawn_server_test_case(**kwargs):
inspect.getmembers(cls, lambda x: inspect.ismethod(x)):
if hasattr(m, __doc__):
m.__func__.__doc__ = "[%s] %s" % (imagename, m.__doc__)
setattr(__main__,name,cls)
return cls
......@@ -559,12 +644,12 @@ def cleanup_servers(delete_stale=False):
print >> sys.stderr, "Found these stale servers from previous runs:"
print " " + \
"\n ".join(["%d: %s" % (s['id'], s['name']) for s in stale])
"\n ".join(["%d: %s" % (s["id"], s["name"]) for s in stale])
if delete_stale:
print >> sys.stderr, "Deleting %d stale servers:" % len(stale)
for server in stale:
c.delete_server(server['id'])
c.delete_server(server["id"])
print >> sys.stderr, " ...done"
else:
print >> sys.stderr, "Use --delete-stale to delete them."
......@@ -581,6 +666,14 @@ def parse_arguments(args):
parser = OptionParser(**kw)
parser.disable_interspersed_args()
parser.add_option("--api",
action="store", type="string", dest="api",
help="The API URI to use to reach the Synnefo API",
default=DEFAULT_API)
parser.add_option("--token",
action="store", type="string", dest="token",
help="The token to use for authentication to the API",
default=DEFAULT_TOKEN)
parser.add_option("--failfast",
action="store_true", dest="failfast",
help="Fail immediately if one of the tests fails",
......@@ -609,11 +702,12 @@ def parse_arguments(args):
help="Query server status when requests are pending " \
"every INTERVAL seconds",
default=3)
parser.add_option("--build-fanout",
action="store", type="int", dest="build_fanout",
parser.add_option("--fanout",
action="store", type="int", dest="fanout",
metavar="COUNT",
help="Test COUNT images in parallel, by submitting " \
"COUNT server build requests simultaneously",
help="Spawn up to COUNT child processes to execute " \
"in parallel, essentially have up to COUNT " \
"server build requests outstanding",
default=1)
parser.add_option("--force-flavor",
action="store", type="int", dest="force_flavorid",
......@@ -625,12 +719,12 @@ def parse_arguments(args):
parser.add_option("--show-stale",
action="store_true", dest="show_stale",
help="Show stale servers from previous runs, whose "\
"name starts with '%s'" % SNF_TEST_PREFIX,
"name starts with `%s'" % SNF_TEST_PREFIX,
default=False)
parser.add_option("--delete-stale",
action="store_true", dest="delete_stale",
help="Delete stale servers from previous runs, whose "\
"name starts with '%s'" % SNF_TEST_PREFIX,
"name starts with `%s'" % SNF_TEST_PREFIX,
default=False)
# FIXME: Change the default for build-fanout to 10
......@@ -650,13 +744,17 @@ def main():
IMPORTANT: Tests have dependencies and have to be run in the specified
order inside a single test case. They communicate through attributes of the
corresponding TestCase class (shared fixtures). TestCase classes for
distinct Images may be run in parallel.
corresponding TestCase class (shared fixtures). Distinct subclasses of
TestCase MAY SHARE NO DATA, since they are run in parallel, in distinct
test runner processes.
"""
(opts, args) = parse_arguments(sys.argv[1:])
global API, TOKEN
API = opts.api
TOKEN = opts.token
# Cleanup stale servers from previous runs
if opts.show_stale:
cleanup_servers(delete_stale=opts.delete_stale)
......@@ -667,43 +765,43 @@ def main():
DIMAGES = c.list_images(detail=True)
DFLAVORS = c.list_flavors(detail=True)
#
# Assemble all test cases
#
# Initial test cases
cases = [UnauthorizedTestCase, FlavorsTestCase, ImagesTestCase]
# Image test cases
imageid = 1
flavorid = opts.force_flavorid if opts.force_flavorid \
else choice(filter(lambda x: x["disk"] >= 20, DFLAVORS))["id"]
imagename = "Debian Base"
personality = None
servername = "%s-%s for %s" % (SNF_TEST_PREFIX, UNIQUE_RUN_ID, imagename)
is_windows = imagename.lower().find("windows") >= 0
case = _spawn_server_test_case(imageid=imageid, flavorid=flavorid,
imagename=imagename,
personality=personality,
servername=servername,
is_windows=is_windows,
action_timeout=opts.action_timeout,
build_warning=opts.build_warning,
build_fail=opts.build_fail,
query_interval=opts.query_interval)
cases.append(case)
# FIXME: logging, log, UNIQUE_RUN_ID arguments
# FIXME: logging, log, LOG PID, TEST_RUN_ID, arguments
# FIXME: Network testing? Create, destroy, connect, ping, disconnect VMs?
# Run them: FIXME: In parallel, FAILEARLY, catchbreak?
#unittest.main(verbosity=2, catchbreak=True)
#
# Run the resulting test suite
#
suites = map(unittest.TestLoader().loadTestsFromTestCase, cases)
alltests = unittest.TestSuite(suites)
unittest.TextTestRunner(verbosity=2, failfast=opts.failfast).run(alltests)
runner = unittest.TextTestRunner(verbosity=2, failfast=opts.failfast)
# The following cases run sequentially
seq_cases = [UnauthorizedTestCase, FlavorsTestCase, ImagesTestCase]
_run_cases_in_parallel(seq_cases, fanout=3, runner=runner)
# The following cases run in parallel
par_cases = []
for image in DIMAGES:
imageid = image["id"]
imagename = image["name"]
if opts.force_flavorid:
flavorid = opts.force_flavorid
else:
flavorid = choice([f["id"] for f in DFLAVORS if f["disk"] >= 20])
personality = None # FIXME
servername = "%s%s for %s" % (SNF_TEST_PREFIX, TEST_RUN_ID, imagename)
is_windows = imagename.lower().find("windows") >= 0
case = _spawn_server_test_case(imageid=imageid, flavorid=flavorid,
imagename=imagename,
personality=personality,
servername=servername,
is_windows=is_windows,
action_timeout=opts.action_timeout,
build_warning=opts.build_warning,
build_fail=opts.build_fail,
query_interval=opts.query_interval)
par_cases.append(case)
print "%s" % FlavorsTestCase
print "dict", __main__.__dict__
_run_cases_in_parallel(par_cases, fanout=opts.fanout, runner=runner)
if __name__ == "__main__":
sys.exit(main())
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment