Commit d9f13745 authored by Christos Stavrakakis's avatar Christos Stavrakakis
Browse files

cyclades: Make snf-dispatcher handle volumes

Update snf-dispatcher to handle messages from snf-ganeti-eventd about
the instance disks(volumes). Handling of disks is done almost exactly as
for NICs.
parent bf44a602
......@@ -58,7 +58,9 @@ _reverse_tags = dict((v.split(':')[3], k) for k, v in _firewall_tags.items())
SIMPLE_NIC_FIELDS = ["state", "mac", "network", "firewall_profile", "index"]
COMPLEX_NIC_FIELDS = ["ipv4_address", "ipv6_address"]
DISK_FIELDS = ["status", "size", "index"]
def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
......@@ -119,7 +121,7 @@ def handle_vm_quotas(vm, job_id, job_opcode, job_status, job_fields):
def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None,
disks=None, job_fields=None):
"""Process a job progress notification from the backend
Process an incoming message from the backend (currently Ganeti).
......@@ -165,35 +167,36 @@ def process_op_status(vm, etime, jobid, opcode, status, logmsg, nics=None,
# in reversed order.
vm.backendtime = etime
if status in rapi.JOB_STATUS_FINALIZED and nics is not None:
# Update the NICs of the VM
_process_net_status(vm, etime, nics)
if status in rapi.JOB_STATUS_FINALIZED:
if nics is not None: # Update the NICs of the VM
_process_net_status(vm, etime, nics)
if disks is not None: # Update the disks of the VM
_process_disks_status(vm, etime, disks)
# Special case: if OP_INSTANCE_CREATE fails --> ERROR
if opcode == 'OP_INSTANCE_CREATE' and status in (rapi.JOB_STATUS_CANCELED,
new_operstate = "ERROR"
vm.backendtime = etime
# Update state of associated NICs
# Update state of associated attachments
elif opcode == 'OP_INSTANCE_REMOVE':
# Special case: OP_INSTANCE_REMOVE fails for machines in ERROR,
# when no instance exists at the Ganeti backend.
# See ticket #799 for all the details.
if (status == rapi.JOB_STATUS_SUCCESS or
(status == rapi.JOB_STATUS_ERROR and not vm_exists_in_backend(vm))):
# VM has been deleted
# server has been deleted, so delete the server's attachments
vm.volumes.all().update(deleted=True, machine=None)
for nic in vm.nics.all():
# Release the IP
# but first release the IP
# And delete the NIC.
vm.deleted = True
new_operstate = state_for_success
vm.backendtime = etime
status = rapi.JOB_STATUS_SUCCESS
#status = "success"
vm.volumes.all().update(deleted=True, machine=None)
if status in rapi.JOB_STATUS_FINALIZED:
# Job is finalized: Handle quotas/commissioning
......@@ -426,6 +429,85 @@ def terminate_active_ipaddress_log(nic, ip):
def process_disks_status(vm, etime, disks):
"""Wrap _process_disks_status inside transaction."""
_process_disks_status(vm, etime, disks)
def _process_disks_status(vm, etime, disks):
"""Process a disks status notification from the backend
Process an incoming message from the Ganeti backend,
detailing the disk configuration of a VM instance.
Update the state of the VM in the DB accordingly.
ganeti_disks = process_ganeti_disks(disks)
db_disks = dict([(, disk)
for disk in vm.volumes.filter(deleted=False)])
for disk_name in set(db_disks.keys()) | set(ganeti_disks.keys()):
db_disk = db_disks.get(disk_name)
ganeti_disk = ganeti_disks.get(disk_name)
if ganeti_disk is None:
if disk_is_stale(vm, disk):
log.debug("Removing stale disk '%s'" % db_disk)
# TODO: Handle disk deletion
db_disk.deleted = True
else:"disk '%s' is still being created" % db_disk)
elif db_disk is None:
msg = ("disk/%s of VM %s does not exist in DB! Cannot"
" automatically fix this issue!" % (disk_name, vm))
elif not disks_are_equal(db_disk, ganeti_disk):
for f in DISK_FIELDS:
# Update the disk in DB with the values from Ganeti disk
setattr(db_disk, f, ganeti_disk[f])
# TODO: Special case where the size of the disk has changed!!
assert(ganeti_disk["size"] == db_disk.size)
vm.backendtime = etime
def disks_are_equal(db_disk, gnt_disk):
for field in DISK_FIELDS:
if getattr(db_disk, field) != gnt_disk[field]:
return False
return True
def process_ganeti_disks(ganeti_disks):
"""Process disk dict from ganeti"""
new_disks = []
for index, gdisk in enumerate(ganeti_disks):
disk_name = gdisk.get("name", None)
if disk_name is not None:
disk_id = utils.id_from_disk_name(disk_name)
# Put as default value the index. If it is an unknown disk to
# synnefo it will be created automaticaly.
disk_id = UNKNOWN_DISK_PREFIX + str(index)
# Get disk size in GB
size = gdisk.get("size") >> 10
disk_info = {
'index': index,
'size': size,
'status': "IN_USE"}
new_disks.append((disk_id, disk_info))
return dict(new_disks)
def process_network_status(back_network, etime, jobid, opcode, status, logmsg):
if status not in [x[0] for x in BACKEND_STATUSES]:
......@@ -632,8 +714,8 @@ def create_instance(vm, nics, volumes, flavor, image):
kw['disk_template'] = flavor.disk_template
disks = []
for volume in volumes:
disk = {}
disk["size"] = volume.size * 1024
disk = {"name": volume.backend_volume_uuid,
"size": volume.size * 1024}
provider = flavor.disk_provider
if provider is not None:
disk["provider"] = provider
......@@ -779,6 +861,24 @@ def job_is_still_running(vm, job_id=None):
return False
def disk_is_stale(vm, disk, timeout=60):
"""Check if a disk is stale or exists in the Ganeti backend."""
# First check the state of the disk
if disk.status == "CREATING":
if < disk.created + timedelta(seconds=timeout):
# Do not check for too recent disks to avoid the time overhead
return False
if job_is_still_running(vm, job_id=disk.backendjobid):
return False
# If job has finished, check that the disk exists, because the
# message may have been lost or stuck in the queue.
vm_info = get_instance_info(vm)
if disk.backend_volume_uuid in vm_info["disk.names"]:
return False
return True
def nic_is_stale(vm, nic, timeout=60):
"""Check if a NIC is stale or exists in the Ganeti backend."""
# First check the state of the NIC and if there is a pending CONNECT
......@@ -183,6 +183,7 @@ def update_db(vm, msg, event_time):
jobID = msg["jobId"]
logmsg = msg["logmsg"]
nics = msg.get("instance_nics", None)
disks = msg.get("instance_disks", None)
job_fields = msg.get("job_fields", {})
result = msg.get("result", [])
......@@ -223,6 +224,7 @@ def update_db(vm, msg, event_time):
backend_mod.process_op_status(vm, event_time, jobID,
operation, status,
logmsg, nics=nics,
log.debug("Done processing ganeti-op-status msg for vm %s.",
......@@ -88,6 +88,19 @@ def id_from_nic_name(name):
return int(ns)
def id_from_disk_name(name):
"""Returns Disk Django id, given a Ganeti's Disk name.
if not str(name).startswith(settings.BACKEND_PREFIX_ID):
raise ValueError("Invalid Disk name: %s" % name)
ns = str(name).replace(settings.BACKEND_PREFIX_ID + 'volume-', "", 1)
if not ns.isdigit():
raise ValueError("Invalid Disk name: %s" % name)
return int(ns)
def get_rsapi_state(vm):
"""Returns the API state for a virtual machine
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment