Commit 65d87596 authored by Christos Stavrakakis's avatar Christos Stavrakakis

Add more tests for db and logic

parent 01f90a77
# Copyright 2011 GRNET S.A. All rights reserved.
# Copyright 2012 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
......@@ -31,21 +31,202 @@
#
# Provides automated tests for db module
from synnefo.db.models import *
from django.test import TestCase
# Import pool tests
from synnefo.db.pools.tests import *
from synnefo.db.models import *
from synnefo.db import models_factory as mfact
from synnefo.db.pools import IPPool
from django.db import IntegrityError
from django.core.exceptions import MultipleObjectsReturned
from mock import patch
class FlavorTest(TestCase):
def test_flavor_name(self):
"""Test a flavor object name method."""
flavor = mfact.FlavorFactory(cpu=1, ram=1024, disk=40)
self.assertEqual(flavor.name, "C1R1024D40", "flavor.name is not"
" generated correctly. Name is %s instead of C1R1055D40" %
flavor.name)
class BackendTest(TestCase):
def setUp(self):
self.backend = mfact.BackendFactory()
@patch("synnefo.db.models.get_rapi_client")
def test_get_client(self, client):
id_ = self.backend.id
hash_ = self.backend.hash
name = self.backend.clustername
passwd = self.backend.password
user = self.backend.username
port = self.backend.port
self.backend.get_client()
client.assert_called_once_with(id_, hash_, name, port, user, passwd)
def test_save_hash(self):
"""Test that backend hash is generated on credential change"""
old_hash = self.backend.hash
for field in ['clustername', 'username', 'password', 'port']:
value = 5181 if field == 'port' else 'foo'
self.backend.__setattr__(field, value)
self.backend.save()
self.assertNotEqual(old_hash, self.backend.hash)
old_hash = self.backend.hash
def test_unique_index(self):
"""Test that each backend gets a unique index"""
backends = [self.backend]
for i in xrange(0, 14):
backends.append(mfact.BackendFactory())
indexes = map(lambda x: x.index, backends)
self.assertEqual(len(indexes), len(set(indexes)))
def test_backend_number(self):
"""Test that no more than 16 backends are created"""
for i in xrange(0, 14):
mfact.BackendFactory()
self.assertRaises(Exception, mfact.BackendFactory, ())
def test_delete_backend(self):
vm = mfact.VirtualMachineFactory(backend=self.backend, deleted=True)
bnet = mfact.BackendNetworkFactory(backend=self.backend)
self.backend.delete()
self.assertRaises(Backend.DoesNotExist, Backend.objects.get,
id=self.backend.id)
# Test that VM is not deleted
vm2 = VirtualMachine.objects.get(id=vm.id)
self.assertEqual(vm2.backend, None)
# Test tha backend networks are deleted, but not the network
self.assertRaises(BackendNetwork.DoesNotExist,
BackendNetwork.objects.get, id=bnet.id)
Network.objects.get(id=bnet.network.id)
def test_delete_active_backend(self):
"""Test that a backend with non-deleted VMS is not deleted"""
mfact.VirtualMachineFactory(backend=self.backend)
self.assertRaises(IntegrityError, self.backend.delete, ())
def test_password_encryption(self):
password_hash = self.backend.password
self.backend.password = '123'
self.assertNotEqual(self.backend.password_hash, '123')
self.assertNotEqual(self.backend.password_hash, password_hash)
self.assertEqual(self.backend.password, '123')
class VirtualMachineTest(TestCase):
def setUp(self):
self.vm = mfact.VirtualMachineFactory()
@patch("synnefo.db.models.get_rapi_client")
def test_get_client(self, client):
backend = self.vm.backend
id_ = backend.id
hash_ = backend.hash
name = backend.clustername
passwd = backend.password
user = backend.username
port = backend.port
self.vm.get_client()
client.assert_called_once_with(id_, hash_, name, port, user, passwd)
def test_create(self):
vm = mfact.VirtualMachineFactory()
self.assertEqual(vm.action, None)
self.assertEqual(vm.backendjobid, None)
self.assertEqual(vm.backendjobstatus, None)
self.assertEqual(vm.backendopcode, None)
self.assertEqual(vm.backendlogmsg, None)
self.assertEqual(vm.operstate, 'BUILD')
class NetworkTest(TestCase):
def setUp(self):
self.net = mfact.NetworkFactory()
def test_tags(self):
net1 = mfact.NetworkFactory(flavor='IP_LESS_ROUTED')
self.assertEqual(net1.backend_tag, ['ip-less-routed'])
net1 = mfact.NetworkFactory(flavor='CUSTOM')
self.assertEqual(net1.backend_tag, [])
def test_create_backend_network(self):
len_backends = len(Backend.objects.all())
back = mfact.BackendFactory()
self.net.create_backend_network(backend=back)
BackendNetwork.objects.get(network=self.net, backend=back)
back1 = mfact.BackendFactory()
back2 = mfact.BackendFactory()
self.net.create_backend_network()
BackendNetwork.objects.get(network=self.net, backend=back1)
BackendNetwork.objects.get(network=self.net, backend=back2)
self.assertEqual(len(BackendNetwork.objects.filter(network=self.net)),
len_backends + 3)
def test_pool(self):
pool = self.net.get_pool()
pool.network = self.net
self.assertTrue(isinstance(pool, IPPool))
def test_reserve_ip(self):
net1 = mfact.NetworkFactory(subnet='192.168.2.0/24')
net1.reserve_address('192.168.2.12')
pool = net1.get_pool()
self.assertFalse(pool.is_available('192.168.2.12'))
net1.release_address('192.168.2.12')
pool = net1.get_pool()
self.assertTrue(pool.is_available('192.168.2.12'))
class BackendNetworkTest(TestCase):
def test_mac_prefix(self):
network = mfact.NetworkFactory(mac_prefix='aa:bb:c')
backend = mfact.BackendFactory()
bnet = mfact.BackendNetworkFactory(network=network, backend=backend)
self.assertTrue(backend.index < 10)
self.assertEqual(bnet.mac_prefix, 'aa:bb:c%s' % backend.index)
class BridgePoolTest(TestCase):
def test_no_pool(self):
self.assertRaises(BridgePoolTable.DoesNotExist,
BridgePoolTable.get_pool)
def test_two_pools(self):
mfact.BridgePoolTableFactory()
mfact.BridgePoolTableFactory()
self.assertRaises(MultipleObjectsReturned, BridgePoolTable.get_pool)
class FlavorTestCase(TestCase):
fixtures = [ 'db_test_data' ]
def test_flavor(self):
"""Test a flavor object, its internal cost calculation and naming methods"""
flavor = Flavor.objects.get(pk=30000)
class AESTest(TestCase):
def test_encrypt_decrtypt(self):
from synnefo.db import aes_encrypt as aes
old = 'bar'
new = aes.decrypt_db_charfield(aes.encrypt_db_charfield(old))
self.assertEqual(old, new)
# test name property, should be C1R1024D10
f_name = flavor.name
def test_password_change(self):
from synnefo.db import aes_encrypt as aes
old_pass = aes.SECRET_ENCRYPTION_KEY
old = 'bar'
encrypted = aes.encrypt_db_charfield(old)
aes.SECRET_ENCRYPTION_KEY = 'foo2'
self.assertRaises(aes.CorruptedPassword, aes.decrypt_db_charfield,
encrypted)
aes.SECRET_ENCRYPTION_KEY = old_pass
new = aes.decrypt_db_charfield(encrypted)
self.assertEqual(old, new)
self.assertEqual(f_name, 'C1R1024D10', 'flavor.name is not generated correctly, C1R1024D10! (%s)' % (f_name,))
def test_big_secret(self):
from synnefo.db import aes_encrypt as aes
old = aes.SECRET_ENCRYPTION_KEY
aes.SECRET_ENCRYPTION_KEY = \
'91490231234814234812348913289481294812398421893489'
self.assertRaises(ValueError, aes.encrypt_db_charfield, 'la')
aes.SECRET_ENCRYPTION_KEY = old
......@@ -43,7 +43,8 @@ from synnefo.lib.utils import split_time
from datetime import datetime
from mock import patch
from synnefo.api.util import allocate_resource
from synnefo.logic.callbacks import update_db, update_net, update_network
from synnefo.logic.callbacks import (update_db, update_net, update_network,
update_build_progress)
now = datetime.now
from time import time
......@@ -428,173 +429,68 @@ class UpdateNetworkTest(TestCase):
self.assertFalse(pool.is_reserved('10.0.0.20'))
class ProcessOpStatusTestCase(TestCase):
fixtures = ['db_test_data']
msg_op = {
'instance': 'instance-name',
'type': 'ganeti-op-status',
'operation': 'OP_INSTANCE_STARTUP',
'jobId': 0,
'status': 'success',
'logmsg': 'unittest - simulated message'
}
def test_op_startup_success(self):
"""Test notification for successful OP_INSTANCE_START"""
msg = self.msg_op
msg['operation'] = 'OP_INSTANCE_STARTUP'
msg['status'] = 'success'
# This machine is initially in BUILD
vm = VirtualMachine.objects.get(pk=30002)
backend.process_op_status(vm, now(), msg["jobId"], msg["operation"],
msg["status"], msg["logmsg"])
self.assertEquals(get_rsapi_state(vm), 'ACTIVE')
def test_op_shutdown_success(self):
"""Test notification for successful OP_INSTANCE_SHUTDOWN"""
msg = self.msg_op
msg['operation'] = 'OP_INSTANCE_SHUTDOWN'
msg['status'] = 'success'
# This machine is initially in BUILD
vm = VirtualMachine.objects.get(pk=30002)
backend.process_op_status(vm, now(), msg["jobId"], msg["operation"],
msg["status"], msg["logmsg"])
self.assertEquals(get_rsapi_state(vm), 'STOPPED')
def test_op_reboot_success(self):
"""Test notification for successful OP_INSTANCE_REBOOT"""
msg = self.msg_op
msg['operation'] = 'OP_INSTANCE_REBOOT'
msg['status'] = 'success'
# This machine is initially in BUILD
vm = VirtualMachine.objects.get(pk=30002)
backend.process_op_status(vm, now(), msg["jobId"], msg["operation"],
msg["status"], msg["logmsg"])
self.assertEquals(get_rsapi_state(vm), 'ACTIVE')
@patch('synnefo.lib.amqp.AMQPClient')
class UpdateBuildProgressTest(TestCase):
def setUp(self):
self.vm = mfactory.VirtualMachineFactory()
def test_op_create_success(self):
"""Test notification for successful OP_INSTANCE_CREATE"""
msg = self.msg_op
msg['operation'] = 'OP_INSTANCE_CREATE'
msg['status'] = 'success'
def get_db_vm(self):
return VirtualMachine.objects.get(id=self.vm.id)
# This machine is initially in BUILD
vm = VirtualMachine.objects.get(pk=30002)
backend.process_op_status(vm, now(), msg["jobId"], msg["operation"],
msg["status"], msg["logmsg"])
self.assertEquals(get_rsapi_state(vm), 'ACTIVE')
def create_msg(self, **kwargs):
"""Create snf-progress-monitor message"""
msg = {'event_time': split_time(time())}
msg['type'] = 'image-copy-progress'
msg['progress'] = 0
for key, val in kwargs.items():
msg[key] = val
message = {'body': json.dumps(msg)}
return message
def test_op_remove_success(self):
"""Test notification for successful OP_INSTANCE_REMOVE"""
msg = self.msg_op
msg['operation'] = 'OP_INSTANCE_REMOVE'
msg['status'] = 'success'
def test_missing_attribute(self, client):
update_build_progress(client, json.dumps({'body': {}}))
client.basic_nack.assert_called_once()
# This machine is initially in BUILD
vm = VirtualMachine.objects.get(pk=30002)
backend.process_op_status(vm, now(), msg["jobId"], msg["operation"],
msg["status"], msg["logmsg"])
self.assertEquals(get_rsapi_state(vm), 'DELETED')
self.assertTrue(vm.deleted)
def test_op_create_error(self):
"""Test notification for failed OP_INSTANCE_CREATE"""
msg = self.msg_op
msg['operation'] = 'OP_INSTANCE_CREATE'
msg['status'] = 'error'
# This machine is initially in BUILD
vm = VirtualMachine.objects.get(pk=30002)
backend.process_op_status(vm, now(), msg["jobId"], msg["operation"],
msg["status"], msg["logmsg"])
self.assertEquals(get_rsapi_state(vm), 'ERROR')
self.assertFalse(vm.deleted)
def test_remove_machine_in_error(self):
"""Test notification for failed OP_INSTANCE_REMOVE, server in ERROR"""
msg = self.msg_op
msg['operation'] = 'OP_INSTANCE_REMOVE'
msg['status'] = 'error'
# This machine is initially in BUILD
vm = VirtualMachine.objects.get(pk=30002)
backend.process_op_status(vm, now(), 0, "OP_INSTANCE_CREATE", "error", "test")
self.assertEquals(get_rsapi_state(vm), 'ERROR')
backend.process_op_status(vm, now(), msg["jobId"], msg["operation"],
msg["status"], msg["logmsg"])
self.assertEquals(get_rsapi_state(vm), 'DELETED')
self.assertTrue(vm.deleted)
class ProcessNetStatusTestCase(TestCase):
fixtures = ['db_test_data']
def test_unhandled_exception(self, client):
update_build_progress(client, {})
client.basic_reject.assert_called_once()
def test_set_ipv4(self):
"""Test reception of a net status notification"""
msg = {'instance': 'instance-name',
'type': 'ganeti-net-status',
'nics': [
{'ip': '10.0.0.21',
'mac': 'aa:00:00:58:1e:b9',
'network':'snf-net-30000'}
]
}
vm = VirtualMachine.objects.get(pk=30000)
backend.process_net_status(vm, now(), msg['nics'])
self.assertEquals(vm.nics.all()[0].ipv4, '10.0.0.21')
def test_set_empty_ipv4(self):
"""Test reception of a net status notification with no IPv4 assigned"""
msg = {'instance': 'instance-name',
'type': 'ganeti-net-status',
'nics': [
{'ip': '',
'mac': 'aa:00:00:58:1e:b9',
'network':'snf-net-30000'}
]
}
vm = VirtualMachine.objects.get(pk=30000)
backend.process_net_status(vm, now(), msg['nics'])
self.assertEquals(vm.nics.all()[0].ipv4, '')
class ProcessProgressUpdateTestCase(TestCase):
fixtures = ['db_test_data']
def test_missing_instance(self, client):
msg = self.create_msg(instance='foo')
update_build_progress(client, msg)
client.basic_nack.assert_called_once()
def test_progress_update(self):
"""Test reception of a create progress notification"""
def test_wrong_type(self, client):
msg = self.create_msg(type="WRONG_TYPE")
update_build_progress(client, msg)
client.basic_ack.assert_called_once()
# This machine is in BUILD
vm = VirtualMachine.objects.get(pk=30002)
def test_progress_update(self, client):
rprogress = randint(10, 100)
backend.process_create_progress(vm, now(), rprogress)
self.assertEquals(vm.buildpercentage, rprogress)
#self.assertRaises(ValueError, backend.process_create_progress,
# vm, 9, 0)
self.assertRaises(ValueError, backend.process_create_progress,
now(), vm, -1)
self.assertRaises(ValueError, backend.process_create_progress,
now(), vm, 'a')
# This machine is ACTIVE
#vm = VirtualMachine.objects.get(pk=30000)
#self.assertRaises(VirtualMachine.IllegalState,
# backend.process_create_progress, vm, 1)
msg = self.create_msg(progress=rprogress,
instance=self.vm.backend_vm_id)
update_build_progress(client, msg)
client.basic_ack.assert_called_once()
vm = self.get_db_vm()
self.assertEqual(vm.buildpercentage, rprogress)
def test_invalid_value(self, client):
old = self.vm.buildpercentage
for rprogress in [0, -1, 'a']:
msg = self.create_msg(progress=rprogress,
instance=self.vm.backend_vm_id)
update_build_progress(client, msg)
client.basic_ack.assert_called_once()
vm = self.get_db_vm()
self.assertEqual(vm.buildpercentage, old)
class ReconciliationTestCase(TestCase):
class ReconciliationTest(TestCase):
SERVERS = 1000
fixtures = ['db_test_data']
def test_get_servers_from_db(self):
"""Test getting a dictionary from each server to its operstate"""
reconciliation.get_servers_from_db()
self.assertEquals(reconciliation.get_servers_from_db(),
{30000: 'STARTED', 30001: 'STOPPED', 30002: 'BUILD'})
......@@ -607,6 +503,29 @@ class ReconciliationTestCase(TestCase):
self.assertEquals(reconciliation.stale_servers_in_db(D, G),
set([2, 30002]))
@patch("synnefo.db.models.get_rapi_client")
def test_stale_building_vm(self, client):
vm = mfactory.VirtualMachineFactory()
vm.state = 'BUILD'
vm.backendjobid = 42
vm.save()
D = {vm.id: 'BUILD'}
G = {}
for status in ['queued', 'waiting', 'running']:
client.return_value.GetJobStatus.return_value = {'status': status}
self.assertEqual(reconciliation.stale_servers_in_db(D, G), set([]))
client.return_value.GetJobStatus.assert_called_once_with(vm.backendjobid)
client.reset_mock()
for status in ['success', 'error', 'canceled']:
client.return_value.GetJobStatus.return_value = {'status': status}
self.assertEqual(reconciliation.stale_servers_in_db(D, G), set([]))
client.return_value.GetInstance.assert_called_once_with(vm.backend_vm_id)
client.return_value.GetJobStatus.assert_called_once_with(vm.backendjobid)
client.reset_mock()
from synnefo.logic.rapi import GanetiApiError
client.return_value.GetJobStatus.side_effect = GanetiApiError('Foo')
self.assertEqual(reconciliation.stale_servers_in_db(D, G), set([vm.id]))
def test_orphan_instances_in_ganeti(self):
"""Test discovery of orphan instances in Ganeti, without a DB entry"""
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment