Commit 2288b8e8 authored by Ilias Tsitsimpis's avatar Ilias Tsitsimpis
Browse files

burnin: Remove old implementation

We are going to re-implement burnin.
The goal is to have a cleaner structure (not all code in one file),
a better logging schema and remove code that is unused or duplicated.

Refs #3385
parent 30726141
#!/usr/bin/env python
# Copyright 2011 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
# 1. Redistributions of source code must retain the above
# copyright notice, this list of conditions and the following
# disclaimer.
#
# 2. Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials
# provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
"""Perform integration testing on a running Synnefo deployment"""
#import __main__
import datetime
import inspect
import logging
import os
import os.path
import paramiko
import prctl
import subprocess
import signal
import socket
import sys
import time
import tempfile
from base64 import b64encode
from IPy import IP
from multiprocessing import Process, Queue
from random import choice, randint
from optparse import OptionParser, OptionValueError
from kamaki.clients.compute import ComputeClient
from kamaki.clients.cyclades import CycladesClient
from kamaki.clients.image import ImageClient
from kamaki.clients.pithos import PithosClient
from kamaki.clients.astakos import AstakosClient
from kamaki.clients import ClientError
from vncauthproxy.d3des import generate_response as d3des_generate_response
# Use backported unittest functionality if Python < 2.7
try:
import unittest2 as unittest
except ImportError:
if sys.version_info < (2, 7):
raise Exception("The unittest2 package is required for Python < 2.7")
import unittest
# --------------------------------------------------------------------
# Global Variables
AUTH_URL = None
TOKEN = None
SYSTEM_IMAGES_USER = None
NO_IPV6 = None
NOFAILFAST = None
VERBOSE = None
# A unique id identifying this test run
TEST_RUN_ID = datetime.datetime.strftime(datetime.datetime.now(),
"%Y%m%d%H%M%S")
SNF_TEST_PREFIX = "snf-test-"
red = '\x1b[31m'
yellow = '\x1b[33m'
green = '\x1b[32m'
normal = '\x1b[0m'
# --------------------------------------------------------------------
# Global functions
def _ssh_execute(hostip, username, password, command):
"""Execute a command via ssh"""
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
ssh.connect(hostip, username=username, password=password)
except socket.error, err:
raise AssertionError(err)
try:
stdin, stdout, stderr = ssh.exec_command(command)
except paramiko.SSHException, err:
raise AssertionError(err)
status = stdout.channel.recv_exit_status()
output = stdout.readlines()
ssh.close()
return output, status
def _get_user_id():
"""Authenticate to astakos and get unique users id"""
astakos = AstakosClient(AUTH_URL, TOKEN)
astakos.CONNECTION_RETRY_LIMIT = 2
authenticate = astakos.authenticate()
return authenticate['access']['user']['id']
# --------------------------------------------------------------------
# BurninTestReulst class
class BurninTestResult(unittest.TextTestResult):
def addSuccess(self, test):
super(BurninTestResult, self).addSuccess(test)
if self.showAll:
if hasattr(test, 'result_dict'):
run_details = test.result_dict
self.stream.write("\n")
for i in run_details:
self.stream.write("%s : %s \n" % (i, run_details[i]))
self.stream.write("\n")
elif self.dots:
self.stream.write('.')
self.stream.flush()
def addError(self, test, err):
super(BurninTestResult, self).addError(test, err)
if self.showAll:
self.stream.writeln("ERROR")
if hasattr(test, 'result_dict'):
run_details = test.result_dict
self.stream.write("\n")
for i in run_details:
self.stream.write("%s : %s \n" % (i, run_details[i]))
self.stream.write("\n")
elif self.dots:
self.stream.write('E')
self.stream.flush()
def addFailure(self, test, err):
super(BurninTestResult, self).addFailure(test, err)
if self.showAll:
self.stream.writeln("FAIL")
if hasattr(test, 'result_dict'):
run_details = test.result_dict
self.stream.write("\n")
for i in run_details:
self.stream.write("%s : %s \n" % (i, run_details[i]))
self.stream.write("\n")
elif self.dots:
self.stream.write('F')
self.stream.flush()
# --------------------------------------------------------------------
# Format Results
class burninFormatter(logging.Formatter):
err_fmt = red + "ERROR: %(message)s" + normal
dbg_fmt = green + "* %(message)s" + normal
info_fmt = "%(message)s"
def __init__(self, fmt="%(levelno)s: %(message)s"):
logging.Formatter.__init__(self, fmt)
def format(self, record):
format_orig = self._fmt
# Replace the original format with one customized by logging level
if record.levelno == 10: # DEBUG
self._fmt = burninFormatter.dbg_fmt
elif record.levelno == 20: # INFO
self._fmt = burninFormatter.info_fmt
elif record.levelno == 40: # ERROR
self._fmt = burninFormatter.err_fmt
result = logging.Formatter.format(self, record)
self._fmt = format_orig
return result
log = logging.getLogger("burnin")
log.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
handler.setFormatter(burninFormatter())
log.addHandler(handler)
# --------------------------------------------------------------------
# UnauthorizedTestCase class
class UnauthorizedTestCase(unittest.TestCase):
"""Test unauthorized access"""
@classmethod
def setUpClass(cls):
cls.astakos = AstakosClient(AUTH_URL, TOKEN)
cls.astakos.CONNECTION_RETRY_LIMIT = 2
cls.compute_url = \
cls.astakos.get_service_endpoints('compute')['publicURL']
cls.result_dict = dict()
def test_unauthorized_access(self):
"""Test access without a valid token fails"""
log.info("Authentication test")
falseToken = '12345'
c = ComputeClient(self.compute_url, falseToken)
c.CONNECTION_RETRY_LIMIT = 2
with self.assertRaises(ClientError) as cm:
c.list_servers()
self.assertEqual(cm.exception.status, 401)
# --------------------------------------------------------------------
# This class gest replicated into Images TestCases dynamically
class ImagesTestCase(unittest.TestCase):
"""Test image lists for consistency"""
@classmethod
def setUpClass(cls):
"""Initialize kamaki, get (detailed) list of images"""
log.info("Getting simple and detailed list of images")
cls.astakos_client = AstakosClient(AUTH_URL, TOKEN)
cls.astakos_client.CONNECTION_RETRY_LIMIT = 2
# Compute Client
compute_url = \
cls.astakos_client.get_service_endpoints('compute')['publicURL']
cls.compute_client = ComputeClient(compute_url, TOKEN)
cls.compute_client.CONNECTION_RETRY_LIMIT = 2
# Image Client
image_url = \
cls.astakos_client.get_service_endpoints('image')['publicURL']
cls.image_client = ImageClient(image_url, TOKEN)
cls.image_client.CONNECTION_RETRY_LIMIT = 2
# Pithos Client
pithos_url = cls.astakos_client.\
get_service_endpoints('object-store')['publicURL']
cls.pithos_client = PithosClient(pithos_url, TOKEN)
cls.pithos_client.CONNECTION_RETRY_LIMIT = 2
# Get images
cls.images = \
filter(lambda x: not x['name'].startswith(SNF_TEST_PREFIX),
cls.image_client.list_public())
cls.dimages = \
filter(lambda x: not x['name'].startswith(SNF_TEST_PREFIX),
cls.image_client.list_public(detail=True))
cls.result_dict = dict()
# Get uniq user id
cls.uuid = _get_user_id()
log.info("Uniq user id = %s" % cls.uuid)
# Create temp directory and store it inside our class
# XXX: In my machine /tmp has not enough space
# so use current directory to be sure.
cls.temp_dir = tempfile.mkdtemp(dir=os.getcwd())
cls.temp_image_name = \
SNF_TEST_PREFIX + cls.imageid + ".diskdump"
@classmethod
def tearDownClass(cls):
"""Remove local files"""
try:
temp_file = os.path.join(cls.temp_dir, cls.temp_image_name)
os.unlink(temp_file)
except:
pass
try:
os.rmdir(cls.temp_dir)
except:
pass
def test_001_list_images(self):
"""Test image list actually returns images"""
self.assertGreater(len(self.images), 0)
def test_002_list_images_detailed(self):
"""Test detailed image list is the same length as list"""
self.assertEqual(len(self.dimages), len(self.images))
def test_003_same_image_names(self):
"""Test detailed and simple image list contain same names"""
names = sorted(map(lambda x: x["name"], self.images))
dnames = sorted(map(lambda x: x["name"], self.dimages))
self.assertEqual(names, dnames)
def test_004_unique_image_names(self):
"""Test system images have unique names"""
sys_images = filter(lambda x: x['owner'] == SYSTEM_IMAGES_USER,
self.dimages)
names = sorted(map(lambda x: x["name"], sys_images))
self.assertEqual(sorted(list(set(names))), names)
def test_005_image_metadata(self):
"""Test every image has specific metadata defined"""
keys = frozenset(["osfamily", "root_partition"])
details = self.compute_client.list_images(detail=True)
sys_images = filter(lambda x: x['user_id'] == SYSTEM_IMAGES_USER,
details)
for i in sys_images:
self.assertTrue(keys.issubset(i["metadata"].keys()))
def test_006_download_image(self):
"""Download image from pithos+"""
# Get image location
image = filter(
lambda x: x['id'] == self.imageid, self.dimages)[0]
image_location = \
image['location'].replace("://", " ").replace("/", " ").split()
log.info("Download image, with owner %s\n\tcontainer %s, and name %s"
% (image_location[1], image_location[2], image_location[3]))
self.pithos_client.account = image_location[1]
self.pithos_client.container = image_location[2]
temp_file = os.path.join(self.temp_dir, self.temp_image_name)
with open(temp_file, "wb+") as f:
self.pithos_client.download_object(image_location[3], f)
def test_007_upload_image(self):
"""Upload and register image"""
temp_file = os.path.join(self.temp_dir, self.temp_image_name)
log.info("Upload image to pithos+")
# Create container `images'
self.pithos_client.account = self.uuid
self.pithos_client.container = "images"
self.pithos_client.container_put()
with open(temp_file, "rb+") as f:
self.pithos_client.upload_object(self.temp_image_name, f)
log.info("Register image to plankton")
location = "pithos://" + self.uuid + \
"/images/" + self.temp_image_name
params = {'is_public': False}
properties = {'OSFAMILY': "linux", 'ROOT_PARTITION': 1}
self.image_client.register(
self.temp_image_name, location, params, properties)
# Get image id
details = self.image_client.list_public(detail=True)
detail = filter(lambda x: x['location'] == location, details)
self.assertEqual(len(detail), 1)
cls = type(self)
cls.temp_image_id = detail[0]['id']
log.info("Image registered with id %s" % detail[0]['id'])
def test_008_cleanup_image(self):
"""Cleanup image test"""
log.info("Cleanup image test")
# Remove image from pithos+
self.pithos_client.account = self.uuid
self.pithos_client.container = "images"
self.pithos_client.del_object(self.temp_image_name)
# --------------------------------------------------------------------
# FlavorsTestCase class
class FlavorsTestCase(unittest.TestCase):
"""Test flavor lists for consistency"""
@classmethod
def setUpClass(cls):
"""Initialize kamaki, get (detailed) list of flavors"""
log.info("Getting simple and detailed list of flavors")
cls.astakos_client = AstakosClient(AUTH_URL, TOKEN)
cls.astakos_client.CONNECTION_RETRY_LIMIT = 2
# Compute Client
compute_url = \
cls.astakos_client.get_service_endpoints('compute')['publicURL']
cls.compute_client = ComputeClient(compute_url, TOKEN)
cls.compute_client.CONNECTION_RETRY_LIMIT = 2
cls.flavors = cls.compute_client.list_flavors()
cls.dflavors = cls.compute_client.list_flavors(detail=True)
cls.result_dict = dict()
def test_001_list_flavors(self):
"""Test flavor list actually returns flavors"""
self.assertGreater(len(self.flavors), 0)
def test_002_list_flavors_detailed(self):
"""Test detailed flavor list is the same length as list"""
self.assertEquals(len(self.dflavors), len(self.flavors))
def test_003_same_flavor_names(self):
"""Test detailed and simple flavor list contain same names"""
names = sorted(map(lambda x: x["name"], self.flavors))
dnames = sorted(map(lambda x: x["name"], self.dflavors))
self.assertEqual(names, dnames)
def test_004_unique_flavor_names(self):
"""Test flavors have unique names"""
names = sorted(map(lambda x: x["name"], self.flavors))
self.assertEqual(sorted(list(set(names))), names)
def test_005_well_formed_flavor_names(self):
"""Test flavors have names of the form CxxRyyDzz
Where xx is vCPU count, yy is RAM in MiB, zz is Disk in GiB
"""
for f in self.dflavors:
flavor = (f["vcpus"], f["ram"], f["disk"], f["SNF:disk_template"])
self.assertEqual("C%dR%dD%d%s" % flavor,
f["name"],
"Flavor %s does not match its specs." % f["name"])
# --------------------------------------------------------------------
# ServersTestCase class
class ServersTestCase(unittest.TestCase):
"""Test server lists for consistency"""
@classmethod
def setUpClass(cls):
"""Initialize kamaki, get (detailed) list of servers"""
log.info("Getting simple and detailed list of servers")
cls.astakos_client = AstakosClient(AUTH_URL, TOKEN)
cls.astakos_client.CONNECTION_RETRY_LIMIT = 2
# Compute Client
compute_url = \
cls.astakos_client.get_service_endpoints('compute')['publicURL']
cls.compute_client = ComputeClient(compute_url, TOKEN)
cls.compute_client.CONNECTION_RETRY_LIMIT = 2
cls.servers = cls.compute_client.list_servers()
cls.dservers = cls.compute_client.list_servers(detail=True)
cls.result_dict = dict()
# def test_001_list_servers(self):
# """Test server list actually returns servers"""
# self.assertGreater(len(self.servers), 0)
def test_002_list_servers_detailed(self):
"""Test detailed server list is the same length as list"""
self.assertEqual(len(self.dservers), len(self.servers))
def test_003_same_server_names(self):
"""Test detailed and simple flavor list contain same names"""
names = sorted(map(lambda x: x["name"], self.servers))
dnames = sorted(map(lambda x: x["name"], self.dservers))
self.assertEqual(names, dnames)
# --------------------------------------------------------------------
# Pithos Test Cases
class PithosTestCase(unittest.TestCase):
"""Test pithos functionality"""
@classmethod
def setUpClass(cls):
"""Initialize kamaki, get list of containers"""
# Get uniq user id
cls.uuid = _get_user_id()
log.info("Uniq user id = %s" % cls.uuid)
log.info("Getting list of containers")
cls.astakos_client = AstakosClient(AUTH_URL, TOKEN)
cls.astakos_client.CONNECTION_RETRY_LIMIT = 2
# Pithos Client
pithos_url = cls.astakos_client.\
get_service_endpoints('object-store')['publicURL']
cls.pithos_client = PithosClient(pithos_url, TOKEN, cls.uuid)
cls.pithos_client.CONNECTION_RETRY_LIMIT = 2
cls.containers = cls.pithos_client.list_containers()
cls.result_dict = dict()
def test_001_list_containers(self):
"""Test container list actually returns containers"""
self.assertGreater(len(self.containers), 0)
def test_002_unique_containers(self):
"""Test if containers have unique names"""
names = [n['name'] for n in self.containers]
names = sorted(names)
self.assertEqual(sorted(list(set(names))), names)
def test_003_create_container(self):
"""Test create a container"""
rand_num = randint(1000, 9999)
rand_name = "%s%s" % (SNF_TEST_PREFIX, rand_num)
names = [n['name'] for n in self.containers]
while rand_name in names:
rand_num = randint(1000, 9999)
rand_name = "%s%s" % (SNF_TEST_PREFIX, rand_num)
# Create container
self.pithos_client.container = rand_name
self.pithos_client.container_put()
# Get list of containers
new_containers = self.pithos_client.list_containers()
new_container_names = [n['name'] for n in new_containers]
self.assertIn(rand_name, new_container_names)
def test_004_upload(self):
"""Test uploading something to pithos+"""
# Create a tmp file
with tempfile.TemporaryFile() as f:
f.write("This is a temp file")
f.seek(0, 0)
# Where to save file
self.pithos_client.upload_object("test.txt", f)
def test_005_download(self):
"""Test download something from pithos+"""
# Create tmp directory to save file
tmp_dir = tempfile.mkdtemp()
tmp_file = os.path.join(tmp_dir, "test.txt")
with open(tmp_file, "wb+") as f:
self.pithos_client.download_object("test.txt", f)
# Read file
f.seek(0, 0)
content = f.read()
# Remove files
os.unlink(tmp_file)
os.rmdir(tmp_dir)
# Compare results
self.assertEqual(content, "This is a temp file")
def test_006_remove(self):
"""Test removing files and containers"""
cont_name = self.pithos_client.container
self.pithos_client.del_object("test.txt")
self.pithos_client.purge_container()
# List containers
containers = self.pithos_client.list_containers()
cont_names = [n['name'] for n in containers]
self.assertNotIn(cont_name, cont_names)
# --------------------------------------------------------------------
# This class gets replicated into actual TestCases dynamically
class SpawnServerTestCase(unittest.TestCase):
"""Test scenario for server of the specified image"""
@classmethod
def setUpClass(cls):
"""Initialize a kamaki instance"""
log.info("Spawning server for image `%s'" % cls.imagename)
cls.astakos_client = AstakosClient(AUTH_URL, TOKEN)
cls.astakos_client.CONNECTION_RETRY_LIMIT = 2
# Cyclades Client
compute_url = \
cls.astakos_client.get_service_endpoints('compute')['publicURL']
cls.cyclades_client = CycladesClient(compute_url, TOKEN)
cls.cyclades_client.CONNECTION_RETRY_LIMIT = 2
cls.result_dict = dict()
def _get_ipv4(self, server):
"""Get the public IPv4 of a server from the detailed server info"""
nics = server["attachments"]
for nic in nics:
net_id = nic["network_id"]
if self.cyclades_client.get_network_details(net_id)["public"]:
public_addrs = nic["ipv4"]
self.assertTrue(public_addrs is not None)
return public_addrs
def _get_ipv6(self, server):
"""Get the public IPv6 of a server from the detailed server info"""
nics = server["attachments"]
for nic in nics:
net_id = nic["network_id"]
if self.cyclades_client.get_network_details(net_id)["public"]:
public_addrs = nic["ipv6"]
self.assertTrue(public_addrs is not None)
return public_addrs
def _connect_loginname(self, os_value):
"""Return the login name for connections based on the server OS"""
if os_value in ("Ubuntu", "Kubuntu", "Fedora"):
return "user"
elif os_value in ("windows", "windows_alpha1"):
return "Administrator"
else:
return "root"
def _verify_server_status(self, current_status, new_status):
"""Verify a server has switched to a specified status"""
server = self.cyclades_client.get_server_details(self.serverid)
if server["status"] not in (current_status, new_status):
return None # Do not raise exception, return so the test fails
self.assertEquals(server["status"], new_status)
def _get_connected_tcp_socket(self, family, host, port):
"""Get a connected socket from the specified family to host:port"""
sock = None
for res in \
socket.getaddrinfo(host, port, family, socket.SOCK_STREAM, 0,
socket.AI_PASSIVE):
af, socktype, proto, canonname, sa = res
try: