Commit 31c08008 authored by Vangelis Koukis's avatar Vangelis Koukis
Browse files

Improve integration test suite

Implement generic timeout handler, implement PING IPv4 and IPv6 test,
implement connecting over ssh to the public IPv4 and IPv6 address for
hostname verification, numerous fixes.
parent 100d585d
......@@ -42,17 +42,21 @@ import os
import paramiko
import subprocess
import socket
import struct
import sys
import time
from random import choice
from IPy import IP
from random import choice
from kamaki.client import Client, ClientError
from vncauthproxy.d3des import generate_response as d3des_generate_response
# Use backported unittest functionality if Python < 2.7
try:
import unittest2 as unittest
except ImportError:
if sys.version_info < (2, 7):
raise Exception("The unittest2 package is required for Python < 2.7")
import unittest
......@@ -187,10 +191,11 @@ class SpawnServerTestCase(unittest.TestCase):
@classmethod
def setUpClass(cls):
"""Initialize a kamaki instance"""
log.info("Spawning server for image `%s'", self.imagename)
log.info("Spawning server for image `%s'", cls.imagename)
cls.client = Client(API, TOKEN)
def _get_ipv4(self, server):
"""Get the public IPv4 of a server from the detailed server info"""
public_addrs = filter(lambda x: x["id"] == "public",
server["addresses"]["values"])
self.assertEqual(len(public_addrs), 1)
......@@ -200,6 +205,7 @@ class SpawnServerTestCase(unittest.TestCase):
return ipv4_addrs[0]["addr"]
def _get_ipv6(self, server):
"""Get the public IPv6 of a server from the detailed server info"""
public_addrs = filter(lambda x: x["id"] == "public",
server["addresses"]["values"])
self.assertEqual(len(public_addrs), 1)
......@@ -208,58 +214,113 @@ class SpawnServerTestCase(unittest.TestCase):
self.assertEqual(len(ipv6_addrs), 1)
return ipv6_addrs[0]["addr"]
def _get_tcp_connection(family, host, port):
tmout = time.time() + self.action_timeout
while True:
self.assertLess(time.time(), tmout,
"Timed out trying to to %s:%s" % (host, port))
sock = None
for res in \
socket.getaddrinfo(host, port, family, socket.SOCK_STREAM, 0,
socket.AI_PASSIVE):
af, socktype, proto, canonname, sa = res
try:
sock = socket.socket(af, socktype, proto)
except socket.error as msg:
sock = None
continue
try:
sock.connect(sa)
except socket.error as msg:
sock.close()
sock = None
continue
self.assertIsNotNone(sock)
return sock
time.sleep(self.query_interval)
def _connect_loginname(self, os):
"""Return the login name for connections based on the server OS"""
if os in ('ubuntu', 'kubuntu', 'fedora'):
return 'user'
elif os == 'windows':
return 'Administrator'
else:
return 'root'
def _verify_server_status(self, current_status, new_status):
"""Verify a server has switched to a specified status"""
log.info("Getting status for server %d, Image %s",
self.serverid, self.imagename)
server = self.client.get_server_details(self.serverid)
self.assertIn(server["status"], (current_status, new_status))
self.assertEquals(server["status"], new_status)
def _get_connected_tcp_socket(self, family, host, port):
"""Get a connected socket from the specified family to host:port"""
sock = None
for res in \
socket.getaddrinfo(host, port, family, socket.SOCK_STREAM, 0,
socket.AI_PASSIVE):
af, socktype, proto, canonname, sa = res
try:
sock = socket.socket(af, socktype, proto)
except socket.error as msg:
sock = None
continue
try:
sock.connect(sa)
except socket.error as msg:
sock.close()
sock = None
continue
self.assertIsNotNone(sock)
return sock
def _ping_once(self, ipv6, ip):
"""Test server responds to a single IPv4 or IPv6 ping"""
log.info("PING IPv%s to %s", "6" if ipv6 else "4", ip)
cmd = "ping%s -c 2 -w 3 %s" % ("6" if ipv6 else "", ip)
ping = subprocess.Popen(cmd, shell=True,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(stdout, stderr) = ping.communicate()
ret = ping.wait()
self.assertEquals(ret, 0)
def _wait_for_status_transition(self, current_status, new_status,
fail_timeout, warn_timeout=None):
if warn_timeout is None:
def _get_hostname_over_ssh(self, hostip, username, password):
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
ssh.connect(hostip, username=username, password=password)
except socket.error:
raise AssertionError
stdin, stdout, stderr = ssh.exec_command("hostname")
lines = stdout.readlines()
self.assertEqual(len(lines), 1)
return lines
def _try_until_timeout_expires(self, warn_timeout, fail_timeout,
opmsg, callable, *args, **kwargs):
if warn_timeout == fail_timeout:
warn_timeout = fail_timeout + 1
warn_tmout = time.time() + warn_timeout
fail_tmout = time.time() + fail_timeout
while True:
if time.time() > warn_tmout:
log.warning("Server %d: %s still not %s",
self.serverid, self.servername, new_status)
self.assertLess(time.time(), fail_tmout)
log.info("Getting status for server %d, Image %s",
self.serverid, self.imagename)
server = self.client.get_server_details(self.serverid)
self.assertIn(server["status"], (current_status, new_status))
if server["status"] == new_status:
break
log.warning("Server %d: %s operation '%s' not done yet",
opmsg, self.serverid, self.servername)
try:
return callable(*args, **kwargs)
except AssertionError:
pass
self.assertLess(time.time(), fail_tmout,
"operation '%s' timed out" % opmsg)
time.sleep(self.query_interval)
def _get_hostname_over_ssh(self, hostip, username, password):
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAdPolicy())
ssh.connect(hostip, username=username, password=password)
stdin, stdout, stderr = ssh.exec_command("hostname")
lines = stdout.readlines()
self.assertEqual(len(lines), 1)
self.assertEqual(lines[0], "notsnf-%s" % self.serverid)
def _insist_on_tcp_connection(self, family, host, port):
familystr = {socket.AF_INET: 'IPv4', socket.AF_INET6: 'IPv6'}
msg = "connect over %s to %s:%s" % \
(familystr.get(family, "Unknown"), host, port)
sock = self._try_until_timeout_expires(
self.action_timeout, self.action_timeout,
msg, self._get_connected_tcp_socket,
family, host, port)
return sock
def _insist_on_status_transition(self, current_status, new_status,
fail_timeout, warn_timeout=None):
msg = "status transition %s -> %s" % (current_status, new_status)
if warn_timeout is None:
warn_timeout = fail_timeout
self._try_until_timeout_expires(warn_timeout, fail_timeout,
msg, self._verify_server_status,
current_status, new_status)
def _insist_on_ssh_hostname(self, hostip, username, password):
msg = "ssh to %s, as %s/%s" % (hostip, username, password)
hostname = self._try_until_timeout_expires(
self.action_timeout, self.action_timeout,
msg, self._get_hostname_over_ssh,
hostip, username, password)
# The hostname must be of the form 'prefix-id'
self.assertTrue(hostname.endswith("-%d\n" % self.serverid))
def _skipIf(self, condition, msg):
if condition:
......@@ -277,6 +338,7 @@ class SpawnServerTestCase(unittest.TestCase):
# Update class attributes to reflect data on building server
cls = type(self)
cls.serverid = server["id"]
cls.username = None
cls.passwd = server["adminPass"]
def test_002a_server_is_building_in_list(self):
......@@ -300,8 +362,17 @@ class SpawnServerTestCase(unittest.TestCase):
def test_002c_set_server_metadata(self):
image = self.client.get_image_details(self.imageid)
self.client.update_server_metadata(
self.serverid, OS=image["metadata"]["values"]["OS"])
os = image["metadata"]["values"]["OS"]
loginname = image["metadata"]["values"].get("loginname", None)
self.client.update_server_metadata(self.serverid, OS=os)
# Determine the username to use for future connections
# to this host
cls = type(self)
cls.username = loginname
if not cls.username:
cls.username = self._connect_loginname(os)
self.assertIsNotNone(cls.username)
def test_002d_verify_server_metadata(self):
"""Test server metadata keys are set based on image metadata"""
......@@ -311,17 +382,45 @@ class SpawnServerTestCase(unittest.TestCase):
def test_003_server_becomes_active(self):
"""Test server becomes ACTIVE"""
self._wait_for_status_transition("BUILD", "ACTIVE",
self._insist_on_status_transition("BUILD", "ACTIVE",
self.build_fail, self.build_warning)
def test_003a_get_server_oob_console(self):
"""Test getting OOB server console over VNC"""
"""Test getting OOB server console over VNC
Implementation of RFB protocol follows
http://www.realvnc.com/docs/rfbproto.pdf.
"""
console = self.client.get_server_console(self.serverid)
self.assertEquals(console['type'], "vnc")
sock = self._get_tcp_connection(socket.AF_UNSPEC,
sock = self._insist_on_tcp_connection(socket.AF_UNSPEC,
console["host"], console["port"])
# Step 1. ProtocolVersion message (par. 6.1.1)
version = sock.recv(1024)
self.assertTrue(version.startswith("RFB "))
self.assertEquals(version, 'RFB 003.008\n')
sock.send(version)
# Step 2. Security (par 6.1.2): Only VNC Authentication supported
sec = sock.recv(1024)
self.assertEquals(list(sec), ['\x01', '\x02'])
# Step 3. Request VNC Authentication (par 6.1.2)
sock.send('\x02')
# Step 4. Receive Challenge (par 6.2.2)
challenge = sock.recv(1024)
self.assertEquals(len(challenge), 16)
# Step 5. DES-Encrypt challenge, use password as key (par 6.2.2)
response = d3des_generate_response(
(console["password"] + '\0' * 8)[:8], challenge)
sock.send(response)
# Step 6. SecurityResult (par 6.1.3)
result = sock.recv(4)
self.assertEquals(list(result), ['\x00', '\x00', '\x00', '\x00'])
sock.close()
def test_004_server_has_ipv4(self):
......@@ -339,26 +438,20 @@ class SpawnServerTestCase(unittest.TestCase):
def test_006_server_responds_to_ping_IPv4(self):
"""Test server responds to ping on IPv4 address"""
server = self.client.get_server_details(self.serverid)
ping = subprocess.Popen("ping -c 4 -w 4 %s" % self._get_ipv4(server),
shell=True, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
(stdout, stderr) = ping.communicate()
ret = ping.wait()
self.assertEquals(ret, 0,
"ping IPv4 %s failed.\nStdout:\n%s\nStderr:\n%s" %
(self._get_ipv4(server), stdout, stderr))
ip = self._get_ipv4(server)
self._try_until_timeout_expires(self.action_timeout,
self.action_timeout,
"PING IPv4", self._ping_once,
False, ip)
def test_007_server_responds_to_ping_IPv6(self):
"""Test server responds to ping on IPv6 address"""
server = self.client.get_server_details(self.serverid)
ping6 = subprocess.Popen("ping6 -c 4 -w 4 %s" % self._get_ipv6(server),
shell=True, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
(stdout, stderr) = ping6.communicate()
ret = ping6.wait()
self.assertEquals(ret, 0,
"ping IPv6 %s failed.\nStdout:\n%s\nStderr:\n%s" %
(self._get_ipv6(server), stdout, stderr))
ip = self._get_ipv6(server)
self._try_until_timeout_expires(self.action_timeout,
self.action_timeout,
"PING IPv6", self._ping_once,
True, ip)
def test_008_submit_shutdown_request(self):
"""Test submit request to shutdown server"""
......@@ -366,7 +459,8 @@ class SpawnServerTestCase(unittest.TestCase):
def test_009_server_becomes_stopped(self):
"""Test server becomes STOPPED"""
self._wait_for_status_transition("ACTIVE", "STOPPED",
self._insist_on_status_transition("ACTIVE", "STOPPED",
self.action_timeout,
self.action_timeout)
def test_010_submit_start_request(self):
......@@ -375,7 +469,8 @@ class SpawnServerTestCase(unittest.TestCase):
def test_011_server_becomes_active(self):
"""Test server becomes ACTIVE again"""
self._wait_for_status_transition("STOPPED", "ACTIVE",
self._insist_on_status_transition("STOPPED", "ACTIVE",
self.action_timeout,
self.action_timeout)
def test_011a_server_responds_to_ping_IPv4(self):
......@@ -384,26 +479,24 @@ class SpawnServerTestCase(unittest.TestCase):
def test_012_ssh_to_server_IPv4(self):
"""Test SSH to server public IPv4 works, verify hostname"""
self._skipIf(self.is_windows, "only for Linux servers")
self._skipIf(self.is_windows, "only valid for Linux servers")
server = self.client.get_server_details(self.serverid)
hostname = self._get_hostname_over_ssh(self._get_ipv4(server),
"root", self.passwd)
self.assertEqual(hostname, "notsnf-%s" % self.serverid)
self._insist_on_ssh_hostname(self._get_ipv4(server),
self.username, self.passwd)
def test_013_ssh_to_server_IPv6(self):
"""Test SSH to server public IPv6 works, verify hostname"""
self._skipIf(self.is_windows, "only for Linux servers")
self._skipIf(self.is_windows, "only valid for Linux servers")
server = self.client.get_server_details(self.serverid)
hostname = self._get_hostname_over_ssh(self._get_ipv6(server),
"root", self.passwd)
self.assertEqual(hostname, "notsnf-%s" % self.serverid)
self._insist_on_ssh_hostname(self._get_ipv6(server),
self.username, self.passwd)
def test_014_rdp_to_server_IPv4(self):
"Test RDP connection to server public IPv4 works"""
self._skipIf(not self.is_windows, "only for Windows servers")
self._skipIf(not self.is_windows, "only valid for Windows servers")
server = self.client.get_server_details(self.serverid)
ipv4 = self._get_ipv4(server)
sock = _get_tcp_connection(socket.AF_INET, ipv4, 3389)
sock = _insist_on_tcp_connection(socket.AF_INET, ipv4, 3389)
# No actual RDP processing done. We assume the RDP server is there
# if the connection to the RDP port is successful.
......@@ -411,7 +504,7 @@ class SpawnServerTestCase(unittest.TestCase):
def test_015_rdp_to_server_IPv6(self):
"Test RDP connection to server public IPv6 works"""
self._skipIf(not self.is_windows, "only for Windows servers")
self._skipIf(not self.is_windows, "only valid for Windows servers")
server = self.client.get_server_details(self.serverid)
ipv6 = self._get_ipv6(server)
sock = _get_tcp_connection(socket.AF_INET6, ipv6, 3389)
......@@ -422,7 +515,7 @@ class SpawnServerTestCase(unittest.TestCase):
def test_016_personality_is_enforced(self):
"""Test file injection for personality enforcement"""
self._skipIf(self.is_windows, "only for Linux servers")
self._skipIf(self.is_windows, "only implemented for Linux servers")
self.assertTrue(False, "test not implemented, will fail")
......@@ -470,6 +563,10 @@ def parse_arguments(args):
parser = OptionParser(**kw)
parser.disable_interspersed_args()
parser.add_option("--failfast",
action="store_true", dest="failfast",
help="Fail immediately if one of the tests fails",
default=False)
parser.add_option("--action-timeout",
action="store", type="int", dest="action_timeout",
metavar="TIMEOUT",
......@@ -587,7 +684,7 @@ def main():
#
suites = map(unittest.TestLoader().loadTestsFromTestCase, cases)
alltests = unittest.TestSuite(suites)
unittest.TextTestRunner(verbosity=2).run(alltests)
unittest.TextTestRunner(verbosity=2, failfast=opts.failfast).run(alltests)
if __name__ == "__main__":
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment