Commit bf40d735 authored by Christos Stavrakakis's avatar Christos Stavrakakis
Browse files

cyclades: Process the progress of snapshots

Update the status of snapshots based on notifications about the progress
of Ganeti OP_INSTANCE_SNAPSHOT job. Extend snf-ganeti-eventd and
snf-dispatcher to create and handle such notifications. Currently, we
store the snapshot_id, in a hacky way as a reason in the
parent 2265b417
......@@ -32,6 +32,7 @@
# or implied, of GRNET S.A.
from django.conf import settings
from django.db import transaction
from django.utils import simplejson as json
from datetime import datetime, timedelta
from synnefo.db.models import (VirtualMachine, Network,
......@@ -43,6 +44,7 @@ from synnefo import quotas
from synnefo.api.util import release_resource
from synnefo.util.mac2eui64 import mac2eui64
from synnefo.logic import rapi
from synnefo.volume.util import update_snapshot_status
from logging import getLogger
log = getLogger(__name__)
......@@ -134,6 +136,15 @@ def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None,
if status not in [x[0] for x in BACKEND_STATUSES]:
raise VirtualMachine.InvalidBackendMsgError(opcode, status)
if opcode == "OP_INSTANCE_SNAPSHOT":
for disk_id, disk_info in job_fields.get("disks", []):
snapshot_name = disk_info.get("snapshot_name")
snapshot_info = json.loads(disk_info["snapshot_info"])
user_id = vm.userid
_process_snapshot_status(snapshot_name, snapshot_info,
user_id, etime, jobid, status)
vm.backendjobid = jobid
vm.backendjobstatus = status
vm.backendopcode = opcode
......@@ -508,6 +519,23 @@ def process_ganeti_disks(ganeti_disks):
return dict(new_disks)
def process_snapshot_status(*args, **kwargs):
return _process_snapshot_status(*args, **kwargs)
def _process_snapshot_status(snapshot_name, snapshot_info, user_id, etime,
jobid, status):
"""Process a notification for a snapshot."""
snapshot_id = snapshot_info.get("snapshot_id")
assert(snapshot_id is not None), "Missing snapshot_id"
if status in rapi.JOB_STATUS_FINALIZED:
snapshot_status = rapi.JOB_STATUS_SUCCESS and "AVAILABLE" or "ERROR"
log.debug("Updating status of snapshot '%s' to '%s'", snapshot_id,
update_snapshot_status(snapshot_id, user_id, status=snapshot_status)
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
if status not in [x[0] for x in BACKEND_STATUSES]:
......@@ -1155,11 +1183,13 @@ def detach_volume(vm, volume, depends=[]):
return client.ModifyInstance(**kwargs)
def snapshot_instance(vm, snapshot_name):
def snapshot_instance(vm, snapshot_name, snapshot_id):
#volume = instance.volumes.all()[0]
reason = json.dumps({"snapshot_id": snapshot_id})
with pooled_rapi_client(vm) as client:
return client.SnapshotInstance(instance=vm.backend_vm_id,
def get_instances(backend, bulk=True):
......@@ -546,7 +546,8 @@ class GanetiRapiClient(object): # pylint: disable=R0904
("/%s/instances/%s/deactivate-disks" %
(GANETI_RAPI_VERSION, instance)), None, None)
def SnapshotInstance(self, instance, snapshot_name, dry_run=False):
def SnapshotInstance(self, instance, snapshot_name, dry_run=False,
"""Replaces disks on an instance.
@type instance: str
......@@ -561,9 +562,10 @@ class GanetiRapiClient(object): # pylint: disable=R0904
body = {
"disks": [(0, {"snapshot_name": snapshot_name})],
query = []
_AppendIf(query, reason, ("reason", reason))
_AppendDryRunIf(query, dry_run)
return self._SendRequest(HTTP_PUT,
......@@ -85,5 +85,5 @@ class Command(BaseCommand):
msg = ("Created snapshot of volume '%s' with ID %s\n"
% (, snapshot["uuid"]))
% (, snapshot["id"]))
......@@ -53,10 +53,10 @@ class Command(BaseCommand):
with image_backend(user) as backend:
snapshots = backend.list_snapshots(user)
headers = ("id", "name", "volume_id", "size", "mapfile")
headers = ("id", "name", "volume_id", "size", "mapfile", "status")
table = []
for snap in snapshots:
fields = (snap["id"], snap["name"], snap["volume_id"],
snap["size"], snap["mapfile"])
snap["size"], snap["mapfile"], snap["status"])
pprint_table(self.stdout, table, headers)
......@@ -83,7 +83,7 @@ def create(user_id, volume, name, description, metadata, force=False):
with image_backend(user_id) as pithos_backend:
# move this to plankton backend
snapshot_uuid = pithos_backend.backend.register_object_map(
snapshot_id = pithos_backend.backend.register_object_map(
......@@ -97,9 +97,10 @@ def create(user_id, volume, name, description, metadata, force=False):
snapshot = util.get_snapshot(user_id, snapshot_uuid)
snapshot = util.get_snapshot(user_id, snapshot_id)
return snapshot
......@@ -71,3 +71,8 @@ def get_disk_template_provider(disk_template):
if disk_template.startswith("ext") and "_" in disk_template:
disk_template, provider = disk_template.split("_", 1)
return disk_template, provider
def update_snapshot_status(snapshot_id, user_id, status):
with image_backend(user_id) as b:
return b.update_status(snapshot_id, status=status)
......@@ -305,6 +305,16 @@ class JobFileHandler(pyinotify.ProcessEvent):
job_fields = {"nics": get_field(input, "nics"),
"disks": get_field(input, "disks"),
"beparams": get_field(input, "beparams")}
elif op_id == "OP_INSTANCE_SNAPSHOT":
job_fields = {"disks": get_field(input, "disks")}
reason = get_field(input, "reason")
snapshot_info = None
if isinstance(reason, list) and len(reason) > 0:
reason = reason[0]
if reason[0] == "gnt:user":
snapshot_info = reason[1]
self.logger.critical("LALALL %s", job_fields["disks"][0])
job_fields["disks"][0][1]["snapshot_info"] = snapshot_info
msg = {"type": "ganeti-op-status",
"instance": instances,
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment