Commit aba0c4cb authored by Christos Stavrakakis
Browse files

cyclades: Reconciliation for pending vm tasks

Extend reconciliation mechanism, to detect instances with stale pending
tasks, by looking if the corresponding job, 'vm.task_job_id', is still
running in the Ganeti backend.

Also refactor code that performed reconciliation for building vms, to
get the Ganeti job from the already retrieved job list.
parent 84f165f4
......@@ -71,6 +71,9 @@ class Command(BaseCommand):
make_option('--fix-unsynced-flavors', action='store_true',
dest='fix_unsynced_flavors', default=False,
help='Fix unsynced flavors between DB and Ganeti'),
make_option('--fix-pending-tasks', action='store_true',
dest='fix_pending_tasks', default=False,
help='Fix servers with stale pending tasks.'),
make_option('--fix-all', action='store_true', dest='fix_all',
default=False, help='Enable all --fix-* arguments'),
)
......
......@@ -68,7 +68,7 @@ setup_environ(settings)
import logging
import itertools
import bitarray
from datetime import datetime, timedelta
from datetime import datetime
from django.db import transaction
from synnefo.db.models import (Backend, VirtualMachine, Flavor,
......@@ -76,7 +76,6 @@ from synnefo.db.models import (Backend, VirtualMachine, Flavor,
BackendNetwork)
from synnefo.db.pools import IPPool
from synnefo.logic import utils, backend as backend_mod
from synnefo.logic.rapi import GanetiApiError
logger = logging.getLogger()
logging.basicConfig()
......@@ -86,6 +85,10 @@ try:
except AttributeError:
CHECK_INTERVAL = 60
# Job status string that get_build_status maps to the "ERROR" build status.
GANETI_JOB_ERROR = "error"
# Job statuses considered pending (not referenced by the code shown here).
GANETI_JOBS_PENDING = ["queued", "waiting", "running", "canceling"]
# Job statuses considered finalized; get_build_status and
# reconcile_pending_task test membership in this list.
GANETI_JOBS_FINALIZED = ["success", "error", "canceled"]
class BackendReconciler(object):
def __init__(self, backend, logger, options=None):
......@@ -114,6 +117,9 @@ class BackendReconciler(object):
self.gnt_servers_keys = set(self.gnt_servers.keys())
log.debug("Got servers info from Ganeti backend.")
self.gnt_jobs = get_ganeti_jobs(backend)
log.debug("Got jobs from Ganeti backend")
self.event_time = datetime.now()
self.stale_servers = self.reconcile_stale_servers()
......@@ -122,31 +128,17 @@ class BackendReconciler(object):
self.close()
def get_build_status(self, db_server):
    """Classify the build job of a server that is still being built.

    The job is looked up by ``db_server.backendjobid`` in ``self.gnt_jobs``
    (the job listing already retrieved by the reconciler) instead of
    querying the backend for every single job.

    Returns one of the strings:
      "ERROR"     - the job has the error status, or the job id (including
                    a missing/None id) is not present in ``self.gnt_jobs``
      "RUNNING"   - the job is listed and its status is not finalized
      "FINALIZED" - the job is listed with a finalized, non-error status
    """
    job_id = db_server.backendjobid
    if job_id in self.gnt_jobs:
        gnt_job_status = self.gnt_jobs[job_id]["status"]
        if gnt_job_status == GANETI_JOB_ERROR:
            return "ERROR"
        elif gnt_job_status not in GANETI_JOBS_FINALIZED:
            return "RUNNING"
        else:
            return "FINALIZED"
    else:
        # An unknown job cannot be making progress, so report it as failed.
        return "ERROR"
def reconcile_stale_servers(self):
# Detect stale servers
......@@ -218,6 +210,8 @@ class BackendReconciler(object):
gnt_server)
self.reconcile_unsynced_nics(server_id, db_server, gnt_server)
self.reconcile_unsynced_disks(server_id, db_server, gnt_server)
if db_server.task is not None:
self.reconcile_pending_task(server_id, db_server)
def reconcile_building_server(self, db_server):
self.log.info("Server '%s' is BUILD in DB, but 'ERROR' in Ganeti.",
......@@ -309,6 +303,25 @@ class BackendReconciler(object):
def reconcile_unsynced_disks(self, server_id, db_server, gnt_server):
# No-op placeholder: the body is only `pass`, so no disk differences between
# db_server and gnt_server are detected or fixed here.
pass
def reconcile_pending_task(self, server_id, db_server):
    """Detect, and optionally clear, a stale pending task of a server.

    The task is considered stale when its job (``db_server.task_job_id``)
    is missing from ``self.gnt_jobs`` or is listed there with a status in
    GANETI_JOBS_FINALIZED.

    A stale task is always logged. It is cleared (``task`` and
    ``task_job_id`` set to None and the server saved) only when
    ``self.options["fix_pending_tasks"]`` is true.
    """
    job_id = db_server.task_job_id
    if job_id in self.gnt_jobs:
        gnt_job_status = self.gnt_jobs[job_id]["status"]
        stale_task = gnt_job_status in GANETI_JOBS_FINALIZED
    else:
        # The job is not in the retrieved listing, so it is not running.
        stale_task = True

    if not stale_task:
        return

    self.log.info("Found server '%s' with pending task: '%s'",
                  server_id, db_server.task)
    # The key must match the `dest` of the --fix-pending-tasks option.
    if self.options["fix_pending_tasks"]:
        db_server.task = None
        db_server.task_job_id = None
        db_server.save()
        self.log.info("Cleared pending task for server '%s'", server_id)
def format_db_nic(nic):
return "Index: %s IP: %s Network: %s MAC: %s Firewall: %s" % (nic.index,
......@@ -435,6 +448,11 @@ def nics_from_instance(i):
return nics
def get_ganeti_jobs(backend):
    """Return the jobs of `backend` as a dict keyed by integer job id.

    The jobs come from ``backend_mod.get_jobs(backend)``; each job's "id"
    is converted with ``int()`` and maps to the unmodified job entry.
    """
    gnt_jobs = backend_mod.get_jobs(backend)
    # A generator avoids building an intermediate list of pairs.
    return dict((int(job["id"]), job) for job in gnt_jobs)
def disks_from_instance(i):
    """Map each disk index of instance `i` to a ``{"size": size}`` dict.

    Sizes are read in order from ``i["disk.sizes"]``; an empty list gives
    an empty dict.
    """
    # A generator avoids building an intermediate list of pairs.
    return dict((index, {"size": size})
                for index, size in enumerate(i["disk.sizes"]))
......
......@@ -33,7 +33,6 @@ from django.test import TestCase
from synnefo.db.models import VirtualMachine, Network, BackendNetwork
from synnefo.db import models_factory as mfactory
from synnefo.logic import reconciliation
from datetime import timedelta
from mock import patch
from snf_django.utils.testing import mocked_quotaholder
from time import time
......@@ -56,84 +55,34 @@ class ServerReconciliationTest(TestCase):
logger=log)
def test_building_vm(self, mrapi):
# Exercises reconciliation of VMs whose operstate is "BUILD": after each
# self.reconciler.reconcile() call the VM is reloaded from the DB and its
# operstate / deleted flag are asserted for the mocked Ganeti job status.
# NOTE(review): this body appears to interleave the removed and the added
# lines of the change without +/- markers (e.g. both the GetJobStatus and
# the GetJobs mock set-ups, and both the "cancelled" and "canceled" loops
# are present) -- confirm against the real file before relying on it.
mrapi = self.reconciler.client
vm1 = mfactory.VirtualMachineFactory(backend=self.backend,
backendjobid=None,
operstate="BUILD")
self.reconciler.reconcile()
# Assert not deleted
vm1 = VirtualMachine.objects.get(id=vm1.id)
self.assertFalse(vm1.deleted)
self.assertEqual(vm1.operstate, "BUILD")
vm1.created = vm1.created - timedelta(seconds=120)
vm1.save()
with mocked_quotaholder():
self.reconciler.reconcile()
vm1 = VirtualMachine.objects.get(id=vm1.id)
self.assertEqual(vm1.operstate, "ERROR")
vm1 = mfactory.VirtualMachineFactory(backend=self.backend,
backendjobid=1,
deleted=False,
operstate="BUILD")
vm1.backendtime = vm1.created - timedelta(seconds=120)
vm1.backendjobid = 10
vm1.save()
for status in ["queued", "waiting", "running"]:
mrapi.GetJobStatus.return_value = {"status": status}
mrapi().GetJobs.return_value = [{"id": "1", "status": status}]
with mocked_quotaholder():
self.reconciler.reconcile()
vm1 = VirtualMachine.objects.get(id=vm1.id)
self.assertFalse(vm1.deleted)
self.assertEqual(vm1.operstate, "BUILD")
mrapi.GetJobStatus.return_value = {"status": "error"}
mrapi().GetJobs.return_value = [{"id": "1", "status": "error"}]
with mocked_quotaholder():
self.reconciler.reconcile()
vm1 = VirtualMachine.objects.get(id=vm1.id)
self.assertFalse(vm1.deleted)
self.assertEqual(vm1.operstate, "ERROR")
for status in ["success", "cancelled"]:
for status in ["success", "canceled"]:
vm1.operstate = "BUILD"
vm1.deleted = False
vm1.save()
mrapi.GetJobStatus.return_value = {"status": status}
mrapi().GetJobs.return_value = [{"id": "1", "status": status}]
with mocked_quotaholder():
self.reconciler.reconcile()
vm1 = VirtualMachine.objects.get(id=vm1.id)
self.assertTrue(vm1.deleted)
self.assertEqual(vm1.operstate, "DESTROYED")
vm1 = mfactory.VirtualMachineFactory(backend=self.backend,
backendjobid=1,
operstate="BUILD")
vm1.backendtime = vm1.created - timedelta(seconds=120)
vm1.backendjobid = 10
vm1.save()
cmrapi = self.reconciler.client
cmrapi.GetInstances.return_value = \
[{"name": vm1.backend_vm_id,
"beparams": {"maxmem": 1024,
"minmem": 1024,
"vcpus": 4},
"oper_state": False,
"mtime": time(),
"disk.sizes": [],
"nic.ips": [],
"nic.macs": [],
"nic.networks": [],
"tags": []}]
mrapi.GetJobStatus.return_value = {"status": "running"}
with mocked_quotaholder():
self.reconciler.reconcile()
vm1 = VirtualMachine.objects.get(id=vm1.id)
self.assertEqual(vm1.operstate, "BUILD")
mrapi.GetJobStatus.return_value = {"status": "error"}
with mocked_quotaholder():
self.reconciler.reconcile()
vm1 = VirtualMachine.objects.get(id=vm1.id)
self.assertEqual(vm1.operstate, "ERROR")
self.assertFalse(vm1.deleted)
self.assertEqual(vm1.operstate, "ERROR")
def test_stale_server(self, mrapi):
mrapi.GetInstances = []
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment