Commit e73389ce authored by Christos Stavrakakis's avatar Christos Stavrakakis
Browse files

cyclades: Reconciliation for stale snapshots

Extend 'reconcile-servers' command to perform reconciliation for
snapshots, which can occur if the Ganeti job failed but the
corresponding message from eventd has never been processed. In order to
make reconciliation possible, Cyclades stores special metadata on the
snapshot containing the Ganeti backend ID and the ID of the Ganeti job
that will create the snapshot.

Closes grnet/synnefo#231
parent 64ebc698
......@@ -587,6 +587,16 @@ def adopt_instance_disk(server, gnt_disk):
return vol
def snapshot_state_from_job_status(job_status):
    """Map a Ganeti job status to the corresponding snapshot state.

    A successfully finished job yields an available snapshot, any other
    finalized status yields an error state, and a job that has not yet
    finalized leaves the snapshot unavailable.
    """
    # Job still running/queued: the snapshot is not ready yet.
    if job_status not in rapi.JOB_STATUS_FINALIZED:
        return OBJECT_UNAVAILABLE
    if job_status == rapi.JOB_STATUS_SUCCESS:
        return OBJECT_AVAILABLE
    return OBJECT_ERROR
def update_snapshot(snapshot_id, user_id, job_id, job_status, etime):
    """Update a snapshot based on the result of the Ganeti job.

    Map the status of the Ganeti job that creates the snapshot to a
    snapshot state and persist it.  While the job has not finalized the
    snapshot is already in the unavailable state, so no update is
    performed in that case.

    Note: this span contained leftover residue of the superseded
    implementation interleaved with the new one; only the working
    version is kept here.
    """
    state = snapshot_state_from_job_status(job_status)
    if state != OBJECT_UNAVAILABLE:
        log.debug("Updating state of snapshot '%s' to '%s'", snapshot_id,
                  state)
        volume.util.update_snapshot_state(snapshot_id, user_id, state=state)
@transaction.commit_on_success
......
......@@ -64,6 +64,9 @@ class Command(SynnefoCommand):
make_option('--fix-pending-tasks', action='store_true',
dest='fix_pending_tasks', default=False,
help='Fix servers with stale pending tasks.'),
make_option('--fix-unsynced-snapshots', action='store_true',
dest='fix_unsynced_snapshots', default=False,
help='Fix unsynced snapshots.'),
make_option('--fix-all', action='store_true', dest='fix_all',
default=False, help='Enable all --fix-* arguments'),
)
......
......@@ -43,6 +43,7 @@ from django.conf import settings
import logging
import itertools
import bitarray
import simplejson as json
from datetime import datetime, timedelta
from synnefo.db import transaction
......@@ -53,6 +54,8 @@ from synnefo.db.models import (Backend, VirtualMachine, Flavor,
from synnefo.db import pools
from synnefo.logic import utils, rapi, backend as backend_mod
from synnefo.lib.utils import merge_time
from synnefo.plankton.backend import (PlanktonBackend, OBJECT_UNAVAILABLE,
OBJECT_ERROR)
logger = logging.getLogger()
logging.basicConfig()
......@@ -95,6 +98,7 @@ class BackendReconciler(object):
self.stale_servers = self.reconcile_stale_servers()
self.orphan_servers = self.reconcile_orphan_servers()
self.unsynced_servers = self.reconcile_unsynced_servers()
self.unsynced_snapshots = self.reconcile_unsynced_snapshots()
self.close()
def get_build_status(self, db_server):
......@@ -370,6 +374,49 @@ class BackendReconciler(object):
db_server.save()
self.log.info("Cleared pending task for server '%s", server_id)
def reconcile_unsynced_snapshots(self):
    """Reconcile snapshots that are stale in the unavailable state.

    A snapshot can stay unavailable forever if the eventd message about
    the Ganeti job that created it was never processed.  Recover the
    real job status from the 'backend_info' metadata stored on the
    snapshot (Ganeti backend ID and job ID) and fix the snapshot state
    when '--fix-unsynced-snapshots' is given.
    """
    # Find the biggest ID among the retrieved Ganeti jobs.
    # Reconciliation is only meaningful for job IDs smaller than or
    # equal to this one; newer jobs may simply not be fetched yet.
    max_job_id = max(self.gnt_jobs.keys()) if self.gnt_jobs else 0
    with PlanktonBackend(None) as b:
        # TODO: Currently this will return only public snapshots
        snapshots = b.list_snapshots()
    unavail_snapshots = [s for s in snapshots
                         if s["status"] == OBJECT_UNAVAILABLE]
    for snapshot in unavail_snapshots:
        uuid = snapshot["id"]
        backend_info = snapshot["backend_info"]
        if backend_info is None:
            self.log.warning("Cannot perform reconciliation for"
                             " snapshot '%s'. Not enough information.",
                             uuid)
            continue
        job_info = json.loads(backend_info)
        backend_id = job_info["ganeti_backend_id"]
        job_id = job_info["ganeti_job_id"]
        if backend_id != self.backend.id or job_id > max_job_id:
            # Snapshot belongs to another backend, or its job is newer
            # than the jobs we retrieved: nothing to reconcile here.
            continue
        if job_id in self.gnt_jobs:
            job_status = self.gnt_jobs[job_id]["status"]
            state = \
                backend_mod.snapshot_state_from_job_status(job_status)
            if state == OBJECT_UNAVAILABLE:
                # Job has not finalized yet; current state is correct.
                continue
        else:
            # Snapshot is unavailable but no such job exists anymore;
            # treat the job as failed.  BUGFIX: 'job_status' was
            # previously unassigned on this path, raising NameError in
            # the update_snapshot() call below.
            # NOTE(review): assumes rapi.JOB_STATUS_ERROR is a
            # finalized status mapping to OBJECT_ERROR -- confirm.
            job_status = rapi.JOB_STATUS_ERROR
            state = OBJECT_ERROR
        self.log.info("Snapshot '%s' is '%s' in Pithos DB but should"
                      " be %s", uuid, snapshot["status"], state)
        if self.options["fix_unsynced_snapshots"]:
            backend_mod.update_snapshot(uuid, snapshot["owner"],
                                        job_id=-1,
                                        job_status=job_status,
                                        etime=self.event_time)
            self.log.info("Fixed state of snapshot '%s'.", uuid)
# Tab-separated "Label: %s" template used when printing NIC details.
NIC_MSG = "\t".join("%s: %%s" % field
                    for field in ("ID", "State", "IP", "Network", "MAC",
                                  "Index", "Firewall"))
......
......@@ -37,7 +37,7 @@ from time import time
import json
## Test Callbacks
# Test Callbacks
@patch('synnefo.lib.amqp.AMQPClient')
class UpdateDBTest(TestCase):
def create_msg(self, **kwargs):
......@@ -314,6 +314,40 @@ class UpdateDBTest(TestCase):
update_db(client, msg)
self.assertTrue(client.basic_reject.called)
@patch("synnefo.plankton.backend.get_pithos_backend")
def test_error_snapshot(self, pithos_backend, client):
vm = mfactory.VirtualMachineFactory()
disks = [
(0, {"snapshot_info": json.dumps({"snapshot_id":
"test_snapshot_id"})})
]
msg = self.create_msg(operation='OP_INSTANCE_SNAPSHOT',
instance=vm.backend_vm_id,
job_fields={'disks': disks},
status="running")
update_db(client, msg)
self.assertEqual(pithos_backend().update_object_status.mock_calls, [])
msg = self.create_msg(operation='OP_INSTANCE_SNAPSHOT',
instance=vm.backend_vm_id,
job_fields={'disks': disks},
event_time=split_time(time()),
status="error")
update_db(client, msg)
pithos_backend().update_object_status\
.assert_called_once_with("test_snapshot_id", state=-1)
pithos_backend.reset_mock()
msg = self.create_msg(operation='OP_INSTANCE_SNAPSHOT',
instance=vm.backend_vm_id,
job_fields={'disks': disks},
event_time=split_time(time()),
status="success")
update_db(client, msg)
pithos_backend().update_object_status\
.assert_called_once_with("test_snapshot_id", state=1)
@patch('synnefo.lib.amqp.AMQPClient')
class UpdateNetTest(TestCase):
......
......@@ -559,6 +559,9 @@ def image_to_dict(location, metadata, permissions):
image["deleted_at"] = image["updated_at"]
else:
image["deleted_at"] = ""
# Ganeti ID and job ID to be used for snapshot reconciliation
image["backend_info"] = metadata.pop(PLANKTON_PREFIX + "backend_info",
None)
properties = {}
for key, val in metadata.items():
......
......@@ -14,9 +14,10 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import simplejson as json
from synnefo.db import transaction
from snf_django.lib.api import faults
from synnefo.plankton.backend import PlanktonBackend
from synnefo.plankton.backend import PlanktonBackend, OBJECT_ERROR
from synnefo.logic import backend
from synnefo.volume import util
from synnefo.util import units
......@@ -102,9 +103,22 @@ def create(user_id, volume, name, description, metadata, force=False):
" %s size." % units.show(size, "bytes", "gb"))
raise faults.OverLimit(msg)
backend.snapshot_instance(volume.machine, volume,
snapshot_name=mapfile,
snapshot_id=snapshot_id)
try:
job_id = backend.snapshot_instance(volume.machine, volume,
snapshot_name=mapfile,
snapshot_id=snapshot_id)
except:
# If failed to enqueue job to Ganeti, mark snapshot as ERROR
b.update_snapshot_state(snapshot_id, OBJECT_ERROR)
# Store the backend and job id as metadata in the snapshot in order
# to make reconciliation based on the Ganeti job possible.
backend_info = {
"ganeti_job_id": job_id,
"ganeti_backend_id": volume.machine.backend_id
}
metadata = {"backend_info": json.dumps(backend_info)}
b.update_metadata(snapshot_id, metadata)
snapshot = util.get_snapshot(user_id, snapshot_id)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment