Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
itminedu
kamaki
Commits
d1f78278
Commit
d1f78278
authored
Feb 06, 2013
by
Stavros Sachtouris
Browse files
Complete clients.cyclades unittests
parent
db77d79e
Changes
4
Hide whitespace changes
Inline
Side-by-side
kamaki/cli/commands/errors.py
View file @
d1f78278
...
...
@@ -227,7 +227,11 @@ class cyclades(object):
try
:
return
foo
(
self
,
*
args
,
**
kwargs
)
except
ClientError
as
ce
:
if
network_id
or
ce
.
status
==
421
:
if
network_id
and
ce
.
status
==
400
:
raiseCLIError
(
ce
,
'Network with id %s does not exist'
%
network_id
,
details
=
self
.
about_network_id
)
elif
network_id
or
ce
.
status
==
421
:
raiseCLIError
(
ce
,
'Network with id %s is in use'
%
network_id
,
details
=
[
...
...
kamaki/cli/commands/test_cli.py
View file @
d1f78278
...
...
@@ -45,11 +45,10 @@ _commands = [test_cmds]
class
_test_init
(
_command_init
):
def
main
(
self
,
client
,
method
=
None
):
tests
.
cnf
=
self
.
config
if
method
:
tests
.
main
([
client
,
method
])
tests
.
main
([
client
,
method
]
,
config
=
self
.
config
)
else
:
tests
.
main
([
client
])
tests
.
main
([
client
]
,
config
=
self
.
config
)
@
command
(
test_cmds
)
...
...
kamaki/clients/tests/__init__.py
View file @
d1f78278
...
...
@@ -45,7 +45,6 @@ def _add_value(foo, value):
return
foo
(
self
,
value
)
return
wrap
class
Generic
(
TestCase
):
_waits
=
[]
...
...
@@ -53,9 +52,9 @@ class Generic(TestCase):
_grp
=
None
_fetched
=
{}
def
__init__
(
self
,
specific
=
None
,
config
_file
=
None
,
group
=
None
):
def
__init__
(
self
,
specific
=
None
,
config
=
None
,
group
=
None
):
super
(
Generic
,
self
).
__init__
(
specific
)
self
.
_cnf
=
C
onfig
(
config_file
)
self
.
_cnf
=
c
onfig
or
Config
(
)
self
.
_grp
=
group
self
.
_waits
.
append
(
0.71828
)
for
i
in
range
(
10
):
...
...
@@ -111,6 +110,10 @@ class Generic(TestCase):
return
(
action_bar
,
action_cb
)
=
self
.
_safe_progress_bar
(
msg
)
action_gen
=
action_cb
(
len
(
items
))
try
:
action_gen
.
next
()
except
Exception
:
pass
for
item
in
items
:
action
(
item
)
action_gen
.
next
()
...
...
@@ -154,7 +157,10 @@ def init_parser():
def
main
(
argv
):
_main
(
argv
,
config
=
None
)
def
_main
(
argv
,
config
=
None
):
suiteFew
=
TestSuite
()
"""
if len(argv) == 0 or argv[0] == 'pithos':
...
...
@@ -166,17 +172,17 @@ def main(argv):
if
len
(
argv
)
==
0
or
argv
[
0
]
==
'cyclades'
:
from
kamaki.clients.tests.cyclades
import
Cyclades
test_method
=
'test_%s'
%
(
argv
[
1
]
if
len
(
argv
)
>
1
else
'000'
)
suiteFew
.
addTest
(
Cyclades
(
test_method
))
suiteFew
.
addTest
(
Cyclades
(
test_method
,
config
))
if
len
(
argv
)
==
0
or
argv
[
0
]
==
'image'
:
from
kamaki.clients.tests.image
import
Image
test_method
=
'test_%s'
%
(
argv
[
1
]
if
len
(
argv
)
>
1
else
'000'
)
suiteFew
.
addTest
(
Image
(
test_method
))
suiteFew
.
addTest
(
Image
(
test_method
,
config
))
if
len
(
argv
)
==
0
or
argv
[
0
]
==
'astakos'
:
from
kamaki.clients.tests.astakos
import
Astakos
if
len
(
argv
)
==
1
:
suiteFew
.
addTest
(
makeSuite
(
Astakos
))
suiteFew
.
addTest
(
makeSuite
(
Astakos
,
config
))
else
:
suiteFew
.
addTest
(
Astakos
(
'test_'
+
argv
[
1
]))
suiteFew
.
addTest
(
Astakos
(
'test_'
+
argv
[
1
]
,
config
))
TextTestRunner
(
verbosity
=
2
).
run
(
suiteFew
)
...
...
kamaki/clients/tests/cyclades.py
View file @
d1f78278
...
...
@@ -63,10 +63,12 @@ class Cyclades(tests.Generic):
def
tearDown
(
self
):
"""Destoy servers used in testing"""
self
.
do_with_progress_bar
(
self
.
_delete_network
,
'Delete %s networks'
%
len
(
self
.
networks
),
self
.
networks
.
keys
())
for
net
in
self
.
networks
.
keys
():
self
.
_delete_network
(
net
)
#self.do_with_progress_bar(
# self._delete_network,
# 'Delete %s networks' % len(self.networks),
# self.networks.keys())
for
server
in
self
.
servers
.
values
():
self
.
_delete_server
(
server
[
'id'
])
print
(
'DEL VM %s (%s)'
%
(
server
[
'id'
],
server
[
'name'
]))
...
...
@@ -111,7 +113,7 @@ class Cyclades(tests.Generic):
print
(
'Disconnect nics of network %s'
%
netid
)
self
.
client
.
disconnect_network_nics
(
netid
)
def
netwait
(
self
,
wait
):
def
netwait
(
wait
):
try
:
self
.
client
.
delete_network
(
netid
)
except
ClientError
:
...
...
@@ -119,11 +121,11 @@ class Cyclades(tests.Generic):
self
.
do_with_progress_bar
(
netwait
,
'Delete network %s'
%
netid
,
self
.
_waits
[:
5
])
self
.
_waits
[:
7
])
def
_wait_for_network
(
self
,
netid
,
status
):
def
netwait
(
self
,
wait
):
def
netwait
(
wait
):
r
=
self
.
client
.
get_network_details
(
netid
)
if
r
[
'status'
]
==
status
:
return
...
...
@@ -136,7 +138,7 @@ class Cyclades(tests.Generic):
def
_wait_for_nic
(
self
,
netid
,
servid
,
in_creation
=
True
):
self
.
_wait_for_network
(
netid
,
'ACTIVE'
)
def
nicwait
(
self
,
wait
):
def
nicwait
(
wait
):
nics
=
self
.
client
.
list_server_nics
(
servid
)
for
net
in
nics
:
found_nic
=
net
[
'network_id'
]
==
netid
...
...
@@ -151,6 +153,8 @@ class Cyclades(tests.Generic):
servid
,
''
if
in_creation
else
'dis'
),
self
.
_waits
[:
5
])
return
netid
in
[
net
[
'network_id'
]
\
for
net
in
self
.
client
.
list_server_nics
(
servid
)]
def
_has_status
(
self
,
servid
,
status
):
r
=
self
.
client
.
get_server_details
(
servid
)
...
...
@@ -259,9 +263,7 @@ class Cyclades(tests.Generic):
def
_test_0030_wait_test_servers_to_build
(
self
):
"""Pseudo-test to wait for VMs to load"""
from
sys
import
stdout
stdout
.
write
(
''
)
stdout
.
flush
()
print
(
''
)
self
.
_wait_for_status
(
self
.
server1
[
'id'
],
'BUILD'
)
self
.
_wait_for_status
(
self
.
server2
[
'id'
],
'BUILD'
)
...
...
@@ -319,9 +321,7 @@ class Cyclades(tests.Generic):
def
_test_0070_wait_test_servers_to_reboot
(
self
):
"""Pseudo-test to wait for VMs to load"""
from
sys
import
stdout
stdout
.
write
(
''
)
stdout
.
flush
()
print
(
''
)
self
.
_wait_for_status
(
self
.
server1
[
'id'
],
'REBOOT'
)
self
.
_wait_for_status
(
self
.
server2
[
'id'
],
'REBOOT'
)
...
...
@@ -470,3 +470,248 @@ class Cyclades(tests.Generic):
self
.
_wait_for_status
(
self
.
server1
[
'id'
],
'STOPPED'
)
r
=
self
.
client
.
get_server_details
(
self
.
server1
[
'id'
])
self
.
assertEqual
(
r
[
'status'
],
'ACTIVE'
)
def
test_get_server_console
(
self
):
"""Test get_server_console"""
self
.
server2
=
self
.
_create_server
(
self
.
servname2
,
self
.
flavorid
+
2
,
self
.
img
)
self
.
_wait_for_status
(
self
.
server2
[
'id'
],
'BUILD'
)
self
.
_test_0190_get_server_console
()
def
_test_0190_get_server_console
(
self
):
r
=
self
.
client
.
get_server_console
(
self
.
server2
[
'id'
])
self
.
assertTrue
(
'host'
in
r
)
self
.
assertTrue
(
'password'
in
r
)
self
.
assertTrue
(
'port'
in
r
)
self
.
assertTrue
(
'type'
in
r
)
def
test_get_firewall_profile
(
self
):
"""Test get_firewall_profile"""
self
.
server1
=
self
.
_create_server
(
self
.
servname1
,
self
.
flavorid
,
self
.
img
)
self
.
_test_0200_get_firewall_profile
()
def
_test_0200_get_firewall_profile
(
self
):
self
.
_wait_for_status
(
self
.
server1
[
'id'
],
'BUILD'
)
fprofile
=
self
.
client
.
get_firewall_profile
(
self
.
server1
[
'id'
])
self
.
assertTrue
(
fprofile
in
self
.
PROFILES
)
def
test_set_firewall_profile
(
self
):
"""Test set_firewall_profile"""
self
.
server1
=
self
.
_create_server
(
self
.
servname1
,
self
.
flavorid
,
self
.
img
)
self
.
_test_0210_set_firewall_profile
()
def
_test_0210_set_firewall_profile
(
self
):
self
.
_wait_for_status
(
self
.
server1
[
'id'
],
'BUILD'
)
PROFILES
=
[
'DISABLED'
,
'ENABLED'
,
'DISABLED'
,
'PROTECTED'
]
fprofile
=
self
.
client
.
get_firewall_profile
(
self
.
server1
[
'id'
])
print
(
''
)
count_success
=
0
for
counter
,
fprofile
in
enumerate
(
PROFILES
):
npos
=
counter
+
1
try
:
nprofile
=
PROFILES
[
npos
]
except
IndexError
:
nprofile
=
PROFILES
[
0
]
print
(
'
\t
profile swap %s: %s -> %s'
%
(
npos
,
fprofile
,
nprofile
))
self
.
client
.
set_firewall_profile
(
self
.
server1
[
'id'
],
nprofile
)
time
.
sleep
(
0.5
)
self
.
client
.
reboot_server
(
self
.
server1
[
'id'
],
hard
=
True
)
time
.
sleep
(
1
)
self
.
_wait_for_status
(
self
.
server1
[
'id'
],
'REBOOT'
)
time
.
sleep
(
0.5
)
changed
=
self
.
client
.
get_firewall_profile
(
self
.
server1
[
'id'
])
try
:
self
.
assertEqual
(
changed
,
nprofile
)
except
AssertionError
as
err
:
if
count_success
:
print
(
'
\t
FAIL in swap #%s'
%
npos
)
break
else
:
raise
err
count_success
+=
1
def
test_get_server_stats
(
self
):
self
.
server1
=
self
.
_create_server
(
self
.
servname1
,
self
.
flavorid
,
self
.
img
)
self
.
_test_0220_get_server_stats
()
def
_test_0220_get_server_stats
(
self
):
r
=
self
.
client
.
get_server_stats
(
self
.
server1
[
'id'
])
for
term
in
(
'cpuBar'
,
'cpuTimeSeries'
,
'netBar'
,
'netTimeSeries'
,
'refresh'
):
self
.
assertTrue
(
term
in
r
)
def
test_create_network
(
self
):
"""Test create_network"""
self
.
_test_0230_create_network
()
def
_test_0230_create_network
(
self
):
self
.
network1
=
self
.
_create_network
(
self
.
netname1
)
self
.
_wait_for_network
(
self
.
network1
[
'id'
],
'ACTIVE'
)
self
.
network1
=
self
.
client
.
get_network_details
(
self
.
network1
[
'id'
])
nets
=
self
.
client
.
list_networks
(
self
.
network1
[
'id'
])
chosen
=
[
net
for
net
in
nets
if
net
[
'id'
]
==
self
.
network1
[
'id'
]][
0
]
chosen
.
pop
(
'updated'
)
net1
=
dict
(
self
.
network1
)
net1
.
pop
(
'updated'
)
self
.
assert_dicts_are_deeply_equal
(
chosen
,
net1
)
def
test_connect_server
(
self
):
"""Test connect_server"""
self
.
server1
=
self
.
_create_server
(
self
.
servname1
,
self
.
flavorid
,
self
.
img
)
self
.
network1
=
self
.
_create_network
(
self
.
netname1
)
self
.
_wait_for_status
(
self
.
server1
[
'id'
],
'BUILD'
)
self
.
_wait_for_network
(
self
.
network1
[
'id'
],
'ACTIVE'
)
self
.
_test_0240_connect_server
()
def
_test_0250_connect_server
(
self
):
self
.
client
.
connect_server
(
self
.
server1
[
'id'
],
self
.
network1
[
'id'
])
self
.
assertTrue
(
self
.
_wait_for_nic
(
self
.
network1
[
'id'
],
self
.
server1
[
'id'
]))
def
test_disconnect_server
(
self
):
"""Test disconnect_server"""
self
.
test_connect_server
()
self
.
_test_0250_disconnect_server
()
def
_test_0250_disconnect_server
(
self
):
self
.
client
.
disconnect_server
(
self
.
server1
[
'id'
],
self
.
network1
[
'id'
])
self
.
assertTrue
(
self
.
_wait_for_nic
(
self
.
network1
[
'id'
],
self
.
server1
[
'id'
],
in_creation
=
False
))
def
_test_0260_wait_for_second_network
(
self
):
self
.
network2
=
self
.
_create_network
(
self
.
netname2
)
self
.
_wait_for_network
(
self
.
network2
[
'id'
],
'ACTIVE'
)
def
test_list_server_nics
(
self
):
"""Test list_server_nics"""
self
.
server1
=
self
.
_create_server
(
self
.
servname1
,
self
.
flavorid
,
self
.
img
)
self
.
network2
=
self
.
_create_network
(
self
.
netname2
)
self
.
_wait_for_status
(
self
.
server1
[
'id'
],
'BUILD'
)
self
.
_wait_for_network
(
self
.
network2
[
'id'
],
'ACTIVE'
)
self
.
_test_0280_list_server_nics
()
def
_test_0280_list_server_nics
(
self
):
r
=
self
.
client
.
list_server_nics
(
self
.
server1
[
'id'
])
len0
=
len
(
r
)
self
.
client
.
connect_server
(
self
.
server1
[
'id'
],
self
.
network2
[
'id'
])
self
.
assertTrue
(
self
.
_wait_for_nic
(
self
.
network2
[
'id'
],
self
.
server1
[
'id'
]))
r
=
self
.
client
.
list_server_nics
(
self
.
server1
[
'id'
])
self
.
assertTrue
(
len
(
r
)
>
len0
)
def
test_list_networks
(
self
):
"""Test list_network"""
self
.
network1
=
self
.
_create_network
(
self
.
netname1
)
self
.
_wait_for_network
(
self
.
network1
[
'id'
],
'ACTIVE'
)
self
.
_test_0290_list_networks
()
def
_test_0290_list_networks
(
self
):
r
=
self
.
client
.
list_networks
()
self
.
assertTrue
(
len
(
r
)
>
1
)
ids
=
[
net
[
'id'
]
for
net
in
r
]
names
=
[
net
[
'name'
]
for
net
in
r
]
self
.
assertTrue
(
'1'
in
ids
)
#self.assertTrue('public' in names)
self
.
assertTrue
(
self
.
network1
[
'id'
]
in
ids
)
self
.
assertTrue
(
self
.
network1
[
'name'
]
in
names
)
r
=
self
.
client
.
list_networks
(
detail
=
True
)
ids
=
[
net
[
'id'
]
for
net
in
r
]
names
=
[
net
[
'name'
]
for
net
in
r
]
for
net
in
r
:
self
.
assertTrue
(
net
[
'id'
]
in
ids
)
self
.
assertTrue
(
net
[
'name'
]
in
names
)
for
term
in
(
'status'
,
'updated'
,
'created'
):
self
.
assertTrue
(
term
in
net
.
keys
())
def
test_get_network_details
(
self
):
"""Test get_network_details"""
self
.
network1
=
self
.
_create_network
(
self
.
netname1
)
self
.
_test_0300_get_network_details
()
def
_test_0300_get_network_details
(
self
):
r
=
self
.
client
.
get_network_details
(
self
.
network1
[
'id'
])
net1
=
dict
(
self
.
network1
)
net1
.
pop
(
'status'
)
net1
.
pop
(
'updated'
,
None
)
net1
.
pop
(
'attachments'
)
r
.
pop
(
'status'
)
r
.
pop
(
'updated'
,
None
)
r
.
pop
(
'attachments'
)
self
.
assert_dicts_are_deeply_equal
(
net1
,
r
)
def
test_update_network_name
(
self
):
self
.
network2
=
self
.
_create_network
(
self
.
netname2
)
self
.
_test_0310_update_network_name
()
def
_test_0310_update_network_name
(
self
):
updated_name
=
self
.
netname2
+
'_upd'
self
.
client
.
update_network_name
(
self
.
network2
[
'id'
],
updated_name
)
def
netwait
(
wait
):
r
=
self
.
client
.
get_network_details
(
self
.
network2
[
'id'
])
if
r
[
'name'
]
==
updated_name
:
return
time
.
sleep
(
wait
)
self
.
do_with_progress_bar
(
netwait
,
'Network %s name is changing:'
%
self
.
network2
[
'id'
],
self
.
_waits
[:
5
])
r
=
self
.
client
.
get_network_details
(
self
.
network2
[
'id'
])
self
.
assertEqual
(
r
[
'name'
],
updated_name
)
""" Don't have auth to test this
def test_delete_image(self):
""Test delete_image""
self._test_0330_delete_image()
def _test_0330_delete_image(self):
images = self.client.list_images()
self.client.delete_image(images[2]['id'])
try:
r = self.client.get_image_details(images[2]['id'], success=(400))
except ClientError as err:
self.assertEqual(err.status, 404)
def test_create_image_metadata(self):
""Test create_image_metadata""
self._test_0340_create_image_metadata()
def _test_0340_create_image_metadata(self):
r = self.client.create_image_metadata(self.img, 'mykey', 'myval')
self.assertEqual(r['mykey'], 'myval')
def test_update_image_metadata(self):
""Test update_image_metadata""
self._test_0350_update_image_metadata()
def _test_0350_update_image_metadata(self):
r = self.client.create_image_metadata(self.img, 'mykey0', 'myval')
r = self.client.update_image_metadata(self.img, 'mykey0', 'myval0')
self.assertEqual(r['mykey0'], 'myval0')
def test_delete_image_metadata(self):
""Test delete_image_metadata""
self._test_0360_delete_image_metadata()
def _test_0360_delete_image_metadata(self):
self.client.create_image_metadata(self.img, 'mykey1', 'myval1')
self.client.delete_image_metadata(self.img, 'mykey1')
r = self.client.get_image_metadata(self.img)
self.assertNotEqual('mykey1' in r)
"""
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment