Commit 05e0992f authored by Christos Stavrakakis's avatar Christos Stavrakakis
Browse files

cyclades: refactor reconciliation

Refactor reconciliation code to use namedtuples and improve performance
for retrieving info from database.
parent 88a9b09c
......@@ -123,16 +123,16 @@ class Command(BaseCommand):
else:
backends = Backend.objects.filter(offline=False)
D = reconciliation.get_servers_from_db(backends)
G, GNics = reconciliation.get_instances_from_ganeti(backends)
with_nics = options["detect_unsynced_nics"]
DBNics = reconciliation.get_nics_from_db(backends)
DBVMs = reconciliation.get_servers_from_db(backend, with_nics)
GanetiVMs = reconciliation.get_instances_from_ganeti(backend)
#
# Detect problems
#
if options['detect_stale']:
stale = reconciliation.stale_servers_in_db(D, G)
stale = reconciliation.stale_servers_in_db(DBVMs, GanetiVMs)
if len(stale) > 0:
print >> sys.stderr, "Found the following stale server IDs: "
print " " + "\n ".join(
......@@ -141,7 +141,8 @@ class Command(BaseCommand):
print >> sys.stderr, "Found no stale server IDs in DB."
if options['detect_orphans']:
orphans = reconciliation.orphan_instances_in_ganeti(D, G)
orphans = reconciliation.orphan_instances_in_ganeti(DBVMs,
GanetiVMs)
if len(orphans) > 0:
print >> sys.stderr, "Found orphan Ganeti instances with IDs: "
print " " + "\n ".join(
......@@ -150,7 +151,7 @@ class Command(BaseCommand):
print >> sys.stderr, "Found no orphan Ganeti instances."
if options['detect_unsynced']:
unsynced = reconciliation.unsynced_operstate(D, G)
unsynced = reconciliation.unsynced_operstate(DBVMs, GanetiVMs)
if len(unsynced) > 0:
print >> sys.stderr, "The operstate of the following server" \
" IDs is out-of-sync:"
......@@ -162,7 +163,8 @@ class Command(BaseCommand):
print >> sys.stderr, "The operstate of all servers is in sync."
if options['detect_build_errors']:
build_errors = reconciliation.instances_with_build_errors(D, G)
build_errors = reconciliation.\
instances_with_build_errors(DBVMs, GanetiVMs)
if len(build_errors) > 0:
msg = "The os for the following server IDs was not build"\
" successfully:"
......@@ -181,7 +183,7 @@ class Command(BaseCommand):
': MAC: %s, IP: %s, Network: %s' % \
(info['mac'], info['ipv4'], info['network'])
unsynced_nics = reconciliation.unsynced_nics(DBNics, GNics)
unsynced_nics = reconciliation.unsynced_nics(DBVMs, GanetiVMs)
if len(unsynced_nics) > 0:
msg = "The NICs of the servers with the following IDs are"\
" unsynced:"
......
......@@ -70,8 +70,10 @@ setup_environ(settings)
from datetime import datetime, timedelta
from collections import namedtuple
from synnefo.db.models import (VirtualMachine, pooled_rapi_client)
from synnefo.db.models import (VirtualMachine, NetworkInterface,
pooled_rapi_client)
from synnefo.logic.rapi import GanetiApiError
from synnefo.logic.backend import get_instances
from synnefo.logic import utils
......@@ -90,6 +92,8 @@ def needs_reconciliation(vm):
return (now > vm.updated + timedelta(seconds=CHECK_INTERVAL)) or\
(now > vm.backendtime + timedelta(seconds=2*CHECK_INTERVAL))
VMState = namedtuple("VMState", ["state", "nics"])
def stale_servers_in_db(D, G):
idD = set(D.keys())
......@@ -131,18 +135,20 @@ def unsynced_operstate(D, G):
idG = set(G.keys())
for i in idD & idG:
vm_unsynced = (G[i] and D[i] != "STARTED") or\
(not G[i] and D[i] not in ('BUILD', 'ERROR', 'STOPPED'))
dbstate = D[i].state
gntstate = G[i].state
vm_unsynced = (gntstate and dbstate != "STARTED") or\
(not gntstate and dbstate not in ('BUILD', 'ERROR', 'STOPPED'))
if vm_unsynced:
unsynced.add((i, D[i], G[i]))
if not G[i] and D[i] == 'BUILD':
unsynced.add((i, dbstate, gntstate))
if not gntstate and dbstate == 'BUILD':
vm = VirtualMachine.objects.get(id=i)
if needs_reconciliation(vm):
with pooled_rapi_client(vm) as c:
try:
job_info = c.GetJobStatus(job_id=vm.backendjobid)
if job_info['status'] == 'success':
unsynced.add((i, D[i], G[i]))
unsynced.add((i, dbstate, gntstate))
except GanetiApiError:
pass
......@@ -175,9 +181,27 @@ def instances_with_build_errors(D, G):
return failed
def get_servers_from_db(backends):
def get_servers_from_db(backends, with_nics=True):
vms = VirtualMachine.objects.filter(deleted=False, backend__in=backends)
return dict(map(lambda x: (x.id, x.operstate), vms))
vm_info = vms.values_list("id", "operstate")
if with_nics:
nics = NetworkInterface.objects.filter(machine__in=vms)\
.order_by("machine")\
.values_list("machine", "index", "mac", "ipv4",
"network")
vm_nics = {}
for machine, machine_nics in itertools.groupby(nics,
lambda nic: nic[0]):
vm_nics[machine] = {}
for machine, index, mac, ipv4, network in machine_nics:
nic = {'mac': mac,
'network': utils.id_to_network_name(network),
'ipv4': ipv4 if ipv4 != '' else None
}
vm_nics[machine][index] = nic
servers = dict([(vm_id, VMState(state=state, nics=vm_nics.get(vm_id, [])))
for vm_id, state in vm_info])
return servers
def get_instances_from_ganeti(backends):
......@@ -186,7 +210,6 @@ def get_instances_from_ganeti(backends):
instances.append(get_instances(backend))
ganeti_instances = reduce(list.__add__, instances, [])
snf_instances = {}
snf_nics = {}
prefix = settings.BACKEND_PREFIX_ID
for i in ganeti_instances:
......@@ -203,10 +226,11 @@ def get_instances_from_ganeti(backends):
i['name'])
continue
snf_instances[id] = i['oper_state']
snf_nics[id] = get_nics_from_instance(i)
nics = get_nics_from_instance(i)
snf_instances[id] = VMState(state=i["oper_state"],
nics=nics)
return snf_instances, snf_nics
return snf_instances
#
......@@ -254,27 +278,7 @@ def get_nics_from_instance(i):
return nics
def get_nics_from_db(backends):
"""Get network interfaces for each vm in DB.
"""
instances = VirtualMachine.objects.filter(deleted=False,
backend__in=backends)
instances_nics = {}
for instance in instances:
nics = {}
for n in instance.nics.all():
ipv4 = n.ipv4
nic = {'mac': n.mac,
'network': n.network.backend_id,
'ipv4': ipv4 if ipv4 != '' else None
}
nics[n.index] = nic
instances_nics[instance.id] = nics
return instances_nics
def unsynced_nics(DBNics, GNics):
def unsynced_nics(DBVMs, GanetiVMs):
"""Find unsynced network interfaces between DB and Ganeti.
@ rtype: dict; {instance_id: ganeti_nics}
......@@ -282,13 +286,13 @@ def unsynced_nics(DBNics, GNics):
interfaces between DB and Ganeti and the network interfaces in Ganeti.
"""
idD = set(DBNics.keys())
idG = set(GNics.keys())
idD = set(DBVMs.keys())
idG = set(GanetiVMs.keys())
unsynced = {}
for i in idD & idG:
nicsD = DBNics[i]
nicsG = GNics[i]
nicsD = DBVMs[i].nics
nicsG = GanetiVMs[i].nics
if len(nicsD) != len(nicsG):
unsynced[i] = (nicsD, nicsG)
continue
......
......@@ -610,21 +610,31 @@ class UpdateBuildProgressTest(TestCase):
self.assertEqual(vm.buildpercentage, old)
from synnefo.logic.reconciliation import VMState
class ReconciliationTest(TestCase):
SERVERS = 1000
fixtures = ['db_test_data']
def get_vm(self, operstate, deleted=False):
flavor = mfactory.FlavorFactory(cpu=2, ram=1024)
vm = mfactory.VirtualMachineFactory(deleted=deleted, flavor=flavor)
vm.operstate = operstate
vm.save()
return vm
def test_get_servers_from_db(self):
"""Test getting a dictionary from each server to its operstate"""
backend = 30000
self.assertEquals(reconciliation.get_servers_from_db(backends=[backend]),
{30000: 'STARTED', 30001: 'STOPPED', 30002: 'BUILD'})
vm1 = self.get_vm('STARTED')
vm2 = self.get_vm('DESTROYED', deleted=True)
vm3 = self.get_vm('STOPPED')
self.assertEquals(reconciliation.get_servers_from_db(),
{vm1.id: VMState(state='STARTED', cpu=2, ram=1024, nics=[]),
vm3.id: VMState(state='STOPPED', cpu=2, ram=1024, nics=[])}
)
def test_stale_servers_in_db(self):
"""Test discovery of stale entries in DB"""
D = {1: 'STARTED', 2: 'STOPPED', 3: 'STARTED', 30000: 'BUILD',
30002: 'STOPPED'}
D = {1: None, 2: 'None', 3: None, 30000: 'BUILD',
30002: 'None'}
G = {1: True, 3: True, 30000: True}
self.assertEquals(reconciliation.stale_servers_in_db(D, G),
set([2, 30002]))
......@@ -666,13 +676,21 @@ class ReconciliationTest(TestCase):
def test_unsynced_operstate(self):
"""Test discovery of unsynced operstate between the DB and Ganeti"""
G = {1: True, 2: False, 3: True, 4: False, 50: True}
D = {1: 'STARTED', 2: 'STARTED', 3: 'BUILD', 4: 'STARTED', 50: 'BUILD'}
mkstate = lambda state: VMState(state=state, cpu=1, ram=1024, nics=[])
vm1 = self.get_vm("STARTED")
vm2 = self.get_vm("STARTED")
vm3= self.get_vm("BUILD")
vm4 = self.get_vm("STARTED")
vm5 = self.get_vm("BUILD")
D = {1: mkstate("STARTED"), 2: mkstate("STARTED"), 3: mkstate("BUILD"),
4: mkstate("STARTED"), 50: mkstate("BUILD")}
G = {vm1.id: mkstate(True), vm2.id: mkstate(False),
vm4.id: mkstate(True), vm4.id: mkstate(False),
vm5.id: mkstate(False)}
self.assertEquals(reconciliation.unsynced_operstate(D, G),
set([(2, 'STARTED', False),
(3, 'BUILD', True), (4, 'STARTED', False),
(50, 'BUILD', True)]))
set([(vm2.id, "STARTED", False),
(vm4.id, "STARTED", False)]))
from synnefo.logic.test.rapi_pool_tests import *
from synnefo.logic.test.utils_tests import *
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment