Commit aed05739 authored by Ilias Tsitsimpis's avatar Ilias Tsitsimpis
Browse files

Add Pithos TestCases

parent 838a96a0
......@@ -40,6 +40,7 @@ import datetime
import inspect
import logging
import os
import os.path
import paramiko
import prctl
import subprocess
......@@ -47,15 +48,17 @@ import signal
import socket
import sys
import time
import tempfile
from base64 import b64encode
from IPy import IP
from multiprocessing import Process, Queue
from random import choice
from random import choice, randint
from optparse import OptionParser, OptionValueError
from kamaki.clients.compute import ComputeClient
from kamaki.clients.cyclades import CycladesClient
from kamaki.clients.image import ImageClient
from kamaki.clients.pithos import PithosClient
from kamaki.clients import ClientError
from vncauthproxy.d3des import generate_response as d3des_generate_response
......@@ -74,9 +77,9 @@ API = None
TOKEN = None
PLANKTON = None
PLANKTON_USER = None
PITHOS = None
PITHOS_USER = None
NO_IPV6 = None
DEFAULT_API = "https://cyclades.okeanos.grnet.gr/api/v1.1"
DEFAULT_PLANKTON = "https://cyclades.okeanos.grnet.gr/plankton"
DEFAULT_PLANKTON_USER = "images@okeanos.grnet.gr"
NOFAILFAST = None
VERBOSE = None
......@@ -326,6 +329,82 @@ class ServersTestCase(unittest.TestCase):
self.assertEqual(names, dnames)
# --------------------------------------------------------------------
# Pithos Test Cases
class PithosTestCase(unittest.TestCase):
"""Test pithos functionality"""
@classmethod
def setUpClass(cls):
"""Initialize kamaki, get list of containers"""
log.info("Getting list of containers")
cls.client = PithosClient(PITHOS, TOKEN, PITHOS_USER)
cls.containers = cls.client.list_containers()
cls.result_dict = dict()
def test_001_list_containers(self):
"""Test container list actually returns containers"""
self.assertGreater(len(self.containers), 0)
def test_002_unique_containers(self):
"""Test if containers have unique names"""
names = [n['name'] for n in self.containers]
names = sorted(names)
self.assertEqual(sorted(list(set(names))), names)
def test_003_create_container(self):
"""Test create a container"""
rand_num = randint(1000, 9999)
rand_name = "%s%s" % (SNF_TEST_PREFIX, rand_num)
names = [n['name'] for n in self.containers]
while rand_name in names:
rand_num = randint(1000, 9999)
rand_name = "%s%s" % (SNF_TEST_PREFIX, rand_num)
# Create container
self.client.container = rand_name
self.client.container_put()
# Get list of containers
new_containers = self.client.list_containers()
new_container_names = [n['name'] for n in new_containers]
self.assertIn(rand_name, new_container_names)
def test_004_upload(self):
"""Test uploading something to pithos+"""
# Create a tmp file
with tempfile.TemporaryFile() as f:
f.write("This is a temp file")
f.seek(0, 0)
# Where to save file
self.client.upload_object("test.txt", f)
def test_005_download(self):
"""Test download something from pithos+"""
# Create tmp directory to save file
tmp_dir = tempfile.mkdtemp()
tmp_file = os.path.join(tmp_dir, "test.txt")
with open(tmp_file, "wb+") as f:
self.client.download_object("test.txt", f)
# Read file
f.seek(0, 0)
content = f.read()
# Remove files
os.unlink(tmp_file)
os.rmdir(tmp_dir)
# Compare results
self.assertEqual(content, "This is a temp file")
def test_006_remove(self):
"""Test removing files and containers"""
cont_name = self.client.container
self.client.del_object("test.txt")
self.client.purge_container()
# List containers
containers = self.client.list_containers()
cont_names = [n['name'] for n in containers]
self.assertNotIn(cont_name, cont_names)
# --------------------------------------------------------------------
# This class gets replicated into actual TestCases dynamically
class SpawnServerTestCase(unittest.TestCase):
"""Test scenario for server of the specified image"""
......@@ -1673,7 +1752,8 @@ def cleanup_networks(action_timeout, query_interval, delete_stale=False):
# Parse arguments functions
def parse_comma(option, opt, value, parser):
tests = set(['all', 'auth', 'images', 'flavors',
'servers', 'server_spawn', 'network_spawn'])
'pithos', 'servers', 'server_spawn',
'network_spawn'])
parse_input = value.split(',')
if not (set(parse_input)).issubset(tests):
......@@ -1696,15 +1776,23 @@ def parse_arguments(args):
parser.add_option("--api",
action="store", type="string", dest="api",
help="The API URI to use to reach the Synnefo API",
default=DEFAULT_API)
default=None)
parser.add_option("--plankton",
action="store", type="string", dest="plankton",
help="The API URI to use to reach the Plankton API",
default=DEFAULT_PLANKTON)
default=None)
parser.add_option("--plankton-user",
action="store", type="string", dest="plankton_user",
help="Owner of system images",
default=DEFAULT_PLANKTON_USER)
parser.add_option("--pithos",
action="store", type="string", dest="pithos",
help="The API URI to use to reach the Pithos API",
default=None)
parser.add_option("--pithos_user",
action="store", type="string", dest="pithos_user",
help="Owner of the pithos account",
default=None)
parser.add_option("--token",
action="store", type="string", dest="token",
help="The token to use for authentication to the API")
......@@ -1793,7 +1881,7 @@ def parse_arguments(args):
help='Set comma seperated tests for this run. \
Available tests: auth, images, flavors, \
servers, server_spawn, \
network_spawn. \
network_spawn, pithos. \
Default = all',
default='all',
callback=parse_comma)
......@@ -1807,14 +1895,14 @@ def parse_arguments(args):
if opts.delete_stale:
opts.show_stale = True
# `image-id' is mandatory
# `token' is mandatory
_mandatory_argument(opts.token, "--token")
# `api' is mandatory
_mandatory_argument(opts.api, "--api")
if not opts.show_stale:
if not opts.force_imageid:
print >>sys.stderr, red + \
"The --image-id argument is mandatory.\n" + \
normal
parser.print_help()
sys.exit(1)
# `image-id' is mandatory
_mandatory_argument(opts.force_imageid, "--image_id")
if opts.force_imageid != 'all':
try:
opts.force_imageid = str(opts.force_imageid)
......@@ -1824,17 +1912,23 @@ def parse_arguments(args):
"--image-id. Use a valid id, or `all'." + \
normal
sys.exit(1)
# `pithos' is mandatory
_mandatory_argument(opts.pithos, "--pithos")
# `pithos_user' is mandatory
_mandatory_argument(opts.pithos_user, "--pithos_user")
# `plankton' is mandatory
_mandatory_argument(opts.plankton, "--plankton")
# `token' is mandatory
if not opts.token:
return (opts, args)
def _mandatory_argument(Arg, Str):
if not Arg:
print >>sys.stderr, red + \
"The --token argument is mandatory.\n" + \
"The " + Str + " argument is mandatory.\n" + \
normal
parser.print_help()
sys.exit(1)
return (opts, args)
# --------------------------------------------------------------------
# Burnin main function
......@@ -1853,11 +1947,14 @@ def main():
(opts, args) = parse_arguments(sys.argv[1:])
# Some global variables
global API, TOKEN, PLANKTON, PLANKTON_USER, NO_IPV6, VERBOSE, NOFAILFAST
global API, TOKEN, PLANKTON, PLANKTON_USER
global PITHOS, PITHOS_USER, NO_IPV6, VERBOSE, NOFAILFAST
API = opts.api
TOKEN = opts.token
PLANKTON = opts.plankton
PLANKTON_USER = opts.plankton_user
PITHOS = opts.pithos
PITHOS_USER = opts.pithos_user
NO_IPV6 = opts.no_ipv6
VERBOSE = opts.verbose
NOFAILFAST = opts.nofailfast
......@@ -1944,12 +2041,15 @@ def main():
'images': ImagesTestCase,
'flavors': FlavorsTestCase,
'servers': ServersTestCase,
'pithos': PithosTestCase,
'server_spawn': ServerTestCase,
'network_spawn': NetworkTestCase}
seq_cases = []
if 'all' in opts.tests:
seq_cases = [UnauthorizedTestCase, ImagesTestCase, FlavorsTestCase,
ServersTestCase, ServerTestCase, NetworkTestCase]
seq_cases = [UnauthorizedTestCase, ImagesTestCase,
FlavorsTestCase, ServersTestCase,
PithosTestCase, ServerTestCase,
NetworkTestCase]
else:
for test in opts.tests:
seq_cases.append(test_dict[test])
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment