Commit 95ee8e90 authored by Christos Stavrakakis's avatar Christos Stavrakakis
Browse files

cyclades: Add unittests for volumes API

parent d5cc53f9
......@@ -15,18 +15,211 @@
import json
from mock import patch
from snf_django.utils.testing import BaseAPITest
from synnefo.db.models_factory import VolumeFactory, VolumeTypeFactory
from mock import patch, Mock
from snf_django.utils.testing import BaseAPITest, mocked_quotaholder
from synnefo.db.models_factory import (VolumeFactory, VolumeTypeFactory,
VirtualMachineFactory)
from synnefo.lib.services import get_service_path
from synnefo.cyclades_settings import cyclades_services
from synnefo.lib import join_urls
from copy import deepcopy
VOLUME_URL = get_service_path(cyclades_services, 'volume',
version='v2.0')
VOLUMES_URL = join_urls(VOLUME_URL, "volumes")
@patch("synnefo.logic.rapi_pool.GanetiRapiClient")
class VolumeAPITest(BaseAPITest):
def test_create_volume(self, mrapi):
vm = VirtualMachineFactory(
operstate="ACTIVE",
flavor__volume_type__disk_template="ext_vlmc")
user = vm.userid
_data = {"display_name": "test_vol",
"size": 2,
"server_id": vm.id}
# Test Success
mrapi().ModifyInstance.return_value = 42
with mocked_quotaholder():
r = self.post(VOLUMES_URL, user,
json.dumps({"volume": _data}), "json")
self.assertSuccess(r)
# Test create without size, name and server
for attr in ["display_name", "size", "server_id"]:
data = deepcopy(_data)
del data["size"]
with mocked_quotaholder():
r = self.post(VOLUMES_URL, user,
json.dumps({"volume": data}), "json")
self.assertBadRequest(r)
# Test invalid size
data = deepcopy(_data)
data["size"] = -2
with mocked_quotaholder():
r = self.post(VOLUMES_URL, user,
json.dumps({"volume": data}), "json")
self.assertBadRequest(r)
# Test deleted server or invalid state
data = deepcopy(_data)
vm.deleted = True
vm.save()
with mocked_quotaholder():
r = self.post(VOLUMES_URL, user,
json.dumps({"volume": data}), "json")
self.assertBadRequest(r)
vm.deleted = False
vm.operstate = "ERROR"
vm.save()
with mocked_quotaholder():
r = self.post(VOLUMES_URL, user,
json.dumps({"volume": data}), "json")
self.assertBadRequest(r)
vm.operstate = "ACTIVE"
vm.save()
# Test volume type different from VM's flavor or invalid vype
data = deepcopy(_data)
for disk_type in ["file", "plain", "drbd", "rbd"]:
vtype = VolumeTypeFactory(disk_template=disk_type)
data["volume_type"] = vtype.id
with mocked_quotaholder():
r = self.post(VOLUMES_URL, user,
json.dumps({"volume": data}), "json")
self.assertBadRequest(r)
for vtype in [434132421243, "foo"]:
data["volume_type"] = vtype
with mocked_quotaholder():
r = self.post(VOLUMES_URL, user,
json.dumps({"volume": data}), "json")
self.assertBadRequest(r)
# Test source for invalid disk template
for disk_type in ["file", "plain", "drbd", "rbd"]:
temp_vm = VirtualMachineFactory(
operstate="ACTIVE",
flavor__volume_type__disk_template=disk_type)
for attr in ["snapshot_id", "imageRef"]:
data = deepcopy(_data)
data["server_id"] = temp_vm.id
data[attr] = "3214231-413242134123-431242"
with mocked_quotaholder():
r = self.post(VOLUMES_URL, user,
json.dumps({"volume": data}), "json")
self.assertBadRequest(r)
# Test snapshot and image together
data = deepcopy(_data)
data["snapshot_id"] = "3214231-413242134123-431242"
data["imageRef"] = "3214231-413242134123-431242"
with mocked_quotaholder():
r = self.post(VOLUMES_URL, user,
json.dumps({"volume": data}), "json")
self.assertBadRequest(r)
# Test with Snapshot source
# Test unknwon snapshot
data = deepcopy(_data)
data["snapshot_id"] = "94321904321-432142134214-23142314"
with mocked_quotaholder():
r = self.post(VOLUMES_URL, user,
json.dumps({"volume": data}), "json")
self.assertBadRequest(r)
vm.task = None
vm.action = None
vm.save()
# Test success
snapshot = Mock()
snapshot.return_value = {'location': 'pithos://foo',
'mapfile': '1234',
'id': 1,
'name': 'test_image',
'size': 1024,
'is_snapshot': True,
'status': 'AVAILABLE',
'disk_format': 'diskdump'}
data["snapshot_id"] = 1
with patch("synnefo.volume.util.get_snapshot", snapshot):
with mocked_quotaholder():
r = self.post(VOLUMES_URL, user,
json.dumps({"volume": data}), "json")
self.assertSuccess(r)
# Test with Snapshot source
# Test unknwon snapshot
data = deepcopy(_data)
data["imageRef"] = "94321904321-432142134214-23142314"
with mocked_quotaholder():
r = self.post(VOLUMES_URL, user,
json.dumps({"volume": data}), "json")
self.assertBadRequest(r)
vm.task = None
vm.action = None
vm.save()
data["server_id"] = vm.id
# Test success
image = Mock()
image.return_value = {'location': 'pithos://foo',
'mapfile': '1234',
'id': 2,
'name': 'test_image',
'size': 1024,
'is_snapshot': False,
'is_image': False,
'status': 'AVAILABLE',
'disk_format': 'diskdump'}
data["imageRef"] = 2
with patch("synnefo.api.util.get_image", image):
with mocked_quotaholder():
r = self.post(VOLUMES_URL, user,
json.dumps({"volume": data}), "json")
self.assertSuccess(r)
def test_rud(self, mrapi):
vol = VolumeFactory(status="IN_USE")
user = vol.userid
# READ
r = self.get(join_urls(VOLUMES_URL, "detail"), user)
api_vols = json.loads(r.content)["volumes"]
self.assertEqual(len(api_vols), 1)
api_vol = api_vols[0]
self.assertEqual(api_vol["id"], str(vol.id))
self.assertEqual(api_vol["display_name"], vol.name)
self.assertEqual(api_vol["display_description"], vol.description)
volume_url = join_urls(VOLUMES_URL, str(vol.id))
r = self.get(volume_url, user)
self.assertSuccess(r)
# UPDATE
data = {
"volume": {
"display_name": "lolo",
"display_description": "lala"
}
}
r = self.put(volume_url, user, json.dumps(data), "json")
self.assertSuccess(r)
api_vol = json.loads(r.content)["volume"]
self.assertEqual(api_vol["display_name"], "lolo")
self.assertEqual(api_vol["display_description"], "lala")
# DELETE
mrapi().ModifyInstance.return_value = 42
r = self.delete(volume_url, user)
self.assertSuccess(r)
class VolumeMetadataAPITest(BaseAPITest):
def test_volume_metadata(self):
vol = VolumeFactory()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment