Commit 5b90d4b4 authored by Vangelis Koukis's avatar Vangelis Koukis
Browse files

Overhaul reconciliation mamagement command

Detect the following problems:
 * Detect stale DB servers without corresponding Ganeti instances
 * Detect orphan Ganeti instances, without corresponding DB entries
 * Detect out-of-sync operstate for DB entries wrt to Ganeti instances
Fix them as follows:
 * Simulate server deletion for stale DB servers
 * Issue RAPI DeleteInstance() call for orphan Ganeti instances
 * Set operstate in DB as in Ganeti for out-of-sync DB servers

See ./manage.py reconcile --help for more info, refs #1021.
parent d2de78fe
......@@ -4,6 +4,14 @@ README.Upgrade
This file documents the upgrade to newer versions of the Synnefo software.
For more information, please see README.deploy.
v0.5.5 -> v0.5.6
RECONCILIATION
* Implemented new reconciliation management command, please see
./manage.py reconcile --help for more info.
Recommended to run ./manage.py reconcile --detect-all periodically,
via cron.
v0.5.4 -> v0.5.5
LOGGING
* Changed the default logging settings for the dispatcher to also log
......
......@@ -27,57 +27,148 @@
# those of the authors and should not be interpreted as representing official
# policies, either expressed or implied, of GRNET S.A.
#
# Reconcile VM state - Management Script
from synnefo.db.models import VirtualMachine
from django.db.models import Q
from django.conf import settings
"""Reconciliation management command
Management command to reconcile the contents of the Synnefo DB with
the state of the Ganeti backend. See docstring on top of logic/reconcile.py
for a description of reconciliation rules.
"""
import sys
from datetime import datetime, timedelta
from optparse import make_option
from django.conf import settings
from django.db.models import Q
from django.core.management.base import BaseCommand
from synnefo.logic import amqp_connection
from synnefo.logic.amqp_connection import AMQPError
from synnefo.db.models import VirtualMachine
from synnefo.logic import reconciliation, backend
from synnefo.util.rapi import GanetiRapiClient
import json
import sys
class Command(BaseCommand):
prefix = settings.BACKEND_PREFIX_ID.split('-')[0]
help = 'Reconcile VM status with the backend'
option_list = BaseCommand.option_list + (
make_option('--all', action='store_true', dest='all_vms', default=False,
help='Run the reconciliation function for all VMs, now'),
make_option('--interval', action='store', dest='interval', default=1,
help='Interval in minutes between reconciliations'),
)
def handle(self, all_vms = False, interval = 1, **options):
all = VirtualMachine.objects.filter(Q(deleted = False) &
Q(suspended = False))
if not all_vms:
now = datetime.now()
last_update = timedelta(minutes = settings.RECONCILIATION_MIN)
not_updated = VirtualMachine.objects.filter(Q(deleted = False) &
Q(suspended = False) &
Q(updated__lte = (now - last_update)))
to_update = ((all.count() / settings.RECONCILIATION_MIN) * interval)
else:
to_update = all.count()
not_updated = all
vm_ids = map(lambda x: x.id, not_updated[:to_update])
for vmid in vm_ids :
msg = dict(type = "reconcile", vmid = vmid)
try:
amqp_connection.send(json.dumps(msg), settings.EXCHANGE_CRON,
"reconciliation.%s.%s" % (self.prefix,vmid))
except AMQPError as e:
print >> sys.stderr, 'Error sending reconciliation request: %s' % e
raise
print "All: %d, To update: %d, Triggered update for: %s" % \
(all.count(), not_updated.count(), vm_ids)
can_import_settings = True
help = 'Reconcile contents of Synnefo DB with state of Ganeti backend'
output_transaction = True # The management command runs inside
# an SQL transaction
option_list = BaseCommand.option_list + (
make_option('--detect-stale', action='store_true', dest='detect_stale',
default=False, help='Detect stale VM entries in DB'),
make_option('--detect-orphans', action='store_true',
dest='detect_orphans',
default=False, help='Detect orphan instances in Ganeti'),
make_option('--detect-unsynced', action='store_true',
dest='detect_unsynced',
default=False, help='Detect unsynced operstate between ' +
'DB and Ganeti'),
make_option('--detect-all', action='store_true',
dest='detect_all',
default=False, help='Enable all --detect-* arguments'),
make_option('--fix-stale', action='store_true', dest='fix_stale',
default=False, help='Fix (remove) stale DB entries in DB'),
make_option('--fix-orphans', action='store_true', dest='fix_orphans',
default=False, help='Fix (remove) orphan Ganeti VMs'),
make_option('--fix-unsynced', action='store_true', dest='fix_unsynced',
default=False, help='Fix server operstate in DB, set ' +
'from Ganeti'),
make_option('--fix-all', action='store_true', dest='fix_all',
default=False, help='Enable all --fix-* arguments'))
def _process_args(self, options):
keys_detect = [k for k in options.keys() if k.startswith('detect_')]
keys_fix = [k for k in options.keys() if k.startswith('fix_')]
if options['detect_all']:
for kd in keys_detect:
options[kd] = True
if options['fix_all']:
for kf in keys_fix:
options[kf] = True
if not reduce(lambda x, y: x or y,
map(lambda x: options[x], keys_detect)):
raise Exception("At least one of --detect-* must be specified")
for kf in keys_fix:
kd = kf.replace('fix_', 'detect_', 1)
if (options[kf] and not options[kd]):
raise Exception("Cannot use --%s without corresponding "
"--%s argument" % (kf, kd))
def handle(self, **options):
verbosity = int(options['verbosity'])
self._process_args(options)
D = reconciliation.get_servers_from_db()
G = reconciliation.get_instances_from_ganeti()
#
# Detect problems
#
if options['detect_stale']:
stale = reconciliation.stale_servers_in_db(D, G)
if len(stale) > 0:
print >> sys.stderr, "Found the following stale server IDs: "
print " " + "\n ".join(
[str(x) for x in stale])
elif verbosity == 2:
print >> sys.stderr, "Found no stale server IDs in DB."
if options['detect_orphans']:
orphans = reconciliation.orphan_instances_in_ganeti(D, G)
if len(orphans) > 0:
print >> sys.stderr, "Found orphan Ganeti instances with IDs: "
print " " + "\n ".join(
[str(x) for x in orphans])
elif verbosity == 2:
print >> sys.stderr, "Found no orphan Ganeti instances."
if options['detect_unsynced']:
unsynced = reconciliation.unsynced_operstate(D, G)
if len(unsynced) > 0:
print >> sys.stderr, "The operstate of the following server" \
" IDs is out-of-sync:"
print " " + "\n ".join(
["%d is %s in DB, %s in Ganeti" %
(x[0], x[1], ('UP' if x[2] else 'DOWN'))
for x in unsynced])
elif verbosity == 2:
print >> sys.stderr, "The operstate of all servers is in sync."
#
# Then fix them
#
if options['fix_stale'] and len(stale) > 0:
print >> sys.stderr, \
"Simulating successful Ganeti removal for %d " \
"servers in the DB:" % len(stale)
for vm in VirtualMachine.objects.filter(pk__in=stale):
backend.process_op_status(vm=vm, jobid=-0,
opcode='OP_INSTANCE_REMOVE', status='success',
logmsg='Reconciliation: simulated Ganeti event')
print >> sys.stderr, " ...done"
if options['fix_orphans'] and len(orphans) > 0:
print >> sys.stderr, \
"Issuing OP_INSTANCE_REMOVE for %d Ganeti instances:" % \
len(orphans)
for id in orphans:
rapi = GanetiRapiClient(*settings.GANETI_CLUSTER_INFO)
rapi.DeleteInstance('%s%s' %
(settings.BACKEND_PREFIX_ID, str(id)))
print >> sys.stderr, " ...done"
if options['fix_unsynced'] and len(unsynced) > 0:
print >> sys.stderr, "Setting the state of %d out-of-sync VMs:" % \
len(unsynced)
for id, db_state, ganeti_up in unsynced:
vm = VirtualMachine.objects.get(pk=id)
opcode = "OP_INSTANCE_REBOOT" if ganeti_up \
else "OP_INSTANCE_SHUTDOWN"
backend.process_op_status(vm=vm, jobid=-0,
opcode=opcode, status='success',
logmsg='Reconciliation: simulated Ganeti event')
print >> sys.stderr, " ...done"
# Copyright 2011 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
# The views and conclusions contained in the software and documentation are
# those of the authors and should not be interpreted as representing official
# policies, either expressed or implied, of GRNET S.A.
#
# Reconcile VM state - Management Script
from synnefo.db.models import VirtualMachine
from django.db.models import Q
from django.conf import settings
from datetime import datetime, timedelta
from optparse import make_option
from django.core.management.base import BaseCommand
from synnefo.logic import amqp_connection
from synnefo.logic.amqp_connection import AMQPError
import json
import sys
class Command(BaseCommand):
prefix = settings.BACKEND_PREFIX_ID.split('-')[0]
help = 'Reconcile VM status with the backend'
option_list = BaseCommand.option_list + (
make_option('--all', action='store_true', dest='all_vms', default=False,
help='Run the reconciliation function for all VMs, now'),
make_option('--interval', action='store', dest='interval', default=1,
help='Interval in minutes between reconciliations'),
)
def handle(self, all_vms = False, interval = 1, **options):
all = VirtualMachine.objects.filter(Q(deleted = False) &
Q(suspended = False))
if not all_vms:
now = datetime.now()
last_update = timedelta(minutes = settings.RECONCILIATION_MIN)
not_updated = VirtualMachine.objects.filter(Q(deleted = False) &
Q(suspended = False) &
Q(updated__lte = (now - last_update)))
to_update = ((all.count() / settings.RECONCILIATION_MIN) * interval)
else:
to_update = all.count()
not_updated = all
vm_ids = map(lambda x: x.id, not_updated[:to_update])
for vmid in vm_ids :
msg = dict(type = "reconcile", vmid = vmid)
try:
amqp_connection.send(json.dumps(msg), settings.EXCHANGE_CRON,
"reconciliation.%s.%s" % (self.prefix,vmid))
except AMQPError as e:
print >> sys.stderr, 'Error sending reconciliation request: %s' % e
raise
print "All: %d, To update: %d, Triggered update for: %s" % \
(all.count(), not_updated.count(), vm_ids)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2011 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
# 1. Redistributions of source code must retain the above
# copyright notice, this list of conditions and the following
# disclaimer.
#
# 2. Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials
# provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
#
"""Business logic for reconciliation
Reconcile the contents of the DB with the actual state of the
Ganeti backend.
Let D be the set of VMs in the DB, G the set of VMs in Ganeti.
RULES:
R1. Stale servers in DB:
For any v in D but not in G:
Set deleted=True.
R2. Orphan instances in Ganet:
For any v in G with deleted=True in D:
Issue OP_INSTANCE_DESTROY.
R3. Unsynced operstate:
For any v whose operating state differs between G and V:
Set the operating state in D based on the state in G.
In the code, D, G are Python dicts mapping instance ids to operating state.
For D, the operating state is chosen from VirtualMachine.OPER_STATES.
For G, the operating state is True if the machine is up, False otherwise.
"""
import sys
from synnefo.logic import log
_logger = log.get_logger("reconciliation")
from django.core.management import setup_environ
try:
from synnefo import settings
except ImportError:
raise Exception("Cannot import settings, make sure PYTHONPATH contains "
"the parent directory of the Synnefo Django project.")
setup_environ(settings)
from synnefo.db.models import VirtualMachine
from synnefo.util.rapi import GanetiRapiClient
def stale_servers_in_db(D, G):
idD = set(D.keys())
idG = set(G.keys())
return idD - idG
def orphan_instances_in_ganeti(D, G):
idD = set(D.keys())
idG = set(G.keys())
return idG - idD
def unsynced_operstate(D, G):
unsynced = set()
idD = set(D.keys())
idG = set(G.keys())
for i in idD & idG:
if (G[i] and D[i] != 'STARTED' or
not G[i] and D[i] not in ('BUILD', 'ERROR', 'STOPPED')):
unsynced.add((i, D[i], G[i]))
return unsynced
def get_servers_from_db():
vms = VirtualMachine.objects.filter(deleted=False)
return dict(map(lambda x: (x.id, x.operstate), vms))
def get_instances_from_ganeti():
rapi = GanetiRapiClient(*settings.GANETI_CLUSTER_INFO)
ganeti_instances = rapi.GetInstances(bulk=True)
snf_instances = {}
prefix = settings.BACKEND_PREFIX_ID
for i in ganeti_instances:
if i['name'].startswith(prefix):
try:
id = int(i['name'].split(prefix)[1])
except Exception:
_logger.error("Ignoring instance with malformed name %s",
i['name'])
continue
if id in snf_instances:
_logger.error("Ignoring instance with duplicate Synnefo id %s",
i['name'])
continue
snf_instances[id] = i['oper_state']
return snf_instances
def main():
print get_instances_from_ganeti()
if __name__ == "__main__":
log.console_output(_logger)
sys.exit(main())
......@@ -41,6 +41,7 @@ from synnefo.db.models import *
from synnefo.logic import backend
from synnefo.logic import credits
from synnefo.logic import users
from synnefo.logic import reconciliation
from synnefo.logic.utils import get_rsapi_state
......@@ -346,3 +347,39 @@ class ProcessProgressUpdateTestCase(TestCase):
#self.assertRaises(VirtualMachine.IllegalState,
# backend.process_create_progress, vm, 1)
class ReconciliationTestCase(TestCase):
SERVERS = 1000
fixtures = ['db_test_data']
def test_get_servers_from_db(self):
"""Test getting a dictionary from each server to its operstate"""
reconciliation.get_servers_from_db()
self.assertEquals(reconciliation.get_servers_from_db(),
{30000: 'STARTED', 30001: 'STOPPED', 30002: 'BUILD'})
def test_stale_servers_in_db(self):
"""Test discovery of stale entries in DB"""
D = {1: 'STARTED', 2: 'STOPPED', 3: 'STARTED', 4: 'BUILD', 5: 'BUILD'}
G = {1: True, 3: True}
self.assertEquals(reconciliation.stale_servers_in_db(D, G),
[2, 4, 5])
def test_orphan_instances_in_ganeti(self):
"""Test discovery of orphan instances in Ganeti, without a DB entry"""
G = {1: True, 2: False, 3: False, 4: True, 50: True}
D = {1: True, 3: False}
self.assertEquals(reconciliation.orphan_instances_in_ganeti(D, G),
[2, 4, 50])
def test_unsynced_operstate(self):
"""Test discovery of unsynced operstate between the DB and Ganeti"""
G = {1: True, 2: False, 3: True, 4: False, 50: True}
D = {1: 'STARTED', 2: 'STARTED', 3: 'BUILD', 4: 'STARTED', 50: 'BUILD'}
self.assertEquals(reconciliation.unsynced_operstate(D, G),
set((2, 'STARTED', False),
(3, 'BUILD', True), (4, 'STARTED', False),
(50, 'BUILD', True)))
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment